package org.wso2.extension.siddhi.execution.unique;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.state.StateEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor;
import org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor;
import org.wso2.siddhi.core.table.Table;
import org.wso2.siddhi.core.util.collection.operator.CompiledCondition;
import org.wso2.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import org.wso2.siddhi.core.util.collection.operator.Operator;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.parser.OperatorParser;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;
import org.wso2.siddhi.query.api.expression.Expression;

/**
 * Sliding length window that keeps at most {@code window.length} events, one per distinct value of
 * the {@code unique.key} attribute.
 *
 * <p>Two structures are maintained and kept in step with each other:
 * <ul>
 *   <li>{@code expiredEventChunk} - the window contents in arrival order, stored as
 *       {@code EXPIRED}-typed clones that are emitted when an event leaves the window;</li>
 *   <li>{@code map} - string form of the unique key mapped to a second clone of the event that
 *       currently represents that key in the window.</li>
 * </ul>
 *
 * <p>All access to this state is guarded by the monitor of this instance.
 */
@Extension(
        name = "length",
        namespace = "unique",
        description = "This is a sliding length window that holds the latest window length unique events according to the unique key parameter and gets updated for each event arrival and expiry. When a new event arrives with the key that is already there in the window, then the previous event is expired and new event is kept within the window.",
        parameters = {
                @Parameter(
                        name = "unique.key",
                        description = "The attribute that should be checked for uniqueness.",
                        type = {DataType.INT, DataType.LONG, DataType.FLOAT, DataType.BOOL, DataType.DOUBLE, DataType.STRING}),
                @Parameter(
                        name = "window.length",
                        description = "The number of events that should be included in a sliding length window.",
                        type = {DataType.INT})
        },
        examples = {
                @Example(
                        syntax = "define stream CseEventStream (symbol string, price float, volume int)\nfrom CseEventStream#window.unique:length(symbol,10)\nselect symbol, price, volume\ninsert all events into OutputStream ;",
                        description = "In this configuration, the window holds the latest 10 unique events. The latest events are selected based on the symbol attribute. When the CseEventStream receives an event of which the value for the symbol attribute is the same as that of an existing event in the window, the existing event is replaced by the new event. All the events are returned to the OutputStream event stream once an event is expired or added to the window.")
        })
/* loaded from: input_file:org/wso2/extension/siddhi/execution/unique/UniqueLengthWindowProcessor.class */
public class UniqueLengthWindowProcessor extends WindowProcessor implements FindableProcessor {
    /** Evaluates the unique-key attribute of an event. */
    private ExpressionExecutor uniqueKeyExpressionExecutor;
    /** Maximum number of events held in the window. */
    private int length;
    /** Window contents in arrival order, as EXPIRED-typed clones. */
    private ComplexEventChunk<StreamEvent> expiredEventChunk;
    /** Unique key (string form) to the clone of the event currently holding that key. */
    private ConcurrentHashMap<String, StreamEvent> map = new ConcurrentHashMap<>();
    /** Number of distinct keys currently tracked in the window. */
    private int count = 0;

    /**
     * Validates and stores the window parameters.
     *
     * @param expressionExecutorArr exactly two executors: the unique-key attribute followed by a
     *                              constant {@code int} window length
     * @param configReader          not used by this window
     * @param outputExpectsExpiredEvents not used by this window
     * @param siddhiAppContext      not used directly here
     * @throws SiddhiAppValidationException if the parameter count is not two, or the window length
     *                                      is not a constant of type int
     */
    protected void init(ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, boolean outputExpectsExpiredEvents, SiddhiAppContext siddhiAppContext) {
        this.expiredEventChunk = new ComplexEventChunk<>(false);
        if (expressionExecutorArr.length != 2) {
            throw new SiddhiAppValidationException("Unique Length window should only have two parameters (<string|int|bool|long|double|float> attribute, <int> windowLength), but found " + expressionExecutorArr.length + " input attributes");
        }
        ExpressionExecutor lengthExecutor = expressionExecutorArr[1];
        // Report a validation error instead of a ClassCastException for a non-constant length.
        if (!(lengthExecutor instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppValidationException("Unique Length window should have a constant value for the 'window.length' parameter, but found a dynamic attribute " + lengthExecutor.getClass().getCanonicalName());
        }
        Object lengthValue = ((ConstantExpressionExecutor) lengthExecutor).getValue();
        if (!(lengthValue instanceof Integer)) {
            throw new SiddhiAppValidationException("Unique Length window's 'window.length' parameter should be of type int, but found " + (lengthValue == null ? "null" : lengthValue.getClass().getCanonicalName()));
        }
        this.uniqueKeyExpressionExecutor = expressionExecutorArr[0];
        this.length = ((Integer) lengthValue).intValue();
    }

    /**
     * Feeds each incoming event into the window and splices the events that leave the window
     * (replaced or evicted) into the chunk in front of the event that displaced them, then hands
     * the chunk to the next processor.
     *
     * @param complexEventChunk  incoming events; expired events are inserted into it in place
     * @param processor          next processor in the query chain
     * @param streamEventCloner  used to create the window-held copies of each event
     */
    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner) {
        synchronized (this) {
            long currentTime = this.siddhiAppContext.getTimestampGenerator().currentTime();
            while (complexEventChunk.hasNext()) {
                // The incoming event is left linked to its successor. Clearing its next pointer
                // here would end this loop after the first event of a multi-event chunk
                // (assumes the chunk iterates through the events' next links - TODO confirm).
                StreamEvent incoming = complexEventChunk.next();

                // Two clones: one lives in the ordered window chunk, the other in the key map so
                // that the one emitted on replacement is not the node linked inside the window.
                StreamEvent windowCopy = streamEventCloner.copyStreamEvent(incoming);
                windowCopy.setType(ComplexEvent.Type.EXPIRED);
                StreamEvent mapCopy = streamEventCloner.copyStreamEvent(windowCopy);
                String key = generateKey(mapCopy);

                StreamEvent replaced = this.map.put(key, mapCopy);
                if (replaced != null) {
                    replaceInWindow(replaced, windowCopy);
                    complexEventChunk.insertBeforeCurrent(replaced);
                    replaced.setTimestamp(currentTime);
                    continue;
                }

                this.count++;
                if (this.count <= this.length) {
                    // Window still has room for a new key.
                    this.expiredEventChunk.add(windowCopy);
                    continue;
                }

                // Window is full: the oldest event makes room for the new key.
                StreamEvent evicted = this.expiredEventChunk.poll();
                if (evicted != null) {
                    // Forget the evicted key; otherwise a later event with that key would be
                    // treated as a replacement, re-expire the stale copy and grow the window
                    // beyond its configured length.
                    this.map.remove(generateKey(evicted));
                    this.count--;
                    evicted.setTimestamp(currentTime);
                    complexEventChunk.insertBeforeCurrent(evicted);
                    this.expiredEventChunk.add(windowCopy);
                } else {
                    // Nothing can be held (non-positive length): the event expires immediately
                    // and must not stay registered as a window member.
                    this.map.remove(key);
                    this.count--;
                    complexEventChunk.insertBeforeCurrent(windowCopy);
                }
            }
        }
        processor.process(complexEventChunk);
    }

