/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.util.parser;

import io.siddhi.core.aggregation.AggregationRuntime;
import io.siddhi.core.aggregation.IncrementalAggregationProcessor;
import io.siddhi.core.aggregation.IncrementalDataPurger;
import io.siddhi.core.aggregation.IncrementalExecutor;
import io.siddhi.core.aggregation.IncrementalExecutorsInitialiser;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventFactory;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.input.stream.StreamRuntime;
import io.siddhi.core.query.input.stream.single.EntryValveExecutor;
import io.siddhi.core.query.input.stream.single.SingleStreamRuntime;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.processor.stream.window.QueryableProcessor;
import io.siddhi.core.query.selector.GroupByKeyGenerator;
import io.siddhi.core.query.selector.attribute.aggregator.incremental.IncrementalAttributeAggregator;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.ExceptionUtil;
import io.siddhi.core.util.Scheduler;
import io.siddhi.core.util.SiddhiAppRuntimeBuilder;
import io.siddhi.core.util.SiddhiClassLoader;
import io.siddhi.core.util.config.ConfigManager;
import io.siddhi.core.util.extension.holder.FunctionExecutorExtensionHolder;
import io.siddhi.core.util.extension.holder.IncrementalAttributeAggregatorExtensionHolder;
import io.siddhi.core.util.lock.LockWrapper;
import io.siddhi.core.util.parser.ExpressionParser;
import io.siddhi.core.util.parser.InputStreamParser;
import io.siddhi.core.util.parser.SchedulerParser;
import io.siddhi.core.util.parser.helper.QueryParserHelper;
import io.siddhi.core.util.statistics.LatencyTracker;
import io.siddhi.core.util.statistics.ThroughputTracker;
import io.siddhi.core.window.Window;
import io.siddhi.query.api.SiddhiElement;
import io.siddhi.query.api.aggregation.TimePeriod;
import io.siddhi.query.api.annotation.Annotation;
import io.siddhi.query.api.annotation.Element;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.AggregationDefinition;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.definition.StreamDefinition;
import io.siddhi.query.api.definition.TableDefinition;
import io.siddhi.query.api.execution.query.input.stream.InputStream;
import io.siddhi.query.api.execution.query.selection.OutputAttribute;
import io.siddhi.query.api.expression.AttributeFunction;
import io.siddhi.query.api.expression.Expression;
import io.siddhi.query.api.expression.Variable;
import io.siddhi.query.api.expression.constant.StringConstant;
import io.siddhi.query.api.extension.Extension;
import io.siddhi.query.api.util.AnnotationHelper;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

