/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core.realtime;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsType;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.config.ColumnPartitionConfig;
import org.apache.pinot.common.config.SegmentPartitionConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metrics.AbstractMetrics;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.apache.pinot.controller.helix.core.realtime.TableConfigCache;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager;
import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
import org.apache.pinot.controller.util.SegmentCompletionUtils;
import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.PartitionOffsetFetcher;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PinotLLCRealtimeSegmentManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(PinotLLCRealtimeSegmentManager.class);
    // Not referenced in the visible part of this class.
    // NOTE(review): presumably the sequence number given to the first segment of a partition - confirm.
    private static final int STARTING_SEQUENCE_NUMBER = 0;
    // Same value (Long.MAX_VALUE) that createNewSegmentZKMetadata stores as the end offset of a new IN_PROGRESS segment.
    private static final long END_OFFSET_FOR_CONSUMING_SEGMENTS = Long.MAX_VALUE;
    // Prefix handed to ControllerConf.subset(...) when loading the MetadataEventNotifierFactory in the constructor.
    private static final String METADATA_EVENT_NOTIFIER_PREFIX = "metadata.event.notifier";
    // 30 seconds: the upper bound on how long stop() waits for in-flight segment metadata commits.
    private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30000L;
    // 5 minutes: the window after the last segment ZK metadata modification beyond which
    // isExceededMaxSegmentCompletionTime reports true.
    private static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300000L;
    // Helix/ZooKeeper handles, all obtained from the PinotHelixResourceManager passed to the constructor.
    private final HelixAdmin _helixAdmin;
    private final HelixManager _helixManager;
    private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
    private final PinotHelixResourceManager _helixResourceManager;
    private final String _clusterName;
    private final ControllerConf _controllerConf;
    // Used to meter LLC_ZOOKEEPER_FETCH_FAILURES / LLC_ZOOKEEPER_UPDATE_FAILURES per table.
    private final ControllerMetrics _controllerMetrics;
    private final MetadataEventNotifierFactory _metadataEventNotifierFactory;
    // Lock striping for ideal state updates: commitSegmentMetadataInternal picks one lock by table-name hash.
    private final int _numIdealStateUpdateLocks;
    private final Lock[] _idealStateUpdateLocks;
    private final TableConfigCache _tableConfigCache;
    private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
    // Set to true by stop(); the public mutating operations fail a checkState once it is set.
    private volatile boolean _isStopping = false;
    // Number of commitSegmentMetadata calls currently in flight; stop() polls it until it reaches zero or times out.
    private AtomicInteger _numCompletingSegments = new AtomicInteger(0);

    public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics) {
        this._helixAdmin = helixResourceManager.getHelixAdmin();
        this._helixManager = helixResourceManager.getHelixZkManager();
        this._propertyStore = helixResourceManager.getPropertyStore();
        this._helixResourceManager = helixResourceManager;
        this._clusterName = helixResourceManager.getHelixClusterName();
        this._controllerConf = controllerConf;
        this._controllerMetrics = controllerMetrics;
        this._metadataEventNotifierFactory = MetadataEventNotifierFactory.loadFactory(controllerConf.subset(METADATA_EVENT_NOTIFIER_PREFIX));
        this._numIdealStateUpdateLocks = controllerConf.getRealtimeSegmentMetadataCommitNumLocks();
        this._idealStateUpdateLocks = new Lock[this._numIdealStateUpdateLocks];
        for (int i = 0; i < this._numIdealStateUpdateLocks; ++i) {
            this._idealStateUpdateLocks[i] = new ReentrantLock();
        }
        this._tableConfigCache = new TableConfigCache(this._propertyStore);
        this._flushThresholdUpdateManager = new FlushThresholdUpdateManager();
    }

    /**
     * @return the value of {@code getAcceptSplitCommit()} from the controller configuration
     */
    public boolean getIsSplitCommitEnabled() {
        boolean acceptSplitCommit = _controllerConf.getAcceptSplitCommit();
        return acceptSplitCommit;
    }

    /**
     * @return the VIP URL generated by the controller configuration
     */
    public String getControllerVipUrl() {
        String vipUrl = _controllerConf.generateVipUrl();
        return vipUrl;
    }

    /**
     * Marks the manager as stopping and waits for in-flight segment metadata commits to drain.
     *
     * <p>Polls the number of completing segments in steps of at most one second, for at most
     * {@link #MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS} in total. If the waiting thread is interrupted,
     * the interrupt status is restored and the method returns immediately without waiting further.
     */
    public void stop() {
        _isStopping = true;
        LOGGER.info("Awaiting segment metadata commits: maxWaitTimeMillis = {}", MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS);
        long millisToWait = MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS;
        while (_numCompletingSegments.get() > 0 && millisToWait > 0L) {
            // Sleep in slices of at most one second so that completion is noticed promptly.
            long thisWait = Math.min(1000L, millisToWait);
            try {
                Thread.sleep(thisWait);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so that callers further up the stack can observe the interruption.
                Thread.currentThread().interrupt();
                LOGGER.info("Interrupted: Remaining wait time {} (out of {})", millisToWait, MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS);
                return;
            }
            millisToWait -= thisWait;
        }
        LOGGER.info("Wait completed: Number of completing segments = {}", _numCompletingSegments.get());
    }

    /**
     * Sets up the initial CONSUMING segment for every stream partition of a new LLC realtime table.
     *
     * <p>Fails if any existing segment of the table is not a high-level-consumer segment. For each partition a
     * new segment is set up and assigned in the given ideal state, which is then written back.
     *
     * @param tableConfig config of the realtime table being created
     * @param idealState ideal state to populate; its map fields are modified in place
     * @throws IllegalStateException if the manager is stopping or a non-HLC segment already exists
     */
    public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
        Preconditions.checkState(!_isStopping, "Segment manager is stopping");
        String realtimeTableName = tableConfig.getTableName();
        LOGGER.info("Setting up new LLC table: {}", realtimeTableName);

        // Only HLC segments may pre-exist when an LLC table is set up.
        for (String existingSegment : getAllSegments(realtimeTableName)) {
            Preconditions.checkState(SegmentName.isHighLevelConsumerSegmentName(existingSegment), "Cannot set up new LLC table: %s with existing non-HLC segment: %s", realtimeTableName, existingSegment);
        }

        _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName);

        PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), tableConfig.getIndexingConfig().getStreamConfigs());
        InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
        int numPartitions = getNumPartitions(streamConfig);
        int numReplicas = getNumReplicas(tableConfig, instancePartitions);

        SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
        Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);

        long currentTimeMs = getCurrentTimeMs();
        Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
        int partitionId = 0;
        while (partitionId < numPartitions) {
            String newSegmentName = setupNewPartition(tableConfig, streamConfig, partitionId, currentTimeMs, instancePartitions, numPartitions, numReplicas);
            updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment, instancePartitionsMap);
            partitionId++;
        }

        setIdealState(realtimeTableName, idealState);
    }

    /**
     * Deletes every low-level-consumer segment listed in the given ideal state.
     *
     * @param idealState ideal state of the realtime table; its resource name is used as the table name
     * @throws IllegalStateException if the manager is stopping
     */
    public void removeLLCSegments(IdealState idealState) {
        Preconditions.checkState(!_isStopping, "Segment manager is stopping");
        String realtimeTableName = idealState.getResourceName();
        LOGGER.info("Removing LLC segments for table: {}", realtimeTableName);

        List<String> llcSegments = new ArrayList<>();
        for (String candidate : idealState.getRecord().getMapFields().keySet()) {
            if (SegmentName.isLowLevelConsumerSegmentName(candidate)) {
                llcSegments.add(candidate);
            }
        }
        _helixResourceManager.deleteSegments(realtimeTableName, llcSegments);
    }

    /**
     * Loads the table config through the table config cache.
     *
     * @param realtimeTableName table name with type
     * @return the cached (or freshly loaded) table config
     * @throws IllegalStateException if the cache fails to load the config; a fetch failure is metered first
     */
    @VisibleForTesting
    TableConfig getTableConfig(String realtimeTableName) {
        TableConfig tableConfig;
        try {
            tableConfig = _tableConfigCache.getTableConfig(realtimeTableName);
        }
        catch (ExecutionException loadFailure) {
            _controllerMetrics.addMeteredTableValue(realtimeTableName, (AbstractMetrics.Meter)ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            String message = "Caught exception while loading table config from property store to cache for table: " + realtimeTableName;
            throw new IllegalStateException(message, loadFailure);
        }
        return tableConfig;
    }

    /**
     * Fetches or computes the CONSUMING instance partitions of a table.
     *
     * @param tableConfig config of the realtime table
     * @return the CONSUMING instance partitions
     * @throws RuntimeException whatever the lookup throws, rethrown after metering a fetch failure
     */
    @VisibleForTesting
    InstancePartitions getConsumingInstancePartitions(TableConfig tableConfig) {
        InstancePartitions consumingPartitions;
        try {
            consumingPartitions = InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixManager, tableConfig, InstancePartitionsType.CONSUMING);
        }
        catch (Exception fetchFailure) {
            _controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), (AbstractMetrics.Meter)ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            throw fetchFailure;
        }
        return consumingPartitions;
    }

    /**
     * Lists all segments of a table from the property store.
     *
     * @param realtimeTableName table name with type
     * @return the segment names returned by {@code ZKMetadataProvider.getSegments}
     * @throws RuntimeException whatever the lookup throws, rethrown after metering a fetch failure
     */
    @VisibleForTesting
    List<String> getAllSegments(String realtimeTableName) {
        List<String> segmentNames;
        try {
            segmentNames = ZKMetadataProvider.getSegments(_propertyStore, realtimeTableName);
        }
        catch (Exception fetchFailure) {
            _controllerMetrics.addMeteredTableValue(realtimeTableName, (AbstractMetrics.Meter)ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            throw fetchFailure;
        }
        return segmentNames;
    }

    /**
     * Lists the LLC realtime segments of a table from the property store.
     *
     * @param realtimeTableName table name with type
     * @return the segment names returned by {@code ZKMetadataProvider.getLLCRealtimeSegments}
     * @throws RuntimeException whatever the lookup throws, rethrown after metering a fetch failure
     */
    @VisibleForTesting
    List<String> getLLCSegments(String realtimeTableName) {
        List<String> llcSegmentNames;
        try {
            llcSegmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName);
        }
        catch (Exception fetchFailure) {
            _controllerMetrics.addMeteredTableValue(realtimeTableName, (AbstractMetrics.Meter)ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            throw fetchFailure;
        }
        return llcSegmentNames;
    }

    /**
     * Convenience overload that reads the segment ZK metadata without collecting the ZNode {@link Stat}.
     */
    private LLCRealtimeSegmentZKMetadata getSegmentZKMetadata(String realtimeTableName, String segmentName) {
        Stat noStat = null;
        return getSegmentZKMetadata(realtimeTableName, segmentName, noStat);
    }

    /**
     * Reads the ZK metadata of a segment from the property store.
     *
     * @param realtimeTableName table name with type
     * @param segmentName name of the segment
     * @param stat optional holder passed through to the property store read; may be {@code null}
     * @return the segment ZK metadata built from the stored record
     * @throws IllegalStateException if no record exists for the segment
     * @throws RuntimeException any other read failure; every failure is metered as a fetch failure before rethrow
     */
    @VisibleForTesting
    LLCRealtimeSegmentZKMetadata getSegmentZKMetadata(String realtimeTableName, String segmentName, @Nullable Stat stat) {
        try {
            String segmentPath = ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, segmentName);
            ZNRecord record = _propertyStore.get(segmentPath, stat, AccessOption.PERSISTENT);
            boolean found = record != null;
            Preconditions.checkState(found, "Failed to find segment ZK metadata for segment: %s of table: %s", segmentName, realtimeTableName);
            return new LLCRealtimeSegmentZKMetadata(record);
        }
        catch (Exception fetchFailure) {
            _controllerMetrics.addMeteredTableValue(realtimeTableName, (AbstractMetrics.Meter)ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            throw fetchFailure;
        }
    }

    /**
     * Writes the given segment ZK metadata to the property store.
     *
     * @param realtimeTableName table name with type
     * @param segmentZKMetadata metadata to persist; its segment name determines the store path
     * @param expectedVersion version passed through to the property store {@code set} call
     * @throws IllegalStateException if the property store reports the write as unsuccessful
     * @throws RuntimeException any other write failure; every failure is metered as an update failure before rethrow
     */
    @VisibleForTesting
    void persistSegmentZKMetadata(String realtimeTableName, LLCRealtimeSegmentZKMetadata segmentZKMetadata, int expectedVersion) {
        String segmentName = segmentZKMetadata.getSegmentName();
        LOGGER.info("Persisting segment ZK metadata for segment: {}", segmentName);
        try {
            String segmentPath = ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, segmentName);
            boolean persisted = _propertyStore.set(segmentPath, segmentZKMetadata.toZNRecord(), expectedVersion, AccessOption.PERSISTENT);
            Preconditions.checkState(persisted, "Failed to persist segment ZK metadata for segment: %s of table: %s", segmentName, realtimeTableName);
        }
        catch (Exception updateFailure) {
            _controllerMetrics.addMeteredTableValue(realtimeTableName, (AbstractMetrics.Meter)ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
            throw updateFailure;
        }
    }

    /**
     * Reads the ideal state of a table.
     *
     * @param realtimeTableName table name with type
     * @return the table's ideal state, never {@code null}
     * @throws IllegalStateException if no ideal state exists for the table
     * @throws RuntimeException any other read failure; every failure is metered as a fetch failure before rethrow
     */
    @VisibleForTesting
    IdealState getIdealState(String realtimeTableName) {
        try {
            IdealState tableIdealState = HelixHelper.getTableIdealState(_helixManager, realtimeTableName);
            boolean found = tableIdealState != null;
            Preconditions.checkState(found, "Failed to find IdealState for table: " + realtimeTableName);
            return tableIdealState;
        }
        catch (Exception fetchFailure) {
            _controllerMetrics.addMeteredTableValue(realtimeTableName, (AbstractMetrics.Meter)ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            throw fetchFailure;
        }
    }

    /**
     * Writes the ideal state of a table through the Helix admin.
     *
     * @param realtimeTableName table name with type (used as the Helix resource name)
     * @param idealState ideal state to store
     * @throws RuntimeException whatever the write throws, rethrown after metering an update failure
     */
    @VisibleForTesting
    void setIdealState(String realtimeTableName, IdealState idealState) {
        try {
            _helixAdmin.setResourceIdealState(_clusterName, realtimeTableName, idealState);
        }
        catch (Exception updateFailure) {
            _controllerMetrics.addMeteredTableValue(realtimeTableName, (AbstractMetrics.Meter)ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
            throw updateFailure;
        }
    }

    /**
     * Moves the uploaded segment file to its permanent location under the table's data directory and then
     * removes leftover temporary files for the same segment.
     *
     * <p>The move must succeed; the cleanup of temporary files is best-effort and only logs on failure.
     *
     * @param realtimeTableName table name with type
     * @param committingSegmentDescriptor descriptor carrying the segment name and the current file location
     * @throws IllegalStateException if the manager is stopping or the file move fails
     * @throws Exception if building the URIs, creating the file system or moving the file fails
     */
    public void commitSegmentFile(String realtimeTableName, CommittingSegmentDescriptor committingSegmentDescriptor) throws Exception {
        Preconditions.checkState(!_isStopping, "Segment manager is stopping");
        String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
        String segmentName = committingSegmentDescriptor.getSegmentName();
        LOGGER.info("Committing segment file for segment: {}", segmentName);

        String segmentLocation = committingSegmentDescriptor.getSegmentLocation();
        URI sourceUri = URIUtils.getUri(segmentLocation, new String[0]);
        URI tableDirUri = URIUtils.getUri(_controllerConf.getDataDir(), new String[]{rawTableName});
        URI destinationUri = URIUtils.getUri(_controllerConf.getDataDir(), new String[]{rawTableName, URIUtils.encode(segmentName)});
        PinotFS fileSystem = PinotFSFactory.create(tableDirUri.getScheme());
        boolean moved = fileSystem.move(sourceUri, destinationUri, true);
        Preconditions.checkState(moved, "Failed to move segment file for segment: %s from: %s to: %s", segmentName, segmentLocation, destinationUri);

        // Best-effort cleanup: any failure here is logged and otherwise ignored.
        try {
            for (String fileUri : fileSystem.listFiles(tableDirUri, false)) {
                if (fileUri.contains(SegmentCompletionUtils.getSegmentNamePrefix(segmentName))) {
                    LOGGER.warn("Deleting temporary segment file: {}", fileUri);
                    Preconditions.checkState(fileSystem.delete(new URI(fileUri), true), "Failed to delete file: %s", fileUri);
                }
            }
        }
        catch (Exception cleanupFailure) {
            LOGGER.warn("Caught exception while deleting temporary segment files for segment: {}", segmentName, cleanupFailure);
        }
    }

    /**
     * Commits the metadata of a completed segment while tracking the commit as in flight, so that
     * {@code stop()} can wait for it.
     *
     * @param realtimeTableName table name with type
     * @param committingSegmentDescriptor descriptor of the segment being committed
     * @throws IllegalStateException if the manager is stopping
     */
    public void commitSegmentMetadata(String realtimeTableName, CommittingSegmentDescriptor committingSegmentDescriptor) {
        Preconditions.checkState(!_isStopping, "Segment manager is stopping");
        _numCompletingSegments.incrementAndGet();
        try {
            commitSegmentMetadataInternal(realtimeTableName, committingSegmentDescriptor);
        }
        finally {
            // Always balance the increment, whether the commit succeeded or threw.
            _numCompletingSegments.decrementAndGet();
        }
    }

    /**
     * Performs the metadata commit for a completed segment:
     * <ol>
     *   <li>verifies that the ideal state still has an instance in CONSUMING state for the committing segment,</li>
     *   <li>marks the committing segment's ZK metadata as DONE,</li>
     *   <li>creates the ZK metadata for the next segment of the same partition,</li>
     *   <li>updates the ideal state under a per-table striped lock,</li>
     *   <li>notifies the metadata event notifier of the segment flush.</li>
     * </ol>
     *
     * @param realtimeTableName table name with type
     * @param committingSegmentDescriptor descriptor of the segment being committed
     * @throws IllegalStateException if a precondition on the ideal state or the segment metadata fails
     */
    private void commitSegmentMetadataInternal(String realtimeTableName, CommittingSegmentDescriptor committingSegmentDescriptor) {
        String committingSegmentName = committingSegmentDescriptor.getSegmentName();
        LOGGER.info("Committing segment metadata for segment: {}", committingSegmentName);
        TableConfig tableConfig = getTableConfig(realtimeTableName);
        InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
        IdealState idealState = getIdealState(realtimeTableName);
        Preconditions.checkState(idealState.getInstanceStateMap(committingSegmentName).containsValue("CONSUMING"), "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
        int numPartitions = getNumPartitionsFromIdealState(idealState);
        int numReplicas = getNumReplicas(tableConfig, instancePartitions);

        // Mark the committing segment as DONE in ZK.
        LLCRealtimeSegmentZKMetadata committingSegmentZKMetadata = updateCommittingSegmentZKMetadata(realtimeTableName, committingSegmentDescriptor);

        // Create the ZK metadata for the next segment of the same partition.
        long newSegmentCreationTimeMs = getCurrentTimeMs();
        LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(new LLCSegmentName(committingSegmentName), newSegmentCreationTimeMs);
        createNewSegmentZKMetadata(tableConfig, new PartitionLevelStreamConfig(tableConfig.getTableName(), tableConfig.getIndexingConfig().getStreamConfigs()), newLLCSegmentName, newSegmentCreationTimeMs, committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);

        // Update the ideal state while holding the lock stripe selected by the table name hash.
        SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
        Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);
        int lockIndex = (realtimeTableName.hashCode() & Integer.MAX_VALUE) % _numIdealStateUpdateLocks;
        Lock lock = _idealStateUpdateLocks[lockIndex];
        // Acquire outside the try block: if lock() itself failed, the finally block must not call unlock()
        // on a lock this thread does not hold.
        lock.lock();
        try {
            updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentName, newLLCSegmentName.getSegmentName(), segmentAssignment, instancePartitionsMap);
        }
        finally {
            lock.unlock();
        }

        _metadataEventNotifierFactory.create().notifyOnSegmentFlush(tableConfig);
    }

    /**
     * Transitions the ZK metadata of the committing segment from IN_PROGRESS to DONE and fills in the fields
     * known only after the segment is built (end offset, download URL, CRC, time range, index version, doc count).
     *
     * <p>The write uses the ZNode version observed at read time as the expected version.
     *
     * @param realtimeTableName table name with type
     * @param committingSegmentDescriptor descriptor carrying the segment name, next offset and segment metadata
     * @return the updated and persisted ZK metadata
     * @throws IllegalStateException if the segment is not IN_PROGRESS or the descriptor has no segment metadata
     */
    private LLCRealtimeSegmentZKMetadata updateCommittingSegmentZKMetadata(String realtimeTableName, CommittingSegmentDescriptor committingSegmentDescriptor) {
        String segmentName = committingSegmentDescriptor.getSegmentName();
        LOGGER.info("Updating segment ZK metadata for committing segment: {}", segmentName);

        Stat readStat = new Stat();
        LLCRealtimeSegmentZKMetadata zkMetadata = getSegmentZKMetadata(realtimeTableName, segmentName, readStat);
        boolean inProgress = zkMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.IN_PROGRESS;
        Preconditions.checkState(inProgress, "Segment status for segment: %s should be IN_PROGRESS, found: %s", segmentName, zkMetadata.getStatus());

        SegmentMetadataImpl builtSegmentMetadata = committingSegmentDescriptor.getSegmentMetadata();
        Preconditions.checkState(builtSegmentMetadata != null, "Failed to find segment metadata from descriptor for segment: %s", segmentName);

        zkMetadata.setEndOffset(committingSegmentDescriptor.getNextOffset());
        zkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
        String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
        zkMetadata.setDownloadUrl(URIUtils.constructDownloadUrl(_controllerConf.generateVipUrl(), rawTableName, segmentName));
        zkMetadata.setCrc(Long.valueOf(builtSegmentMetadata.getCrc()).longValue());
        zkMetadata.setStartTime(builtSegmentMetadata.getTimeInterval().getStartMillis());
        zkMetadata.setEndTime(builtSegmentMetadata.getTimeInterval().getEndMillis());
        zkMetadata.setTimeUnit(TimeUnit.MILLISECONDS);
        zkMetadata.setIndexVersion(builtSegmentMetadata.getVersion());
        zkMetadata.setTotalDocs((long)builtSegmentMetadata.getTotalDocs());

        persistSegmentZKMetadata(realtimeTableName, zkMetadata, readStat.getVersion());
        return zkMetadata;
    }

    /**
     * Creates and persists the ZK metadata of a new IN_PROGRESS segment that starts at the committing
     * descriptor's next offset.
     *
     * <p>Partition metadata is attached when the table config declares a segment partition config, and the
     * flush threshold is filled in by the table's flush threshold updater before the record is written.
     *
     * @param tableConfig config of the realtime table
     * @param streamConfig stream config used to select and drive the flush threshold updater
     * @param newLLCSegmentName name of the segment to create
     * @param creationTimeMs creation time stored in the metadata
     * @param committingSegmentDescriptor descriptor whose next offset becomes the start offset
     * @param committingSegmentZKMetadata ZK metadata of the committing segment, or {@code null}
     * @param instancePartitions CONSUMING instance partitions
     * @param numPartitions number of stream partitions
     * @param numReplicas number of replicas stored in the metadata
     */
    private void createNewSegmentZKMetadata(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, LLCSegmentName newLLCSegmentName, long creationTimeMs, CommittingSegmentDescriptor committingSegmentDescriptor, @Nullable LLCRealtimeSegmentZKMetadata committingSegmentZKMetadata, InstancePartitions instancePartitions, int numPartitions, int numReplicas) {
        String realtimeTableName = tableConfig.getTableName();
        String segmentName = newLLCSegmentName.getSegmentName();
        long startOffset = committingSegmentDescriptor.getNextOffset();
        LOGGER.info("Creating segment ZK metadata for new CONSUMING segment: {} with start offset: {} and creation time: {}", segmentName, startOffset, creationTimeMs);

        LLCRealtimeSegmentZKMetadata created = new LLCRealtimeSegmentZKMetadata();
        created.setTableName(realtimeTableName);
        created.setSegmentName(segmentName);
        created.setCreationTime(creationTimeMs);
        created.setStartOffset(startOffset);
        created.setEndOffset(END_OFFSET_FOR_CONSUMING_SEGMENTS);
        created.setNumReplicas(numReplicas);
        created.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);

        SegmentPartitionMetadata partitionMetadata = getPartitionMetadataFromTableConfig(tableConfig, newLLCSegmentName.getPartitionId());
        if (partitionMetadata != null) {
            created.setPartitionMetadata(partitionMetadata);
        }

        int maxPartitionsPerInstance = getMaxNumPartitionsPerInstance(instancePartitions, numPartitions, numReplicas);
        _flushThresholdUpdateManager.getFlushThresholdUpdater(streamConfig).updateFlushThreshold(streamConfig, created, committingSegmentDescriptor, committingSegmentZKMetadata, maxPartitionsPerInstance);

        // Expected version -1 is passed through to the property store write.
        persistSegmentZKMetadata(realtimeTableName, created, -1);
    }

    /**
     * Builds the segment partition metadata for one stream partition from the table's segment partition config.
     *
     * @param tableConfig config of the realtime table
     * @param partitionId partition id recorded as the single partition of every configured column
     * @return partition metadata keyed by column name (sorted), or {@code null} when the table has no
     *     segment partition config
     */
    @Nullable
    private SegmentPartitionMetadata getPartitionMetadataFromTableConfig(TableConfig tableConfig, int partitionId) {
        SegmentPartitionConfig partitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
        if (partitionConfig != null) {
            Map<String, ColumnPartitionMetadata> metadataByColumn = new TreeMap<>();
            partitionConfig.getColumnPartitionMap().forEach((columnName, columnPartitionConfig) ->
                metadataByColumn.put(columnName, new ColumnPartitionMetadata(columnPartitionConfig.getFunctionName(), columnPartitionConfig.getNumPartitions(), Collections.singleton(partitionId))));
            return new SegmentPartitionMetadata(metadataByColumn);
        }
        return null;
    }

    /**
     * Returns the segment commit timeout for a table, in milliseconds.
     *
     * <p>The protocol-wide default is used unless the table's stream configs contain
     * {@code realtime.segment.commit.timeoutSeconds} with a parseable integer value. An unparseable value is
     * logged and the default is returned.
     *
     * @param realtimeTableName table name with type
     * @return the commit timeout in milliseconds
     */
    public long getCommitTimeoutMS(String realtimeTableName) {
        long defaultCommitTimeoutMs = SegmentCompletionProtocol.getMaxSegmentCommitTimeMs();
        if (_propertyStore == null) {
            return defaultCommitTimeoutMs;
        }
        TableConfig tableConfig = getTableConfig(realtimeTableName);
        Map<String, String> streamConfigs = tableConfig.getIndexingConfig().getStreamConfigs();
        if (streamConfigs == null || !streamConfigs.containsKey("realtime.segment.commit.timeoutSeconds")) {
            return defaultCommitTimeoutMs;
        }
        String commitTimeoutSecondsStr = streamConfigs.get("realtime.segment.commit.timeoutSeconds");
        try {
            return TimeUnit.MILLISECONDS.convert(Integer.parseInt(commitTimeoutSecondsStr), TimeUnit.SECONDS);
        }
        catch (Exception e) {
            // The value being parsed is the commit timeout, not a flush size; say so in the log.
            LOGGER.warn("Failed to parse commit timeout of {}", commitTimeoutSecondsStr, e);
            return defaultCommitTimeoutMs;
        }
    }

    /**
     * @param streamConfig stream config of the table
     * @return the partition count reported by {@code PinotTableIdealStateBuilder.getPartitionCount}
     */
    @VisibleForTesting
    int getNumPartitions(StreamConfig streamConfig) {
        int partitionCount = PinotTableIdealStateBuilder.getPartitionCount(streamConfig);
        return partitionCount;
    }

    /**
     * Fetches the offset of a stream partition for the given criteria, retrying up to 3 times with a fixed
     * 1 second delay.
     *
     * @param streamConfig stream config of the table
     * @param offsetCriteria criteria describing which offset to fetch
     * @param partitionId stream partition id
     * @return the fetched offset
     * @throws IllegalStateException if the offset cannot be fetched; the underlying failure is attached as
     *     the cause
     */
    @VisibleForTesting
    long getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria, int partitionId) {
        PartitionOffsetFetcher partitionOffsetFetcher = new PartitionOffsetFetcher(offsetCriteria, partitionId, streamConfig);
        try {
            RetryPolicies.fixedDelayRetryPolicy(3, 1000L).attempt(partitionOffsetFetcher);
            return partitionOffsetFetcher.getOffset();
        }
        catch (Exception e) {
            // Keep the original exception as the cause so the root failure is not lost.
            throw new IllegalStateException(String.format("Failed to fetch the offset for topic: %s, partition: %s with criteria: %s", streamConfig.getTopicName(), partitionId, offsetCriteria), e);
        }
    }

    /**
     * Records that an instance stopped consuming a segment by switching that instance's ideal state entry
     * from CONSUMING to OFFLINE.
     *
     * <p>If the instance is not in CONSUMING state for the segment, the ideal state is left unchanged and the
     * observed state is logged.
     *
     * @param llcSegmentName name of the segment that stopped consuming
     * @param instanceName instance that reported the stop
     * @throws IllegalStateException if the manager is stopping
     * @throws RuntimeException whatever the ideal state update throws, rethrown after metering an update failure
     */
    public void segmentStoppedConsuming(LLCSegmentName llcSegmentName, String instanceName) {
        Preconditions.checkState(!_isStopping, "Segment manager is stopping");
        String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(llcSegmentName.getTableName());
        String segmentName = llcSegmentName.getSegmentName();
        LOGGER.info("Marking CONSUMING segment: {} OFFLINE on instance: {}", segmentName, instanceName);
        try {
            HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
                assert (idealState != null);
                Map<String, String> instanceStates = idealState.getInstanceStateMap(segmentName);
                String currentState = instanceStates.get(instanceName);
                if (!"CONSUMING".equals(currentState)) {
                    LOGGER.info("Segment {} in state {} when trying to register consumption stop from {}", segmentName, currentState, instanceName);
                } else {
                    instanceStates.put(instanceName, "OFFLINE");
                }
                return idealState;
            }, RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2f), true);
        }
        catch (Exception updateFailure) {
            _controllerMetrics.addMeteredTableValue(realtimeTableName, (AbstractMetrics.Meter)ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
            throw updateFailure;
        }
    }

    /**
     * Returns, for every partition that has at least one LLC segment, the ZK metadata of the segment with the
     * highest sequence number.
     *
     * @param realtimeTableName table name with type
     * @return map from partition id to the ZK metadata of that partition's latest segment
     */
    private Map<Integer, LLCRealtimeSegmentZKMetadata> getLatestSegmentZKMetadataMap(String realtimeTableName) {
        // Pass 1: keep only the segment name with the greatest sequence number per partition.
        Map<Integer, LLCSegmentName> newestNameByPartition = new HashMap<>();
        for (String segmentName : getLLCSegments(realtimeTableName)) {
            LLCSegmentName candidate = new LLCSegmentName(segmentName);
            newestNameByPartition.merge(candidate.getPartitionId(), candidate,
                (current, incoming) -> incoming.getSequenceNumber() > current.getSequenceNumber() ? incoming : current);
        }

        // Pass 2: load the ZK metadata of each winner.
        Map<Integer, LLCRealtimeSegmentZKMetadata> newestMetadataByPartition = new HashMap<>();
        newestNameByPartition.forEach((partitionId, newestName) ->
            newestMetadataByPartition.put(partitionId, getSegmentZKMetadata(realtimeTableName, newestName.getSegmentName())));
        return newestMetadataByPartition;
    }

    /**
     * Validates, and repairs where needed, the LLC segments of a table inside an ideal state update.
     *
     * <p>Disabled tables are skipped: their ideal state is returned unchanged.
     *
     * @param tableConfig config of the realtime table
     * @param streamConfig stream config used to determine the number of partitions
     * @throws IllegalStateException if the manager is stopping
     */
    public void ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig) {
        Preconditions.checkState(!_isStopping, "Segment manager is stopping");
        String realtimeTableName = tableConfig.getTableName();
        int numPartitions = getNumPartitions(streamConfig);
        HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
            assert (idealState != null);
            if (!idealState.isEnabled()) {
                LOGGER.info("Skipping LLC segments validation for disabled table: {}", realtimeTableName);
                return idealState;
            }
            return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, numPartitions);
        }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true);
    }

    /**
     * Updates the ideal state for a completed segment: the committing segment goes ONLINE and the new segment
     * is added in CONSUMING state.
     *
     * <p>Each update attempt first checks whether the max segment completion time has been exceeded; if so the
     * update is aborted with a {@code PermanentUpdaterException}.
     *
     * @param realtimeTableName table name with type
     * @param committingSegmentName segment being committed
     * @param newSegmentName next segment of the same partition
     * @param segmentAssignment strategy used to assign the new segment
     * @param instancePartitionsMap instance partitions passed to the assignment strategy
     */
    @VisibleForTesting
    void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName, String newSegmentName, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
        HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
            assert (idealState != null);
            boolean timedOut = isExceededMaxSegmentCompletionTime(realtimeTableName, committingSegmentName, getCurrentTimeMs());
            if (!timedOut) {
                updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), committingSegmentName, newSegmentName, segmentAssignment, instancePartitionsMap);
                return idealState;
            }
            LOGGER.error("Exceeded max segment completion time. Skipping ideal state update for segment: {}", committingSegmentName);
            throw new HelixHelper.PermanentUpdaterException("Exceeded max segment completion time for segment " + committingSegmentName);
        }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
    }

    /**
     * Mutates the given segment-to-instance-state map for a segment roll-over.
     *
     * <p>When {@code committingSegmentName} is non-null, every instance currently listed for that segment is set
     * to ONLINE. The new segment is then assigned through {@code segmentAssignment} and added with all of its
     * instances in CONSUMING state.
     *
     * @param instanceStatesMap segment name to (instance to state) map; modified in place
     * @param committingSegmentName segment to flip to ONLINE, or {@code null} to skip that step; when non-null it
     *     must already have an entry in {@code instanceStatesMap}
     * @param newSegmentName segment to add in CONSUMING state
     * @param segmentAssignment strategy used to pick the instances for the new segment
     * @param instancePartitionsMap instance partitions handed to the assignment strategy
     */
    @VisibleForTesting
    void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> instanceStatesMap, @Nullable String committingSegmentName, String newSegmentName, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
        if (committingSegmentName != null) {
            // Keep the same set of instances for the committing segment, but mark all of them ONLINE.
            Map<String, String> currentStates = instanceStatesMap.get(committingSegmentName);
            Set<String> committingInstances = currentStates.keySet();
            Map<String, String> onlineStates = SegmentAssignmentUtils.getInstanceStateMap(committingInstances, "ONLINE");
            instanceStatesMap.put(committingSegmentName, onlineStates);
            LOGGER.info("Updating segment: {} to ONLINE state", committingSegmentName);
        }

        // The assignment sees the map after the committing segment (if any) has been switched to ONLINE.
        List<String> targetInstances = segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap);
        Map<String, String> consumingStates = SegmentAssignmentUtils.getInstanceStateMap(targetInstances, "CONSUMING");
        instanceStatesMap.put(newSegmentName, consumingStates);
        LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, targetInstances);
    }

    /**
     * Checks whether more than 300000 ms have elapsed since the segment's ZK metadata was last modified.
     *
     * <p>The modification time is taken from the {@code Stat} filled in by {@code getSegmentZKMetadata}.
     *
     * @param realtimeTableName name of the realtime table the segment belongs to
     * @param segmentName name of the segment whose ZK metadata is inspected
     * @param currentTimeMs the time to compare against, in epoch milliseconds
     * @return {@code true} if {@code currentTimeMs} is strictly later than the metadata modification time plus
     *     300000 ms, {@code false} otherwise
     */
    @VisibleForTesting
    boolean isExceededMaxSegmentCompletionTime(String realtimeTableName, String segmentName, long currentTimeMs) {
        Stat metadataStat = new Stat();
        this.getSegmentZKMetadata(realtimeTableName, segmentName, metadataStat);

        long maxCompletionTimeMs = 300000L;
        long metadataUpdateTimeMs = metadataStat.getMtime();
        boolean exceeded = currentTimeMs > metadataUpdateTimeMs + maxCompletionTimeMs;
        if (exceeded) {
            LOGGER.info("Segment: {} exceeds the max completion time: {}ms, metadata update time: {}, current time: {}", segmentName, maxCompletionTimeMs, metadataUpdateTimeMs, currentTimeMs);
        }
        return exceeded;
    }

    /**
     * Tells whether every instance in the given map is in the given state.
     *
     * @param instanceStateMap instance name to state map of a single segment
     * @param state the state every instance is expected to be in
     * @return {@code true} if all values equal {@code state} (trivially {@code true} for an empty map),
     *     {@code false} as soon as one value differs
     */
    private boolean isAllInstancesInState(Map<String, String> instanceStateMap, String state) {
        return instanceStateMap.values().stream().allMatch(instanceState -> instanceState.equals(state));
    }

    /**
     * Walks the latest segment of every partition and repairs the given IdealState so that each partition ends up
     * with a CONSUMING segment. The IdealState's map fields are modified in place.
     *
     * <p>For the latest segment (per its ZK metadata) of each partition:
     * <ul>
     *   <li>In IdealState with at least one CONSUMING instance: if the ZK metadata status is DONE and the max
     *       segment completion time has been exceeded, the segment is switched to ONLINE and the next segment is
     *       created (ZK metadata + IdealState entry). Otherwise nothing is done.</li>
     *   <li>In IdealState with all instances OFFLINE: a next segment is created starting from the OFFLINE
     *       segment's start offset, or from the smallest offset available in the stream when that is larger (which
     *       is logged and metered as data loss).</li>
     *   <li>In IdealState in any other state: an error is logged.</li>
     *   <li>Not in IdealState and past the max segment completion time: if the ZK metadata status is IN_PROGRESS,
     *       the segment is added to the IdealState as CONSUMING and the previous CONSUMING segment of the same
     *       partition (if found) is switched to ONLINE; any other status is logged as an error.</li>
     * </ul>
     * Finally, every partition id in {@code [0, numPartitions)} that has no segment ZK metadata at all gets a
     * brand-new first segment.
     *
     * @param tableConfig config of the realtime table
     * @param streamConfig partition-level stream config used for offset lookups and new segment metadata
     * @param idealState the IdealState to repair; mutated and returned
     * @param numPartitions current number of stream partitions
     * @return the same {@code idealState} instance that was passed in
     */
    @VisibleForTesting
    IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, IdealState idealState, int numPartitions) {
        String realtimeTableName = tableConfig.getTableName();
        InstancePartitions instancePartitions = this.getConsumingInstancePartitions(tableConfig);
        int numReplicas = this.getNumReplicas(tableConfig, instancePartitions);
        SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(this._helixManager, tableConfig);
        Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);
        Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
        long currentTimeMs = this.getCurrentTimeMs();
        Map<Integer, LLCRealtimeSegmentZKMetadata> latestSegmentZKMetadataMap = this.getLatestSegmentZKMetadataMap(realtimeTableName);

        for (Map.Entry<Integer, LLCRealtimeSegmentZKMetadata> entry : latestSegmentZKMetadataMap.entrySet()) {
            int partitionId = entry.getKey();
            LLCRealtimeSegmentZKMetadata latestSegmentZKMetadata = entry.getValue();
            String latestSegmentName = latestSegmentZKMetadata.getSegmentName();
            LLCSegmentName latestLLCSegmentName = new LLCSegmentName(latestSegmentName);
            Map<String, String> instanceStateMap = instanceStatesMap.get(latestSegmentName);

            if (instanceStateMap != null) {
                // Latest segment is present in the IdealState.
                if (instanceStateMap.containsValue("CONSUMING")) {
                    // Only repair when the metadata says DONE and the committer had enough time to update the
                    // IdealState itself; otherwise the segment is healthy or still being committed.
                    if (latestSegmentZKMetadata.getStatus() != CommonConstants.Segment.Realtime.Status.DONE || !this.isExceededMaxSegmentCompletionTime(realtimeTableName, latestSegmentName, currentTimeMs)) {
                        continue;
                    }
                    LOGGER.info("Repairing segment: {} which is DONE in segment ZK metadata, but is CONSUMING in IdealState", latestSegmentName);
                    LLCSegmentName newLLCSegmentName = this.getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
                    String newSegmentName = newLLCSegmentName.getSegmentName();
                    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(latestSegmentName, latestSegmentZKMetadata.getEndOffset(), 0L);
                    this.createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
                    this.updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName, segmentAssignment, instancePartitionsMap);
                    continue;
                }
                if (this.isAllInstancesInState(instanceStateMap, "OFFLINE")) {
                    LOGGER.info("Repairing segment: {} which is OFFLINE for all instances in IdealState", latestSegmentName);
                    LLCSegmentName newLLCSegmentName = this.getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
                    // Resume from where the OFFLINE segment started, unless the stream no longer has that offset.
                    long startOffset = latestSegmentZKMetadata.getStartOffset();
                    long partitionStartOffset = this.getPartitionOffset((StreamConfig)streamConfig, OffsetCriteria.SMALLEST_OFFSET_CRITERIA, partitionId);
                    if (partitionStartOffset > startOffset) {
                        LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffset, partitionStartOffset, partitionId, realtimeTableName);
                        this._controllerMetrics.addMeteredTableValue(realtimeTableName, (AbstractMetrics.Meter)ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
                        startOffset = partitionStartOffset;
                    }
                    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(latestSegmentName, startOffset, 0L);
                    this.createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
                    String newSegmentName = newLLCSegmentName.getSegmentName();
                    // The OFFLINE segment is left as is; only the new CONSUMING segment is added.
                    this.updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment, instancePartitionsMap);
                    continue;
                }
                LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap, latestSegmentName);
                continue;
            }

            // Latest segment has ZK metadata but no IdealState entry. Give an in-flight commit time to finish
            // before treating this as a failure.
            if (!this.isExceededMaxSegmentCompletionTime(realtimeTableName, latestSegmentName, currentTimeMs)) {
                continue;
            }
            LOGGER.info("Repairing segment: {} which has segment ZK metadata but does not exist in IdealState", latestSegmentName);
            if (latestSegmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.IN_PROGRESS) {
                // Look for the segment of the same partition that is still CONSUMING in the IdealState.
                String previousConsumingSegment = null;
                for (Map.Entry<String, Map<String, String>> segmentEntry : instanceStatesMap.entrySet()) {
                    String candidateSegmentName = segmentEntry.getKey();
                    // Skip IdealState entries that are not LLC segment names before parsing them, the same guard
                    // getNumPartitionsFromIdealState applies when it scans the IdealState.
                    if (!LLCSegmentName.isLowLevelConsumerSegmentName(candidateSegmentName)) {
                        continue;
                    }
                    LLCSegmentName llcSegmentName = new LLCSegmentName(candidateSegmentName);
                    if (llcSegmentName.getPartitionId() == partitionId && segmentEntry.getValue().containsValue("CONSUMING")) {
                        previousConsumingSegment = llcSegmentName.getSegmentName();
                        break;
                    }
                }
                if (previousConsumingSegment == null) {
                    LOGGER.error("Failed to find previous CONSUMING segment for partition: {} of table: {}, potential data loss", partitionId, realtimeTableName);
                    this._controllerMetrics.addMeteredTableValue(realtimeTableName, (AbstractMetrics.Meter)ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
                }
                // previousConsumingSegment may be null here, in which case only the new entry is added.
                this.updateInstanceStatesForNewConsumingSegment(instanceStatesMap, previousConsumingSegment, latestSegmentName, segmentAssignment, instancePartitionsMap);
                continue;
            }
            LOGGER.error("Got unexpected status: {} in segment ZK metadata for segment: {}", latestSegmentZKMetadata.getStatus(), latestSegmentName);
        }

        // Set up partitions that have no segment ZK metadata yet.
        for (int partitionId = 0; partitionId < numPartitions; ++partitionId) {
            if (latestSegmentZKMetadataMap.containsKey(partitionId)) {
                continue;
            }
            String newSegmentName = this.setupNewPartition(tableConfig, streamConfig, partitionId, currentTimeMs, instancePartitions, numPartitions, numReplicas);
            this.updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment, instancePartitionsMap);
        }
        return idealState;
    }

    /**
     * Builds the name of the segment that follows the given one: same table name and partition id, sequence
     * number incremented by one, and the supplied creation time.
     *
     * @param lastLLCSegmentName name of the segment being succeeded
     * @param creationTimeMs creation time to embed in the new segment name, in epoch milliseconds
     * @return the LLC segment name for the next sequence number
     */
    private LLCSegmentName getNextLLCSegmentName(LLCSegmentName lastLLCSegmentName, long creationTimeMs) {
        String tableName = lastLLCSegmentName.getTableName();
        int partitionId = lastLLCSegmentName.getPartitionId();
        return new LLCSegmentName(tableName, partitionId, lastLLCSegmentName.getSequenceNumber() + 1, creationTimeMs);
    }

    /**
     * Creates the segment ZK metadata for the first segment (sequence number 0) of a partition.
     *
     * <p>The start offset is resolved from the stream using the offset criteria of {@code streamConfig}. The
     * IdealState is not touched here.
     *
     * @param tableConfig config of the realtime table
     * @param streamConfig partition-level stream config providing the offset criteria
     * @param partitionId id of the partition to set up
     * @param creationTimeMs creation time for the new segment, in epoch milliseconds
     * @param instancePartitions instance partitions passed on to the metadata creation
     * @param numPartitions current number of stream partitions
     * @param numReplicas number of replicas
     * @return the name of the newly created segment
     */
    private String setupNewPartition(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, int partitionId, long creationTimeMs, InstancePartitions instancePartitions, int numPartitions, int numReplicas) {
        String tableNameWithType = tableConfig.getTableName();
        LOGGER.info("Setting up new partition: {} for table: {}", partitionId, tableNameWithType);

        // Segment names are built from the raw table name.
        LLCSegmentName firstSegmentName = new LLCSegmentName(TableNameBuilder.extractRawTableName(tableNameWithType), partitionId, 0, creationTimeMs);

        long startOffset = this.getPartitionOffset((StreamConfig)streamConfig, streamConfig.getOffsetCriteria(), partitionId);
        // There is no committing segment for a brand-new partition, hence the null segment name and metadata.
        CommittingSegmentDescriptor descriptor = new CommittingSegmentDescriptor(null, startOffset, 0L);
        this.createNewSegmentZKMetadata(tableConfig, streamConfig, firstSegmentName, creationTimeMs, descriptor, null, instancePartitions, numPartitions, numReplicas);
        return firstSegmentName.getSegmentName();
    }

    /**
     * Returns the current wall-clock time. Kept as a separate, test-visible method so the clock can be replaced
     * in tests.
     *
     * @return the current time in epoch milliseconds
     */
    @VisibleForTesting
    long getCurrentTimeMs() {
        final long nowMs = System.currentTimeMillis();
        return nowMs;
    }

    /**
     * Derives the number of partitions from the segments listed in the IdealState.
     *
     * <p>Only LLC segment names are considered; the result is the highest partition id found plus one, and never
     * less than 0.
     *
     * @param idealState the IdealState whose segment names are scanned
     * @return highest LLC partition id plus one, or 0 when the IdealState has no LLC segments
     */
    private int getNumPartitionsFromIdealState(IdealState idealState) {
        return idealState.getRecord().getMapFields().keySet().stream()
            .filter(segmentName -> LLCSegmentName.isLowLevelConsumerSegmentName(segmentName))
            .mapToInt(segmentName -> new LLCSegmentName(segmentName).getPartitionId() + 1)
            .reduce(0, Math::max);
    }

    /**
     * Resolves the number of replicas for the table.
     *
     * @param tableConfig config of the realtime table
     * @param instancePartitions instance partitions of the table
     * @return the replicas-per-partition number from the table's validation config when the instance partitions
     *     have exactly one replica group, otherwise the number of replica groups
     */
    private int getNumReplicas(TableConfig tableConfig, InstancePartitions instancePartitions) {
        int numReplicaGroups = instancePartitions.getNumReplicaGroups();
        return numReplicaGroups == 1
            ? tableConfig.getValidationConfig().getReplicasPerPartitionNumber()
            : numReplicaGroups;
    }

    /**
     * Computes the maximum number of partitions a single instance has to host, rounding up.
     *
     * <p>With exactly one replica group, {@code numPartitions * numReplicas} is spread over the instances returned
     * by {@code getInstances(0, 0)}; otherwise {@code numPartitions} alone is spread over that same instance
     * list.
     *
     * @param instancePartitions instance partitions of the table
     * @param numPartitions number of stream partitions
     * @param numReplicas number of replicas; only used when there is exactly one replica group
     * @return the ceiling of the load divided by the size of {@code getInstances(0, 0)}
     */
    private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions, int numPartitions, int numReplicas) {
        boolean singleReplicaGroup = instancePartitions.getNumReplicaGroups() == 1;
        int numInstances = instancePartitions.getInstances(0, 0).size();
        int numPartitionsToSpread = singleReplicaGroup ? numPartitions * numReplicas : numPartitions;
        // Integer ceiling division.
        return (numPartitionsToSpread + numInstances - 1) / numInstances;
    }
}

