package org.apache.pinot.controller.helix.core.rebalance;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeoutException;
import org.I0Itec.zkclient.exception.ZkBadVersionException;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsType;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.instance.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.controller.helix.SegmentStatusChecker;
import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.core.realtime.stream.StreamConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.class */
/**
 * Rebalances the segment assignment of a table by moving its IdealState towards a target assignment in steps.
 *
 * <p>The target assignment is calculated by the table's {@link SegmentAssignment} from the current IdealState and the
 * (optionally re-assigned) instance partitions. Unless rebalancing with downtime, each step keeps a configurable
 * number of instances from the current assignment for every segment. Before each step the rebalancer waits for the
 * ExternalView to converge to the IdealState.
 */
public class TableRebalancer {
    private static final Logger LOGGER = LoggerFactory.getLogger(TableRebalancer.class);
    // Interval between two ExternalView convergence checks
    private static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1000L;
    // Maximum time (20 minutes) to wait for the ExternalView to converge to the IdealState
    private static final long EXTERNAL_VIEW_STABILIZATION_MAX_WAIT_MS = 20 * 60 * 1000L;
    // IdealState instance state that is not required to show up in the ExternalView
    private static final String OFFLINE_STATE = "OFFLINE";

    private final HelixManager _helixManager;
    private final HelixDataAccessor _helixDataAccessor;

    public TableRebalancer(HelixManager helixManager) {
        _helixManager = helixManager;
        _helixDataAccessor = helixManager.getHelixDataAccessor();
    }

    /**
     * Rebalances the given table.
     *
     * <p>Options read from {@code configuration} (keys in {@link RebalanceConfigConstants}):
     * <ul>
     *   <li>{@code DRY_RUN} (default false): only calculate and return the target assignment</li>
     *   <li>{@code REASSIGN_INSTANCES} (default false): re-calculate the instance partitions instead of fetching them</li>
     *   <li>{@code DOWNTIME} (default false): do not keep any instance of the current assignment while moving
     *       segments</li>
     *   <li>{@code MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME} (default 1): number of instances of the current assignment
     *       to keep for each segment when rebalancing without downtime; a negative value {@code -k} means
     *       {@code numReplicas - k} (floored at 0)</li>
     * </ul>
     *
     * <p>Exceptions are not propagated; they are reported through a {@link RebalanceResult} with status FAILED.
     *
     * @param tableConfig config of the table to rebalance
     * @param configuration rebalance options
     * @return the result of the rebalance, including the instance partitions and the target assignment when they have
     *     been calculated
     */
    public RebalanceResult rebalance(TableConfig tableConfig, Configuration configuration) {
        long startTimeMs = System.currentTimeMillis();
        String tableName = tableConfig.getTableName();
        try {
            return rebalanceInternal(tableConfig, configuration, tableName, startTimeMs);
        } catch (Exception e) {
            // Safety net: report unexpected exceptions as a FAILED result instead of throwing them to the caller
            LOGGER.error("Caught exception while rebalancing table: {}", tableName, e);
            return new RebalanceResult(RebalanceResult.Status.FAILED, "Caught exception while rebalancing table: " + e,
                null, null);
        }
    }

