package org.wso2.siddhi.core.query.processor.stream.window;

import io.netty.handler.codec.rtsp.RtspHeaders;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.state.StateEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.SchedulingProcessor;
import org.wso2.siddhi.core.table.Table;
import org.wso2.siddhi.core.util.Scheduler;
import org.wso2.siddhi.core.util.collection.operator.CompiledCondition;
import org.wso2.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import org.wso2.siddhi.core.util.collection.operator.Operator;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.parser.OperatorParser;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;
import org.wso2.siddhi.query.api.expression.Expression;

/* JADX WARN: Classes with same name are omitted:
  input_file:dependencies/siddhi-core-4.4.11.jar:org/wso2/siddhi/core/query/processor/stream/window/ExternalTimeBatchWindowProcessor.class
 */
@Extension(name = "externalTimeBatch", namespace = "", description = "A batch (tumbling) time window based on external time, that holds events arrived during windowTime periods, and gets updated for every windowTime.", parameters = {@Parameter(name = "timestamp", description = "The time which the window determines as current time and will act upon. The value of this parameter should be monotonically increasing.", type = {DataType.LONG}), @Parameter(name = "window.time", description = "The batch time period for which the window should hold events.", type = {DataType.INT, DataType.LONG, DataType.TIME}), @Parameter(name = "start.time", description = "User defined start time. This could either be a constant (of type int, long or time) or an attribute of the corresponding stream (of type long). If an attribute is provided, initial value of attribute would be considered as startTime.", type = {DataType.INT, DataType.LONG, DataType.TIME}, optional = true, defaultValue = "Timestamp of first event"), @Parameter(name = RtspHeaders.Values.TIMEOUT, description = "Time to wait for arrival of new event, before flushing and giving output for events belonging to a specific batch.", type = {DataType.INT, DataType.LONG, DataType.TIME}, optional = true, defaultValue = "System waits till an event from next batch arrives to flush current batch")}, examples = {@Example(syntax = "define window cseEventWindow (symbol string, price float, volume int) externalTimeBatch(eventTime, 1 sec) output expired events;\n@info(name = 'query0')\nfrom cseEventStream\ninsert into cseEventWindow;\n@info(name = 'query1')\nfrom cseEventWindow\nselect symbol, sum(price) as price\ninsert expired events into outputStream ;", description = "This will processing events that arrive every 1 seconds from the eventTime."), @Example(syntax = "define window cseEventWindow (symbol string, price float, volume int) externalTimeBatch(eventTime, 20 sec, 0) output expired events;", description = "This will processing events that arrive every 1 seconds from the eventTime. Starts on 0th millisecond of an hour."), @Example(syntax = "define window cseEventWindow (symbol string, price float, volume int) externalTimeBatch(eventTime, 2 sec, eventTimestamp, 100) output expired events;", description = "This will processing events that arrive every 2 seconds from the eventTim. Considers the first event's eventTimestamp value as startTime. Waits 100 milliseconds for the arrival of a new event before flushing current batch.")})
/* loaded from: input_file:org/wso2/siddhi/core/query/processor/stream/window/ExternalTimeBatchWindowProcessor.class */
public class ExternalTimeBatchWindowProcessor extends WindowProcessor implements SchedulingProcessor, FindableProcessor {
    private VariableExpressionExecutor timestampExpressionExecutor;
    private ExpressionExecutor startTimeAsVariable;
    private long timeToKeep;
    private Scheduler scheduler;
    private long lastScheduledTime;
    private long lastCurrentEventTime;
    private boolean outputExpectsExpiredEvents;
    private ComplexEventChunk<StreamEvent> currentEventChunk = new ComplexEventChunk<>(false);
    private ComplexEventChunk<StreamEvent> expiredEventChunk = null;
    private StreamEvent resetEvent = null;
    private long endTime = -1;
    private long startTime = 0;
    private boolean isStartTimeEnabled = false;
    private long schedulerTimeout = 0;
    private boolean flushed = false;
    private boolean storeExpiredEvents = false;
    private boolean replaceTimestampWithBatchEndTime = false;

