package org.wso2.extension.siddhi.execution.unique;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.state.StateEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor;
import org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor;
import org.wso2.siddhi.core.table.Table;
import org.wso2.siddhi.core.util.collection.operator.CompiledCondition;
import org.wso2.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import org.wso2.siddhi.core.util.collection.operator.Operator;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.parser.OperatorParser;
import org.wso2.siddhi.query.api.expression.Expression;

@Extension(name = "ever", namespace = "unique", description = "This is a window that is updated with the latest events based on a unique key parameter. When a new event arrives with the same value for the unique key parameter as the existing event, the existing event expires, and is replaced with the latest one.", parameters = {@Parameter(name = "unique.key", description = "The attribute that should be checked for uniqueness.If multiple attributes need to be checked, we can specify them as a comma-separated list.", type = {DataType.INT, DataType.LONG, DataType.FLOAT, DataType.BOOL, DataType.DOUBLE})}, examples = {@Example(syntax = "define stream LoginEvents (timeStamp long, ip string) ;\nfrom LoginEvents#window.unique:ever(ip)\nselect count(ip) as ipCount, ip \ninsert all events into UniqueIps  ;", description = "The above query determines the latest events that have arrived from the 'LoginEvents' stream, based on the 'ip' attribute. At a given time, all the events held in the window should have a unique value for the ip attribute. All the processed events are directed to the 'UniqueIps' output stream with 'ip' and 'ipCount' attributes."), @Example(syntax = "define stream LoginEvents (timeStamp long, ip string , id string) ;\nfrom LoginEvents#window.unique:ever(ip, id)\nselect count(ip) as ipCount, ip , id \ninsert expired events into UniqueIps  ;", description = "This query determines the latest events to be included in the window based on the ip and id attributes. When the 'LoginEvents' event stream receives a new event of which the combination of values for the ip and id attributes matches that of an existing event in the window, the existing event expires and it is replaced with the new event. The expired events which have been expired as a result of being replaced by a newer event are directed to the 'uniqueIps' output stream.")})
/* loaded from: input_file:org/wso2/extension/siddhi/execution/unique/UniqueEverWindowProcessor.class */
public class UniqueEverWindowProcessor extends WindowProcessor implements FindableProcessor {
    private ConcurrentMap<String, StreamEvent> map = new ConcurrentHashMap();
    private ExpressionExecutor[] expressionExecutors;

    protected void init(ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, boolean z, SiddhiAppContext siddhiAppContext) {
        this.expressionExecutors = expressionExecutorArr;
    }

    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner) {
        synchronized (this) {
            long currentTime = this.siddhiAppContext.getTimestampGenerator().currentTime();
            ComplexEvent complexEvent = (StreamEvent) complexEventChunk.getFirst();
            complexEventChunk.clear();
            while (complexEvent != null) {
                StreamEvent copyStreamEvent = streamEventCloner.copyStreamEvent(complexEvent);
                copyStreamEvent.setType(ComplexEvent.Type.EXPIRED);
                StreamEvent put = this.map.put(generateKey(copyStreamEvent), copyStreamEvent);
                if (put != null) {
                    put.setTimestamp(currentTime);
                    complexEventChunk.add(put);
                }
                ComplexEvent next = complexEvent.getNext();
                complexEvent.setNext((ComplexEvent) null);
                complexEventChunk.add(complexEvent);
                complexEvent = next;
            }
        }
        processor.process(complexEventChunk);
    }

    public void start() {
    }

    public void stop() {
    }

    public Map<String, Object> currentState() {
        return Collections.singletonMap("map", this.map);
    }

    public synchronized void restoreState(Map<String, Object> map) {
        this.map = (ConcurrentMap) map.get("map");
    }

    public StreamEvent find(StateEvent stateEvent, CompiledCondition compiledCondition) {
        if (compiledCondition instanceof Operator) {
            return ((Operator) compiledCondition).find(stateEvent, this.map.values(), this.streamEventCloner);
        }
        return null;
    }

    public CompiledCondition compileCondition(Expression expression, MatchingMetaInfoHolder matchingMetaInfoHolder, SiddhiAppContext siddhiAppContext, List<VariableExpressionExecutor> list, Map<String, Table> map, String str) {
        return OperatorParser.constructOperator(this.map.values(), expression, matchingMetaInfoHolder, siddhiAppContext, list, map, this.queryName);
    }

    private String generateKey(StreamEvent streamEvent) {
        StringBuilder sb = new StringBuilder();
        for (ExpressionExecutor expressionExecutor : this.expressionExecutors) {
            sb.append(expressionExecutor.execute(streamEvent));
        }
        return sb.toString();
    }
}
