/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.util.parser;

import io.siddhi.core.aggregation.AggregationRuntime;
import io.siddhi.core.aggregation.IncrementalAggregationProcessor;
import io.siddhi.core.aggregation.IncrementalDataPurging;
import io.siddhi.core.aggregation.IncrementalExecutor;
import io.siddhi.core.aggregation.RecreateInMemoryData;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEventPool;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.input.stream.StreamRuntime;
import io.siddhi.core.query.input.stream.single.EntryValveExecutor;
import io.siddhi.core.query.input.stream.single.SingleStreamRuntime;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.selector.GroupByKeyGenerator;
import io.siddhi.core.query.selector.attribute.aggregator.incremental.IncrementalAttributeAggregator;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.ExceptionUtil;
import io.siddhi.core.util.Scheduler;
import io.siddhi.core.util.SiddhiAppRuntimeBuilder;
import io.siddhi.core.util.SiddhiClassLoader;
import io.siddhi.core.util.config.ConfigManager;
import io.siddhi.core.util.extension.holder.FunctionExecutorExtensionHolder;
import io.siddhi.core.util.extension.holder.IncrementalAttributeAggregatorExtensionHolder;
import io.siddhi.core.util.lock.LockWrapper;
import io.siddhi.core.util.parser.ExpressionParser;
import io.siddhi.core.util.parser.InputStreamParser;
import io.siddhi.core.util.parser.SchedulerParser;
import io.siddhi.core.util.parser.helper.QueryParserHelper;
import io.siddhi.core.util.statistics.LatencyTracker;
import io.siddhi.core.util.statistics.ThroughputTracker;
import io.siddhi.core.window.Window;
import io.siddhi.query.api.SiddhiElement;
import io.siddhi.query.api.aggregation.TimePeriod;
import io.siddhi.query.api.annotation.Annotation;
import io.siddhi.query.api.annotation.Element;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.AggregationDefinition;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.definition.StreamDefinition;
import io.siddhi.query.api.definition.TableDefinition;
import io.siddhi.query.api.execution.query.input.stream.InputStream;
import io.siddhi.query.api.execution.query.selection.OutputAttribute;
import io.siddhi.query.api.expression.AttributeFunction;
import io.siddhi.query.api.expression.Expression;
import io.siddhi.query.api.expression.Variable;
import io.siddhi.query.api.expression.constant.StringConstant;
import io.siddhi.query.api.extension.Extension;
import io.siddhi.query.api.util.AnnotationHelper;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.log4j.Logger;

public class AggregationParser {
    private static final Logger LOG = Logger.getLogger(AggregationParser.class);
    private static final String AGG_START_TIMESTAMP_COL = "AGG_TIMESTAMP";
    private static final String AGG_EXTERNAL_TIMESTAMP_COL = "AGG_EVENT_TIMESTAMP";
    private static final String AGG_LAST_TIMESTAMP_COL = "AGG_LAST_EVENT_TIMESTAMP";
    private static final String SHARD_ID_COL = "SHARD_ID";

