/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.aggregation;

import io.siddhi.core.aggregation.BaseIncrementalValueStore;
import io.siddhi.core.aggregation.Executor;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventFactory;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.query.selector.GroupByKeyGenerator;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.IncrementalTimeConverterUtil;
import io.siddhi.core.util.Scheduler;
import io.siddhi.core.util.parser.AggregationParser;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateHolder;
import io.siddhi.query.api.aggregation.TimePeriod;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;

/**
 * {@link Executor} that accumulates events for a single aggregation {@link TimePeriod.Duration}.
 *
 * <p>Incoming {@code CURRENT} events are folded into a {@link BaseIncrementalValueStore}. When an
 * event's timestamp reaches the stored "next emit time", the grouped events held by the store are
 * handed to the backing {@link Table} (only when this instance is flagged as the processing
 * executor) and to the next executor in the chain, after which the store is reset.
 *
 * <p>{@link #execute(ComplexEventChunk)} is {@code synchronized}; table writes are performed on a
 * dedicated single-thread {@link ExecutorService}.
 *
 * <p>NOTE(review): the {@link ExecutorService} created in the constructor is never shut down by
 * this class — confirm whether the owning runtime is expected to manage its lifecycle.
 */
public class IncrementalExecutor implements Executor {
    private static final Logger LOG = Logger.getLogger(IncrementalExecutor.class);

    /** Pause, in milliseconds, between checks of whether a submitted table write has finished. */
    private static final long TABLE_WRITE_POLL_INTERVAL_MS = 1000L;

    private final String aggregatorName;
    private final StreamEvent resetEvent;
    private final ExpressionExecutor timestampExpressionExecutor;
    private final StateHolder<ExecutorState> stateHolder;
    private final String siddhiAppName;
    // Not referenced anywhere in this class; retained so the declared members stay unchanged.
    private final Lock lock = new ReentrantLock();
    // When true, dispatching blocks until a submitted table write has completed.
    boolean waitUntillprocessFinish = false;
    private final TimePeriod.Duration duration;
    private final Table table;
    private final boolean isRoot;
    private boolean isProcessingExecutor;
    private Executor next;
    private final GroupByKeyGenerator groupByKeyGenerator;
    private final StreamEventFactory streamEventFactory;
    private Scheduler scheduler;
    private final ExecutorService executorService;
    private final String timeZone;
    private final BaseIncrementalValueStore baseIncrementalValueStore;

    /**
     * Creates an executor for one aggregation duration.
     *
     * @param aggregatorName             name of the aggregation; used in log messages and in the state holder name
     * @param duration                   the duration this executor aggregates for
     * @param processExpressionExecutors expression executors for the aggregation; the element at index 0 is
     *                                   <b>removed from this list</b> and used as the timestamp executor, the
     *                                   remaining elements are passed to the value store
     * @param shouldUpdateTimestamp      executor passed through to the value store
     * @param groupByKeyGenerator        generator of group-by keys, or {@code null} when no grouping is applied
     * @param isRoot                     whether this executor drives the {@link Scheduler}
     * @param table                      table that receives the aggregated events
     * @param child                      next executor in the chain, or {@code null}
     * @param siddhiQueryContext         query context used to obtain the app name and the state holder
     * @param metaStreamEvent            meta event used to build the event factory and the reset event
     * @param timeZone                   time zone passed to the time conversion utilities
     * @param waitUntillprocessFinish    whether dispatching blocks until a submitted table write completes
     */
    public IncrementalExecutor(String aggregatorName, TimePeriod.Duration duration, List<ExpressionExecutor> processExpressionExecutors, ExpressionExecutor shouldUpdateTimestamp, GroupByKeyGenerator groupByKeyGenerator, boolean isRoot, Table table, Executor child, SiddhiQueryContext siddhiQueryContext, MetaStreamEvent metaStreamEvent, String timeZone, boolean waitUntillprocessFinish) {
        this.timeZone = timeZone;
        this.aggregatorName = aggregatorName;
        this.duration = duration;
        this.isRoot = isRoot;
        this.table = table;
        this.next = child;
        this.waitUntillprocessFinish = waitUntillprocessFinish;
        // The first executor yields the event timestamp; the rest stay in the list for the value store.
        this.timestampExpressionExecutor = processExpressionExecutors.remove(0);
        this.streamEventFactory = new StreamEventFactory(metaStreamEvent);
        this.groupByKeyGenerator = groupByKeyGenerator;
        this.baseIncrementalValueStore = new BaseIncrementalValueStore(aggregatorName, -1L, processExpressionExecutors, shouldUpdateTimestamp, this.streamEventFactory, siddhiQueryContext, true, false);
        this.resetEvent = AggregationParser.createRestEvent(metaStreamEvent, this.streamEventFactory.newInstance());
        this.setNextExecutor(child);
        this.siddhiAppName = siddhiQueryContext.getSiddhiAppContext().getName();
        this.stateHolder = siddhiQueryContext.generateStateHolder(aggregatorName + "-" + this.getClass().getName(), false, () -> new ExecutorState());
        this.executorService = Executors.newSingleThreadExecutor();
        this.isProcessingExecutor = false;
    }

