package org.wso2.siddhi.core.query.processor.stream.window;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.MetaComplexEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.SchedulingProcessor;
import org.wso2.siddhi.core.table.EventTable;
import org.wso2.siddhi.core.util.Scheduler;
import org.wso2.siddhi.core.util.collection.operator.Finder;
import org.wso2.siddhi.core.util.parser.CollectionOperatorParser;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
import org.wso2.siddhi.query.api.expression.Expression;

/* loaded from: input_file:org/wso2/siddhi/core/query/processor/stream/window/ExternalTimeBatchWindowProcessor.class */
/**
 * Batch window whose boundaries are driven by a timestamp read from the events themselves
 * (via the first window parameter) rather than by the arrival time of the events.
 *
 * <p>Events whose timestamp is below the current batch end are collected. The first event whose
 * timestamp reaches the batch end causes the collected batch to be emitted downstream, preceded
 * by the EXPIRED copies of the previously emitted batch. When a timeout (4th parameter) is
 * configured, a scheduler timer may emit the collected events early if no batch roll-over
 * happened in the meantime.
 *
 * <p>Window parameters: {@code (<long> timestamp, <int|long> windowTime[, <int|long> startTime
 * [, <int|long> timeout]])}. All parameters except {@code timestamp} must be constants.
 */
public class ExternalTimeBatchWindowProcessor extends WindowProcessor implements SchedulingProcessor, FindableProcessor {
    /** Extracts the external timestamp from each incoming event. */
    private ExpressionExecutor timestampExpressionExecutor;
    /** Length of one batch, in the same unit as the external timestamp. */
    private long timeToKeep;
    private Scheduler scheduler;
    /** Time handed to the scheduler most recently; timers stamped earlier than this are ignored. */
    private long lastScheduledTime;
    /** Largest external timestamp seen so far on a CURRENT event. */
    private long lastCurrentEventTime;
    /** Events collected for the batch that is currently open. */
    private ComplexEventChunk<StreamEvent> currentEventChunk = new ComplexEventChunk<>(false);
    /** EXPIRED copies of the events that were emitted most recently. */
    private ComplexEventChunk<StreamEvent> expiredEventChunk = new ComplexEventChunk<>(false);
    /** Exclusive upper bound of the open batch; negative until the first event initialises it. */
    private long endTime = -1;
    private long startTime = 0;
    private boolean isStartTimeEnabled = false;
    /** Timeout after which collected events are emitted by a timer; values &lt;= 0 disable it. */
    private long schedulerTimeout = -1;
    /** True when the open batch has already been (partially) emitted by a timer. */
    private boolean flushed = false;

    /**
     * Validates the window parameters and stores them.
     *
     * @param expressionExecutorArr the two to four window parameters, in declaration order
     * @param executionPlanContext  not used by this method
     * @throws ExecutionPlanValidationException if the parameter count is wrong, the timestamp is a
     *                                          constant or not of type long, or any other
     *                                          parameter is not an int/long constant
     */
    @Override // org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor
    protected void init(ExpressionExecutor[] expressionExecutorArr, ExecutionPlanContext executionPlanContext) {
        this.expiredEventChunk = new ComplexEventChunk<>(false);
        int parameterCount = expressionExecutorArr.length;
        if (parameterCount < 2 || parameterCount > 4) {
            throw new ExecutionPlanValidationException("ExternalTimeBatch window should only have two to four parameters (<long> timestamp, <int|long|time> windowTime, <long> startTime, <int|long|time> timeout), but found " + parameterCount + " input attributes");
        }

        ExpressionExecutor timestampExecutor = expressionExecutorArr[0];
        if (timestampExecutor instanceof ConstantExpressionExecutor) {
            throw new ExecutionPlanValidationException("ExternalTimeBatch window's 1st parameter timestamp should not be a constant ");
        }
        if (timestampExecutor.getReturnType() != Attribute.Type.LONG) {
            throw new ExecutionPlanValidationException("ExternalTimeBatch window's 1st parameter timestamp should be type long, but found " + timestampExecutor.getReturnType());
        }
        this.timestampExpressionExecutor = timestampExecutor;

        this.timeToKeep = readConstantAsLong(expressionExecutorArr[1], "2nd", "windowTime");
        if (parameterCount >= 3) {
            this.isStartTimeEnabled = true;
            this.startTime = readConstantAsLong(expressionExecutorArr[2], "3rd", "startTime");
        }
        if (parameterCount == 4) {
            this.schedulerTimeout = readConstantAsLong(expressionExecutorArr[3], "4th", "timeout");
        }
    }