    public static AggregationRuntime parse(AggregationDefinition aggregationDefinition, SiddhiAppContext siddhiAppContext, Map<String, AbstractDefinition> streamDefinitionMap, Map<String, AbstractDefinition> tableDefinitionMap, Map<String, AbstractDefinition> windowDefinitionMap, Map<String, AbstractDefinition> aggregationDefinitionMap, Map<String, Table> tableMap, Map<String, Window> windowMap, Map<String, AggregationRuntime> aggregationMap, SiddhiAppRuntimeBuilder siddhiAppRuntimeBuilder) {
        if (aggregationDefinition == null) {
            throw new SiddhiAppCreationException("AggregationDefinition instance is null. Hence, can't create the siddhi app '" + siddhiAppContext.getName() + "'");
        }
        if (aggregationDefinition.getTimePeriod() == null) {
            throw new SiddhiAppCreationException("AggregationDefinition '" + aggregationDefinition.getId() + "'s timePeriod is null. Hence, can't create the siddhi app '" + siddhiAppContext.getName() + "'", aggregationDefinition.getQueryContextStartIndex(), aggregationDefinition.getQueryContextEndIndex());
        }
        if (aggregationDefinition.getSelector() == null) {
            throw new SiddhiAppCreationException("AggregationDefinition '" + aggregationDefinition.getId() + "'s selection is not defined. Hence, can't create the siddhi app '" + siddhiAppContext.getName() + "'", aggregationDefinition.getQueryContextStartIndex(), aggregationDefinition.getQueryContextEndIndex());
        }
        if (streamDefinitionMap.get(aggregationDefinition.getBasicSingleInputStream().getStreamId()) == null) {
            throw new SiddhiAppCreationException("Stream " + aggregationDefinition.getBasicSingleInputStream().getStreamId() + " has not been defined");
        }
        Element userDefinedPrimaryKey = AnnotationHelper.getAnnotationElement((String)"PrimaryKey", null, (List)aggregationDefinition.getAnnotations());
        if (userDefinedPrimaryKey != null) {
            throw new SiddhiAppCreationException("Aggregation Tables have predefined primary key, but found '" + userDefinedPrimaryKey.getValue() + "' primary key defined though annotation.");
        }
        try {
            ArrayList<VariableExpressionExecutor> incomingVariableExpressionExecutors = new ArrayList<VariableExpressionExecutor>();
            String aggregatorName = aggregationDefinition.getId();
            SiddhiQueryContext siddhiQueryContext = new SiddhiQueryContext(siddhiAppContext, aggregatorName);
            StreamRuntime streamRuntime = InputStreamParser.parse((InputStream)aggregationDefinition.getBasicSingleInputStream(), streamDefinitionMap, tableDefinitionMap, windowDefinitionMap, aggregationDefinitionMap, tableMap, windowMap, aggregationMap, incomingVariableExpressionExecutors, false, siddhiQueryContext);
            MetaStreamEvent incomingMetaStreamEvent = (MetaStreamEvent)streamRuntime.getMetaComplexEvent();
            incomingMetaStreamEvent.initializeAfterWindowData();
            List<TimePeriod.Duration> incrementalDurations = AggregationParser.getSortedPeriods(aggregationDefinition.getTimePeriod());
            ArrayList<ExpressionExecutor> incomingExpressionExecutors = new ArrayList<ExpressionExecutor>();
            ArrayList<IncrementalAttributeAggregator> incrementalAttributeAggregators = new ArrayList<IncrementalAttributeAggregator>();
            List groupByVariableList = aggregationDefinition.getSelector().getGroupByList();
            boolean isProcessingOnExternalTime = aggregationDefinition.getAggregateAttribute() != null;
            ArrayList<Expression> outputExpressions = new ArrayList<Expression>();
            ArrayList<ExpressionExecutor> outputExpressionExecutors = new ArrayList<ExpressionExecutor>();
            String shardId = null;
            Annotation partitionById = AnnotationHelper.getAnnotation((String)"PartitionById", (List)aggregationDefinition.getAnnotations());
            boolean enablePartioning = false;
            if (partitionById != null) {
                String enableElement = partitionById.getElement("enable");
                enablePartioning = enableElement == null || Boolean.parseBoolean(enableElement);
            }
            ConfigManager configManager = siddhiAppContext.getSiddhiContext().getConfigManager();
            Boolean shouldPartitionById = Boolean.parseBoolean(configManager.extractProperty("partitionById"));
            if (enablePartioning || shouldPartitionById.booleanValue()) {
                shardId = configManager.extractProperty("shardId");
                if (shardId == null) {
                    throw new SiddhiAppCreationException("Configuration 'shardId' not provided for @partitionbyid annotation");
                }
                enablePartioning = true;
            }
            boolean isLatestEventAdded = AggregationParser.populateIncomingAggregatorsAndExecutors(aggregationDefinition, tableMap, incomingVariableExpressionExecutors, incomingMetaStreamEvent, incomingExpressionExecutors, incrementalAttributeAggregators, groupByVariableList, outputExpressions, isProcessingOnExternalTime, shardId, siddhiQueryContext);
            int baseAggregatorBeginIndex = incomingMetaStreamEvent.getOutputData().size();
            List<Expression> finalBaseAggregators = AggregationParser.getFinalBaseAggregators(tableMap, incomingVariableExpressionExecutors, incomingMetaStreamEvent, incomingExpressionExecutors, incrementalAttributeAggregators, siddhiQueryContext);
            StreamDefinition incomingOutputStreamDefinition = StreamDefinition.id((String)"");
            incomingOutputStreamDefinition.setQueryContextStartIndex(aggregationDefinition.getQueryContextStartIndex());
            incomingOutputStreamDefinition.setQueryContextEndIndex(aggregationDefinition.getQueryContextEndIndex());
            MetaStreamEvent processedMetaStreamEvent = new MetaStreamEvent();
            for (Attribute attribute : incomingMetaStreamEvent.getOutputData()) {
                incomingOutputStreamDefinition.attribute(attribute.getName(), attribute.getType());
                processedMetaStreamEvent.addOutputData(attribute);
            }
            incomingMetaStreamEvent.setOutputDefinition(incomingOutputStreamDefinition);
            processedMetaStreamEvent.addInputDefinition((AbstractDefinition)incomingOutputStreamDefinition);
            processedMetaStreamEvent.setOutputDefinition(incomingOutputStreamDefinition);
            ArrayList<VariableExpressionExecutor> processVariableExpressionExecutors = new ArrayList<VariableExpressionExecutor>();
            boolean groupBy = aggregationDefinition.getSelector().getGroupByList().size() != 0;
            List<List<ExpressionExecutor>> processExpressionExecutorsList = incrementalDurations.stream().map(incrementalDuration -> AggregationParser.constructProcessExpressionExecutors(tableMap, siddhiQueryContext, baseAggregatorBeginIndex, finalBaseAggregators, incomingOutputStreamDefinition, processedMetaStreamEvent, processVariableExpressionExecutors, groupBy, isProcessingOnExternalTime, incrementalDuration)).collect(Collectors.toList());
            ExpressionExecutor shouldUpdateExpressionExecutor = null;
            if (isLatestEventAdded) {
                Expression shouldUpdateExp = AttributeFunction.function((String)"incrementalAggregator", (String)"shouldUpdate", (Expression[])new Expression[]{new Variable(AGG_LAST_TIMESTAMP_COL)});
                shouldUpdateExpressionExecutor = ExpressionParser.parseExpression(shouldUpdateExp, processedMetaStreamEvent, 0, tableMap, processVariableExpressionExecutors, false, 0, ProcessingMode.BATCH, false, siddhiQueryContext);
            }
            outputExpressionExecutors.addAll(outputExpressions.stream().map(expression -> ExpressionParser.parseExpression(expression, processedMetaStreamEvent, 0, tableMap, processVariableExpressionExecutors, groupBy, 0, ProcessingMode.BATCH, false, siddhiQueryContext)).collect(Collectors.toList()));
            List<GroupByKeyGenerator> groupByKeyGeneratorList = incrementalDurations.stream().map(incrementalDuration -> {
                if (isProcessingOnExternalTime || groupBy) {
                    ArrayList<Expression> groupByExpressionList = new ArrayList<Expression>();
                    if (isProcessingOnExternalTime) {
                        Expression externalTimestampExpression = AttributeFunction.function((String)"incrementalAggregator", (String)"getAggregationStartTime", (Expression[])new Expression[]{new Variable(AGG_EXTERNAL_TIMESTAMP_COL), new StringConstant(incrementalDuration.name())});
                        groupByExpressionList.add(externalTimestampExpression);
                    }
                    groupByExpressionList.addAll(groupByVariableList.stream().map(groupByVariable -> groupByVariable).collect(Collectors.toList()));
                    return new GroupByKeyGenerator(groupByExpressionList, processedMetaStreamEvent, -1, tableMap, processVariableExpressionExecutors, siddhiQueryContext);
                }
                return null;
            }).collect(Collectors.toList());
            EntryValveExecutor entryValveExecutor = new EntryValveExecutor(siddhiAppContext);
            LockWrapper lockWrapper = new LockWrapper(aggregatorName);
            lockWrapper.setLock(new ReentrantLock());
            Scheduler scheduler = SchedulerParser.parse(entryValveExecutor, siddhiAppContext);
            scheduler.init(lockWrapper, aggregatorName);
            scheduler.setStreamEventPool(new StreamEventPool(processedMetaStreamEvent, 10));
            QueryParserHelper.reduceMetaComplexEvent(incomingMetaStreamEvent);
            QueryParserHelper.reduceMetaComplexEvent(processedMetaStreamEvent);
            QueryParserHelper.updateVariablePosition(incomingMetaStreamEvent, incomingVariableExpressionExecutors);
            QueryParserHelper.updateVariablePosition(processedMetaStreamEvent, processVariableExpressionExecutors);
            HashMap<TimePeriod.Duration, Table> aggregationTables = AggregationParser.initDefaultTables(aggregatorName, incrementalDurations, processedMetaStreamEvent.getOutputStreamDefinition(), siddhiAppRuntimeBuilder, aggregationDefinition.getAnnotations(), groupByVariableList, isProcessingOnExternalTime, enablePartioning);
            Element element = AnnotationHelper.getAnnotationElement((String)"BufferSize", null, (List)aggregationDefinition.getAnnotations());
            if (element != null) {
                LOG.info((Object)"@BufferSize annotation is depreciated. Out of order events are handled without buffers.");
            }
            if ((element = AnnotationHelper.getAnnotationElement((String)"IgnoreEventsOlderThanBuffer", null, (List)aggregationDefinition.getAnnotations())) != null) {
                LOG.info((Object)"@IgnoreEventsOlderThanBuffer annotation is depreciated. Out of order events are handled without buffers.");
            }
            Map<TimePeriod.Duration, IncrementalExecutor> incrementalExecutorMap = AggregationParser.buildIncrementalExecutors(processedMetaStreamEvent, processExpressionExecutorsList, groupByKeyGeneratorList, incrementalDurations, aggregationTables, siddhiAppContext, aggregatorName, shouldUpdateExpressionExecutor);
            Map<TimePeriod.Duration, IncrementalExecutor> incrementalExecutorMapForPartitions = null;
            if (shardId != null) {
                incrementalExecutorMapForPartitions = AggregationParser.buildIncrementalExecutors(processedMetaStreamEvent, processExpressionExecutorsList, groupByKeyGeneratorList, incrementalDurations, aggregationTables, siddhiAppContext, aggregatorName, shouldUpdateExpressionExecutor);
            }
            IncrementalDataPurging incrementalDataPurging = new IncrementalDataPurging();
            incrementalDataPurging.init(aggregationDefinition, new StreamEventPool(processedMetaStreamEvent, 10), aggregationTables, isProcessingOnExternalTime, siddhiQueryContext);
            RecreateInMemoryData recreateInMemoryData = new RecreateInMemoryData(incrementalDurations, aggregationTables, incrementalExecutorMap, siddhiAppContext, processedMetaStreamEvent, tableMap, windowMap, aggregationMap, shardId, incrementalExecutorMapForPartitions);
            IncrementalExecutor rootIncrementalExecutor = incrementalExecutorMap.get(incrementalDurations.get(0));
            rootIncrementalExecutor.setScheduler(scheduler);
            entryValveExecutor.setNextExecutor(rootIncrementalExecutor);
            QueryParserHelper.initStreamRuntime(streamRuntime, incomingMetaStreamEvent, lockWrapper, aggregatorName);
            LatencyTracker latencyTrackerFind = null;
            LatencyTracker latencyTrackerInsert = null;
            ThroughputTracker throughputTrackerFind = null;
            ThroughputTracker throughputTrackerInsert = null;
            if (siddhiAppContext.getStatisticsManager() != null) {
                latencyTrackerFind = QueryParserHelper.createLatencyTracker(siddhiAppContext, aggregationDefinition.getId(), "Aggregations", "find");
                latencyTrackerInsert = QueryParserHelper.createLatencyTracker(siddhiAppContext, aggregationDefinition.getId(), "Aggregations", "insert");
                throughputTrackerFind = QueryParserHelper.createThroughputTracker(siddhiAppContext, aggregationDefinition.getId(), "Aggregations", "find");
                throughputTrackerInsert = QueryParserHelper.createThroughputTracker(siddhiAppContext, aggregationDefinition.getId(), "Aggregations", "insert");
            }
            List<ExpressionExecutor> baseExecutors = AggregationParser.cloneExpressionExecutors(processExpressionExecutorsList.get(0));
            baseExecutors.remove(0);
            AggregationRuntime aggregationRuntime = new AggregationRuntime(aggregationDefinition, incrementalExecutorMap, aggregationTables, (SingleStreamRuntime)streamRuntime, incrementalDurations, siddhiAppContext, baseExecutors, processedMetaStreamEvent, outputExpressionExecutors, latencyTrackerFind, throughputTrackerFind, recreateInMemoryData, isProcessingOnExternalTime, processExpressionExecutorsList, groupByKeyGeneratorList, incrementalDataPurging, shouldUpdateExpressionExecutor, shardId, incrementalExecutorMapForPartitions);
            streamRuntime.setCommonProcessor(new IncrementalAggregationProcessor(aggregationRuntime, incomingExpressionExecutors, processedMetaStreamEvent, latencyTrackerInsert, throughputTrackerInsert, siddhiAppContext));
            return aggregationRuntime;
        }
        catch (Throwable t) {
            ExceptionUtil.populateQueryContext(t, (SiddhiElement)aggregationDefinition, siddhiAppContext);
            throw t;
        }
    }