public class AggregationParser {
    /**
     * Builds the {@link AggregationRuntime} for one aggregation definition.
     *
     * <p>The definition is validated first (non-null, has a time period and a selector, its input
     * stream is present in {@code streamDefinitionMap}, and carries no user supplied
     * {@code PrimaryKey} annotation). After that the method wires together, in this order: the input
     * stream runtime, the incoming executors, the per-duration process executors and group-by key
     * generators, the scheduler, the backing tables, the incremental executors, the data purger,
     * the executors initialiser and the optional statistics trackers.
     *
     * @param aggregationDefinition    definition to build a runtime for
     * @param siddhiAppContext         context of the enclosing Siddhi app
     * @param streamDefinitionMap      stream definitions keyed by id; must contain the input stream
     * @param tableDefinitionMap       table definitions, forwarded to the input stream parser
     * @param windowDefinitionMap      window definitions, forwarded to the input stream parser
     * @param aggregationDefinitionMap aggregation definitions, forwarded to the input stream parser
     * @param tableMap                 tables available to expression parsing
     * @param windowMap                windows, forwarded to the input stream parser and initialiser
     * @param aggregationMap           aggregations, forwarded to the input stream parser and initialiser
     * @param siddhiAppRuntimeBuilder  builder handed to {@code initDefaultTables}
     * @return the fully wired aggregation runtime
     * @throws SiddhiAppCreationException if validation fails, or if partitioning is requested but
     *                                    no {@code shardId} configuration property is available
     */
    public static AggregationRuntime parse(AggregationDefinition aggregationDefinition, SiddhiAppContext siddhiAppContext, Map<String, AbstractDefinition> streamDefinitionMap, Map<String, AbstractDefinition> tableDefinitionMap, Map<String, AbstractDefinition> windowDefinitionMap, Map<String, AbstractDefinition> aggregationDefinitionMap, Map<String, Table> tableMap, Map<String, Window> windowMap, Map<String, AggregationRuntime> aggregationMap, SiddhiAppRuntimeBuilder siddhiAppRuntimeBuilder) {
        // --- Up-front validation of the definition -------------------------------------------
        if (aggregationDefinition == null) {
            throw new SiddhiAppCreationException("Aggregation Definition instance is null. Hence, can't create the siddhi app '" + siddhiAppContext.getName() + "'");
        }
        if (aggregationDefinition.getTimePeriod() == null) {
            throw new SiddhiAppCreationException("Aggregation Definition '" + aggregationDefinition.getId() + "'s timePeriod is null. Hence, can't create the siddhi app '" + siddhiAppContext.getName() + "'", aggregationDefinition.getQueryContextStartIndex(), aggregationDefinition.getQueryContextEndIndex());
        }
        if (aggregationDefinition.getSelector() == null) {
            throw new SiddhiAppCreationException("Aggregation Definition '" + aggregationDefinition.getId() + "'s selection is not defined. Hence, can't create the siddhi app '" + siddhiAppContext.getName() + "'", aggregationDefinition.getQueryContextStartIndex(), aggregationDefinition.getQueryContextEndIndex());
        }
        if (streamDefinitionMap.get(aggregationDefinition.getBasicSingleInputStream().getStreamId()) == null) {
            throw new SiddhiAppCreationException("Stream " + aggregationDefinition.getBasicSingleInputStream().getStreamId() + " has not been defined");
        }
        // A user supplied primary key is rejected outright.
        Element userDefinedPrimaryKey = AnnotationHelper.getAnnotationElement((String)"PrimaryKey", null, (List)aggregationDefinition.getAnnotations());
        if (userDefinedPrimaryKey != null) {
            throw new SiddhiAppCreationException("Aggregation Tables have predefined primary key, but found '" + userDefinedPrimaryKey.getValue() + "' primary key defined though annotation.");
        }
        try {
            boolean isDistributed;
            // --- Input stream runtime and its meta event --------------------------------------
            ArrayList<VariableExpressionExecutor> incomingVariableExpressionExecutors = new ArrayList<VariableExpressionExecutor>();
            String aggregatorName = aggregationDefinition.getId();
            SiddhiQueryContext siddhiQueryContext = new SiddhiQueryContext(siddhiAppContext, aggregatorName);
            StreamRuntime streamRuntime = InputStreamParser.parse((InputStream)aggregationDefinition.getBasicSingleInputStream(), null, streamDefinitionMap, tableDefinitionMap, windowDefinitionMap, aggregationDefinitionMap, tableMap, windowMap, aggregationMap, incomingVariableExpressionExecutors, false, siddhiQueryContext);
            MetaStreamEvent incomingMetaStreamEvent = (MetaStreamEvent)streamRuntime.getMetaComplexEvent();
            incomingMetaStreamEvent.initializeOnAfterWindowData();
            List<TimePeriod.Duration> incrementalDurations = AggregationParser.getSortedPeriods(aggregationDefinition.getTimePeriod());
            ArrayList<ExpressionExecutor> incomingExpressionExecutors = new ArrayList<ExpressionExecutor>();
            ArrayList<IncrementalAttributeAggregator> incrementalAttributeAggregators = new ArrayList<IncrementalAttributeAggregator>();
            // Typed (not raw) so that the stream pipeline over this list further down type-checks.
            List<Variable> groupByVariableList = aggregationDefinition.getSelector().getGroupByList();
            ArrayList<Expression> outputExpressions = new ArrayList<Expression>();
            // "External time" means the definition names an attribute to aggregate by.
            boolean isProcessingOnExternalTime = aggregationDefinition.getAggregateAttribute() != null;
            boolean isGroupBy = aggregationDefinition.getSelector().getGroupByList().size() != 0;
            // --- Partitioning / distribution settings -----------------------------------------
            ConfigManager configManager = siddhiAppContext.getSiddhiContext().getConfigManager();
            String shardId = configManager.extractProperty("shardId");
            boolean enablePartitioning = false;
            Annotation partitionById = AnnotationHelper.getAnnotation((String)"PartitionById", (List)aggregationDefinition.getAnnotations());
            if (partitionById != null) {
                // The annotation enables partitioning unless its 'enable' element parses to false.
                String enableElement = partitionById.getElement("enable");
                enablePartitioning = enableElement == null || Boolean.parseBoolean(enableElement);
            }
            boolean shouldPartitionById = Boolean.parseBoolean(configManager.extractProperty("partitionById"));
            if (enablePartitioning || shouldPartitionById) {
                if (shardId == null) {
                    throw new SiddhiAppCreationException("Configuration 'shardId' not provided for @partitionById annotation");
                }
                isDistributed = true;
            } else {
                isDistributed = false;
            }
            // --- Incoming side: output columns and the executors that fill them ---------------
            AggregationParser.populateIncomingAggregatorsAndExecutors(aggregationDefinition, siddhiQueryContext, tableMap, incomingVariableExpressionExecutors, incomingMetaStreamEvent, incomingExpressionExecutors, incrementalAttributeAggregators, groupByVariableList, outputExpressions, isProcessingOnExternalTime, isDistributed, shardId);
            // Detected by checking whether the last output column carries the well-known name.
            boolean isLatestEventColAdded = incomingMetaStreamEvent.getOutputData().get(incomingMetaStreamEvent.getOutputData().size() - 1).getName().equals("AGG_LAST_EVENT_TIMESTAMP");
            // Column count before populateFinalBaseAggregators appends the base attribute columns.
            int baseAggregatorBeginIndex = incomingMetaStreamEvent.getOutputData().size();
            ArrayList<Expression> finalBaseExpressions = new ArrayList<Expression>();
            boolean isOptimisedLookup = AggregationParser.populateFinalBaseAggregators(tableMap, incomingVariableExpressionExecutors, incomingMetaStreamEvent, incomingExpressionExecutors, incrementalAttributeAggregators, siddhiQueryContext, finalBaseExpressions);
            // --- Intermediate stream definition mirroring the incoming output columns ---------
            StreamDefinition incomingOutputStreamDefinition = StreamDefinition.id((String)(aggregatorName + "_intermediate"));
            incomingOutputStreamDefinition.setQueryContextStartIndex(aggregationDefinition.getQueryContextStartIndex());
            incomingOutputStreamDefinition.setQueryContextEndIndex(aggregationDefinition.getQueryContextEndIndex());
            MetaStreamEvent processedMetaStreamEvent = new MetaStreamEvent();
            for (Attribute attribute2 : incomingMetaStreamEvent.getOutputData()) {
                incomingOutputStreamDefinition.attribute(attribute2.getName(), attribute2.getType());
                processedMetaStreamEvent.addOutputData(attribute2);
            }
            incomingMetaStreamEvent.setOutputDefinition(incomingOutputStreamDefinition);
            processedMetaStreamEvent.addInputDefinition((AbstractDefinition)incomingOutputStreamDefinition);
            processedMetaStreamEvent.setOutputDefinition(incomingOutputStreamDefinition);
            // --- Per-duration process executors -----------------------------------------------
            // Two separately constructed lists per duration: one goes to the incremental
            // executors, the other ("ForFind") is handed to the AggregationRuntime.
            ArrayList<VariableExpressionExecutor> processVariableExpressionExecutors = new ArrayList<VariableExpressionExecutor>();
            HashMap<TimePeriod.Duration, List<ExpressionExecutor>> processExpressionExecutorsMap = new HashMap<TimePeriod.Duration, List<ExpressionExecutor>>();
            HashMap<TimePeriod.Duration, List<ExpressionExecutor>> processExpressionExecutorsMapForFind = new HashMap<TimePeriod.Duration, List<ExpressionExecutor>>();
            incrementalDurations.forEach(incrementalDuration -> {
                processExpressionExecutorsMap.put((TimePeriod.Duration)incrementalDuration, AggregationParser.constructProcessExpressionExecutors(siddhiQueryContext, tableMap, baseAggregatorBeginIndex, finalBaseExpressions, incomingOutputStreamDefinition, processedMetaStreamEvent, processVariableExpressionExecutors, isProcessingOnExternalTime, incrementalDuration, isDistributed, shardId, isLatestEventColAdded));
                processExpressionExecutorsMapForFind.put((TimePeriod.Duration)incrementalDuration, AggregationParser.constructProcessExpressionExecutors(siddhiQueryContext, tableMap, baseAggregatorBeginIndex, finalBaseExpressions, incomingOutputStreamDefinition, processedMetaStreamEvent, processVariableExpressionExecutors, isProcessingOnExternalTime, incrementalDuration, isDistributed, shardId, isLatestEventColAdded));
            });
            // Stays null when the AGG_LAST_EVENT_TIMESTAMP column is absent.
            ExpressionExecutor shouldUpdateTimestamp = null;
            if (isLatestEventColAdded) {
                Variable shouldUpdateTimestampExp = new Variable("AGG_LAST_EVENT_TIMESTAMP");
                shouldUpdateTimestamp = ExpressionParser.parseExpression((Expression)shouldUpdateTimestampExp, processedMetaStreamEvent, 0, tableMap, processVariableExpressionExecutors, false, 0, ProcessingMode.BATCH, false, siddhiQueryContext);
            }
            List<ExpressionExecutor> outputExpressionExecutors = outputExpressions.stream().map(expression -> ExpressionParser.parseExpression(expression, processedMetaStreamEvent, 0, tableMap, processVariableExpressionExecutors, isGroupBy, 0, ProcessingMode.BATCH, false, siddhiQueryContext)).collect(Collectors.toList());
            // --- Group-by key generators used by the incremental executors --------------------
            // A duration maps to null when neither external time nor group-by is in play.
            HashMap<TimePeriod.Duration, GroupByKeyGenerator> groupByKeyGeneratorMap = new HashMap<TimePeriod.Duration, GroupByKeyGenerator>();
            incrementalDurations.forEach(incrementalDuration -> {
                GroupByKeyGenerator groupByKeyGenerator = null;
                if (isProcessingOnExternalTime || isGroupBy) {
                    ArrayList<Expression> groupByExpressionList = new ArrayList<Expression>();
                    if (isProcessingOnExternalTime) {
                        Expression externalTimestampExpression = AttributeFunction.function((String)"incrementalAggregator", (String)"getAggregationStartTime", (Expression[])new Expression[]{new Variable("AGG_EVENT_TIMESTAMP"), new StringConstant(incrementalDuration.name())});
                        groupByExpressionList.add(externalTimestampExpression);
                    }
                    groupByExpressionList.addAll(groupByVariableList);
                    groupByKeyGenerator = new GroupByKeyGenerator(groupByExpressionList, processedMetaStreamEvent, -1, tableMap, processVariableExpressionExecutors, siddhiQueryContext);
                }
                groupByKeyGeneratorMap.put((TimePeriod.Duration)incrementalDuration, groupByKeyGenerator);
            });
            // --- Group-by key generators handed to the AggregationRuntime ---------------------
            // Distributed mode without external time gets dedicated generators keyed on the
            // aggregation start time of AGG_TIMESTAMP; every other mode reuses the map above.
            HashMap<TimePeriod.Duration, GroupByKeyGenerator> groupByKeyGeneratorMapForReading = new HashMap<TimePeriod.Duration, GroupByKeyGenerator>();
            if (isDistributed && !isProcessingOnExternalTime) {
                incrementalDurations.forEach(incrementalDuration -> {
                    ArrayList<Expression> groupByExpressionList = new ArrayList<Expression>();
                    Expression timestampExpression = AttributeFunction.function((String)"incrementalAggregator", (String)"getAggregationStartTime", (Expression[])new Expression[]{new Variable("AGG_TIMESTAMP"), new StringConstant(incrementalDuration.name())});
                    groupByExpressionList.add(timestampExpression);
                    if (isGroupBy) {
                        groupByExpressionList.addAll(groupByVariableList);
                    }
                    GroupByKeyGenerator groupByKeyGenerator = new GroupByKeyGenerator(groupByExpressionList, processedMetaStreamEvent, -1, tableMap, processVariableExpressionExecutors, siddhiQueryContext);
                    groupByKeyGeneratorMapForReading.put((TimePeriod.Duration)incrementalDuration, groupByKeyGenerator);
                });
            } else {
                groupByKeyGeneratorMapForReading.putAll(groupByKeyGeneratorMap);
            }
            // --- Entry valve, lock and scheduler ----------------------------------------------
            EntryValveExecutor entryValveExecutor = new EntryValveExecutor(siddhiAppContext);
            LockWrapper lockWrapper = new LockWrapper(aggregatorName);
            lockWrapper.setLock(new ReentrantLock());
            Scheduler scheduler = SchedulerParser.parse(entryValveExecutor, siddhiQueryContext);
            scheduler.init(lockWrapper, aggregatorName);
            scheduler.setStreamEventFactory(new StreamEventFactory(processedMetaStreamEvent));
            // Meta events are reduced first, then the variable executors are re-positioned
            // against the reduced meta events.
            QueryParserHelper.reduceMetaComplexEvent(incomingMetaStreamEvent);
            QueryParserHelper.reduceMetaComplexEvent(processedMetaStreamEvent);
            QueryParserHelper.updateVariablePosition(incomingMetaStreamEvent, incomingVariableExpressionExecutors);
            QueryParserHelper.updateVariablePosition(processedMetaStreamEvent, processVariableExpressionExecutors);
            // --- Tables and incremental executors ---------------------------------------------
            HashMap<TimePeriod.Duration, Table> aggregationTables = AggregationParser.initDefaultTables(aggregatorName, incrementalDurations, processedMetaStreamEvent.getOutputStreamDefinition(), siddhiAppRuntimeBuilder, aggregationDefinition.getAnnotations(), groupByVariableList, isProcessingOnExternalTime, isDistributed);
            Map<TimePeriod.Duration, IncrementalExecutor> incrementalExecutorMap = AggregationParser.buildIncrementalExecutors(processedMetaStreamEvent, processExpressionExecutorsMap, groupByKeyGeneratorMap, incrementalDurations, aggregationTables, siddhiQueryContext, aggregatorName, shouldUpdateTimestamp);
            // Optimised lookup additionally requires the first duration's table to be queryable.
            isOptimisedLookup = isOptimisedLookup && aggregationTables.get(incrementalDurations.get(0)) instanceof QueryableProcessor;
            List<String> groupByVariablesList = groupByVariableList.stream().map(Variable::getAttributeName).collect(Collectors.toList());
            // Declared as List because Collectors.toList() does not promise an ArrayList.
            List<OutputAttribute> defaultSelectorList = new ArrayList<OutputAttribute>();
            if (isOptimisedLookup) {
                defaultSelectorList = incomingOutputStreamDefinition.getAttributeList().stream().map(attribute -> new OutputAttribute(new Variable(attribute.getName()))).collect(Collectors.toList());
            }
            // --- Purger, initialiser and root executor wiring ---------------------------------
            IncrementalDataPurger incrementalDataPurger = new IncrementalDataPurger();
            incrementalDataPurger.init(aggregationDefinition, new StreamEventFactory(processedMetaStreamEvent), aggregationTables, isProcessingOnExternalTime, siddhiQueryContext);
            IncrementalExecutorsInitialiser incrementalExecutorsInitialiser = new IncrementalExecutorsInitialiser(incrementalDurations, aggregationTables, incrementalExecutorMap, isDistributed, shardId, siddhiAppContext, processedMetaStreamEvent, tableMap, windowMap, aggregationMap);
            // The executor of the first duration is the root: it gets the scheduler and sits
            // directly behind the entry valve.
            IncrementalExecutor rootIncrementalExecutor = incrementalExecutorMap.get(incrementalDurations.get(0));
            rootIncrementalExecutor.setScheduler(scheduler);
            entryValveExecutor.setNextExecutor(rootIncrementalExecutor);
            QueryParserHelper.initStreamRuntime(streamRuntime, incomingMetaStreamEvent, lockWrapper, aggregatorName);
            // --- Statistics trackers (only when a statistics manager is configured) -----------
            LatencyTracker latencyTrackerFind = null;
            LatencyTracker latencyTrackerInsert = null;
            ThroughputTracker throughputTrackerFind = null;
            ThroughputTracker throughputTrackerInsert = null;
            if (siddhiAppContext.getStatisticsManager() != null) {
                latencyTrackerFind = QueryParserHelper.createLatencyTracker(siddhiAppContext, aggregationDefinition.getId(), "Aggregations", "find");
                latencyTrackerInsert = QueryParserHelper.createLatencyTracker(siddhiAppContext, aggregationDefinition.getId(), "Aggregations", "insert");
                throughputTrackerFind = QueryParserHelper.createThroughputTracker(siddhiAppContext, aggregationDefinition.getId(), "Aggregations", "find");
                throughputTrackerInsert = QueryParserHelper.createThroughputTracker(siddhiAppContext, aggregationDefinition.getId(), "Aggregations", "insert");
            }
            // --- Final assembly ---------------------------------------------------------------
            AggregationRuntime aggregationRuntime = new AggregationRuntime(aggregationDefinition, isProcessingOnExternalTime, isDistributed, incrementalDurations, incrementalExecutorMap, aggregationTables, outputExpressionExecutors, processExpressionExecutorsMapForFind, shouldUpdateTimestamp, groupByKeyGeneratorMapForReading, isOptimisedLookup, defaultSelectorList, groupByVariablesList, isLatestEventColAdded, baseAggregatorBeginIndex, finalBaseExpressions, incrementalDataPurger, incrementalExecutorsInitialiser, (SingleStreamRuntime)streamRuntime, processedMetaStreamEvent, latencyTrackerFind, throughputTrackerFind);
            streamRuntime.setCommonProcessor(new IncrementalAggregationProcessor(aggregationRuntime, incomingExpressionExecutors, processedMetaStreamEvent, latencyTrackerInsert, throughputTrackerInsert, siddhiAppContext));
            return aggregationRuntime;
        }
        catch (Throwable t) {
            // Attach the definition's query context to the failure, then rethrow it unchanged.
            ExceptionUtil.populateQueryContext(t, (SiddhiElement)aggregationDefinition, siddhiAppContext);
            throw t;
        }
    }

