/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.aggregation.persistedaggregation;

import io.siddhi.core.aggregation.Executor;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventFactory;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.util.IncrementalTimeConverterUtil;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateHolder;
import io.siddhi.query.api.aggregation.TimePeriod;
import java.sql.SQLException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;

/**
 * {@link Executor} for a single aggregation duration that does not aggregate the
 * incoming events itself.
 *
 * <p>Every event handed to {@link #execute(ComplexEventChunk)} is removed from the chunk and
 * is only used to detect when its timestamp has reached the stored next emit time. When that
 * happens the stored time window is advanced, an event carrying the previous window bounds is
 * dispatched (see {@code dispatchEvent}) and a {@code TIMER} event is forwarded to the next
 * executor, if there is one.
 */
public class PersistedIncrementalExecutor
implements Executor {
    private static final Logger log = Logger.getLogger(PersistedIncrementalExecutor.class);

    /** Total number of times the CUD processor is invoked for one dispatch (first try + retries). */
    private static final int MAX_PROCESS_ATTEMPTS = 3;
    /** Pause between two attempts, in milliseconds. */
    private static final long RETRY_DELAY_MILLIS = 3000L;
    /** Substring of the SQL error message that marks a failure as worth retrying. */
    private static final String RESTART_TRANSACTION_MARKER = "try restarting transaction";

    private final ExpressionExecutor timestampExpressionExecutor;
    private final StateHolder<ExecutorState> stateHolder;
    private final TimePeriod.Duration duration;
    private Executor next;
    private final StreamEventFactory streamEventFactory;
    private final String timeZone;
    private final Processor cudStreamProcessor;
    private boolean isProcessingExecutor;

    /**
     * Creates the executor.
     *
     * @param aggregatorName             used as prefix of the state holder name
     * @param duration                   duration this executor is responsible for
     * @param processExpressionExecutors executors for the incoming event; the FIRST element is
     *                                   removed from this list and used as the timestamp executor
     * @param child                      next executor in the chain, may be {@code null}
     * @param siddhiQueryContext         context used to generate the state holder
     * @param metaStreamEvent            meta event used to build the {@link StreamEventFactory}
     * @param timeZone                   zone id string passed to {@link ZoneId#of(String)} and to
     *                                   {@link IncrementalTimeConverterUtil}
     * @param cudStreamProcessor         processor invoked when this is the processing executor
     */
    public PersistedIncrementalExecutor(String aggregatorName, TimePeriod.Duration duration, List<ExpressionExecutor> processExpressionExecutors, Executor child, SiddhiQueryContext siddhiQueryContext, MetaStreamEvent metaStreamEvent, String timeZone, Processor cudStreamProcessor) {
        this.timeZone = timeZone;
        this.duration = duration;
        // Assigned directly; calling the overridable setter from a constructor is avoided.
        this.next = child;
        this.cudStreamProcessor = cudStreamProcessor;
        // Intentionally mutates the caller's list: the head element is consumed here.
        this.timestampExpressionExecutor = processExpressionExecutors.remove(0);
        this.streamEventFactory = new StreamEventFactory(metaStreamEvent);
        this.stateHolder = siddhiQueryContext.generateStateHolder(aggregatorName + "-" + this.getClass().getName(), false, () -> new ExecutorState());
        this.isProcessingExecutor = false;
    }

    /**
     * Consumes the given chunk. Each event is removed from the chunk; an event whose timestamp
     * is at or past the stored next emit time advances the stored window, triggers a dispatch
     * for the previous window and sends a timer event downstream.
     *
     * @param streamEventChunk chunk of incoming events; it is reset and drained by this call
     */
    @Override
    public void execute(ComplexEventChunk streamEventChunk) {
        if (log.isDebugEnabled()) {
            log.debug("Event Chunk received by " + this.duration + " incremental executor: " + streamEventChunk.toString() + " will be dropped since persisted aggregation has been scheduled ");
        }
        streamEventChunk.reset();
        while (streamEventChunk.hasNext()) {
            StreamEvent streamEvent = (StreamEvent) streamEventChunk.next();
            streamEventChunk.remove();
            ExecutorState executorState = this.stateHolder.getState();
            try {
                long timestamp = this.getTimestamp(streamEvent);
                if (timestamp >= executorState.nextEmitTime) {
                    // Remember the window that just closed before overwriting the state.
                    long previousEmitTime = executorState.nextEmitTime;
                    long previousStartTime = executorState.startTimeOfAggregates;
                    executorState.startTimeOfAggregates = IncrementalTimeConverterUtil.getStartTimeOfAggregates(timestamp, this.duration, this.timeZone);
                    executorState.nextEmitTime = IncrementalTimeConverterUtil.getNextEmitTime(timestamp, this.duration, this.timeZone);
                    this.dispatchAggregateEvents(previousStartTime, previousEmitTime, this.timeZone);
                    this.sendTimerEvent(executorState);
                }
            } finally {
                // The state is returned for every event, including skipped ones and failures.
                this.stateHolder.returnState(executorState);
            }
        }
    }

    /**
     * Dispatches an event for the given window unless {@code emittedTime} is {@code -1},
     * which is the initial value of the stored next emit time.
     */
    private void dispatchAggregateEvents(long startTimeOfNewAggregates, long emittedTime, String timeZone) {
        if (emittedTime != -1L) {
            this.dispatchEvent(startTimeOfNewAggregates, emittedTime, timeZone);
        }
    }

    /**
     * Builds a {@code CURRENT} event whose output data is
     * {@code [startTimeOfNewAggregates, emittedTime, null]} and hands it on.
     *
     * <p>When this instance is the processing executor the event is passed to the CUD stream
     * processor (with retries) and the method returns without calling the next executor.
     * Otherwise the next executor, if any, is executed with the chunk.
     */
    private void dispatchEvent(long startTimeOfNewAggregates, long emittedTime, String timeZone) {
        ZoneId zone = ZoneId.of(timeZone);
        ZonedDateTime startTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(startTimeOfNewAggregates), zone);
        ZonedDateTime endTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(emittedTime), zone);
        log.info("Aggregation event dispatched for the duration " + this.duration + " to aggregate data from " + startTime.toString() + " to " + endTime.toString() + " ");

        StreamEvent streamEvent = this.streamEventFactory.newInstance();
        streamEvent.setType(ComplexEvent.Type.CURRENT);
        streamEvent.setTimestamp(emittedTime);
        streamEvent.setOutputData(new Object[]{startTimeOfNewAggregates, emittedTime, null});

        ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<StreamEvent>();
        if (this.isProcessingExecutor) {
            complexEventChunk.add(streamEvent);
            this.processWithRetry(complexEventChunk, startTimeOfNewAggregates, emittedTime);
            return;
        }
        // NOTE(review): on this path the event is never added to the chunk, so the next
        // executor receives an empty chunk. Kept as-is; confirm this is intended.
        if (this.getNextExecutor() != null) {
            this.next.execute(complexEventChunk);
        }
    }

    /**
     * Invokes the CUD stream processor, retrying while the failure is caused by an
     * {@link SQLException} whose message contains {@value #RESTART_TRANSACTION_MARKER} and
     * fewer than {@value #MAX_PROCESS_ATTEMPTS} attempts have been made. All failures are
     * logged and never propagated.
     */
    private void processWithRetry(ComplexEventChunk<StreamEvent> complexEventChunk, long startTimeOfNewAggregates, long emittedTime) {
        String errorPrefix = "Error occurred while executing the aggregation for data between " + startTimeOfNewAggregates + " - " + emittedTime + " for duration ";
        int attempt = 0;
        while (true) {
            ++attempt;
            try {
                this.cudStreamProcessor.process(complexEventChunk);
                return;
            } catch (Exception e) {
                Throwable cause = e.getCause();
                if (!(cause instanceof SQLException)) {
                    log.error(errorPrefix + "\n" + this.duration, e);
                    return;
                }
                if (attempt < MAX_PROCESS_ATTEMPTS && isRestartableTransactionFailure(cause)) {
                    log.error(errorPrefix + this.duration + " Retrying the transaction attempt " + (attempt - 1), e);
                    this.pauseBeforeRetry();
                    continue;
                }
                // NOTE(review): this message is also logged for SQL failures that were never
                // retried, and the sleeps above add up to at most
                // (MAX_PROCESS_ATTEMPTS - 1) * RETRY_DELAY_MILLIS; the text is kept unchanged.
                log.error(errorPrefix + this.duration + ". Attempted re-executing the query for 9 seconds. This Should be investigated since this will lead to a data mismatch\n", e);
                return;
            }
        }
    }

    /** Returns whether the cause's message marks the failure as retryable; a missing message does not. */
    private static boolean isRestartableTransactionFailure(Throwable cause) {
        String message = cause.getLocalizedMessage();
        return message != null && message.contains(RESTART_TRANSACTION_MARKER);
    }

    /** Sleeps before the next attempt; on interruption the interrupt status is restored. */
    private void pauseBeforeRetry() {
        try {
            Thread.sleep(RETRY_DELAY_MILLIS);
        } catch (InterruptedException interruptedException) {
            log.error("Thread sleep interrupted while waiting to re-execute the aggregation query for duration " + this.duration, interruptedException);
            // Keep the interruption visible to the caller instead of silently clearing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Overwrites the stored next emit time.
     *
     * @param emitTimeOfLatestEventInTable value to store as the next emit time
     */
    @Override
    public void setEmitTime(long emitTimeOfLatestEventInTable) {
        ExecutorState state = this.stateHolder.getState();
        try {
            state.nextEmitTime = emitTimeOfLatestEventInTable;
        } finally {
            this.stateHolder.returnState(state);
        }
    }

    /**
     * Marks whether dispatched events go to the CUD stream processor ({@code true}) or to the
     * next executor ({@code false}, the default).
     */
    public void setProcessingExecutor(boolean processingExecutor) {
        this.isProcessingExecutor = processingExecutor;
    }

    /**
     * Sends a {@code TIMER} event stamped with the state's current start time of aggregates
     * to the next executor; does nothing when there is no next executor.
     */
    private void sendTimerEvent(ExecutorState executorState) {
        if (this.getNextExecutor() == null) {
            return;
        }
        StreamEvent timerEvent = this.streamEventFactory.newInstance();
        timerEvent.setType(ComplexEvent.Type.TIMER);
        timerEvent.setTimestamp(executorState.startTimeOfAggregates);
        ComplexEventChunk<StreamEvent> timerStreamEventChunk = new ComplexEventChunk<StreamEvent>();
        timerStreamEventChunk.add(timerEvent);
        this.next.execute(timerStreamEventChunk);
    }

    /**
     * Resolves the timestamp of an event: {@code CURRENT} events are evaluated through the
     * timestamp expression executor, every other type uses the event's own timestamp.
     */
    private long getTimestamp(StreamEvent streamEvent) {
        if (streamEvent.getType() == ComplexEvent.Type.CURRENT) {
            return ((Long) this.timestampExpressionExecutor.execute(streamEvent)).longValue();
        }
        return streamEvent.getTimestamp();
    }

    @Override
    public Executor getNextExecutor() {
        return this.next;
    }

    @Override
    public void setNextExecutor(Executor executor) {
        this.next = executor;
    }

    /**
     * Mutable state of the executor: the next emit time and the start time of the current
     * aggregates, both initialised to {@code -1}.
     */
    class ExecutorState
    extends State {
        private long nextEmitTime = -1L;
        private long startTimeOfAggregates = -1L;
        // Only assigned by restore() within this class.
        private boolean timerStarted = false;
        // Not part of the snapshot; changed only through setCanDestroy().
        private boolean canDestroy = false;

        ExecutorState() {
        }

        @Override
        public boolean canDestroy() {
            return this.canDestroy;
        }

        /** Captures the state under the keys NextEmitTime, StartTimeOfAggregates and TimerStarted. */
        @Override
        public Map<String, Object> snapshot() {
            HashMap<String, Object> state = new HashMap<String, Object>();
            state.put("NextEmitTime", this.nextEmitTime);
            state.put("StartTimeOfAggregates", this.startTimeOfAggregates);
            state.put("TimerStarted", this.timerStarted);
            return state;
        }

        /** Restores the fields from a map using the same keys that {@link #snapshot()} writes. */
        @Override
        public void restore(Map<String, Object> state) {
            this.nextEmitTime = (Long) state.get("NextEmitTime");
            this.startTimeOfAggregates = (Long) state.get("StartTimeOfAggregates");
            this.timerStarted = (Boolean) state.get("TimerStarted");
        }

        public void setCanDestroy(boolean canDestroy) {
            this.canDestroy = canDestroy;
        }
    }
}

