package org.apache.pinot.controller.helix.core.rebalance;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.config.OfflineTagConfig;
import org.apache.pinot.common.config.RealtimeTagConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.partition.PartitionAssignment;
import org.apache.pinot.common.partition.StreamPartitionAssignmentGenerator;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.core.realtime.stream.StreamConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/controller/helix/core/rebalance/DefaultRebalanceSegmentStrategy.class */
public class DefaultRebalanceSegmentStrategy implements RebalanceSegmentStrategy {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRebalanceSegmentStrategy.class);
    private static final boolean DEFAULT_DRY_RUN = true;
    private static final boolean DEFAULT_INCLUDE_CONSUMING = false;
    private HelixManager _helixManager;
    private HelixAdmin _helixAdmin;
    private String _helixClusterName;
    private ZkHelixPropertyStore<ZNRecord> _propertyStore;

    public DefaultRebalanceSegmentStrategy(HelixManager helixManager) {
        this._helixManager = helixManager;
        this._helixAdmin = helixManager.getClusterManagmentTool();
        this._helixClusterName = helixManager.getClusterName();
        this._propertyStore = helixManager.getHelixPropertyStore();
    }

    @Override // org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy
    public PartitionAssignment rebalancePartitionAssignment(IdealState idealState, TableConfig tableConfig, Configuration configuration) throws InvalidConfigException {
        String tableName = tableConfig.getTableName();
        PartitionAssignment partitionAssignment = new PartitionAssignment(tableName);
        if (tableConfig.getTableType().equals(CommonConstants.Helix.TableType.REALTIME)) {
            if (!new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs()).hasLowLevelConsumerType()) {
                LOGGER.info("Table {} does not have LLC and will have no partition assignment", tableName);
                return partitionAssignment;
            }
            LOGGER.info("Rebalancing stream partition assignment for table {}", tableConfig.getTableName());
            if (configuration.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING, false)) {
                StreamPartitionAssignmentGenerator streamPartitionAssignmentGenerator = getStreamPartitionAssignmentGenerator();
                partitionAssignment = streamPartitionAssignmentGenerator.generateStreamPartitionAssignment(tableConfig, streamPartitionAssignmentGenerator.getNumPartitionsFromIdealState(idealState));
            } else {
                LOGGER.info("includeConsuming = false. No need to rebalance partition assignment for {}", tableConfig.getTableType());
            }
        }
        return partitionAssignment;
    }

    @VisibleForTesting
    protected StreamPartitionAssignmentGenerator getStreamPartitionAssignmentGenerator() {
        return new StreamPartitionAssignmentGenerator(this._helixManager);
    }

    @Override // org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy
    public IdealState getRebalancedIdealState(IdealState idealState, TableConfig tableConfig, Configuration configuration, PartitionAssignment partitionAssignment) {
        int parseInt;
        String tableName = tableConfig.getTableName();
        CommonConstants.Helix.TableType tableType = tableConfig.getTableType();
        LOGGER.info("Rebalancing ideal state for table {}", tableName);
        if (tableType.equals(CommonConstants.Helix.TableType.REALTIME)) {
            String replicasPerPartition = tableConfig.getValidationConfig().getReplicasPerPartition();
            try {
                parseInt = Integer.parseInt(replicasPerPartition);
            } catch (Exception e) {
                throw new RuntimeException("Invalid value for replicasPerPartition:'" + replicasPerPartition + "'", e);
            }
        } else {
            parseInt = Integer.parseInt(tableConfig.getValidationConfig().getReplication());
        }
        return rebalanceIdealState(idealState, tableConfig, parseInt, configuration, partitionAssignment);
    }

    private IdealState rebalanceIdealState(IdealState idealState, TableConfig tableConfig, int i, Configuration configuration, PartitionAssignment partitionAssignment) {
        if (tableConfig.getTableType().equals(CommonConstants.Helix.TableType.REALTIME) && configuration.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING, false)) {
            rebalanceConsumingSegments(idealState, partitionAssignment);
        }
        rebalanceServingSegments(idealState, tableConfig, i);
        idealState.setReplicas(Integer.toString(i));
        return idealState;
    }

    private void rebalanceConsumingSegments(IdealState idealState, PartitionAssignment partitionAssignment) {
        LOGGER.info("Rebalancing consuming segments for table {}", idealState.getResourceName());
        HashMap hashMap = new HashMap(partitionAssignment.getNumPartitions());
        for (String str : idealState.getPartitionSet()) {
            if (SegmentName.isLowLevelConsumerSegmentName(str)) {
                LLCSegmentName lLCSegmentName = new LLCSegmentName(str);
                int partitionId = lLCSegmentName.getPartitionId();
                LLCSegmentName lLCSegmentName2 = (LLCSegmentName) hashMap.get(Integer.valueOf(partitionId));
                if (lLCSegmentName2 == null || lLCSegmentName.getSequenceNumber() > lLCSegmentName2.getSequenceNumber()) {
                    hashMap.put(Integer.valueOf(partitionId), lLCSegmentName);
                }
            }
        }
        Map partitionToInstances = partitionAssignment.getPartitionToInstances();
        for (LLCSegmentName lLCSegmentName3 : hashMap.values()) {
            int partitionId2 = lLCSegmentName3.getPartitionId();
            HashMap hashMap2 = new HashMap();
            Iterator it = ((List) partitionToInstances.get(String.valueOf(partitionId2))).iterator();
            while (it.hasNext()) {
                hashMap2.put((String) it.next(), "CONSUMING");
            }
            idealState.setInstanceStateMap(lLCSegmentName3.getSegmentName(), hashMap2);
        }
    }

    protected IdealState rebalanceServingSegments(IdealState idealState, TableConfig tableConfig, int i) {
        LOGGER.info("Rebalancing serving segments for table {}", tableConfig.getTableName());
        if (Integer.parseInt(idealState.getReplicas()) > i) {
            for (String str : idealState.getPartitionSet()) {
                Map instanceStateMap = idealState.getInstanceStateMap(str);
                if (instanceStateMap.size() > i) {
                    Set keySet = instanceStateMap.keySet();
                    while (instanceStateMap.size() > i) {
                        instanceStateMap.remove(keySet.iterator().next());
                    }
                } else if (instanceStateMap.size() < i) {
                    LOGGER.warn("Table {}, segment {} has {} replicas, less than {} (requested number of replicas)", new Object[]{idealState.getResourceName(), str, Integer.valueOf(instanceStateMap.size()), Integer.valueOf(i)});
                }
            }
        } else {
            Map<String, Map<String, String>> mapFields = idealState.getRecord().getMapFields();
            LinkedHashMap linkedHashMap = new LinkedHashMap();
            if (tableConfig.getTableType().equals(CommonConstants.Helix.TableType.REALTIME)) {
                filterSegmentsForRealtimeRebalance(mapFields, linkedHashMap);
            }
            if (!mapFields.isEmpty()) {
                String tableName = tableConfig.getTableName();
                LinkedHashMap linkedHashMap2 = new LinkedHashMap();
                ArrayList newArrayList = Lists.newArrayList(idealState.getPartitionSet());
                linkedHashMap2.put("OFFLINE", 0);
                linkedHashMap2.put("ONLINE", Integer.valueOf(i));
                HashSet hashSet = new HashSet();
                Iterator<String> it = mapFields.keySet().iterator();
                while (it.hasNext()) {
                    hashSet.addAll(mapFields.get(it.next()).keySet());
                }
                ArrayList arrayList = new ArrayList();
                ArrayList arrayList2 = new ArrayList();
                getServingInstances(tableConfig, arrayList, arrayList2);
                AutoRebalanceStrategy autoRebalanceStrategy = new AutoRebalanceStrategy(tableName, newArrayList, linkedHashMap2);
                LOGGER.info("Current nodes for table {}: {}", tableName, hashSet);
                LOGGER.info("New nodes for table {}: {}", tableName, arrayList);
                LOGGER.info("Enabled nodes for table: {} {}", tableName, arrayList2);
                for (Map.Entry entry : autoRebalanceStrategy.computePartitionAssignment(arrayList, arrayList2, mapFields, new ClusterDataCache()).getMapFields().entrySet()) {
                    idealState.setInstanceStateMap((String) entry.getKey(), (Map) entry.getValue());
                }
            }
            for (Map.Entry<String, Map<String, String>> entry2 : linkedHashMap.entrySet()) {
                idealState.setInstanceStateMap(entry2.getKey(), entry2.getValue());
            }
        }
        return idealState;
    }

    private void filterSegmentsForRealtimeRebalance(Map<String, Map<String, String>> map, Map<String, Map<String, String>> map2) {
        Iterator<Map.Entry<String, Map<String, String>>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Map<String, String>> next = it.next();
            String key = next.getKey();
            boolean z = false;
            if (SegmentName.isLowLevelConsumerSegmentName(key) && next.getValue().values().contains("ONLINE")) {
                z = true;
            }
            if (!z) {
                map2.put(key, next.getValue());
                it.remove();
            }
        }
    }

    private void getServingInstances(TableConfig tableConfig, List<String> list, List<String> list2) {
        String completedServerTag = tableConfig.getTableType().equals(CommonConstants.Helix.TableType.REALTIME) ? new RealtimeTagConfig(tableConfig).getCompletedServerTag() : new OfflineTagConfig(tableConfig).getOfflineServerTag();
        list.addAll(getInstancesWithTag(completedServerTag));
        list2.addAll(getEnabledInstancesWithTag(completedServerTag));
    }

    @VisibleForTesting
    protected List<String> getInstancesWithTag(String str) {
        return HelixHelper.getInstancesWithTag(this._helixManager, str);
    }

    @VisibleForTesting
    protected List<String> getEnabledInstancesWithTag(String str) {
        return HelixHelper.getEnabledInstancesWithTag(this._helixManager, str);
    }
}
