package org.wso2.siddhi.core.query.processor.stream.window;

import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.state.StateEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.table.EventTable;
import org.wso2.siddhi.core.util.collection.operator.Finder;
import org.wso2.siddhi.core.util.collection.operator.MatchingMetaStateHolder;
import org.wso2.siddhi.core.util.parser.OperatorParser;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
import org.wso2.siddhi.query.api.expression.Expression;

/* loaded from: input_file:org/wso2/siddhi/core/query/processor/stream/window/ExternalTimeWindowProcessor.class */
/**
 * Window processor that keeps events for a fixed span of "external" time, i.e. time taken from
 * a {@code long} attribute of the events themselves.
 *
 * <p>Expects exactly two parameters: the timestamp attribute (a {@code long} stream variable)
 * and the window length (a constant). Retained events are stored as {@code EXPIRED}-typed copies
 * in {@link #expiredEventChunk}; they are emitted once an arriving event's timestamp shows that
 * they are at least {@link #timeToKeep} old.
 *
 * <p>{@link #process} and {@link #find} are {@code synchronized} on this instance.
 */
public class ExternalTimeWindowProcessor extends WindowProcessor implements FindableProcessor {
    static final Logger log = Logger.getLogger(ExternalTimeWindowProcessor.class);

    /** Window length, in the same unit as the timestamp attribute. */
    private long timeToKeep;

    /** Copies of the events currently held by the window, in arrival order. */
    private ComplexEventChunk<StreamEvent> expiredEventChunk;

    /** Executor identifying the attribute from which each event's timestamp is read. */
    private VariableExpressionExecutor timeStampVariableExpressionExecutor;

    /**
     * Validates the window parameters and initialises the window state.
     *
     * @param expressionExecutorArr window parameters: {@code [0]} timestamp attribute,
     *                              {@code [1]} constant window length
     * @param executionPlanContext  execution plan context (not used by this window)
     * @throws ExecutionPlanValidationException if the parameter count is not two, the window
     *                                          length is not a constant, or the timestamp
     *                                          parameter is not a {@code long} stream variable
     */
    @Override // org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor
    protected void init(ExpressionExecutor[] expressionExecutorArr, ExecutionPlanContext executionPlanContext) {
        this.expiredEventChunk = new ComplexEventChunk<>(false);
        if (expressionExecutorArr.length != 2) {
            throw new ExecutionPlanValidationException("ExternalTime window should only have two parameter (<long> timeStamp, <int|long|time> windowTime), but found " + expressionExecutorArr.length + " input attributes");
        }
        this.timeToKeep = resolveWindowTime(expressionExecutorArr[1]);
        if (!(expressionExecutorArr[0] instanceof VariableExpressionExecutor)) {
            throw new ExecutionPlanValidationException("ExternalTime window's 1st parameter timeStamp should be a type long stream attribute but found " + expressionExecutorArr[0].getClass());
        }
        this.timeStampVariableExpressionExecutor = (VariableExpressionExecutor) expressionExecutorArr[0];
        if (this.timeStampVariableExpressionExecutor.getReturnType() != Attribute.Type.LONG) {
            throw new ExecutionPlanValidationException("ExternalTime window's 1st parameter timeStamp should be type long, but found " + this.timeStampVariableExpressionExecutor.getReturnType());
        }
    }

    /**
     * Extracts the window length from the second window parameter.
     *
     * <p>The parameter must be a constant; a dynamic expression is rejected with a validation
     * error instead of failing on the cast with a {@link ClassCastException}.
     *
     * @param windowTimeExecutor executor supplied as the window's second parameter
     * @return the window length parsed from the constant's value
     * @throws ExecutionPlanValidationException if the parameter is not a constant
     */
    private static long resolveWindowTime(ExpressionExecutor windowTimeExecutor) {
        if (!(windowTimeExecutor instanceof ConstantExpressionExecutor)) {
            throw new ExecutionPlanValidationException("ExternalTime window's 2nd parameter windowTime should be a constant, but found a dynamic attribute " + windowTimeExecutor.getClass().getCanonicalName());
        }
        String literal = String.valueOf(((ConstantExpressionExecutor) windowTimeExecutor).getValue());
        if (windowTimeExecutor.getReturnType() == Attribute.Type.INT) {
            return Integer.parseInt(literal);
        }
        return Long.parseLong(literal);
    }

