package org.wso2.siddhi.core.query.processor.stream.window;

import java.util.List;
import java.util.Map;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.MetaComplexEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.SchedulingProcessor;
import org.wso2.siddhi.core.table.EventTable;
import org.wso2.siddhi.core.util.Scheduler;
import org.wso2.siddhi.core.util.collection.operator.Finder;
import org.wso2.siddhi.core.util.parser.CollectionOperatorParser;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
import org.wso2.siddhi.query.api.expression.Expression;

/* loaded from: input_file:org/wso2/siddhi/core/query/processor/stream/window/TimeWindowProcessor.class */
/**
 * Sliding time window: every CURRENT event that passes through is cloned into an
 * EXPIRED event stamped with {@code arrivalTime + windowLength}; once that instant
 * has been reached the clone is emitted ahead of whatever event is being processed.
 *
 * <p>{@link #process} and {@link #find} are {@code synchronized} on this instance.
 * {@link #currentState()} and {@link #restoreState(Object[])} are not.
 * NOTE(review): presumably the snapshot caller quiesces processing first — confirm.
 */
public class TimeWindowProcessor extends WindowProcessor implements SchedulingProcessor, FindableProcessor {
    /** Window length in milliseconds; added to the current time to obtain an event's expiry instant. */
    private long timeInMilliSeconds;

    /**
     * EXPIRED-typed clones of the events currently held by the window. New clones are
     * appended at the end, and the expiry sweep stops at the first clone that is not yet due.
     * NOTE(review): that early exit assumes the chunk is ordered by expiry timestamp,
     * i.e. that the timestamp generator never goes backwards — confirm.
     */
    private ComplexEventChunk<StreamEvent> expiredEventChunk;

    /** Used to request a notification at the next expiry instant. */
    private Scheduler scheduler;

    /** Supplies the timestamp generator used as the window's clock. */
    private ExecutionPlanContext executionPlanContext;

    /**
     * Overrides the window length.
     *
     * @param j new window length in milliseconds
     */
    public void setTimeInMilliSeconds(long j) {
        this.timeInMilliSeconds = j;
    }

