package org.apache.pinot.controller.helix.core;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.Criteria;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.messaging.AsyncCallback;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.exception.SchemaAlreadyExistsException;
import org.apache.pinot.common.exception.SchemaBackwardIncompatibleException;
import org.apache.pinot.common.exception.SchemaNotFoundException;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.common.lineage.LineageEntry;
import org.apache.pinot.common.lineage.LineageEntryState;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
import org.apache.pinot.common.lineage.SegmentLineageUtils;
import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
import org.apache.pinot.common.messages.SegmentRefreshMessage;
import org.apache.pinot.common.messages.SegmentReloadMessage;
import org.apache.pinot.common.messages.TableConfigRefreshMessage;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.common.utils.config.InstanceUtils;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvider;
import org.apache.pinot.common.utils.helix.TableCache;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.exception.InvalidTableConfigException;
import org.apache.pinot.controller.api.exception.TableAlreadyExistsException;
import org.apache.pinot.controller.api.resources.StateType;
import org.apache.pinot.controller.helix.SegmentStatusChecker;
import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
import org.apache.pinot.controller.helix.starter.HelixConfig;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.ConfigUtils;
import org.apache.pinot.spi.config.instance.Instance;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableCustomConfig;
import org.apache.pinot.spi.config.table.TableStats;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TenantConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/controller/helix/core/PinotHelixResourceManager.class */
public class PinotHelixResourceManager {
    // NOTE(review): the static finals declared without an initializer (LOGGER, DEFAULT_RETRY_POLICY,
    // SIMPLE_DATE_FORMAT, $assertionsDisabled) are presumably assigned in a static initializer that is
    // not visible in this part of the file — confirm.
    private static final Logger LOGGER;
    // Expire-after-write duration (hours) used when the constructor builds _instanceAdminEndpointCache.
    private static final long CACHE_ENTRY_EXPIRE_TIME_HOURS = 6;
    // Retry policy handed to HelixHelper.updateIdealState when the broker resource is rebuilt.
    private static final RetryPolicy DEFAULT_RETRY_POLICY;
    public static final String APPEND = "APPEND";
    // Matches the number of lock objects the constructor allocates for _tableUpdaterLocks.
    private static final int DEFAULT_TABLE_UPDATER_LOCKERS_SIZE = 100;
    public static final long EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS = 600000;
    public static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1000;
    // The constructor sets this formatter's time zone to UTC.
    // NOTE(review): SimpleDateFormat is not thread-safe and this instance is shared statically — verify usages.
    private static final SimpleDateFormat SIMPLE_DATE_FORMAT;
    // Both maps start out as empty HashMaps in the constructor; their users are outside this view.
    private final Map<String, Map<String, Long>> _segmentCrcMap;
    private final Map<String, Map<String, Integer>> _lastKnownSegmentMetadataVersionMap;
    // Fixed-size array of plain lock objects, filled in by the constructor.
    private final Object[] _tableUpdaterLocks;
    // Instance id -> "scheme://host:port" admin endpoint, loaded lazily from the Helix instance config.
    private final LoadingCache<String, String> _instanceAdminEndpointCache;
    // Constructor-supplied configuration.
    private final String _helixZkURL;
    private final String _helixClusterName;
    private final String _dataDir;
    private final boolean _isSingleTenantCluster;
    private final boolean _enableBatchMessageMode;
    private final boolean _allowHLCTables;
    // The fields below are not final: start(HelixManager) assigns the Helix handles, the segment deletion
    // manager and the table cache; _pinotLLCRealtimeSegmentManager is not assigned in the visible code.
    private HelixManager _helixZkManager;
    private HelixAdmin _helixAdmin;
    private ZkHelixPropertyStore<ZNRecord> _propertyStore;
    private HelixDataAccessor _helixDataAccessor;
    private PropertyKey.Builder _keyBuilder;
    private SegmentDeletionManager _segmentDeletionManager;
    private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
    private TableCache _tableCache;
    // Decompiler rendering of the compiler-generated assertion flag; read by the lambda in
    // rebuildBrokerResource in place of an `assert` statement.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.pinot.controller.helix.core.PinotHelixResourceManager$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/pinot/controller/helix/core/PinotHelixResourceManager$2.class */
    // Decompiler-generated holder for enum switch lookup tables: each table is indexed by an enum
    // constant's ordinal() and stores a small integer case label.
    public static /* synthetic */ class AnonymousClass2 {
        // Lookup table for TableType: OFFLINE -> 1, REALTIME -> 2.
        static final /* synthetic */ int[] $SwitchMap$org$apache$pinot$spi$config$table$TableType;

        static {
            // NOTE(review): $SwitchMap$org$apache$pinot$controller$api$resources$StateType is written below
            // but is neither declared nor allocated in this class as shown — the decompiler appears to have
            // dropped its declaration; confirm before relying on this class compiling.
            // Every assignment is wrapped so that a NoSuchFieldError for a missing enum constant is ignored.
            try {
                $SwitchMap$org$apache$pinot$controller$api$resources$StateType[StateType.ENABLE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$pinot$controller$api$resources$StateType[StateType.DISABLE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$pinot$controller$api$resources$StateType[StateType.DROP.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            // One slot per TableType constant.
            $SwitchMap$org$apache$pinot$spi$config$table$TableType = new int[TableType.values().length];
            try {
                $SwitchMap$org$apache$pinot$spi$config$table$TableType[TableType.OFFLINE.ordinal()] = 1;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$pinot$spi$config$table$TableType[TableType.REALTIME.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /**
     * Creates the resource manager. Only configuration and in-memory structures are set up here; the
     * Helix handles are assigned later by {@link #start(HelixManager)}.
     *
     * @param str ZooKeeper address, converted with {@code HelixConfig.getAbsoluteZkPathForHelix}
     * @param str2 Helix cluster name
     * @param str3 data directory, may be {@code null}
     * @param z stored as the single-tenant-cluster flag
     * @param z2 stored as the batch-message-mode flag
     * @param z3 stored as the allow-HLC-tables flag
     */
    public PinotHelixResourceManager(String str, String str2, @Nullable String str3, boolean z, boolean z2, boolean z3) {
        _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(str);
        _helixClusterName = str2;
        _dataDir = str3;
        _isSingleTenantCluster = z;
        _enableBatchMessageMode = z2;
        _allowHLCTables = z3;
        _segmentCrcMap = new HashMap<>();
        _lastKnownSegmentMetadataVersionMap = new HashMap<>();

        // Admin endpoints are resolved lazily from the Helix instance config and cached.
        _instanceAdminEndpointCache = CacheBuilder.newBuilder()
                .expireAfterWrite(CACHE_ENTRY_EXPIRE_TIME_HOURS, TimeUnit.HOURS)
                .build(new CacheLoader<String, String>() {
                    @Override
                    public String load(String str4) {
                        InstanceConfig instanceConfig = getHelixInstanceConfig(str4);
                        Preconditions.checkNotNull(instanceConfig, "Failed to find instance config for: %s", str4);

                        // Strip the server instance prefix from the host name when present.
                        String host = instanceConfig.getHostName();
                        if (host.startsWith("Server_")) {
                            host = host.substring(CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH);
                        }

                        // Defaults apply when neither port is configured; the plain admin port wins over HTTPS.
                        ZNRecord record = instanceConfig.getRecord();
                        int adminPort = record.getIntField("adminPort", -1);
                        int adminHttpsPort = record.getIntField("adminHttpsPort", -1);
                        String scheme = "http";
                        int port = 8097;
                        if (adminPort > 0) {
                            port = adminPort;
                        } else if (adminHttpsPort > 0) {
                            scheme = "https";
                            port = adminHttpsPort;
                        }
                        return String.format("%s://%s:%d", scheme, host, port);
                    }
                });

        Object[] locks = new Object[DEFAULT_TABLE_UPDATER_LOCKERS_SIZE];
        Arrays.setAll(locks, index -> new Object());
        _tableUpdaterLocks = locks;

        SIMPLE_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
    }

    /**
     * Convenience constructor that reads every argument of the primary constructor from the controller
     * configuration.
     *
     * @param controllerConf controller configuration to read from
     */
    public PinotHelixResourceManager(ControllerConf controllerConf) {
        this(
                controllerConf.getZkStr(),
                controllerConf.getHelixClusterName(),
                controllerConf.getDataDir(),
                controllerConf.tenantIsolationEnabled(),
                controllerConf.getEnableBatchMessageMode(),
                controllerConf.getHLCTablesAllowed());
    }

    /**
     * Binds this manager to the given Helix manager: caches the Helix admin, property store, data
     * accessor and key builder, creates the segment deletion manager, records the tenant-isolation
     * flag in the property store and builds the table cache.
     *
     * @param helixManager Helix manager to obtain all Helix handles from
     */
    public synchronized void start(HelixManager helixManager) {
        _helixZkManager = helixManager;
        _helixAdmin = _helixZkManager.getClusterManagmentTool();
        _propertyStore = _helixZkManager.getHelixPropertyStore();
        _helixDataAccessor = _helixZkManager.getHelixDataAccessor();
        _keyBuilder = _helixDataAccessor.keyBuilder();
        _segmentDeletionManager = new SegmentDeletionManager(_dataDir, _helixAdmin, _helixClusterName, _propertyStore);
        ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, _isSingleTenantCluster);

        // The table cache is case-insensitive when either cluster-level flag is set to true.
        HelixConfigScope clusterScope =
                new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(_helixClusterName).build();
        Map<String, String> clusterConfig =
                _helixAdmin.getConfig(clusterScope, Arrays.asList("enable.case.insensitive", "enable.case.insensitive.pql"));
        boolean caseInsensitive = Boolean.parseBoolean(clusterConfig.get("enable.case.insensitive"))
                || Boolean.parseBoolean(clusterConfig.get("enable.case.insensitive.pql"));
        _tableCache = new TableCache(_propertyStore, caseInsensitive);
    }

    /** Stops the segment deletion manager created by {@link #start(HelixManager)}. */
    public synchronized void stop() {
        _segmentDeletionManager.stop();
    }

    /** @return the ZooKeeper URL computed in the constructor */
    public String getHelixZkURL() {
        return _helixZkURL;
    }

    /** @return the Helix cluster name supplied to the constructor */
    public String getHelixClusterName() {
        return _helixClusterName;
    }

    /** @return the segment deletion manager created by {@link #start(HelixManager)} */
    public SegmentDeletionManager getSegmentDeletionManager() {
        return _segmentDeletionManager;
    }

    /** @return the Helix manager passed to {@link #start(HelixManager)} */
    public HelixManager getHelixZkManager() {
        return _helixZkManager;
    }

    /** @return the Helix admin obtained from the Helix manager in {@link #start(HelixManager)} */
    public HelixAdmin getHelixAdmin() {
        return _helixAdmin;
    }

    /** @return the Helix property store obtained in {@link #start(HelixManager)} */
    public ZkHelixPropertyStore<ZNRecord> getPropertyStore() {
        return _propertyStore;
    }

    /** @return the instances Helix reports for this cluster */
    public List<String> getAllInstances() {
        return _helixAdmin.getInstancesInCluster(_helixClusterName);
    }

    /** @return the instance configs returned by {@code HelixHelper.getInstanceConfigs} for the Helix manager */
    public List<InstanceConfig> getAllHelixInstanceConfigs() {
        return HelixHelper.getInstanceConfigs(_helixZkManager);
    }

    /**
     * Reads the Helix instance config of one instance through the data accessor.
     *
     * @param str instance id
     * @return the instance config, or {@code null} (per the {@code @Nullable} contract)
     */
    @Nullable
    public InstanceConfig getHelixInstanceConfig(String str) {
        PropertyKey instanceConfigKey = _keyBuilder.instanceConfig(str);
        return _helixDataAccessor.getProperty(instanceConfigKey);
    }

    /**
     * Reads the instance ZK metadata of one instance from the property store.
     *
     * @param str instance id
     * @return the metadata, or {@code null} (per the {@code @Nullable} contract)
     */
    @Nullable
    public InstanceZKMetadata getInstanceZKMetadata(String str) {
        return ZKMetadataProvider.getInstanceZKMetadata(_propertyStore, str);
    }

    /**
     * Lists the names of the broker instances returned by {@link #getBrokerInstancesConfigsFor(String)}.
     *
     * @param str table name forwarded to {@link #getBrokerInstancesConfigsFor(String)}
     * @return instance names, in the order of the underlying instance configs
     */
    public List<String> getBrokerInstancesFor(String str) {
        List<String> brokerInstanceNames = new ArrayList<>();
        for (InstanceConfig brokerConfig : getBrokerInstancesConfigsFor(str)) {
            brokerInstanceNames.add(brokerConfig.getInstanceName());
        }
        return brokerInstanceNames;
    }

    /**
     * Finds the instance configs carrying the broker tag of a table's broker tenant. The offline table
     * config is consulted first and the realtime config only when no offline config exists; when
     * neither exists, {@code null} is passed as the tenant name to {@code TagNameUtils}.
     *
     * @param str table name used for both table-config lookups
     * @return instance configs tagged with the resolved broker tag
     */
    public List<InstanceConfig> getBrokerInstancesConfigsFor(String str) {
        TableConfig tableConfig = ZKMetadataProvider.getOfflineTableConfig(_propertyStore, str);
        if (tableConfig == null) {
            tableConfig = ZKMetadataProvider.getRealtimeTableConfig(_propertyStore, str);
        }
        String brokerTenant = tableConfig == null ? null : tableConfig.getTenantConfig().getBroker();

        List<InstanceConfig> allInstanceConfigs = HelixHelper.getInstanceConfigs(_helixZkManager);
        String brokerTag = TagNameUtils.getBrokerTagForTenant(brokerTenant);
        return HelixHelper.getInstancesConfigsWithTag(allInstanceConfigs, brokerTag);
    }

    /**
     * @param str Helix tag
     * @return the instances {@code HelixHelper.getInstancesWithTag} reports for the tag
     */
    public List<String> getInstancesWithTag(String str) {
        return HelixHelper.getInstancesWithTag(_helixZkManager, str);
    }

    /**
     * Registers a new instance with the Helix cluster unless an instance with the same Helix id is
     * already present.
     *
     * @param instance instance to add
     * @return {@code SUCCESS}, or a failure response when the instance id already exists
     */
    public synchronized PinotResourceManagerResponse addInstance(Instance instance) {
        List<String> existingInstanceIds = getAllInstances();
        String instanceId = InstanceUtils.getHelixInstanceId(instance);
        if (!existingInstanceIds.contains(instanceId)) {
            InstanceConfig newInstanceConfig = InstanceUtils.toHelixInstanceConfig(instance);
            _helixAdmin.addInstance(_helixClusterName, newInstanceConfig);
            return PinotResourceManagerResponse.SUCCESS;
        }
        return PinotResourceManagerResponse.failure("Instance " + instanceId + " already exists");
    }

    /**
     * Applies the given instance definition to an existing Helix instance config and writes it back.
     *
     * @param str id of the instance to update
     * @param instance new instance definition
     * @return {@code SUCCESS}, or a failure response when the instance is unknown or the write is rejected
     */
    public synchronized PinotResourceManagerResponse updateInstance(String str, Instance instance) {
        InstanceConfig instanceConfig = getHelixInstanceConfig(str);
        if (instanceConfig == null) {
            return PinotResourceManagerResponse.failure("Instance " + str + " does not exists");
        }
        InstanceUtils.updateHelixInstanceConfig(instanceConfig, instance);
        boolean written = _helixDataAccessor.setProperty(_keyBuilder.instanceConfig(str), instanceConfig);
        if (written) {
            return PinotResourceManagerResponse.SUCCESS;
        }
        return PinotResourceManagerResponse.failure("Unable to update instance: " + str);
    }

    /**
     * Replaces the tag list of an existing instance with the comma-separated tags given.
     *
     * @param str id of the instance to update
     * @param str2 comma-separated tag list
     * @return {@code SUCCESS}, or a failure response when the instance is unknown or the write is rejected
     */
    public synchronized PinotResourceManagerResponse updateInstanceTags(String str, String str2) {
        InstanceConfig instanceConfig = getHelixInstanceConfig(str);
        if (instanceConfig == null) {
            return PinotResourceManagerResponse.failure("Instance " + str + " does not exists");
        }
        List<String> newTags = Arrays.asList(StringUtils.split(str2, ','));
        instanceConfig.getRecord().setListField(InstanceConfig.InstanceConfigProperty.TAG_LIST.name(), newTags);
        boolean written = _helixDataAccessor.setProperty(_keyBuilder.instanceConfig(str), instanceConfig);
        if (written) {
            return PinotResourceManagerResponse.SUCCESS;
        }
        return PinotResourceManagerResponse.failure("Unable to update instance: " + str + " to tags: " + str2);
    }

    /**
     * Tells whether an instance has been offline for longer than the given duration.
     *
     * <p>Returns {@code false} when the instance is currently live, when no participant history is
     * available for it, or when the recorded last offline time is negative.
     *
     * @param str instance id
     * @param j offline duration threshold in milliseconds
     * @return {@code true} only when the instance is not live and its last recorded offline time is
     *     more than {@code j} milliseconds in the past
     */
    public boolean isInstanceOfflineFor(String str, long j) {
        // A live-instance entry means the instance is online right now.
        if (this._helixDataAccessor.getProperty(this._keyBuilder.liveInstance(str)) != null) {
            return false;
        }
        org.apache.helix.model.ParticipantHistory participantHistory =
                this._helixDataAccessor.getProperty(this._keyBuilder.participantHistory(str));
        // Without a history record the offline duration cannot be determined; previously this
        // dereferenced the lookup result unconditionally and failed with a NullPointerException.
        if (participantHistory == null) {
            return false;
        }
        long lastOfflineTime = participantHistory.getLastOfflineTime();
        if (lastOfflineTime < 0 || System.currentTimeMillis() - lastOfflineTime <= j) {
            return false;
        }
        LOGGER.info("Instance: {} has been offline for more than {}ms", str, Long.valueOf(j));
        return true;
    }

    /** @return the resources Helix reports for this cluster */
    public List<String> getAllResources() {
        return _helixAdmin.getResourcesInCluster(_helixClusterName);
    }

    /** @return the cluster resources that {@code TableNameBuilder.isTableResource} accepts, in resource order */
    public List<String> getAllTables() {
        return getAllResources().stream()
                .filter(resource -> TableNameBuilder.isTableResource(resource))
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /** @return the cluster resources that {@code TableNameBuilder.isOfflineTableResource} accepts, in resource order */
    public List<String> getAllOfflineTables() {
        return getAllResources().stream()
                .filter(resource -> TableNameBuilder.isOfflineTableResource(resource))
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /** @return the cluster resources that {@code TableNameBuilder.isRealtimeTableResource} accepts, in resource order */
    public List<String> getAllRealtimeTables() {
        return getAllResources().stream()
                .filter(resource -> TableNameBuilder.isRealtimeTableResource(resource))
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Lists the distinct raw table names of all table resources in the cluster.
     *
     * @return raw table names with duplicates removed; order follows the intermediate hash set
     */
    public List<String> getAllRawTables() {
        Set<String> rawTableNames = getAllResources().stream()
                .filter(resource -> TableNameBuilder.isTableResource(resource))
                .map(resource -> TableNameBuilder.extractRawTableName(resource))
                .collect(Collectors.toCollection(HashSet::new));
        return new ArrayList<>(rawTableNames);
    }

    /**
     * Resolves a table name through the table cache when the cache is case-insensitive.
     *
     * @param str table name as supplied by the caller
     * @return the cached actual table name when the cache is case-insensitive and knows one;
     *     otherwise {@code str} unchanged
     */
    public String getActualTableName(String str) {
        if (!_tableCache.isCaseInsensitive()) {
            return str;
        }
        String cachedName = _tableCache.getActualTableName(str);
        return cachedName != null ? cachedName : str;
    }

    /**
     * Reads the crypter class name from the validation config of a cached table config.
     *
     * @param str table name used for the table-cache lookup
     * @return the crypter class name stored in the table's validation config
     * @throws NullPointerException if the table cache has no config for the table
     */
    public String getCrypterClassNameFromTableConfig(String str) {
        TableConfig cachedConfig = _tableCache.getTableConfig(str);
        Preconditions.checkNotNull(cachedConfig, "Table config is not available for table '%s'", str);
        SegmentsValidationAndRetentionConfig validationConfig = cachedConfig.getValidationConfig();
        return validationConfig.getCrypterClassName();
    }

    /**
     * @param str table name
     * @return the segments {@code ZKMetadataProvider.getSegments} reports for the table
     */
    public List<String> getSegmentsFor(String str) {
        return ZKMetadataProvider.getSegments(_propertyStore, str);
    }

    /**
     * Selects the segments of a table that fall into a time range and then filters the selection
     * through the table's segment lineage.
     *
     * @param str table name
     * @param j range start; {@code Long.MIN_VALUE} together with {@code j2 == Long.MAX_VALUE} selects all segments
     * @param j2 range end
     * @param z forwarded to the per-segment range check; when {@code true}, segments that only overlap the range are dropped
     * @return the selected segment names after lineage filtering; order follows the intermediate hash set
     */
    public List<String> getSegmentsForTableWithTimestamps(String str, long j, long j2, boolean z) {
        boolean unbounded = j == Long.MIN_VALUE && j2 == Long.MAX_VALUE;
        List<String> selectedSegments;
        if (unbounded) {
            // No time filter requested: take every segment without reading per-segment metadata.
            selectedSegments = getSegmentsFor(str);
        } else {
            selectedSegments = new ArrayList<>();
            for (SegmentZKMetadata metadata : getSegmentsZKMetadata(str)) {
                String name = metadata.getSegmentName();
                if (isSegmentWithinTimeStamps(metadata, j, j2, z)) {
                    selectedSegments.add(name);
                }
            }
        }

        ZNRecord lineageRecord = SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, str);
        SegmentLineage lineage = SegmentLineage.fromZNRecord(lineageRecord);
        Set<String> lineageFiltered = new HashSet<>(selectedSegments);
        SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(lineageFiltered, lineage);
        return new ArrayList<>(lineageFiltered);
    }

    /**
     * Checks one segment against the time range {@code [j, j2)}.
     *
     * @param segmentZKMetadata segment metadata; {@code null} yields {@code false}
     * @param j range start
     * @param j2 range end
     * @param z when {@code true}, a segment that merely overlaps the range is rejected
     * @return {@code true} when both segment times are -1, when the segment lies fully inside the
     *     range, or when it overlaps the range and {@code z} is {@code false}
     */
    private boolean isSegmentWithinTimeStamps(SegmentZKMetadata segmentZKMetadata, long j, long j2, boolean z) {
        if (segmentZKMetadata == null) {
            return false;
        }
        long segmentStartMs = segmentZKMetadata.getStartTimeMs();
        long segmentEndMs = segmentZKMetadata.getEndTimeMs();
        // Segments with both times set to -1 are always selected.
        if (segmentStartMs == -1 && segmentEndMs == -1) {
            return true;
        }
        if (segmentStartMs > segmentEndMs) {
            LOGGER.warn("Invalid start and end time for segment: {}. Start time: {}. End time: {}",
                    segmentZKMetadata.getSegmentName(), Long.valueOf(segmentStartMs), Long.valueOf(segmentEndMs));
            return false;
        }
        boolean fullyContained = j <= segmentStartMs && segmentEndMs < j2;
        if (fullyContained) {
            return true;
        }
        boolean overlapping = segmentEndMs >= j && segmentStartMs < j2;
        return overlapping && !z;
    }

    /**
     * @param str table name
     * @param str2 segment name
     * @return the segment's ZK metadata, or {@code null} (per the {@code @Nullable} contract)
     */
    @Nullable
    public SegmentZKMetadata getSegmentZKMetadata(String str, String str2) {
        return ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, str, str2);
    }

    /**
     * @param str table name
     * @return the ZK metadata {@code ZKMetadataProvider.getSegmentsZKMetadata} reports for the table
     */
    public List<SegmentZKMetadata> getSegmentsZKMetadata(String str) {
        return ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore, str);
    }

    /**
     * Removes segments from the table's ideal state and hands them to the segment deletion manager.
     * Any exception is logged and converted into a failure response instead of being propagated.
     *
     * @param str table name; must be a table resource name, otherwise a failure response is returned
     * @param list names of the segments to delete
     * @return a success response, or a failure response carrying the exception message
     */
    public synchronized PinotResourceManagerResponse deleteSegments(String str, List<String> list) {
        try {
            LOGGER.info("Trying to delete segments: {} from table: {} ", list, str);
            boolean isTableResource = TableNameBuilder.isTableResource(str);
            Preconditions.checkArgument(isTableResource, "Table name: %s is not a valid table name with type suffix", str);
            // Ideal state first, then the segment deletion manager.
            HelixHelper.removeSegmentsFromIdealState(_helixZkManager, str, list);
            _segmentDeletionManager.deleteSegments(str, list);
            String successMessage = "Segment " + list + " deleted";
            return PinotResourceManagerResponse.success(successMessage);
        } catch (Exception e) {
            LOGGER.error("Caught exception while deleting segment: {} from table: {}", list, str, e);
            return PinotResourceManagerResponse.failure(e.getMessage());
        }
    }

    /**
     * Deletes a single segment by delegating to {@link #deleteSegments(String, List)}.
     *
     * @param str table name
     * @param str2 segment name
     * @return the response of {@link #deleteSegments(String, List)}
     */
    public synchronized PinotResourceManagerResponse deleteSegment(String str, String str2) {
        List<String> singleSegment = Collections.singletonList(str2);
        return deleteSegments(str, singleSegment);
    }

    /**
     * Brings the number of instances tagged for a broker tenant in line with the requested number.
     *
     * @param tenant broker tenant definition carrying the requested instance count
     * @return the scale-down or scale-up response, or {@code SUCCESS} when the counts already match
     */
    public PinotResourceManagerResponse updateBrokerTenant(Tenant tenant) {
        String brokerTag = TagNameUtils.getBrokerTagForTenant(tenant.getTenantName());
        List<String> taggedBrokers = HelixHelper.getInstancesWithTag(_helixZkManager, brokerTag);
        if (taggedBrokers.size() > tenant.getNumberOfInstances()) {
            return scaleDownBroker(tenant, brokerTag, taggedBrokers);
        }
        if (taggedBrokers.size() < tenant.getNumberOfInstances()) {
            return scaleUpBroker(tenant, brokerTag, taggedBrokers);
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Tags additional untagged brokers for a tenant and adds each one to the broker ideal state.
     *
     * @param tenant broker tenant definition carrying the requested instance count
     * @param str broker tag of the tenant
     * @param list instances that already carry the broker tag
     * @return {@code SUCCESS}, or a failure response when too few untagged brokers are available
     */
    private PinotResourceManagerResponse scaleUpBroker(Tenant tenant, String str, List<String> list) {
        List<String> untaggedBrokers = getOnlineUnTaggedBrokerInstanceList();
        int brokersToAdd = tenant.getNumberOfInstances() - list.size();
        boolean enoughUntagged = untaggedBrokers.size() >= brokersToAdd;
        if (!enoughUntagged) {
            String errorMessage = "Failed to allocate broker instances to Tag : " + tenant.getTenantName()
                    + ", Current number of untagged broker instances : " + untaggedBrokers.size()
                    + ", Current number of tagged broker instances : " + list.size()
                    + ", Request asked number is : " + tenant.getNumberOfInstances();
            LOGGER.error(errorMessage);
            return PinotResourceManagerResponse.failure(errorMessage);
        }
        int added = 0;
        while (added < brokersToAdd) {
            String brokerInstance = untaggedBrokers.get(added);
            retagInstance(brokerInstance, "broker_untagged", str);
            addInstanceToBrokerIdealState(str, brokerInstance);
            added++;
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Rebuilds the broker resource of a table from the instances of the table's broker tenant.
     *
     * @param str table name used for the table-config lookup and as the broker-resource partition
     * @return the response of {@link #rebuildBrokerResource(String, Set)}
     * @throws InvalidTableConfigException if reading the table config fails
     * @throws InvalidConfigException if no table config exists for the table
     * @throws Exception any failure raised while rebuilding the broker resource
     */
    public PinotResourceManagerResponse rebuildBrokerResourceFromHelixTags(String str) throws Exception {
        TableConfig tableConfig;
        // Guard only the config lookup. With the whole method inside the try, the deliberate
        // InvalidConfigException below and any rebuild failure were caught, logged as a config-read
        // problem and rewrapped into an InvalidTableConfigException with a misleading message.
        try {
            tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, str);
        } catch (Exception e) {
            LOGGER.warn("Caught exception while getting table config for table {}", str, e);
            throw new InvalidTableConfigException("Failed to fetch broker tag for table " + str + " due to exception: " + e.getMessage());
        }
        if (tableConfig == null) {
            LOGGER.warn("Table " + str + " does not exist");
            throw new InvalidConfigException("Invalid table configuration for table " + str + ". Table does not exist");
        }
        return rebuildBrokerResource(str, getAllInstancesForBrokerTenant(tableConfig.getTenantConfig().getBroker()));
    }

    /**
     * Replaces the broker-resource instance set of a table with the given brokers, unless the ideal
     * state already lists exactly those instances.
     *
     * @param str table name, used as the partition of the broker resource
     * @param set broker instances that should serve the table
     * @return a success response that says whether the resource was rebuilt or left untouched
     */
    public PinotResourceManagerResponse rebuildBrokerResource(String str, Set<String> set) {
        IdealState brokerIdealState = HelixHelper.getBrokerIdealStates(_helixAdmin, _helixClusterName);
        boolean alreadyUpToDate = brokerIdealState.getInstanceSet(str).equals(set);
        if (alreadyUpToDate) {
            return PinotResourceManagerResponse.success("Broker resource is not rebuilt because ideal state is the same for table: " + str);
        }
        try {
            HelixHelper.updateIdealState(getHelixZkManager(), "brokerResource", idealState -> {
                // Decompiled form of an assertion; kept on the explicitly declared flag of this class.
                if (!$assertionsDisabled && idealState == null) {
                    throw new AssertionError();
                }
                // Drop the current assignment of the table, then mark every requested broker ONLINE.
                Map<String, String> currentAssignment = idealState.getInstanceStateMap(str);
                if (currentAssignment != null) {
                    currentAssignment.clear();
                }
                for (String brokerInstance : set) {
                    idealState.setPartitionState(str, brokerInstance, "ONLINE");
                }
                return idealState;
            }, DEFAULT_RETRY_POLICY);
        } catch (Exception e) {
            LOGGER.error("Caught exception while rebuilding broker resource for table: {}", str, e);
            throw e;
        }
        LOGGER.info("Successfully rebuilt brokerResource for table: {}", str);
        return PinotResourceManagerResponse.success("Rebuilt brokerResource for table: " + str);
    }

    /**
     * Marks a broker instance ONLINE for every broker-resource partition whose table config resolves
     * to the given broker tag, then writes the ideal state back.
     *
     * @param str broker tag to match against each table's tenant config
     * @param str2 broker instance to add
     * @throws NullPointerException if a partition of the broker resource has no table config
     */
    private void addInstanceToBrokerIdealState(String str, String str2) {
        IdealState brokerIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, "brokerResource");
        for (String tableName : brokerIdealState.getPartitionSet()) {
            TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableName);
            Preconditions.checkNotNull(tableConfig);
            String tableBrokerTag = TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig());
            if (!tableBrokerTag.equals(str)) {
                continue;
            }
            brokerIdealState.setPartitionState(tableName, str2, "ONLINE");
        }
        _helixAdmin.setResourceIdealState(_helixClusterName, "brokerResource", brokerIdealState);
    }

    /**
     * Untags the surplus brokers of a tenant, taking them from the front of the tagged list.
     *
     * @param tenant broker tenant definition carrying the requested instance count
     * @param str broker tag of the tenant
     * @param list instances that currently carry the broker tag
     * @return always {@code SUCCESS}
     */
    private PinotResourceManagerResponse scaleDownBroker(Tenant tenant, String str, List<String> list) {
        int brokersToRemove = list.size() - tenant.getNumberOfInstances();
        int removed = 0;
        while (removed < brokersToRemove) {
            retagInstance(list.get(removed), str, "broker_untagged");
            removed++;
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Swaps one Helix tag of an instance for another: the old tag is removed first, then the new tag
     * is added.
     *
     * @param str instance id
     * @param str2 tag to remove
     * @param str3 tag to add
     */
    private void retagInstance(String str, String str2, String str3) {
        _helixAdmin.removeInstanceTag(_helixClusterName, str, str2);
        _helixAdmin.addInstanceTag(_helixClusterName, str, str3);
    }

    /**
     * Updates a server tenant to the requested offline/realtime instance counts. The request is
     * rejected when its colocation flag differs from the current layout, where "currently colocated"
     * means at least one instance carries both the offline and the realtime tag.
     *
     * @param tenant server tenant definition
     * @return the scale-up or scale-down response, or a failure response on a colocation mismatch
     */
    public PinotResourceManagerResponse updateServerTenant(Tenant tenant) {
        String realtimeTag = TagNameUtils.getRealtimeTagForTenant(tenant.getTenantName());
        List<String> realtimeServers = HelixHelper.getInstancesWithTag(_helixZkManager, realtimeTag);
        String offlineTag = TagNameUtils.getOfflineTagForTenant(tenant.getTenantName());
        List<String> offlineServers = HelixHelper.getInstancesWithTag(_helixZkManager, offlineTag);

        Set<String> allServers = new HashSet<>();
        allServers.addAll(offlineServers);
        allServers.addAll(realtimeServers);

        // Fewer distinct instances than tag assignments means some instance carries both tags.
        boolean currentlyColocated = allServers.size() < offlineServers.size() + realtimeServers.size();
        if (currentlyColocated != tenant.isCoLocated()) {
            String errorMessage = "Not support different colocated type request for update request: " + tenant;
            LOGGER.error(errorMessage);
            return PinotResourceManagerResponse.failure(errorMessage);
        }

        boolean shrinking = tenant.getNumberOfInstances() < allServers.size()
                || tenant.getOfflineInstances() < offlineServers.size()
                || tenant.getRealtimeInstances() < realtimeServers.size();
        if (shrinking) {
            return scaleDownServer(tenant, realtimeServers, offlineServers, allServers);
        }
        return scaleUpServerTenant(tenant, realtimeTag, realtimeServers, offlineTag, offlineServers, allServers);
    }

    /**
     * Scales a server tenant up using untagged servers, dispatching to the colocated or independent
     * variant depending on the tenant definition.
     *
     * @param tenant server tenant definition
     * @param str realtime tag of the tenant
     * @param list instances currently carrying the realtime tag
     * @param str2 offline tag of the tenant
     * @param list2 instances currently carrying the offline tag
     * @param set union of {@code list} and {@code list2}
     * @return the variant's response, or a failure response when too few untagged servers are available
     */
    private PinotResourceManagerResponse scaleUpServerTenant(Tenant tenant, String str, List<String> list, String str2, List<String> list2, Set<String> set) {
        int serversToAdd = tenant.getNumberOfInstances() - set.size();
        List<String> untaggedServers = getOnlineUnTaggedServerInstanceList();
        if (untaggedServers.size() < serversToAdd) {
            String errorMessage = "Failed to allocate hardware resources with tenant info: " + tenant
                    + ", Current number of untagged instances : " + untaggedServers.size()
                    + ", Current number of serving instances : " + set.size()
                    + ", Current number of tagged offline server instances : " + list2.size()
                    + ", Current number of tagged realtime server instances : " + list.size();
            LOGGER.error(errorMessage);
            return PinotResourceManagerResponse.failure(errorMessage);
        }
        if (tenant.isCoLocated()) {
            return updateColocatedServerTenant(tenant, str, list, str2, list2, serversToAdd, untaggedServers);
        }
        return updateIndependentServerTenant(tenant, str, list, str2, list2, serversToAdd, untaggedServers);
    }

    /**
     * Scales up a non-colocated server tenant: the first untagged servers receive the offline tag and
     * the following ones the realtime tag.
     *
     * @param tenant server tenant definition
     * @param str realtime tag of the tenant
     * @param list instances currently carrying the realtime tag
     * @param str2 offline tag of the tenant
     * @param list2 instances currently carrying the offline tag
     * @param i number of instances to add; not read by this method
     * @param list3 untagged servers to draw from
     * @return always {@code SUCCESS}
     */
    private PinotResourceManagerResponse updateIndependentServerTenant(Tenant tenant, String str, List<String> list, String str2, List<String> list2, int i, List<String> list3) {
        int offlineToAdd = tenant.getOfflineInstances() - list2.size();
        int realtimeToAdd = tenant.getRealtimeInstances() - list.size();
        int next = 0;
        while (next < offlineToAdd) {
            retagInstance(list3.get(next), "server_untagged", str2);
            next++;
        }
        // Realtime servers are taken from the slots right after the offline ones.
        for (int k = 0; k < realtimeToAdd; k++) {
            retagInstance(list3.get(offlineToAdd + k), "server_untagged", str);
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Scales up a colocated server tenant. Untagged servers are retagged first; once the index
     * reaches {@code i}, the missing tag is added to an already-tagged server of the other type.
     *
     * <p>Both {@code list} and {@code list2} are modified in place.
     *
     * @param tenant server tenant definition
     * @param str realtime tag of the tenant
     * @param list instances currently carrying the realtime tag
     * @param str2 offline tag of the tenant
     * @param list2 instances currently carrying the offline tag
     * @param i number of untagged servers to consume (the caller passes requested minus current instance count)
     * @param list3 untagged servers to draw from
     * @return always {@code SUCCESS}
     */
    private PinotResourceManagerResponse updateColocatedServerTenant(Tenant tenant, String str, List<String> list, String str2, List<String> list2, int i, List<String> list3) {
        // Counts are taken before the lists are reduced below.
        int offlineInstances = tenant.getOfflineInstances() - list2.size();
        int realtimeInstances = tenant.getRealtimeInstances() - list.size();
        // Reduce the realtime list to servers that do not carry the offline tag.
        list.removeAll(list2);
        // NOTE(review): after the call above `list` shares no element with `list2`, so this call appears to
        // remove nothing and `list2` still contains servers that carry both tags — presumably it was meant
        // to use the original realtime list. Changing it affects the index arithmetic below; confirm intent.
        list2.removeAll(list);
        for (int i2 = 0; i2 < offlineInstances; i2++) {
            if (i2 < i) {
                retagInstance(list3.get(i2), "server_untagged", str2);
            } else {
                // Untagged servers are used up: add the offline tag to a realtime-tagged server.
                this._helixAdmin.addInstanceTag(this._helixClusterName, list.get(i2 - i), str2);
            }
        }
        // Continues with the indices that follow the offline range.
        for (int i3 = offlineInstances; i3 < offlineInstances + realtimeInstances; i3++) {
            if (i3 < i) {
                retagInstance(list3.get(i3), "server_untagged", str);
            } else {
                // Untagged servers are used up: add the realtime tag to an offline-tagged server.
                this._helixAdmin.addInstanceTag(this._helixClusterName, list2.get(i3 - Math.max(i, offlineInstances)), str);
            }
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Rejects a request to shrink a server tenant: logs an error describing the request and the current instance
     * counts, and returns a failure response carrying the same message.
     *
     * @param tenant requested tenant definition, included in the message
     * @param list instances tagged realtime (only the size is reported)
     * @param list2 instances tagged offline (only the size is reported)
     * @param set serving instances (only the size is reported)
     * @return a failure response
     */
    private PinotResourceManagerResponse scaleDownServer(Tenant tenant, List<String> list, List<String> list2, Set<String> set) {
        StringBuilder builder = new StringBuilder("Not support to size down the current server cluster with tenant info: ");
        builder.append(tenant);
        builder.append(", Current number of serving instances : ").append(set.size());
        builder.append(", Current number of tagged offline server instances : ").append(list2.size());
        builder.append(", Current number of tagged realtime server instances : ").append(list.size());
        String message = builder.toString();
        LOGGER.error(message);
        return PinotResourceManagerResponse.failure(message);
    }

    /**
     * Tells whether a broker tenant can be deleted.
     *
     * @param str tenant name
     * @return {@code false} if any instance carrying the tenant's broker tag appears in any partition of the
     *     "brokerResource" ideal state, {@code true} otherwise
     */
    public boolean isBrokerTenantDeletable(String str) {
        String brokerTag = TagNameUtils.getBrokerTagForTenant(str);
        Set<String> tenantBrokers = new HashSet<>(HelixHelper.getInstancesWithTag(_helixZkManager, brokerTag));
        IdealState brokerIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, "brokerResource");
        for (String partition : brokerIdealState.getPartitionSet()) {
            for (String instance : brokerIdealState.getInstanceSet(partition)) {
                if (tenantBrokers.contains(instance)) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Tells whether a server tenant can be deleted.
     *
     * @param str tenant name
     * @return {@code false} if any instance carrying the tenant's offline or realtime tag appears in the ideal
     *     state of any table resource, {@code true} otherwise
     */
    public boolean isServerTenantDeletable(String str) {
        Set<String> tenantServers = new HashSet<>(HelixHelper.getInstancesWithTag(_helixZkManager, TagNameUtils.getOfflineTagForTenant(str)));
        tenantServers.addAll(HelixHelper.getInstancesWithTag(_helixZkManager, TagNameUtils.getRealtimeTagForTenant(str)));
        for (String resourceName : getAllResources()) {
            if (!TableNameBuilder.isTableResource(resourceName)) {
                continue;
            }
            IdealState tableIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, resourceName);
            for (String partition : tableIdealState.getPartitionSet()) {
                for (String instance : tableIdealState.getInstanceSet(partition)) {
                    if (tenantServers.contains(instance)) {
                        return false;
                    }
                }
            }
        }
        return true;
    }

    /**
     * Collects the tenant names derived from every broker tag found on any Helix instance config.
     *
     * @return a new mutable set of tenant names; empty if no instance carries a broker tag
     */
    public Set<String> getAllBrokerTenantNames() {
        Set<String> tenantNames = new HashSet<>();
        for (InstanceConfig instanceConfig : getAllHelixInstanceConfigs()) {
            for (String tag : instanceConfig.getTags()) {
                if (!TagNameUtils.isBrokerTag(tag)) {
                    continue;
                }
                tenantNames.add(TagNameUtils.getTenantFromTag(tag));
            }
        }
        return tenantNames;
    }

    /**
     * Collects the tenant names derived from every server tag found on any Helix instance config.
     *
     * @return a new mutable set of tenant names; empty if no instance carries a server tag
     */
    public Set<String> getAllServerTenantNames() {
        Set<String> tenantNames = new HashSet<>();
        for (InstanceConfig instanceConfig : getAllHelixInstanceConfigs()) {
            for (String tag : instanceConfig.getTags()) {
                if (!TagNameUtils.isServerTag(tag)) {
                    continue;
                }
                tenantNames.add(TagNameUtils.getTenantFromTag(tag));
            }
        }
        return tenantNames;
    }

    /**
     * Reads the instance config stored under the given instance name and returns its tags.
     *
     * @param str instance name
     * @return the tags reported by the instance config
     */
    private List<String> getTagsForInstance(String str) {
        PropertyKey instanceConfigKey = _keyBuilder.instanceConfig(str);
        InstanceConfig instanceConfig = _helixDataAccessor.getProperty(instanceConfigKey);
        return instanceConfig.getTags();
    }

    /**
     * Creates a server tenant from the currently online untagged servers.
     *
     * <p>When fewer untagged servers exist than the tenant requests, nothing is tagged and a failure response
     * with a descriptive message is returned. Otherwise tagging is delegated to the colocated or independent
     * assignment routine, depending on {@code tenant.isCoLocated()}.
     *
     * @param tenant tenant definition (name, requested counts, colocation flag)
     * @return {@link PinotResourceManagerResponse#SUCCESS} after tagging, or a failure response
     */
    public PinotResourceManagerResponse createServerTenant(Tenant tenant) {
        int requestedInstances = tenant.getNumberOfInstances();
        List<String> untaggedServers = getOnlineUnTaggedServerInstanceList();
        if (untaggedServers.size() >= requestedInstances) {
            if (tenant.isCoLocated()) {
                assignColocatedServerTenant(tenant, requestedInstances, untaggedServers);
            } else {
                assignIndependentServerTenant(tenant, requestedInstances, untaggedServers);
            }
            return PinotResourceManagerResponse.SUCCESS;
        }

        // Not enough untagged capacity: report instead of tagging partially.
        String message = "Failed to allocate server instances to Tag : " + tenant.getTenantName()
            + ", Current number of untagged server instances : " + untaggedServers.size()
            + ", Request asked number is : " + tenant.getNumberOfInstances();
        LOGGER.error(message);
        return PinotResourceManagerResponse.failure(message);
    }

    /**
     * Tags disjoint slices of {@code list} for a tenant: the leading {@code tenant.getOfflineInstances()} entries
     * receive the tenant's offline tag and the following {@code tenant.getRealtimeInstances()} entries receive its
     * realtime tag. Each entry is retagged from "server_untagged".
     *
     * @param tenant tenant definition
     * @param i not used by this method
     * @param list instances to retag, consumed from index 0
     */
    private void assignIndependentServerTenant(Tenant tenant, int i, List<String> list) {
        String offlineTag = TagNameUtils.getOfflineTagForTenant(tenant.getTenantName());
        int offlineIdx = 0;
        while (offlineIdx < tenant.getOfflineInstances()) {
            retagInstance(list.get(offlineIdx), "server_untagged", offlineTag);
            offlineIdx++;
        }

        String realtimeTag = TagNameUtils.getRealtimeTagForTenant(tenant.getTenantName());
        int realtimeIdx = 0;
        while (realtimeIdx < tenant.getRealtimeInstances()) {
            // Realtime servers sit right behind the offline slice.
            String instance = list.get(tenant.getOfflineInstances() + realtimeIdx);
            retagInstance(instance, "server_untagged", realtimeTag);
            realtimeIdx++;
        }
    }

    /**
     * Tags the first {@code i} entries of {@code list} for a tenant whose offline and realtime servers may
     * overlap.
     *
     * <p>A single cursor walks {@code list}: first {@code tenant.getOfflineInstances()} entries are retagged to the
     * tenant's offline tag, then {@code tenant.getRealtimeInstances()} entries to its realtime tag. Whenever the
     * cursor reaches {@code i} it wraps back to 0, so realtime tags land on servers already used for offline.
     *
     * <p>The wrap is applied in both loops. Previously only the realtime loop wrapped, so when the offline count
     * reached {@code i} the realtime loop started at index {@code i}: either past the end of {@code list} or on a
     * server outside the {@code i}-server window, and the cursor never wrapped afterwards.
     *
     * @param tenant tenant definition
     * @param i size of the window of {@code list} to use; the cursor wraps when it reaches this value
     * @param list instances to retag from "server_untagged"
     */
    private void assignColocatedServerTenant(Tenant tenant, int i, List<String> list) {
        int cursor = 0;
        String offlineTagForTenant = TagNameUtils.getOfflineTagForTenant(tenant.getTenantName());
        for (int i3 = 0; i3 < tenant.getOfflineInstances(); i3++) {
            retagInstance(list.get(cursor), "server_untagged", offlineTagForTenant);
            cursor++;
            if (cursor == i) {
                cursor = 0;
            }
        }
        String realtimeTagForTenant = TagNameUtils.getRealtimeTagForTenant(tenant.getTenantName());
        for (int i5 = 0; i5 < tenant.getRealtimeInstances(); i5++) {
            retagInstance(list.get(cursor), "server_untagged", realtimeTagForTenant);
            cursor++;
            if (cursor == i) {
                cursor = 0;
            }
        }
    }

    /**
     * Creates a broker tenant by retagging online untagged brokers from "broker_untagged" to the tenant's broker
     * tag.
     *
     * <p>When fewer untagged brokers exist than the tenant requests, nothing is tagged and a failure response is
     * returned. The failure message now reports the count as untagged <em>broker</em> instances; it previously
     * said "server", which misdescribed the list being counted.
     *
     * @param tenant tenant definition (name and requested number of instances)
     * @return {@link PinotResourceManagerResponse#SUCCESS} after tagging, or a failure response
     */
    public PinotResourceManagerResponse createBrokerTenant(Tenant tenant) {
        List<String> onlineUnTaggedBrokerInstanceList = getOnlineUnTaggedBrokerInstanceList();
        if (onlineUnTaggedBrokerInstanceList.size() < tenant.getNumberOfInstances()) {
            String str = "Failed to allocate broker instances to Tag : " + tenant.getTenantName() + ", Current number of untagged broker instances : " + onlineUnTaggedBrokerInstanceList.size() + ", Request asked number is : " + tenant.getNumberOfInstances();
            LOGGER.error(str);
            return PinotResourceManagerResponse.failure(str);
        }
        String brokerTagForTenant = TagNameUtils.getBrokerTagForTenant(tenant.getTenantName());
        for (int i = 0; i < tenant.getNumberOfInstances(); i++) {
            retagInstance(onlineUnTaggedBrokerInstanceList.get(i), "broker_untagged", brokerTagForTenant);
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Removes the tenant's offline tag from every instance that carries it. An instance left with no tags at all
     * is given the "server_untagged" tag.
     *
     * @param str tenant name
     * @return always {@link PinotResourceManagerResponse#SUCCESS}
     */
    public PinotResourceManagerResponse deleteOfflineServerTenantFor(String str) {
        String offlineTag = TagNameUtils.getOfflineTagForTenant(str);
        List<String> taggedInstances = HelixHelper.getInstancesWithTag(_helixZkManager, offlineTag);
        for (String instance : taggedInstances) {
            _helixAdmin.removeInstanceTag(_helixClusterName, instance, offlineTag);
            boolean noTagsLeft = getTagsForInstance(instance).isEmpty();
            if (noTagsLeft) {
                _helixAdmin.addInstanceTag(_helixClusterName, instance, "server_untagged");
            }
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Removes the tenant's realtime tag from every instance that carries it. An instance left with no tags at all
     * is given the "server_untagged" tag.
     *
     * @param str tenant name
     * @return always {@link PinotResourceManagerResponse#SUCCESS}
     */
    public PinotResourceManagerResponse deleteRealtimeServerTenantFor(String str) {
        String realtimeTag = TagNameUtils.getRealtimeTagForTenant(str);
        List<String> taggedInstances = HelixHelper.getInstancesWithTag(_helixZkManager, realtimeTag);
        for (String instance : taggedInstances) {
            _helixAdmin.removeInstanceTag(_helixClusterName, instance, realtimeTag);
            boolean noTagsLeft = getTagsForInstance(instance).isEmpty();
            if (noTagsLeft) {
                _helixAdmin.addInstanceTag(_helixClusterName, instance, "server_untagged");
            }
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Retags every instance carrying the tenant's broker tag back to "broker_untagged".
     *
     * @param str tenant name
     * @return always {@link PinotResourceManagerResponse#SUCCESS}
     */
    public PinotResourceManagerResponse deleteBrokerTenantFor(String str) {
        String brokerTag = TagNameUtils.getBrokerTagForTenant(str);
        List<String> tenantBrokers = HelixHelper.getInstancesWithTag(_helixZkManager, brokerTag);
        for (String broker : tenantBrokers) {
            retagInstance(broker, brokerTag, "broker_untagged");
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Returns the server instances of a tenant, computed from the supplied instance configs.
     * Delegates to {@code HelixHelper.getServerInstancesForTenant}.
     *
     * @param list instance configs to search
     * @param str tenant name
     * @return whatever the helper returns for these arguments
     */
    public Set<String> getAllInstancesForServerTenant(List<InstanceConfig> list, String str) {
        return HelixHelper.getServerInstancesForTenant(list, str);
    }

    /**
     * Returns the server instances of a tenant, using instance configs fetched through
     * {@code HelixHelper.getInstanceConfigs} for this manager's Helix connection.
     *
     * @param str tenant name
     * @return the result of the two-argument overload for the fetched configs
     */
    public Set<String> getAllInstancesForServerTenant(String str) {
        List<InstanceConfig> instanceConfigs = HelixHelper.getInstanceConfigs(_helixZkManager);
        return getAllInstancesForServerTenant(instanceConfigs, str);
    }

    /**
     * Returns the server instances of a tenant for one table type, computed from the supplied instance configs.
     * Delegates to {@code HelixHelper.getServerInstancesForTenantWithType}.
     *
     * @param list instance configs to search
     * @param str tenant name
     * @param tableType table type passed through to the helper
     * @return whatever the helper returns for these arguments
     */
    public Set<String> getAllInstancesForServerTenantWithType(List<InstanceConfig> list, String str, TableType tableType) {
        return HelixHelper.getServerInstancesForTenantWithType(list, str, tableType);
    }

    /**
     * Returns the broker instances of a tenant, computed from the supplied instance configs.
     * Delegates to {@code HelixHelper.getBrokerInstancesForTenant}.
     *
     * @param list instance configs to search
     * @param str tenant name
     * @return whatever the helper returns for these arguments
     */
    public Set<String> getAllInstancesForBrokerTenant(List<InstanceConfig> list, String str) {
        return HelixHelper.getBrokerInstancesForTenant(list, str);
    }

    /**
     * Returns the broker instances of a tenant, using instance configs fetched through
     * {@code HelixHelper.getInstanceConfigs} for this manager's Helix connection.
     *
     * @param str tenant name
     * @return the result of the two-argument overload for the fetched configs
     */
    public Set<String> getAllInstancesForBrokerTenant(String str) {
        List<InstanceConfig> instanceConfigs = HelixHelper.getInstanceConfigs(_helixZkManager);
        return getAllInstancesForBrokerTenant(instanceConfigs, str);
    }

    /**
     * Returns the instance configs of the brokers belonging to a tenant, selected by
     * {@code HelixHelper.getBrokerInstanceConfigsForTenant} from the configs fetched for this manager's Helix
     * connection.
     *
     * @param str tenant name
     * @return whatever the helper returns for the fetched configs and tenant name
     */
    public Set<InstanceConfig> getAllInstancesConfigsForBrokerTenant(String str) {
        List<InstanceConfig> instanceConfigs = HelixHelper.getInstanceConfigs(_helixZkManager);
        return HelixHelper.getBrokerInstanceConfigsForTenant(instanceConfigs, str);
    }

    /**
     * Stores a schema in the property store.
     *
     * <p>If no schema with the same name exists it is written directly. If one exists, the call either fails
     * ({@code z == false}) or is routed through the schema update path ({@code z == true}).
     *
     * @param schema schema to add
     * @param z whether an existing schema of the same name may be overridden
     * @throws SchemaAlreadyExistsException if the schema exists and {@code z} is {@code false}
     * @throws SchemaBackwardIncompatibleException propagated from the update path
     */
    public void addSchema(Schema schema, boolean z) throws SchemaAlreadyExistsException, SchemaBackwardIncompatibleException {
        String schemaName = schema.getSchemaName();
        LOGGER.info("Adding schema: {} with override: {}", schemaName, z);

        Schema existingSchema = ZKMetadataProvider.getSchema(_propertyStore, schemaName);
        if (existingSchema == null) {
            ZKMetadataProvider.setSchema(_propertyStore, schema);
            LOGGER.info("Added schema: {}", schemaName);
            return;
        }
        if (!z) {
            throw new SchemaAlreadyExistsException(String.format("Schema: %s already exists", schemaName));
        }
        updateSchema(schema, existingSchema);
    }

    /**
     * Updates an existing schema and optionally reloads the segments of the tables found for the schema name.
     *
     * @param schema new schema; its name selects the stored schema to replace
     * @param z when {@code true}, {@code reloadAllSegments} is invoked for each name returned by
     *     {@code getExistingTableNamesWithType(schemaName, null)}
     * @throws SchemaNotFoundException if no schema is stored under the name
     * @throws SchemaBackwardIncompatibleException propagated from the schema comparison step
     * @throws TableNotFoundException declared for the reload path
     */
    public void updateSchema(Schema schema, boolean z) throws SchemaNotFoundException, SchemaBackwardIncompatibleException, TableNotFoundException {
        String schemaName = schema.getSchemaName();
        LOGGER.info("Updating schema: {} with reload: {}", schemaName, z);

        Schema existingSchema = ZKMetadataProvider.getSchema(_propertyStore, schemaName);
        if (existingSchema == null) {
            throw new SchemaNotFoundException(String.format("Schema: %s does not exist", schemaName));
        }
        updateSchema(schema, existingSchema);

        if (!z) {
            return;
        }
        LOGGER.info("Reloading tables with name: {}", schemaName);
        for (String tableNameWithType : getExistingTableNamesWithType(schemaName, null)) {
            reloadAllSegments(tableNameWithType, false);
        }
    }

    /**
     * Replaces a stored schema with a new one after checking compatibility.
     *
     * <p>{@code schema.updateBooleanFieldsIfNeeded(schema2)} is applied first. An identical schema is not
     * rewritten; a schema that is not backward-compatible is rejected.
     *
     * @param schema new schema
     * @param schema2 schema currently stored
     * @throws SchemaBackwardIncompatibleException if {@code schema.isBackwardCompatibleWith(schema2)} is false
     */
    private void updateSchema(Schema schema, Schema schema2) throws SchemaBackwardIncompatibleException {
        String schemaName = schema.getSchemaName();
        schema.updateBooleanFieldsIfNeeded(schema2);

        if (schema.equals(schema2)) {
            LOGGER.info("New schema: {} is the same as the existing schema, not updating it", schemaName);
            return;
        }
        if (!schema.isBackwardCompatibleWith(schema2)) {
            throw new SchemaBackwardIncompatibleException(String.format("New schema: %s is not backward-compatible with the existing schema", schemaName));
        }
        ZKMetadataProvider.setSchema(_propertyStore, schema);
        LOGGER.info("Updated schema: {}", schemaName);
    }

    /**
     * Removes a schema's entry from the property store.
     *
     * @param schema schema to delete; may be {@code null}
     * @return {@code true} if an entry existed at the schema's path and removal was issued; {@code false} if
     *     {@code schema} is {@code null} or no entry exists
     */
    public boolean deleteSchema(Schema schema) {
        if (schema == null) {
            return false;
        }
        String schemaName = schema.getSchemaName();
        LOGGER.info("Deleting schema: {}", schemaName);

        String schemaPath = ZKMetadataProvider.constructPropertyStorePathForSchema(schemaName);
        boolean present = _propertyStore.exists(schemaPath, AccessOption.PERSISTENT);
        if (present) {
            _propertyStore.remove(schemaPath, AccessOption.PERSISTENT);
            LOGGER.info("Deleted schema: {}", schemaName);
        }
        return present;
    }

    /**
     * Looks up a schema by name in the property store via {@code ZKMetadataProvider.getSchema}.
     *
     * @param str schema name
     * @return the schema, or {@code null} (the method is annotated {@code @Nullable})
     */
    @Nullable
    public Schema getSchema(String str) {
        Schema schema = ZKMetadataProvider.getSchema(_propertyStore, str);
        return schema;
    }

    /**
     * Looks up the schema of a table in the property store via {@code ZKMetadataProvider.getTableSchema}.
     *
     * @param str table name passed through to the provider
     * @return the schema, or {@code null} (the method is annotated {@code @Nullable})
     */
    @Nullable
    public Schema getTableSchema(String str) {
        Schema tableSchema = ZKMetadataProvider.getTableSchema(_propertyStore, str);
        return tableSchema;
    }

    /**
     * Resolves the schema for a table config.
     *
     * <p>The schema named after the raw table name wins. If none exists, the schema name from the table's
     * validation config is tried, when it is set.
     *
     * @param tableConfig table config to resolve the schema for
     * @return the resolved schema, or {@code null} if neither lookup yields one
     */
    @Nullable
    public Schema getSchemaForTableConfig(TableConfig tableConfig) {
        String rawTableName = TableNameBuilder.extractRawTableName(tableConfig.getTableName());
        Schema schemaByTableName = getSchema(rawTableName);
        if (schemaByTableName != null) {
            return schemaByTableName;
        }
        String configuredSchemaName = tableConfig.getValidationConfig().getSchemaName();
        if (configuredSchemaName == null) {
            return null;
        }
        return getSchema(configuredSchemaName);
    }

    /**
     * Lists the child names found in the property store under the relative path reported by
     * {@code PinotHelixPropertyStoreZnRecordProvider.forSchema}.
     *
     * @return the child names as returned by the property store
     */
    public List<String> getSchemaNames() {
        String schemaRootPath = PinotHelixPropertyStoreZnRecordProvider.forSchema(_propertyStore).getRelativePath();
        return _propertyStore.getChildNames(schemaRootPath, AccessOption.PERSISTENT);
    }

    /**
     * Adds a table to the cluster and registers it in the "brokerResource" ideal state.
     *
     * <p>After tenant validation the work depends on the table type (dispatched through the compiler-generated
     * switch map; the first case stores the config with {@code setOfflineTableConfig}, the second with
     * {@code setRealtimeTableConfig}):
     * <ul>
     *   <li>first case: fails if the table already exists; otherwise creates an empty ideal state, stores the
     *       table config and assigns instances;</li>
     *   <li>second case: verifies the stream config, requires a schema stored under the raw table name or under
     *       the schema name from the validation config, stores the table config, assigns instances and sets up
     *       the realtime resources.</li>
     * </ul>
     * Finally the instances carrying the table's broker tag are written as "ONLINE" for this table in the
     * "brokerResource" ideal state.
     *
     * @param tableConfig config of the table to add
     * @throws IOException declared by the signature
     */
    public void addTable(TableConfig tableConfig) throws IOException {
        validateTableTenantConfig(tableConfig);
        String tableName = tableConfig.getTableName();
        SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
        TableType tableType = tableConfig.getTableType();
        switch (AnonymousClass2.$SwitchMap$org$apache$pinot$spi$config$table$TableType[tableType.ordinal()]) {
            case 1: {
                if (getAllTables().contains(tableName)) {
                    throw new TableAlreadyExistsException("Table " + tableName + " already exists");
                }
                LOGGER.info("building empty ideal state for table : " + tableName);
                int replication = Integer.parseInt(validationConfig.getReplication());
                IdealState emptyIdealState = PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableName, replication, _enableBatchMessageMode);
                LOGGER.info("adding table via the admin");
                _helixAdmin.addResource(_helixClusterName, tableName, emptyIdealState);
                ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableName, TableConfigUtils.toZNRecord(tableConfig));
                assignInstances(tableConfig, true);
                LOGGER.info("Successfully added table: {}", tableName);
                break;
            }
            case 2: {
                verifyStreamConfig(tableName, tableConfig);
                // A schema must exist under the raw table name or, failing that, under the configured schema name.
                String rawTableName = TableNameBuilder.extractRawTableName(tableName);
                boolean schemaFound = ZKMetadataProvider.getSchema(_propertyStore, rawTableName) != null;
                if (!schemaFound) {
                    String configuredSchemaName = tableConfig.getValidationConfig().getSchemaName();
                    schemaFound = configuredSchemaName != null && ZKMetadataProvider.getSchema(_propertyStore, configuredSchemaName) != null;
                }
                if (!schemaFound) {
                    throw new InvalidTableConfigException("No schema defined for realtime table: " + tableName);
                }
                ZKMetadataProvider.setRealtimeTableConfig(_propertyStore, tableName, TableConfigUtils.toZNRecord(tableConfig));
                assignInstances(tableConfig, true);
                ensureRealtimeClusterIsSetUp(tableConfig);
                LOGGER.info("Successfully added or updated the table {} ", tableName);
                break;
            }
            default:
                throw new InvalidTableConfigException("Unsupported table type: " + tableType);
        }

        LOGGER.info("Updating BrokerResource IdealState for table: {}", tableName);
        String brokerTag = TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig());
        List<String> brokerInstances = HelixHelper.getInstancesWithTag(_helixZkManager, brokerTag);
        HelixHelper.updateIdealState(_helixZkManager, "brokerResource", brokerIdealState -> {
            assert brokerIdealState != null;
            brokerIdealState.getRecord().getMapFields().put(tableName, SegmentAssignmentUtils.getInstanceStateMap(brokerInstances, "ONLINE"));
            return brokerIdealState;
        });
    }

    /**
     * Validates (and, for single-tenant clusters, completes) the tenant section of a table config.
     *
     * <p>If the broker or server tenant is missing, a multi-tenant cluster rejects the config; a single-tenant
     * cluster replaces the missing names with "DefaultTenant" and stores the completed tenant config on the
     * table. The tag checks below run against that completed config; previously they kept using the
     * pre-default object whose broker/server could still be {@code null}, so the validated tags were not
     * guaranteed to be the ones derived from the config actually stored on the table.
     *
     * <p>Checks performed afterwards:
     * <ul>
     *   <li>the broker tag must have at least one instance;</li>
     *   <li>dimension tables need an instance with either the offline or the realtime server tag;</li>
     *   <li>tables of type {@code OFFLINE} need an instance with the offline server tag;</li>
     *   <li>all other tables need valid CONSUMING and COMPLETED server tags, each with an instance.</li>
     * </ul>
     *
     * @param tableConfig table config to validate; its tenant config may be replaced as described above
     * @throws InvalidTableConfigException if any check fails
     */
    @VisibleForTesting
    void validateTableTenantConfig(TableConfig tableConfig) {
        TenantConfig tenantConfig = tableConfig.getTenantConfig();
        String tableName = tableConfig.getTableName();
        String broker = tenantConfig.getBroker();
        String server = tenantConfig.getServer();
        if (broker == null || server == null) {
            if (!this._isSingleTenantCluster) {
                throw new InvalidTableConfigException("server and broker tenants must be specified for multi-tenant cluster for table: " + tableName);
            }
            // Keep the local in sync with what is stored on the table so that validation sees the defaults.
            tenantConfig = new TenantConfig(broker == null ? "DefaultTenant" : broker, server == null ? "DefaultTenant" : server, tenantConfig.getTagOverrideConfig());
            tableConfig.setTenantConfig(tenantConfig);
        }

        // Sorted set: tags are checked (and reported) in a deterministic order, without duplicates.
        Set<String> tagsToCheck = new TreeSet<>();
        tagsToCheck.add(TagNameUtils.extractBrokerTag(tenantConfig));
        if (tableConfig.isDimTable()) {
            String offlineServerTag = TagNameUtils.extractOfflineServerTag(tenantConfig);
            String realtimeServerTag = TagNameUtils.extractRealtimeServerTag(tenantConfig);
            if (getInstancesWithTag(offlineServerTag).isEmpty() && getInstancesWithTag(realtimeServerTag).isEmpty()) {
                throw new InvalidTableConfigException("Failed to find instances for dimension table: " + tableName);
            }
        } else if (tableConfig.getTableType() == TableType.OFFLINE) {
            tagsToCheck.add(TagNameUtils.extractOfflineServerTag(tenantConfig));
        } else {
            String consumingServerTag = TagNameUtils.extractConsumingServerTag(tenantConfig);
            if (!TagNameUtils.isServerTag(consumingServerTag)) {
                throw new InvalidTableConfigException("Invalid CONSUMING server tag: " + consumingServerTag + " for table: " + tableName);
            }
            tagsToCheck.add(consumingServerTag);
            String completedServerTag = TagNameUtils.extractCompletedServerTag(tenantConfig);
            if (!TagNameUtils.isServerTag(completedServerTag)) {
                throw new InvalidTableConfigException("Invalid COMPLETED server tag: " + completedServerTag + " for table: " + tableName);
            }
            tagsToCheck.add(completedServerTag);
        }
        for (String tag : tagsToCheck) {
            if (getInstancesWithTag(tag).isEmpty()) {
                throw new InvalidTableConfigException("Failed to find instances with tag: " + tag + " for table: " + tableName);
            }
        }
    }

    /**
     * Writes a record at the given path through the Helix base data accessor.
     *
     * @param str path to write
     * @param zNRecord record to store
     * @param i third argument forwarded to the accessor's {@code set}
     * @param i2 fourth argument forwarded to the accessor's {@code set}
     * @return the accessor's result
     */
    public boolean setZKData(String str, ZNRecord zNRecord, int i, int i2) {
        boolean written = _helixDataAccessor.getBaseDataAccessor().set(str, zNRecord, i, i2);
        return written;
    }

    /**
     * Removes the given path through the Helix base data accessor (options argument {@code -1}).
     *
     * @param str path to remove
     * @return the accessor's result
     */
    public boolean deleteZKPath(String str) {
        boolean removed = _helixDataAccessor.getBaseDataAccessor().remove(str, -1);
        return removed;
    }

    /**
     * Reads the record stored at the given path through the Helix base data accessor, passing no {@code Stat}
     * holder and options argument {@code -1}.
     *
     * @param str path to read
     * @return the value returned by the accessor, cast to {@link ZNRecord}
     */
    public ZNRecord readZKData(String str) {
        Object data = _helixDataAccessor.getBaseDataAccessor().get(str, (Stat) null, -1);
        return (ZNRecord) data;
    }

    /**
     * Lists the child names of the given path through the Helix base data accessor (options argument
     * {@code -1}).
     *
     * @param str parent path
     * @return the child names as returned by the accessor
     */
    public List<String> getZKChildren(String str) {
        List<String> childNames = _helixDataAccessor.getBaseDataAccessor().getChildNames(str, -1);
        return childNames;
    }

    /**
     * Fetches the {@code Stat} of every child of the given path.
     *
     * <p>Child paths are built as {@code str + "/" + child} with every "//" collapsed to "/", then resolved in
     * one {@code getStats} call. The result pairs the i-th child name with the i-th returned stat.
     *
     * @param str parent path
     * @return child name to stat, in the order the child names were returned
     */
    public Map<String, Stat> getZKChildrenStats(String str) {
        List<String> childNames = _helixDataAccessor.getBaseDataAccessor().getChildNames(str, -1);

        List<String> childPaths = new ArrayList<>();
        for (String childName : childNames) {
            childPaths.add((str + "/" + childName).replaceAll("//", "/"));
        }
        Stat[] stats = _helixDataAccessor.getBaseDataAccessor().getStats(childPaths, -1);

        Map<String, Stat> statsByChild = new LinkedHashMap<>(childNames.size());
        int position = 0;
        while (position < childNames.size()) {
            statsByChild.put(childNames.get(position), stats[position]);
            position++;
        }
        return statsByChild;
    }

    /**
     * Fetches the {@code Stat} of the given path through the Helix base data accessor (options argument
     * {@code -1}).
     *
     * @param str path to inspect
     * @return the stat returned by the accessor
     */
    public Stat getZKStat(String str) {
        Stat stat = _helixDataAccessor.getBaseDataAccessor().getStat(str, -1);
        return stat;
    }

    /**
     * Stores the LLC realtime segment manager that this resource manager uses afterwards.
     *
     * @param pinotLLCRealtimeSegmentManager manager instance to keep
     */
    public void registerPinotLLCRealtimeSegmentManager(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) {
        _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
    }

    /**
     * Rejects tables whose stream config reports a high-level consumer type while HLC tables are not allowed.
     *
     * @param str table name, used to build the stream config and the error message
     * @param tableConfig table config providing the stream config map
     * @throws InvalidTableConfigException if the stream uses a high-level consumer and {@code _allowHLCTables}
     *     is {@code false}
     */
    private void verifyStreamConfig(String str, TableConfig tableConfig) {
        StreamConfig streamConfig = new StreamConfig(str, IngestionConfigUtils.getStreamConfigMap(tableConfig));
        boolean usesHighLevelConsumer = streamConfig.hasHighLevelConsumerType();
        if (usesHighLevelConsumer && !_allowHLCTables) {
            throw new InvalidTableConfigException("Creating HLC realtime table is not allowed for Table: " + str);
        }
    }

    /**
     * Makes sure the Helix and property-store state required by a realtime table exists.
     *
     * <p>Works on the table config after {@code ConfigUtils.applyConfigWithEnvVariables}. For high-level
     * consumers: creates the ideal state if missing, otherwise (when no low-level consumer is configured) asks
     * the LLC segment manager to remove LLC segments, and ensures the HLC property store entry exists. For
     * low-level consumers: builds the LLC ideal state unless LLC segments are already recorded for the table.
     *
     * @param tableConfig realtime table config
     */
    private void ensureRealtimeClusterIsSetUp(TableConfig tableConfig) {
        TableConfig resolvedConfig = ConfigUtils.applyConfigWithEnvVariables(tableConfig);
        String tableName = resolvedConfig.getTableName();
        StreamConfig streamConfig = new StreamConfig(resolvedConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(resolvedConfig));
        IdealState idealState = getTableIdealState(tableName);

        if (streamConfig.hasHighLevelConsumerType()) {
            if (idealState == null) {
                LOGGER.info("Initializing IdealState for HLC table: {}", tableName);
                idealState = PinotTableIdealStateBuilder.buildInitialHighLevelRealtimeIdealStateFor(tableName, resolvedConfig, _helixZkManager, _propertyStore, _enableBatchMessageMode);
                _helixAdmin.addResource(_helixClusterName, tableName, idealState);
            } else if (!streamConfig.hasLowLevelConsumerType()) {
                _pinotLLCRealtimeSegmentManager.removeLLCSegments(idealState);
            }
            ensurePropertyStoreEntryExistsForHighLevelConsumer(tableName);
        }

        if (!streamConfig.hasLowLevelConsumerType()) {
            return;
        }
        boolean llcAlreadySetUp = !ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableName).isEmpty();
        if (llcAlreadySetUp) {
            LOGGER.info("LLC is already set up for table {}, not configuring again", tableName);
            return;
        }
        // The ideal state passed here may be the one created in the HLC branch above.
        PinotTableIdealStateBuilder.buildLowLevelRealtimeIdealStateFor(_pinotLLCRealtimeSegmentManager, tableName, resolvedConfig, idealState, _enableBatchMessageMode);
        LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", tableName);
    }

    /**
     * Creates the property store entry for an HLC table's resource path if it does not exist yet. The new entry
     * is an empty {@link ZNRecord} whose id is the table name.
     *
     * @param str table name
     */
    private void ensurePropertyStoreEntryExistsForHighLevelConsumer(String str) {
        String resourcePath = ZKMetadataProvider.constructPropertyStorePathForResource(str);
        if (!_propertyStore.exists(resourcePath, AccessOption.PERSISTENT)) {
            LOGGER.info("Creating property store entry for HLC table: {}", str);
            _propertyStore.create(resourcePath, new ZNRecord(str), AccessOption.PERSISTENT);
        }
    }

    /**
     * Computes and persists instance partitions for every instance-partitions type that needs them.
     *
     * <p>A type is processed when instance assignment is allowed for it by the table config and either
     * {@code z} is {@code true} or no instance partitions are stored yet under that type's name for the raw
     * table name.
     *
     * @param tableConfig table config driving the assignment
     * @param z when {@code true}, existing instance partitions are recomputed as well
     */
    private void assignInstances(TableConfig tableConfig, boolean z) {
        String tableName = tableConfig.getTableName();
        String rawTableName = TableNameBuilder.extractRawTableName(tableName);

        List<InstancePartitionsType> typesToAssign = new ArrayList<>();
        for (InstancePartitionsType type : InstancePartitionsType.values()) {
            if (!InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, type)) {
                continue;
            }
            // The stored partitions are only looked up when z does not already force the assignment.
            if (z || InstancePartitionsUtils.fetchInstancePartitions(_propertyStore, type.getInstancePartitionsName(rawTableName)) == null) {
                typesToAssign.add(type);
            }
        }
        if (typesToAssign.isEmpty()) {
            return;
        }

        LOGGER.info("Assigning {} instances to table: {}", typesToAssign, tableName);
        InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
        List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs();
        for (InstancePartitionsType type : typesToAssign) {
            InstancePartitions instancePartitions = driver.assignInstances(type, instanceConfigs);
            LOGGER.info("Persisting instance partitions: {}", instancePartitions);
            InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
        }
    }

    /**
     * Validates the tenant section of the given table config and then applies the config to the existing table.
     *
     * @param tableConfig new table config
     * @throws IOException declared by the signature; propagated from {@code setExistingTableConfig}
     */
    public void updateTableConfig(TableConfig tableConfig) throws IOException {
        // Validation runs first so that an invalid config is rejected before anything is written.
        validateTableTenantConfig(tableConfig);
        setExistingTableConfig(tableConfig);
    }

    /**
     * Stores a new config for an existing table and notifies via {@code sendTableConfigRefreshMessage}.
     *
     * <p>Work depends on the table type (dispatched through the compiler-generated switch map; the first case
     * stores the config with {@code setOfflineTableConfig}, the second with {@code setRealtimeTableConfig}):
     * <ul>
     *   <li>first case: stores the config, updates the ideal state's replica count if it differs from the
     *       configured replication, and assigns instances where none are stored yet;</li>
     *   <li>second case: verifies the stream config, stores the config, assigns instances where none are stored
     *       yet and ensures the realtime resources exist.</li>
     * </ul>
     *
     * @param tableConfig new table config
     * @throws IOException declared by the signature
     */
    public void setExistingTableConfig(TableConfig tableConfig) throws IOException {
        String tableName = tableConfig.getTableName();
        SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
        TableType tableType = tableConfig.getTableType();
        switch (AnonymousClass2.$SwitchMap$org$apache$pinot$spi$config$table$TableType[tableType.ordinal()]) {
            case 1: {
                ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableName, TableConfigUtils.toZNRecord(tableConfig));
                IdealState currentIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName);
                String replication = validationConfig.getReplication();
                boolean replicasUpToDate = currentIdealState.getReplicas().equals(replication);
                if (!replicasUpToDate) {
                    HelixHelper.updateIdealState(_helixZkManager, tableName, idealStateToUpdate -> {
                        assert idealStateToUpdate != null;
                        idealStateToUpdate.setReplicas(replication);
                        return idealStateToUpdate;
                    }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2000000476837158d));
                }
                assignInstances(tableConfig, false);
                break;
            }
            case 2: {
                verifyStreamConfig(tableName, tableConfig);
                ZKMetadataProvider.setRealtimeTableConfig(_propertyStore, tableName, TableConfigUtils.toZNRecord(tableConfig));
                assignInstances(tableConfig, false);
                ensureRealtimeClusterIsSetUp(tableConfig);
                break;
            }
            default:
                throw new InvalidTableConfigException("Unsupported table type: " + tableType);
        }
        sendTableConfigRefreshMessage(tableName);
    }

    /**
     * Replaces the custom config section of an existing table's config and applies the result.
     *
     * @param str raw table name
     * @param tableType table type used to build the typed table name
     * @param tableCustomConfig new custom config
     * @throws RuntimeException if no table config is stored for the typed table name
     * @throws Exception declared by the signature
     */
    public void updateMetadataConfigFor(String str, TableType tableType, TableCustomConfig tableCustomConfig) throws Exception {
        String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(str);
        TableConfig existingConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
        if (existingConfig != null) {
            existingConfig.setCustomConfig(tableCustomConfig);
            setExistingTableConfig(existingConfig);
            return;
        }
        throw new RuntimeException("Table: " + str + " of type: " + tableType + " does not exist");
    }

    /**
     * Replaces the validation/retention section of an existing table's config and applies the result.
     *
     * @param str raw table name
     * @param tableType table type used to build the typed table name
     * @param segmentsValidationAndRetentionConfig new validation and retention config
     * @throws RuntimeException if no table config is stored for the typed table name
     * @throws Exception declared by the signature
     */
    public void updateSegmentsValidationAndRetentionConfigFor(String str, TableType tableType, SegmentsValidationAndRetentionConfig segmentsValidationAndRetentionConfig) throws Exception {
        String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(str);
        TableConfig existingConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
        if (existingConfig != null) {
            existingConfig.setValidationConfig(segmentsValidationAndRetentionConfig);
            setExistingTableConfig(existingConfig);
            return;
        }
        throw new RuntimeException("Table: " + str + " of type: " + tableType + " does not exist");
    }

    /**
     * Replaces the indexing section of an existing table's config and applies the result.
     *
     * @param str raw table name
     * @param tableType table type used to build the typed table name
     * @param indexingConfig new indexing config
     * @throws RuntimeException if no table config is stored for the typed table name
     * @throws Exception declared by the signature
     */
    public void updateIndexingConfigFor(String str, TableType tableType, IndexingConfig indexingConfig) throws Exception {
        String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(str);
        TableConfig existingConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
        if (existingConfig != null) {
            existingConfig.setIndexingConfig(indexingConfig);
            setExistingTableConfig(existingConfig);
            return;
        }
        throw new RuntimeException("Table: " + str + " of type: " + tableType + " does not exist");
    }

    /**
     * Deletes an offline table and the state kept for it, logging each step.
     *
     * <p>Steps, in order: remove the table from the broker ideal state; drop the Helix resource if it is listed
     * in the cluster; remove stored segments; remove segment metadata; remove the table config; remove instance
     * partitions; delete segment lineage; delete "MergeRollupTask" task metadata.
     *
     * @param str table name; converted with {@code TableNameBuilder.OFFLINE.tableNameWithType}
     */
    public void deleteOfflineTable(String str) {
        String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(str);
        LOGGER.info("Deleting table {}: Start", offlineTableName);

        HelixHelper.removeResourceFromBrokerIdealState(_helixZkManager, offlineTableName);
        LOGGER.info("Deleting table {}: Removed from broker resource", offlineTableName);

        boolean resourceExists = _helixAdmin.getResourcesInCluster(_helixClusterName).contains(offlineTableName);
        if (resourceExists) {
            _helixAdmin.dropResource(_helixClusterName, offlineTableName);
            LOGGER.info("Deleting table {}: Removed helix table resource", offlineTableName);
        }

        // The segment list is read before the segment metadata is removed in the next step.
        _segmentDeletionManager.removeSegmentsFromStore(offlineTableName, getSegmentsFor(offlineTableName));
        LOGGER.info("Deleting table {}: Removed stored segments", offlineTableName);

        ZKMetadataProvider.removeResourceSegmentsFromPropertyStore(_propertyStore, offlineTableName);
        LOGGER.info("Deleting table {}: Removed segment metadata", offlineTableName);

        ZKMetadataProvider.removeResourceConfigFromPropertyStore(_propertyStore, offlineTableName);
        LOGGER.info("Deleting table {}: Removed table config", offlineTableName);

        InstancePartitionsUtils.removeInstancePartitions(_propertyStore, offlineTableName);
        LOGGER.info("Deleting table {}: Removed instance partitions", offlineTableName);

        SegmentLineageAccessHelper.deleteSegmentLineage(_propertyStore, offlineTableName);
        LOGGER.info("Deleting table {}: Removed segment lineage", offlineTableName);

        MinionTaskMetadataUtils.deleteTaskMetadata(_propertyStore, "MergeRollupTask", offlineTableName);
        LOGGER.info("Deleting table {}: Removed merge rollup task metadata", offlineTableName);

        LOGGER.info("Deleting table {}: Finish", offlineTableName);
    }

    /**
     * Deletes a realtime table together with the metadata kept for it.
     *
     * <p>In order: removes the table from the broker resource ideal state, drops the Helix resource (if
     * the cluster lists it), removes stored segments, segment ZK metadata, the table config, the
     * CONSUMING and COMPLETED instance partitions, the segment lineage and the minion task metadata, and
     * finally removes the table from the instance ZK metadata of every instance that was found in the
     * table's ideal state before the resource was dropped.
     *
     * @param str table name; it is converted to the realtime table name with type before use
     */
    public void deleteRealtimeTable(String str) {
        final String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(str);
        LOGGER.info("Deleting table {}: Start", realtimeTableName);

        HelixHelper.removeResourceFromBrokerIdealState(this._helixZkManager, realtimeTableName);
        LOGGER.info("Deleting table {}: Removed from broker resource", realtimeTableName);

        // The instances must be captured before the resource is dropped; they stay null when the
        // cluster does not list the resource at all.
        Set<String> instancesOfTable = null;
        boolean resourceListed = this._helixAdmin.getResourcesInCluster(this._helixClusterName).contains(realtimeTableName);
        if (resourceListed) {
            instancesOfTable = getAllInstancesForTable(realtimeTableName);
            this._helixAdmin.dropResource(this._helixClusterName, realtimeTableName);
            LOGGER.info("Deleting table {}: Removed helix table resource", realtimeTableName);
        }

        this._segmentDeletionManager.removeSegmentsFromStore(realtimeTableName, getSegmentsFor(realtimeTableName));
        LOGGER.info("Deleting table {}: Removed stored segments", realtimeTableName);

        ZKMetadataProvider.removeResourceSegmentsFromPropertyStore(this._propertyStore, realtimeTableName);
        LOGGER.info("Deleting table {}: Removed segment metadata", realtimeTableName);

        ZKMetadataProvider.removeResourceConfigFromPropertyStore(this._propertyStore, realtimeTableName);
        LOGGER.info("Deleting table {}: Removed table config", realtimeTableName);

        // Instance partitions are keyed by the raw table name plus the partitions type.
        final String rawTableName = TableNameBuilder.extractRawTableName(str);
        String consumingPartitionsName = InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName);
        InstancePartitionsUtils.removeInstancePartitions(this._propertyStore, consumingPartitionsName);
        String completedPartitionsName = InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName);
        InstancePartitionsUtils.removeInstancePartitions(this._propertyStore, completedPartitionsName);
        LOGGER.info("Deleting table {}: Removed instance partitions", realtimeTableName);

        SegmentLineageAccessHelper.deleteSegmentLineage(this._propertyStore, realtimeTableName);
        LOGGER.info("Deleting table {}: Removed segment lineage", realtimeTableName);

        MinionTaskMetadataUtils.deleteTaskMetadata(this._propertyStore, "MergeRollupTask", realtimeTableName);
        LOGGER.info("Deleting table {}: Removed merge rollup task metadata", realtimeTableName);

        MinionTaskMetadataUtils.deleteTaskMetadata(this._propertyStore, "RealtimeToOfflineSegmentsTask", realtimeTableName);
        LOGGER.info("Deleting table {}: Removed merge realtime to offline metadata", realtimeTableName);

        if (instancesOfTable != null) {
            for (String instanceName : instancesOfTable) {
                InstanceZKMetadata instanceMetadata = ZKMetadataProvider.getInstanceZKMetadata(this._propertyStore, instanceName);
                if (instanceMetadata == null) {
                    continue;
                }
                instanceMetadata.removeResource(realtimeTableName);
                ZKMetadataProvider.setInstanceZKMetadata(this._propertyStore, instanceMetadata);
            }
        }
        LOGGER.info("Deleting table {}: Removed groupId/partitionId mapping for HLC table", realtimeTableName);
        LOGGER.info("Deleting table {}: Finish", realtimeTableName);
    }

    /**
     * Enables, disables or drops a table.
     *
     * <ul>
     *   <li>ENABLE: enables the Helix resource, then attempts a resource reset; a {@link HelixException}
     *       from the reset is logged and reported in the response message rather than rethrown.</li>
     *   <li>DISABLE: disables the Helix resource.</li>
     *   <li>DROP: deletes the table through the offline or realtime delete path, chosen from the table
     *       type encoded in the name.</li>
     * </ul>
     *
     * @param str table name, checked with {@code hasTable}
     * @param stateType requested action
     * @return a failure response when the table is not found, otherwise a success response
     * @throws IllegalStateException for any state type other than the three above
     */
    public PinotResourceManagerResponse toggleTableState(String str, StateType stateType) {
        if (!hasTable(str)) {
            return PinotResourceManagerResponse.failure("Table: " + str + " not found");
        }
        final String tableLabel = "Table: " + str;
        switch (stateType) {
            case ENABLE: {
                this._helixAdmin.enableResource(this._helixClusterName, str, true);
                boolean resetSucceeded;
                try {
                    this._helixAdmin.resetResource(this._helixClusterName, Collections.singletonList(str));
                    resetSucceeded = true;
                } catch (HelixException e) {
                    LOGGER.warn("Caught exception while resetting resource: {}", str, e);
                    resetSucceeded = false;
                }
                return PinotResourceManagerResponse.success(tableLabel + " enabled (reset success = " + resetSucceeded + ")");
            }
            case DISABLE: {
                this._helixAdmin.enableResource(this._helixClusterName, str, false);
                return PinotResourceManagerResponse.success(tableLabel + " disabled");
            }
            case DROP: {
                boolean offline = TableNameBuilder.getTableTypeFromTableName(str) == TableType.OFFLINE;
                if (offline) {
                    deleteOfflineTable(str);
                } else {
                    deleteRealtimeTable(str);
                }
                return PinotResourceManagerResponse.success(tableLabel + " dropped");
            }
            default:
                throw new IllegalStateException();
        }
    }

    /**
     * Collects every instance that appears in any partition of the table's ideal state.
     *
     * <p>The ideal state lookup can return {@code null} (other methods in this class check for that), for
     * example if the resource disappears between listing and fetching. In that case an empty set is
     * returned instead of failing with a {@link NullPointerException}.
     *
     * @param str table name with type (the Helix resource name)
     * @return a new mutable set of instance names; empty when there is no ideal state
     */
    private Set<String> getAllInstancesForTable(String str) {
        Set<String> instances = new HashSet<>();
        IdealState idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, str);
        if (idealState == null) {
            return instances;
        }
        for (String partition : idealState.getPartitionSet()) {
            instances.addAll(idealState.getInstanceSet(partition));
        }
        return instances;
    }

    /**
     * Adds a new segment without a crypter name; delegates to the four-argument overload.
     *
     * @param str table name with type
     * @param segmentMetadata metadata of the segment being added
     * @param str2 download URL stored in the segment ZK metadata
     */
    public void addNewSegment(String str, SegmentMetadata segmentMetadata, String str2) {
        final String noCrypterName = null;
        this.addNewSegment(str, segmentMetadata, str2, noCrypterName);
    }

    /**
     * Writes the segment's ZK metadata to the property store and then assigns the segment in the table's
     * ideal state.
     *
     * <p>For a realtime table the table must be upsert-enabled; the segment is recorded with segment type
     * REALTIME and status UPLOADED, and is assigned using CONSUMING instance partitions. For any other
     * table the segment is recorded with segment type OFFLINE and the current time as push time, and is
     * assigned using OFFLINE instance partitions.
     *
     * @param str table name with type
     * @param segmentMetadata metadata of the segment being added
     * @param str2 download URL stored in the segment ZK metadata
     * @param str3 crypter name stored in the segment ZK metadata, may be {@code null}
     * @throws IllegalStateException if a realtime table is not upsert-enabled, or the ZK metadata could
     *     not be written
     */
    public void addNewSegment(String str, SegmentMetadata segmentMetadata, String str2, @Nullable String str3) {
        final String segmentName = segmentMetadata.getName();
        final boolean realtimeTable = TableNameBuilder.isRealtimeTableResource(str);
        if (realtimeTable) {
            Preconditions.checkState(isUpsertTable(str), "Upload segment " + segmentName + " for non upsert enabled realtime table " + str + " is not supported");
        }

        SegmentZKMetadata zkMetadata = new SegmentZKMetadata(segmentName);
        ZKMetadataUtils.updateSegmentMetadata(zkMetadata, segmentMetadata,
                realtimeTable ? CommonConstants.Segment.SegmentType.REALTIME : CommonConstants.Segment.SegmentType.OFFLINE);
        zkMetadata.setDownloadUrl(str2);
        zkMetadata.setCrypterName(str3);
        if (realtimeTable) {
            zkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.UPLOADED);
        } else {
            zkMetadata.setPushTime(System.currentTimeMillis());
        }
        ZNRecord segmentRecord = zkMetadata.toZNRecord();

        String segmentZkPath = ZKMetadataProvider.constructPropertyStorePathForSegment(str, segmentName);
        boolean written = this._propertyStore.set(segmentZkPath, segmentRecord, AccessOption.PERSISTENT);
        Preconditions.checkState(written, "Failed to set segment ZK metadata for table: " + str + ", segment: " + segmentName);
        LOGGER.info("Added segment: {} of table: {} to property store", segmentName, str);

        InstancePartitionsType partitionsType = realtimeTable ? InstancePartitionsType.CONSUMING : InstancePartitionsType.OFFLINE;
        assignTableSegment(str, segmentName, segmentZkPath, partitionsType);
    }

    /**
     * Assigns a segment to instances by updating the table's ideal state, unless the ideal state already
     * contains the segment.
     *
     * <p>The ideal state update runs while holding the table's updater lock. If anything in the process
     * throws, the segment ZK metadata at {@code str3} is removed from the property store (the outcome is
     * logged) and the exception is rethrown.
     *
     * @param str table name with type
     * @param str2 segment name
     * @param str3 property store path of the segment ZK metadata, removed on failure
     * @param instancePartitionsType type of instance partitions used for the assignment
     */
    private void assignTableSegment(String str, String str2, String str3, InstancePartitionsType instancePartitionsType) {
        try {
            TableConfig tableConfig = getTableConfig(str);
            Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + str);
            SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(this._helixZkManager, tableConfig);
            Map instancePartitionsMap = Collections.singletonMap(instancePartitionsType, InstancePartitionsUtils.fetchOrComputeInstancePartitions(this._helixZkManager, tableConfig, instancePartitionsType));
            synchronized (getTableUpdaterLock(str)) {
                HelixHelper.updateIdealState(this._helixZkManager, str, idealState -> {
                    // A real assert replaces the decompiled reference to the synthetic
                    // $assertionsDisabled flag, which is not a field declared in source.
                    assert idealState != null;
                    Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
                    if (currentAssignment.containsKey(str2)) {
                        LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", str2, str);
                    } else {
                        List<String> assignedInstances = segmentAssignment.assignSegment(str2, currentAssignment, instancePartitionsMap);
                        LOGGER.info("Assigning segment: {} to instances: {} for table: {}", str2, assignedInstances, str);
                        currentAssignment.put(str2, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, "ONLINE"));
                    }
                    return idealState;
                });
                LOGGER.info("Added segment: {} to IdealState for table: {}", str2, str);
            }
        } catch (Exception e) {
            // The trailing exception argument is logged as the throwable.
            LOGGER.error("Caught exception while adding segment: {} to IdealState for table: {}, deleting segment ZK metadata", str2, str, e);
            if (this._propertyStore.remove(str3, AccessOption.PERSISTENT)) {
                LOGGER.info("Deleted segment ZK metadata for segment: {} of table: {}", str2, str);
            } else {
                LOGGER.error("Failed to deleted segment ZK metadata for segment: {} of table: {}", str2, str);
            }
            throw e;
        }
    }

    /**
     * Tells whether the realtime table for the given name has upsert enabled.
     *
     * @param str table name; converted to the realtime table name with type before the config lookup
     * @return {@code true} only when a table config exists, it has an upsert config, and that config's
     *     mode is not {@code NONE}
     */
    public boolean isUpsertTable(String str) {
        TableConfig realtimeConfig = getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(str));
        if (realtimeConfig == null) {
            return false;
        }
        UpsertConfig upsertConfig = realtimeConfig.getUpsertConfig();
        if (upsertConfig == null) {
            return false;
        }
        return upsertConfig.getMode() != UpsertConfig.Mode.NONE;
    }

    /**
     * Picks the lock object for a table from the fixed array of updater locks.
     *
     * @param str table name; its hash code selects the array slot
     * @return the lock at index {@code (hash & Integer.MAX_VALUE) % locks.length}
     */
    private Object getTableUpdaterLock(String str) {
        // Masking the sign bit keeps the index non-negative even for negative hash codes.
        int nonNegativeHash = str.hashCode() & Integer.MAX_VALUE;
        int slot = nonNegativeHash % this._tableUpdaterLocks.length;
        return this._tableUpdaterLocks[slot];
    }

    /**
     * Reads the ZNRecord stored at the property store path of a segment's ZK metadata.
     *
     * @param str table name with type
     * @param str2 segment name
     * @return the record returned by the metadata provider, possibly {@code null}
     */
    @Nullable
    public ZNRecord getSegmentMetadataZnRecord(String str, String str2) {
        String segmentZkPath = ZKMetadataProvider.constructPropertyStorePathForSegment(str, str2);
        return ZKMetadataProvider.getZnRecord(this._propertyStore, segmentZkPath);
    }

    /**
     * Stores segment ZK metadata in the property store, passing along an integer argument.
     *
     * @param str table name with type
     * @param segmentZKMetadata metadata to store
     * @param i forwarded unchanged to the metadata provider (presumably the expected ZK version — confirm
     *     against {@code ZKMetadataProvider.setSegmentZKMetadata})
     * @return the result reported by the metadata provider
     */
    public boolean updateZkMetadata(String str, SegmentZKMetadata segmentZKMetadata, int i) {
        boolean stored = ZKMetadataProvider.setSegmentZKMetadata(this._propertyStore, str, segmentZKMetadata, i);
        return stored;
    }

    /**
     * Stores segment ZK metadata in the property store.
     *
     * @param str table name with type
     * @param segmentZKMetadata metadata to store
     * @return the result reported by the metadata provider
     */
    public boolean updateZkMetadata(String str, SegmentZKMetadata segmentZKMetadata) {
        boolean stored = ZKMetadataProvider.setSegmentZKMetadata(this._propertyStore, str, segmentZKMetadata);
        return stored;
    }

    /**
     * Refreshes an existing segment: updates its ZK metadata from the new segment metadata, stamps the
     * refresh time, download URL and crypter name, stores it, and then sends segment refresh messages to
     * both servers and brokers.
     *
     * @param str table name with type
     * @param segmentMetadata metadata of the refreshed segment
     * @param segmentZKMetadata existing ZK metadata object, updated in place
     * @param i forwarded unchanged to the metadata provider when storing (presumably the expected ZK
     *     version — confirm against {@code ZKMetadataProvider.setSegmentZKMetadata})
     * @param str2 download URL
     * @param str3 crypter name, may be {@code null}
     * @throws RuntimeException if the metadata provider reports that the store failed
     */
    public void refreshSegment(String str, SegmentMetadata segmentMetadata, SegmentZKMetadata segmentZKMetadata, int i, String str2, @Nullable String str3) {
        final String segmentName = segmentMetadata.getName();

        ZKMetadataUtils.updateSegmentMetadata(segmentZKMetadata, segmentMetadata, CommonConstants.Segment.SegmentType.OFFLINE);
        segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
        segmentZKMetadata.setDownloadUrl(str2);
        segmentZKMetadata.setCrypterName(str3);

        boolean stored = ZKMetadataProvider.setSegmentZKMetadata(this._propertyStore, str, segmentZKMetadata, i);
        if (stored) {
            LOGGER.info("Updated segment: {} of table: {} to property store", segmentName, str);
            sendSegmentRefreshMessage(str, segmentName, true, true);
            return;
        }
        throw new RuntimeException("Failed to update ZK metadata for segment: " + segmentName + " of table: " + str);
    }

    /**
     * Sends a segment reload message (with no specific segment name) to the participants of a table's
     * resource.
     *
     * @param str table name with type
     * @param z force download flag; only allowed for OFFLINE tables
     * @return the count returned by the messaging service's {@code send} call
     * @throws IllegalArgumentException if {@code z} is set and the table is not an OFFLINE table
     */
    public int reloadAllSegments(String str, boolean z) {
        LOGGER.info("Sending reload message for table: {} with forceDownload: {}", str, z);
        if (z) {
            boolean offlineTable = TableNameBuilder.getTableTypeFromTableName(str) == TableType.OFFLINE;
            Preconditions.checkArgument(offlineTable, "Table: %s is not an OFFLINE table, which is required to force to download segments", str);
        }

        Criteria recipients = new Criteria();
        recipients.setRecipientInstanceType(InstanceType.PARTICIPANT);
        recipients.setInstanceName("%");
        recipients.setResource(str);
        recipients.setSessionSpecific(true);

        SegmentReloadMessage reloadMessage = new SegmentReloadMessage(str, (String) null, z);
        int sentCount = this._helixZkManager.getMessagingService().send(recipients, reloadMessage, (AsyncCallback) null, -1);
        if (sentCount <= 0) {
            LOGGER.warn("No reload message sent for table: {}", str);
        } else {
            LOGGER.info("Sent {} reload messages for table: {}", sentCount, str);
        }
        return sentCount;
    }

    /**
     * Sends a segment reload message for one segment to the participants of that segment's partition in
     * the table's resource.
     *
     * @param str table name with type
     * @param str2 segment name
     * @param z force download flag; only allowed for OFFLINE tables
     * @return the count returned by the messaging service's {@code send} call
     * @throws IllegalArgumentException if {@code z} is set and the table is not an OFFLINE table
     */
    public int reloadSegment(String str, String str2, boolean z) {
        LOGGER.info("Sending reload message for segment: {} in table: {} with forceDownload: {}", str2, str, z);
        if (z) {
            boolean offlineTable = TableNameBuilder.getTableTypeFromTableName(str) == TableType.OFFLINE;
            Preconditions.checkArgument(offlineTable, "Table: %s is not an OFFLINE table, which is required to force to download segment: %s", str, str2);
        }

        Criteria recipients = new Criteria();
        recipients.setRecipientInstanceType(InstanceType.PARTICIPANT);
        recipients.setInstanceName("%");
        recipients.setResource(str);
        recipients.setPartition(str2);
        recipients.setSessionSpecific(true);

        SegmentReloadMessage reloadMessage = new SegmentReloadMessage(str, str2, z);
        int sentCount = this._helixZkManager.getMessagingService().send(recipients, reloadMessage, (AsyncCallback) null, -1);
        if (sentCount <= 0) {
            LOGGER.warn("No reload message sent for segment: {} in table: {}", str2, str);
        } else {
            LOGGER.info("Sent {} reload messages for segment: {} in table: {}", sentCount, str2, str);
        }
        return sentCount;
    }

    /**
     * Resets a segment on every instance that hosts it according to the ideal state.
     *
     * <p>For each instance: if the external view shows the segment in the ERROR state there, the partition
     * is reset; otherwise the partition is disabled. The method then polls the external view once per
     * second until every instance reports OFFLINE for the segment or the timeout elapses, and finally
     * re-enables the partition on all instances.
     *
     * <p>The poll sleeps on every iteration, including when the external view has no state map for the
     * segment yet; without that the loop would re-fetch the external view in a tight spin until timeout.
     *
     * @param str table name with type
     * @param str2 segment name
     * @param j maximum time in milliseconds to wait for the external view
     * @throws IllegalStateException if the ideal state or external view is missing, or the segment has no
     *     instances in the ideal state
     * @throws TimeoutException if not all instances reached OFFLINE in time; the segment is then NOT
     *     re-enabled
     * @throws InterruptedException if interrupted while sleeping between polls
     */
    public void resetSegment(String str, String str2, long j) throws InterruptedException, TimeoutException {
        IdealState idealState = getTableIdealState(str);
        Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", str);
        ExternalView externalView = getTableExternalView(str);
        Preconditions.checkState(externalView != null, "Could not find external view for table: %s", str);
        Set<String> instanceSet = idealState.getInstanceSet(str2);
        Preconditions.checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s", str2, str);

        Map<String, String> externalViewStates = externalView.getStateMap(str2);
        for (String instance : instanceSet) {
            boolean inErrorState = externalViewStates != null && SegmentStatusChecker.ERROR.equals(externalViewStates.get(instance));
            if (inErrorState) {
                LOGGER.info("Resetting segment: {} of table: {}", str2, str);
                this._helixAdmin.resetPartition(this._helixClusterName, instance, str, Lists.newArrayList(str2));
            } else {
                LOGGER.info("Disabling segment: {} of table: {}", str2, str);
                this._helixAdmin.enablePartition(false, this._helixClusterName, instance, str, Lists.newArrayList(str2));
            }
        }

        LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of segment: {} of table: {}", j, str2, str);
        long startTimeMs = System.currentTimeMillis();
        Set<String> instancesToCheck = new HashSet<>(instanceSet);
        while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTimeMs < j) {
            ExternalView latestExternalView = getTableExternalView(str);
            Preconditions.checkState(latestExternalView != null, "Could not find external view for table: %s", str);
            Map<String, String> latestStates = latestExternalView.getStateMap(str2);
            if (latestStates != null) {
                instancesToCheck.removeIf(instance -> "OFFLINE".equals(latestStates.get(instance)));
            }
            // Always pause between polls, even when the segment has no state map yet.
            Thread.sleep(1000L);
        }
        if (!instancesToCheck.isEmpty()) {
            throw new TimeoutException(String.format("Timed out waiting for external view to stabilize after call to disable/reset segment: %s of table: %s. Disable/reset might complete in the background, but skipping enable of segment.", str2, str));
        }

        LOGGER.info("Enabling segment: {} of table: {}", str2, str);
        for (String instance : instanceSet) {
            this._helixAdmin.enablePartition(true, this._helixClusterName, instance, str, Lists.newArrayList(str2));
        }
    }

    /**
     * Resets every segment of a table on every instance that hosts it according to the ideal state.
     *
     * <p>Segments whose external view state on an instance is ERROR are reset on that instance; all other
     * segment/instance pairs are disabled. The method then polls the external view once per second until,
     * for every segment, all of its instances report OFFLINE, or the timeout elapses. Finally all
     * partitions are re-enabled.
     *
     * @param str table name with type
     * @param j maximum time in milliseconds to wait for the external view
     * @throws IllegalStateException if the ideal state or external view is missing
     * @throws TimeoutException if some segment did not reach OFFLINE on all instances in time; segments
     *     are then NOT re-enabled
     * @throws InterruptedException if interrupted while sleeping between polls
     */
    public void resetAllSegments(String str, long j) throws InterruptedException, TimeoutException {
        IdealState idealState = getTableIdealState(str);
        Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", str);
        ExternalView externalView = getTableExternalView(str);
        Preconditions.checkState(externalView != null, "Could not find external view for table: %s", str);

        // instance -> segments in ERROR there (to reset) / all other segments there (to disable)
        Map<String, Set<String>> instanceToErrorSegments = new HashMap<>();
        Map<String, Set<String>> instanceToOtherSegments = new HashMap<>();
        // segment -> instances that must all reach OFFLINE
        Map<String, Set<String>> segmentToInstances = new HashMap<>();
        for (String segment : idealState.getPartitionSet()) {
            Set<String> instances = idealState.getInstanceSet(segment);
            Map<String, String> viewStates = externalView.getStateMap(segment);
            for (String instance : instances) {
                boolean inErrorState = viewStates != null && SegmentStatusChecker.ERROR.equals(viewStates.get(instance));
                Map<String, Set<String>> bucket = inErrorState ? instanceToErrorSegments : instanceToOtherSegments;
                bucket.computeIfAbsent(instance, unused -> new HashSet<>()).add(segment);
            }
            segmentToInstances.put(segment, new HashSet<>(instances));
        }

        LOGGER.info("Disabling/resetting segments of table: {}", str);
        for (Map.Entry<String, Set<String>> errorEntry : instanceToErrorSegments.entrySet()) {
            this._helixAdmin.resetPartition(this._helixClusterName, errorEntry.getKey(), str, Lists.newArrayList(errorEntry.getValue()));
        }
        for (Map.Entry<String, Set<String>> otherEntry : instanceToOtherSegments.entrySet()) {
            this._helixAdmin.enablePartition(false, this._helixClusterName, otherEntry.getKey(), str, Lists.newArrayList(otherEntry.getValue()));
        }

        LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of segments of table: {}", j, str);
        long startTimeMs = System.currentTimeMillis();
        while (!segmentToInstances.isEmpty() && System.currentTimeMillis() - startTimeMs < j) {
            ExternalView latestExternalView = getTableExternalView(str);
            Preconditions.checkState(latestExternalView != null, "Could not find external view for table: %s", str);
            // Drop each segment whose instances are all OFFLINE; segments without a state map stay pending.
            segmentToInstances.entrySet().removeIf(pending -> {
                Map<String, String> latestStates = latestExternalView.getStateMap(pending.getKey());
                return latestStates != null
                        && pending.getValue().stream().allMatch(instance -> "OFFLINE".equals(latestStates.get(instance)));
            });
            Thread.sleep(1000L);
        }
        if (!segmentToInstances.isEmpty()) {
            throw new TimeoutException(String.format("Timed out waiting for external view to stabilize after call to disable/reset segments. Disable/reset might complete in the background, but skipping enable of segments of table: %s", str));
        }

        LOGGER.info("Enabling segments of table: {}", str);
        for (Map.Entry<String, Set<String>> errorEntry : instanceToErrorSegments.entrySet()) {
            this._helixAdmin.enablePartition(true, this._helixClusterName, errorEntry.getKey(), str, Lists.newArrayList(errorEntry.getValue()));
        }
        for (Map.Entry<String, Set<String>> otherEntry : instanceToOtherSegments.entrySet()) {
            this._helixAdmin.enablePartition(true, this._helixClusterName, otherEntry.getKey(), str, Lists.newArrayList(otherEntry.getValue()));
        }
    }

    /**
     * Sends a segment refresh message to servers and/or brokers through the Helix messaging service.
     *
     * <p>The server send targets the table resource with the segment as partition; the broker send
     * targets the {@code brokerResource} resource with the table as partition. The same criteria object
     * is reused, with resource and partition overwritten for the broker send.
     *
     * @param str table name with type
     * @param str2 segment name
     * @param z whether to send the message to servers
     * @param z2 whether to send the message to brokers
     */
    public void sendSegmentRefreshMessage(String str, String str2, boolean z, boolean z2) {
        SegmentRefreshMessage refreshMessage = new SegmentRefreshMessage(str, str2);

        Criteria recipients = new Criteria();
        recipients.setRecipientInstanceType(InstanceType.PARTICIPANT);
        recipients.setInstanceName("%");
        recipients.setSessionSpecific(true);

        ClusterMessagingService messaging = this._helixZkManager.getMessagingService();

        if (z) {
            recipients.setResource(str);
            recipients.setPartition(str2);
            int sentToServers = messaging.send(recipients, refreshMessage, (AsyncCallback) null, -1);
            if (sentToServers <= 0) {
                LOGGER.warn("No segment refresh message sent to servers for segment: {} of table: {}", str2, str);
            } else {
                LOGGER.info("Sent {} segment refresh messages to servers for segment: {} of table: {}", sentToServers, str2, str);
            }
        }

        if (z2) {
            recipients.setResource("brokerResource");
            recipients.setPartition(str);
            int sentToBrokers = messaging.send(recipients, refreshMessage, (AsyncCallback) null, -1);
            if (sentToBrokers <= 0) {
                LOGGER.warn("No segment refresh message sent to brokers for segment: {} of table: {}", str2, str);
            } else {
                LOGGER.info("Sent {} segment refresh messages to brokers for segment: {} of table: {}", sentToBrokers, str2, str);
            }
        }
    }

    /**
     * Sends a table config refresh message to the participants of the table's partition in the
     * {@code brokerResource} resource.
     *
     * @param str table name with type
     */
    private void sendTableConfigRefreshMessage(String str) {
        Criteria brokerRecipients = new Criteria();
        brokerRecipients.setRecipientInstanceType(InstanceType.PARTICIPANT);
        brokerRecipients.setInstanceName("%");
        brokerRecipients.setResource("brokerResource");
        brokerRecipients.setSessionSpecific(true);
        brokerRecipients.setPartition(str);

        TableConfigRefreshMessage refreshMessage = new TableConfigRefreshMessage(str);
        int sentCount = this._helixZkManager.getMessagingService().send(brokerRecipients, refreshMessage, (AsyncCallback) null, -1);
        if (sentCount <= 0) {
            LOGGER.warn("No table config refresh message sent to brokers for table: {}", str);
            return;
        }
        LOGGER.info("Sent {} table config refresh messages to brokers for table: {}", sentCount, str);
    }

    /**
     * Sends a routing table rebuild message to the participants of the table's partition in the
     * {@code brokerResource} resource.
     *
     * @param str table name with type
     */
    private void sendRoutingTableRebuildMessage(String str) {
        Criteria brokerRecipients = new Criteria();
        brokerRecipients.setRecipientInstanceType(InstanceType.PARTICIPANT);
        brokerRecipients.setInstanceName("%");
        brokerRecipients.setResource("brokerResource");
        brokerRecipients.setSessionSpecific(true);
        brokerRecipients.setPartition(str);

        RoutingTableRebuildMessage rebuildMessage = new RoutingTableRebuildMessage(str);
        int sentCount = this._helixZkManager.getMessagingService().send(brokerRecipients, rebuildMessage, (AsyncCallback) null, -1);
        if (sentCount <= 0) {
            LOGGER.warn("No routing table rebuild message sent to brokers for table: {}", str);
            return;
        }
        LOGGER.info("Sent {} routing table rebuild messages to brokers for table: {}", sentCount, str);
    }

    /**
     * Sets the {@code queryRateLimitDisabled} participant-scoped config of a broker instance.
     *
     * @param str broker instance name
     * @param str2 requested state; the config value is {@code "true"} only when this equals
     *     {@code "DISABLE"}, otherwise {@code "false"}
     */
    public void toggleQueryQuotaStateForBroker(String str, String str2) {
        boolean rateLimitDisabled = "DISABLE".equals(str2);
        Map<String, String> properties = new HashMap<>();
        properties.put("queryRateLimitDisabled", Boolean.toString(rateLimitDisabled));

        HelixConfigScope participantScope =
                new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT, new String[]{this._helixClusterName})
                        .forParticipant(str)
                        .build();
        this._helixAdmin.setConfig(participantScope, properties);
    }

    /**
     * Builds a map from server instance to the segments assigned to it in the table's ideal state.
     *
     * @param str table name with type
     * @return a sorted map (by instance name) whose values list segments in ideal state iteration order
     * @throws IllegalStateException if the table has no ideal state
     */
    public Map<String, List<String>> getServerToSegmentsMap(String str) {
        IdealState idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, str);
        if (idealState == null) {
            throw new IllegalStateException("Ideal state does not exist for table: " + str);
        }
        Map<String, List<String>> serverToSegments = new TreeMap<>();
        for (String segment : idealState.getPartitionSet()) {
            for (String server : idealState.getInstanceStateMap(segment).keySet()) {
                serverToSegments.computeIfAbsent(server, unused -> new ArrayList<>()).add(segment);
            }
        }
        return serverToSegments;
    }

    /**
     * Returns the segments that have at least one instance in the CONSUMING state in the table's ideal
     * state.
     *
     * @param str table name with type
     * @return a new mutable set of segment names
     * @throws IllegalStateException if the table has no ideal state
     */
    public Set<String> getConsumingSegments(String str) {
        IdealState idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, str);
        if (idealState == null) {
            throw new IllegalStateException("Ideal state does not exist for table: " + str);
        }
        return idealState.getPartitionSet().stream()
                .filter(segment -> idealState.getInstanceStateMap(segment).containsValue("CONSUMING"))
                .collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * Returns the instances that the table's ideal state lists for a segment.
     *
     * @param str table name with type
     * @param str2 segment name
     * @return a new mutable set of instance names
     * @throws IllegalStateException if the table has no ideal state
     */
    public Set<String> getServersForSegment(String str, String str2) {
        IdealState idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, str);
        if (idealState != null) {
            // NOTE(review): the instance state map is dereferenced without a null check; confirm callers
            // only pass segments that exist in the ideal state.
            Map<String, String> instanceStates = idealState.getInstanceStateMap(str2);
            return new HashSet<>(instanceStates.keySet());
        }
        throw new IllegalStateException("Ideal state does not exist for table: " + str);
    }

    /**
     * Returns the CRC of every segment in the table's ideal state, using a per-table cache that is
     * refreshed only for segments whose ZK metadata version changed.
     *
     * <p>Steps: stat the metadata node of every segment; for each existing node whose version differs
     * from the last known one (or is not known yet), re-read the segment metadata and cache its CRC;
     * evict cached entries for segments no longer in the ideal state; return the cached CRCs as strings.
     *
     * <p>A missing ideal state is reported with the same {@link IllegalStateException} the other ideal
     * state getters in this class use, instead of a {@link NullPointerException}.
     *
     * @param str table name with type
     * @return a sorted map from segment name to CRC string; the value is {@code "null"} for a segment
     *     with no cached CRC (for example when its metadata node does not exist)
     * @throws IllegalStateException if the table has no ideal state
     */
    public synchronized Map<String, String> getSegmentsCrcForTable(String str) {
        IdealState idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, str);
        if (idealState == null) {
            throw new IllegalStateException("Ideal state does not exist for table: " + str);
        }

        List<String> segments = new ArrayList<>(idealState.getPartitionSet());
        List<String> metadataPaths = new ArrayList<>(segments.size());
        for (String segment : segments) {
            metadataPaths.add(buildPathForSegmentMetadata(str, segment));
        }

        // First request for this table: create both per-table caches.
        if (!this._segmentCrcMap.containsKey(str)) {
            this._lastKnownSegmentMetadataVersionMap.put(str, new HashMap<>());
            this._segmentCrcMap.put(str, new HashMap<>());
        }
        Map<String, Integer> knownVersions = this._lastKnownSegmentMetadataVersionMap.get(str);
        Map<String, Long> cachedCrcs = this._segmentCrcMap.get(str);

        // stats[i] corresponds to segments.get(i); a null stat is skipped.
        Stat[] stats = this._propertyStore.getStats(metadataPaths, AccessOption.PERSISTENT);
        for (int i = 0; i < stats.length; i++) {
            Stat stat = stats[i];
            if (stat == null) {
                continue;
            }
            String segment = segments.get(i);
            int version = stat.getVersion();
            if (!knownVersions.containsKey(segment) || knownVersions.get(segment).intValue() != version) {
                updateSegmentMetadataCrc(str, segment, version);
            }
        }

        // Evict cache entries of segments that are no longer part of the ideal state.
        Set<String> currentSegments = idealState.getPartitionSet();
        Iterator<String> cachedSegments = cachedCrcs.keySet().iterator();
        while (cachedSegments.hasNext()) {
            String cachedSegment = cachedSegments.next();
            if (!currentSegments.contains(cachedSegment)) {
                cachedSegments.remove();
                knownVersions.remove(cachedSegment);
            }
        }

        Map<String, String> segmentToCrc = new TreeMap<>();
        for (String segment : segments) {
            segmentToCrc.put(segment, String.valueOf(cachedCrcs.get(segment)));
        }
        return segmentToCrc;
    }

    /**
     * Re-reads a segment's ZK metadata and records its metadata version and CRC in the per-table caches.
     *
     * <p>The per-table cache maps for {@code str} must already exist; they are dereferenced without a
     * null check.
     *
     * @param str table name with type
     * @param str2 segment name
     * @param i metadata node version to remember for the segment
     */
    private void updateSegmentMetadataCrc(String str, String str2, int i) {
        SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider.getSegmentZKMetadata(this._propertyStore, str, str2);
        // A real assert replaces the decompiled reference to the synthetic $assertionsDisabled flag,
        // which is not a field declared in source.
        assert segmentZKMetadata != null;
        this._lastKnownSegmentMetadataVersionMap.get(str).put(str2, i);
        this._segmentCrcMap.get(str).put(str2, segmentZKMetadata.getCrc());
    }

    /**
     * Builds the path {@code /SEGMENTS/<table>/<segment>}.
     *
     * @param str table name, used as the second path element
     * @param str2 segment name, used as the last path element
     * @return the concatenated path
     */
    public String buildPathForSegmentMetadata(String str, String str2) {
        StringBuilder path = new StringBuilder("/SEGMENTS/");
        path.append(str).append("/").append(str2);
        return path.toString();
    }

    /**
     * Tells whether the given name is among the resources returned by {@code getAllResources()}.
     *
     * @param str table name with type
     * @return {@code true} if the resource list contains the name
     */
    public boolean hasTable(String str) {
        boolean present = this.getAllResources().contains(str);
        return present;
    }

    /**
     * Tells whether the offline table for the given name exists.
     *
     * @param str table name; converted to the offline table name with type before the lookup
     * @return the result of {@code hasTable} for the offline table name
     */
    public boolean hasOfflineTable(String str) {
        String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(str);
        return hasTable(offlineTableName);
    }

    /**
     * Tells whether the realtime table for the given name exists.
     *
     * @param str table name; converted to the realtime table name with type before the lookup
     * @return the result of {@code hasTable} for the realtime table name
     */
    public boolean hasRealtimeTable(String str) {
        String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(str);
        return hasTable(realtimeTableName);
    }

    /**
     * Tells whether the table's ideal state is enabled.
     *
     * @param str table name with type
     * @return the {@code isEnabled()} flag of the table's ideal state
     * @throws TableNotFoundException if the table has no ideal state
     */
    public boolean isTableEnabled(String str) throws TableNotFoundException {
        IdealState idealState = getTableIdealState(str);
        if (idealState != null) {
            return idealState.isEnabled();
        }
        throw new TableNotFoundException("Failed to find ideal state for table: " + str);
    }

    /**
     * Fetches the ideal state of a table's Helix resource.
     *
     * @param str table name with type
     * @return the ideal state, or {@code null} if the admin returns none
     */
    @Nullable
    public IdealState getTableIdealState(String str) {
        IdealState idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, str);
        return idealState;
    }

    /**
     * Fetches the external view of a table's Helix resource.
     *
     * @param str table name with type
     * @return the external view, or {@code null} if the admin returns none
     */
    @Nullable
    public ExternalView getTableExternalView(String str) {
        ExternalView externalView = this._helixAdmin.getResourceExternalView(this._helixClusterName, str);
        return externalView;
    }

    /**
     * Reads the table config for a table name with type from the property store.
     *
     * @param str table name with type
     * @return the table config, or {@code null} if the provider returns none
     */
    @Nullable
    public TableConfig getTableConfig(String str) {
        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, str);
        return tableConfig;
    }

    /**
     * Reads the offline table config for a table from the property store.
     *
     * @param str table name
     * @return the offline table config, or {@code null} if the provider returns none
     */
    @Nullable
    public TableConfig getOfflineTableConfig(String str) {
        TableConfig offlineConfig = ZKMetadataProvider.getOfflineTableConfig(this._propertyStore, str);
        return offlineConfig;
    }

    /**
     * Reads the realtime table config for a table from the property store.
     *
     * @param str table name
     * @return the realtime table config, or {@code null} if the provider returns none
     */
    @Nullable
    public TableConfig getRealtimeTableConfig(String str) {
        TableConfig realtimeConfig = ZKMetadataProvider.getRealtimeTableConfig(this._propertyStore, str);
        return realtimeConfig;
    }

    /**
     * Reads the table config of the requested type.
     *
     * @param str table name
     * @param tableType OFFLINE selects the offline config; any other value (including {@code null})
     *     selects the realtime config
     * @return the table config, or {@code null} if none is found
     */
    @Nullable
    public TableConfig getTableConfig(String str, TableType tableType) {
        if (tableType == TableType.OFFLINE) {
            return getOfflineTableConfig(str);
        }
        return getRealtimeTableConfig(str);
    }

    /**
     * Returns the offline and realtime table configs that exist for the given name.
     *
     * @param str name used for both the offline and the realtime table config lookup
     * @return a new list holding the offline config first (if present), then the realtime config (if
     *     present); empty when neither exists
     */
    public List<TableConfig> getTableConfigsForSchema(String str) {
        List<TableConfig> tableConfigs = new ArrayList<>();

        TableConfig offlineConfig = getOfflineTableConfig(str);
        if (offlineConfig != null) {
            tableConfigs.add(offlineConfig);
        }

        TableConfig realtimeConfig = getRealtimeTableConfig(str);
        if (realtimeConfig != null) {
            tableConfigs.add(realtimeConfig);
        }

        return tableConfigs;
    }

    /**
     * Returns the server instances tagged for a table's tenant.
     *
     * <p>For an OFFLINE table the offline server tag is used; for a REALTIME table the instances with the
     * consuming server tag and those with the completed server tag are combined. Duplicates are removed.
     *
     * @param str table name
     * @param tableType table type used both for the config lookup and for tag selection
     * @return a new list of distinct instance names, in no particular order
     * @throws NullPointerException if no table config is found
     */
    public List<String> getServerInstancesForTable(String str, TableType tableType) {
        TableConfig tableConfig = getTableConfig(str, tableType);
        Preconditions.checkNotNull(tableConfig);
        TenantConfig tenantConfig = tableConfig.getTenantConfig();

        Set<String> serverInstances = new HashSet<>();
        List<InstanceConfig> instanceConfigs = HelixHelper.getInstanceConfigs(this._helixZkManager);
        if (tableType == TableType.OFFLINE) {
            String offlineTag = TagNameUtils.extractOfflineServerTag(tenantConfig);
            serverInstances.addAll(HelixHelper.getInstancesWithTag(instanceConfigs, offlineTag));
        } else if (TableType.REALTIME.equals(tableType)) {
            String consumingTag = TagNameUtils.extractConsumingServerTag(tenantConfig);
            serverInstances.addAll(HelixHelper.getInstancesWithTag(instanceConfigs, consumingTag));
            String completedTag = TagNameUtils.extractCompletedServerTag(tenantConfig);
            serverInstances.addAll(HelixHelper.getInstancesWithTag(instanceConfigs, completedTag));
        }
        return new ArrayList<>(serverInstances);
    }

    /**
     * Returns the instances carrying the broker tag of a table's tenant.
     *
     * @param str table name
     * @param tableType table type used for the config lookup
     * @return the instance names returned by {@code HelixHelper.getInstancesWithTag}
     * @throws NullPointerException if no table config is found
     */
    public List<String> getBrokerInstancesForTable(String str, TableType tableType) {
        TableConfig tableConfig = getTableConfig(str, tableType);
        Preconditions.checkNotNull(tableConfig);
        String brokerTag = TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig());
        return HelixHelper.getInstancesWithTag(this._helixZkManager, brokerTag);
    }

    /**
     * Enables an instance; delegates to the private overload with {@code true} and a value of 10000.
     *
     * @param str instance name
     * @return the response produced by the private overload
     */
    public PinotResourceManagerResponse enableInstance(String str) {
        final boolean enable = true;
        return enableInstance(str, enable, 10000L);
    }

    /**
     * Disables an instance; delegates to the private overload with {@code false} and a value of 10000.
     *
     * @param str instance name
     * @return the response produced by the private overload
     */
    public PinotResourceManagerResponse disableInstance(String str) {
        final boolean enable = false;
        return enableInstance(str, enable, 10000L);
    }

    /**
     * Drops an instance from the cluster by removing its instance node and then its instance config.
     *
     * <p>The drop is refused when the instance still has a live-instance entry, or when any resource's
     * ideal state still lists it for some partition. A resource whose ideal state cannot be fetched
     * (returns {@code null}) is skipped instead of causing a {@link NullPointerException}.
     *
     * <p>Both removals go through the default retry policy. A failure of either one is logged with its
     * cause and turned into a failure response; when only the config removal fails, the response tells
     * the operator to remove the config node manually.
     *
     * @param str instance name
     * @return a success response when both nodes were removed, otherwise a failure response
     */
    public PinotResourceManagerResponse dropInstance(String str) {
        if (this._helixDataAccessor.getProperty(this._keyBuilder.liveInstance(str)) != null) {
            return PinotResourceManagerResponse.failure("Instance " + str + " is still live");
        }

        for (String resource : getAllResources()) {
            IdealState idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, resource);
            if (idealState == null) {
                // No ideal state to inspect for this resource, so it cannot reference the instance.
                continue;
            }
            for (String partition : idealState.getPartitionSet()) {
                if (idealState.getInstanceSet(partition).contains(str)) {
                    return PinotResourceManagerResponse.failure("Instance " + str + " exists in ideal state for " + resource);
                }
            }
        }

        try {
            DEFAULT_RETRY_POLICY.attempt(() -> this._helixDataAccessor.removeProperty(this._keyBuilder.instance(str)));
        } catch (Exception e) {
            LOGGER.error("Caught exception while removing /INSTANCES/{}", str, e);
            return PinotResourceManagerResponse.failure("Failed to remove /INSTANCES/" + str);
        }

        try {
            DEFAULT_RETRY_POLICY.attempt(() -> this._helixDataAccessor.removeProperty(this._keyBuilder.instanceConfig(str)));
        } catch (Exception e) {
            LOGGER.error("Caught exception while removing /CONFIGS/PARTICIPANT/{}", str, e);
            return PinotResourceManagerResponse.failure("Failed to remove /CONFIGS/PARTICIPANT/" + str + ". Make sure to remove /CONFIGS/PARTICIPANT/" + str + " manually since /INSTANCES/" + str + " has already been removed");
        }

        return PinotResourceManagerResponse.success("Instance " + str + " dropped");
    }

    /**
     * Enables or disables an instance and polls its current states until they reflect the change or the timeout
     * elapses.
     *
     * <p>After toggling the instance through the Helix admin, the method polls every 500 ms:
     * <ul>
     *   <li>If the instance has no live-instance record: a disable request returns success immediately, an enable
     *       request keeps waiting.</li>
     *   <li>If the live instance has no current states, success is returned.</li>
     *   <li>Otherwise success is returned once no partition is OFFLINE (enable) or every partition is OFFLINE
     *       (disable).</li>
     * </ul>
     *
     * @param str name of the instance to toggle
     * @param z {@code true} to enable the instance, {@code false} to disable it
     * @param j maximum time to wait, in milliseconds
     * @return a success response when the target state is observed; a failure response when the instance does not
     *     exist, the wait is interrupted, or the timeout elapses
     */
    private PinotResourceManagerResponse enableInstance(String str, boolean z, long j) {
        if (!instanceExists(str)) {
            return PinotResourceManagerResponse.failure("Instance " + str + " not found");
        }
        this._helixAdmin.enableInstance(this._helixClusterName, str, z);

        long deadlineMs = System.currentTimeMillis() + j;
        while (System.currentTimeMillis() < deadlineMs) {
            LiveInstance liveInstance = this._helixDataAccessor.getProperty(this._keyBuilder.liveInstance(str));
            if (liveInstance == null) {
                // When disabling, a missing live instance is as good as disabled; when enabling, keep waiting.
                if (!z) {
                    return PinotResourceManagerResponse.SUCCESS;
                }
            } else {
                List<CurrentState> currentStates = this._helixDataAccessor.getChildValues(this._keyBuilder.currentStates(str, liveInstance.getSessionId()), true);
                if (currentStates.isEmpty()) {
                    return PinotResourceManagerResponse.SUCCESS;
                }
                if (allPartitionsReachedTargetState(currentStates, z)) {
                    return z ? PinotResourceManagerResponse.success("Instance " + str + " enabled") : PinotResourceManagerResponse.success("Instance " + str + " disabled");
                }
            }
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up the stack can observe the interruption.
                Thread.currentThread().interrupt();
                LOGGER.warn("Got interrupted when sleeping for {}ms to wait until the current state matched for instance: {}", 500L, str);
                return PinotResourceManagerResponse.failure("Got interrupted when waiting for instance to be " + (z ? "enabled" : "disabled"));
            }
        }
        return PinotResourceManagerResponse.failure("Instance " + (z ? "enable" : "disable") + " failed, timeout");
    }

    /**
     * Checks whether every partition state matches the toggle target: no partition OFFLINE when enabling, every
     * partition OFFLINE when disabling.
     */
    private static boolean allPartitionsReachedTargetState(List<CurrentState> currentStates, boolean enable) {
        for (CurrentState currentState : currentStates) {
            for (String state : currentState.getPartitionStateMap().values()) {
                boolean offline = "OFFLINE".equals(state);
                // An OFFLINE partition contradicts "enabled"; a non-OFFLINE partition contradicts "disabled".
                if (enable == offline) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Rebalances the given table using a {@code TableRebalancer} built on this manager's Helix ZK manager.
     *
     * @param str name of the table whose config is looked up through {@code getTableConfig}
     * @param configuration rebalance configuration handed to the rebalancer
     * @return the result returned by {@code TableRebalancer.rebalance}
     * @throws TableNotFoundException if no table config exists for the table
     */
    public RebalanceResult rebalanceTable(String str, Configuration configuration) throws TableNotFoundException {
        TableConfig tableConfig = getTableConfig(str);
        if (tableConfig != null) {
            TableRebalancer rebalancer = new TableRebalancer(this._helixZkManager);
            return rebalancer.rebalance(tableConfig, configuration);
        }
        throw new TableNotFoundException("Failed to find table config for table: " + str);
    }

    /**
     * Tells whether a Helix instance config exists for the given instance.
     *
     * @param str name of the instance
     * @return {@code true} if {@code getHelixInstanceConfig} returns a non-null config
     */
    public boolean instanceExists(String str) {
        final boolean configFound = getHelixInstanceConfig(str) != null;
        return configFound;
    }

    /**
     * Lists the instances tagged {@code broker_untagged} that also appear among the live instances.
     *
     * @return the list returned by {@code HelixHelper.getInstancesWithTag}, filtered in place to live instances
     */
    public List<String> getOnlineUnTaggedBrokerInstanceList() {
        List<String> untaggedBrokers = HelixHelper.getInstancesWithTag(this._helixZkManager, "broker_untagged");
        List<String> liveInstances = this._helixDataAccessor.getChildNames(this._keyBuilder.liveInstances());
        untaggedBrokers.retainAll(liveInstances);
        return untaggedBrokers;
    }

    /**
     * Lists the instances tagged {@code server_untagged} that also appear among the live instances.
     *
     * @return the list returned by {@code HelixHelper.getInstancesWithTag}, filtered in place to live instances
     */
    public List<String> getOnlineUnTaggedServerInstanceList() {
        List<String> untaggedServers = HelixHelper.getInstancesWithTag(this._helixZkManager, "server_untagged");
        List<String> liveInstances = this._helixDataAccessor.getChildNames(this._keyBuilder.liveInstances());
        untaggedServers.retainAll(liveInstances);
        return untaggedServers;
    }

    /**
     * Lists the names of all live instances known to the Helix data accessor.
     *
     * @return the child names found under the live-instances key
     */
    public List<String> getOnlineInstanceList() {
        List<String> liveInstances = this._helixDataAccessor.getChildNames(this._keyBuilder.liveInstances());
        return liveInstances;
    }

    /**
     * Resolves the admin endpoint of each given instance through the instance admin endpoint cache.
     *
     * @param set names of the instances to resolve
     * @return a bi-directional map from instance name to its admin endpoint
     * @throws InvalidConfigException if the cache lookup for any instance fails with an {@code ExecutionException}
     */
    public BiMap<String, String> getDataInstanceAdminEndpoints(Set<String> set) throws InvalidConfigException {
        BiMap<String, String> endpointsByInstance = HashBiMap.create(set.size());
        for (String instance : set) {
            String adminEndpoint;
            try {
                adminEndpoint = (String) this._instanceAdminEndpointCache.get(instance);
            } catch (ExecutionException e) {
                String errorMessage = String.format("ExecutionException when getting instance admin endpoint for instance: %s. Error message: %s", instance, e.getMessage());
                LOGGER.error(errorMessage, e);
                throw new InvalidConfigException(errorMessage);
            }
            endpointsByInstance.put(instance, adminEndpoint);
        }
        return endpointsByInstance;
    }

    /**
     * Resolves a table name, with or without a type suffix, to the type-qualified names of the tables that have a
     * table config.
     *
     * <p>When the name already carries a type, only that exact name is checked. Otherwise the OFFLINE and/or
     * REALTIME variants are checked, restricted to {@code tableType} when it is non-null.
     *
     * @param str table name, with or without type suffix
     * @param tableType optional type restriction; {@code null} means both types are considered
     * @return the existing table names with type; never empty
     * @throws IllegalArgumentException if the type in the name conflicts with {@code tableType}
     * @throws TableNotFoundException if none of the candidate tables has a table config
     */
    public List<String> getExistingTableNamesWithType(String str, @Nullable TableType tableType) throws TableNotFoundException {
        List<String> existingTables = new ArrayList<>(2);
        TableType typeInName = TableNameBuilder.getTableTypeFromTableName(str);
        if (typeInName != null) {
            // The name is already type-qualified: it must agree with the requested type, if any.
            if (tableType != null && tableType != typeInName) {
                throw new IllegalArgumentException("Table name: " + str + " does not match table type: " + tableType);
            }
            if (getTableConfig(str) != null) {
                existingTables.add(str);
            }
        } else {
            boolean anyType = tableType == null;
            if (anyType || tableType == TableType.OFFLINE) {
                String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(str);
                if (getTableConfig(offlineTableName) != null) {
                    existingTables.add(offlineTableName);
                }
            }
            if (anyType || tableType == TableType.REALTIME) {
                String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(str);
                if (getTableConfig(realtimeTableName) != null) {
                    existingTables.add(realtimeTableName);
                }
            }
        }
        if (!existingTables.isEmpty()) {
            return existingTables;
        }
        String typeSuffix = tableType != null ? "_" + tableType.toString() : "";
        throw new TableNotFoundException("Table '" + str + typeSuffix + "' not found.");
    }

    /**
     * Starts a segment replacement by recording a new IN_PROGRESS entry in the table's segment lineage.
     *
     * <p>Before touching the lineage, every segment in {@code list} must exist in the table and no segment in
     * {@code list2} may exist in it. The lineage update itself runs under {@code DEFAULT_RETRY_POLICY} and is
     * written with the version read from the existing lineage record (or -1 when there is none).
     *
     * @param str table name with type
     * @param list segments to be replaced ("segmentsFrom")
     * @param list2 segments that will replace them ("segmentsTo")
     * @return the generated segment lineage entry id
     * @throws IllegalArgumentException if the up-front segment existence checks fail
     * @throws RuntimeException if the lineage update fails; the cause is logged and attached
     */
    public String startReplaceSegments(String str, List<String> list, List<String> list2) {
        String lineageEntryId = SegmentLineageUtils.generateLineageEntryId();
        Set<String> segmentsInTable = new HashSet<>(getSegmentsFor(str));
        Preconditions.checkArgument(segmentsInTable.containsAll(list), String.format("Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", str, list, list2, segmentsInTable));
        Preconditions.checkArgument(Collections.disjoint(segmentsInTable, list2), String.format("Any segments from 'segmentsTo' should not be available in the table at this point. (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", str, list, list2, segmentsInTable));
        try {
            DEFAULT_RETRY_POLICY.attempt(() -> {
                // Load the current lineage, or start a fresh one when none has been stored yet.
                ZNRecord lineageRecord = SegmentLineageAccessHelper.getSegmentLineageZNRecord(this._propertyStore, str);
                SegmentLineage lineage;
                int expectedVersion;
                if (lineageRecord != null) {
                    lineage = SegmentLineage.fromZNRecord(lineageRecord);
                    expectedVersion = lineageRecord.getVersion();
                } else {
                    lineage = new SegmentLineage(str);
                    expectedVersion = -1;
                }
                Preconditions.checkArgument(lineage.getLineageEntry(lineageEntryId) == null, String.format("SegmentLineageEntryId (%s) already exists in the segment lineage.", lineageEntryId));
                for (String existingEntryId : lineage.getLineageEntryIds()) {
                    LineageEntry existingEntry = lineage.getLineageEntry(existingEntryId);
                    // The 'segmentsFrom' overlap check is skipped for REVERTED entries; the 'segmentsTo' name
                    // check applies to every entry.
                    if (existingEntry.getState() != LineageEntryState.REVERTED) {
                        Preconditions.checkArgument(Collections.disjoint(existingEntry.getSegmentsFrom(), list), String.format("It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from existing lineage entry = %s, requested segmentsFrom = %s)", str, existingEntry.getSegmentsFrom(), list));
                    }
                    Preconditions.checkArgument(Collections.disjoint(existingEntry.getSegmentsTo(), list2), String.format("It is not allowed to have the same segment name for merged segments. (tableName = %s, segmentsTo from existing lineage entry = %s, requested segmentsTo = %s)", str, existingEntry.getSegmentsTo(), list2));
                }
                LineageEntry newEntry = new LineageEntry(list, list2, LineageEntryState.IN_PROGRESS, System.currentTimeMillis());
                lineage.addLineageEntry(lineageEntryId, newEntry);
                return SegmentLineageAccessHelper.writeSegmentLineage(this._propertyStore, lineage, expectedVersion);
            });
        } catch (Exception e) {
            String errorMessage = String.format("Failed while updating the segment lineage. (tableName = %s, segmentsFrom = %s, segmentsTo = %s)", str, list, list2);
            LOGGER.error(errorMessage, e);
            throw new RuntimeException(errorMessage, e);
        }
        LOGGER.info("startReplaceSegments is successfully processed. (tableNameWithType = {}, segmentsFrom = {}, segmentsTo = {}, segmentLineageEntryId = {})", str, list, list2, lineageEntryId);
        return lineageEntryId;
    }

    /**
     * Completes a segment replacement by moving its lineage entry to COMPLETED.
     *
     * <p>Inside a {@code DEFAULT_RETRY_POLICY} attempt the method loads the lineage, does nothing if the entry is
     * already COMPLETED, verifies that all 'segmentsTo' exist in the table, waits for them to show up as online in
     * the external view, writes the updated lineage with the version it read, and finally sends a routing table
     * rebuild message. An attempt reports {@code false} when the wait times out or the write is rejected.
     *
     * @param str table name with type
     * @param str2 segment lineage entry id returned by {@code startReplaceSegments}
     * @throws RuntimeException if the lineage update fails; the cause is logged and attached
     */
    public void endReplaceSegments(String str, String str2) {
        try {
            DEFAULT_RETRY_POLICY.attempt(() -> {
                ZNRecord lineageRecord = SegmentLineageAccessHelper.getSegmentLineageZNRecord(this._propertyStore, str);
                Preconditions.checkArgument(lineageRecord != null, String.format("Segment lineage does not exist. (tableNameWithType = '%s', segmentLineageEntryId = '%s')", str, str2));
                SegmentLineage lineage = SegmentLineage.fromZNRecord(lineageRecord);
                int expectedVersion = lineageRecord.getVersion();
                LineageEntry entry = lineage.getLineageEntry(str2);
                Preconditions.checkArgument(entry != null, String.format("Invalid segment lineage entry id (tableName='%s', segmentLineageEntryId='%s')", str, str2));

                // Nothing to do when the entry has already been completed.
                if (entry.getState() == LineageEntryState.COMPLETED) {
                    LOGGER.warn("Lineage entry state is already COMPLETED. Nothing to update. (tableNameWithType={}, segmentLineageEntryId={})", str, str2);
                    return true;
                }

                Set<String> segmentsInTable = new HashSet<>(getSegmentsFor(str));
                Preconditions.checkArgument(segmentsInTable.containsAll(entry.getSegmentsTo()), String.format("Not all segments from 'segmentsTo' are available in the table. (tableName = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", str, entry.getSegmentsTo(), segmentsInTable));
                try {
                    waitForSegmentsBecomeOnline(str, new HashSet<>(entry.getSegmentsTo()));
                    LineageEntry completedEntry = new LineageEntry(entry.getSegmentsFrom(), entry.getSegmentsTo(), LineageEntryState.COMPLETED, System.currentTimeMillis());
                    lineage.updateLineageEntry(str2, completedEntry);
                    boolean written = SegmentLineageAccessHelper.writeSegmentLineage(this._propertyStore, lineage, expectedVersion);
                    if (written) {
                        // The rebuild message is only sent after the lineage has been persisted.
                        sendRoutingTableRebuildMessage(str);
                    }
                    return written;
                } catch (TimeoutException e) {
                    LOGGER.warn(String.format("Time out while waiting segments become ONLINE. (tableNameWithType = %s, segmentsToCheck = %s)", str, entry.getSegmentsTo()), e);
                    return false;
                }
            });
            LOGGER.info("endReplaceSegments is successfully processed. (tableNameWithType = {}, segmentLineageEntryId = {})", str, str2);
        } catch (Exception e) {
            String errorMessage = String.format("Failed to update the segment lineage. (tableName = %s, segmentLineageEntryId = %s)", str, str2);
            LOGGER.error(errorMessage, e);
            throw new RuntimeException(errorMessage, e);
        }
    }

    /**
     * Reverts a segment replacement by moving its lineage entry to REVERTED.
     *
     * <p>An entry can be reverted when it is COMPLETED, or when it is IN_PROGRESS and {@code z} is set; in any
     * other case the attempt logs a warning and reports {@code false}. All 'segmentsFrom' must be online in the
     * external view. After the updated lineage is written with the version it was read at, a routing table
     * rebuild message is sent.
     *
     * @param str table name with type
     * @param str2 segment lineage entry id to revert
     * @param z whether an IN_PROGRESS entry may be reverted as well (force revert)
     * @throws RuntimeException if the lineage update fails; the cause is logged and attached
     */
    public void revertReplaceSegments(String str, String str2, boolean z) {
        try {
            DEFAULT_RETRY_POLICY.attempt(() -> {
                ZNRecord lineageRecord = SegmentLineageAccessHelper.getSegmentLineageZNRecord(this._propertyStore, str);
                Preconditions.checkArgument(lineageRecord != null, String.format("Segment lineage does not exist. (tableNameWithType = '%s', segmentLineageEntryId = '%s')", str, str2));
                SegmentLineage lineage = SegmentLineage.fromZNRecord(lineageRecord);
                int expectedVersion = lineageRecord.getVersion();
                LineageEntry entry = lineage.getLineageEntry(str2);
                Preconditions.checkArgument(entry != null, String.format("Invalid segment lineage entry id (tableName='%s', segmentLineageEntryId='%s')", str, str2));

                boolean completed = entry.getState() == LineageEntryState.COMPLETED;
                boolean forcedWhileInProgress = entry.getState() == LineageEntryState.IN_PROGRESS && z;
                if (!(completed || forcedWhileInProgress)) {
                    LOGGER.warn("Lineage state is not valid. Cannot revert the lineage entry. (tableNameWithType={}, segmentLineageEntryId={}, segmentLineageEntrySate={}, forceRevert={})", str, str2, entry.getState(), z);
                    return false;
                }

                // The original segments must be online in the external view before the entry is reverted.
                Set<String> onlineSegments = getOnlineSegmentsFromExternalView(str);
                Preconditions.checkArgument(onlineSegments.containsAll(entry.getSegmentsFrom()), String.format("Not all segments from 'segmentFrom' are in ONLINE state in the external view. (tableName = '%s', segmentsFrom = '%s', onlineSegments = '%s'", str, entry.getSegmentsFrom(), onlineSegments));

                LineageEntry revertedEntry = new LineageEntry(entry.getSegmentsFrom(), entry.getSegmentsTo(), LineageEntryState.REVERTED, System.currentTimeMillis());
                lineage.updateLineageEntry(str2, revertedEntry);
                boolean written = SegmentLineageAccessHelper.writeSegmentLineage(this._propertyStore, lineage, expectedVersion);
                if (written) {
                    // The rebuild message is only sent after the lineage has been persisted.
                    sendRoutingTableRebuildMessage(str);
                }
                return written;
            });
            LOGGER.info("revertReplaceSegments is successfully processed. (tableNameWithType = {}, segmentLineageEntryId = {})", str, str2);
        } catch (Exception e) {
            String errorMessage = String.format("Failed to update the segment lineage. (tableName = %s, segmentLineageEntryId = %s)", str, str2);
            LOGGER.error(errorMessage, e);
            throw new RuntimeException(errorMessage, e);
        }
    }

    /**
     * Blocks until every given segment is reported as online by {@code getOnlineSegmentsFromExternalView}.
     *
     * <p>The check is repeated with a 1000 ms pause in between until it passes or
     * {@code EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS} has elapsed since the call started.
     *
     * @param str table name with type
     * @param set segments that must all be online
     * @throws InterruptedException if the thread is interrupted while sleeping between checks
     * @throws TimeoutException if the segments are not all online before the deadline
     */
    private void waitForSegmentsBecomeOnline(String str, Set<String> set) throws InterruptedException, TimeoutException {
        final long deadlineMs = System.currentTimeMillis() + EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS;
        do {
            if (getOnlineSegmentsFromExternalView(str).containsAll(set)) {
                return;
            }
            Thread.sleep(1000L);
        } while (System.currentTimeMillis() < deadlineMs);
        throw new TimeoutException(String.format("Time out while waiting segments become ONLINE. (tableNameWithType = %s, segmentsToCheck = %s)", str, set));
    }

    /**
     * Collects the segments that have at least one instance in ONLINE or CONSUMING state in the table's external
     * view.
     *
     * @param str table name with type
     * @return a new set with the names of the matching segments
     * @throws IllegalStateException if the table has no external view
     */
    private Set<String> getOnlineSegmentsFromExternalView(String str) {
        ExternalView externalView = getTableExternalView(str);
        Preconditions.checkState(externalView != null, String.format("External view is null for table (%s)", str));
        // Map fields are keyed by segment name; each value holds the per-instance states of that segment.
        Map<String, Map<String, String>> statesBySegment = externalView.getRecord().getMapFields();
        Set<String> onlineSegments = new HashSet<>(HashUtil.getHashMapCapacity(statesBySegment.size()));
        statesBySegment.forEach((segmentName, instanceStates) -> {
            if (instanceStates.containsValue("ONLINE") || instanceStates.containsValue("CONSUMING")) {
                onlineSegments.add(segmentName);
            }
        });
        return onlineSegments;
    }

    /**
     * Builds the stats of a table from the ZK stat of its table config node in the property store.
     *
     * @param str table name used to construct the property store path of the table config
     * @return table stats carrying the stat's creation time ({@code getCtime}) formatted with
     *     {@code SIMPLE_DATE_FORMAT}
     * @throws IllegalStateException if the ZK stat cannot be read
     */
    public TableStats getTableStats(String str) {
        Stat stat = this._propertyStore.getStat(ZKMetadataProvider.constructPropertyStorePathForResourceConfig(str), AccessOption.PERSISTENT);
        Preconditions.checkState(stat != null, "Failed to read ZK stats for table: %s", str);
        String creationTime;
        // SimpleDateFormat keeps mutable state while formatting and is not thread-safe; the formatter is a shared
        // static instance, so access is serialized on it to keep concurrent calls from corrupting each other.
        synchronized (SIMPLE_DATE_FORMAT) {
            creationTime = SIMPLE_DATE_FORMAT.format(Long.valueOf(stat.getCtime()));
        }
        return new TableStats(creationTime);
    }

    /**
     * Lists the brokers whose state for the given table is ONLINE (case-insensitive) in the external view of the
     * {@code brokerResource} resource.
     *
     * @param str table name used as the partition key in the broker resource external view
     * @return the shared immutable empty list when the external view is missing; otherwise a new list with the
     *     matching broker instance names (empty when the table has no state map)
     */
    public List<String> getLiveBrokersForTable(String str) {
        ExternalView brokerResourceView = this._helixDataAccessor.getProperty(this._keyBuilder.externalView("brokerResource"));
        if (brokerResourceView == null) {
            return Collections.emptyList();
        }
        Map<String, String> brokerStates = brokerResourceView.getStateMap(str);
        List<String> liveBrokers = new ArrayList<>();
        if (brokerStates == null) {
            return liveBrokers;
        }
        for (Map.Entry<String, String> brokerState : brokerStates.entrySet()) {
            if ("ONLINE".equalsIgnoreCase(brokerState.getValue())) {
                liveBrokers.add(brokerState.getKey());
            }
        }
        return liveBrokers;
    }

    // Static initializer: sets up the class-level logger, retry policy and date formatter used by this class.
    static {
        // NOTE(review): '$assertionsDisabled' looks like the compiler-generated assertion flag surfaced by a
        // decompiler rather than hand-written code - confirm before editing.
        $assertionsDisabled = !PinotHelixResourceManager.class.desiredAssertionStatus();
        LOGGER = LoggerFactory.getLogger(PinotHelixResourceManager.class);
        // Exponential backoff retry policy; arguments are presumably (max attempts, initial delay in ms, delay
        // scale factor) - TODO confirm against RetryPolicies.
        DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0d);
        // The trailing 'Z' in the pattern is a quoted literal, not a time-zone field. SimpleDateFormat is not
        // thread-safe, so users of this shared instance need external synchronization.
        SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMdd'T'HHmmss'Z'");
    }
}
