package org.wso2.extension.siddhi.execution.unique;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.state.StateEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.SchedulingProcessor;
import org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor;
import org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor;
import org.wso2.siddhi.core.table.Table;
import org.wso2.siddhi.core.util.Scheduler;
import org.wso2.siddhi.core.util.collection.operator.CompiledCondition;
import org.wso2.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import org.wso2.siddhi.core.util.collection.operator.Operator;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.parser.OperatorParser;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;
import org.wso2.siddhi.query.api.expression.Expression;

@Extension(
        name = "externalTimeBatch",
        namespace = "unique",
        description = "This is a batch (tumbling) time window that is determined based on an external time, i.e., time stamps that are specified via an attribute in the events. "
                + "It holds the latest unique events that arrived during the last window time period. "
                + "The unique events are determined based on the value for a specified unique key parameter. "
                + "When a new event arrives within the time window with a value for the unique key parameter that is the same as that of an existing event in the window, the existing event expires and it is replaced by the new event.",
        parameters = {
                @Parameter(name = "unique.key",
                        description = "The attribute that should be checked for uniqueness.",
                        type = {DataType.INT, DataType.LONG, DataType.FLOAT, DataType.BOOL, DataType.DOUBLE, DataType.STRING}),
                @Parameter(name = "time.stamp",
                        description = " The time which the window determines as the current time and acts upon. The value of this parameter should be monotonically increasing.",
                        type = {DataType.LONG}),
                @Parameter(name = "window.time",
                        description = "The batch time period for which the window should hold events.",
                        type = {DataType.INT, DataType.LONG}),
                @Parameter(name = "start.time",
                        description = "This specifies an offset in milliseconds in order to start the window at a time different to the standard time.",
                        defaultValue = "Timestamp of first event",
                        type = {DataType.INT, DataType.LONG},
                        optional = true),
                @Parameter(name = "time.out",
                        description = "Time to wait for arrival of a new event, before flushing and returning the output for events belonging to a specific batch.",
                        type = {DataType.INT, DataType.LONG},
                        optional = true,
                        defaultValue = "The system waits till an event from the next batch arrives to flush the current batch"),
                @Parameter(name = "replace.time.stamp.with.batch.end.time",
                        description = "Replaces the 'timestamp' value with the corresponding batch end time stamp.",
                        type = {DataType.BOOL},
                        optional = true,
                        defaultValue = "false")
        },
        examples = {
                @Example(
                        syntax = "define stream LoginEvents (timestamp long, ip string) ;\n"
                                + "from LoginEvents#window.unique:externalTimeBatch(ip, timestamp, 1 sec, 0, 2 sec) \n"
                                + "select timestamp, ip, count() as total\n"
                                + "insert into UniqueIps ;",
                        description = "In this query, the window holds the latest unique events that arrive from the 'LoginEvent' stream during each second. The latest events are determined based on the external time stamp. At a given time, all the events held in the window have unique values for the 'ip' and monotonically increasing values for 'timestamp' attributes. The events in the window are inserted into the 'UniqueIps' output stream. The system waits for 2 seconds for the arrival of a new event before flushing the current batch.")
        })
/**
 * Batch window driven by a timestamp attribute of the events, which keeps only the most recent
 * event per unique-key value inside each batch.
 *
 * <p>Incoming CURRENT events are stored in a map keyed by the value of the unique-key expression,
 * so a later event with the same key replaces the earlier one. When an event carries a timestamp
 * at or beyond the current batch end, the collected batch is emitted and a new batch end is
 * computed. If a timeout was configured, TIMER events emit the batch collected so far without
 * waiting for an event of the next batch.
 *
 * <p>All window state is read and written while holding this object's monitor.
 */
