/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.execution.unique;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ParameterOverload;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.event.stream.populater.ComplexEventPopulater;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.stream.window.FindableProcessor;
import io.siddhi.core.query.processor.stream.window.WindowProcessor;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.collection.operator.CompiledCondition;
import io.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import io.siddhi.core.util.collection.operator.Operator;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.parser.OperatorParser;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.expression.Expression;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Window processor that lets through only the first event seen for each unique key.
 *
 * <p>The key of an event is derived from the values of the {@code unique.key} attribute
 * expressions. For every key that has not been seen before, a clone of the event (typed
 * {@code EXPIRED}) is retained in the window state and the original event is forwarded to the
 * next processor. Events whose key is already present are removed from the chunk before it is
 * forwarded.
 */
@Extension(name="first", namespace="unique", description="This is a window that holds only the first set of unique events according to the unique key parameter. When a new event arrives with a key that is already in the window, that event is not processed by the window.", parameters={@Parameter(name="unique.key", description="The attribute that should be checked for uniqueness. If there is more than one parameter to check for uniqueness, it can be specified as an array separated by commas.", type={DataType.INT, DataType.LONG, DataType.FLOAT, DataType.BOOL, DataType.DOUBLE, DataType.STRING}, dynamic=true)}, parameterOverloads={@ParameterOverload(parameterNames={"unique.key"}), @ParameterOverload(parameterNames={"unique.key", "..."})}, examples={@Example(syntax="define stream LoginEvents (timeStamp long, ip string);\n\nfrom LoginEvents#window.unique:first(ip)\ninsert into UniqueIps ;", description="This returns the first set of unique items that arrive from the 'LoginEvents' stream, and returns them to the 'UniqueIps' stream. The unique events are only those with a unique value for the 'ip' attribute.")})
public class UniqueFirstWindowProcessor
extends WindowProcessor<WindowState>
implements FindableProcessor {
    /** Executors that produce the attribute values making up an event's unique key. */
    private ExpressionExecutor[] uniqueExpressionExecutors;
    /** Cloner captured from the most recent {@link #processEventChunk} call; used by {@link #find}. */
    private StreamEventCloner streamEventCloner;

    /**
     * Stores the unique-key executors and supplies the factory for this window's state.
     *
     * @param attributeExpressionExecutors executors for the {@code unique.key} parameters
     * @return a factory creating a fresh, empty {@link WindowState}
     */
    protected StateFactory<WindowState> init(MetaStreamEvent metaStreamEvent, AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader, StreamEventClonerHolder streamEventClonerHolder, boolean outputExpectsExpiredEvents, boolean findToBeExecuted, SiddhiQueryContext siddhiQueryContext) {
        this.uniqueExpressionExecutors = attributeExpressionExecutors;
        return () -> new WindowState();
    }

    /**
     * Filters the chunk down to events whose key has not been seen before and forwards it.
     *
     * @param streamEventChunk incoming events; duplicates are removed from it in place
     * @param nextProcessor processor that receives the filtered chunk
     * @param streamEventCloner cloner used to copy events into the window state
     * @param state window state holding the first event recorded for each key
     */
    protected void processEventChunk(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, WindowState state) {
        this.streamEventCloner = streamEventCloner;
        synchronized (state) {
            while (streamEventChunk.hasNext()) {
                StreamEvent streamEvent = streamEventChunk.next();
                // The window keeps its own EXPIRED-typed copy; the original event flows on.
                StreamEvent clonedEvent = streamEventCloner.copyStreamEvent(streamEvent);
                clonedEvent.setType(ComplexEvent.Type.EXPIRED);
                StreamEvent existing = state.map.putIfAbsent(this.generateKey(clonedEvent), clonedEvent);
                if (existing != null) {
                    // Key already recorded: drop the duplicate from the outgoing chunk.
                    streamEventChunk.remove();
                }
            }
        }
        // Forwarding happens outside the state lock.
        nextProcessor.process(streamEventChunk);
    }

    /** @return {@link ProcessingMode#BATCH} */
    public ProcessingMode getProcessingMode() {
        return ProcessingMode.BATCH;
    }

    /** No-op. */
    public void start() {
    }

    /** No-op. */
    public void stop() {
    }

    /**
     * Builds the uniqueness key for an event from the configured key executors.
     *
     * <p>With zero or one executor the key is the plain string form of the value, exactly as
     * before. With several executors each component is written as {@code <length>:<text>} so
     * that different value tuples can never produce the same key; plain concatenation made
     * ("ab", "c") and ("a", "bc") indistinguishable.
     *
     * <p>NOTE: multi-attribute keys stored in snapshots taken with the old concatenated format
     * will not match keys generated here.
     *
     * @param event event whose key attributes are evaluated
     * @return the key identifying the event's combination of unique attribute values
     */
    private String generateKey(StreamEvent event) {
        ExpressionExecutor[] executors = this.uniqueExpressionExecutors;
        if (executors.length == 1) {
            // Single attribute: no ambiguity is possible, keep the original key format.
            return String.valueOf(executors[0].execute(event));
        }
        StringBuilder key = new StringBuilder();
        for (ExpressionExecutor executor : executors) {
            String part = String.valueOf(executor.execute(event));
            // Length prefix makes component boundaries unambiguous whatever the text contains.
            key.append(part.length()).append(':').append(part);
        }
        return key.toString();
    }

    /**
     * Searches the events retained by the window using the given compiled condition.
     *
     * @param matchingEvent event to match against
     * @param compiledCondition condition to evaluate; only {@link Operator} instances are used
     * @return the result of {@code Operator.find}, or {@code null} when the condition is not an
     *     {@link Operator}
     */
    public StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition) {
        WindowState state = (WindowState) this.stateHolder.getState();
        StreamEvent streamEvent = null;
        try {
            if (compiledCondition instanceof Operator) {
                // NOTE(review): streamEventCloner is only assigned in processEventChunk, so it is
                // null here if find runs before any chunk was processed - confirm callers/Operator
                // tolerate that.
                streamEvent = ((Operator) compiledCondition).find(matchingEvent, state.map.values(), this.streamEventCloner);
            }
        }
        finally {
            // Always hand the state back, even if the lookup throws.
            this.stateHolder.returnState(state);
        }
        return streamEvent;
    }

    /**
     * Compiles a condition that operates over the events retained by the window.
     *
     * @return the operator built by {@link OperatorParser#constructOperator} over the state's
     *     stored events
     */
    public CompiledCondition compileCondition(Expression expression, MatchingMetaInfoHolder matchingMetaInfoHolder, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, Table> eventTableMap, SiddhiQueryContext siddhiQueryContext) {
        Operator compiledCondition;
        WindowState state = (WindowState) this.stateHolder.getState();
        try {
            compiledCondition = OperatorParser.constructOperator(state.map.values(), expression, matchingMetaInfoHolder, variableExpressionExecutors, eventTableMap, siddhiQueryContext);
        }
        finally {
            // Always hand the state back, even if compilation throws.
            this.stateHolder.returnState(state);
        }
        return compiledCondition;
    }

    /** Window state: the first event recorded for each unique key. */
    class WindowState
    extends State {
        /** Unique key -> retained (EXPIRED-typed) clone of the first event with that key. */
        private ConcurrentMap<String, StreamEvent> map = new ConcurrentHashMap<String, StreamEvent>();

        WindowState() {
        }

        /** @return always {@code false} */
        public boolean canDestroy() {
            return false;
        }

        /** @return a single-entry map exposing the live key map under {@code "map"} */
        public Map<String, Object> snapshot() {
            return Collections.singletonMap("map", this.map);
        }

        /**
         * Replaces the key map with the one stored under {@code "map"} in the given state.
         *
         * @param state previously snapshotted state
         */
        @SuppressWarnings("unchecked") // the entry under "map" is the map handed out by snapshot()
        public void restore(Map<String, Object> state) {
            this.map = (ConcurrentMap<String, StreamEvent>) state.get("map");
        }
    }
}