    private static Map<TimePeriod.Duration, IncrementalExecutor> buildIncrementalExecutors(MetaStreamEvent processedMetaStreamEvent, List<List<ExpressionExecutor>> processExpressionExecutorsList, List<GroupByKeyGenerator> groupByKeyGeneratorList, List<TimePeriod.Duration> incrementalDurations, Map<TimePeriod.Duration, Table> aggregationTables, SiddhiAppContext siddhiAppContext, String aggregatorName, ExpressionExecutor shouldUpdateExpressionExecutor) {
        HashMap<TimePeriod.Duration, IncrementalExecutor> incrementalExecutorMap = new HashMap<TimePeriod.Duration, IncrementalExecutor>();
        IncrementalExecutor root = null;
        for (int i = incrementalDurations.size() - 1; i >= 0; --i) {
            boolean isRoot = false;
            if (i == 0) {
                isRoot = true;
            }
            IncrementalExecutor child = root;
            TimePeriod.Duration duration = incrementalDurations.get(i);
            ExpressionExecutor shouldUpdateExpressionExecutorClone = null;
            if (shouldUpdateExpressionExecutor != null) {
                shouldUpdateExpressionExecutorClone = shouldUpdateExpressionExecutor.cloneExecutor(null);
            }
            IncrementalExecutor incrementalExecutor = new IncrementalExecutor(duration, AggregationParser.cloneExpressionExecutors(processExpressionExecutorsList.get(i)), groupByKeyGeneratorList.get(i), processedMetaStreamEvent, child, isRoot, aggregationTables.get(duration), siddhiAppContext, aggregatorName, shouldUpdateExpressionExecutorClone);
            incrementalExecutorMap.put(duration, incrementalExecutor);
            root = incrementalExecutor;
        }
        return incrementalExecutorMap;
    }

