/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.query.processor.stream.window;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ParameterOverload;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.SnapshotableStreamEventQueue;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.stream.window.TimeWindowProcessor;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.collection.operator.CompiledCondition;
import io.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import io.siddhi.core.util.collection.operator.Operator;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.parser.OperatorParser;
import io.siddhi.core.util.snapshot.state.SnapshotStateList;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import io.siddhi.query.api.expression.Expression;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Window processor registered under the {@code delay} extension name.
 *
 * <p>Arriving {@code CURRENT} events are parked in a per-state queue and taken out of the
 * chunk being processed. Each time an event passes through {@code process}, parked events
 * whose timestamp plus the configured delay is not after the current time are removed from
 * the queue, re-stamped with the current time, and inserted into the chunk so that they are
 * the only events handed to the next processor.
 */
@Extension(name="delay", namespace="", description="A delay window holds events for a specific time period that is regarded as a delay period before processing them.", parameters={@Parameter(name="window.delay", description="The time period (specified in sec, min, ms) for which  the window should delay the events.", type={DataType.INT, DataType.LONG, DataType.TIME})}, parameterOverloads={@ParameterOverload(parameterNames={"window.delay"})}, examples={@Example(syntax="define window delayWindow(symbol string, volume int) delay(1 hour);\ndefine stream PurchaseStream(symbol string, volume int);\ndefine stream DeliveryStream(symbol string);\ndefine stream OutputStream(symbol string);\n\n@info(name='query1') \nfrom PurchaseStream\nselect symbol, volume\ninsert into delayWindow;\n\n@info(name='query2') \nfrom delayWindow join DeliveryStream\non delayWindow.symbol == DeliveryStream.symbol\nselect delayWindow.symbol\ninsert into OutputStream;", description="In this example, purchase events that arrive in the 'PurchaseStream' stream are directed to a delay window. At any given time, this delay window holds purchase events that have arrived within the last hour. These purchase events in the window are matched by the 'symbol' attribute, with delivery events that arrive in the 'DeliveryStream' stream. This monitors whether the delivery of products is done with a minimum delay of one hour after the purchase.")})
public class DelayWindowProcessor extends TimeWindowProcessor {
    /** Delay read from the window's single constant parameter; added to event timestamps. */
    private long delayInMilliSeconds;
    /** Query context captured in {@code init}; used to reach the timestamp generator. */
    private SiddhiQueryContext siddhiQueryContext;

    /**
     * Validates the window parameter, stores the delay, and supplies the state factory.
     *
     * @param attributeExpressionExecutors executors for the window parameters; exactly one
     *                                     constant executor of type INT or LONG is accepted
     * @param configReader                 not used by this implementation
     * @param siddhiQueryContext           query context, retained for later use in {@code process}
     * @return a factory that creates a new {@link DelayedWindowState} on each call
     * @throws SiddhiAppValidationException if the parameter count is not one, the parameter is
     *                                      not a constant, or its return type is neither INT nor LONG
     */
    @Override
    protected StateFactory init(ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader, SiddhiQueryContext siddhiQueryContext) {
        this.siddhiQueryContext = siddhiQueryContext;

        if (attributeExpressionExecutors.length != 1) {
            throw new SiddhiAppValidationException("Delay window should only have one parameter (<int|long|time> delayTime), but found " + attributeExpressionExecutors.length + " input attributes");
        }

        ExpressionExecutor delayExecutor = attributeExpressionExecutors[0];
        if (!(delayExecutor instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppValidationException("Delay window should have constant parameter attribute but found a dynamic attribute " + delayExecutor.getClass().getCanonicalName());
        }

        Attribute.Type delayType = delayExecutor.getReturnType();
        boolean integral = delayType == Attribute.Type.INT || delayType == Attribute.Type.LONG;
        if (!integral) {
            throw new SiddhiAppValidationException("Delay window's parameter attribute should be either int or long, but found " + delayType);
        }

        Object delayValue = ((ConstantExpressionExecutor) delayExecutor).getValue();
        this.delayInMilliSeconds = Long.parseLong(delayValue.toString());

        // The cloner holder is read when the factory is invoked, not when init returns.
        return () -> new DelayedWindowState(this.streamEventClonerHolder);
    }

    /**
     * Parks incoming events and releases the ones whose delay has elapsed.
     *
     * <p>For every event in the chunk: due events at the head of the queue are moved into the
     * chunk ahead of the current position, a {@code CURRENT} event is appended to the queue
     * (scheduling a notification when its timestamp is newer than the last scheduled one),
     * and the event itself is removed from the chunk. All queue work happens while holding
     * the monitor of the window state.
     *
     * @param streamEventChunk  chunk being processed; mutated in place
     * @param nextProcessor     receives the chunk afterwards if it still contains any event
     * @param streamEventCloner not used by this implementation
     * @param windowState       must be the {@link DelayedWindowState} created by the factory
     */
    @Override
    protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, TimeWindowProcessor.WindowState windowState) {
        final DelayedWindowState delayState = (DelayedWindowState) windowState;

        synchronized (delayState) {
            while (streamEventChunk.hasNext()) {
                StreamEvent incoming = (StreamEvent) streamEventChunk.next();
                long now = this.siddhiQueryContext.getSiddhiAppContext().getTimestampGenerator().currentTime();

                // Walk the queue from its head and stop at the first event that is not yet due.
                delayState.delayedEventQueue.reset();
                while (delayState.delayedEventQueue.hasNext()) {
                    StreamEvent held = delayState.delayedEventQueue.next();
                    long remaining = held.getTimestamp() - now + this.delayInMilliSeconds;
                    if (remaining > 0L) {
                        break;
                    }
                    delayState.delayedEventQueue.remove();
                    // Released events carry the release time and go ahead of the current event.
                    held.setTimestamp(now);
                    streamEventChunk.insertBeforeCurrent(held);
                }

                if (incoming.getType() == ComplexEvent.Type.CURRENT) {
                    delayState.delayedEventQueue.add(incoming);
                    long arrival = incoming.getTimestamp();
                    if (delayState.lastTimestamp < arrival) {
                        this.getScheduler().notifyAt(arrival + this.delayInMilliSeconds);
                        delayState.lastTimestamp = arrival;
                    }
                }

                // The incoming event itself is never forwarded from this pass.
                streamEventChunk.remove();
            }
            delayState.delayedEventQueue.reset();
        }

        // Forward only when at least one delayed event was released into the chunk.
        if (streamEventChunk.getFirst() != null) {
            nextProcessor.process(streamEventChunk);
        }
    }

