package io.siddhi.extension.execution.unique;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ParameterOverload;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.event.stream.populater.ComplexEventPopulater;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.SchedulingProcessor;
import io.siddhi.core.query.processor.stream.window.FindableProcessor;
import io.siddhi.core.query.processor.stream.window.WindowProcessor;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.Scheduler;
import io.siddhi.core.util.collection.operator.CompiledCondition;
import io.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import io.siddhi.core.util.collection.operator.Operator;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.parser.OperatorParser;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import io.siddhi.query.api.expression.Expression;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@Extension(name = "time", namespace = "unique", description = "This is a sliding time window that holds the latest unique events that arrived during the previous time window. The unique events are determined based on the value for a specified unique key parameter. The window is updated with the arrival and expiry of each event. When a new event that arrives within a window time period has the same value for the unique key parameter as an existing event in the window, the previous event is replaced by the new event.", parameters = {@Parameter(name = "unique.key", description = "The attribute that should be checked for uniqueness. ", type = {DataType.INT, DataType.LONG, DataType.FLOAT, DataType.BOOL, DataType.DOUBLE, DataType.STRING}, dynamic = true), @Parameter(name = "window.time", description = "The sliding time period for which the window should hold events.", type = {DataType.INT, DataType.LONG})}, parameterOverloads = {@ParameterOverload(parameterNames = {"unique.key", "window.time"})}, examples = {@Example(syntax = "define stream CseEventStream (symbol string, price float, volume int)\n\nfrom CseEventStream#window.unique:time(symbol, 1 sec)\nselect symbol, price, volume\ninsert expired events into OutputStream ;", description = "In this query, the window holds the latest unique events that arrived within the last second from the 'CseEventStream', and returns the expired events to the 'OutputStream' stream. During any given second, each event in the window should have a unique value for the 'symbol' attribute. If a new event that arrives within the same second has the same value for the symbol attribute as an existing event in the window, the existing event expires.")})
/* loaded from: input_file:io/siddhi/extension/execution/unique/UniqueTimeWindowProcessor.class */
public class UniqueTimeWindowProcessor extends WindowProcessor<WindowState> implements SchedulingProcessor, FindableProcessor {
    // Window length; read in init() from the second (constant) window parameter and added to event timestamps.
    private long timeInMilliSeconds;
    // Supplied through setScheduler(); used to request a callback at an event's timestamp plus the window length.
    private Scheduler scheduler;
    // Captured in init(); provides the timestamp generator used as "now" while processing.
    private SiddhiAppContext siddhiAppContext;
    // Timestamp of the most recent event for which a scheduler notification was requested.
    private volatile long lastTimestamp = Long.MIN_VALUE;
    // First window parameter; executed against each event to derive its unique key.
    private ExpressionExecutor uniqueKeyExpressionExecutor;
    // Cloner received by the latest processEventChunk() call; passed on to the operator in find().
    private StreamEventCloner streamEventCloner;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/siddhi/extension/execution/unique/UniqueTimeWindowProcessor$WindowState.class */
    /**
     * Per-window state: the latest event seen for each unique key plus the queue of
     * events waiting to be emitted as expired.
     */
    public class WindowState extends State {
        /** Latest (expired-typed) copy of the event held for each unique key. */
        private ConcurrentMap<String, StreamEvent> map = new ConcurrentHashMap<>();
        /** Queue of expired-typed copies, in the order in which they were added. */
        private ComplexEventChunk<StreamEvent> expiredEventChunk;

        /**
         * @param complexEventChunk chunk that will be used as the expiry queue
         */
        public WindowState(ComplexEventChunk<StreamEvent> complexEventChunk) {
            this.expiredEventChunk = complexEventChunk;
        }

        /**
         * @return always {@code false}
         */
        public boolean canDestroy() {
            return false;
        }

        /**
         * Captures the current state.
         *
         * @return a map holding the head of the expiry queue under {@code "expiredEventchunck"}
         *         and the key map under {@code "map"}
         */
        public Map<String, Object> snapshot() {
            Map<String, Object> snapshot = new HashMap<>();
            // The key spelling is kept as-is: restore() reads the same key.
            snapshot.put("expiredEventchunck", this.expiredEventChunk.getFirst());
            snapshot.put("map", this.map);
            return snapshot;
        }

        /**
         * Replaces the current state with the content of a map produced by {@link #snapshot()}.
         *
         * @param map snapshot content to restore from
         */
        public void restore(Map<String, Object> map) {
            this.expiredEventChunk.clear();
            this.expiredEventChunk.add((StreamEvent) map.get("expiredEventchunck"));
            // The snapshot stores the map as Object; snapshot() only ever puts a ConcurrentMap<String, StreamEvent> there.
            @SuppressWarnings("unchecked")
            ConcurrentMap<String, StreamEvent> restoredMap = (ConcurrentMap<String, StreamEvent>) map.get("map");
            this.map = restoredMap;
        }
    }