    private static List<ExpressionExecutor> constructProcessExpressionExecutors(Map<String, Table> tableMap, SiddhiQueryContext siddhiQueryContext, int baseAggregatorBeginIndex, List<Expression> finalBaseAggregators, StreamDefinition incomingOutputStreamDefinition, MetaStreamEvent processedMetaStreamEvent, List<VariableExpressionExecutor> processVariableExpressionExecutors, boolean groupBy, boolean isProcessingOnExternalTime, TimePeriod.Duration duration) {
        ArrayList<ExpressionExecutor> processExpressionExecutors = new ArrayList<ExpressionExecutor>();
        List attributeList = incomingOutputStreamDefinition.getAttributeList();
        for (int i = 0; i < baseAggregatorBeginIndex; ++i) {
            if (isProcessingOnExternalTime && i == 1) {
                Expression externalTimestampExpression = AttributeFunction.function((String)"incrementalAggregator", (String)"getAggregationStartTime", (Expression[])new Expression[]{new Variable(AGG_EXTERNAL_TIMESTAMP_COL), new StringConstant(duration.name())});
                ExpressionExecutor externalTimestampExecutor = ExpressionParser.parseExpression(externalTimestampExpression, processedMetaStreamEvent, 0, tableMap, processVariableExpressionExecutors, groupBy, 0, ProcessingMode.BATCH, false, siddhiQueryContext);
                processExpressionExecutors.add(externalTimestampExecutor);
                continue;
            }
            if (((Attribute)attributeList.get(i)).getName().equals(AGG_LAST_TIMESTAMP_COL)) {
                Expression lastTimestampExpression = AttributeFunction.function((String)"max", (Expression[])new Expression[]{new Variable(AGG_LAST_TIMESTAMP_COL)});
                ExpressionExecutor latestTimestampExecutor = ExpressionParser.parseExpression(lastTimestampExpression, processedMetaStreamEvent, 0, tableMap, processVariableExpressionExecutors, groupBy, 0, ProcessingMode.BATCH, false, siddhiQueryContext);
                processExpressionExecutors.add(latestTimestampExecutor);
                continue;
            }
            Attribute attribute = (Attribute)attributeList.get(i);
            VariableExpressionExecutor variableExpressionExecutor = (VariableExpressionExecutor)ExpressionParser.parseExpression((Expression)new Variable(attribute.getName()), processedMetaStreamEvent, 0, tableMap, processVariableExpressionExecutors, groupBy, 0, ProcessingMode.BATCH, false, siddhiQueryContext);
            processExpressionExecutors.add(variableExpressionExecutor);
        }
        for (Expression expression : finalBaseAggregators) {
            ExpressionExecutor expressionExecutor = ExpressionParser.parseExpression(expression, processedMetaStreamEvent, 0, tableMap, processVariableExpressionExecutors, groupBy, 0, ProcessingMode.BATCH, false, siddhiQueryContext);
            processExpressionExecutors.add(expressionExecutor);
        }
        return processExpressionExecutors;
    }