    /**
     * Creates one {@link IncrementalExecutor} per duration and chains them together.
     *
     * <p>The durations are walked from the last list entry to the first. Each new executor
     * receives the executor built in the previous iteration (the one for the following list
     * entry) as its child; the executor for the last entry gets {@code null}. Only the executor
     * for index 0 is flagged as root.
     *
     * @param processedMetaStreamEvent      meta event passed to every executor
     * @param processExpressionExecutorsMap process executors, looked up per duration
     * @param groupByKeyGeneratorList       group-by key generators, looked up per duration
     * @param incrementalDurations          durations to build executors for
     * @param aggregationTables             backing tables, looked up per duration
     * @param siddhiQueryContext            query context passed to every executor
     * @param aggregatorName                aggregation name passed to every executor
     * @param shouldUpdateTimestamp         executor passed through to every incremental executor
     * @return the created executors keyed by their duration
     */
    private static Map<TimePeriod.Duration, IncrementalExecutor> buildIncrementalExecutors(MetaStreamEvent processedMetaStreamEvent, Map<TimePeriod.Duration, List<ExpressionExecutor>> processExpressionExecutorsMap, Map<TimePeriod.Duration, GroupByKeyGenerator> groupByKeyGeneratorList, List<TimePeriod.Duration> incrementalDurations, Map<TimePeriod.Duration, Table> aggregationTables, SiddhiQueryContext siddhiQueryContext, String aggregatorName, ExpressionExecutor shouldUpdateTimestamp) {
        Map<TimePeriod.Duration, IncrementalExecutor> executorsByDuration = new HashMap<>();
        // Executor created in the previous iteration; becomes the child of the next one built.
        IncrementalExecutor previouslyBuilt = null;
        int index = incrementalDurations.size();
        while (--index >= 0) {
            TimePeriod.Duration currentDuration = incrementalDurations.get(index);
            IncrementalExecutor executor = new IncrementalExecutor(
                    aggregatorName,
                    currentDuration,
                    processExpressionExecutorsMap.get(currentDuration),
                    shouldUpdateTimestamp,
                    groupByKeyGeneratorList.get(currentDuration),
                    index == 0,
                    aggregationTables.get(currentDuration),
                    previouslyBuilt,
                    siddhiQueryContext,
                    processedMetaStreamEvent);
            executorsByDuration.put(currentDuration, executor);
            previouslyBuilt = executor;
        }
        return executorsByDuration;
    }

