/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core.assignment.segment;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsType;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Segment assignment for a realtime table whose segment names can be parsed by {@link LLCSegmentName}.
 *
 * <p>New segments are always assigned using the CONSUMING instance partitions, which must contain exactly one
 * partition. The target instances are derived from the partition id encoded in the segment name:
 * <ul>
 *   <li>With a single replica-group, instances are picked round-robin from the instance list, using
 *       {@code (partitionId * replication + replicaId) % numInstances} as the index for each replica.</li>
 *   <li>With multiple replica-groups, one instance is picked from each replica-group, using
 *       {@code partitionId % numInstancesInReplicaGroup} as the index. In this mode the number of replica-groups
 *       is used as the replication, even if it differs from the table config.</li>
 * </ul>
 *
 * <p>{@link #init(HelixManager, TableConfig)} must be called before any assignment method, as it populates the
 * table name and replication used by them.
 */
public class RealtimeSegmentAssignment implements SegmentAssignment {
    private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentAssignment.class);

    /** Table name as returned by the table config; used in log and error messages. */
    private String _realtimeTableName;

    /** Replicas per partition as configured in the table's validation config. */
    private int _replication;

    /**
     * Reads the table name and the replicas-per-partition number from the table config.
     *
     * @param helixManager not used by this implementation
     * @param tableConfig table config to read the table name and replication from
     */
    @Override
    public void init(HelixManager helixManager, TableConfig tableConfig) {
        _realtimeTableName = tableConfig.getTableName();
        _replication = tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
        LOGGER.info("Initialized RealtimeSegmentAssignment with replication: {} for table: {}", _replication, _realtimeTableName);
    }

    /**
     * Assigns a segment to instances based on the CONSUMING instance partitions.
     *
     * @param segmentName name of the segment, must be parsable by {@link LLCSegmentName}
     * @param currentAssignment not used by this implementation
     * @param instancePartitionsMap must contain a CONSUMING entry with exactly one partition
     * @return the instances the segment is assigned to
     * @throws IllegalStateException if the CONSUMING instance partitions are missing or do not contain exactly one
     *         partition
     */
    @Override
    public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
        InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
        Preconditions.checkState(instancePartitions != null, "Failed to find CONSUMING instance partitions for table: %s", _realtimeTableName);
        Preconditions.checkState(instancePartitions.getNumPartitions() == 1, "Instance partitions: %s should contain 1 partition", instancePartitions.getInstancePartitionsName());
        LOGGER.info("Assigning segment: {} with instance partitions: {} for table: {}", segmentName, instancePartitions, _realtimeTableName);
        List<String> instancesAssigned = assignSegment(segmentName, instancePartitions);
        LOGGER.info("Assigned segment: {} to instances: {} for table: {}", segmentName, instancesAssigned, _realtimeTableName);
        return instancesAssigned;
    }

    /**
     * Picks the instances for a segment from the given instance partitions, based on the partition id parsed from
     * the segment name.
     */
    private List<String> assignSegment(String segmentName, InstancePartitions instancePartitions) {
        int partitionId = new LLCSegmentName(segmentName).getPartitionId();
        int numReplicaGroups = instancePartitions.getNumReplicaGroups();

        if (numReplicaGroups == 1) {
            // Non-replica-group based assignment: walk the instance list round-robin, offset by the partition id so
            // that consecutive partitions start at consecutive slots.
            List<String> instances = SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions, _replication);
            int numInstances = instances.size();
            List<String> instancesAssigned = new ArrayList<>(_replication);
            for (int replicaId = 0; replicaId < _replication; ++replicaId) {
                instancesAssigned.add(instances.get((partitionId * _replication + replicaId) % numInstances));
            }
            return instancesAssigned;
        }

        // Replica-group based assignment: pick one instance from every replica-group. The number of replica-groups
        // takes precedence over the replication in the table config.
        warnIfReplicaGroupsMismatchReplication(instancePartitions, numReplicaGroups);
        List<String> instancesAssigned = new ArrayList<>(numReplicaGroups);
        for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; ++replicaGroupId) {
            List<String> instances = instancePartitions.getInstances(0, replicaGroupId);
            instancesAssigned.add(instances.get(partitionId % instances.size()));
        }
        return instancesAssigned;
    }

    /**
     * Computes a new assignment for all segments of the table.
     *
     * <ul>
     *   <li>COMPLETED segments are rebalanced with the COMPLETED instance partitions when present; otherwise they
     *       are re-assigned the same way as CONSUMING segments, with state "ONLINE".</li>
     *   <li>CONSUMING segments are re-assigned with the CONSUMING instance partitions only when the
     *       {@code includeConsuming} config is {@code true} (default {@code false}); otherwise their current
     *       assignment is kept.</li>
     *   <li>OFFLINE segments keep their current assignment.</li>
     * </ul>
     *
     * @param currentAssignment current segment to (instance to state) assignment
     * @param instancePartitionsMap must contain a CONSUMING entry with exactly one partition; COMPLETED is optional
     * @param config rebalance config, read for the boolean key {@code includeConsuming}
     * @return the new segment to (instance to state) assignment
     * @throws IllegalStateException if the CONSUMING instance partitions are missing or do not contain exactly one
     *         partition
     */
    @Override
    public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, Configuration config) {
        InstancePartitions completedInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
        InstancePartitions consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
        // Only the CONSUMING instance partitions are mandatory; COMPLETED ones are optional (see fallback below).
        Preconditions.checkState(consumingInstancePartitions != null, "Failed to find CONSUMING instance partitions for table: %s", _realtimeTableName);
        Preconditions.checkState(consumingInstancePartitions.getNumPartitions() == 1, "Instance partitions: %s should contain 1 partition", consumingInstancePartitions.getInstancePartitionsName());
        boolean includeConsuming = config.getBoolean("includeConsuming", false);
        LOGGER.info("Rebalancing table: {} with COMPLETED instance partitions: {}, CONSUMING instance partitions: {}, includeConsuming: {}", _realtimeTableName, completedInstancePartitions, consumingInstancePartitions, includeConsuming);

        SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment completedConsumingOfflineSegmentAssignment = new SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(currentAssignment);

        // Step 1: COMPLETED segments.
        Map<String, Map<String, String>> completedSegmentAssignment = completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment();
        Map<String, Map<String, String>> newAssignment;
        if (completedInstancePartitions != null) {
            LOGGER.info("Reassigning COMPLETED segments with COMPLETED instance partitions for table: {}", _realtimeTableName);
            int numReplicaGroups = completedInstancePartitions.getNumReplicaGroups();
            if (numReplicaGroups == 1) {
                List<String> instances = SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(completedInstancePartitions, _replication);
                newAssignment = SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(completedSegmentAssignment, instances, _replication);
            } else {
                warnIfReplicaGroupsMismatchReplication(completedInstancePartitions, numReplicaGroups);
                // Group the segments by the partition id parsed from the segment name.
                Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>();
                for (String segmentName : completedSegmentAssignment.keySet()) {
                    int partitionId = new LLCSegmentName(segmentName).getPartitionId();
                    partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new HashSet<>()).add(segmentName);
                }
                newAssignment = SegmentAssignmentUtils.rebalanceReplicaGroupBasedTable(completedSegmentAssignment, completedInstancePartitions, partitionIdToSegmentsMap);
            }
        } else {
            LOGGER.info("No COMPLETED instance partitions found, reassigning COMPLETED segments the same way as CONSUMING segments with CONSUMING instance partitions for table: {}", _realtimeTableName);
            newAssignment = new TreeMap<>();
            assignSegmentsWithState(completedSegmentAssignment.keySet(), consumingInstancePartitions, "ONLINE", newAssignment);
        }

        // Step 2: CONSUMING segments.
        Map<String, Map<String, String>> consumingSegmentAssignment = completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment();
        if (includeConsuming) {
            LOGGER.info("Reassigning CONSUMING segments with CONSUMING instance partitions for table: {}", _realtimeTableName);
            assignSegmentsWithState(consumingSegmentAssignment.keySet(), consumingInstancePartitions, "CONSUMING", newAssignment);
        } else {
            newAssignment.putAll(consumingSegmentAssignment);
        }

        // Step 3: OFFLINE segments keep their current assignment.
        newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment());

        LOGGER.info("Rebalanced table: {}, number of segments to be moved to each instance: {}", _realtimeTableName, SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment));
        return newAssignment;
    }

    /**
     * Assigns every given segment with the given instance partitions and stores the resulting instance state map,
     * with all instances in the given state, into the given assignment.
     */
    private void assignSegmentsWithState(Set<String> segmentNames, InstancePartitions instancePartitions, String state, Map<String, Map<String, String>> assignment) {
        for (String segmentName : segmentNames) {
            List<String> instancesAssigned = assignSegment(segmentName, instancePartitions);
            assignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, state));
        }
    }

    /**
     * Logs a warning when the number of replica-groups differs from the replication in the table config.
     */
    private void warnIfReplicaGroupsMismatchReplication(InstancePartitions instancePartitions, int numReplicaGroups) {
        if (numReplicaGroups != _replication) {
            LOGGER.warn("Number of replica-groups in instance partitions {}: {} does not match replication in table config: {} for table: {}, use: {}", instancePartitions.getInstancePartitionsName(), numReplicaGroups, _replication, _realtimeTableName, numReplicaGroups);
        }
    }
}