    private static List<Expression> getFinalBaseAggregators(Map<String, Table> tableMap, List<VariableExpressionExecutor> incomingVariableExpressionExecutors, MetaStreamEvent incomingMetaStreamEvent, List<ExpressionExecutor> incomingExpressionExecutors, List<IncrementalAttributeAggregator> incrementalAttributeAggregators, SiddhiQueryContext siddhiQueryContext) {
        ArrayList<Attribute> finalBaseAttributes = new ArrayList<Attribute>();
        ArrayList<Expression> finalBaseAggregators = new ArrayList<Expression>();
        for (IncrementalAttributeAggregator incrementalAttributeAggregator : incrementalAttributeAggregators) {
            Attribute[] baseAttributes = incrementalAttributeAggregator.getBaseAttributes();
            Expression[] baseAttributeInitialValues = incrementalAttributeAggregator.getBaseAttributeInitialValues();
            Expression[] baseAggregators = incrementalAttributeAggregator.getBaseAggregators();
            for (int i = 0; i < baseAttributes.length; ++i) {
                AggregationParser.validateBaseAggregators(incrementalAttributeAggregators, incrementalAttributeAggregator, baseAttributes, baseAttributeInitialValues, baseAggregators, i);
                if (finalBaseAttributes.contains(baseAttributes[i])) continue;
                finalBaseAttributes.add(baseAttributes[i]);
                finalBaseAggregators.add(baseAggregators[i]);
                incomingMetaStreamEvent.addOutputData(baseAttributes[i]);
                incomingExpressionExecutors.add(ExpressionParser.parseExpression(baseAttributeInitialValues[i], incomingMetaStreamEvent, 0, tableMap, incomingVariableExpressionExecutors, false, 0, ProcessingMode.BATCH, false, siddhiQueryContext));
            }
        }
        return finalBaseAggregators;
    }

