/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core.assignment.segment;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsType;
import org.apache.pinot.common.config.ReplicaGroupStrategyConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.utils.Pairs;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SegmentAssignment} implementation for offline tables.
 *
 * <p>Two strategies are used, selected by the number of replica-groups in the OFFLINE instance partitions:
 * <ul>
 *   <li><b>One replica-group</b>: the segment is placed on the {@code replication} instances that currently
 *   hold the fewest segments, picked from the instances returned by
 *   {@link SegmentAssignmentUtils#getInstancesForNonReplicaGroupBasedAssignment}.</li>
 *   <li><b>Multiple replica-groups</b>: a partition is chosen (partition 0 when no partition column is
 *   configured, otherwise {@code segmentPartitionId % numPartitions}); within replica-group 0 of that
 *   partition the index of the instance with the fewest segments is found, and the instance at that same
 *   index is taken from every replica-group.</li>
 * </ul>
 *
 * <p>{@link #init(HelixManager, TableConfig)} must be called before any other method, as all state is
 * populated there.
 */
public class OfflineSegmentAssignment implements SegmentAssignment {
    private static final Logger LOGGER = LoggerFactory.getLogger(OfflineSegmentAssignment.class);

    private HelixManager _helixManager;
    private String _offlineTableName;
    private int _replication;
    // Null when the table config has no replica-group strategy config, or that config has no partition column
    private String _partitionColumn;

    /**
     * Reads the table name, replication and (optional) partition column from the table config.
     *
     * @param helixManager Helix manager, later used to read segment ZK metadata from the property store
     * @param tableConfig table config to read the table name, replication and replica-group strategy from
     */
    @Override
    public void init(HelixManager helixManager, TableConfig tableConfig) {
        _helixManager = helixManager;
        _offlineTableName = tableConfig.getTableName();
        _replication = tableConfig.getValidationConfig().getReplicationNumber();
        ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
            tableConfig.getValidationConfig().getReplicaGroupStrategyConfig();
        _partitionColumn = replicaGroupStrategyConfig != null ? replicaGroupStrategyConfig.getPartitionColumn() : null;

        if (_partitionColumn == null) {
            LOGGER.info("Initialized OfflineSegmentAssignment with replication: {} without partition column for table: {} ",
                _replication, _offlineTableName);
        } else {
            LOGGER.info("Initialized OfflineSegmentAssignment with replication: {} and partition column: {} for table: {}",
                _replication, _partitionColumn, _offlineTableName);
        }
    }

    /**
     * Picks the instances a new segment should be assigned to.
     *
     * @param segmentName name of the segment to assign
     * @param currentAssignment current segment to (instance to state) assignment of the table
     * @param instancePartitionsMap instance partitions by type; must contain an OFFLINE entry
     * @return instances the segment is assigned to
     * @throws IllegalStateException if the OFFLINE instance partitions are missing, or (replica-group based
     *         assignment only) the instance partitions or the segment ZK metadata are not in the expected shape
     */
    @Override
    public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment,
            Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
        InstancePartitions instancePartitions = getOfflineInstancePartitions(instancePartitionsMap);
        LOGGER.info("Assigning segment: {} with instance partitions: {} for table: {}", segmentName, instancePartitions,
            _offlineTableName);

        List<String> instancesAssigned = instancePartitions.getNumReplicaGroups() == 1
            ? assignSegmentWithoutReplicaGroup(currentAssignment, instancePartitions)
            : assignSegmentWithReplicaGroup(segmentName, currentAssignment, instancePartitions);

        LOGGER.info("Assigned segment: {} to instances: {} for table: {}", segmentName, instancesAssigned,
            _offlineTableName);
        return instancesAssigned;
    }

    /**
     * Non-replica-group based assignment: takes the first {@code _replication} instances polled from a heap of
     * (number of segments assigned, instance index) pairs.
     */
    private List<String> assignSegmentWithoutReplicaGroup(Map<String, Map<String, String>> currentAssignment,
            InstancePartitions instancePartitions) {
        List<String> instances =
            SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions, _replication);
        int[] numSegmentsAssignedPerInstance =
            SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, instances);
        int numInstances = numSegmentsAssignedPerInstance.length;

        // Heap entries are (numSegmentsAssigned, instanceId).
        // NOTE(review): presumably the comparator orders by the left value first, so the least loaded
        // instances are polled first - confirm against Pairs.intPairComparator()
        PriorityQueue<Pairs.IntPair> heap = new PriorityQueue<>(numInstances, Pairs.intPairComparator());
        for (int instanceId = 0; instanceId < numInstances; instanceId++) {
            heap.add(new Pairs.IntPair(numSegmentsAssignedPerInstance[instanceId], instanceId));
        }

        List<String> instancesAssigned = new ArrayList<>(_replication);
        for (int i = 0; i < _replication; i++) {
            instancesAssigned.add(instances.get(heap.remove().getRight()));
        }
        return instancesAssigned;
    }

    /**
     * Replica-group based assignment: finds the least loaded instance index within replica-group 0 of the
     * selected partition and picks the instance at that index from every replica-group.
     */
    private List<String> assignSegmentWithReplicaGroup(String segmentName,
            Map<String, Map<String, String>> currentAssignment, InstancePartitions instancePartitions) {
        int numReplicaGroups = instancePartitions.getNumReplicaGroups();
        checkReplication(instancePartitions);
        int partitionId = selectPartitionId(segmentName, instancePartitions);

        // Only the first replica-group is inspected to find the least loaded instance index
        List<String> instances = instancePartitions.getInstances(partitionId, 0);
        int[] numSegmentsAssignedPerInstance =
            SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, instances);
        int minNumSegmentsAssigned = numSegmentsAssignedPerInstance[0];
        int instanceIdWithLeastSegmentsAssigned = 0;
        int numInstances = numSegmentsAssignedPerInstance.length;
        for (int instanceId = 1; instanceId < numInstances; instanceId++) {
            // Strict comparison keeps the lowest index on ties
            if (numSegmentsAssignedPerInstance[instanceId] < minNumSegmentsAssigned) {
                minNumSegmentsAssigned = numSegmentsAssignedPerInstance[instanceId];
                instanceIdWithLeastSegmentsAssigned = instanceId;
            }
        }

        // The same instance index is used in every replica-group
        List<String> instancesAssigned = new ArrayList<>(numReplicaGroups);
        for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
            List<String> replicaGroupInstances = instancePartitions.getInstances(partitionId, replicaGroupId);
            instancesAssigned.add(replicaGroupInstances.get(instanceIdWithLeastSegmentsAssigned));
        }
        return instancesAssigned;
    }

    /**
     * Returns the partition of the instance partitions the segment belongs to: 0 when no partition column is
     * configured, otherwise the segment's partition id (read from ZK metadata) modulo the number of partitions.
     */
    private int selectPartitionId(String segmentName, InstancePartitions instancePartitions) {
        if (_partitionColumn == null) {
            LOGGER.info("Assigning segment: {} without partition column for table: {}", segmentName, _offlineTableName);
            Preconditions.checkState(instancePartitions.getNumPartitions() == 1,
                "Instance partitions: %s should contain 1 partition without partition column",
                instancePartitions.getInstancePartitionsName());
            return 0;
        }

        LOGGER.info("Assigning segment: {} with partition column: {} for table: {}", segmentName, _partitionColumn,
            _offlineTableName);
        OfflineSegmentZKMetadata segmentZKMetadata =
            ZKMetadataProvider.getOfflineSegmentZKMetadata(_helixManager.getHelixPropertyStore(), _offlineTableName,
                segmentName);
        Preconditions.checkState(segmentZKMetadata != null,
            "Failed to find segment ZK metadata for segment: %s of table: %s", segmentName, _offlineTableName);
        int segmentPartitionId = getPartitionId(segmentZKMetadata);
        int numPartitions = instancePartitions.getNumPartitions();
        int partitionId = segmentPartitionId % numPartitions;
        LOGGER.info("Assigning segment: {} with partition id: {} to partition: {}/{} for table: {}", segmentName,
            segmentPartitionId, partitionId, numPartitions, _offlineTableName);
        return partitionId;
    }

    /**
     * Computes a new assignment for all segments in the current assignment.
     *
     * @param currentAssignment current segment to (instance to state) assignment of the table
     * @param instancePartitionsMap instance partitions by type; must contain an OFFLINE entry
     * @param config rebalance configuration (not read by this implementation)
     * @return the rebalanced assignment
     * @throws IllegalStateException if the OFFLINE instance partitions are missing, or (replica-group based
     *         rebalance only) the instance partitions or the segment ZK metadata are not in the expected shape
     */
    @Override
    public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment,
            Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, Configuration config) {
        InstancePartitions instancePartitions = getOfflineInstancePartitions(instancePartitionsMap);
        LOGGER.info("Rebalancing table: {} with instance partitions: {}", _offlineTableName, instancePartitions);

        Map<String, Map<String, String>> newAssignment;
        if (instancePartitions.getNumReplicaGroups() == 1) {
            // Non-replica-group based rebalance
            List<String> instances =
                SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions, _replication);
            newAssignment =
                SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, instances,
                    _replication);
        } else {
            // Replica-group based rebalance
            checkReplication(instancePartitions);
            if (_partitionColumn == null) {
                LOGGER.info("Rebalancing table: {} without partition column", _offlineTableName);
                Preconditions.checkState(instancePartitions.getNumPartitions() == 1,
                    "Instance partitions: %s should contain 1 partition without partition column",
                    instancePartitions.getInstancePartitionsName());
                // All segments belong to the single partition 0; the helper fills newAssignment in place
                newAssignment = new TreeMap<>();
                SegmentAssignmentUtils.rebalanceReplicaGroupBasedPartition(currentAssignment, instancePartitions, 0,
                    currentAssignment.keySet(), newAssignment);
            } else {
                LOGGER.info("Rebalancing table: {} with partition column: {}", _offlineTableName, _partitionColumn);
                newAssignment = rebalanceTableWithPartition(currentAssignment, instancePartitions);
            }
        }

        LOGGER.info("Rebalanced table: {}, number of segments to be moved to each instance: {}", _offlineTableName,
            SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment));
        return newAssignment;
    }

    /**
     * Groups the segments of the current assignment by the partition id stored in their ZK metadata and
     * delegates to {@link SegmentAssignmentUtils#rebalanceReplicaGroupBasedTable}.
     *
     * @throws IllegalStateException if a segment in the current assignment has no ZK metadata, or its
     *         partition metadata is not in the expected shape
     */
    private Map<String, Map<String, String>> rebalanceTableWithPartition(
            Map<String, Map<String, String>> currentAssignment, InstancePartitions instancePartitions) {
        // Fetch the metadata of all segments with one call, then index it by segment name
        List<OfflineSegmentZKMetadata> segmentZKMetadataList =
            ZKMetadataProvider.getOfflineSegmentZKMetadataListForTable(_helixManager.getHelixPropertyStore(),
                _offlineTableName);
        Map<String, OfflineSegmentZKMetadata> segmentZKMetadataMap = new HashMap<>();
        for (OfflineSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
            segmentZKMetadataMap.put(segmentZKMetadata.getSegmentName(), segmentZKMetadata);
        }

        Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>();
        for (String segmentName : currentAssignment.keySet()) {
            OfflineSegmentZKMetadata segmentZKMetadata = segmentZKMetadataMap.get(segmentName);
            // A segment may be in the assignment without ZK metadata; fail with the same message as
            // assignSegment() instead of a bare NullPointerException
            Preconditions.checkState(segmentZKMetadata != null,
                "Failed to find segment ZK metadata for segment: %s of table: %s", segmentName, _offlineTableName);
            int partitionId = getPartitionId(segmentZKMetadata);
            partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new HashSet<>()).add(segmentName);
        }

        return SegmentAssignmentUtils.rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions,
            partitionIdToSegmentsMap);
    }

    /**
     * Extracts the single partition id of the configured partition column from the segment ZK metadata.
     *
     * @param segmentZKMetadata ZK metadata of the segment, must not be null
     * @return the partition id of the segment for the partition column
     * @throws IllegalStateException if there is no partition metadata for the partition column, or the
     *         number of partitions recorded for it is not exactly 1
     */
    private int getPartitionId(OfflineSegmentZKMetadata segmentZKMetadata) {
        String segmentName = segmentZKMetadata.getSegmentName();
        // NOTE(review): assumes getPartitionMetadata() can return null for segments without any partition
        // info - confirm; the guard routes that case to the checkState below rather than a bare NPE
        ColumnPartitionMetadata partitionMetadata = null;
        if (segmentZKMetadata.getPartitionMetadata() != null) {
            Map<String, ColumnPartitionMetadata> columnPartitionMap =
                segmentZKMetadata.getPartitionMetadata().getColumnPartitionMap();
            partitionMetadata = columnPartitionMap.get(_partitionColumn);
        }
        Preconditions.checkState(partitionMetadata != null,
            "Segment ZK metadata for segment: %s of table: %s does not contain partition metadata for column: %s",
            segmentName, _offlineTableName, _partitionColumn);
        Set<Integer> partitions = partitionMetadata.getPartitions();
        Preconditions.checkState(partitions.size() == 1,
            "Segment ZK metadata for segment: %s of table: %s contains multiple partitions for column: %s",
            segmentName, _offlineTableName, _partitionColumn);
        return partitions.iterator().next();
    }

    /**
     * Returns the OFFLINE instance partitions from the given map, failing if they are absent.
     */
    private InstancePartitions getOfflineInstancePartitions(
            Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
        InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
        Preconditions.checkState(instancePartitions != null, "Failed to find OFFLINE instance partitions for table: %s",
            _offlineTableName);
        return instancePartitions;
    }

    /**
     * Logs a warning when the number of replica-groups differs from the replication in the table config; the
     * number of replica-groups is what the replica-group based code paths use.
     */
    private void checkReplication(InstancePartitions instancePartitions) {
        int numReplicaGroups = instancePartitions.getNumReplicaGroups();
        if (numReplicaGroups != _replication) {
            LOGGER.warn(
                "Number of replica-groups in instance partitions {}: {} does not match replication in table config: {} for table: {}, use: {}",
                instancePartitions.getInstancePartitionsName(), numReplicaGroups, _replication, _offlineTableName,
                numReplicaGroups);
        }
    }
}

