/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.execution.unique;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ParameterOverload;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.event.stream.populater.ComplexEventPopulater;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.SchedulingProcessor;
import io.siddhi.core.query.processor.stream.window.FindableProcessor;
import io.siddhi.core.query.processor.stream.window.WindowProcessor;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.Scheduler;
import io.siddhi.core.util.collection.operator.CompiledCondition;
import io.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import io.siddhi.core.util.collection.operator.Operator;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.parser.OperatorParser;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import io.siddhi.query.api.expression.Expression;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

@Extension(name="externalTimeBatch", namespace="unique", description="This is a batch (tumbling) time window that is determined based on an external time, i.e., time stamps that are specified via an attribute in the events. It holds the latest unique events that arrived during the last window time period. The unique events are determined based on the value for a specified unique key parameter. When a new event arrives within the time window with a value for the unique key parameter that is the same as that of an existing event in the window, the existing event expires and it is replaced by the new event.", parameters={@Parameter(name="unique.key", description="The attribute that should be checked for uniqueness.", type={DataType.INT, DataType.LONG, DataType.FLOAT, DataType.BOOL, DataType.DOUBLE, DataType.STRING}, dynamic=true), @Parameter(name="time.stamp", description=" The time which the window determines as the current time and acts upon. The value of this parameter should be monotonically increasing.", type={DataType.LONG}), @Parameter(name="window.time", description="The sliding time period for which the window should hold events.", type={DataType.INT, DataType.LONG}), @Parameter(name="start.time", description="This specifies an offset in milliseconds in order to start the window at a time different to the standard time.", defaultValue="Timestamp of first event", type={DataType.INT}, optional=true), @Parameter(name="time.out", description="Time to wait for arrival of a new event, before flushing and returning the output for events belonging to a specific batch.", type={DataType.INT, DataType.LONG}, optional=true, defaultValue="The system waits till an event from the next batch arrives to flush the current batch"), @Parameter(name="replace.time.stamp.with.batch.end.time", description="Replaces the 'timestamp' value with the corresponding batch end time stamp.", type={DataType.INT, DataType.LONG}, optional=true, defaultValue="false")}, 
parameterOverloads={@ParameterOverload(parameterNames={"unique.key", "time.stamp", "window.time"}), @ParameterOverload(parameterNames={"unique.key", "time.stamp", "window.time", "start.time"}), @ParameterOverload(parameterNames={"unique.key", "time.stamp", "window.time", "start.time", "time.out"}), @ParameterOverload(parameterNames={"unique.key", "time.stamp", "window.time", "start.time", "time.out", "replace.time.stamp.with.batch.end.time"})}, examples={@Example(syntax="define stream LoginEvents (timestamp long, ip string);\n\nfrom LoginEvents#window.unique:externalTimeBatch(ip, timestamp, 1 sec, 0, 2 sec) \nselect timestamp, ip, count() as total\ninsert into UniqueIps ;", description="In this query, the window holds the latest unique events that arrive from the 'LoginEvent' stream during each second. The latest events are determined based on the external time stamp. At a given time, all the events held in the window have unique values for the 'ip' and monotonically increasing values for 'timestamp' attributes. The events in the window are inserted into the 'UniqueIps' output stream. The system waits for 2 seconds for the arrival of a new event before flushing the current batch.")})
public class UniqueExternalTimeBatchWindowProcessor
extends WindowProcessor<WindowState>
implements SchedulingProcessor,
FindableProcessor {
    // Executor for the 2nd window parameter; reads the external timestamp from each event.
    private VariableExpressionExecutor timestampExpressionExecutor;
    // Value of the 3rd window parameter (window.time); added to the batch start to get the batch end.
    private long timeToKeep;
    // Set to true in init() when a 4th parameter (start.time) is supplied.
    private boolean isStartTimeEnabled = false;
    // Value of the 5th window parameter (time.out); timer scheduling only happens when this is > 0.
    private long schedulerTimeout = 0L;
    // Injected through setScheduler(); may be null, so most call sites guard against that.
    private Scheduler scheduler;
    // When true, copies of emitted current events are kept in WindowState.expiredEvents.
    // Enabled by init() (expired output requested) or by compileCondition().
    private boolean storeExpiredEvents = false;
    // Executor for the 1st window parameter; its result is the key used in WindowState.currentEvents.
    private ExpressionExecutor uniqueExpressionExecutor;
    // Value of the 6th window parameter; when true cloneAppend() overwrites the timestamp attribute
    // of the stored event with the batch end time.
    private boolean replaceTimestampWithBatchEndTime = false;
    // Copy of the flag passed to init(); controls whether expired events are added to the output.
    private boolean outputExpectsExpiredEvents;
    // Used to obtain the current time from the app's timestamp generator when scheduling timers.
    private SiddhiAppContext siddhiAppContext;
    // Cloner captured on each processEventChunk() call and passed on to find().
    private StreamEventCloner streamEventCloner;

    /**
     * Validates the window parameters, stores them on this processor and returns the factory
     * used to create the per-state window data.
     *
     * <p>Accepted parameters, in order: unique key, timestamp (long variable), window time
     * (int|long constant), and optionally start time (int|long constant), time-out (int|long
     * constant) and replaceTimestampWithBatchEndTime (bool constant).
     *
     * <p>Every {@link WindowState} produced by the returned factory owns its own expired-event
     * map, so separate states never share mutable event storage.
     *
     * @param metaStreamEvent              not used by this window
     * @param inputDefinition              not used by this window
     * @param attributeExpressionExecutors executors for the three to six window parameters
     * @param configReader                 not used by this window
     * @param streamEventClonerHolder      not used by this window
     * @param outputExpectsExpiredEvents   whether expired events must be produced in the output
     * @param findToBeExecuted             not used by this window
     * @param siddhiQueryContext           supplies the {@link SiddhiAppContext}
     * @return factory creating a fresh {@link WindowState} per invocation
     * @throws SiddhiAppValidationException if the parameter count, a parameter type, a
     *                                      non-constant value where a constant is required,
     *                                      or a non-positive window time is detected
     */
    protected StateFactory<WindowState> init(MetaStreamEvent metaStreamEvent, AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader, StreamEventClonerHolder streamEventClonerHolder, boolean outputExpectsExpiredEvents, boolean findToBeExecuted, SiddhiQueryContext siddhiQueryContext) {
        this.siddhiAppContext = siddhiQueryContext.getSiddhiAppContext();
        if (outputExpectsExpiredEvents) {
            this.storeExpiredEvents = true;
        }
        this.outputExpectsExpiredEvents = outputExpectsExpiredEvents;

        int parameterCount = attributeExpressionExecutors.length;
        if (parameterCount < 3 || parameterCount > 6) {
            throw new SiddhiAppValidationException("ExternalTimeBatch window should only have three to six parameters (<variable> uniqueAttribute, <long> timestamp, <int|long|time> windowTime, <long> startTime, <int|long|time> timeout, <bool> replaceTimestampWithBatchEndTime), but found " + parameterCount + " input attributes");
        }

        this.uniqueExpressionExecutor = attributeExpressionExecutors[0];

        ExpressionExecutor timestampExecutor = attributeExpressionExecutors[1];
        if (!(timestampExecutor instanceof VariableExpressionExecutor)) {
            throw new SiddhiAppValidationException("ExternalTime window's 2nd parameter timestamp should be a variable, but found " + timestampExecutor.getClass());
        }
        if (timestampExecutor.getReturnType() != Attribute.Type.LONG) {
            throw new SiddhiAppValidationException("ExternalTime window's 2nd parameter timestamp should be type long, but found " + timestampExecutor.getReturnType());
        }
        this.timestampExpressionExecutor = (VariableExpressionExecutor) timestampExecutor;

        this.timeToKeep = constantAsLong(attributeExpressionExecutors[2], "3rd parameter windowTime");
        if (this.timeToKeep <= 0L) {
            // A zero window length would otherwise fail later with a division by zero in findEndTime().
            throw new SiddhiAppValidationException("ExternalTimeBatch window's 3rd parameter windowTime should be greater than zero, but found " + this.timeToKeep);
        }

        long startTime = 0L;
        if (parameterCount >= 4) {
            this.isStartTimeEnabled = true;
            startTime = constantAsLong(attributeExpressionExecutors[3], "4th parameter startTime");
        }
        if (parameterCount >= 5) {
            this.schedulerTimeout = constantAsLong(attributeExpressionExecutors[4], "5th parameter timeout");
        }
        if (parameterCount == 6) {
            ExpressionExecutor replaceExecutor = attributeExpressionExecutors[5];
            if (replaceExecutor.getReturnType() != Attribute.Type.BOOL) {
                throw new SiddhiAppValidationException("ExternalTimeBatch window's 6th parameter replaceTimestampWithBatchEndTime should be bool, but found " + replaceExecutor.getReturnType());
            }
            this.replaceTimestampWithBatchEndTime = Boolean.TRUE.equals(constantValueOf(replaceExecutor, "6th parameter replaceTimestampWithBatchEndTime"));
        }

        // Expired events are tracked when the output needs them or when the time-out flush is active.
        final boolean expiredEventsRequired = outputExpectsExpiredEvents || this.schedulerTimeout > 0L;
        final long configuredStartTime = startTime;
        // Allocate the map inside the factory so each state instance gets its own storage.
        return () -> new WindowState(expiredEventsRequired ? new LinkedHashMap<Object, StreamEvent>() : null, configuredStartTime);
    }

    /**
     * Returns the value held by a constant parameter executor.
     *
     * @throws SiddhiAppValidationException if the executor is not a constant
     */
    private static Object constantValueOf(ExpressionExecutor executor, String parameterLabel) {
        if (!(executor instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppValidationException("ExternalTimeBatch window's " + parameterLabel + " should be a constant, but found " + executor.getClass().getCanonicalName());
        }
        return ((ConstantExpressionExecutor) executor).getValue();
    }

    /**
     * Reads an int or long constant parameter as a long.
     *
     * @throws SiddhiAppValidationException if the executor is not of type int/long or not a constant
     */
    private static long constantAsLong(ExpressionExecutor executor, String parameterLabel) {
        Attribute.Type returnType = executor.getReturnType();
        if (returnType != Attribute.Type.INT && returnType != Attribute.Type.LONG) {
            throw new SiddhiAppValidationException("ExternalTimeBatch window's " + parameterLabel + " should be either int or long, but found " + returnType);
        }
        return ((Number) constantValueOf(executor, parameterLabel)).longValue();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * WARNING - void declaration
     */
    protected void processEventChunk(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, WindowState state) {
        if (streamEventChunk.getFirst() == null) {
            return;
        }
        this.streamEventCloner = streamEventCloner;
        ArrayList<ComplexEventChunk<StreamEvent>> complexEventChunks = new ArrayList<ComplexEventChunk<StreamEvent>>();
        WindowState windowState = state;
        synchronized (windowState) {
            void var8_9;
            this.initTiming((StreamEvent)streamEventChunk.getFirst(), state);
            StreamEvent streamEvent = (StreamEvent)streamEventChunk.getFirst();
            while (var8_9 != null) {
                void currStreamEvent = var8_9;
                StreamEvent streamEvent2 = var8_9.getNext();
                if (currStreamEvent.getType() == ComplexEvent.Type.TIMER) {
                    if (state.lastScheduledTime > currStreamEvent.getTimestamp()) continue;
                    if (!state.flushed) {
                        this.flushToOutputChunk(streamEventCloner, complexEventChunks, true, state);
                        state.flushed = true;
                    } else if (state.currentEvents.size() > 0) {
                        this.appendToOutputChunk(streamEventCloner, complexEventChunks, true, state);
                    }
                    state.lastScheduledTime = this.siddhiAppContext.getTimestampGenerator().currentTime() + this.schedulerTimeout;
                    if (this.scheduler == null) continue;
                    this.scheduler.notifyAt(state.lastScheduledTime);
                    continue;
                }
                if (currStreamEvent.getType() != ComplexEvent.Type.CURRENT) continue;
                long currentEventTime = (Long)this.timestampExpressionExecutor.execute((ComplexEvent)currStreamEvent);
                if (state.lastCurrentEventTime < currentEventTime) {
                    state.lastCurrentEventTime = currentEventTime;
                }
                if (currentEventTime < state.endTime) {
                    this.cloneAppend(streamEventCloner, (StreamEvent)currStreamEvent, state);
                    continue;
                }
                if (state.flushed) {
                    this.appendToOutputChunk(streamEventCloner, complexEventChunks, false, state);
                    state.flushed = false;
                } else {
                    this.flushToOutputChunk(streamEventCloner, complexEventChunks, false, state);
                }
                state.endTime = this.findEndTime(state.lastCurrentEventTime, state.startTime, this.timeToKeep);
                this.cloneAppend(streamEventCloner, (StreamEvent)currStreamEvent, state);
                if (this.schedulerTimeout <= 0L) continue;
                state.lastScheduledTime = this.siddhiAppContext.getTimestampGenerator().currentTime() + this.schedulerTimeout;
                this.scheduler.notifyAt(state.lastScheduledTime);
            }
        }
        for (ComplexEventChunk complexEventChunk : complexEventChunks) {
            nextProcessor.process(complexEventChunk);
        }
    }

    /**
     * Reports the processing mode of this window.
     *
     * @return always {@link ProcessingMode#BATCH}
     */
    public ProcessingMode getProcessingMode() {
        return ProcessingMode.BATCH;
    }

    /**
     * Establishes the first batch end time, and the first time-out schedule if a time-out is
     * configured. Does nothing once the state already has a non-negative end time.
     *
     * @param firstStreamEvent first event of the chunk being processed; its external timestamp
     *                         seeds the batch boundaries
     * @param state            window state whose start/end/schedule times are initialised
     */
    private void initTiming(StreamEvent firstStreamEvent, WindowState state) {
        if (state.endTime >= 0L) {
            return;
        }
        long firstTimestamp = (Long) this.timestampExpressionExecutor.execute(firstStreamEvent);
        if (!this.isStartTimeEnabled) {
            // No explicit start time: the batch starts at the first event's timestamp.
            state.startTime = firstTimestamp;
            state.endTime = firstTimestamp + this.timeToKeep;
        } else {
            state.endTime = this.findEndTime(firstTimestamp, state.startTime, this.timeToKeep);
        }
        if (this.schedulerTimeout <= 0L) {
            return;
        }
        long nextTimeout = this.siddhiAppContext.getTimestampGenerator().currentTime() + this.schedulerTimeout;
        state.lastScheduledTime = nextTimeout;
        if (this.scheduler != null) {
            this.scheduler.notifyAt(nextTimeout);
        }
    }

    /**
     * Emits the running batch as a new output chunk and empties it.
     *
     * <p>Order of the emitted events: previously stored expired events (only when the output
     * expects them), the reset event, then the current events. The stored expired events are
     * cleared before the current events are processed, and the reset event is discarded after
     * being emitted. A chunk is only added to {@code complexEventChunks} when it is non-empty.
     *
     * @param streamEventCloner     used to copy current events that are kept as expired events
     * @param complexEventChunks    list receiving the produced chunk
     * @param preserveCurrentEvents when true, copies of the emitted current events are stored as
     *                              expired events even if {@code storeExpiredEvents} is false
     * @param state                 window state to read and update
     */
    private void flushToOutputChunk(StreamEventCloner streamEventCloner, List<ComplexEventChunk<StreamEvent>> complexEventChunks, boolean preserveCurrentEvents, WindowState state) {
        ComplexEventChunk<StreamEvent> outputChunk = new ComplexEventChunk<StreamEvent>(true);
        Map<Object, StreamEvent> expired = state.expiredEvents;

        if (this.outputExpectsExpiredEvents && !expired.isEmpty()) {
            for (StreamEvent expiredEvent : expired.values()) {
                expiredEvent.setTimestamp(state.lastCurrentEventTime);
                outputChunk.add(expiredEvent);
            }
        }
        if (expired != null) {
            expired.clear();
        }

        if (!state.currentEvents.isEmpty()) {
            StreamEvent reset = state.resetEvent;
            reset.setTimestamp(state.lastCurrentEventTime);
            outputChunk.add(reset);
            state.resetEvent = null;

            boolean keepAsExpired = preserveCurrentEvents || this.storeExpiredEvents;
            for (Map.Entry<Object, StreamEvent> current : state.currentEvents.entrySet()) {
                StreamEvent currentEvent = current.getValue();
                if (keepAsExpired) {
                    // Copy before the original is linked into the output chunk.
                    StreamEvent expiredCopy = streamEventCloner.copyStreamEvent(currentEvent);
                    expiredCopy.setType(ComplexEvent.Type.EXPIRED);
                    expired.put(current.getKey(), expiredCopy);
                }
                outputChunk.add(currentEvent);
            }
        }
        state.currentEvents.clear();

        if (outputChunk.getFirst() != null) {
            complexEventChunks.add(outputChunk);
        }
    }

    /**
     * Emits the events collected since the last flush together with the events that were
     * already emitted for the same batch, then empties the running batch.
     *
     * <p>When there are current events, the produced chunk contains: copies of the stored
     * expired events (only when the output expects them), a copy of the reset event, and then
     * one CURRENT event per unique key. For a key present in both the stored expired events and
     * the current events, the current event is the one sent. Stored expired events are not
     * cleared here; current events are added to them when {@code preserveCurrentEvents} or
     * {@code storeExpiredEvents} is set. Nothing is emitted when there are no current events.
     *
     * @param streamEventCloner     used to copy stored and current events
     * @param complexEventChunks    list receiving the produced chunk
     * @param preserveCurrentEvents when true, copies of the current events are stored as expired
     *                              events even if {@code storeExpiredEvents} is false
     * @param state                 window state to read and update
     */
    private void appendToOutputChunk(StreamEventCloner streamEventCloner, List<ComplexEventChunk<StreamEvent>> complexEventChunks, boolean preserveCurrentEvents, WindowState state) {
        ComplexEventChunk<StreamEvent> newEventChunk = new ComplexEventChunk<StreamEvent>(true);
        // Keyed by unique key so a newer current event replaces the previously sent one.
        Map<Object, StreamEvent> sentEvents = new LinkedHashMap<Object, StreamEvent>();
        if (state.currentEvents.size() > 0) {
            if (state.expiredEvents.size() > 0) {
                for (Map.Entry<Object, StreamEvent> expiredEventEntry : state.expiredEvents.entrySet()) {
                    if (this.outputExpectsExpiredEvents) {
                        StreamEvent toExpireEvent = streamEventCloner.copyStreamEvent(expiredEventEntry.getValue());
                        toExpireEvent.setTimestamp(state.lastCurrentEventTime);
                        newEventChunk.add(toExpireEvent);
                    }
                    // Re-send the already flushed event as part of the extended batch.
                    StreamEvent toSendEvent = streamEventCloner.copyStreamEvent(expiredEventEntry.getValue());
                    toSendEvent.setType(ComplexEvent.Type.CURRENT);
                    sentEvents.put(expiredEventEntry.getKey(), toSendEvent);
                }
            }
            StreamEvent toResetEvent = streamEventCloner.copyStreamEvent(state.resetEvent);
            toResetEvent.setTimestamp(state.lastCurrentEventTime);
            newEventChunk.add(toResetEvent);
            for (Map.Entry<Object, StreamEvent> currentEventEntry : state.currentEvents.entrySet()) {
                if (preserveCurrentEvents || this.storeExpiredEvents) {
                    StreamEvent toExpireEvent = streamEventCloner.copyStreamEvent(currentEventEntry.getValue());
                    toExpireEvent.setType(ComplexEvent.Type.EXPIRED);
                    state.expiredEvents.put(currentEventEntry.getKey(), toExpireEvent);
                }
                sentEvents.put(currentEventEntry.getKey(), currentEventEntry.getValue());
            }
            for (StreamEvent sentEvent : sentEvents.values()) {
                newEventChunk.add(sentEvent);
            }
        }
        state.currentEvents.clear();
        if (newEventChunk.getFirst() != null) {
            complexEventChunks.add(newEventChunk);
        }
    }

    /**
     * Computes the end of the batch that contains {@code currentTime}, where batch boundaries
     * lie at {@code startTime + k * timeToKeep} for integer {@code k}.
     *
     * <p>When {@code currentTime} is exactly on a boundary the result is
     * {@code currentTime + timeToKeep}. {@link Math#floorMod(long, long)} is used so that a
     * {@code currentTime} earlier than {@code startTime} is still mapped to the next boundary
     * after it; the {@code %} operator would yield a negative remainder there and skip a
     * boundary.
     *
     * @param currentTime timestamp to locate
     * @param startTime   offset to which batch boundaries are aligned
     * @param timeToKeep  batch length; must be positive
     * @return the first batch boundary strictly after {@code currentTime}
     */
    private long findEndTime(long currentTime, long startTime, long timeToKeep) {
        long elapsedTimeSinceLastEmit = Math.floorMod(currentTime - startTime, timeToKeep);
        return currentTime + (timeToKeep - elapsedTimeSinceLastEmit);
    }

    /**
     * Stores a copy of the given event in the running batch under its unique key, replacing any
     * event already stored under that key, and creates the batch's reset event if there is none.
     *
     * <p>When timestamp replacement is enabled, the copy's timestamp attribute is overwritten
     * with the batch end time before the unique key is evaluated on the copy.
     *
     * @param streamEventCloner used to copy the event
     * @param currStreamEvent   event to add to the running batch
     * @param state             window state holding the running batch
     */
    private void cloneAppend(StreamEventCloner streamEventCloner, StreamEvent currStreamEvent, WindowState state) {
        StreamEvent batchCopy = streamEventCloner.copyStreamEvent(currStreamEvent);
        if (this.replaceTimestampWithBatchEndTime) {
            Object batchEndTime = Long.valueOf(state.endTime);
            batchCopy.setAttribute(batchEndTime, this.timestampExpressionExecutor.getPosition());
        }
        Object uniqueKey = this.uniqueExpressionExecutor.execute(batchCopy);
        state.currentEvents.put(uniqueKey, batchCopy);

        if (state.resetEvent != null) {
            return;
        }
        // The reset event is derived from the original event, not from the modified copy.
        state.resetEvent = streamEventCloner.copyStreamEvent(currStreamEvent);
        state.resetEvent.setType(ComplexEvent.Type.RESET);
    }

    /** Lifecycle hook; this window performs no work on start. */
    public void start() {
    }

    /** Lifecycle hook; this window performs no work on stop. */
    public void stop() {
    }

    /**
     * Returns the scheduler used for time-out notifications. Synchronized on this processor.
     *
     * @return the scheduler, or {@code null} if none has been set
     */
    public synchronized Scheduler getScheduler() {
        return this.scheduler;
    }

    /**
     * Sets the scheduler used for time-out notifications. Synchronized on this processor.
     *
     * @param scheduler scheduler on which {@code notifyAt} is called when a time-out is configured
     */
    public synchronized void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Looks up events matching the compiled condition among the stored expired events.
     *
     * <p>The state obtained from the state holder is always returned to it, also when the
     * lookup throws.
     *
     * @param matchingEvent     event to match against
     * @param compiledCondition condition produced by {@code compileCondition}
     * @return the matching events, or {@code null} when the condition is not an {@link Operator}
     */
    public StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition) {
        WindowState state = (WindowState) this.stateHolder.getState();
        try {
            if (!(compiledCondition instanceof Operator)) {
                return null;
            }
            return ((Operator) compiledCondition).find(matchingEvent, state.expiredEvents, this.streamEventCloner);
        } finally {
            // Passed with its own static type so the call fits a generically typed state holder.
            this.stateHolder.returnState(state);
        }
    }

    /**
     * Builds the operator used by {@code find} to search the stored expired events.
     *
     * <p>If the state obtained from the state holder has no expired-event map yet, one is
     * created and storing of expired events is switched on for this processor. The state is
     * always returned to the state holder, also when operator construction throws.
     *
     * @param expression                 condition expression
     * @param matchingMetaInfoHolder     meta information for matching
     * @param variableExpressionExecutors variable executors collected during compilation
     * @param tableMap                   tables available to the query
     * @param siddhiQueryContext         query context
     * @return the operator produced by {@link OperatorParser#constructOperator}
     */
    public CompiledCondition compileCondition(Expression expression, MatchingMetaInfoHolder matchingMetaInfoHolder, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, Table> tableMap, SiddhiQueryContext siddhiQueryContext) {
        WindowState state = (WindowState) this.stateHolder.getState();
        try {
            if (state.expiredEvents == null) {
                state.expiredEvents = new LinkedHashMap<Object, StreamEvent>();
                this.storeExpiredEvents = true;
            }
            return OperatorParser.constructOperator(state.expiredEvents, expression, matchingMetaInfoHolder, variableExpressionExecutors, tableMap, siddhiQueryContext);
        } finally {
            // Passed with its own static type so the call fits a generically typed state holder.
            this.stateHolder.returnState(state);
        }
    }

    /**
     * Mutable per-state data of the window: the running batch, the previously emitted (expired)
     * events, the batch boundaries and the time-out bookkeeping. Supports snapshot and restore
     * through a string-keyed map.
     */
    class WindowState
    extends State {
        // Running batch, keyed by unique key; insertion-ordered.
        private Map<Object, StreamEvent> currentEvents = new LinkedHashMap<Object, StreamEvent>();
        // Events already emitted for the batch, keyed by unique key; null when not tracked.
        private Map<Object, StreamEvent> expiredEvents;
        // Reset event emitted ahead of the current events of a batch.
        private volatile StreamEvent resetEvent;
        // Negative until the first event initialises the timing.
        private long endTime = -1L;
        private long startTime;
        private long lastScheduledTime;
        private long lastCurrentEventTime;
        // True after a timer-triggered flush, until an event of a later batch arrives.
        private boolean flushed;

        /**
         * @param expiredEvents map used to store emitted events, or {@code null} to not track them
         * @param startTime     initial batch start time
         */
        public WindowState(Map<Object, StreamEvent> expiredEvents, long startTime) {
            this.expiredEvents = expiredEvents;
            this.startTime = startTime;
        }

        /** @return always {@code false} */
        public boolean canDestroy() {
            return false;
        }

        /**
         * Captures all fields of this state.
         *
         * @return map from field name to the field's current value (the event maps are stored
         *         by reference, not copied)
         */
        public Map<String, Object> snapshot() {
            Map<String, Object> snapshot = new HashMap<>();
            snapshot.put("currentEvents", this.currentEvents);
            snapshot.put("expiredEvents", this.expiredEvents);
            snapshot.put("resetEvent", this.resetEvent);
            snapshot.put("endTime", this.endTime);
            snapshot.put("startTime", this.startTime);
            snapshot.put("lastScheduledTime", this.lastScheduledTime);
            snapshot.put("lastCurrentEventTime", this.lastCurrentEventTime);
            snapshot.put("flushed", this.flushed);
            return snapshot;
        }

        /**
         * Restores all fields from a map produced by {@link #snapshot()}.
         *
         * <p>When the snapshot holds no expired events, an empty map is created if the output
         * expects expired events or a time-out is configured; otherwise the current value of
         * the expired-event map is left as it is.
         *
         * @param state map from field name to value
         */
        @SuppressWarnings("unchecked")
        public void restore(Map<String, Object> state) {
            this.currentEvents = (Map<Object, StreamEvent>) state.get("currentEvents");

            Object savedExpiredEvents = state.get("expiredEvents");
            if (savedExpiredEvents != null) {
                this.expiredEvents = (Map<Object, StreamEvent>) savedExpiredEvents;
            } else {
                boolean expiredEventsNeeded = UniqueExternalTimeBatchWindowProcessor.this.outputExpectsExpiredEvents
                        || UniqueExternalTimeBatchWindowProcessor.this.schedulerTimeout > 0L;
                if (expiredEventsNeeded) {
                    this.expiredEvents = new LinkedHashMap<Object, StreamEvent>();
                }
            }

            this.resetEvent = (StreamEvent) state.get("resetEvent");
            this.endTime = (Long) state.get("endTime");
            this.startTime = (Long) state.get("startTime");
            this.lastScheduledTime = (Long) state.get("lastScheduledTime");
            this.lastCurrentEventTime = (Long) state.get("lastCurrentEventTime");
            this.flushed = (Boolean) state.get("flushed");
        }
    }
}

