/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core.assignment.segment;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.tier.Tier;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RealtimeSegmentAssignment
implements SegmentAssignment {
    private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentAssignment.class);
    private HelixManager _helixManager;
    private String _realtimeTableName;
    private int _replication;
    private String _partitionColumn;

    @Override
    public void init(HelixManager helixManager, TableConfig tableConfig) {
        this._helixManager = helixManager;
        this._realtimeTableName = tableConfig.getTableName();
        this._replication = tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
        ReplicaGroupStrategyConfig replicaGroupStrategyConfig = tableConfig.getValidationConfig().getReplicaGroupStrategyConfig();
        this._partitionColumn = replicaGroupStrategyConfig != null ? replicaGroupStrategyConfig.getPartitionColumn() : null;
        LOGGER.info("Initialized RealtimeSegmentAssignment with replication: {}, partitionColumn: {} for table: {}", new Object[]{this._replication, this._partitionColumn, this._realtimeTableName});
    }

    @Override
    public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
        InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
        Preconditions.checkState((instancePartitions != null ? 1 : 0) != 0, (String)"Failed to find CONSUMING instance partitions for table: %s", (Object)this._realtimeTableName);
        Preconditions.checkState((instancePartitions.getNumPartitions() == 1 ? 1 : 0) != 0, (String)"Instance partitions: %s should contain 1 partition", (Object)instancePartitions.getInstancePartitionsName());
        LOGGER.info("Assigning segment: {} with instance partitions: {} for table: {}", new Object[]{segmentName, instancePartitions, this._realtimeTableName});
        this.checkReplication(instancePartitions);
        List<String> instancesAssigned = this.assignConsumingSegment(segmentName, instancePartitions);
        LOGGER.info("Assigned segment: {} to instances: {} for table: {}", new Object[]{segmentName, instancesAssigned, this._realtimeTableName});
        return instancesAssigned;
    }

    private void checkReplication(InstancePartitions instancePartitions) {
        int numReplicaGroups = instancePartitions.getNumReplicaGroups();
        if (numReplicaGroups != 1 && numReplicaGroups != this._replication) {
            LOGGER.warn("Number of replica-groups in instance partitions {}: {} does not match replication in table config: {} for table: {}, use: {}", new Object[]{instancePartitions.getInstancePartitionsName(), numReplicaGroups, this._replication, this._realtimeTableName, numReplicaGroups});
        }
    }

    private List<String> assignConsumingSegment(String segmentName, InstancePartitions instancePartitions) {
        int partitionGroupId = SegmentUtils.getRealtimeSegmentPartitionId((String)segmentName, (String)this._realtimeTableName, (HelixManager)this._helixManager, (String)this._partitionColumn);
        int numReplicaGroups = instancePartitions.getNumReplicaGroups();
        if (numReplicaGroups == 1) {
            List<String> instances = SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions, this._replication);
            int numInstances = instances.size();
            ArrayList<String> instancesAssigned = new ArrayList<String>(this._replication);
            for (int replicaId = 0; replicaId < this._replication; ++replicaId) {
                instancesAssigned.add(instances.get((partitionGroupId * this._replication + replicaId) % numInstances));
            }
            return instancesAssigned;
        }
        ArrayList<String> instancesAssigned = new ArrayList<String>(numReplicaGroups);
        for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; ++replicaGroupId) {
            List instances = instancePartitions.getInstances(0, replicaGroupId);
            instancesAssigned.add((String)instances.get(partitionGroupId % instances.size()));
        }
        return instancesAssigned;
    }

    @Override
    public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, @Nullable List<Tier> sortedTiers, @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, Configuration config) {
        Map<Object, Object> newAssignment;
        InstancePartitions completedInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
        InstancePartitions consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
        Preconditions.checkState((consumingInstancePartitions != null ? 1 : 0) != 0, (String)"Failed to find COMPLETED or CONSUMING instance partitions for table: %s", (Object)this._realtimeTableName);
        Preconditions.checkState((consumingInstancePartitions.getNumPartitions() == 1 ? 1 : 0) != 0, (String)"Instance partitions: %s should contain 1 partition", (Object)consumingInstancePartitions.getInstancePartitionsName());
        boolean includeConsuming = config.getBoolean("includeConsuming", false);
        boolean bootstrap = config.getBoolean("bootstrap", false);
        Map<String, Map<String, String>> nonTierAssignment = currentAssignment;
        ArrayList<Map<String, Map<String, String>>> newTierAssignments = null;
        if (sortedTiers != null) {
            Preconditions.checkState((tierInstancePartitionsMap != null ? 1 : 0) != 0, (Object)"Tier to instancePartitions map is null");
            LOGGER.info("Rebalancing tiers: {} for table: {} with bootstrap: {}", new Object[]{tierInstancePartitionsMap.keySet(), this._realtimeTableName, bootstrap});
            SegmentAssignmentUtils.TierSegmentAssignment tierSegmentAssignment = new SegmentAssignmentUtils.TierSegmentAssignment(this._realtimeTableName, sortedTiers, currentAssignment);
            Map<String, Map<String, Map<String, String>>> tierNameToSegmentAssignmentMap = tierSegmentAssignment.getTierNameToSegmentAssignmentMap();
            newTierAssignments = new ArrayList<Map<String, Map<String, String>>>(tierNameToSegmentAssignmentMap.size());
            for (Map.Entry<String, Map<String, Map<String, String>>> entry : tierNameToSegmentAssignmentMap.entrySet()) {
                String tierName = entry.getKey();
                Map<String, Map<String, String>> tierCurrentAssignment = entry.getValue();
                InstancePartitions tierInstancePartitions = tierInstancePartitionsMap.get(tierName);
                Preconditions.checkNotNull((Object)tierInstancePartitions, (String)"Failed to find instance partitions for tier: %s of table: %s", (Object)tierName, (Object)this._realtimeTableName);
                this.checkReplication(tierInstancePartitions);
                LOGGER.info("Rebalancing tier: {} for table: {} with bootstrap: {}, instance partitions: {}", new Object[]{tierName, this._realtimeTableName, bootstrap, tierInstancePartitions});
                newTierAssignments.add(this.reassignSegments(tierName, tierCurrentAssignment, tierInstancePartitions, bootstrap));
            }
            nonTierAssignment = tierSegmentAssignment.getNonTierSegmentAssignment();
        }
        LOGGER.info("Rebalancing table: {} with COMPLETED instance partitions: {}, CONSUMING instance partitions: {}, includeConsuming: {}, bootstrap: {}", new Object[]{this._realtimeTableName, completedInstancePartitions, consumingInstancePartitions, includeConsuming, bootstrap});
        if (completedInstancePartitions != null) {
            this.checkReplication(completedInstancePartitions);
        }
        this.checkReplication(consumingInstancePartitions);
        SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment completedConsumingOfflineSegmentAssignment = new SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(nonTierAssignment);
        Map<String, Map<String, String>> completedSegmentAssignment = completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment();
        if (completedInstancePartitions != null) {
            LOGGER.info("Reassigning COMPLETED segments with COMPLETED instance partitions for table: {}", (Object)this._realtimeTableName);
            newAssignment = this.reassignSegments(InstancePartitionsType.COMPLETED.toString(), completedSegmentAssignment, completedInstancePartitions, bootstrap);
        } else {
            LOGGER.info("No COMPLETED instance partitions found, reassigning COMPLETED segments the same way as CONSUMING segments with CONSUMING instance partitions for table: {}", (Object)this._realtimeTableName);
            newAssignment = new TreeMap();
            for (String segmentName : completedSegmentAssignment.keySet()) {
                List<String> instancesAssigned = this.assignConsumingSegment(segmentName, consumingInstancePartitions);
                Map<String, String> instanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, "ONLINE");
                newAssignment.put(segmentName, instanceStateMap);
            }
        }
        Map<String, Map<String, String>> map = completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment();
        if (includeConsuming) {
            LOGGER.info("Reassigning CONSUMING segments with CONSUMING instance partitions for table: {}", (Object)this._realtimeTableName);
            for (String segmentName : map.keySet()) {
                List<String> instancesAssigned = this.assignConsumingSegment(segmentName, consumingInstancePartitions);
                Map<String, String> instanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, "CONSUMING");
                newAssignment.put(segmentName, instanceStateMap);
            }
        } else {
            newAssignment.putAll(map);
        }
        newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment());
        if (CollectionUtils.isNotEmpty(newTierAssignments)) {
            newTierAssignments.forEach(newAssignment::putAll);
        }
        LOGGER.info("Rebalanced table: {}, number of segments to be moved to each instance: {}", (Object)this._realtimeTableName, SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment));
        return newAssignment;
    }

    private Map<String, Map<String, String>> reassignSegments(String instancePartitionType, Map<String, Map<String, String>> currentAssignment, InstancePartitions instancePartitions, boolean bootstrap) {
        Map<Object, Object> newAssignment;
        if (bootstrap) {
            LOGGER.info("Bootstrapping segment assignment for {} segments of table: {}", (Object)instancePartitionType, (Object)this._realtimeTableName);
            newAssignment = new TreeMap();
            for (String segment : currentAssignment.keySet()) {
                List<String> assignedInstances = this.assignCompletedSegment(segment, newAssignment, instancePartitions);
                newAssignment.put(segment, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, "ONLINE"));
            }
        } else if (instancePartitions.getNumReplicaGroups() == 1) {
            List<String> instances = SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions, this._replication);
            newAssignment = SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, instances, this._replication);
        } else {
            HashMap<Integer, List<String>> partitionGroupIdToSegmentsMap = new HashMap<Integer, List<String>>();
            for (String segmentName : currentAssignment.keySet()) {
                int partitionGroupId = SegmentUtils.getRealtimeSegmentPartitionId((String)segmentName, (String)this._realtimeTableName, (HelixManager)this._helixManager, (String)this._partitionColumn);
                partitionGroupIdToSegmentsMap.computeIfAbsent(partitionGroupId, k -> new ArrayList()).add(segmentName);
            }
            Random random = new Random(this._realtimeTableName.hashCode());
            for (List segments : partitionGroupIdToSegmentsMap.values()) {
                Collections.shuffle(segments, random);
            }
            newAssignment = SegmentAssignmentUtils.rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions, partitionGroupIdToSegmentsMap);
        }
        return newAssignment;
    }

    private List<String> assignCompletedSegment(String segmentName, Map<String, Map<String, String>> currentAssignment, InstancePartitions instancePartitions) {
        int numReplicaGroups = instancePartitions.getNumReplicaGroups();
        if (numReplicaGroups == 1) {
            return SegmentAssignmentUtils.assignSegmentWithoutReplicaGroup(currentAssignment, instancePartitions, this._replication);
        }
        int segmentPartitionId = SegmentUtils.getRealtimeSegmentPartitionId((String)segmentName, (String)this._realtimeTableName, (HelixManager)this._helixManager, (String)this._partitionColumn);
        int numPartitions = instancePartitions.getNumPartitions();
        int partitionGroupId = segmentPartitionId % numPartitions;
        return SegmentAssignmentUtils.assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, partitionGroupId);
    }
}