    /**
     * Assembles the ordered list of process expression executors for one duration.
     *
     * <p>The resulting list contains, in this order:
     * <ol>
     *   <li>a variable executor for the first attribute of {@code incomingOutputStreamDefinition};</li>
     *   <li>when {@code isDistributed}: an executor for the constant {@code shardId};</li>
     *   <li>when {@code isProcessingOnExternalTime}: an executor for
     *       {@code incrementalAggregator:getAggregationStartTime(AGG_EVENT_TIMESTAMP, duration)};</li>
     *   <li>variable executors for the following attributes up to (excluding)
     *       {@code baseAggregatorBeginIndex}, or one less when {@code isLatestEventColAdded};</li>
     *   <li>when {@code isLatestEventColAdded}: an executor for
     *       {@code max(AGG_LAST_EVENT_TIMESTAMP)};</li>
     *   <li>one executor per entry of {@code finalBaseExpressions}.</li>
     * </ol>
     *
     * <p>Every expression is parsed against {@code processedMetaStreamEvent}; the parser is handed
     * {@code processVariableExpressionExecutors}, so the parse order above is significant.
     *
     * @return the executors in the order listed above
     */
    private static List<ExpressionExecutor> constructProcessExpressionExecutors(SiddhiQueryContext siddhiQueryContext, Map<String, Table> tableMap, int baseAggregatorBeginIndex, List<Expression> finalBaseExpressions, StreamDefinition incomingOutputStreamDefinition, MetaStreamEvent processedMetaStreamEvent, List<VariableExpressionExecutor> processVariableExpressionExecutors, boolean isProcessingOnExternalTime, TimePeriod.Duration duration, boolean isDistributed, String shardId, boolean isLatestEventColAdded) {
        List<ExpressionExecutor> executors = new ArrayList<>();
        List<Attribute> columns = incomingOutputStreamDefinition.getAttributeList();

        // Column 0 is always carried over as a plain variable.
        Variable firstColumn = new Variable(columns.get(0).getName());
        executors.add((VariableExpressionExecutor) AggregationParser.parseProcessingExpression(firstColumn, processedMetaStreamEvent, tableMap, processVariableExpressionExecutors, siddhiQueryContext));
        int nextColumn = 1;

        if (isDistributed) {
            // The shard id is supplied as a constant; its column is stepped over below.
            StringConstant shardIdConstant = Expression.value(shardId);
            executors.add(AggregationParser.parseProcessingExpression(shardIdConstant, processedMetaStreamEvent, tableMap, processVariableExpressionExecutors, siddhiQueryContext));
            nextColumn++;
        }

        if (isProcessingOnExternalTime) {
            // The external timestamp is replaced by its aggregation start time for this duration.
            Expression startTimeOfEvent = AttributeFunction.function("incrementalAggregator", "getAggregationStartTime", new Expression[]{new Variable("AGG_EVENT_TIMESTAMP"), new StringConstant(duration.name())});
            executors.add(AggregationParser.parseProcessingExpression(startTimeOfEvent, processedMetaStreamEvent, tableMap, processVariableExpressionExecutors, siddhiQueryContext));
            nextColumn++;
        }

        // With a latest-event column present, the plain-variable range ends one column earlier.
        int plainColumnsEnd = isLatestEventColAdded ? baseAggregatorBeginIndex - 1 : baseAggregatorBeginIndex;
        for (int column = nextColumn; column < plainColumnsEnd; column++) {
            Variable plainColumn = new Variable(columns.get(column).getName());
            executors.add((VariableExpressionExecutor) AggregationParser.parseProcessingExpression(plainColumn, processedMetaStreamEvent, tableMap, processVariableExpressionExecutors, siddhiQueryContext));
        }

        if (isLatestEventColAdded) {
            Expression latestEventTimestamp = AttributeFunction.function("max", new Expression[]{new Variable("AGG_LAST_EVENT_TIMESTAMP")});
            executors.add(AggregationParser.parseProcessingExpression(latestEventTimestamp, processedMetaStreamEvent, tableMap, processVariableExpressionExecutors, siddhiQueryContext));
        }

        for (Expression baseExpression : finalBaseExpressions) {
            executors.add(AggregationParser.parseProcessingExpression(baseExpression, processedMetaStreamEvent, tableMap, processVariableExpressionExecutors, siddhiQueryContext));
        }
        return executors;
    }

