/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.AccessOption;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.Criteria;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsType;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.config.IndexingConfig;
import org.apache.pinot.common.config.Instance;
import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableCustomConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.config.TagNameUtils;
import org.apache.pinot.common.config.Tenant;
import org.apache.pinot.common.config.TenantConfig;
import org.apache.pinot.common.config.instance.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.exception.SchemaNotFoundException;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.common.messages.QueryQuotaUpdateMessage;
import org.apache.pinot.common.messages.SegmentRefreshMessage;
import org.apache.pinot.common.messages.SegmentReloadMessage;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import org.apache.pinot.common.segment.SegmentMetadata;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.SchemaUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvider;
import org.apache.pinot.common.utils.helix.TableCache;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.resources.StateType;
import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.apache.pinot.controller.helix.core.SegmentDeletionManager;
import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
import org.apache.pinot.controller.helix.starter.HelixConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PinotHelixResourceManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(PinotHelixResourceManager.class);
    // Interval, in milliseconds, between external view polls (the wait loop in
    // ifExternalViewChangeReflectedForState sleeps 500 ms per iteration).
    private static final long DEFAULT_EXTERNAL_VIEW_UPDATE_RETRY_INTERVAL_MILLIS = 500L;
    // Expiry, in hours, of entries in _instanceAdminEndpointCache (the constructor configures a
    // 6-hour expire-after-write).
    private static final long CACHE_ENTRY_EXPIRE_TIME_HOURS = 6L;
    // Retry policy passed to HelixHelper.updateIdealState when rebuilding the broker resource.
    // NOTE(review): arguments are presumably (max attempts, initial delay ms, delay scale factor) - confirm
    // against RetryPolicies.exponentialBackoffRetryPolicy.
    private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy((int)5, (long)1000L, (double)2.0);
    public static final String APPEND = "APPEND";
    // NOTE(review): the two maps below are not referenced in the visible part of this file; presumably keyed by
    // table name and then segment name - confirm against their users.
    private final Map<String, Map<String, Long>> _segmentCrcMap = new HashMap<String, Map<String, Long>>();
    private final Map<String, Map<String, Integer>> _lastKnownSegmentMetadataVersionMap = new HashMap<String, Map<String, Integer>>();
    // Maps an instance id to "hostname:adminPort"; values are loaded from the Helix instance config.
    private final LoadingCache<String, String> _instanceAdminEndpointCache;
    // The fields below are assigned once, in the constructor.
    private final String _helixZkURL;
    private final String _helixClusterName;
    private final String _dataDir;
    private final long _externalViewOnlineToOfflineTimeoutMillis;
    private final boolean _isSingleTenantCluster;
    private final boolean _enableBatchMessageMode;
    private final boolean _allowHLCTables;
    // The fields below stay null until start(HelixManager) is called.
    private HelixManager _helixZkManager;
    private String _instanceId;
    private HelixAdmin _helixAdmin;
    private ZkHelixPropertyStore<ZNRecord> _propertyStore;
    private HelixDataAccessor _helixDataAccessor;
    private PropertyKey.Builder _keyBuilder;
    private SegmentDeletionManager _segmentDeletionManager;
    // Not assigned anywhere in the visible part of this file.
    private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
    private TableCache _tableCache;

    /**
     * Creates a resource manager from explicit settings.
     *
     * <p>Only configuration values and the instance admin endpoint cache are set up here; everything that
     * needs a connected Helix manager is initialised by {@link #start(HelixManager)}.
     *
     * @param zkURL ZooKeeper URL, converted with {@code HelixConfig.getAbsoluteZkPathForHelix}
     * @param helixClusterName name of the Helix cluster
     * @param dataDir data directory later handed to the segment deletion manager, may be {@code null}
     * @param externalViewOnlineToOfflineTimeoutMillis timeout stored for external view transitions
     * @param isSingleTenantCluster whether tenant isolation is enabled for the cluster
     * @param enableBatchMessageMode stored batch message mode flag
     * @param allowHLCTables stored flag for allowing HLC tables
     */
    public PinotHelixResourceManager(String zkURL, String helixClusterName, @Nullable String dataDir, long externalViewOnlineToOfflineTimeoutMillis, boolean isSingleTenantCluster, boolean enableBatchMessageMode, boolean allowHLCTables) {
        _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
        _helixClusterName = helixClusterName;
        _dataDir = dataDir;
        _externalViewOnlineToOfflineTimeoutMillis = externalViewOnlineToOfflineTimeoutMillis;
        _isSingleTenantCluster = isSingleTenantCluster;
        _enableBatchMessageMode = enableBatchMessageMode;
        _allowHLCTables = allowHLCTables;

        // Resolves "<host>:<adminPort>" for an instance from its Helix instance config.
        CacheLoader<String, String> adminEndpointLoader = new CacheLoader<String, String>() {
            @Override
            public String load(String instanceId) {
                InstanceConfig instanceConfig = getHelixInstanceConfig(instanceId);
                Preconditions.checkNotNull(instanceConfig, "Failed to find instance config for: %s", instanceId);
                String serverPrefix = "Server_";
                String configuredHost = instanceConfig.getHostName();
                // The host name may carry the server instance prefix; strip it when present.
                String host = configuredHost.startsWith(serverPrefix) ? configuredHost.substring(serverPrefix.length()) : configuredHost;
                int adminPort = instanceConfig.getRecord().getIntField("adminPort", 8097);
                return host + ":" + adminPort;
            }
        };
        _instanceAdminEndpointCache = CacheBuilder.newBuilder()
            .expireAfterWrite(CACHE_ENTRY_EXPIRE_TIME_HOURS, TimeUnit.HOURS)
            .build(adminEndpointLoader);
    }

    /**
     * Creates a resource manager whose settings are all read from the given controller configuration.
     *
     * @param controllerConf controller configuration supplying every constructor argument
     */
    public PinotHelixResourceManager(ControllerConf controllerConf) {
        this(
            controllerConf.getZkStr(),
            controllerConf.getHelixClusterName(),
            controllerConf.getDataDir(),
            controllerConf.getExternalViewOnlineToOfflineTimeout(),
            controllerConf.tenantIsolationEnabled(),
            controllerConf.getEnableBatchMessageMode(),
            controllerConf.getHLCTablesAllowed());
    }

    /**
     * Binds this resource manager to the given Helix manager.
     *
     * <p>Caches the manager's instance name, admin tool, property store and data accessor, makes sure
     * this instance carries the controller tag, creates the segment deletion manager, records the
     * tenant isolation flag in the property store and creates the table cache.
     *
     * @param helixZkManager Helix manager to use from now on
     */
    public synchronized void start(HelixManager helixZkManager) {
        _helixZkManager = helixZkManager;
        _instanceId = helixZkManager.getInstanceName();
        _helixAdmin = helixZkManager.getClusterManagmentTool();
        _propertyStore = helixZkManager.getHelixPropertyStore();
        _helixDataAccessor = helixZkManager.getHelixDataAccessor();
        _keyBuilder = _helixDataAccessor.keyBuilder();

        // Needs _instanceId, _helixDataAccessor and _keyBuilder, so it must run after the assignments above.
        addInstanceGroupTagIfNeeded();

        _segmentDeletionManager = new SegmentDeletionManager(_dataDir, _helixAdmin, _helixClusterName, _propertyStore);
        ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, _isSingleTenantCluster);
        _tableCache = new TableCache(_propertyStore);
    }

    /**
     * Stops the segment deletion manager created by {@link #start(HelixManager)}.
     */
    public synchronized void stop() {
        SegmentDeletionManager deletionManager = _segmentDeletionManager;
        deletionManager.stop();
    }

    /**
     * @return the ZooKeeper URL computed in the constructor by {@code HelixConfig.getAbsoluteZkPathForHelix}
     */
    public String getHelixZkURL() {
        return _helixZkURL;
    }

    /**
     * @return the Helix cluster name this manager was constructed with
     */
    public String getHelixClusterName() {
        return _helixClusterName;
    }

    /**
     * @return the segment deletion manager; {@code null} until {@link #start(HelixManager)} has run
     */
    public SegmentDeletionManager getSegmentDeletionManager() {
        return _segmentDeletionManager;
    }

    /**
     * @return the Helix manager passed to {@link #start(HelixManager)}; {@code null} before that call
     */
    public HelixManager getHelixZkManager() {
        return _helixZkManager;
    }

    /**
     * @return the Helix admin tool obtained in {@link #start(HelixManager)}; {@code null} before that call
     */
    public HelixAdmin getHelixAdmin() {
        return _helixAdmin;
    }

    /**
     * @return the Helix property store obtained in {@link #start(HelixManager)}; {@code null} before that call
     */
    public ZkHelixPropertyStore<ZNRecord> getPropertyStore() {
        return _propertyStore;
    }

    /**
     * Adds the "controller" tag to this instance's Helix config when the config exists and does not
     * already carry it; otherwise does nothing.
     */
    private void addInstanceGroupTagIfNeeded() {
        InstanceConfig instanceConfig = getHelixInstanceConfig(_instanceId);
        if (instanceConfig == null || instanceConfig.containsTag("controller")) {
            return;
        }
        LOGGER.info("Controller: {} doesn't contain group tag: {}. Adding one.", _instanceId, "controller");
        instanceConfig.addTag("controller");
        // Persist the updated config through the manager's data accessor.
        HelixDataAccessor dataAccessor = _helixZkManager.getHelixDataAccessor();
        PropertyKey instanceConfigKey = dataAccessor.keyBuilder().instanceConfig(_instanceId);
        dataAccessor.setProperty(instanceConfigKey, instanceConfig);
    }

    /**
     * @return the instances Helix reports for this cluster
     */
    public List<String> getAllInstances() {
        List<String> instancesInCluster = _helixAdmin.getInstancesInCluster(_helixClusterName);
        return instancesInCluster;
    }

    /**
     * @return the instance configs returned by {@code HelixHelper.getInstanceConfigs} for the Helix manager
     */
    public List<InstanceConfig> getAllHelixInstanceConfigs() {
        List<InstanceConfig> instanceConfigs = HelixHelper.getInstanceConfigs(_helixZkManager);
        return instanceConfigs;
    }

    /**
     * Reads the Helix instance config of the given instance through the data accessor.
     *
     * @param instanceId id of the instance to look up
     * @return the instance config, or {@code null} (the method is declared {@code @Nullable})
     */
    @Nullable
    public InstanceConfig getHelixInstanceConfig(String instanceId) {
        PropertyKey instanceConfigKey = _keyBuilder.instanceConfig(instanceId);
        return (InstanceConfig) _helixDataAccessor.getProperty(instanceConfigKey);
    }

    /**
     * Reads the ZK metadata of the given instance from the property store.
     *
     * @param instanceId id of the instance to look up
     * @return the instance ZK metadata, or {@code null} (the method is declared {@code @Nullable})
     */
    @Nullable
    public InstanceZKMetadata getInstanceZKMetadata(String instanceId) {
        InstanceZKMetadata instanceZKMetadata = ZKMetadataProvider.getInstanceZKMetadata(_propertyStore, instanceId);
        return instanceZKMetadata;
    }

    /**
     * Returns the instances tagged for the broker tenant of the given table.
     *
     * <p>The offline table config is consulted first and the realtime config only when no offline
     * config exists. When neither exists, a {@code null} tenant name is passed to
     * {@code TagNameUtils.getBrokerTagForTenant}.
     *
     * @param tableName table name handed to the offline/realtime table config lookups
     * @return instances carrying the resolved broker tag
     */
    public List<String> getBrokerInstancesFor(String tableName) {
        TableConfig tableConfig = ZKMetadataProvider.getOfflineTableConfig(_propertyStore, tableName);
        if (tableConfig == null) {
            tableConfig = ZKMetadataProvider.getRealtimeTableConfig(_propertyStore, tableName);
        }
        String brokerTenantName = tableConfig != null ? tableConfig.getTenantConfig().getBroker() : null;
        String brokerTag = TagNameUtils.getBrokerTagForTenant(brokerTenantName);
        return HelixHelper.getInstancesWithTag(_helixZkManager, brokerTag);
    }

    /**
     * @param tag Helix instance tag to look up
     * @return the instances {@code HelixHelper.getInstancesWithTag} reports for the tag
     */
    public List<String> getInstancesWithTag(String tag) {
        List<String> taggedInstances = HelixHelper.getInstancesWithTag(_helixZkManager, tag);
        return taggedInstances;
    }

    /**
     * Registers a new instance with the Helix cluster unless an instance with the same id already exists.
     *
     * @param instance instance to add
     * @return a failure response when the id is already present, otherwise {@code SUCCESS}
     */
    public synchronized PinotResourceManagerResponse addInstance(Instance instance) {
        List<String> existingInstances = getAllInstances();
        String instanceIdToAdd = instance.getInstanceId();
        boolean alreadyExists = existingInstances.contains(instanceIdToAdd);
        if (!alreadyExists) {
            _helixAdmin.addInstance(_helixClusterName, instance.toInstanceConfig());
            return PinotResourceManagerResponse.SUCCESS;
        }
        return PinotResourceManagerResponse.failure("Instance " + instanceIdToAdd + " already exists");
    }

    /**
     * @return the resources Helix reports for this cluster
     */
    public List<String> getAllResources() {
        List<String> resourcesInCluster = _helixAdmin.getResourcesInCluster(_helixClusterName);
        return resourcesInCluster;
    }

    /**
     * @return the cluster resources for which {@code TableNameBuilder.isTableResource} is true, in the
     *     order Helix returned them
     */
    public List<String> getAllTables() {
        // Start from a private copy of the resource list and drop everything that is not a table.
        List<String> tableNames = new ArrayList<String>(getAllResources());
        tableNames.removeIf(resourceName -> !TableNameBuilder.isTableResource(resourceName));
        return tableNames;
    }

    /**
     * @return the cluster resources for which {@code TableNameBuilder.isOfflineTableResource} is true,
     *     in the order Helix returned them
     */
    public List<String> getAllOfflineTables() {
        // Start from a private copy of the resource list and drop everything that is not an offline table.
        List<String> offlineTableNames = new ArrayList<String>(getAllResources());
        offlineTableNames.removeIf(resourceName -> !TableNameBuilder.isOfflineTableResource(resourceName));
        return offlineTableNames;
    }

    /**
     * @return the cluster resources for which {@code TableNameBuilder.isRealtimeTableResource} is true,
     *     in the order Helix returned them
     */
    public List<String> getAllRealtimeTables() {
        // Start from a private copy of the resource list and drop everything that is not a realtime table.
        List<String> realtimeTableNames = new ArrayList<String>(getAllResources());
        realtimeTableNames.removeIf(resourceName -> !TableNameBuilder.isRealtimeTableResource(resourceName));
        return realtimeTableNames;
    }

    /**
     * @return the distinct raw table names extracted from all table resources; duplicates are removed
     *     through a hash set, so the order is unspecified
     */
    public List<String> getAllRawTables() {
        HashSet<String> distinctRawNames = new HashSet<String>();
        for (String resourceName : getAllResources()) {
            if (TableNameBuilder.isTableResource(resourceName)) {
                String rawTableName = TableNameBuilder.extractRawTableName(resourceName);
                distinctRawNames.add(rawTableName);
            }
        }
        return new ArrayList<String>(distinctRawNames);
    }

    /**
     * @param tableName table name to resolve
     * @return whatever the table cache returns as the actual table name for {@code tableName}
     */
    public String getActualTableName(String tableName) {
        String actualTableName = _tableCache.getActualTableName(tableName);
        return actualTableName;
    }

    /**
     * @param tableName table the column belongs to
     * @param columnName column name to resolve
     * @return whatever the table cache returns as the actual column name
     */
    public String getActualColumnName(String tableName, String columnName) {
        String actualColumnName = _tableCache.getActualColumnName(tableName, columnName);
        return actualColumnName;
    }

    /**
     * @param tableNameWithType table name passed to {@code ZKMetadataProvider.getSegments}
     * @return the segments the property store lists for the table
     */
    public List<String> getSegmentsFor(String tableNameWithType) {
        List<String> segmentNames = ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType);
        return segmentNames;
    }

    /**
     * Reads the offline ZK metadata of one segment from the property store.
     *
     * @param tableName table the segment belongs to
     * @param segmentName segment to look up
     * @return the segment metadata, or {@code null} (the method is declared {@code @Nullable})
     */
    @Nullable
    public OfflineSegmentZKMetadata getOfflineSegmentZKMetadata(String tableName, String segmentName) {
        OfflineSegmentZKMetadata segmentZKMetadata = ZKMetadataProvider.getOfflineSegmentZKMetadata(_propertyStore, tableName, segmentName);
        return segmentZKMetadata;
    }

    /**
     * @param tableName table whose offline segment metadata is requested
     * @return the list produced by {@code ZKMetadataProvider.getOfflineSegmentZKMetadataListForTable}
     */
    public List<OfflineSegmentZKMetadata> getOfflineSegmentMetadata(String tableName) {
        List<OfflineSegmentZKMetadata> segmentMetadataList = ZKMetadataProvider.getOfflineSegmentZKMetadataListForTable(_propertyStore, tableName);
        return segmentMetadataList;
    }

    /**
     * @param tableName table whose realtime segment metadata is requested
     * @return the list produced by {@code ZKMetadataProvider.getRealtimeSegmentZKMetadataListForTable}
     */
    public List<RealtimeSegmentZKMetadata> getRealtimeSegmentMetadata(String tableName) {
        List<RealtimeSegmentZKMetadata> segmentMetadataList = ZKMetadataProvider.getRealtimeSegmentZKMetadataListForTable(_propertyStore, tableName);
        return segmentMetadataList;
    }

    /**
     * Removes the given segments from the table's ideal state and hands them to the segment deletion manager.
     *
     * <p>Any exception, including the argument check on the table name, is logged and turned into a
     * failure response carrying the exception message instead of being propagated.
     *
     * @param tableNameWithType table name; must satisfy {@code TableNameBuilder.isTableResource}
     * @param segmentNames segments to delete
     * @return a success response naming the segments, or a failure response on any exception
     */
    public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List<String> segmentNames) {
        PinotResourceManagerResponse response;
        try {
            LOGGER.info("Trying to delete segments: {} from table: {} ", segmentNames, tableNameWithType);
            boolean hasTypeSuffix = TableNameBuilder.isTableResource(tableNameWithType);
            Preconditions.checkArgument(hasTypeSuffix, "Table name: %s is not a valid table name with type suffix", tableNameWithType);
            // Ideal state first, so the segments stop being served before they are deleted.
            HelixHelper.removeSegmentsFromIdealState(_helixZkManager, tableNameWithType, segmentNames);
            _segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames);
            response = PinotResourceManagerResponse.success("Segment " + segmentNames + " deleted");
        } catch (Exception e) {
            LOGGER.error("Caught exception while deleting segment: {} from table: {}", segmentNames, tableNameWithType, e);
            response = PinotResourceManagerResponse.failure(e.getMessage());
        }
        return response;
    }

    /**
     * Deletes a single segment by delegating to {@link #deleteSegments(String, List)}.
     *
     * @param tableNameWithType table the segment belongs to
     * @param segmentName segment to delete
     * @return the response of the delegated call
     */
    public synchronized PinotResourceManagerResponse deleteSegment(String tableNameWithType, String segmentName) {
        List<String> singleSegment = Collections.singletonList(segmentName);
        return deleteSegments(tableNameWithType, singleSegment);
    }

    /**
     * Polls the table's external view until every replica of the segment is in the target state or the
     * timeout elapses.
     *
     * <p>A missing external view (Helix returns {@code null} when it does not exist yet) is treated the
     * same way as a missing entry for the segment: the method waits and polls again instead of failing
     * with a {@code NullPointerException}.
     *
     * @param tableNameWithType table whose external view is polled
     * @param segmentName segment to look for in the external view
     * @param targetState state every replica must reach; compared case-insensitively
     * @param timeoutMillis maximum time to keep polling, in milliseconds
     * @param considerErrorStateAsDifferentFromTarget when {@code false}, replicas in {@code ERROR} state
     *     are accepted as if they had reached the target state
     * @return {@code true} once all replicas listed for the segment match; {@code false} on timeout
     */
    private boolean ifExternalViewChangeReflectedForState(String tableNameWithType, String segmentName, String targetState, long timeoutMillis, boolean considerErrorStateAsDifferentFromTarget) {
        long externalViewChangeCompletedDeadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < externalViewChangeCompletedDeadline) {
            ExternalView externalView = _helixAdmin.getResourceExternalView(_helixClusterName, tableNameWithType);
            // The external view may not exist yet; handle that like an absent segment entry.
            Map<String, String> segmentStateMap = externalView != null ? externalView.getStateMap(segmentName) : null;
            if (segmentStateMap != null) {
                LOGGER.info("Found {} instances for segment '{}' in external view", segmentStateMap.size(), segmentName);
                boolean allReplicasMatch = true;
                for (String segmentState : segmentStateMap.values()) {
                    boolean reachedTarget = segmentState.equalsIgnoreCase(targetState);
                    boolean toleratedError = "ERROR".equalsIgnoreCase(segmentState) && !considerErrorStateAsDifferentFromTarget;
                    if (!reachedTarget && !toleratedError) {
                        allReplicasMatch = false;
                        break;
                    }
                }
                if (allReplicasMatch) {
                    return true;
                }
            }
            Uninterruptibles.sleepUninterruptibly(DEFAULT_EXTERNAL_VIEW_UPDATE_RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
        }
        LOGGER.info("Timed out while waiting for segment '{}' to become '{}' in external view.", segmentName, targetState);
        return false;
    }

    /**
     * Brings the number of brokers tagged for the tenant in line with the requested number.
     *
     * @param tenant broker tenant carrying the tenant name and the requested instance count
     * @return the scale-down or scale-up response, or {@code SUCCESS} when the counts already match
     */
    public PinotResourceManagerResponse updateBrokerTenant(Tenant tenant) {
        String brokerTenantTag = TagNameUtils.getBrokerTagForTenant(tenant.getTenantName());
        List<String> taggedBrokers = HelixHelper.getInstancesWithTag(_helixZkManager, brokerTenantTag);
        int currentCount = taggedBrokers.size();
        int requestedCount = tenant.getNumberOfInstances();
        if (currentCount == requestedCount) {
            return PinotResourceManagerResponse.SUCCESS;
        }
        return currentCount > requestedCount
            ? scaleDownBroker(tenant, brokerTenantTag, taggedBrokers)
            : scaleUpBroker(tenant, brokerTenantTag, taggedBrokers);
    }

    /**
     * Tags additional untagged brokers for the tenant and adds each of them to the broker ideal state.
     *
     * @param tenant tenant carrying the requested number of instances
     * @param brokerTenantTag broker tag of the tenant
     * @param instancesInClusterWithTag brokers that already carry the tag
     * @return a failure response when too few untagged brokers are available, otherwise {@code SUCCESS}
     */
    private PinotResourceManagerResponse scaleUpBroker(Tenant tenant, String brokerTenantTag, List<String> instancesInClusterWithTag) {
        List<String> unTaggedInstanceList = getOnlineUnTaggedBrokerInstanceList();
        int numberOfInstancesToAdd = tenant.getNumberOfInstances() - instancesInClusterWithTag.size();
        if (unTaggedInstanceList.size() >= numberOfInstancesToAdd) {
            int added = 0;
            while (added < numberOfInstancesToAdd) {
                String instanceName = unTaggedInstanceList.get(added);
                retagInstance(instanceName, "broker_untagged", brokerTenantTag);
                addInstanceToBrokerIdealState(brokerTenantTag, instanceName);
                added++;
            }
            return PinotResourceManagerResponse.SUCCESS;
        }
        String message = "Failed to allocate broker instances to Tag : " + tenant.getTenantName()
            + ", Current number of untagged broker instances : " + unTaggedInstanceList.size()
            + ", Current number of tagged broker instances : " + instancesInClusterWithTag.size()
            + ", Request asked number is : " + tenant.getNumberOfInstances();
        LOGGER.error(message);
        return PinotResourceManagerResponse.failure(message);
    }

    /**
     * Rebuilds the broker resource entry of a table from the instances of the broker tenant named in
     * its table config.
     *
     * @param tableNameWithType table whose broker resource entry is rebuilt
     * @return the response of {@link #rebuildBrokerResource(String, Set)}
     * @throws InvalidTableConfigException when reading the table config fails
     * @throws InvalidConfigException when no table config exists for the table
     */
    public PinotResourceManagerResponse rebuildBrokerResourceFromHelixTags(String tableNameWithType) throws Exception {
        TableConfig tableConfig;
        try {
            tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
        } catch (Exception e) {
            LOGGER.warn("Caught exception while getting table config for table {}", tableNameWithType, e);
            throw new InvalidTableConfigException("Failed to fetch broker tag for table " + tableNameWithType + " due to exception: " + e.getMessage());
        }
        if (tableConfig != null) {
            String brokerTenantName = tableConfig.getTenantConfig().getBroker();
            Set<String> brokerInstances = getAllInstancesForBrokerTenant(brokerTenantName);
            return rebuildBrokerResource(tableNameWithType, brokerInstances);
        }
        LOGGER.warn("Table " + tableNameWithType + " does not exist");
        throw new InvalidConfigException("Invalid table configuration for table " + tableNameWithType + ". Table does not exist");
    }

    /**
     * Replaces the broker assignment of a table in the broker resource ideal state with the given
     * broker instances, all in {@code ONLINE} state.
     *
     * <p>Nothing is written when the ideal state already lists exactly these instances. Exceptions
     * from the ideal state update are logged and rethrown.
     *
     * @param tableNameWithType table (partition of the broker resource) to rebuild
     * @param brokerInstances brokers that should serve the table
     * @return a success response describing whether the resource was rebuilt
     */
    public PinotResourceManagerResponse rebuildBrokerResource(String tableNameWithType, Set<String> brokerInstances) {
        IdealState currentBrokerIdealState = HelixHelper.getBrokerIdealStates(_helixAdmin, _helixClusterName);
        Set<String> currentBrokers = currentBrokerIdealState.getInstanceSet(tableNameWithType);
        if (!currentBrokers.equals(brokerInstances)) {
            try {
                HelixHelper.updateIdealState(getHelixZkManager(), "brokerResource", idealState -> {
                    assert (idealState != null);
                    // Drop the existing assignment, if any, before writing the new one.
                    Map<String, String> existingAssignment = idealState.getInstanceStateMap(tableNameWithType);
                    if (existingAssignment != null) {
                        existingAssignment.clear();
                    }
                    for (String brokerInstance : brokerInstances) {
                        idealState.setPartitionState(tableNameWithType, brokerInstance, "ONLINE");
                    }
                    return idealState;
                }, DEFAULT_RETRY_POLICY);
                LOGGER.info("Successfully rebuilt brokerResource for table: {}", tableNameWithType);
                return PinotResourceManagerResponse.success("Rebuilt brokerResource for table: " + tableNameWithType);
            } catch (Exception e) {
                LOGGER.error("Caught exception while rebuilding broker resource for table: {}", tableNameWithType, e);
                throw e;
            }
        }
        return PinotResourceManagerResponse.success("Broker resource is not rebuilt because ideal state is the same for table: " + tableNameWithType);
    }

    /**
     * Marks the instance {@code ONLINE} in the broker resource for every table whose broker tag equals
     * the given tenant tag, then writes the ideal state back through the Helix admin.
     *
     * @param brokerTenantTag broker tag tables must have to receive the instance
     * @param instanceName broker instance to add
     */
    private void addInstanceToBrokerIdealState(String brokerTenantTag, String instanceName) {
        IdealState brokerIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, "brokerResource");
        for (String tableNameWithType : brokerIdealState.getPartitionSet()) {
            TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
            Preconditions.checkNotNull(tableConfig);
            String tableBrokerTag = TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig());
            if (tableBrokerTag.equals(brokerTenantTag)) {
                brokerIdealState.setPartitionState(tableNameWithType, instanceName, "ONLINE");
            }
        }
        _helixAdmin.setResourceIdealState(_helixClusterName, "brokerResource", brokerIdealState);
    }

    /**
     * Re-tags the first surplus brokers of the tenant as untagged brokers.
     *
     * @param tenant tenant carrying the requested number of instances
     * @param brokerTenantTag broker tag of the tenant
     * @param instancesInClusterWithTag brokers currently carrying the tag
     * @return always {@code SUCCESS}
     */
    private PinotResourceManagerResponse scaleDownBroker(Tenant tenant, String brokerTenantTag, List<String> instancesInClusterWithTag) {
        int numberBrokersToUntag = instancesInClusterWithTag.size() - tenant.getNumberOfInstances();
        int untagged = 0;
        while (untagged < numberBrokersToUntag) {
            String brokerInstance = instancesInClusterWithTag.get(untagged);
            retagInstance(brokerInstance, brokerTenantTag, "broker_untagged");
            untagged++;
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Replaces one Helix tag of an instance with another: the old tag is removed first, then the new
     * one is added.
     *
     * @param instanceName instance to re-tag
     * @param oldTag tag to remove
     * @param newTag tag to add
     */
    private void retagInstance(String instanceName, String oldTag, String newTag) {
        HelixAdmin admin = _helixAdmin;
        String clusterName = _helixClusterName;
        admin.removeInstanceTag(clusterName, instanceName, oldTag);
        admin.addInstanceTag(clusterName, instanceName, newTag);
    }

    /**
     * Updates a server tenant to the requested instance counts.
     *
     * <p>The tenant currently counts as colocated when at least one server carries both its offline
     * and its realtime tag. Requests that would change that property are rejected. A request asking
     * for fewer instances than currently tagged (total, offline or realtime) goes to the scale-down
     * path; everything else goes to the scale-up path.
     *
     * @param serverTenant requested tenant definition
     * @return the response of the chosen path, or a failure response for a colocation mismatch
     */
    public PinotResourceManagerResponse updateServerTenant(Tenant serverTenant) {
        String tenantName = serverTenant.getTenantName();
        String realtimeServerTag = TagNameUtils.getRealtimeTagForTenant(tenantName);
        List<String> taggedRealtimeServers = HelixHelper.getInstancesWithTag(_helixZkManager, realtimeServerTag);
        String offlineServerTag = TagNameUtils.getOfflineTagForTenant(serverTenant.getTenantName());
        List<String> taggedOfflineServers = HelixHelper.getInstancesWithTag(_helixZkManager, offlineServerTag);

        HashSet<String> allServingServers = new HashSet<String>();
        allServingServers.addAll(taggedOfflineServers);
        allServingServers.addAll(taggedRealtimeServers);

        // Fewer distinct servers than tags handed out means some server carries both tags.
        int totalTagCount = taggedOfflineServers.size() + taggedRealtimeServers.size();
        boolean isCurrentTenantColocated = allServingServers.size() < totalTagCount;
        if (isCurrentTenantColocated != serverTenant.isCoLocated()) {
            String message = "Not support different colocated type request for update request: " + serverTenant;
            LOGGER.error(message);
            return PinotResourceManagerResponse.failure(message);
        }

        boolean shrinkRequested = serverTenant.getNumberOfInstances() < allServingServers.size()
            || serverTenant.getOfflineInstances() < taggedOfflineServers.size()
            || serverTenant.getRealtimeInstances() < taggedRealtimeServers.size();
        if (shrinkRequested) {
            return scaleDownServer(serverTenant, taggedRealtimeServers, taggedOfflineServers, allServingServers);
        }
        return scaleUpServerTenant(serverTenant, realtimeServerTag, taggedRealtimeServers, offlineServerTag, taggedOfflineServers, allServingServers);
    }

    /**
     * Checks that enough untagged servers exist for the requested growth and then delegates to the
     * colocated or the independent update, depending on the tenant.
     *
     * @param serverTenant requested tenant definition
     * @param realtimeServerTag realtime tag of the tenant
     * @param taggedRealtimeServers servers currently carrying the realtime tag
     * @param offlineServerTag offline tag of the tenant
     * @param taggedOfflineServers servers currently carrying the offline tag
     * @param allServingServers distinct servers carrying either tag
     * @return a failure response when too few untagged servers exist, otherwise the delegate's response
     */
    private PinotResourceManagerResponse scaleUpServerTenant(Tenant serverTenant, String realtimeServerTag, List<String> taggedRealtimeServers, String offlineServerTag, List<String> taggedOfflineServers, Set<String> allServingServers) {
        int incInstances = serverTenant.getNumberOfInstances() - allServingServers.size();
        List<String> unTaggedInstanceList = getOnlineUnTaggedServerInstanceList();
        boolean enoughUntaggedServers = unTaggedInstanceList.size() >= incInstances;
        if (!enoughUntaggedServers) {
            String message = "Failed to allocate hardware resources with tenant info: " + serverTenant
                + ", Current number of untagged instances : " + unTaggedInstanceList.size()
                + ", Current number of serving instances : " + allServingServers.size()
                + ", Current number of tagged offline server instances : " + taggedOfflineServers.size()
                + ", Current number of tagged realtime server instances : " + taggedRealtimeServers.size();
            LOGGER.error(message);
            return PinotResourceManagerResponse.failure(message);
        }
        return serverTenant.isCoLocated()
            ? updateColocatedServerTenant(serverTenant, realtimeServerTag, taggedRealtimeServers, offlineServerTag, taggedOfflineServers, incInstances, unTaggedInstanceList)
            : updateIndependentServerTenant(serverTenant, realtimeServerTag, taggedRealtimeServers, offlineServerTag, taggedOfflineServers, incInstances, unTaggedInstanceList);
    }

    /**
     * Grows a non-colocated server tenant by re-tagging untagged servers: the first
     * {@code incOffline} untagged servers receive the offline tag and the following
     * {@code incRealtime} servers receive the realtime tag.
     *
     * @param serverTenant requested tenant definition
     * @param realtimeServerTag realtime tag of the tenant
     * @param taggedRealtimeServers servers currently carrying the realtime tag
     * @param offlineServerTag offline tag of the tenant
     * @param taggedOfflineServers servers currently carrying the offline tag
     * @param incInstances total number of servers to add (not used by this method)
     * @param unTaggedInstanceList untagged servers available for tagging
     * @return always {@code SUCCESS}
     */
    private PinotResourceManagerResponse updateIndependentServerTenant(Tenant serverTenant, String realtimeServerTag, List<String> taggedRealtimeServers, String offlineServerTag, List<String> taggedOfflineServers, int incInstances, List<String> unTaggedInstanceList) {
        int incOffline = serverTenant.getOfflineInstances() - taggedOfflineServers.size();
        int incRealtime = serverTenant.getRealtimeInstances() - taggedRealtimeServers.size();

        int offlineIdx = 0;
        while (offlineIdx < incOffline) {
            retagInstance(unTaggedInstanceList.get(offlineIdx), "server_untagged", offlineServerTag);
            offlineIdx++;
        }

        // Realtime servers are taken from the slots right after the ones used for offline.
        int realtimeEnd = incOffline + incRealtime;
        for (int realtimeIdx = incOffline; realtimeIdx < realtimeEnd; realtimeIdx++) {
            String instanceName = unTaggedInstanceList.get(realtimeIdx);
            retagInstance(instanceName, "server_untagged", realtimeServerTag);
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Grows a colocated server tenant.
     *
     * <p>Untagged servers are consumed first (up to {@code incInstances}). Once they are used up, the
     * offline tag is added to servers that so far carry only the realtime tag, and the realtime tag is
     * added to servers that so far carry only the offline tag.
     *
     * <p>Servers that already carry both tags are excluded from both candidate lists. The intersection
     * is computed up front because removing the offline servers from the realtime list first and then
     * removing the (already reduced) realtime list from the offline list would leave the shared servers
     * in the offline list. The candidate lists are local copies; the lists passed in are not modified.
     *
     * @param serverTenant requested tenant definition
     * @param realtimeServerTag realtime tag of the tenant
     * @param taggedRealtimeServers servers currently carrying the realtime tag
     * @param offlineServerTag offline tag of the tenant
     * @param taggedOfflineServers servers currently carrying the offline tag
     * @param incInstances number of untagged servers to bring into the tenant
     * @param unTaggedInstanceList untagged servers available for tagging
     * @return always {@code SUCCESS}
     */
    private PinotResourceManagerResponse updateColocatedServerTenant(Tenant serverTenant, String realtimeServerTag, List<String> taggedRealtimeServers, String offlineServerTag, List<String> taggedOfflineServers, int incInstances, List<String> unTaggedInstanceList) {
        int i;
        int incOffline = serverTenant.getOfflineInstances() - taggedOfflineServers.size();
        int incRealtime = serverTenant.getRealtimeInstances() - taggedRealtimeServers.size();

        // Servers that already have both tags cannot gain anything from an extra tag.
        Set<String> alreadyColocated = new HashSet<String>(taggedRealtimeServers);
        alreadyColocated.retainAll(taggedOfflineServers);
        List<String> realtimeOnlyServers = new ArrayList<String>(taggedRealtimeServers);
        realtimeOnlyServers.removeAll(alreadyColocated);
        List<String> offlineOnlyServers = new ArrayList<String>(taggedOfflineServers);
        offlineOnlyServers.removeAll(alreadyColocated);

        for (i = 0; i < incOffline; ++i) {
            if (i < incInstances) {
                retagInstance(unTaggedInstanceList.get(i), "server_untagged", offlineServerTag);
                continue;
            }
            _helixAdmin.addInstanceTag(_helixClusterName, realtimeOnlyServers.get(i - incInstances), offlineServerTag);
        }
        for (i = incOffline; i < incOffline + incRealtime; ++i) {
            if (i < incInstances) {
                retagInstance(unTaggedInstanceList.get(i), "server_untagged", realtimeServerTag);
                continue;
            }
            _helixAdmin.addInstanceTag(_helixClusterName, offlineOnlyServers.get(i - Math.max(incInstances, incOffline)), realtimeServerTag);
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Rejects a request to shrink a server tenant: logs an error describing the current counts and
     * returns it as a failure response. No tags are changed.
     *
     * @param serverTenant requested tenant definition
     * @param taggedRealtimeServers servers currently carrying the realtime tag
     * @param taggedOfflineServers servers currently carrying the offline tag
     * @param allServingServers distinct servers carrying either tag
     * @return always a failure response
     */
    private PinotResourceManagerResponse scaleDownServer(Tenant serverTenant, List<String> taggedRealtimeServers, List<String> taggedOfflineServers, Set<String> allServingServers) {
        String message = "Not support to size down the current server cluster with tenant info: " + serverTenant
            + ", Current number of serving instances : " + allServingServers.size()
            + ", Current number of tagged offline server instances : " + taggedOfflineServers.size()
            + ", Current number of tagged realtime server instances : " + taggedRealtimeServers.size();
        LOGGER.error(message);
        PinotResourceManagerResponse rejection = PinotResourceManagerResponse.failure(message);
        return rejection;
    }

    /**
     * Tells whether a broker tenant can be deleted, i.e. whether none of its tagged brokers appears in
     * any partition of the broker resource ideal state.
     *
     * @param tenantName tenant to check
     * @return {@code false} as soon as a tagged broker is found in the broker resource, else {@code true}
     */
    public boolean isBrokerTenantDeletable(String tenantName) {
        String brokerTag = TagNameUtils.getBrokerTagForTenant(tenantName);
        Set<String> tenantBrokers = new HashSet<String>(HelixHelper.getInstancesWithTag(_helixZkManager, brokerTag));
        IdealState brokerIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, "brokerResource");
        for (String partition : brokerIdealState.getPartitionSet()) {
            Set<String> servingBrokers = brokerIdealState.getInstanceSet(partition);
            boolean tenantBrokerInUse = !Collections.disjoint(tenantBrokers, servingBrokers);
            if (tenantBrokerInUse) {
                return false;
            }
        }
        return true;
    }

    /**
     * Tells whether a server tenant can be deleted, i.e. whether none of the servers carrying its
     * offline or realtime tag appears in the ideal state of any table resource.
     *
     * @param tenantName tenant to check
     * @return {@code false} as soon as a tagged server is found in a table ideal state, else {@code true}
     */
    public boolean isServerTenantDeletable(String tenantName) {
        String offlineTag = TagNameUtils.getOfflineTagForTenant(tenantName);
        Set<String> tenantServers = new HashSet<String>(HelixHelper.getInstancesWithTag(_helixZkManager, offlineTag));
        String realtimeTag = TagNameUtils.getRealtimeTagForTenant(tenantName);
        tenantServers.addAll(HelixHelper.getInstancesWithTag(_helixZkManager, realtimeTag));

        for (String resourceName : getAllResources()) {
            if (TableNameBuilder.isTableResource(resourceName)) {
                IdealState tableIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, resourceName);
                for (String partition : tableIdealState.getPartitionSet()) {
                    Set<String> servingServers = tableIdealState.getInstanceSet(partition);
                    if (!Collections.disjoint(tenantServers, servingServers)) {
                        return false;
                    }
                }
            }
        }
        return true;
    }

    /**
     * Lists the names of all broker tenants in the cluster.
     *
     * <p>A tenant name is derived from every broker tag found on any Helix instance config.
     *
     * @return the distinct tenant names extracted from broker tags
     */
    public Set<String> getAllBrokerTenantNames() {
        Set<String> brokerTenants = new HashSet<>();
        for (InstanceConfig config : getAllHelixInstanceConfigs()) {
            for (String instanceTag : config.getTags()) {
                if (TagNameUtils.isBrokerTag(instanceTag)) {
                    brokerTenants.add(TagNameUtils.getTenantFromTag(instanceTag));
                }
            }
        }
        return brokerTenants;
    }

    /**
     * Lists the names of all server tenants in the cluster.
     *
     * <p>A tenant name is derived from every server tag found on any Helix instance config.
     *
     * @return the distinct tenant names extracted from server tags
     */
    public Set<String> getAllServerTenantNames() {
        Set<String> serverTenants = new HashSet<>();
        for (InstanceConfig config : getAllHelixInstanceConfigs()) {
            for (String instanceTag : config.getTags()) {
                if (TagNameUtils.isServerTag(instanceTag)) {
                    serverTenants.add(TagNameUtils.getTenantFromTag(instanceTag));
                }
            }
        }
        return serverTenants;
    }

    /**
     * Reads the instance config of the given instance through the Helix data accessor and
     * returns its tags.
     *
     * @param instanceName Helix instance name
     * @return the tags stored on the instance config
     */
    private List<String> getTagsForInstance(String instanceName) {
        PropertyKey instanceConfigKey = _keyBuilder.instanceConfig(instanceName);
        InstanceConfig instanceConfig = (InstanceConfig) _helixDataAccessor.getProperty(instanceConfigKey);
        return instanceConfig.getTags();
    }

    /**
     * Creates a server tenant by tagging currently untagged online server instances.
     *
     * <p>Fails without tagging anything when fewer untagged instances are available than the
     * tenant requests. Otherwise delegates to the co-located or independent assignment depending
     * on {@code serverTenant.isCoLocated()}.
     *
     * @param serverTenant tenant request (name, instance counts, co-location flag)
     * @return {@code SUCCESS}, or a failure response carrying the reason
     */
    public PinotResourceManagerResponse createServerTenant(Tenant serverTenant) {
        int requested = serverTenant.getNumberOfInstances();
        List<String> candidates = getOnlineUnTaggedServerInstanceList();
        boolean enoughInstances = candidates.size() >= requested;
        if (enoughInstances) {
            if (serverTenant.isCoLocated()) {
                assignColocatedServerTenant(serverTenant, requested, candidates);
            } else {
                assignIndependentServerTenant(serverTenant, requested, candidates);
            }
            return PinotResourceManagerResponse.SUCCESS;
        }
        String message = "Failed to allocate server instances to Tag : " + serverTenant.getTenantName() + ", Current number of untagged server instances : " + candidates.size() + ", Request asked number is : " + serverTenant.getNumberOfInstances();
        LOGGER.error(message);
        return PinotResourceManagerResponse.failure(message);
    }

    /**
     * Tags disjoint instances for the offline and realtime parts of a server tenant.
     *
     * <p>The first {@code getOfflineInstances()} entries of the list receive the offline tag; the
     * following {@code getRealtimeInstances()} entries receive the realtime tag.
     *
     * @param serverTenant tenant request supplying the tenant name and the two counts
     * @param numberOfInstances total requested instances (not used by this method)
     * @param unTaggedInstanceList candidate untagged server instances
     */
    private void assignIndependentServerTenant(Tenant serverTenant, int numberOfInstances, List<String> unTaggedInstanceList) {
        int offlineCount = serverTenant.getOfflineInstances();
        String offlineTag = TagNameUtils.getOfflineTagForTenant(serverTenant.getTenantName());
        int next = 0;
        while (next < offlineCount) {
            retagInstance(unTaggedInstanceList.get(next), "server_untagged", offlineTag);
            next++;
        }
        String realtimeTag = TagNameUtils.getRealtimeTagForTenant(serverTenant.getTenantName());
        int realtimeCount = serverTenant.getRealtimeInstances();
        // Realtime instances start right after the offline slice.
        for (int offset = 0; offset < realtimeCount; offset++) {
            retagInstance(unTaggedInstanceList.get(offset + offlineCount), "server_untagged", realtimeTag);
        }
    }

    /**
     * Tags instances for a co-located server tenant, where one instance may be handed both the
     * offline and the realtime tag.
     *
     * <p>Offline tags are handed out from the start of {@code unTaggedInstanceList}. Realtime
     * tags continue from where the offline pass stopped and cycle back to the first instance
     * once {@code numberOfInstances} instances have been used.
     *
     * @param serverTenant tenant request supplying the tenant name and the offline/realtime counts
     * @param numberOfInstances size of the instance pool to cycle through
     * @param unTaggedInstanceList candidate untagged server instances
     */
    private void assignColocatedServerTenant(Tenant serverTenant, int numberOfInstances, List<String> unTaggedInstanceList) {
        int cnt = 0;
        String offlineServerTag = TagNameUtils.getOfflineTagForTenant(serverTenant.getTenantName());
        for (int i = 0; i < serverTenant.getOfflineInstances(); ++i) {
            retagInstance(unTaggedInstanceList.get(cnt++), "server_untagged", offlineServerTag);
        }
        String realtimeServerTag = TagNameUtils.getRealtimeTagForTenant(serverTenant.getTenantName());
        for (int i = 0; i < serverTenant.getRealtimeInstances(); ++i) {
            // Wrap BEFORE indexing. If the offline pass already consumed the whole pool, cnt equals
            // numberOfInstances here; wrapping only after the access (as before) indexed one slot
            // past the pool and then never wrapped again.
            if (cnt >= numberOfInstances) {
                cnt = 0;
            }
            retagInstance(unTaggedInstanceList.get(cnt++), "server_untagged", realtimeServerTag);
        }
    }

    /**
     * Creates a broker tenant by tagging currently untagged online broker instances.
     *
     * <p>Fails without tagging anything when fewer untagged broker instances are available than
     * the tenant requests.
     *
     * @param brokerTenant tenant request (name and number of instances)
     * @return {@code SUCCESS}, or a failure response carrying the reason
     */
    public PinotResourceManagerResponse createBrokerTenant(Tenant brokerTenant) {
        List<String> unTaggedInstanceList = getOnlineUnTaggedBrokerInstanceList();
        int numberOfInstances = brokerTenant.getNumberOfInstances();
        if (unTaggedInstanceList.size() < numberOfInstances) {
            // The count reported here is of untagged BROKER instances; the message previously said "server".
            String message = "Failed to allocate broker instances to Tag : " + brokerTenant.getTenantName() + ", Current number of untagged broker instances : " + unTaggedInstanceList.size() + ", Request asked number is : " + brokerTenant.getNumberOfInstances();
            LOGGER.error(message);
            return PinotResourceManagerResponse.failure(message);
        }
        String brokerTag = TagNameUtils.getBrokerTagForTenant(brokerTenant.getTenantName());
        for (int i = 0; i < brokerTenant.getNumberOfInstances(); ++i) {
            retagInstance(unTaggedInstanceList.get(i), "broker_untagged", brokerTag);
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Removes the tenant's offline server tag from every instance that carries it.
     *
     * <p>An instance left without any tag afterwards is given the {@code "server_untagged"} tag.
     *
     * @param tenantName name of the server tenant
     * @return always {@code SUCCESS}
     */
    public PinotResourceManagerResponse deleteOfflineServerTenantFor(String tenantName) {
        String offlineTenantTag = TagNameUtils.getOfflineTagForTenant(tenantName);
        // Typed list: iterating a raw List as String does not type-check.
        List<String> instancesInClusterWithTag = HelixHelper.getInstancesWithTag(_helixZkManager, offlineTenantTag);
        for (String instanceName : instancesInClusterWithTag) {
            _helixAdmin.removeInstanceTag(_helixClusterName, instanceName, offlineTenantTag);
            if (getTagsForInstance(instanceName).isEmpty()) {
                _helixAdmin.addInstanceTag(_helixClusterName, instanceName, "server_untagged");
            }
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Removes the tenant's realtime server tag from every instance that carries it.
     *
     * <p>An instance left without any tag afterwards is given the {@code "server_untagged"} tag.
     *
     * @param tenantName name of the server tenant
     * @return always {@code SUCCESS}
     */
    public PinotResourceManagerResponse deleteRealtimeServerTenantFor(String tenantName) {
        String realtimeTenantTag = TagNameUtils.getRealtimeTagForTenant(tenantName);
        // Typed list: iterating a raw List as String does not type-check.
        List<String> instancesInClusterWithTag = HelixHelper.getInstancesWithTag(_helixZkManager, realtimeTenantTag);
        for (String instanceName : instancesInClusterWithTag) {
            _helixAdmin.removeInstanceTag(_helixClusterName, instanceName, realtimeTenantTag);
            if (getTagsForInstance(instanceName).isEmpty()) {
                _helixAdmin.addInstanceTag(_helixClusterName, instanceName, "server_untagged");
            }
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Retags every instance carrying the tenant's broker tag, passing the broker tag and
     * {@code "broker_untagged"} to {@code retagInstance}.
     *
     * @param tenantName name of the broker tenant
     * @return always {@code SUCCESS}
     */
    public PinotResourceManagerResponse deleteBrokerTenantFor(String tenantName) {
        String brokerTag = TagNameUtils.getBrokerTagForTenant(tenantName);
        // Typed list: iterating a raw List as String does not type-check.
        List<String> instancesInClusterWithTag = HelixHelper.getInstancesWithTag(_helixZkManager, brokerTag);
        for (String instance : instancesInClusterWithTag) {
            retagInstance(instance, brokerTag, "broker_untagged");
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Resolves the server instances of a tenant from an already-fetched list of instance configs.
     *
     * @param instanceConfigs instance configs to search
     * @param tenantName name of the server tenant
     * @return the result of {@code HelixHelper.getServerInstancesForTenant}
     */
    public Set<String> getAllInstancesForServerTenant(List<InstanceConfig> instanceConfigs, String tenantName) {
        Set<String> serverInstances = HelixHelper.getServerInstancesForTenant(instanceConfigs, tenantName);
        return serverInstances;
    }

    /**
     * Resolves the server instances of a tenant using the instance configs currently returned by
     * {@code HelixHelper.getInstanceConfigs}.
     *
     * @param tenantName name of the server tenant
     * @return the server instances found for the tenant
     */
    public Set<String> getAllInstancesForServerTenant(String tenantName) {
        List<InstanceConfig> currentInstanceConfigs = HelixHelper.getInstanceConfigs(_helixZkManager);
        return getAllInstancesForServerTenant(currentInstanceConfigs, tenantName);
    }

    /**
     * Resolves the broker instances of a tenant from an already-fetched list of instance configs.
     *
     * @param instanceConfigs instance configs to search
     * @param tenantName name of the broker tenant
     * @return the result of {@code HelixHelper.getBrokerInstancesForTenant}
     */
    public Set<String> getAllInstancesForBrokerTenant(List<InstanceConfig> instanceConfigs, String tenantName) {
        Set<String> brokerInstances = HelixHelper.getBrokerInstancesForTenant(instanceConfigs, tenantName);
        return brokerInstances;
    }

    /**
     * Resolves the broker instances of a tenant using the instance configs currently returned by
     * {@code HelixHelper.getInstanceConfigs}.
     *
     * @param tenantName name of the broker tenant
     * @return the broker instances found for the tenant
     */
    public Set<String> getAllInstancesForBrokerTenant(String tenantName) {
        List<InstanceConfig> currentInstanceConfigs = HelixHelper.getInstanceConfigs(_helixZkManager);
        return getAllInstancesForBrokerTenant(currentInstanceConfigs, tenantName);
    }

    /**
     * Stores a schema in the property store.
     *
     * <p>If a schema with the same name already exists, it is replaced only when
     * {@code override} is set; an identical schema is left untouched.
     *
     * @param schema schema to store
     * @param override whether an existing schema of the same name may be replaced
     * @throws RuntimeException if the schema exists and {@code override} is {@code false}
     */
    public void addSchema(Schema schema, boolean override) {
        ZNRecord schemaRecord = SchemaUtils.toZNRecord(schema);
        String schemaName = schema.getSchemaName();
        Schema existing = ZKMetadataProvider.getSchema(_propertyStore, schemaName);
        boolean alreadyExists = existing != null;
        if (alreadyExists && !override) {
            throw new RuntimeException(String.format("Schema %s exists. Not overriding it as requested.", schemaName));
        }
        if (!schema.equals((Object) existing)) {
            PinotHelixPropertyStoreZnRecordProvider.forSchema(_propertyStore).set(schemaName, schemaRecord);
            return;
        }
        // Nothing changed, so skip the write.
        LOGGER.info("New schema is the same with the existing schema. Not updating schema " + schemaName);
    }

    /**
     * Replaces an existing schema in the property store and optionally reloads its tables.
     *
     * <p>An identical schema is left untouched. The new schema is written before the backward
     * compatibility check; an incompatible schema only produces a warning and skips the reload.
     *
     * @param schema new schema
     * @param reload whether to reload all segments of the tables found for the schema name
     * @throws SchemaNotFoundException if no schema with that name exists yet
     * @throws TableNotFoundException declared for the table lookup performed when reloading
     */
    public void updateSchema(Schema schema, boolean reload) throws TableNotFoundException, SchemaNotFoundException {
        ZNRecord schemaRecord = SchemaUtils.toZNRecord(schema);
        String schemaName = schema.getSchemaName();
        Schema previous = ZKMetadataProvider.getSchema(_propertyStore, schemaName);
        if (previous == null) {
            throw new SchemaNotFoundException(String.format("Schema %s did not exist.", schemaName));
        }
        if (schema.equals((Object) previous)) {
            LOGGER.info("New schema is the same with the existing schema. Not updating schema " + schemaName);
            return;
        }
        PinotHelixPropertyStoreZnRecordProvider.forSchema(_propertyStore).set(schemaName, schemaRecord);

        if (!schema.isBackwardCompatibleWith(previous)) {
            LOGGER.warn(String.format("New schema %s is not backward compatible", schemaName));
            return;
        }
        if (!reload) {
            return;
        }
        LOGGER.info("Reloading tables with name: {}", schemaName);
        for (String tableNameWithType : getExistingTableNamesWithType(schemaName, null)) {
            reloadAllSegments(tableNameWithType);
        }
    }

    /**
     * Removes a schema's entry from the property store if it exists.
     *
     * @param schema schema to delete; may be {@code null}
     * @return {@code true} if an entry existed and a remove was issued, {@code false} otherwise
     */
    public boolean deleteSchema(Schema schema) {
        if (schema == null) {
            return false;
        }
        String schemaPath = ZKMetadataProvider.constructPropertyStorePathForSchema(schema.getSchemaName());
        if (!_propertyStore.exists(schemaPath, AccessOption.PERSISTENT)) {
            return false;
        }
        _propertyStore.remove(schemaPath, AccessOption.PERSISTENT);
        return true;
    }

    /**
     * Looks up a schema by name in the property store.
     *
     * @param schemaName name of the schema
     * @return the schema, or {@code null} per the {@code @Nullable} contract
     */
    @Nullable
    public Schema getSchema(String schemaName) {
        Schema storedSchema = ZKMetadataProvider.getSchema(_propertyStore, schemaName);
        return storedSchema;
    }

    /**
     * Looks up the schema of a table via {@code ZKMetadataProvider.getTableSchema}.
     *
     * @param tableName name of the table
     * @return the table's schema, or {@code null} per the {@code @Nullable} contract
     */
    @Nullable
    public Schema getTableSchema(String tableName) {
        Schema tableSchema = ZKMetadataProvider.getTableSchema(_propertyStore, tableName);
        return tableSchema;
    }

    /**
     * Lists the child names stored under the schema path of the property store.
     *
     * @return the names returned by the property store for the schema relative path
     */
    public List<String> getSchemaNames() {
        PinotHelixPropertyStoreZnRecordProvider schemaProvider = PinotHelixPropertyStoreZnRecordProvider.forSchema(_propertyStore);
        return _propertyStore.getChildNames(schemaProvider.getRelativePath(), AccessOption.PERSISTENT);
    }

    /**
     * Adds a table to the cluster and registers it in the broker resource.
     *
     * <p>In a single-tenant cluster the tenant config is forced to {@code "DefaultTenant"} first.
     * The tenant config is validated before anything is written. For OFFLINE tables an empty
     * ideal state is created, the table config stored and instances assigned; an already
     * existing table is rejected. For REALTIME tables the indexing config is verified, a schema
     * must exist (by raw table name, or by the schema name in the validation config), the table
     * config is stored, instances assigned and the realtime setup ensured. Finally the table is
     * mapped to the tenant's brokers in the {@code "brokerResource"} ideal state.
     *
     * @param tableConfig config of the table to add
     * @throws IOException declared for serializing the table config to a ZNRecord
     * @throws TableAlreadyExistsException if an OFFLINE table with that name already exists
     * @throws InvalidTableConfigException if no schema is found for a REALTIME table, or the
     *     table type is unsupported
     */
    public void addTable(TableConfig tableConfig) throws IOException {
        if (isSingleTenantCluster()) {
            tableConfig.setTenantConfig(new TenantConfig("DefaultTenant", "DefaultTenant", null));
        }
        validateTableTenantConfig(tableConfig);
        String tableNameWithType = tableConfig.getTableName();
        SegmentsValidationAndRetentionConfig segmentsConfig = tableConfig.getValidationConfig();
        CommonConstants.Helix.TableType tableType = tableConfig.getTableType();
        switch (tableType) {
            case OFFLINE: {
                if (getAllTables().contains(tableNameWithType)) {
                    throw new TableAlreadyExistsException("Table " + tableNameWithType + " already exists");
                }
                LOGGER.info("building empty ideal state for table : " + tableNameWithType);
                IdealState offlineIdealState = PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, Integer.parseInt(segmentsConfig.getReplication()), _enableBatchMessageMode);
                LOGGER.info("adding table via the admin");
                _helixAdmin.addResource(_helixClusterName, tableNameWithType, offlineIdealState);
                ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableNameWithType, tableConfig.toZNRecord());
                assignInstances(tableConfig, true);
                LOGGER.info("Successfully added table: {}", tableNameWithType);
                break;
            }
            case REALTIME: {
                IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
                verifyIndexingConfig(tableNameWithType, indexingConfig);
                // Prefer a schema named after the raw table; fall back to the configured schema name.
                Schema schema = ZKMetadataProvider.getSchema(_propertyStore, TableNameBuilder.extractRawTableName(tableNameWithType));
                if (schema == null) {
                    String schemaName = tableConfig.getValidationConfig().getSchemaName();
                    if (schemaName == null || ZKMetadataProvider.getSchema(_propertyStore, schemaName) == null) {
                        throw new InvalidTableConfigException("No schema defined for realtime table: " + tableNameWithType);
                    }
                }
                ZKMetadataProvider.setRealtimeTableConfig(_propertyStore, tableNameWithType, tableConfig.toZNRecord());
                assignInstances(tableConfig, true);
                ensureRealtimeClusterIsSetUp(tableConfig);
                LOGGER.info("Successfully added or updated the table {} ", tableNameWithType);
                break;
            }
            default: {
                throw new InvalidTableConfigException("Unsupported table type: " + tableType);
            }
        }
        LOGGER.info("Updating BrokerResource IdealState for table: {}", tableNameWithType);
        String brokerTag = TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig());
        // Typed list (instead of a raw List) so the instance-state map is built without unchecked conversions.
        List<String> brokers = HelixHelper.getInstancesWithTag(_helixZkManager, brokerTag);
        HelixHelper.updateIdealState(_helixZkManager, "brokerResource", idealState -> {
            assert (idealState != null);
            idealState.getRecord().getMapFields().put(tableNameWithType, SegmentAssignmentUtils.getInstanceStateMap(brokers, "ONLINE"));
            return idealState;
        });
    }

    /**
     * Validates that the tenant tags referenced by a table config are usable.
     *
     * <p>The broker tag is always checked. OFFLINE tables additionally check the offline server
     * tag; any other table type checks the CONSUMING and COMPLETED server tags, which must both
     * be server tags. Every collected tag must have at least one instance.
     *
     * @param tableConfig table config to validate
     * @throws InvalidTableConfigException if a CONSUMING/COMPLETED tag is not a server tag, or
     *     a tag has no instances
     */
    @VisibleForTesting
    void validateTableTenantConfig(TableConfig tableConfig) {
        String tableNameWithType = tableConfig.getTableName();
        TenantConfig tenantConfig = tableConfig.getTenantConfig();
        Set<String> requiredTags = new TreeSet<>();
        requiredTags.add(TagNameUtils.extractBrokerTag(tenantConfig));

        boolean isOffline = tableConfig.getTableType() == CommonConstants.Helix.TableType.OFFLINE;
        if (isOffline) {
            requiredTags.add(TagNameUtils.extractOfflineServerTag(tenantConfig));
        } else {
            String consumingTag = TagNameUtils.extractConsumingServerTag(tenantConfig);
            if (!TagNameUtils.isServerTag(consumingTag)) {
                throw new InvalidTableConfigException("Invalid CONSUMING server tag: " + consumingTag + " for table: " + tableNameWithType);
            }
            requiredTags.add(consumingTag);

            String completedTag = TagNameUtils.extractCompletedServerTag(tenantConfig);
            if (!TagNameUtils.isServerTag(completedTag)) {
                throw new InvalidTableConfigException("Invalid COMPLETED server tag: " + completedTag + " for table: " + tableNameWithType);
            }
            requiredTags.add(completedTag);
        }

        for (String requiredTag : requiredTags) {
            if (getInstancesWithTag(requiredTag).isEmpty()) {
                throw new InvalidTableConfigException("Failed to find instances with tag: " + requiredTag + " for table: " + tableNameWithType);
            }
        }
    }

    /**
     * Stores the LLC realtime segment manager used by this resource manager.
     *
     * @param pinotLLCRealtimeSegmentManager manager instance to keep
     */
    public void registerPinotLLCRealtimeSegmentManager(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) {
        _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
    }

    /**
     * Rejects stream configs that use the high-level consumer type when HLC tables are not
     * allowed.
     *
     * @param tableNameWithType table name with type suffix
     * @param indexingConfig indexing config supplying the stream configs
     * @throws InvalidTableConfigException if the table is HLC and HLC tables are disallowed
     */
    private void verifyIndexingConfig(String tableNameWithType, IndexingConfig indexingConfig) {
        StreamConfig streamConfig = new StreamConfig(tableNameWithType, indexingConfig.getStreamConfigs());
        if (!streamConfig.hasHighLevelConsumerType()) {
            return;
        }
        if (_allowHLCTables) {
            return;
        }
        throw new InvalidTableConfigException("Creating HLC realtime table is not allowed for Table: " + tableNameWithType);
    }

    /**
     * Ensures the Helix and property store entries needed by a realtime table exist.
     *
     * <p>For the high-level consumer type: creates the initial ideal state when none exists;
     * otherwise, if the low-level type is not also configured, removes LLC segments from the
     * existing ideal state; then ensures the HLC property store entry exists. For the low-level
     * consumer type: builds the low-level ideal state only if no LLC segments are recorded yet.
     *
     * @param realtimeTableConfig config of the realtime table
     */
    private void ensureRealtimeClusterIsSetUp(TableConfig realtimeTableConfig) {
        String realtimeTableName = realtimeTableConfig.getTableName();
        StreamConfig streamConfig = new StreamConfig(realtimeTableConfig.getTableName(), realtimeTableConfig.getIndexingConfig().getStreamConfigs());
        IdealState idealState = getTableIdealState(realtimeTableName);

        if (streamConfig.hasHighLevelConsumerType()) {
            if (idealState != null) {
                if (!streamConfig.hasLowLevelConsumerType()) {
                    _pinotLLCRealtimeSegmentManager.removeLLCSegments(idealState);
                }
            } else {
                LOGGER.info("Initializing IdealState for HLC table: {}", realtimeTableName);
                idealState = PinotTableIdealStateBuilder.buildInitialHighLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, _helixZkManager, _propertyStore, _enableBatchMessageMode);
                _helixAdmin.addResource(_helixClusterName, realtimeTableName, idealState);
            }
            ensurePropertyStoreEntryExistsForHighLevelConsumer(realtimeTableName);
        }

        if (!streamConfig.hasLowLevelConsumerType()) {
            return;
        }
        boolean llcNotYetSetUp = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName).isEmpty();
        if (llcNotYetSetUp) {
            PinotTableIdealStateBuilder.buildLowLevelRealtimeIdealStateFor(_pinotLLCRealtimeSegmentManager, realtimeTableName, realtimeTableConfig, idealState, _enableBatchMessageMode);
            LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
        } else {
            LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName);
        }
    }

    /**
     * Creates an empty ZNRecord at the table's resource path in the property store when that
     * path does not exist yet.
     *
     * @param realtimeTableName realtime table name with type suffix
     */
    private void ensurePropertyStoreEntryExistsForHighLevelConsumer(String realtimeTableName) {
        String resourcePath = ZKMetadataProvider.constructPropertyStorePathForResource(realtimeTableName);
        if (_propertyStore.exists(resourcePath, AccessOption.PERSISTENT)) {
            return;
        }
        LOGGER.info("Creating property store entry for HLC table: {}", realtimeTableName);
        ZNRecord emptyRecord = new ZNRecord(realtimeTableName);
        _propertyStore.create(resourcePath, (Object) emptyRecord, AccessOption.PERSISTENT);
    }

    /**
     * Computes and persists instance partitions for a table.
     *
     * <p>An instance partitions type is assigned when instance assignment is allowed for it and
     * either {@code override} is set or no instance partitions are stored for it yet.
     *
     * @param tableConfig config of the table
     * @param override whether already stored instance partitions should be recomputed
     */
    private void assignInstances(TableConfig tableConfig, boolean override) {
        String tableNameWithType = tableConfig.getTableName();
        String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);

        List<InstancePartitionsType> typesToAssign = new ArrayList<>();
        for (InstancePartitionsType candidateType : InstancePartitionsType.values()) {
            if (!InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, candidateType)) {
                continue;
            }
            // Only consult the property store when we are not overriding.
            boolean keepExisting = !override && InstancePartitionsUtils.fetchInstancePartitions(_propertyStore, candidateType.getInstancePartitionsName(rawTableName)) != null;
            if (!keepExisting) {
                typesToAssign.add(candidateType);
            }
        }
        if (typesToAssign.isEmpty()) {
            return;
        }

        LOGGER.info("Assigning {} instances to table: {}", typesToAssign, tableNameWithType);
        InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
        List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs();
        for (InstancePartitionsType typeToAssign : typesToAssign) {
            InstancePartitions assigned = driver.assignInstances(typeToAssign, instanceConfigs);
            LOGGER.info("Persisting instance partitions: {}", assigned);
            InstancePartitionsUtils.persistInstancePartitions(_propertyStore, assigned);
        }
    }

    /**
     * Validates the tenant config of a table config and then applies it to the existing table.
     *
     * @param tableConfig new table config
     * @throws IOException propagated from {@code setExistingTableConfig}
     */
    public void updateTableConfig(TableConfig tableConfig) throws IOException {
        // Validate first so an invalid tenant config is rejected before any write.
        validateTableTenantConfig(tableConfig);
        setExistingTableConfig(tableConfig);
    }

    /**
     * Writes a table config for an existing table and brings dependent state in line.
     *
     * <p>OFFLINE: stores the config, updates the ideal state's replica count if it differs from
     * the configured replication, and assigns instances without overriding existing partitions.
     * REALTIME: verifies the indexing config, stores the config, assigns instances without
     * overriding, and ensures the realtime setup. In both cases a query quota update message is
     * sent afterwards.
     *
     * @param tableConfig table config to store
     * @throws IOException declared for serializing the table config to a ZNRecord
     * @throws InvalidTableConfigException if the table type is unsupported
     */
    public void setExistingTableConfig(TableConfig tableConfig) throws IOException {
        String tableNameWithType = tableConfig.getTableName();
        SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
        CommonConstants.Helix.TableType tableType = tableConfig.getTableType();
        switch (tableType) {
            case OFFLINE: {
                ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableNameWithType, tableConfig.toZNRecord());
                IdealState currentIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
                String configuredReplication = validationConfig.getReplication();
                boolean replicasUpToDate = currentIdealState.getReplicas().equals(configuredReplication);
                if (!replicasUpToDate) {
                    HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, toUpdate -> {
                        assert (toUpdate != null);
                        toUpdate.setReplicas(configuredReplication);
                        return toUpdate;
                    }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, (double) 1.2f));
                }
                assignInstances(tableConfig, false);
                break;
            }
            case REALTIME: {
                verifyIndexingConfig(tableNameWithType, tableConfig.getIndexingConfig());
                ZKMetadataProvider.setRealtimeTableConfig(_propertyStore, tableNameWithType, tableConfig.toZNRecord());
                assignInstances(tableConfig, false);
                ensureRealtimeClusterIsSetUp(tableConfig);
                break;
            }
            default: {
                throw new InvalidTableConfigException("Unsupported table type: " + tableType);
            }
        }
        sendUpdateQueryQuotaMessage(tableConfig);
    }

    /**
     * Replaces the custom config of an existing table and stores the updated table config.
     *
     * @param tableName table name, combined with {@code type} to form the name with type
     * @param type table type
     * @param newConfigs custom config to set
     * @throws RuntimeException if no table config is found
     * @throws Exception propagated from storing the table config
     */
    public void updateMetadataConfigFor(String tableName, CommonConstants.Helix.TableType type, TableCustomConfig newConfigs) throws Exception {
        String tableNameWithType = TableNameBuilder.forType(type).tableNameWithType(tableName);
        TableConfig existingConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
        if (existingConfig != null) {
            existingConfig.setCustomConfig(newConfigs);
            setExistingTableConfig(existingConfig);
            return;
        }
        throw new RuntimeException("Table: " + tableName + " of type: " + type + " does not exist");
    }

    /**
     * Replaces the validation config of an existing table and stores the updated table config.
     *
     * @param tableName table name, combined with {@code type} to form the name with type
     * @param type table type
     * @param newConfigs validation and retention config to set
     * @throws RuntimeException if no table config is found
     * @throws Exception propagated from storing the table config
     */
    public void updateSegmentsValidationAndRetentionConfigFor(String tableName, CommonConstants.Helix.TableType type, SegmentsValidationAndRetentionConfig newConfigs) throws Exception {
        String tableNameWithType = TableNameBuilder.forType(type).tableNameWithType(tableName);
        TableConfig existingConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
        if (existingConfig != null) {
            existingConfig.setValidationConfig(newConfigs);
            setExistingTableConfig(existingConfig);
            return;
        }
        throw new RuntimeException("Table: " + tableName + " of type: " + type + " does not exist");
    }

    /**
     * Replaces the indexing config of an existing table and stores the updated table config.
     *
     * @param tableName table name, combined with {@code type} to form the name with type
     * @param type table type
     * @param newConfigs indexing config to set
     * @throws RuntimeException if no table config is found
     * @throws Exception propagated from storing the table config
     */
    public void updateIndexingConfigFor(String tableName, CommonConstants.Helix.TableType type, IndexingConfig newConfigs) throws Exception {
        String tableNameWithType = TableNameBuilder.forType(type).tableNameWithType(tableName);
        TableConfig existingConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
        if (existingConfig != null) {
            existingConfig.setIndexingConfig(newConfigs);
            setExistingTableConfig(existingConfig);
            return;
        }
        throw new RuntimeException("Table: " + tableName + " of type: " + type + " does not exist");
    }

    /**
     * Deletes an offline table, logging each completed step.
     *
     * <p>Steps, in order: remove the table from the broker resource ideal state, drop the Helix
     * resource if present, remove stored segments, remove segment metadata, remove the table
     * config, remove the OFFLINE instance partitions.
     *
     * @param tableName table name; the offline table name is derived from it
     */
    public void deleteOfflineTable(String tableName) {
        String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
        LOGGER.info("Deleting table {}: Start", offlineTableName);

        // 1. Broker resource
        HelixHelper.removeResourceFromBrokerIdealState(_helixZkManager, offlineTableName);
        LOGGER.info("Deleting table {}: Removed from broker resource", offlineTableName);

        // 2. Helix table resource (only if it exists)
        boolean resourceExists = _helixAdmin.getResourcesInCluster(_helixClusterName).contains(offlineTableName);
        if (resourceExists) {
            _helixAdmin.dropResource(_helixClusterName, offlineTableName);
            LOGGER.info("Deleting table {}: Removed helix table resource", offlineTableName);
        }

        // 3. Stored segments
        _segmentDeletionManager.removeSegmentsFromStore(offlineTableName, getSegmentsFor(offlineTableName));
        LOGGER.info("Deleting table {}: Removed stored segments", offlineTableName);

        // 4. Segment metadata
        ZKMetadataProvider.removeResourceSegmentsFromPropertyStore(_propertyStore, offlineTableName);
        LOGGER.info("Deleting table {}: Removed segment metadata", offlineTableName);

        // 5. Table config
        ZKMetadataProvider.removeResourceConfigFromPropertyStore(_propertyStore, offlineTableName);
        LOGGER.info("Deleting table {}: Removed table config", offlineTableName);

        // 6. Instance partitions
        String rawTableName = TableNameBuilder.extractRawTableName(tableName);
        InstancePartitionsUtils.removeInstancePartitions(_propertyStore, InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName));
        LOGGER.info("Deleting table {}: Removed instance partitions", offlineTableName);

        LOGGER.info("Deleting table {}: Finish", offlineTableName);
    }

    /**
     * Deletes a realtime table, logging each completed step.
     *
     * <p>Steps, in order: remove the table from the broker resource ideal state, drop the Helix
     * resource if present (remembering the instances it was assigned to), remove stored
     * segments, remove segment metadata, remove the table config, remove the CONSUMING and
     * COMPLETED instance partitions, and remove the table from the instance ZK metadata of each
     * remembered instance.
     *
     * @param tableName table name; the realtime table name is derived from it
     */
    public void deleteRealtimeTable(String tableName) {
        String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
        LOGGER.info("Deleting table {}: Start", realtimeTableName);

        HelixHelper.removeResourceFromBrokerIdealState(_helixZkManager, realtimeTableName);
        LOGGER.info("Deleting table {}: Removed from broker resource", realtimeTableName);

        // Capture the assigned instances before the resource (and its ideal state) is dropped.
        Set<String> instancesForTable = Collections.emptySet();
        boolean resourceExists = _helixAdmin.getResourcesInCluster(_helixClusterName).contains(realtimeTableName);
        if (resourceExists) {
            instancesForTable = getAllInstancesForTable(realtimeTableName);
            _helixAdmin.dropResource(_helixClusterName, realtimeTableName);
            LOGGER.info("Deleting table {}: Removed helix table resource", realtimeTableName);
        }

        _segmentDeletionManager.removeSegmentsFromStore(realtimeTableName, getSegmentsFor(realtimeTableName));
        LOGGER.info("Deleting table {}: Removed stored segments", realtimeTableName);

        ZKMetadataProvider.removeResourceSegmentsFromPropertyStore(_propertyStore, realtimeTableName);
        LOGGER.info("Deleting table {}: Removed segment metadata", realtimeTableName);

        ZKMetadataProvider.removeResourceConfigFromPropertyStore(_propertyStore, realtimeTableName);
        LOGGER.info("Deleting table {}: Removed table config", realtimeTableName);

        String rawTableName = TableNameBuilder.extractRawTableName(tableName);
        InstancePartitionsUtils.removeInstancePartitions(_propertyStore, InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
        InstancePartitionsUtils.removeInstancePartitions(_propertyStore, InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
        LOGGER.info("Deleting table {}: Removed instance partitions", realtimeTableName);

        for (String instance : instancesForTable) {
            InstanceZKMetadata instanceMetadata = ZKMetadataProvider.getInstanceZKMetadata(_propertyStore, instance);
            if (instanceMetadata != null) {
                instanceMetadata.removeResource(realtimeTableName);
                ZKMetadataProvider.setInstanceZKMetadata(_propertyStore, instanceMetadata);
            }
        }
        LOGGER.info("Deleting table {}: Removed groupId/partitionId mapping for HLC table", realtimeTableName);
        LOGGER.info("Deleting table {}: Finish", realtimeTableName);
    }

    /**
     * Enables, disables or drops a table.
     *
     * <p>ENABLE enables the Helix resource and then attempts a resource reset; a
     * {@code HelixException} from the reset is logged and reported in the response message
     * rather than propagated. DISABLE disables the Helix resource. DROP deletes the table as
     * offline or realtime depending on the type parsed from the table name.
     *
     * @param tableNameWithType table name with type suffix
     * @param stateType requested state change
     * @return a failure response if the table does not exist, otherwise a success response
     * @throws IllegalStateException if {@code stateType} matches none of the handled cases
     */
    public PinotResourceManagerResponse toggleTableState(String tableNameWithType, StateType stateType) {
        if (!hasTable(tableNameWithType)) {
            return PinotResourceManagerResponse.failure("Table: " + tableNameWithType + " not found");
        }
        switch (stateType) {
            case ENABLE: {
                _helixAdmin.enableResource(_helixClusterName, tableNameWithType, true);
                boolean resetSuccessful;
                try {
                    _helixAdmin.resetResource(_helixClusterName, Collections.singletonList(tableNameWithType));
                    resetSuccessful = true;
                } catch (HelixException e) {
                    LOGGER.warn("Caught exception while resetting resource: {}", (Object) tableNameWithType, (Object) e);
                    resetSuccessful = false;
                }
                return PinotResourceManagerResponse.success("Table: " + tableNameWithType + " enabled (reset success = " + resetSuccessful + ")");
            }
            case DISABLE: {
                _helixAdmin.enableResource(_helixClusterName, tableNameWithType, false);
                return PinotResourceManagerResponse.success("Table: " + tableNameWithType + " disabled");
            }
            case DROP: {
                boolean isOffline = TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == CommonConstants.Helix.TableType.OFFLINE;
                if (isOffline) {
                    deleteOfflineTable(tableNameWithType);
                } else {
                    deleteRealtimeTable(tableNameWithType);
                }
                return PinotResourceManagerResponse.success("Table: " + tableNameWithType + " dropped");
            }
            default: {
                throw new IllegalStateException();
            }
        }
    }

    /**
     * Collects every instance that appears in any partition of the table's ideal state.
     *
     * @param tableNameWithType table name with type suffix
     * @return the union of the instance sets of all partitions
     */
    private Set<String> getAllInstancesForTable(String tableNameWithType) {
        IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
        Set<String> allInstances = new HashSet<>();
        for (String partitionName : idealState.getPartitionSet()) {
            Set<String> partitionInstances = idealState.getInstanceSet(partitionName);
            allInstances.addAll(partitionInstances);
        }
        return allInstances;
    }

    /**
     * Adds a new segment without a crypter; delegates to the four-argument overload with a
     * {@code null} crypter.
     *
     * @param tableName table name
     * @param segmentMetadata metadata of the segment to add
     * @param downloadUrl download URL of the segment
     */
    public void addNewSegment(String tableName, SegmentMetadata segmentMetadata, String downloadUrl) {
        final String noCrypter = null;
        addNewSegment(tableName, segmentMetadata, downloadUrl, noCrypter);
    }

    /**
     * Adds a new offline segment in two steps: first the segment ZK metadata is written to the
     * property store, then the segment is assigned to instances in the table's ideal state.
     * If the second step throws, the ZK metadata written in the first step is removed again and
     * the exception is rethrown.
     *
     * @param tableName table name; converted to the offline table name with type
     * @param segmentMetadata metadata of the segment being added
     * @param downloadUrl download URL stored in the segment ZK metadata
     * @param crypter crypter name stored in the segment ZK metadata, may be {@code null}
     * @throws IllegalStateException if the ZK metadata cannot be written or the table config is missing
     */
    public void addNewSegment(String tableName, SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String crypter) {
        String segmentName = segmentMetadata.getName();
        String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);

        // Step 1: build and persist the segment ZK metadata.
        OfflineSegmentZKMetadata zkMetadata = new OfflineSegmentZKMetadata();
        ZKMetadataUtils.updateSegmentMetadata(zkMetadata, segmentMetadata);
        zkMetadata.setDownloadUrl(downloadUrl);
        zkMetadata.setCrypterName(crypter);
        zkMetadata.setPushTime(System.currentTimeMillis());
        String zkMetadataPath = ZKMetadataProvider.constructPropertyStorePathForSegment(offlineTableName, segmentName);
        boolean metadataStored = this._propertyStore.set(zkMetadataPath, (Object)zkMetadata.toZNRecord(), AccessOption.PERSISTENT);
        Preconditions.checkState(metadataStored, (Object)("Failed to set segment ZK metadata for table: " + offlineTableName + ", segment: " + segmentName));
        LOGGER.info("Added segment: {} of table: {} to property store", segmentName, offlineTableName);

        // Step 2: assign the segment in the ideal state; roll back step 1 on any failure.
        try {
            TableConfig offlineTableConfig = this.getTableConfig(offlineTableName);
            Preconditions.checkState(offlineTableConfig != null, (Object)("Failed to find table config for table: " + offlineTableName));
            SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(this._helixZkManager, offlineTableConfig);
            InstancePartitions offlineInstancePartitions = InstancePartitionsUtils.fetchOrComputeInstancePartitions((HelixManager)this._helixZkManager, offlineTableConfig, InstancePartitionsType.OFFLINE);
            Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.OFFLINE, offlineInstancePartitions);
            HelixHelper.updateIdealState((HelixManager)this._helixZkManager, offlineTableName, idealState -> {
                assert (idealState != null);
                Map<String, Map<String, String>> assignment = idealState.getRecord().getMapFields();
                if (!assignment.containsKey(segmentName)) {
                    List<String> assignedInstances = segmentAssignment.assignSegment(segmentName, assignment, instancePartitionsMap);
                    LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances, offlineTableName);
                    assignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, "ONLINE"));
                } else {
                    // Leave an existing assignment untouched.
                    LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName, offlineTableName);
                }
                return idealState;
            });
            LOGGER.info("Added segment: {} to IdealState for table: {}", segmentName, offlineTableName);
        }
        catch (Exception e) {
            LOGGER.error("Caught exception while adding segment: {} to IdealState for table: {}, deleting segment ZK metadata", segmentName, offlineTableName, e);
            boolean metadataRemoved = this._propertyStore.remove(zkMetadataPath, AccessOption.PERSISTENT);
            if (metadataRemoved) {
                LOGGER.info("Deleted segment ZK metadata for segment: {} of table: {}", segmentName, offlineTableName);
            } else {
                LOGGER.error("Failed to deleted segment ZK metadata for segment: {} of table: {}", segmentName, offlineTableName);
            }
            throw e;
        }
    }

    /**
     * Reads the ZNRecord stored in the property store at the segment's metadata path.
     *
     * @param tableNameWithType table name used to build the property store path
     * @param segmentName segment name used to build the property store path
     * @return whatever {@code ZKMetadataProvider.getZnRecord} returns for that path; declared nullable
     */
    @Nullable
    public ZNRecord getSegmentMetadataZnRecord(String tableNameWithType, String segmentName) {
        String segmentPath = ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, segmentName);
        return ZKMetadataProvider.getZnRecord(this._propertyStore, segmentPath);
    }

    /**
     * Writes the given offline segment ZK metadata through {@code ZKMetadataProvider},
     * passing along the expected version.
     *
     * @param offlineTableName offline table the segment belongs to
     * @param segmentMetadata segment ZK metadata to write
     * @param expectedVersion version forwarded to {@code ZKMetadataProvider.setOfflineSegmentZKMetadata}
     * @return the result reported by {@code ZKMetadataProvider.setOfflineSegmentZKMetadata}
     */
    public boolean updateZkMetadata(String offlineTableName, OfflineSegmentZKMetadata segmentMetadata, int expectedVersion) {
        boolean updated = ZKMetadataProvider.setOfflineSegmentZKMetadata(this._propertyStore, offlineTableName, segmentMetadata, expectedVersion);
        return updated;
    }

    /**
     * Writes the given offline segment ZK metadata through {@code ZKMetadataProvider}
     * without supplying an expected version.
     *
     * @param offlineTableName offline table the segment belongs to
     * @param segmentMetadata segment ZK metadata to write
     * @return the result reported by {@code ZKMetadataProvider.setOfflineSegmentZKMetadata}
     */
    public boolean updateZkMetadata(String offlineTableName, OfflineSegmentZKMetadata segmentMetadata) {
        boolean updated = ZKMetadataProvider.setOfflineSegmentZKMetadata(this._propertyStore, offlineTableName, segmentMetadata);
        return updated;
    }

    /**
     * Refreshes an existing offline segment: updates and persists its ZK metadata, then either
     * sends a segment refresh message or, when message-based refresh is turned off for the
     * table, toggles the segment through {@link #updateExistedSegment}.
     *
     * @param offlineTableName offline table the segment belongs to
     * @param segmentMetadata new metadata of the segment
     * @param offlineSegmentZKMetadata ZK metadata object to update in place and persist
     * @param downloadUrl download URL stored in the ZK metadata
     * @param crypter crypter name stored in the ZK metadata, may be {@code null}
     * @throws RuntimeException if the ZK metadata cannot be written
     */
    public void refreshSegment(String offlineTableName, SegmentMetadata segmentMetadata, OfflineSegmentZKMetadata offlineSegmentZKMetadata, String downloadUrl, @Nullable String crypter) {
        String segmentName = segmentMetadata.getName();

        // Bring the ZK metadata up to date before persisting it.
        ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata, segmentMetadata);
        offlineSegmentZKMetadata.setRefreshTime(System.currentTimeMillis());
        offlineSegmentZKMetadata.setDownloadUrl(downloadUrl);
        offlineSegmentZKMetadata.setCrypterName(crypter);
        boolean metadataStored = ZKMetadataProvider.setOfflineSegmentZKMetadata(this._propertyStore, offlineTableName, offlineSegmentZKMetadata);
        if (!metadataStored) {
            throw new RuntimeException("Failed to update ZK metadata for segment: " + segmentName + " of table: " + offlineTableName);
        }
        LOGGER.info("Updated segment: {} of table: {} to property store", segmentName, offlineTableName);

        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, offlineTableName);
        Preconditions.checkNotNull((Object)tableConfig);
        if (this.shouldSendMessage(tableConfig)) {
            this.sendSegmentRefreshMessage(offlineTableName, segmentName);
            return;
        }
        // Message-based refresh is off: fall back to the state-transition based refresh.
        boolean refreshed = this.updateExistedSegment(offlineTableName, segmentName);
        if (!refreshed) {
            LOGGER.error("Failed to refresh segment: {} of table: {} by the ONLINE->OFFLINE->ONLINE state transition", segmentName, offlineTableName);
        }
    }

    /**
     * Sends a {@code SegmentReloadMessage} without a segment name to the participants selected
     * by a criteria on the given table resource.
     *
     * @param tableNameWithType table whose segments should be reloaded
     * @return the number of messages reported as sent by the messaging service
     */
    public int reloadAllSegments(String tableNameWithType) {
        LOGGER.info("Sending reload message for table: {}", tableNameWithType);

        Criteria criteria = new Criteria();
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setInstanceName("%");
        criteria.setResource(tableNameWithType);
        criteria.setSessionSpecific(true);

        SegmentReloadMessage reloadMessage = new SegmentReloadMessage(tableNameWithType, null);
        int sentCount = this._helixZkManager.getMessagingService().send(criteria, (Message)reloadMessage, null, -1);
        if (sentCount <= 0) {
            LOGGER.warn("No reload message sent for table: {}", tableNameWithType);
        } else {
            LOGGER.info("Sent {} reload messages for table: {}", sentCount, tableNameWithType);
        }
        return sentCount;
    }

    /**
     * Sends a {@code SegmentReloadMessage} for a single segment to the participants selected by
     * a criteria on the given table resource and segment partition.
     *
     * @param tableNameWithType table the segment belongs to
     * @param segmentName segment to reload
     * @return the number of messages reported as sent by the messaging service
     */
    public int reloadSegment(String tableNameWithType, String segmentName) {
        LOGGER.info("Sending reload message for segment: {} in table: {}", segmentName, tableNameWithType);

        Criteria criteria = new Criteria();
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setInstanceName("%");
        criteria.setResource(tableNameWithType);
        criteria.setPartition(segmentName);
        criteria.setSessionSpecific(true);

        SegmentReloadMessage reloadMessage = new SegmentReloadMessage(tableNameWithType, segmentName);
        int sentCount = this._helixZkManager.getMessagingService().send(criteria, (Message)reloadMessage, null, -1);
        if (sentCount <= 0) {
            LOGGER.warn("No reload message sent for segment: {} in table: {}", segmentName, tableNameWithType);
        } else {
            LOGGER.info("Sent {} reload messages for segment: {} in table: {}", sentCount, segmentName, tableNameWithType);
        }
        return sentCount;
    }

    /**
     * Decides whether a segment refresh should be announced via a message. Returns {@code true}
     * unless the table's custom configs contain the key {@code messageBasedRefresh} with a value
     * that does not parse as boolean {@code true}.
     *
     * @param tableConfig table config whose custom configs are inspected
     * @return {@code true} if message-based refresh applies
     */
    private boolean shouldSendMessage(TableConfig tableConfig) {
        TableCustomConfig customConfig = tableConfig.getCustomConfig();
        if (customConfig == null) {
            return true;
        }
        Map<String, String> customConfigs = customConfig.getCustomConfigs();
        if (customConfigs == null || !customConfigs.containsKey("messageBasedRefresh")) {
            return true;
        }
        return Boolean.parseBoolean(customConfigs.get("messageBasedRefresh"));
    }

    /**
     * Sends the same {@code SegmentRefreshMessage} twice: first with the criteria targeting the
     * table resource and segment partition, then with the criteria retargeted to the
     * {@code brokerResource} resource and the table as partition. Each send is logged with the
     * number of messages sent.
     *
     * @param offlineTableName offline table the segment belongs to
     * @param segmentName segment that was refreshed
     */
    private void sendSegmentRefreshMessage(String offlineTableName, String segmentName) {
        SegmentRefreshMessage message = new SegmentRefreshMessage(offlineTableName, segmentName);
        Criteria criteria = new Criteria();
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setInstanceName("%");
        criteria.setResource(offlineTableName);
        criteria.setPartition(segmentName);
        criteria.setSessionSpecific(true);
        ClusterMessagingService messagingService = this._helixZkManager.getMessagingService();

        // First round: criteria points at the table resource.
        int sentToServers = messagingService.send(criteria, (Message)message, null, -1);
        if (sentToServers <= 0) {
            LOGGER.warn("No refresh message sent to servers for segment: {} of table: {}", segmentName, offlineTableName);
        } else {
            LOGGER.info("Sent {} refresh messages to servers for segment: {} of table: {}", sentToServers, segmentName, offlineTableName);
        }

        // Second round: reuse the criteria, retargeted at the broker resource.
        criteria.setResource("brokerResource");
        criteria.setPartition(offlineTableName);
        int sentToBrokers = messagingService.send(criteria, (Message)message, null, -1);
        if (sentToBrokers <= 0) {
            LOGGER.warn("No refresh message sent to brokers for segment: {} of table: {}", segmentName, offlineTableName);
        } else {
            LOGGER.info("Sent {} refresh messages to brokers for segment: {} of table: {}", sentToBrokers, segmentName, offlineTableName);
        }
    }

    /**
     * Sends a {@code QueryQuotaUpdateMessage} for the table named in the given config, using a
     * criteria on the {@code brokerResource} resource with the table as partition.
     *
     * @param tableConfig config of the table whose query quota changed
     */
    private void sendUpdateQueryQuotaMessage(TableConfig tableConfig) {
        String tableNameWithType = tableConfig.getTableName();
        QueryQuotaUpdateMessage quotaMessage = new QueryQuotaUpdateMessage(tableNameWithType);

        Criteria criteria = new Criteria();
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setInstanceName("%");
        criteria.setResource("brokerResource");
        criteria.setSessionSpecific(true);
        criteria.setPartition(tableNameWithType);

        ClusterMessagingService messagingService = this._helixZkManager.getMessagingService();
        LOGGER.info("Sending query quota update message for table {}:{} to recipients {}", tableNameWithType, quotaMessage, criteria);
        int sentCount = messagingService.send(criteria, (Message)quotaMessage, null, -1);
        if (sentCount <= 0) {
            LOGGER.warn("Unable to send query quota update message for table {}, nMsgs={}", tableNameWithType, sentCount);
        } else {
            LOGGER.info("Sent {} query quota update msgs for table {}", sentCount, tableNameWithType);
        }
    }

    /**
     * Refreshes a segment by toggling its ideal state entries OFFLINE and then back ONLINE.
     *
     * <p>Phases, in order:
     * <ol>
     *   <li>Set every instance of the segment to OFFLINE in the ideal state, retrying the write
     *       until {@code updateProperty} reports success.</li>
     *   <li>Re-read the ideal state and verify all entries are OFFLINE.</li>
     *   <li>Wait (bounded by {@code _externalViewOnlineToOfflineTimeoutMillis}) for the external
     *       view to reflect OFFLINE.</li>
     *   <li>Set every instance back to ONLINE, again retrying until the write succeeds, and
     *       verify the result.</li>
     * </ol>
     *
     * @param tableNameWithType table the segment belongs to
     * @param segmentName segment to refresh
     * @return {@code true} if the segment has no instances in the ideal state or the full toggle
     *         was written and verified; {@code false} if any verification or the external view
     *         wait fails
     */
    private boolean updateExistedSegment(String tableNameWithType, String segmentName) {
        IdealState idealState;
        IdealState idealState2;
        boolean updateSuccessful;
        HelixDataAccessor helixDataAccessor = this._helixZkManager.getHelixDataAccessor();
        PropertyKey idealStatePropertyKey = this._keyBuilder.idealStates(tableNameWithType);
        // Phase 1: mark all instances of the segment OFFLINE; loop until the write is accepted.
        do {
            Set instanceSet;
            if ((instanceSet = (idealState2 = this._helixAdmin.getResourceIdealState(this._helixClusterName, tableNameWithType)).getInstanceSet(segmentName)) == null || instanceSet.size() == 0) {
                // Nothing is assigned yet, so there is nothing to toggle; treated as success.
                LOGGER.warn("No instances as yet for segment {}, table {}", (Object)segmentName, (Object)tableNameWithType);
                return true;
            }
            for (String instance : instanceSet) {
                idealState2.setPartitionState(segmentName, instance, "OFFLINE");
            }
        } while (!(updateSuccessful = helixDataAccessor.updateProperty(idealStatePropertyKey, (HelixProperty)idealState2)));
        // Phase 2: read back and confirm every instance is OFFLINE in the ideal state.
        IdealState updatedIdealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, tableNameWithType);
        Map instanceStateMap = updatedIdealState.getInstanceStateMap(segmentName);
        for (String state : instanceStateMap.values()) {
            if ("OFFLINE".equals(state)) continue;
            LOGGER.error("Failed to write OFFLINE ideal state!");
            return false;
        }
        // Phase 3: wait for the external view to catch up with the OFFLINE ideal state.
        LOGGER.info("Wait until segment - " + segmentName + " to be OFFLINE in ExternalView");
        if (!this.ifExternalViewChangeReflectedForState(tableNameWithType, segmentName, "OFFLINE", this._externalViewOnlineToOfflineTimeoutMillis, false)) {
            LOGGER.error("External view for segment {} did not reflect the ideal state of OFFLINE within the {} ms time limit", (Object)segmentName, (Object)this._externalViewOnlineToOfflineTimeoutMillis);
            return false;
        }
        // Phase 4: flip all instances back to ONLINE; loop until the write is accepted.
        do {
            idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, tableNameWithType);
            Set instanceSet = idealState.getInstanceSet(segmentName);
            LOGGER.info("Found {} instances for segment '{}', in ideal state", (Object)instanceSet.size(), (Object)segmentName);
            for (String instance : instanceSet) {
                idealState.setPartitionState(segmentName, instance, "ONLINE");
                LOGGER.info("Setting Ideal State for segment '{}' to ONLINE for instance '{}'", (Object)segmentName, (Object)instance);
            }
        } while (!(updateSuccessful = helixDataAccessor.updateProperty(idealStatePropertyKey, (HelixProperty)idealState)));
        // Read back and confirm every instance is ONLINE in the ideal state.
        updatedIdealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, tableNameWithType);
        instanceStateMap = updatedIdealState.getInstanceStateMap(segmentName);
        LOGGER.info("Found {} instances for segment '{}', after updating ideal state", (Object)instanceStateMap.size(), (Object)segmentName);
        for (String state : instanceStateMap.values()) {
            if ("ONLINE".equals(state)) continue;
            LOGGER.error("Failed to write ONLINE ideal state!");
            return false;
        }
        LOGGER.info("Refresh is done for segment - " + segmentName);
        return true;
    }

    /**
     * Inverts the table's ideal state into a map from server name to the segments that list
     * that server in their instance state map.
     *
     * @param tableNameWithType table whose ideal state is read
     * @return a sorted map (server name to list of segment names)
     * @throws IllegalStateException if the table has no ideal state
     */
    public Map<String, List<String>> getServerToSegmentsMap(String tableNameWithType) {
        IdealState idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, tableNameWithType);
        if (idealState == null) {
            throw new IllegalStateException("Ideal state does not exist for table: " + tableNameWithType);
        }
        Map<String, List<String>> segmentsByServer = new TreeMap<>();
        for (String segment : idealState.getPartitionSet()) {
            Set<String> servers = idealState.getInstanceStateMap(segment).keySet();
            for (String server : servers) {
                List<String> segments = segmentsByServer.get(server);
                if (segments == null) {
                    segments = new ArrayList<>();
                    segmentsByServer.put(server, segments);
                }
                segments.add(segment);
            }
        }
        return segmentsByServer;
    }

    /**
     * Returns the CRC of every segment in the table's ideal state, as strings keyed by segment
     * name. CRCs are cached per table in {@code _segmentCrcMap}; a cached CRC is re-read only
     * when the version of the segment's metadata node differs from the last version recorded
     * in {@code _lastKnownSegmentMetadataVersionMap}. Cache entries for segments that are no
     * longer in the ideal state are dropped.
     *
     * @param tableNameWithType table whose segment CRCs are requested
     * @return a sorted map from segment name to {@code String.valueOf} of the cached CRC
     */
    public synchronized Map<String, String> getSegmentsCrcForTable(String tableNameWithType) {
        IdealState idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, tableNameWithType);
        List<String> segments = new ArrayList<>(idealState.getPartitionSet());
        List<String> metadataPaths = new ArrayList<>(segments.size());
        for (String segment : segments) {
            metadataPaths.add(this.buildPathForSegmentMetadata(tableNameWithType, segment));
        }

        // Lazily create the per-table cache maps.
        if (!this._segmentCrcMap.containsKey(tableNameWithType)) {
            this._lastKnownSegmentMetadataVersionMap.put(tableNameWithType, new HashMap());
            this._segmentCrcMap.put(tableNameWithType, new HashMap());
        }

        // Refresh the cached CRC of every segment whose metadata version is new or has changed.
        Stat[] stats = this._propertyStore.getStats(metadataPaths, AccessOption.PERSISTENT);
        for (int i = 0; i < stats.length; ++i) {
            String segment = segments.get(i);
            Stat stat = stats[i];
            if (stat == null) {
                continue;
            }
            int currentVersion = stat.getVersion();
            Map<String, Integer> knownVersions = this._lastKnownSegmentMetadataVersionMap.get(tableNameWithType);
            boolean upToDate = knownVersions.containsKey(segment) && knownVersions.get(segment) == currentVersion;
            if (!upToDate) {
                this.updateSegmentMetadataCrc(tableNameWithType, segment, currentVersion);
            }
        }

        // Evict cache entries of segments that left the ideal state.
        Set<String> liveSegments = idealState.getPartitionSet();
        this._segmentCrcMap.get(tableNameWithType).keySet().removeIf(cachedSegment -> {
            if (liveSegments.contains(cachedSegment)) {
                return false;
            }
            this._lastKnownSegmentMetadataVersionMap.get(tableNameWithType).remove(cachedSegment);
            return true;
        });

        Map<String, Long> cachedCrcs = this._segmentCrcMap.get(tableNameWithType);
        TreeMap<String, String> crcBySegment = new TreeMap<>();
        for (String segment : segments) {
            crcBySegment.put(segment, String.valueOf(cachedCrcs.get(segment)));
        }
        return crcBySegment;
    }

    /**
     * Reads the segment's offline ZK metadata and records both the given metadata version and
     * the metadata's CRC in the per-table cache maps.
     *
     * @param tableNameWithType table the segment belongs to; its cache maps must already exist
     * @param segmentName segment whose cache entries are updated
     * @param currentVersion metadata version to record as last known
     */
    private void updateSegmentMetadataCrc(String tableNameWithType, String segmentName, int currentVersion) {
        OfflineSegmentZKMetadata zkMetadata = ZKMetadataProvider.getOfflineSegmentZKMetadata(this._propertyStore, tableNameWithType, segmentName);
        assert (zkMetadata != null);
        this._lastKnownSegmentMetadataVersionMap.get(tableNameWithType).put(segmentName, currentVersion);
        this._segmentCrcMap.get(tableNameWithType).put(segmentName, zkMetadata.getCrc());
    }

    /**
     * Builds the path {@code /SEGMENTS/<tableNameWithType>/<segmentName>}.
     *
     * @param tableNameWithType table name component of the path
     * @param segmentName segment name component of the path
     * @return the three components joined with {@code /}
     */
    public String buildPathForSegmentMetadata(String tableNameWithType, String segmentName) {
        return String.join("/", "/SEGMENTS", tableNameWithType, segmentName);
    }

    /**
     * Checks whether the given name is among the resources returned by {@code getAllResources()}.
     *
     * @param tableNameWithType table name with type suffix to look for
     * @return {@code true} if the resource list contains the name
     */
    public boolean hasTable(String tableNameWithType) {
        boolean present = this.getAllResources().contains(tableNameWithType);
        return present;
    }

    /**
     * Checks whether the offline variant of the given table exists, via {@link #hasTable}.
     *
     * @param tableName table name that is converted to the offline table name with type
     * @return {@code true} if the offline table exists
     */
    public boolean hasOfflineTable(String tableName) {
        String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
        return this.hasTable(offlineTableName);
    }

    /**
     * Checks whether the realtime variant of the given table exists, via {@link #hasTable}.
     *
     * @param tableName table name that is converted to the realtime table name with type
     * @return {@code true} if the realtime table exists
     */
    public boolean hasRealtimeTable(String tableName) {
        String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
        return this.hasTable(realtimeTableName);
    }

    /**
     * Fetches the ideal state of the given table resource from the Helix admin.
     *
     * @param tableNameWithType resource name to look up
     * @return the ideal state returned by the Helix admin; declared nullable
     */
    @Nullable
    public IdealState getTableIdealState(String tableNameWithType) {
        IdealState idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, tableNameWithType);
        return idealState;
    }

    /**
     * Fetches the external view of the given table resource from the Helix admin.
     *
     * @param tableNameWithType resource name to look up
     * @return the external view returned by the Helix admin; declared nullable
     */
    @Nullable
    public ExternalView getTableExternalView(String tableNameWithType) {
        ExternalView externalView = this._helixAdmin.getResourceExternalView(this._helixClusterName, tableNameWithType);
        return externalView;
    }

    /**
     * Looks up the table config for a table name with type in the property store.
     *
     * @param tableNameWithType table name with type suffix
     * @return the config returned by {@code ZKMetadataProvider.getTableConfig}; declared nullable
     */
    @Nullable
    public TableConfig getTableConfig(String tableNameWithType) {
        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, tableNameWithType);
        return tableConfig;
    }

    /**
     * Looks up the offline table config for the given table name in the property store.
     *
     * @param tableName table name passed to {@code ZKMetadataProvider.getOfflineTableConfig}
     * @return the config returned by the provider; declared nullable
     */
    @Nullable
    public TableConfig getOfflineTableConfig(String tableName) {
        TableConfig offlineConfig = ZKMetadataProvider.getOfflineTableConfig(this._propertyStore, tableName);
        return offlineConfig;
    }

    /**
     * Looks up the realtime table config for the given table name in the property store.
     *
     * @param tableName table name passed to {@code ZKMetadataProvider.getRealtimeTableConfig}
     * @return the config returned by the provider; declared nullable
     */
    @Nullable
    public TableConfig getRealtimeTableConfig(String tableName) {
        TableConfig realtimeConfig = ZKMetadataProvider.getRealtimeTableConfig(this._propertyStore, tableName);
        return realtimeConfig;
    }

    /**
     * Looks up the table config for the given table name and type. The offline config is
     * returned for {@code OFFLINE}; any other type value (including {@code null}) yields the
     * realtime config.
     *
     * @param tableName table name
     * @param tableType table type selecting which config to read
     * @return the matching table config; declared nullable
     */
    @Nullable
    public TableConfig getTableConfig(String tableName, CommonConstants.Helix.TableType tableType) {
        return tableType == CommonConstants.Helix.TableType.OFFLINE
            ? this.getOfflineTableConfig(tableName)
            : this.getRealtimeTableConfig(tableName);
    }

    /**
     * Lists the instances carrying the server tag(s) of the table's tenant. For an offline
     * table the offline server tag is used; for a realtime table both the consuming and the
     * completed server tags are used. Duplicates are removed.
     *
     * @param tableName table name
     * @param tableType table type used to select the table config and the tags
     * @return a new list of distinct instance names, in no particular order
     * @throws NullPointerException if no table config exists for the table
     */
    public List<String> getServerInstancesForTable(String tableName, CommonConstants.Helix.TableType tableType) {
        TableConfig tableConfig = this.getTableConfig(tableName, tableType);
        Preconditions.checkNotNull((Object)tableConfig);
        TenantConfig tenantConfig = tableConfig.getTenantConfig();
        Set<String> distinctServers = new HashSet<>();
        List<InstanceConfig> instanceConfigs = HelixHelper.getInstanceConfigs((HelixManager)this._helixZkManager);
        if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
            String offlineTag = TagNameUtils.extractOfflineServerTag(tenantConfig);
            distinctServers.addAll(HelixHelper.getInstancesWithTag(instanceConfigs, offlineTag));
        } else if (CommonConstants.Helix.TableType.REALTIME.equals((Object)tableType)) {
            String consumingTag = TagNameUtils.extractConsumingServerTag(tenantConfig);
            distinctServers.addAll(HelixHelper.getInstancesWithTag(instanceConfigs, consumingTag));
            String completedTag = TagNameUtils.extractCompletedServerTag(tenantConfig);
            distinctServers.addAll(HelixHelper.getInstancesWithTag(instanceConfigs, completedTag));
        }
        return new ArrayList<String>(distinctServers);
    }

    /**
     * Lists the instances carrying the broker tag of the table's tenant.
     *
     * @param tableName table name
     * @param tableType table type used to select the table config
     * @return the instances returned by {@code HelixHelper.getInstancesWithTag} for the broker tag
     * @throws NullPointerException if no table config exists for the table
     */
    public List<String> getBrokerInstancesForTable(String tableName, CommonConstants.Helix.TableType tableType) {
        TableConfig tableConfig = this.getTableConfig(tableName, tableType);
        Preconditions.checkNotNull((Object)tableConfig);
        String brokerTag = TagNameUtils.extractBrokerTag((TenantConfig)tableConfig.getTenantConfig());
        return HelixHelper.getInstancesWithTag((HelixManager)this._helixZkManager, brokerTag);
    }

    /**
     * Enables the given instance, waiting up to 10 seconds for the change to be observed.
     *
     * @param instanceName instance to enable
     * @return the response produced by the three-argument {@code enableInstance}
     */
    public PinotResourceManagerResponse enableInstance(String instanceName) {
        final long timeoutMs = 10000L;
        return this.enableInstance(instanceName, true, timeoutMs);
    }

    /**
     * Disables the given instance, waiting up to 10 seconds for the change to be observed.
     *
     * @param instanceName instance to disable
     * @return the response produced by the three-argument {@code enableInstance}
     */
    public PinotResourceManagerResponse disableInstance(String instanceName) {
        final long timeoutMs = 10000L;
        return this.enableInstance(instanceName, false, timeoutMs);
    }

    /**
     * Drops an instance from the cluster by removing its instance node and its instance config.
     *
     * <p>The drop is refused if the instance still has a live-instance entry or if it appears
     * in the ideal state of any resource. The instance node is removed first; if removing the
     * instance config then fails, the failure message tells the operator to remove the config
     * manually.
     *
     * @param instanceName instance to drop
     * @return a success response if both removals succeed, otherwise a failure response
     *         describing which precondition or removal failed
     */
    public PinotResourceManagerResponse dropInstance(String instanceName) {
        if (this._helixDataAccessor.getProperty(this._keyBuilder.liveInstance(instanceName)) != null) {
            return PinotResourceManagerResponse.failure("Instance " + instanceName + " is still live");
        }
        for (String resource : this.getAllResources()) {
            IdealState idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, resource);
            if (idealState == null) {
                // The resource may have been removed since it was listed; nothing can reference the instance.
                continue;
            }
            for (String partition : idealState.getPartitionSet()) {
                if (!idealState.getInstanceSet(partition).contains(instanceName)) continue;
                return PinotResourceManagerResponse.failure("Instance " + instanceName + " exists in ideal state for " + resource);
            }
        }
        try {
            DEFAULT_RETRY_POLICY.attempt(() -> this._helixDataAccessor.removeProperty(this._keyBuilder.instance(instanceName)));
        }
        catch (Exception e) {
            // Keep the cause in the log; the response only carries a message.
            LOGGER.error("Caught exception while removing /INSTANCES/{}", (Object)instanceName, (Object)e);
            return PinotResourceManagerResponse.failure("Failed to remove /INSTANCES/" + instanceName);
        }
        try {
            DEFAULT_RETRY_POLICY.attempt(() -> this._helixDataAccessor.removeProperty(this._keyBuilder.instanceConfig(instanceName)));
        }
        catch (Exception e) {
            LOGGER.error("Caught exception while removing /CONFIGS/PARTICIPANT/{}", (Object)instanceName, (Object)e);
            return PinotResourceManagerResponse.failure("Failed to remove /CONFIGS/PARTICIPANT/" + instanceName + ". Make sure to remove /CONFIGS/PARTICIPANT/" + instanceName + " manually since /INSTANCES/" + instanceName + " has already been removed");
        }
        return PinotResourceManagerResponse.success("Instance " + instanceName + " dropped");
    }

    /**
     * Enables or disables an instance through the Helix admin and then polls the instance's
     * current states every 500 ms until they match the expected condition or the timeout
     * expires.
     *
     * <p>If the polling thread is interrupted, the interrupt flag is restored before a failure
     * response is returned, so callers can still observe the interruption.
     *
     * @param instanceName instance to toggle
     * @param enableInstance {@code true} to enable, {@code false} to disable
     * @param timeOutMs maximum time to poll, in milliseconds
     * @return a success response once the polling condition is met (or there is nothing to wait
     *         for), otherwise a failure response for "not found", interruption, or timeout
     */
    private PinotResourceManagerResponse enableInstance(String instanceName, boolean enableInstance, long timeOutMs) {
        if (!this.instanceExists(instanceName)) {
            return PinotResourceManagerResponse.failure("Instance " + instanceName + " not found");
        }
        this._helixAdmin.enableInstance(this._helixClusterName, instanceName, enableInstance);
        long intervalWaitTimeMs = 500L;
        long deadline = System.currentTimeMillis() + timeOutMs;
        String offlineState = "OFFLINE";
        while (System.currentTimeMillis() < deadline) {
            PropertyKey liveInstanceKey = this._keyBuilder.liveInstance(instanceName);
            LiveInstance liveInstance = (LiveInstance)this._helixDataAccessor.getProperty(liveInstanceKey);
            if (liveInstance == null) {
                // A non-live instance has no current states to wait for when disabling.
                if (!enableInstance) {
                    return PinotResourceManagerResponse.SUCCESS;
                }
            } else {
                boolean toggleSucceeded = true;
                PropertyKey instanceCurrentStatesKey = this._keyBuilder.currentStates(instanceName, liveInstance.getSessionId());
                List<CurrentState> instanceCurrentStates = this._helixDataAccessor.getChildValues(instanceCurrentStatesKey);
                if (instanceCurrentStates.isEmpty()) {
                    return PinotResourceManagerResponse.SUCCESS;
                }
                for (CurrentState currentState : instanceCurrentStates) {
                    for (String state : currentState.getPartitionStateMap().values()) {
                        // The toggle counts as pending when "enabling" and "partition is OFFLINE" disagree.
                        // NOTE(review): as written, enabling succeeds only while partitions are OFFLINE and
                        // disabling only while none are OFFLINE; this looks inverted - confirm the intent.
                        if (enableInstance != offlineState.equals(state)) {
                            toggleSucceeded = false;
                            break;
                        }
                    }
                    if (!toggleSucceeded) {
                        break;
                    }
                }
                if (toggleSucceeded) {
                    return enableInstance ? PinotResourceManagerResponse.success("Instance " + instanceName + " enabled") : PinotResourceManagerResponse.success("Instance " + instanceName + " disabled");
                }
            }
            try {
                Thread.sleep(intervalWaitTimeMs);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can react to it.
                Thread.currentThread().interrupt();
                LOGGER.warn("Got interrupted when sleeping for {}ms to wait until the current state matched for instance: {}", (Object)intervalWaitTimeMs, (Object)instanceName);
                return PinotResourceManagerResponse.failure("Got interrupted when waiting for instance to be " + (enableInstance ? "enabled" : "disabled"));
            }
        }
        return PinotResourceManagerResponse.failure("Instance " + (enableInstance ? "enable" : "disable") + " failed, timeout");
    }

    /**
     * Rebalances the given table by running a new {@code TableRebalancer} with the table's
     * config and the supplied rebalance configuration.
     *
     * @param tableNameWithType table to rebalance
     * @param rebalanceConfig configuration forwarded to the rebalancer
     * @return the result returned by the rebalancer
     * @throws TableNotFoundException if no table config exists for the table
     */
    public RebalanceResult rebalanceTable(String tableNameWithType, Configuration rebalanceConfig) throws TableNotFoundException {
        TableConfig tableConfig = this.getTableConfig(tableNameWithType);
        if (tableConfig != null) {
            TableRebalancer rebalancer = new TableRebalancer(this._helixZkManager);
            return rebalancer.rebalance(tableConfig, rebalanceConfig);
        }
        throw new TableNotFoundException("Failed to find table config for table: " + tableNameWithType);
    }

    /**
     * Checks whether a Helix instance config exists for the given instance name.
     *
     * @param instanceName instance to look up
     * @return {@code true} if {@code getHelixInstanceConfig} returns a non-null config
     */
    public boolean instanceExists(String instanceName) {
        boolean configPresent = this.getHelixInstanceConfig(instanceName) != null;
        return configPresent;
    }

    /**
     * Returns the value of the {@code _isSingleTenantCluster} flag.
     *
     * @return the current flag value
     */
    public boolean isSingleTenantCluster() {
        return _isSingleTenantCluster;
    }

    /**
     * Lists the instances tagged {@code broker_untagged} that also appear among the live
     * instances.
     *
     * @return the tagged-instance list filtered down to live instances
     */
    public List<String> getOnlineUnTaggedBrokerInstanceList() {
        List<String> untaggedBrokers = HelixHelper.getInstancesWithTag((HelixManager)this._helixZkManager, "broker_untagged");
        List<String> liveInstanceNames = this._helixDataAccessor.getChildNames(this._keyBuilder.liveInstances());
        untaggedBrokers.retainAll(liveInstanceNames);
        return untaggedBrokers;
    }

    /**
     * Lists the instances tagged {@code server_untagged} that also appear among the live
     * instances.
     *
     * @return the tagged-instance list filtered down to live instances
     */
    public List<String> getOnlineUnTaggedServerInstanceList() {
        List<String> untaggedServers = HelixHelper.getInstancesWithTag((HelixManager)this._helixZkManager, "server_untagged");
        List<String> liveInstanceNames = this._helixDataAccessor.getChildNames(this._keyBuilder.liveInstances());
        untaggedServers.retainAll(liveInstanceNames);
        return untaggedServers;
    }

    /**
     * Lists the names of the children under the live-instances key.
     *
     * @return the child names returned by the Helix data accessor
     */
    public List<String> getOnlineInstanceList() {
        PropertyKey liveInstancesKey = this._keyBuilder.liveInstances();
        return this._helixDataAccessor.getChildNames(liveInstancesKey);
    }

    /**
     * Resolves the admin endpoint of each given instance through
     * {@code _instanceAdminEndpointCache} and returns them as a bidirectional map keyed by
     * instance name.
     *
     * @param instances instance names to resolve
     * @return a bi-map from instance name to admin endpoint
     * @throws InvalidConfigException if the cache lookup for an instance fails with an
     *         {@code ExecutionException}
     */
    public BiMap<String, String> getDataInstanceAdminEndpoints(Set<String> instances) throws InvalidConfigException {
        HashBiMap instanceToEndpoint = HashBiMap.create((int)instances.size());
        for (String instance : instances) {
            try {
                String adminEndpoint = (String)this._instanceAdminEndpointCache.get((Object)instance);
                instanceToEndpoint.put((Object)instance, (Object)adminEndpoint);
            }
            catch (ExecutionException e) {
                String errorMessage = String.format("ExecutionException when getting instance admin endpoint for instance: %s. Error message: %s", instance, e.getMessage());
                LOGGER.error(errorMessage, (Throwable)e);
                throw new InvalidConfigException(errorMessage);
            }
        }
        return instanceToEndpoint;
    }

    /**
     * Resolves a table name (with or without a type suffix) to the names-with-type of the
     * tables that actually have a table config.
     *
     * <p>If the name already carries a type suffix, only that table is considered. Otherwise
     * the offline and/or realtime variants are considered, restricted by {@code tableType}
     * when it is given.
     *
     * @param tableName table name, with or without type suffix
     * @param tableType optional type filter; {@code null} means both types
     * @return a non-empty list of existing table names with type
     * @throws IllegalArgumentException if the name's type suffix contradicts {@code tableType}
     * @throws TableNotFoundException if no matching table config exists; the message names the
     *         requested table
     */
    public List<String> getExistingTableNamesWithType(String tableName, @Nullable CommonConstants.Helix.TableType tableType) throws TableNotFoundException {
        ArrayList<String> tableNamesWithType = new ArrayList<String>(2);
        CommonConstants.Helix.TableType tableTypeFromTableName = TableNameBuilder.getTableTypeFromTableName((String)tableName);
        if (tableTypeFromTableName != null) {
            if (tableType != null && tableType != tableTypeFromTableName) {
                throw new IllegalArgumentException("Table name: " + tableName + " does not match table type: " + tableType);
            }
            if (this.getTableConfig(tableName) != null) {
                tableNamesWithType.add(tableName);
            }
        } else {
            if (tableType == null || tableType == CommonConstants.Helix.TableType.OFFLINE) {
                String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
                if (this.getTableConfig(offlineTableName) != null) {
                    tableNamesWithType.add(offlineTableName);
                }
            }
            if (tableType == null || tableType == CommonConstants.Helix.TableType.REALTIME) {
                String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
                if (this.getTableConfig(realtimeTableName) != null) {
                    tableNamesWithType.add(realtimeTableName);
                }
            }
        }
        if (tableNamesWithType.isEmpty()) {
            // Report what was asked for; the result list is empty here and carries no information.
            String requested = tableTypeFromTableName == null && tableType != null ? tableName + " (type: " + tableType + ")" : tableName;
            throw new TableNotFoundException("Table: " + requested + " not found.");
        }
        return tableNamesWithType;
    }

    /**
     * Unchecked exception type named for the case where a table already exists.
     */
    public static class TableAlreadyExistsException
    extends RuntimeException {
        /**
         * @param message detail message
         * @param cause underlying cause
         */
        public TableAlreadyExistsException(final String message, final Throwable cause) {
            super(message, cause);
        }

        /**
         * @param message detail message
         */
        public TableAlreadyExistsException(final String message) {
            super(message);
        }
    }

    /**
     * Unchecked exception type named for the case where a table config is invalid.
     */
    public static class InvalidTableConfigException
    extends RuntimeException {
        /**
         * @param cause underlying cause; its string form becomes the detail message
         */
        public InvalidTableConfigException(final Throwable cause) {
            super(cause);
        }

        /**
         * @param message detail message
         * @param cause underlying cause
         */
        public InvalidTableConfigException(final String message, final Throwable cause) {
            super(message, cause);
        }

        /**
         * @param message detail message
         */
        public InvalidTableConfigException(final String message) {
            super(message);
        }
    }
}