    /**
     * Reads an int or long constant window parameter as a {@code long}.
     *
     * @param executor the parameter's executor
     * @param ordinal  position of the parameter as used in error messages, e.g. {@code "2nd"}
     * @param name     name of the parameter as used in error messages
     * @return the constant's value widened to {@code long}
     * @throws ExecutionPlanValidationException if the parameter is neither int nor long, or is
     *                                          not a constant
     */
    private static long readConstantAsLong(ExpressionExecutor executor, String ordinal, String name) {
        Attribute.Type type = executor.getReturnType();
        if (type != Attribute.Type.INT && type != Attribute.Type.LONG) {
            throw new ExecutionPlanValidationException("ExternalTimeBatch window's " + ordinal + " parameter " + name + " should be either int or long, but found " + type);
        }
        // Report a non-constant argument as a validation problem instead of a ClassCastException.
        if (!(executor instanceof ConstantExpressionExecutor)) {
            throw new ExecutionPlanValidationException("ExternalTimeBatch window's " + ordinal + " parameter " + name + " should be a constant, but found " + executor.getClass().getName());
        }
        Object value = ((ConstantExpressionExecutor) executor).getValue();
        return ((Number) value).longValue();
    }

    /**
     * Consumes a chunk of incoming events, updating the open batch under the processor's lock and
     * forwarding every batch that became ready to the next processor after the lock is released.
     *
     * @param complexEventChunk incoming events; an empty chunk is ignored
     * @param processor         next processor, receives the emitted batches
     * @param streamEventCloner used to copy events before they are retained or emitted
     */
    @Override // org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor
    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner) {
        if (complexEventChunk.getFirst() == null) {
            return;
        }
        List<ComplexEventChunk<StreamEvent>> readyChunks = new ArrayList<>();
        synchronized (this) {
            initTiming(complexEventChunk.getFirst());
            StreamEvent upcoming = complexEventChunk.getFirst();
            while (upcoming != null) {
                StreamEvent event = upcoming;
                // Advance before handling the event, exactly as the loop always did.
                upcoming = upcoming.getNext();
                if (event.getType() == ComplexEvent.Type.TIMER) {
                    handleTimerEvent(event, streamEventCloner, readyChunks);
                } else if (event.getType() == ComplexEvent.Type.CURRENT) {
                    handleCurrentEvent(event, streamEventCloner, readyChunks);
                }
                // Events of any other type are ignored.
            }
        }
        // Downstream processing happens outside the synchronized section.
        for (ComplexEventChunk<StreamEvent> readyChunk : readyChunks) {
            processor.process(readyChunk);
        }
    }

    /**
     * Handles a TIMER event: if the timer is not older than the most recent schedule, emits what
     * has been collected so far and schedules the next timeout.
     */
    private void handleTimerEvent(StreamEvent timerEvent, StreamEventCloner streamEventCloner, List<ComplexEventChunk<StreamEvent>> readyChunks) {
        if (this.lastScheduledTime > timerEvent.getTimestamp()) {
            // A newer schedule exists, so this timer is stale.
            return;
        }
        if (!this.flushed) {
            flushToOutputChunk(streamEventCloner, readyChunks, this.lastCurrentEventTime);
            this.flushed = true;
        } else if (this.currentEventChunk.getFirst() != null) {
            appendToOutputChunk(streamEventCloner, readyChunks);
        }
        scheduleTimeout();
    }

    /**
     * Handles a CURRENT event: collects it into the open batch, or, when its timestamp reaches
     * the batch end, emits the open batch and starts a new one containing this event.
     */
    private void handleCurrentEvent(StreamEvent event, StreamEventCloner streamEventCloner, List<ComplexEventChunk<StreamEvent>> readyChunks) {
        long eventTime = ((Long) this.timestampExpressionExecutor.execute(event)).longValue();
        if (this.lastCurrentEventTime < eventTime) {
            this.lastCurrentEventTime = eventTime;
        }
        if (eventTime < this.endTime) {
            cloneAppend(streamEventCloner, event);
            return;
        }
        if (this.flushed) {
            // A timer already emitted the earlier part of this batch; emit only the remainder.
            appendToOutputChunk(streamEventCloner, readyChunks);
            this.flushed = false;
        } else {
            flushToOutputChunk(streamEventCloner, readyChunks, this.lastCurrentEventTime);
        }
        this.endTime = findEndTime(this.lastCurrentEventTime, this.startTime, this.timeToKeep);
        cloneAppend(streamEventCloner, event);
        if (this.schedulerTimeout > 0) {
            scheduleTimeout();
        }
    }

    /** Records "now + timeout" as the latest scheduled time and asks the scheduler for a timer then. */
    private void scheduleTimeout() {
        this.lastScheduledTime = this.executionPlanContext.getTimestampGenerator().currentTime() + this.schedulerTimeout;
        this.scheduler.notifyAt(this.lastScheduledTime);
    }

    /**
     * Initialises the first batch end (and, without an explicit start time, the start time) from
     * the given event. Does nothing once {@code endTime} is non-negative.
     */
    private void initTiming(StreamEvent streamEvent) {
        if (this.endTime >= 0) {
            return;
        }
        long firstEventTime = ((Long) this.timestampExpressionExecutor.execute(streamEvent)).longValue();
        if (this.isStartTimeEnabled) {
            this.endTime = findEndTime(firstEventTime, this.startTime, this.timeToKeep);
        } else {
            this.startTime = firstEventTime;
            this.endTime = this.startTime + this.timeToKeep;
        }
        if (this.schedulerTimeout > 0) {
            scheduleTimeout();
        }
    }

    /**
     * Emits the previously retained EXPIRED events (re-stamped with {@code j}) followed by the
     * collected current events as one chunk, and retains EXPIRED copies of the emitted current
     * events in place of the old ones.
     *
     * @param streamEventCloner used to create the EXPIRED copies
     * @param list              receives the output chunk, if it is not empty
     * @param j                 timestamp assigned to the expired events being emitted
     */
    private void flushToOutputChunk(StreamEventCloner streamEventCloner, List<ComplexEventChunk<StreamEvent>> list, long j) {
        ComplexEventChunk<StreamEvent> outputChunk = new ComplexEventChunk<>(true);
        if (this.expiredEventChunk.getFirst() != null) {
            this.expiredEventChunk.reset();
            while (this.expiredEventChunk.hasNext()) {
                this.expiredEventChunk.next().setTimestamp(j);
            }
            outputChunk.add(this.expiredEventChunk.getFirst());
        }
        this.expiredEventChunk.clear();
        if (this.currentEventChunk.getFirst() != null) {
            retainExpiredCopies(streamEventCloner);
            outputChunk.add(this.currentEventChunk.getFirst());
        }
        this.currentEventChunk.clear();
        if (outputChunk.getFirst() != null) {
            list.add(outputChunk);
        }
    }

    /**
     * Emits only the collected current events and adds their EXPIRED copies to the ones already
     * retained; previously retained expired events are neither emitted nor dropped.
     *
     * @param streamEventCloner used to create the EXPIRED copies
     * @param list              receives the output chunk, if it is not empty
     */
    private void appendToOutputChunk(StreamEventCloner streamEventCloner, List<ComplexEventChunk<StreamEvent>> list) {
        ComplexEventChunk<StreamEvent> outputChunk = new ComplexEventChunk<>(true);
        retainExpiredCopies(streamEventCloner);
        outputChunk.add(this.currentEventChunk.getFirst());
        this.currentEventChunk.clear();
        if (outputChunk.getFirst() != null) {
            list.add(outputChunk);
        }
    }

    /** Appends an EXPIRED-typed copy of every collected current event to the expired chunk. */
    private void retainExpiredCopies(StreamEventCloner streamEventCloner) {
        this.currentEventChunk.reset();
        while (this.currentEventChunk.hasNext()) {
            StreamEvent expiredCopy = streamEventCloner.copyStreamEvent(this.currentEventChunk.next());
            expiredCopy.setType(ComplexEvent.Type.EXPIRED);
            this.expiredEventChunk.add(expiredCopy);
        }
    }

    /**
     * Returns the end of the batch that contains {@code currentTime}, with batch boundaries
     * aligned to {@code windowStart + k * windowLength}.
     *
     * <p>Java's {@code %} yields a negative remainder for a negative dividend, so the remainder is
     * normalised; otherwise a {@code currentTime} earlier than {@code windowStart} would produce
     * an end time that is not on the aligned grid.
     *
     * @throws ArithmeticException if {@code windowLength} is zero
     */
    private long findEndTime(long currentTime, long windowStart, long windowLength) {
        long elapsedInBatch = (currentTime - windowStart) % windowLength;
        if (elapsedInBatch < 0) {
            elapsedInBatch += windowLength;
        }
        return currentTime + (windowLength - elapsedInBatch);
    }

    /** Adds a copy of the event to the batch that is currently being collected. */
    private void cloneAppend(StreamEventCloner streamEventCloner, StreamEvent streamEvent) {
        this.currentEventChunk.add(streamEventCloner.copyStreamEvent(streamEvent));
    }

    /** No start-up work is required. */
    @Override // org.wso2.siddhi.core.util.extension.holder.EternalReferencedHolder
    public void start() {
    }

    /** No shutdown work is required. */
    @Override // org.wso2.siddhi.core.util.extension.holder.EternalReferencedHolder
    public void stop() {
    }

    /**
     * Captures the processor state.
     *
     * @return an array holding, in order: first current event, first expired event, endTime,
     *         startTime, isStartTimeEnabled, schedulerTimeout, lastScheduledTime,
     *         lastCurrentEventTime, flushed
     */
    @Override // org.wso2.siddhi.core.util.snapshot.Snapshotable
    public Object[] currentState() {
        return new Object[]{
                this.currentEventChunk.getFirst(),
                this.expiredEventChunk.getFirst(),
                Long.valueOf(this.endTime),
                Long.valueOf(this.startTime),
                Boolean.valueOf(this.isStartTimeEnabled),
                Long.valueOf(this.schedulerTimeout),
                // Appended after the original six entries so that older snapshots stay readable.
                Long.valueOf(this.lastScheduledTime),
                Long.valueOf(this.lastCurrentEventTime),
                Boolean.valueOf(this.flushed)
        };
    }

    /**
     * Restores the processor state from an array produced by {@link #currentState()}. Arrays in
     * the older six-element layout are accepted; the three trailing values then keep their
     * present values.
     *
     * @param objArr state array in the layout described on {@link #currentState()}
     */
    @Override // org.wso2.siddhi.core.util.snapshot.Snapshotable
    public void restoreState(Object[] objArr) {
        this.currentEventChunk.clear();
        this.currentEventChunk.add((StreamEvent) objArr[0]);
        this.expiredEventChunk.clear();
        this.expiredEventChunk.add((StreamEvent) objArr[1]);
        this.endTime = ((Long) objArr[2]).longValue();
        this.startTime = ((Long) objArr[3]).longValue();
        this.isStartTimeEnabled = ((Boolean) objArr[4]).booleanValue();
        this.schedulerTimeout = ((Long) objArr[5]).longValue();
        if (objArr.length > 8) {
            this.lastScheduledTime = ((Long) objArr[6]).longValue();
            this.lastCurrentEventTime = ((Long) objArr[7]).longValue();
            this.flushed = ((Boolean) objArr[8]).booleanValue();
        }
    }

    /**
     * Runs the finder against the retained expired events, under the processor's lock.
     *
     * @param complexEvent the event to match against
     * @param finder       the finder to execute
     * @return whatever the finder returns
     */
    @Override // org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor
    public synchronized StreamEvent find(ComplexEvent complexEvent, Finder finder) {
        return finder.find(complexEvent, this.expiredEventChunk, this.streamEventCloner);
    }

    /** Builds a finder for this window by delegating to {@link CollectionOperatorParser}, passing this window's input definition. */
    @Override // org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor
    public Finder constructFinder(Expression expression, MetaComplexEvent metaComplexEvent, ExecutionPlanContext executionPlanContext, List<VariableExpressionExecutor> list, Map<String, EventTable> map, int i, long j) {
        return CollectionOperatorParser.parse(expression, metaComplexEvent, executionPlanContext, list, map, i, this.inputDefinition, j);
    }

    /** Sets the scheduler used to request timeout timers. */
    @Override // org.wso2.siddhi.core.query.processor.SchedulingProcessor
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /** Returns the scheduler used to request timeout timers. */
    @Override // org.wso2.siddhi.core.query.processor.SchedulingProcessor
    public Scheduler getScheduler() {
        return this.scheduler;
    }
}
