/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core.realtime;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang.StringUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.Criteria;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.messages.ForceCommitMessage;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.AbstractMetrics;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
import org.apache.pinot.controller.api.resources.PauseStatus;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager;
import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
import org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy;
import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.StringUtil;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PinotLLCRealtimeSegmentManager {
    // Key name "isTablePaused". NOTE(review): presumably the IdealState field read by isTablePaused(...) - confirm.
    public static final String IS_TABLE_PAUSED = "isTablePaused";
    private static final Logger LOGGER = LoggerFactory.getLogger(PinotLLCRealtimeSegmentManager.class);
    // NOTE(review): presumably the sequence number given to the first segment of a partition group - confirm at use site.
    private static final int STARTING_SEQUENCE_NUMBER = 0;
    // Prefix used by the constructor to select the controller config subset handed to MetadataEventNotifierFactory.
    private static final String METADATA_EVENT_NOTIFIER_PREFIX = "metadata.event.notifier";
    // 30 seconds. stop() waits up to 30000 ms for in-flight metadata commits, which matches this value.
    private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30000L;
    // 5 minutes. Usage is not visible in this part of the file.
    private static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300000L;
    // 1 hour. Usage is not visible in this part of the file.
    private static final long MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS = 3600000L;
    private static final Random RANDOM = new Random();
    // Helix / ZooKeeper handles, all obtained from the PinotHelixResourceManager in the constructor.
    private final HelixAdmin _helixAdmin;
    private final HelixManager _helixManager;
    private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
    private final PinotHelixResourceManager _helixResourceManager;
    private final String _clusterName;
    private final ControllerConf _controllerConf;
    private final ControllerMetrics _controllerMetrics;
    private final MetadataEventNotifierFactory _metadataEventNotifierFactory;
    // Fixed pool of locks guarding IdealState updates; commitSegmentMetadataInternal picks one by
    // (tableName.hashCode() & Integer.MAX_VALUE) % _numIdealStateUpdateLocks.
    private final int _numIdealStateUpdateLocks;
    private final Lock[] _idealStateUpdateLocks;
    private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
    private final boolean _isDeepStoreLLCSegmentUploadRetryEnabled;
    private final boolean _isTmpSegmentAsyncDeletionEnabled;
    private final int _deepstoreUploadRetryTimeoutMs;
    // The next three members are null when deep store upload retry is disabled (see constructor).
    // The client is closed by stop().
    private final FileUploadDownloadClient _fileUploadDownloadClient;
    // Incremented/decremented around commitSegmentMetadataInternal(...) and polled by stop().
    private final AtomicInteger _numCompletingSegments = new AtomicInteger(0);
    private final ExecutorService _deepStoreUploadExecutor;
    private final Set<String> _deepStoreUploadExecutorPendingSegments;
    // Set to true by stop(); setUpNewTable and the commit entry points refuse to run once it is set.
    private volatile boolean _isStopping = false;

    public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics) {
        this._helixAdmin = helixResourceManager.getHelixAdmin();
        this._helixManager = helixResourceManager.getHelixZkManager();
        this._propertyStore = helixResourceManager.getPropertyStore();
        this._helixResourceManager = helixResourceManager;
        this._clusterName = helixResourceManager.getHelixClusterName();
        this._controllerConf = controllerConf;
        this._controllerMetrics = controllerMetrics;
        this._metadataEventNotifierFactory = MetadataEventNotifierFactory.loadFactory(controllerConf.subset(METADATA_EVENT_NOTIFIER_PREFIX), helixResourceManager);
        this._numIdealStateUpdateLocks = controllerConf.getRealtimeSegmentMetadataCommitNumLocks();
        this._idealStateUpdateLocks = new Lock[this._numIdealStateUpdateLocks];
        for (int i = 0; i < this._numIdealStateUpdateLocks; ++i) {
            this._idealStateUpdateLocks[i] = new ReentrantLock();
        }
        this._flushThresholdUpdateManager = new FlushThresholdUpdateManager();
        this._isDeepStoreLLCSegmentUploadRetryEnabled = controllerConf.isDeepStoreRetryUploadLLCSegmentEnabled();
        this._isTmpSegmentAsyncDeletionEnabled = controllerConf.isTmpSegmentAsyncDeletionEnabled();
        this._deepstoreUploadRetryTimeoutMs = controllerConf.getDeepStoreRetryUploadTimeoutMs();
        this._fileUploadDownloadClient = this._isDeepStoreLLCSegmentUploadRetryEnabled ? this.initFileUploadDownloadClient() : null;
        this._deepStoreUploadExecutor = this._isDeepStoreLLCSegmentUploadRetryEnabled ? Executors.newFixedThreadPool(controllerConf.getDeepStoreRetryUploadParallelism()) : null;
        this._deepStoreUploadExecutorPendingSegments = this._isDeepStoreLLCSegmentUploadRetryEnabled ? ConcurrentHashMap.newKeySet() : null;
    }

    /**
     * @return the deep store upload retry flag captured from the controller configuration at construction time
     */
    public boolean isDeepStoreLLCSegmentUploadRetryEnabled() {
        return _isDeepStoreLLCSegmentUploadRetryEnabled;
    }

    /**
     * @return the temporary-segment async deletion flag captured from the controller configuration at
     *     construction time; when {@code false}, {@link #commitSegmentFile} deletes temporary files inline
     */
    public boolean isTmpSegmentAsyncDeletionEnabled() {
        return _isTmpSegmentAsyncDeletionEnabled;
    }

    /**
     * Factory hook for the client stored in {@code _fileUploadDownloadClient}; called from the
     * constructor when deep store upload retry is enabled. Package-private so tests can override it.
     *
     * @return a new default-configured {@link FileUploadDownloadClient}
     */
    @VisibleForTesting
    FileUploadDownloadClient initFileUploadDownloadClient() {
        FileUploadDownloadClient client = new FileUploadDownloadClient();
        return client;
    }

    /**
     * Builds one consumption status per partition group, based on the segment with the highest
     * sequence number that the IdealState lists for that partition group.
     *
     * <p>Segment names that {@link LLCSegmentName#of} cannot parse (it returns {@code null}) are ignored.
     *
     * @param idealState IdealState whose map-field keys are the table's segment names
     * @param streamConfig stream config; supplies the table name and the offset factory
     * @return a mutable list with one entry per partition group found in the IdealState
     */
    public List<PartitionGroupConsumptionStatus> getPartitionGroupConsumptionStatusList(IdealState idealState, StreamConfig streamConfig) {
        // Step 1: keep only the latest (highest sequence number) LLC segment of every partition group.
        Map<Integer, LLCSegmentName> latestSegmentPerPartitionGroup = new HashMap<>();
        for (String segmentName : idealState.getRecord().getMapFields().keySet()) {
            LLCSegmentName candidate = LLCSegmentName.of(segmentName);
            if (candidate != null) {
                latestSegmentPerPartitionGroup.merge(candidate.getPartitionGroupId(), candidate,
                    (current, incoming) -> current.getSequenceNumber() > incoming.getSequenceNumber() ? current : incoming);
            }
        }

        // Step 2: turn the ZK metadata of each latest segment into a consumption status.
        StreamPartitionMsgOffsetFactory offsetFactory = StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
        List<PartitionGroupConsumptionStatus> statuses = new ArrayList<>();
        for (Map.Entry<Integer, LLCSegmentName> latest : latestSegmentPerPartitionGroup.entrySet()) {
            int partitionGroupId = latest.getKey();
            LLCSegmentName latestSegment = latest.getValue();
            SegmentZKMetadata zkMetadata = getSegmentZKMetadata(streamConfig.getTableNameWithType(), latestSegment.getSegmentName());
            int sequenceNumber = latestSegment.getSequenceNumber();
            StreamPartitionMsgOffset startOffset = offsetFactory.create(zkMetadata.getStartOffset());
            // The end offset is absent in ZK metadata until it has been set (see updateCommittingSegmentZKMetadata).
            StreamPartitionMsgOffset endOffset = null;
            if (zkMetadata.getEndOffset() != null) {
                endOffset = offsetFactory.create(zkMetadata.getEndOffset());
            }
            statuses.add(new PartitionGroupConsumptionStatus(partitionGroupId, sequenceNumber, startOffset, endOffset, zkMetadata.getStatus().toString()));
        }
        return statuses;
    }

    /**
     * @return the VIP URL generated by the controller configuration
     */
    public String getControllerVipUrl() {
        String vipUrl = _controllerConf.generateVipUrl();
        return vipUrl;
    }

    /**
     * Stops the manager.
     *
     * <p>Marks the manager as stopping (new table setups and segment commits are rejected from then
     * on), waits up to {@link #MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS} for in-flight segment
     * metadata commits to drain, polling once per second, and finally closes the file upload/download
     * client if one was created.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned, the client is
     * still closed so it does not leak, and the thread's interrupt status is restored before returning.
     */
    public void stop() {
        _isStopping = true;
        LOGGER.info("Awaiting segment metadata commits: maxWaitTimeMillis = {}", MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS);

        long millisToWait = MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS;
        boolean interrupted = false;
        while (_numCompletingSegments.get() > 0 && millisToWait > 0L) {
            // Sleep in slices of at most one second so completion is noticed promptly.
            long thisWait = Math.min(1000L, millisToWait);
            try {
                Thread.sleep(thisWait);
            } catch (InterruptedException e) {
                LOGGER.info("Interrupted: Remaining wait time {} (out of {})", millisToWait, MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS);
                interrupted = true;
                break;
            }
            millisToWait -= thisWait;
        }
        if (!interrupted) {
            LOGGER.info("Wait completed: Number of completing segments = {}", _numCompletingSegments.get());
        }

        // Release the client on every exit path, including an interrupted wait.
        if (_fileUploadDownloadClient != null) {
            try {
                _fileUploadDownloadClient.close();
            } catch (IOException e) {
                LOGGER.error("Failed to close fileUploadDownloadClient.", e);
            }
        }

        // Restore the interrupt status last so that it cannot interfere with closing the client.
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sets up a new LLC realtime table: for every partition group reported for the stream, creates
     * the first segment via {@code setupNewPartitionGroup}, records its instance states in the given
     * IdealState, and finally persists the IdealState.
     *
     * @param tableConfig config of the realtime table being created
     * @param idealState IdealState to populate; its map fields are modified in place and then written
     * @throws IllegalStateException if the manager is stopping
     */
    public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
        Preconditions.checkState(!_isStopping, "Segment manager is stopping");
        String realtimeTableName = tableConfig.getTableName();
        LOGGER.info("Setting up new LLC table: {}", realtimeTableName);

        // Drop any flush threshold updater registered under this table name.
        _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName);

        StreamConfig streamConfig = new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
        InstancePartitions consumingInstancePartitions = getConsumingInstancePartitions(tableConfig);
        List<PartitionGroupMetadata> partitionGroups = getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
        int numPartitionGroups = partitionGroups.size();
        int numReplicas = getNumReplicas(tableConfig, consumingInstancePartitions);

        SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig, _controllerMetrics);
        Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.CONSUMING, consumingInstancePartitions);

        // All first segments share the same creation timestamp.
        long creationTimeMs = getCurrentTimeMs();
        Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
        for (PartitionGroupMetadata partitionGroup : partitionGroups) {
            String newSegmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroup, creationTimeMs, consumingInstancePartitions, numPartitionGroups, numReplicas);
            updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment, instancePartitionsMap);
        }

        setIdealState(realtimeTableName, idealState);
    }

    /**
     * Reads the table config from the property store.
     *
     * @param realtimeTableName table name with type
     * @return the table config, never {@code null}
     * @throws IllegalStateException if no table config exists for the table
     */
    @VisibleForTesting
    public TableConfig getTableConfig(String realtimeTableName) {
        final TableConfig fetched;
        try {
            fetched = ZKMetadataProvider.getTableConfig(_propertyStore, realtimeTableName);
        } catch (Exception e) {
            // Count the failed fetch, then let the original exception propagate.
            _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            throw e;
        }
        if (fetched != null) {
            return fetched;
        }
        throw new IllegalStateException("Failed to find table config for table: " + realtimeTableName);
    }

    /**
     * Fetches or computes the CONSUMING instance partitions for the table.
     *
     * <p>Any failure is counted on the {@code LLC_ZOOKEEPER_FETCH_FAILURES} meter and rethrown.
     *
     * @param tableConfig config of the realtime table
     * @return the CONSUMING instance partitions
     */
    @VisibleForTesting
    InstancePartitions getConsumingInstancePartitions(TableConfig tableConfig) {
        try {
            InstancePartitions consumingPartitions = InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixManager, tableConfig, InstancePartitionsType.CONSUMING);
            return consumingPartitions;
        } catch (Exception e) {
            _controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            throw e;
        }
    }

    /**
     * Lists the segments of the table as returned by {@code ZKMetadataProvider.getSegments}.
     *
     * <p>Any failure is counted on the {@code LLC_ZOOKEEPER_FETCH_FAILURES} meter and rethrown.
     *
     * @param realtimeTableName table name with type
     * @return segment names of the table
     */
    @VisibleForTesting
    List<String> getAllSegments(String realtimeTableName) {
        try {
            List<String> segmentNames = ZKMetadataProvider.getSegments(_propertyStore, realtimeTableName);
            return segmentNames;
        } catch (Exception e) {
            _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            throw e;
        }
    }

    /**
     * Lists the LLC realtime segments of the table as returned by
     * {@code ZKMetadataProvider.getLLCRealtimeSegments}.
     *
     * <p>Any failure is counted on the {@code LLC_ZOOKEEPER_FETCH_FAILURES} meter and rethrown.
     *
     * @param realtimeTableName table name with type
     * @return LLC segment names of the table
     */
    @VisibleForTesting
    List<String> getLLCSegments(String realtimeTableName) {
        try {
            List<String> llcSegmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName);
            return llcSegmentNames;
        } catch (Exception e) {
            _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            throw e;
        }
    }

    /**
     * Convenience overload that reads segment ZK metadata without capturing the znode {@link Stat}.
     *
     * @param realtimeTableName table name with type
     * @param segmentName name of the segment
     * @return the segment ZK metadata
     */
    private SegmentZKMetadata getSegmentZKMetadata(String realtimeTableName, String segmentName) {
        return getSegmentZKMetadata(realtimeTableName, segmentName, null);
    }

    /**
     * Reads the ZK metadata of a segment from the property store.
     *
     * <p>Any failure, including a missing record, is counted on the
     * {@code LLC_ZOOKEEPER_FETCH_FAILURES} meter and rethrown.
     *
     * @param realtimeTableName table name with type
     * @param segmentName name of the segment
     * @param stat optional holder passed through to the property store read; callers use its version
     *     for a later conditional write
     * @return the segment ZK metadata, never {@code null}
     * @throws IllegalStateException if the segment has no ZK metadata record
     */
    @VisibleForTesting
    SegmentZKMetadata getSegmentZKMetadata(String realtimeTableName, String segmentName, @Nullable Stat stat) {
        try {
            String segmentPath = ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, segmentName);
            ZNRecord record = _propertyStore.get(segmentPath, stat, AccessOption.PERSISTENT);
            Preconditions.checkState(record != null, "Failed to find segment ZK metadata for segment: %s of table: %s", segmentName, realtimeTableName);
            return new SegmentZKMetadata(record);
        } catch (Exception e) {
            _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            throw e;
        }
    }

    /**
     * Writes the segment ZK metadata to the property store, conditional on the expected znode version.
     *
     * <p>Any failure, including a rejected write, is counted on the
     * {@code LLC_ZOOKEEPER_UPDATE_FAILURES} meter and rethrown.
     *
     * @param realtimeTableName table name with type
     * @param segmentZKMetadata metadata to persist
     * @param expectedVersion version passed to the property store write ({@code -1} is used by
     *     {@code createNewSegmentZKMetadata} for brand-new segments)
     * @throws IllegalStateException if the property store reports that the write did not succeed
     */
    @VisibleForTesting
    void persistSegmentZKMetadata(String realtimeTableName, SegmentZKMetadata segmentZKMetadata, int expectedVersion) {
        String segmentName = segmentZKMetadata.getSegmentName();
        LOGGER.info("Persisting segment ZK metadata for segment: {}", segmentName);
        try {
            String segmentPath = ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, segmentName);
            boolean persisted = _propertyStore.set(segmentPath, segmentZKMetadata.toZNRecord(), expectedVersion, AccessOption.PERSISTENT);
            Preconditions.checkState(persisted, "Failed to persist segment ZK metadata for segment: %s of table: %s", segmentName, realtimeTableName);
        } catch (Exception e) {
            _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
            throw e;
        }
    }

    /**
     * Reads the table's IdealState through Helix.
     *
     * <p>Any failure, including a missing IdealState, is counted on the
     * {@code LLC_ZOOKEEPER_FETCH_FAILURES} meter and rethrown.
     *
     * @param realtimeTableName table name with type
     * @return the IdealState, never {@code null}
     * @throws IllegalStateException if the table has no IdealState
     */
    @VisibleForTesting
    IdealState getIdealState(String realtimeTableName) {
        try {
            IdealState fetched = HelixHelper.getTableIdealState(_helixManager, realtimeTableName);
            boolean found = fetched != null;
            Preconditions.checkState(found, "Failed to find IdealState for table: " + realtimeTableName);
            return fetched;
        } catch (Exception e) {
            _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            throw e;
        }
    }

    /**
     * Writes the table's IdealState through the Helix admin.
     *
     * <p>Any failure is counted on the {@code LLC_ZOOKEEPER_UPDATE_FAILURES} meter and rethrown.
     *
     * @param realtimeTableName table name with type, used as the Helix resource name
     * @param idealState IdealState to store
     */
    @VisibleForTesting
    void setIdealState(String realtimeTableName, IdealState idealState) {
        try {
            _helixAdmin.setResourceIdealState(_clusterName, realtimeTableName, idealState);
        } catch (Exception e) {
            // Record the failed update before propagating the original exception.
            _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
            throw e;
        }
    }

    /**
     * Moves the committed segment file to its final location and updates the descriptor's segment
     * location to the new URI.
     *
     * <p>Locations using the {@code peer://} scheme (case-insensitive) are left untouched. Unless
     * asynchronous temporary-segment deletion is enabled, temporary files in the table directory whose
     * URI contains this segment's temporary name prefix are deleted on a best-effort basis: failures
     * there are logged and do not fail the commit.
     *
     * @param realtimeTableName table name with type
     * @param committingSegmentDescriptor descriptor of the committing segment; its location is updated
     * @throws IllegalStateException if the manager is stopping
     * @throws IllegalArgumentException if the descriptor has no segment location
     * @throws Exception if moving the segment file fails
     */
    public void commitSegmentFile(String realtimeTableName, CommittingSegmentDescriptor committingSegmentDescriptor) throws Exception {
        Preconditions.checkState(!_isStopping, "Segment manager is stopping");
        String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
        String segmentName = committingSegmentDescriptor.getSegmentName();
        LOGGER.info("Committing segment file for segment: {}", segmentName);

        String segmentLocation = committingSegmentDescriptor.getSegmentLocation();
        Preconditions.checkArgument(segmentLocation != null, "Segment location must be provided");
        final String peerScheme = "peer://";
        boolean servedByPeers = segmentLocation.regionMatches(true, 0, peerScheme, 0, peerScheme.length());
        if (servedByPeers) {
            LOGGER.info("No moving needed for segment on peer servers: {}", segmentLocation);
            return;
        }

        URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(), new String[]{rawTableName});
        PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme());
        String finalSegmentUri = moveSegmentFile(rawTableName, segmentName, segmentLocation, pinotFS);

        if (!isTmpSegmentAsyncDeletionEnabled()) {
            // Best-effort cleanup of leftover temporary copies of this segment.
            try {
                for (String candidateUri : pinotFS.listFiles(tableDirURI, false)) {
                    if (candidateUri.contains(SegmentCompletionUtils.getTmpSegmentNamePrefix(segmentName))) {
                        LOGGER.warn("Deleting temporary segment file: {}", candidateUri);
                        boolean deleted = pinotFS.delete(new URI(candidateUri), true);
                        Preconditions.checkState(deleted, "Failed to delete file: %s", candidateUri);
                    }
                }
            } catch (Exception e) {
                LOGGER.warn("Caught exception while deleting temporary segment files for segment: {}", segmentName, e);
            }
        }

        committingSegmentDescriptor.setSegmentLocation(finalSegmentUri);
    }

    /**
     * Commits the metadata of a completed segment, tracking the call in the in-flight counter that
     * {@link #stop()} polls while it waits for commits to drain.
     *
     * @param realtimeTableName table name with type
     * @param committingSegmentDescriptor descriptor of the committing segment
     * @throws IllegalStateException if the manager is stopping
     */
    public void commitSegmentMetadata(String realtimeTableName, CommittingSegmentDescriptor committingSegmentDescriptor) {
        Preconditions.checkState(!_isStopping, "Segment manager is stopping");
        _numCompletingSegments.addAndGet(1);
        try {
            commitSegmentMetadataInternal(realtimeTableName, committingSegmentDescriptor);
        } finally {
            // Always release the in-flight slot, even when the commit fails.
            _numCompletingSegments.addAndGet(-1);
        }
    }

    /**
     * Performs the metadata commit of a completed segment in three steps:
     * <ol>
     *   <li>update the committing segment's ZK metadata (status DONE) and send a segment refresh message;</li>
     *   <li>unless the table is paused, and if the partition group is still present in the stream,
     *       create ZK metadata for the next consuming segment of the same partition group;</li>
     *   <li>update the IdealState under the per-table lock.</li>
     * </ol>
     * Finally the metadata event notifier is told about the segment flush.
     *
     * @param realtimeTableName table name with type
     * @param committingSegmentDescriptor descriptor of the committing segment
     * @throws IllegalStateException if the IdealState has no instance in CONSUMING state for the
     *     committing segment (including when the segment is absent from the IdealState)
     */
    private void commitSegmentMetadataInternal(String realtimeTableName, CommittingSegmentDescriptor committingSegmentDescriptor) {
        String committingSegmentName = committingSegmentDescriptor.getSegmentName();
        LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
        int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId();
        LOGGER.info("Committing segment metadata for segment: {}", committingSegmentName);
        if (StringUtils.isBlank(committingSegmentDescriptor.getSegmentLocation())) {
            LOGGER.warn("Committing segment: {} was not uploaded to deep store", committingSegmentName);
            _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.SEGMENT_MISSING_DEEP_STORE_LINK, 1L);
        }

        TableConfig tableConfig = getTableConfig(realtimeTableName);
        InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
        IdealState idealState = getIdealState(realtimeTableName);
        // Guard against the segment being absent from the IdealState so that the failure carries the
        // intended message instead of surfacing as a bare NullPointerException.
        Map<String, String> committingSegmentInstanceStates = idealState.getInstanceStateMap(committingSegmentName);
        Preconditions.checkState(committingSegmentInstanceStates != null && committingSegmentInstanceStates.containsValue("CONSUMING"), "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
        int numReplicas = getNumReplicas(tableConfig, instancePartitions);

        // Step 1: mark the committing segment DONE in ZK and ask servers to refresh it.
        long startTimeNs1 = System.nanoTime();
        SegmentZKMetadata committingSegmentZKMetadata = updateCommittingSegmentZKMetadata(realtimeTableName, committingSegmentDescriptor);
        _helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, committingSegmentName, false, true);

        // Step 2: create the ZK metadata of the next consuming segment, if one should be created.
        long startTimeNs2 = System.nanoTime();
        String newConsumingSegmentName = null;
        if (!isTablePaused(idealState)) {
            Set<Integer> partitionIds;
            StreamConfig streamConfig = new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
            try {
                partitionIds = getPartitionIds(streamConfig);
            } catch (Exception e) {
                // Fall back to deriving the partition ids from the full partition group metadata.
                LOGGER.info("Failed to fetch partition ids from stream metadata provider for table: {}, exception: {}. Reading all partition group metadata to determine partition ids.", realtimeTableName, e.toString());
                List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = getPartitionGroupConsumptionStatusList(idealState, streamConfig);
                List<PartitionGroupMetadata> newPartitionGroupMetadataList = getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
                partitionIds = newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId).collect(Collectors.toSet());
            }
            if (partitionIds.contains(committingSegmentPartitionGroupId)) {
                String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
                long newSegmentCreationTimeMs = getCurrentTimeMs();
                LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId, committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
                createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs, committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(), numReplicas);
                newConsumingSegmentName = newLLCSegment.getSegmentName();
            }
        }

        // Step 3: update the IdealState while holding the lock selected by the table name's hash.
        long startTimeNs3 = System.nanoTime();
        SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig, _controllerMetrics);
        Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);
        int lockIndex = (realtimeTableName.hashCode() & Integer.MAX_VALUE) % _numIdealStateUpdateLocks;
        Lock lock = _idealStateUpdateLocks[lockIndex];
        // Acquire outside the try block: unlock() must only run once the lock is actually held.
        lock.lock();
        try {
            updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentName, newConsumingSegmentName, segmentAssignment, instancePartitionsMap);
        } finally {
            lock.unlock();
        }

        long endTimeNs = System.nanoTime();
        LOGGER.info("Finished committing segment metadata for segment: {}. Time taken for updating committing segment metadata: {}ms; creating new consuming segment ({}) metadata: {}ms; updating ideal state: {}ms; total: {}ms", new Object[]{committingSegmentName, TimeUnit.NANOSECONDS.toMillis(startTimeNs2 - startTimeNs1), newConsumingSegmentName, TimeUnit.NANOSECONDS.toMillis(startTimeNs3 - startTimeNs2), TimeUnit.NANOSECONDS.toMillis(endTimeNs - startTimeNs3), TimeUnit.NANOSECONDS.toMillis(endTimeNs - startTimeNs1)});
        _metadataEventNotifierFactory.create().notifyOnSegmentFlush(tableConfig);
    }

    /**
     * Marks the committing segment as DONE in its ZK metadata and fills in the values taken from the
     * built segment: end offset, download URL, CRC, time range, index version, total docs and
     * partition metadata. The record is written back conditional on the version that was read.
     *
     * <p>The download URL is stored as an empty string when the segment location is a peer URL. For
     * segments without documents, the current wall-clock time is used as both start and end time.
     *
     * @param realtimeTableName table name with type
     * @param committingSegmentDescriptor descriptor of the committing segment; must carry segment metadata
     * @return the updated and persisted ZK metadata
     * @throws IllegalStateException if the segment is not IN_PROGRESS or the descriptor has no segment metadata
     * @throws NullPointerException if a non-empty segment has no time interval
     */
    private SegmentZKMetadata updateCommittingSegmentZKMetadata(String realtimeTableName, CommittingSegmentDescriptor committingSegmentDescriptor) {
        String segmentName = committingSegmentDescriptor.getSegmentName();
        LOGGER.info("Updating segment ZK metadata for committing segment: {}", segmentName);

        // Capture the znode version so the write below is conditional on it.
        Stat readStat = new Stat();
        SegmentZKMetadata zkMetadata = getSegmentZKMetadata(realtimeTableName, segmentName, readStat);
        Preconditions.checkState(zkMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.IN_PROGRESS, "Segment status for segment: %s should be IN_PROGRESS, found: %s", segmentName, zkMetadata.getStatus());
        SegmentMetadataImpl builtSegmentMetadata = committingSegmentDescriptor.getSegmentMetadata();
        Preconditions.checkState(builtSegmentMetadata != null, "Failed to find segment metadata from descriptor for segment: %s", segmentName);

        zkMetadata.setEndOffset(committingSegmentDescriptor.getNextOffset());
        zkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
        if (isPeerURL(committingSegmentDescriptor.getSegmentLocation())) {
            zkMetadata.setDownloadUrl("");
        } else {
            zkMetadata.setDownloadUrl(committingSegmentDescriptor.getSegmentLocation());
        }
        zkMetadata.setCrc(Long.parseLong(builtSegmentMetadata.getCrc()));

        if (builtSegmentMetadata.getTotalDocs() <= 0) {
            // Empty segment: there is no data-derived time range, so stamp it with "now".
            long now = System.currentTimeMillis();
            zkMetadata.setStartTime(now);
            zkMetadata.setEndTime(now);
        } else {
            Preconditions.checkNotNull(builtSegmentMetadata.getTimeInterval(), "start/end time information is not correctly written to the segment for table: " + realtimeTableName);
            zkMetadata.setStartTime(builtSegmentMetadata.getTimeInterval().getStartMillis());
            zkMetadata.setEndTime(builtSegmentMetadata.getTimeInterval().getEndMillis());
        }
        zkMetadata.setTimeUnit(TimeUnit.MILLISECONDS);

        SegmentVersion indexVersion = builtSegmentMetadata.getVersion();
        if (indexVersion != null) {
            zkMetadata.setIndexVersion(indexVersion.name());
        }
        zkMetadata.setTotalDocs((long) builtSegmentMetadata.getTotalDocs());
        zkMetadata.setPartitionMetadata(getPartitionMetadataFromSegmentMetadata(builtSegmentMetadata));

        persistSegmentZKMetadata(realtimeTableName, zkMetadata, readStat.getVersion());
        return zkMetadata;
    }

    /**
     * Tells whether a segment location uses the {@code peer://} scheme, ignoring case.
     *
     * @param segmentLocation segment location, may be {@code null}
     * @return {@code true} if the location is non-null and starts with {@code peer://} in any letter case
     */
    private boolean isPeerURL(String segmentLocation) {
        if (segmentLocation == null) {
            return false;
        }
        final String peerScheme = "peer://";
        return segmentLocation.regionMatches(true, 0, peerScheme, 0, peerScheme.length());
    }

    /**
     * Creates and persists the ZK metadata of a new consuming (IN_PROGRESS) segment.
     *
     * <p>The new segment starts at the committing descriptor's next offset. Partition metadata is
     * attached when it can be derived from the table config, and the flush threshold is set by the
     * stream's flush threshold updater before the record is written with expected version {@code -1}.
     *
     * @param tableConfig config of the realtime table
     * @param streamConfig stream config of the table
     * @param newLLCSegmentName name of the segment to create
     * @param creationTimeMs creation time to record, in milliseconds
     * @param committingSegmentDescriptor descriptor supplying the start offset of the new segment
     * @param committingSegmentZKMetadata ZK metadata of the committing segment, if any
     * @param instancePartitions CONSUMING instance partitions
     * @param numPartitions number of stream partitions
     * @param numReplicas number of replicas to record
     */
    private void createNewSegmentZKMetadata(TableConfig tableConfig, StreamConfig streamConfig, LLCSegmentName newLLCSegmentName, long creationTimeMs, CommittingSegmentDescriptor committingSegmentDescriptor, @Nullable SegmentZKMetadata committingSegmentZKMetadata, InstancePartitions instancePartitions, int numPartitions, int numReplicas) {
        String realtimeTableName = tableConfig.getTableName();
        String newSegmentName = newLLCSegmentName.getSegmentName();
        String startOffset = committingSegmentDescriptor.getNextOffset();
        LOGGER.info("Creating segment ZK metadata for new CONSUMING segment: {} with start offset: {} and creation time: {}", newSegmentName, startOffset, creationTimeMs);

        SegmentZKMetadata newZKMetadata = new SegmentZKMetadata(newSegmentName);
        newZKMetadata.setCreationTime(creationTimeMs);
        newZKMetadata.setStartOffset(startOffset);
        newZKMetadata.setNumReplicas(numReplicas);
        newZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);

        // Partition metadata is optional; it is only derivable for single-partition-column tables.
        SegmentPartitionMetadata derivedPartitionMetadata = getPartitionMetadataFromTableConfig(tableConfig, newLLCSegmentName.getPartitionGroupId(), numPartitions);
        if (derivedPartitionMetadata != null) {
            newZKMetadata.setPartitionMetadata(derivedPartitionMetadata);
        }

        FlushThresholdUpdater thresholdUpdater = _flushThresholdUpdateManager.getFlushThresholdUpdater(streamConfig);
        int maxPartitionsPerInstance = getMaxNumPartitionsPerInstance(instancePartitions, numPartitions, numReplicas);
        thresholdUpdater.updateFlushThreshold(streamConfig, newZKMetadata, committingSegmentDescriptor, committingSegmentZKMetadata, maxPartitionsPerInstance);

        persistSegmentZKMetadata(realtimeTableName, newZKMetadata, -1);
    }

    /**
     * Derives segment partition metadata for a consuming segment from the table's segment partition config.
     *
     * <p>Metadata is produced only when exactly one partition column is configured. The partition count in
     * the metadata is always {@code numPartitionGroups}; a mismatch with the configured count is only logged.
     *
     * @param tableConfig table config to read the segment partition config from
     * @param partitionId partition id recorded as the single partition of the segment
     * @param numPartitionGroups number of partition groups, used as the partition count in the metadata
     * @return the partition metadata, or {@code null} if there is no partition config or the number of
     *         partition columns is not exactly one
     */
    @Nullable
    private SegmentPartitionMetadata getPartitionMetadataFromTableConfig(TableConfig tableConfig, int partitionId, int numPartitionGroups) {
        SegmentPartitionConfig partitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
        if (partitionConfig == null) {
            return null;
        }

        Map<String, ColumnPartitionConfig> columnPartitionMap = partitionConfig.getColumnPartitionMap();
        if (columnPartitionMap.size() != 1) {
            LOGGER.warn("Skip persisting partition metadata because there are other than exact one partition column for table: {}", tableConfig.getTableName());
            return null;
        }

        Map.Entry<String, ColumnPartitionConfig> onlyEntry = columnPartitionMap.entrySet().iterator().next();
        ColumnPartitionConfig columnPartitionConfig = onlyEntry.getValue();
        if (numPartitionGroups != columnPartitionConfig.getNumPartitions()) {
            LOGGER.warn("Number of partition groups fetched from the stream '{}' is different than columnPartitionConfig.numPartitions '{}' in the table config. The stream partition count is used. Please update the table config accordingly.", numPartitionGroups, columnPartitionConfig.getNumPartitions());
        }

        ColumnPartitionMetadata columnPartitionMetadata = new ColumnPartitionMetadata(columnPartitionConfig.getFunctionName(), numPartitionGroups, Collections.singleton(partitionId), columnPartitionConfig.getFunctionConfig());
        return new SegmentPartitionMetadata(Collections.singletonMap(onlyEntry.getKey(), columnPartitionMetadata));
    }

    /**
     * Extracts partition metadata from a built segment's column metadata.
     *
     * <p>Uses the first column (in the iteration order of the column metadata map) that has a partition function.
     *
     * @param segmentMetadata metadata of the segment to inspect
     * @return partition metadata for that column, or {@code null} if no column has a partition function
     */
    @Nullable
    private SegmentPartitionMetadata getPartitionMetadataFromSegmentMetadata(SegmentMetadataImpl segmentMetadata) {
        for (Map.Entry<String, ColumnMetadata> columnEntry : segmentMetadata.getColumnMetadataMap().entrySet()) {
            ColumnMetadata columnMetadata = columnEntry.getValue();
            PartitionFunction partitionFunction = columnMetadata.getPartitionFunction();
            if (partitionFunction != null) {
                ColumnPartitionMetadata columnPartitionMetadata = new ColumnPartitionMetadata(partitionFunction.getName(), partitionFunction.getNumPartitions(), columnMetadata.getPartitions(), partitionFunction.getFunctionConfig());
                return new SegmentPartitionMetadata(Collections.singletonMap(columnEntry.getKey(), columnPartitionMetadata));
            }
        }
        return null;
    }

    /**
     * Returns the segment commit timeout, in milliseconds, for the given realtime table.
     *
     * <p>The default comes from {@code SegmentCompletionProtocol.getMaxSegmentCommitTimeMs()}. It is
     * overridden when the table's stream config map contains
     * {@code realtime.segment.commit.timeoutSeconds} with a parseable integer value.
     *
     * @param realtimeTableName table name used to look up the table config
     * @return the configured timeout converted to milliseconds, or the default when the property store is
     *         unavailable, the setting is absent, or its value is not a valid integer
     */
    public long getCommitTimeoutMS(String realtimeTableName) {
        long commitTimeoutMS = SegmentCompletionProtocol.getMaxSegmentCommitTimeMs();
        if (this._propertyStore == null) {
            return commitTimeoutMS;
        }
        TableConfig tableConfig = this.getTableConfig(realtimeTableName);
        Map<String, String> streamConfigs = IngestionConfigUtils.getStreamConfigMap(tableConfig);
        if (!streamConfigs.containsKey("realtime.segment.commit.timeoutSeconds")) {
            return commitTimeoutMS;
        }
        String commitTimeoutSecondsStr = streamConfigs.get("realtime.segment.commit.timeoutSeconds");
        try {
            return TimeUnit.MILLISECONDS.convert(Integer.parseInt(commitTimeoutSecondsStr), TimeUnit.SECONDS);
        } catch (NumberFormatException e) {
            // Integer.parseInt is the only call here that can fail; it also reports a null value this way.
            LOGGER.warn("Failed to parse segment commit timeout seconds: {}, using default: {}ms", commitTimeoutSecondsStr, commitTimeoutMS, e);
            return commitTimeoutMS;
        }
    }

    /**
     * Fetches the partition ids of the stream described by the given stream config.
     *
     * <p>A stream metadata provider is created for the call and closed before returning.
     *
     * @param streamConfig stream config identifying the table and topic
     * @return the partition ids reported by the stream metadata provider (5000 is passed as the fetch timeout)
     * @throws Exception if creating, querying or closing the metadata provider fails
     */
    @VisibleForTesting
    Set<Integer> getPartitionIds(StreamConfig streamConfig) throws Exception {
        String clientId = String.join("-", PinotLLCRealtimeSegmentManager.class.getSimpleName(), streamConfig.getTableNameWithType(), streamConfig.getTopicName());
        StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
        try (StreamMetadataProvider metadataProvider = consumerFactory.createStreamMetadataProvider(clientId)) {
            return metadataProvider.fetchPartitionIds(5000L);
        }
    }

    /**
     * Computes the new partition group metadata list for a stream.
     *
     * <p>Delegates to {@code PinotTableIdealStateBuilder.getPartitionGroupMetadataList}; kept as a separate
     * method so tests can override it.
     *
     * @param streamConfig stream config to fetch partition groups for
     * @param currentPartitionGroupConsumptionStatusList current consumption status of the known partition groups
     * @return the partition group metadata list returned by the builder
     */
    @VisibleForTesting
    List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig streamConfig, List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList) {
        List<PartitionGroupMetadata> partitionGroupMetadataList = PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
        return partitionGroupMetadataList;
    }

    /**
     * Records that an instance has stopped consuming a segment.
     *
     * <p>If the instance is CONSUMING the segment in the ideal state, its state is set to OFFLINE; any other
     * state is left untouched and logged. After the ideal state update succeeds, a best-effort partition
     * reset is issued for the segment on that instance.
     *
     * @param llcSegmentName segment that stopped consuming
     * @param instanceName instance that reported the stop
     * @throws IllegalStateException if the segment manager is stopping
     * @throws RuntimeException any failure from the ideal state update is rethrown after the
     *         {@code LLC_ZOOKEEPER_UPDATE_FAILURES} meter is incremented
     */
    public void segmentStoppedConsuming(LLCSegmentName llcSegmentName, String instanceName) {
        Preconditions.checkState(!this._isStopping, "Segment manager is stopping");
        String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(llcSegmentName.getTableName());
        String segmentName = llcSegmentName.getSegmentName();
        LOGGER.info("Marking CONSUMING segment: {} OFFLINE on instance: {}", segmentName, instanceName);
        try {
            HelixHelper.updateIdealState(this._helixManager, realtimeTableName, idealState -> {
                assert (idealState != null);
                Map<String, String> stateMap = idealState.getInstanceStateMap(segmentName);
                String state = stateMap.get(instanceName);
                if ("CONSUMING".equals(state)) {
                    stateMap.put(instanceName, "OFFLINE");
                } else {
                    LOGGER.info("Segment {} in state {} when trying to register consumption stop from {}", segmentName, state, instanceName);
                }
                return idealState;
            }, RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2f), true);
        } catch (Exception e) {
            this._controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
            throw e;
        }
        try {
            this._helixAdmin.resetPartition(this._helixManager.getClusterName(), instanceName, realtimeTableName, Collections.singletonList(segmentName));
        } catch (Exception e) {
            // Best effort: the ideal state is already updated, so a failed reset must not fail the call.
            // NOTE(review): presumably Helix rejects the reset when the partition is not in ERROR state;
            // confirm against the HelixAdmin.resetPartition contract. Logged at debug so the failure is
            // traceable without adding noise.
            LOGGER.debug("Ignoring failure to reset partition for segment: {} on instance: {}", segmentName, instanceName, e);
        }
    }

    /**
     * Returns, for every partition group that has LLC segments, the ZK metadata of its latest segment.
     *
     * <p>"Latest" means the highest sequence number within the partition group; on a tie the segment seen
     * first is kept.
     *
     * @param realtimeTableName table whose LLC segments are inspected
     * @return map from partition group id to the ZK metadata of that partition group's latest segment
     */
    private Map<Integer, SegmentZKMetadata> getLatestSegmentZKMetadataMap(String realtimeTableName) {
        Map<Integer, LLCSegmentName> latestNameByPartitionGroup = new HashMap<>();
        for (String segmentName : this.getLLCSegments(realtimeTableName)) {
            LLCSegmentName candidate = new LLCSegmentName(segmentName);
            // Replace the tracked segment only when the candidate has a strictly higher sequence number.
            latestNameByPartitionGroup.merge(candidate.getPartitionGroupId(), candidate,
                (tracked, incoming) -> incoming.getSequenceNumber() > tracked.getSequenceNumber() ? incoming : tracked);
        }

        Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap = new HashMap<>();
        latestNameByPartitionGroup.forEach((partitionGroupId, latestName) ->
            latestSegmentZKMetadataMap.put(partitionGroupId, this.getSegmentZKMetadata(realtimeTableName, latestName.getSegmentName())));
        return latestSegmentZKMetadataMap;
    }

    /**
     * Validates the LLC segments of a table and repairs its ideal state so that all partitions are consuming.
     *
     * <p>The work runs inside an ideal state update with retries. Nothing is changed when the table is
     * disabled or paused. Otherwise the new partition group metadata list is fetched and the repair is
     * delegated to the overload that takes the ideal state.
     *
     * <p>While the list is fetched, the stream config's offset criteria is temporarily replaced by
     * {@code offsetCriteria} when it is non-null, and by the smallest-offset criteria otherwise. The
     * original criteria is restored in a {@code finally} block, so a failed fetch (and the retry that
     * follows it) never sees the temporary override.
     *
     * @param tableConfig config of the realtime table
     * @param streamConfig stream config of the table; its offset criteria is unchanged on return
     * @param recreateDeletedConsumingSegment passed through to the repair overload
     * @param offsetCriteria offset criteria to start new consuming segments from, or {@code null} to continue
     *        from the current consumption status
     * @throws IllegalStateException if the segment manager is stopping
     */
    public void ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig streamConfig, boolean recreateDeletedConsumingSegment, OffsetCriteria offsetCriteria) {
        Preconditions.checkState(!this._isStopping, "Segment manager is stopping");
        String realtimeTableName = tableConfig.getTableName();
        HelixHelper.updateIdealState(this._helixManager, realtimeTableName, idealState -> {
            assert (idealState != null);
            boolean isTableEnabled = idealState.isEnabled();
            boolean isTablePaused = this.isTablePaused(idealState);
            if (!isTableEnabled || isTablePaused) {
                LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}", realtimeTableName, isTableEnabled, isTablePaused);
                return idealState;
            }

            boolean offsetsHaveToChange = offsetCriteria != null;
            // When offsets are being reset, the current consumption status is deliberately ignored.
            List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = offsetsHaveToChange ? Collections.emptyList() : this.getPartitionGroupConsumptionStatusList(idealState, streamConfig);

            OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
            List<PartitionGroupMetadata> newPartitionGroupMetadataList;
            streamConfig.setOffsetCriteria(offsetsHaveToChange ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
            try {
                newPartitionGroupMetadataList = this.getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
            } finally {
                // Always undo the temporary override, even if the fetch throws.
                streamConfig.setOffsetCriteria(originalOffsetCriteria);
            }
            return this.ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList, recreateDeletedConsumingSegment, offsetCriteria);
        }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true);
    }

    /**
     * Updates the ideal state when a segment completes: the committing segment becomes ONLINE and, unless
     * the table is paused, the new segment is added as CONSUMING.
     *
     * <p>The update is abandoned with a {@code HelixHelper.PermanentUpdaterException} when the committing
     * segment has exceeded the max segment completion time at the moment the updater runs.
     *
     * @param realtimeTableName table whose ideal state is updated
     * @param committingSegmentName segment being committed
     * @param newSegmentName next consuming segment; not added when the table is paused
     * @param segmentAssignment assignment strategy for the new segment
     * @param instancePartitionsMap instance partitions handed to the assignment strategy
     */
    @VisibleForTesting
    void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName, String newSegmentName, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
        HelixHelper.updateIdealState(this._helixManager, realtimeTableName, idealState -> {
            assert (idealState != null);
            boolean completionTimedOut = this.isExceededMaxSegmentCompletionTime(realtimeTableName, committingSegmentName, this.getCurrentTimeMs());
            if (completionTimedOut) {
                LOGGER.error("Exceeded max segment completion time. Skipping ideal state update for segment: {}", committingSegmentName);
                throw new HelixHelper.PermanentUpdaterException("Exceeded max segment completion time for segment " + committingSegmentName);
            }
            Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
            // A paused table gets no new consuming segment.
            String consumingSegmentToAdd = this.isTablePaused(idealState) ? null : newSegmentName;
            this.updateInstanceStatesForNewConsumingSegment(instanceStatesMap, committingSegmentName, consumingSegmentToAdd, segmentAssignment, instancePartitionsMap);
            return idealState;
        }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
    }

    /**
     * Reads the pause flag stored as a simple field on the ideal state record.
     *
     * @param idealState ideal state of the table
     * @return {@code true} only if the {@code IS_TABLE_PAUSED} simple field parses as boolean true
     */
    private boolean isTablePaused(IdealState idealState) {
        String pausedFlag = idealState.getRecord().getSimpleField(IS_TABLE_PAUSED);
        return Boolean.parseBoolean(pausedFlag);
    }

    /**
     * Applies a segment roll-over to the instance states map, in place.
     *
     * <p>When {@code committingSegmentName} is given, all instances currently listed for it are switched to
     * ONLINE. When {@code newSegmentName} is given, it is assigned to instances and added as CONSUMING,
     * after checking that no existing LLC segment has the same partition group id and sequence number.
     *
     * @param instanceStatesMap segment name to (instance to state) map; modified in place
     * @param committingSegmentName segment to mark ONLINE, or {@code null} to skip that step
     * @param newSegmentName segment to add as CONSUMING, or {@code null} to skip that step
     * @param segmentAssignment strategy used to pick the instances for the new segment
     * @param instancePartitionsMap instance partitions handed to the assignment strategy
     * @throws HelixHelper.PermanentUpdaterException if the new segment duplicates an existing one
     */
    @VisibleForTesting
    void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> instanceStatesMap, @Nullable String committingSegmentName, @Nullable String newSegmentName, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
        if (committingSegmentName != null) {
            Set<String> committingInstances = instanceStatesMap.get(committingSegmentName).keySet();
            instanceStatesMap.put(committingSegmentName, SegmentAssignmentUtils.getInstanceStateMap(committingInstances, "ONLINE"));
            LOGGER.info("Updating segment: {} to ONLINE state", committingSegmentName);
        }

        if (newSegmentName == null) {
            return;
        }

        LLCSegmentName newLLCSegmentName = new LLCSegmentName(newSegmentName);
        int newPartitionGroupId = newLLCSegmentName.getPartitionGroupId();
        int newSequenceNumber = newLLCSegmentName.getSequenceNumber();
        for (String existingSegmentName : instanceStatesMap.keySet()) {
            LLCSegmentName existingLLCSegmentName = LLCSegmentName.of(existingSegmentName);
            if (existingLLCSegmentName == null) {
                LOGGER.debug("Skip segment name {} not in low-level consumer format", existingSegmentName);
                continue;
            }
            if (existingLLCSegmentName.getPartitionGroupId() == newPartitionGroupId && existingLLCSegmentName.getSequenceNumber() == newSequenceNumber) {
                String errorMsg = String.format("Segment %s is a duplicate of existing segment %s", newSegmentName, existingSegmentName);
                LOGGER.error(errorMsg);
                throw new HelixHelper.PermanentUpdaterException(errorMsg);
            }
        }

        List<String> assignedInstances = segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap);
        instanceStatesMap.put(newSegmentName, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, "CONSUMING"));
        LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, assignedInstances);
    }

    /**
     * Checks whether a segment's ZK metadata was last modified more than 300000 ms before the given time.
     *
     * @param realtimeTableName table the segment belongs to
     * @param segmentName segment whose ZK metadata modification time is read
     * @param currentTimeMs time to compare against, in milliseconds
     * @return {@code true} if {@code currentTimeMs} is later than the modification time plus 300000 ms
     */
    @VisibleForTesting
    boolean isExceededMaxSegmentCompletionTime(String realtimeTableName, String segmentName, long currentTimeMs) {
        long maxCompletionTimeMs = 300000L;
        Stat stat = new Stat();
        // The call is made for its side effect of filling in the stat; the returned metadata is not needed.
        this.getSegmentZKMetadata(realtimeTableName, segmentName, stat);
        boolean exceeded = currentTimeMs > stat.getMtime() + maxCompletionTimeMs;
        if (exceeded) {
            LOGGER.info("Segment: {} exceeds the max completion time: {}ms, metadata update time: {}, current time: {}", segmentName, maxCompletionTimeMs, stat.getMtime(), currentTimeMs);
        }
        return exceeded;
    }

    /**
     * Checks whether every instance in the map is in the given state.
     *
     * @param instanceStateMap instance name to state map
     * @param state state every instance is expected to be in
     * @return {@code true} if all values equal {@code state}; also {@code true} for an empty map
     */
    private boolean isAllInstancesInState(Map<String, String> instanceStateMap, String state) {
        return instanceStateMap.values().stream().allMatch(instanceState -> instanceState.equals(state));
    }

    /**
     * Repairs the given ideal state (and creates segment ZK metadata where needed) based on the latest
     * segment of each partition group, then sets up partition groups that have no segment yet.
     *
     * <p>For the latest segment of each partition group found in ZK metadata:
     * <ul>
     *   <li>In the ideal state with at least one CONSUMING instance: repaired only if its ZK status is DONE
     *       and the max completion time is exceeded. The segment is marked ONLINE, and a follow-up
     *       CONSUMING segment is created only if the partition group is still in
     *       {@code newPartitionGroupMetadataList}.</li>
     *   <li>In the ideal state and OFFLINE on all instances: a new CONSUMING segment is created.</li>
     *   <li>In the ideal state, ONLINE on all instances, with a completed ZK status, when
     *       {@code recreateDeletedConsumingSegment} is set and the partition group is still in the new
     *       list: a new CONSUMING segment is created.</li>
     *   <li>Not in the ideal state, max completion time exceeded and ZK status IN_PROGRESS: the segment is
     *       added to the ideal state as CONSUMING, and a CONSUMING segment found for the same partition
     *       group (if any) is marked ONLINE.</li>
     * </ul>
     * Other combinations are skipped or logged as unexpected.
     *
     * @param tableConfig config of the realtime table
     * @param streamConfig stream config of the table
     * @param idealState ideal state whose map fields are modified in place
     * @param newPartitionGroupMetadataList partition groups (with start offsets) currently reported for the stream
     * @param recreateDeletedConsumingSegment whether to create a new consuming segment when the latest segment
     *        is completed and ONLINE on all instances
     * @param offsetCriteria offset criteria used to choose start offsets for recreated segments; may be {@code null}
     * @return the same {@code idealState} instance that was passed in
     */
    @VisibleForTesting
    IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig streamConfig, IdealState idealState, List<PartitionGroupMetadata> newPartitionGroupMetadataList, boolean recreateDeletedConsumingSegment, OffsetCriteria offsetCriteria) {
        int partitionGroupId;
        String realtimeTableName = tableConfig.getTableName();
        InstancePartitions instancePartitions = this.getConsumingInstancePartitions(tableConfig);
        int numReplicas = this.getNumReplicas(tableConfig, instancePartitions);
        int numPartitions = newPartitionGroupMetadataList.size();
        Set newPartitionGroupSet = newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId).collect(Collectors.toSet());
        SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(this._helixManager, tableConfig, this._controllerMetrics);
        Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);
        // Live view of the ideal state's map fields: every change below is a change to idealState.
        Map instanceStatesMap = idealState.getRecord().getMapFields();
        long currentTimeMs = this.getCurrentTimeMs();
        StreamPartitionMsgOffsetFactory offsetFactory = StreamConsumerFactoryProvider.create((StreamConfig)streamConfig).createStreamMsgOffsetFactory();
        Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap = this.getLatestSegmentZKMetadataMap(realtimeTableName);
        HashMap<Integer, StreamPartitionMsgOffset> partitionGroupIdToStartOffset = new HashMap<Integer, StreamPartitionMsgOffset>();
        for (PartitionGroupMetadata metadata : newPartitionGroupMetadataList) {
            partitionGroupIdToStartOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
        }
        // Smallest stream offsets are fetched lazily, only when a repair path needs them. If the caller passed
        // the SMALLEST_OFFSET_CRITERIA constant itself (reference comparison), the start offsets are reused.
        Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestStreamOffset = null;
        if (offsetCriteria == OffsetCriteria.SMALLEST_OFFSET_CRITERIA) {
            partitionGroupIdToSmallestStreamOffset = partitionGroupIdToStartOffset;
        }
        // Pass 1: inspect the latest segment of every partition group that already has ZK metadata.
        for (Map.Entry<Integer, SegmentZKMetadata> entry : latestSegmentZKMetadataMap.entrySet()) {
            partitionGroupId = entry.getKey();
            SegmentZKMetadata latestSegmentZKMetadata = entry.getValue();
            String latestSegmentName = latestSegmentZKMetadata.getSegmentName();
            LLCSegmentName latestLLCSegmentName = new LLCSegmentName(latestSegmentName);
            Map instanceStateMap = (Map)instanceStatesMap.get(latestSegmentName);
            if (instanceStateMap != null) {
                // The latest segment is present in the ideal state.
                StreamPartitionMsgOffset startOffset;
                if (instanceStateMap.containsValue("CONSUMING")) {
                    // Nothing to repair unless ZK says DONE and the max completion time has been exceeded.
                    if (latestSegmentZKMetadata.getStatus() != CommonConstants.Segment.Realtime.Status.DONE || !this.isExceededMaxSegmentCompletionTime(realtimeTableName, latestSegmentName, currentTimeMs)) continue;
                    if (newPartitionGroupSet.contains(partitionGroupId)) {
                        LOGGER.info("Repairing segment: {} which is DONE in segment ZK metadata, but is CONSUMING in IdealState", (Object)latestSegmentName);
                        LLCSegmentName newLLCSegmentName = this.getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
                        String newSegmentName = newLLCSegmentName.getSegmentName();
                        // The next segment starts where the DONE segment ended.
                        CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(latestSegmentName, offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString(), 0L);
                        this.createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
                        this.updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName, segmentAssignment, instancePartitionsMap);
                        continue;
                    }
                    // Partition group is not in the new list: mark the segment ONLINE without a successor.
                    LOGGER.info("PartitionGroup: {} has reached end of life. Updating ideal state for segment: {}. Skipping creation of new ZK metadata and new segment in ideal state", (Object)partitionGroupId, (Object)latestSegmentName);
                    this.updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, null, segmentAssignment, instancePartitionsMap);
                    continue;
                }
                if (this.isAllInstancesInState(instanceStateMap, "OFFLINE")) {
                    LOGGER.info("Repairing segment: {} which is OFFLINE for all instances in IdealState", (Object)latestSegmentName);
                    if (partitionGroupIdToSmallestStreamOffset == null) {
                        partitionGroupIdToSmallestStreamOffset = this.fetchPartitionGroupIdToSmallestOffset(streamConfig);
                    }
                    // The candidate start offset is the OFFLINE segment's own start offset.
                    startOffset = this.selectStartOffset(offsetCriteria, partitionGroupId, partitionGroupIdToStartOffset, partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory, latestSegmentZKMetadata.getStartOffset());
                    this.createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs, newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment, instancePartitionsMap, startOffset);
                    continue;
                }
                // Remaining states are only handled for partition groups that are still in the new list.
                if (!newPartitionGroupSet.contains(partitionGroupId)) continue;
                if (recreateDeletedConsumingSegment && latestSegmentZKMetadata.getStatus().isCompleted() && this.isAllInstancesInState(instanceStateMap, "ONLINE")) {
                    if (partitionGroupIdToSmallestStreamOffset == null) {
                        partitionGroupIdToSmallestStreamOffset = this.fetchPartitionGroupIdToSmallestOffset(streamConfig);
                    }
                    // The candidate start offset is the completed segment's end offset.
                    startOffset = this.selectStartOffset(offsetCriteria, partitionGroupId, partitionGroupIdToStartOffset, partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory, latestSegmentZKMetadata.getEndOffset());
                    this.createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs, newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment, instancePartitionsMap, startOffset);
                    continue;
                }
                LOGGER.error("Got unexpected instance state map: {} for segment: {}", (Object)instanceStateMap, (Object)latestSegmentName);
                continue;
            }
            // The latest segment has ZK metadata but no ideal state entry; repair only after the max
            // completion time has been exceeded.
            if (!this.isExceededMaxSegmentCompletionTime(realtimeTableName, latestSegmentName, currentTimeMs)) continue;
            LOGGER.info("Repairing segment: {} which has segment ZK metadata but does not exist in IdealState", (Object)latestSegmentName);
            if (latestSegmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.IN_PROGRESS) {
                // Find a segment of the same partition group that still has a CONSUMING instance.
                String previousConsumingSegment = null;
                for (Map.Entry segmentEntry : instanceStatesMap.entrySet()) {
                    if (!((Map)segmentEntry.getValue()).containsValue("CONSUMING") || new LLCSegmentName((String)segmentEntry.getKey()).getPartitionGroupId() != partitionGroupId) continue;
                    previousConsumingSegment = (String)segmentEntry.getKey();
                    break;
                }
                if (previousConsumingSegment == null) {
                    LOGGER.error("Failed to find previous CONSUMING segment for partition: {} of table: {}, potential data loss", (Object)partitionGroupId, (Object)realtimeTableName);
                    this._controllerMetrics.addMeteredTableValue(realtimeTableName, (AbstractMetrics.Meter)ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
                }
                // previousConsumingSegment may be null here; in that case only the new segment is added.
                this.updateInstanceStatesForNewConsumingSegment(instanceStatesMap, previousConsumingSegment, latestSegmentName, segmentAssignment, instancePartitionsMap);
                continue;
            }
            LOGGER.error("Got unexpected status: {} in segment ZK metadata for segment: {}", (Object)latestSegmentZKMetadata.getStatus(), (Object)latestSegmentName);
        }
        // Pass 2: set up partition groups from the new list that have no segment ZK metadata at all.
        for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
            partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
            if (latestSegmentZKMetadataMap.containsKey(partitionGroupId)) continue;
            String newSegmentName = this.setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions, numPartitions, numReplicas);
            this.updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment, instancePartitionsMap);
        }
        return idealState;
    }

    /**
     * Creates the successor of the latest segment of a partition group as a new CONSUMING segment.
     *
     * <p>Writes ZK metadata for the next segment in sequence, starting at {@code startOffset}, and then adds
     * that segment to the instance states map. The latest segment's own states are left unchanged.
     *
     * @param tableConfig config of the realtime table
     * @param streamConfig stream config of the table
     * @param latestSegmentZKMetadata ZK metadata of the partition group's latest segment
     * @param currentTimeMs creation time for the new segment
     * @param newPartitionGroupMetadataList current partition groups; its size is used as the partition count
     * @param instancePartitions consuming instance partitions
     * @param instanceStatesMap instance states map to add the new segment to; modified in place
     * @param segmentAssignment assignment strategy for the new segment
     * @param instancePartitionsMap instance partitions handed to the assignment strategy
     * @param startOffset offset the new segment starts consuming from
     */
    private void createNewConsumingSegment(TableConfig tableConfig, StreamConfig streamConfig, SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs, List<PartitionGroupMetadata> newPartitionGroupMetadataList, InstancePartitions instancePartitions, Map<String, Map<String, String>> instanceStatesMap, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, StreamPartitionMsgOffset startOffset) {
        int numReplicas = this.getNumReplicas(tableConfig, instancePartitions);
        int numPartitions = newPartitionGroupMetadataList.size();

        String latestSegmentName = latestSegmentZKMetadata.getSegmentName();
        LLCSegmentName nextLLCSegmentName = this.getNextLLCSegmentName(new LLCSegmentName(latestSegmentName), currentTimeMs);
        CommittingSegmentDescriptor descriptor = new CommittingSegmentDescriptor(latestSegmentZKMetadata.getSegmentName(), startOffset.toString(), 0L);
        this.createNewSegmentZKMetadata(tableConfig, streamConfig, nextLLCSegmentName, currentTimeMs, descriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);

        // No committing segment is passed: only the new segment is added.
        this.updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, nextLLCSegmentName.getSegmentName(), segmentAssignment, instancePartitionsMap);
    }

    /**
     * Fetches the smallest available offset of every partition group of the stream.
     *
     * <p>The stream config's offset criteria is temporarily switched to the smallest-offset criteria for the
     * fetch. It is restored in a {@code finally} block, so the caller's stream config is left untouched even
     * when the fetch throws.
     *
     * @param streamConfig stream config to fetch with; its offset criteria is unchanged on return
     * @return map from partition group id to the start offset reported under the smallest-offset criteria
     */
    private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset(StreamConfig streamConfig) {
        OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
        List<PartitionGroupMetadata> partitionGroupMetadataList;
        streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
        try {
            partitionGroupMetadataList = this.getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
        } finally {
            // Always undo the temporary override, even if the fetch throws.
            streamConfig.setOffsetCriteria(originalOffsetCriteria);
        }
        Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = new HashMap<>();
        for (PartitionGroupMetadata metadata : partitionGroupMetadataList) {
            partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
        }
        return partitionGroupIdToSmallestOffset;
    }

    /**
     * Chooses the start offset for a recreated consuming segment.
     *
     * <p>With a non-null offset criteria, the offset already fetched for the partition group is used. With
     * no criteria, the offset from segment ZK metadata is used unless the stream's smallest offset is
     * greater than it; in that case data loss is logged and metered, and the stream's smallest offset is
     * returned instead.
     *
     * @param offsetCriteria offset criteria requested by the caller, or {@code null}
     * @param partitionGroupId partition group the offset is selected for
     * @param partitionGroupIdToStartOffset offsets fetched according to the offset criteria
     * @param partitionGroupIdToSmallestStreamOffset smallest offsets of the stream; read only when
     *        {@code offsetCriteria} is {@code null}
     * @param tableName table name used for logging and metrics
     * @param offsetFactory factory used to parse the ZK metadata offset
     * @param startOffsetInSegmentZkMetadataStr candidate offset taken from segment ZK metadata
     * @return the selected start offset
     */
    private StreamPartitionMsgOffset selectStartOffset(OffsetCriteria offsetCriteria, int partitionGroupId, Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToStartOffset, Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestStreamOffset, String tableName, StreamPartitionMsgOffsetFactory offsetFactory, String startOffsetInSegmentZkMetadataStr) {
        if (offsetCriteria == null) {
            StreamPartitionMsgOffset zkMetadataOffset = offsetFactory.create(startOffsetInSegmentZkMetadataStr);
            StreamPartitionMsgOffset smallestStreamOffset = partitionGroupIdToSmallestStreamOffset.get(partitionGroupId);
            boolean zkOffsetStillAvailable = smallestStreamOffset.compareTo(zkMetadataOffset) <= 0;
            if (zkOffsetStillAvailable) {
                return zkMetadataOffset;
            }
            // The stream no longer has data back to the ZK offset: report the gap and start from what is left.
            LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", zkMetadataOffset, smallestStreamOffset, partitionGroupId, tableName);
            this._controllerMetrics.addMeteredTableValue(tableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
            return smallestStreamOffset;
        }
        return partitionGroupIdToStartOffset.get(partitionGroupId);
    }

    /**
     * Builds the name of the segment that follows the given one in the same partition group.
     *
     * @param lastLLCSegmentName name of the current last segment
     * @param creationTimeMs creation time embedded in the new name
     * @return a name with the same table and partition group and the sequence number incremented by one
     */
    private LLCSegmentName getNextLLCSegmentName(LLCSegmentName lastLLCSegmentName, long creationTimeMs) {
        String tableName = lastLLCSegmentName.getTableName();
        int partitionGroupId = lastLLCSegmentName.getPartitionGroupId();
        int nextSequenceNumber = lastLLCSegmentName.getSequenceNumber() + 1;
        return new LLCSegmentName(tableName, partitionGroupId, nextSequenceNumber, creationTimeMs);
    }

    /**
     * Creates the ZK metadata of the first segment (sequence number 0) of a new partition group.
     *
     * <p>Only segment ZK metadata is written here; the returned name is what the caller adds to the ideal state.
     *
     * @param tableConfig config of the realtime table
     * @param streamConfig stream config of the table
     * @param partitionGroupMetadata partition group to set up; provides the id and the start offset
     * @param creationTimeMs creation time for the new segment
     * @param instancePartitions consuming instance partitions
     * @param numPartitions number of partitions of the stream
     * @param numReplicas number of replicas recorded on the new segment
     * @return the name of the newly created segment
     */
    private String setupNewPartitionGroup(TableConfig tableConfig, StreamConfig streamConfig, PartitionGroupMetadata partitionGroupMetadata, long creationTimeMs, InstancePartitions instancePartitions, int numPartitions, int numReplicas) {
        String tableNameWithType = tableConfig.getTableName();
        int newPartitionGroupId = partitionGroupMetadata.getPartitionGroupId();
        String initialOffset = partitionGroupMetadata.getStartOffset().toString();
        LOGGER.info("Setting up new partition group: {} for table: {}", newPartitionGroupId, tableNameWithType);

        LLCSegmentName firstLLCSegmentName = new LLCSegmentName(TableNameBuilder.extractRawTableName(tableNameWithType), newPartitionGroupId, 0, creationTimeMs);
        String firstSegmentName = firstLLCSegmentName.getSegmentName();
        // There is no committing segment for a brand-new partition group, hence the null name and metadata.
        CommittingSegmentDescriptor descriptor = new CommittingSegmentDescriptor(null, initialOffset, 0L);
        this.createNewSegmentZKMetadata(tableConfig, streamConfig, firstLLCSegmentName, creationTimeMs, descriptor, null, instancePartitions, numPartitions, numReplicas);
        return firstSegmentName;
    }

    /**
     * Returns the current wall-clock time; a separate method so tests can override it.
     *
     * @return the value of {@link System#currentTimeMillis()}
     */
    @VisibleForTesting
    long getCurrentTimeMs() {
        long nowMs = System.currentTimeMillis();
        return nowMs;
    }

    /**
     * Determines the number of replicas for consuming segments.
     *
     * @param tableConfig table config providing the replication setting
     * @param instancePartitions consuming instance partitions
     * @return the table's replication when there is exactly one replica group, otherwise the number of
     *         replica groups
     */
    private int getNumReplicas(TableConfig tableConfig, InstancePartitions instancePartitions) {
        int numReplicaGroups = instancePartitions.getNumReplicaGroups();
        return numReplicaGroups == 1 ? tableConfig.getReplication() : numReplicaGroups;
    }

    /**
     * Computes the maximum number of partitions a single instance may have to consume.
     *
     * <p>With one replica group, all {@code numPartitions * numReplicas} partition replicas are spread over
     * the instances returned by {@code getInstances(0, 0)}. With several replica groups, each group carries
     * {@code numPartitions} partitions over those instances. Both results are rounded up.
     *
     * @param instancePartitions consuming instance partitions
     * @param numPartitions number of partitions of the stream
     * @param numReplicas number of replicas per partition
     * @return the rounded-up number of partitions per instance
     */
    private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions, int numPartitions, int numReplicas) {
        boolean singleReplicaGroup = instancePartitions.getNumReplicaGroups() == 1;
        int numInstances = instancePartitions.getInstances(0, 0).size();
        int partitionsToSpread = singleReplicaGroup ? numPartitions * numReplicas : numPartitions;
        // Integer ceiling division.
        return (partitionsToSpread + numInstances - 1) / numInstances;
    }

    /**
     * Queues a deep store upload for every committed segment that has no deep store copy.
     *
     * <p>A segment is eligible when its status is DONE, its download URL is the empty string, and it is not
     * purgeable under a retention period shortened by one hour (3600000 ms). Nothing is done when the table
     * has no retention time unit or value configured. Segments that already have an upload pending are not
     * queued again. A segment whose eligibility check throws is skipped, so that no upload is attempted for
     * a segment that could not be validated.
     *
     * <p>Each queued task asks a randomly chosen peer server to upload the segment, moves the uploaded file
     * to its final location, and persists the new download URL in the segment's ZK metadata. Task failures
     * are logged and metered, not propagated.
     *
     * @param tableConfig config of the realtime table
     * @param segmentsZKMetadata ZK metadata of the segments to inspect; the download URL of an uploaded
     *        segment is updated on the passed-in object
     * @throws IllegalStateException if the segment manager is stopping
     */
    public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMetadata> segmentsZKMetadata) {
        Preconditions.checkState(!this._isStopping, "Segment manager is stopping");
        String realtimeTableName = tableConfig.getTableName();
        String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
        SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
        if (validationConfig.getRetentionTimeUnit() == null || validationConfig.getRetentionTimeUnit().isEmpty() || validationConfig.getRetentionTimeValue() == null || validationConfig.getRetentionTimeValue().isEmpty()) {
            return;
        }
        // Upper-case with Locale.ROOT: the default locale may map 'i' to a non-ASCII character (e.g. Turkish),
        // which would make TimeUnit.valueOf fail for units such as "minutes".
        long retentionMs = TimeUnit.valueOf(validationConfig.getRetentionTimeUnit().toUpperCase(java.util.Locale.ROOT)).toMillis(Long.parseLong(validationConfig.getRetentionTimeValue()));
        TimeRetentionStrategy retentionStrategy = new TimeRetentionStrategy(TimeUnit.MILLISECONDS, retentionMs - 3600000L);
        PinotFS pinotFS = PinotFSFactory.create(URIUtils.getUri(this._controllerConf.getDataDir()).getScheme());
        for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
            String segmentName = segmentZKMetadata.getSegmentName();
            try {
                // Only committed (DONE) segments with an empty download URL need fixing.
                if (segmentZKMetadata.getStatus() != CommonConstants.Segment.Realtime.Status.DONE || !"".equals(segmentZKMetadata.getDownloadUrl())) {
                    continue;
                }
                if (retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
                    LOGGER.info("Skipped deep store uploading of LLC segment {} which is already out of retention", segmentName);
                    continue;
                }
            } catch (Exception e) {
                // Eligibility is unknown, so do not upload and overwrite ZK metadata for this segment.
                LOGGER.warn("Failed checking segment deep store URL for segment {}", segmentName, e);
                continue;
            }
            // add() returning false means an upload for this segment is already pending.
            if (!this._deepStoreUploadExecutorPendingSegments.add(segmentName)) {
                int queueSize = this._deepStoreUploadExecutorPendingSegments.size();
                this._controllerMetrics.setOrUpdateGauge(ControllerGauge.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_QUEUE_SIZE.getGaugeName(), (long) queueSize);
                continue;
            }
            Runnable uploadRunnable = () -> {
                try {
                    LOGGER.info("Fixing LLC segment {} whose deep store copy is unavailable", segmentName);
                    List<URI> peerSegmentURIs = PeerServerSegmentFinder.getPeerServerURIs(segmentName, "http", this._helixManager);
                    if (peerSegmentURIs.isEmpty()) {
                        throw new IllegalStateException(String.format("Failed to upload segment %s to deep store because no online replica is found", segmentName));
                    }
                    // Pick one of the peer servers at random.
                    URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
                    String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
                    serverUploadRequestUrl = String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, this._deepstoreUploadRetryTimeoutMs);
                    LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName, serverUploadRequestUrl);
                    String tempSegmentDownloadUrl = this._fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
                    String segmentDownloadUrl = this.moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl, pinotFS);
                    LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);
                    segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
                    this.persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
                    LOGGER.info("Successfully uploaded LLC segment {} to deep store with download url: {}", segmentName, segmentDownloadUrl);
                    this._controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_SUCCESS, 1L);
                } catch (Exception e) {
                    this._controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
                    LOGGER.error("Failed to upload segment {} to deep store", segmentName, e);
                } finally {
                    // Clear the pending marker on success or failure so a later call can retry the segment.
                    this._deepStoreUploadExecutorPendingSegments.remove(segmentName);
                    int queueSize = this._deepStoreUploadExecutorPendingSegments.size();
                    this._controllerMetrics.setOrUpdateGauge(ControllerGauge.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_QUEUE_SIZE.getGaugeName(), (long) queueSize);
                }
            };
            this._deepStoreUploadExecutor.submit(uploadRunnable);
        }
    }

    /**
     * Reports whether {@code _deepStoreUploadExecutorPendingSegments} currently holds no segment names.
     *
     * @return {@code true} if the pending-segments collection is empty
     */
    @VisibleForTesting
    boolean deepStoreUploadExecutorPendingSegmentsIsEmpty() {
        boolean nothingPending = _deepStoreUploadExecutorPendingSegments.isEmpty();
        return nothingPending;
    }

    /**
     * Deletes eligible temporary segment files listed under the table's directory in the controller data dir.
     *
     * <p>Returns {@code 0} without deleting anything when the table is not a realtime table, when its table
     * config cannot be found, or when async deletion of temporary segments is disabled. Download URLs of
     * {@code DONE} segments (other than the empty string) are collected and passed on so that files still
     * referenced by a segment are kept. Any exception raised while listing or deleting files is logged, and
     * the number of files deleted up to that point is returned.
     *
     * @param tableNameWithType table name with type suffix
     * @param segmentsZKMetadata ZK metadata of the table's segments
     * @return number of temporary files successfully deleted
     * @throws IllegalStateException if the segment manager is stopping
     */
    public long deleteTmpSegments(String tableNameWithType, List<SegmentZKMetadata> segmentsZKMetadata) {
        Preconditions.checkState(!_isStopping, "Segment manager is stopping");

        if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
            return 0L;
        }
        if (_helixResourceManager.getTableConfig(tableNameWithType) == null) {
            LOGGER.warn("Failed to find table config for table: {}, skipping deletion of tmp segments", tableNameWithType);
            return 0L;
        }
        if (!isTmpSegmentAsyncDeletionEnabled()) {
            return 0L;
        }

        // Download URLs that are in use by completed segments; files at these locations must survive.
        Set<String> deepURIs = segmentsZKMetadata.stream()
            .filter(meta -> meta.getStatus() == CommonConstants.Segment.Realtime.Status.DONE)
            .map(SegmentZKMetadata::getDownloadUrl)
            .filter(downloadUrl -> !"".equals(downloadUrl))
            .collect(Collectors.toSet());

        String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
        URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(), rawTableName);
        PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme());

        long deletedTmpSegments = 0L;
        try {
            for (String filePath : pinotFS.listFiles(tableDirURI, false)) {
                URI fileUri = URIUtils.getUri(filePath);
                if (isTmpAndCanDelete(fileUri, deepURIs, pinotFS)) {
                    LOGGER.info("Deleting temporary segment file: {}", fileUri);
                    if (pinotFS.delete(fileUri, true)) {
                        LOGGER.info("Succeed to delete file: {}", fileUri);
                        deletedTmpSegments++;
                    } else {
                        LOGGER.warn("Failed to delete file: {}", fileUri);
                    }
                }
            }
        } catch (Exception e) {
            // Best effort: report what was deleted so far instead of failing the caller.
            LOGGER.warn("Caught exception while deleting temporary files for table: {}", rawTableName, e);
        }
        return deletedTmpSegments;
    }

    /**
     * Decides whether the given file is a temporary segment file that may be deleted.
     *
     * <p>A file qualifies only when its URI string is accepted by {@code SegmentCompletionUtils.isTmpFile},
     * it is not contained in {@code deepURIs}, its modification time is positive, and the time elapsed since
     * that modification exceeds the configured temporary segment retention.
     *
     * @param uri file to examine
     * @param deepURIs download URLs that are still in use and therefore must not be deleted
     * @param pinotFS file system used to read the file's modification time
     * @return {@code true} if the file may be deleted
     * @throws Exception if the modification time of a candidate file cannot be read
     */
    private boolean isTmpAndCanDelete(URI uri, Set<String> deepURIs, PinotFS pinotFS) throws Exception {
        String uriString = uri.toString();
        // Run the cheap in-memory checks first. The file system is then queried only for files that could
        // actually be deleted, and an I/O failure on an unrelated file can no longer abort the caller's scan.
        if (!SegmentCompletionUtils.isTmpFile(uriString) || deepURIs.contains(uriString)) {
            return false;
        }
        long lastModified = pinotFS.lastModified(uri);
        if (lastModified <= 0L) {
            LOGGER.warn("file {} modification time {} is not positive, ineligible for delete", uriString, lastModified);
            return false;
        }
        // Widen before multiplying so the seconds-to-millis conversion cannot overflow an int.
        long retentionMs = (long) _controllerConf.getTmpSegmentRetentionInSeconds() * 1000L;
        return getCurrentTimeMs() - lastModified > retentionMs;
    }

    /**
     * Sends force commit messages for consuming segments of the given table.
     *
     * <p>The consuming segments are read from the table's ideal state and narrowed down by the optional
     * segment list or partition group id list (see {@code filterSegmentsToCommit}).
     *
     * @param tableNameWithType table name with type suffix
     * @param partitionGroupIdsToCommit comma-separated partition group ids, or {@code null}
     * @param segmentsToCommit comma-separated segment names, or {@code null}
     * @return the segments for which a force commit was requested
     */
    public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit, @Nullable String segmentsToCommit) {
        Set<String> consumingSegments = findConsumingSegments(getIdealState(tableNameWithType));
        Set<String> segmentsToForceCommit =
            filterSegmentsToCommit(consumingSegments, partitionGroupIdsToCommit, segmentsToCommit);
        sendForceCommitMessageToServers(tableNameWithType, segmentsToForceCommit);
        return segmentsToForceCommit;
    }

    /**
     * Narrows the consuming segments down to the ones that should be committed.
     *
     * <p>An explicit segment list takes precedence over a partition group id list. When neither is given,
     * all consuming segments are returned unchanged.
     *
     * @param allConsumingSegments every segment currently consuming
     * @param partitionGroupIdsToCommitStr comma-separated partition group ids, or {@code null}
     * @param segmentsToCommitStr comma-separated segment names, or {@code null}
     * @return the segments selected for commit
     * @throws IllegalStateException if a requested segment is not consuming, or no consuming segment belongs
     *     to the requested partitions
     * @throws NumberFormatException if a partition group id is not an integer
     */
    private Set<String> filterSegmentsToCommit(Set<String> allConsumingSegments, @Nullable String partitionGroupIdsToCommitStr, @Nullable String segmentsToCommitStr) {
        // Explicit segment names win over partition ids.
        if (segmentsToCommitStr != null) {
            Set<String> requestedSegments = Arrays.stream(segmentsToCommitStr.split(","))
                .map(String::trim)
                .collect(Collectors.toSet());
            boolean allRequestedAreConsuming = allConsumingSegments.containsAll(requestedSegments);
            Preconditions.checkState(allRequestedAreConsuming, "Cannot commit segments that are not in CONSUMING state. All consuming segments: %s, provided segments to commit: %s", allConsumingSegments, segmentsToCommitStr);
            return requestedSegments;
        }

        // No filter supplied at all.
        if (partitionGroupIdsToCommitStr == null) {
            return allConsumingSegments;
        }

        Set<Integer> requestedPartitionIds = Arrays.stream(partitionGroupIdsToCommitStr.split(","))
            .map(idStr -> Integer.parseInt(idStr.trim()))
            .collect(Collectors.toSet());
        Set<String> matchingSegments = allConsumingSegments.stream()
            .filter(segmentName -> requestedPartitionIds.contains(new LLCSegmentName(segmentName).getPartitionGroupId()))
            .collect(Collectors.toSet());
        Preconditions.checkState(!matchingSegments.isEmpty(), "Cannot find segments to commit for partitions: %s", partitionGroupIdsToCommitStr);
        return matchingSegments;
    }

    /**
     * Marks the table as paused in its ideal state and asks servers to commit the consuming segments.
     *
     * @param tableNameWithType table name with type suffix
     * @return pause status carrying the consuming segments; the description is {@code null} when there are
     *     no consuming segments
     */
    public PauseStatus pauseConsumption(String tableNameWithType) {
        IdealState pausedIdealState = updatePauseStatusInIdealState(tableNameWithType, true);
        Set<String> consumingSegments = findConsumingSegments(pausedIdealState);
        sendForceCommitMessageToServers(tableNameWithType, consumingSegments);

        String description = null;
        if (!consumingSegments.isEmpty()) {
            description = "Pause flag is set. Consuming segments are being committed. Use /pauseStatus endpoint in a few moments to check if all consuming segments have been committed.";
        }
        return new PauseStatus(true, consumingSegments, description);
    }

    /**
     * Clears the pause flag in the table's ideal state and invokes the
     * {@code RealtimeSegmentValidationManager} periodic task for the table.
     *
     * @param tableNameWithType table name with type suffix
     * @param offsetCriteria optional offset criteria forwarded to the periodic task, or {@code null}
     * @return pause status carrying the consuming segments found in the updated ideal state
     */
    public PauseStatus resumeConsumption(String tableNameWithType, @Nullable String offsetCriteria) {
        IdealState resumedIdealState = updatePauseStatusInIdealState(tableNameWithType, false);

        // Properties handed to the periodic task run.
        Map<String, String> taskProperties = new HashMap<>();
        taskProperties.put("recreateDeletedConsumingSegment", "true");
        if (offsetCriteria != null) {
            taskProperties.put("offsetCriteria", offsetCriteria);
        }
        _helixResourceManager.invokeControllerPeriodicTask(tableNameWithType, "RealtimeSegmentValidationManager", taskProperties);

        Set<String> consumingSegments = findConsumingSegments(resumedIdealState);
        return new PauseStatus(false, consumingSegments, "Pause flag is cleared. Consuming segments are being created. Use /pauseStatus endpoint in a few moments to double check.");
    }

    /**
     * Writes the {@code IS_TABLE_PAUSED} simple field into the table's ideal state.
     *
     * @param tableNameWithType table name with type suffix
     * @param pause value to store for the pause flag
     * @return the ideal state returned by {@code HelixHelper.updateIdealState}
     */
    private IdealState updatePauseStatusInIdealState(String tableNameWithType, boolean pause) {
        String pauseFlag = Boolean.toString(pause);
        IdealState updatedIdealState = HelixHelper.updateIdealState(_helixManager, tableNameWithType, idealState -> {
            ZNRecord idealStateRecord = idealState.getRecord();
            idealStateRecord.setSimpleField(IS_TABLE_PAUSED, pauseFlag);
            return new IdealState(idealStateRecord);
        }, RetryPolicies.noDelayRetryPolicy(1));
        LOGGER.info("Set 'isTablePaused' to {} in the Ideal State for table {}.", pause, tableNameWithType);
        return updatedIdealState;
    }

    /**
     * Sends a {@code ForceCommitMessage} for the given segments through the Helix messaging service.
     *
     * <p>Does nothing when {@code consumingSegments} is empty. The message is addressed to participant
     * instances of the table resource.
     *
     * @param tableNameWithType table name with type suffix
     * @param consumingSegments segments to force commit
     * @throws RuntimeException if the messaging service reports that no message was sent
     */
    private void sendForceCommitMessageToServers(String tableNameWithType, Set<String> consumingSegments) {
        if (consumingSegments.isEmpty()) {
            return;
        }

        Criteria recipientCriteria = new Criteria();
        // NOTE(review): "%" presumably acts as a wildcard matching every instance name; confirm against Helix.
        recipientCriteria.setInstanceName("%");
        recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        recipientCriteria.setResource(tableNameWithType);
        recipientCriteria.setSessionSpecific(true);

        ForceCommitMessage forceCommitMessage = new ForceCommitMessage(tableNameWithType, consumingSegments);
        int numMessagesSent = _helixManager.getMessagingService().send(recipientCriteria, forceCommitMessage, null, -1);
        if (numMessagesSent <= 0) {
            throw new RuntimeException(String.format("No force commit message was sent for table: %s segments: %s", tableNameWithType, consumingSegments));
        }
        LOGGER.info("Sent {} force commit messages for table: {} segments: {}", numMessagesSent, tableNameWithType, consumingSegments);
    }

    /**
     * Collects the segments that have at least one instance in the {@code CONSUMING} state.
     *
     * @param idealState ideal state whose map fields map segment name to an instance-to-state map
     * @return sorted set of segment names with a {@code CONSUMING} replica
     */
    private Set<String> findConsumingSegments(IdealState idealState) {
        Set<String> consumingSegments = new TreeSet<>();
        for (Map.Entry<String, Map<String, String>> segmentEntry : idealState.getRecord().getMapFields().entrySet()) {
            boolean hasConsumingReplica = segmentEntry.getValue().values().stream()
                .anyMatch(state -> state.equals("CONSUMING"));
            if (hasConsumingReplica) {
                consumingSegments.add(segmentEntry.getKey());
            }
        }
        return consumingSegments;
    }

    /**
     * Reads the pause flag and the consuming segments from the table's ideal state.
     *
     * @param tableNameWithType table name with type suffix
     * @return pause status with the parsed {@code IS_TABLE_PAUSED} flag, the consuming segments and a
     *     {@code null} description
     */
    public PauseStatus getPauseStatus(String tableNameWithType) {
        IdealState idealState = getIdealState(tableNameWithType);
        // Boolean.parseBoolean treats a missing (null) field as false.
        boolean isTablePaused = Boolean.parseBoolean(idealState.getRecord().getSimpleField(IS_TABLE_PAUSED));
        return new PauseStatus(isTablePaused, findConsumingSegments(idealState), null);
    }

    /**
     * Moves a segment file from its current location to the path built by {@code createSegmentPath}.
     *
     * @param rawTableName table name without type suffix
     * @param segmentName name of the segment
     * @param segmentLocation current location of the segment file
     * @param pinotFS file system used to perform the move
     * @return string form of the destination URI
     * @throws IOException if the file system move fails with an I/O error
     * @throws IllegalStateException if the file system reports that the move did not succeed
     */
    @VisibleForTesting
    String moveSegmentFile(String rawTableName, String segmentName, String segmentLocation, PinotFS pinotFS) throws IOException {
        URI sourceUri = URIUtils.getUri(segmentLocation);
        URI destinationUri = createSegmentPath(rawTableName, segmentName);
        // NOTE(review): the `true` flag is presumably "overwrite existing destination"; confirm against PinotFS.
        boolean moved = pinotFS.move(sourceUri, destinationUri, true);
        Preconditions.checkState(moved, "Failed to move segment file for segment: %s from: %s to: %s", segmentName, segmentLocation, destinationUri);
        return destinationUri.toString();
    }

    /**
     * Builds the URI of a segment file from the controller data dir, the raw table name and the encoded
     * segment name.
     *
     * @param rawTableName table name without type suffix
     * @param segmentName name of the segment; encoded with {@code URIUtils.encode} before being appended
     * @return URI of the segment file
     */
    @VisibleForTesting
    URI createSegmentPath(String rawTableName, String segmentName) {
        String dataDir = _controllerConf.getDataDir();
        String encodedSegmentName = URIUtils.encode(segmentName);
        return URIUtils.getUri(dataDir, rawTableName, encodedSegmentName);
    }
}