    private static boolean populateIncomingAggregatorsAndExecutors(AggregationDefinition aggregationDefinition, Map<String, Table> tableMap, List<VariableExpressionExecutor> incomingVariableExpressionExecutors, MetaStreamEvent incomingMetaStreamEvent, List<ExpressionExecutor> incomingExpressionExecutors, List<IncrementalAttributeAggregator> incrementalAttributeAggregators, List<Variable> groupByVariableList, List<Expression> outputExpressions, boolean isProcessingOnExternalTime, String shardId, SiddhiQueryContext siddhiQueryContext) {
        boolean addAggLastEvent = false;
        ExpressionExecutor timestampExecutor = AggregationParser.getTimeStampExecutor(tableMap, incomingVariableExpressionExecutors, incomingMetaStreamEvent, siddhiQueryContext);
        Attribute timestampAttribute = new Attribute(AGG_START_TIMESTAMP_COL, Attribute.Type.LONG);
        incomingMetaStreamEvent.addOutputData(timestampAttribute);
        incomingExpressionExecutors.add(timestampExecutor);
        Attribute externalTimestampAttribute = new Attribute(AGG_EXTERNAL_TIMESTAMP_COL, Attribute.Type.LONG);
        ExpressionExecutor externalTimestampExecutor = null;
        if (isProcessingOnExternalTime) {
            Variable externalTimestampExpression = aggregationDefinition.getAggregateAttribute();
            externalTimestampExecutor = ExpressionParser.parseExpression((Expression)externalTimestampExpression, incomingMetaStreamEvent, 0, tableMap, incomingVariableExpressionExecutors, false, 0, ProcessingMode.BATCH, false, siddhiQueryContext);
            if (externalTimestampExecutor.getReturnType() == Attribute.Type.STRING) {
                Iterator<Object> expression = AttributeFunction.function((String)"incrementalAggregator", (String)"timestampInMilliseconds", (Expression[])new Expression[]{externalTimestampExpression});
                externalTimestampExecutor = ExpressionParser.parseExpression(expression, incomingMetaStreamEvent, 0, tableMap, incomingVariableExpressionExecutors, false, 0, ProcessingMode.BATCH, false, siddhiQueryContext);
            } else if (externalTimestampExecutor.getReturnType() != Attribute.Type.LONG) {
                throw new SiddhiAppCreationException("AggregationDefinition '" + aggregationDefinition.getId() + "'s aggregateAttribute expects long or string, but found " + timestampExecutor.getReturnType() + ". Hence, can't create the siddhi app '" + siddhiQueryContext.getSiddhiAppContext().getName() + "'", externalTimestampExpression.getQueryContextStartIndex(), externalTimestampExpression.getQueryContextEndIndex());
            }
            incomingMetaStreamEvent.addOutputData(externalTimestampAttribute);
            incomingExpressionExecutors.add(externalTimestampExecutor);
        }
        AbstractDefinition incomingLastInputStreamDefinition = incomingMetaStreamEvent.getLastInputDefinition();
        for (Variable groupByVariable : groupByVariableList) {
            incomingMetaStreamEvent.addOutputData((Attribute)incomingLastInputStreamDefinition.getAttributeList().get(incomingLastInputStreamDefinition.getAttributePosition(groupByVariable.getAttributeName())));
            incomingExpressionExecutors.add(ExpressionParser.parseExpression((Expression)groupByVariable, incomingMetaStreamEvent, 0, tableMap, incomingVariableExpressionExecutors, false, 0, ProcessingMode.BATCH, false, siddhiQueryContext));
        }
        aggregationDefinition.getAttributeList().add(timestampAttribute);
        if (isProcessingOnExternalTime) {
            outputExpressions.add((Expression)Expression.variable((String)AGG_EXTERNAL_TIMESTAMP_COL));
        } else {
            outputExpressions.add((Expression)Expression.variable((String)AGG_START_TIMESTAMP_COL));
        }
        for (OutputAttribute outputAttribute : aggregationDefinition.getSelector().getSelectionList()) {
            Expression expression = outputAttribute.getExpression();
            if (expression instanceof AttributeFunction) {
                IncrementalAttributeAggregator incrementalAggregator = null;
                try {
                    incrementalAggregator = (IncrementalAttributeAggregator)SiddhiClassLoader.loadExtensionImplementation((Extension)new AttributeFunction("incrementalAggregator", ((AttributeFunction)expression).getName(), ((AttributeFunction)expression).getParameters()), IncrementalAttributeAggregatorExtensionHolder.getInstance(siddhiQueryContext.getSiddhiAppContext()));
                }
                catch (SiddhiAppCreationException ex) {
                    try {
                        SiddhiClassLoader.loadExtensionImplementation((Extension)((AttributeFunction)expression), FunctionExecutorExtensionHolder.getInstance(siddhiQueryContext.getSiddhiAppContext()));
                        ExpressionExecutor expressionExecutor = ExpressionParser.parseExpression(expression, incomingMetaStreamEvent, 0, tableMap, incomingVariableExpressionExecutors, false, 0, ProcessingMode.BATCH, false, siddhiQueryContext);
                        incomingExpressionExecutors.add(expressionExecutor);
                        incomingMetaStreamEvent.addOutputData(new Attribute(outputAttribute.getRename(), expressionExecutor.getReturnType()));
                        aggregationDefinition.getAttributeList().add(new Attribute(outputAttribute.getRename(), expressionExecutor.getReturnType()));
                        outputExpressions.add((Expression)Expression.variable((String)outputAttribute.getRename()));
                    }
                    catch (SiddhiAppCreationException e) {
                        throw new SiddhiAppCreationException("'" + ((AttributeFunction)expression).getName() + "' is neither a incremental attribute aggregator extension or a function extension", expression.getQueryContextStartIndex(), expression.getQueryContextEndIndex());
                    }
                }
                if (incrementalAggregator == null) continue;
                AggregationParser.initIncrementalAttributeAggregator(incomingLastInputStreamDefinition, (AttributeFunction)expression, incrementalAggregator);
                incrementalAttributeAggregators.add(incrementalAggregator);
                aggregationDefinition.getAttributeList().add(new Attribute(outputAttribute.getRename(), incrementalAggregator.getReturnType()));
                outputExpressions.add(incrementalAggregator.aggregate());
                continue;
            }
            if (expression instanceof Variable && groupByVariableList.contains(expression)) {
                Attribute groupByAttribute = null;
                for (Attribute attribute : incomingMetaStreamEvent.getOutputData()) {
                    if (!attribute.getName().equals(((Variable)expression).getAttributeName())) continue;
                    groupByAttribute = attribute;
                    break;
                }
                if (groupByAttribute == null) {
                    throw new SiddhiAppCreationException("Expected GroupBy attribute '" + ((Variable)expression).getAttributeName() + "' not used in aggregation '" + siddhiQueryContext.getName() + "' processing.", expression.getQueryContextStartIndex(), expression.getQueryContextEndIndex());
                }
                aggregationDefinition.getAttributeList().add(new Attribute(outputAttribute.getRename(), groupByAttribute.getType()));
                outputExpressions.add((Expression)Expression.variable((String)groupByAttribute.getName()));
                continue;
            }
            if (isProcessingOnExternalTime && !addAggLastEvent) {
                Attribute lastEventTimeStamp = new Attribute(AGG_LAST_TIMESTAMP_COL, Attribute.Type.LONG);
                incomingMetaStreamEvent.addOutputData(lastEventTimeStamp);
                incomingExpressionExecutors.add(externalTimestampExecutor);
                addAggLastEvent = true;
            }
            ExpressionExecutor expressionExecutor = ExpressionParser.parseExpression(expression, incomingMetaStreamEvent, 0, tableMap, incomingVariableExpressionExecutors, false, 0, ProcessingMode.BATCH, false, siddhiQueryContext);
            incomingExpressionExecutors.add(expressionExecutor);
            incomingMetaStreamEvent.addOutputData(new Attribute(outputAttribute.getRename(), expressionExecutor.getReturnType()));
            aggregationDefinition.getAttributeList().add(new Attribute(outputAttribute.getRename(), expressionExecutor.getReturnType()));
            outputExpressions.add((Expression)Expression.variable((String)outputAttribute.getRename()));
        }
        if (shardId != null) {
            ConstantExpressionExecutor nodeIdExpressionExecutor = new ConstantExpressionExecutor(shardId, Attribute.Type.STRING);
            incomingExpressionExecutors.add(nodeIdExpressionExecutor);
            incomingMetaStreamEvent.addOutputData(new Attribute(SHARD_ID_COL, Attribute.Type.STRING));
        }
        return addAggLastEvent;
    }