    /** Removes the window entry equal to {@code replaced} and appends {@code replacement}. */
    private void replaceInWindow(StreamEvent replaced, StreamEvent replacement) {
        // Rewind first: the chunk's cursor is left wherever the previous scan stopped.
        // NOTE(review): relies on ComplexEventChunk.reset() rewinding the cursor - confirm
        // against the Siddhi core version in use.
        this.expiredEventChunk.reset();
        while (this.expiredEventChunk.hasNext()) {
            if (this.expiredEventChunk.next().equals(replaced)) {
                this.expiredEventChunk.remove();
                // At most one window entry exists per key.
                break;
            }
        }
        this.expiredEventChunk.add(replacement);
    }

    /** No resources to acquire. */
    public void start() {
    }

    /** No resources to release. */
    public void stop() {
    }

    /**
     * Captures the window state for snapshotting.
     *
     * @return a map with the head of the window chunk ({@code "expiredEventChunk"}), the tracked
     *         key count ({@code "count"}) and the key map ({@code "map"})
     */
    public synchronized Map<String, Object> currentState() {
        Map<String, Object> state = new HashMap<>();
        state.put("expiredEventChunk", this.expiredEventChunk.getFirst());
        state.put("count", Integer.valueOf(this.count));
        state.put("map", this.map);
        return state;
    }

    /**
     * Restores the window from a map previously produced by {@link #currentState()}.
     *
     * @param state snapshot holding the {@code "expiredEventChunk"}, {@code "count"} and
     *              {@code "map"} entries
     */
    public synchronized void restoreState(Map<String, Object> state) {
        this.expiredEventChunk.clear();
        this.expiredEventChunk.add((StreamEvent) state.get("expiredEventChunk"));
        this.count = ((Integer) state.get("count")).intValue();
        // The snapshot stores the map written by currentState(), so the element types match.
        @SuppressWarnings("unchecked")
        ConcurrentHashMap<String, StreamEvent> restoredMap = (ConcurrentHashMap<String, StreamEvent>) state.get("map");
        this.map = restoredMap;
    }

    /**
     * Looks up window events matching the given state event.
     *
     * <p>Synchronized because it reads the window chunk that {@code process} mutates under the
     * same monitor.
     *
     * @param stateEvent        event to match against the window contents
     * @param compiledCondition condition produced by {@code compileCondition}
     * @return the result of the operator's find, or {@code null} when the condition is not an
     *         {@link Operator}
     */
    public synchronized StreamEvent find(StateEvent stateEvent, CompiledCondition compiledCondition) {
        if (compiledCondition instanceof Operator) {
            return ((Operator) compiledCondition).find(stateEvent, this.expiredEventChunk, this.streamEventCloner);
        }
        return null;
    }

    /**
     * Builds the map key for an event from the string form of its unique-key attribute.
     * A null attribute value yields the key {@code "null"} rather than a NullPointerException.
     */
    private String generateKey(StreamEvent streamEvent) {
        return String.valueOf(this.uniqueKeyExpressionExecutor.execute(streamEvent));
    }

    /**
     * Compiles a find condition against the window contents by delegating to
     * {@link OperatorParser#constructOperator}; all arguments are passed through unchanged.
     *
     * @return the compiled condition to be passed to {@code find}
     */
    public CompiledCondition compileCondition(Expression expression, MatchingMetaInfoHolder matchingMetaInfoHolder, SiddhiAppContext siddhiAppContext, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, Table> tableMap, String queryName) {
        return OperatorParser.constructOperator(this.expiredEventChunk, expression, matchingMetaInfoHolder, siddhiAppContext, variableExpressionExecutors, tableMap, queryName);
    }
}