    /**
     * Slides the window forward for every event in the chunk and forwards the result.
     *
     * <p>For each incoming event, every retained event whose timestamp attribute plus
     * {@link #timeToKeep} is not later than the incoming event's timestamp is removed from the
     * window, stamped with the incoming timestamp and inserted into the chunk just before the
     * incoming event. An {@code EXPIRED}-typed copy of each {@code CURRENT} incoming event is
     * then retained for later expiry.
     *
     * @param complexEventChunk incoming events; expired events are spliced into it in place
     * @param processor         next processor, invoked once with the modified chunk
     * @param streamEventCloner cloner used to create the retained copies
     */
    @Override // org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor
    protected synchronized void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner) {
        while (complexEventChunk.hasNext()) {
            StreamEvent incomingEvent = complexEventChunk.next();
            long currentTime = (Long) incomingEvent.getAttribute(this.timeStampVariableExpressionExecutor.getPosition());

            // The copy is what the window stores; it is emitted as EXPIRED once it ages out.
            StreamEvent retainedCopy = streamEventCloner.copyStreamEvent(incomingEvent);
            retainedCopy.setType(ComplexEvent.Type.EXPIRED);

            while (this.expiredEventChunk.hasNext()) {
                StreamEvent oldestEvent = this.expiredEventChunk.next();
                long oldestTime = (Long) oldestEvent.getAttribute(this.timeStampVariableExpressionExecutor.getPosition());
                if ((oldestTime - currentTime) + this.timeToKeep > 0) {
                    // First event still inside the window: stop scanning and rewind.
                    this.expiredEventChunk.reset();
                    break;
                }
                this.expiredEventChunk.remove();
                oldestEvent.setTimestamp(currentTime);
                complexEventChunk.insertBeforeCurrent(oldestEvent);
            }

            if (incomingEvent.getType() == ComplexEvent.Type.CURRENT) {
                this.expiredEventChunk.add(retainedCopy);
            }
            this.expiredEventChunk.reset();
        }
        processor.process(complexEventChunk);
    }

    /** No-op: this window does nothing on start. */
    @Override // org.wso2.siddhi.core.util.extension.holder.EternalReferencedHolder
    public void start() {
    }

    /** No-op: this window does nothing on stop. */
    @Override // org.wso2.siddhi.core.util.extension.holder.EternalReferencedHolder
    public void stop() {
    }

    /**
     * Captures the window contents for a snapshot.
     *
     * @return a single-element array holding the first event of the retained chunk
     */
    @Override // org.wso2.siddhi.core.util.snapshot.Snapshotable
    public Object[] currentState() {
        return new Object[]{this.expiredEventChunk.getFirst()};
    }

    /**
     * Replaces the window contents with a previously captured state.
     *
     * @param objArr state array as produced by {@link #currentState()}; element {@code 0} is
     *               cast to {@link StreamEvent} and added to the cleared chunk
     */
    @Override // org.wso2.siddhi.core.util.snapshot.Snapshotable
    public void restoreState(Object[] objArr) {
        this.expiredEventChunk.clear();
        this.expiredEventChunk.add((StreamEvent) objArr[0]);
    }

    /**
     * Looks up retained events by delegating to the given finder.
     *
     * @param stateEvent event to match against
     * @param finder     finder that performs the lookup over the retained chunk
     * @return whatever the finder returns for the retained chunk
     */
    @Override // org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor
    public synchronized StreamEvent find(StateEvent stateEvent, Finder finder) {
        return finder.find(stateEvent, this.expiredEventChunk, this.streamEventCloner);
    }

    /**
     * Builds a finder over the retained chunk by delegating to
     * {@link OperatorParser#constructOperator}; all arguments are passed through unchanged.
     *
     * @param expression              match expression
     * @param matchingMetaStateHolder meta state holder passed to the parser
     * @param executionPlanContext    execution plan context passed to the parser
     * @param list                    variable expression executors passed to the parser
     * @param map                     event tables by name, passed to the parser
     * @return the finder produced by the parser
     */
    @Override // org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor
    public Finder constructFinder(Expression expression, MatchingMetaStateHolder matchingMetaStateHolder, ExecutionPlanContext executionPlanContext, List<VariableExpressionExecutor> list, Map<String, EventTable> map) {
        return OperatorParser.constructOperator(this.expiredEventChunk, expression, matchingMetaStateHolder, executionPlanContext, list, map);
    }
}
