/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.aggregation;

import io.siddhi.core.aggregation.BaseIncrementalValueStore;
import io.siddhi.core.aggregation.IncrementalExecutor;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventFactory;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.util.IncrementalTimeConverterUtil;
import io.siddhi.core.util.parser.AggregationParser;
import io.siddhi.core.util.snapshot.state.PartitionSyncStateHolder;
import io.siddhi.core.util.snapshot.state.SingleSyncStateHolder;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateHolder;
import io.siddhi.query.api.aggregation.TimePeriod;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

/**
 * Folds the in-memory base aggregation events held by a set of
 * {@link IncrementalExecutor}s into one output {@link StreamEvent} per
 * "group-by key + aggregate start time" combination.
 *
 * <p>Starting at {@code durationToAggregate} and walking backwards through
 * {@code incrementalDurations} to index 0, every grouped event of each executor
 * is evaluated with {@code baseExecutorsForFind}; the results are accumulated in a
 * per-group {@link ValueState} and finally emitted as a {@link ComplexEventChunk}.
 */
public class IncrementalDataAggregator {
    private final List<TimePeriod.Duration> incrementalDurations;
    private final TimePeriod.Duration durationToAggregate;
    private final List<ExpressionExecutor> baseExecutorsForFind;
    private final StateHolder valueStateHolder;
    private final StreamEvent resetEvent;
    private final long oldestEventTimestamp;
    private final ExpressionExecutor shouldUpdateTimestamp;
    private final StreamEventFactory streamEventFactory;

    /**
     * Creates an aggregator.
     *
     * @param incrementalDurations  ordered list of durations; aggregation walks from the index of
     *                              {@code durationToAggregate} down to index 0
     * @param durationToAggregate   duration whose start time is used to bucket events
     * @param oldestEventTimestamp  timestamp stored in a group's state when
     *                              {@code shouldUpdateTimestamp} is {@code null}
     * @param baseExecutorsForFind  executors evaluated against every in-memory event; executor
     *                              {@code i} writes to output position {@code i + 1}
     * @param metaStreamEvent       meta definition used to build output events and the reset event
     * @param shouldUpdateTimestamp optional executor yielding a {@code Long} timestamp for an event;
     *                              may be {@code null}
     * @param groupBy               {@code true} to keep state in a {@link PartitionSyncStateHolder},
     *                              {@code false} to use a {@link SingleSyncStateHolder}
     */
    public IncrementalDataAggregator(List<TimePeriod.Duration> incrementalDurations, TimePeriod.Duration durationToAggregate, long oldestEventTimestamp, List<ExpressionExecutor> baseExecutorsForFind, MetaStreamEvent metaStreamEvent, ExpressionExecutor shouldUpdateTimestamp, boolean groupBy) {
        this.incrementalDurations = incrementalDurations;
        this.durationToAggregate = durationToAggregate;
        this.oldestEventTimestamp = oldestEventTimestamp;
        this.baseExecutorsForFind = baseExecutorsForFind;
        this.streamEventFactory = new StreamEventFactory(metaStreamEvent);
        this.valueStateHolder = groupBy ? new PartitionSyncStateHolder(() -> new ValueState()) : new SingleSyncStateHolder(() -> new ValueState());
        this.resetEvent = AggregationParser.createRestEvent(metaStreamEvent, this.streamEventFactory.newInstance());
        this.shouldUpdateTimestamp = shouldUpdateTimestamp;
    }

    /**
     * Aggregates the events currently held in memory by the given executors.
     *
     * <p>For every duration from {@code durationToAggregate} back to the first entry of
     * {@code incrementalDurations}, each grouped event is merged into the state keyed by
     * {@code "<group key>-<aggregate start time>"}. Afterwards every base executor is run
     * once against the reset event inside each touched group-by flow, and the accumulated
     * states are converted into output events.
     *
     * @param incrementalExecutorMap executor per duration; must contain an entry for every
     *                               duration that is visited
     * @return a chunk holding one event per accumulated state
     */
    public ComplexEventChunk<StreamEvent> aggregateInMemoryData(Map<TimePeriod.Duration, IncrementalExecutor> incrementalExecutorMap) {
        int startIndex = this.incrementalDurations.indexOf(this.durationToAggregate);
        HashSet<String> groupByKeys = new HashSet<>();
        // indexOf returns -1 when the duration is absent, in which case the loop is skipped.
        for (int k = startIndex; k >= 0; --k) {
            TimePeriod.Duration duration = this.incrementalDurations.get(k);
            IncrementalExecutor incrementalExecutor = incrementalExecutorMap.get(duration);
            BaseIncrementalValueStore valueStore = incrementalExecutor.getBaseIncrementalValueStore();
            Map<String, StreamEvent> groupedByEvents = valueStore.getGroupedByEvents();
            for (Map.Entry<String, StreamEvent> eventEntry : groupedByEvents.entrySet()) {
                StreamEvent event = eventEntry.getValue();
                long startTimeOfAggregates = IncrementalTimeConverterUtil.getStartTimeOfAggregates(event.getTimestamp(), this.durationToAggregate);
                String groupByKey = eventEntry.getKey() + "-" + startTimeOfAggregates;
                synchronized (this) {
                    groupByKeys.add(groupByKey);
                    SiddhiAppContext.startGroupByFlow(groupByKey);
                    try {
                        // Acquire the state inside the try so the group-by flow is always
                        // stopped, even if acquiring or returning the state fails.
                        ValueState state = (ValueState) this.valueStateHolder.getState();
                        try {
                            this.mergeIntoState(event, state);
                        } finally {
                            this.valueStateHolder.returnState(state);
                        }
                    } finally {
                        SiddhiAppContext.stopGroupByFlow();
                    }
                }
            }
        }
        // Run every executor against the reset event within each touched group-by flow.
        // NOTE(review): presumably this clears per-group aggregator state - confirm against
        // AggregationParser.createRestEvent.
        for (String groupByKey : groupByKeys) {
            SiddhiAppContext.startGroupByFlow(groupByKey);
            try {
                for (ExpressionExecutor expressionExecutor : this.baseExecutorsForFind) {
                    expressionExecutor.execute(this.resetEvent);
                }
            } finally {
                SiddhiAppContext.stopGroupByFlow();
            }
        }
        return this.getProcessedEventChunk();
    }

