package io.siddhi.extension.execution.unique;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ParameterOverload;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.event.stream.populater.ComplexEventPopulater;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.stream.window.FindableProcessor;
import io.siddhi.core.query.processor.stream.window.WindowProcessor;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.collection.operator.CompiledCondition;
import io.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import io.siddhi.core.util.collection.operator.Operator;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.parser.OperatorParser;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import io.siddhi.query.api.expression.Expression;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Tumbling ("batch") window that collects events keyed by a unique-key expression and
 * emits them downstream once the number of distinct keys reaches the configured window length.
 *
 * <p>While a batch is being collected, an event whose key is already present replaces the
 * previously stored event for that key. When a batch is emitted, the events of the previous
 * batch are sent as EXPIRED events, followed by a RESET event and the new CURRENT events.
 *
 * <p>NOTE(review): the in-progress batch ({@code uniqueEventMap}) is an instance field and is
 * not part of {@link WindowState#snapshot()}, so it is not persisted/restored with the state.
 * Confirm whether this is intended.
 */
@Extension(name = "lengthBatch", namespace = "unique", description = "This is a batch (tumbling) window that holds a specified number of latest unique events. The unique events are determined based on the value for a specified unique key parameter. The window is updated for every window length, i.e., for the last set of events of the specified number in a tumbling manner. When a new event arrives within the window length having the same value for the unique key parameter as an existing event in the window, the previous event is replaced by the new event.", parameters = {@Parameter(name = "unique.key", description = "The attribute that should be checked for uniqueness.", type = {DataType.INT, DataType.LONG, DataType.FLOAT, DataType.BOOL, DataType.DOUBLE, DataType.STRING}, dynamic = true), @Parameter(name = "window.length", description = "The number of events the window should tumble.", type = {DataType.INT})}, parameterOverloads = {@ParameterOverload(parameterNames = {"unique.key", "window.length"})}, examples = {@Example(syntax = "define window CseEventWindow (symbol string, price float, volume int)\n\n from CseEventStream#window.unique:lengthBatch(symbol, 10)\nselect symbol, price, volume\ninsert expired events into OutputStream ;", description = "In this query, the window at any give time holds the last 10 unique events from the 'CseEventStream' stream. Each of the 10 events within the window at a given time has a unique value for the symbol attribute. If a new event has the same value for the symbol attribute as an existing event within the window length, the existing event expires and it is replaced by the new event. The query returns expired individual events as well as expired batches of events to the 'OutputStream' stream.")})
public class UniqueLengthBatchWindowProcessor extends WindowProcessor<UniqueLengthBatchWindowProcessor.WindowState> implements FindableProcessor {
    /** Snapshot key under which the head of the current-event chunk is stored. */
    private static final String KEY_CURRENT_EVENT_CHUNK = "currentEventChunk";
    /** Snapshot key under which the head of the to-be-expired chunk is stored. */
    private static final String KEY_EVENTS_TO_BE_EXPIRED = "eventsToBeExpired";
    /** Snapshot key for the {@code count} field. */
    private static final String KEY_COUNT = "count";
    /** Snapshot key for the pending RESET event. */
    private static final String KEY_RESET_EVENT = "resetEvent";

    /** Number of distinct keys that completes a batch; read from the second window parameter. */
    private int windowLength;
    private SiddhiAppContext siddhiAppContext;
    /** Evaluated against each event to obtain its uniqueness key. */
    private ExpressionExecutor uniqueKeyExpressionExecutor;
    /** Events of the batch currently being collected, keyed by their unique-key value. */
    private Map<Object, StreamEvent> uniqueEventMap = new HashMap<>();
    /** Cloner from the most recent {@code processEventChunk} call; used by {@link #find}. */
    private StreamEventCloner streamEventCloner;

    /**
     * Per-window state: the events awaiting expiry, a scratch chunk for the batch being
     * emitted, and the RESET event to send ahead of the next batch.
     */
    public class WindowState extends State {
        private ComplexEventChunk<StreamEvent> eventsToBeExpired;
        private ComplexEventChunk<StreamEvent> currentEventChunk = new ComplexEventChunk<>(false);
        private int count = 0;
        private StreamEvent resetEvent = null;

        /**
         * @param complexEventChunk chunk used to hold the events that expire with the next batch
         */
        WindowState(ComplexEventChunk<StreamEvent> complexEventChunk) {
            this.eventsToBeExpired = complexEventChunk;
        }

        /**
         * @return always {@code false}
         */
        public boolean canDestroy() {
            return false;
        }

        /**
         * Captures the state as a map. The {@code "eventsToBeExpired"} entry is only written
         * when that chunk is non-null.
         *
         * @return map holding the chunk heads, the count and the reset event
         */
        public Map<String, Object> snapshot() {
            Map<String, Object> snapshot = new HashMap<>();
            snapshot.put(KEY_CURRENT_EVENT_CHUNK, this.currentEventChunk.getFirst());
            if (this.eventsToBeExpired != null) {
                snapshot.put(KEY_EVENTS_TO_BE_EXPIRED, this.eventsToBeExpired.getFirst());
            }
            snapshot.put(KEY_COUNT, Integer.valueOf(this.count));
            snapshot.put(KEY_RESET_EVENT, this.resetEvent);
            return snapshot;
        }

        /**
         * Restores the state from a map produced by {@link #snapshot()}.
         *
         * @param map previously captured snapshot
         */
        public void restore(Map<String, Object> map) {
            this.currentEventChunk.clear();
            this.currentEventChunk.add((StreamEvent) map.get(KEY_CURRENT_EVENT_CHUNK));
            // snapshot() only writes this entry when the expiry chunk exists, so its presence
            // (rather than the map size) tells us whether there is anything to restore.
            if (map.containsKey(KEY_EVENTS_TO_BE_EXPIRED)) {
                this.eventsToBeExpired.clear();
                this.eventsToBeExpired.add((StreamEvent) map.get(KEY_EVENTS_TO_BE_EXPIRED));
            }
            this.count = ((Integer) map.get(KEY_COUNT)).intValue();
            this.resetEvent = (StreamEvent) map.get(KEY_RESET_EVENT);
        }
    }

    /**
     * Validates the window parameters and prepares the state factory.
     *
     * @param expressionExecutorArr must contain exactly two executors: the unique-key expression
     *                              and a constant INT window length
     * @return factory creating a fresh {@link WindowState}
     * @throws SiddhiAppValidationException if the parameter count is not two, or the length
     *                                      parameter is not a constant of type INT
     */
    protected StateFactory<WindowState> init(MetaStreamEvent metaStreamEvent, AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, StreamEventClonerHolder streamEventClonerHolder, boolean z, boolean z2, SiddhiQueryContext siddhiQueryContext) {
        this.siddhiAppContext = siddhiQueryContext.getSiddhiAppContext();
        if (expressionExecutorArr.length != 2) {
            throw new SiddhiAppValidationException("Unique Length batch window should only have two parameters, but found " + expressionExecutorArr.length + " input attributes");
        }
        this.uniqueKeyExpressionExecutor = expressionExecutorArr[0];
        ExpressionExecutor lengthExecutor = expressionExecutorArr[1];
        if (!(lengthExecutor instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppValidationException("Unique Length Batch window should have constant for Length parameter but found a dynamic attribute " + lengthExecutor.getClass().getCanonicalName());
        }
        if (lengthExecutor.getReturnType() != Attribute.Type.INT) {
            throw new SiddhiAppValidationException("Unique Length Batch window's Length parameter should be INT, but found " + lengthExecutor.getReturnType());
        }
        this.windowLength = ((Integer) ((ConstantExpressionExecutor) lengthExecutor).getValue()).intValue();
        return () -> new WindowState(new ComplexEventChunk<>(false));
    }

    /**
     * Adds every CURRENT event of the incoming chunk to the in-progress batch and, each time the
     * number of distinct keys reaches the window length, builds an output chunk for that batch.
     * Output chunks are forwarded to {@code processor} after the lock is released.
     *
     * <p>Each completed batch is emitted in its own output chunk, so an incoming chunk that
     * completes several batches forwards each of them exactly once.
     */
    protected void processEventChunk(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, WindowState windowState) {
        this.streamEventCloner = streamEventCloner;
        List<ComplexEventChunk<StreamEvent>> completedBatches = new ArrayList<>();
        synchronized (this) {
            long currentTime = this.siddhiAppContext.getTimestampGenerator().currentTime();
            while (complexEventChunk.hasNext()) {
                StreamEvent incoming = complexEventChunk.next();
                if (incoming.getType() != ComplexEvent.Type.CURRENT) {
                    continue;
                }
                addUniqueEvent(this.uniqueEventMap, this.uniqueKeyExpressionExecutor, streamEventCloner.copyStreamEvent(incoming));
                if (this.uniqueEventMap.size() != this.windowLength) {
                    continue;
                }
                ComplexEventChunk<StreamEvent> batchOutput = flushBatch(streamEventCloner, windowState, currentTime);
                if (batchOutput.getFirst() != null) {
                    completedBatches.add(batchOutput);
                }
            }
        }
        // Forward outside the synchronized block so downstream processing does not hold the lock.
        for (ComplexEventChunk<StreamEvent> batch : completedBatches) {
            processor.process(batch);
        }
    }

    /**
     * Moves the collected unique events out of {@code uniqueEventMap} and builds the output for
     * one completed batch: previously emitted events (as EXPIRED), the stored RESET event, then
     * the new CURRENT events. Also prepares the EXPIRED copies and RESET event for the next batch.
     */
    private ComplexEventChunk<StreamEvent> flushBatch(StreamEventCloner cloner, WindowState state, long currentTime) {
        ComplexEventChunk<StreamEvent> output = new ComplexEventChunk<>(true);

        for (StreamEvent uniqueEvent : this.uniqueEventMap.values()) {
            uniqueEvent.setTimestamp(currentTime);
            state.currentEventChunk.add(uniqueEvent);
        }
        this.uniqueEventMap.clear();

        // Expire the previous batch, stamped with the current time.
        if (state.eventsToBeExpired.getFirst() != null) {
            while (state.eventsToBeExpired.hasNext()) {
                state.eventsToBeExpired.next().setTimestamp(currentTime);
            }
            output.add(state.eventsToBeExpired.getFirst());
        }
        state.eventsToBeExpired.clear();

        if (state.currentEventChunk.getFirst() != null) {
            // The reset event (null before the first batch) goes ahead of the current events.
            output.add(state.resetEvent);
            state.currentEventChunk.reset();
            while (state.currentEventChunk.hasNext()) {
                StreamEvent expiredCopy = cloner.copyStreamEvent(state.currentEventChunk.next());
                expiredCopy.setType(ComplexEvent.Type.EXPIRED);
                state.eventsToBeExpired.add(expiredCopy);
            }
            state.resetEvent = cloner.copyStreamEvent(state.currentEventChunk.getFirst());
            state.resetEvent.setType(ComplexEvent.Type.RESET);
            output.add(state.currentEventChunk.getFirst());
        }
        state.currentEventChunk.clear();
        return output;
    }

    /**
     * @return {@link ProcessingMode#BATCH}
     */
    public ProcessingMode getProcessingMode() {
        return ProcessingMode.BATCH;
    }

    /**
     * Stores {@code streamEvent} in {@code map} under the key produced by evaluating
     * {@code expressionExecutor} on it, replacing any event already stored for that key.
     */
    protected void addUniqueEvent(Map<Object, StreamEvent> map, ExpressionExecutor expressionExecutor, StreamEvent streamEvent) {
        map.put(expressionExecutor.execute(streamEvent), streamEvent);
    }

    /** No-op. */
    public void start() {
    }

    /** No-op. */
    public void stop() {
    }

    /**
     * Searches the events of the in-progress batch using the compiled condition.
     *
     * @return the result of the operator's find, or {@code null} when {@code compiledCondition}
     *         is not an {@link Operator}
     */
    public StreamEvent find(StateEvent stateEvent, CompiledCondition compiledCondition) {
        WindowState windowState = (WindowState) this.stateHolder.getState();
        try {
            if (compiledCondition instanceof Operator) {
                return ((Operator) compiledCondition).find(stateEvent, this.uniqueEventMap.values(), this.streamEventCloner);
            }
            return null;
        } finally {
            // Always hand the state back to the holder, even if the lookup throws.
            this.stateHolder.returnState(windowState);
        }
    }

    /**
     * Builds an operator for {@code expression} over the values of the in-progress batch map.
     */
    public CompiledCondition compileCondition(Expression expression, MatchingMetaInfoHolder matchingMetaInfoHolder, List<VariableExpressionExecutor> list, Map<String, Table> map, SiddhiQueryContext siddhiQueryContext) {
        return OperatorParser.constructOperator(this.uniqueEventMap.values(), expression, matchingMetaInfoHolder, list, map, siddhiQueryContext);
    }
}
