/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.execution.unique;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ParameterOverload;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.event.stream.populater.ComplexEventPopulater;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.SchedulingProcessor;
import io.siddhi.core.query.processor.stream.window.FindableProcessor;
import io.siddhi.core.query.processor.stream.window.WindowProcessor;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.Scheduler;
import io.siddhi.core.util.collection.operator.CompiledCondition;
import io.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import io.siddhi.core.util.collection.operator.Operator;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.parser.OperatorParser;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import io.siddhi.query.api.expression.Expression;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Sliding time window that keeps, per unique-key value, only the most recent event seen
 * within the configured time period ({@code unique:time(<unique.key>, <window.time>)}).
 *
 * <p>For every arriving {@code CURRENT} event two {@code EXPIRED}-typed copies are made:
 * one is appended to the window's event chunk (arrival order), the other is stored in a
 * map under the string form of the unique key. When the map already held an event for
 * that key, the matching event is taken out of the window chunk and emitted ahead of the
 * new event. Events whose age has reached the window length are emitted the same way.
 * Incoming events that are not {@code CURRENT} only trigger expiry and are dropped from
 * the chunk before it is handed to the next processor.
 */
@Extension(
        name = "time",
        namespace = "unique",
        description = "This is a sliding time window that holds the latest unique events that arrived during the previous time window. The unique events are determined based on the value for a specified unique key parameter. The window is updated with the arrival and expiry of each event. When a new event that arrives within a window time period has the same value for the unique key parameter as an existing event in the window, the previous event is replaced by the new event.",
        parameters = {
                @Parameter(name = "unique.key", description = "The attribute that should be checked for uniqueness. ", type = {DataType.INT, DataType.LONG, DataType.FLOAT, DataType.BOOL, DataType.DOUBLE, DataType.STRING}),
                @Parameter(name = "window.time", description = "The sliding time period for which the window should hold events.", type = {DataType.INT, DataType.LONG})
        },
        parameterOverloads = {
                @ParameterOverload(parameterNames = {"unique.key", "window.time"})
        },
        examples = {
                @Example(syntax = "define stream CseEventStream (symbol string, price float, volume int)\n\nfrom CseEventStream#window.unique:time(symbol, 1 sec)\nselect symbol, price, volume\ninsert expired events into OutputStream ;", description = "In this query, the window holds the latest unique events that arrived within the last second from the 'CseEventStream', and returns the expired events to the 'OutputStream' stream. During any given second, each event in the window should have a unique value for the 'symbol' attribute. If a new event that arrives within the same second has the same value for the symbol attribute as an existing event in the window, the existing event expires.")
        }
)
public class UniqueTimeWindowProcessor
        extends WindowProcessor<UniqueTimeWindowProcessor.WindowState>
        implements SchedulingProcessor, FindableProcessor {

    /** Window length in milliseconds, read from the constant second parameter in {@code init}. */
    private long timeInMilliSeconds;
    /** Scheduler used to request a wake-up when the newest event is due to expire; may be null. */
    private Scheduler scheduler;
    private SiddhiAppContext siddhiAppContext;
    /** Largest event timestamp for which a wake-up has already been requested. */
    private volatile long lastTimestamp = Long.MIN_VALUE;
    /** Executor that extracts the unique-key value from an event. */
    private ExpressionExecutor uniqueKeyExpressionExecutor;
    /** Cloner captured from the latest {@code processEventChunk} call; used by {@code find}. */
    private StreamEventCloner streamEventCloner;

    @Override
    public Scheduler getScheduler() {
        return this.scheduler;
    }

    @Override
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Validates the window parameters and captures the unique-key executor and window length.
     *
     * @param attributeExpressionExecutors exactly two executors: the unique-key expression
     *                                     followed by a constant {@code int} or {@code long}
     *                                     window time
     * @param siddhiQueryContext           supplies the {@link SiddhiAppContext} used for timestamps
     * @return a factory producing an empty {@link WindowState}
     * @throws SiddhiAppValidationException if the parameter count is not two, the time parameter
     *                                      is not a constant, or its type is neither int nor long
     */
    @Override
    protected StateFactory<WindowState> init(MetaStreamEvent metaStreamEvent,
                                             AbstractDefinition inputDefinition,
                                             ExpressionExecutor[] attributeExpressionExecutors,
                                             ConfigReader configReader,
                                             StreamEventClonerHolder streamEventClonerHolder,
                                             boolean outputExpectsExpiredEvents,
                                             boolean findToBeExecuted,
                                             SiddhiQueryContext siddhiQueryContext) {
        this.siddhiAppContext = siddhiQueryContext.getSiddhiAppContext();
        if (attributeExpressionExecutors.length != 2) {
            throw new SiddhiAppValidationException("UniqueTime window should only have two parameters (<string|int|bool|long|double|float> unique attribute, <int|long|time> windowTime), but found " + attributeExpressionExecutors.length + " input attributes");
        }
        this.uniqueKeyExpressionExecutor = attributeExpressionExecutors[0];

        ExpressionExecutor timeExecutor = attributeExpressionExecutors[1];
        if (!(timeExecutor instanceof ConstantExpressionExecutor)) {
            // Report the offending time parameter itself, not the unique-key executor.
            throw new SiddhiAppValidationException("UniqueTime window should have constant for time parameter but found a dynamic attribute " + timeExecutor.getClass().getCanonicalName());
        }

        Attribute.Type timeType = timeExecutor.getReturnType();
        if (timeType == Attribute.Type.INT) {
            this.timeInMilliSeconds = (Integer) ((ConstantExpressionExecutor) timeExecutor).getValue();
        } else if (timeType == Attribute.Type.LONG) {
            this.timeInMilliSeconds = (Long) ((ConstantExpressionExecutor) timeExecutor).getValue();
        } else {
            // Report the type of the time parameter, not of the unique key.
            throw new SiddhiAppValidationException("UniqueTime window's parameter time should be either int or long, but found " + timeType);
        }
        return () -> new WindowState(new ComplexEventChunk<>(false));
    }

    /**
     * Adds every {@code CURRENT} event of the chunk to the window, emits replaced and timed-out
     * events in front of the event that triggered them, drops non-{@code CURRENT} events, and
     * forwards the resulting chunk to {@code nextProcessor}.
     *
     * <p>All window mutation happens while holding the monitor of {@code state}; the hand-off to
     * the next processor happens after the monitor is released.
     */
    @Override
    protected void processEventChunk(ComplexEventChunk<StreamEvent> streamEventChunk,
                                     Processor nextProcessor,
                                     StreamEventCloner streamEventCloner,
                                     ComplexEventPopulater complexEventPopulater,
                                     WindowState state) {
        this.streamEventCloner = streamEventCloner;
        synchronized (state) {
            while (streamEventChunk.hasNext()) {
                StreamEvent streamEvent = streamEventChunk.next();
                long currentTime = this.siddhiAppContext.getTimestampGenerator().currentTime();
                boolean isCurrent = streamEvent.getType() == ComplexEvent.Type.CURRENT;

                StreamEvent replacedEvent = null;
                if (isCurrent) {
                    replacedEvent = addToWindow(streamEvent, streamEventCloner, state);
                }
                expireEvents(streamEventChunk, state, currentTime, replacedEvent);

                if (!isCurrent) {
                    // Timer and other non-CURRENT events only drive expiry; they are not forwarded.
                    streamEventChunk.remove();
                }
            }
        }
        nextProcessor.process(streamEventChunk);
    }

    /**
     * Stores EXPIRED-typed copies of {@code streamEvent} in the window chunk and in the key map,
     * and asks the scheduler (when one is set) for a wake-up at the event's expiry time.
     *
     * @return the event previously mapped to the same unique key, or {@code null} if there was none
     */
    private StreamEvent addToWindow(StreamEvent streamEvent, StreamEventCloner cloner, WindowState state) {
        StreamEvent windowCopy = cloner.copyStreamEvent(streamEvent);
        windowCopy.setType(ComplexEvent.Type.EXPIRED);
        StreamEvent mapCopy = cloner.copyStreamEvent(streamEvent);
        mapCopy.setType(ComplexEvent.Type.EXPIRED);

        StreamEvent previous = state.map.put(generateKey(mapCopy), mapCopy);
        state.expiredEventChunk.add(windowCopy);

        // Only schedule for timestamps newer than the last one scheduled.
        if (this.lastTimestamp < windowCopy.getTimestamp() && this.scheduler != null) {
            this.scheduler.notifyAt(windowCopy.getTimestamp() + this.timeInMilliSeconds);
            this.lastTimestamp = windowCopy.getTimestamp();
        }
        return previous;
    }

    /**
     * Walks the window chunk from its head and moves events that must leave the window into
     * {@code streamEventChunk}, immediately before the event currently being processed.
     *
     * <p>While {@code replacedEvent} is pending, the walk only looks for the window entry equal
     * to it; time-based expiry is evaluated for the entries visited after it has been found (or
     * from the start when nothing was replaced). The walk stops at the first entry that is
     * neither the replaced one nor old enough to expire.
     *
     * @param replacedEvent event displaced from the key map by the current event, or {@code null}
     */
    private void expireEvents(ComplexEventChunk<StreamEvent> streamEventChunk, WindowState state,
                              long currentTime, StreamEvent replacedEvent) {
        ComplexEventChunk<StreamEvent> window = state.expiredEventChunk;
        StreamEvent pendingReplaced = replacedEvent;

        window.reset();
        while (window.hasNext()) {
            StreamEvent candidate = window.next();
            if (pendingReplaced != null) {
                if (candidate.equals(pendingReplaced)) {
                    window.remove();
                    streamEventChunk.insertBeforeCurrent(pendingReplaced);
                    pendingReplaced.setTimestamp(currentTime);
                    pendingReplaced = null;
                }
            } else if (candidate.getTimestamp() - currentTime + this.timeInMilliSeconds <= 0L) {
                // Drop the key-map twin of a timed-out event so the map does not keep growing and
                // a later event with the same key is not handed a stale "replaced" event. The
                // conditional remove leaves a newer entry for the same key untouched. It relies on
                // StreamEvent value equality, exactly as the replacement lookup above does, and
                // must run before the timestamp is overwritten.
                state.map.remove(generateKey(candidate), candidate);
                window.remove();
                candidate.setTimestamp(currentTime);
                streamEventChunk.insertBeforeCurrent(candidate);
                window.reset();
            } else {
                break;
            }
        }
        window.reset();
    }

    @Override
    public ProcessingMode getProcessingMode() {
        return ProcessingMode.BATCH;
    }

    /**
     * Looks up events in the window that match {@code matchingEvent}.
     *
     * @return the result of the compiled operator's {@code find} over the window chunk, or
     *         {@code null} when {@code compiledCondition} is not an {@link Operator}
     */
    @Override
    public StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition) {
        WindowState state = this.stateHolder.getState();
        try {
            if (compiledCondition instanceof Operator) {
                return ((Operator) compiledCondition).find(matchingEvent, state.expiredEventChunk,
                        this.streamEventCloner);
            }
            return null;
        } finally {
            this.stateHolder.returnState(state);
        }
    }

    /**
     * Builds an operator that evaluates {@code expression} against the window's event chunk.
     */
    @Override
    public CompiledCondition compileCondition(Expression expression,
                                              MatchingMetaInfoHolder matchingMetaInfoHolder,
                                              List<VariableExpressionExecutor> variableExpressionExecutors,
                                              Map<String, Table> tableMap,
                                              SiddhiQueryContext siddhiQueryContext) {
        WindowState state = this.stateHolder.getState();
        try {
            return OperatorParser.constructOperator(state.expiredEventChunk, expression,
                    matchingMetaInfoHolder, variableExpressionExecutors, tableMap, siddhiQueryContext);
        } finally {
            this.stateHolder.returnState(state);
        }
    }

    @Override
    public void start() {
        // Nothing to start.
    }

    @Override
    public void stop() {
        // Nothing to stop.
    }

    /** Returns the string form of the unique-key value of {@code event}. */
    private String generateKey(StreamEvent event) {
        return this.uniqueKeyExpressionExecutor.execute(event).toString();
    }

    /**
     * Per-window state: the arrival-ordered chunk of events currently in the window and the
     * latest event recorded for each unique-key string.
     */
    class WindowState extends State {
        private ConcurrentMap<String, StreamEvent> map = new ConcurrentHashMap<String, StreamEvent>();
        private ComplexEventChunk<StreamEvent> expiredEventChunk;

        public WindowState(ComplexEventChunk<StreamEvent> expiredEventChunk) {
            this.expiredEventChunk = expiredEventChunk;
        }

        @Override
        public boolean canDestroy() {
            return false;
        }

        /**
         * Captures the head of the window chunk and the key map.
         * The key spelling "expiredEventchunck" is kept as-is because {@link #restore} reads it.
         */
        @Override
        public Map<String, Object> snapshot() {
            Map<String, Object> stateSnapshot = new HashMap<String, Object>();
            stateSnapshot.put("expiredEventchunck", this.expiredEventChunk.getFirst());
            stateSnapshot.put("map", this.map);
            return stateSnapshot;
        }

        /** Replaces the window chunk contents and the key map with the values from {@code state}. */
        @Override
        @SuppressWarnings("unchecked") // snapshot() stores a ConcurrentMap<String, StreamEvent> under "map"
        public void restore(Map<String, Object> state) {
            this.expiredEventChunk.clear();
            this.expiredEventChunk.add((StreamEvent) state.get("expiredEventchunck"));
            this.map = (ConcurrentMap<String, StreamEvent>) state.get("map");
        }
    }
}