    @Override // org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor
    protected void init(ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, boolean z, SiddhiAppContext siddhiAppContext) {
        this.outputExpectsExpiredEvents = z;
        if (z) {
            this.expiredEventChunk = new ComplexEventChunk<>(false);
            this.storeExpiredEvents = true;
        }
        if (expressionExecutorArr.length < 2 || expressionExecutorArr.length > 5) {
            throw new SiddhiAppValidationException("ExternalTimeBatch window should only have two to five parameters (<long> timestamp, <int|long|time> windowTime, <long> startTime, <int|long|time> timeout, <bool> replaceTimestampWithBatchEndTime), but found " + expressionExecutorArr.length + " input attributes");
        }
        if (!(expressionExecutorArr[0] instanceof VariableExpressionExecutor)) {
            throw new SiddhiAppValidationException("ExternalTime window's 1st parameter timestamp should be a variable, but found " + expressionExecutorArr[0].getClass());
        }
        if (expressionExecutorArr[0].getReturnType() != Attribute.Type.LONG) {
            throw new SiddhiAppValidationException("ExternalTime window's 1st parameter timestamp should be type long, but found " + expressionExecutorArr[0].getReturnType());
        }
        this.timestampExpressionExecutor = (VariableExpressionExecutor) expressionExecutorArr[0];
        if (expressionExecutorArr[1].getReturnType() == Attribute.Type.INT) {
            this.timeToKeep = ((Integer) ((ConstantExpressionExecutor) expressionExecutorArr[1]).getValue()).intValue();
        } else {
            if (expressionExecutorArr[1].getReturnType() != Attribute.Type.LONG) {
                throw new SiddhiAppValidationException("ExternalTimeBatch window's 2nd parameter windowTime should be either int or long, but found " + expressionExecutorArr[1].getReturnType());
            }
            this.timeToKeep = ((Long) ((ConstantExpressionExecutor) expressionExecutorArr[1]).getValue()).longValue();
        }
        if (expressionExecutorArr.length >= 3) {
            this.isStartTimeEnabled = true;
            if (expressionExecutorArr[2] instanceof ConstantExpressionExecutor) {
                if (expressionExecutorArr[2].getReturnType() == Attribute.Type.INT) {
                    this.startTime = Integer.parseInt(String.valueOf(((ConstantExpressionExecutor) expressionExecutorArr[2]).getValue()));
                } else {
                    if (expressionExecutorArr[2].getReturnType() != Attribute.Type.LONG) {
                        throw new SiddhiAppValidationException("ExternalTimeBatch window's 3rd parameter startTime should either be a constant (of type int or long) or an attribute (of type long), but found " + expressionExecutorArr[2].getReturnType());
                    }
                    this.startTime = Long.parseLong(String.valueOf(((ConstantExpressionExecutor) expressionExecutorArr[2]).getValue()));
                }
            } else {
                if (expressionExecutorArr[2].getReturnType() != Attribute.Type.LONG) {
                    throw new SiddhiAppValidationException("ExternalTimeBatch window's 3rd parameter startTime should either be a constant (of type int or long) or an attribute (of type long), but found " + expressionExecutorArr[2].getReturnType());
                }
                this.startTimeAsVariable = expressionExecutorArr[2];
            }
        }
        if (expressionExecutorArr.length >= 4) {
            if (expressionExecutorArr[3].getReturnType() == Attribute.Type.INT) {
                this.schedulerTimeout = Integer.parseInt(String.valueOf(((ConstantExpressionExecutor) expressionExecutorArr[3]).getValue()));
            } else {
                if (expressionExecutorArr[3].getReturnType() != Attribute.Type.LONG) {
                    throw new SiddhiAppValidationException("ExternalTimeBatch window's 4th parameter timeout should be either int or long, but found " + expressionExecutorArr[3].getReturnType());
                }
                this.schedulerTimeout = Long.parseLong(String.valueOf(((ConstantExpressionExecutor) expressionExecutorArr[3]).getValue()));
            }
        }
        if (expressionExecutorArr.length == 5) {
            if (expressionExecutorArr[4].getReturnType() != Attribute.Type.BOOL) {
                throw new SiddhiAppValidationException("ExternalTimeBatch window's 5th parameter replaceTimestampWithBatchEndTime should be bool, but found " + expressionExecutorArr[4].getReturnType());
            }
            this.replaceTimestampWithBatchEndTime = Boolean.parseBoolean(String.valueOf(((ConstantExpressionExecutor) expressionExecutorArr[4]).getValue()));
        }
        if (this.schedulerTimeout <= 0 || this.expiredEventChunk != null) {
            return;
        }
        this.expiredEventChunk = new ComplexEventChunk<>(false);
    }