    /**
     * Looks up events in the delayed-event queue using the given compiled condition.
     *
     * @param matchingEvent     event to match against
     * @param compiledCondition condition to apply; cast to {@link Operator}
     * @param streamEventCloner cloner passed through to the operator
     * @param state             must be a {@link DelayedWindowState}
     * @return whatever the operator's {@code find} returns for the queue
     */
    @Override
    public StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition, StreamEventCloner streamEventCloner, TimeWindowProcessor.WindowState state) {
        Operator operator = (Operator) compiledCondition;
        SnapshotableStreamEventQueue heldEvents = ((DelayedWindowState) state).delayedEventQueue;
        return operator.find(matchingEvent, heldEvents, streamEventCloner);
    }

    /**
     * Builds an operator over the delayed-event queue via {@link OperatorParser#constructOperator}.
     *
     * @param condition                   expression to compile
     * @param matchingMetaInfoHolder      passed through to the parser
     * @param variableExpressionExecutors passed through to the parser
     * @param tableMap                    passed through to the parser
     * @param state                       must be a {@link DelayedWindowState}
     * @param siddhiQueryContext          passed through to the parser
     * @return the compiled condition produced by the parser
     */
    @Override
    public CompiledCondition compileCondition(Expression condition, MatchingMetaInfoHolder matchingMetaInfoHolder, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, Table> tableMap, TimeWindowProcessor.WindowState state, SiddhiQueryContext siddhiQueryContext) {
        SnapshotableStreamEventQueue heldEvents = ((DelayedWindowState) state).delayedEventQueue;
        return OperatorParser.constructOperator(heldEvents, condition, matchingMetaInfoHolder, variableExpressionExecutors, tableMap, siddhiQueryContext);
    }

    /**
     * State of one delay window: the queue of parked events and the timestamp of the most
     * recent event for which a scheduler notification was requested.
     */
    class DelayedWindowState extends TimeWindowProcessor.WindowState {
        private SnapshotableStreamEventQueue delayedEventQueue;
        private volatile long lastTimestamp = Long.MIN_VALUE;

        DelayedWindowState(StreamEventClonerHolder streamEventClonerHolder) {
            super(DelayWindowProcessor.this, streamEventClonerHolder);
            this.delayedEventQueue = new SnapshotableStreamEventQueue(streamEventClonerHolder);
        }

        /** Captures the queue snapshot and the last scheduled timestamp under fixed keys. */
        @Override
        public Map<String, Object> snapshot() {
            Map<String, Object> captured = new HashMap<>();
            captured.put("DelayedEventQueue", this.delayedEventQueue.getSnapshot());
            captured.put("LastTimestamp", this.lastTimestamp);
            return captured;
        }

        /** Restores the queue and the last scheduled timestamp from a map produced by {@code snapshot}. */
        @Override
        public void restore(Map<String, Object> state) {
            SnapshotStateList queueSnapshot = (SnapshotStateList) state.get("DelayedEventQueue");
            this.delayedEventQueue.restore(queueSnapshot);
            this.lastTimestamp = (Long) state.get("LastTimestamp");
        }

        /** Returns {@code true} when no event is parked in the queue. */
        @Override
        public boolean canDestroy() {
            return this.delayedEventQueue.getFirst() == null;
        }
    }
}