    /**
     * Returns the scheduler attached to this window.
     *
     * @return the scheduler last passed to {@link #setScheduler(Scheduler)}, or {@code null} if none was set
     */
    public Scheduler getScheduler() {
        return scheduler;
    }

    /**
     * Attaches the scheduler that event processing uses to request timer notifications.
     *
     * @param scheduler the scheduler to use; when {@code null}, no notifications are requested
     */
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Validates the window parameters and prepares the factory for per-window state.
     *
     * <p>Exactly two parameters are expected: the unique-key expression and a constant
     * {@code int} or {@code long} window time.
     *
     * @param metaStreamEvent         not used by this implementation
     * @param abstractDefinition      not used by this implementation
     * @param expressionExecutorArr   executors for the window parameters: {@code [0]} unique key, {@code [1]} window time
     * @param configReader            not used by this implementation
     * @param streamEventClonerHolder not used by this implementation
     * @param z                       not used by this implementation
     * @param z2                      not used by this implementation
     * @param siddhiQueryContext      source of the {@link SiddhiAppContext}
     * @return a factory creating a fresh {@link WindowState} with an empty expiry queue
     * @throws SiddhiAppValidationException if the parameter count is not two, the time parameter is not a
     *                                      constant, or the time parameter is neither int nor long
     */
    protected StateFactory<WindowState> init(MetaStreamEvent metaStreamEvent, AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, StreamEventClonerHolder streamEventClonerHolder, boolean z, boolean z2, SiddhiQueryContext siddhiQueryContext) {
        this.siddhiAppContext = siddhiQueryContext.getSiddhiAppContext();
        if (expressionExecutorArr.length != 2) {
            throw new SiddhiAppValidationException("UniqueTime window should only have two parameters (<string|int|bool|long|double|float> unique attribute, <int|long|time> windowTime), but found " + expressionExecutorArr.length + " input attributes");
        }
        this.uniqueKeyExpressionExecutor = expressionExecutorArr[0];

        ExpressionExecutor timeExecutor = expressionExecutorArr[1];
        if (!(timeExecutor instanceof ConstantExpressionExecutor)) {
            // Report the offending time parameter itself, not the unique-key parameter.
            throw new SiddhiAppValidationException("UniqueTime window should have constant for time parameter but found a dynamic attribute " + timeExecutor.getClass().getCanonicalName());
        }
        ConstantExpressionExecutor constantTime = (ConstantExpressionExecutor) timeExecutor;
        Attribute.Type timeType = timeExecutor.getReturnType();
        if (timeType == Attribute.Type.INT) {
            this.timeInMilliSeconds = ((Integer) constantTime.getValue()).intValue();
        } else if (timeType == Attribute.Type.LONG) {
            this.timeInMilliSeconds = ((Long) constantTime.getValue()).longValue();
        } else {
            // Report the type of the time parameter, which is the one being rejected.
            throw new SiddhiAppValidationException("UniqueTime window's parameter time should be either int or long, but found " + timeType);
        }
        return () -> new WindowState(new ComplexEventChunk<>(false));
    }

    /**
     * Applies the unique sliding-time window to a chunk of events and forwards the result.
     *
     * <p>For every {@code CURRENT} event two expired-typed copies are made: one is appended to the
     * expiry queue and one is stored in the key map, replacing any earlier event with the same key.
     * A replaced event is emitted as expired immediately before the event that replaced it. Events
     * whose timestamp plus the window length is not after the current time are emitted as expired
     * and dropped from both the expiry queue and the key map. Non-{@code CURRENT} events are removed
     * from the chunk after they have triggered the expiry pass.
     *
     * @param complexEventChunk     incoming events; modified in place before being forwarded
     * @param processor             next processor, invoked once with the resulting chunk
     * @param streamEventCloner     used to copy incoming events; also remembered for {@code find}
     * @param complexEventPopulater not used by this implementation
     * @param windowState           state holding the key map and the expiry queue; used as the lock
     */
    protected void processEventChunk(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, WindowState windowState) {
        this.streamEventCloner = streamEventCloner;
        synchronized (windowState) {
            while (complexEventChunk.hasNext()) {
                StreamEvent incoming = complexEventChunk.next();
                long currentTime = this.siddhiAppContext.getTimestampGenerator().currentTime();
                // Earlier event for the same key that still has to be pulled out of the expiry queue.
                StreamEvent replaced = null;
                if (incoming.getType() == ComplexEvent.Type.CURRENT) {
                    // Two separate copies: one lives in the expiry queue, the other in the key map.
                    StreamEvent queuedCopy = streamEventCloner.copyStreamEvent(incoming);
                    queuedCopy.setType(ComplexEvent.Type.EXPIRED);
                    StreamEvent keyedCopy = streamEventCloner.copyStreamEvent(incoming);
                    keyedCopy.setType(ComplexEvent.Type.EXPIRED);
                    replaced = windowState.map.put(generateKey(keyedCopy), keyedCopy);
                    windowState.expiredEventChunk.add(queuedCopy);
                    if (this.lastTimestamp < queuedCopy.getTimestamp() && this.scheduler != null) {
                        this.scheduler.notifyAt(queuedCopy.getTimestamp() + this.timeInMilliSeconds);
                        this.lastTimestamp = queuedCopy.getTimestamp();
                    }
                }
                windowState.expiredEventChunk.reset();
                while (windowState.expiredEventChunk.hasNext()) {
                    StreamEvent queued = windowState.expiredEventChunk.next();
                    boolean stillInWindow = (queued.getTimestamp() - currentTime) + this.timeInMilliSeconds > 0;
                    if (stillInWindow && replaced == null) {
                        // Nothing left to replace and the queue head has not timed out yet.
                        break;
                    }
                    if (replaced == null) {
                        windowState.expiredEventChunk.remove();
                        // Drop the matching key-map entry as well, otherwise the map keeps every key ever
                        // seen. Must happen before the timestamp is overwritten, because the conditional
                        // remove compares against the stored copy.
                        windowState.map.remove(generateKey(queued), queued);
                        queued.setTimestamp(currentTime);
                        complexEventChunk.insertBeforeCurrent(queued);
                        // Restart from the head, since the queue was modified.
                        windowState.expiredEventChunk.reset();
                    } else if (queued.equals(replaced)) {
                        // Found the queued twin of the replaced event: emit the replaced event as expired.
                        windowState.expiredEventChunk.remove();
                        complexEventChunk.insertBeforeCurrent(replaced);
                        replaced.setTimestamp(currentTime);
                        replaced = null;
                    }
                }
                windowState.expiredEventChunk.reset();
                if (incoming.getType() != ComplexEvent.Type.CURRENT) {
                    complexEventChunk.remove();
                }
            }
        }
        processor.process(complexEventChunk);
    }

