package io.siddhi.extension.execution.unique;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ParameterOverload;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.event.stream.populater.ComplexEventPopulater;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.stream.window.FindableProcessor;
import io.siddhi.core.query.processor.stream.window.WindowProcessor;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.collection.operator.CompiledCondition;
import io.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import io.siddhi.core.util.collection.operator.Operator;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.parser.OperatorParser;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import io.siddhi.query.api.expression.Expression;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Extension(name = "length", namespace = "unique", description = "This is a sliding length window that holds the events of the latest window length with the unique key and gets updated for the expiry and arrival of each event. When a new event arrives with the key that is already there in the window, then the previous event expires and new event is kept within the window.", parameters = {@Parameter(name = "unique.key", description = "The attribute that should be checked for uniqueness.", type = {DataType.INT, DataType.LONG, DataType.FLOAT, DataType.BOOL, DataType.DOUBLE, DataType.STRING}, dynamic = true), @Parameter(name = "window.length", description = "The number of events that should be included in a sliding length window.", type = {DataType.INT})}, parameterOverloads = {@ParameterOverload(parameterNames = {"unique.key", "window.length"})}, examples = {@Example(syntax = "define stream CseEventStream (symbol string, price float, volume int)\n\nfrom CseEventStream#window.unique:length(symbol,10)\nselect symbol, price, volume\ninsert all events into OutputStream;", description = "In this configuration, the window holds the latest 10 unique events. The latest events are selected based on the symbol attribute. If the 'CseEventStream' receives an event for which the value for the symbol attribute is the same as that of an existing event in the window, the existing event is replaced by the new event. All the events are returned to the 'OutputStream' event stream once an event expires or is added to the window.")})
/* loaded from: input_file:io/siddhi/extension/execution/unique/UniqueLengthWindowProcessor.class */
public class UniqueLengthWindowProcessor extends WindowProcessor<WindowState> implements FindableProcessor {
    private ExpressionExecutor uniqueKeyExpressionExecutor;
    private int length;
    private SiddhiAppContext siddhiAppContext;
    private StreamEventCloner streamEventCloner;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/siddhi/extension/execution/unique/UniqueLengthWindowProcessor$WindowState.class */
    public class WindowState extends State {
        private ComplexEventChunk<StreamEvent> expiredEventChunk;
        private ConcurrentHashMap<String, StreamEvent> map = new ConcurrentHashMap<>();
        private int count = 0;

        WindowState(ComplexEventChunk<StreamEvent> complexEventChunk) {
            this.expiredEventChunk = complexEventChunk;
        }

        public boolean canDestroy() {
            return false;
        }

        public Map<String, Object> snapshot() {
            HashMap hashMap = new HashMap();
            hashMap.put("expiredEventChunk", this.expiredEventChunk.getFirst());
            hashMap.put("count", Integer.valueOf(this.count));
            hashMap.put("map", this.map);
            return hashMap;
        }

        public void restore(Map<String, Object> map) {
            this.expiredEventChunk.clear();
            this.expiredEventChunk.add((StreamEvent) map.get("expiredEventChunk"));
            this.count = ((Integer) map.get("count")).intValue();
            this.map = (ConcurrentHashMap) map.get("map");
        }

        static /* synthetic */ int access$108(WindowState windowState) {
            int i = windowState.count;
            windowState.count = i + 1;
            return i;
        }
    }

    protected StateFactory<WindowState> init(MetaStreamEvent metaStreamEvent, AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, StreamEventClonerHolder streamEventClonerHolder, boolean z, boolean z2, SiddhiQueryContext siddhiQueryContext) {
        this.siddhiAppContext = siddhiQueryContext.getSiddhiAppContext();
        if (expressionExecutorArr.length != 2) {
            throw new SiddhiAppValidationException("Unique Length window should only have two parameters (<string|int|bool|long|double|float> attribute, <int> windowLength), but found " + expressionExecutorArr.length + " input attributes");
        }
        this.uniqueKeyExpressionExecutor = expressionExecutorArr[0];
        this.length = ((Integer) ((ConstantExpressionExecutor) expressionExecutorArr[1]).getValue()).intValue();
        return () -> {
            return new WindowState(new ComplexEventChunk(false));
        };
    }

