/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.execution.unique;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ParameterOverload;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.event.stream.populater.ComplexEventPopulater;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.stream.window.FindableProcessor;
import io.siddhi.core.query.processor.stream.window.WindowProcessor;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.collection.operator.CompiledCondition;
import io.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import io.siddhi.core.util.collection.operator.Operator;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.parser.OperatorParser;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import io.siddhi.query.api.expression.Expression;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Sliding length window that retains at most {@code window.length} events, one per distinct
 * value of the {@code unique.key} expression.
 *
 * <p>Per-partition state lives in {@link WindowState}: a map from the string form of the unique
 * key to a copy of the retained event, an ordered chunk of the retained events (already typed
 * {@code EXPIRED}, ready to be emitted when they leave the window), and the number of keys
 * currently held.
 *
 * <p>For every incoming event:
 * <ul>
 *   <li>if its key is already in the window, the previously retained event is removed from the
 *       window, emitted as expired in front of the incoming event, and replaced;</li>
 *   <li>otherwise, if the window is not full, the event is simply retained;</li>
 *   <li>otherwise the oldest retained event is evicted (and forgotten by the key map), emitted as
 *       expired in front of the incoming event, and the incoming event is retained.</li>
 * </ul>
 */
@Extension(name="length", namespace="unique", description="This is a sliding length window that holds the events of the latest window length with the unique key and gets updated for the expiry and arrival of each event. When a new event arrives with the key that is already there in the window, then the previous event expires and new event is kept within the window.", parameters={@Parameter(name="unique.key", description="The attribute that should be checked for uniqueness.", type={DataType.INT, DataType.LONG, DataType.FLOAT, DataType.BOOL, DataType.DOUBLE, DataType.STRING}, dynamic=true), @Parameter(name="window.length", description="The number of events that should be included in a sliding length window.", type={DataType.INT}, dynamic=true)}, parameterOverloads={@ParameterOverload(parameterNames={"unique.key", "window.length"})}, examples={@Example(syntax="define stream CseEventStream (symbol string, price float, volume int)\n\nfrom CseEventStream#window.unique:length(symbol,10)\nselect symbol, price, volume\ninsert all events into OutputStream;", description="In this configuration, the window holds the latest 10 unique events. The latest events are selected based on the symbol attribute. If the 'CseEventStream' receives an event for which the value for the symbol attribute is the same as that of an existing event in the window, the existing event is replaced by the new event. All the events are returned to the 'OutputStream' event stream once an event expires or is added to the window.")})
public class UniqueLengthWindowProcessor
        // The member type must be qualified here: its simple name is only in scope inside the class body.
        extends WindowProcessor<UniqueLengthWindowProcessor.WindowState>
        implements FindableProcessor {
    /** Evaluated against each event to obtain the value that must be unique within the window. */
    private ExpressionExecutor uniqueKeyExpressionExecutor;
    /** Maximum number of events retained, taken from the constant second window argument. */
    private int length;
    /** Source of the timestamp generator used to stamp events when they expire. */
    private SiddhiAppContext siddhiAppContext;
    /**
     * Cloner most recently handed to {@link #processEventChunk}; reused by {@link #find}.
     * NOTE(review): this stays {@code null} until the first chunk has been processed - confirm
     * that {@code find} cannot be invoked earlier.
     */
    private StreamEventCloner streamEventCloner;

    /**
     * Validates the window arguments and captures them.
     *
     * @param attributeExpressionExecutors exactly two executors: the unique-key expression and a
     *                                     constant {@code int} window length
     * @return a factory producing an empty {@link WindowState}
     * @throws SiddhiAppValidationException if the argument count is not two, or the length is not
     *                                      a constant {@code int}
     */
    @Override
    protected StateFactory<WindowState> init(MetaStreamEvent metaStreamEvent, AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader, StreamEventClonerHolder streamEventClonerHolder, boolean outputExpectsExpiredEvents, boolean findToBeExecuted, SiddhiQueryContext siddhiQueryContext) {
        this.siddhiAppContext = siddhiQueryContext.getSiddhiAppContext();
        if (attributeExpressionExecutors.length != 2) {
            throw new SiddhiAppValidationException("Unique Length window should only have two parameters (<string|int|bool|long|double|float> attribute, <int> windowLength), but found " + attributeExpressionExecutors.length + " input attributes");
        }
        this.uniqueKeyExpressionExecutor = attributeExpressionExecutors[0];

        // Report a bad length argument as a validation error instead of a ClassCastException.
        ExpressionExecutor lengthExecutor = attributeExpressionExecutors[1];
        if (!(lengthExecutor instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppValidationException("Unique Length window's 2nd parameter 'window.length' should be a constant, but found a dynamic parameter of type " + lengthExecutor.getClass().getCanonicalName());
        }
        Object lengthValue = ((ConstantExpressionExecutor) lengthExecutor).getValue();
        if (!(lengthValue instanceof Integer)) {
            throw new SiddhiAppValidationException("Unique Length window's 2nd parameter 'window.length' should be of type int, but found " + (lengthValue == null ? "null" : lengthValue.getClass().getSimpleName()));
        }
        this.length = (Integer) lengthValue;

        return () -> new WindowState(new ComplexEventChunk<>(false));
    }

    /**
     * Applies the unique-length window to every event of {@code streamEventChunk}, inserting the
     * events that leave the window (typed {@code EXPIRED}) in front of the event that displaced
     * them, then forwards the chunk to {@code nextProcessor}.
     *
     * <p>All mutations of {@code state} happen while holding its monitor.
     *
     * @throws NullPointerException if the unique-key expression evaluates to {@code null} for an event
     */
    @Override
    protected void processEventChunk(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, WindowState state) {
        this.streamEventCloner = streamEventCloner;
        synchronized (state) {
            long currentTime = this.siddhiAppContext.getTimestampGenerator().currentTime();
            while (streamEventChunk.hasNext()) {
                StreamEvent streamEvent = streamEventChunk.next();
                // NOTE(review): kept from the original. This unlinks whatever followed the current
                // event in the incoming chunk - confirm that is intended for multi-event chunks.
                streamEvent.setNext(null);

                // Two copies typed EXPIRED: one is queued in the window, one is indexed by key.
                StreamEvent clonedEvent = streamEventCloner.copyStreamEvent(streamEvent);
                clonedEvent.setType(ComplexEvent.Type.EXPIRED);
                StreamEvent eventClonedForMap = streamEventCloner.copyStreamEvent(clonedEvent);
                String key = generateKey(eventClonedForMap);
                StreamEvent oldEvent = state.map.put(key, eventClonedForMap);

                if (oldEvent != null) {
                    // Key already present: swap the retained event and expire the previous one.
                    replaceInWindow(state, oldEvent, clonedEvent);
                    streamEventChunk.insertBeforeCurrent(oldEvent);
                    oldEvent.setTimestamp(currentTime);
                    continue;
                }

                state.count++;
                if (state.count <= this.length) {
                    // New key and the window still has room.
                    state.expiredEventChunk.add(clonedEvent);
                    continue;
                }

                // New key and the window is full: evict the oldest retained event.
                StreamEvent evictedEvent = state.expiredEventChunk.poll();
                if (evictedEvent == null) {
                    // Nothing is retained, so the incoming event expires immediately and is not
                    // stored; undo its bookkeeping so its key is not considered "in the window".
                    state.map.remove(key);
                    state.count--;
                    streamEventChunk.insertBeforeCurrent(clonedEvent);
                    continue;
                }
                // Forget the evicted key. Without this the key map grows without bound and a later
                // event carrying the evicted key would be handled as a replacement, re-expiring an
                // event that already left the window and growing the window beyond its length.
                if (state.map.remove(generateKey(evictedEvent)) != null) {
                    state.count--;
                }
                evictedEvent.setTimestamp(currentTime);
                streamEventChunk.insertBeforeCurrent(evictedEvent);
                state.expiredEventChunk.add(clonedEvent);
            }
        }
        nextProcessor.process(streamEventChunk);
    }

    /**
     * Removes every retained event equal to {@code oldEvent} from the window and appends
     * {@code replacement} at its end.
     */
    private void replaceInWindow(WindowState state, StreamEvent oldEvent, StreamEvent replacement) {
        ComplexEventChunk<StreamEvent> window = state.expiredEventChunk;
        // Rewind first: the chunk's iteration cursor is presumably left wherever the previous scan
        // stopped, in which case events positioned before it would never be inspected.
        window.reset();
        while (window.hasNext()) {
            if (window.next().equals(oldEvent)) {
                window.remove();
            }
        }
        window.add(replacement);
    }

    /** @return {@link ProcessingMode#BATCH} */
    @Override
    public ProcessingMode getProcessingMode() {
        return ProcessingMode.BATCH;
    }

    /** No-op: this processor does nothing on start. */
    @Override
    public void start() {
    }

    /** No-op: this processor does nothing on stop. */
    @Override
    public void stop() {
    }

    /**
     * Looks up retained events matching {@code matchingEvent} under {@code compiledCondition}.
     *
     * @return the result of the operator's lookup over the retained events, or {@code null} when
     *         {@code compiledCondition} is not an {@link Operator}
     */
    @Override
    public StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition) {
        WindowState state = this.stateHolder.getState();
        try {
            if (compiledCondition instanceof Operator) {
                return ((Operator) compiledCondition).find(matchingEvent, state.expiredEventChunk, this.streamEventCloner);
            }
            return null;
        } finally {
            // Always hand the state back, even if the lookup throws.
            this.stateHolder.returnState(state);
        }
    }

    /**
     * Builds the map key for an event: the string form of the unique-key expression's value.
     *
     * @throws NullPointerException if the expression evaluates to {@code null}
     */
    private String generateKey(StreamEvent event) {
        return this.uniqueKeyExpressionExecutor.execute(event).toString();
    }

    /**
     * Compiles {@code expression} into an operator over this window's retained events.
     *
     * @return the operator built by {@link OperatorParser#constructOperator}
     */
    @Override
    public CompiledCondition compileCondition(Expression expression, MatchingMetaInfoHolder matchingMetaInfoHolder, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, Table> tableMap, SiddhiQueryContext siddhiQueryContext) {
        WindowState state = this.stateHolder.getState();
        try {
            return OperatorParser.constructOperator(state.expiredEventChunk, expression, matchingMetaInfoHolder, variableExpressionExecutors, tableMap, siddhiQueryContext);
        } finally {
            // Always hand the state back, even if operator construction throws.
            this.stateHolder.returnState(state);
        }
    }

    /**
     * Mutable window state: the key index, the ordered retained events and the key count.
     */
    class WindowState
            extends State {
        /** Unique-key string to a copy of the event currently retained for that key. */
        private ConcurrentHashMap<String, StreamEvent> map = new ConcurrentHashMap<>();
        /** Retained events in arrival order, already typed EXPIRED. */
        private ComplexEventChunk<StreamEvent> expiredEventChunk;
        /** Number of keys currently counted as being in the window. */
        private int count = 0;

        WindowState(ComplexEventChunk<StreamEvent> expiredEventChunk) {
            this.expiredEventChunk = expiredEventChunk;
        }

        /** @return always {@code false} */
        @Override
        public boolean canDestroy() {
            return false;
        }

        /**
         * @return a map holding the head of the retained-event chain ({@code "expiredEventChunk"}),
         *         the key count ({@code "count"}) and the key index itself ({@code "map"})
         */
        @Override
        public Map<String, Object> snapshot() {
            Map<String, Object> snapshotState = new HashMap<>();
            snapshotState.put("expiredEventChunk", this.expiredEventChunk.getFirst());
            snapshotState.put("count", this.count);
            snapshotState.put("map", this.map);
            return snapshotState;
        }

        /**
         * Replaces the current contents with the values produced by {@link #snapshot()}.
         *
         * @param state map with the keys written by {@link #snapshot()}
         */
        @Override
        @SuppressWarnings("unchecked") // "map" is written by snapshot() with exactly this type
        public void restore(Map<String, Object> state) {
            this.expiredEventChunk.clear();
            StreamEvent retainedHead = (StreamEvent) state.get("expiredEventChunk");
            // A snapshot of an empty window stores a null head; leave the cleared chunk empty then.
            if (retainedHead != null) {
                this.expiredEventChunk.add(retainedHead);
            }
            this.count = (Integer) state.get("count");
            this.map = (ConcurrentHashMap<String, StreamEvent>) state.get("map");
        }
    }
}

