package org.wso2.siddhi.core.query.processor.window;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.event.StreamEvent;
import org.wso2.siddhi.core.event.in.InEvent;
import org.wso2.siddhi.core.event.in.InListEvent;
import org.wso2.siddhi.core.event.remove.RemoveEvent;
import org.wso2.siddhi.core.event.remove.RemoveListEvent;
import org.wso2.siddhi.core.util.collection.queue.IQueue;
import org.wso2.siddhi.core.util.collection.queue.Queue;
import org.wso2.siddhi.core.util.collection.queue.QueueGrid;
import org.wso2.siddhi.query.api.expression.Expression;
import org.wso2.siddhi.query.api.expression.constant.IntConstant;

/* loaded from: input_file:org/wso2/siddhi/core/query/processor/window/LengthBatchWindowProcessor.class */
/**
 * Window processor that buffers incoming events and releases them downstream in batches.
 *
 * <p>Incoming events are appended to {@code newEventList}. Once that buffer holds at least
 * {@code lengthToKeep} events, the processor:
 * <ol>
 *   <li>polls every event of the previously released batch out of {@code window}, stamps it
 *       with the current time as its expiry time and forwards the lot to the next processor
 *       as one {@link RemoveListEvent} (skipped when the window was empty);</li>
 *   <li>stores the buffered events in {@code window} wrapped as {@link RemoveEvent}s;</li>
 *   <li>forwards the buffered events to the next processor as one {@link InListEvent};</li>
 *   <li>clears the buffer.</li>
 * </ol>
 *
 * <p>Both {@code processEvent} overloads run between {@code acquireLock()} and
 * {@code releaseLock()} of the parent class.
 */
public class LengthBatchWindowProcessor extends WindowProcessor {
    /** Number of buffered events that triggers the release of a batch. */
    private int lengthToKeep;
    /** Events received since the last batch was released. */
    private List<InEvent> newEventList;
    /** Scratch list used to collect the events polled out of {@link #window} while expiring a batch. */
    private List<RemoveEvent> oldEventList;
    /** Holds the most recently released batch (as {@link RemoveEvent}s) until the next batch expires it. */
    private IQueue<StreamEvent> window;

    /**
     * Reads the batch length from the first window parameter.
     *
     * @param expressionArr window parameters; element 0 is cast to {@link IntConstant} and its
     *                      value becomes the batch length
     */
    @Override
    public void setParameters(Expression[] expressionArr) {
        this.lengthToKeep = ((IntConstant) expressionArr[0]).getValue().intValue();
    }

    /**
     * Feeds a single event into the batch buffer while holding the processor lock.
     *
     * @param inEvent the event to buffer
     */
    @Override
    protected void processEvent(InEvent inEvent) {
        acquireLock();
        try {
            processLengthBatchWindow(inEvent);
        } finally {
            // try/finally guarantees exactly one release per acquire, on both the normal and the
            // exceptional path, and matches the InListEvent overload below.
            releaseLock();
        }
    }

    /**
     * Feeds every active event of the given list event into the batch buffer, in order, while
     * holding the processor lock for the whole list.
     *
     * @param inListEvent the list event whose first {@code getActiveEvents()} entries are buffered
     */
    @Override
    protected void processEvent(InListEvent inListEvent) {
        acquireLock();
        try {
            Event[] events = inListEvent.getEvents();
            int activeEvents = inListEvent.getActiveEvents();
            for (int i = 0; i < activeEvents; i++) {
                processLengthBatchWindow((InEvent) events[i]);
            }
        } finally {
            releaseLock();
        }
    }

    /**
     * @return an iterator over the events currently held in the window queue
     */
    @Override
    public Iterator<StreamEvent> iterator() {
        return this.window.iterator();
    }

    /**
     * Returns an iterator over the window contents. When distributed processing is enabled the
     * call is delegated to {@link QueueGrid#iterator(String)} with the given argument; otherwise
     * the argument is ignored and the plain queue iterator is returned.
     *
     * @param str argument passed through to the grid-backed queue in distributed mode
     * @return an iterator over the events held in the window queue
     */
    @Override
    public Iterator<StreamEvent> iterator(String str) {
        if (this.siddhiContext.isDistributedProcessing()) {
            return ((QueueGrid) this.window).iterator(str);
        }
        return this.window.iterator();
    }

