/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.execution.unique;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ParameterOverload;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.event.stream.populater.ComplexEventPopulater;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.stream.window.FindableProcessor;
import io.siddhi.core.query.processor.stream.window.WindowProcessor;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.collection.operator.CompiledCondition;
import io.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import io.siddhi.core.util.collection.operator.Operator;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.parser.OperatorParser;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import io.siddhi.query.api.expression.Expression;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Extension(name="lengthBatch", namespace="unique", description="This is a batch (tumbling) window that holds a specified number of latest unique events. The unique events are determined based on the value for a specified unique key parameter. The window is updated for every window length, i.e., for the last set of events of the specified number in a tumbling manner. When a new event arrives within the window length having the same value for the unique key parameter as an existing event in the window, the previous event is replaced by the new event.", parameters={@Parameter(name="unique.key", description="The attribute that should be checked for uniqueness.", type={DataType.INT, DataType.LONG, DataType.FLOAT, DataType.BOOL, DataType.DOUBLE, DataType.STRING}, dynamic=true), @Parameter(name="window.length", description="The number of events the window should tumble.", type={DataType.INT}, dynamic=true)}, parameterOverloads={@ParameterOverload(parameterNames={"unique.key", "window.length"})}, examples={@Example(syntax="define window CseEventWindow (symbol string, price float, volume int)\n\n from CseEventStream#window.unique:lengthBatch(symbol, 10)\nselect symbol, price, volume\ninsert expired events into OutputStream ;", description="In this query, the window at any give time holds the last 10 unique events from the 'CseEventStream' stream. Each of the 10 events within the window at a given time has a unique value for the symbol attribute. If a new event has the same value for the symbol attribute as an existing event within the window length, the existing event expires and it is replaced by the new event. The query returns expired individual events as well as expired batches of events to the 'OutputStream' stream.")})
public class UniqueLengthBatchWindowProcessor
extends WindowProcessor<WindowState>
implements FindableProcessor {
    private int windowLength;
    private SiddhiAppContext siddhiAppContext;
    private ExpressionExecutor uniqueKeyExpressionExecutor;
    private Map<Object, StreamEvent> uniqueEventMap = new HashMap<Object, StreamEvent>();
    private StreamEventCloner streamEventCloner;

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    protected StateFactory<WindowState> init(MetaStreamEvent metaStreamEvent, AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader, StreamEventClonerHolder streamEventClonerHolder, boolean outputExpectsExpiredEvents, boolean findToBeExecuted, SiddhiQueryContext siddhiQueryContext) {
        this.siddhiAppContext = siddhiQueryContext.getSiddhiAppContext();
        if (attributeExpressionExecutors.length != 2) throw new SiddhiAppValidationException("Unique Length batch window should only have two parameters, but found " + attributeExpressionExecutors.length + " input attributes");
        this.uniqueKeyExpressionExecutor = attributeExpressionExecutors[0];
        if (!(attributeExpressionExecutors[1] instanceof ConstantExpressionExecutor)) throw new SiddhiAppValidationException("Unique Length Batch window should have constant for Length parameter but found a dynamic attribute " + attributeExpressionExecutors[1].getClass().getCanonicalName());
        if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.INT) {
            throw new SiddhiAppValidationException("Unique Length Batch window's Length parameter should be INT, but found " + attributeExpressionExecutors[1].getReturnType());
        }
        this.windowLength = (Integer)((ConstantExpressionExecutor)attributeExpressionExecutors[1]).getValue();
        return () -> new WindowState((ComplexEventChunk<StreamEvent>)new ComplexEventChunk(false));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void processEventChunk(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, WindowState state) {
        this.streamEventCloner = streamEventCloner;
        ArrayList<ComplexEventChunk> streamEventChunks = new ArrayList<ComplexEventChunk>();
        UniqueLengthBatchWindowProcessor uniqueLengthBatchWindowProcessor = this;
        synchronized (uniqueLengthBatchWindowProcessor) {
            ComplexEventChunk outputStreamEventChunk = new ComplexEventChunk(true);
            long currentTime = this.siddhiAppContext.getTimestampGenerator().currentTime();
            while (streamEventChunk.hasNext()) {
                StreamEvent streamEvent = (StreamEvent)streamEventChunk.next();
                if (streamEvent.getType() != ComplexEvent.Type.CURRENT) continue;
                StreamEvent clonedStreamEvent = streamEventCloner.copyStreamEvent(streamEvent);
                this.addUniqueEvent(this.uniqueEventMap, this.uniqueKeyExpressionExecutor, clonedStreamEvent);
                if (this.uniqueEventMap.size() != this.windowLength) continue;
                for (StreamEvent event : this.uniqueEventMap.values()) {
                    event.setTimestamp(currentTime);
                    state.currentEventChunk.add((ComplexEvent)event);
                }
                this.uniqueEventMap.clear();
                if (state.eventsToBeExpired.getFirst() != null) {
                    while (state.eventsToBeExpired.hasNext()) {
                        StreamEvent expiredEvent = (StreamEvent)state.eventsToBeExpired.next();
                        expiredEvent.setTimestamp(currentTime);
                    }
                    outputStreamEventChunk.add(state.eventsToBeExpired.getFirst());
                }
                state.eventsToBeExpired.clear();
                if (state.currentEventChunk.getFirst() != null) {
                    outputStreamEventChunk.add((ComplexEvent)state.resetEvent);
                    state.currentEventChunk.reset();
                    while (state.currentEventChunk.hasNext()) {
                        StreamEvent toExpireEvent = (StreamEvent)state.currentEventChunk.next();
                        StreamEvent eventClonedForMap = streamEventCloner.copyStreamEvent(toExpireEvent);
                        eventClonedForMap.setType(ComplexEvent.Type.EXPIRED);
                        state.eventsToBeExpired.add((ComplexEvent)eventClonedForMap);
                    }
                    state.resetEvent = streamEventCloner.copyStreamEvent((StreamEvent)state.currentEventChunk.getFirst());
                    state.resetEvent.setType(ComplexEvent.Type.RESET);
                    outputStreamEventChunk.add(state.currentEventChunk.getFirst());
                }
                state.currentEventChunk.clear();
                if (outputStreamEventChunk.getFirst() == null) continue;
                streamEventChunks.add(outputStreamEventChunk);
            }
        }
        for (ComplexEventChunk outputStreamEventChunk : streamEventChunks) {
            nextProcessor.process(outputStreamEventChunk);
        }
    }

    public ProcessingMode getProcessingMode() {
        return ProcessingMode.BATCH;
    }

    protected void addUniqueEvent(Map<Object, StreamEvent> uniqueEventMap, ExpressionExecutor uniqueKey, StreamEvent clonedStreamEvent) {
        uniqueEventMap.put(uniqueKey.execute((ComplexEvent)clonedStreamEvent), clonedStreamEvent);
    }

    public void start() {
    }

    public void stop() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition) {
        WindowState state = (WindowState)this.stateHolder.getState();
        StreamEvent streamEvent = null;
        try {
            if (compiledCondition instanceof Operator) {
                streamEvent = ((Operator)compiledCondition).find(matchingEvent, this.uniqueEventMap.values(), this.streamEventCloner);
            }
        }
        finally {
            this.stateHolder.returnState((State)state);
        }
        return streamEvent;
    }

    public CompiledCondition compileCondition(Expression expression, MatchingMetaInfoHolder matchingMetaInfoHolder, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, Table> tableMap, SiddhiQueryContext siddhiQueryContext) {
        return OperatorParser.constructOperator(this.uniqueEventMap.values(), (Expression)expression, (MatchingMetaInfoHolder)matchingMetaInfoHolder, variableExpressionExecutors, tableMap, (SiddhiQueryContext)siddhiQueryContext);
    }

    class WindowState
    extends State {
        private ComplexEventChunk<StreamEvent> currentEventChunk = new ComplexEventChunk(false);
        private ComplexEventChunk<StreamEvent> eventsToBeExpired = null;
        private int count = 0;
        private StreamEvent resetEvent = null;

        WindowState(ComplexEventChunk<StreamEvent> eventsToBeExpired) {
            this.eventsToBeExpired = eventsToBeExpired;
        }

        public boolean canDestroy() {
            return false;
        }

        public Map<String, Object> snapshot() {
            if (this.eventsToBeExpired != null) {
                HashMap<String, Object> map = new HashMap<String, Object>();
                map.put("currentEventChunk", this.currentEventChunk.getFirst());
                map.put("eventsToBeExpired", this.eventsToBeExpired.getFirst());
                map.put("count", this.count);
                map.put("resetEvent", this.resetEvent);
                return map;
            }
            HashMap<String, Object> map = new HashMap<String, Object>();
            map.put("currentEventChunk", this.currentEventChunk.getFirst());
            map.put("count", this.count);
            map.put("resetEvent", this.resetEvent);
            return map;
        }

        public void restore(Map<String, Object> state) {
            if (state.size() > 3) {
                this.currentEventChunk.clear();
                this.currentEventChunk.add((ComplexEvent)((StreamEvent)state.get("currentEventChunk")));
                this.eventsToBeExpired.clear();
                this.eventsToBeExpired.add((ComplexEvent)((StreamEvent)state.get("eventsToBeExpired")));
                this.count = (Integer)state.get("count");
                this.resetEvent = (StreamEvent)state.get("resetEvent");
            } else {
                this.currentEventChunk.clear();
                this.currentEventChunk.add((ComplexEvent)((StreamEvent)state.get("currentEventChunk")));
                this.count = (Integer)state.get("count");
                this.resetEvent = (StreamEvent)state.get("resetEvent");
            }
        }
    }
}