    private static List<ExpressionExecutor> cloneExpressionExecutors(List<ExpressionExecutor> expressionExecutors) {
        List<ExpressionExecutor> arrayList = expressionExecutors.stream().map(expressionExecutor -> expressionExecutor.cloneExecutor(null)).collect(Collectors.toList());
        return arrayList;
    }

    private static void validateBaseAggregators(List<IncrementalAttributeAggregator> incrementalAttributeAggregators, IncrementalAttributeAggregator incrementalAttributeAggregator, Attribute[] baseAttributes, Expression[] baseAttributeInitialValues, Expression[] baseAggregators, int i) {
        for (int i1 = i; i1 < incrementalAttributeAggregators.size(); ++i1) {
            IncrementalAttributeAggregator otherAttributeAggregator = incrementalAttributeAggregators.get(i1);
            if (otherAttributeAggregator == incrementalAttributeAggregator) continue;
            Attribute[] otherBaseAttributes = otherAttributeAggregator.getBaseAttributes();
            Expression[] otherBaseAttributeInitialValues = otherAttributeAggregator.getBaseAttributeInitialValues();
            Expression[] otherBaseAggregators = otherAttributeAggregator.getBaseAggregators();
            for (int j = 0; j < otherBaseAttributes.length; ++j) {
                if (!baseAttributes[i].equals((Object)otherBaseAttributes[j])) continue;
                if (!baseAttributeInitialValues[i].equals(otherBaseAttributeInitialValues[j])) {
                    throw new SiddhiAppCreationException("BaseAttributes having same name should be defined with same initial values, but baseAttribute '" + baseAttributes[i] + "' is defined in '" + incrementalAttributeAggregator.getClass().getName() + "' and '" + otherAttributeAggregator.getClass().getName() + "' with different initial values.");
                }
                if (baseAggregators[i].equals(otherBaseAggregators[j])) continue;
                throw new SiddhiAppCreationException("BaseAttributes having same name should be defined with same baseAggregators, but baseAttribute '" + baseAttributes[i] + "' is defined in '" + incrementalAttributeAggregator.getClass().getName() + "' and '" + otherAttributeAggregator.getClass().getName() + "' with different baseAggregators.");
            }
        }
    }

    private static void initIncrementalAttributeAggregator(AbstractDefinition lastInputStreamDefinition, AttributeFunction attributeFunction, IncrementalAttributeAggregator incrementalAttributeAggregator) {
        String attributeName = null;
        Attribute.Type attributeType = null;
        if (attributeFunction.getParameters() != null && attributeFunction.getParameters()[0] != null) {
            if (attributeFunction.getParameters().length != 1) {
                throw new SiddhiAppCreationException("Incremental aggregator requires only on one parameter. Found " + attributeFunction.getParameters().length, attributeFunction.getQueryContextStartIndex(), attributeFunction.getQueryContextEndIndex());
            }
            if (!(attributeFunction.getParameters()[0] instanceof Variable)) {
                throw new SiddhiAppCreationException("Incremental aggregator expected a variable. However a parameter of type " + attributeFunction.getParameters()[0].getClass().getTypeName() + " was found", attributeFunction.getParameters()[0].getQueryContextStartIndex(), attributeFunction.getParameters()[0].getQueryContextEndIndex());
            }
            attributeName = ((Variable)attributeFunction.getParameters()[0]).getAttributeName();
            attributeType = lastInputStreamDefinition.getAttributeType(attributeName);
        }
        incrementalAttributeAggregator.init(attributeName, attributeType);
        Attribute[] baseAttributes = incrementalAttributeAggregator.getBaseAttributes();
        Expression[] baseAttributeInitialValues = incrementalAttributeAggregator.getBaseAttributeInitialValues();
        Expression[] baseAggregators = incrementalAttributeAggregator.getBaseAggregators();
        if (baseAttributes.length != baseAggregators.length) {
            throw new SiddhiAppCreationException("Number of baseAggregators '" + baseAggregators.length + "' and baseAttributes '" + baseAttributes.length + "' is not equal for '" + attributeFunction + "'", attributeFunction.getQueryContextStartIndex(), attributeFunction.getQueryContextEndIndex());
        }
        if (baseAttributeInitialValues.length != baseAggregators.length) {
            throw new SiddhiAppCreationException("Number of baseAggregators '" + baseAggregators.length + "' and baseAttributeInitialValues '" + baseAttributeInitialValues.length + "' is not equal for '" + attributeFunction + "'", attributeFunction.getQueryContextStartIndex(), attributeFunction.getQueryContextEndIndex());
        }
    }

