package org.wso2.siddhi.core.query.processor.stream.window;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.state.StateEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.holder.SnapshotableStreamEventQueue;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.table.Table;
import org.wso2.siddhi.core.util.collection.operator.CompiledCondition;
import org.wso2.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import org.wso2.siddhi.core.util.collection.operator.Operator;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.parser.OperatorParser;
import org.wso2.siddhi.core.util.snapshot.state.SnapshotStateList;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;
import org.wso2.siddhi.query.api.expression.Expression;

/* JADX WARN: Classes with same name are omitted:
  input_file:dependencies/siddhi-core-4.3.18.jar:org/wso2/siddhi/core/query/processor/stream/window/DelayWindowProcessor.class
 */
@Extension(name = "delay", namespace = "", description = "A delay window holds events for a specific time period that is regarded as a delay period before processing them.", parameters = {@Parameter(name = "window.delay", description = "The time period (specified in sec, min, ms) for which  the window should delay the events.", type = {DataType.INT, DataType.LONG, DataType.TIME})}, examples = {@Example(syntax = "define window delayWindow(symbol string, volume int) delay(1 hour);\ndefine stream PurchaseStream(symbol string, volume int);\ndefine stream DeliveryStream(symbol string);\ndefine stream OutputStream(symbol string);\n\n@info(name='query1') \nfrom PurchaseStream\nselect symbol, volume\ninsert into delayWindow;\n\n@info(name='query2') \nfrom delayWindow join DeliveryStream\non delayWindow.symbol == DeliveryStream.symbol\nselect delayWindow.symbol\ninsert into OutputStream;", description = "In this example, purchase events that arrive in the 'PurchaseStream' stream are directed to a delay window. At any given time, this delay window holds purchase events that have arrived within the last hour. These purchase events in the window are matched by the 'symbol' attribute, with delivery events that arrive in the 'DeliveryStream' stream. This monitors whether the delivery of products is done with a minimum delay of one hour after the purchase.")})
/* loaded from: input_file:org/wso2/siddhi/core/query/processor/stream/window/DelayWindowProcessor.class */
/*
 * Window processor that parks every incoming CURRENT event in an internal queue and forwards it
 * to the next processor only after the configured delay has elapsed.
 *
 * The delay comes from a single constant int/long window parameter and is kept in
 * delayInMilliSeconds. process() and find() both lock on this instance while they touch the
 * queue; currentState() and restoreState() do not take that lock.
 */
public class DelayWindowProcessor extends TimeWindowProcessor {
    /** Key under which the delayed-event queue snapshot is stored in the state map. */
    private static final String DELAYED_EVENT_QUEUE_KEY = "DelayedEventQueue";

    /** Delay applied to each event, taken from the window's only parameter. */
    private long delayInMilliSeconds;
    /** Application context; used in process() to obtain the current time. */
    private SiddhiAppContext siddhiAppContext;
    /** Events that are waiting for their delay to elapse, in arrival order. */
    private SnapshotableStreamEventQueue delayedEventQueue;
    /** Highest event timestamp for which a scheduler notification has already been requested. */
    private volatile long lastTimestamp = Long.MIN_VALUE;

