package org.wso2.extension.siddhi.execution.unique;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.state.StateEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor;
import org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor;
import org.wso2.siddhi.core.table.Table;
import org.wso2.siddhi.core.util.collection.operator.CompiledCondition;
import org.wso2.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import org.wso2.siddhi.core.util.collection.operator.Operator;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.parser.OperatorParser;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;
import org.wso2.siddhi.query.api.expression.Expression;

@Extension(name = "lengthBatch", namespace = "unique", description = "This is a batch (tumbling) window that holds a specified number of latest unique events. The unique events are determined based on the value for a specified unique key parameter. The window is updated for every window length (i.e., for the last set of events of the specified number in a tumbling manner). When a new event that arrives within the a window length has the same value for the unique key parameter as an existing event is the window, the previous event is replaced by the new event.", parameters = {@Parameter(name = "unique.key", description = "The attribute that should be checked for uniqueness.", type = {DataType.INT, DataType.LONG, DataType.FLOAT, DataType.BOOL, DataType.DOUBLE}), @Parameter(name = "window.length", description = "The number of events the window should tumble.", type = {DataType.INT})}, examples = {@Example(syntax = "define window CseEventWindow (symbol string, price float, volume int) from CseEventStream#window.unique:lengthBatch(symbol, 10)\nselect symbol, price, volume\ninsert expired events into OutputStream ;", description = "In this query, the window at any give time holds the last 10 unique events from the CseEventStream stream. Each of the 10 events within the window at a given time has a unique value for the symbol attribute. If a new event has the same value for the symbol attribute as an existing event within the window length, the existing event expires and it is replaced by the new event. The query returns expired individual events as well as expired batches of events to the OutputStream stream.")})
/* loaded from: input_file:org/wso2/extension/siddhi/execution/unique/UniqueLengthBatchWindowProcessor.class */
public class UniqueLengthBatchWindowProcessor extends WindowProcessor implements FindableProcessor {
    private int windowLength;
    private SiddhiAppContext siddhiAppContext;
    private VariableExpressionExecutor uniqueKey;
    private int count = 0;
    private ComplexEventChunk<StreamEvent> currentEventChunk = new ComplexEventChunk<>(false);
    private ComplexEventChunk<StreamEvent> eventsToBeExpired = null;
    private StreamEvent resetEvent = null;
    private Map<Object, StreamEvent> uniqueEventMap = new HashMap();

    protected void init(ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, boolean z, SiddhiAppContext siddhiAppContext) {
        this.siddhiAppContext = siddhiAppContext;
        this.eventsToBeExpired = new ComplexEventChunk<>(false);
        if (expressionExecutorArr.length != 2) {
            throw new SiddhiAppValidationException("Unique Length batch window should only have two parameters, but found " + expressionExecutorArr.length + " input attributes");
        }
        if (!(expressionExecutorArr[0] instanceof VariableExpressionExecutor)) {
            throw new SiddhiAppValidationException("Unique Length Batch window should have variable for Unique Key parameter but found an attribute " + expressionExecutorArr[0].getClass().getCanonicalName());
        }
        this.uniqueKey = (VariableExpressionExecutor) expressionExecutorArr[0];
        if (!(expressionExecutorArr[1] instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppValidationException("Unique Length Batch window should have constant for Length parameter but found a dynamic attribute " + expressionExecutorArr[1].getClass().getCanonicalName());
        }
        if (expressionExecutorArr[1].getReturnType() != Attribute.Type.INT) {
            throw new SiddhiAppValidationException("Unique Length Batch window's Length parameter should be INT, but found " + expressionExecutorArr[1].getReturnType());
        }
        this.windowLength = ((Integer) ((ConstantExpressionExecutor) expressionExecutorArr[1]).getValue()).intValue();
    }

    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner) {
        ArrayList arrayList = new ArrayList();
        synchronized (this) {
            ComplexEventChunk complexEventChunk2 = new ComplexEventChunk(true);
            long currentTime = this.siddhiAppContext.getTimestampGenerator().currentTime();
            while (complexEventChunk.hasNext()) {
                StreamEvent next = complexEventChunk.next();
                if (next.getType() == ComplexEvent.Type.CURRENT) {
                    addUniqueEvent(this.uniqueEventMap, this.uniqueKey, streamEventCloner.copyStreamEvent(next));
                    if (this.uniqueEventMap.size() == this.windowLength) {
                        for (StreamEvent streamEvent : this.uniqueEventMap.values()) {
                            streamEvent.setTimestamp(currentTime);
                            this.currentEventChunk.add(streamEvent);
                        }
                        this.uniqueEventMap.clear();
                        if (this.eventsToBeExpired.getFirst() != null) {
                            while (this.eventsToBeExpired.hasNext()) {
                                this.eventsToBeExpired.next().setTimestamp(currentTime);
                            }
                            complexEventChunk2.add(this.eventsToBeExpired.getFirst());
                        }
                        this.eventsToBeExpired.clear();
                        if (this.currentEventChunk.getFirst() != null) {
                            complexEventChunk2.add(this.resetEvent);
                            this.currentEventChunk.reset();
                            while (this.currentEventChunk.hasNext()) {
                                StreamEvent copyStreamEvent = streamEventCloner.copyStreamEvent(this.currentEventChunk.next());
                                copyStreamEvent.setType(ComplexEvent.Type.EXPIRED);
                                this.eventsToBeExpired.add(copyStreamEvent);
                            }
                            this.resetEvent = streamEventCloner.copyStreamEvent(this.currentEventChunk.getFirst());
                            this.resetEvent.setType(ComplexEvent.Type.RESET);
                            complexEventChunk2.add(this.currentEventChunk.getFirst());
                        }
                        this.currentEventChunk.clear();
                        if (complexEventChunk2.getFirst() != null) {
                            arrayList.add(complexEventChunk2);
                        }
                    }
                }
            }
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            processor.process((ComplexEventChunk) it.next());
        }
    }