    private static ExpressionExecutor getTimeStampExecutor(Map<String, Table> tableMap, List<VariableExpressionExecutor> variableExpressionExecutors, MetaStreamEvent metaStreamEvent, SiddhiQueryContext siddhiQueryContext) {
        Expression timestampExpression = AttributeFunction.function((String)"currentTimeMillis", null);
        ExpressionExecutor timestampExecutor = ExpressionParser.parseExpression(timestampExpression, metaStreamEvent, 0, tableMap, variableExpressionExecutors, false, 0, ProcessingMode.BATCH, false, siddhiQueryContext);
        return timestampExecutor;
    }

    private static boolean isRange(TimePeriod timePeriod) {
        return timePeriod.getOperator() == TimePeriod.Operator.RANGE;
    }

    private static List<TimePeriod.Duration> getSortedPeriods(TimePeriod timePeriod) {
        try {
            List<TimePeriod.Duration> durations = timePeriod.getDurations();
            if (AggregationParser.isRange(timePeriod)) {
                durations = AggregationParser.fillGap((TimePeriod.Duration)durations.get(0), (TimePeriod.Duration)durations.get(1));
            }
            return AggregationParser.sortedDurations(durations);
        }
        catch (Throwable t) {
            ExceptionUtil.populateQueryContext(t, (SiddhiElement)timePeriod, null);
            throw t;
        }
    }

    private static List<TimePeriod.Duration> sortedDurations(List<TimePeriod.Duration> durations) {
        ArrayList<TimePeriod.Duration> copyDurations = new ArrayList<TimePeriod.Duration>(durations);
        Comparator<TimePeriod.Duration> periodComparator = new Comparator<TimePeriod.Duration>(){

            @Override
            public int compare(TimePeriod.Duration firstDuration, TimePeriod.Duration secondDuration) {
                int secondOrdinal;
                int firstOrdinal = firstDuration.ordinal();
                if (firstOrdinal > (secondOrdinal = secondDuration.ordinal())) {
                    return 1;
                }
                if (firstOrdinal < secondOrdinal) {
                    return -1;
                }
                return 0;
            }
        };
        copyDurations.sort(periodComparator);
        return copyDurations;
    }

    private static List<TimePeriod.Duration> fillGap(TimePeriod.Duration start, TimePeriod.Duration end) {
        int endIndex;
        TimePeriod.Duration[] durations = TimePeriod.Duration.values();
        List<TimePeriod.Duration> filledDurations = new ArrayList<TimePeriod.Duration>();
        int startIndex = start.ordinal();
        if (startIndex > (endIndex = end.ordinal())) {
            throw new SiddhiAppCreationException("Start time period must be less than end time period for range aggregation calculation");
        }
        if (startIndex == endIndex) {
            filledDurations.add(start);
        } else {
            TimePeriod.Duration[] temp = new TimePeriod.Duration[endIndex - startIndex + 1];
            System.arraycopy(durations, startIndex, temp, 0, endIndex - startIndex + 1);
            filledDurations = Arrays.asList(temp);
        }
        return filledDurations;
    }

    private static HashMap<TimePeriod.Duration, Table> initDefaultTables(String aggregatorName, List<TimePeriod.Duration> durations, StreamDefinition streamDefinition, SiddhiAppRuntimeBuilder siddhiAppRuntimeBuilder, List<Annotation> annotations, List<Variable> groupByVariableList, boolean isProcessingOnExternalTime, boolean enablePartioning) {
        HashMap<TimePeriod.Duration, Table> aggregationTableMap = new HashMap<TimePeriod.Duration, Table>();
        Annotation primaryKeyAnnotation = new Annotation("PrimaryKey");
        primaryKeyAnnotation.element(null, AGG_START_TIMESTAMP_COL);
        if (enablePartioning) {
            primaryKeyAnnotation.element(null, SHARD_ID_COL);
        }
        if (isProcessingOnExternalTime) {
            primaryKeyAnnotation.element(null, AGG_EXTERNAL_TIMESTAMP_COL);
        }
        for (Variable groupByVariable : groupByVariableList) {
            primaryKeyAnnotation.element(null, groupByVariable.getAttributeName());
        }
        annotations.add(primaryKeyAnnotation);
        for (TimePeriod.Duration duration : durations) {
            String tableId = aggregatorName + "_" + duration.toString();
            TableDefinition tableDefinition = TableDefinition.id((String)tableId);
            for (Attribute attribute : streamDefinition.getAttributeList()) {
                tableDefinition.attribute(attribute.getName(), attribute.getType());
            }
            annotations.forEach(arg_0 -> ((TableDefinition)tableDefinition).annotation(arg_0));
            siddhiAppRuntimeBuilder.defineTable(tableDefinition);
            aggregationTableMap.put(duration, (Table)siddhiAppRuntimeBuilder.getTableMap().get(tableId));
        }
        return aggregationTableMap;
    }
}

