/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.query.processor.stream.window;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.SchedulingProcessor;
import io.siddhi.core.query.processor.stream.window.BatchingFindableWindowProcessor;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.Scheduler;
import io.siddhi.core.util.collection.operator.CompiledCondition;
import io.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import io.siddhi.core.util.collection.operator.Operator;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.parser.OperatorParser;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import io.siddhi.query.api.expression.Expression;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Extension metadata for the externalTimeBatch window. Every string literal is kept on a single
// source line: a raw line break inside a literal is not valid Java, so the break in the second
// example's description is written as an escaped \n.
@Extension(
    name="externalTimeBatch",
    namespace="",
    description="A batch (tumbling) time window based on external time, that holds events arrived during windowTime periods, and gets updated for every windowTime.",
    parameters={
        @Parameter(name="timestamp", description="The time which the window determines as current time and will act upon. The value of this parameter should be monotonically increasing.", type={DataType.LONG}),
        @Parameter(name="window.time", description="The batch time period for which the window should hold events.", type={DataType.INT, DataType.LONG, DataType.TIME}),
        @Parameter(name="start.time", description="User defined start time. This could either be a constant (of type int, long or time) or an attribute of the corresponding stream (of type long). If an attribute is provided, initial value of attribute would be considered as startTime.", type={DataType.INT, DataType.LONG, DataType.TIME}, optional=true, defaultValue="Timestamp of first event"),
        @Parameter(name="timeout", description="Time to wait for arrival of new event, before flushing and giving output for events belonging to a specific batch.", type={DataType.INT, DataType.LONG, DataType.TIME}, optional=true, defaultValue="System waits till an event from next batch arrives to flush current batch")
    },
    examples={
        @Example(syntax="define window cseEventWindow (symbol string, price float, volume int) externalTimeBatch(eventTime, 1 sec) output expired events;\n@info(name = 'query0')\nfrom cseEventStream\ninsert into cseEventWindow;\n@info(name = 'query1')\nfrom cseEventWindow\nselect symbol, sum(price) as price\ninsert expired events into outputStream ;", description="This will process events that arrive every 1 second from the eventTime."),
        @Example(syntax="define window cseEventWindow (symbol string, price float, volume int) externalTimeBatch(eventTime, 20 sec, 0) output expired events;", description="This will process events that arrive every 20 seconds from the eventTime. \nStarts on 0th millisecond of an hour."),
        @Example(syntax="define window cseEventWindow (symbol string, price float, volume int) externalTimeBatch(eventTime, 2 sec, eventTimestamp, 100) output expired events;", description="This will process events that arrive every 2 seconds from the eventTime. Considers the first event's eventTimestamp value as startTime. Waits 100 milliseconds for the arrival of a new event before flushing current batch.")
    }
)
public class ExternalTimeBatchWindowProcessor
extends BatchingFindableWindowProcessor<WindowState>
implements SchedulingProcessor {
    // Executor for the window's 1st parameter; evaluated per event to obtain the external timestamp.
    private VariableExpressionExecutor timestampExpressionExecutor;
    // Set by init() only when the 3rd parameter (start time) is NOT a constant; stays null otherwise.
    private ExpressionExecutor startTimeAsVariable;
    // Batch length taken from the 2nd parameter (presumably milliseconds - confirm against Siddhi time constants).
    private long timeToKeep;
    // True once init() has seen a 3rd parameter, constant or attribute.
    private boolean isStartTimeEnabled = false;
    // Value of the optional 4th parameter; timers are only scheduled from event processing when this is > 0.
    private long schedulerTimeout = 0L;
    // Injected through setScheduler(); used to request TIMER events via notifyAt().
    private Scheduler scheduler;
    // Copied from init(); when true the window state keeps an expired-event chunk for find().
    private boolean findToBeExecuted = false;
    // Value of the optional 5th parameter; when true cloneAppend() overwrites the timestamp attribute with the batch end time.
    private boolean replaceTimestampWithBatchEndTime = false;
    // Copied from init(); when true expired events are emitted ahead of each new batch.
    private boolean outputExpectsExpiredEvents;
    // Constant start time from the 3rd parameter; handed to every new WindowState.
    private long commonStartTime = 0L;

    /**
     * Validates the window parameters and stores them on this processor.
     *
     * <p>Accepted parameters, in order: {@code timestamp} (long attribute), {@code windowTime}
     * (int/long constant), optional {@code startTime} (int/long constant or long attribute),
     * optional {@code timeout} (int/long constant) and optional
     * {@code replaceTimestampWithBatchEndTime} (bool constant).
     *
     * <p>Parameters that must be constants are checked explicitly, so a non-constant argument is
     * reported as a {@link SiddhiAppValidationException} rather than a {@link ClassCastException}.
     *
     * @param attributeExpressionExecutors executors for the window parameters, two to five entries
     * @param configReader not used by this window
     * @param streamEventClonerHolder not used by this window
     * @param outputExpectsExpiredEvents whether expired events must be emitted
     * @param findToBeExecuted whether {@code find} will be called on this window
     * @param siddhiQueryContext not used directly by this method
     * @return a factory creating a fresh {@code WindowState} seeded with the configured timeout and
     *         constant start time
     * @throws SiddhiAppValidationException if the parameter count, kinds or types are invalid
     */
    @Override
    protected StateFactory init(ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader, StreamEventClonerHolder streamEventClonerHolder, boolean outputExpectsExpiredEvents, boolean findToBeExecuted, SiddhiQueryContext siddhiQueryContext) {
        this.outputExpectsExpiredEvents = outputExpectsExpiredEvents;
        this.findToBeExecuted = findToBeExecuted;

        int parameterCount = attributeExpressionExecutors.length;
        if (parameterCount < 2 || parameterCount > 5) {
            throw new SiddhiAppValidationException("ExternalTimeBatch window should only have two to five parameters (<long> timestamp, <int|long|time> windowTime, <long> startTime, <int|long|time> timeout, <bool> replaceTimestampWithBatchEndTime), but found " + parameterCount + " input attributes");
        }

        // 1st parameter: the attribute carrying the external timestamp.
        ExpressionExecutor timestampExecutor = attributeExpressionExecutors[0];
        if (!(timestampExecutor instanceof VariableExpressionExecutor)) {
            throw new SiddhiAppValidationException("ExternalTime window's 1st parameter timestamp should be a variable, but found " + timestampExecutor.getClass());
        }
        if (timestampExecutor.getReturnType() != Attribute.Type.LONG) {
            throw new SiddhiAppValidationException("ExternalTime window's 1st parameter timestamp should be type long, but found " + timestampExecutor.getReturnType());
        }
        this.timestampExpressionExecutor = (VariableExpressionExecutor) timestampExecutor;

        // 2nd parameter: batch length.
        ExpressionExecutor windowTimeExecutor = attributeExpressionExecutors[1];
        if (!isIntOrLong(windowTimeExecutor.getReturnType())) {
            throw new SiddhiAppValidationException("ExternalTimeBatch window's 2nd parameter windowTime should be either int or long, but found " + windowTimeExecutor.getReturnType());
        }
        this.timeToKeep = constantAsLong(windowTimeExecutor, "2nd parameter windowTime");

        // 3rd parameter (optional): start time, either a constant or a long attribute.
        if (parameterCount >= 3) {
            this.isStartTimeEnabled = true;
            ExpressionExecutor startTimeExecutor = attributeExpressionExecutors[2];
            if (startTimeExecutor instanceof ConstantExpressionExecutor) {
                if (!isIntOrLong(startTimeExecutor.getReturnType())) {
                    throw new SiddhiAppValidationException("ExternalTimeBatch window's 3rd parameter startTime should either be a constant (of type int or long) or an attribute (of type long), but found " + startTimeExecutor.getReturnType());
                }
                this.commonStartTime = constantAsLong(startTimeExecutor, "3rd parameter startTime");
            } else {
                if (startTimeExecutor.getReturnType() != Attribute.Type.LONG) {
                    throw new SiddhiAppValidationException("ExternalTimeBatch window's 3rd parameter startTime should either be a constant (of type int or long) or an attribute (of type long), but found " + startTimeExecutor.getReturnType());
                }
                this.startTimeAsVariable = startTimeExecutor;
            }
        }

        // 4th parameter (optional): idle timeout before the current batch is flushed by a timer.
        if (parameterCount >= 4) {
            ExpressionExecutor timeoutExecutor = attributeExpressionExecutors[3];
            if (!isIntOrLong(timeoutExecutor.getReturnType())) {
                throw new SiddhiAppValidationException("ExternalTimeBatch window's 4th parameter timeout should be either int or long, but found " + timeoutExecutor.getReturnType());
            }
            this.schedulerTimeout = constantAsLong(timeoutExecutor, "4th parameter timeout");
        }

        // 5th parameter (optional): overwrite the timestamp attribute with the batch end time.
        if (parameterCount == 5) {
            ExpressionExecutor replaceFlagExecutor = attributeExpressionExecutors[4];
            if (replaceFlagExecutor.getReturnType() != Attribute.Type.BOOL) {
                throw new SiddhiAppValidationException("ExternalTimeBatch window's 5th parameter replaceTimestampWithBatchEndTime should be bool, but found " + replaceFlagExecutor.getReturnType());
            }
            if (!(replaceFlagExecutor instanceof ConstantExpressionExecutor)) {
                throw new SiddhiAppValidationException("ExternalTimeBatch window's 5th parameter replaceTimestampWithBatchEndTime should be a constant, but found " + replaceFlagExecutor.getClass());
            }
            this.replaceTimestampWithBatchEndTime = Boolean.parseBoolean(String.valueOf(((ConstantExpressionExecutor) replaceFlagExecutor).getValue()));
        }

        return () -> new WindowState(outputExpectsExpiredEvents, this.schedulerTimeout, this.commonStartTime);
    }

    /** Returns true when the given attribute type is INT or LONG. */
    private static boolean isIntOrLong(Attribute.Type type) {
        return type == Attribute.Type.INT || type == Attribute.Type.LONG;
    }

    /**
     * Reads an int/long constant parameter as a long, rejecting non-constant executors with a
     * validation error. The caller is expected to have checked the return type already.
     */
    private static long constantAsLong(ExpressionExecutor executor, String parameterDescription) {
        if (!(executor instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppValidationException("ExternalTimeBatch window's " + parameterDescription + " should be a constant, but found " + executor.getClass());
        }
        return ((Number) ((ConstantExpressionExecutor) executor).getValue()).longValue();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * WARNING - void declaration
     */
    @Override
    protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, WindowState state) {
        if (streamEventChunk.getFirst() == null) {
            return;
        }
        ArrayList<ComplexEventChunk<StreamEvent>> complexEventChunks = new ArrayList<ComplexEventChunk<StreamEvent>>();
        WindowState windowState = state;
        synchronized (windowState) {
            void var7_8;
            this.initTiming(streamEventChunk.getFirst(), state);
            StreamEvent streamEvent = streamEventChunk.getFirst();
            while (var7_8 != null) {
                void currStreamEvent = var7_8;
                StreamEvent streamEvent2 = var7_8.getNext();
                if (currStreamEvent.getType() == ComplexEvent.Type.TIMER) {
                    if (state.lastScheduledTime > currStreamEvent.getTimestamp()) continue;
                    if (!state.flushed) {
                        this.flushToOutputChunk(streamEventCloner, complexEventChunks, state.lastCurrentEventTime, true, state);
                        state.flushed = true;
                    } else if (state.currentEventChunk.getFirst() != null) {
                        this.appendToOutputChunk(streamEventCloner, complexEventChunks, state.lastCurrentEventTime, true, state);
                    }
                    state.lastScheduledTime = this.siddhiQueryContext.getSiddhiAppContext().getTimestampGenerator().currentTime() + this.schedulerTimeout;
                    this.scheduler.notifyAt(state.lastScheduledTime);
                    continue;
                }
                if (currStreamEvent.getType() != ComplexEvent.Type.CURRENT) continue;
                long currentEventTime = (Long)this.timestampExpressionExecutor.execute((ComplexEvent)currStreamEvent);
                if (state.lastCurrentEventTime < currentEventTime) {
                    state.lastCurrentEventTime = currentEventTime;
                }
                if (currentEventTime < state.endTime) {
                    this.cloneAppend(streamEventCloner, (StreamEvent)currStreamEvent, state);
                    continue;
                }
                if (state.flushed) {
                    this.appendToOutputChunk(streamEventCloner, complexEventChunks, state.lastCurrentEventTime, false, state);
                    state.flushed = false;
                } else {
                    this.flushToOutputChunk(streamEventCloner, complexEventChunks, state.lastCurrentEventTime, false, state);
                }
                state.endTime = this.findEndTime(state.lastCurrentEventTime, state.startTime, this.timeToKeep);
                this.cloneAppend(streamEventCloner, (StreamEvent)currStreamEvent, state);
                if (this.schedulerTimeout <= 0L) continue;
                state.lastScheduledTime = this.siddhiQueryContext.getSiddhiAppContext().getTimestampGenerator().currentTime() + this.schedulerTimeout;
                this.scheduler.notifyAt(state.lastScheduledTime);
            }
        }
        for (ComplexEventChunk complexEventChunk : complexEventChunks) {
            nextProcessor.process(complexEventChunk);
        }
    }

    /**
     * Establishes the first batch boundary. Does nothing once {@code state.endTime} is non-negative.
     *
     * <p>With a constant start time the end time is aligned through {@code findEndTime}; otherwise
     * the start time is read from the first event (the start-time attribute when one was configured,
     * else the timestamp attribute) and the batch ends one window length later. When a timeout is
     * configured the first timer is requested as well.
     *
     * @param firstStreamEvent first event of the chunk being processed
     * @param state window state to initialise
     */
    private void initTiming(StreamEvent firstStreamEvent, WindowState state) {
        if (state.endTime >= 0L) {
            return;
        }

        boolean constantStartTime = isStartTimeEnabled && startTimeAsVariable == null;
        if (constantStartTime) {
            long firstEventTime = (Long) timestampExpressionExecutor.execute(firstStreamEvent);
            state.endTime = findEndTime(firstEventTime, state.startTime, timeToKeep);
        } else {
            ExpressionExecutor startTimeSource = isStartTimeEnabled ? startTimeAsVariable : timestampExpressionExecutor;
            state.startTime = (Long) startTimeSource.execute(firstStreamEvent);
            state.endTime = state.startTime + timeToKeep;
        }

        if (schedulerTimeout > 0L) {
            long now = siddhiQueryContext.getSiddhiAppContext().getTimestampGenerator().currentTime();
            state.lastScheduledTime = now + schedulerTimeout;
            scheduler.notifyAt(state.lastScheduledTime);
        }
    }

    /**
     * Emits the running batch as one output chunk: previously expired events (when the output
     * expects them), then the reset event, then the current events. Afterwards the current chunk
     * is emptied and, where an expired chunk is kept, it is refilled with EXPIRED copies of the
     * events just emitted.
     *
     * @param streamEventCloner used to copy current events into the expired chunk
     * @param complexEventChunks collector that receives the output chunk when it is non-empty
     * @param currentTime timestamp stamped onto expired and reset events
     * @param preserveCurrentEvents forces copying current events into the expired chunk
     * @param state window state to flush
     */
    private void flushToOutputChunk(StreamEventCloner streamEventCloner, List<ComplexEventChunk<StreamEvent>> complexEventChunks, long currentTime, boolean preserveCurrentEvents, WindowState state) {
        ComplexEventChunk<StreamEvent> outputChunk = new ComplexEventChunk<StreamEvent>(true);

        // Expired events of the previous batch go out first, re-stamped with the flush time.
        if (outputExpectsExpiredEvents) {
            if (state.expiredEventChunk.getFirst() != null) {
                state.expiredEventChunk.reset();
                while (state.expiredEventChunk.hasNext()) {
                    state.expiredEventChunk.next().setTimestamp(currentTime);
                }
                outputChunk.add(state.expiredEventChunk.getFirst());
            }
        }
        if (state.expiredEventChunk != null) {
            state.expiredEventChunk.clear();
        }

        if (state.currentEventChunk.getFirst() != null) {
            // The reset event separates the expired events from the new batch.
            state.resetEvent.setTimestamp(currentTime);
            outputChunk.add(state.resetEvent);
            state.resetEvent = null;

            boolean keepForExpiry = preserveCurrentEvents || state.expiredEventChunk != null;
            if (keepForExpiry) {
                state.currentEventChunk.reset();
                while (state.currentEventChunk.hasNext()) {
                    StreamEvent expiredCopy = streamEventCloner.copyStreamEvent(state.currentEventChunk.next());
                    expiredCopy.setType(ComplexEvent.Type.EXPIRED);
                    state.expiredEventChunk.add(expiredCopy);
                }
            }
            outputChunk.add(state.currentEventChunk.getFirst());
        }
        state.currentEventChunk.clear();

        if (outputChunk.getFirst() != null) {
            complexEventChunks.add(outputChunk);
        }
    }

    /**
     * Emits events that arrived for a batch which was already flushed by a timer. When current
     * events are pending, the output chunk holds: copies of the stored expired events (only when
     * the output expects them), a copy of the reset event, CURRENT-typed copies of the stored
     * expired events, and finally the pending current events. The pending events are also copied
     * into the expired chunk where one is kept, and the current chunk is emptied.
     *
     * @param streamEventCloner used for every event copy
     * @param complexEventChunks collector that receives the output chunk when it is non-empty
     * @param currentTime timestamp stamped onto expired and reset copies
     * @param preserveCurrentEvents forces copying current events into the expired chunk
     * @param state window state to operate on
     */
    private void appendToOutputChunk(StreamEventCloner streamEventCloner, List<ComplexEventChunk<StreamEvent>> complexEventChunks, long currentTime, boolean preserveCurrentEvents, WindowState state) {
        ComplexEventChunk<StreamEvent> outputChunk = new ComplexEventChunk<StreamEvent>(true);

        if (state.currentEventChunk.getFirst() != null) {
            ComplexEventChunk<StreamEvent> replayedChunk = new ComplexEventChunk<StreamEvent>(true);

            if (state.expiredEventChunk != null && state.expiredEventChunk.getFirst() != null) {
                state.expiredEventChunk.reset();
                while (state.expiredEventChunk.hasNext()) {
                    StreamEvent storedEvent = state.expiredEventChunk.next();
                    if (outputExpectsExpiredEvents) {
                        StreamEvent expiredCopy = streamEventCloner.copyStreamEvent(storedEvent);
                        expiredCopy.setTimestamp(currentTime);
                        outputChunk.add(expiredCopy);
                    }
                    // Events already sent for this batch are replayed as CURRENT after the reset.
                    StreamEvent replayedCopy = streamEventCloner.copyStreamEvent(storedEvent);
                    replayedCopy.setType(ComplexEvent.Type.CURRENT);
                    replayedChunk.add(replayedCopy);
                }
            }

            // The stored reset event is copied here, not consumed.
            StreamEvent resetCopy = streamEventCloner.copyStreamEvent(state.resetEvent);
            resetCopy.setTimestamp(currentTime);
            outputChunk.add(resetCopy);
            outputChunk.add(replayedChunk.getFirst());

            boolean keepForExpiry = preserveCurrentEvents || state.expiredEventChunk != null;
            if (keepForExpiry) {
                state.currentEventChunk.reset();
                while (state.currentEventChunk.hasNext()) {
                    StreamEvent expiredCopy = streamEventCloner.copyStreamEvent(state.currentEventChunk.next());
                    expiredCopy.setType(ComplexEvent.Type.EXPIRED);
                    state.expiredEventChunk.add(expiredCopy);
                }
            }
            outputChunk.add(state.currentEventChunk.getFirst());
        }
        state.currentEventChunk.clear();

        if (outputChunk.getFirst() != null) {
            complexEventChunks.add(outputChunk);
        }
    }

    /**
     * Returns the end (exclusive) of the batch that contains {@code currentTime}, where batches are
     * {@code timeToKeep} long and aligned to {@code startTime}. A time exactly on a boundary
     * belongs to the batch that starts there.
     *
     * <p>{@link Math#floorMod(long, long)} is used instead of {@code %} so the result is also
     * aligned when {@code currentTime} is earlier than {@code startTime}; with {@code %} the
     * remainder is negative and the returned end time overshoots the boundary.
     *
     * @param currentTime external time to locate
     * @param startTime alignment origin of the batches
     * @param timeToKeep batch length; must be positive
     * @return the first batch boundary strictly greater than {@code currentTime}
     */
    private long findEndTime(long currentTime, long startTime, long timeToKeep) {
        long elapsedInBatch = Math.floorMod(currentTime - startTime, timeToKeep);
        return currentTime + (timeToKeep - elapsedInBatch);
    }

    /**
     * Adds a copy of the event to the running batch. When the batch-end replacement option is on,
     * the copy's timestamp attribute is overwritten with {@code state.endTime}. If the state holds
     * no reset event yet, a RESET-typed copy of the unmodified event is stored as well.
     *
     * @param streamEventCloner used to copy the event
     * @param currStreamEvent event to add
     * @param state window state receiving the copy
     */
    private void cloneAppend(StreamEventCloner streamEventCloner, StreamEvent currStreamEvent, WindowState state) {
        StreamEvent batchedCopy = streamEventCloner.copyStreamEvent(currStreamEvent);
        if (replaceTimestampWithBatchEndTime) {
            batchedCopy.setAttribute(state.endTime, timestampExpressionExecutor.getPosition());
        }
        state.currentEventChunk.add(batchedCopy);

        if (state.resetEvent != null) {
            return;
        }
        StreamEvent resetTemplate = streamEventCloner.copyStreamEvent(currStreamEvent);
        resetTemplate.setType(ComplexEvent.Type.RESET);
        state.resetEvent = resetTemplate;
    }

    /** Lifecycle hook; this window has nothing to do on start. */
    @Override
    public void start() {
        // Intentionally empty.
    }

    /** Lifecycle hook; stops the scheduler when one has been set. */
    @Override
    public void stop() {
        if (scheduler == null) {
            return;
        }
        scheduler.stop();
    }

    /**
     * Builds the operator used by {@code find}, compiled against the state's expired-event chunk.
     *
     * @return the operator constructed by {@code OperatorParser.constructOperator}
     */
    @Override
    public CompiledCondition compileCondition(Expression condition, MatchingMetaInfoHolder matchingMetaInfoHolder, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, Table> tableMap, WindowState state, SiddhiQueryContext siddhiQueryContext) {
        ComplexEventChunk<StreamEvent> searchableEvents = state.expiredEventChunk;
        return OperatorParser.constructOperator(searchableEvents, condition, matchingMetaInfoHolder, variableExpressionExecutors, tableMap, siddhiQueryContext);
    }

    /**
     * Looks up events in the state's expired-event chunk using the compiled condition.
     *
     * @param compiledCondition must be an {@code Operator}
     * @return whatever the operator's {@code find} returns
     */
    @Override
    public StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition, StreamEventCloner streamEventCloner, WindowState state) {
        Operator operator = (Operator) compiledCondition;
        return operator.find(matchingEvent, state.expiredEventChunk, streamEventCloner);
    }

    /** Returns the scheduler set through {@code setScheduler}, or {@code null} if none was set. */
    @Override
    public Scheduler getScheduler() {
        return scheduler;
    }

    /**
     * Stores the scheduler this window uses to request timer notifications.
     *
     * @param scheduler scheduler to use
     */
    @Override
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Mutable per-window state: the running batch, the previously emitted (expired) events, the
     * pending reset event, the batch timing and the timer bookkeeping.
     */
    class WindowState
    extends State {
        // Events collected for the batch that is currently open.
        private ComplexEventChunk<StreamEvent> currentEventChunk = new ComplexEventChunk<StreamEvent>(false);
        // EXPIRED copies of already emitted events; stays null when nothing needs them.
        private ComplexEventChunk<StreamEvent> expiredEventChunk = null;
        // RESET-typed copy of the first event added after the last flush, or null.
        private StreamEvent resetEvent = null;
        // Negative until the first event has initialised the batch timing.
        private long endTime = -1L;
        private long startTime = 0L;
        private long lastScheduledTime;
        private long lastCurrentEventTime;
        // True after a timer-driven flush, until the next batch-closing event.
        private boolean flushed = false;

        /**
         * Creates an empty state. An expired-event chunk is allocated when expired output is
         * expected, when the enclosing processor will serve {@code find}, or when a timeout is set.
         */
        public WindowState(boolean outputExpectsExpiredEvents, long schedulerTimeout, long startTime) {
            this.startTime = startTime;
            boolean expiredEventsNeeded = outputExpectsExpiredEvents
                    || ExternalTimeBatchWindowProcessor.this.findToBeExecuted
                    || schedulerTimeout > 0L;
            if (expiredEventsNeeded) {
                this.expiredEventChunk = new ComplexEventChunk<StreamEvent>(false);
            }
        }

        /** Returns true only when no events are held, no reset event is pending and the state is flushed. */
        @Override
        public boolean canDestroy() {
            if (currentEventChunk.getFirst() != null) {
                return false;
            }
            if (expiredEventChunk != null && expiredEventChunk.getFirst() != null) {
                return false;
            }
            return resetEvent == null && flushed;
        }

        /** Captures the timing fields, the head of each event chunk, the reset event and the flushed flag. */
        @Override
        public Map<String, Object> snapshot() {
            Map<String, Object> snapshot = new HashMap<String, Object>();
            snapshot.put("StartTime", startTime);
            snapshot.put("EndTime", endTime);
            snapshot.put("LastScheduledTime", lastScheduledTime);
            snapshot.put("LastCurrentEventTime", lastCurrentEventTime);
            snapshot.put("CurrentEventChunk", currentEventChunk.getFirst());
            StreamEvent firstExpired = null;
            if (expiredEventChunk != null) {
                firstExpired = expiredEventChunk.getFirst();
            }
            snapshot.put("ExpiredEventChunk", firstExpired);
            snapshot.put("ResetEvent", resetEvent);
            snapshot.put("Flushed", flushed);
            return snapshot;
        }

        /** Restores the fields written by {@link #snapshot()}. */
        @Override
        public void restore(Map<String, Object> state) {
            startTime = (Long) state.get("StartTime");
            endTime = (Long) state.get("EndTime");
            lastScheduledTime = (Long) state.get("LastScheduledTime");
            lastCurrentEventTime = (Long) state.get("LastCurrentEventTime");

            currentEventChunk.clear();
            currentEventChunk.add((StreamEvent) state.get("CurrentEventChunk"));

            if (expiredEventChunk != null) {
                expiredEventChunk.clear();
                expiredEventChunk.add((StreamEvent) state.get("ExpiredEventChunk"));
            } else {
                // No chunk was allocated at construction; allocate an empty one if the processor needs it.
                ExternalTimeBatchWindowProcessor processor = ExternalTimeBatchWindowProcessor.this;
                if (processor.outputExpectsExpiredEvents || processor.findToBeExecuted || processor.schedulerTimeout > 0L) {
                    expiredEventChunk = new ComplexEventChunk<StreamEvent>(false);
                }
            }

            resetEvent = (StreamEvent) state.get("ResetEvent");
            flushed = (Boolean) state.get("Flushed");
        }
    }
}