    /**
     * Stores the scheduler used to request expiry notifications.
     *
     * @param scheduler scheduler to use from now on
     */
    @Override
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * @return the scheduler previously supplied through {@link #setScheduler(Scheduler)}
     */
    @Override
    public Scheduler getScheduler() {
        return this.scheduler;
    }

    /**
     * Validates the window parameters and reads the window length from them.
     *
     * @param expressionExecutorArr must contain exactly one constant executor of type INT or LONG
     * @param executionPlanContext  context providing the timestamp generator
     * @throws ExecutionPlanValidationException if the parameter count is not one, the parameter
     *                                          is not a constant, or its type is neither INT nor LONG
     */
    @Override
    protected void init(ExpressionExecutor[] expressionExecutorArr, ExecutionPlanContext executionPlanContext) {
        this.executionPlanContext = executionPlanContext;
        this.expiredEventChunk = new ComplexEventChunk<>();

        if (expressionExecutorArr.length != 1) {
            throw new ExecutionPlanValidationException("Time window should only have one parameter (<int|long|time> windowTime), but found " + expressionExecutorArr.length + " input attributes");
        }

        ExpressionExecutor windowTimeExecutor = expressionExecutorArr[0];
        if (!(windowTimeExecutor instanceof ConstantExpressionExecutor)) {
            throw new ExecutionPlanValidationException("Time window should have constant parameter attribute but found a dynamic attribute " + windowTimeExecutor.getClass().getCanonicalName());
        }

        ConstantExpressionExecutor constantExecutor = (ConstantExpressionExecutor) windowTimeExecutor;
        Attribute.Type returnType = windowTimeExecutor.getReturnType();
        if (returnType == Attribute.Type.INT) {
            // An int constant is widened to long on assignment.
            this.timeInMilliSeconds = ((Integer) constantExecutor.getValue()).intValue();
        } else if (returnType == Attribute.Type.LONG) {
            this.timeInMilliSeconds = ((Long) constantExecutor.getValue()).longValue();
        } else {
            throw new ExecutionPlanValidationException("Time window's parameter attribute should be either int or long, but found " + returnType);
        }
    }

    /**
     * Runs the window over a chunk of events and forwards the result to {@code processor}.
     *
     * <p>For each event in the chunk:
     * <ol>
     *   <li>if it is CURRENT, an EXPIRED clone stamped {@code now + timeInMilliSeconds} is prepared;</li>
     *   <li>every stored clone whose timestamp is not after {@code now} is removed from the
     *       window and inserted into the chunk before the event being processed;</li>
     *   <li>the first stored clone that is still in the future ends the sweep and its timestamp
     *       is passed to {@code scheduler.notifyAt};</li>
     *   <li>the new clone (if any) is appended to the window, and its own timestamp is passed to
     *       {@code scheduler.notifyAt} when step 3 did not already request a notification.</li>
     * </ol>
     * Events of any other type are not cloned; they still trigger the sweep and stay in the chunk.
     *
     * @param complexEventChunk  incoming events; expired clones are inserted into it in place
     * @param processor          next processor, invoked once with the modified chunk
     * @param streamEventCloner  used to clone CURRENT events
     */
    @Override
    protected synchronized void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner) {
        while (complexEventChunk.hasNext()) {
            StreamEvent incoming = complexEventChunk.next();
            long currentTime = this.executionPlanContext.getTimestampGenerator().currentTime();
            boolean isCurrent = incoming.getType() == ComplexEvent.Type.CURRENT;

            StreamEvent expiryClone = null;
            if (isCurrent) {
                expiryClone = streamEventCloner.copyStreamEvent(incoming);
                expiryClone.setType(ComplexEvent.Type.EXPIRED);
                expiryClone.setTimestamp(currentTime + this.timeInMilliSeconds);
            }

            boolean expiryScheduled = false;
            while (this.expiredEventChunk.hasNext()) {
                StreamEvent pending = this.expiredEventChunk.next();
                // Compare via the difference rather than directly, exactly as the original did.
                if (pending.getTimestamp() - currentTime > 0) {
                    // First clone that is not due yet: ask to be notified at its expiry and stop.
                    this.scheduler.notifyAt(pending.getTimestamp());
                    this.expiredEventChunk.reset();
                    expiryScheduled = true;
                    break;
                }
                // Due: move the clone out of the window and ahead of the incoming event.
                this.expiredEventChunk.remove();
                complexEventChunk.insertBeforeCurrent(pending);
            }

            if (isCurrent) {
                this.expiredEventChunk.add(expiryClone);
                if (!expiryScheduled) {
                    // No earlier clone requested a notification, so request one for the new clone.
                    this.scheduler.notifyAt(expiryClone.getTimestamp());
                }
            }
            // Rewind the window's cursor so the next sweep starts from its first clone.
            this.expiredEventChunk.reset();
        }
        processor.process(complexEventChunk);
    }

    /**
     * Searches the events currently held by the window.
     *
     * @param complexEvent event to match against
     * @param finder       finder that performs the search
     * @return whatever {@code finder.find} returns for the window's stored clones
     */
    @Override
    public synchronized StreamEvent find(ComplexEvent complexEvent, Finder finder) {
        return finder.find(complexEvent, this.expiredEventChunk, this.streamEventCloner);
    }

    /**
     * Builds a {@link Finder} for this window by delegating to
     * {@link CollectionOperatorParser#parse}; every argument is forwarded unchanged, with this
     * processor's {@code inputDefinition} supplied as the candidate definition.
     *
     * @return the finder produced by the parser
     */
    @Override
    public Finder constructFinder(Expression expression, MetaComplexEvent metaComplexEvent, ExecutionPlanContext executionPlanContext, List<VariableExpressionExecutor> list, Map<String, EventTable> map, int i, long j) {
        return CollectionOperatorParser.parse(expression, metaComplexEvent, executionPlanContext, list, map, i, this.inputDefinition, j);
    }

    /** No start-up work is required. */
    @Override
    public void start() {
    }

    /** No shutdown work is required. */
    @Override
    public void stop() {
    }

    /**
     * @return a single-element array holding the live chunk of stored clones (not a copy)
     */
    @Override
    public Object[] currentState() {
        return new Object[]{this.expiredEventChunk};
    }

    /**
     * Replaces the window contents with a previously captured state.
     *
     * @param objArr state array in the layout produced by {@link #currentState()};
     *               element 0 must be a {@code ComplexEventChunk<StreamEvent>}
     */
    @Override
    public void restoreState(Object[] objArr) {
        // The element type cannot be checked at runtime; slot 0 is written by currentState()
        // from the ComplexEventChunk<StreamEvent> field, so the cast is safe for matching state.
        @SuppressWarnings("unchecked")
        ComplexEventChunk<StreamEvent> restoredChunk = (ComplexEventChunk<StreamEvent>) objArr[0];
        this.expiredEventChunk = restoredChunk;
    }
}