    /**
     * Parses one expression with the fixed argument set used for all process executors
     * (meta position 0, group-by flag {@code true}, default stream index 0, batch mode).
     */
    private static ExpressionExecutor parseProcessingExpression(Expression expression, MetaStreamEvent processedMetaStreamEvent, Map<String, Table> tableMap, List<VariableExpressionExecutor> processVariableExpressionExecutors, SiddhiQueryContext siddhiQueryContext) {
        return ExpressionParser.parseExpression(expression, processedMetaStreamEvent, 0, tableMap, processVariableExpressionExecutors, true, 0, ProcessingMode.BATCH, false, siddhiQueryContext);
    }

    /**
     * Registers the distinct base attributes of all incremental aggregators on the incoming side.
     *
     * <p>For every base attribute of every aggregator, {@code validateBaseAggregators} is invoked.
     * An attribute that has not been seen before is then recorded: its base aggregator expression
     * is appended to {@code finalBaseAggregators}, the attribute is added as output data of
     * {@code incomingMetaStreamEvent}, and an executor for its initial value expression is
     * appended to {@code incomingExpressionExecutors}.
     *
     * @param tableMap                            tables available to expression parsing
     * @param incomingVariableExpressionExecutors variable executor list handed to the parser
     * @param incomingMetaStreamEvent             meta event that receives the new output columns
     * @param incomingExpressionExecutors         receives the initial-value executors
     * @param incrementalAttributeAggregators     aggregators whose base attributes are processed
     * @param siddhiQueryContext                  query context used for parsing
     * @param finalBaseAggregators                receives the base aggregator expressions
     * @return {@code false} if any aggregator declares more than one base aggregator,
     *         {@code true} otherwise
     */
    private static boolean populateFinalBaseAggregators(Map<String, Table> tableMap, List<VariableExpressionExecutor> incomingVariableExpressionExecutors, MetaStreamEvent incomingMetaStreamEvent, List<ExpressionExecutor> incomingExpressionExecutors, List<IncrementalAttributeAggregator> incrementalAttributeAggregators, SiddhiQueryContext siddhiQueryContext, List<Expression> finalBaseAggregators) {
        boolean singleBaseAggregatorOnly = true;
        List<Attribute> registeredAttributes = new ArrayList<>();
        for (IncrementalAttributeAggregator aggregator : incrementalAttributeAggregators) {
            Attribute[] attributes = aggregator.getBaseAttributes();
            Expression[] initialValues = aggregator.getBaseAttributeInitialValues();
            Expression[] aggregatorExpressions = aggregator.getBaseAggregators();
            singleBaseAggregatorOnly = singleBaseAggregatorOnly && aggregatorExpressions.length <= 1;

            for (int position = 0; position < attributes.length; position++) {
                // Validation runs for every attribute, including ones already registered.
                AggregationParser.validateBaseAggregators(incrementalAttributeAggregators, aggregator, attributes, initialValues, aggregatorExpressions, position);
                Attribute candidate = attributes[position];
                if (!registeredAttributes.contains(candidate)) {
                    registeredAttributes.add(candidate);
                    finalBaseAggregators.add(aggregatorExpressions[position]);
                    incomingMetaStreamEvent.addOutputData(candidate);
                    ExpressionExecutor initialValueExecutor = ExpressionParser.parseExpression(initialValues[position], incomingMetaStreamEvent, 0, tableMap, incomingVariableExpressionExecutors, false, 0, ProcessingMode.BATCH, false, siddhiQueryContext);
                    incomingExpressionExecutors.add(initialValueExecutor);
                }
            }
        }
        return singleBaseAggregatorOnly;
    }

