/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.mpp.plan.planner.distribution;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanBuilder;
import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.SimplePlanNodeRewriter;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProcessNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesSourceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;

public class SourceRewriter
extends SimplePlanNodeRewriter<DistributionPlanContext> {
    private Analysis analysis;

    public SourceRewriter(Analysis analysis) {
        this.analysis = analysis;
    }

    @Override
    public PlanNode visitDeviceView(DeviceViewNode node, DistributionPlanContext context) {
        Preconditions.checkArgument((node.getDevices().size() == node.getChildren().size() ? 1 : 0) != 0, (Object)"size of devices and its children in DeviceViewNode should be same");
        HashSet<TRegionReplicaSet> relatedDataRegions = new HashSet<TRegionReplicaSet>();
        ArrayList<DeviceViewSplit> deviceViewSplits = new ArrayList<DeviceViewSplit>();
        for (int i = 0; i < node.getDevices().size(); ++i) {
            String device = node.getDevices().get(i);
            PlanNode child = node.getChildren().get(i);
            List<TRegionReplicaSet> regionReplicaSets = this.analysis.getPartitionInfo(device, this.analysis.getGlobalTimeFilter());
            deviceViewSplits.add(new DeviceViewSplit(device, child, regionReplicaSets));
            relatedDataRegions.addAll(regionReplicaSets);
        }
        DeviceMergeNode deviceMergeNode = new DeviceMergeNode(context.queryContext.getQueryId().genPlanNodeId(), node.getMergeOrderParameter(), node.getDevices());
        for (TRegionReplicaSet regionReplicaSet : relatedDataRegions) {
            ArrayList<String> devices = new ArrayList<String>();
            ArrayList<PlanNode> children = new ArrayList<PlanNode>();
            for (DeviceViewSplit split : deviceViewSplits) {
                if (!split.needDistributeTo(regionReplicaSet)) continue;
                devices.add(split.device);
                children.add(split.buildPlanNodeInRegion(regionReplicaSet, context.queryContext));
            }
            DeviceViewNode regionDeviceViewNode = new DeviceViewNode(context.queryContext.getQueryId().genPlanNodeId(), node.getMergeOrderParameter(), node.getOutputColumnNames(), node.getDeviceToMeasurementIndexesMap());
            for (int i = 0; i < devices.size(); ++i) {
                regionDeviceViewNode.addChildDeviceNode((String)devices.get(i), (PlanNode)children.get(i));
            }
            deviceMergeNode.addChild(regionDeviceViewNode);
        }
        return deviceMergeNode;
    }

    @Override
    public PlanNode visitSchemaQueryMerge(SchemaQueryMergeNode node, DistributionPlanContext context) {
        SchemaQueryMergeNode root = (SchemaQueryMergeNode)node.clone();
        SchemaQueryScanNode seed = (SchemaQueryScanNode)node.getChildren().get(0);
        TreeSet<TRegionReplicaSet> schemaRegions = new TreeSet<TRegionReplicaSet>(Comparator.comparingInt(region -> region.getRegionId().getId()));
        this.analysis.getSchemaPartitionInfo().getSchemaPartitionMap().forEach((storageGroup, deviceGroup) -> deviceGroup.forEach((deviceGroupId, schemaRegionReplicaSet) -> schemaRegions.add((TRegionReplicaSet)schemaRegionReplicaSet)));
        schemaRegions.forEach(region -> {
            SchemaQueryScanNode schemaQueryScanNode = (SchemaQueryScanNode)seed.clone();
            schemaQueryScanNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
            schemaQueryScanNode.setRegionReplicaSet((TRegionReplicaSet)region);
            root.addChild(schemaQueryScanNode);
        });
        return root;
    }

    @Override
    public PlanNode visitCountMerge(CountSchemaMergeNode node, DistributionPlanContext context) {
        CountSchemaMergeNode root = (CountSchemaMergeNode)node.clone();
        SchemaQueryScanNode seed = (SchemaQueryScanNode)node.getChildren().get(0);
        HashSet schemaRegions = new HashSet();
        this.analysis.getSchemaPartitionInfo().getSchemaPartitionMap().forEach((storageGroup, deviceGroup) -> deviceGroup.forEach((deviceGroupId, schemaRegionReplicaSet) -> schemaRegions.add(schemaRegionReplicaSet)));
        schemaRegions.forEach(region -> {
            SchemaQueryScanNode schemaQueryScanNode = (SchemaQueryScanNode)seed.clone();
            schemaQueryScanNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
            schemaQueryScanNode.setRegionReplicaSet((TRegionReplicaSet)region);
            root.addChild(schemaQueryScanNode);
        });
        return root;
    }

    @Override
    public PlanNode visitSeriesScan(SeriesScanNode node, DistributionPlanContext context) {
        TimeJoinNode timeJoinNode = new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
        return this.processRawSeriesScan(node, context, timeJoinNode);
    }

    @Override
    public PlanNode visitAlignedSeriesScan(AlignedSeriesScanNode node, DistributionPlanContext context) {
        TimeJoinNode timeJoinNode = new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
        return this.processRawSeriesScan(node, context, timeJoinNode);
    }

    @Override
    public PlanNode visitLastQueryScan(LastQueryScanNode node, DistributionPlanContext context) {
        LastQueryNode mergeNode = new LastQueryNode(context.queryContext.getQueryId().genPlanNodeId(), node.getPartitionTimeFilter(), new OrderByParameter());
        return this.processRawSeriesScan(node, context, mergeNode);
    }

    @Override
    public PlanNode visitAlignedLastQueryScan(AlignedLastQueryScanNode node, DistributionPlanContext context) {
        LastQueryNode mergeNode = new LastQueryNode(context.queryContext.getQueryId().genPlanNodeId(), node.getPartitionTimeFilter(), new OrderByParameter());
        return this.processRawSeriesScan(node, context, mergeNode);
    }

    private PlanNode processRawSeriesScan(SeriesSourceNode node, DistributionPlanContext context, MultiChildNode parent) {
        List<SeriesSourceNode> sourceNodes = this.splitSeriesSourceNodeByPartition(node, context);
        if (sourceNodes.size() == 1) {
            return sourceNodes.get(0);
        }
        sourceNodes.forEach(parent::addChild);
        return parent;
    }

    private List<SeriesSourceNode> splitSeriesSourceNodeByPartition(SeriesSourceNode node, DistributionPlanContext context) {
        ArrayList<SeriesSourceNode> ret = new ArrayList<SeriesSourceNode>();
        List<TRegionReplicaSet> dataDistribution = this.analysis.getPartitionInfo(node.getPartitionPath(), node.getPartitionTimeFilter());
        if (dataDistribution.size() == 1) {
            node.setRegionReplicaSet(dataDistribution.get(0));
            ret.add(node);
            return ret;
        }
        for (TRegionReplicaSet dataRegion : dataDistribution) {
            SeriesSourceNode split = (SeriesSourceNode)node.clone();
            split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
            split.setRegionReplicaSet(dataRegion);
            ret.add(split);
        }
        return ret;
    }

    @Override
    public PlanNode visitSeriesAggregationScan(SeriesAggregationScanNode node, DistributionPlanContext context) {
        return this.processSeriesAggregationSource(node, context);
    }

    @Override
    public PlanNode visitAlignedSeriesAggregationScan(AlignedSeriesAggregationScanNode node, DistributionPlanContext context) {
        return this.processSeriesAggregationSource(node, context);
    }

    private PlanNode processSeriesAggregationSource(SeriesAggregationSourceNode node, DistributionPlanContext context) {
        List<TRegionReplicaSet> dataDistribution = this.analysis.getPartitionInfo(node.getPartitionPath(), node.getPartitionTimeFilter());
        if (dataDistribution.size() == 1) {
            node.setRegionReplicaSet(dataDistribution.get(0));
            return node;
        }
        ArrayList<AggregationDescriptor> leafAggDescriptorList = new ArrayList<AggregationDescriptor>();
        node.getAggregationDescriptorList().forEach(descriptor -> leafAggDescriptorList.add(new AggregationDescriptor(descriptor.getAggregationFuncName(), AggregationStep.PARTIAL, descriptor.getInputExpressions())));
        leafAggDescriptorList.forEach(d -> LogicalPlanBuilder.updateTypeProviderByPartialAggregation(d, this.analysis.getTypeProvider()));
        ArrayList<AggregationDescriptor> rootAggDescriptorList = new ArrayList<AggregationDescriptor>();
        node.getAggregationDescriptorList().forEach(descriptor -> rootAggDescriptorList.add(new AggregationDescriptor(descriptor.getAggregationFuncName(), AggregationStep.FINAL, descriptor.getInputExpressions())));
        AggregationNode aggregationNode = new AggregationNode(context.queryContext.getQueryId().genPlanNodeId(), rootAggDescriptorList, node.getGroupByTimeParameter(), node.getScanOrder());
        for (TRegionReplicaSet dataRegion : dataDistribution) {
            SeriesAggregationScanNode split = (SeriesAggregationScanNode)node.clone();
            split.setAggregationDescriptorList(leafAggDescriptorList);
            split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
            split.setRegionReplicaSet(dataRegion);
            aggregationNode.addChild(split);
        }
        return aggregationNode;
    }

    @Override
    public PlanNode visitSchemaFetchMerge(SchemaFetchMergeNode node, DistributionPlanContext context) {
        SchemaFetchMergeNode root = (SchemaFetchMergeNode)node.clone();
        HashMap storageGroupSchemaRegionMap = new HashMap();
        this.analysis.getSchemaPartitionInfo().getSchemaPartitionMap().forEach((storageGroup, deviceGroup) -> {
            storageGroupSchemaRegionMap.put(storageGroup, new HashSet());
            deviceGroup.forEach((deviceGroupId, schemaRegionReplicaSet) -> ((Set)storageGroupSchemaRegionMap.get(storageGroup)).add(schemaRegionReplicaSet));
        });
        for (PlanNode child : node.getChildren()) {
            for (TRegionReplicaSet schemaRegion : (Set)storageGroupSchemaRegionMap.get(((SchemaFetchScanNode)child).getStorageGroup().getFullPath())) {
                SchemaFetchScanNode schemaFetchScanNode = (SchemaFetchScanNode)child.clone();
                schemaFetchScanNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
                schemaFetchScanNode.setRegionReplicaSet(schemaRegion);
                root.addChild(schemaFetchScanNode);
            }
        }
        return root;
    }

    @Override
    public PlanNode visitLastQuery(LastQueryNode node, DistributionPlanContext context) {
        context.setForceAddParent(true);
        PlanNode root = this.processRawMultiChildNode(node, context);
        if (context.queryMultiRegion) {
            PlanNode newRoot = this.genLastQueryRootNode(node, context);
            root.getChildren().forEach(newRoot::addChild);
            return newRoot;
        }
        return root;
    }

    private PlanNode genLastQueryRootNode(LastQueryNode node, DistributionPlanContext context) {
        PlanNodeId id = context.queryContext.getQueryId().genPlanNodeId();
        if (context.oneSeriesInMultiRegion || !node.getMergeOrderParameter().isEmpty()) {
            return new LastQueryMergeNode(id, node.getMergeOrderParameter());
        }
        return new LastQueryCollectNode(id);
    }

    @Override
    public PlanNode visitTimeJoin(TimeJoinNode node, DistributionPlanContext context) {
        if (this.isAggregationQuery(node)) {
            return this.planAggregationWithTimeJoin(node, context);
        }
        return this.processRawMultiChildNode(node, context);
    }

    private PlanNode processRawMultiChildNode(MultiChildNode node, DistributionPlanContext context) {
        MultiChildNode root = (MultiChildNode)node.clone();
        ArrayList<SeriesSourceNode> sources = new ArrayList<SeriesSourceNode>();
        for (PlanNode child : node.getChildren()) {
            if (!(child instanceof SeriesSourceNode)) continue;
            SeriesSourceNode handle = (SeriesSourceNode)child;
            List<TRegionReplicaSet> dataDistribution = this.analysis.getPartitionInfo(handle.getPartitionPath(), handle.getPartitionTimeFilter());
            if (dataDistribution.size() > 1) {
                context.setOneSeriesInMultiRegion(true);
            }
            for (TRegionReplicaSet dataRegion2 : dataDistribution) {
                SeriesSourceNode split = (SeriesSourceNode)handle.clone();
                split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
                split.setRegionReplicaSet(dataRegion2);
                sources.add(split);
            }
        }
        Map<TRegionReplicaSet, List<SourceNode>> sourceGroup = sources.stream().collect(Collectors.groupingBy(IPartitionRelatedNode::getRegionReplicaSet));
        if (sourceGroup.size() > 1) {
            context.setQueryMultiRegion(true);
        }
        boolean[] addParent = new boolean[]{false};
        sourceGroup.forEach((dataRegion, seriesScanNodes) -> {
            if (seriesScanNodes.size() == 1 && !context.forceAddParent) {
                root.addChild((PlanNode)seriesScanNodes.get(0));
            } else {
                boolean appendToRootDirectly;
                boolean bl = appendToRootDirectly = sourceGroup.size() == 1 || !addParent[0] && !context.forceAddParent;
                if (appendToRootDirectly) {
                    seriesScanNodes.forEach(root::addChild);
                    addParent[0] = true;
                } else {
                    MultiChildNode parentOfGroup = (MultiChildNode)root.clone();
                    parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
                    seriesScanNodes.forEach(parentOfGroup::addChild);
                    root.addChild(parentOfGroup);
                }
            }
        });
        for (PlanNode child : node.getChildren()) {
            if (child instanceof SeriesSourceNode) continue;
            root.addChild(this.visit(child, context));
        }
        return root;
    }

    private boolean isAggregationQuery(TimeJoinNode node) {
        for (PlanNode child : node.getChildren()) {
            if (!(child instanceof SeriesAggregationScanNode) && !(child instanceof AlignedSeriesAggregationScanNode)) continue;
            return true;
        }
        return false;
    }

    @Override
    public PlanNode visitSlidingWindowAggregation(SlidingWindowAggregationNode node, DistributionPlanContext context) {
        DistributionPlanContext childContext = context.copy().setRoot(false);
        PlanNode child = this.visit(node.getChild(), childContext);
        PlanNode newRoot = node.clone();
        newRoot.addChild(child);
        return newRoot;
    }

    private PlanNode planAggregationWithTimeJoin(TimeJoinNode root, DistributionPlanContext context) {
        List<SeriesAggregationSourceNode> sources = this.splitAggregationSourceByPartition(root, context);
        Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup = sources.stream().collect(Collectors.groupingBy(IPartitionRelatedNode::getRegionReplicaSet));
        ArrayList<AggregationDescriptor> rootAggDescriptorList = new ArrayList<AggregationDescriptor>();
        for (PlanNode child : root.getChildren()) {
            SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode)child;
            handle.getAggregationDescriptorList().forEach(descriptor -> rootAggDescriptorList.add(new AggregationDescriptor(descriptor.getAggregationFuncName(), context.isRoot ? AggregationStep.FINAL : AggregationStep.INTERMEDIATE, descriptor.getInputExpressions())));
        }
        rootAggDescriptorList.forEach(d -> LogicalPlanBuilder.updateTypeProviderByPartialAggregation(d, this.analysis.getTypeProvider()));
        Preconditions.checkArgument((sources.size() > 0 ? 1 : 0) != 0, (Object)"Aggregation sources should not be empty when distribution planning");
        SeriesAggregationSourceNode seed = sources.get(0);
        AggregationNode aggregationNode = new AggregationNode(context.queryContext.getQueryId().genPlanNodeId(), rootAggDescriptorList, seed.getGroupByTimeParameter(), seed.getScanOrder());
        boolean[] addParent = new boolean[]{false};
        sourceGroup.forEach((dataRegion, sourceNodes) -> {
            if (sourceNodes.size() == 1) {
                aggregationNode.addChild((PlanNode)sourceNodes.get(0));
            } else if (!addParent[0]) {
                sourceNodes.forEach(aggregationNode::addChild);
                addParent[0] = true;
            } else {
                TimeJoinNode parentOfGroup = (TimeJoinNode)root.clone();
                parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
                sourceNodes.forEach(parentOfGroup::addChild);
                aggregationNode.addChild(parentOfGroup);
            }
        });
        return aggregationNode;
    }

    @Override
    public PlanNode visitGroupByLevel(GroupByLevelNode root, DistributionPlanContext context) {
        if (this.shouldUseNaiveAggregation(root)) {
            return this.defaultRewrite(root, context);
        }
        List<SeriesAggregationSourceNode> sources = this.splitAggregationSourceByPartition(root, context);
        Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup = sources.stream().collect(Collectors.groupingBy(IPartitionRelatedNode::getRegionReplicaSet));
        boolean containsSlidingWindow = root.getChildren().size() == 1 && root.getChildren().get(0) instanceof SlidingWindowAggregationNode;
        GroupByLevelNode newRoot = containsSlidingWindow ? this.groupSourcesForGroupByLevelWithSlidingWindow(root, (SlidingWindowAggregationNode)root.getChildren().get(0), sourceGroup, context) : this.groupSourcesForGroupByLevel(root, sourceGroup, context);
        this.calculateGroupByLevelNodeAttributes(newRoot, 0);
        return newRoot;
    }

    private boolean shouldUseNaiveAggregation(PlanNode root) {
        if (root instanceof AggregationNode) {
            return true;
        }
        for (PlanNode child : root.getChildren()) {
            if (!this.shouldUseNaiveAggregation(child)) continue;
            return true;
        }
        return false;
    }

    private GroupByLevelNode groupSourcesForGroupByLevelWithSlidingWindow(GroupByLevelNode root, SlidingWindowAggregationNode slidingWindowNode, Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup, DistributionPlanContext context) {
        GroupByLevelNode newRoot = (GroupByLevelNode)root.clone();
        ArrayList groups = new ArrayList();
        sourceGroup.forEach((dataRegion, sourceNodes) -> {
            SlidingWindowAggregationNode parentOfGroup = (SlidingWindowAggregationNode)slidingWindowNode.clone();
            parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
            if (sourceNodes.size() == 1) {
                parentOfGroup.addChild((PlanNode)sourceNodes.get(0));
            } else {
                TimeJoinNode timeJoinNode = new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), root.getScanOrder());
                sourceNodes.forEach(timeJoinNode::addChild);
                parentOfGroup.addChild(timeJoinNode);
            }
            groups.add(parentOfGroup);
        });
        for (int i = 0; i < groups.size(); ++i) {
            if (i == 0) {
                newRoot.addChild((PlanNode)groups.get(i));
                continue;
            }
            GroupByLevelNode parent = (GroupByLevelNode)root.clone();
            parent.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
            parent.addChild((PlanNode)groups.get(i));
            newRoot.addChild(parent);
        }
        return newRoot;
    }

    private GroupByLevelNode groupSourcesForGroupByLevel(GroupByLevelNode root, Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup, DistributionPlanContext context) {
        GroupByLevelNode newRoot = (GroupByLevelNode)root.clone();
        boolean[] addParent = new boolean[]{false};
        sourceGroup.forEach((dataRegion, sourceNodes) -> {
            if (sourceNodes.size() == 1) {
                newRoot.addChild((PlanNode)sourceNodes.get(0));
            } else if (!addParent[0]) {
                sourceNodes.forEach(newRoot::addChild);
                addParent[0] = true;
            } else {
                GroupByLevelNode parentOfGroup = (GroupByLevelNode)root.clone();
                parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
                sourceNodes.forEach(parentOfGroup::addChild);
                newRoot.addChild(parentOfGroup);
            }
        });
        return newRoot;
    }

    private void calculateGroupByLevelNodeAttributes(PlanNode node, int level) {
        ArrayList<AggregationDescriptor> descriptorList;
        ProcessNode handle;
        if (node == null) {
            return;
        }
        node.getChildren().forEach(child -> this.calculateGroupByLevelNodeAttributes((PlanNode)child, level + 1));
        HashSet childrenOutputColumns = new HashSet();
        node.getChildren().forEach(child -> childrenOutputColumns.addAll(child.getOutputColumnNames()));
        if (node instanceof SlidingWindowAggregationNode) {
            handle = (SlidingWindowAggregationNode)node;
            descriptorList = new ArrayList<AggregationDescriptor>();
            for (AggregationDescriptor aggregationDescriptor : ((SlidingWindowAggregationNode)handle).getAggregationDescriptorList()) {
                boolean keep = false;
                for (String childColumn : childrenOutputColumns) {
                    for (Expression exp : aggregationDescriptor.getInputExpressions()) {
                        if (!this.isAggColumnMatchExpression(childColumn, exp)) continue;
                        keep = true;
                    }
                }
                if (!keep) continue;
                descriptorList.add(aggregationDescriptor);
                LogicalPlanBuilder.updateTypeProviderByPartialAggregation(aggregationDescriptor, this.analysis.getTypeProvider());
            }
            ((SlidingWindowAggregationNode)handle).setAggregationDescriptorList(descriptorList);
        }
        if (node instanceof GroupByLevelNode) {
            handle = (GroupByLevelNode)node;
            descriptorList = new ArrayList();
            for (GroupByLevelDescriptor groupByLevelDescriptor : ((GroupByLevelNode)handle).getGroupByLevelDescriptors()) {
                HashSet<Expression> descriptorExpressions = new HashSet<Expression>();
                for (String childColumn : childrenOutputColumns) {
                    if (this.isAggColumnMatchExpression(childColumn, groupByLevelDescriptor.getOutputExpression())) {
                        descriptorExpressions.add(groupByLevelDescriptor.getOutputExpression());
                        continue;
                    }
                    for (Expression exp : groupByLevelDescriptor.getInputExpressions()) {
                        if (!this.isAggColumnMatchExpression(childColumn, exp)) continue;
                        descriptorExpressions.add(exp);
                    }
                }
                if (descriptorExpressions.size() == 0) continue;
                GroupByLevelDescriptor descriptor = groupByLevelDescriptor.deepClone();
                descriptor.setStep(level == 0 ? AggregationStep.FINAL : AggregationStep.INTERMEDIATE);
                descriptor.setInputExpressions(new ArrayList<Expression>(descriptorExpressions));
                descriptorList.add(descriptor);
                LogicalPlanBuilder.updateTypeProviderByPartialAggregation(descriptor, this.analysis.getTypeProvider());
            }
            ((GroupByLevelNode)handle).setGroupByLevelDescriptors(descriptorList);
        }
    }

    private boolean isAggColumnMatchExpression(String columnName, Expression expression) {
        if (columnName == null) {
            return false;
        }
        return columnName.contains(expression.getExpressionString());
    }

    private List<SeriesAggregationSourceNode> splitAggregationSourceByPartition(PlanNode root, DistributionPlanContext context) {
        List<SeriesAggregationSourceNode> rawSources = this.findAggregationSourceNode(root);
        ArrayList<SeriesAggregationSourceNode> sources = new ArrayList<SeriesAggregationSourceNode>();
        HashMap<PartialPath, Integer> regionCountPerSeries = new HashMap<PartialPath, Integer>();
        for (SeriesAggregationSourceNode child : rawSources) {
            List<TRegionReplicaSet> dataDistribution = this.analysis.getPartitionInfo(child.getPartitionPath(), child.getPartitionTimeFilter());
            for (TRegionReplicaSet dataRegion : dataDistribution) {
                SeriesAggregationSourceNode split = (SeriesAggregationSourceNode)child.clone();
                split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
                split.setRegionReplicaSet(dataRegion);
                split.setAggregationDescriptorList(child.getAggregationDescriptorList().stream().map(AggregationDescriptor::deepClone).collect(Collectors.toList()));
                sources.add(split);
            }
            regionCountPerSeries.put(child.getPartitionPath(), dataDistribution.size());
        }
        for (SeriesAggregationSourceNode source : sources) {
            boolean isFinal = false;
            source.getAggregationDescriptorList().forEach(d -> {
                d.setStep(isFinal ? AggregationStep.FINAL : AggregationStep.PARTIAL);
                LogicalPlanBuilder.updateTypeProviderByPartialAggregation(d, this.analysis.getTypeProvider());
            });
        }
        return sources;
    }

    private List<SeriesAggregationSourceNode> findAggregationSourceNode(PlanNode node) {
        if (node == null) {
            return new ArrayList<SeriesAggregationSourceNode>();
        }
        if (node instanceof SeriesAggregationSourceNode) {
            return Collections.singletonList((SeriesAggregationSourceNode)node);
        }
        ArrayList<SeriesAggregationSourceNode> ret = new ArrayList<SeriesAggregationSourceNode>();
        node.getChildren().forEach(child -> ret.addAll(this.findAggregationSourceNode((PlanNode)child)));
        return ret;
    }

    public PlanNode visit(PlanNode node, DistributionPlanContext context) {
        return node.accept(this, context);
    }

    private static class DeviceViewSplit {
        protected String device;
        protected PlanNode root;
        protected Set<TRegionReplicaSet> dataPartitions;

        protected DeviceViewSplit(String device, PlanNode root, List<TRegionReplicaSet> dataPartitions) {
            this.device = device;
            this.root = root;
            this.dataPartitions = new HashSet<TRegionReplicaSet>();
            this.dataPartitions.addAll(dataPartitions);
        }

        protected PlanNode buildPlanNodeInRegion(TRegionReplicaSet regionReplicaSet, MPPQueryContext context) {
            return this.buildPlanNodeInRegion(this.root, regionReplicaSet, context);
        }

        protected boolean needDistributeTo(TRegionReplicaSet regionReplicaSet) {
            return this.dataPartitions.contains(regionReplicaSet);
        }

        private PlanNode buildPlanNodeInRegion(PlanNode root, TRegionReplicaSet regionReplicaSet, MPPQueryContext context) {
            List<PlanNode> children = root.getChildren().stream().map(child -> this.buildPlanNodeInRegion((PlanNode)child, regionReplicaSet, context)).collect(Collectors.toList());
            PlanNode newRoot = root.cloneWithChildren(children);
            newRoot.setPlanNodeId(context.getQueryId().genPlanNodeId());
            if (newRoot instanceof SourceNode) {
                ((SourceNode)newRoot).setRegionReplicaSet(regionReplicaSet);
            }
            return newRoot;
        }
    }
}