    @Override // org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor
    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner) {
        if (complexEventChunk.getFirst() == null) {
            return;
        }
        ArrayList arrayList = new ArrayList();
        synchronized (this) {
            initTiming(complexEventChunk.getFirst());
            StreamEvent first = complexEventChunk.getFirst();
            while (first != null) {
                StreamEvent streamEvent = first;
                first = first.getNext();
                if (streamEvent.getType() == ComplexEvent.Type.TIMER) {
                    if (this.lastScheduledTime <= streamEvent.getTimestamp()) {
                        if (!this.flushed) {
                            flushToOutputChunk(streamEventCloner, arrayList, this.lastCurrentEventTime, true);
                            this.flushed = true;
                        } else if (this.currentEventChunk.getFirst() != null) {
                            appendToOutputChunk(streamEventCloner, arrayList, this.lastCurrentEventTime, true);
                        }
                        this.lastScheduledTime = this.siddhiAppContext.getTimestampGenerator().currentTime() + this.schedulerTimeout;
                        this.scheduler.notifyAt(this.lastScheduledTime);
                    }
                } else if (streamEvent.getType() == ComplexEvent.Type.CURRENT) {
                    long longValue = ((Long) this.timestampExpressionExecutor.execute(streamEvent)).longValue();
                    if (this.lastCurrentEventTime < longValue) {
                        this.lastCurrentEventTime = longValue;
                    }
                    if (longValue < this.endTime) {
                        cloneAppend(streamEventCloner, streamEvent);
                    } else {
                        if (this.flushed) {
                            appendToOutputChunk(streamEventCloner, arrayList, this.lastCurrentEventTime, false);
                            this.flushed = false;
                        } else {
                            flushToOutputChunk(streamEventCloner, arrayList, this.lastCurrentEventTime, false);
                        }
                        this.endTime = findEndTime(this.lastCurrentEventTime, this.startTime, this.timeToKeep);
                        cloneAppend(streamEventCloner, streamEvent);
                        if (this.schedulerTimeout > 0) {
                            this.lastScheduledTime = this.siddhiAppContext.getTimestampGenerator().currentTime() + this.schedulerTimeout;
                            this.scheduler.notifyAt(this.lastScheduledTime);
                        }
                    }
                }
            }
        }
        Iterator<ComplexEventChunk<StreamEvent>> it = arrayList.iterator();
        while (it.hasNext()) {
            processor.process(it.next());
        }
    }

    private void initTiming(StreamEvent streamEvent) {
        if (this.endTime < 0) {
            if (!this.isStartTimeEnabled) {
                this.startTime = ((Long) this.timestampExpressionExecutor.execute(streamEvent)).longValue();
                this.endTime = this.startTime + this.timeToKeep;
            } else if (this.startTimeAsVariable == null) {
                this.endTime = findEndTime(((Long) this.timestampExpressionExecutor.execute(streamEvent)).longValue(), this.startTime, this.timeToKeep);
            } else {
                this.startTime = ((Long) this.startTimeAsVariable.execute(streamEvent)).longValue();
                this.endTime = this.startTime + this.timeToKeep;
            }
            if (this.schedulerTimeout > 0) {
                this.lastScheduledTime = this.siddhiAppContext.getTimestampGenerator().currentTime() + this.schedulerTimeout;
                this.scheduler.notifyAt(this.lastScheduledTime);
            }
        }
    }

    private void flushToOutputChunk(StreamEventCloner streamEventCloner, List<ComplexEventChunk<StreamEvent>> list, long j, boolean z) {
        ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<>(true);
        if (this.outputExpectsExpiredEvents && this.expiredEventChunk.getFirst() != null) {
            this.expiredEventChunk.reset();
            while (this.expiredEventChunk.hasNext()) {
                this.expiredEventChunk.next().setTimestamp(j);
            }
            complexEventChunk.add(this.expiredEventChunk.getFirst());
        }
        if (this.expiredEventChunk != null) {
            this.expiredEventChunk.clear();
        }
        if (this.currentEventChunk.getFirst() != null) {
            this.resetEvent.setTimestamp(j);
            complexEventChunk.add(this.resetEvent);
            this.resetEvent = null;
            if (z || this.storeExpiredEvents) {
                this.currentEventChunk.reset();
                while (this.currentEventChunk.hasNext()) {
                    StreamEvent copyStreamEvent = streamEventCloner.copyStreamEvent(this.currentEventChunk.next());
                    copyStreamEvent.setType(ComplexEvent.Type.EXPIRED);
                    this.expiredEventChunk.add(copyStreamEvent);
                }
            }
            complexEventChunk.add(this.currentEventChunk.getFirst());
        }
        this.currentEventChunk.clear();
        if (complexEventChunk.getFirst() != null) {
            list.add(complexEventChunk);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void appendToOutputChunk(StreamEventCloner streamEventCloner, List<ComplexEventChunk<StreamEvent>> list, long j, boolean z) {
        ComplexEventChunk complexEventChunk = new ComplexEventChunk(true);
        ComplexEventChunk complexEventChunk2 = new ComplexEventChunk(true);
        if (this.currentEventChunk.getFirst() != null) {
            if (this.expiredEventChunk.getFirst() != null) {
                this.expiredEventChunk.reset();
                while (this.expiredEventChunk.hasNext()) {
                    StreamEvent next = this.expiredEventChunk.next();
                    if (this.outputExpectsExpiredEvents) {
                        StreamEvent copyStreamEvent = streamEventCloner.copyStreamEvent(next);
                        copyStreamEvent.setTimestamp(j);
                        complexEventChunk.add(copyStreamEvent);
                    }
                    StreamEvent copyStreamEvent2 = streamEventCloner.copyStreamEvent(next);
                    copyStreamEvent2.setType(ComplexEvent.Type.CURRENT);
                    complexEventChunk2.add(copyStreamEvent2);
                }
            }
            StreamEvent copyStreamEvent3 = streamEventCloner.copyStreamEvent(this.resetEvent);
            copyStreamEvent3.setTimestamp(j);
            complexEventChunk.add(copyStreamEvent3);
            complexEventChunk.add(complexEventChunk2.getFirst());
            if (z || this.storeExpiredEvents) {
                this.currentEventChunk.reset();
                while (this.currentEventChunk.hasNext()) {
                    StreamEvent copyStreamEvent4 = streamEventCloner.copyStreamEvent(this.currentEventChunk.next());
                    copyStreamEvent4.setType(ComplexEvent.Type.EXPIRED);
                    this.expiredEventChunk.add(copyStreamEvent4);
                }
            }
            complexEventChunk.add(this.currentEventChunk.getFirst());
        }
        this.currentEventChunk.clear();
        if (complexEventChunk.getFirst() != null) {
            list.add(complexEventChunk);
        }
    }

    private long findEndTime(long j, long j2, long j3) {
        return j + (j3 - ((j - j2) % j3));
    }

    private void cloneAppend(StreamEventCloner streamEventCloner, StreamEvent streamEvent) {
        StreamEvent copyStreamEvent = streamEventCloner.copyStreamEvent(streamEvent);
        if (this.replaceTimestampWithBatchEndTime) {
            copyStreamEvent.setAttribute(Long.valueOf(this.endTime), this.timestampExpressionExecutor.getPosition());
        }
        this.currentEventChunk.add(copyStreamEvent);
        if (this.resetEvent == null) {
            this.resetEvent = streamEventCloner.copyStreamEvent(streamEvent);
            this.resetEvent.setType(ComplexEvent.Type.RESET);
        }
    }

    @Override // org.wso2.siddhi.core.util.extension.holder.EternalReferencedHolder
    public void start() {
    }

    @Override // org.wso2.siddhi.core.util.extension.holder.EternalReferencedHolder
    public void stop() {
    }

    @Override // org.wso2.siddhi.core.util.snapshot.Snapshotable
    public Map<String, Object> currentState() {
        HashMap hashMap = new HashMap();
        synchronized (this) {
            hashMap.put("StartTime", Long.valueOf(this.startTime));
            hashMap.put("EndTime", Long.valueOf(this.endTime));
            hashMap.put("LastScheduledTime", Long.valueOf(this.lastScheduledTime));
            hashMap.put("LastCurrentEventTime", Long.valueOf(this.lastCurrentEventTime));
            hashMap.put("CurrentEventChunk", this.currentEventChunk.getFirst());
            hashMap.put("ExpiredEventChunk", this.expiredEventChunk != null ? this.expiredEventChunk.getFirst() : null);
            hashMap.put("ResetEvent", this.resetEvent);
            hashMap.put("Flushed", Boolean.valueOf(this.flushed));
        }
        return hashMap;
    }

    @Override // org.wso2.siddhi.core.util.snapshot.Snapshotable
    public synchronized void restoreState(Map<String, Object> map) {
        this.startTime = ((Long) map.get("StartTime")).longValue();
        this.endTime = ((Long) map.get("EndTime")).longValue();
        this.lastScheduledTime = ((Long) map.get("LastScheduledTime")).longValue();
        this.lastCurrentEventTime = ((Long) map.get("LastCurrentEventTime")).longValue();
        this.currentEventChunk.clear();
        this.currentEventChunk.add((StreamEvent) map.get("CurrentEventChunk"));
        if (this.expiredEventChunk != null) {
            this.expiredEventChunk.clear();
            this.expiredEventChunk.add((StreamEvent) map.get("ExpiredEventChunk"));
        } else {
            if (this.outputExpectsExpiredEvents) {
                this.expiredEventChunk = new ComplexEventChunk<>(false);
            }
            if (this.schedulerTimeout > 0) {
                this.expiredEventChunk = new ComplexEventChunk<>(false);
            }
        }
        this.resetEvent = (StreamEvent) map.get("ResetEvent");
        this.flushed = ((Boolean) map.get("Flushed")).booleanValue();
    }

    @Override // org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor
    public synchronized StreamEvent find(StateEvent stateEvent, CompiledCondition compiledCondition) {
        return ((Operator) compiledCondition).find(stateEvent, this.expiredEventChunk, this.streamEventCloner);
    }

    @Override // org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor
    public CompiledCondition compileCondition(Expression expression, MatchingMetaInfoHolder matchingMetaInfoHolder, SiddhiAppContext siddhiAppContext, List<VariableExpressionExecutor> list, Map<String, Table> map, String str) {
        if (this.expiredEventChunk == null) {
            this.expiredEventChunk = new ComplexEventChunk<>(false);
            this.storeExpiredEvents = true;
        }
        return OperatorParser.constructOperator(this.expiredEventChunk, expression, matchingMetaInfoHolder, siddhiAppContext, list, map, this.queryName);
    }

    @Override // org.wso2.siddhi.core.query.processor.SchedulingProcessor
    public Scheduler getScheduler() {
        return this.scheduler;
    }

    @Override // org.wso2.siddhi.core.query.processor.SchedulingProcessor
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }
}