    private static void populateIncomingAggregatorsAndExecutors(AggregationDefinition aggregationDefinition, SiddhiQueryContext siddhiQueryContext, Map<String, Table> tableMap, List<VariableExpressionExecutor> incomingVariableExpressionExecutors, MetaStreamEvent incomingMetaStreamEvent, List<ExpressionExecutor> incomingExpressionExecutors, List<IncrementalAttributeAggregator> incrementalAttributeAggregators, List<Variable> groupByVariableList, List<Expression> outputExpressions, boolean isProcessingOnExternalTime, boolean isDistributed, String shardId) {
        boolean isLatestEventAdded = false;
        ExpressionExecutor timestampExecutor = AggregationParser.getTimeStampExecutor(siddhiQueryContext, tableMap, incomingVariableExpressionExecutors, incomingMetaStreamEvent);
        Attribute timestampAttribute = new Attribute("AGG_TIMESTAMP", Attribute.Type.LONG);
        incomingMetaStreamEvent.addOutputData(timestampAttribute);
        incomingExpressionExecutors.add(timestampExecutor);
        if (isDistributed) {
            ConstantExpressionExecutor nodeIdExpExecutor = new ConstantExpressionExecutor(shardId, Attribute.Type.STRING);
            incomingExpressionExecutors.add(nodeIdExpExecutor);
            incomingMetaStreamEvent.addOutputData(new Attribute("SHARD_ID", Attribute.Type.STRING));
        }
        ExpressionExecutor externalTimestampExecutor = null;
        if (isProcessingOnExternalTime) {
            Variable externalTimestampExpression = aggregationDefinition.getAggregateAttribute();
            externalTimestampExecutor = ExpressionParser.parseExpression((Expression)externalTimestampExpression, incomingMetaStreamEvent, 0, tableMap, incomingVariableExpressionExecutors, false, 0, ProcessingMode.BATCH, false, siddhiQueryContext);
            if (externalTimestampExecutor.getReturnType() == Attribute.Type.STRING) {
                Expression expression = AttributeFunction.function((String)"incrementalAggregator", (String)"timestampInMilliseconds", (Expression[])new Expression[]{externalTimestampExpression});
                externalTimestampExecutor = ExpressionParser.parseExpression(expression, incomingMetaStreamEvent, 0, tableMap, incomingVariableExpressionExecutors, false, 0, ProcessingMode.BATCH, false, siddhiQueryContext);
            } else if (externalTimestampExecutor.getReturnType() != Attribute.Type.LONG) {
                throw new SiddhiAppCreationException("Aggregation Definition '" + aggregationDefinition.getId() + "'s timestamp attribute expects long or string, but found " + externalTimestampExecutor.getReturnType() + ". Hence, can't create the siddhi app '" + siddhiQueryContext.getSiddhiAppContext().getName() + "'", externalTimestampExpression.getQueryContextStartIndex(), externalTimestampExpression.getQueryContextEndIndex());
            }
            Iterator<Object> externalTimestampAttribute = new Attribute("AGG_EVENT_TIMESTAMP", Attribute.Type.LONG);
            incomingMetaStreamEvent.addOutputData((Attribute)externalTimestampAttribute);
            incomingExpressionExecutors.add(externalTimestampExecutor);
        }
        AbstractDefinition incomingLastInputStreamDefinition = incomingMetaStreamEvent.getLastInputDefinition();
        for (Variable groupByVariable : groupByVariableList) {
            incomingMetaStreamEvent.addOutputData((Attribute)incomingLastInputStreamDefinition.getAttributeList().get(incomingLastInputStreamDefinition.getAttributePosition(groupByVariable.getAttributeName())));
            incomingExpressionExecutors.add(ExpressionParser.parseExpression((Expression)groupByVariable, incomingMetaStreamEvent, 0, tableMap, incomingVariableExpressionExecutors, false, 0, ProcessingMode.BATCH, false, siddhiQueryContext));
        }
        aggregationDefinition.getAttributeList().add(timestampAttribute);
        if (isProcessingOnExternalTime) {
            outputExpressions.add((Expression)Expression.variable((String)"AGG_EVENT_TIMESTAMP"));
        } else {
            outputExpressions.add((Expression)Expression.variable((String)"AGG_TIMESTAMP"));
        }
        for (OutputAttribute outputAttribute : aggregationDefinition.getSelector().getSelectionList()) {
            Expression expression = outputAttribute.getExpression();
            if (expression instanceof AttributeFunction) {
                IncrementalAttributeAggregator incrementalAggregator = null;
                try {
                    incrementalAggregator = (IncrementalAttributeAggregator)SiddhiClassLoader.loadExtensionImplementation((Extension)new AttributeFunction("incrementalAggregator", ((AttributeFunction)expression).getName(), ((AttributeFunction)expression).getParameters()), IncrementalAttributeAggregatorExtensionHolder.getInstance(siddhiQueryContext.getSiddhiAppContext()));
                }
                catch (SiddhiAppCreationException ex) {
                    try {
                        SiddhiClassLoader.loadExtensionImplementation((Extension)((AttributeFunction)expression), FunctionExecutorExtensionHolder.getInstance(siddhiQueryContext.getSiddhiAppContext()));
                        AggregationParser.processAggregationSelectors(aggregationDefinition, siddhiQueryContext, tableMap, incomingVariableExpressionExecutors, incomingMetaStreamEvent, incomingExpressionExecutors, outputExpressions, outputAttribute, expression);
                    }
                    catch (SiddhiAppCreationException e) {
                        throw new SiddhiAppCreationException("'" + ((AttributeFunction)expression).getName() + "' is neither a incremental attribute aggregator extension or a function extension", expression.getQueryContextStartIndex(), expression.getQueryContextEndIndex());
                    }
                }
                if (incrementalAggregator == null) continue;
                AggregationParser.initIncrementalAttributeAggregator(incomingLastInputStreamDefinition, (AttributeFunction)expression, incrementalAggregator);
                incrementalAttributeAggregators.add(incrementalAggregator);
                aggregationDefinition.getAttributeList().add(new Attribute(outputAttribute.getRename(), incrementalAggregator.getReturnType()));
                outputExpressions.add(incrementalAggregator.aggregate());
                continue;
            }
            if (expression instanceof Variable && groupByVariableList.contains(expression)) {
                Attribute groupByAttribute = null;
                for (Attribute attribute : incomingMetaStreamEvent.getOutputData()) {
                    if (!attribute.getName().equals(((Variable)expression).getAttributeName())) continue;
                    groupByAttribute = attribute;
                    break;
                }
                if (groupByAttribute == null) {
                    throw new SiddhiAppCreationException("Expected GroupBy attribute '" + ((Variable)expression).getAttributeName() + "' not used in aggregation '" + siddhiQueryContext.getName() + "' processing.", expression.getQueryContextStartIndex(), expression.getQueryContextEndIndex());
                }
                aggregationDefinition.getAttributeList().add(new Attribute(outputAttribute.getRename(), groupByAttribute.getType()));
                outputExpressions.add((Expression)Expression.variable((String)groupByAttribute.getName()));
                continue;
            }
            isLatestEventAdded = true;
            AggregationParser.processAggregationSelectors(aggregationDefinition, siddhiQueryContext, tableMap, incomingVariableExpressionExecutors, incomingMetaStreamEvent, incomingExpressionExecutors, outputExpressions, outputAttribute, expression);
        }
        if (isProcessingOnExternalTime && isLatestEventAdded) {
            Attribute lastEventTimeStamp = new Attribute("AGG_LAST_EVENT_TIMESTAMP", Attribute.Type.LONG);
            incomingMetaStreamEvent.addOutputData(lastEventTimeStamp);
            incomingExpressionExecutors.add(externalTimestampExecutor);
        }
    }

    /**
     * Wires a selector expression that is evaluated directly on the incoming event into the
     * aggregation.
     * <p>
     * The expression is parsed against {@code incomingMetaStreamEvent}; the resulting executor is
     * appended to {@code incomingExpressionExecutors}. An attribute named after the selector's
     * rename, typed with the executor's return type, is added both to the incoming event's output
     * data and to the aggregation definition, and a variable referring to that rename is appended
     * to {@code outputExpressions}.
     *
     * @param aggregationDefinition               definition that receives the new output attribute
     * @param siddhiQueryContext                  query context handed to the expression parser
     * @param tableMap                            tables handed to the expression parser
     * @param incomingVariableExpressionExecutors variable executors handed to the expression parser
     * @param incomingMetaStreamEvent             meta event the expression is parsed against and extended
     * @param incomingExpressionExecutors         list that receives the parsed executor
     * @param outputExpressions                   list that receives the variable for the renamed attribute
     * @param outputAttribute                     selector entry supplying the rename
     * @param expression                          expression to parse
     */
    private static void processAggregationSelectors(AggregationDefinition aggregationDefinition, SiddhiQueryContext siddhiQueryContext, Map<String, Table> tableMap, List<VariableExpressionExecutor> incomingVariableExpressionExecutors, MetaStreamEvent incomingMetaStreamEvent, List<ExpressionExecutor> incomingExpressionExecutors, List<Expression> outputExpressions, OutputAttribute outputAttribute, Expression expression) {
        ExpressionExecutor selectorExecutor = ExpressionParser.parseExpression(expression, incomingMetaStreamEvent, 0, tableMap, incomingVariableExpressionExecutors, false, 0, ProcessingMode.BATCH, false, siddhiQueryContext);
        incomingExpressionExecutors.add(selectorExecutor);

        String outputName = outputAttribute.getRename();
        Attribute.Type outputType = selectorExecutor.getReturnType();

        // Two separate Attribute instances: one for the incoming event, one for the definition.
        incomingMetaStreamEvent.addOutputData(new Attribute(outputName, outputType));
        aggregationDefinition.getAttributeList().add(new Attribute(outputName, outputType));
        outputExpressions.add(Expression.variable(outputName));
    }

