/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.core.Response;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.AccessOption;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.Criteria;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.api.listeners.BatchMode;
import org.apache.helix.api.listeners.InstanceConfigChangeListener;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.ParticipantHistory;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.exception.SchemaAlreadyExistsException;
import org.apache.pinot.common.exception.SchemaBackwardIncompatibleException;
import org.apache.pinot.common.exception.SchemaNotFoundException;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.common.lineage.LineageEntry;
import org.apache.pinot.common.lineage.LineageEntryState;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
import org.apache.pinot.common.lineage.SegmentLineageUtils;
import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
import org.apache.pinot.common.messages.SegmentRefreshMessage;
import org.apache.pinot.common.messages.SegmentReloadMessage;
import org.apache.pinot.common.messages.TableConfigRefreshMessage;
import org.apache.pinot.common.messages.TableDeletionMessage;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
import org.apache.pinot.common.restlet.resources.EndReplaceSegmentsRequest;
import org.apache.pinot.common.restlet.resources.RevertReplaceSegmentsRequest;
import org.apache.pinot.common.tier.Tier;
import org.apache.pinot.common.tier.TierSegmentSelector;
import org.apache.pinot.common.utils.BcryptUtils;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils;
import org.apache.pinot.common.utils.config.InstanceUtils;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.config.TierConfigUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvider;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.exception.InvalidTableConfigException;
import org.apache.pinot.controller.api.exception.TableAlreadyExistsException;
import org.apache.pinot.controller.api.exception.UserAlreadyExistsException;
import org.apache.pinot.controller.api.resources.InstanceInfo;
import org.apache.pinot.controller.api.resources.OperationValidationResponse;
import org.apache.pinot.controller.api.resources.PeriodicTaskInvocationResponse;
import org.apache.pinot.controller.api.resources.StateType;
import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.apache.pinot.controller.helix.core.SegmentDeletionManager;
import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.apache.pinot.controller.helix.core.lineage.LineageManager;
import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObserver;
import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
import org.apache.pinot.controller.helix.starter.HelixConfig;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.instance.Instance;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableCustomConfig;
import org.apache.pinot.spi.config.table.TableStats;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TagOverrideConfig;
import org.apache.pinot.spi.config.table.TenantConfig;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.user.ComponentType;
import org.apache.pinot.spi.config.user.RoleType;
import org.apache.pinot.spi.config.user.UserConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PinotHelixResourceManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(PinotHelixResourceManager.class);
    // Same value (6, in hours) as the expireAfterWrite setting applied to the admin endpoint cache in the constructor.
    private static final long CACHE_ENTRY_EXPIRE_TIME_HOURS = 6L;
    // Exponential backoff policy built with (5, 1000, 2.0) — presumably attempts, initial delay in ms and
    // scale factor; confirm against RetryPolicies.
    private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy((int)5, (long)1000L, (double)2.0);
    private static final int DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY = 10;
    public static final String APPEND = "APPEND";
    // Same value (100) as the size of the _tableUpdaterLocks array allocated in the constructor.
    private static final int DEFAULT_TABLE_UPDATER_LOCKERS_SIZE = 100;
    private static final String API_REQUEST_ID_PREFIX = "api-";
    // 600000 ms = 10 minutes.
    public static final long EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS = 600000L;
    public static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1000L;
    // 1200000 ms = 20 minutes.
    public static final long SEGMENT_CLEANUP_TIMEOUT_MS = 1200000L;
    public static final long SEGMENT_CLEANUP_CHECK_INTERVAL_MS = 1000L;
    // Formats instants in UTC using the pattern yyyyMMdd'T'HHmmss'Z'.
    private static final DateTimeFormatter SIMPLE_DATE_FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'").withZone(ZoneOffset.UTC);
    // NOTE(review): plain HashMaps; their usage is outside this view, so thread-safety must be verified at the call sites.
    private final Map<String, Map<String, Long>> _segmentCrcMap = new HashMap<String, Map<String, Long>>();
    private final Map<String, Map<String, Integer>> _lastKnownSegmentMetadataVersionMap = new HashMap<String, Map<String, Integer>>();
    // Fixed-size array of monitor objects, populated in the constructor.
    private final Object[] _tableUpdaterLocks;
    // Maps an instance id to its server admin endpoint; entries are loaded from the Helix instance config
    // and invalidated by the instance config change listener registered in start().
    private final LoadingCache<String, String> _instanceAdminEndpointCache;
    // ZK URL after normalization by HelixConfig.getAbsoluteZkPathForHelix.
    private final String _helixZkURL;
    private final String _helixClusterName;
    // Nullable (see the constructor parameter annotation).
    private final String _dataDir;
    private final boolean _isSingleTenantCluster;
    private final boolean _enableBatchMessageMode;
    private final int _deletedSegmentsRetentionInDays;
    private final boolean _enableTieredSegmentAssignment;
    // The fields below through _segmentDeletionManager, plus _tableCache, are assigned in start() and are null before that call.
    private HelixManager _helixZkManager;
    private HelixAdmin _helixAdmin;
    private ZkHelixPropertyStore<ZNRecord> _propertyStore;
    private HelixDataAccessor _helixDataAccessor;
    private PropertyKey.Builder _keyBuilder;
    // May be null: start() accepts a nullable ControllerMetrics.
    private ControllerMetrics _controllerMetrics;
    private SegmentDeletionManager _segmentDeletionManager;
    // Not assigned anywhere in the visible part of the file.
    private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
    private TableCache _tableCache;
    private final LineageManager _lineageManager;

    /**
     * Creates a resource manager holding the given cluster settings. No Helix handles are obtained here;
     * they are wired in by {@link #start(HelixManager, ControllerMetrics)}.
     *
     * @param zkURL ZooKeeper URL, normalized through {@code HelixConfig.getAbsoluteZkPathForHelix}
     * @param helixClusterName name of the Helix cluster
     * @param dataDir data directory handed to the segment deletion manager on start, may be {@code null}
     * @param isSingleTenantCluster tenant isolation flag written to the property store on start
     * @param enableBatchMessageMode batch message mode flag
     * @param deletedSegmentsRetentionInDays retention handed to the segment deletion manager on start
     * @param enableTieredSegmentAssignment tiered segment assignment flag
     * @param lineageManager lineage manager exposed through {@link #getLineageManager()}
     */
    public PinotHelixResourceManager(String zkURL, String helixClusterName, @Nullable String dataDir, boolean isSingleTenantCluster, boolean enableBatchMessageMode, int deletedSegmentsRetentionInDays, boolean enableTieredSegmentAssignment, LineageManager lineageManager) {
        _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
        _helixClusterName = helixClusterName;
        _dataDir = dataDir;
        _isSingleTenantCluster = isSingleTenantCluster;
        _enableBatchMessageMode = enableBatchMessageMode;
        _deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
        _enableTieredSegmentAssignment = enableTieredSegmentAssignment;
        // Use the declared constants rather than repeating their literal values.
        _instanceAdminEndpointCache = CacheBuilder.newBuilder()
            .expireAfterWrite(CACHE_ENTRY_EXPIRE_TIME_HOURS, TimeUnit.HOURS)
            .build(new CacheLoader<String, String>() {
                @Override
                public String load(String instanceId) {
                    // Loading fails (instead of caching null) when the instance has no config.
                    InstanceConfig instanceConfig = PinotHelixResourceManager.this.getHelixInstanceConfig(instanceId);
                    Preconditions.checkNotNull(instanceConfig, "Failed to find instance config for: %s", instanceId);
                    return InstanceUtils.getServerAdminEndpoint(instanceConfig);
                }
            });
        _tableUpdaterLocks = new Object[DEFAULT_TABLE_UPDATER_LOCKERS_SIZE];
        for (int i = 0; i < _tableUpdaterLocks.length; i++) {
            _tableUpdaterLocks[i] = new Object();
        }
        _lineageManager = lineageManager;
    }

    /**
     * Creates a resource manager from a controller configuration, delegating to the full constructor with
     * values read from {@code controllerConf} and a lineage manager built by {@code LineageManagerFactory}.
     *
     * @param controllerConf controller configuration to read the settings from
     */
    public PinotHelixResourceManager(ControllerConf controllerConf) {
        this(
            controllerConf.getZkStr(),
            controllerConf.getHelixClusterName(),
            controllerConf.getDataDir(),
            controllerConf.tenantIsolationEnabled(),
            controllerConf.getEnableBatchMessageMode(),
            controllerConf.getDeletedSegmentsRetentionInDays(),
            controllerConf.tieredSegmentAssignmentEnabled(),
            LineageManagerFactory.create(controllerConf));
    }

    /**
     * Wires this manager to a Helix manager.
     *
     * <p>Obtains the admin tool, property store and data accessor from {@code helixZkManager}, creates the
     * segment deletion manager, writes the tenant isolation flag to the property store, registers a listener
     * that invalidates the instance admin endpoint cache on instance config changes, and builds the table
     * cache using the cluster-level {@code enable.case.insensitive} setting (treated as {@code true} when the
     * key is absent from the returned config map).
     *
     * @param helixZkManager Helix manager to take all Helix handles from
     * @param controllerMetrics controller metrics, may be {@code null}
     * @throws RuntimeException if the instance config change listener cannot be registered; the original
     *     exception is attached as the cause
     */
    public synchronized void start(HelixManager helixZkManager, @Nullable ControllerMetrics controllerMetrics) {
        _helixZkManager = helixZkManager;
        _helixAdmin = _helixZkManager.getClusterManagmentTool();
        _propertyStore = _helixZkManager.getHelixPropertyStore();
        _helixDataAccessor = _helixZkManager.getHelixDataAccessor();
        _keyBuilder = _helixDataAccessor.keyBuilder();
        _controllerMetrics = controllerMetrics;
        _segmentDeletionManager = new SegmentDeletionManager(_dataDir, _helixAdmin, _helixClusterName, _propertyStore, _deletedSegmentsRetentionInDays);
        ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, _isSingleTenantCluster);
        try {
            helixZkManager.addInstanceConfigChangeListener(new InstanceConfigChangeListener() {

                @Override
                @BatchMode(enabled = false)
                @PreFetch(enabled = false)
                public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs, NotificationContext context) {
                    NotificationContext.Type type = context.getType();
                    if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.FINALIZE || context.getIsChildChange()) {
                        // The set of instances may have changed: drop every cached endpoint.
                        _instanceAdminEndpointCache.invalidateAll();
                    } else {
                        // Single instance change: the instance name is the last segment of the changed path.
                        String pathChanged = context.getPathChanged();
                        String instanceName = pathChanged.substring(pathChanged.lastIndexOf('/') + 1);
                        _instanceAdminEndpointCache.invalidate(instanceName);
                    }
                }
            });
        } catch (Exception e) {
            // Keep the cause so the underlying Helix/ZK failure is visible in the stack trace.
            throw new RuntimeException("Caught exception while adding InstanceConfigChangeListener", e);
        }
        HelixConfigScope helixConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(_helixClusterName).build();
        Map<String, String> configs = _helixAdmin.getConfig(helixConfigScope, Arrays.asList("enable.case.insensitive"));
        boolean caseInsensitive = Boolean.parseBoolean(configs.getOrDefault("enable.case.insensitive", Boolean.toString(true)));
        _tableCache = new TableCache(_propertyStore, caseInsensitive);
    }

    /**
     * Stops the segment deletion manager created by {@code start}.
     */
    public synchronized void stop() {
        _segmentDeletionManager.stop();
    }

    /**
     * @return the ZooKeeper URL as normalized in the constructor
     */
    public String getHelixZkURL() {
        return _helixZkURL;
    }

    /**
     * @return the table cache built in {@code start}; {@code null} before {@code start} has run
     */
    public TableCache getTableCache() {
        return _tableCache;
    }

    /**
     * @return the Helix cluster name given at construction
     */
    public String getHelixClusterName() {
        return _helixClusterName;
    }

    /**
     * @return the segment deletion manager created in {@code start}; {@code null} before {@code start} has run
     */
    public SegmentDeletionManager getSegmentDeletionManager() {
        return _segmentDeletionManager;
    }

    /**
     * @return the Helix manager passed to {@code start}; {@code null} before {@code start} has run
     */
    public HelixManager getHelixZkManager() {
        return _helixZkManager;
    }

    /**
     * @return the Helix admin tool obtained in {@code start}; {@code null} before {@code start} has run
     */
    public HelixAdmin getHelixAdmin() {
        return _helixAdmin;
    }

    /**
     * @return the Helix property store obtained in {@code start}; {@code null} before {@code start} has run
     */
    public ZkHelixPropertyStore<ZNRecord> getPropertyStore() {
        return _propertyStore;
    }

    /**
     * @return the lineage manager given at construction
     */
    public LineageManager getLineageManager() {
        return _lineageManager;
    }

    /**
     * Lists the instances of the cluster as reported by the Helix admin tool.
     *
     * @return the instance names in the cluster
     */
    public List<String> getAllInstances() {
        return _helixAdmin.getInstancesInCluster(_helixClusterName);
    }

    /**
     * Fetches the instance configs through {@code HelixHelper.getInstanceConfigs}.
     *
     * @return the instance configs returned by the helper
     */
    public List<InstanceConfig> getAllHelixInstanceConfigs() {
        return HelixHelper.getInstanceConfigs(_helixZkManager);
    }

    /**
     * Reads the Helix instance config of a single instance through the data accessor.
     *
     * @param instanceId Helix instance id
     * @return the instance config, or {@code null} when the accessor returns none
     */
    @Nullable
    public InstanceConfig getHelixInstanceConfig(String instanceId) {
        PropertyKey instanceConfigKey = _keyBuilder.instanceConfig(instanceId);
        return (InstanceConfig) _helixDataAccessor.getProperty(instanceConfigKey);
    }

    /**
     * Reads the instance ZK metadata of a single instance from the property store.
     *
     * @param instanceId instance id
     * @return the metadata returned by {@code ZKMetadataProvider}, possibly {@code null}
     */
    @Nullable
    public InstanceZKMetadata getInstanceZKMetadata(String instanceId) {
        return ZKMetadataProvider.getInstanceZKMetadata(_propertyStore, instanceId);
    }

    /**
     * Returns the names of the broker instances selected by {@link #getBrokerInstancesConfigsFor(String)}.
     *
     * @param tableName table name used to look up the table configs
     * @return the instance names, in the order of the underlying instance configs
     */
    public List<String> getBrokerInstancesFor(String tableName) {
        return getBrokerInstancesConfigsFor(tableName)
            .stream()
            .map(brokerConfig -> brokerConfig.getInstanceName())
            .collect(Collectors.toList());
    }

    /**
     * Returns the instance configs carrying the broker tag of the table's broker tenant.
     *
     * <p>The broker tenant is read from the offline table config when one exists, otherwise from the
     * realtime table config. When neither exists a {@code null} tenant name is handed to
     * {@code TagNameUtils.getBrokerTagForTenant}.
     *
     * @param tableName table name used to look up the offline/realtime table configs
     * @return the instance configs tagged with the resolved broker tag
     */
    public List<InstanceConfig> getBrokerInstancesConfigsFor(String tableName) {
        // The offline config wins; the realtime config is only consulted when there is no offline one.
        TableConfig tableConfig = ZKMetadataProvider.getOfflineTableConfig(_propertyStore, tableName);
        if (tableConfig == null) {
            tableConfig = ZKMetadataProvider.getRealtimeTableConfig(_propertyStore, tableName);
        }
        String brokerTenantName = tableConfig != null ? tableConfig.getTenantConfig().getBroker() : null;
        List<InstanceConfig> allInstanceConfigs = HelixHelper.getInstanceConfigs(_helixZkManager);
        String brokerTag = TagNameUtils.getBrokerTagForTenant(brokerTenantName);
        return HelixHelper.getInstancesConfigsWithTag(allInstanceConfigs, brokerTag);
    }

    /**
     * Lists the cluster instances whose id is recognized as a broker by {@code InstanceTypeUtils.isBroker}.
     *
     * @return the broker instance ids
     */
    public List<String> getAllBrokerInstances() {
        List<String> allInstances = HelixHelper.getAllInstances(_helixAdmin, _helixClusterName);
        return allInstances.stream()
            .filter(instanceId -> InstanceTypeUtils.isBroker(instanceId))
            .collect(Collectors.toList());
    }

    /**
     * Lists the instance configs whose id is recognized as a broker by {@code InstanceTypeUtils.isBroker}.
     *
     * @return the broker instance configs
     */
    public List<InstanceConfig> getAllBrokerInstanceConfigs() {
        List<InstanceConfig> allConfigs = HelixHelper.getInstanceConfigs(_helixZkManager);
        return allConfigs.stream()
            .filter(config -> InstanceTypeUtils.isBroker(config.getId()))
            .collect(Collectors.toList());
    }

    /**
     * Lists the instance configs whose id is recognized as a controller by
     * {@code InstanceTypeUtils.isController}.
     *
     * @return the controller instance configs
     */
    public List<InstanceConfig> getAllControllerInstanceConfigs() {
        List<InstanceConfig> allConfigs = HelixHelper.getInstanceConfigs(_helixZkManager);
        return allConfigs.stream()
            .filter(config -> InstanceTypeUtils.isController(config.getId()))
            .collect(Collectors.toList());
    }

    /**
     * Lists the instance configs whose id is recognized as a minion by {@code InstanceTypeUtils.isMinion}.
     *
     * @return the minion instance configs
     */
    public List<InstanceConfig> getAllMinionInstanceConfigs() {
        List<InstanceConfig> allConfigs = HelixHelper.getInstanceConfigs(_helixZkManager);
        return allConfigs.stream()
            .filter(config -> InstanceTypeUtils.isMinion(config.getId()))
            .collect(Collectors.toList());
    }

    /**
     * Looks up the instances carrying the given tag through {@code HelixHelper.getInstancesWithTag}.
     *
     * @param tag Helix instance tag
     * @return the instances returned by the helper
     */
    public List<String> getInstancesWithTag(String tag) {
        return HelixHelper.getInstancesWithTag(_helixZkManager, tag);
    }

    /**
     * Adds a new instance to the Helix cluster and optionally updates the broker resource.
     *
     * <p>The broker resource is only touched when the instance id is a broker id,
     * {@code updateBrokerResource} is set, and the instance carries at least one broker tag.
     *
     * @param instance instance to add
     * @param updateBrokerResource whether the broker resource should be updated for a broker instance
     * @return a success response describing what was done
     * @throws ClientErrorException with status CONFLICT when an instance config already exists for the id
     */
    public synchronized PinotResourceManagerResponse addInstance(Instance instance, boolean updateBrokerResource) {
        String instanceId = InstanceUtils.getHelixInstanceId(instance);
        if (getHelixInstanceConfig(instanceId) != null) {
            throw new ClientErrorException(String.format("Instance: %s already exists", instanceId), Response.Status.CONFLICT);
        }
        _helixAdmin.addInstance(_helixClusterName, InstanceUtils.toHelixInstanceConfig(instance));

        // Work out the sorted broker tags; they stay null when no broker resource update is requested.
        List<String> newBrokerTags = null;
        if (InstanceTypeUtils.isBroker(instanceId) && updateBrokerResource) {
            List<String> newTags = instance.getTags();
            if (CollectionUtils.isNotEmpty(newTags)) {
                newBrokerTags = newTags.stream().filter(TagNameUtils::isBrokerTag).sorted().collect(Collectors.toList());
            }
        }
        if (newBrokerTags == null || newBrokerTags.isEmpty()) {
            return PinotResourceManagerResponse.success("Added instance: " + instanceId);
        }

        long startTimeMs = System.currentTimeMillis();
        List<String> tablesAdded = new ArrayList<>();
        HelixHelper.updateBrokerResource(_helixZkManager, instanceId, newBrokerTags, tablesAdded, null);
        LOGGER.info("Updated broker resource for broker: {} with tags: {} in {}ms, tables added: {}", instanceId, newBrokerTags, System.currentTimeMillis() - startTimeMs, tablesAdded);
        return PinotResourceManagerResponse.success(String.format("Added instance: %s, and updated broker resource - tables added: %s", instanceId, tablesAdded));
    }

    /**
     * Overwrites the Helix instance config of an existing instance and optionally updates the broker
     * resource.
     *
     * <p>The broker resource is only touched when the instance id is a broker id,
     * {@code updateBrokerResource} is set, and the sorted broker tags differ between the old and the new
     * config.
     *
     * @param instanceId id of the instance to update
     * @param newInstance new instance definition
     * @param updateBrokerResource whether the broker resource should be updated for a broker instance
     * @return a success response describing what was done
     * @throws NotFoundException when no instance config exists for the id
     * @throws RuntimeException when the instance config cannot be written
     */
    public synchronized PinotResourceManagerResponse updateInstance(String instanceId, Instance newInstance, boolean updateBrokerResource) {
        InstanceConfig instanceConfig = getHelixInstanceConfig(instanceId);
        if (instanceConfig == null) {
            throw new NotFoundException("Failed to find instance config for instance: " + instanceId);
        }
        // Capture both tag lists before the config is modified below.
        List<String> newTags = newInstance.getTags();
        List<String> oldTags = instanceConfig.getTags();
        InstanceUtils.updateHelixInstanceConfig(instanceConfig, newInstance);
        boolean written = _helixDataAccessor.setProperty(_keyBuilder.instanceConfig(instanceId), instanceConfig);
        if (!written) {
            throw new RuntimeException("Failed to set instance config for instance: " + instanceId);
        }

        String plainResponse = "Updated instance: " + instanceId;
        if (!InstanceTypeUtils.isBroker(instanceId) || !updateBrokerResource) {
            return PinotResourceManagerResponse.success(plainResponse);
        }
        List<String> newBrokerTags;
        if (newTags != null) {
            newBrokerTags = newTags.stream().filter(TagNameUtils::isBrokerTag).sorted().collect(Collectors.toList());
        } else {
            newBrokerTags = Collections.emptyList();
        }
        List<String> oldBrokerTags = oldTags.stream().filter(TagNameUtils::isBrokerTag).sorted().collect(Collectors.toList());
        if (newBrokerTags.equals(oldBrokerTags)) {
            return PinotResourceManagerResponse.success(plainResponse);
        }

        long startTimeMs = System.currentTimeMillis();
        List<String> tablesAdded = new ArrayList<>();
        List<String> tablesRemoved = new ArrayList<>();
        HelixHelper.updateBrokerResource(_helixZkManager, instanceId, newBrokerTags, tablesAdded, tablesRemoved);
        LOGGER.info("Updated broker resource for broker: {} with tags: {} in {}ms, tables added: {}, tables removed: {}", instanceId, newBrokerTags, System.currentTimeMillis() - startTimeMs, tablesAdded, tablesRemoved);
        return PinotResourceManagerResponse.success(String.format("Updated instance: %s, and updated broker resource - tables added: %s, tables removed: %s", instanceId, tablesAdded, tablesRemoved));
    }

    /**
     * Replaces the tag list of an existing instance and optionally updates the broker resource.
     *
     * <p>The broker resource is only touched when the instance id is a broker id,
     * {@code updateBrokerResource} is set, and the sorted broker tags differ between the old and the new
     * tag list.
     *
     * @param instanceId id of the instance to update
     * @param tagsString comma-separated list of the new tags
     * @param updateBrokerResource whether the broker resource should be updated for a broker instance
     * @return a success response describing what was done
     * @throws NotFoundException when no instance config exists for the id
     * @throws RuntimeException when the instance config cannot be written
     */
    public synchronized PinotResourceManagerResponse updateInstanceTags(String instanceId, String tagsString, boolean updateBrokerResource) {
        InstanceConfig instanceConfig = getHelixInstanceConfig(instanceId);
        if (instanceConfig == null) {
            throw new NotFoundException("Failed to find instance config for instance: " + instanceId);
        }
        List<String> newTags = Arrays.asList(StringUtils.split(tagsString, ','));
        // Read the old tags before the tag list field is replaced.
        List<String> oldTags = instanceConfig.getTags();
        instanceConfig.getRecord().setListField(InstanceConfig.InstanceConfigProperty.TAG_LIST.name(), newTags);
        boolean written = _helixDataAccessor.setProperty(_keyBuilder.instanceConfig(instanceId), instanceConfig);
        if (!written) {
            throw new RuntimeException("Failed to set instance config for instance: " + instanceId);
        }

        if (InstanceTypeUtils.isBroker(instanceId) && updateBrokerResource) {
            List<String> newBrokerTags = newTags.stream().filter(TagNameUtils::isBrokerTag).sorted().collect(Collectors.toList());
            List<String> oldBrokerTags = oldTags.stream().filter(TagNameUtils::isBrokerTag).sorted().collect(Collectors.toList());
            if (!newBrokerTags.equals(oldBrokerTags)) {
                long startTimeMs = System.currentTimeMillis();
                List<String> tablesAdded = new ArrayList<>();
                List<String> tablesRemoved = new ArrayList<>();
                HelixHelper.updateBrokerResource(_helixZkManager, instanceId, newBrokerTags, tablesAdded, tablesRemoved);
                LOGGER.info("Updated broker resource for broker: {} with tags: {} in {}ms, tables added: {}, tables removed: {}", instanceId, newBrokerTags, System.currentTimeMillis() - startTimeMs, tablesAdded, tablesRemoved);
                return PinotResourceManagerResponse.success(String.format("Updated tags: %s for instance: %s, and updated broker resource - tables added: %s, tables removed: %s", newTags, instanceId, tablesAdded, tablesRemoved));
            }
        }
        return PinotResourceManagerResponse.success(String.format("Updated tags: %s for instance: %s", newTags, instanceId));
    }

    /**
     * Updates the broker resource for a broker instance based on the broker tags currently stored in its
     * instance config.
     *
     * @param instanceId id of the broker instance
     * @return a success response listing the tables added and removed
     * @throws BadRequestException when the id is not a broker id
     * @throws NotFoundException when no instance config exists for the id
     */
    public PinotResourceManagerResponse updateBrokerResource(String instanceId) {
        if (!InstanceTypeUtils.isBroker(instanceId)) {
            throw new BadRequestException("Cannot update broker resource for non-broker instance: " + instanceId);
        }
        InstanceConfig brokerConfig = getHelixInstanceConfig(instanceId);
        if (brokerConfig == null) {
            throw new NotFoundException("Failed to find instance config for instance: " + instanceId);
        }
        long startTimeMs = System.currentTimeMillis();
        List<String> brokerTags = new ArrayList<>();
        for (String tag : brokerConfig.getTags()) {
            if (TagNameUtils.isBrokerTag(tag)) {
                brokerTags.add(tag);
            }
        }
        List<String> tablesAdded = new ArrayList<>();
        List<String> tablesRemoved = new ArrayList<>();
        HelixHelper.updateBrokerResource(_helixZkManager, instanceId, brokerTags, tablesAdded, tablesRemoved);
        LOGGER.info("Updated broker resource for broker: {} with tags: {} in {}ms, tables added: {}, tables removed: {}", instanceId, brokerTags, System.currentTimeMillis() - startTimeMs, tablesAdded, tablesRemoved);
        return PinotResourceManagerResponse.success(String.format("Updated broker resource for broker: %s - tables added: %s, tables removed: %s", instanceId, tablesAdded, tablesRemoved));
    }

    /**
     * Tells whether an instance has been offline for longer than the given duration.
     *
     * <p>Returns {@code false} when the instance has a live-instance entry, when it has no participant
     * history, or when the recorded last offline time is negative.
     *
     * @param instanceId id of the instance to check
     * @param offlineTimeRangeMs minimum offline duration in milliseconds
     * @return {@code true} only when the time since the last recorded offline time exceeds
     *     {@code offlineTimeRangeMs}
     */
    public boolean isInstanceOfflineFor(String instanceId, long offlineTimeRangeMs) {
        boolean isLive = _helixDataAccessor.getProperty(_keyBuilder.liveInstance(instanceId)) != null;
        if (isLive) {
            return false;
        }
        ParticipantHistory history = (ParticipantHistory) _helixDataAccessor.getProperty(_keyBuilder.participantHistory(instanceId));
        if (history == null) {
            return false;
        }
        long lastOfflineTime = history.getLastOfflineTime();
        if (lastOfflineTime < 0L) {
            return false;
        }
        boolean offlineLongEnough = System.currentTimeMillis() - lastOfflineTime > offlineTimeRangeMs;
        if (offlineLongEnough) {
            LOGGER.info("Instance: {} has been offline for more than {}ms", instanceId, offlineTimeRangeMs);
        }
        return offlineLongEnough;
    }

    /**
     * Lists the resources of the cluster as reported by the Helix admin tool.
     *
     * @return the resource names in the cluster
     */
    public List<String> getAllResources() {
        return _helixAdmin.getResourcesInCluster(_helixClusterName);
    }

    /**
     * Lists the cluster resources accepted by {@code TableNameBuilder.isTableResource}.
     *
     * @return a new mutable list of the matching resource names
     */
    public List<String> getAllTables() {
        return getAllResources().stream()
            .filter(resourceName -> TableNameBuilder.isTableResource(resourceName))
            .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Lists the cluster resources accepted by {@code TableNameBuilder.isOfflineTableResource}.
     *
     * @return a new mutable list of the matching resource names
     */
    public List<String> getAllOfflineTables() {
        return getAllResources().stream()
            .filter(resourceName -> TableNameBuilder.isOfflineTableResource(resourceName))
            .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * @return the dimension tables reported by the table cache
     */
    public List<String> getAllDimensionTables() {
        return _tableCache.getAllDimensionTables();
    }

    /**
     * Lists the cluster resources accepted by {@code TableNameBuilder.isRealtimeTableResource}.
     *
     * @return a new mutable list of the matching resource names
     */
    public List<String> getAllRealtimeTables() {
        return getAllResources().stream()
            .filter(resourceName -> TableNameBuilder.isRealtimeTableResource(resourceName))
            .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Lists the distinct raw table names of all table resources in the cluster.
     *
     * <p>Names are de-duplicated through a {@code HashSet}, so the order of the result is the set's
     * iteration order.
     *
     * @return a new mutable list of the raw table names
     */
    public List<String> getAllRawTables() {
        Set<String> rawTableNames = getAllResources().stream()
            .filter(resourceName -> TableNameBuilder.isTableResource(resourceName))
            .map(resourceName -> TableNameBuilder.extractRawTableName(resourceName))
            .collect(Collectors.toCollection(HashSet::new));
        return new ArrayList<>(rawTableNames);
    }

    /**
     * Resolves a table name through the table cache when the cache ignores case.
     *
     * @param tableName table name as given by the caller
     * @return the name stored in the table cache when the cache ignores case and knows the table, otherwise
     *     {@code tableName} unchanged
     */
    public String getActualTableName(String tableName) {
        if (!_tableCache.isIgnoreCase()) {
            return tableName;
        }
        String resolvedName = _tableCache.getActualTableName(tableName);
        if (resolvedName == null) {
            return tableName;
        }
        return resolvedName;
    }

    /**
     * Returns the segments of a table without any time filtering, delegating to the five-argument overload
     * with the widest possible time range.
     *
     * @param tableNameWithType table name with type suffix
     * @param shouldExcludeReplacedSegments whether segments replaced according to the lineage are dropped
     * @return the selected segment names
     */
    public List<String> getSegmentsFor(String tableNameWithType, boolean shouldExcludeReplacedSegments) {
        final long noLowerBound = Long.MIN_VALUE;
        final long noUpperBound = Long.MAX_VALUE;
        return getSegmentsFor(tableNameWithType, shouldExcludeReplacedSegments, noLowerBound, noUpperBound, false);
    }

    /**
     * Returns the segments of a table that are present in both the ideal state and the segment ZK metadata,
     * optionally restricted to a time range.
     *
     * <p>Segments that have ZK metadata but are missing from the ideal state are skipped and reported in
     * the log. When the range is exactly {@code [Long.MIN_VALUE, Long.MAX_VALUE]} no time check is done.
     *
     * @param tableNameWithType table name with type suffix
     * @param shouldExcludeReplacedSegments whether segments replaced according to the lineage are dropped
     * @param startTimestamp start of the time range in milliseconds
     * @param endTimestamp end of the time range in milliseconds
     * @param excludeOverlapping whether segments only partially inside the range are dropped
     * @return the selected segment names
     * @throws IllegalStateException when the table has no ideal state
     */
    public List<String> getSegmentsFor(String tableNameWithType, boolean shouldExcludeReplacedSegments, long startTimestamp, long endTimestamp, boolean excludeOverlapping) {
        IdealState idealState = getTableIdealState(tableNameWithType);
        Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", tableNameWithType);
        // Probe the partition set directly: copying it into a list made each lookup linear in the
        // number of segments.
        Set<String> idealStateSegments = idealState.getPartitionSet();
        List<SegmentZKMetadata> segmentZKMetadataList = getSegmentsZKMetadata(tableNameWithType);
        boolean unboundedRange = startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE;
        List<String> selectedSegments = new ArrayList<>();
        List<String> filteredSegments = new ArrayList<>();
        for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
            String segmentName = segmentZKMetadata.getSegmentName();
            if (!idealStateSegments.contains(segmentName)) {
                filteredSegments.add(segmentName);
            } else if (unboundedRange || isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp, excludeOverlapping)) {
                selectedSegments.add(segmentName);
            }
        }
        LOGGER.info("Successfully computed the segments for table : {}. # of filtered segments: {}, the filtered segment list: {}. Only showing up to 100 filtered segments.", tableNameWithType, filteredSegments.size(), filteredSegments.subList(0, Math.min(filteredSegments.size(), 100)));
        return shouldExcludeReplacedSegments ? excludeReplacedSegments(tableNameWithType, selectedSegments) : selectedSegments;
    }

    /**
     * Returns the segment names that {@code ZKMetadataProvider.getSegments} reports for the table.
     *
     * @param tableNameWithType table name handed to the provider
     * @return the provider's result, unmodified
     */
    public List<String> getSegmentsFromPropertyStore(String tableNameWithType) {
        List<String> segmentNames = ZKMetadataProvider.getSegments(this._propertyStore, tableNameWithType);
        return segmentNames;
    }

    /**
     * Filters a segment list through the table's segment lineage.
     *
     * @param tableNameWithType table whose lineage is looked up
     * @param segments candidate segment names
     * @return {@code segments} itself when the table has no lineage; otherwise a new list holding what
     *     {@code SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace} left in a set copy of the input
     */
    private List<String> excludeReplacedSegments(String tableNameWithType, List<String> segments) {
        SegmentLineage lineage = SegmentLineageAccessHelper.getSegmentLineage(this._propertyStore, tableNameWithType);
        if (lineage != null) {
            // Filter a copy so the caller's list is never mutated.
            HashSet<String> remaining = new HashSet<>(segments);
            SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(remaining, lineage);
            return new ArrayList<>(remaining);
        }
        return segments;
    }

    /**
     * Decides whether a segment's time interval qualifies for the window {@code [startTimestamp, endTimestamp)}.
     *
     * <ul>
     *   <li>{@code null} metadata: rejected.</li>
     *   <li>Both segment start and end equal {@code -1}: accepted.</li>
     *   <li>Segment start greater than segment end: rejected with a warning.</li>
     *   <li>Segment fully inside the window: accepted.</li>
     *   <li>Segment entirely outside the window: rejected.</li>
     *   <li>Partial overlap: accepted unless {@code excludeOverlapping} is true.</li>
     * </ul>
     */
    private boolean isSegmentWithinTimeStamps(SegmentZKMetadata segmentMetadata, long startTimestamp, long endTimestamp, boolean excludeOverlapping) {
        if (segmentMetadata == null) {
            return false;
        }
        long segmentStartMs = segmentMetadata.getStartTimeMs();
        long segmentEndMs = segmentMetadata.getEndTimeMs();
        boolean noTimeInfo = segmentStartMs == -1L && segmentEndMs == -1L;
        if (noTimeInfo) {
            return true;
        }
        if (segmentStartMs > segmentEndMs) {
            LOGGER.warn("Invalid start and end time for segment: {}. Start time: {}. End time: {}", segmentMetadata.getSegmentName(), segmentStartMs, segmentEndMs);
            return false;
        }
        boolean fullyContained = startTimestamp <= segmentStartMs && segmentEndMs < endTimestamp;
        if (fullyContained) {
            return true;
        }
        boolean disjoint = segmentEndMs < startTimestamp || segmentStartMs >= endTimestamp;
        // What is left is a partial overlap, which the caller may opt out of.
        return !disjoint && !excludeOverlapping;
    }

    /**
     * Looks up the ZK metadata of one segment through {@code ZKMetadataProvider}.
     *
     * @param tableNameWithType table name handed to the provider
     * @param segmentName segment name handed to the provider
     * @return the provider's result, which may be {@code null}
     */
    @Nullable
    public SegmentZKMetadata getSegmentZKMetadata(String tableNameWithType, String segmentName) {
        SegmentZKMetadata metadata = ZKMetadataProvider.getSegmentZKMetadata(this._propertyStore, tableNameWithType, segmentName);
        return metadata;
    }

    /**
     * Fetches the ZK metadata of the table's segments through {@code ZKMetadataProvider}.
     *
     * @param tableNameWithType table name handed to the provider
     * @return the provider's result, unmodified
     */
    public List<SegmentZKMetadata> getSegmentsZKMetadata(String tableNameWithType) {
        List<SegmentZKMetadata> allMetadata = ZKMetadataProvider.getSegmentsZKMetadata(this._propertyStore, tableNameWithType);
        return allMetadata;
    }

    /**
     * For every partition group id, finds the segment in {@code DONE} status with the highest sequence number.
     *
     * @param tableNameWithType table whose segment ZK metadata is scanned
     * @return the value view of the partition-group-id to segment-name map built during the scan
     */
    public Collection<String> getLastLLCCompletedSegments(String tableNameWithType) {
        Map<Integer, String> lastDoneByPartition = new HashMap<>();
        for (SegmentZKMetadata metadata : this.getSegmentsZKMetadata(tableNameWithType)) {
            if (metadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
                LLCSegmentName candidate = LLCSegmentName.of(metadata.getSegmentName());
                int partitionGroupId = candidate.getPartitionGroupId();
                int candidateSequence = candidate.getSequenceNumber();
                String currentBest = lastDoneByPartition.get(partitionGroupId);
                // Replace only when nothing is recorded yet or the candidate is strictly newer.
                if (currentBest == null || LLCSegmentName.of(currentBest).getSequenceNumber() < candidateSequence) {
                    lastDoneByPartition.put(partitionGroupId, metadata.getSegmentName());
                }
            }
        }
        return lastDoneByPartition.values();
    }

    /**
     * Deletes segments without an explicit retention period.
     *
     * <p>Delegates to the three-argument overload with a {@code null} retention period.
     *
     * @param tableNameWithType table the segments belong to
     * @param segmentNames segments to delete
     * @return the response produced by the overload
     */
    public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List<String> segmentNames) {
        String noRetentionPeriod = null;
        return this.deleteSegments(tableNameWithType, segmentNames, noRetentionPeriod);
    }

    /**
     * Removes segments from the table's ideal state and hands them to the segment deletion manager.
     *
     * <p>With a non-null {@code retentionPeriod} the period is converted to milliseconds and passed to the
     * deletion manager; otherwise the table config read from the property store is passed instead. Any
     * exception is logged and turned into a failure response carrying the exception message.
     *
     * @param tableNameWithType table name, which must pass {@code TableNameBuilder.isTableResource}
     * @param segmentNames segments to delete
     * @param retentionPeriod optional retention period string
     * @return a success response naming the segments, or a failure response with the exception message
     */
    public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List<String> segmentNames, @Nullable String retentionPeriod) {
        try {
            LOGGER.info("Trying to delete segments: {} from table: {} ", segmentNames, tableNameWithType);
            boolean hasTypeSuffix = TableNameBuilder.isTableResource(tableNameWithType);
            Preconditions.checkArgument(hasTypeSuffix, "Table name: %s is not a valid table name with type suffix", tableNameWithType);
            HelixHelper.removeSegmentsFromIdealState((HelixManager)this._helixZkManager, tableNameWithType, segmentNames);
            if (retentionPeriod == null) {
                TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, tableNameWithType);
                this._segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames, tableConfig);
            } else {
                this._segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames, TimeUtils.convertPeriodToMillis(retentionPeriod));
            }
            return PinotResourceManagerResponse.success("Segment " + segmentNames + " deleted");
        }
        catch (Exception e) {
            LOGGER.error("Caught exception while deleting segment: {} from table: {}", segmentNames, tableNameWithType, e);
            return PinotResourceManagerResponse.failure(e.getMessage());
        }
    }

    /**
     * Deletes a single segment by delegating to {@code deleteSegments} with a one-element list.
     *
     * @param tableNameWithType table the segment belongs to
     * @param segmentName segment to delete
     * @return the response produced by {@code deleteSegments}
     */
    public synchronized PinotResourceManagerResponse deleteSegment(String tableNameWithType, String segmentName) {
        List<String> singleSegment = Collections.singletonList(segmentName);
        return this.deleteSegments(tableNameWithType, singleSegment);
    }

    /**
     * Brings the number of brokers tagged for a tenant in line with the requested number.
     *
     * @param tenant tenant description supplying the name and the requested instance count
     * @return the scale-down result when more brokers are tagged than requested, the scale-up result when
     *     fewer are tagged, and {@code SUCCESS} when the counts already match
     */
    public PinotResourceManagerResponse updateBrokerTenant(Tenant tenant) {
        String brokerTenantTag = TagNameUtils.getBrokerTagForTenant(tenant.getTenantName());
        List<String> taggedBrokers = HelixHelper.getInstancesWithTag((HelixManager)this._helixZkManager, brokerTenantTag);
        int currentCount = taggedBrokers.size();
        if (currentCount > tenant.getNumberOfInstances()) {
            return this.scaleDownBroker(tenant, brokerTenantTag, taggedBrokers);
        } else if (currentCount < tenant.getNumberOfInstances()) {
            return this.scaleUpBroker(tenant, brokerTenantTag, taggedBrokers);
        } else {
            return PinotResourceManagerResponse.SUCCESS;
        }
    }

    /**
     * Tags additional untagged brokers for a tenant and adds each one to the broker ideal state.
     *
     * @param tenant tenant supplying the name and the requested instance count
     * @param brokerTenantTag broker tag to apply
     * @param instancesInClusterWithTag brokers that already carry the tag
     * @return a failure response when there are fewer untagged brokers than needed, otherwise {@code SUCCESS}
     */
    private PinotResourceManagerResponse scaleUpBroker(Tenant tenant, String brokerTenantTag, List<String> instancesInClusterWithTag) {
        List<String> untaggedBrokers = this.getOnlineUnTaggedBrokerInstanceList();
        int missing = tenant.getNumberOfInstances() - instancesInClusterWithTag.size();
        if (untaggedBrokers.size() >= missing) {
            int next = 0;
            while (next < missing) {
                String instanceName = untaggedBrokers.get(next);
                this.retagInstance(instanceName, "broker_untagged", brokerTenantTag);
                this.addInstanceToBrokerIdealState(brokerTenantTag, instanceName);
                next++;
            }
            return PinotResourceManagerResponse.SUCCESS;
        }
        String message = "Failed to allocate broker instances to Tag : " + tenant.getTenantName()
            + ", Current number of untagged broker instances : " + untaggedBrokers.size()
            + ", Current number of tagged broker instances : " + instancesInClusterWithTag.size()
            + ", Request asked number is : " + tenant.getNumberOfInstances();
        LOGGER.error(message);
        return PinotResourceManagerResponse.failure(message);
    }

    /**
     * Rebuilds the broker resource entry of a table from the brokers of the tenant named in its table config.
     *
     * @param tableNameWithType table whose config is read from the property store
     * @return the response of {@code rebuildBrokerResource}
     * @throws InvalidTableConfigException if reading the table config throws
     * @throws InvalidConfigException if no table config exists
     */
    public PinotResourceManagerResponse rebuildBrokerResourceFromHelixTags(String tableNameWithType) throws Exception {
        TableConfig tableConfig;
        try {
            tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, tableNameWithType);
        }
        catch (Exception e) {
            LOGGER.warn("Caught exception while getting table config for table {}", tableNameWithType, e);
            throw new InvalidTableConfigException("Failed to fetch broker tag for table " + tableNameWithType + " due to exception: " + e.getMessage());
        }
        if (tableConfig != null) {
            String brokerTenant = tableConfig.getTenantConfig().getBroker();
            Set<String> brokersOfTenant = this.getAllInstancesForBrokerTenant(brokerTenant);
            return this.rebuildBrokerResource(tableNameWithType, brokersOfTenant);
        }
        LOGGER.warn("Table " + tableNameWithType + " does not exist");
        throw new InvalidConfigException("Invalid table configuration for table " + tableNameWithType + ". Table does not exist");
    }

    /**
     * Replaces the instances listed for a table in the {@code brokerResource} ideal state.
     *
     * <p>Nothing is written when the broker ideal state already lists exactly {@code brokerInstances} for
     * the table. Otherwise the table's instance-state map is cleared (if present) and every given broker is
     * set to {@code ONLINE}. An exception from the update is logged and rethrown.
     *
     * @param tableNameWithType partition name within the broker resource
     * @param brokerInstances brokers that should serve the table
     * @return a success response saying whether the resource was rebuilt or left untouched
     */
    public PinotResourceManagerResponse rebuildBrokerResource(String tableNameWithType, Set<String> brokerInstances) {
        IdealState currentBrokerIdealState = HelixHelper.getBrokerIdealStates((HelixAdmin)this._helixAdmin, this._helixClusterName);
        Set currentBrokers = currentBrokerIdealState.getInstanceSet(tableNameWithType);
        if (!currentBrokers.equals(brokerInstances)) {
            try {
                HelixHelper.updateIdealState((HelixManager)this.getHelixZkManager(), "brokerResource", idealState -> {
                    assert (idealState != null);
                    Map existingStates = idealState.getInstanceStateMap(tableNameWithType);
                    if (existingStates != null) {
                        existingStates.clear();
                    }
                    for (String broker : brokerInstances) {
                        idealState.setPartitionState(tableNameWithType, broker, "ONLINE");
                    }
                    return idealState;
                }, (RetryPolicy)DEFAULT_RETRY_POLICY);
                LOGGER.info("Successfully rebuilt brokerResource for table: {}", tableNameWithType);
                return PinotResourceManagerResponse.success("Rebuilt brokerResource for table: " + tableNameWithType);
            }
            catch (Exception e) {
                LOGGER.error("Caught exception while rebuilding broker resource for table: {}", tableNameWithType, e);
                throw e;
            }
        }
        return PinotResourceManagerResponse.success("Broker resource is not rebuilt because ideal state is the same for table: " + tableNameWithType);
    }

    /**
     * Marks a broker {@code ONLINE} for every table in the {@code brokerResource} ideal state whose broker
     * tag equals the given tag, then writes the ideal state back through the Helix admin.
     *
     * @param brokerTenantTag broker tag to match against each table's tenant config
     * @param instanceName broker instance to add
     * @throws NullPointerException if a table listed in the ideal state has no table config
     */
    private void addInstanceToBrokerIdealState(String brokerTenantTag, String instanceName) {
        String brokerResource = "brokerResource";
        IdealState brokerIdealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, brokerResource);
        for (String tableNameWithType : brokerIdealState.getPartitionSet()) {
            TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, tableNameWithType);
            Preconditions.checkNotNull(tableConfig);
            String tableBrokerTag = TagNameUtils.extractBrokerTag((TenantConfig)tableConfig.getTenantConfig());
            if (tableBrokerTag.equals(brokerTenantTag)) {
                brokerIdealState.setPartitionState(tableNameWithType, instanceName, "ONLINE");
            }
        }
        this._helixAdmin.setResourceIdealState(this._helixClusterName, brokerResource, brokerIdealState);
    }

    /**
     * Retags the surplus brokers of a tenant as {@code broker_untagged}, taking them from the front of the list.
     *
     * @param tenant tenant supplying the requested instance count
     * @param brokerTenantTag tag to remove from the surplus brokers
     * @param instancesInClusterWithTag brokers currently carrying the tag
     * @return always {@code SUCCESS}
     */
    private PinotResourceManagerResponse scaleDownBroker(Tenant tenant, String brokerTenantTag, List<String> instancesInClusterWithTag) {
        int surplus = instancesInClusterWithTag.size() - tenant.getNumberOfInstances();
        int position = 0;
        while (position < surplus) {
            String instanceName = instancesInClusterWithTag.get(position);
            this.retagInstance(instanceName, brokerTenantTag, "broker_untagged");
            position++;
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Swaps one tag for another on an instance config and writes the config back.
     *
     * @param instanceName instance whose config is updated
     * @param oldTag tag to remove
     * @param newTag tag to add
     * @throws NotFoundException if the instance has no config
     * @throws RuntimeException if the data accessor reports that the write failed
     */
    private void retagInstance(String instanceName, String oldTag, String newTag) {
        PropertyKey configKey = this._keyBuilder.instanceConfig(instanceName);
        InstanceConfig config = (InstanceConfig)this._helixDataAccessor.getProperty(configKey);
        if (config == null) {
            throw new NotFoundException("Failed to find instance config for instance: " + instanceName);
        }
        config.removeTag(oldTag);
        config.addTag(newTag);
        boolean written = this._helixDataAccessor.setProperty(configKey, (HelixProperty)config);
        if (!written) {
            throw new RuntimeException("Failed to set instance config for instance: " + instanceName);
        }
    }

    /**
     * Updates the server instances of a tenant.
     *
     * <p>The tenant counts as currently colocated when at least one server carries both the offline and the
     * realtime tag. A request whose colocation flag differs from that state is rejected. A request asking
     * for fewer total, offline or realtime instances than currently tagged goes to {@code scaleDownServer};
     * everything else goes to {@code scaleUpServerTenant}.
     *
     * @param serverTenant requested tenant layout
     * @return the response of the chosen path, or a failure response on a colocation mismatch
     */
    public PinotResourceManagerResponse updateServerTenant(Tenant serverTenant) {
        String tenantName = serverTenant.getTenantName();
        String realtimeServerTag = TagNameUtils.getRealtimeTagForTenant(tenantName);
        List<String> taggedRealtimeServers = HelixHelper.getInstancesWithTag((HelixManager)this._helixZkManager, realtimeServerTag);
        String offlineServerTag = TagNameUtils.getOfflineTagForTenant(serverTenant.getTenantName());
        List<String> taggedOfflineServers = HelixHelper.getInstancesWithTag((HelixManager)this._helixZkManager, offlineServerTag);
        HashSet<String> allServingServers = new HashSet<>();
        allServingServers.addAll(taggedOfflineServers);
        allServingServers.addAll(taggedRealtimeServers);
        // A union smaller than the sum of both lists means some server carries both tags.
        boolean isCurrentTenantColocated = allServingServers.size() < taggedOfflineServers.size() + taggedRealtimeServers.size();
        if (isCurrentTenantColocated != serverTenant.isCoLocated()) {
            String message = "Not support different colocated type request for update request: " + serverTenant;
            LOGGER.error(message);
            return PinotResourceManagerResponse.failure(message);
        }
        boolean shrinksTenant = serverTenant.getNumberOfInstances() < allServingServers.size()
            || serverTenant.getOfflineInstances() < taggedOfflineServers.size()
            || serverTenant.getRealtimeInstances() < taggedRealtimeServers.size();
        if (shrinksTenant) {
            return this.scaleDownServer(serverTenant, taggedRealtimeServers, taggedOfflineServers, allServingServers);
        }
        return this.scaleUpServerTenant(serverTenant, realtimeServerTag, taggedRealtimeServers, offlineServerTag, taggedOfflineServers, allServingServers);
    }

    /**
     * Grows a server tenant after checking that enough untagged servers are available.
     *
     * @return a failure response when fewer untagged servers exist than additional instances are needed;
     *     otherwise the result of the colocated or independent update, chosen by {@code serverTenant.isCoLocated()}
     */
    private PinotResourceManagerResponse scaleUpServerTenant(Tenant serverTenant, String realtimeServerTag, List<String> taggedRealtimeServers, String offlineServerTag, List<String> taggedOfflineServers, Set<String> allServingServers) {
        int incInstances = serverTenant.getNumberOfInstances() - allServingServers.size();
        List<String> untaggedServers = this.getOnlineUnTaggedServerInstanceList();
        if (untaggedServers.size() < incInstances) {
            String message = "Failed to allocate hardware resources with tenant info: " + serverTenant
                + ", Current number of untagged instances : " + untaggedServers.size()
                + ", Current number of serving instances : " + allServingServers.size()
                + ", Current number of tagged offline server instances : " + taggedOfflineServers.size()
                + ", Current number of tagged realtime server instances : " + taggedRealtimeServers.size();
            LOGGER.error(message);
            return PinotResourceManagerResponse.failure(message);
        }
        return serverTenant.isCoLocated()
            ? this.updateColocatedServerTenant(serverTenant, realtimeServerTag, taggedRealtimeServers, offlineServerTag, taggedOfflineServers, incInstances, untaggedServers)
            : this.updateIndependentServerTenant(serverTenant, realtimeServerTag, taggedRealtimeServers, offlineServerTag, taggedOfflineServers, incInstances, untaggedServers);
    }

    /**
     * Tags untagged servers for a non-colocated tenant: the first slice of the untagged list receives the
     * offline tag and the following slice receives the realtime tag.
     *
     * <p>{@code incInstances} is not read by this method.
     *
     * @return always {@code SUCCESS}
     */
    private PinotResourceManagerResponse updateIndependentServerTenant(Tenant serverTenant, String realtimeServerTag, List<String> taggedRealtimeServers, String offlineServerTag, List<String> taggedOfflineServers, int incInstances, List<String> unTaggedInstanceList) {
        int offlineToAdd = serverTenant.getOfflineInstances() - taggedOfflineServers.size();
        int realtimeToAdd = serverTenant.getRealtimeInstances() - taggedRealtimeServers.size();
        int cursor = 0;
        while (cursor < offlineToAdd) {
            this.retagInstance(unTaggedInstanceList.get(cursor), "server_untagged", offlineServerTag);
            cursor++;
        }
        // The realtime slice starts right after the offline slice.
        int realtimeEnd = offlineToAdd + realtimeToAdd;
        for (int idx = offlineToAdd; idx < realtimeEnd; idx++) {
            this.retagInstance(unTaggedInstanceList.get(idx), "server_untagged", realtimeServerTag);
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Grows a colocated server tenant by tagging untagged servers first and, once the index reaches
     * {@code incInstances}, by adding the missing tag to servers taken from the other type's list.
     *
     * <p>Both {@code taggedRealtimeServers} and {@code taggedOfflineServers} are modified in place through
     * {@code removeAll}.
     *
     * @param serverTenant supplies the requested offline and realtime instance counts
     * @param realtimeServerTag tag applied in the realtime loop
     * @param taggedRealtimeServers servers currently carrying the realtime tag (mutated)
     * @param offlineServerTag tag applied in the offline loop
     * @param taggedOfflineServers servers currently carrying the offline tag (mutated)
     * @param incInstances index threshold below which servers are taken from {@code unTaggedInstanceList}
     * @param unTaggedInstanceList untagged servers available for retagging
     * @return always {@code SUCCESS}
     */
    private PinotResourceManagerResponse updateColocatedServerTenant(Tenant serverTenant, String realtimeServerTag, List<String> taggedRealtimeServers, String offlineServerTag, List<String> taggedOfflineServers, int incInstances, List<String> unTaggedInstanceList) {
        int i;
        // Both increments are computed from the list sizes BEFORE the removeAll calls below.
        int incOffline = serverTenant.getOfflineInstances() - taggedOfflineServers.size();
        int incRealtime = serverTenant.getRealtimeInstances() - taggedRealtimeServers.size();
        // Leaves only servers that carry the realtime tag but not the offline tag.
        taggedRealtimeServers.removeAll(taggedOfflineServers);
        // NOTE(review): this runs against the already-reduced realtime list, which no longer shares elements
        // with the offline list, so it appears to remove nothing — confirm whether offline-only was intended.
        taggedOfflineServers.removeAll(taggedRealtimeServers);
        for (i = 0; i < incOffline; ++i) {
            if (i < incInstances) {
                // Still within the untagged budget: move an untagged server to the offline tag.
                this.retagInstance(unTaggedInstanceList.get(i), "server_untagged", offlineServerTag);
                continue;
            }
            // Untagged budget used up: add the offline tag to a realtime server.
            this._helixAdmin.addInstanceTag(this._helixClusterName, taggedRealtimeServers.get(i - incInstances), offlineServerTag);
        }
        // The realtime loop continues the same index sequence, starting where the offline loop ended.
        for (i = incOffline; i < incOffline + incRealtime; ++i) {
            if (i < incInstances) {
                this.retagInstance(unTaggedInstanceList.get(i), "server_untagged", realtimeServerTag);
                continue;
            }
            // Index into the offline list is rebased on whichever is larger, incInstances or incOffline.
            this._helixAdmin.addInstanceTag(this._helixClusterName, taggedOfflineServers.get(i - Math.max(incInstances, incOffline)), realtimeServerTag);
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Rejects a request to shrink a server tenant: logs the current counts and returns them in a failure response.
     *
     * @return always a failure response
     */
    private PinotResourceManagerResponse scaleDownServer(Tenant serverTenant, List<String> taggedRealtimeServers, List<String> taggedOfflineServers, Set<String> allServingServers) {
        String reason = "Not support to size down the current server cluster with tenant info: " + serverTenant
            + ", Current number of serving instances : " + allServingServers.size()
            + ", Current number of tagged offline server instances : " + taggedOfflineServers.size()
            + ", Current number of tagged realtime server instances : " + taggedRealtimeServers.size();
        LOGGER.error(reason);
        return PinotResourceManagerResponse.failure(reason);
    }

    /**
     * Tells whether any broker tagged for the tenant still appears in the {@code brokerResource} ideal state.
     *
     * @param tenantName tenant whose broker tag is checked
     * @return {@code false} as soon as one partition of the broker resource lists a tagged broker, else {@code true}
     */
    public boolean isBrokerTenantDeletable(String tenantName) {
        String brokerTag = TagNameUtils.getBrokerTagForTenant(tenantName);
        Set<String> brokersOfTenant = new HashSet<>(HelixHelper.getInstancesWithTag((HelixManager)this._helixZkManager, brokerTag));
        IdealState brokerIdealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, "brokerResource");
        return brokerIdealState.getPartitionSet().stream()
            .noneMatch(partition -> brokerIdealState.getInstanceSet(partition).stream().anyMatch(brokersOfTenant::contains));
    }

    /**
     * Tells whether any server tagged (offline or realtime) for the tenant still appears in the ideal state
     * of a table resource.
     *
     * @param tenantName tenant whose server tags are checked
     * @return {@code false} as soon as one table partition lists a tagged server, else {@code true}
     */
    public boolean isServerTenantDeletable(String tenantName) {
        String offlineTag = TagNameUtils.getOfflineTagForTenant(tenantName);
        Set<String> serversOfTenant = new HashSet<>(HelixHelper.getInstancesWithTag((HelixManager)this._helixZkManager, offlineTag));
        String realtimeTag = TagNameUtils.getRealtimeTagForTenant(tenantName);
        serversOfTenant.addAll(HelixHelper.getInstancesWithTag((HelixManager)this._helixZkManager, realtimeTag));
        for (String resourceName : this.getAllResources()) {
            if (TableNameBuilder.isTableResource(resourceName)) {
                IdealState tableIdealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, resourceName);
                boolean hostedByTenant = tableIdealState.getPartitionSet().stream()
                    .anyMatch(partition -> tableIdealState.getInstanceSet(partition).stream().anyMatch(serversOfTenant::contains));
                if (hostedByTenant) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Collects the tenant names derived from every broker tag found on any Helix instance config.
     *
     * @return a new set of tenant names
     */
    public Set<String> getAllBrokerTenantNames() {
        return this.getAllHelixInstanceConfigs().stream()
            .flatMap(instanceConfig -> instanceConfig.getTags().stream())
            .filter(tag -> TagNameUtils.isBrokerTag(tag))
            .map(tag -> TagNameUtils.getTenantFromTag(tag))
            .collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * Collects the tenant names derived from every server tag found on any Helix instance config.
     *
     * @return a new set of tenant names
     */
    public Set<String> getAllServerTenantNames() {
        return this.getAllHelixInstanceConfigs().stream()
            .flatMap(instanceConfig -> instanceConfig.getTags().stream())
            .filter(tag -> TagNameUtils.isServerTag(tag))
            .map(tag -> TagNameUtils.getTenantFromTag(tag))
            .collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * Returns the tags stored on an instance's Helix config.
     *
     * @param instanceName instance whose config is read through the Helix data accessor
     * @return the tags reported by the instance config
     * @throws NotFoundException if the instance has no config (previously surfaced as a bare
     *     {@code NullPointerException}); mirrors the handling in {@code retagInstance}
     */
    public List<String> getTagsForInstance(String instanceName) {
        InstanceConfig config = (InstanceConfig)this._helixDataAccessor.getProperty(this._keyBuilder.instanceConfig(instanceName));
        if (config == null) {
            throw new NotFoundException("Failed to find instance config for instance: " + instanceName);
        }
        return config.getTags();
    }

    /**
     * Creates a server tenant by tagging untagged servers with the tenant's offline and realtime tags.
     *
     * <p>Servers are taken from the untagged list by a running index that wraps around at the requested
     * total, so when offline plus realtime exceeds the total the same server receives both tags.
     *
     * @param serverTenant requested tenant layout
     * @return a failure response when fewer untagged servers exist than requested, otherwise {@code SUCCESS}
     * @throws BadRequestException if more offline or realtime instances are requested than total instances
     */
    public PinotResourceManagerResponse createServerTenant(Tenant serverTenant) {
        int numInstances = serverTenant.getNumberOfInstances();
        int numOfflineInstances = serverTenant.getOfflineInstances();
        int numRealtimeInstances = serverTenant.getRealtimeInstances();
        boolean partsExceedTotal = numInstances < numOfflineInstances || numInstances < numRealtimeInstances;
        if (partsExceedTotal) {
            throw new BadRequestException(String.format("Cannot request more offline instances: %d or realtime instances: %d than total instances: %d", numOfflineInstances, numRealtimeInstances, numInstances));
        }
        List<String> untaggedServers = this.getOnlineUnTaggedServerInstanceList();
        if (untaggedServers.size() < numInstances) {
            String message = "Failed to allocate server instances to Tag : " + serverTenant.getTenantName()
                + ", Current number of untagged server instances : " + untaggedServers.size()
                + ", Request asked number is : " + serverTenant.getNumberOfInstances();
            LOGGER.error(message);
            return PinotResourceManagerResponse.failure(message);
        }
        // Shared by both phases so the realtime phase continues where the offline phase stopped.
        int cursor = 0;
        if (numOfflineInstances > 0) {
            String offlineServerTag = TagNameUtils.getOfflineTagForTenant(serverTenant.getTenantName());
            for (int tagged = 0; tagged < numOfflineInstances; tagged++) {
                this.retagInstance(untaggedServers.get(cursor), "server_untagged", offlineServerTag);
                cursor = (cursor + 1) % numInstances;
            }
        }
        if (numRealtimeInstances > 0) {
            String realtimeServerTag = TagNameUtils.getRealtimeTagForTenant(serverTenant.getTenantName());
            for (int tagged = 0; tagged < numRealtimeInstances; tagged++) {
                this.retagInstance(untaggedServers.get(cursor), "server_untagged", realtimeServerTag);
                cursor = (cursor + 1) % numInstances;
            }
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Creates a broker tenant by retagging the requested number of untagged brokers with the tenant's broker tag.
     *
     * @param brokerTenant supplies the tenant name and the requested number of instances
     * @return a failure response when fewer untagged brokers exist than requested, otherwise {@code SUCCESS}
     */
    public PinotResourceManagerResponse createBrokerTenant(Tenant brokerTenant) {
        List<String> unTaggedInstanceList = this.getOnlineUnTaggedBrokerInstanceList();
        int numberOfInstances = brokerTenant.getNumberOfInstances();
        if (unTaggedInstanceList.size() < numberOfInstances) {
            // The list holds brokers, so the message says "broker" (it previously said "server"),
            // in line with the wording used by scaleUpBroker.
            String message = "Failed to allocate broker instances to Tag : " + brokerTenant.getTenantName()
                + ", Current number of untagged broker instances : " + unTaggedInstanceList.size()
                + ", Request asked number is : " + numberOfInstances;
            LOGGER.error(message);
            return PinotResourceManagerResponse.failure(message);
        }
        String brokerTag = TagNameUtils.getBrokerTagForTenant(brokerTenant.getTenantName());
        for (int i = 0; i < numberOfInstances; ++i) {
            this.retagInstance(unTaggedInstanceList.get(i), "broker_untagged", brokerTag);
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Removes the tenant's offline tag from every server carrying it; a server left without any tag is
     * given the {@code server_untagged} tag.
     *
     * @param tenantName tenant whose offline tag is removed
     * @return always {@code SUCCESS}
     */
    public PinotResourceManagerResponse deleteOfflineServerTenantFor(String tenantName) {
        String offlineTenantTag = TagNameUtils.getOfflineTagForTenant(tenantName);
        List<String> taggedServers = HelixHelper.getInstancesWithTag((HelixManager)this._helixZkManager, offlineTenantTag);
        for (String server : taggedServers) {
            this._helixAdmin.removeInstanceTag(this._helixClusterName, server, offlineTenantTag);
            boolean hasNoTagsLeft = this.getTagsForInstance(server).isEmpty();
            if (hasNoTagsLeft) {
                this._helixAdmin.addInstanceTag(this._helixClusterName, server, "server_untagged");
            }
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Removes the tenant's realtime tag from every server carrying it; a server left without any tag is
     * given the {@code server_untagged} tag.
     *
     * @param tenantName tenant whose realtime tag is removed
     * @return always {@code SUCCESS}
     */
    public PinotResourceManagerResponse deleteRealtimeServerTenantFor(String tenantName) {
        String realtimeTenantTag = TagNameUtils.getRealtimeTagForTenant(tenantName);
        List<String> taggedServers = HelixHelper.getInstancesWithTag((HelixManager)this._helixZkManager, realtimeTenantTag);
        for (String server : taggedServers) {
            this._helixAdmin.removeInstanceTag(this._helixClusterName, server, realtimeTenantTag);
            boolean hasNoTagsLeft = this.getTagsForInstance(server).isEmpty();
            if (hasNoTagsLeft) {
                this._helixAdmin.addInstanceTag(this._helixClusterName, server, "server_untagged");
            }
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Retags every broker carrying the tenant's broker tag as {@code broker_untagged}.
     *
     * @param tenantName tenant whose broker tag is removed
     * @return always {@code SUCCESS}
     */
    public PinotResourceManagerResponse deleteBrokerTenantFor(String tenantName) {
        String tenantBrokerTag = TagNameUtils.getBrokerTagForTenant(tenantName);
        List<String> taggedBrokers = HelixHelper.getInstancesWithTag((HelixManager)this._helixZkManager, tenantBrokerTag);
        for (String broker : taggedBrokers) {
            this.retagInstance(broker, tenantBrokerTag, "broker_untagged");
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Returns what {@code HelixHelper.getServerInstancesForTenant} yields for the given configs and tenant.
     *
     * @param instanceConfigs instance configs to search
     * @param tenantName tenant to match
     * @return the helper's result, unmodified
     */
    public Set<String> getAllInstancesForServerTenant(List<InstanceConfig> instanceConfigs, String tenantName) {
        Set<String> serverInstances = HelixHelper.getServerInstancesForTenant(instanceConfigs, tenantName);
        return serverInstances;
    }

    /**
     * Same lookup as the two-argument overload, using the instance configs fetched through the Helix manager.
     *
     * @param tenantName tenant to match
     * @return the overload's result
     */
    public Set<String> getAllInstancesForServerTenant(String tenantName) {
        List<InstanceConfig> allInstanceConfigs = HelixHelper.getInstanceConfigs((HelixManager)this._helixZkManager);
        return this.getAllInstancesForServerTenant(allInstanceConfigs, tenantName);
    }

    /**
     * Returns what {@code HelixHelper.getServerInstancesForTenantWithType} yields for the given arguments.
     *
     * @param instanceConfigs instance configs to search
     * @param tenantName tenant to match
     * @param tableType table type handed to the helper
     * @return the helper's result, unmodified
     */
    public Set<String> getAllInstancesForServerTenantWithType(List<InstanceConfig> instanceConfigs, String tenantName, TableType tableType) {
        Set<String> serverInstances = HelixHelper.getServerInstancesForTenantWithType(instanceConfigs, tenantName, tableType);
        return serverInstances;
    }

    /**
     * Returns what {@code HelixHelper.getBrokerInstancesForTenant} yields for the given configs and tenant.
     *
     * @param instanceConfigs instance configs to search
     * @param tenantName tenant to match
     * @return the helper's result, unmodified
     */
    public Set<String> getAllInstancesForBrokerTenant(List<InstanceConfig> instanceConfigs, String tenantName) {
        Set<String> brokerInstances = HelixHelper.getBrokerInstancesForTenant(instanceConfigs, tenantName);
        return brokerInstances;
    }

    /**
     * Same lookup as the two-argument overload, using the instance configs fetched through the Helix manager.
     *
     * @param tenantName tenant to match
     * @return the overload's result
     */
    public Set<String> getAllInstancesForBrokerTenant(String tenantName) {
        List<InstanceConfig> allInstanceConfigs = HelixHelper.getInstanceConfigs((HelixManager)this._helixZkManager);
        return this.getAllInstancesForBrokerTenant(allInstanceConfigs, tenantName);
    }

    /**
     * Returns what {@code HelixHelper.getBrokerInstanceConfigsForTenant} yields for the instance configs
     * fetched through the Helix manager.
     *
     * @param tenantName tenant to match
     * @return the helper's result, unmodified
     */
    public Set<InstanceConfig> getAllInstancesConfigsForBrokerTenant(String tenantName) {
        List allInstanceConfigs = (List)HelixHelper.getInstanceConfigs((HelixManager)this._helixZkManager);
        return HelixHelper.getBrokerInstanceConfigsForTenant(allInstanceConfigs, tenantName);
    }

    /**
     * Stores a schema, or updates the stored one when it already exists and overriding is allowed.
     *
     * @param schema schema to store
     * @param override whether an existing schema of the same name may be updated
     * @param force handed to the update path as its force flag
     * @throws SchemaAlreadyExistsException if the schema exists and {@code override} is false
     * @throws SchemaBackwardIncompatibleException declared by the update path
     */
    public void addSchema(Schema schema, boolean override, boolean force) throws SchemaAlreadyExistsException, SchemaBackwardIncompatibleException {
        String schemaName = schema.getSchemaName();
        LOGGER.info("Adding schema: {} with override: {}", schemaName, override);
        Schema existingSchema = ZKMetadataProvider.getSchema(this._propertyStore, schemaName);
        if (existingSchema == null) {
            ZKMetadataProvider.setSchema(this._propertyStore, schema);
            LOGGER.info("Added schema: {}", schemaName);
            return;
        }
        if (override) {
            this.updateSchema(schema, existingSchema, force);
            return;
        }
        throw new SchemaAlreadyExistsException(String.format("Schema: %s already exists", schemaName));
    }

    /**
     * Applies {@code updateZkTimeInterval} to every segment's ZK metadata of a table and writes each one
     * back with the record version read before the change.
     *
     * @param tableNameWithType table whose segment metadata is updated
     * @param timeColumnFieldSpec field spec handed to {@code updateZkTimeInterval}
     */
    public void updateSegmentsZKTimeInterval(String tableNameWithType, DateTimeFieldSpec timeColumnFieldSpec) {
        LOGGER.info("Updating segment time interval in ZK metadata for table: {}", tableNameWithType);
        for (SegmentZKMetadata metadata : this.getSegmentsZKMetadata(tableNameWithType)) {
            // Capture the version first; it is passed along with the write below.
            int expectedVersion = metadata.toZNRecord().getVersion();
            this.updateZkTimeInterval(metadata, timeColumnFieldSpec);
            this.updateZkMetadata(tableNameWithType, metadata, expectedVersion);
        }
    }

    /**
     * Updates an existing schema and optionally reloads all segments of the tables found under its name.
     *
     * @param schema new schema
     * @param reload when true, {@code reloadAllSegments} is called for each existing table of that name
     * @param forceTableSchemaUpdate handed to the private update as its force flag
     * @throws SchemaNotFoundException if no schema of that name is stored
     * @throws SchemaBackwardIncompatibleException declared by the private update
     * @throws TableNotFoundException declared; not thrown directly here
     */
    public void updateSchema(Schema schema, boolean reload, boolean forceTableSchemaUpdate) throws SchemaNotFoundException, SchemaBackwardIncompatibleException, TableNotFoundException {
        String schemaName = schema.getSchemaName();
        LOGGER.info("Updating schema: {} with reload: {}", schemaName, reload);
        Schema storedSchema = ZKMetadataProvider.getSchema(this._propertyStore, schemaName);
        if (storedSchema == null) {
            throw new SchemaNotFoundException(String.format("Schema: %s does not exist", schemaName));
        }
        this.updateSchema(schema, storedSchema, forceTableSchemaUpdate);
        if (!reload) {
            return;
        }
        LOGGER.info("Reloading tables with name: {}", schemaName);
        for (String tableNameWithType : this.getExistingTableNamesWithType(schemaName, null)) {
            this.reloadAllSegments(tableNameWithType, false);
        }
    }

    /**
     * Writes a new schema over an existing one after an equality and a backward-compatibility check.
     *
     * <p>Nothing is written when the new schema equals the old one. A backward-incompatible schema is
     * written only when {@code forceTableSchemaUpdate} is true (with a warning); otherwise it is rejected.
     *
     * @param schema new schema; {@code updateBooleanFieldsIfNeeded(oldSchema)} is invoked on it first
     * @param oldSchema schema currently stored
     * @param forceTableSchemaUpdate whether to write a backward-incompatible schema anyway
     * @throws SchemaBackwardIncompatibleException if the schema is incompatible and not forced
     */
    private void updateSchema(Schema schema, Schema oldSchema, boolean forceTableSchemaUpdate) throws SchemaBackwardIncompatibleException {
        String schemaName = schema.getSchemaName();
        schema.updateBooleanFieldsIfNeeded(oldSchema);
        if (schema.equals(oldSchema)) {
            LOGGER.info("New schema: {} is the same as the existing schema, not updating it", schemaName);
            return;
        }
        boolean incompatible = !schema.isBackwardCompatibleWith(oldSchema);
        if (incompatible && !forceTableSchemaUpdate) {
            throw new SchemaBackwardIncompatibleException(String.format("New schema: %s is not backward-compatible with the existing schema", schemaName));
        }
        if (incompatible) {
            LOGGER.warn("Force updated schema: {} which is backward incompatible with the existing schema", oldSchema);
        }
        ZKMetadataProvider.setSchema(this._propertyStore, schema);
        LOGGER.info("Updated schema: {}", schemaName);
    }

    /**
     * Deletes the given schema by name.
     *
     * @param schema schema to delete; {@code null} is tolerated and deletes nothing
     * @return {@code true} if {@link #deleteSchema(String)} removed the schema, {@code false} if
     *     {@code schema} is {@code null} or nothing was removed (previously this always returned
     *     {@code false}, even after a successful delete)
     * @deprecated use {@link #deleteSchema(String)} instead
     */
    @Deprecated
    public boolean deleteSchema(Schema schema) {
        if (schema == null) {
            return false;
        }
        // Propagate the outcome instead of discarding it.
        return this.deleteSchema(schema.getSchemaName());
    }

    /**
     * Removes a schema from the property store if its path exists.
     *
     * @param schemaName name of the schema to delete
     * @return {@code true} when the path existed and the remove call was issued, {@code false} when it did not exist
     */
    public boolean deleteSchema(String schemaName) {
        LOGGER.info("Deleting schema: {}", schemaName);
        String schemaPath = ZKMetadataProvider.constructPropertyStorePathForSchema(schemaName);
        boolean present = this._propertyStore.exists(schemaPath, AccessOption.PERSISTENT);
        if (!present) {
            return false;
        }
        this._propertyStore.remove(schemaPath, AccessOption.PERSISTENT);
        LOGGER.info("Deleted schema: {}", schemaName);
        return true;
    }

    /**
     * Reads a schema by name through {@code ZKMetadataProvider.getSchema}.
     *
     * @param schemaName name handed to the provider
     * @return the provider's result, which may be {@code null}
     */
    @Nullable
    public Schema getSchema(String schemaName) {
        Schema schema = ZKMetadataProvider.getSchema(this._propertyStore, schemaName);
        return schema;
    }

    /**
     * Reads the schema of a table through {@code ZKMetadataProvider.getTableSchema}.
     *
     * @param tableName table name handed to the provider
     * @return the provider's result, which may be {@code null}
     */
    @Nullable
    public Schema getTableSchema(String tableName) {
        Schema tableSchema = ZKMetadataProvider.getTableSchema(this._propertyStore, tableName);
        return tableSchema;
    }

    /**
     * Resolves the schema for a table config.
     *
     * <p>The schema named after the raw table name wins; only when it is missing is the schema
     * name from the table's validation config consulted.
     *
     * @param tableConfig the table config to resolve the schema for
     * @return the resolved schema, or {@code null} if neither lookup finds one
     */
    @Nullable
    public Schema getSchemaForTableConfig(TableConfig tableConfig) {
        String rawTableName = TableNameBuilder.extractRawTableName(tableConfig.getTableName());
        Schema schemaByTableName = this.getSchema(rawTableName);
        if (schemaByTableName != null) {
            return schemaByTableName;
        }
        String configuredSchemaName = tableConfig.getValidationConfig().getSchemaName();
        if (configuredSchemaName == null) {
            return null;
        }
        return this.getSchema(configuredSchemaName);
    }

    /**
     * Lists the child node names under the schema path of the property store.
     *
     * @return the names of the nodes found under the schema path
     */
    public List<String> getSchemaNames() {
        String schemaRootPath = PinotHelixPropertyStoreZnRecordProvider.forSchema(this._propertyStore).getRelativePath();
        return this._propertyStore.getChildNames(schemaRootPath, AccessOption.PERSISTENT);
    }

    /**
     * Bootstraps the initial admin users when no user is registered yet.
     *
     * <p>One admin user is created per component, in the order controller, broker, server, using
     * the initial username and password from the controller configuration.
     *
     * @param controllerConf controller configuration providing the initial credentials
     * @throws IOException if adding a user fails with an I/O error
     */
    public void initUserACLConfig(ControllerConf controllerConf) throws IOException {
        if (!CollectionUtils.isEmpty(ZKMetadataProvider.getAllUserName(this._propertyStore))) {
            // Users already exist; nothing to bootstrap.
            return;
        }
        String initUsername = controllerConf.getInitAccessControlUsername();
        String initPassword = controllerConf.getInitAccessControlPassword();
        ComponentType[] components = {ComponentType.CONTROLLER, ComponentType.BROKER, ComponentType.SERVER};
        for (ComponentType component : components) {
            this.addUser(new UserConfig(initUsername, initPassword, component.name(), RoleType.ADMIN.name(), null, null));
        }
    }

    /**
     * Registers a new user, storing its password in encrypted form.
     *
     * <p>The user is keyed by {@code <userName>_<componentType>}. Note that the password of the
     * passed-in {@code userConfig} is replaced with its encrypted value.
     *
     * @param userConfig the user to add
     * @throws UserAlreadyExistsException if an equal user config is already stored
     * @throws IOException declared by the signature; not thrown directly by this method
     */
    public void addUser(UserConfig userConfig) throws IOException {
        String usernamePrefix = userConfig.getUserName() + "_" + userConfig.getComponentType();
        Collection<?> existingUsers = ZKMetadataProvider.getAllUserConfig(this._propertyStore);
        // A missing user list is treated the same as an empty one.
        if (existingUsers != null && existingUsers.contains(userConfig)) {
            throw new UserAlreadyExistsException("User " + usernamePrefix + " already exists");
        }
        String encryptedPassword = BcryptUtils.encrypt(userConfig.getPassword());
        userConfig.setPassword(encryptedPassword);
        ZKMetadataProvider.setUserConfig(this._propertyStore, usernamePrefix, AccessControlUserConfigUtils.toZNRecord(userConfig));
        LOGGER.info("Successfully add user:{}", usernamePrefix);
    }

    /**
     * Creates a new table from the given table config.
     *
     * <p>Steps, in order: reject the request if a table config or an external view already exists
     * for the table, validate the tenant config, require a schema named after the raw table name,
     * persist the table config, assign instances, create the ideal state (directly for OFFLINE
     * tables, through the LLC realtime segment manager otherwise) and finally register the table
     * in the broker resource. If instance assignment or ideal state creation fails, the partially
     * created table is deleted and the exception is rethrown.
     *
     * @param tableConfig config of the table to create; its table name carries the type suffix
     * @throws TableAlreadyExistsException if a table config or external view already exists
     * @throws InvalidTableConfigException if no schema is defined or tenant validation fails
     * @throws IOException declared by the signature for failures during table setup
     */
    public void addTable(TableConfig tableConfig) throws IOException {
        String tableNameWithType = tableConfig.getTableName();
        LOGGER.info("Adding table {}: Start", tableNameWithType);
        if (this.getTableConfig(tableNameWithType) != null) {
            throw new TableAlreadyExistsException("Table config for " + tableNameWithType + " already exists. If this is unexpected, try deleting the table to remove all metadata associated with it.");
        }
        if (this._helixAdmin.getResourceExternalView(this._helixClusterName, tableNameWithType) != null) {
            throw new TableAlreadyExistsException("External view for " + tableNameWithType + " still exists. If the table is just deleted, please wait for the clean up to finish before recreating it. If the external view is not removed after a long time, try restarting the servers showing up in the external view");
        }
        LOGGER.info("Adding table {}: Validate table configs", tableNameWithType);
        this.validateTableTenantConfig(tableConfig);
        IdealState idealState = PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(), this._enableBatchMessageMode);
        TableType tableType = tableConfig.getTableType();
        if (ZKMetadataProvider.getSchema(this._propertyStore, TableNameBuilder.extractRawTableName(tableNameWithType)) == null) {
            throw new InvalidTableConfigException("No schema defined for table: " + tableNameWithType);
        }
        Preconditions.checkState(tableType == TableType.OFFLINE || tableType == TableType.REALTIME, "Invalid table type: %s", tableType);
        LOGGER.info("Adding table {}: Creating table config in the property store", tableNameWithType);
        if (!ZKMetadataProvider.createTableConfig(this._propertyStore, tableConfig)) {
            throw new RuntimeException("Failed to create table config for table: " + tableNameWithType);
        }
        try {
            this.assignInstances(tableConfig, true);
            LOGGER.info("Adding table {}: Assigned instances", tableNameWithType);
            if (tableType == TableType.OFFLINE) {
                this._helixAdmin.addResource(this._helixClusterName, tableNameWithType, idealState);
                LOGGER.info("Adding table {}: Added ideal state for offline table", tableNameWithType);
            } else {
                this._pinotLLCRealtimeSegmentManager.setUpNewTable(tableConfig, idealState);
                LOGGER.info("Adding table {}: Added ideal state with first consuming segment", tableNameWithType);
            }
        }
        catch (Exception e) {
            // This handler covers both the offline and the realtime branch, so the message must
            // not claim the failure happened during "offline" setup.
            LOGGER.error("Caught exception during table setup. Cleaning up table {}", tableNameWithType, e);
            this.deleteTable(tableNameWithType, tableType, null);
            throw e;
        }
        LOGGER.info("Adding table {}: Updating BrokerResource for table", tableNameWithType);
        List<String> brokers = HelixHelper.getInstancesWithTag(this._helixZkManager, TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig()));
        HelixHelper.updateIdealState(this._helixZkManager, "brokerResource", is -> {
            assert (is != null);
            // Mark every broker of the tenant ONLINE for the new table.
            is.getRecord().getMapFields().put(tableNameWithType, SegmentAssignmentUtils.getInstanceStateMap(brokers, "ONLINE"));
            return is;
        });
        LOGGER.info("Adding table {}: Successfully added table", tableNameWithType);
    }

    /**
     * Validates the tenant configuration of a table and fills in default tenants where allowed.
     *
     * <p>If the broker or server tenant is missing, a single-tenant cluster gets
     * {@code "DefaultTenant"} written back into {@code tableConfig}; a multi-tenant cluster rejects
     * the config. Afterwards every tag the table depends on (broker tag, server tags per table
     * type, and the server tag of each tier) must have at least one instance.
     *
     * @param tableConfig the table config to validate; its tenant config may be replaced
     * @throws InvalidTableConfigException if tenants are missing on a multi-tenant cluster, a
     *     realtime server tag is not a server tag, or a required tag has no instances
     */
    @VisibleForTesting
    void validateTableTenantConfig(TableConfig tableConfig) {
        TenantConfig tenantConfig = tableConfig.getTenantConfig();
        String tableNameWithType = tableConfig.getTableName();
        String brokerTag = tenantConfig.getBroker();
        String serverTag = tenantConfig.getServer();
        if (brokerTag == null || serverTag == null) {
            if (!this._isSingleTenantCluster) {
                throw new InvalidTableConfigException("server and broker tenants must be specified for multi-tenant cluster for table: " + tableNameWithType);
            }
            String newBrokerTag = brokerTag == null ? "DefaultTenant" : brokerTag;
            String newServerTag = serverTag == null ? "DefaultTenant" : serverTag;
            // Validate against the config that is actually stored on the table from here on,
            // instead of the stale one that still carries the null tenants.
            tenantConfig = new TenantConfig(newBrokerTag, newServerTag, tenantConfig.getTagOverrideConfig());
            tableConfig.setTenantConfig(tenantConfig);
        }
        Set<String> tagsToCheck = new TreeSet<>();
        tagsToCheck.add(TagNameUtils.extractBrokerTag(tenantConfig));
        if (tableConfig.isDimTable()) {
            // A dimension table only needs servers under either the offline or the realtime tag.
            String offlineTag = TagNameUtils.extractOfflineServerTag(tenantConfig);
            String realtimeTag = TagNameUtils.extractRealtimeServerTag(tenantConfig);
            if (this.getInstancesWithTag(offlineTag).isEmpty() && this.getInstancesWithTag(realtimeTag).isEmpty()) {
                throw new InvalidTableConfigException("Failed to find instances for dimension table: " + tableNameWithType);
            }
        } else if (tableConfig.getTableType() == TableType.OFFLINE) {
            tagsToCheck.add(TagNameUtils.extractOfflineServerTag(tenantConfig));
        } else {
            String consumingServerTag = TagNameUtils.extractConsumingServerTag(tenantConfig);
            if (!TagNameUtils.isServerTag(consumingServerTag)) {
                throw new InvalidTableConfigException("Invalid CONSUMING server tag: " + consumingServerTag + " for table: " + tableNameWithType);
            }
            tagsToCheck.add(consumingServerTag);
            String completedServerTag = TagNameUtils.extractCompletedServerTag(tenantConfig);
            if (!TagNameUtils.isServerTag(completedServerTag)) {
                throw new InvalidTableConfigException("Invalid COMPLETED server tag: " + completedServerTag + " for table: " + tableNameWithType);
            }
            tagsToCheck.add(completedServerTag);
        }
        for (String tag : tagsToCheck) {
            if (this.getInstancesWithTag(tag).isEmpty()) {
                throw new InvalidTableConfigException("Failed to find instances with tag: " + tag + " for table: " + tableNameWithType);
            }
        }
        List<TierConfig> tierConfigList = tableConfig.getTierConfigsList();
        if (CollectionUtils.isNotEmpty(tierConfigList)) {
            for (TierConfig tierConfig : tierConfigList) {
                if (this.getInstancesWithTag(tierConfig.getServerTag()).isEmpty()) {
                    throw new InvalidTableConfigException(String.format("Failed to find instances with tag: %s as used by tier: %s for table: %s", tierConfig.getServerTag(), tierConfig.getName(), tableNameWithType));
                }
            }
        }
    }

    /**
     * Writes a record to the given path through the Helix base data accessor.
     *
     * @param path path to write to
     * @param record record to store
     * @param expectedVersion version passed through to the accessor's {@code set} call
     * @param accessOption access option passed through to the accessor
     * @return the result reported by the accessor's {@code set} call
     */
    public boolean setZKData(String path, ZNRecord record, int expectedVersion, int accessOption) {
        boolean written = this._helixDataAccessor.getBaseDataAccessor().set(path, record, expectedVersion, accessOption);
        return written;
    }

    /**
     * Removes the given path through the Helix base data accessor.
     *
     * @param path path to remove
     * @return the result reported by the accessor's {@code remove} call
     */
    public boolean deleteZKPath(String path) {
        // -1 is handed to the accessor as its options argument.
        boolean removed = this._helixDataAccessor.getBaseDataAccessor().remove(path, -1);
        return removed;
    }

    /**
     * Reads the record stored at the given path through the Helix base data accessor.
     *
     * @param path path to read
     * @return the record returned by the accessor for that path
     */
    public ZNRecord readZKData(String path) {
        // No Stat is requested (null); -1 is handed to the accessor as its options argument.
        ZNRecord record = (ZNRecord) this._helixDataAccessor.getBaseDataAccessor().get(path, null, -1);
        return record;
    }

    /**
     * Reads the child records of the given path through the Helix base data accessor.
     *
     * @param path parent path
     * @return the child records returned by the accessor
     */
    public List<ZNRecord> getZKChildren(String path) {
        // NOTE(review): the trailing 2 and 50 look like a retry count and a retry interval;
        // confirm against the accessor's getChildren signature.
        List<ZNRecord> children = this._helixDataAccessor.getBaseDataAccessor().getChildren(path, null, -1, 2, 50);
        return children;
    }

    /**
     * Lists the child names of the given path through the Helix base data accessor.
     *
     * @param path parent path
     * @return the child names returned by the accessor
     */
    public List<String> getZKChildNames(String path) {
        List<String> childNames = this._helixDataAccessor.getBaseDataAccessor().getChildNames(path, -1);
        return childNames;
    }

    /**
     * Fetches the stat of every child of the given path.
     *
     * @param path parent path
     * @return a map from child name to its stat, in the order the child names were returned
     */
    public Map<String, Stat> getZKChildrenStats(String path) {
        List<String> childNames = this._helixDataAccessor.getBaseDataAccessor().getChildNames(path, -1);
        List<String> childPaths = new ArrayList<>();
        for (String childName : childNames) {
            // Collapse the double slash that appears when the parent path already ends with '/'.
            childPaths.add((path + "/" + childName).replaceAll("//", "/"));
        }
        Stat[] stats = this._helixDataAccessor.getBaseDataAccessor().getStats(childPaths, -1);
        Map<String, Stat> statsByChild = new LinkedHashMap<>(childNames.size());
        int position = 0;
        for (String childName : childNames) {
            // stats is indexed in the same order as the child paths that were requested.
            statsByChild.put(childName, stats[position]);
            position++;
        }
        return statsByChild;
    }

    /**
     * Fetches the stat of the given path through the Helix base data accessor.
     *
     * @param path path to inspect
     * @return the stat returned by the accessor
     */
    public Stat getZKStat(String path) {
        Stat pathStat = this._helixDataAccessor.getBaseDataAccessor().getStat(path, -1);
        return pathStat;
    }

    /**
     * Stores the LLC realtime segment manager that this resource manager delegates to when
     * setting up realtime tables (see its use in {@code addTable}).
     *
     * @param pinotLLCRealtimeSegmentManager the manager instance to use from now on
     */
    public void registerPinotLLCRealtimeSegmentManager(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) {
        this._pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
    }

    /**
     * Computes and persists the instance partitions of a table.
     *
     * <p>For every instance partitions type that allows instance assignment, partitions are either
     * computed from scratch, computed based on pre-configured partitions (mirror server set
     * assignment), or copied from the referenced pre-configured partitions. Tiers that have an
     * instance assignment config get their own instance partitions as well.
     *
     * @param tableConfig config of the table to assign instances for
     * @param override when {@code false}, instance partitions that already exist in the property
     *     store are left untouched; when {@code true} they are recomputed and overwritten
     */
    private void assignInstances(TableConfig tableConfig, boolean override) {
        String tableNameWithType = tableConfig.getTableName();
        String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);

        // Collect the partition types that need (re)assignment.
        List<InstancePartitionsType> typesToAssign = new ArrayList<>();
        for (InstancePartitionsType candidateType : InstancePartitionsType.values()) {
            if (!InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, candidateType)) {
                continue;
            }
            boolean keepExisting = !override && InstancePartitionsUtils.fetchInstancePartitions(this._propertyStore, candidateType.getInstancePartitionsName(rawTableName)) != null;
            if (!keepExisting) {
                typesToAssign.add(candidateType);
            }
        }

        InstanceAssignmentDriver assignmentDriver = new InstanceAssignmentDriver(tableConfig);
        List<InstanceConfig> instanceConfigs = this.getAllHelixInstanceConfigs();

        if (!typesToAssign.isEmpty()) {
            LOGGER.info("Assigning {} instances to table: {}", typesToAssign, tableNameWithType);
            for (InstancePartitionsType partitionsType : typesToAssign) {
                boolean hasPreConfigured = TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, partitionsType);
                boolean mirrorsPreConfigured = InstanceAssignmentConfigUtils.isMirrorServerSetAssignment(tableConfig, partitionsType);
                InstancePartitions computedPartitions;
                if (hasPreConfigured && mirrorsPreConfigured) {
                    // Compute a fresh assignment that is based on the pre-configured partitions.
                    String referenceName = tableConfig.getInstancePartitionsMap().get(partitionsType);
                    InstancePartitions preConfigured = InstancePartitionsUtils.fetchInstancePartitionsWithRename(this._propertyStore, referenceName, partitionsType.getInstancePartitionsName(rawTableName));
                    computedPartitions = assignmentDriver.assignInstances(partitionsType, instanceConfigs, null, preConfigured);
                    LOGGER.info("Persisting instance partitions: {} (based on {})", computedPartitions, preConfigured);
                } else if (hasPreConfigured) {
                    // Reuse the referenced partitions as-is, renamed for this table.
                    String referenceName = tableConfig.getInstancePartitionsMap().get(partitionsType);
                    computedPartitions = InstancePartitionsUtils.fetchInstancePartitionsWithRename(this._propertyStore, referenceName, partitionsType.getInstancePartitionsName(rawTableName));
                    LOGGER.info("Persisting instance partitions: {} (referencing {})", computedPartitions, referenceName);
                } else {
                    computedPartitions = assignmentDriver.assignInstances(partitionsType, instanceConfigs, null);
                    LOGGER.info("Persisting instance partitions: {}", computedPartitions);
                }
                InstancePartitionsUtils.persistInstancePartitions(this._propertyStore, computedPartitions);
            }
        }

        // Tier-level instance partitions only apply when both tiers and assignment configs exist.
        if (!CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList()) || tableConfig.getInstanceAssignmentConfigMap() == null) {
            return;
        }
        for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
            if (!tableConfig.getInstanceAssignmentConfigMap().containsKey(tierConfig.getName())) {
                continue;
            }
            boolean keepExistingTier = !override && InstancePartitionsUtils.fetchInstancePartitions(this._propertyStore, InstancePartitionsUtils.getInstancePartitionsNameForTier(tableNameWithType, tierConfig.getName())) != null;
            if (keepExistingTier) {
                continue;
            }
            LOGGER.info("Calculating instance partitions for tier: {}, table : {}", tierConfig.getName(), tableNameWithType);
            InstancePartitions tierPartitions = assignmentDriver.assignInstances(tierConfig.getName(), instanceConfigs, null, tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName()));
            LOGGER.info("Persisting instance partitions: {}", tierPartitions);
            InstancePartitionsUtils.persistInstancePartitions(this._propertyStore, tierPartitions);
        }
    }

    /**
     * Overwrites the stored config of a user, keyed by its username-with-component.
     *
     * @param userConfig the user config to store
     * @throws IOException declared by the signature; not thrown directly by this method
     */
    public void updateUserConfig(UserConfig userConfig) throws IOException {
        String userKey = userConfig.getUsernameWithComponent();
        ZNRecord userRecord = AccessControlUserConfigUtils.toZNRecord(userConfig);
        ZKMetadataProvider.setUserConfig(this._propertyStore, userKey, userRecord);
    }

    /**
     * Validates the tenant config of an existing table and then stores the table config.
     *
     * @param tableConfig the new table config
     * @throws IOException propagated from {@code setExistingTableConfig}
     */
    public void updateTableConfig(TableConfig tableConfig) throws IOException {
        // Validation runs first so an invalid tenant config is never written.
        this.validateTableTenantConfig(tableConfig);
        this.setExistingTableConfig(tableConfig);
    }

    /**
     * Stores the table config using an expected version of {@code -1}.
     *
     * @param tableConfig the table config to store
     * @throws IOException propagated from the two-argument overload
     */
    public void setExistingTableConfig(TableConfig tableConfig) throws IOException {
        // NOTE(review): -1 presumably means "match any version" — confirm in ZKMetadataProvider.setTableConfig.
        this.setExistingTableConfig(tableConfig, -1);
    }

    /**
     * Stores the table config and brings dependent state in line with it.
     *
     * <p>After the config is written, the replica count of the table's ideal state is updated if
     * it differs from the configured replication, missing instance partitions are assigned, and a
     * table config refresh message is sent.
     *
     * @param tableConfig the table config to store
     * @param expectedVersion version passed to the property store write
     * @throws RuntimeException if the table config could not be written
     * @throws IOException declared by the signature; not thrown directly by this method
     */
    public void setExistingTableConfig(TableConfig tableConfig, int expectedVersion) throws IOException {
        String tableNameWithType = tableConfig.getTableName();
        boolean configStored = ZKMetadataProvider.setTableConfig(this._propertyStore, tableConfig, expectedVersion);
        if (!configStored) {
            throw new RuntimeException("Failed to update table config in Zookeeper for table: " + tableNameWithType + " with expected version: " + expectedVersion);
        }

        IdealState currentIdealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, tableNameWithType);
        String configuredReplicas = Integer.toString(tableConfig.getReplication());
        boolean replicasInSync = currentIdealState.getReplicas().equals(configuredReplicas);
        if (!replicasInSync) {
            // Up to 5 attempts, starting at a 1000 ms delay with a 1.2 backoff factor.
            HelixHelper.updateIdealState(this._helixZkManager, tableNameWithType, is -> {
                assert (is != null);
                is.setReplicas(configuredReplicas);
                return is;
            }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f));
        }

        // Only fill in instance partitions that do not exist yet.
        this.assignInstances(tableConfig, false);
        this.sendTableConfigRefreshMessage(tableNameWithType);
    }

    /**
     * Replaces the custom config of an existing table.
     *
     * <p>The table config is read together with its version, and that version is used as the
     * expected version when the modified config is written back.
     *
     * @param tableName name of the table
     * @param type type of the table
     * @param newConfigs the custom config to set
     * @throws RuntimeException if the table does not exist
     * @throws Exception if storing the updated config fails
     */
    public void updateMetadataConfigFor(String tableName, TableType type, TableCustomConfig newConfigs) throws Exception {
        String tableNameWithType = TableNameBuilder.forType(type).tableNameWithType(tableName);
        Pair<TableConfig, Integer> versionedConfig = ZKMetadataProvider.getTableConfigWithVersion(this._propertyStore, tableNameWithType);
        if (versionedConfig == null) {
            throw new RuntimeException("Table: " + tableName + " of type: " + type + " does not exist");
        }
        TableConfig currentConfig = versionedConfig.getLeft();
        currentConfig.setCustomConfig(newConfigs);
        this.setExistingTableConfig(currentConfig, versionedConfig.getRight());
    }

    /**
     * Replaces the segments validation and retention config of an existing table.
     *
     * <p>The table config is read together with its version, and that version is used as the
     * expected version when the modified config is written back.
     *
     * @param tableName name of the table
     * @param type type of the table
     * @param newConfigs the validation config to set
     * @throws RuntimeException if the table does not exist
     * @throws Exception if storing the updated config fails
     */
    public void updateSegmentsValidationAndRetentionConfigFor(String tableName, TableType type, SegmentsValidationAndRetentionConfig newConfigs) throws Exception {
        String tableNameWithType = TableNameBuilder.forType(type).tableNameWithType(tableName);
        Pair<TableConfig, Integer> versionedConfig = ZKMetadataProvider.getTableConfigWithVersion(this._propertyStore, tableNameWithType);
        if (versionedConfig == null) {
            throw new RuntimeException("Table: " + tableName + " of type: " + type + " does not exist");
        }
        TableConfig currentConfig = versionedConfig.getLeft();
        currentConfig.setValidationConfig(newConfigs);
        this.setExistingTableConfig(currentConfig, versionedConfig.getRight());
    }

    /**
     * Replaces the indexing config of an existing table.
     *
     * <p>The table config is read together with its version, and that version is used as the
     * expected version when the modified config is written back.
     *
     * @param tableName name of the table
     * @param type type of the table
     * @param newConfigs the indexing config to set
     * @throws RuntimeException if the table does not exist
     * @throws Exception if storing the updated config fails
     */
    public void updateIndexingConfigFor(String tableName, TableType type, IndexingConfig newConfigs) throws Exception {
        String tableNameWithType = TableNameBuilder.forType(type).tableNameWithType(tableName);
        Pair<TableConfig, Integer> versionedConfig = ZKMetadataProvider.getTableConfigWithVersion(this._propertyStore, tableNameWithType);
        if (versionedConfig == null) {
            throw new RuntimeException("Table: " + tableName + " of type: " + type + " does not exist");
        }
        TableConfig currentConfig = versionedConfig.getLeft();
        currentConfig.setIndexingConfig(newConfigs);
        this.setExistingTableConfig(currentConfig, versionedConfig.getRight());
    }

    /**
     * Removes the stored config of the given user from the property store.
     *
     * @param username key of the user config to remove
     */
    public void deleteUser(String username) {
        ZKMetadataProvider.removeUserConfigFromPropertyStore(this._propertyStore, username);
        // Log messages fixed: missing space before the placeholder and the "resouces" typo.
        LOGGER.info("Deleting user {}: Removed from user resources", username);
        LOGGER.info("Deleting user {} finished", username);
    }

    /**
     * Deletes an OFFLINE table without specifying a retention period.
     *
     * @param tableName name of the table to delete
     */
    public void deleteOfflineTable(String tableName) {
        this.deleteOfflineTable(tableName, null);
    }

    /**
     * Deletes an OFFLINE table.
     *
     * @param tableName name of the table to delete
     * @param retentionPeriod retention period forwarded to {@code deleteTable}; may be {@code null}
     */
    public void deleteOfflineTable(String tableName, @Nullable String retentionPeriod) {
        this.deleteTable(tableName, TableType.OFFLINE, retentionPeriod);
    }

    /**
     * Deletes a REALTIME table without specifying a retention period.
     *
     * @param tableName name of the table to delete
     */
    public void deleteRealtimeTable(String tableName) {
        this.deleteRealtimeTable(tableName, null);
    }

    /**
     * Deletes a REALTIME table.
     *
     * @param tableName name of the table to delete
     * @param retentionPeriod retention period forwarded to {@code deleteTable}; may be {@code null}
     */
    public void deleteRealtimeTable(String tableName, @Nullable String retentionPeriod) {
        this.deleteTable(tableName, TableType.REALTIME, retentionPeriod);
    }

    /**
     * Deletes a table and the metadata associated with it.
     *
     * <p>The teardown runs in a fixed order: broker resource entry, table deletion on servers,
     * ideal state, stored segments, segment metadata, instance partitions, tier instance
     * partitions, segment lineage, minion task metadata and, last, the table config.
     *
     * @param tableName name of the table; callers in this class also pass names that already
     *     carry the type suffix
     * @param tableType type of the table to delete
     * @param retentionPeriod period string converted to milliseconds and handed to the segment
     *     deletion manager; {@code null} passes a {@code null} retention
     */
    public void deleteTable(String tableName, TableType tableType, @Nullable String retentionPeriod) {
        String tableNameWithType = TableNameBuilder.forType((TableType)tableType).tableNameWithType(tableName);
        LOGGER.info("Deleting table {}: Start", (Object)tableNameWithType);
        // Remove the table from the broker resource first.
        HelixHelper.removeResourceFromBrokerIdealState((HelixManager)this._helixZkManager, (String)tableNameWithType);
        LOGGER.info("Deleting table {}: Removed from broker resource", (Object)tableNameWithType);
        this.deleteTableOnServers(tableNameWithType);
        this._helixDataAccessor.removeProperty(this._keyBuilder.idealStates(tableNameWithType));
        LOGGER.info("Deleting table {}: Removed ideal state", (Object)tableNameWithType);
        // A null retention period is forwarded as a null millisecond value.
        Long retentionPeriodMs = retentionPeriod != null ? TimeUtils.convertPeriodToMillis((String)retentionPeriod) : null;
        // The segment list is read from the property store before the segment metadata is removed below.
        this._segmentDeletionManager.removeSegmentsFromStore(tableNameWithType, this.getSegmentsFromPropertyStore(tableNameWithType), retentionPeriodMs);
        LOGGER.info("Deleting table {}: Removed stored segments", (Object)tableNameWithType);
        ZKMetadataProvider.removeResourceSegmentsFromPropertyStore(this._propertyStore, (String)tableNameWithType);
        LOGGER.info("Deleting table {}: Removed segment metadata", (Object)tableNameWithType);
        if (tableType == TableType.OFFLINE) {
            // OFFLINE tables: the instance partitions are removed under the table name with type.
            InstancePartitionsUtils.removeInstancePartitions(this._propertyStore, (String)tableNameWithType);
        } else {
            // Otherwise remove both the CONSUMING and the COMPLETED instance partitions.
            String rawTableName = TableNameBuilder.extractRawTableName((String)tableName);
            InstancePartitionsUtils.removeInstancePartitions(this._propertyStore, (String)InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
            InstancePartitionsUtils.removeInstancePartitions(this._propertyStore, (String)InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
        }
        LOGGER.info("Deleting table {}: Removed instance partitions", (Object)tableNameWithType);
        InstancePartitionsUtils.removeTierInstancePartitions(this._propertyStore, (String)tableNameWithType);
        LOGGER.info("Deleting table {}: Removed tier instance partitions", (Object)tableNameWithType);
        SegmentLineageAccessHelper.deleteSegmentLineage(this._propertyStore, (String)tableNameWithType);
        LOGGER.info("Deleting table {}: Removed segment lineage", (Object)tableNameWithType);
        MinionTaskMetadataUtils.deleteTaskMetadata(this._propertyStore, (String)tableNameWithType);
        LOGGER.info("Deleting table {}: Removed all minion task metadata", (Object)tableNameWithType);
        // The table config goes last.
        ZKMetadataProvider.removeResourceConfigFromPropertyStore(this._propertyStore, (String)tableNameWithType);
        LOGGER.info("Deleting table {}: Removed table config", (Object)tableNameWithType);
        LOGGER.info("Deleting table {}: Finish", (Object)tableNameWithType);
    }

    /**
     * Enables, disables or drops a table.
     *
     * <p>Enabling also attempts a resource reset; a failed reset is logged and reported in the
     * response message but does not turn the response into a failure.
     *
     * @param tableNameWithType name of the table, including the type suffix
     * @param stateType the state change to apply
     * @return a failure response if the table does not exist, otherwise a success response
     *     describing the change
     * @throws IllegalStateException if {@code stateType} is not one of the handled values
     */
    public PinotResourceManagerResponse toggleTableState(String tableNameWithType, StateType stateType) {
        if (!this.hasTable(tableNameWithType)) {
            return PinotResourceManagerResponse.failure("Table: " + tableNameWithType + " not found");
        }
        switch (stateType) {
            case ENABLE: {
                this._helixAdmin.enableResource(this._helixClusterName, tableNameWithType, true);
                boolean resetSucceeded;
                try {
                    this._helixAdmin.resetResource(this._helixClusterName, Collections.singletonList(tableNameWithType));
                    resetSucceeded = true;
                }
                catch (HelixException e) {
                    // The reset is best-effort: the table stays enabled even if it fails.
                    LOGGER.warn("Caught exception while resetting resource: {}", tableNameWithType, e);
                    resetSucceeded = false;
                }
                return PinotResourceManagerResponse.success("Table: " + tableNameWithType + " enabled (reset success = " + resetSucceeded + ")");
            }
            case DISABLE: {
                this._helixAdmin.enableResource(this._helixClusterName, tableNameWithType, false);
                return PinotResourceManagerResponse.success("Table: " + tableNameWithType + " disabled");
            }
            case DROP: {
                boolean isOffline = TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE;
                if (isOffline) {
                    this.deleteOfflineTable(tableNameWithType);
                } else {
                    this.deleteRealtimeTable(tableNameWithType);
                }
                return PinotResourceManagerResponse.success("Table: " + tableNameWithType + " dropped");
            }
            default: {
                throw new IllegalStateException();
            }
        }
    }

    /**
     * Reads the metadata of a single controller job.
     *
     * @param jobId id of the job
     * @param jobType type of the job, which determines the record the job is stored in
     * @return the job's metadata, or {@code null} if the jobs record or the job entry is missing
     */
    @Nullable
    public Map<String, String> getControllerJobZKMetadata(String jobId, ControllerJobType jobType) {
        String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
        ZNRecord jobsRecord = this._propertyStore.get(jobResourcePath, null, AccessOption.PERSISTENT);
        if (jobsRecord == null) {
            return null;
        }
        return jobsRecord.getMapFields().get(jobId);
    }

    /**
     * Collects the metadata of all controller jobs of the given types that pass a filter.
     *
     * @param jobTypes job types whose records are scanned
     * @param jobMetadataChecker filter applied to each job's metadata
     * @return a map from job id to job metadata for every job accepted by the filter
     * @throws IllegalStateException if a stored job's {@code jobType} does not match the type of
     *     the record it was found in
     */
    public Map<String, Map<String, String>> getAllJobs(Set<ControllerJobType> jobTypes, Predicate<Map<String, String>> jobMetadataChecker) {
        Map<String, Map<String, String>> matchingJobs = new HashMap<>();
        for (ControllerJobType jobType : jobTypes) {
            String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
            ZNRecord jobsRecord = this._propertyStore.get(jobResourcePath, null, AccessOption.PERSISTENT);
            if (jobsRecord == null) {
                // No job of this type has been recorded.
                continue;
            }
            for (Map.Entry<String, Map<String, String>> jobEntry : jobsRecord.getMapFields().entrySet()) {
                String jobId = jobEntry.getKey();
                Map<String, String> jobMetadata = jobEntry.getValue();
                Preconditions.checkState(jobMetadata.get("jobType").equals(jobType.name()), "Got unexpected jobType: %s at jobResourcePath: %s with jobId: %s", jobType.name(), jobResourcePath, jobId);
                if (jobMetadataChecker.test(jobMetadata)) {
                    matchingJobs.put(jobId, jobMetadata);
                }
            }
        }
        return matchingJobs;
    }

    /**
     * Records a reload job for a single segment.
     *
     * @param tableNameWithType name of the table, including the type suffix
     * @param segmentName name of the segment being reloaded
     * @param jobId id of the job
     * @param jobSubmissionTimeMs submission time of the job in milliseconds
     * @param numMessagesSent number of messages sent for the job
     * @return the result of storing the job record
     */
    public boolean addNewReloadSegmentJob(String tableNameWithType, String segmentName, String jobId, long jobSubmissionTimeMs, int numMessagesSent) {
        ControllerJobType jobType = ControllerJobType.RELOAD_SEGMENT;
        Map<String, String> reloadJob = new HashMap<>();
        reloadJob.put("jobId", jobId);
        reloadJob.put("tableName", tableNameWithType);
        reloadJob.put("jobType", jobType.toString());
        reloadJob.put("submissionTimeMs", Long.toString(jobSubmissionTimeMs));
        reloadJob.put("messageCount", Integer.toString(numMessagesSent));
        reloadJob.put("segmentName", segmentName);
        return this.addControllerJobToZK(jobId, reloadJob, jobType);
    }

    /**
     * Records a force-commit job.
     *
     * @param tableNameWithType name of the table, including the type suffix
     * @param jobId id of the job
     * @param jobSubmissionTimeMs submission time of the job in milliseconds
     * @param consumingSegmentsCommitted segments covered by the job, stored as a JSON string
     * @return the result of storing the job record
     * @throws JsonProcessingException if the segment set cannot be serialized
     */
    public boolean addNewForceCommitJob(String tableNameWithType, String jobId, long jobSubmissionTimeMs, Set<String> consumingSegmentsCommitted) throws JsonProcessingException {
        ControllerJobType jobType = ControllerJobType.FORCE_COMMIT;
        Map<String, String> forceCommitJob = new HashMap<>();
        forceCommitJob.put("jobId", jobId);
        forceCommitJob.put("tableName", tableNameWithType);
        forceCommitJob.put("jobType", jobType.toString());
        forceCommitJob.put("submissionTimeMs", Long.toString(jobSubmissionTimeMs));
        forceCommitJob.put("segmentsForceCommitted", JsonUtils.objectToString(consumingSegmentsCommitted));
        return this.addControllerJobToZK(jobId, forceCommitJob, jobType);
    }

    /**
     * Records a reload job that covers all segments of a table.
     *
     * <p>The job is stored under the {@code RELOAD_SEGMENT} job type, the same type used for
     * single-segment reloads, but without a segment name.
     *
     * @param tableNameWithType name of the table, including the type suffix
     * @param jobId id of the job
     * @param jobSubmissionTimeMs submission time of the job in milliseconds
     * @param numberOfMessagesSent number of messages sent for the job
     * @return the result of storing the job record
     */
    public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String jobId, long jobSubmissionTimeMs, int numberOfMessagesSent) {
        ControllerJobType jobType = ControllerJobType.RELOAD_SEGMENT;
        Map<String, String> reloadAllJob = new HashMap<>();
        reloadAllJob.put("jobId", jobId);
        reloadAllJob.put("tableName", tableNameWithType);
        reloadAllJob.put("jobType", jobType.toString());
        reloadAllJob.put("submissionTimeMs", Long.toString(jobSubmissionTimeMs));
        reloadAllJob.put("messageCount", Integer.toString(numberOfMessagesSent));
        return this.addControllerJobToZK(jobId, reloadAllJob, jobType);
    }

    /**
     * Stores a controller job, unconditionally replacing any previous entry with the same id.
     *
     * @param jobId id of the job
     * @param jobMetadata metadata of the job; must contain {@code submissionTimeMs}
     * @param jobType type of the job
     * @return the result of storing the job record
     */
    public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata, ControllerJobType jobType) {
        Predicate<Map<String, String>> acceptAnyPreviousJob = previousJob -> true;
        return this.addControllerJobToZK(jobId, jobMetadata, jobType, acceptAnyPreviousJob);
    }

    /**
     * Stores a controller job in the jobs record of its type.
     *
     * <p>When the record already exists, the previous metadata for {@code jobId} (possibly
     * {@code null}) must be accepted by {@code prevJobMetadataChecker}; the record is then written
     * back with the version it was read at. If the record grows beyond the maximum number of jobs,
     * only the jobs with the most recent submission times are kept. When no record exists yet, a
     * new one is created and the checker is not consulted.
     *
     * @param jobId id of the job
     * @param jobMetadata metadata of the job; must contain {@code submissionTimeMs}
     * @param jobType type of the job
     * @param prevJobMetadataChecker decides, based on the previous metadata, whether to proceed
     * @return {@code false} if the checker rejects the previous metadata, otherwise the result of
     *     the property store write
     * @throws IllegalStateException if {@code jobMetadata} has no {@code submissionTimeMs}
     */
    public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata, ControllerJobType jobType, Predicate<Map<String, String>> prevJobMetadataChecker) {
        Preconditions.checkState(jobMetadata.get("submissionTimeMs") != null, (Object) "Submission Time in JobMetadata record not set. Cannot expire these records");
        String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
        Stat recordStat = new Stat();
        ZNRecord existingRecord = this._propertyStore.get(jobResourcePath, recordStat, AccessOption.PERSISTENT);

        if (existingRecord == null) {
            // First job of this type: create the record from scratch.
            ZNRecord newRecord = new ZNRecord(jobResourcePath);
            newRecord.setMapField(jobId, jobMetadata);
            return this._propertyStore.set(jobResourcePath, newRecord, AccessOption.PERSISTENT);
        }

        Map<String, Map<String, String>> jobsById = existingRecord.getMapFields();
        if (!prevJobMetadataChecker.test(jobsById.get(jobId))) {
            return false;
        }
        jobsById.put(jobId, jobMetadata);

        int maxJobs = CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK;
        if (jobsById.size() > maxJobs) {
            // Keep only the newest jobs, ordered by submission time descending.
            jobsById = jobsById.entrySet().stream()
                .sorted((left, right) -> Long.compare(Long.parseLong(right.getValue().get("submissionTimeMs")), Long.parseLong(left.getValue().get("submissionTimeMs"))))
                .limit(maxJobs)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        }
        existingRecord.setMapFields(jobsById);
        // Write with the version that was read.
        return this._propertyStore.set(jobResourcePath, existingRecord, recordStat.getVersion(), AccessOption.PERSISTENT);
    }

    /**
     * Applies an update to the metadata of every job of the given type that belongs to a table.
     *
     * @param tableNameWithType name of the table, including the type suffix
     * @param jobType type of the jobs to update
     * @param updater mutation applied in place to each matching job's metadata
     * @return {@code true} if no jobs record exists, otherwise the result of writing the record
     *     back with the version it was read at
     */
    public boolean updateJobsForTable(String tableNameWithType, ControllerJobType jobType, Consumer<Map<String, String>> updater) {
        String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
        Stat recordStat = new Stat();
        ZNRecord jobsRecord = this._propertyStore.get(jobResourcePath, recordStat, AccessOption.PERSISTENT);
        if (jobsRecord == null) {
            // Nothing stored for this job type, so there is nothing to update.
            return true;
        }
        Map<String, Map<String, String>> jobsById = jobsRecord.getMapFields();
        for (Map<String, String> jobMetadata : jobsById.values()) {
            if (jobMetadata.get("tableName").equals(tableNameWithType)) {
                updater.accept(jobMetadata);
            }
        }
        jobsRecord.setMapFields(jobsById);
        return this._propertyStore.set(jobResourcePath, jobsRecord, recordStat.getVersion(), AccessOption.PERSISTENT);
    }

    /**
     * Creates the segment ZK metadata for the given segment, stores it in the property store and then
     * assigns the segment via {@link #assignTableSegment(String, String)}.
     *
     * @param tableNameWithType table name the segment belongs to
     * @param segmentMetadata metadata of the segment being added
     * @param downloadUrl download URL recorded in the segment ZK metadata
     * @throws IllegalStateException if the property store write fails
     */
    @VisibleForTesting
    public void addNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata, String downloadUrl) {
        SegmentZKMetadata newSegmentZKMetadata = ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, segmentMetadata, downloadUrl, null, -1L);
        ZNRecord segmentRecord = newSegmentZKMetadata.toZNRecord();
        String segmentName = segmentMetadata.getName();
        String metadataPath = ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, segmentName);

        boolean stored = this._propertyStore.set(metadataPath, segmentRecord, AccessOption.PERSISTENT);
        Preconditions.checkState(stored, (Object)("Failed to set segment ZK metadata for table: " + tableNameWithType + ", segment: " + segmentName));
        LOGGER.info("Added segment: {} of table: {} to property store", segmentName, tableNameWithType);

        assignTableSegment(tableNameWithType, segmentName);
    }

    /**
     * Adds the given segment to the ideal state of the table, assigning it to instances in "ONLINE" state.
     *
     * <p>If any step fails, the segment ZK metadata at the segment's property store path is removed
     * (best effort, outcome is logged) and the original exception is rethrown.
     *
     * <p>NOTE(review): the decompiler emitted "WARNING - Removed try catching itself - possible behaviour
     * change." for this method, so the try/catch shape may differ from the original source.
     *
     * @param tableNameWithType table name the segment belongs to
     * @param segmentName name of the segment to assign
     */
    public void assignTableSegment(String tableNameWithType, String segmentName) {
        String segmentZKMetadataPath = ZKMetadataProvider.constructPropertyStorePathForSegment((String)tableNameWithType, (String)segmentName);
        try {
            TableConfig tableConfig = this.getTableConfig(tableNameWithType);
            Preconditions.checkState((tableConfig != null ? 1 : 0) != 0, (Object)("Failed to find table config for table: " + tableNameWithType));
            Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = this.fetchOrComputeInstancePartitions(tableNameWithType, tableConfig);
            // Tier handling only applies when tiered assignment is enabled and the table defines tier configs
            if (this._enableTieredSegmentAssignment && CollectionUtils.isNotEmpty((Collection)tableConfig.getTierConfigsList())) {
                List sortedTiers = TierConfigUtils.getSortedTiersForStorageType((List)tableConfig.getTierConfigsList(), (String)"pinot_server", (HelixManager)this._helixZkManager);
                this.updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers);
                InstancePartitions tierInstancePartitions = TierConfigUtils.getTieredInstancePartitionsForSegment((String)tableNameWithType, (String)segmentName, (List)sortedTiers, (HelixManager)this._helixZkManager);
                // The tiered instance partitions replace the computed ones only for OFFLINE table resources
                if (tierInstancePartitions != null && TableNameBuilder.isOfflineTableResource((String)tableNameWithType)) {
                    LOGGER.info("Overriding with tiered instance partitions: {} for segment: {} of table: {}", new Object[]{tierInstancePartitions, segmentName, tableNameWithType});
                    instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.OFFLINE, tierInstancePartitions);
                }
            }
            SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(this._helixZkManager, tableConfig, this._controllerMetrics);
            // The ideal state update runs while holding the lock object returned for this table name
            Object object = this.getTableUpdaterLock(tableNameWithType);
            synchronized (object) {
                // Effectively-final copy so the lambda below can capture the (possibly overridden) map
                Map<InstancePartitionsType, InstancePartitions> finalInstancePartitionsMap = instancePartitionsMap;
                HelixHelper.updateIdealState((HelixManager)this._helixZkManager, (String)tableNameWithType, idealState -> {
                    assert (idealState != null);
                    Map currentAssignment = idealState.getRecord().getMapFields();
                    if (currentAssignment.containsKey(segmentName)) {
                        // Segment is already present in the ideal state: leave its assignment untouched
                        LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", (Object)segmentName, (Object)tableNameWithType);
                    } else {
                        List<String> assignedInstances = segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap);
                        LOGGER.info("Assigning segment: {} to instances: {} for table: {}", new Object[]{segmentName, assignedInstances, tableNameWithType});
                        currentAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, "ONLINE"));
                    }
                    return idealState;
                });
                LOGGER.info("Added segment: {} to IdealState for table: {}", (Object)segmentName, (Object)tableNameWithType);
            }
        }
        catch (Exception e) {
            // Roll back: remove the segment ZK metadata at the segment path, then propagate the failure
            LOGGER.error("Caught exception while adding segment: {} to IdealState for table: {}, deleting segment ZK metadata", new Object[]{segmentName, tableNameWithType, e});
            if (this._propertyStore.remove(segmentZKMetadataPath, AccessOption.PERSISTENT)) {
                LOGGER.info("Deleted segment ZK metadata for segment: {} of table: {}", (Object)segmentName, (Object)tableNameWithType);
            } else {
                LOGGER.error("Failed to deleted segment ZK metadata for segment: {} of table: {}", (Object)segmentName, (Object)tableNameWithType);
            }
            throw e;
        }
    }

    /**
     * Resolves the single-entry instance partitions map used to assign a segment of the given table.
     *
     * <ul>
     *   <li>OFFLINE table resource: fetch-or-compute OFFLINE instance partitions.</li>
     *   <li>Table whose upsert mode is not NONE: fetch-or-compute CONSUMING instance partitions.</li>
     *   <li>Otherwise: the stored COMPLETED instance partitions if present; else default instance
     *       partitions, of type COMPLETED when the tag override config has a realtime-completed value
     *       and of type CONSUMING when it does not.</li>
     * </ul>
     */
    private Map<InstancePartitionsType, InstancePartitions> fetchOrComputeInstancePartitions(String tableNameWithType, TableConfig tableConfig) {
        if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
            InstancePartitions offlinePartitions = InstancePartitionsUtils.fetchOrComputeInstancePartitions(this._helixZkManager, tableConfig, InstancePartitionsType.OFFLINE);
            return Collections.singletonMap(InstancePartitionsType.OFFLINE, offlinePartitions);
        }

        if (tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE) {
            InstancePartitions consumingPartitions = InstancePartitionsUtils.fetchOrComputeInstancePartitions(this._helixZkManager, tableConfig, InstancePartitionsType.CONSUMING);
            return Collections.singletonMap(InstancePartitionsType.CONSUMING, consumingPartitions);
        }

        // Prefer explicitly stored COMPLETED instance partitions when they exist
        String completedPartitionsName = InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, InstancePartitionsType.COMPLETED.toString());
        InstancePartitions storedCompletedPartitions = InstancePartitionsUtils.fetchInstancePartitions(this._propertyStore, completedPartitionsName);
        if (storedCompletedPartitions != null) {
            return Collections.singletonMap(InstancePartitionsType.COMPLETED, storedCompletedPartitions);
        }

        // Fall back to default partitions; the type depends on whether a realtime-completed override is set
        TagOverrideConfig tagOverrideConfig = tableConfig.getTenantConfig().getTagOverrideConfig();
        boolean hasCompletedOverride = tagOverrideConfig != null && tagOverrideConfig.getRealtimeCompleted() != null;
        InstancePartitionsType fallbackType = hasCompletedOverride ? InstancePartitionsType.COMPLETED : InstancePartitionsType.CONSUMING;
        InstancePartitions defaultPartitions = InstancePartitionsUtils.computeDefaultInstancePartitions(this._helixZkManager, tableConfig, fallbackType);
        return Collections.singletonMap(fallbackType, defaultPartitions);
    }

    /**
     * Tells whether the REALTIME table for the given name has an upsert config whose mode is not NONE.
     *
     * @param tableName table name passed to {@code TableNameBuilder.REALTIME.tableNameWithType}
     * @return {@code false} when the realtime table config or its upsert config is missing, or the mode is NONE
     */
    public boolean isUpsertTable(String tableName) {
        String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
        TableConfig realtimeTableConfig = getTableConfig(realtimeTableName);
        if (realtimeTableConfig != null) {
            UpsertConfig upsertConfig = realtimeTableConfig.getUpsertConfig();
            if (upsertConfig != null) {
                return upsertConfig.getMode() != UpsertConfig.Mode.NONE;
            }
        }
        return false;
    }

    /**
     * Picks a lock object from {@code _tableUpdaterLocks} based on the hash code of the given table name.
     * The sign bit of the hash is masked off so the index is never negative.
     */
    private Object getTableUpdaterLock(String offlineTableName) {
        int nonNegativeHash = offlineTableName.hashCode() & Integer.MAX_VALUE;
        int lockIndex = nonNegativeHash % this._tableUpdaterLocks.length;
        return this._tableUpdaterLocks[lockIndex];
    }

    /**
     * Reads the ZNRecord stored at the property store path of the given segment.
     *
     * @return the record returned by {@code ZKMetadataProvider.getZnRecord}, which may be {@code null}
     */
    @Nullable
    public ZNRecord getSegmentMetadataZnRecord(String tableNameWithType, String segmentName) {
        String segmentPath = ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, segmentName);
        return ZKMetadataProvider.getZnRecord(this._propertyStore, segmentPath);
    }

    /**
     * Delegates to {@code ZKMetadataProvider.createSegmentZkMetadata} using this manager's property store.
     *
     * @return the boolean result reported by the delegate
     */
    public boolean createSegmentZkMetadata(String tableNameWithType, SegmentZKMetadata segmentZKMetadata) {
        boolean created = ZKMetadataProvider.createSegmentZkMetadata(this._propertyStore, tableNameWithType, segmentZKMetadata);
        return created;
    }

    /**
     * Delegates to the four-argument {@code ZKMetadataProvider.setSegmentZKMetadata}, passing this
     * manager's property store and the given expected version.
     *
     * @return the boolean result reported by the delegate
     */
    public boolean updateZkMetadata(String tableNameWithType, SegmentZKMetadata segmentZKMetadata, int expectedVersion) {
        boolean updated = ZKMetadataProvider.setSegmentZKMetadata(this._propertyStore, tableNameWithType, segmentZKMetadata, expectedVersion);
        return updated;
    }

    /**
     * Delegates to the three-argument {@code ZKMetadataProvider.setSegmentZKMetadata} (no expected
     * version supplied), passing this manager's property store.
     *
     * @return the boolean result reported by the delegate
     */
    public boolean updateZkMetadata(String tableNameWithType, SegmentZKMetadata segmentZKMetadata) {
        boolean updated = ZKMetadataProvider.setSegmentZKMetadata(this._propertyStore, tableNameWithType, segmentZKMetadata);
        return updated;
    }

    /**
     * Delegates to {@code ZKMetadataProvider.removeSegmentZKMetadata} using this manager's property store.
     *
     * @return the boolean result reported by the delegate
     */
    public boolean removeSegmentZKMetadata(String tableNameWithType, String segmentName) {
        boolean removed = ZKMetadataProvider.removeSegmentZKMetadata(this._propertyStore, tableNameWithType, segmentName);
        return removed;
    }

    /**
     * Sends a {@code TableDeletionMessage} for the table to all participants matching the table resource.
     * Nothing is sent when the table has no external view.
     */
    private void deleteTableOnServers(String tableNameWithType) {
        boolean externalViewMissing = this._helixDataAccessor.getProperty(this._keyBuilder.externalView(tableNameWithType)) == null;
        if (externalViewMissing) {
            LOGGER.warn("No delete table message sent for newly created table: {} without external view", tableNameWithType);
            return;
        }

        LOGGER.info("Sending delete table messages for table: {}", tableNameWithType);

        // Target every participant ("%" instance name) of the table resource, session-specific
        Criteria criteria = new Criteria();
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setInstanceName("%");
        criteria.setResource(tableNameWithType);
        criteria.setSessionSpecific(true);

        TableDeletionMessage deletionMessage = new TableDeletionMessage(tableNameWithType);
        ClusterMessagingService messagingService = this._helixZkManager.getMessagingService();
        int sentCount = messagingService.send(criteria, deletionMessage, null, -1);
        if (sentCount <= 0) {
            LOGGER.warn("No delete table message sent for table: {}", tableNameWithType);
        } else {
            LOGGER.info("Sent {} delete table messages for table: {}", sentCount, tableNameWithType);
        }
    }

    /**
     * Delegates to {@code ZKMetadataUtils.updateSegmentZKTimeInterval} with the given segment ZK metadata
     * and time column field spec.
     */
    public void updateZkTimeInterval(SegmentZKMetadata segmentZKMetadata, DateTimeFieldSpec timeColumnFieldSpec) {
        ZKMetadataUtils.updateSegmentZKTimeInterval(
            segmentZKMetadata,
            timeColumnFieldSpec);
    }

    /**
     * Refreshes the given segment ZK metadata from the segment metadata, writes it to the property store
     * with the expected version, and then sends a segment refresh message with both the server and broker
     * flags set.
     *
     * @throws RuntimeException if the versioned ZK metadata write fails
     */
    @VisibleForTesting
    public void refreshSegment(String tableNameWithType, SegmentMetadata segmentMetadata, SegmentZKMetadata segmentZKMetadata, int expectedVersion, String downloadUrl) {
        String segmentName = segmentMetadata.getName();
        ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, downloadUrl, null, -1L);

        boolean updated = ZKMetadataProvider.setSegmentZKMetadata(this._propertyStore, tableNameWithType, segmentZKMetadata, expectedVersion);
        if (!updated) {
            throw new RuntimeException("Failed to update ZK metadata for segment: " + segmentName + " of table: " + tableNameWithType);
        }
        LOGGER.info("Updated segment: {} of table: {} to property store", segmentName, tableNameWithType);

        sendSegmentRefreshMessage(tableNameWithType, segmentName, true, true);
    }

    /**
     * Sends a {@code SegmentReloadMessage} for the whole table to all participants of the table resource.
     *
     * @param tableNameWithType table whose segments should be reloaded
     * @param forceDownload flag carried by the reload message; only accepted for OFFLINE tables
     * @return pair of (number of messages sent, id of the reload message)
     * @throws IllegalArgumentException if {@code forceDownload} is set for a table that is not OFFLINE
     */
    public Pair<Integer, String> reloadAllSegments(String tableNameWithType, boolean forceDownload) {
        LOGGER.info("Sending reload message for table: {} with forceDownload: {}", tableNameWithType, forceDownload);
        if (forceDownload) {
            boolean isOfflineTable = TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE;
            Preconditions.checkArgument(isOfflineTable, "Table: %s is not an OFFLINE table, which is required to force to download segments", tableNameWithType);
        }

        // Target every participant ("%" instance name) of the table resource, session-specific
        Criteria criteria = new Criteria();
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setInstanceName("%");
        criteria.setResource(tableNameWithType);
        criteria.setSessionSpecific(true);

        SegmentReloadMessage reloadMessage = new SegmentReloadMessage(tableNameWithType, forceDownload);
        ClusterMessagingService messagingService = this._helixZkManager.getMessagingService();
        int sentCount = messagingService.send(criteria, reloadMessage, null, -1);
        if (sentCount <= 0) {
            LOGGER.warn("No reload message sent for table: {}", tableNameWithType);
        } else {
            LOGGER.info("Sent {} reload messages for table: {}", sentCount, tableNameWithType);
        }
        return Pair.of(sentCount, reloadMessage.getMsgId());
    }

    /**
     * Sends a {@code SegmentReloadMessage} for a single segment to the participants of the table resource
     * matching the segment partition.
     *
     * @param tableNameWithType table the segment belongs to
     * @param segmentName segment to reload
     * @param forceDownload flag carried by the reload message; only accepted for OFFLINE tables
     * @return pair of (number of messages sent, id of the reload message)
     * @throws IllegalArgumentException if {@code forceDownload} is set for a table that is not OFFLINE
     */
    public Pair<Integer, String> reloadSegment(String tableNameWithType, String segmentName, boolean forceDownload) {
        LOGGER.info("Sending reload message for segment: {} in table: {} with forceDownload: {}", segmentName, tableNameWithType, forceDownload);
        if (forceDownload) {
            boolean isOfflineTable = TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE;
            Preconditions.checkArgument(isOfflineTable, "Table: %s is not an OFFLINE table, which is required to force to download segment: %s", tableNameWithType, segmentName);
        }

        // Target every participant ("%" instance name) holding this segment partition, session-specific
        Criteria criteria = new Criteria();
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setInstanceName("%");
        criteria.setResource(tableNameWithType);
        criteria.setPartition(segmentName);
        criteria.setSessionSpecific(true);

        SegmentReloadMessage reloadMessage = new SegmentReloadMessage(tableNameWithType, Collections.singletonList(segmentName), forceDownload);
        ClusterMessagingService messagingService = this._helixZkManager.getMessagingService();
        int sentCount = messagingService.send(criteria, reloadMessage, null, -1);
        if (sentCount <= 0) {
            LOGGER.warn("No reload message sent for segment: {} in table: {}", segmentName, tableNameWithType);
        } else {
            LOGGER.info("Sent {} reload messages for segment: {} in table: {}", sentCount, segmentName, tableNameWithType);
        }
        return Pair.of(sentCount, reloadMessage.getMsgId());
    }

    /**
     * Resets a single segment on each instance selected from the ideal state (optionally narrowed to
     * {@code targetInstance}). An instance is skipped when the segment has no external view state map or
     * its external view state on that instance is "OFFLINE".
     *
     * @throws IllegalStateException if the ideal state or external view of the table cannot be found, or
     *         the segment is not in the ideal state
     */
    public void resetSegment(String tableNameWithType, String segmentName, @Nullable String targetInstance) {
        IdealState idealState = getTableIdealState(tableNameWithType);
        Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
        ExternalView externalView = getTableExternalView(tableNameWithType);
        Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);

        Set<String> candidateInstances = PinotHelixResourceManager.parseInstanceSet(idealState, segmentName, targetInstance);
        Map<String, String> externalViewStates = externalView.getStateMap(segmentName);
        for (String instance : candidateInstances) {
            boolean resettable = externalViewStates != null && !"OFFLINE".equals(externalViewStates.get(instance));
            if (resettable) {
                LOGGER.info("Resetting segment: {} of table: {} on instance: {}", segmentName, tableNameWithType, instance);
                resetPartitionAllState(instance, tableNameWithType, Collections.singleton(segmentName));
            } else {
                LOGGER.info("Skipping resetting for segment: {} of table: {} on instance: {}", segmentName, tableNameWithType, instance);
            }
        }
    }

    /**
     * Resets segments of a table, grouped per instance.
     *
     * <p>With {@code errorSegmentsOnly} set, only segments whose external view state on the instance is
     * "ERROR" are reset. Otherwise every segment is reset except those with no external view state map or
     * with state "OFFLINE" on the instance; those are collected and logged as skipped.
     *
     * @param tableNameWithType table whose segments should be reset
     * @param targetInstance when non-null, restricts the reset to this instance
     * @param errorSegmentsOnly whether to reset only segments in "ERROR" state
     * @throws IllegalStateException if the ideal state or external view of the table cannot be found
     */
    public void resetSegments(String tableNameWithType, @Nullable String targetInstance, boolean errorSegmentsOnly) {
        IdealState idealState = getTableIdealState(tableNameWithType);
        Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
        ExternalView externalView = getTableExternalView(tableNameWithType);
        Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);

        Map<String, Set<String>> segmentsToResetByInstance = new HashMap<>();
        Map<String, Set<String>> segmentsSkippedByInstance = new HashMap<>();
        for (String segmentName : idealState.getPartitionSet()) {
            Set<String> candidateInstances = PinotHelixResourceManager.parseInstanceSet(idealState, segmentName, targetInstance);
            Map<String, String> externalViewStates = externalView.getStateMap(segmentName);
            for (String instance : candidateInstances) {
                String externalViewState = externalViewStates == null ? null : externalViewStates.get(instance);
                if (errorSegmentsOnly) {
                    if ("ERROR".equals(externalViewState)) {
                        segmentsToResetByInstance.computeIfAbsent(instance, k -> new HashSet<>()).add(segmentName);
                    }
                } else if (externalViewStates == null || "OFFLINE".equals(externalViewState)) {
                    segmentsSkippedByInstance.computeIfAbsent(instance, k -> new HashSet<>()).add(segmentName);
                } else {
                    segmentsToResetByInstance.computeIfAbsent(instance, k -> new HashSet<>()).add(segmentName);
                }
            }
        }

        if (segmentsToResetByInstance.isEmpty()) {
            LOGGER.info("No segments to reset for table: {}", tableNameWithType);
            return;
        }

        LOGGER.info("Resetting segments: {} of table: {}", segmentsToResetByInstance, tableNameWithType);
        segmentsToResetByInstance.forEach((instance, segments) -> resetPartitionAllState(instance, tableNameWithType, segments));
        LOGGER.info("Reset segments for table {} finished. With the following segments skipped: {}", tableNameWithType, segmentsSkippedByInstance);
    }

    /**
     * Returns the instances a segment is assigned to in the ideal state, optionally narrowed to one target.
     *
     * @param idealState ideal state to read the segment's instance set from
     * @param segmentName segment to look up
     * @param targetInstance when non-null, the only instance of interest
     * @return the full instance set when no target is given; otherwise a singleton of the target if the
     *         segment is assigned to it, or an empty set if not
     * @throws IllegalStateException if the ideal state has no instances for the segment
     */
    private static Set<String> parseInstanceSet(IdealState idealState, String segmentName, @Nullable String targetInstance) {
        Set<String> assignedInstances = idealState.getInstanceSet(segmentName);
        Preconditions.checkState(CollectionUtils.isNotEmpty(assignedInstances), "Could not find segment: %s in ideal state", segmentName);
        if (targetInstance == null) {
            return assignedInstances;
        }
        if (assignedInstances.contains(targetInstance)) {
            return Collections.singleton(targetInstance);
        }
        return Collections.emptySet();
    }

    /**
     * Writes one STATE_TRANSITION message per partition to the given instance, each going from the
     * partition's current state to the initial state of the resource's state model.
     *
     * <p>NOTE(review): {@code resetPartitionNames} is null-guarded only in the first log statement; the
     * loops below dereference it unconditionally. Likewise the ideal state and current state read from the
     * accessor are used without null checks — confirm callers guarantee these exist.
     *
     * @param instanceName instance to send the reset messages to
     * @param resourceName resource (table) the partitions belong to
     * @param resetPartitionNames partitions (segments) to reset
     * @throws RuntimeException if the instance has no live instance entry, or a pending STATE_TRANSITION
     *         message for the same session and resource already targets one of the partitions
     */
    private void resetPartitionAllState(String instanceName, String resourceName, Set<String> resetPartitionNames) {
        LOGGER.info("Reset partitions {} for resource {} on instance {} in cluster {}.", new Object[]{resetPartitionNames == null ? "NULL" : resetPartitionNames, resourceName, instanceName, this._helixClusterName});
        HelixDataAccessor accessor = this._helixZkManager.getHelixDataAccessor();
        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
        // The instance must have a live instance entry; its session id is needed for the messages below
        LiveInstance liveInstance = (LiveInstance)accessor.getProperty(keyBuilder.liveInstance(instanceName));
        if (liveInstance == null) {
            String instanceConfigPath = PropertyPathBuilder.instanceConfig((String)this._helixClusterName, (String)instanceName);
            throw new RuntimeException(String.format("Can't find instance: %s on %s", instanceName, instanceConfigPath));
        }
        // Gather what is needed to build the messages: ideal state, state model and current state
        IdealState idealState = (IdealState)accessor.getProperty(keyBuilder.idealStates(resourceName));
        String stateModelDef = idealState.getStateModelDefRef();
        StateModelDefinition stateModel = (StateModelDefinition)accessor.getProperty(keyBuilder.stateModelDef(stateModelDef));
        String sessionId = liveInstance.getEphemeralOwner();
        CurrentState curState = (CurrentState)accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, resourceName));
        // Refuse to reset while a state transition for the same session/resource/partition is still pending
        List messages = accessor.getChildValues(keyBuilder.messages(instanceName), true);
        for (Message message : messages) {
            if (!Message.MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType()) || !sessionId.equals(message.getTgtSessionId()) || !resourceName.equals(message.getResourceName()) || !resetPartitionNames.contains(message.getPartitionName())) continue;
            throw new RuntimeException(String.format("Can't reset state for %s.%s on %s, because a pending message %s exists for resource %s", resourceName, resetPartitionNames, instanceName, message, message.getResourceName()));
        }
        // Source name for the messages: "<local canonical host name>-ADMIN", or "UNKNOWN" if it cannot be resolved
        Object adminName = null;
        try {
            adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
        }
        catch (UnknownHostException e) {
            LOGGER.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", (Throwable)e);
            adminName = "UNKNOWN";
        }
        ArrayList<Message> resetMessages = new ArrayList<Message>();
        ArrayList<PropertyKey> messageKeys = new ArrayList<PropertyKey>();
        // Build one message per partition: current state -> state model's initial state
        for (String partitionName : resetPartitionNames) {
            String msgId = UUID.randomUUID().toString();
            Message message = new Message(Message.MessageType.STATE_TRANSITION, msgId);
            message.setSrcName((String)adminName);
            message.setTgtName(instanceName);
            message.setMsgState(Message.MessageState.NEW);
            message.setPartitionName(partitionName);
            message.setResourceName(resourceName);
            message.setTgtSessionId(sessionId);
            message.setStateModelDef(stateModelDef);
            message.setFromState(curState.getState(partitionName));
            message.setToState(stateModel.getInitialState());
            message.setStateModelFactoryName(idealState.getStateModelFactoryName());
            // Resource group name and instance group tag are copied only when the ideal state defines them
            if (idealState.getResourceGroupName() != null) {
                message.setResourceGroupName(idealState.getResourceGroupName());
            }
            if (idealState.getInstanceGroupTag() != null) {
                message.setResourceTag(idealState.getInstanceGroupTag());
            }
            resetMessages.add(message);
            messageKeys.add(keyBuilder.message(instanceName, message.getId()));
        }
        // Write all messages in a single batched call
        accessor.setChildren(messageKeys, resetMessages);
    }

    /**
     * Sends a {@code SegmentRefreshMessage} for the given segment to servers and/or brokers.
     *
     * @param tableNameWithType table the segment belongs to
     * @param segmentName segment that was refreshed
     * @param refreshServerSegment when set, sends to participants of the table resource matching the
     *        segment partition
     * @param refreshBrokerRouting when set, sends to participants of the "brokerResource" resource
     *        matching the table partition
     */
    public void sendSegmentRefreshMessage(String tableNameWithType, String segmentName, boolean refreshServerSegment, boolean refreshBrokerRouting) {
        SegmentRefreshMessage refreshMessage = new SegmentRefreshMessage(tableNameWithType, segmentName);

        // The same criteria object is reused for both sends; only resource and partition change
        Criteria criteria = new Criteria();
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setInstanceName("%");
        criteria.setSessionSpecific(true);
        ClusterMessagingService messagingService = this._helixZkManager.getMessagingService();

        if (refreshServerSegment) {
            criteria.setResource(tableNameWithType);
            criteria.setPartition(segmentName);
            int sentToServers = messagingService.send(criteria, refreshMessage, null, -1);
            if (sentToServers <= 0) {
                LOGGER.warn("No segment refresh message sent to servers for segment: {} of table: {}", segmentName, tableNameWithType);
            } else {
                LOGGER.info("Sent {} segment refresh messages to servers for segment: {} of table: {}", sentToServers, segmentName, tableNameWithType);
            }
        }

        if (refreshBrokerRouting) {
            criteria.setResource("brokerResource");
            criteria.setPartition(tableNameWithType);
            int sentToBrokers = messagingService.send(criteria, refreshMessage, null, -1);
            if (sentToBrokers <= 0) {
                LOGGER.warn("No segment refresh message sent to brokers for segment: {} of table: {}", segmentName, tableNameWithType);
            } else {
                LOGGER.info("Sent {} segment refresh messages to brokers for segment: {} of table: {}", sentToBrokers, segmentName, tableNameWithType);
            }
        }
    }

    /**
     * Sends a {@code TableConfigRefreshMessage} for the table to the participants of the "brokerResource"
     * resource matching the table partition, and logs how many messages were sent.
     */
    private void sendTableConfigRefreshMessage(String tableNameWithType) {
        TableConfigRefreshMessage refreshMessage = new TableConfigRefreshMessage(tableNameWithType);

        Criteria criteria = new Criteria();
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setInstanceName("%");
        criteria.setResource("brokerResource");
        criteria.setSessionSpecific(true);
        criteria.setPartition(tableNameWithType);

        ClusterMessagingService messagingService = this._helixZkManager.getMessagingService();
        int sentCount = messagingService.send(criteria, refreshMessage, null, -1);
        if (sentCount <= 0) {
            LOGGER.warn("No table config refresh message sent to brokers for table: {}", tableNameWithType);
        } else {
            LOGGER.info("Sent {} table config refresh messages to brokers for table: {}", sentCount, tableNameWithType);
        }
    }

    /**
     * Sends a {@code RoutingTableRebuildMessage} for the table to the participants of the "brokerResource"
     * resource matching the table partition, and logs how many messages were sent.
     */
    private void sendRoutingTableRebuildMessage(String tableNameWithType) {
        RoutingTableRebuildMessage rebuildMessage = new RoutingTableRebuildMessage(tableNameWithType);

        Criteria criteria = new Criteria();
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setInstanceName("%");
        criteria.setResource("brokerResource");
        criteria.setSessionSpecific(true);
        criteria.setPartition(tableNameWithType);

        ClusterMessagingService messagingService = this._helixZkManager.getMessagingService();
        int sentCount = messagingService.send(criteria, rebuildMessage, null, -1);
        if (sentCount <= 0) {
            LOGGER.warn("No routing table rebuild message sent to brokers for table: {}", tableNameWithType);
        } else {
            LOGGER.info("Sent {} routing table rebuild messages to brokers for table: {}", sentCount, tableNameWithType);
        }
    }

    /**
     * Sets the participant-scoped {@code "queryRateLimitDisabled"} config of the given broker instance.
     *
     * @param brokerInstanceName broker participant whose config is updated
     * @param state the config value is {@code "true"} only when this equals {@code "DISABLE"}; any other
     *        value (including {@code null}) yields {@code "false"}
     */
    public void toggleQueryQuotaStateForBroker(String brokerInstanceName, String state) {
        boolean rateLimitDisabled = "DISABLE".equals(state);
        Map<String, String> participantConfig = new HashMap<>();
        participantConfig.put("queryRateLimitDisabled", String.valueOf(rateLimitDisabled));

        HelixConfigScopeBuilder scopeBuilder = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT, new String[]{this._helixClusterName});
        HelixConfigScope participantScope = scopeBuilder.forParticipant(brokerInstanceName).build();
        this._helixAdmin.setConfig(participantScope, participantConfig);
    }

    /**
     * Builds a sorted map from server instance to the segments assigned to it in the table's ideal state,
     * leaving out assignments whose state is "OFFLINE".
     *
     * @throws IllegalStateException if the table has no ideal state
     */
    public Map<String, List<String>> getServerToSegmentsMap(String tableNameWithType) {
        Map<String, List<String>> serverToSegmentsMap = new TreeMap<>();
        IdealState idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, tableNameWithType);
        if (idealState == null) {
            throw new IllegalStateException("Ideal state does not exist for table: " + tableNameWithType);
        }

        Map<String, Map<String, String>> segmentAssignment = idealState.getRecord().getMapFields();
        segmentAssignment.forEach((segmentName, instanceStateMap) ->
            instanceStateMap.forEach((instance, state) -> {
                if (!state.equals("OFFLINE")) {
                    serverToSegmentsMap.computeIfAbsent(instance, k -> new ArrayList<>()).add(segmentName);
                }
            }));
        return serverToSegmentsMap;
    }

    /**
     * Returns the sorted set of instances that hold the given segment in the table's ideal state with a
     * state other than "OFFLINE".
     *
     * @throws IllegalStateException if the table has no ideal state or the segment is not in it
     */
    public Set<String> getServers(String tableNameWithType, String segmentName) {
        IdealState idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, tableNameWithType);
        if (idealState == null) {
            throw new IllegalStateException("Ideal state does not exist for table: " + tableNameWithType);
        }

        Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
        Preconditions.checkState(instanceStateMap != null, "Segment: %s does not exist in the ideal state of table: %s", segmentName, tableNameWithType);

        return instanceStateMap.entrySet().stream()
            .filter(instanceState -> !instanceState.getValue().equals("OFFLINE"))
            .map(Map.Entry::getKey)
            .collect(Collectors.toCollection(TreeSet::new));
    }

    /**
     * Returns the segments of the table that have at least one instance in "CONSUMING" state in the
     * ideal state.
     *
     * @throws IllegalStateException if the table has no ideal state
     */
    public Set<String> getConsumingSegments(String tableNameWithType) {
        IdealState idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, tableNameWithType);
        if (idealState == null) {
            throw new IllegalStateException("Ideal state does not exist for table: " + tableNameWithType);
        }

        return idealState.getPartitionSet().stream()
            .filter(segment -> idealState.getInstanceStateMap(segment).containsValue("CONSUMING"))
            .collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * Alias of {@link #getServers(String, String)}.
     *
     * @deprecated call {@link #getServers(String, String)} directly
     */
    @Deprecated
    public Set<String> getServersForSegment(String tableNameWithType, String segmentName) {
        Set<String> servers = getServers(tableNameWithType, segmentName);
        return servers;
    }

    /**
     * Returns a sorted map from segment name to CRC (as a string) for every segment in the table's ideal
     * state, backed by per-table caches of CRCs and last-seen ZK metadata versions.
     *
     * <p>For each segment whose metadata stat is available, the cached CRC is refreshed only when the
     * stat version differs from the last known version. Cache entries for segments that are no longer in
     * the ideal state are dropped. A segment with no cached CRC maps to the string {@code "null"}.
     *
     * @param tableNameWithType table whose segment CRCs are requested
     * @return sorted map of segment name to CRC string
     * @throws IllegalStateException if the table has no ideal state
     */
    public synchronized Map<String, String> getSegmentsCrcForTable(String tableNameWithType) {
        IdealState idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, tableNameWithType);
        if (idealState == null) {
            // Same failure mode as the other ideal-state based getters, instead of a bare NullPointerException
            throw new IllegalStateException("Ideal state does not exist for table: " + tableNameWithType);
        }
        Set<String> segmentsInIdealState = idealState.getPartitionSet();
        List<String> segmentList = new ArrayList<>(segmentsInIdealState);
        List<String> segmentMetadataPaths = new ArrayList<>(segmentList.size());
        for (String segmentName : segmentList) {
            segmentMetadataPaths.add(this.buildPathForSegmentMetadata(tableNameWithType, segmentName));
        }

        // Lazily create the per-table caches on first use
        if (!this._segmentCrcMap.containsKey(tableNameWithType)) {
            this._lastKnownSegmentMetadataVersionMap.put(tableNameWithType, new HashMap<>());
            this._segmentCrcMap.put(tableNameWithType, new HashMap<>());
        }
        Map<String, Integer> lastKnownVersions = this._lastKnownSegmentMetadataVersionMap.get(tableNameWithType);
        Map<String, Long> cachedCrcs = this._segmentCrcMap.get(tableNameWithType);

        // Stats are returned positionally, matching the order of segmentList
        Stat[] metadataStats = this._propertyStore.getStats(segmentMetadataPaths, AccessOption.PERSISTENT);
        for (int i = 0; i < metadataStats.length; ++i) {
            Stat metadataStat = metadataStats[i];
            if (metadataStat == null) {
                continue;
            }
            String currentSegment = segmentList.get(i);
            int currentVersion = metadataStat.getVersion();
            Integer lastKnownVersion = lastKnownVersions.get(currentSegment);
            // Re-read the CRC only for segments never seen before or whose metadata version changed
            if (lastKnownVersion == null || lastKnownVersion != currentVersion) {
                this.updateSegmentMetadataCrc(tableNameWithType, currentSegment, currentVersion);
            }
        }

        // Drop cache entries of segments that are no longer part of the ideal state
        Iterator<String> cachedSegments = cachedCrcs.keySet().iterator();
        while (cachedSegments.hasNext()) {
            String cachedSegment = cachedSegments.next();
            if (!segmentsInIdealState.contains(cachedSegment)) {
                cachedSegments.remove();
                lastKnownVersions.remove(cachedSegment);
            }
        }

        Map<String, String> resultCrcMap = new TreeMap<>();
        for (String segment : segmentList) {
            resultCrcMap.put(segment, String.valueOf(cachedCrcs.get(segment)));
        }
        return resultCrcMap;
    }

    /**
     * Re-reads the ZK metadata of a segment and stores its CRC and metadata version in the per-table caches.
     *
     * <p>If the metadata cannot be found (for example because the segment was removed after the caller
     * looked up its stat), any cached entry for the segment is dropped and a warning is logged. The
     * previous {@code assert} is a no-op unless the JVM runs with {@code -ea}, which turned this case
     * into a bare {@link NullPointerException}.
     *
     * @param tableNameWithType table name including the type suffix; both per-table caches must already exist
     * @param segmentName name of the segment to refresh
     * @param currentVersion metadata version to record alongside the CRC
     */
    private void updateSegmentMetadataCrc(String tableNameWithType, String segmentName, int currentVersion) {
        SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider.getSegmentZKMetadata(this._propertyStore, tableNameWithType, segmentName);
        if (segmentZKMetadata == null) {
            LOGGER.warn("Failed to find ZK metadata for segment: {} of table: {}, dropping its cached CRC", segmentName, tableNameWithType);
            this._lastKnownSegmentMetadataVersionMap.get(tableNameWithType).remove(segmentName);
            this._segmentCrcMap.get(tableNameWithType).remove(segmentName);
            return;
        }
        this._lastKnownSegmentMetadataVersionMap.get(tableNameWithType).put(segmentName, currentVersion);
        this._segmentCrcMap.get(tableNameWithType).put(segmentName, segmentZKMetadata.getCrc());
    }

    /**
     * Builds the path {@code /SEGMENTS/<tableNameWithType>/<segmentName>} used for a segment's metadata.
     *
     * @param tableNameWithType table name including the type suffix
     * @param segmentName name of the segment
     * @return the joined path; a {@code null} argument is rendered as the text {@code "null"}
     */
    public String buildPathForSegmentMetadata(String tableNameWithType, String segmentName) {
        return String.join("/", "/SEGMENTS", tableNameWithType, segmentName);
    }

    /**
     * Tells whether any stored user config reports itself as existing for the given username and component.
     *
     * @param username user name to look for
     * @param component name passed to {@code ComponentType.valueOf}; it is resolved once per inspected
     *                  user config, so an invalid name only fails when at least one user config exists
     * @return {@code true} if at least one user config matches
     */
    public boolean hasUser(String username, String component) {
        boolean noUserMatches = ZKMetadataProvider.getAllUserConfig(this._propertyStore)
            .stream()
            .noneMatch(userConfig -> userConfig.isExist(username, ComponentType.valueOf(component)));
        return !noUserMatches;
    }

    /**
     * Tells whether an ideal state node exists for the given table.
     *
     * @param tableNameWithType table name including the type suffix
     * @return {@code true} if the ideal state path for the table exists in the Helix data store
     */
    public boolean hasTable(String tableNameWithType) {
        String idealStatePath = this._keyBuilder.idealStates(tableNameWithType).getPath();
        return this._helixDataAccessor.getBaseDataAccessor().exists(idealStatePath, AccessOption.PERSISTENT);
    }

    /**
     * Tells whether the OFFLINE table for the given name exists.
     *
     * @param tableName table name, converted with {@code TableNameBuilder.OFFLINE}
     * @return the result of {@link #hasTable(String)} for the offline table name
     */
    public boolean hasOfflineTable(String tableName) {
        String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
        return this.hasTable(offlineTableName);
    }

    /**
     * Tells whether the REALTIME table for the given name exists.
     *
     * @param tableName table name, converted with {@code TableNameBuilder.REALTIME}
     * @return the result of {@link #hasTable(String)} for the realtime table name
     */
    public boolean hasRealtimeTable(String tableName) {
        String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
        return this.hasTable(realtimeTableName);
    }

    /**
     * Tells whether the table's ideal state is enabled.
     *
     * @param tableNameWithType table name including the type suffix
     * @return the {@code isEnabled()} flag of the table's ideal state
     * @throws TableNotFoundException if no ideal state exists for the table
     */
    public boolean isTableEnabled(String tableNameWithType) throws TableNotFoundException {
        IdealState tableIdealState = this.getTableIdealState(tableNameWithType);
        if (tableIdealState != null) {
            return tableIdealState.isEnabled();
        }
        throw new TableNotFoundException("Failed to find ideal state for table: " + tableNameWithType);
    }

    /**
     * Fetches the ideal state of the given table from Helix.
     *
     * @param tableNameWithType table name including the type suffix
     * @return the ideal state, or {@code null} when Helix returns none
     */
    @Nullable
    public IdealState getTableIdealState(String tableNameWithType) {
        IdealState idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, tableNameWithType);
        return idealState;
    }

    /**
     * Fetches the external view of the given table from Helix.
     *
     * @param tableNameWithType table name including the type suffix
     * @return the external view, or {@code null} when Helix returns none
     */
    @Nullable
    public ExternalView getTableExternalView(String tableNameWithType) {
        ExternalView externalView = this._helixAdmin.getResourceExternalView(this._helixClusterName, tableNameWithType);
        return externalView;
    }

    /**
     * Reads the table config for the given table from the property store.
     *
     * @param tableNameWithType table name including the type suffix
     * @return the table config, or {@code null} when none is found
     */
    @Nullable
    public TableConfig getTableConfig(String tableNameWithType) {
        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, tableNameWithType);
        return tableConfig;
    }

    /**
     * Reads every table config available in the property store.
     *
     * @return the list returned by {@code ZKMetadataProvider.getAllTableConfigs}
     */
    public List<TableConfig> getAllTableConfigs() {
        List<TableConfig> allTableConfigs = ZKMetadataProvider.getAllTableConfigs(this._propertyStore);
        return allTableConfigs;
    }

    /**
     * Reads the OFFLINE table config for the given table name from the property store.
     *
     * @param tableName table name handed to {@code ZKMetadataProvider.getOfflineTableConfig}
     * @return the offline table config, or {@code null} when none is found
     */
    @Nullable
    public TableConfig getOfflineTableConfig(String tableName) {
        TableConfig offlineTableConfig = ZKMetadataProvider.getOfflineTableConfig(this._propertyStore, tableName);
        return offlineTableConfig;
    }

    /**
     * Reads the REALTIME table config for the given table name from the property store.
     *
     * @param tableName table name handed to {@code ZKMetadataProvider.getRealtimeTableConfig}
     * @return the realtime table config, or {@code null} when none is found
     */
    @Nullable
    public TableConfig getRealtimeTableConfig(String tableName) {
        TableConfig realtimeTableConfig = ZKMetadataProvider.getRealtimeTableConfig(this._propertyStore, tableName);
        return realtimeTableConfig;
    }

    /**
     * Reads the table config of the requested type for the given table name.
     *
     * @param tableName table name
     * @param tableType {@code OFFLINE} selects the offline config; any other value (including
     *                  {@code null}) selects the realtime config
     * @return the matching table config, or {@code null} when none is found
     */
    @Nullable
    public TableConfig getTableConfig(String tableName, TableType tableType) {
        return tableType == TableType.OFFLINE
            ? this.getOfflineTableConfig(tableName)
            : this.getRealtimeTableConfig(tableName);
    }

    /**
     * Collects the offline and realtime table configs whose table name equals the given schema name.
     *
     * @param schemaName schema name, used as the table name for both lookups
     * @return the configs that exist, offline first then realtime; empty when neither exists
     */
    public List<TableConfig> getTableConfigsForSchema(String schemaName) {
        TableConfig offlineConfig = this.getOfflineTableConfig(schemaName);
        TableConfig realtimeConfig = this.getRealtimeTableConfig(schemaName);
        ArrayList<TableConfig> found = new ArrayList<TableConfig>();
        for (TableConfig candidate : Arrays.asList(offlineConfig, realtimeConfig)) {
            if (candidate != null) {
                found.add(candidate);
            }
        }
        return found;
    }

    /**
     * Lists the server instances tagged for the tenant of the given table.
     *
     * <p>For an OFFLINE table these are the instances carrying the tenant's offline server tag; for a
     * REALTIME table, the union of the instances carrying the consuming and the completed server tags.
     * Any other table type yields an empty list.
     *
     * @param tableName table name
     * @param tableType table type used both to look up the config and to choose the tags
     * @return de-duplicated instance names, in no particular order
     * @throws NullPointerException if no table config is found
     */
    public List<String> getServerInstancesForTable(String tableName, TableType tableType) {
        TableConfig tableConfig = this.getTableConfig(tableName, tableType);
        Preconditions.checkNotNull(tableConfig);
        TenantConfig tenantConfig = tableConfig.getTenantConfig();
        // NOTE(review): raw List kept because the element type of getInstanceConfigs() is not visible here.
        List instanceConfigs = HelixHelper.getInstanceConfigs(this._helixZkManager);
        Set<String> taggedServers = new HashSet<String>();
        if (tableType == TableType.OFFLINE) {
            String offlineTag = TagNameUtils.extractOfflineServerTag(tenantConfig);
            taggedServers.addAll(HelixHelper.getInstancesWithTag(instanceConfigs, offlineTag));
        } else if (TableType.REALTIME.equals(tableType)) {
            String consumingTag = TagNameUtils.extractConsumingServerTag(tenantConfig);
            taggedServers.addAll(HelixHelper.getInstancesWithTag(instanceConfigs, consumingTag));
            String completedTag = TagNameUtils.extractCompletedServerTag(tenantConfig);
            taggedServers.addAll(HelixHelper.getInstancesWithTag(instanceConfigs, completedTag));
        }
        return new ArrayList<String>(taggedServers);
    }

    /**
     * Lists the instances carrying the broker tag of the given table's tenant.
     *
     * @param tableName table name
     * @param tableType table type used to look up the table config
     * @return instance names returned by {@code HelixHelper.getInstancesWithTag} for the broker tag
     * @throws NullPointerException if no table config is found
     */
    public List<String> getBrokerInstancesForTable(String tableName, TableType tableType) {
        TableConfig tableConfig = this.getTableConfig(tableName, tableType);
        Preconditions.checkNotNull(tableConfig);
        String brokerTag = TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig());
        return HelixHelper.getInstancesWithTag(this._helixZkManager, brokerTag);
    }

    /**
     * Enables the given instance, waiting up to 10 seconds for the change to take effect.
     *
     * @param instanceName name of the instance to enable
     * @return the response of the three-argument {@code enableInstance} overload
     */
    public PinotResourceManagerResponse enableInstance(String instanceName) {
        final long timeoutMs = 10000L;
        return this.enableInstance(instanceName, true, timeoutMs);
    }

    /**
     * Disables the given instance, waiting up to 10 seconds for the change to take effect.
     *
     * @param instanceName name of the instance to disable
     * @return the response of the three-argument {@code enableInstance} overload called with {@code false}
     */
    public PinotResourceManagerResponse disableInstance(String instanceName) {
        final long timeoutMs = 10000L;
        return this.enableInstance(instanceName, false, timeoutMs);
    }

    /**
     * Drops an instance from the cluster after verifying that doing so is safe.
     *
     * <p>The instance node is removed first and the participant config second, each under the default
     * retry policy. If the second removal fails, the returned message tells the operator to clean up the
     * participant config manually because the instance node is already gone.
     *
     * @param instanceName name of the instance to drop
     * @return a failure response carrying the first safety-check issue or the removal step that failed,
     *         otherwise a success response
     */
    public PinotResourceManagerResponse dropInstance(String instanceName) {
        OperationValidationResponse check = this.instanceDropSafetyCheck(instanceName);
        if (!check.isSafe()) {
            return PinotResourceManagerResponse.failure(check.getIssueMessage(0));
        }
        try {
            DEFAULT_RETRY_POLICY.attempt(() -> this._helixDataAccessor.removeProperty(this._keyBuilder.instance(instanceName)));
        }
        catch (Exception e) {
            // Log the cause: the response only carries a fixed message.
            LOGGER.error("Failed to remove /INSTANCES/{}", instanceName, e);
            return PinotResourceManagerResponse.failure("Failed to remove /INSTANCES/" + instanceName);
        }
        try {
            DEFAULT_RETRY_POLICY.attempt(() -> this._helixDataAccessor.removeProperty(this._keyBuilder.instanceConfig(instanceName)));
        }
        catch (Exception e) {
            LOGGER.error("Failed to remove /CONFIGS/PARTICIPANT/{} after /INSTANCES/{} was removed", instanceName, instanceName, e);
            return PinotResourceManagerResponse.failure("Failed to remove /CONFIGS/PARTICIPANT/" + instanceName + ". Make sure to remove /CONFIGS/PARTICIPANT/" + instanceName + " manually since /INSTANCES/" + instanceName + " has already been removed");
        }
        return PinotResourceManagerResponse.success("Instance " + instanceName + " dropped");
    }

    /**
     * Checks whether the given instance can be dropped safely.
     *
     * <p>An {@code IS_ALIVE} issue is recorded when a live-instance node exists for the instance, and a
     * {@code CONTAINS_RESOURCE} issue for every resource whose ideal state still assigns at least one
     * partition to it. A resource whose ideal state cannot be read (Helix returns {@code null}, e.g.
     * because the resource was removed concurrently) is skipped instead of failing the whole check.
     *
     * @param instanceName name of the instance to validate
     * @return the validation response; it is marked safe only when no issue was recorded
     */
    public OperationValidationResponse instanceDropSafetyCheck(String instanceName) {
        OperationValidationResponse response = new OperationValidationResponse().setInstanceName(instanceName);
        if (this._helixDataAccessor.getProperty(this._keyBuilder.liveInstance(instanceName)) != null) {
            response.putIssue(OperationValidationResponse.ErrorCode.IS_ALIVE, instanceName);
        }
        this.getAllResources().forEach(resource -> {
            IdealState idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, resource);
            if (idealState == null) {
                // No ideal state means no partition of this resource can reference the instance.
                return;
            }
            for (String partition : idealState.getPartitionSet()) {
                if (idealState.getInstanceSet(partition).contains(instanceName)) {
                    // One issue per resource is enough; stop at the first matching partition.
                    response.putIssue(OperationValidationResponse.ErrorCode.CONTAINS_RESOURCE, instanceName, resource);
                    break;
                }
            }
        });
        return response.setSafe(response.getIssues().isEmpty());
    }

    /**
     * Enables or disables an instance in Helix and polls until its current states reflect the change.
     *
     * <p>After toggling the instance, the live instance's current states are inspected every 500 ms until
     * the deadline:
     * <ul>
     *   <li>no live instance: success when disabling; when enabling, polling continues;</li>
     *   <li>no current states: success;</li>
     *   <li>enabling succeeds once no partition is in state {@code OFFLINE};</li>
     *   <li>disabling succeeds once every partition is in state {@code OFFLINE}.</li>
     * </ul>
     * If the waiting thread is interrupted, its interrupt flag is restored before the failure response is
     * returned so that callers further up the stack can still observe the interruption.
     *
     * @param instanceName name of the instance to toggle
     * @param enableInstance {@code true} to enable, {@code false} to disable
     * @param timeOutMs maximum time to wait for the current states to match, in milliseconds
     * @return a success response when the toggle is confirmed; a failure response when the instance does
     *         not exist, the wait is interrupted, or the deadline passes
     */
    private PinotResourceManagerResponse enableInstance(String instanceName, boolean enableInstance, long timeOutMs) {
        if (!this.instanceExists(instanceName)) {
            return PinotResourceManagerResponse.failure("Instance " + instanceName + " not found");
        }
        this._helixAdmin.enableInstance(this._helixClusterName, instanceName, enableInstance);
        long intervalWaitTimeMs = 500L;
        long deadline = System.currentTimeMillis() + timeOutMs;
        String offlineState = "OFFLINE";
        while (System.currentTimeMillis() < deadline) {
            PropertyKey liveInstanceKey = this._keyBuilder.liveInstance(instanceName);
            LiveInstance liveInstance = (LiveInstance) this._helixDataAccessor.getProperty(liveInstanceKey);
            if (liveInstance == null) {
                if (!enableInstance) {
                    return PinotResourceManagerResponse.SUCCESS;
                }
            } else {
                PropertyKey instanceCurrentStatesKey = this._keyBuilder.currentStates(instanceName, liveInstance.getSessionId());
                List<CurrentState> instanceCurrentStates = this._helixDataAccessor.getChildValues(instanceCurrentStatesKey, true);
                if (instanceCurrentStates.isEmpty()) {
                    return PinotResourceManagerResponse.SUCCESS;
                }
                boolean toggleSucceeded = enableInstance
                    ? instanceCurrentStates.stream()
                        .flatMap(currentState -> currentState.getPartitionStateMap().values().stream())
                        .noneMatch(offlineState::equals)
                    : instanceCurrentStates.stream()
                        .flatMap(currentState -> currentState.getPartitionStateMap().values().stream())
                        .allMatch(offlineState::equals);
                if (toggleSucceeded) {
                    return PinotResourceManagerResponse.success("Instance " + instanceName + (enableInstance ? " enabled" : " disabled"));
                }
            }
            try {
                Thread.sleep(intervalWaitTimeMs);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag; swallowing it would hide the interruption from callers.
                Thread.currentThread().interrupt();
                LOGGER.warn("Got interrupted when sleeping for {}ms to wait until the current state matched for instance: {}", (Object)intervalWaitTimeMs, (Object)instanceName);
                return PinotResourceManagerResponse.failure("Got interrupted when waiting for instance: " + instanceName + " to be " + (enableInstance ? "enabled" : "disabled"));
            }
        }
        return PinotResourceManagerResponse.failure("Instance: " + instanceName + (enableInstance ? " enable" : " disable") + " failed, timeout");
    }

    /**
     * Rebalances a table, optionally attaching a ZK-based observer that tracks the rebalance progress.
     *
     * @param tableNameWithType table name including the type suffix
     * @param rebalanceConfig rebalance settings
     * @param rebalanceJobId id of the rebalance job; must not be {@code null}
     * @param trackRebalanceProgress whether to create a {@code ZkBasedTableRebalanceObserver} for this job
     * @return the result of the five-argument {@code rebalanceTable} overload
     * @throws TableNotFoundException if the table config does not exist
     * @throws IllegalStateException if {@code rebalanceJobId} is {@code null}
     */
    public RebalanceResult rebalanceTable(String tableNameWithType, RebalanceConfig rebalanceConfig, String rebalanceJobId, boolean trackRebalanceProgress) throws TableNotFoundException {
        TableConfig config = this.getTableConfig(tableNameWithType);
        if (config == null) {
            throw new TableNotFoundException("Failed to find table config for table: " + tableNameWithType);
        }
        Preconditions.checkState(rebalanceJobId != null, "RebalanceId not populated in the rebalanceConfig");
        ZkBasedTableRebalanceObserver progressObserver = trackRebalanceProgress
            ? new ZkBasedTableRebalanceObserver(tableNameWithType, rebalanceJobId, TableRebalanceContext.forInitialAttempt(rebalanceJobId, rebalanceConfig), this)
            : null;
        return this.rebalanceTable(tableNameWithType, config, rebalanceJobId, rebalanceConfig, progressObserver);
    }

    /**
     * Rebalances a table with an already loaded table config.
     *
     * <p>When the rebalance config asks for it, the target tier of the table's segments is updated before
     * a new {@code TableRebalancer} is created and run.
     *
     * @param tableNameWithType table name including the type suffix
     * @param tableConfig config of the table to rebalance
     * @param rebalanceJobId id of the rebalance job
     * @param rebalanceConfig rebalance settings
     * @param zkBasedTableRebalanceObserver observer handed to the rebalancer; may be {@code null}
     * @return the result returned by {@code TableRebalancer.rebalance}
     */
    public RebalanceResult rebalanceTable(String tableNameWithType, TableConfig tableConfig, String rebalanceJobId, RebalanceConfig rebalanceConfig, @Nullable ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver) {
        boolean refreshTargetTier = rebalanceConfig.isUpdateTargetTier();
        if (refreshTargetTier) {
            this.updateTargetTier(rebalanceJobId, tableNameWithType, tableConfig);
        }
        return new TableRebalancer(this._helixZkManager, zkBasedTableRebalanceObserver, this._controllerMetrics)
            .rebalance(tableConfig, rebalanceConfig, rebalanceJobId);
    }

    /**
     * Recomputes the target tier of every segment of the table from the table's tier configs.
     *
     * <p>A table without tier configs is processed with an empty tier list.
     *
     * @param rebalanceJobId id of the rebalance job, used for logging only
     * @param tableNameWithType table name including the type suffix
     * @param tableConfig table config providing the tier configs
     */
    @VisibleForTesting
    void updateTargetTier(String rebalanceJobId, String tableNameWithType, TableConfig tableConfig) {
        // NOTE(review): raw List kept because the element type of getTierConfigsList() is not visible here.
        List tierConfigs = tableConfig.getTierConfigsList();
        final List<Tier> sortedTiers;
        if (tierConfigs == null) {
            sortedTiers = Collections.emptyList();
        } else {
            sortedTiers = TierConfigUtils.getSortedTiers(tierConfigs, this._helixZkManager);
        }
        LOGGER.info("For rebalanceId: {}, updating target tiers for segments of table: {} with tierConfigs: {}", rebalanceJobId, tableNameWithType, sortedTiers);
        this.getSegmentsFor(tableNameWithType, true)
            .forEach(segment -> this.updateSegmentTargetTier(tableNameWithType, segment, sortedTiers));
    }

    /**
     * Sets the tier recorded in a segment's ZK metadata to the first tier whose selector picks the segment.
     *
     * <p>When no tier selects the segment, its tier is cleared (default tier). Nothing is written when the
     * segment has no ZK metadata or when the recorded tier already matches the target.
     *
     * @param tableNameWithType table name including the type suffix
     * @param segmentName name of the segment
     * @param sortedTiers tiers to try in order; the first selecting tier wins
     */
    private void updateSegmentTargetTier(String tableNameWithType, String segmentName, List<Tier> sortedTiers) {
        ZNRecord metadataRecord = this.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
        if (metadataRecord == null) {
            LOGGER.debug("No ZK metadata for segment: {} of table: {}", segmentName, tableNameWithType);
            return;
        }
        Tier selectedTier = sortedTiers.stream()
            .filter(tier -> tier.getSegmentSelector().selectSegment(tableNameWithType, segmentName))
            .findFirst()
            .orElse(null);
        SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(metadataRecord);
        String currentTierName = segmentZKMetadata.getTier();
        String newTierName;
        if (selectedTier != null) {
            newTierName = selectedTier.getName();
            if (newTierName.equals(currentTierName)) {
                LOGGER.debug("Segment: {} of table: {} is already set to go to target tier: {}", segmentName, tableNameWithType, newTierName);
                return;
            }
            LOGGER.info("Segment: {} of table: {} is put onto new tier: {}", segmentName, tableNameWithType, newTierName);
        } else {
            if (currentTierName == null) {
                LOGGER.debug("Segment: {} of table: {} is already set to go to default tier", segmentName, tableNameWithType);
                return;
            }
            newTierName = null;
            LOGGER.info("Segment: {} of table: {} is put back on default tier", segmentName, tableNameWithType);
        }
        segmentZKMetadata.setTier(newTierName);
        // Write back against the version that was read.
        this.updateZkMetadata(tableNameWithType, segmentZKMetadata, metadataRecord.getVersion());
    }

    /**
     * Tells whether a Helix instance config exists for the given instance.
     *
     * @param instanceName name of the instance
     * @return {@code true} if {@code getHelixInstanceConfig} returns a non-null config
     */
    public boolean instanceExists(String instanceName) {
        boolean configMissing = this.getHelixInstanceConfig(instanceName) == null;
        return !configMissing;
    }

    /**
     * Lists the instances tagged {@code broker_untagged} that are currently live.
     *
     * <p>The live-instance names are copied into a {@link HashSet} before filtering so that
     * {@code retainAll} performs constant-time lookups instead of scanning a list for every element.
     *
     * @return the tagged instances that also appear under the live-instances node, in their original order
     */
    public List<String> getOnlineUnTaggedBrokerInstanceList() {
        List<String> untaggedBrokers = HelixHelper.getInstancesWithTag(this._helixZkManager, "broker_untagged");
        List<String> liveInstances = this._helixDataAccessor.getChildNames(this._keyBuilder.liveInstances());
        untaggedBrokers.retainAll(new HashSet<String>(liveInstances));
        return untaggedBrokers;
    }

    /**
     * Lists the instances tagged {@code server_untagged} that are currently live.
     *
     * <p>The live-instance names are copied into a {@link HashSet} before filtering so that
     * {@code retainAll} performs constant-time lookups instead of scanning a list for every element.
     *
     * @return the tagged instances that also appear under the live-instances node, in their original order
     */
    public List<String> getOnlineUnTaggedServerInstanceList() {
        List<String> untaggedServers = HelixHelper.getInstancesWithTag(this._helixZkManager, "server_untagged");
        List<String> liveInstances = this._helixDataAccessor.getChildNames(this._keyBuilder.liveInstances());
        untaggedServers.retainAll(new HashSet<String>(liveInstances));
        return untaggedServers;
    }

    /**
     * Lists the names of all child nodes under the live-instances key.
     *
     * @return the names returned by the Helix data accessor
     */
    public List<String> getOnlineInstanceList() {
        List<String> liveInstanceNames = this._helixDataAccessor.getChildNames(this._keyBuilder.liveInstances());
        return liveInstanceNames;
    }

    /**
     * Resolves the admin endpoint of every given instance through the instance admin endpoint cache.
     *
     * @param instances names of the instances to resolve
     * @return a bi-directional map from instance name to its admin endpoint
     * @throws InvalidConfigException if the endpoint of any instance cannot be obtained from the cache
     */
    public BiMap<String, String> getDataInstanceAdminEndpoints(Set<String> instances) throws InvalidConfigException {
        BiMap<String, String> instanceToEndpoint = HashBiMap.create(instances.size());
        for (String instanceName : instances) {
            String adminEndpoint;
            try {
                adminEndpoint = (String) this._instanceAdminEndpointCache.get(instanceName);
            }
            catch (Exception e) {
                String errorMessage = String.format("Caught exception while getting instance admin endpoint for instance: %s. Error message: %s", instanceName, e.getMessage());
                LOGGER.error(errorMessage, (Throwable) e);
                throw new InvalidConfigException(errorMessage);
            }
            instanceToEndpoint.put(instanceName, adminEndpoint);
        }
        return instanceToEndpoint;
    }

    /**
     * Resolves a table name (with or without type suffix) to the names of the typed tables that exist.
     *
     * <p>If the name already carries a type, only that table is checked and an explicitly requested type
     * must agree with it. Otherwise the offline and/or realtime variants are checked, restricted to
     * {@code tableType} when it is given. A table counts as existing when its table config is found.
     *
     * @param tableName table name, with or without type suffix
     * @param tableType type to restrict the lookup to, or {@code null} for both types
     * @return the existing table names with type (offline before realtime); never empty
     * @throws IllegalArgumentException if the type in {@code tableName} contradicts {@code tableType}
     * @throws TableNotFoundException if no matching table exists
     */
    public List<String> getExistingTableNamesWithType(String tableName, @Nullable TableType tableType) throws TableNotFoundException {
        List<String> existingNames = new ArrayList<String>(2);
        TableType typeFromName = TableNameBuilder.getTableTypeFromTableName(tableName);
        if (typeFromName == null) {
            boolean offlineRequested = tableType == null || tableType == TableType.OFFLINE;
            if (offlineRequested) {
                String offlineName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
                if (this.getTableConfig(offlineName) != null) {
                    existingNames.add(offlineName);
                }
            }
            boolean realtimeRequested = tableType == null || tableType == TableType.REALTIME;
            if (realtimeRequested) {
                String realtimeName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
                if (this.getTableConfig(realtimeName) != null) {
                    existingNames.add(realtimeName);
                }
            }
        } else {
            if (tableType != null && tableType != typeFromName) {
                throw new IllegalArgumentException("Table name: " + tableName + " does not match table type: " + tableType);
            }
            if (this.getTableConfig(tableName) != null) {
                existingNames.add(tableName);
            }
        }
        if (!existingNames.isEmpty()) {
            return existingNames;
        }
        String typeSuffix = tableType != null ? "_" + tableType : "";
        throw new TableNotFoundException("Table '" + tableName + typeSuffix + "' not found.");
    }

    /**
     * Starts a segment replacement by adding a new IN_PROGRESS entry to the table's segment lineage.
     *
     * <p>Before the lineage is touched, every segment in {@code segmentsFrom} must exist in the table and
     * no segment in {@code segmentsTo} may exist yet. The lineage update runs under
     * {@code DEFAULT_RETRY_POLICY} and is written with the version that was read; a failed write returns
     * {@code false} from the attempt (presumably triggering a retry; confirm against the retry policy).
     *
     * <p>While scanning the existing lineage entries:
     * <ul>
     *   <li>with {@code forceCleanup}: the {@code segmentsTo} of REVERTED entries are scheduled for deletion;
     *       IN_PROGRESS entries that overlap the new request are removed or rewritten as REVERTED and their
     *       leftover {@code segmentsTo} scheduled for deletion; for REFRESH tables, the {@code segmentsFrom}
     *       of a COMPLETED entry whose {@code segmentsTo} equals the new {@code segmentsFrom} are scheduled
     *       for deletion. No conflict checks are performed in this mode.</li>
     *   <li>without {@code forceCleanup}: REVERTED entries are ignored, and every other entry must not share
     *       a segment with the new {@code segmentsFrom} (in its {@code segmentsFrom}) or with the new
     *       {@code segmentsTo} (in its {@code segmentsTo}).</li>
     * </ul>
     * Segments scheduled for cleanup are deleted only after the lineage was written successfully.
     *
     * @param tableNameWithType table name including the type suffix
     * @param segmentsFrom segments to be replaced
     * @param segmentsTo segments that will replace them
     * @param forceCleanup whether conflicting or stale lineage entries are cleaned up instead of rejected
     * @param customMap custom values handed to the lineage manager
     * @return the id of the newly created lineage entry
     * @throws IllegalStateException if one of the up-front segment existence checks fails
     * @throws RuntimeException if the lineage update fails; wraps the underlying exception
     */
    public String startReplaceSegments(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo, boolean forceCleanup, Map<String, String> customMap) {
        String segmentLineageEntryId = SegmentLineageUtils.generateLineageEntryId();
        HashSet<String> segmentsForTable = new HashSet<String>(this.getSegmentsFor(tableNameWithType, true));
        // Every segment to be replaced must currently be part of the table.
        for (String segment : segmentsFrom) {
            Preconditions.checkState((boolean)segmentsForTable.contains(segment), (String)"Segment: %s from 'segmentsFrom' does not exist in table: %s", (Object)segment, (Object)tableNameWithType);
        }
        // None of the replacement segments may be part of the table yet.
        for (String segment : segmentsTo) {
            Preconditions.checkState((!segmentsForTable.contains(segment) ? 1 : 0) != 0, (String)"Segment: %s from 'segmentsTo' exists in table: %s", (Object)segment, (Object)tableNameWithType);
        }
        try {
            DEFAULT_RETRY_POLICY.attempt(() -> {
                int expectedVersion;
                SegmentLineage segmentLineage;
                TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, (String)tableNameWithType);
                Preconditions.checkState((tableConfig != null ? 1 : 0) != 0, (String)"Failed to find table config for table: %s", (Object)tableNameWithType);
                ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper.getSegmentLineageZNRecord(this._propertyStore, (String)tableNameWithType);
                if (segmentLineageZNRecord == null) {
                    // No lineage stored yet: start a fresh one; -1 is used as the expected version for the write.
                    segmentLineage = new SegmentLineage(tableNameWithType);
                    expectedVersion = -1;
                } else {
                    segmentLineage = SegmentLineage.fromZNRecord((ZNRecord)segmentLineageZNRecord);
                    expectedVersion = segmentLineageZNRecord.getVersion();
                }
                Preconditions.checkState((segmentLineage.getLineageEntry(segmentLineageEntryId) == null ? 1 : 0) != 0, (String)"Entry id: %s already exists in the segment lineage for table: %s", (Object)segmentLineageEntryId, (Object)tableNameWithType);
                ArrayList<String> segmentsToCleanUp = new ArrayList<String>();
                // An explicit iterator is used because entries may be removed or replaced during the scan.
                Iterator entryIterator = segmentLineage.getLineageEntries().entrySet().iterator();
                while (entryIterator.hasNext()) {
                    HashSet segmentsToInLineageEntry;
                    HashSet segmentsFromInLineageEntry;
                    Map.Entry entry = entryIterator.next();
                    String entryId = (String)entry.getKey();
                    LineageEntry lineageEntry = (LineageEntry)entry.getValue();
                    // REVERTED entries never conflict; with forceCleanup their 'segmentsTo' are deleted.
                    if (lineageEntry.getState() == LineageEntryState.REVERTED) {
                        if (!forceCleanup) continue;
                        segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo());
                        continue;
                    }
                    if (forceCleanup) {
                        // True when the entry is IN_PROGRESS and shares a segment with the new request
                        // in either 'segmentsFrom' or 'segmentsTo'.
                        if (!(lineageEntry.getState() != LineageEntryState.IN_PROGRESS || Collections.disjoint(segmentsFrom, lineageEntry.getSegmentsFrom()) && Collections.disjoint(segmentsTo, lineageEntry.getSegmentsTo()))) {
                            LOGGER.info("Detected the incomplete lineage entry with overlapped 'segmentsFrom' or 'segmentsTo'. Deleting or reverting the lineage entry to unblock the new segment protocol. tableNameWithType={}, entryId={}, segmentsFrom={}, segmentsTo={}", new Object[]{tableNameWithType, entryId, lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo()});
                            // Keep only the old entry's 'segmentsTo' that the new request does not reuse.
                            ArrayList segmentsToForEntryToRevert = new ArrayList(lineageEntry.getSegmentsTo());
                            segmentsToForEntryToRevert.removeAll(segmentsTo);
                            if (segmentsToForEntryToRevert.isEmpty()) {
                                entryIterator.remove();
                            } else {
                                entry.setValue(new LineageEntry(lineageEntry.getSegmentsFrom(), segmentsToForEntryToRevert, LineageEntryState.REVERTED, System.currentTimeMillis()));
                            }
                            segmentsToCleanUp.addAll(segmentsToForEntryToRevert);
                            continue;
                        }
                        // For REFRESH tables: when the new request replaces exactly the output of a COMPLETED
                        // entry, that entry's 'segmentsFrom' are deleted right away.
                        if (lineageEntry.getState() != LineageEntryState.COMPLETED || !"REFRESH".equalsIgnoreCase(IngestionConfigUtils.getBatchSegmentIngestionType((TableConfig)tableConfig)) || !CollectionUtils.isEqualCollection((Collection)segmentsFrom, (Collection)lineageEntry.getSegmentsTo())) continue;
                        LOGGER.info("Proactively deleting the replaced segments for REFRESH table to avoid the excessive disk waste. tableNameWithType={}, segmentsToCleanUp={}", (Object)tableNameWithType, (Object)lineageEntry.getSegmentsFrom());
                        segmentsToCleanUp.addAll(lineageEntry.getSegmentsFrom());
                        continue;
                    }
                    // Without forceCleanup: reject a request that replaces an already replaced segment.
                    if (!segmentsFrom.isEmpty() && !(segmentsFromInLineageEntry = new HashSet(lineageEntry.getSegmentsFrom())).isEmpty()) {
                        for (String segment : segmentsFrom) {
                            Preconditions.checkState((!segmentsFromInLineageEntry.contains(segment) ? 1 : 0) != 0, (String)"Segment: %s from 'segmentsFrom' exists in table: %s, entry id: %s as 'segmentsFrom' (replacing a replaced segment)", (Object)segment, (Object)tableNameWithType, (Object)entryId);
                        }
                    }
                    // Without forceCleanup: reject a request whose new segment names clash with an existing entry.
                    if (segmentsTo.isEmpty() || (segmentsToInLineageEntry = new HashSet(lineageEntry.getSegmentsTo())).isEmpty()) continue;
                    for (String segment : segmentsTo) {
                        Preconditions.checkState((!segmentsToInLineageEntry.contains(segment) ? 1 : 0) != 0, (String)"Segment: %s from 'segmentsTo' exists in table: %s, entry id: %s as 'segmentTo' (name conflict)", (Object)segment, (Object)tableNameWithType, (Object)entryId);
                    }
                }
                segmentLineage.addLineageEntry(segmentLineageEntryId, new LineageEntry(segmentsFrom, segmentsTo, LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
                this._lineageManager.updateLineageForStartReplaceSegments(tableConfig, segmentLineageEntryId, customMap, segmentLineage);
                // Write against the version read above; segments are deleted only after a successful write.
                if (SegmentLineageAccessHelper.writeSegmentLineage(this._propertyStore, (SegmentLineage)segmentLineage, (int)expectedVersion)) {
                    if (!segmentsToCleanUp.isEmpty()) {
                        LOGGER.info("Cleaning up the segments while startReplaceSegments: {}", segmentsToCleanUp);
                        this.deleteSegments(tableNameWithType, segmentsToCleanUp);
                    }
                    return true;
                }
                LOGGER.warn("Failed to write segment lineage for table: {}", (Object)tableNameWithType);
                return false;
            });
        }
        catch (Exception e) {
            String errorMsg = String.format("Failed to update the segment lineage during startReplaceSegments. (tableName = %s, segmentsFrom = %s, segmentsTo = %s)", tableNameWithType, segmentsFrom, segmentsTo);
            LOGGER.error(errorMsg, (Throwable)e);
            throw new RuntimeException(errorMsg, e);
        }
        LOGGER.info("startReplaceSegments is successfully processed. (tableNameWithType = {}, segmentsFrom = {}, segmentsTo = {}, segmentLineageEntryId = {})", new Object[]{tableNameWithType, segmentsFrom, segmentsTo, segmentLineageEntryId});
        return segmentLineageEntryId;
    }

    /**
     * Finishes a segment replacement by moving its lineage entry to COMPLETED.
     *
     * <p>The update runs under {@code DEFAULT_RETRY_POLICY}. An entry that is already COMPLETED is left
     * untouched; a REVERTED entry is rejected. When the request carries a non-empty {@code segmentsTo}
     * list, it must be a subset of the list given at start time and replaces it. All final
     * {@code segmentsTo} must exist in the table and be reported online before the entry is written;
     * after a successful write a routing table rebuild message is sent.
     *
     * @param tableNameWithType table name including the type suffix
     * @param segmentLineageEntryId id of the lineage entry to complete
     * @param endReplaceSegmentsRequest optional request carrying a reduced {@code segmentsTo} list and a
     *                                  custom map; may be {@code null}
     * @throws RuntimeException if the lineage update fails; wraps the underlying exception
     */
    public void endReplaceSegments(String tableNameWithType, String segmentLineageEntryId, @Nullable EndReplaceSegmentsRequest endReplaceSegmentsRequest) {
        try {
            DEFAULT_RETRY_POLICY.attempt(() -> {
                SegmentLineage lineage = SegmentLineageAccessHelper.getSegmentLineage(this._propertyStore, tableNameWithType);
                Preconditions.checkState(lineage != null, "Failed to find segment lineage for table: %s", tableNameWithType);
                LineageEntry existingEntry = lineage.getLineageEntry(segmentLineageEntryId);
                Preconditions.checkState(existingEntry != null, "Failed to find entry id: %s from segment lineage for table: %s", segmentLineageEntryId, tableNameWithType);

                LineageEntryState entryState = existingEntry.getState();
                if (entryState == LineageEntryState.COMPLETED) {
                    LOGGER.info("Found lineage entry is already in COMPLETED status. (tableNameWithType = {}, segmentLineageEntryId = {})", tableNameWithType, segmentLineageEntryId);
                    return true;
                }
                if (entryState == LineageEntryState.REVERTED) {
                    String errorMsg = String.format("The target lineage entry state is not 'IN_PROGRESS'. Cannot update to 'COMPLETED' state. (tableNameWithType=%s, segmentLineageEntryId=%s, state=%s)", tableNameWithType, segmentLineageEntryId, entryState);
                    LOGGER.error(errorMsg);
                    throw new RuntimeException(errorMsg);
                }

                Set<String> tableSegments = new HashSet<String>(this.getSegmentsFor(tableNameWithType, false));
                List<String> finalSegmentsTo = existingEntry.getSegmentsTo();
                if (endReplaceSegmentsRequest != null && !endReplaceSegmentsRequest.getSegmentsTo().isEmpty()) {
                    // The request may narrow the original 'segmentsTo' but must not introduce new segments.
                    List<String> requestedSegmentsTo = endReplaceSegmentsRequest.getSegmentsTo();
                    Set<String> originalSegmentsTo = new HashSet<String>(finalSegmentsTo);
                    for (String requested : requestedSegmentsTo) {
                        Preconditions.checkState(originalSegmentsTo.contains(requested), "Segment: %s from EndReplaceSegmentsRequest does not exist in original segmentsTo list used while starting segment replacement: %s", requested, tableNameWithType);
                    }
                    finalSegmentsTo = requestedSegmentsTo;
                }
                for (String target : finalSegmentsTo) {
                    Preconditions.checkState(tableSegments.contains(target), "Segment: %s from 'segmentsTo' does not exist in table: %s", target, tableNameWithType);
                }
                if (!this.waitForSegmentsBecomeOnline(tableNameWithType, finalSegmentsTo)) {
                    return false;
                }

                LineageEntry completedEntry = new LineageEntry(existingEntry.getSegmentsFrom(), finalSegmentsTo, LineageEntryState.COMPLETED, System.currentTimeMillis());
                TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, tableNameWithType);
                Map<String, String> customMap = endReplaceSegmentsRequest == null ? null : endReplaceSegmentsRequest.getCustomMap();
                boolean written = this.writeLineageEntryWithTightLoop(tableConfig, segmentLineageEntryId, completedEntry, existingEntry, this._propertyStore, LineageUpdateType.END, customMap);
                if (!written) {
                    LOGGER.warn("Failed to write segment lineage for table: {}", tableNameWithType);
                    return false;
                }
                this.sendRoutingTableRebuildMessage(tableNameWithType);
                return true;
            });
        }
        catch (Exception e) {
            String errorMsg = String.format("Failed to update the segment lineage during endReplaceSegments. (tableName = %s, segmentLineageEntryId = %s)", tableNameWithType, segmentLineageEntryId);
            LOGGER.error(errorMsg, (Throwable) e);
            throw new RuntimeException(errorMsg, e);
        }
        LOGGER.info("endReplaceSegments is successfully processed. (tableNameWithType = {}, segmentLineageEntryId = {})", tableNameWithType, segmentLineageEntryId);
    }

    /**
     * Reads the segment lineage of the given table from the property store.
     *
     * @param tableNameWithType table name including the type suffix
     * @return the value returned by {@code SegmentLineageAccessHelper.getSegmentLineage}
     */
    public SegmentLineage listSegmentLineage(String tableNameWithType) {
        SegmentLineage lineage = SegmentLineageAccessHelper.getSegmentLineage(this._propertyStore, tableNameWithType);
        return lineage;
    }

    /**
     * Marks a segment lineage entry as {@code REVERTED} and deletes the segments listed in its 'segmentsTo'.
     *
     * <p>The whole read-validate-write sequence runs under {@code DEFAULT_RETRY_POLICY}. Each attempt:
     * <ol>
     *   <li>loads the lineage and the requested entry, failing if either one is missing;</li>
     *   <li>rejects an entry that is already {@code REVERTED}, and an {@code IN_PROGRESS} entry unless
     *       {@code forceRevert} is set;</li>
     *   <li>for a {@code COMPLETED} entry, requires every 'segmentsFrom' segment to still be ONLINE or
     *       CONSUMING in the ideal state;</li>
     *   <li>requires that no 'segmentsTo' segment is listed in the 'segmentsFrom' of any
     *       {@code IN_PROGRESS} or {@code COMPLETED} entry;</li>
     *   <li>writes the {@code REVERTED} entry and, when the write succeeds, sends the routing table rebuild
     *       message and deletes the 'segmentsTo' segments.</li>
     * </ol>
     *
     * @param tableNameWithType name of the table owning the lineage
     * @param segmentLineageEntryId id of the lineage entry to revert
     * @param forceRevert when {@code true}, an {@code IN_PROGRESS} entry may be reverted as well
     * @param revertReplaceSegmentsRequest optional request; only its custom map is read and handed to the
     *     lineage manager
     * @throws RuntimeException if the update fails for any reason; the original failure is kept as the cause
     */
    public void revertReplaceSegments(String tableNameWithType, String segmentLineageEntryId, boolean forceRevert, @Nullable RevertReplaceSegmentsRequest revertReplaceSegmentsRequest) {
        try {
            DEFAULT_RETRY_POLICY.attempt(() -> {
                // Load the current lineage and the entry that should be reverted.
                ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper.getSegmentLineageZNRecord(this._propertyStore, tableNameWithType);
                Preconditions.checkState(segmentLineageZNRecord != null, "Failed to find segment lineage for table: %s", tableNameWithType);
                SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
                LineageEntry lineageEntry = segmentLineage.getLineageEntry(segmentLineageEntryId);
                Preconditions.checkState(lineageEntry != null, "Failed to find entry id: %s from segment lineage for table: %s", segmentLineageEntryId, tableNameWithType);

                // An already reverted entry is never accepted; an in-progress one only with forceRevert.
                LineageEntryState entryState = lineageEntry.getState();
                Preconditions.checkState(entryState != LineageEntryState.REVERTED && (entryState != LineageEntryState.IN_PROGRESS || forceRevert), "Lineage state is not valid. Cannot update the lineage entry to be 'REVERTED'. (tableNameWithType=%s, segmentLineageEntryId=%s, segmentLineageEntryState=%s, forceRevert=%s)", tableNameWithType, segmentLineageEntryId, entryState, forceRevert);

                // Reverting a completed replacement brings 'segmentsFrom' back, so they must still exist.
                if (entryState == LineageEntryState.COMPLETED) {
                    Set<String> onlineSegments = this.getOnlineSegmentsFromIdealState(tableNameWithType, true);
                    for (String segmentFrom : lineageEntry.getSegmentsFrom()) {
                        Preconditions.checkState(onlineSegments.contains(segmentFrom), "Segment: %s from 'segmentsFrom' does not exist in table: %s (reverting a deleted segment)", segmentFrom, tableNameWithType);
                    }
                }

                // A 'segmentsTo' segment must not be the input ('segmentsFrom') of an active lineage entry.
                List<String> segmentsTo = lineageEntry.getSegmentsTo();
                if (!segmentsTo.isEmpty()) {
                    for (Map.Entry<String, LineageEntry> entry : segmentLineage.getLineageEntries().entrySet()) {
                        String currentEntryId = entry.getKey();
                        LineageEntry currentLineageEntry = entry.getValue();
                        LineageEntryState currentState = currentLineageEntry.getState();
                        if (currentState != LineageEntryState.IN_PROGRESS && currentState != LineageEntryState.COMPLETED) {
                            continue;
                        }
                        Set<String> segmentsFromInLineageEntry = new HashSet<>(currentLineageEntry.getSegmentsFrom());
                        for (String segment : segmentsTo) {
                            // The message names 'segmentsFrom' because that is the list actually being checked.
                            Preconditions.checkState(!segmentsFromInLineageEntry.contains(segment), "Segment: %s from 'segmentsTo' exists in table: %s, entry id: %s as 'segmentsFrom' (reverting a merged segment)", segment, tableNameWithType, currentEntryId);
                        }
                    }
                }

                // Persist the entry in REVERTED state; the entry read above is passed as the expected current value.
                LineageEntry lineageEntryToUpdate = new LineageEntry(lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo(), LineageEntryState.REVERTED, System.currentTimeMillis());
                TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, tableNameWithType);
                Map<String, String> customMap = revertReplaceSegmentsRequest == null ? null : revertReplaceSegmentsRequest.getCustomMap();
                if (this.writeLineageEntryWithTightLoop(tableConfig, segmentLineageEntryId, lineageEntryToUpdate, lineageEntry, this._propertyStore, LineageUpdateType.REVERT, customMap)) {
                    this.sendRoutingTableRebuildMessage(tableNameWithType);
                    if (!segmentsTo.isEmpty()) {
                        this.deleteSegments(tableNameWithType, segmentsTo);
                    }
                    return true;
                }
                // Returning false lets the retry policy decide whether to run another attempt.
                LOGGER.warn("Failed to write segment lineage for table: {}", tableNameWithType);
                return false;
            });
        }
        catch (Exception e) {
            String errorMsg = String.format("Failed to update the segment lineage during revertReplaceSegments. (tableName = %s, segmentLineageEntryId = %s)", tableNameWithType, segmentLineageEntryId);
            LOGGER.error(errorMsg, e);
            throw new RuntimeException(errorMsg, e);
        }
        LOGGER.info("revertReplaceSegments is successfully processed. (tableNameWithType = {}, segmentLineageEntryId = {})", tableNameWithType, segmentLineageEntryId);
    }

    /**
     * Re-reads the segment lineage, replaces one entry and writes the lineage back, trying up to 10 times.
     *
     * <p>On every attempt the entry currently stored under {@code lineageEntryId} must equal
     * {@code lineageEntryToMatch}; otherwise the update is aborted with an exception. After the entry is
     * replaced, the lineage manager hook matching {@code lineageUpdateType} is invoked, and the lineage is
     * written with the version read at the start of the attempt (presumably a version-checked write; confirm
     * against {@code SegmentLineageAccessHelper.writeSegmentLineage}). A failed write triggers the next attempt.
     *
     * @param tableConfig config of the table; its table name is used to locate the lineage
     * @param lineageEntryId id of the entry to replace
     * @param lineageEntryToUpdate new value for the entry
     * @param lineageEntryToMatch value the entry is expected to have right before the update
     * @param propertyStore store the lineage is read from and written to
     * @param lineageUpdateType selects which lineage manager hook is called
     * @param customMap passed through unchanged to the lineage manager hook
     * @return {@code true} once a write succeeds, {@code false} if all attempts fail to write
     * @throws IllegalStateException if no lineage record exists for the table
     * @throws RuntimeException if the stored entry is missing or differs from {@code lineageEntryToMatch}
     */
    private boolean writeLineageEntryWithTightLoop(TableConfig tableConfig, String lineageEntryId, LineageEntry lineageEntryToUpdate, LineageEntry lineageEntryToMatch, ZkHelixPropertyStore<ZNRecord> propertyStore, LineageUpdateType lineageUpdateType, Map<String, String> customMap) {
        String tableNameWithType = tableConfig.getTableName();
        for (int i = 0; i < 10; ++i) {
            // Fetch the latest lineage together with the version the write will be checked against.
            ZNRecord segmentLineageToUpdateZNRecord = SegmentLineageAccessHelper.getSegmentLineageZNRecord(propertyStore, tableNameWithType);
            // Fail with a descriptive message instead of a NullPointerException when the lineage is gone.
            Preconditions.checkState(segmentLineageToUpdateZNRecord != null, "Failed to find segment lineage for table: %s", tableNameWithType);
            int expectedVersion = segmentLineageToUpdateZNRecord.getVersion();
            SegmentLineage segmentLineageToUpdate = SegmentLineage.fromZNRecord(segmentLineageToUpdateZNRecord);
            LineageEntry currentLineageEntry = segmentLineageToUpdate.getLineageEntry(lineageEntryId);

            // A removed entry is treated like a modified one: the caller's view of the entry is stale.
            if (currentLineageEntry == null || !currentLineageEntry.equals(lineageEntryToMatch)) {
                String errorMsg = String.format("Aborting the to update lineage entry since we find that the entry has been modified for table %s, entry id: %s", tableNameWithType, lineageEntryId);
                LOGGER.error(errorMsg);
                throw new RuntimeException(errorMsg);
            }

            segmentLineageToUpdate.updateLineageEntry(lineageEntryId, lineageEntryToUpdate);
            switch (lineageUpdateType) {
                case START: {
                    this._lineageManager.updateLineageForStartReplaceSegments(tableConfig, lineageEntryId, customMap, segmentLineageToUpdate);
                    break;
                }
                case END: {
                    this._lineageManager.updateLineageForEndReplaceSegments(tableConfig, lineageEntryId, customMap, segmentLineageToUpdate);
                    break;
                }
                case REVERT: {
                    this._lineageManager.updateLineageForRevertReplaceSegments(tableConfig, lineageEntryId, customMap, segmentLineageToUpdate);
                    break;
                }
                default: {
                    // No hook for other update types.
                    break;
                }
            }

            if (SegmentLineageAccessHelper.writeSegmentLineage(propertyStore, segmentLineageToUpdate, expectedVersion)) {
                return true;
            }
            // Write was rejected; loop to re-read the lineage and try again.
        }
        return false;
    }

    /**
     * Polls the external view once per second until all given segments are reported by
     * {@link #getOnlineSegmentsFromExternalView(String)}, giving up after 600000 ms.
     *
     * @param tableNameWithType table whose external view is polled
     * @param segmentsToCheck segments that all have to show up
     * @return {@code true} as soon as no segment is missing, {@code false} when the deadline passes first
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    private boolean waitForSegmentsBecomeOnline(String tableNameWithType, List<String> segmentsToCheck) throws InterruptedException {
        final long deadlineMs = System.currentTimeMillis() + 600000L;
        while (true) {
            // Find the first segment that is not yet visible in this poll.
            String pendingSegment = null;
            Set<String> visibleSegments = getOnlineSegmentsFromExternalView(tableNameWithType);
            for (String candidate : segmentsToCheck) {
                if (!visibleSegments.contains(candidate)) {
                    pendingSegment = candidate;
                    break;
                }
            }
            if (pendingSegment == null) {
                return true;
            }
            Thread.sleep(1000L);
            if (System.currentTimeMillis() >= deadlineMs) {
                LOGGER.warn("Timed out while waiting for segment: {} to become ONLINE for table: {}", pendingSegment, tableNameWithType);
                return false;
            }
        }
    }

    /**
     * Collects the segments of a table that have at least one ONLINE instance in the ideal state.
     *
     * @param tableNameWithType table whose ideal state is inspected
     * @param includeConsuming when {@code true}, segments with at least one CONSUMING instance are included too
     * @return a new set with the names of the matching segments
     * @throws IllegalStateException if the table has no ideal state
     */
    public Set<String> getOnlineSegmentsFromIdealState(String tableNameWithType, boolean includeConsuming) {
        IdealState idealState = getTableIdealState(tableNameWithType);
        Preconditions.checkState(idealState != null, (Object) "Table ideal state is null");
        Map<String, Map<String, String>> assignment = idealState.getRecord().getMapFields();
        Set<String> selected = new HashSet<>(HashUtil.getHashMapCapacity(assignment.size()));
        for (Map.Entry<String, Map<String, String>> segmentEntry : assignment.entrySet()) {
            Map<String, String> stateByInstance = segmentEntry.getValue();
            boolean matches = stateByInstance.containsValue("ONLINE")
                    || (includeConsuming && stateByInstance.containsValue("CONSUMING"));
            if (matches) {
                selected.add(segmentEntry.getKey());
            }
        }
        return selected;
    }

    /**
     * Collects the segments of a table that have at least one ONLINE or CONSUMING instance in the external view.
     *
     * @param tableNameWithType table whose external view is inspected
     * @return a new set with the matching segment names, or an empty set (after logging a warning) when the
     *     table has no external view
     */
    public Set<String> getOnlineSegmentsFromExternalView(String tableNameWithType) {
        ExternalView view = getTableExternalView(tableNameWithType);
        if (view == null) {
            LOGGER.warn(String.format("External view is null for table (%s)", tableNameWithType));
            return Collections.emptySet();
        }
        Map<String, Map<String, String>> assignment = view.getRecord().getMapFields();
        Set<String> selected = new HashSet<>(HashUtil.getHashMapCapacity(assignment.size()));
        assignment.forEach((segmentName, stateByInstance) -> {
            if (stateByInstance.containsValue("ONLINE") || stateByInstance.containsValue("CONSUMING")) {
                selected.add(segmentName);
            }
        });
        return selected;
    }

    /**
     * Builds the stats of a table from the ZK stat of its resource config node in the property store.
     *
     * @param tableNameWithType table to look up
     * @return stats carrying the node's creation time, formatted with {@code SIMPLE_DATE_FORMAT}
     * @throws IllegalStateException if no stat can be read for the table
     */
    public TableStats getTableStats(String tableNameWithType) {
        String resourceConfigPath = ZKMetadataProvider.constructPropertyStorePathForResourceConfig(tableNameWithType);
        Stat nodeStat = _propertyStore.getStat(resourceConfigPath, AccessOption.PERSISTENT);
        Preconditions.checkState(nodeStat != null, "Failed to read ZK stats for table: %s", tableNameWithType);
        Instant createdAt = Instant.ofEpochMilli(nodeStat.getCtime());
        return new TableStats(SIMPLE_DATE_FORMAT.format(createdAt));
    }

    /**
     * Maps every table in the {@code brokerResource} external view to its live brokers.
     *
     * <p>A broker counts as live when its state is ONLINE (case-insensitive) and an instance config with the
     * same name exists. Tables without any live broker are left out of the result.
     *
     * @return table name to instance info (name, host, port) of its live brokers
     * @throws IllegalStateException if the {@code brokerResource} external view cannot be found
     */
    public Map<String, List<InstanceInfo>> getTableToLiveBrokersMapping() {
        ExternalView brokerResourceView = (ExternalView) _helixDataAccessor.getProperty(_keyBuilder.externalView("brokerResource"));
        if (brokerResourceView == null) {
            throw new IllegalStateException("Failed to find external view for brokerResource");
        }
        // Index the instance configs by name so that brokers can be resolved with a single lookup.
        Map<String, InstanceConfig> configsByName = HelixHelper.getInstanceConfigs(_helixZkManager).stream()
                .collect(Collectors.toMap(InstanceConfig::getInstanceName, Function.identity()));
        Map<String, List<InstanceInfo>> liveBrokersByTable = new HashMap<>();
        Map<String, Map<String, String>> brokerStatesByTable = brokerResourceView.getRecord().getMapFields();
        for (Map.Entry<String, Map<String, String>> tableEntry : brokerStatesByTable.entrySet()) {
            List<InstanceInfo> liveBrokers = new ArrayList<>();
            for (Map.Entry<String, String> brokerEntry : tableEntry.getValue().entrySet()) {
                if (!"ONLINE".equalsIgnoreCase(brokerEntry.getValue())) {
                    continue;
                }
                InstanceConfig brokerConfig = configsByName.get(brokerEntry.getKey());
                if (brokerConfig == null) {
                    continue;
                }
                int port = Integer.parseInt(brokerConfig.getPort());
                liveBrokers.add(new InstanceInfo(brokerConfig.getInstanceName(), brokerConfig.getHostName(), port));
            }
            if (!liveBrokers.isEmpty()) {
                liveBrokersByTable.put(tableEntry.getKey(), liveBrokers);
            }
        }
        return liveBrokersByTable;
    }

    /**
     * Lists the brokers that are ONLINE for a table according to the {@code brokerResource} external view.
     *
     * <p>When {@code tableName} carries a table type, only that table is considered. Otherwise the offline and
     * realtime variants are both checked: if both exist, only brokers live for both are returned (in the order
     * reported for the realtime table); if only one exists, its live brokers are returned.
     *
     * @param tableName table name, with or without type
     * @return names of the live brokers
     * @throws TableNotFoundException if no matching table exists
     * @throws IllegalStateException if the {@code brokerResource} external view cannot be found
     */
    public List<String> getLiveBrokersForTable(String tableName) throws TableNotFoundException {
        ExternalView brokerResourceView = (ExternalView) _helixDataAccessor.getProperty(_keyBuilder.externalView("brokerResource"));
        if (brokerResourceView == null) {
            throw new IllegalStateException("Failed to find external view for brokerResource");
        }

        // Name already carries a type: look at exactly that table.
        if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
            if (hasTable(tableName)) {
                return getLiveBrokersForTable(brokerResourceView, tableName);
            }
            throw new TableNotFoundException(String.format("Table=%s not found", tableName));
        }

        // Raw name: resolve against both typed variants.
        String offlineName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
        String realtimeName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
        boolean offlineExists = hasTable(offlineName);
        boolean realtimeExists = hasTable(realtimeName);

        if (offlineExists && realtimeExists) {
            Set<String> offlineBrokers = new HashSet<>(getLiveBrokersForTable(brokerResourceView, offlineName));
            List<String> sharedBrokers = new ArrayList<>();
            for (String realtimeBroker : getLiveBrokersForTable(brokerResourceView, realtimeName)) {
                if (offlineBrokers.contains(realtimeBroker)) {
                    sharedBrokers.add(realtimeBroker);
                }
            }
            return sharedBrokers;
        }
        if (offlineExists) {
            return getLiveBrokersForTable(brokerResourceView, offlineName);
        }
        if (realtimeExists) {
            return getLiveBrokersForTable(brokerResourceView, realtimeName);
        }
        throw new TableNotFoundException(String.format("Table=%s not found", tableName));
    }

    /**
     * Extracts the brokers whose state is ONLINE (case-insensitive) for one table from an external view.
     *
     * @param ev external view to read the state map from
     * @param tableNameWithType table whose state map is looked up
     * @return a new list of broker names; empty when the view has no state map for the table
     */
    private List<String> getLiveBrokersForTable(ExternalView ev, String tableNameWithType) {
        Map<String, String> stateByBroker = ev.getStateMap(tableNameWithType);
        if (stateByBroker == null) {
            return new ArrayList<>();
        }
        return stateByBroker.entrySet().stream()
                .filter(brokerState -> "ONLINE".equalsIgnoreCase(brokerState.getValue()))
                .map(Map.Entry::getKey)
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Returns the number of replicas for a table.
     *
     * <p>For a dimension table this is the number of instances of its server tenant; for any other table it is
     * the replication from the table config.
     *
     * @param tableConfig config of the table
     * @return the replica count
     */
    public int getNumReplicas(TableConfig tableConfig) {
        if (!tableConfig.isDimTable()) {
            return tableConfig.getReplication();
        }
        String serverTenant = tableConfig.getTenantConfig().getServer();
        return getAllInstancesForServerTenant(serverTenant).size();
    }

    /**
     * Sends a {@code RunPeriodicTaskMessage} through the Helix messaging service, addressed to participants of
     * {@code leadControllerResource}, asking them to run a periodic task.
     *
     * @param tableName table the task should run against, or {@code null} for all tables
     * @param periodicTaskName name of the periodic task
     * @param taskProperties properties carried in the message
     * @return response holding the generated request id and whether at least one message was sent
     */
    public PeriodicTaskInvocationResponse invokeControllerPeriodicTask(String tableName, String periodicTaskName, Map<String, String> taskProperties) {
        // Request id: common prefix plus the first 8 characters of a random UUID.
        String requestId = API_REQUEST_ID_PREFIX + UUID.randomUUID().toString().substring(0, 8);
        String target = tableName != null ? " table '" + tableName + "'" : "all tables";
        LOGGER.info("[TaskRequestId: {}] Sending periodic task message to all controllers for running task {} against {}, with properties {}.\"", requestId, periodicTaskName, target, taskProperties);

        // Address every participant instance of the lead controller resource, this instance included.
        Criteria recipients = new Criteria();
        recipients.setRecipientInstanceType(InstanceType.PARTICIPANT);
        recipients.setInstanceName("%");
        recipients.setSessionSpecific(true);
        recipients.setResource("leadControllerResource");
        recipients.setSelfExcluded(false);

        RunPeriodicTaskMessage taskMessage = new RunPeriodicTaskMessage(requestId, periodicTaskName, tableName, taskProperties);
        int sentCount = getHelixZkManager().getMessagingService().send(recipients, (Message) taskMessage, null, -1);
        LOGGER.info("[TaskRequestId: {}] Periodic task execution message sent to {} controllers.", requestId, sentCount);
        return new PeriodicTaskInvocationResponse(requestId, sentCount > 0);
    }

    /**
     * Computes, per instance tag, the minimum number of instances required by the existing tables.
     *
     * <p>Every tag found on an instance config starts at 0. Each table then raises its server tag to at least
     * the table's replication and sets its broker tag to 1.
     *
     * @return tag name to minimum required instance count
     */
    public Map<String, Integer> minimumInstancesRequiredForTags() {
        Map<String, Integer> minInstancesByTag = new HashMap<>();

        // Seed every known tag with zero.
        for (InstanceConfig instanceConfig : getAllHelixInstanceConfigs()) {
            instanceConfig.getTags().forEach(instanceTag -> minInstancesByTag.put(instanceTag, 0));
        }

        for (TableConfig tableConfig : getAllTableConfigs()) {
            String serverTag = TagNameUtils.getServerTagForTenant(tableConfig.getTenantConfig().getServer(), tableConfig.getTableType());
            int currentMinimum = minInstancesByTag.getOrDefault(serverTag, 0);
            minInstancesByTag.put(serverTag, Math.max(currentMinimum, tableConfig.getReplication()));

            String brokerTag = TagNameUtils.getBrokerTagForTenant(tableConfig.getTenantConfig().getBroker());
            minInstancesByTag.put(brokerTag, 1);
        }
        return minInstancesByTag;
    }

    private static enum LineageUpdateType {
        START,
        END,
        REVERT;

    }
}