    /**
     * Buffers one event and, if the buffer has reached the configured length, expires the
     * previous batch and releases the current one.
     *
     * @param inEvent the event to buffer
     */
    private void processLengthBatchWindow(InEvent inEvent) {
        this.newEventList.add(inEvent);
        if (log.isDebugEnabled()) {
            log.debug("newEventList size " + this.newEventList.size() + " with event " + inEvent);
        }
        // ">=" rather than "==": a buffer that is already over-full (for example after
        // restoreState() supplied more events than the configured length, or when the configured
        // length is not positive) is still flushed instead of growing without bound.
        if (this.newEventList.size() >= this.lengthToKeep) {
            expirePreviousBatch();
            releaseCurrentBatch();
        }
    }

    /** Drains the window, stamps each drained event as expired now, and forwards them downstream. */
    private void expirePreviousBatch() {
        this.oldEventList.clear();
        RemoveEvent expired;
        while ((expired = (RemoveEvent) this.window.poll()) != null) {
            expired.setExpiryTime(System.currentTimeMillis());
            this.oldEventList.add(expired);
        }
        if (!this.oldEventList.isEmpty()) {
            RemoveEvent[] expiredBatch = this.oldEventList.toArray(new RemoveEvent[this.oldEventList.size()]);
            this.nextProcessor.process(new RemoveListEvent(expiredBatch));
            this.oldEventList.clear();
        }
    }

    /** Moves the buffered events into the window, forwards them downstream, and empties the buffer. */
    private void releaseCurrentBatch() {
        InEvent[] batch = this.newEventList.toArray(new InEvent[this.newEventList.size()]);
        for (InEvent buffered : batch) {
            // Stored with expiry time -1; the real expiry time is set when the batch is drained.
            this.window.put(new RemoveEvent(buffered, -1L));
        }
        this.nextProcessor.process(new InListEvent(batch));
        this.newEventList.clear();
    }

    /**
     * @return a three-element array: the window queue's own state, the expired-event scratch
     *         list, and the buffer of not-yet-released events (the lists are returned by
     *         reference, not copied)
     */
    @Override
    protected Object[] currentState() {
        return new Object[]{this.window.currentState(), this.oldEventList, this.newEventList};
    }

    /**
     * Restores the state produced by {@link #currentState()}.
     *
     * @param objArr element 0 is handed to the window queue; elements 1 and 2 replace the
     *               expired-event scratch list and the event buffer respectively
     */
    @Override
    @SuppressWarnings("unchecked") // element types are fixed by currentState()
    protected void restoreState(Object[] objArr) {
        this.window.restoreState((Object[]) objArr[0]);
        // Cast to List, not ArrayList: the fields are declared as List and initWindow() may
        // assign a non-ArrayList implementation in distributed mode.
        // NOTE(review): in distributed mode this replaces the Hazelcast-provided list with
        // whatever list the state carries - confirm that is intended.
        this.oldEventList = (List<RemoveEvent>) objArr[1];
        this.newEventList = (List<InEvent>) objArr[2];
    }

    /**
     * Creates the window queue and the event lists. With distributed processing enabled the
     * window is a {@link QueueGrid} and the event buffer is obtained from the Hazelcast instance
     * under the name {@code elementId + "-newEventList"}; otherwise both are plain local
     * collections. The expired-event scratch list is always local.
     */
    @Override
    protected void initWindow() {
        if (this.siddhiContext.isDistributedProcessing()) {
            this.window = new QueueGrid(this.elementId, this.siddhiContext, this.async);
            this.newEventList = this.siddhiContext.getHazelcastInstance().getList(this.elementId + "-newEventList");
        } else {
            this.window = new Queue();
            this.newEventList = new ArrayList<InEvent>();
        }
        this.oldEventList = new ArrayList<RemoveEvent>();
    }
}