    /**
     * Validates the window parameter and sets up the internal queue.
     *
     * @param expressionExecutorArr window parameters; must contain exactly one constant
     *                              executor whose return type is INT or LONG
     * @param configReader          not used by this processor
     * @param z                     not used by this processor
     * @param siddhiAppContext      application context retained for time lookups
     * @throws SiddhiAppValidationException if the parameter count, kind or type is wrong
     */
    @Override
    protected void init(ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, boolean z, SiddhiAppContext siddhiAppContext) {
        this.siddhiAppContext = siddhiAppContext;
        this.delayedEventQueue = new SnapshotableStreamEventQueue(this.streamEventClonerHolder);

        if (expressionExecutorArr.length != 1) {
            throw new SiddhiAppValidationException("Delay window should only have one parameter (<int|long|time> delayTime), but found " + expressionExecutorArr.length + " input attributes");
        }

        ExpressionExecutor delayExecutor = expressionExecutorArr[0];
        if (!(delayExecutor instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppValidationException("Delay window should have constant parameter attribute but found a dynamic attribute " + delayExecutor.getClass().getCanonicalName());
        }

        Attribute.Type delayType = delayExecutor.getReturnType();
        if (delayType != Attribute.Type.INT && delayType != Attribute.Type.LONG) {
            throw new SiddhiAppValidationException("Delay window's parameter attribute should be either int or long, but found " + delayType);
        }

        this.delayInMilliSeconds = Long.parseLong(((ConstantExpressionExecutor) delayExecutor).getValue().toString());
    }

    /**
     * Replaces the incoming events in the chunk with the queued events whose delay has elapsed.
     *
     * For every event in the chunk: queued events that are due are moved from the queue into the
     * chunk (with their timestamp overwritten by the current time), the incoming event is taken
     * out of the chunk, and, if it is a CURRENT event, it is queued and a scheduler notification
     * is requested for the moment its delay ends. Events of any other type are only dropped from
     * the chunk. Whatever is left in the chunk is passed to the next processor outside the lock.
     *
     * @param complexEventChunk   incoming events; modified in place
     * @param processor           next processor in the chain
     * @param streamEventCloner   not used by this processor
     */
    @Override
    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner) {
        synchronized (this) {
            while (complexEventChunk.hasNext()) {
                StreamEvent incoming = complexEventChunk.next();
                long now = this.siddhiAppContext.getTimestampGenerator().currentTime();

                releaseDueEvents(complexEventChunk, now);

                // Take the event out of the chunk BEFORE queuing it, so the queue is never handed
                // an event that still has a successor in the chunk.
                // NOTE(review): this assumes remove() unlinks the event from its successor, and it
                // matters if SnapshotableStreamEventQueue.add() follows next-links to find its
                // tail - confirm against the queue implementation. The order is harmless otherwise.
                complexEventChunk.remove();

                if (incoming.getType() == ComplexEvent.Type.CURRENT) {
                    this.delayedEventQueue.add(incoming);
                    long arrivalTimestamp = incoming.getTimestamp();
                    // Only ask for a wake-up when this event is newer than any already scheduled.
                    if (this.lastTimestamp < arrivalTimestamp) {
                        getScheduler().notifyAt(arrivalTimestamp + this.delayInMilliSeconds);
                        this.lastTimestamp = arrivalTimestamp;
                    }
                }
            }
            this.delayedEventQueue.reset();
        }
        if (complexEventChunk.getFirst() != null) {
            processor.process(complexEventChunk);
        }
    }

    /**
     * Moves queued events whose delay has elapsed at {@code now} into the chunk, in front of the
     * chunk's current event. Stops at the first queued event that is not yet due.
     * Must be called while holding the lock on this instance.
     */
    private void releaseDueEvents(ComplexEventChunk<StreamEvent> complexEventChunk, long now) {
        this.delayedEventQueue.reset();
        while (this.delayedEventQueue.hasNext()) {
            StreamEvent delayed = this.delayedEventQueue.next();
            long remaining = (delayed.getTimestamp() - now) + this.delayInMilliSeconds;
            if (remaining > 0) {
                break;
            }
            this.delayedEventQueue.remove();
            delayed.setTimestamp(now);
            complexEventChunk.insertBeforeCurrent(delayed);
        }
    }

    /**
     * Looks up events in the delayed-event queue using the given compiled condition.
     *
     * @param stateEvent        event the condition is evaluated against
     * @param compiledCondition condition produced by {@link #compileCondition}; cast to Operator
     * @return the result of the operator's find over the queued events
     */
    @Override
    public synchronized StreamEvent find(StateEvent stateEvent, CompiledCondition compiledCondition) {
        Operator operator = (Operator) compiledCondition;
        return operator.find(stateEvent, this.delayedEventQueue, this.streamEventCloner);
    }

    /**
     * Builds an operator over the delayed-event queue for the given expression.
     * The {@code str} argument is not used; this processor's own queryName is passed instead.
     */
    @Override
    public CompiledCondition compileCondition(Expression expression, MatchingMetaInfoHolder matchingMetaInfoHolder, SiddhiAppContext siddhiAppContext, List<VariableExpressionExecutor> list, Map<String, Table> map, String str) {
        return OperatorParser.constructOperator(this.delayedEventQueue, expression, matchingMetaInfoHolder, siddhiAppContext, list, map, this.queryName);
    }

    /**
     * Returns a state map holding a snapshot of the delayed-event queue.
     * lastTimestamp is not part of the returned state.
     */
    @Override
    public Map<String, Object> currentState() {
        Map<String, Object> state = new HashMap<String, Object>();
        state.put(DELAYED_EVENT_QUEUE_KEY, this.delayedEventQueue.getSnapshot());
        return state;
    }

    /**
     * Restores the delayed-event queue from a state map produced by {@link #currentState()}.
     *
     * @param map state map; its "DelayedEventQueue" entry is cast to SnapshotStateList
     */
    @Override
    public void restoreState(Map<String, Object> map) {
        SnapshotStateList queueSnapshot = (SnapshotStateList) map.get(DELAYED_EVENT_QUEUE_KEY);
        this.delayedEventQueue.restore(queueSnapshot);
    }
}