    protected void processEventChunk(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, WindowState windowState) {
        this.streamEventCloner = streamEventCloner;
        synchronized (windowState) {
            long currentTime = this.siddhiAppContext.getTimestampGenerator().currentTime();
            while (complexEventChunk.hasNext()) {
                StreamEvent next = complexEventChunk.next();
                next.setNext((ComplexEvent) null);
                StreamEvent copyStreamEvent = streamEventCloner.copyStreamEvent(next);
                copyStreamEvent.setType(ComplexEvent.Type.EXPIRED);
                StreamEvent copyStreamEvent2 = streamEventCloner.copyStreamEvent(copyStreamEvent);
                StreamEvent streamEvent = (StreamEvent) windowState.map.put(generateKey(copyStreamEvent2), copyStreamEvent2);
                if (streamEvent == null) {
                    WindowState.access$108(windowState);
                }
                if (windowState.count <= this.length && streamEvent == null) {
                    windowState.expiredEventChunk.add(copyStreamEvent);
                } else if (streamEvent != null) {
                    while (windowState.expiredEventChunk.hasNext()) {
                        if (windowState.expiredEventChunk.next().equals(streamEvent)) {
                            windowState.expiredEventChunk.remove();
                        }
                    }
                    windowState.expiredEventChunk.add(copyStreamEvent);
                    complexEventChunk.insertBeforeCurrent(streamEvent);
                    streamEvent.setTimestamp(currentTime);
                } else {
                    StreamEvent poll = windowState.expiredEventChunk.poll();
                    if (poll != null) {
                        poll.setTimestamp(currentTime);
                        complexEventChunk.insertBeforeCurrent(poll);
                        windowState.expiredEventChunk.add(copyStreamEvent);
                    } else {
                        complexEventChunk.insertBeforeCurrent(copyStreamEvent);
                    }
                }
            }
        }
        processor.process(complexEventChunk);
    }

    public ProcessingMode getProcessingMode() {
        return ProcessingMode.BATCH;
    }

    public void start() {
    }

    public void stop() {
    }

    public StreamEvent find(StateEvent stateEvent, CompiledCondition compiledCondition) {
        WindowState windowState = (WindowState) this.stateHolder.getState();
        StreamEvent streamEvent = null;
        try {
            if (compiledCondition instanceof Operator) {
                streamEvent = ((Operator) compiledCondition).find(stateEvent, windowState.expiredEventChunk, this.streamEventCloner);
            }
            return streamEvent;
        } finally {
            this.stateHolder.returnState(windowState);
        }
    }

    private String generateKey(StreamEvent streamEvent) {
        return this.uniqueKeyExpressionExecutor.execute(streamEvent).toString();
    }

    public CompiledCondition compileCondition(Expression expression, MatchingMetaInfoHolder matchingMetaInfoHolder, List<VariableExpressionExecutor> list, Map<String, Table> map, SiddhiQueryContext siddhiQueryContext) {
        WindowState windowState = (WindowState) this.stateHolder.getState();
        try {
            Operator constructOperator = OperatorParser.constructOperator(windowState.expiredEventChunk, expression, matchingMetaInfoHolder, list, map, siddhiQueryContext);
            this.stateHolder.returnState(windowState);
            return constructOperator;
        } catch (Throwable th) {
            this.stateHolder.returnState(windowState);
            throw th;
        }
    }

    protected /* bridge */ /* synthetic */ void processEventChunk(ComplexEventChunk complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, State state) {
        processEventChunk((ComplexEventChunk<StreamEvent>) complexEventChunk, processor, streamEventCloner, complexEventPopulater, (WindowState) state);
    }
}
