/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.query.processor.stream.window;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ParameterOverload;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.SnapshotableStreamEventQueue;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.stream.window.BatchingFindableWindowProcessor;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.collection.operator.CompiledCondition;
import io.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import io.siddhi.core.util.collection.operator.Operator;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.parser.OperatorParser;
import io.siddhi.core.util.snapshot.state.SnapshotStateList;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import io.siddhi.query.api.expression.Expression;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Extension(name="batch", namespace="", description="A window that holds an incoming events batch. When a new set of events arrives, the previously arrived old events will be expired. Batch window can be used to aggregate events that comes in batches. If it has the parameter length specified, then batch window process the batch as several chunks.", parameters={@Parameter(name="window.length", description="The length of a chunk", type={DataType.INT}, optional=true, defaultValue="If length value was not given it assign 0 as length and process the whole batch as once")}, parameterOverloads={@ParameterOverload, @ParameterOverload(parameterNames={"window.length"})}, examples={@Example(syntax="define stream consumerItemStream (itemId string, price float)\n\nfrom consumerItemStream#window.batch()\nselect price, str:groupConcat(itemId) as itemIds\ngroup by price\ninsert into outputStream;", description="This will output comma separated items IDs that have the same price for each incoming batch of events.")})
public class BatchWindowProcessor
extends BatchingFindableWindowProcessor<WindowState> {
    private int length = 0;
    private boolean outputExpectsExpiredEvents;
    private boolean findToBeExecuted;
    private SiddhiQueryContext siddhiQueryContext;

    @Override
    protected StateFactory<WindowState> init(ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader, StreamEventClonerHolder streamEventClonerHolder, boolean outputExpectsExpiredEvents, boolean findToBeExecuted, SiddhiQueryContext siddhiQueryContext) {
        this.outputExpectsExpiredEvents = outputExpectsExpiredEvents;
        this.findToBeExecuted = findToBeExecuted;
        this.siddhiQueryContext = siddhiQueryContext;
        if (attributeExpressionExecutors.length == 1) {
            this.length = (Integer)((ConstantExpressionExecutor)attributeExpressionExecutors[0]).getValue();
        } else if (attributeExpressionExecutors.length == 0) {
            this.length = 0;
        } else {
            throw new SiddhiAppValidationException("Batch window should have at most one parameter (<int> chunkLength), but found " + attributeExpressionExecutors.length + " input attributes");
        }
        if (this.length < 0) {
            throw new SiddhiAppValidationException("Batch window should have at most one parameter (<int> chunkLength) greater than zero. But found value 'chunkLength = " + this.length + " ' ");
        }
        return () -> new WindowState(streamEventClonerHolder, outputExpectsExpiredEvents, findToBeExecuted);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, WindowState state) {
        ArrayList streamEventChunks = new ArrayList();
        ComplexEventChunk<StreamEvent> currentEventChunk = new ComplexEventChunk<StreamEvent>();
        WindowState windowState = state;
        synchronized (windowState) {
            long l = this.siddhiQueryContext.getSiddhiAppContext().getTimestampGenerator().currentTime();
            if (this.outputExpectsExpiredEvents) {
                if (state.expiredEventQueue.getFirst() != null) {
                    while (state.expiredEventQueue.hasNext()) {
                        state.expiredEventQueue.next().setTimestamp(l);
                    }
                    currentEventChunk.add(state.expiredEventQueue.getFirst());
                    if (state.resetEvent != null) {
                        currentEventChunk.add(state.resetEvent);
                        state.resetEvent = null;
                    }
                }
                state.expiredEventQueue.clear();
            }
            ComplexEventChunk<StreamEvent> currentEventQueue = new ComplexEventChunk<StreamEvent>();
            int count = 0;
            if (streamEventChunk.hasNext()) {
                do {
                    StreamEvent streamEvent = (StreamEvent)streamEventChunk.next();
                    StreamEvent clonedStreamEventToProcess = streamEventCloner.copyStreamEvent(streamEvent);
                    if (this.outputExpectsExpiredEvents) {
                        StreamEvent clonedStreamEventToExpire = streamEventCloner.copyStreamEvent(streamEvent);
                        clonedStreamEventToExpire.setType(ComplexEvent.Type.EXPIRED);
                        state.expiredEventQueue.add(clonedStreamEventToExpire);
                    }
                    currentEventQueue.add(clonedStreamEventToProcess);
                    if (++count != this.length) continue;
                    if (currentEventQueue.getFirst() != null) {
                        if (state.resetEvent != null) {
                            currentEventChunk.add(state.resetEvent);
                        }
                        state.resetEvent = streamEventCloner.copyStreamEvent((StreamEvent)currentEventQueue.getFirst());
                        state.resetEvent.setType(ComplexEvent.Type.RESET);
                        currentEventChunk.add((StreamEvent)currentEventQueue.getFirst());
                    }
                    count = 0;
                    currentEventQueue.clear();
                    if (currentEventChunk.getFirst() == null) continue;
                    streamEventChunks.add(currentEventChunk);
                    currentEventChunk = new ComplexEventChunk();
                } while (streamEventChunk.hasNext());
                if (currentEventQueue.getFirst() != null) {
                    if (state.resetEvent != null) {
                        currentEventChunk.add(state.resetEvent);
                    }
                    state.resetEvent = streamEventCloner.copyStreamEvent((StreamEvent)currentEventQueue.getFirst());
                    state.resetEvent.setType(ComplexEvent.Type.RESET);
                    currentEventChunk.add((StreamEvent)currentEventQueue.getFirst());
                }
                if (currentEventChunk.getFirst() != null) {
                    streamEventChunks.add(currentEventChunk);
                }
            }
        }
        for (ComplexEventChunk complexEventChunk : streamEventChunks) {
            nextProcessor.process(complexEventChunk);
        }
    }

    @Override
    public void start() {
    }

    @Override
    public void stop() {
    }

    @Override
    public StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition, StreamEventCloner streamEventCloner, WindowState state) {
        return ((Operator)compiledCondition).find(matchingEvent, state.expiredEventQueue, streamEventCloner);
    }

    @Override
    public CompiledCondition compileCondition(Expression condition, MatchingMetaInfoHolder matchingMetaInfoHolder, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, Table> tableMap, WindowState state, SiddhiQueryContext siddhiQueryContext) {
        return OperatorParser.constructOperator(state.expiredEventQueue, condition, matchingMetaInfoHolder, variableExpressionExecutors, tableMap, siddhiQueryContext);
    }

    class WindowState
    extends State {
        private SnapshotableStreamEventQueue expiredEventQueue;
        private StreamEvent resetEvent = null;

        WindowState(StreamEventClonerHolder streamEventClonerHolder, boolean outputExpectsExpiredEvents, boolean findToBeExecuted) {
            if (outputExpectsExpiredEvents || findToBeExecuted) {
                this.expiredEventQueue = new SnapshotableStreamEventQueue(streamEventClonerHolder);
            }
        }

        @Override
        public Map<String, Object> snapshot() {
            HashMap<String, Object> state = new HashMap<String, Object>();
            if (BatchWindowProcessor.this.outputExpectsExpiredEvents || BatchWindowProcessor.this.findToBeExecuted) {
                state.put("ExpiredEventQueue", this.expiredEventQueue.getSnapshot());
            }
            state.put("ResetEvent", this.resetEvent);
            return state;
        }

        @Override
        public void restore(Map<String, Object> state) {
            if (BatchWindowProcessor.this.outputExpectsExpiredEvents || BatchWindowProcessor.this.findToBeExecuted) {
                this.expiredEventQueue.clear();
                this.expiredEventQueue.restore((SnapshotStateList)state.get("ExpiredEventQueue"));
            }
            this.resetEvent = (StreamEvent)state.get("ResetEvent");
        }

        @Override
        public boolean canDestroy() {
            return this.expiredEventQueue.getFirst() == null && this.resetEvent == null;
        }
    }
}