    /**
     * Verifies that a base attribute shared between incremental aggregators is declared
     * consistently.
     * <p>
     * For every aggregator in {@code incrementalAttributeAggregators} from position {@code i}
     * onward (skipping {@code incrementalAttributeAggregator} itself), each of its base attributes
     * that equals {@code baseAttributes[i]} must have an equal initial value and an equal base
     * aggregator.
     * <p>
     * NOTE(review): {@code i} is used both as the starting offset into the aggregator list and as
     * the index into the three base arrays; confirm against the caller that this is intended.
     *
     * @param incrementalAttributeAggregators all aggregators to compare against
     * @param incrementalAttributeAggregator  aggregator owning the base arrays below
     * @param baseAttributes                  base attributes of the owning aggregator
     * @param baseAttributeInitialValues      initial values matching {@code baseAttributes}
     * @param baseAggregators                 base aggregators matching {@code baseAttributes}
     * @param i                               list start offset and base-array index (see note)
     * @throws SiddhiAppCreationException if an equal base attribute is declared with a different
     *                                    initial value or a different base aggregator
     */
    private static void validateBaseAggregators(List<IncrementalAttributeAggregator> incrementalAttributeAggregators, IncrementalAttributeAggregator incrementalAttributeAggregator, Attribute[] baseAttributes, Expression[] baseAttributeInitialValues, Expression[] baseAggregators, int i) {
        for (int position = i; position < incrementalAttributeAggregators.size(); position++) {
            IncrementalAttributeAggregator other = incrementalAttributeAggregators.get(position);
            if (other != incrementalAttributeAggregator) {
                Attribute[] otherAttributes = other.getBaseAttributes();
                Expression[] otherInitialValues = other.getBaseAttributeInitialValues();
                Expression[] otherAggregators = other.getBaseAggregators();
                for (int j = 0; j < otherAttributes.length; j++) {
                    if (baseAttributes[i].equals((Object)otherAttributes[j])) {
                        // Initial values are checked first, so they win when both differ.
                        if (!baseAttributeInitialValues[i].equals(otherInitialValues[j])) {
                            throw new SiddhiAppCreationException("BaseAttributes having same name should be defined with same initial values, but baseAttribute '" + baseAttributes[i] + "' is defined in '" + incrementalAttributeAggregator.getClass().getName() + "' and '" + other.getClass().getName() + "' with different initial values.");
                        }
                        if (!baseAggregators[i].equals(otherAggregators[j])) {
                            throw new SiddhiAppCreationException("BaseAttributes having same name should be defined with same baseAggregators, but baseAttribute '" + baseAttributes[i] + "' is defined in '" + incrementalAttributeAggregator.getClass().getName() + "' and '" + other.getClass().getName() + "' with different baseAggregators.");
                        }
                    }
                }
            }
        }
    }

    /**
     * Initialises an incremental attribute aggregator from the attribute function that referenced
     * it and checks that the aggregator's base arrays have matching lengths.
     * <p>
     * When the function has a non-null first parameter it must be the only parameter and must be a
     * {@link Variable}; the variable's attribute name and its type in
     * {@code lastInputStreamDefinition} are passed to
     * {@link IncrementalAttributeAggregator#init}. When the parameter array is {@code null}, empty,
     * or its first element is {@code null}, the aggregator is initialised with
     * {@code (null, null)}.
     *
     * @param lastInputStreamDefinition      definition used to look up the parameter's type
     * @param attributeFunction              function call from the aggregation selector
     * @param incrementalAttributeAggregator aggregator instance to initialise
     * @throws SiddhiAppCreationException if more than one parameter is supplied, the parameter is
     *                                    not a variable, or the aggregator's base attributes,
     *                                    initial values and base aggregators differ in length
     */
    private static void initIncrementalAttributeAggregator(AbstractDefinition lastInputStreamDefinition, AttributeFunction attributeFunction, IncrementalAttributeAggregator incrementalAttributeAggregator) {
        String attributeName = null;
        Attribute.Type attributeType = null;
        Expression[] parameters = attributeFunction.getParameters();
        // The length guard keeps an empty parameter array from raising
        // ArrayIndexOutOfBoundsException; it is treated the same as a null array.
        if (parameters != null && parameters.length > 0 && parameters[0] != null) {
            if (parameters.length != 1) {
                throw new SiddhiAppCreationException("Incremental aggregator requires only one parameter. Found " + parameters.length, attributeFunction.getQueryContextStartIndex(), attributeFunction.getQueryContextEndIndex());
            }
            Expression parameter = parameters[0];
            if (!(parameter instanceof Variable)) {
                throw new SiddhiAppCreationException("Incremental aggregator expected a variable. However a parameter of type " + parameter.getClass().getTypeName() + " was found", parameter.getQueryContextStartIndex(), parameter.getQueryContextEndIndex());
            }
            attributeName = ((Variable)parameter).getAttributeName();
            attributeType = lastInputStreamDefinition.getAttributeType(attributeName);
        }
        incrementalAttributeAggregator.init(attributeName, attributeType);

        // The three base arrays are read only after init(...) has been called.
        Attribute[] baseAttributes = incrementalAttributeAggregator.getBaseAttributes();
        Expression[] baseAttributeInitialValues = incrementalAttributeAggregator.getBaseAttributeInitialValues();
        Expression[] baseAggregators = incrementalAttributeAggregator.getBaseAggregators();
        if (baseAttributes.length != baseAggregators.length) {
            throw new SiddhiAppCreationException("Number of baseAggregators '" + baseAggregators.length + "' and baseAttributes '" + baseAttributes.length + "' is not equal for '" + attributeFunction + "'", attributeFunction.getQueryContextStartIndex(), attributeFunction.getQueryContextEndIndex());
        }
        if (baseAttributeInitialValues.length != baseAggregators.length) {
            throw new SiddhiAppCreationException("Number of baseAggregators '" + baseAggregators.length + "' and baseAttributeInitialValues '" + baseAttributeInitialValues.length + "' is not equal for '" + attributeFunction + "'", attributeFunction.getQueryContextStartIndex(), attributeFunction.getQueryContextEndIndex());
        }
    }

    /**
     * Builds an executor for a parameterless {@code currentTimeMillis} function call, parsed
     * against the given meta stream event.
     *
     * @param siddhiQueryContext          query context handed to the expression parser
     * @param tableMap                    tables handed to the expression parser
     * @param variableExpressionExecutors variable executors handed to the expression parser
     * @param metaStreamEvent             meta event the function call is parsed against
     * @return the executor produced by the expression parser
     */
    private static ExpressionExecutor getTimeStampExecutor(SiddhiQueryContext siddhiQueryContext, Map<String, Table> tableMap, List<VariableExpressionExecutor> variableExpressionExecutors, MetaStreamEvent metaStreamEvent) {
        // A null parameter array denotes a call without arguments.
        Expression currentTimeCall = AttributeFunction.function((String)"currentTimeMillis", null);
        return ExpressionParser.parseExpression(currentTimeCall, metaStreamEvent, 0, tableMap, variableExpressionExecutors, false, 0, ProcessingMode.BATCH, false, siddhiQueryContext);
    }

    /**
     * Tells whether the time period's operator is {@code RANGE}.
     *
     * @param timePeriod time period to inspect
     * @return {@code true} when the operator is {@link TimePeriod.Operator#RANGE}
     */
    private static boolean isRange(TimePeriod timePeriod) {
        TimePeriod.Operator operator = timePeriod.getOperator();
        return TimePeriod.Operator.RANGE == operator;
    }

