package org.ballerinalang.siddhi.core.query.processor.stream.window;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.ballerinalang.siddhi.annotation.Example;
import org.ballerinalang.siddhi.annotation.Extension;
import org.ballerinalang.siddhi.annotation.Parameter;
import org.ballerinalang.siddhi.annotation.util.DataType;
import org.ballerinalang.siddhi.core.config.SiddhiAppContext;
import org.ballerinalang.siddhi.core.event.ComplexEvent;
import org.ballerinalang.siddhi.core.event.ComplexEventChunk;
import org.ballerinalang.siddhi.core.event.state.StateEvent;
import org.ballerinalang.siddhi.core.event.stream.StreamEvent;
import org.ballerinalang.siddhi.core.event.stream.StreamEventCloner;
import org.ballerinalang.siddhi.core.executor.ConstantExpressionExecutor;
import org.ballerinalang.siddhi.core.executor.ExpressionExecutor;
import org.ballerinalang.siddhi.core.executor.VariableExpressionExecutor;
import org.ballerinalang.siddhi.core.query.processor.Processor;
import org.ballerinalang.siddhi.core.table.Table;
import org.ballerinalang.siddhi.core.util.collection.operator.CompiledCondition;
import org.ballerinalang.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import org.ballerinalang.siddhi.core.util.collection.operator.Operator;
import org.ballerinalang.siddhi.core.util.config.ConfigReader;
import org.ballerinalang.siddhi.core.util.parser.OperatorParser;
import org.ballerinalang.siddhi.query.api.definition.Attribute;
import org.ballerinalang.siddhi.query.api.exception.SiddhiAppValidationException;
import org.ballerinalang.siddhi.query.api.expression.Expression;

@Extension(name = "externalTime", namespace = "", description = "A sliding time window based on external time. It holds events that arrived during the last windowTime period from the external timestamp, and gets updated on every monotonically increasing timestamp.", parameters = {@Parameter(name = "timestamp", description = "The time which the window determines as current time and will act upon. The value of this parameter should be monotonically increasing.", type = {DataType.LONG}), @Parameter(name = "window.time", description = "The sliding time period for which the window should hold events.", type = {DataType.INT, DataType.LONG, DataType.TIME})}, examples = {@Example(syntax = "define window cseEventWindow (symbol string, price float, volume int) externalTime(eventTime, 20 sec) output expired events;\n\n@info(name = 'query0')\nfrom cseEventStream\ninsert into cseEventWindow;\n\n@info(name = 'query1')\nfrom cseEventWindow\nselect symbol, sum(price) as price\ninsert expired events into outputStream ;", description = "processing events arrived within the last 20 seconds from the eventTime and output expired events.")})
/* loaded from: input_file:org/ballerinalang/siddhi/core/query/processor/stream/window/ExternalTimeWindowProcessor.class */
public class ExternalTimeWindowProcessor extends WindowProcessor implements FindableProcessor {
    private long timeToKeep;
    private ComplexEventChunk<StreamEvent> expiredEventChunk;
    private VariableExpressionExecutor timeStampVariableExpressionExecutor;

    @Override // org.ballerinalang.siddhi.core.query.processor.stream.window.WindowProcessor
    protected void init(ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, boolean z, SiddhiAppContext siddhiAppContext) {
        this.expiredEventChunk = new ComplexEventChunk<>(false);
        if (expressionExecutorArr.length != 2) {
            throw new SiddhiAppValidationException("ExternalTime window should only have two parameter (<long> timeStamp, <int|long|time> windowTime), but found " + expressionExecutorArr.length + " input attributes");
        }
        if (expressionExecutorArr[1].getReturnType() == Attribute.Type.INT) {
            this.timeToKeep = Integer.parseInt(String.valueOf(((ConstantExpressionExecutor) expressionExecutorArr[1]).getValue()));
        } else {
            this.timeToKeep = Long.parseLong(String.valueOf(((ConstantExpressionExecutor) expressionExecutorArr[1]).getValue()));
        }
        if (!(expressionExecutorArr[0] instanceof VariableExpressionExecutor)) {
            throw new SiddhiAppValidationException("ExternalTime window's 1st parameter timeStamp should be a type long stream attribute but found " + expressionExecutorArr[0].getClass());
        }
        this.timeStampVariableExpressionExecutor = (VariableExpressionExecutor) expressionExecutorArr[0];
        if (this.timeStampVariableExpressionExecutor.getReturnType() != Attribute.Type.LONG) {
            throw new SiddhiAppValidationException("ExternalTime window's 1st parameter timeStamp should be type long, but found " + this.timeStampVariableExpressionExecutor.getReturnType());
        }
    }

    @Override // org.ballerinalang.siddhi.core.query.processor.stream.window.WindowProcessor
    protected synchronized void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner) {
        while (complexEventChunk.hasNext()) {
            StreamEvent next = complexEventChunk.next();
            long longValue = ((Long) this.timeStampVariableExpressionExecutor.execute(next)).longValue();
            StreamEvent copyStreamEvent = streamEventCloner.copyStreamEvent(next);
            copyStreamEvent.setType(ComplexEvent.Type.EXPIRED);
            this.expiredEventChunk.reset();
            while (true) {
                if (!this.expiredEventChunk.hasNext()) {
                    break;
                }
                StreamEvent next2 = this.expiredEventChunk.next();
                if ((((Long) this.timeStampVariableExpressionExecutor.execute(next2)).longValue() - longValue) + this.timeToKeep > 0) {
                    this.expiredEventChunk.reset();
                    break;
                } else {
                    this.expiredEventChunk.remove();
                    next2.setTimestamp(longValue);
                    complexEventChunk.insertBeforeCurrent(next2);
                }
            }
            if (next.getType() == ComplexEvent.Type.CURRENT) {
                this.expiredEventChunk.add(copyStreamEvent);
            }
            this.expiredEventChunk.reset();
        }
        processor.process(complexEventChunk);
    }

    @Override // org.ballerinalang.siddhi.core.util.extension.holder.EternalReferencedHolder
    public void start() {
    }

    @Override // org.ballerinalang.siddhi.core.util.extension.holder.EternalReferencedHolder
    public void stop() {
    }

    @Override // org.ballerinalang.siddhi.core.util.snapshot.Snapshotable
    public Map<String, Object> currentState() {
        HashMap hashMap = new HashMap();
        hashMap.put("ExpiredEventChunk", this.expiredEventChunk.getFirst());
        return hashMap;
    }

    @Override // org.ballerinalang.siddhi.core.util.snapshot.Snapshotable
    public void restoreState(Map<String, Object> map) {
        this.expiredEventChunk.clear();
        this.expiredEventChunk.add((StreamEvent) map.get("ExpiredEventChunk"));
    }

    @Override // org.ballerinalang.siddhi.core.query.processor.stream.window.FindableProcessor
    public synchronized StreamEvent find(StateEvent stateEvent, CompiledCondition compiledCondition) {
        return ((Operator) compiledCondition).find(stateEvent, this.expiredEventChunk, this.streamEventCloner);
    }

    @Override // org.ballerinalang.siddhi.core.query.processor.stream.window.FindableProcessor
    public CompiledCondition compileCondition(Expression expression, MatchingMetaInfoHolder matchingMetaInfoHolder, SiddhiAppContext siddhiAppContext, List<VariableExpressionExecutor> list, Map<String, Table> map, String str) {
        return OperatorParser.constructOperator(this.expiredEventChunk, expression, matchingMetaInfoHolder, siddhiAppContext, list, map, this.queryName);
    }
}