    /**
     * Sets the scheduler that is notified of upcoming emit times when this executor is the root.
     *
     * @param scheduler the scheduler to notify
     */
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Consumes every event of the chunk in order, removing each from the chunk as it is handled.
     *
     * <p>For each event the start time of the current aggregates is recomputed from the event's
     * timestamp. If the timestamp has reached the next emit time, the stored aggregates are
     * dispatched and a timer event is forwarded to the next executor. Only {@code CURRENT} events
     * are then added to the value store.
     *
     * @param streamEventChunk chunk of events to process
     */
    @Override
    public synchronized void execute(ComplexEventChunk streamEventChunk) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Event Chunk received by " + this.duration + " incremental executor: " + streamEventChunk.toString());
        }
        streamEventChunk.reset();
        while (streamEventChunk.hasNext()) {
            StreamEvent streamEvent = (StreamEvent) streamEventChunk.next();
            streamEventChunk.remove();
            ExecutorState executorState = this.stateHolder.getState();
            try {
                long timestamp = this.getTimestamp(streamEvent, executorState);
                executorState.startTimeOfAggregates = IncrementalTimeConverterUtil.getStartTimeOfAggregates(timestamp, this.duration, this.timeZone);
                if (timestamp >= executorState.nextEmitTime) {
                    executorState.nextEmitTime = IncrementalTimeConverterUtil.getNextEmitTime(timestamp, this.duration, this.timeZone);
                    this.dispatchAggregateEvents(executorState.startTimeOfAggregates);
                    this.sendTimerEvent(executorState);
                }
                if (streamEvent.getType() == ComplexEvent.Type.CURRENT) {
                    this.processAggregates(streamEvent, executorState);
                }
            } finally {
                // Always hand the state back, even when processing the event failed.
                this.stateHolder.returnState(executorState);
            }
        }
    }

    /**
     * Forwards a {@code TIMER} event, stamped with the current aggregate start time, to the next
     * executor. Does nothing when there is no next executor.
     */
    private void sendTimerEvent(ExecutorState executorState) {
        if (this.getNextExecutor() != null) {
            StreamEvent timerEvent = this.streamEventFactory.newInstance();
            timerEvent.setType(ComplexEvent.Type.TIMER);
            timerEvent.setTimestamp(executorState.startTimeOfAggregates);
            ComplexEventChunk<StreamEvent> timerStreamEventChunk = new ComplexEventChunk<>();
            timerStreamEventChunk.add(timerEvent);
            this.next.execute(timerStreamEventChunk);
        }
    }

    /**
     * Resolves the timestamp to use for an event and, on the root executor, schedules the next
     * notification.
     *
     * <p>{@code CURRENT} events take their timestamp from the timestamp expression executor and
     * schedule a notification only once (tracked by {@code timerStarted}); every other event type
     * uses the event's own timestamp and schedules a notification each time.
     *
     * @return the timestamp resolved for the event
     */
    private long getTimestamp(StreamEvent streamEvent, ExecutorState executorState) {
        long timestamp;
        if (streamEvent.getType() == ComplexEvent.Type.CURRENT) {
            timestamp = (Long) this.timestampExpressionExecutor.execute(streamEvent);
            if (this.isRoot && !executorState.timerStarted) {
                this.scheduler.notifyAt(IncrementalTimeConverterUtil.getNextEmitTime(timestamp, this.duration, this.timeZone));
                executorState.timerStarted = true;
            }
        } else {
            timestamp = streamEvent.getTimestamp();
            if (this.isRoot) {
                this.scheduler.notifyAt(IncrementalTimeConverterUtil.getNextEmitTime(timestamp, this.duration, this.timeZone));
            }
        }
        return timestamp;
    }

    /** Returns the next executor in the chain, or {@code null} if there is none. */
    @Override
    public Executor getNextExecutor() {
        return this.next;
    }

    /** Sets the next executor in the chain. */
    @Override
    public void setNextExecutor(Executor nextExecutor) {
        this.next = nextExecutor;
    }

    /**
     * Adds one event to the value store. When a group-by key generator is configured, the store is
     * updated inside a group-by flow keyed by the event's group-by key.
     */
    private void processAggregates(StreamEvent streamEvent, ExecutorState executorState) {
        synchronized (this) {
            if (this.groupByKeyGenerator != null) {
                try {
                    String groupedByKey = this.groupByKeyGenerator.constructEventKey(streamEvent);
                    SiddhiAppContext.startGroupByFlow(groupedByKey);
                    this.baseIncrementalValueStore.process(streamEvent);
                } finally {
                    SiddhiAppContext.stopGroupByFlow();
                }
            } else {
                this.baseIncrementalValueStore.process(streamEvent);
            }
        }
    }

    /** Dispatches the contents of this executor's own value store. */
    private void dispatchAggregateEvents(long startTimeOfNewAggregates) {
        this.dispatchEvent(startTimeOfNewAggregates, this.baseIncrementalValueStore);
    }

    /**
     * Emits the grouped events of the given store and then resets the store.
     *
     * <p>If the store reports that it has processed events, its grouped events are fetched twice
     * (one chunk for the table, one for the next executor). The table write is submitted to the
     * background executor only when this instance is the processing executor; when
     * {@code waitUntillprocessFinish} is set, this method blocks until that submitted write has
     * completed. The store is reset whether or not anything was emitted.
     *
     * @param startTimeOfNewAggregates   start time passed to the store when it is cleared
     * @param aBaseIncrementalValueStore store whose contents are dispatched
     */
    private void dispatchEvent(long startTimeOfNewAggregates, BaseIncrementalValueStore aBaseIncrementalValueStore) {
        if (aBaseIncrementalValueStore.isProcessed()) {
            Map<String, StreamEvent> streamEventMap = aBaseIncrementalValueStore.getGroupedByEvents();
            ComplexEventChunk<StreamEvent> eventChunk = toEventChunk(streamEventMap);
            ComplexEventChunk<StreamEvent> tableEventChunk = toEventChunk(aBaseIncrementalValueStore.getGroupedByEvents());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Event dispatched by " + this.duration + " incremental executor: " + eventChunk.toString());
            }
            if (this.isProcessingExecutor) {
                // The size is captured before submission so the asynchronous write does not depend
                // on anything that happens to the map afterwards.
                AtomicBoolean isProcessFinished = this.submitTableWrite(tableEventChunk, streamEventMap.size());
                // Waiting only makes sense when a write was submitted; otherwise nothing would ever
                // flip the flag and the calling thread would block forever.
                if (this.waitUntillprocessFinish) {
                    this.awaitTableWrite(isProcessFinished);
                }
            }
            if (this.getNextExecutor() != null) {
                this.next.execute(eventChunk);
            }
        }
        this.cleanBaseIncrementalValueStore(startTimeOfNewAggregates, aBaseIncrementalValueStore);
    }

    /** Copies the values of the map, in iteration order, into a new event chunk. */
    private static ComplexEventChunk<StreamEvent> toEventChunk(Map<String, StreamEvent> groupedEvents) {
        ComplexEventChunk<StreamEvent> chunk = new ComplexEventChunk<>();
        for (StreamEvent event : groupedEvents.values()) {
            chunk.add(event);
        }
        return chunk;
    }

    /**
     * Submits the table write to the background executor.
     *
     * <p>Failures of the write itself are caught and logged on the worker thread, and the returned
     * flag is set in a {@code finally} block so that a waiting caller is always released. If the
     * task cannot be submitted at all, the failure is logged and the flag is set immediately.
     *
     * @return flag that becomes {@code true} once the write has finished, successfully or not
     */
    private AtomicBoolean submitTableWrite(ComplexEventChunk<StreamEvent> tableEventChunk, int noOfEvents) {
        AtomicBoolean isProcessFinished = new AtomicBoolean(false);
        try {
            this.executorService.execute(() -> {
                try {
                    this.table.addEvents(tableEventChunk, noOfEvents);
                } catch (Throwable t) {
                    this.logTableWriteFailure(t);
                } finally {
                    isProcessFinished.set(true);
                }
            });
        } catch (Throwable t) {
            // Submission failed, so the task will never run and never set the flag itself.
            this.logTableWriteFailure(t);
            isProcessFinished.set(true);
        }
        return isProcessFinished;
    }

    /** Logs a failed table write together with the app, aggregation and duration it belongs to. */
    private void logTableWriteFailure(Throwable t) {
        LOG.error("Exception occurred at siddhi app '" + this.siddhiAppName + "' when performing table writes of aggregation '" + this.aggregatorName + "' for duration '" + this.duration + "'. This should be investigated as this can cause accuracy loss.", t);
    }

    /**
     * Blocks, polling at a fixed interval, until the flag is set. If the thread is interrupted the
     * wait is abandoned, the error is logged and the interrupt status is restored.
     */
    private void awaitTableWrite(AtomicBoolean isProcessFinished) {
        try {
            while (!isProcessFinished.get()) {
                Thread.sleep(TABLE_WRITE_POLL_INTERVAL_MS);
            }
        } catch (InterruptedException e) {
            LOG.error("Error occurred while waiting until table update task finishes for duration " + this.duration, e);
            // Preserve the interrupt so code further up the stack can react to it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Clears the store for the new aggregation window and runs every expression executor of the
     * store against the reset event.
     */
    private void cleanBaseIncrementalValueStore(long startTimeOfNewAggregates, BaseIncrementalValueStore baseIncrementalValueStore) {
        baseIncrementalValueStore.clearValues(startTimeOfNewAggregates, this.resetEvent);
        for (ExpressionExecutor expressionExecutor : baseIncrementalValueStore.getExpressionExecutors()) {
            expressionExecutor.execute(this.resetEvent);
        }
    }

    /** Returns the value store backing this executor. */
    BaseIncrementalValueStore getBaseIncrementalValueStore() {
        return this.baseIncrementalValueStore;
    }

    /**
     * Returns the start time of the aggregates currently held, as recorded in the executor state
     * ({@code -1} until the first event has been processed or the state has been restored).
     */
    public long getAggregationStartTimestamp() {
        ExecutorState state = this.stateHolder.getState();
        try {
            return state.startTimeOfAggregates;
        } finally {
            this.stateHolder.returnState(state);
        }
    }

    /**
     * Overwrites the next emit time recorded in the executor state.
     *
     * @param emitTimeOfLatestEventInTable the emit time to store
     */
    @Override
    public void setEmitTime(long emitTimeOfLatestEventInTable) {
        ExecutorState state = this.stateHolder.getState();
        try {
            state.nextEmitTime = emitTimeOfLatestEventInTable;
        } finally {
            this.stateHolder.returnState(state);
        }
    }

    /**
     * Marks whether this executor writes dispatched events to the table.
     *
     * @param processingExecutor {@code true} to enable table writes on dispatch
     */
    public void setProcessingExecutor(boolean processingExecutor) {
        this.isProcessingExecutor = processingExecutor;
    }

    /**
     * Snapshot-able state of the executor: the next emit time, the start time of the current
     * aggregates and whether the scheduler has already been primed.
     */
    class ExecutorState extends State {
        private long nextEmitTime = -1L;
        private long startTimeOfAggregates = -1L;
        private boolean timerStarted = false;
        private boolean canDestroy = false;

        ExecutorState() {
        }

        /** Returns the flag last set through {@link #setCanDestroy(boolean)} (initially {@code false}). */
        @Override
        public boolean canDestroy() {
            return this.canDestroy;
        }

        /**
         * Captures the emit time, aggregate start time and timer flag. The {@code canDestroy} flag
         * is not part of the snapshot.
         */
        @Override
        public Map<String, Object> snapshot() {
            Map<String, Object> state = new HashMap<>();
            state.put("NextEmitTime", this.nextEmitTime);
            state.put("StartTimeOfAggregates", this.startTimeOfAggregates);
            state.put("TimerStarted", this.timerStarted);
            return state;
        }

        /** Restores the fields written by {@link #snapshot()}; all three keys must be present. */
        @Override
        public void restore(Map<String, Object> state) {
            this.nextEmitTime = (Long) state.get("NextEmitTime");
            this.startTimeOfAggregates = (Long) state.get("StartTimeOfAggregates");
            this.timerStarted = (Boolean) state.get("TimerStarted");
        }

        /** Sets the value reported by {@link #canDestroy()}. */
        public void setCanDestroy(boolean canDestroy) {
            this.canDestroy = canDestroy;
        }
    }
}