    protected void addUniqueEvent(Map<Object, StreamEvent> map, VariableExpressionExecutor variableExpressionExecutor, StreamEvent streamEvent) {
        map.put(streamEvent.getAttribute(variableExpressionExecutor.getPosition()), streamEvent);
    }

    public void start() {
    }

    public void stop() {
    }

    public Map<String, Object> currentState() {
        if (this.eventsToBeExpired == null) {
            HashMap hashMap = new HashMap();
            hashMap.put("currentEventChunk", this.currentEventChunk.getFirst());
            hashMap.put("count", Integer.valueOf(this.count));
            hashMap.put("resetEvent", this.resetEvent);
            return hashMap;
        }
        HashMap hashMap2 = new HashMap();
        hashMap2.put("currentEventChunk", this.currentEventChunk.getFirst());
        hashMap2.put("eventsToBeExpired", this.eventsToBeExpired.getFirst());
        hashMap2.put("count", Integer.valueOf(this.count));
        hashMap2.put("resetEvent", this.resetEvent);
        return hashMap2;
    }

    public void restoreState(Map<String, Object> map) {
        if (map.size() <= 3) {
            this.currentEventChunk.clear();
            this.currentEventChunk.add((StreamEvent) map.get("currentEventChunk"));
            this.count = ((Integer) map.get("count")).intValue();
            this.resetEvent = (StreamEvent) map.get("resetEvent");
            return;
        }
        this.currentEventChunk.clear();
        this.currentEventChunk.add((StreamEvent) map.get("currentEventChunk"));
        this.eventsToBeExpired.clear();
        this.eventsToBeExpired.add((StreamEvent) map.get("eventsToBeExpired"));
        this.count = ((Integer) map.get("count")).intValue();
        this.resetEvent = (StreamEvent) map.get("resetEvent");
    }

    public StreamEvent find(StateEvent stateEvent, CompiledCondition compiledCondition) {
        if (compiledCondition instanceof Operator) {
            return ((Operator) compiledCondition).find(stateEvent, this.uniqueEventMap.values(), this.streamEventCloner);
        }
        return null;
    }

    public CompiledCondition compileCondition(Expression expression, MatchingMetaInfoHolder matchingMetaInfoHolder, SiddhiAppContext siddhiAppContext, List<VariableExpressionExecutor> list, Map<String, Table> map, String str) {
        return OperatorParser.constructOperator(this.uniqueEventMap.values(), expression, matchingMetaInfoHolder, siddhiAppContext, list, map, this.queryName);
    }
}