    /**
     * Resolves the durations of a time period into a sorted list.
     * <p>
     * For a range period the first two declared durations are taken as the bounds and every
     * duration between them is filled in; otherwise the declared durations are used as given.
     * Any failure has the time period's query context attached before it is rethrown.
     *
     * @param timePeriod time period from the aggregation definition
     * @return a sorted list of the durations to aggregate for
     */
    private static List<TimePeriod.Duration> getSortedPeriods(TimePeriod timePeriod) {
        try {
            List<TimePeriod.Duration> declared = timePeriod.getDurations();
            List<TimePeriod.Duration> effective = isRange(timePeriod)
                    ? fillGap(declared.get(0), declared.get(1))
                    : declared;
            return sortedDurations(effective);
        }
        catch (Throwable failure) {
            // Only unchecked throwables can reach here, so the precise rethrow needs no throws clause.
            ExceptionUtil.populateQueryContext(failure, (SiddhiElement)timePeriod, null);
            throw failure;
        }
    }

    /**
     * Returns a new list holding the given durations in ascending enum-declaration (ordinal)
     * order. The input list is not modified.
     *
     * @param durations durations to sort
     * @return a fresh, sorted copy of {@code durations}
     */
    private static List<TimePeriod.Duration> sortedDurations(List<TimePeriod.Duration> durations) {
        List<TimePeriod.Duration> copyDurations = new ArrayList<>(durations);
        // A typed comparator replaces the raw Comparator, whose lambda parameters were Object.
        copyDurations.sort(Comparator.comparingInt(TimePeriod.Duration::ordinal));
        return copyDurations;
    }

    /**
     * Lists every duration from {@code start} to {@code end}, both inclusive, in enum-declaration
     * order.
     * <p>
     * The result is always a new, modifiable list; when {@code start} equals {@code end} it
     * contains that single duration.
     *
     * @param start lower bound of the range
     * @param end   upper bound of the range
     * @return the durations between the bounds, inclusive
     * @throws SiddhiAppCreationException if {@code start} is declared after {@code end}
     */
    private static List<TimePeriod.Duration> fillGap(TimePeriod.Duration start, TimePeriod.Duration end) {
        int startIndex = start.ordinal();
        int endIndex = end.ordinal();
        if (startIndex > endIndex) {
            throw new SiddhiAppCreationException("Start time period must be less than end time period for range aggregation calculation");
        }
        TimePeriod.Duration[] allDurations = TimePeriod.Duration.values();
        // Copy into an ArrayList so callers never receive a fixed-size array-backed view.
        return new ArrayList<>(Arrays.asList(allDurations).subList(startIndex, endIndex + 1));
    }

    /**
     * Defines one table per duration for the aggregation and returns them keyed by duration.
     * <p>
     * A {@code PrimaryKey} annotation is built from {@code AGG_TIMESTAMP}, then {@code SHARD_ID}
     * when partitioning is enabled, then {@code AGG_EVENT_TIMESTAMP} when processing on external
     * time, then every group-by attribute name. It is appended to {@code annotations} (the passed
     * list is modified). Each table is named {@code <aggregatorName>_<duration>}, copies the
     * attributes of {@code streamDefinition}, carries all {@code annotations}, and is registered
     * through {@code siddhiAppRuntimeBuilder}.
     *
     * @param aggregatorName             prefix for the table ids
     * @param durations                  durations to create tables for
     * @param streamDefinition           definition whose attributes become the table columns
     * @param siddhiAppRuntimeBuilder    builder used to define and then look up each table
     * @param annotations                annotations applied to every table; receives the primary key
     * @param groupByVariableList        group-by variables added to the primary key
     * @param isProcessingOnExternalTime whether {@code AGG_EVENT_TIMESTAMP} joins the primary key
     * @param enablePartioning           whether {@code SHARD_ID} joins the primary key
     * @return the table the builder holds for each duration after definition
     */
    private static HashMap<TimePeriod.Duration, Table> initDefaultTables(String aggregatorName, List<TimePeriod.Duration> durations, StreamDefinition streamDefinition, SiddhiAppRuntimeBuilder siddhiAppRuntimeBuilder, List<Annotation> annotations, List<Variable> groupByVariableList, boolean isProcessingOnExternalTime, boolean enablePartioning) {
        Annotation primaryKey = new Annotation("PrimaryKey");
        primaryKey.element(null, "AGG_TIMESTAMP");
        if (enablePartioning) {
            primaryKey.element(null, "SHARD_ID");
        }
        if (isProcessingOnExternalTime) {
            primaryKey.element(null, "AGG_EVENT_TIMESTAMP");
        }
        groupByVariableList.forEach(groupBy -> primaryKey.element(null, groupBy.getAttributeName()));
        annotations.add(primaryKey);

        HashMap<TimePeriod.Duration, Table> tablesByDuration = new HashMap<TimePeriod.Duration, Table>();
        for (TimePeriod.Duration duration : durations) {
            String tableId = aggregatorName + "_" + duration.toString();
            TableDefinition definition = TableDefinition.id(tableId);
            for (Attribute column : streamDefinition.getAttributeList()) {
                definition.attribute(column.getName(), column.getType());
            }
            for (Annotation annotation : annotations) {
                definition.annotation(annotation);
            }
            siddhiAppRuntimeBuilder.defineTable(definition);
            // Read the table back from the builder's table map once it has been defined.
            tablesByDuration.put(duration, (Table)siddhiAppRuntimeBuilder.getTableMap().get(tableId));
        }
        return tablesByDuration;
    }

    /**
     * Turns the given stream event into a RESET event.
     * <p>
     * The timestamp is set to {@code 0}, the type to {@link ComplexEvent.Type#RESET}, and each
     * output-data slot described by {@code metaStreamEvent} is set to a default for its attribute
     * type: empty string, {@code 0}, {@code 0L}, {@code 0.0f}, {@code 0.0}, {@code false}, or
     * {@code null} for objects. Slots of any other type are left untouched.
     *
     * @param metaStreamEvent meta event describing the output-data attributes
     * @param streamEvent     event to overwrite
     * @return the same {@code streamEvent} instance
     */
    public static StreamEvent createRestEvent(MetaStreamEvent metaStreamEvent, StreamEvent streamEvent) {
        streamEvent.setTimestamp(0L);
        streamEvent.setType(ComplexEvent.Type.RESET);
        int position = 0;
        for (Attribute attribute : metaStreamEvent.getOutputData()) {
            switch (attribute.getType()) {
                case STRING:
                    streamEvent.setOutputData("", position);
                    break;
                case INT:
                    streamEvent.setOutputData(0, position);
                    break;
                case LONG:
                    streamEvent.setOutputData(0L, position);
                    break;
                case FLOAT:
                    streamEvent.setOutputData(0.0f, position);
                    break;
                case DOUBLE:
                    streamEvent.setOutputData(0.0, position);
                    break;
                case BOOL:
                    streamEvent.setOutputData(false, position);
                    break;
                case OBJECT:
                    streamEvent.setOutputData(null, position);
                    break;
                default:
                    // No default is written for any other attribute type.
                    break;
            }
            position++;
        }
        return streamEvent;
    }
}