    /** Performs the rebalance; {@link #rebalance} wraps it to convert unexpected exceptions into a FAILED result. */
    private RebalanceResult rebalanceInternal(TableConfig tableConfig, Configuration configuration, String tableName,
        long startTimeMs) {
        // Validate the table config and the current state of the table
        IdealState idealState;
        try {
            if (tableConfig.getTableType() == CommonConstants.Helix.TableType.REALTIME && new StreamConfig(tableName,
                tableConfig.getIndexingConfig().getStreamConfigs()).hasHighLevelConsumerType()) {
                return new RebalanceResult(RebalanceResult.Status.FAILED,
                    "Cannot rebalance table with high-level consumer", null, null);
            }
            idealState = _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableName));
            if (idealState == null) {
                return new RebalanceResult(RebalanceResult.Status.FAILED, "Cannot find the table", null, null);
            }
            if (!idealState.isEnabled()) {
                return new RebalanceResult(RebalanceResult.Status.FAILED, "Cannot rebalance disabled table", null,
                    null);
            }
            ExternalView externalView =
                _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableName));
            if (externalView != null && hasSegmentInErrorState(externalView.getRecord().getMapFields())) {
                LOGGER.warn("Found segments in ERROR state for table: {}", tableName);
                return new RebalanceResult(RebalanceResult.Status.FAILED, "Found segments in ERROR state", null, null);
            }
        } catch (Exception e) {
            LOGGER.warn("Caught exception while validating table config for table: {}", tableName, e);
            return new RebalanceResult(RebalanceResult.Status.FAILED,
                "Caught exception while validating table config: " + e, null, null);
        }

        boolean dryRun = configuration.getBoolean(RebalanceConfigConstants.DRY_RUN, false);
        if (dryRun) {
            LOGGER.info("Start rebalancing table: {} in dry-run mode", tableName);
        } else {
            LOGGER.info("Start rebalancing table: {}", tableName);
        }
        boolean reassignInstances = configuration.getBoolean(RebalanceConfigConstants.REASSIGN_INSTANCES, false);

        // Fetch or re-calculate the instance partitions
        Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap;
        try {
            instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, dryRun);
        } catch (Exception e) {
            LOGGER.warn("Caught exception while fetching/calculating instance partitions for table: {}", tableName, e);
            return new RebalanceResult(RebalanceResult.Status.FAILED,
                "Caught exception while fetching/calculating instance partitions: " + e, null, null);
        }

        // Calculate the target assignment
        LOGGER.info("Calculating the target assignment for table: {}", tableName);
        Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
        SegmentAssignment segmentAssignment;
        Map<String, Map<String, String>> targetAssignment;
        try {
            segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
            targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, configuration);
        } catch (Exception e) {
            LOGGER.warn("Caught exception while calculating target assignment for table: {}", tableName, e);
            return new RebalanceResult(RebalanceResult.Status.FAILED,
                "Caught exception while calculating target assignment: " + e, instancePartitionsMap, null);
        }

        if (currentAssignment.equals(targetAssignment)) {
            LOGGER.info("Table: {} is already balanced", tableName);
            if (reassignInstances) {
                return new RebalanceResult(RebalanceResult.Status.DONE, "Instance reassigned, table is already balanced",
                    instancePartitionsMap, targetAssignment);
            }
            return new RebalanceResult(RebalanceResult.Status.NO_OP, "Table is already balanced", instancePartitionsMap,
                targetAssignment);
        }

        if (dryRun) {
            LOGGER.info("Rebalance table: {} in dry-run mode, returning the target assignment", tableName);
            return new RebalanceResult(RebalanceResult.Status.DONE, "Dry-run mode", instancePartitionsMap,
                targetAssignment);
        }

        // Determine how many instances of the current assignment must be kept for each segment in every step
        int minAvailableReplicas;
        if (configuration.getBoolean(RebalanceConfigConstants.DOWNTIME, false)) {
            minAvailableReplicas = 0;
            LOGGER.info("Rebalancing table: {} with downtime", tableName);
        } else {
            minAvailableReplicas =
                configuration.getInt(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME, 1);
            int numCurrentReplicas = currentAssignment.values().iterator().next().size();
            int numTargetReplicas = targetAssignment.values().iterator().next().size();
            int numReplicas = Math.min(numCurrentReplicas, numTargetReplicas);
            if (minAvailableReplicas > 0 && minAvailableReplicas >= numReplicas) {
                LOGGER.warn(
                    "Illegal config for min available replicas: {} for table: {}, must be less than number of replicas (current: {}, target: {})",
                    minAvailableReplicas, tableName, numCurrentReplicas, numTargetReplicas);
                return new RebalanceResult(RebalanceResult.Status.FAILED, "Illegal min available replicas config",
                    instancePartitionsMap, targetAssignment);
            }
            if (minAvailableReplicas < 0) {
                // A negative value is the number of replicas allowed to be unavailable
                minAvailableReplicas = Math.max(numReplicas + minAvailableReplicas, 0);
            }
            LOGGER.info("Rebalancing table: {} with min available replicas: {}", tableName, minAvailableReplicas);
        }

        int expectedVersion = idealState.getRecord().getVersion();
        while (true) {
            // Wait for the ExternalView to converge before applying the next step
            IdealState latestIdealState;
            try {
                latestIdealState = waitForExternalViewToConverge(tableName);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Restore the interrupt status so that the caller can still observe the interruption
                    Thread.currentThread().interrupt();
                }
                LOGGER.error("Caught exception while waiting for ExternalView to converge for table: {}", tableName, e);
                return new RebalanceResult(RebalanceResult.Status.FAILED,
                    "Caught exception while waiting for ExternalView to converge: " + e, instancePartitionsMap,
                    targetAssignment);
            }
            LOGGER.info("ExternalView converged for table: {}", tableName);

            // The IdealState was modified by someone else (or a previous update failed the version check):
            // re-calculate the target assignment from the latest IdealState
            if (latestIdealState.getRecord().getVersion() != expectedVersion) {
                LOGGER.info(
                    "IdealState version changed while waiting for ExternalView to converge for table: {}, re-calculating the target assignment",
                    tableName);
                idealState = latestIdealState;
                currentAssignment = idealState.getRecord().getMapFields();
                try {
                    targetAssignment =
                        segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, configuration);
                } catch (Exception e) {
                    LOGGER.warn("Caught exception while calculating target assignment for table: {}", tableName, e);
                    return new RebalanceResult(RebalanceResult.Status.FAILED,
                        "Caught exception while calculating target assignment: " + e, instancePartitionsMap, null);
                }
                expectedVersion = idealState.getRecord().getVersion();
            }

            if (currentAssignment.equals(targetAssignment)) {
                LOGGER.info("Finished rebalancing table: {} in {}ms.", tableName,
                    System.currentTimeMillis() - startTimeMs);
                return new RebalanceResult(RebalanceResult.Status.DONE, "Success", instancePartitionsMap,
                    targetAssignment);
            }

            Map<String, Map<String, String>> nextAssignment =
                getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas);
            LOGGER.info("Got the next assignment for table: {} with number of segments to be moved to each instance: {}",
                tableName, SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, nextAssignment));

            // Write the next assignment with a version check so that concurrent IdealState changes are not overwritten
            ZNRecord idealStateRecord = idealState.getRecord();
            idealStateRecord.setMapFields(nextAssignment);
            idealState.setNumPartitions(nextAssignment.size());
            idealState.setReplicas(Integer.toString(nextAssignment.values().iterator().next().size()));
            try {
                Preconditions.checkState(_helixDataAccessor.getBaseDataAccessor()
                        .set(_helixDataAccessor.keyBuilder().idealStates(tableName).getPath(), idealStateRecord,
                            expectedVersion, AccessOption.PERSISTENT),
                    "Failed to update IdealState");
                currentAssignment = nextAssignment;
                expectedVersion++;
                LOGGER.info("Successfully updated the IdealState for table: {}", tableName);
            } catch (ZkBadVersionException e) {
                // Must be caught before the generic Exception clause. The next iteration re-reads the IdealState,
                // detects the version change and re-calculates the target assignment.
                LOGGER.info("Version changed while updating IdealState for table: {}", tableName);
            } catch (Exception e) {
                LOGGER.error("Caught exception while updating IdealState for table: {}", tableName, e);
                return new RebalanceResult(RebalanceResult.Status.FAILED,
                    "Caught exception while updating IdealState: " + e, instancePartitionsMap, targetAssignment);
            }
        }
    }

    /**
     * Returns the instance partitions for every instance partitions type of the table: OFFLINE for offline tables,
     * CONSUMING and COMPLETED otherwise.
     */
    private Map<InstancePartitionsType, InstancePartitions> getInstancePartitionsMap(TableConfig tableConfig,
        boolean reassignInstances, boolean dryRun) {
        Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = new TreeMap<>();
        if (tableConfig.getTableType() == CommonConstants.Helix.TableType.OFFLINE) {
            instancePartitionsMap.put(InstancePartitionsType.OFFLINE,
                getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, reassignInstances, dryRun));
        } else {
            instancePartitionsMap.put(InstancePartitionsType.CONSUMING,
                getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, reassignInstances, dryRun));
            instancePartitionsMap.put(InstancePartitionsType.COMPLETED,
                getInstancePartitions(tableConfig, InstancePartitionsType.COMPLETED, reassignInstances, dryRun));
        }
        return instancePartitionsMap;
    }

    /**
     * Returns the instance partitions of the given type for the table.
     *
     * <p>Without {@code reassignInstances}, the instance partitions are fetched or computed via
     * {@link InstancePartitionsUtils#fetchOrComputeInstancePartitions}. With {@code reassignInstances}, they are
     * re-assigned with {@link InstanceAssignmentDriver} when instance assignment is allowed for the table (and
     * persisted unless {@code dryRun}). Otherwise the default instance partitions are computed (and the persisted
     * instance partitions are removed unless {@code dryRun}).
     */
    private InstancePartitions getInstancePartitions(TableConfig tableConfig,
        InstancePartitionsType instancePartitionsType, boolean reassignInstances, boolean dryRun) {
        String tableName = tableConfig.getTableName();
        if (!reassignInstances) {
            return InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixManager, tableConfig,
                instancePartitionsType);
        }
        if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, instancePartitionsType)) {
            LOGGER.info("Reassigning {} instance partitions for table: {}", instancePartitionsType, tableName);
            InstancePartitions instancePartitions = new InstanceAssignmentDriver(tableConfig)
                .assignInstances(instancePartitionsType,
                    _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs()));
            if (!dryRun) {
                LOGGER.info("Persisting instance partitions: {}", instancePartitions);
                InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
                    instancePartitions);
            }
            return instancePartitions;
        }
        InstancePartitions defaultInstancePartitions =
            InstancePartitionsUtils.computeDefaultInstancePartitions(_helixManager, tableConfig, instancePartitionsType);
        if (!dryRun) {
            String instancePartitionsName = defaultInstancePartitions.getInstancePartitionsName();
            LOGGER.info("Removing instance partitions: {}", instancePartitionsName);
            InstancePartitionsUtils.removeInstancePartitions(_helixManager.getHelixPropertyStore(),
                instancePartitionsName);
        }
        return defaultInstancePartitions;
    }

    /**
     * Polls every {@link #EXTERNAL_VIEW_CHECK_INTERVAL_MS} until the ExternalView of the table has converged to its
     * IdealState, and returns the IdealState read in the converged check.
     *
     * @throws IllegalStateException if the IdealState is missing, or the ExternalView has segments in ERROR state
     * @throws TimeoutException if the ExternalView does not converge within
     *     {@link #EXTERNAL_VIEW_STABILIZATION_MAX_WAIT_MS}
     * @throws InterruptedException if interrupted while sleeping between two checks
     */
    private IdealState waitForExternalViewToConverge(String tableName)
        throws InterruptedException, TimeoutException {
        long deadlineMs = System.currentTimeMillis() + EXTERNAL_VIEW_STABILIZATION_MAX_WAIT_MS;
        while (System.currentTimeMillis() < deadlineMs) {
            IdealState idealState = _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableName));
            Preconditions.checkState(idealState != null, "Failed to find the IdealState");
            ExternalView externalView =
                _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableName));
            // A missing ExternalView is treated as not converged yet
            if (externalView != null) {
                Map<String, Map<String, String>> externalViewSegmentStates = externalView.getRecord().getMapFields();
                if (isExternalViewConverged(externalViewSegmentStates, idealState.getRecord().getMapFields())) {
                    return idealState;
                }
                if (hasSegmentInErrorState(externalViewSegmentStates)) {
                    throw new IllegalStateException("Found segments in ERROR state");
                }
            }
            Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
        }
        throw new TimeoutException("Timeout while waiting for ExternalView to converge");
    }

    /**
     * Returns the next assignment for every segment of the current assignment, computed with
     * {@link #getNextInstanceStateMap}. Segments only present in the target assignment are not included.
     */
    private static Map<String, Map<String, String>> getNextAssignment(
        Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
        int minAvailableReplicas) {
        Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
        for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
            String segmentName = entry.getKey();
            nextAssignment.put(segmentName,
                getNextInstanceStateMap(entry.getValue(), targetAssignment.get(segmentName), minAvailableReplicas));
        }
        return nextAssignment;
    }

    /**
     * Returns the instance state map for the next step of moving a segment from {@code currentInstanceStateMap} to
     * {@code targetInstanceStateMap}.
     *
     * <p>The result is built in three passes:
     * <ol>
     *   <li>instances present in both maps are kept with their target state;</li>
     *   <li>instances from the current map are added (with their current state) until the result holds
     *       {@code minAvailableReplicas} instances or the current map is exhausted;</li>
     *   <li>instances from the target map are added (with their target state) until the result holds as many
     *       instances as the target map.</li>
     * </ol>
     * The returned map is sorted by instance name.
     */
    @VisibleForTesting
    static Map<String, String> getNextInstanceStateMap(Map<String, String> currentInstanceStateMap,
        Map<String, String> targetInstanceStateMap, int minAvailableReplicas) {
        Map<String, String> nextInstanceStateMap = new TreeMap<>();

        // Keep the instances that are in both the current and the target assignment
        for (Map.Entry<String, String> entry : targetInstanceStateMap.entrySet()) {
            String instanceName = entry.getKey();
            if (currentInstanceStateMap.containsKey(instanceName)) {
                nextInstanceStateMap.put(instanceName, entry.getValue());
            }
        }

        // Keep more instances from the current assignment to satisfy the min available replicas
        int numInstancesToKeep = minAvailableReplicas - nextInstanceStateMap.size();
        if (numInstancesToKeep > 0) {
            for (Map.Entry<String, String> entry : currentInstanceStateMap.entrySet()) {
                String instanceName = entry.getKey();
                if (!nextInstanceStateMap.containsKey(instanceName)) {
                    nextInstanceStateMap.put(instanceName, entry.getValue());
                    if (--numInstancesToKeep == 0) {
                        break;
                    }
                }
            }
        }

        // Fill up the remaining slots with instances from the target assignment
        int numInstancesToAdd = targetInstanceStateMap.size() - nextInstanceStateMap.size();
        if (numInstancesToAdd > 0) {
            for (Map.Entry<String, String> entry : targetInstanceStateMap.entrySet()) {
                String instanceName = entry.getKey();
                if (!nextInstanceStateMap.containsKey(instanceName)) {
                    nextInstanceStateMap.put(instanceName, entry.getValue());
                    if (--numInstancesToAdd == 0) {
                        break;
                    }
                }
            }
        }

        return nextInstanceStateMap;
    }

    /** Returns {@code true} if any instance state in the given segment states equals the ERROR state. */
    private static boolean hasSegmentInErrorState(Map<String, Map<String, String>> segmentStates) {
        for (Map<String, String> instanceStateMap : segmentStates.values()) {
            if (instanceStateMap.containsValue(SegmentStatusChecker.ERROR)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns {@code true} if every non-OFFLINE instance state in the IdealState is matched by the same state in the
     * ExternalView. OFFLINE states in the IdealState are ignored, so a segment whose expected states are all OFFLINE
     * (or that has no expected instance) does not need to appear in the ExternalView. Segments only present in the
     * ExternalView are not checked.
     */
    @VisibleForTesting
    static boolean isExternalViewConverged(Map<String, Map<String, String>> externalViewSegmentStates,
        Map<String, Map<String, String>> idealStateSegmentStates) {
        for (Map.Entry<String, Map<String, String>> segmentEntry : idealStateSegmentStates.entrySet()) {
            Map<String, String> externalViewInstanceStateMap = externalViewSegmentStates.get(segmentEntry.getKey());
            for (Map.Entry<String, String> instanceEntry : segmentEntry.getValue().entrySet()) {
                String idealStateInstanceState = instanceEntry.getValue();
                if (idealStateInstanceState.equals(OFFLINE_STATE)) {
                    continue;
                }
                // A non-OFFLINE expected state requires the segment and the matching instance state in the ExternalView
                if (externalViewInstanceStateMap == null || !idealStateInstanceState.equals(
                    externalViewInstanceStateMap.get(instanceEntry.getKey()))) {
                    return false;
                }
            }
        }
        return true;
    }
}