    /**
     * Reports the processing mode of this window.
     *
     * @return always {@link ProcessingMode#BATCH}
     */
    public ProcessingMode getProcessingMode() {
        return ProcessingMode.BATCH;
    }

    /**
     * Looks up events in the expiry queue that satisfy a compiled condition.
     *
     * @param stateEvent        the event to match against
     * @param compiledCondition the condition; only evaluated when it is an {@link Operator}
     * @return the operator's result, or {@code null} when the condition is not an {@link Operator}
     */
    public StreamEvent find(StateEvent stateEvent, CompiledCondition compiledCondition) {
        WindowState state = (WindowState) this.stateHolder.getState();
        try {
            if (!(compiledCondition instanceof Operator)) {
                return null;
            }
            Operator operator = (Operator) compiledCondition;
            return operator.find(stateEvent, state.expiredEventChunk, this.streamEventCloner);
        } finally {
            // Always hand the state back, whether or not the lookup succeeded.
            this.stateHolder.returnState(state);
        }
    }

    /**
     * Compiles a condition into an operator that works on this window's expiry queue.
     *
     * @param expression             the condition expression
     * @param matchingMetaInfoHolder meta information passed through to the operator parser
     * @param list                   variable expression executors passed through to the operator parser
     * @param map                    tables passed through to the operator parser
     * @param siddhiQueryContext     query context passed through to the operator parser
     * @return the operator built by {@link OperatorParser#constructOperator}
     */
    public CompiledCondition compileCondition(Expression expression, MatchingMetaInfoHolder matchingMetaInfoHolder, List<VariableExpressionExecutor> list, Map<String, Table> map, SiddhiQueryContext siddhiQueryContext) {
        WindowState state = (WindowState) this.stateHolder.getState();
        try {
            return OperatorParser.constructOperator(state.expiredEventChunk, expression, matchingMetaInfoHolder, list, map, siddhiQueryContext);
        } finally {
            // Single release point, mirroring find(); the state is returned exactly once on every path.
            this.stateHolder.returnState(state);
        }
    }

    /** Start hook; this window does nothing here. */
    public void start() {
    }

    /** Stop hook; this window does nothing here. */
    public void stop() {
    }

    /**
     * Derives the unique key of an event by evaluating the unique-key expression on it.
     *
     * <p>A {@code null} attribute value yields the key {@code "null"} instead of raising a
     * {@link NullPointerException}, so all events with a null key share one slot in the window.
     *
     * @param streamEvent the event to evaluate
     * @return the string form of the unique-key value
     */
    private String generateKey(StreamEvent streamEvent) {
        return String.valueOf(this.uniqueKeyExpressionExecutor.execute(streamEvent));
    }

    /**
     * Bridge overload (marked bridge/synthetic by the decompiler) that accepts the untyped state and
     * delegates to the {@link WindowState}-typed overload.
     */
    protected /* bridge */ /* synthetic */ void processEventChunk(ComplexEventChunk complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, State state) {
        ComplexEventChunk<StreamEvent> typedChunk = (ComplexEventChunk<StreamEvent>) complexEventChunk;
        WindowState typedState = (WindowState) state;
        processEventChunk(typedChunk, processor, streamEventCloner, complexEventPopulater, typedState);
    }
}
