/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.execution.unique;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ParameterOverload;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.event.stream.populater.ComplexEventPopulater;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.SchedulingProcessor;
import io.siddhi.core.query.processor.stream.window.FindableProcessor;
import io.siddhi.core.query.processor.stream.window.WindowProcessor;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.Scheduler;
import io.siddhi.core.util.collection.operator.CompiledCondition;
import io.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import io.siddhi.core.util.collection.operator.Operator;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.parser.OperatorParser;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import io.siddhi.query.api.expression.Expression;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Extension(name="timeLengthBatch", namespace="unique", description="This is a batch or tumbling time length window that is updated with the latest events based on a unique key parameter. The window tumbles upon the elapse of the time window, or when a number of unique events have arrived. If a new event that arrives within the period of the window has a value for the key parameter which matches the value of an existing event, the existing event expires and it is replaced by the new event. ", parameters={@Parameter(name="unique.key", description="The attribute that should be checked for uniqueness.", type={DataType.INT, DataType.LONG, DataType.FLOAT, DataType.BOOL, DataType.DOUBLE, DataType.STRING}, dynamic=true), @Parameter(name="window.time", description="The sliding time period for which the window should hold the events.", type={DataType.INT, DataType.LONG}, dynamic=true), @Parameter(name="start.time", description="This specifies an offset in milliseconds in order to start the window at a time different to the standard time.", type={DataType.INT, DataType.LONG}, dynamic=true, optional=true, defaultValue="Timestamp of first event"), @Parameter(name="window.length", description="The number of events the window should tumble.", type={DataType.INT}, dynamic=true)}, parameterOverloads={@ParameterOverload(parameterNames={"unique.key", "window.time", "window.length"}), @ParameterOverload(parameterNames={"unique.key", "window.time", "start.time", "window.length"})}, examples={@Example(syntax="define stream CseEventStream (symbol string, price float, volume int)\n\nfrom CseEventStream#window.unique:timeLengthBatch(symbol, 1 sec, 20)\nselect symbol, price, volume\ninsert all events into OutputStream;", description="This window holds the latest unique events that arrive from the 'CseEventStream' at a given time, and returns all the events to the 'OutputStream' stream. It is updated every second based on the latest values for the 'symbol' attribute.")})
public class UniqueTimeLengthBatchWindowProcessor
extends WindowProcessor<WindowState>
implements SchedulingProcessor,
FindableProcessor {
    private long timeInMilliSeconds;
    private long length;
    private long nextEmitTime = -1L;
    private Map<Object, StreamEvent> uniqueEventMap = new HashMap<Object, StreamEvent>();
    private Scheduler scheduler;
    private SiddhiAppContext siddhiAppContext;
    private boolean isStartTimeEnabled = false;
    private long startTime = 0L;
    private ExpressionExecutor uniqueKeyExpressionExecutor;
    private boolean eventSent = false;
    private StreamEventCloner streamEventCloner;

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    protected StateFactory<WindowState> init(MetaStreamEvent metaStreamEvent, AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader, StreamEventClonerHolder streamEventClonerHolder, boolean outputExpectsExpiredEvents, boolean findToBeExecuted, SiddhiQueryContext siddhiQueryContext) {
        this.siddhiAppContext = siddhiQueryContext.getSiddhiAppContext();
        if (attributeExpressionExecutors.length == 3) {
            this.uniqueKeyExpressionExecutor = attributeExpressionExecutors[0];
            if (!(attributeExpressionExecutors[1] instanceof ConstantExpressionExecutor)) throw new SiddhiAppValidationException("Unique Time Length Batch window should have constant for time parameter but found a dynamic attribute " + attributeExpressionExecutors[1].getClass().getCanonicalName());
            if (attributeExpressionExecutors[1].getReturnType() == Attribute.Type.INT) {
                this.timeInMilliSeconds = ((Integer)((ConstantExpressionExecutor)attributeExpressionExecutors[1]).getValue()).intValue();
            } else {
                if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.LONG) throw new SiddhiAppValidationException("Unique Time Length Batch window's parameter time should be either int, long or time, but found " + attributeExpressionExecutors[1].getReturnType());
                this.timeInMilliSeconds = (Long)((ConstantExpressionExecutor)attributeExpressionExecutors[1]).getValue();
            }
            if (!(attributeExpressionExecutors[2] instanceof ConstantExpressionExecutor)) throw new SiddhiAppValidationException("Unique Time Length Batch window should have constant for length parameter but found a dynamic attribute " + attributeExpressionExecutors[2].getClass().getCanonicalName());
            if (attributeExpressionExecutors[2].getReturnType() == Attribute.Type.INT) {
                this.length = ((Integer)((ConstantExpressionExecutor)attributeExpressionExecutors[2]).getValue()).intValue();
                return () -> new WindowState((ComplexEventChunk<StreamEvent>)new ComplexEventChunk(false));
            } else {
                if (attributeExpressionExecutors[2].getReturnType() != Attribute.Type.LONG) throw new SiddhiAppValidationException("Unique Time Length Batch window's parameter length should be eitherint or long, but found " + attributeExpressionExecutors[2].getReturnType());
                this.length = (Long)((ConstantExpressionExecutor)attributeExpressionExecutors[2]).getValue();
            }
            return () -> new WindowState((ComplexEventChunk<StreamEvent>)new ComplexEventChunk(false));
        } else {
            if (attributeExpressionExecutors.length != 4) throw new SiddhiAppValidationException("Unique Time Length Batch window should only have three or four parameters. but found " + attributeExpressionExecutors.length + " input attributes");
            this.uniqueKeyExpressionExecutor = attributeExpressionExecutors[0];
            if (!(attributeExpressionExecutors[1] instanceof ConstantExpressionExecutor)) throw new SiddhiAppValidationException("Unique Time Length Batch window should have constant for time parameter but found a dynamic attribute " + attributeExpressionExecutors[1].getClass().getCanonicalName());
            if (attributeExpressionExecutors[1].getReturnType() == Attribute.Type.INT) {
                this.timeInMilliSeconds = ((Integer)((ConstantExpressionExecutor)attributeExpressionExecutors[1]).getValue()).intValue();
            } else {
                if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.LONG) throw new SiddhiAppValidationException("UniqueTimeLengthBatch window's parameter time should be either int or long, but found " + attributeExpressionExecutors[1].getReturnType());
                this.timeInMilliSeconds = (Long)((ConstantExpressionExecutor)attributeExpressionExecutors[1]).getValue();
            }
            if (!(attributeExpressionExecutors[2] instanceof ConstantExpressionExecutor)) throw new SiddhiAppValidationException("Unique Time Length Batch window should have constant for time parameter but found a dynamic attribute " + attributeExpressionExecutors[2].getReturnType());
            if (attributeExpressionExecutors[2].getReturnType() == Attribute.Type.INT) {
                this.isStartTimeEnabled = true;
                this.startTime = Integer.parseInt(String.valueOf(((ConstantExpressionExecutor)attributeExpressionExecutors[2]).getValue()));
            } else {
                if (attributeExpressionExecutors[2].getReturnType() != Attribute.Type.LONG) throw new SiddhiAppValidationException("Expected either int or long type for UniqueTimeLengthBatch window's start time parameter, but found " + attributeExpressionExecutors[2].getReturnType());
                this.isStartTimeEnabled = true;
                this.startTime = Long.parseLong(String.valueOf(((ConstantExpressionExecutor)attributeExpressionExecutors[2]).getValue()));
            }
            if (!(attributeExpressionExecutors[3] instanceof ConstantExpressionExecutor)) throw new SiddhiAppValidationException("Unique Time Length Batch window should have constant for length parameter but found a dynamic attribute " + attributeExpressionExecutors[3].getClass().getCanonicalName());
            if (attributeExpressionExecutors[3].getReturnType() == Attribute.Type.INT) {
                this.length = ((Integer)((ConstantExpressionExecutor)attributeExpressionExecutors[3]).getValue()).intValue();
                return () -> new WindowState((ComplexEventChunk<StreamEvent>)new ComplexEventChunk(false));
            } else {
                if (attributeExpressionExecutors[3].getReturnType() != Attribute.Type.LONG) throw new SiddhiAppValidationException("Unique Time Length Batch window's parameter length should be eitherint or long, but found " + attributeExpressionExecutors[3].getReturnType());
                this.length = (Long)((ConstantExpressionExecutor)attributeExpressionExecutors[3]).getValue();
            }
        }
        return () -> new WindowState((ComplexEventChunk<StreamEvent>)new ComplexEventChunk(false));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void processEventChunk(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, WindowState state) {
        this.streamEventCloner = streamEventCloner;
        WindowState windowState = state;
        synchronized (windowState) {
            long currentTime = this.siddhiAppContext.getTimestampGenerator().currentTime();
            if (this.nextEmitTime == -1L) {
                this.nextEmitTime = this.isStartTimeEnabled ? this.getNextEmitTime(currentTime) : currentTime + this.timeInMilliSeconds;
                if (this.scheduler != null) {
                    this.scheduler.notifyAt(this.nextEmitTime);
                }
            }
            boolean sendEventsByTime = false;
            boolean sendEventsByLength = false;
            if (currentTime >= this.nextEmitTime) {
                this.nextEmitTime += this.timeInMilliSeconds;
                if (this.scheduler != null) {
                    this.scheduler.notifyAt(this.nextEmitTime);
                }
                if (this.eventSent) {
                    this.eventSent = false;
                    streamEventChunk.clear();
                    return;
                }
                sendEventsByTime = true;
            }
            if (this.eventSent) {
                streamEventChunk.clear();
                return;
            }
            while (streamEventChunk.hasNext()) {
                StreamEvent streamEvent = (StreamEvent)streamEventChunk.next();
                if (streamEvent.getType() != ComplexEvent.Type.CURRENT) continue;
                StreamEvent clonedStreamEvent = streamEventCloner.copyStreamEvent(streamEvent);
                this.addUniqueEvent(this.uniqueEventMap, this.uniqueKeyExpressionExecutor, clonedStreamEvent);
                if ((long)this.uniqueEventMap.size() != this.length) continue;
                sendEventsByLength = true;
                break;
            }
            streamEventChunk.clear();
            if (sendEventsByTime || sendEventsByLength) {
                this.sendEvents(streamEventChunk, streamEventCloner, currentTime, state);
            }
            if (sendEventsByLength) {
                this.eventSent = true;
            }
        }
        if (streamEventChunk.getFirst() != null) {
            streamEventChunk.setBatch(true);
            nextProcessor.process(streamEventChunk);
            streamEventChunk.setBatch(false);
        }
    }

    public ProcessingMode getProcessingMode() {
        return ProcessingMode.BATCH;
    }

    private void sendEvents(ComplexEventChunk<StreamEvent> streamEventChunk, StreamEventCloner streamEventCloner, long currentTime, WindowState state) {
        for (StreamEvent event : this.uniqueEventMap.values()) {
            event.setTimestamp(currentTime);
            state.currentEventChunk.add((ComplexEvent)event);
        }
        this.uniqueEventMap.clear();
        if (state.eventsToBeExpired.getFirst() != null) {
            while (state.eventsToBeExpired.hasNext()) {
                StreamEvent expiredEvent = (StreamEvent)state.eventsToBeExpired.next();
                expiredEvent.setTimestamp(currentTime);
            }
            streamEventChunk.add(state.eventsToBeExpired.getFirst());
        }
        state.eventsToBeExpired.clear();
        if (state.currentEventChunk.getFirst() != null) {
            streamEventChunk.add((ComplexEvent)state.resetEvent);
            state.currentEventChunk.reset();
            while (state.currentEventChunk.hasNext()) {
                StreamEvent streamEvent = (StreamEvent)state.currentEventChunk.next();
                StreamEvent eventClonedForMap = streamEventCloner.copyStreamEvent(streamEvent);
                eventClonedForMap.setType(ComplexEvent.Type.EXPIRED);
                state.eventsToBeExpired.add((ComplexEvent)eventClonedForMap);
            }
            if (state.currentEventChunk.getFirst() != null) {
                state.resetEvent = streamEventCloner.copyStreamEvent((StreamEvent)state.currentEventChunk.getFirst());
                state.resetEvent.setType(ComplexEvent.Type.RESET);
                streamEventChunk.add(state.currentEventChunk.getFirst());
            }
        }
        state.currentEventChunk.clear();
    }

    public synchronized void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    public synchronized Scheduler getScheduler() {
        return this.scheduler;
    }

    protected void addUniqueEvent(Map<Object, StreamEvent> uniqueEventMap, ExpressionExecutor uniqueKeyExpressionExecutor, StreamEvent clonedStreamEvent) {
        uniqueEventMap.put(uniqueKeyExpressionExecutor.execute((ComplexEvent)clonedStreamEvent), clonedStreamEvent);
    }

    private long getNextEmitTime(long currentTime) {
        long elapsedTimeSinceLastEmit = (currentTime - this.startTime) % this.timeInMilliSeconds;
        return currentTime + (this.timeInMilliSeconds - elapsedTimeSinceLastEmit);
    }

    public void start() {
    }

    public void stop() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition) {
        WindowState state = (WindowState)this.stateHolder.getState();
        StreamEvent streamEvent = null;
        try {
            if (compiledCondition instanceof Operator) {
                streamEvent = ((Operator)compiledCondition).find(matchingEvent, (Object)state.eventsToBeExpired, this.streamEventCloner);
            }
        }
        finally {
            this.stateHolder.returnState((State)state);
        }
        return streamEvent;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompiledCondition compileCondition(Expression expression, MatchingMetaInfoHolder matchingMetaInfoHolder, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, Table> tableMap, SiddhiQueryContext siddhiQueryContext) {
        Operator compiledCondition;
        WindowState state = (WindowState)this.stateHolder.getState();
        try {
            compiledCondition = OperatorParser.constructOperator((Object)state.eventsToBeExpired, (Expression)expression, (MatchingMetaInfoHolder)matchingMetaInfoHolder, variableExpressionExecutors, tableMap, (SiddhiQueryContext)siddhiQueryContext);
        }
        finally {
            this.stateHolder.returnState((State)state);
        }
        return compiledCondition;
    }

    class WindowState
    extends State {
        private ComplexEventChunk<StreamEvent> currentEventChunk = new ComplexEventChunk(false);
        private ComplexEventChunk<StreamEvent> eventsToBeExpired;
        private StreamEvent resetEvent = null;

        WindowState(ComplexEventChunk<StreamEvent> eventsToBeExpired) {
            this.eventsToBeExpired = eventsToBeExpired;
        }

        public boolean canDestroy() {
            return false;
        }

        public Map<String, Object> snapshot() {
            if (this.eventsToBeExpired != null) {
                HashMap<String, Object> map = new HashMap<String, Object>();
                map.put("currentEventChunk", this.currentEventChunk.getFirst());
                map.put("eventsToBeExpired", this.eventsToBeExpired.getFirst());
                map.put("resetEvent", this.resetEvent);
                return map;
            }
            HashMap<String, Object> map = new HashMap<String, Object>();
            map.put("currentEventChunk", this.currentEventChunk.getFirst());
            map.put("resetEvent", this.resetEvent);
            return map;
        }

        public void restore(Map<String, Object> state) {
            if (state.size() > 2) {
                this.currentEventChunk.clear();
                this.currentEventChunk.add((ComplexEvent)((StreamEvent)state.get("currentEventChunk")));
                this.eventsToBeExpired.clear();
                this.eventsToBeExpired.add((ComplexEvent)((StreamEvent)state.get("eventsToBeExpired")));
                this.resetEvent = (StreamEvent)state.get("resetEvent");
            } else {
                this.currentEventChunk.clear();
                this.currentEventChunk.add((ComplexEvent)((StreamEvent)state.get("currentEventChunk")));
                this.resetEvent = (StreamEvent)state.get("resetEvent");
            }
        }
    }
}

