package org.wso2.siddhi.core.query.processor.stream.window;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.osgi.service.upnp.UPnPStateVariable;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.state.StateEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.holder.SnapshotableStreamEventQueue;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.SchedulingProcessor;
import org.wso2.siddhi.core.table.Table;
import org.wso2.siddhi.core.util.Scheduler;
import org.wso2.siddhi.core.util.collection.operator.CompiledCondition;
import org.wso2.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import org.wso2.siddhi.core.util.collection.operator.Operator;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.parser.OperatorParser;
import org.wso2.siddhi.core.util.snapshot.state.SnapshotStateList;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;
import org.wso2.siddhi.query.api.expression.Expression;

@Extension(name = UPnPStateVariable.TYPE_TIME, namespace = "", description = "A sliding time window that holds events that arrived during the last windowTime period at a given time, and gets updated for each event arrival and expiry.", parameters = {@Parameter(name = "window.time", description = "The sliding time period for which the window should hold events.", type = {DataType.INT, DataType.LONG, DataType.TIME})}, examples = {@Example(syntax = "define window cseEventWindow (symbol string, price float, volume int) time(20) output all events;\n@info(name = 'query0')\nfrom cseEventStream\ninsert into cseEventWindow;\n@info(name = 'query1')\nfrom cseEventWindow\nselect symbol, sum(price) as price\ninsert all events into outputStream ;", description = "This will processing events that arrived within the last 20 milliseconds.")})
/* loaded from: input_file:org/wso2/siddhi/core/query/processor/stream/window/TimeWindowProcessor.class */
public class TimeWindowProcessor extends WindowProcessor implements SchedulingProcessor, FindableProcessor {
    private long timeInMilliSeconds;
    private SnapshotableStreamEventQueue expiredEventQueue;
    private Scheduler scheduler;
    private SiddhiAppContext siddhiAppContext;
    private volatile long lastTimestamp = Long.MIN_VALUE;

    public void setTimeInMilliSeconds(long j) {
        this.timeInMilliSeconds = j;
    }

    @Override // org.wso2.siddhi.core.query.processor.SchedulingProcessor
    public Scheduler getScheduler() {
        return this.scheduler;
    }

    @Override // org.wso2.siddhi.core.query.processor.SchedulingProcessor
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    @Override // org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor
    protected void init(ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, boolean z, SiddhiAppContext siddhiAppContext) {
        this.siddhiAppContext = siddhiAppContext;
        this.expiredEventQueue = new SnapshotableStreamEventQueue(this.streamEventClonerHolder);
        if (expressionExecutorArr.length != 1) {
            throw new SiddhiAppValidationException("Time window should only have one parameter (<int|long|time> windowTime), but found " + expressionExecutorArr.length + " input attributes");
        }
        if (!(expressionExecutorArr[0] instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppValidationException("Time window should have constant parameter attribute but found a dynamic attribute " + expressionExecutorArr[0].getClass().getCanonicalName());
        }
        if (expressionExecutorArr[0].getReturnType() == Attribute.Type.INT) {
            this.timeInMilliSeconds = ((Integer) ((ConstantExpressionExecutor) expressionExecutorArr[0]).getValue()).intValue();
        } else {
            if (expressionExecutorArr[0].getReturnType() != Attribute.Type.LONG) {
                throw new SiddhiAppValidationException("Time window's parameter attribute should be either int or long, but found " + expressionExecutorArr[0].getReturnType());
            }
            this.timeInMilliSeconds = ((Long) ((ConstantExpressionExecutor) expressionExecutorArr[0]).getValue()).longValue();
        }
    }

    @Override // org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor
    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner) {
        synchronized (this) {
            while (complexEventChunk.hasNext()) {
                StreamEvent next = complexEventChunk.next();
                long currentTime = this.siddhiAppContext.getTimestampGenerator().currentTime();
                this.expiredEventQueue.reset();
                while (this.expiredEventQueue.hasNext()) {
                    StreamEvent next2 = this.expiredEventQueue.next();
                    if ((next2.getTimestamp() - currentTime) + this.timeInMilliSeconds > 0) {
                        break;
                    }
                    this.expiredEventQueue.remove();
                    next2.setTimestamp(currentTime);
                    complexEventChunk.insertBeforeCurrent(next2);
                }
                if (next.getType() == ComplexEvent.Type.CURRENT) {
                    StreamEvent copyStreamEvent = streamEventCloner.copyStreamEvent(next);
                    copyStreamEvent.setType(ComplexEvent.Type.EXPIRED);
                    this.expiredEventQueue.add(copyStreamEvent);
                    if (this.lastTimestamp < copyStreamEvent.getTimestamp()) {
                        this.scheduler.notifyAt(copyStreamEvent.getTimestamp() + this.timeInMilliSeconds);
                        this.lastTimestamp = copyStreamEvent.getTimestamp();
                    }
                } else {
                    complexEventChunk.remove();
                }
            }
            this.expiredEventQueue.reset();
        }
        processor.process(complexEventChunk);
    }

    @Override // org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor
    public synchronized StreamEvent find(StateEvent stateEvent, CompiledCondition compiledCondition) {
        return ((Operator) compiledCondition).find(stateEvent, this.expiredEventQueue, this.streamEventCloner);
    }

    @Override // org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor
    public CompiledCondition compileCondition(Expression expression, MatchingMetaInfoHolder matchingMetaInfoHolder, SiddhiAppContext siddhiAppContext, List<VariableExpressionExecutor> list, Map<String, Table> map, String str) {
        return OperatorParser.constructOperator(this.expiredEventQueue, expression, matchingMetaInfoHolder, siddhiAppContext, list, map, this.queryName);
    }

    @Override // org.wso2.siddhi.core.util.extension.holder.EternalReferencedHolder
    public void start() {
    }

    @Override // org.wso2.siddhi.core.util.extension.holder.EternalReferencedHolder
    public void stop() {
    }

    @Override // org.wso2.siddhi.core.util.snapshot.Snapshotable
    public Map<String, Object> currentState() {
        HashMap hashMap = new HashMap();
        hashMap.put("ExpiredEventQueue", this.expiredEventQueue.getSnapshot());
        return hashMap;
    }

    @Override // org.wso2.siddhi.core.util.snapshot.Snapshotable
    public void restoreState(Map<String, Object> map) {
        this.expiredEventQueue.restore((SnapshotStateList) map.get("ExpiredEventQueue"));
    }
}
