package io.siddhi.extension.execution.unique;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ParameterOverload;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.event.stream.populater.ComplexEventPopulater;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.stream.window.FindableProcessor;
import io.siddhi.core.query.processor.stream.window.WindowProcessor;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.collection.operator.CompiledCondition;
import io.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import io.siddhi.core.util.collection.operator.Operator;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.parser.OperatorParser;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.expression.Expression;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@Extension(name = "ever", namespace = "unique", description = "Window that retains the latest events based on a given unique keys. When a new event arrives with the same key it replaces the one that exist in the window.<b>This function is not recommended to be used when the maximum number of unique attributes are undefined, as there is a risk of system going out to memory</b>.", parameters = {@Parameter(name = "unique.key", description = "The attribute used to checked for uniqueness.", type = {DataType.INT, DataType.LONG, DataType.FLOAT, DataType.BOOL, DataType.DOUBLE, DataType.STRING}, dynamic = true)}, parameterOverloads = {@ParameterOverload(parameterNames = {"unique.key"}), @ParameterOverload(parameterNames = {"unique.key", "..."})}, examples = {@Example(syntax = "define stream LoginEvents (timestamp long, ip string);\n\nfrom LoginEvents#window.unique:ever(ip)\nselect count(ip) as ipCount\ninsert events into UniqueIps;", description = "Query collects all unique events based on the `ip` attribute by retaining the latest unique events from the `LoginEvents` stream. Then the query counts the unique `ip`s arrived so far and outputs the `ipCount` via the `UniqueIps` stream."), @Example(syntax = "define stream DriverChangeStream (trainID string, driver string);\n\nfrom DriverChangeStream#window.unique:ever(trainID)\nselect trainID, driver\ninsert expired events into PreviousDriverChangeStream;", description = "Query collects all unique events based on the `trainID` attribute by retaining the latest unique events from the `DriverChangeStream` stream. The query outputs the previous unique event stored in the window as the expired events are emitted via `PreviousDriverChangeStream` stream."), @Example(syntax = "define stream StockStream (symbol string, price float);\ndefine stream PriceRequestStream(symbol string);\n\nfrom StockStream#window.unique:ever(symbol) as s join PriceRequestStream as p\non s.symbol == p.symbol \nselect s.symbol as symbol, s.price as price \ninsert events into PriceResponseStream;", description = "Query stores the last unique event for each `symbol` attribute of `StockStream` stream, and joins them with events arriving on the `PriceRequestStream` for equal `symbol` attributes to fetch the latest `price` for each requested `symbol` and output via `PriceResponseStream` stream.")})
/* loaded from: input_file:io/siddhi/extension/execution/unique/UniqueEverWindowProcessor.class */
public class UniqueEverWindowProcessor extends WindowProcessor<WindowState> implements FindableProcessor {
    private ExpressionExecutor[] expressionExecutors;
    private SiddhiAppContext siddhiAppContext;
    private StreamEventCloner streamEventCloner;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/siddhi/extension/execution/unique/UniqueEverWindowProcessor$WindowState.class */
    public class WindowState extends State {
        private ConcurrentMap<String, StreamEvent> map = new ConcurrentHashMap();

        WindowState() {
        }

        public boolean canDestroy() {
            return false;
        }

        public Map<String, Object> snapshot() {
            return Collections.singletonMap("map", this.map);
        }

        public void restore(Map<String, Object> map) {
            this.map = (ConcurrentMap) map.get("map");
        }
    }

    protected StateFactory<WindowState> init(MetaStreamEvent metaStreamEvent, AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, StreamEventClonerHolder streamEventClonerHolder, boolean z, boolean z2, SiddhiQueryContext siddhiQueryContext) {
        this.expressionExecutors = expressionExecutorArr;
        this.siddhiAppContext = siddhiQueryContext.getSiddhiAppContext();
        return () -> {
            return new WindowState();
        };
    }

    protected void processEventChunk(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, WindowState windowState) {
        this.streamEventCloner = streamEventCloner;
        synchronized (windowState) {
            long currentTime = this.siddhiAppContext.getTimestampGenerator().currentTime();
            ComplexEvent complexEvent = (StreamEvent) complexEventChunk.getFirst();
            complexEventChunk.clear();
            while (complexEvent != null) {
                StreamEvent copyStreamEvent = streamEventCloner.copyStreamEvent(complexEvent);
                copyStreamEvent.setType(ComplexEvent.Type.EXPIRED);
                StreamEvent streamEvent = (StreamEvent) windowState.map.put(generateKey(copyStreamEvent), copyStreamEvent);
                if (streamEvent != null) {
                    streamEvent.setTimestamp(currentTime);
                    complexEventChunk.add(streamEvent);
                }
                ComplexEvent next = complexEvent.getNext();
                complexEvent.setNext((ComplexEvent) null);
                complexEventChunk.add(complexEvent);
                complexEvent = next;
            }
        }
        processor.process(complexEventChunk);
    }

    public ProcessingMode getProcessingMode() {
        return ProcessingMode.BATCH;
    }

    public void start() {
    }

    public void stop() {
    }

    public StreamEvent find(StateEvent stateEvent, CompiledCondition compiledCondition) {
        WindowState windowState = (WindowState) this.stateHolder.getState();
        StreamEvent streamEvent = null;
        try {
            if (compiledCondition instanceof Operator) {
                streamEvent = ((Operator) compiledCondition).find(stateEvent, windowState.map.values(), this.streamEventCloner);
            }
            return streamEvent;
        } finally {
            this.stateHolder.returnState(windowState);
        }
    }

    public CompiledCondition compileCondition(Expression expression, MatchingMetaInfoHolder matchingMetaInfoHolder, List<VariableExpressionExecutor> list, Map<String, Table> map, SiddhiQueryContext siddhiQueryContext) {
        WindowState windowState = (WindowState) this.stateHolder.getState();
        try {
            Operator constructOperator = OperatorParser.constructOperator(windowState.map.values(), expression, matchingMetaInfoHolder, list, map, siddhiQueryContext);
            this.stateHolder.returnState(windowState);
            return constructOperator;
        } catch (Throwable th) {
            this.stateHolder.returnState(windowState);
            throw th;
        }
    }

    private String generateKey(StreamEvent streamEvent) {
        StringBuilder sb = new StringBuilder();
        for (ExpressionExecutor expressionExecutor : this.expressionExecutors) {
            sb.append(expressionExecutor.execute(streamEvent));
        }
        return sb.toString();
    }

    protected /* bridge */ /* synthetic */ void processEventChunk(ComplexEventChunk complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, State state) {
        processEventChunk((ComplexEventChunk<StreamEvent>) complexEventChunk, processor, streamEventCloner, complexEventPopulater, (WindowState) state);
    }
}
