/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.query.processor.stream.window;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.SnapshotableStreamEventQueue;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.stream.window.BatchingWindowProcessor;
import io.siddhi.core.query.processor.stream.window.FindableProcessor;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.collection.operator.CompiledCondition;
import io.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import io.siddhi.core.util.collection.operator.Operator;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.parser.OperatorParser;
import io.siddhi.core.util.snapshot.state.SnapshotStateList;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import io.siddhi.query.api.expression.Expression;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Window processor backing the {@code batch} window extension.
 *
 * <p>Each call to {@link #process} treats the incoming chunk as one batch. The events held from
 * the previous call are emitted as EXPIRED (only when the output expects expired events),
 * followed by a RESET event and the new events as CURRENT. When a chunk length greater than zero
 * is configured, the batch is split and forwarded to the next processor as several chunks of at
 * most that many events; a length of {@code 0} forwards the whole batch as a single chunk.
 *
 * <p>{@link #process} and {@link #currentState} synchronize on this instance, and
 * {@link #restoreState} and {@link #find} are {@code synchronized} methods.
 * {@link #compileCondition} is not synchronized.
 */
@Extension(
    name="batch",
    namespace="",
    description="A window that holds an incoming events batch. When a new set of events arrives, the previously arrived old events will be expired. Batch window can be used to aggregate events that comes in batches. If it has the parameter length specified, then batch window process the batch as several chunks.",
    parameters={
        @Parameter(name="window.length", description="The length of a chunk", type={DataType.INT}, optional=true, defaultValue="If length value was not given it assign 0 as length and process the whole batch as once")
    },
    examples={
        @Example(syntax="define stream consumerItemStream (itemId string, price float)\n\nfrom consumerItemStream#window.batch()\nselect price, str:groupConcat(itemId) as itemIds\ngroup by price\ninsert into outputStream;", description="This will output comma separated items IDs that have the same price for each incoming batch of events.")
    }
)
public class BatchWindowProcessor
extends BatchingWindowProcessor
implements FindableProcessor {
    /** Configured chunk length; {@code 0} means the whole batch is emitted as a single chunk. */
    private int length = 0;
    /** Number of events collected into the chunk currently being built. */
    private int count = 0;
    /** EXPIRED copies of the last batch; created in {@link #init} or lazily in {@link #compileCondition}. */
    private SnapshotableStreamEventQueue expiredEventQueue = null;
    /** CURRENT copies of the events belonging to the chunk currently being built. */
    private SnapshotableStreamEventQueue currentEventQueue;
    /** Whether the downstream output asked for EXPIRED events. */
    private boolean outputExpectsExpiredEvents;
    private SiddhiQueryContext siddhiQueryContext;
    /** RESET event (copy of the first event of the last emitted chunk) to send before the next chunk. */
    private StreamEvent resetEvent = null;

    /**
     * Reads the optional chunk length and creates the event queues.
     *
     * @param attributeExpressionExecutors zero executors, or exactly one constant {@code int} chunk length
     * @param configReader not used by this window
     * @param outputExpectsExpiredEvents whether EXPIRED events must be produced and tracked
     * @param siddhiQueryContext query context, later used to obtain the current timestamp
     * @throws SiddhiAppValidationException if more than one parameter is given, or the parameter is
     *         not a constant {@code int}, or it is negative
     */
    @Override
    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader, boolean outputExpectsExpiredEvents, SiddhiQueryContext siddhiQueryContext) {
        this.outputExpectsExpiredEvents = outputExpectsExpiredEvents;
        this.siddhiQueryContext = siddhiQueryContext;
        this.currentEventQueue = new SnapshotableStreamEventQueue(this.streamEventClonerHolder);
        if (outputExpectsExpiredEvents) {
            this.expiredEventQueue = new SnapshotableStreamEventQueue(this.streamEventClonerHolder);
        }
        if (attributeExpressionExecutors.length == 1) {
            // Validate before casting so a bad parameter surfaces as a validation error
            // instead of a ClassCastException / NullPointerException.
            ExpressionExecutor lengthExecutor = attributeExpressionExecutors[0];
            if (!(lengthExecutor instanceof ConstantExpressionExecutor)) {
                throw new SiddhiAppValidationException("Batch window should have a constant parameter (<int> chunkLength), but found a dynamic attribute " + lengthExecutor.getClass().getCanonicalName());
            }
            Object lengthValue = ((ConstantExpressionExecutor) lengthExecutor).getValue();
            if (!(lengthValue instanceof Integer)) {
                throw new SiddhiAppValidationException("Batch window's parameter (<int> chunkLength) should be an int, but found " + (lengthValue == null ? "null" : lengthValue.getClass().getCanonicalName()));
            }
            this.length = (Integer) lengthValue;
        } else if (attributeExpressionExecutors.length == 0) {
            this.length = 0;
        } else {
            throw new SiddhiAppValidationException("Batch window should have at most one parameter (<int> chunkLength), but found " + attributeExpressionExecutors.length + " input attributes");
        }
        // Only negative values are rejected; zero is accepted and disables chunking.
        if (this.length < 0) {
            throw new SiddhiAppValidationException("Batch window should have at most one parameter (<int> chunkLength) greater than zero. But found value 'chunkLength = " + this.length + " ' ");
        }
    }

    /**
     * Processes one incoming batch and forwards the resulting chunk(s) to the next processor.
     *
     * <p>The output chunks are assembled while holding this instance's monitor and are handed to
     * {@code nextProcessor} only after the monitor is released.
     *
     * @param streamEventChunk the incoming batch
     * @param nextProcessor processor that receives every produced chunk
     * @param streamEventCloner cloner used to copy incoming events before they are stored or emitted
     */
    @Override
    protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner) {
        List<ComplexEventChunk<StreamEvent>> streamEventChunks = new ArrayList<ComplexEventChunk<StreamEvent>>();
        ComplexEventChunk<StreamEvent> currentEventChunk = new ComplexEventChunk<StreamEvent>(true);
        synchronized (this) {
            long currentTime = this.siddhiQueryContext.getSiddhiAppContext().getTimestampGenerator().currentTime();
            if (this.outputExpectsExpiredEvents) {
                if (this.expiredEventQueue.getFirst() != null) {
                    // Re-stamp the previous batch with "now" and put it at the head of the output.
                    while (this.expiredEventQueue.hasNext()) {
                        this.expiredEventQueue.next().setTimestamp(currentTime);
                    }
                    currentEventChunk.add(this.expiredEventQueue.getFirst());
                    if (this.resetEvent != null) {
                        currentEventChunk.add(this.resetEvent);
                        this.resetEvent = null;
                    }
                }
                this.expiredEventQueue.clear();
            }
            // NOTE(review): when the incoming chunk is empty, the expired events drained above are
            // never added to streamEventChunks and so are not forwarded — confirm this is intended.
            if (streamEventChunk.hasNext()) {
                do {
                    StreamEvent streamEvent = streamEventChunk.next();
                    StreamEvent clonedStreamEventToProcess = streamEventCloner.copyStreamEvent(streamEvent);
                    if (this.outputExpectsExpiredEvents) {
                        StreamEvent clonedStreamEventToExpire = streamEventCloner.copyStreamEvent(streamEvent);
                        clonedStreamEventToExpire.setType(ComplexEvent.Type.EXPIRED);
                        this.expiredEventQueue.add(clonedStreamEventToExpire);
                    }
                    this.currentEventQueue.add(clonedStreamEventToProcess);
                    ++this.count;
                    // With length == 0 this never matches, so the whole batch stays in one chunk.
                    if (this.count == this.length) {
                        this.flushCurrentEvents(currentEventChunk, streamEventCloner);
                        if (currentEventChunk.getFirst() != null) {
                            streamEventChunks.add(currentEventChunk);
                            currentEventChunk = new ComplexEventChunk<StreamEvent>(true);
                        }
                    }
                } while (streamEventChunk.hasNext());
                // Emit whatever is left over after the last full chunk.
                this.flushCurrentEvents(currentEventChunk, streamEventCloner);
                if (currentEventChunk.getFirst() != null) {
                    streamEventChunks.add(currentEventChunk);
                }
            }
        }
        for (ComplexEventChunk<StreamEvent> outputChunk : streamEventChunks) {
            nextProcessor.process(outputChunk);
        }
    }

    /**
     * Moves the collected current events into {@code outputChunk}, preceded by the pending RESET
     * event (if any), records a new RESET event for the next chunk, and resets the chunk counters.
     * Must be called while holding this instance's monitor.
     */
    private void flushCurrentEvents(ComplexEventChunk<StreamEvent> outputChunk, StreamEventCloner streamEventCloner) {
        StreamEvent firstCurrentEvent = this.currentEventQueue.getFirst();
        if (firstCurrentEvent != null) {
            if (this.resetEvent != null) {
                outputChunk.add(this.resetEvent);
            }
            this.resetEvent = streamEventCloner.copyStreamEvent(firstCurrentEvent);
            this.resetEvent.setType(ComplexEvent.Type.RESET);
            outputChunk.add(firstCurrentEvent);
        }
        this.count = 0;
        this.currentEventQueue.clear();
    }

    /** No-op. */
    @Override
    public void start() {
    }

    /** No-op. */
    @Override
    public void stop() {
    }

    /**
     * Captures the state to persist: the expired-event queue snapshot (only when expired events
     * are expected) under {@code "ExpiredEventQueue"} and the pending RESET event under
     * {@code "ResetEvent"}.
     *
     * @return a new map holding the state entries
     */
    @Override
    public Map<String, Object> currentState() {
        Map<String, Object> state = new HashMap<String, Object>();
        synchronized (this) {
            if (this.outputExpectsExpiredEvents) {
                state.put("ExpiredEventQueue", this.expiredEventQueue.getSnapshot());
            }
            state.put("ResetEvent", this.resetEvent);
        }
        return state;
    }

    /**
     * Restores the state previously produced by {@link #currentState()}.
     *
     * @param state map with the {@code "ExpiredEventQueue"} and {@code "ResetEvent"} entries
     */
    @Override
    public synchronized void restoreState(Map<String, Object> state) {
        if (this.outputExpectsExpiredEvents) {
            this.expiredEventQueue.clear();
            this.expiredEventQueue.restore((SnapshotStateList)state.get("ExpiredEventQueue"));
        }
        this.resetEvent = (StreamEvent)state.get("ResetEvent");
    }

    /**
     * Looks up events in the expired-event queue using the given compiled condition.
     *
     * @param matchingEvent the event to match against
     * @param compiledCondition condition from {@link #compileCondition}; it is cast to {@link Operator}
     * @return the result of the operator's {@code find} over the expired-event queue
     */
    @Override
    public synchronized StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition) {
        return ((Operator)compiledCondition).find(matchingEvent, this.expiredEventQueue, this.streamEventCloner);
    }

    /**
     * Builds the operator used by {@link #find}, creating the expired-event queue if
     * {@link #init} did not create one.
     *
     * <p>NOTE(review): {@link #process} only fills the expired-event queue when
     * {@code outputExpectsExpiredEvents} is true, so a queue created here is never populated —
     * confirm whether find is expected to work in that configuration.
     *
     * @return the operator constructed by {@link OperatorParser#constructOperator} over the expired-event queue
     */
    @Override
    public CompiledCondition compileCondition(Expression condition, MatchingMetaInfoHolder matchingMetaInfoHolder, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, Table> tableMap, SiddhiQueryContext siddhiQueryContext) {
        if (this.expiredEventQueue == null) {
            this.expiredEventQueue = new SnapshotableStreamEventQueue(this.streamEventClonerHolder);
        }
        return OperatorParser.constructOperator(this.expiredEventQueue, condition, matchingMetaInfoHolder, variableExpressionExecutors, tableMap, siddhiQueryContext);
    }
}

