/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.aggregation;

import io.siddhi.core.aggregation.IncrementalDataPurging;
import io.siddhi.core.aggregation.IncrementalExecutor;
import io.siddhi.core.aggregation.RecreateInMemoryData;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.MetaStateEvent;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.input.stream.single.SingleStreamRuntime;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.selector.GroupByKeyGenerator;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.collection.operator.CompiledCondition;
import io.siddhi.core.util.collection.operator.IncrementalAggregateCompileCondition;
import io.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import io.siddhi.core.util.collection.operator.Operator;
import io.siddhi.core.util.parser.ExpressionParser;
import io.siddhi.core.util.parser.OperatorParser;
import io.siddhi.core.util.snapshot.SnapshotService;
import io.siddhi.core.util.statistics.LatencyTracker;
import io.siddhi.core.util.statistics.MemoryCalculable;
import io.siddhi.core.util.statistics.ThroughputTracker;
import io.siddhi.query.api.aggregation.TimePeriod;
import io.siddhi.query.api.aggregation.Within;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.AggregationDefinition;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.definition.StreamDefinition;
import io.siddhi.query.api.definition.TableDefinition;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import io.siddhi.query.api.expression.AttributeFunction;
import io.siddhi.query.api.expression.Expression;
import io.siddhi.query.api.expression.Variable;
import io.siddhi.query.api.expression.condition.Compare;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Runtime object backing one {@link AggregationDefinition}.
 *
 * <p>It holds the per-duration {@link IncrementalExecutor}s and {@link Table}s of the aggregation and
 * offers three entry points: {@link #processEvents(ComplexEventChunk)} hands incoming events to the
 * executor of the first configured duration, {@link #compileExpression} builds the
 * {@link IncrementalAggregateCompileCondition} used for reads, and {@link #find} evaluates such a
 * compiled condition against a matching event.
 *
 * <p>NOTE(review): {@code isFirstEventArrived} and {@code lastExecutorsRefreshedTime} are plain mutable
 * fields with no synchronization in this class; confirm with callers whether {@code find} and
 * {@code recreateInMemoryData} may run concurrently.
 */
public class AggregationRuntime implements MemoryCalculable {
    /** Value of {@code lastExecutorsRefreshedTime} while {@code find} has not triggered any refresh yet. */
    private static final long NEVER_REFRESHED = -1L;
    /** {@code find} attempts a refresh only when more than this many milliseconds passed since the last one. */
    private static final long EXECUTOR_REFRESH_INTERVAL_MILLIS = 1000L;

    private final AggregationDefinition aggregationDefinition;
    private final Map<TimePeriod.Duration, IncrementalExecutor> incrementalExecutorMap;
    private final Map<TimePeriod.Duration, IncrementalExecutor> incrementalExecutorMapForPartitions;
    private final List<ExpressionExecutor> baseExecutorsForFind;
    private final ExpressionExecutor shouldUpdateTimestamp;
    private final Map<TimePeriod.Duration, Table> aggregationTables;
    private final MetaStreamEvent tableMetaStreamEvent;
    private final MetaStreamEvent aggregateMetaSteamEvent;
    private final LatencyTracker latencyTrackerFind;
    private final ThroughputTracker throughputTrackerFind;
    private final List<GroupByKeyGenerator> groupByKeyGeneratorList;
    private final List<TimePeriod.Duration> incrementalDurations;
    private final SingleStreamRuntime singleStreamRuntime;
    private final List<ExpressionExecutor> outputExpressionExecutors;
    private final RecreateInMemoryData recreateInMemoryData;
    private final boolean processingOnExternalTime;
    private boolean isFirstEventArrived;
    private long lastExecutorsRefreshedTime = NEVER_REFRESHED;
    private final IncrementalDataPurging incrementalDataPurging;
    private final String shardId;
    private final List<List<ExpressionExecutor>> aggregateProcessExpressionExecutorsListForFind;

    /**
     * Creates the runtime from its already-built collaborators; all arguments are stored as given.
     *
     * <p>In addition, a fresh aggregate {@link MetaStreamEvent} is created whose output data are the
     * attributes of {@code aggregationDefinition}, and the "base executors for find" are taken as a
     * {@code subList} view of the first executor list with its first element skipped.
     *
     * @param aggregationDefinition definition of the aggregation; its attributes become the output
     *                              data of the aggregate meta event
     * @param incrementalExecutorMap executors keyed by duration; used by {@code processEvents},
     *                               {@code recreateInMemoryData} and {@code find}
     * @param aggregationTables tables keyed by duration; must contain at least one entry for
     *                          {@code compileExpression} to work
     * @param singleStreamRuntime runtime exposed through {@link #getSingleStreamRuntime()}
     * @param incrementalDurations durations of the aggregation; the first one selects the executor
     *                             that receives incoming events
     * @param tableMetaStreamEvent meta event passed on to the compiled condition
     * @param outputExpressionExecutors executors passed on to {@code find} of the compiled condition
     * @param latencyTrackerFind latency tracker for {@code find}; may be {@code null}
     * @param throughputTrackerFind throughput tracker for {@code find}; may be {@code null}
     * @param recreateInMemoryData delegate invoked by {@link #recreateInMemoryData(boolean, boolean)}
     * @param processingOnExternalTime selects which timestamp attribute the {@code within} filter uses
     * @param groupByKeyGeneratorList key generators passed on to {@code find} of the compiled condition
     * @param incrementalDataPurging delegate invoked by {@link #startPurging()}
     * @param shardId when non-null, {@code find} periodically refreshes the reading executors
     * @param incrementalExecutorMapForPartitions executors passed on to {@code find} of the compiled
     *                                            condition
     * @param shouldUpdateTimestamp executor passed on to {@code find} of the compiled condition
     * @param aggregateProcessExpressionExecutorsListForFind executor lists for {@code find}; must
     *                                                       contain at least one list
     */
    public AggregationRuntime(AggregationDefinition aggregationDefinition, Map<TimePeriod.Duration, IncrementalExecutor> incrementalExecutorMap, Map<TimePeriod.Duration, Table> aggregationTables, SingleStreamRuntime singleStreamRuntime, List<TimePeriod.Duration> incrementalDurations, MetaStreamEvent tableMetaStreamEvent, List<ExpressionExecutor> outputExpressionExecutors, LatencyTracker latencyTrackerFind, ThroughputTracker throughputTrackerFind, RecreateInMemoryData recreateInMemoryData, boolean processingOnExternalTime, List<GroupByKeyGenerator> groupByKeyGeneratorList, IncrementalDataPurging incrementalDataPurging, String shardId, Map<TimePeriod.Duration, IncrementalExecutor> incrementalExecutorMapForPartitions, ExpressionExecutor shouldUpdateTimestamp, List<List<ExpressionExecutor>> aggregateProcessExpressionExecutorsListForFind) {
        this.aggregationDefinition = aggregationDefinition;
        this.incrementalExecutorMap = incrementalExecutorMap;
        this.aggregationTables = aggregationTables;
        this.incrementalDurations = incrementalDurations;
        this.singleStreamRuntime = singleStreamRuntime;
        this.tableMetaStreamEvent = tableMetaStreamEvent;
        this.outputExpressionExecutors = outputExpressionExecutors;
        this.latencyTrackerFind = latencyTrackerFind;
        this.throughputTrackerFind = throughputTrackerFind;
        this.recreateInMemoryData = recreateInMemoryData;
        this.processingOnExternalTime = processingOnExternalTime;
        this.groupByKeyGeneratorList = groupByKeyGeneratorList;
        this.incrementalDataPurging = incrementalDataPurging;
        this.shardId = shardId;
        this.incrementalExecutorMapForPartitions = incrementalExecutorMapForPartitions;
        this.shouldUpdateTimestamp = shouldUpdateTimestamp;
        this.aggregateProcessExpressionExecutorsListForFind = aggregateProcessExpressionExecutorsListForFind;
        this.aggregateMetaSteamEvent = new MetaStreamEvent();
        // View (not a copy) of the first executor list without its first element.
        List<ExpressionExecutor> firstExecutorList = aggregateProcessExpressionExecutorsListForFind.get(0);
        this.baseExecutorsForFind = firstExecutorList.subList(1, firstExecutorList.size());
        aggregationDefinition.getAttributeList().forEach(this.aggregateMetaSteamEvent::addOutputData);
    }

    /**
     * Registers {@code inputDefinition} on {@code metaStreamEvent}, calls
     * {@code initializeAfterWindowData()} and adds every attribute of the definition as data.
     */
    private static void initMetaStreamEvent(MetaStreamEvent metaStreamEvent, AbstractDefinition inputDefinition) {
        metaStreamEvent.addInputDefinition(inputDefinition);
        metaStreamEvent.initializeAfterWindowData();
        inputDefinition.getAttributeList().forEach(metaStreamEvent::addData);
    }

    /** Copies every attribute (name and type) of the original definition onto the new one. */
    private static void cloneStreamDefinition(StreamDefinition originalStreamDefinition, StreamDefinition newStreamDefinition) {
        for (Attribute attribute : originalStreamDefinition.getAttributeList()) {
            newStreamDefinition.attribute(attribute.getName(), attribute.getType());
        }
    }

    /**
     * Produces a meta stream event whose input definition ends with the two additional attributes.
     *
     * <p>When the holder carries exactly one meta stream event a brand-new meta event is used.
     * Otherwise the holder's matching meta stream event itself is reused: its last input definition
     * is cloned, extended, and added back onto that same event, so the caller's event is modified.
     *
     * @param matchingMetaInfoHolder holder describing the incoming side of the query
     * @param additionalAttributes list whose first two entries are appended to the definition
     * @return the new or reused meta stream event
     */
    private static MetaStreamEvent createNewMetaStreamEventWithStartEnd(MatchingMetaInfoHolder matchingMetaInfoHolder, List<Attribute> additionalAttributes) {
        StreamDefinition definitionWithStartEnd = new StreamDefinition();
        MetaStreamEvent metaStreamEventWithStartEnd;
        if (matchingMetaInfoHolder.getMetaStateEvent().getMetaStreamEvents().length == 1) {
            metaStreamEventWithStartEnd = new MetaStreamEvent();
        } else {
            metaStreamEventWithStartEnd = matchingMetaInfoHolder.getMetaStateEvent().getMetaStreamEvent(matchingMetaInfoHolder.getMatchingStreamEventIndex());
            cloneStreamDefinition((StreamDefinition) metaStreamEventWithStartEnd.getLastInputDefinition(), definitionWithStartEnd);
        }
        Attribute first = additionalAttributes.get(0);
        Attribute second = additionalAttributes.get(1);
        definitionWithStartEnd.attribute(first.getName(), first.getType());
        definitionWithStartEnd.attribute(second.getName(), second.getType());
        initMetaStreamEvent(metaStreamEventWithStartEnd, definitionWithStartEnd);
        return metaStreamEventWithStartEnd;
    }

    /**
     * Builds a two-slot holder with the start/end meta event in slot 0 and the holder's original
     * first meta stream event in slot 1.
     */
    private static MatchingMetaInfoHolder alterMetaInfoHolderForStoreQuery(MetaStreamEvent newMetaStreamEventWithStartEnd, MatchingMetaInfoHolder matchingMetaInfoHolder) {
        MetaStreamEvent incomingMetaStreamEvent = matchingMetaInfoHolder.getMetaStateEvent().getMetaStreamEvent(0);
        MetaStateEvent metaStateEvent = new MetaStateEvent(2);
        metaStateEvent.addEvent(newMetaStreamEventWithStartEnd);
        metaStateEvent.addEvent(incomingMetaStreamEvent);
        return new MatchingMetaInfoHolder(metaStateEvent, 0, 1, newMetaStreamEventWithStartEnd.getLastInputDefinition(), incomingMetaStreamEvent.getLastInputDefinition(), -1);
    }

    /**
     * Builds a two-slot holder with the start/end meta event in slot 0 and a new
     * {@code TABLE}-typed meta event initialised from {@code tableDefinition} in slot 1.
     */
    private static MatchingMetaInfoHolder createNewStreamTableMetaInfoHolder(MetaStreamEvent metaStreamEventWithStartEnd, AbstractDefinition tableDefinition) {
        MetaStreamEvent metaStreamEventForTable = new MetaStreamEvent();
        metaStreamEventForTable.setEventType(MetaStreamEvent.EventType.TABLE);
        initMetaStreamEvent(metaStreamEventForTable, tableDefinition);
        MetaStateEvent metaStateEvent = new MetaStateEvent(2);
        metaStateEvent.addEvent(metaStreamEventWithStartEnd);
        metaStateEvent.addEvent(metaStreamEventForTable);
        return new MatchingMetaInfoHolder(metaStateEvent, 0, 1, metaStreamEventWithStartEnd.getLastInputDefinition(), tableDefinition, -1);
    }

    /** @return the aggregation definition this runtime was created for */
    public AggregationDefinition getAggregationDefinition() {
        return this.aggregationDefinition;
    }

    /** @return the single stream runtime supplied at construction */
    public SingleStreamRuntime getSingleStreamRuntime() {
        return this.singleStreamRuntime;
    }

    /**
     * Evaluates a compiled aggregation condition for the given matching event.
     *
     * <p>For the duration of the call the skip-state-storage thread-local of {@link SnapshotService}
     * is set to {@code true}; it is reset to {@code null} afterwards, also on failure. Before
     * delegating, the in-memory data may be refreshed (see {@code refreshInMemoryDataIfStale}).
     *
     * @param matchingEvent event to match against the aggregation
     * @param compiledCondition must be an {@link IncrementalAggregateCompileCondition}, as produced by
     *                          {@link #compileExpression}
     * @param siddhiQueryContext context of the calling query
     * @return the result of {@code IncrementalAggregateCompileCondition.find}
     * @throws ClassCastException if {@code compiledCondition} is of another type
     */
    public StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition, SiddhiQueryContext siddhiQueryContext) {
        // Decided once so that markOut() in the finally block is always paired with markIn().
        boolean trackStatistics = false;
        try {
            SnapshotService.getSkipStateStorageThreadLocal().set(true);
            trackStatistics = this.latencyTrackerFind != null && siddhiQueryContext.getSiddhiAppContext().isStatsEnabled();
            if (trackStatistics) {
                this.latencyTrackerFind.markIn();
                if (this.throughputTrackerFind != null) {
                    this.throughputTrackerFind.eventIn();
                }
            }
            refreshInMemoryDataIfStale();
            IncrementalAggregateCompileCondition aggregateCondition = (IncrementalAggregateCompileCondition) compiledCondition;
            return aggregateCondition.find(matchingEvent, this.aggregationDefinition, this.incrementalExecutorMap, this.aggregationTables, this.incrementalDurations, this.baseExecutorsForFind, this.outputExpressionExecutors, siddhiQueryContext, this.aggregateProcessExpressionExecutorsListForFind, this.groupByKeyGeneratorList, this.shouldUpdateTimestamp, this.incrementalExecutorMapForPartitions);
        } finally {
            SnapshotService.getSkipStateStorageThreadLocal().set(null);
            if (trackStatistics) {
                this.latencyTrackerFind.markOut();
            }
        }
    }

    /**
     * Recreates the in-memory data when no refresh happened yet or the last one is older than
     * {@link #EXECUTOR_REFRESH_INTERVAL_MILLIS}. With a shard id the reading executors are refreshed
     * as well; without one the recreation only happens while no event has arrived.
     */
    private void refreshInMemoryDataIfStale() {
        boolean stale = this.lastExecutorsRefreshedTime == NEVER_REFRESHED
                || System.currentTimeMillis() - this.lastExecutorsRefreshedTime > EXECUTOR_REFRESH_INTERVAL_MILLIS;
        if (!stale) {
            return;
        }
        if (this.shardId != null) {
            this.recreateInMemoryData(false, true);
            this.lastExecutorsRefreshedTime = System.currentTimeMillis();
        } else if (!this.isFirstEventArrived) {
            this.recreateInMemoryData(false, false);
            this.lastExecutorsRefreshedTime = System.currentTimeMillis();
        }
    }

    /**
     * Compiles the {@code on} / {@code within} / {@code per} parts of an aggregation read into an
     * {@link IncrementalAggregateCompileCondition}.
     *
     * <p>A time filter {@code _START <= ts AND ts < _END} is compiled against every aggregation table
     * and once more as an in-memory operator, where {@code ts} is {@code AGG_EVENT_TIMESTAMP} when
     * processing on external time and {@code AGG_TIMESTAMP} otherwise.
     *
     * @param expression the {@code on} condition
     * @param within time range with one or two expressions; must not be {@code null}
     * @param per granularity expression; must not be {@code null} and must evaluate to a string
     * @param matchingMetaInfoHolder meta information of the incoming side; when it carries more than
     *                               one meta stream event, its matching meta stream event receives an
     *                               additional input definition
     * @param variableExpressionExecutors executors collected while parsing
     * @param tableMap tables visible to the query
     * @param siddhiQueryContext context of the calling query
     * @return the compiled condition to pass to {@link #find}
     * @throws SiddhiAppCreationException if {@code per} or {@code within} is missing, or {@code per}
     *                                    does not return a string
     * @throws SiddhiAppValidationException if a constant {@code per} value is not a valid duration
     */
    public CompiledCondition compileExpression(Expression expression, Within within, Expression per, MatchingMetaInfoHolder matchingMetaInfoHolder, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, Table> tableMap, SiddhiQueryContext siddhiQueryContext) {
        // Fail fast: building the start/end meta event below may add an input definition to the
        // caller's meta stream event, which should not happen for a query that is rejected anyway.
        if (per == null) {
            throw new SiddhiAppCreationException("Syntax Error: Aggregation join query must contain a `per` definition for granularity");
        }

        List<Attribute> additionalAttributes = new ArrayList<>();
        additionalAttributes.add(new Attribute("_START", Attribute.Type.LONG));
        additionalAttributes.add(new Attribute("_END", Attribute.Type.LONG));

        // Any table's definition is used for compiling the filter; the first one is picked.
        TableDefinition tableDefinition = this.aggregationTables.values().iterator().next().getTableDefinition();
        MetaStreamEvent newMetaStreamEventWithStartEnd = createNewMetaStreamEventWithStartEnd(matchingMetaInfoHolder, additionalAttributes);

        // With a single incoming meta stream event the holder is replaced by a two-slot one.
        MatchingMetaInfoHolder alteredMatchingMetaInfoHolder = null;
        MatchingMetaInfoHolder effectiveMetaInfoHolder = matchingMetaInfoHolder;
        if (matchingMetaInfoHolder.getMetaStateEvent().getMetaStreamEvents().length == 1) {
            alteredMatchingMetaInfoHolder = alterMetaInfoHolderForStoreQuery(newMetaStreamEventWithStartEnd, matchingMetaInfoHolder);
            effectiveMetaInfoHolder = alteredMatchingMetaInfoHolder;
        }
        MatchingMetaInfoHolder streamTableMetaInfoHolderWithStartEnd = createNewStreamTableMetaInfoHolder(newMetaStreamEventWithStartEnd, tableDefinition);

        ExpressionExecutor perExpressionExecutor = parsePerExpression(per, effectiveMetaInfoHolder, variableExpressionExecutors, tableMap, siddhiQueryContext);

        if (within == null) {
            throw new SiddhiAppCreationException("Syntax Error : Aggregation read query must contain a `within` definition for filtering of aggregation data.");
        }

        Variable timeFilterExpression = Expression.variable(this.processingOnExternalTime ? "AGG_EVENT_TIMESTAMP" : "AGG_TIMESTAMP");
        Variable start = Expression.variable(additionalAttributes.get(0).getName());
        Variable end = Expression.variable(additionalAttributes.get(1).getName());
        Expression compareWithStartTime = Compare.compare(start, Compare.Operator.LESS_THAN_EQUAL, timeFilterExpression);
        Expression compareWithEndTime = Compare.compare(timeFilterExpression, Compare.Operator.LESS_THAN, end);
        Expression withinExpression = Expression.and(compareWithStartTime, compareWithEndTime);

        List<Expression> timeRange = within.getTimeRange();
        Expression[] startEndArguments = timeRange.size() == 1
                ? new Expression[]{timeRange.get(0)}
                : new Expression[]{timeRange.get(0), timeRange.get(1)};
        AttributeFunction startEndTimeExpression = new AttributeFunction("incrementalAggregator", "startTimeEndTime", startEndArguments);
        ExpressionExecutor startTimeEndTimeExpressionExecutor = ExpressionParser.parseExpression(startEndTimeExpression, effectiveMetaInfoHolder.getMetaStateEvent(), effectiveMetaInfoHolder.getCurrentState(), tableMap, variableExpressionExecutors, false, 0, ProcessingMode.BATCH, false, siddhiQueryContext);

        Map<TimePeriod.Duration, CompiledCondition> withinTableCompiledConditions = new HashMap<>();
        for (Map.Entry<TimePeriod.Duration, Table> entry : this.aggregationTables.entrySet()) {
            CompiledCondition withinTableCompileCondition = entry.getValue().compileCondition(withinExpression, streamTableMetaInfoHolderWithStartEnd, variableExpressionExecutors, tableMap, siddhiQueryContext);
            withinTableCompiledConditions.put(entry.getKey(), withinTableCompileCondition);
        }

        Operator withinInMemoryCompileCondition = OperatorParser.constructOperator(new ComplexEventChunk<StreamEvent>(true), withinExpression, streamTableMetaInfoHolderWithStartEnd, variableExpressionExecutors, tableMap, siddhiQueryContext);
        Operator onCompiledCondition = OperatorParser.constructOperator(new ComplexEventChunk<StreamEvent>(true), expression, effectiveMetaInfoHolder, variableExpressionExecutors, tableMap, siddhiQueryContext);
        return new IncrementalAggregateCompileCondition(withinTableCompiledConditions, withinInMemoryCompileCondition, onCompiledCondition, this.tableMetaStreamEvent, this.aggregateMetaSteamEvent, additionalAttributes, alteredMatchingMetaInfoHolder, perExpressionExecutor, startTimeEndTimeExpressionExecutor, this.processingOnExternalTime);
    }

    /**
     * Parses the {@code per} expression and validates that it returns a string; a constant value is
     * additionally checked with {@code Expression.Time.normalizeDuration}.
     */
    private static ExpressionExecutor parsePerExpression(Expression per, MatchingMetaInfoHolder metaInfoHolder, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, Table> tableMap, SiddhiQueryContext siddhiQueryContext) {
        ExpressionExecutor perExpressionExecutor = ExpressionParser.parseExpression(per, metaInfoHolder.getMetaStateEvent(), metaInfoHolder.getCurrentState(), tableMap, variableExpressionExecutors, false, 0, ProcessingMode.BATCH, false, siddhiQueryContext);
        if (perExpressionExecutor.getReturnType() != Attribute.Type.STRING) {
            throw new SiddhiAppCreationException("Query " + siddhiQueryContext.getName() + "'s per value expected a string but found " + perExpressionExecutor.getReturnType(), per.getQueryContextStartIndex(), per.getQueryContextEndIndex());
        }
        if (perExpressionExecutor instanceof ConstantExpressionExecutor) {
            String perValue = ((ConstantExpressionExecutor) perExpressionExecutor).getValue().toString();
            try {
                Expression.Time.normalizeDuration(perValue);
            } catch (SiddhiAppValidationException e) {
                // Keep the original failure as the cause so the underlying reason stays visible.
                throw new SiddhiAppValidationException("Aggregation Query's per value is expected to be of a valid time function of the following " + TimePeriod.Duration.SECONDS + ", " + TimePeriod.Duration.MINUTES + ", " + TimePeriod.Duration.HOURS + ", " + TimePeriod.Duration.DAYS + ", " + TimePeriod.Duration.MONTHS + ", " + TimePeriod.Duration.YEARS + ".", e);
            }
        }
        return perExpressionExecutor;
    }

    /** Starts purging by delegating to {@code IncrementalDataPurging.executeIncrementalDataPurging()}. */
    public void startPurging() {
        this.incrementalDataPurging.executeIncrementalDataPurging();
    }

    /**
     * Records whether an event has arrived and asks the {@link RecreateInMemoryData} delegate to
     * recreate the in-memory data.
     *
     * @param isEventArrived stored as the "first event arrived" flag; when {@code true}, every
     *                       incremental executor is additionally marked as processing executor
     * @param refreshReadingExecutors forwarded unchanged to the delegate
     */
    public void recreateInMemoryData(boolean isEventArrived, boolean refreshReadingExecutors) {
        this.isFirstEventArrived = isEventArrived;
        if (isEventArrived) {
            for (IncrementalExecutor incrementalExecutor : this.incrementalExecutorMap.values()) {
                incrementalExecutor.setProcessingExecutor(true);
            }
        }
        this.recreateInMemoryData.recreateInMemoryData(refreshReadingExecutors);
    }

    /**
     * Passes the event chunk to the incremental executor registered for the first configured duration.
     *
     * @param streamEventComplexEventChunk events to aggregate
     */
    public void processEvents(ComplexEventChunk<StreamEvent> streamEventComplexEventChunk) {
        TimePeriod.Duration rootDuration = this.incrementalDurations.get(0);
        this.incrementalExecutorMap.get(rootDuration).execute(streamEventComplexEventChunk);
    }
}