public class UniqueExternalTimeBatchWindowProcessor extends WindowProcessor
        implements SchedulingProcessor, FindableProcessor {

    /** Reads the external timestamp (2nd window parameter) from an event. */
    private VariableExpressionExecutor timestampExpressionExecutor;
    /** Batch length (3rd window parameter). */
    private long timeToKeep;
    private Scheduler scheduler;
    /** Time passed to the most recent scheduler notification request. */
    private long lastScheduledTime;
    /** Largest external timestamp seen so far on a CURRENT event. */
    private long lastCurrentEventTime;
    /** Produces the key used to de-duplicate events within a batch (1st window parameter). */
    private ExpressionExecutor uniqueExpressionExecutor;
    private boolean outputExpectsExpiredEvents;
    /** Events of the batch being collected, keyed by unique-key value, in first-insertion order. */
    private Map<Object, StreamEvent> currentEvents = new LinkedHashMap<>();
    /** EXPIRED copies of previously emitted events; stays null until something needs it. */
    private Map<Object, StreamEvent> expiredEvents = null;
    private volatile StreamEvent resetEvent = null;
    /** End of the current batch; a negative value means timing has not been initialised yet. */
    private long endTime = -1;
    private long startTime = 0;
    private boolean isStartTimeEnabled = false;
    /** Timeout (5th window parameter); 0 disables timer-driven flushing. */
    private long schedulerTimeout = 0;
    /** True after a TIMER event emitted the batch and before an event of the next batch arrived. */
    private boolean flushed = false;
    private boolean storeExpiredEvents = false;
    private boolean replaceTimestampWithBatchEndTime = false;

    /**
     * Validates the window parameters and stores them.
     *
     * @param expressionExecutors        three to six executors: unique key, timestamp variable,
     *                                   window time, and optionally start time, timeout and the
     *                                   replace-timestamp flag
     * @param configReader               not used by this implementation
     * @param outputExpectsExpiredEvents whether expired events have to be emitted and retained
     * @param siddhiAppContext           not used by this implementation
     * @throws SiddhiAppValidationException if the parameter count, a parameter type, or a parameter
     *                                      value is not acceptable
     */
    protected void init(ExpressionExecutor[] expressionExecutors, ConfigReader configReader,
                        boolean outputExpectsExpiredEvents, SiddhiAppContext siddhiAppContext) {
        if (outputExpectsExpiredEvents) {
            this.expiredEvents = new LinkedHashMap<>();
            this.storeExpiredEvents = true;
        }
        this.outputExpectsExpiredEvents = outputExpectsExpiredEvents;

        int parameterCount = expressionExecutors.length;
        if (parameterCount < 3 || parameterCount > 6) {
            throw new SiddhiAppValidationException("ExternalTimeBatch window should only have three to six parameters (<variable> uniqueAttribute, <long> timestamp, <int|long|time> windowTime, <long> startTime, <int|long|time> timeout, <bool> replaceTimestampWithBatchEndTime), but found " + parameterCount + " input attributes");
        }

        this.uniqueExpressionExecutor = expressionExecutors[0];

        ExpressionExecutor timestampExecutor = expressionExecutors[1];
        if (!(timestampExecutor instanceof VariableExpressionExecutor)) {
            throw new SiddhiAppValidationException("ExternalTime window's 2nd parameter timestamp should be a variable, but found " + timestampExecutor.getClass());
        }
        if (timestampExecutor.getReturnType() != Attribute.Type.LONG) {
            throw new SiddhiAppValidationException("ExternalTime window's 2nd parameter timestamp should be type long, but found " + timestampExecutor.getReturnType());
        }
        this.timestampExpressionExecutor = (VariableExpressionExecutor) timestampExecutor;

        this.timeToKeep = readConstantTime(expressionExecutors[2], "3rd", "windowTime");
        if (this.timeToKeep <= 0) {
            // The batch end is computed with a modulo by this value, so zero would fail on the
            // first event and a negative value would place the batch end before the current time.
            throw new SiddhiAppValidationException("ExternalTimeBatch window's 3rd parameter windowTime should be greater than zero, but found " + this.timeToKeep);
        }

        if (parameterCount >= 4) {
            this.isStartTimeEnabled = true;
            this.startTime = readConstantTime(expressionExecutors[3], "4th", "startTime");
        }
        if (parameterCount >= 5) {
            this.schedulerTimeout = readConstantTime(expressionExecutors[4], "5th", "timeout");
        }
        if (parameterCount == 6) {
            ExpressionExecutor replaceFlagExecutor = expressionExecutors[5];
            if (replaceFlagExecutor.getReturnType() != Attribute.Type.BOOL) {
                throw new SiddhiAppValidationException("ExternalTimeBatch window's 6th parameter replaceTimestampWithBatchEndTime should be bool, but found " + replaceFlagExecutor.getReturnType());
            }
            Object flagValue = requireConstant(replaceFlagExecutor, "6th", "replaceTimestampWithBatchEndTime").getValue();
            this.replaceTimestampWithBatchEndTime = Boolean.parseBoolean(String.valueOf(flagValue));
        }

        // Timer-driven flushing stores expired copies even when the output does not ask for them.
        if (this.schedulerTimeout > 0 && this.expiredEvents == null) {
            this.expiredEvents = new LinkedHashMap<>();
        }
    }

    /**
     * Returns the executor as a constant executor, or rejects it with a validation error instead
     * of letting a later cast fail with a ClassCastException.
     */
    private static ConstantExpressionExecutor requireConstant(ExpressionExecutor executor, String position,
                                                              String parameterName) {
        if (!(executor instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppValidationException("ExternalTimeBatch window's " + position + " parameter " + parameterName + " should be a constant, but found " + executor.getClass());
        }
        return (ConstantExpressionExecutor) executor;
    }

    /**
     * Reads an int or long constant window parameter as a long.
     *
     * @param executor      executor of the parameter
     * @param position      ordinal used in error messages, e.g. "3rd"
     * @param parameterName parameter name used in error messages
     * @return the constant's numeric value
     * @throws SiddhiAppValidationException if the executor is not an int/long constant
     */
    private static long readConstantTime(ExpressionExecutor executor, String position, String parameterName) {
        Attribute.Type returnType = executor.getReturnType();
        if (returnType != Attribute.Type.INT && returnType != Attribute.Type.LONG) {
            throw new SiddhiAppValidationException("ExternalTimeBatch window's " + position + " parameter " + parameterName + " should be either int or long, but found " + returnType);
        }
        Object value = requireConstant(executor, position, parameterName).getValue();
        if (!(value instanceof Number)) {
            throw new SiddhiAppValidationException("ExternalTimeBatch window's " + position + " parameter " + parameterName + " should have a numeric value, but found " + value);
        }
        return ((Number) value).longValue();
    }

    /**
     * Consumes a chunk of events, updates the window state and forwards any completed batches.
     *
     * <p>The state is updated while holding this object's monitor; the resulting output chunks are
     * handed to {@code nextProcessor} only after the monitor has been released.
     *
     * @param streamEventChunk  incoming events; nothing happens when it is empty
     * @param nextProcessor     receiver of the emitted batches
     * @param streamEventCloner used to copy events before they are stored or emitted
     */
    protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor,
                           StreamEventCloner streamEventCloner) {
        StreamEvent firstEvent = streamEventChunk.getFirst();
        if (firstEvent == null) {
            return;
        }
        List<ComplexEventChunk<StreamEvent>> outputChunks = new ArrayList<>();
        synchronized (this) {
            initTiming(firstEvent);
            StreamEvent upcoming = firstEvent;
            while (upcoming != null) {
                StreamEvent event = upcoming;
                // Read the successor before handling the event.
                upcoming = upcoming.getNext();
                if (event.getType() == ComplexEvent.Type.TIMER) {
                    // Timers older than the latest scheduling request are ignored.
                    if (this.lastScheduledTime <= event.getTimestamp()) {
                        if (!this.flushed) {
                            flushToOutputChunk(streamEventCloner, outputChunks, this.lastCurrentEventTime, true);
                            this.flushed = true;
                        } else if (!this.currentEvents.isEmpty()) {
                            appendToOutputChunk(streamEventCloner, outputChunks, this.lastCurrentEventTime, true);
                        }
                        scheduleTimeout();
                    }
                } else if (event.getType() == ComplexEvent.Type.CURRENT) {
                    long eventTime = (Long) this.timestampExpressionExecutor.execute(event);
                    if (this.lastCurrentEventTime < eventTime) {
                        this.lastCurrentEventTime = eventTime;
                    }
                    if (eventTime < this.endTime) {
                        cloneAppend(streamEventCloner, event);
                    } else {
                        // The event belongs to a later batch: emit what was collected first.
                        if (this.flushed) {
                            appendToOutputChunk(streamEventCloner, outputChunks, this.lastCurrentEventTime, false);
                            this.flushed = false;
                        } else {
                            flushToOutputChunk(streamEventCloner, outputChunks, this.lastCurrentEventTime, false);
                        }
                        this.endTime = findEndTime(this.lastCurrentEventTime, this.startTime, this.timeToKeep);
                        cloneAppend(streamEventCloner, event);
                        if (this.schedulerTimeout > 0) {
                            scheduleTimeout();
                        }
                    }
                }
            }
        }
        for (ComplexEventChunk<StreamEvent> outputChunk : outputChunks) {
            nextProcessor.process(outputChunk);
        }
    }

    /**
     * Computes the first batch end (and, when no start time was configured, the start time) from
     * the given event. Does nothing once {@code endTime} is non-negative.
     */
    private void initTiming(StreamEvent firstEvent) {
        if (this.endTime >= 0) {
            return;
        }
        long firstEventTime = (Long) this.timestampExpressionExecutor.execute(firstEvent);
        if (this.isStartTimeEnabled) {
            this.endTime = findEndTime(firstEventTime, this.startTime, this.timeToKeep);
        } else {
            this.startTime = firstEventTime;
            this.endTime = this.startTime + this.timeToKeep;
        }
        if (this.schedulerTimeout > 0) {
            scheduleTimeout();
        }
    }

    /**
     * Records the next timeout as "current time + timeout" and, if a scheduler has been set, asks
     * it for a notification at that time.
     */
    private void scheduleTimeout() {
        this.lastScheduledTime = this.siddhiAppContext.getTimestampGenerator().currentTime() + this.schedulerTimeout;
        if (this.scheduler != null) {
            this.scheduler.notifyAt(this.lastScheduledTime);
        }
    }

    /**
     * Emits the collected batch as one chunk: stored expired events (only if the output expects
     * them), then the reset event, then the current events. Afterwards the current events and the
     * reset event are discarded.
     *
     * @param streamEventCloner  used to create the EXPIRED copies
     * @param outputChunks       list that receives the chunk when it is not empty
     * @param currentTime        timestamp written to the expired events and the reset event
     * @param preserveForTimeout true to store EXPIRED copies of the emitted events regardless of
     *                           {@code storeExpiredEvents}
     */
    private void flushToOutputChunk(StreamEventCloner streamEventCloner,
                                    List<ComplexEventChunk<StreamEvent>> outputChunks, long currentTime,
                                    boolean preserveForTimeout) {
        ComplexEventChunk<StreamEvent> batchChunk = new ComplexEventChunk<>(true);
        if (this.outputExpectsExpiredEvents && !this.expiredEvents.isEmpty()) {
            for (StreamEvent expiredEvent : this.expiredEvents.values()) {
                expiredEvent.setTimestamp(currentTime);
                batchChunk.add(expiredEvent);
            }
        }
        if (this.expiredEvents != null) {
            this.expiredEvents.clear();
        }
        if (!this.currentEvents.isEmpty()) {
            this.resetEvent.setTimestamp(currentTime);
            batchChunk.add(this.resetEvent);
            this.resetEvent = null;
            boolean keepExpiredCopies = preserveForTimeout || this.storeExpiredEvents;
            for (Map.Entry<Object, StreamEvent> current : this.currentEvents.entrySet()) {
                if (keepExpiredCopies) {
                    StreamEvent expiredCopy = streamEventCloner.copyStreamEvent(current.getValue());
                    expiredCopy.setType(ComplexEvent.Type.EXPIRED);
                    this.expiredEvents.put(current.getKey(), expiredCopy);
                }
                batchChunk.add(current.getValue());
            }
        }
        this.currentEvents.clear();
        if (batchChunk.getFirst() != null) {
            outputChunks.add(batchChunk);
        }
    }

    /**
     * Emits the events that arrived after a timer-driven flush, together with the events that
     * flush already emitted, as one chunk. For a key present in both sets the newly arrived event
     * is the one emitted. Nothing is emitted when no current events are held. Unlike
     * {@link #flushToOutputChunk}, the stored expired events and the reset event are kept (copies
     * are emitted).
     *
     * @param streamEventCloner  used to copy stored events before emitting them
     * @param outputChunks       list that receives the chunk when it is not empty
     * @param currentTime        timestamp written to the emitted expired and reset copies
     * @param preserveForTimeout true to store EXPIRED copies of the current events regardless of
     *                           {@code storeExpiredEvents}
     */
    private void appendToOutputChunk(StreamEventCloner streamEventCloner,
                                     List<ComplexEventChunk<StreamEvent>> outputChunks, long currentTime,
                                     boolean preserveForTimeout) {
        ComplexEventChunk<StreamEvent> batchChunk = new ComplexEventChunk<>(true);
        if (!this.currentEvents.isEmpty()) {
            Map<Object, StreamEvent> eventsToEmit = new LinkedHashMap<>();
            for (Map.Entry<Object, StreamEvent> expired : this.expiredEvents.entrySet()) {
                if (this.outputExpectsExpiredEvents) {
                    StreamEvent expiredCopy = streamEventCloner.copyStreamEvent(expired.getValue());
                    expiredCopy.setTimestamp(currentTime);
                    batchChunk.add(expiredCopy);
                }
                // Re-emit the previously flushed event as a CURRENT event of the same batch.
                StreamEvent reissued = streamEventCloner.copyStreamEvent(expired.getValue());
                reissued.setType(ComplexEvent.Type.CURRENT);
                eventsToEmit.put(expired.getKey(), reissued);
            }
            StreamEvent resetCopy = streamEventCloner.copyStreamEvent(this.resetEvent);
            resetCopy.setTimestamp(currentTime);
            batchChunk.add(resetCopy);
            boolean keepExpiredCopies = preserveForTimeout || this.storeExpiredEvents;
            for (Map.Entry<Object, StreamEvent> current : this.currentEvents.entrySet()) {
                if (keepExpiredCopies) {
                    StreamEvent expiredCopy = streamEventCloner.copyStreamEvent(current.getValue());
                    expiredCopy.setType(ComplexEvent.Type.EXPIRED);
                    this.expiredEvents.put(current.getKey(), expiredCopy);
                }
                // Overrides a re-issued event with the same key, keeping that key's position.
                eventsToEmit.put(current.getKey(), current.getValue());
            }
            for (StreamEvent eventToEmit : eventsToEmit.values()) {
                batchChunk.add(eventToEmit);
            }
        }
        this.currentEvents.clear();
        if (batchChunk.getFirst() != null) {
            outputChunks.add(batchChunk);
        }
    }

    /**
     * Returns the first batch boundary ({@code startTime + k * timeToKeep}) that lies strictly
     * after {@code currentTime}.
     *
     * <p>{@link Math#floorMod(long, long)} is used instead of {@code %} so that a
     * {@code currentTime} earlier than {@code startTime} still maps to the nearest following
     * boundary; {@code %} yields a negative remainder there and would skip one boundary.
     *
     * @param currentTime event time the boundary is computed for
     * @param startTime   offset the batch boundaries are aligned to
     * @param timeToKeep  batch length, expected to be positive
     * @return end time of the batch containing {@code currentTime}
     */
    private long findEndTime(long currentTime, long startTime, long timeToKeep) {
        long elapsedInBatch = Math.floorMod(currentTime - startTime, timeToKeep);
        return currentTime + (timeToKeep - elapsedInBatch);
    }

    /**
     * Stores a copy of the event under its unique key, replacing any event already stored for that
     * key, and creates the reset event from it if none exists.
     */
    private void cloneAppend(StreamEventCloner streamEventCloner, StreamEvent event) {
        StreamEvent storedCopy = streamEventCloner.copyStreamEvent(event);
        if (this.replaceTimestampWithBatchEndTime) {
            storedCopy.setAttribute(Long.valueOf(this.endTime), this.timestampExpressionExecutor.getPosition());
        }
        this.currentEvents.put(this.uniqueExpressionExecutor.execute(storedCopy), storedCopy);
        if (this.resetEvent == null) {
            // The reset event is built from the unmodified incoming event, not from storedCopy.
            this.resetEvent = streamEventCloner.copyStreamEvent(event);
            this.resetEvent.setType(ComplexEvent.Type.RESET);
        }
    }

    /** No start-up work is performed. */
    public void start() {
    }

    /** No shutdown work is performed. */
    public void stop() {
    }

    /**
     * Collects the window state into a map.
     *
     * <p>The event maps are placed into the result by reference, not copied.
     *
     * @return state keyed by field name
     */
    public synchronized Map<String, Object> currentState() {
        Map<String, Object> state = new HashMap<>();
        state.put("currentEvents", this.currentEvents);
        state.put("expiredEvents", this.expiredEvents);
        state.put("resetEvent", this.resetEvent);
        state.put("endTime", this.endTime);
        state.put("startTime", this.startTime);
        state.put("lastScheduledTime", this.lastScheduledTime);
        state.put("lastCurrentEventTime", this.lastCurrentEventTime);
        state.put("flushed", this.flushed);
        return state;
    }

    /**
     * Restores the window state from a map with the layout produced by {@link #currentState()}.
     *
     * @param state state keyed by field name
     */
    @SuppressWarnings("unchecked") // the maps are expected to be the ones stored by currentState()
    public synchronized void restoreState(Map<String, Object> state) {
        this.currentEvents = (Map<Object, StreamEvent>) state.get("currentEvents");
        Object restoredExpiredEvents = state.get("expiredEvents");
        if (restoredExpiredEvents != null) {
            this.expiredEvents = (Map<Object, StreamEvent>) restoredExpiredEvents;
        } else if (this.outputExpectsExpiredEvents || this.schedulerTimeout > 0) {
            // The map is needed for emitting expired events or for timer-driven flushing.
            this.expiredEvents = new LinkedHashMap<>();
        }
        this.resetEvent = (StreamEvent) state.get("resetEvent");
        this.endTime = (Long) state.get("endTime");
        this.startTime = (Long) state.get("startTime");
        this.lastScheduledTime = (Long) state.get("lastScheduledTime");
        this.lastCurrentEventTime = (Long) state.get("lastCurrentEventTime");
        this.flushed = (Boolean) state.get("flushed");
    }

    /** Returns the scheduler used for timeout notifications; may be null. */
    public synchronized Scheduler getScheduler() {
        return this.scheduler;
    }

    /** Sets the scheduler used for timeout notifications. */
    public synchronized void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Looks up events among the stored expired events.
     *
     * @param matchingEvent     event to match against
     * @param compiledCondition condition to evaluate; only {@link Operator} instances are handled
     * @return the result of the operator's find, or null when the condition is not an
     *         {@link Operator}
     */
    public synchronized StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition) {
        if (compiledCondition instanceof Operator) {
            return ((Operator) compiledCondition).find(matchingEvent, this.expiredEvents, this.streamEventCloner);
        }
        return null;
    }

    /**
     * Builds an operator that evaluates the expression against the stored expired events. If
     * expired events were not being stored so far, storing them is switched on.
     *
     * @param expression                  condition to compile
     * @param matchingMetaInfoHolder      passed through to the operator parser
     * @param siddhiAppContext            passed through to the operator parser
     * @param variableExpressionExecutors passed through to the operator parser
     * @param tableMap                    passed through to the operator parser
     * @param queryName                   not used; the processor's own {@code queryName} field is
     *                                    passed to the parser instead
     * @return the compiled condition
     */
    public synchronized CompiledCondition compileCondition(Expression expression,
                                                           MatchingMetaInfoHolder matchingMetaInfoHolder,
                                                           SiddhiAppContext siddhiAppContext,
                                                           List<VariableExpressionExecutor> variableExpressionExecutors,
                                                           Map<String, Table> tableMap, String queryName) {
        if (this.expiredEvents == null) {
            this.expiredEvents = new LinkedHashMap<>();
            this.storeExpiredEvents = true;
        }
        return OperatorParser.constructOperator(this.expiredEvents, expression, matchingMetaInfoHolder,
                siddhiAppContext, variableExpressionExecutors, tableMap, this.queryName);
    }
}