    /**
     * Evaluates the base executors against {@code event} and stores the results in {@code state}.
     *
     * <p>When {@code shouldUpdateTimestamp} is absent, or the event's timestamp is not older than
     * the state's last timestamp, every executor's result is stored. Otherwise results of
     * {@link VariableExpressionExecutor}s are skipped and only the remaining executors are stored.
     */
    private void mergeIntoState(StreamEvent event, ValueState state) {
        boolean shouldUpdate = true;
        if (this.shouldUpdateTimestamp != null) {
            shouldUpdate = this.shouldUpdate(this.shouldUpdateTimestamp.execute(event), state);
        } else {
            state.lastTimestamp = this.oldestEventTimestamp;
        }
        for (int i = 0; i < this.baseExecutorsForFind.size(); ++i) {
            ExpressionExecutor expressionExecutor = this.baseExecutorsForFind.get(i);
            if (!shouldUpdate && expressionExecutor instanceof VariableExpressionExecutor) {
                continue;
            }
            // Position 0 is reserved for the timestamp, hence the +1 offset.
            state.setValue(expressionExecutor.execute(event), i + 1);
        }
    }

    /**
     * Builds one output event per stored group-by state.
     *
     * <p>Each event takes the state's last timestamp as its own timestamp and as output
     * position 0; the state's value array is handed to the event as output data without copying.
     */
    private ComplexEventChunk<StreamEvent> getProcessedEventChunk() {
        ComplexEventChunk<StreamEvent> streamEventChunk = new ComplexEventChunk<StreamEvent>(true);
        // The holder is declared as a raw type, so its result has to be narrowed here.
        @SuppressWarnings("unchecked")
        Map<String, State> valueStoreMap = this.valueStateHolder.getAllGroupByStates();
        try {
            for (State aState : valueStoreMap.values()) {
                ValueState state = (ValueState) aState;
                StreamEvent streamEvent = this.streamEventFactory.newInstance();
                long timestamp = state.lastTimestamp;
                streamEvent.setTimestamp(timestamp);
                state.setValue(timestamp, 0);
                streamEvent.setOutputData(state.values);
                streamEventChunk.add(streamEvent);
            }
        } finally {
            this.valueStateHolder.returnGroupByStates(valueStoreMap);
        }
        return streamEventChunk;
    }

    /**
     * Decides whether {@code state} should take over the values of an event.
     *
     * @param data  the event's timestamp; must be a {@code Long}
     * @param state state whose {@code lastTimestamp} is compared and, on success, advanced
     * @return {@code true} if the timestamp is at least the state's last timestamp
     */
    private boolean shouldUpdate(Object data, ValueState state) {
        long timestamp = (Long) data;
        if (timestamp >= state.lastTimestamp) {
            state.lastTimestamp = timestamp;
            return true;
        }
        return false;
    }

    /**
     * Per-group accumulator: the output values (position 0 is the timestamp, position
     * {@code i + 1} belongs to base executor {@code i}) and the last timestamp recorded.
     */
    class ValueState
    extends State {
        private Object[] values;
        public long lastTimestamp = 0L;

        public ValueState() {
            // One slot per base executor plus the leading timestamp slot.
            this.values = new Object[IncrementalDataAggregator.this.baseExecutorsForFind.size() + 1];
        }

        @Override
        public boolean canDestroy() {
            return this.values == null && this.lastTimestamp == 0L;
        }

        /** Stores {@code value} at the given output {@code position}. */
        public void setValue(Object value, int position) {
            this.values[position] = value;
        }

        @Override
        public Map<String, Object> snapshot() {
            HashMap<String, Object> state = new HashMap<String, Object>();
            state.put("Values", this.values);
            state.put("LastTimestamp", this.lastTimestamp);
            return state;
        }

        @Override
        public void restore(Map<String, Object> state) {
            this.values = (Object[]) state.get("Values");
            this.lastTimestamp = (Long) state.get("LastTimestamp");
        }
    }
}

