package org.apache.pinot.controller.helix.core.realtime;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager;
import org.apache.pinot.controller.util.SegmentCompletionUtils;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.PartitionOffsetFetcher;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.class */
public class PinotLLCRealtimeSegmentManager {
    // Class logger. Declared without an initializer here, so it is presumably assigned in a static
    // initializer outside this view — TODO confirm.
    private static final Logger LOGGER;
    // Not referenced in the visible part of this file; presumably the sequence number of the first
    // segment of a partition — TODO confirm.
    private static final int STARTING_SEQUENCE_NUMBER = 0;
    // End offset written into the ZK metadata of a newly created CONSUMING segment
    // (see createNewSegmentZKMetadata).
    private static final long END_OFFSET_FOR_CONSUMING_SEGMENTS = Long.MAX_VALUE;
    // Prefix used to take the subset of the controller config that is handed to
    // MetadataEventNotifierFactory.loadFactory in the constructor.
    private static final String METADATA_EVENT_NOTIFIER_PREFIX = "metadata.event.notifier";
    // Maximum wait time that stop() reports when waiting for in-flight metadata commits.
    private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30000;
    // Added to the segment ZK metadata modification time in isExceededMaxSegmentCompletionTime to
    // decide whether a segment completion has taken too long.
    private static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300000;
    // Helix handles and cluster name, all obtained from the PinotHelixResourceManager in the constructor.
    private final HelixAdmin _helixAdmin;
    private final HelixManager _helixManager;
    private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
    private final PinotHelixResourceManager _helixResourceManager;
    private final String _clusterName;
    private final ControllerConf _controllerConf;
    private final ControllerMetrics _controllerMetrics;
    private final MetadataEventNotifierFactory _metadataEventNotifierFactory;
    // Size of _idealStateUpdateLocks; read from ControllerConf.getRealtimeSegmentMetadataCommitNumLocks().
    private final int _numIdealStateUpdateLocks;
    // Lock pool; commitSegmentMetadataInternal picks one entry by hashing the table name.
    private final Lock[] _idealStateUpdateLocks;
    private final TableConfigCache _tableConfigCache;
    private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
    // Set to true by stop(); the public entry points check it and refuse to run once it is set.
    private volatile boolean _isStopping = false;
    // Incremented/decremented around commitSegmentMetadataInternal and polled by stop().
    private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
    // Marked synthetic by the decompiler; guards the null-check AssertionErrors inside the
    // IdealState updater lambdas.
    static final /* synthetic */ boolean $assertionsDisabled;

    public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics) {
        this._helixAdmin = pinotHelixResourceManager.getHelixAdmin();
        this._helixManager = pinotHelixResourceManager.getHelixZkManager();
        this._propertyStore = pinotHelixResourceManager.getPropertyStore();
        this._helixResourceManager = pinotHelixResourceManager;
        this._clusterName = pinotHelixResourceManager.getHelixClusterName();
        this._controllerConf = controllerConf;
        this._controllerMetrics = controllerMetrics;
        this._metadataEventNotifierFactory = MetadataEventNotifierFactory.loadFactory(controllerConf.subset(METADATA_EVENT_NOTIFIER_PREFIX));
        this._numIdealStateUpdateLocks = controllerConf.getRealtimeSegmentMetadataCommitNumLocks();
        this._idealStateUpdateLocks = new Lock[this._numIdealStateUpdateLocks];
        for (int i = 0; i < this._numIdealStateUpdateLocks; i++) {
            this._idealStateUpdateLocks[i] = new ReentrantLock();
        }
        this._tableConfigCache = new TableConfigCache(this._propertyStore);
        this._flushThresholdUpdateManager = new FlushThresholdUpdateManager();
    }

    /**
     * Returns the controller configuration's accept-split-commit setting.
     *
     * @return the value of {@code ControllerConf.getAcceptSplitCommit()}
     */
    public boolean getIsSplitCommitEnabled() {
        final boolean acceptSplitCommit = _controllerConf.getAcceptSplitCommit();
        return acceptSplitCommit;
    }

    /**
     * Returns the VIP URL generated by the controller configuration.
     *
     * @return the value of {@code ControllerConf.generateVipUrl()}
     */
    public String getControllerVipUrl() {
        final String vipUrl = _controllerConf.generateVipUrl();
        return vipUrl;
    }

    /**
     * Marks the manager as stopping and waits for in-flight segment metadata commits to finish.
     *
     * <p>The wait polls the completing-segment counter in slices of at most one second, for at most
     * {@code MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS} in total. If the waiting thread is
     * interrupted, the interrupt flag is restored and the method returns immediately.
     */
    public void stop() {
        _isStopping = true;
        LOGGER.info("Awaiting segment metadata commits: maxWaitTimeMillis = {}", Long.valueOf(MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS));
        // Use the named constant for the budget so the logged value and the actual wait cannot drift apart.
        long remainingMillis = MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS;
        while (_numCompletingSegments.get() > 0 && remainingMillis > 0) {
            long sleepMillis = Math.min(remainingMillis, 1000L);
            try {
                Thread.sleep(sleepMillis);
                remainingMillis -= sleepMillis;
            } catch (InterruptedException e) {
                // Restore the interrupt status so callers further up the stack can observe it.
                Thread.currentThread().interrupt();
                LOGGER.info("Interrupted: Remaining wait time {} (out of {})", Long.valueOf(remainingMillis), Long.valueOf(MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS));
                return;
            }
        }
        LOGGER.info("Wait completed: Number of completing segments = {}", Integer.valueOf(_numCompletingSegments.get()));
    }

    /**
     * Sets up a new LLC table: creates the first CONSUMING segment for every stream partition, adds
     * them to the given IdealState, and writes the IdealState.
     *
     * @param tableConfig config of the table being set up
     * @param idealState IdealState to populate and persist
     * @throws IllegalStateException if the manager is stopping, or if the table already has a segment
     *     that is not an HLC segment
     */
    public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
        Preconditions.checkState(!_isStopping, "Segment manager is stopping");

        final String tableNameWithType = tableConfig.getTableName();
        LOGGER.info("Setting up new LLC table: {}", tableNameWithType);

        // Only HLC segments may already exist for the table.
        for (String existingSegment : getAllSegments(tableNameWithType)) {
            boolean isHlcSegment = SegmentName.isHighLevelConsumerSegmentName(existingSegment);
            Preconditions.checkState(isHlcSegment, "Cannot set up new LLC table: %s with existing non-HLC segment: %s", tableNameWithType, existingSegment);
        }

        _flushThresholdUpdateManager.clearFlushThresholdUpdater(tableNameWithType);

        PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), tableConfig.getIndexingConfig().getStreamConfigs());
        InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
        int partitionCount = getNumPartitions(streamConfig);
        int replicaCount = getNumReplicas(tableConfig, instancePartitions);

        SegmentAssignment assignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
        Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);

        long creationTimeMs = getCurrentTimeMs();
        Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
        int partitionId = 0;
        while (partitionId < partitionCount) {
            String newSegmentName = setupNewPartition(tableConfig, streamConfig, partitionId, creationTimeMs, instancePartitions, partitionCount, replicaCount);
            updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, assignment, instancePartitionsMap);
            partitionId++;
        }

        setIdealState(tableNameWithType, idealState);
    }

    /**
     * Deletes every LLC segment listed in the given IdealState.
     *
     * @param idealState IdealState whose resource name identifies the table and whose map-field keys
     *     are the candidate segment names
     * @throws IllegalStateException if the manager is stopping
     */
    public void removeLLCSegments(IdealState idealState) {
        Preconditions.checkState(!_isStopping, "Segment manager is stopping");

        final String tableNameWithType = idealState.getResourceName();
        LOGGER.info("Removing LLC segments for table: {}", tableNameWithType);

        // Copy all segment names, then drop everything that is not an LLC segment.
        List<String> llcSegments = new ArrayList<>(idealState.getRecord().getMapFields().keySet());
        llcSegments.removeIf(candidate -> !SegmentName.isLowLevelConsumerSegmentName(candidate));

        _helixResourceManager.deleteSegments(tableNameWithType, llcSegments);
    }

    /**
     * Loads the table config through the table config cache.
     *
     * @param str table name passed to the cache
     * @return the cached table config
     * @throws IllegalStateException wrapping the {@link ExecutionException} if the cache lookup fails;
     *     a ZooKeeper fetch failure is metered before throwing
     */
    @VisibleForTesting
    TableConfig getTableConfig(String str) {
        final TableConfig cachedConfig;
        try {
            cachedConfig = _tableConfigCache.getTableConfig(str);
        } catch (ExecutionException cause) {
            _controllerMetrics.addMeteredTableValue(str, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            String message = "Caught exception while loading table config from property store to cache for table: " + str;
            throw new IllegalStateException(message, cause);
        }
        return cachedConfig;
    }

    /**
     * Fetches or computes the CONSUMING instance partitions for the table.
     *
     * <p>Any exception is metered as a ZooKeeper fetch failure and rethrown unchanged.
     *
     * @param tableConfig config of the table
     * @return the CONSUMING instance partitions
     */
    @VisibleForTesting
    InstancePartitions getConsumingInstancePartitions(TableConfig tableConfig) {
        final InstancePartitions consumingPartitions;
        try {
            consumingPartitions = InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixManager, tableConfig, InstancePartitionsType.CONSUMING);
        } catch (Exception failure) {
            _controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            throw failure;
        }
        return consumingPartitions;
    }

    /**
     * Lists the segments of the table from the property store.
     *
     * <p>Any exception is metered as a ZooKeeper fetch failure and rethrown unchanged.
     *
     * @param str table name
     * @return segment names returned by {@code ZKMetadataProvider.getSegments}
     */
    @VisibleForTesting
    List<String> getAllSegments(String str) {
        final List<String> segmentNames;
        try {
            segmentNames = ZKMetadataProvider.getSegments(_propertyStore, str);
        } catch (Exception failure) {
            _controllerMetrics.addMeteredTableValue(str, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            throw failure;
        }
        return segmentNames;
    }

    /**
     * Lists the LLC realtime segments of the table from the property store.
     *
     * <p>Any exception is metered as a ZooKeeper fetch failure and rethrown unchanged.
     *
     * @param str table name
     * @return segment names returned by {@code ZKMetadataProvider.getLLCRealtimeSegments}
     */
    @VisibleForTesting
    List<String> getLLCSegments(String str) {
        final List<String> llcSegmentNames;
        try {
            llcSegmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, str);
        } catch (Exception failure) {
            _controllerMetrics.addMeteredTableValue(str, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            throw failure;
        }
        return llcSegmentNames;
    }

    /**
     * Reads the segment ZK metadata without capturing the ZNode stat.
     *
     * @param str table name
     * @param str2 segment name
     * @return the segment ZK metadata
     */
    private LLCRealtimeSegmentZKMetadata getSegmentZKMetadata(String str, String str2) {
        final Stat noStat = null;
        return getSegmentZKMetadata(str, str2, noStat);
    }

    /**
     * Reads the segment ZK metadata from the property store, optionally filling in the ZNode stat.
     *
     * <p>Any exception (including a missing record) is metered as a ZooKeeper fetch failure and
     * rethrown unchanged.
     *
     * @param str table name
     * @param str2 segment name
     * @param stat stat object handed to the property store read, or {@code null}
     * @return the segment ZK metadata
     * @throws IllegalStateException if no record exists for the segment
     */
    @VisibleForTesting
    public LLCRealtimeSegmentZKMetadata getSegmentZKMetadata(String str, String str2, @Nullable Stat stat) {
        try {
            String segmentPath = ZKMetadataProvider.constructPropertyStorePathForSegment(str, str2);
            ZNRecord record = _propertyStore.get(segmentPath, stat, AccessOption.PERSISTENT);
            boolean recordFound = record != null;
            Preconditions.checkState(recordFound, "Failed to find segment ZK metadata for segment: %s of table: %s", str2, str);
            return new LLCRealtimeSegmentZKMetadata(record);
        } catch (Exception failure) {
            _controllerMetrics.addMeteredTableValue(str, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            throw failure;
        }
    }

    /**
     * Writes the segment ZK metadata to the property store with the given expected version.
     *
     * <p>Any exception (including a failed write) is metered as a ZooKeeper update failure and
     * rethrown unchanged.
     *
     * @param str table name
     * @param lLCRealtimeSegmentZKMetadata metadata to persist
     * @param i expected version passed to the property store write
     * @throws IllegalStateException if the property store reports that the write did not succeed
     */
    @VisibleForTesting
    void persistSegmentZKMetadata(String str, LLCRealtimeSegmentZKMetadata lLCRealtimeSegmentZKMetadata, int i) {
        final String segmentName = lLCRealtimeSegmentZKMetadata.getSegmentName();
        LOGGER.info("Persisting segment ZK metadata for segment: {}", segmentName);
        try {
            String segmentPath = ZKMetadataProvider.constructPropertyStorePathForSegment(str, segmentName);
            boolean persisted = _propertyStore.set(segmentPath, lLCRealtimeSegmentZKMetadata.toZNRecord(), i, AccessOption.PERSISTENT);
            Preconditions.checkState(persisted, "Failed to persist segment ZK metadata for segment: %s of table: %s", segmentName, str);
        } catch (Exception failure) {
            _controllerMetrics.addMeteredTableValue(str, ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
            throw failure;
        }
    }

    /**
     * Reads the table's IdealState through Helix.
     *
     * <p>Any exception (including a missing IdealState) is metered as a ZooKeeper fetch failure and
     * rethrown unchanged.
     *
     * @param str table name
     * @return the table's IdealState, never {@code null}
     * @throws IllegalStateException if no IdealState exists for the table
     */
    @VisibleForTesting
    IdealState getIdealState(String str) {
        try {
            final IdealState fetched = HelixHelper.getTableIdealState(_helixManager, str);
            final boolean found = fetched != null;
            Preconditions.checkState(found, "Failed to find IdealState for table: " + str);
            return fetched;
        } catch (Exception failure) {
            _controllerMetrics.addMeteredTableValue(str, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            throw failure;
        }
    }

    /**
     * Writes the table's IdealState through the Helix admin.
     *
     * <p>Any exception is metered as a ZooKeeper update failure and rethrown unchanged.
     *
     * @param str table (resource) name
     * @param idealState IdealState to store
     */
    @VisibleForTesting
    void setIdealState(String str, IdealState idealState) {
        final String cluster = _clusterName;
        try {
            _helixAdmin.setResourceIdealState(cluster, str, idealState);
        } catch (Exception failure) {
            _controllerMetrics.addMeteredTableValue(str, ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
            throw failure;
        }
    }

    /**
     * Moves the committed segment file into the table's directory under the controller data dir, then
     * makes a best-effort pass to delete leftover temporary files for the same segment.
     *
     * @param str table name (the raw table name is extracted from it)
     * @param committingSegmentDescriptor descriptor carrying the segment name and current file location
     * @throws IllegalStateException if the manager is stopping or the move fails
     * @throws Exception propagated from URI construction or file-system access before cleanup starts
     */
    public void commitSegmentFile(String str, CommittingSegmentDescriptor committingSegmentDescriptor) throws Exception {
        Preconditions.checkState(!_isStopping, "Segment manager is stopping");

        final String rawTableName = TableNameBuilder.extractRawTableName(str);
        final String segmentName = committingSegmentDescriptor.getSegmentName();
        LOGGER.info("Committing segment file for segment: {}", segmentName);

        final String sourceLocation = committingSegmentDescriptor.getSegmentLocation();
        final URI sourceUri = URIUtils.getUri(sourceLocation, new String[0]);
        final URI tableDirUri = URIUtils.getUri(_controllerConf.getDataDir(), new String[]{rawTableName});
        final URI targetUri = URIUtils.getUri(_controllerConf.getDataDir(), new String[]{rawTableName, URIUtils.encode(segmentName)});

        final PinotFS fileSystem = PinotFSFactory.create(tableDirUri.getScheme());
        boolean moved = fileSystem.move(sourceUri, targetUri, true);
        Preconditions.checkState(moved, "Failed to move segment file for segment: %s from: %s to: %s", segmentName, sourceLocation, targetUri);

        // Cleanup is best-effort: any failure here is logged and swallowed.
        try {
            for (String candidate : fileSystem.listFiles(tableDirUri, false)) {
                if (!candidate.contains(SegmentCompletionUtils.getSegmentNamePrefix(segmentName))) {
                    continue;
                }
                LOGGER.warn("Deleting temporary segment file: {}", candidate);
                boolean deleted = fileSystem.delete(new URI(candidate), true);
                Preconditions.checkState(deleted, "Failed to delete file: %s", candidate);
            }
        } catch (Exception cleanupFailure) {
            LOGGER.warn("Caught exception while deleting temporary segment files for segment: {}", segmentName, cleanupFailure);
        }
    }

    /**
     * Commits the segment metadata while tracking the commit in the completing-segment counter that
     * {@code stop()} polls.
     *
     * @param str table name
     * @param committingSegmentDescriptor descriptor of the committing segment
     * @throws IllegalStateException if the manager is stopping
     */
    public void commitSegmentMetadata(String str, CommittingSegmentDescriptor committingSegmentDescriptor) {
        Preconditions.checkState(!_isStopping, "Segment manager is stopping");
        try {
            _numCompletingSegments.incrementAndGet();
            commitSegmentMetadataInternal(str, committingSegmentDescriptor);
        } finally {
            // Always undo the increment, whether the commit succeeded or threw.
            _numCompletingSegments.decrementAndGet();
        }
    }

    /**
     * Performs the metadata commit for a completed segment.
     *
     * <p>Steps, in order: verify the segment is CONSUMING on some instance in the IdealState, mark the
     * committing segment's ZK metadata DONE, create ZK metadata for the next segment of the partition,
     * update the IdealState under one of the pooled locks, then notify the metadata event notifier.
     *
     * @param str table name
     * @param committingSegmentDescriptor descriptor of the committing segment
     * @throws IllegalStateException if no instance is CONSUMING the segment in the IdealState
     */
    private void commitSegmentMetadataInternal(String str, CommittingSegmentDescriptor committingSegmentDescriptor) {
        String segmentName = committingSegmentDescriptor.getSegmentName();
        LOGGER.info("Committing segment metadata for segment: {}", segmentName);
        TableConfig tableConfig = getTableConfig(str);
        InstancePartitions consumingInstancePartitions = getConsumingInstancePartitions(tableConfig);
        IdealState idealState = getIdealState(str);
        Preconditions.checkState(idealState.getInstanceStateMap(segmentName).containsValue("CONSUMING"), "Failed to find instance in CONSUMING state in IdealState for segment: %s", segmentName);
        int numPartitionsFromIdealState = getNumPartitionsFromIdealState(idealState);
        int numReplicas = getNumReplicas(tableConfig, consumingInstancePartitions);

        LLCRealtimeSegmentZKMetadata committingSegmentZKMetadata = updateCommittingSegmentZKMetadata(str, committingSegmentDescriptor);

        long currentTimeMs = getCurrentTimeMs();
        LLCSegmentName nextLLCSegmentName = getNextLLCSegmentName(new LLCSegmentName(segmentName), currentTimeMs);
        createNewSegmentZKMetadata(tableConfig, new PartitionLevelStreamConfig(tableConfig.getTableName(), tableConfig.getIndexingConfig().getStreamConfigs()), nextLLCSegmentName, currentTimeMs, committingSegmentDescriptor, committingSegmentZKMetadata, consumingInstancePartitions, numPartitionsFromIdealState, numReplicas);

        SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
        Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.CONSUMING, consumingInstancePartitions);

        // Mask off the sign bit so the hash always maps to a valid, non-negative lock index.
        Lock lock = _idealStateUpdateLocks[(str.hashCode() & Integer.MAX_VALUE) % _numIdealStateUpdateLocks];
        // Acquire outside the try so unlock() only ever runs when the lock is actually held, and
        // release in finally so it runs exactly once regardless of how the update ends.
        lock.lock();
        try {
            updateIdealStateOnSegmentCompletion(str, segmentName, nextLLCSegmentName.getSegmentName(), segmentAssignment, instancePartitionsMap);
        } finally {
            lock.unlock();
        }

        // Notify after the lock is released; a notifier failure must not trigger a second unlock.
        _metadataEventNotifierFactory.create().notifyOnSegmentFlush(tableConfig);
    }

    /**
     * Marks the committing segment DONE in its ZK metadata and fills in end offset, download URL, CRC,
     * time range, index version and doc count from the descriptor's segment metadata.
     *
     * <p>The write uses the ZNode version captured by the preceding read as its expected version.
     *
     * @param str table name
     * @param committingSegmentDescriptor descriptor of the committing segment
     * @return the updated and persisted segment ZK metadata
     * @throws IllegalStateException if the segment is not IN_PROGRESS or the descriptor has no segment
     *     metadata
     */
    private LLCRealtimeSegmentZKMetadata updateCommittingSegmentZKMetadata(String str, CommittingSegmentDescriptor committingSegmentDescriptor) {
        final String segmentName = committingSegmentDescriptor.getSegmentName();
        LOGGER.info("Updating segment ZK metadata for committing segment: {}", segmentName);

        final Stat readStat = new Stat();
        final LLCRealtimeSegmentZKMetadata zkMetadata = getSegmentZKMetadata(str, segmentName, readStat);
        boolean inProgress = zkMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.IN_PROGRESS;
        Preconditions.checkState(inProgress, "Segment status for segment: %s should be IN_PROGRESS, found: %s", segmentName, zkMetadata.getStatus());

        final SegmentMetadataImpl builtSegmentMetadata = committingSegmentDescriptor.getSegmentMetadata();
        boolean hasSegmentMetadata = builtSegmentMetadata != null;
        Preconditions.checkState(hasSegmentMetadata, "Failed to find segment metadata from descriptor for segment: %s", segmentName);

        zkMetadata.setEndOffset(committingSegmentDescriptor.getNextOffset().getOffset());
        zkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);

        String rawTableName = TableNameBuilder.extractRawTableName(str);
        String downloadUrl = URIUtils.constructDownloadUrl(_controllerConf.generateVipUrl(), rawTableName, segmentName);
        zkMetadata.setDownloadUrl(downloadUrl);

        zkMetadata.setCrc(Long.valueOf(builtSegmentMetadata.getCrc()));
        zkMetadata.setStartTime(builtSegmentMetadata.getTimeInterval().getStartMillis());
        zkMetadata.setEndTime(builtSegmentMetadata.getTimeInterval().getEndMillis());
        zkMetadata.setTimeUnit(TimeUnit.MILLISECONDS);
        zkMetadata.setIndexVersion(builtSegmentMetadata.getVersion());
        zkMetadata.setTotalDocs(builtSegmentMetadata.getTotalDocs());

        persistSegmentZKMetadata(str, zkMetadata, readStat.getVersion());
        return zkMetadata;
    }

    /**
     * Creates and persists the ZK metadata of a new CONSUMING (IN_PROGRESS) segment.
     *
     * <p>The start offset is the descriptor's next offset; the end offset is
     * {@code END_OFFSET_FOR_CONSUMING_SEGMENTS}. Partition metadata is attached when the table config
     * defines a segment partition config, and the flush threshold is set by the table's flush
     * threshold updater. The record is written with expected version {@code -1}.
     *
     * @param tableConfig config of the table
     * @param partitionLevelStreamConfig stream config used to pick and run the flush threshold updater
     * @param lLCSegmentName name of the new segment
     * @param j creation time in milliseconds
     * @param committingSegmentDescriptor descriptor providing the start offset of the new segment
     * @param lLCRealtimeSegmentZKMetadata ZK metadata of the committing segment, may be {@code null}
     * @param instancePartitions CONSUMING instance partitions
     * @param i number of partitions
     * @param i2 number of replicas
     */
    private void createNewSegmentZKMetadata(TableConfig tableConfig, PartitionLevelStreamConfig partitionLevelStreamConfig, LLCSegmentName lLCSegmentName, long j, CommittingSegmentDescriptor committingSegmentDescriptor, @Nullable LLCRealtimeSegmentZKMetadata lLCRealtimeSegmentZKMetadata, InstancePartitions instancePartitions, int i, int i2) {
        final String tableNameWithType = tableConfig.getTableName();
        final String newSegmentName = lLCSegmentName.getSegmentName();
        final long startOffset = committingSegmentDescriptor.getNextOffset().getOffset();
        LOGGER.info("Creating segment ZK metadata for new CONSUMING segment: {} with start offset: {} and creation time: {}", new Object[]{newSegmentName, startOffset, j});

        final LLCRealtimeSegmentZKMetadata newZkMetadata = new LLCRealtimeSegmentZKMetadata();
        newZkMetadata.setTableName(tableNameWithType);
        newZkMetadata.setSegmentName(newSegmentName);
        newZkMetadata.setCreationTime(j);
        newZkMetadata.setStartOffset(startOffset);
        newZkMetadata.setEndOffset(END_OFFSET_FOR_CONSUMING_SEGMENTS);
        newZkMetadata.setNumReplicas(i2);
        newZkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);

        // Partition metadata is only present when the table config defines a partition config.
        final SegmentPartitionMetadata partitionMetadata = getPartitionMetadataFromTableConfig(tableConfig, lLCSegmentName.getPartitionId());
        if (partitionMetadata != null) {
            newZkMetadata.setPartitionMetadata(partitionMetadata);
        }

        _flushThresholdUpdateManager
            .getFlushThresholdUpdater(partitionLevelStreamConfig)
            .updateFlushThreshold(partitionLevelStreamConfig, newZkMetadata, committingSegmentDescriptor, lLCRealtimeSegmentZKMetadata, getMaxNumPartitionsPerInstance(instancePartitions, i, i2));

        persistSegmentZKMetadata(tableNameWithType, newZkMetadata, -1);
    }

    /**
     * Builds the segment partition metadata for one partition from the table's segment partition config.
     *
     * <p>Each configured column yields one entry carrying the column's function name and partition
     * count, with the given partition id as its only partition.
     *
     * @param tableConfig config of the table
     * @param i partition id to record for every partitioned column
     * @return the partition metadata, or {@code null} when the table has no segment partition config
     */
    @Nullable
    private SegmentPartitionMetadata getPartitionMetadataFromTableConfig(TableConfig tableConfig, int i) {
        final SegmentPartitionConfig partitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
        if (partitionConfig == null) {
            return null;
        }
        final Map<String, ColumnPartitionMetadata> metadataByColumn = new TreeMap<>();
        partitionConfig.getColumnPartitionMap().forEach((column, columnConfig) -> {
            ColumnPartitionMetadata columnMetadata = new ColumnPartitionMetadata(columnConfig.getFunctionName(), columnConfig.getNumPartitions(), Collections.singleton(i));
            metadataByColumn.put(column, columnMetadata);
        });
        return new SegmentPartitionMetadata(metadataByColumn);
    }

    /**
     * Returns the segment commit timeout for the table, in milliseconds.
     *
     * <p>The value comes from the stream config key {@code realtime.segment.commit.timeoutSeconds}.
     * The default from {@code SegmentCompletionProtocol.getMaxSegmentCommitTimeMs()} is returned when
     * there is no property store, the table has no stream configs, the key is absent, or the
     * configured value is not a valid integer.
     *
     * @param str table name
     * @return commit timeout in milliseconds
     */
    public long getCommitTimeoutMS(String str) {
        final String timeoutKey = "realtime.segment.commit.timeoutSeconds";
        long defaultTimeoutMs = SegmentCompletionProtocol.getMaxSegmentCommitTimeMs();
        if (_propertyStore == null) {
            return defaultTimeoutMs;
        }
        Map<String, String> streamConfigs = getTableConfig(str).getIndexingConfig().getStreamConfigs();
        if (streamConfigs == null || !streamConfigs.containsKey(timeoutKey)) {
            return defaultTimeoutMs;
        }
        String timeoutSeconds = streamConfigs.get(timeoutKey);
        try {
            return TimeUnit.SECONDS.toMillis(Integer.parseInt(timeoutSeconds));
        } catch (NumberFormatException e) {
            // A null or malformed value falls back to the default rather than failing the commit.
            LOGGER.warn("Failed to parse segment commit timeout (in seconds) of {}", timeoutSeconds, e);
            return defaultTimeoutMs;
        }
    }

    /**
     * Returns the partition count for the stream, as computed by
     * {@code PinotTableIdealStateBuilder.getPartitionCount}.
     *
     * @param streamConfig stream config
     * @return number of partitions
     */
    @VisibleForTesting
    int getNumPartitions(StreamConfig streamConfig) {
        final int partitionCount = PinotTableIdealStateBuilder.getPartitionCount(streamConfig);
        return partitionCount;
    }

    /**
     * Fetches the offset of a stream partition for the given offset criteria, retrying with a fixed
     * delay (3 attempts, 1000 ms apart).
     *
     * @param streamConfig stream config
     * @param offsetCriteria criteria selecting which offset to fetch
     * @param i partition id
     * @return the fetched offset
     * @throws IllegalStateException if the offset cannot be fetched; the underlying failure is
     *     attached as the cause
     */
    @VisibleForTesting
    long getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria, int i) {
        PartitionOffsetFetcher partitionOffsetFetcher = new PartitionOffsetFetcher(offsetCriteria, i, streamConfig);
        try {
            RetryPolicies.fixedDelayRetryPolicy(3, 1000L).attempt(partitionOffsetFetcher);
            return partitionOffsetFetcher.getOffset();
        } catch (Exception e) {
            // Keep the original exception as the cause so the root failure is not lost.
            throw new IllegalStateException(String.format("Failed to fetch the offset for topic: %s, partition: %s with criteria: %s", streamConfig.getTopicName(), Integer.valueOf(i), offsetCriteria), e);
        }
    }

    /**
     * Records that an instance stopped consuming a segment by flipping that instance's state for the
     * segment from CONSUMING to OFFLINE in the IdealState.
     *
     * <p>If the instance is not in CONSUMING state for the segment, the IdealState is left unchanged
     * and the current state is logged. Any exception is metered as a ZooKeeper update failure and
     * rethrown unchanged.
     *
     * @param lLCSegmentName the segment that stopped consuming
     * @param str name of the instance that stopped consuming
     * @throws IllegalStateException if the manager is stopping
     */
    public void segmentStoppedConsuming(LLCSegmentName lLCSegmentName, String str) {
        Preconditions.checkState(!_isStopping, "Segment manager is stopping");

        final String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(lLCSegmentName.getTableName());
        final String segmentName = lLCSegmentName.getSegmentName();
        LOGGER.info("Marking CONSUMING segment: {} OFFLINE on instance: {}", segmentName, str);
        try {
            HelixHelper.updateIdealState(_helixManager, realtimeTableName, currentIdealState -> {
                if (!$assertionsDisabled && currentIdealState == null) {
                    throw new AssertionError();
                }
                Map<String, String> instanceStates = currentIdealState.getInstanceStateMap(segmentName);
                String currentState = instanceStates.get(str);
                if (!"CONSUMING".equals(currentState)) {
                    LOGGER.info("Segment {} in state {} when trying to register consumption stop from {}", new Object[]{segmentName, currentState, str});
                    return currentIdealState;
                }
                instanceStates.put(str, "OFFLINE");
                return currentIdealState;
            }, RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2000000476837158d), true);
        } catch (Exception failure) {
            _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
            throw failure;
        }
    }

    /**
     * Returns, for every partition that has at least one LLC segment, the ZK metadata of the segment
     * with the highest sequence number.
     *
     * @param str table name
     * @return map from partition id to the ZK metadata of that partition's latest segment
     */
    private Map<Integer, LLCRealtimeSegmentZKMetadata> getLatestSegmentZKMetadataMap(String str) {
        // Pass 1: keep only the highest-sequence segment name per partition. On a tie the segment
        // seen first is kept.
        Map<Integer, LLCSegmentName> latestSegmentNames = new HashMap<>();
        for (String segment : getLLCSegments(str)) {
            LLCSegmentName candidate = new LLCSegmentName(segment);
            latestSegmentNames.merge(candidate.getPartitionId(), candidate, (existing, incoming) -> incoming.getSequenceNumber() <= existing.getSequenceNumber() ? existing : incoming);
        }

        // Pass 2: read ZK metadata only for the winners, so there is one read per partition.
        Map<Integer, LLCRealtimeSegmentZKMetadata> latestSegmentZKMetadata = new HashMap<>();
        for (Map.Entry<Integer, LLCSegmentName> entry : latestSegmentNames.entrySet()) {
            latestSegmentZKMetadata.put(entry.getKey(), getSegmentZKMetadata(str, entry.getValue().getSegmentName()));
        }
        return latestSegmentZKMetadata;
    }

    /**
     * Validates and repairs the table's LLC segments through an IdealState update.
     *
     * <p>When the IdealState is disabled the validation is skipped and the IdealState is returned
     * unchanged; otherwise the work is delegated to the four-argument overload.
     *
     * @param tableConfig config of the table
     * @param partitionLevelStreamConfig stream config of the table
     * @throws IllegalStateException if the manager is stopping
     */
    public void ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig partitionLevelStreamConfig) {
        Preconditions.checkState(!_isStopping, "Segment manager is stopping");

        final String tableNameWithType = tableConfig.getTableName();
        final int partitionCount = getNumPartitions(partitionLevelStreamConfig);
        HelixHelper.updateIdealState(_helixManager, tableNameWithType, currentIdealState -> {
            if (!$assertionsDisabled && currentIdealState == null) {
                throw new AssertionError();
            }
            if (!currentIdealState.isEnabled()) {
                LOGGER.info("Skipping LLC segments validation for disabled table: {}", tableNameWithType);
                return currentIdealState;
            }
            return ensureAllPartitionsConsuming(tableConfig, partitionLevelStreamConfig, currentIdealState, partitionCount);
        }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2000000476837158d), true);
    }

    /**
     * Updates the IdealState for a completed segment: the committing segment goes ONLINE and the new
     * segment is added as CONSUMING.
     *
     * <p>Inside the updater, the update is aborted with a {@code PermanentUpdaterException} when the
     * committing segment has exceeded the max segment completion time.
     *
     * @param str table name
     * @param str2 committing segment name
     * @param str3 new CONSUMING segment name
     * @param segmentAssignment assignment strategy for the new segment
     * @param map instance partitions map handed to the assignment
     */
    @VisibleForTesting
    void updateIdealStateOnSegmentCompletion(String str, String str2, String str3, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> map) {
        HelixHelper.updateIdealState(_helixManager, str, currentIdealState -> {
            if (!$assertionsDisabled && currentIdealState == null) {
                throw new AssertionError();
            }
            boolean completionTimedOut = isExceededMaxSegmentCompletionTime(str, str2, getCurrentTimeMs());
            if (!completionTimedOut) {
                updateInstanceStatesForNewConsumingSegment(currentIdealState.getRecord().getMapFields(), str2, str3, segmentAssignment, map);
                return currentIdealState;
            }
            LOGGER.error("Exceeded max segment completion time. Skipping ideal state update for segment: {}", str2);
            throw new HelixHelper.PermanentUpdaterException("Exceeded max segment completion time for segment " + str2);
        }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2000000476837158d));
    }

    /**
     * Applies a segment roll-over to the instance-states map: optionally turns the committing segment
     * ONLINE on its current instances, then assigns the new segment and adds it as CONSUMING.
     *
     * @param map segment name to (instance to state) map, modified in place
     * @param str committing segment name, or {@code null} when there is none
     * @param str2 new CONSUMING segment name
     * @param segmentAssignment assignment strategy used to pick instances for the new segment
     * @param map2 instance partitions map handed to the assignment
     */
    @VisibleForTesting
    void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> map, @Nullable String str, String str2, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> map2) {
        if (str != null) {
            // Same instances as before, every one of them switched to ONLINE.
            Map<String, String> onlineStates = SegmentAssignmentUtils.getInstanceStateMap(map.get(str).keySet(), "ONLINE");
            map.put(str, onlineStates);
            LOGGER.info("Updating segment: {} to ONLINE state", str);
        }

        final List<String> assignedInstances = segmentAssignment.assignSegment(str2, map, map2);
        Map<String, String> consumingStates = SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, "CONSUMING");
        map.put(str2, consumingStates);
        LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", str2, assignedInstances);
    }

    /**
     * Tells whether the given time is later than the segment ZK metadata's modification time plus
     * {@code MAX_SEGMENT_COMPLETION_TIME_MILLIS}.
     *
     * @param str table name
     * @param str2 segment name
     * @param j time to compare, in milliseconds
     * @return {@code true} if the max completion time has been exceeded
     */
    @VisibleForTesting
    boolean isExceededMaxSegmentCompletionTime(String str, String str2, long j) {
        // The metadata itself is not needed here; the read is done to populate the stat.
        final Stat metadataStat = new Stat();
        getSegmentZKMetadata(str, str2, metadataStat);

        final boolean exceeded = j > metadataStat.getMtime() + MAX_SEGMENT_COMPLETION_TIME_MILLIS;
        if (exceeded) {
            LOGGER.info("Segment: {} exceeds the max completion time: {}ms, metadata update time: {}, current time: {}", new Object[]{str2, MAX_SEGMENT_COMPLETION_TIME_MILLIS, metadataStat.getMtime(), j});
        }
        return exceeded;
    }

    /**
     * Checks whether every instance in the map is in the given state.
     *
     * @param map instance to state map
     * @param str state every instance is expected to be in
     * @return {@code true} if all values equal {@code str} (also for an empty map)
     */
    private boolean isAllInstancesInState(Map<String, String> map, String str) {
        return map.values().stream().allMatch(instanceState -> instanceState.equals(str));
    }

    /**
     * Walks the latest segment ZK metadata of every partition and repairs the instance-state map of the given
     * IdealState in place, then sets up any partition in {@code [0, i)} that has no segment ZK metadata yet.
     *
     * <p>Repairs performed per partition (based on the latest segment's ZK metadata and its IdealState entry):
     * <ul>
     *   <li>IdealState has a CONSUMING replica, ZK metadata status is DONE and the max segment completion time is
     *       exceeded: create ZK metadata for the next segment and update the instance states for it.</li>
     *   <li>All replicas are OFFLINE in IdealState: create ZK metadata for the next segment, starting at the old
     *       segment's start offset, or at the smallest offset available in the stream if that is larger (in which
     *       case data loss is logged and metered), and update the instance states for it.</li>
     *   <li>Segment has ZK metadata (IN_PROGRESS) but no IdealState entry, and the max segment completion time is
     *       exceeded: add it to the instance states, handing over from the CONSUMING segment of the same partition
     *       if one is found (otherwise potential data loss is logged and metered).</li>
     *   <li>Any other combination is logged as an error and left untouched.</li>
     * </ul>
     *
     * @param tableConfig table config of the realtime table
     * @param partitionLevelStreamConfig stream config used to fetch partition offsets and create segment metadata
     * @param idealState IdealState whose record map fields are modified in place
     * @param i number of stream partitions
     * @return the same {@code idealState} instance that was passed in
     */
    @VisibleForTesting
    IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig partitionLevelStreamConfig, IdealState idealState, int i) {
        String tableName = tableConfig.getTableName();
        InstancePartitions consumingInstancePartitions = getConsumingInstancePartitions(tableConfig);
        int numReplicas = getNumReplicas(tableConfig, consumingInstancePartitions);
        SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(this._helixManager, tableConfig);
        Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.CONSUMING, consumingInstancePartitions);
        Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
        long currentTimeMs = getCurrentTimeMs();
        Map<Integer, LLCRealtimeSegmentZKMetadata> latestSegmentZKMetadataMap = getLatestSegmentZKMetadataMap(tableName);

        for (Map.Entry<Integer, LLCRealtimeSegmentZKMetadata> entry : latestSegmentZKMetadataMap.entrySet()) {
            int partitionId = entry.getKey();
            LLCRealtimeSegmentZKMetadata latestSegmentZKMetadata = entry.getValue();
            String segmentName = latestSegmentZKMetadata.getSegmentName();
            LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
            Map<String, String> instanceStateMap = instanceStatesMap.get(segmentName);

            if (instanceStateMap != null) {
                if (instanceStateMap.containsValue("CONSUMING")) {
                    // Metadata says DONE but IdealState still says CONSUMING: only repair once the max completion
                    // time is exceeded.
                    if (latestSegmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE && isExceededMaxSegmentCompletionTime(tableName, segmentName, currentTimeMs)) {
                        LOGGER.info("Repairing segment: {} which is DONE in segment ZK metadata, but is CONSUMING in IdealState", segmentName);
                        LLCSegmentName nextLLCSegmentName = getNextLLCSegmentName(llcSegmentName, currentTimeMs);
                        String nextSegmentName = nextLLCSegmentName.getSegmentName();
                        CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(segmentName, new StreamPartitionMsgOffset(latestSegmentZKMetadata.getEndOffset()), 0L);
                        createNewSegmentZKMetadata(tableConfig, partitionLevelStreamConfig, nextLLCSegmentName, currentTimeMs, committingSegmentDescriptor, latestSegmentZKMetadata, consumingInstancePartitions, i, numReplicas);
                        updateInstanceStatesForNewConsumingSegment(instanceStatesMap, segmentName, nextSegmentName, segmentAssignment, instancePartitionsMap);
                    }
                } else if (isAllInstancesInState(instanceStateMap, "OFFLINE")) {
                    LOGGER.info("Repairing segment: {} which is OFFLINE for all instances in IdealState", segmentName);
                    LLCSegmentName nextLLCSegmentName = getNextLLCSegmentName(llcSegmentName, currentTimeMs);
                    long startOffset = latestSegmentZKMetadata.getStartOffset();
                    long smallestOffset = getPartitionOffset(partitionLevelStreamConfig, OffsetCriteria.SMALLEST_OFFSET_CRITERIA, partitionId);
                    if (smallestOffset > startOffset) {
                        // The stream no longer has the old start offset: resume from the smallest available one.
                        LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffset, smallestOffset, partitionId, tableName);
                        this._controllerMetrics.addMeteredTableValue(tableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
                        startOffset = smallestOffset;
                    }
                    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(segmentName, new StreamPartitionMsgOffset(startOffset), 0L);
                    createNewSegmentZKMetadata(tableConfig, partitionLevelStreamConfig, nextLLCSegmentName, currentTimeMs, committingSegmentDescriptor, latestSegmentZKMetadata, consumingInstancePartitions, i, numReplicas);
                    updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, nextLLCSegmentName.getSegmentName(), segmentAssignment, instancePartitionsMap);
                } else {
                    LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap, segmentName);
                }
            } else if (isExceededMaxSegmentCompletionTime(tableName, segmentName, currentTimeMs)) {
                LOGGER.info("Repairing segment: {} which has segment ZK metadata but does not exist in IdealState", segmentName);
                if (latestSegmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.IN_PROGRESS) {
                    String previousConsumingSegment = findConsumingSegmentForPartition(instanceStatesMap, partitionId);
                    if (previousConsumingSegment == null) {
                        LOGGER.error("Failed to find previous CONSUMING segment for partition: {} of table: {}, potential data loss", partitionId, tableName);
                        this._controllerMetrics.addMeteredTableValue(tableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
                    }
                    updateInstanceStatesForNewConsumingSegment(instanceStatesMap, previousConsumingSegment, segmentName, segmentAssignment, instancePartitionsMap);
                } else {
                    LOGGER.error("Got unexpected status: {} in segment ZK metadata for segment: {}", latestSegmentZKMetadata.getStatus(), segmentName);
                }
            }
        }

        // Set up partitions that have no segment ZK metadata at all.
        for (int partitionId = 0; partitionId < i; partitionId++) {
            if (!latestSegmentZKMetadataMap.containsKey(partitionId)) {
                String newSegmentName = setupNewPartition(tableConfig, partitionLevelStreamConfig, partitionId, currentTimeMs, consumingInstancePartitions, i, numReplicas);
                updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment, instancePartitionsMap);
            }
        }
        return idealState;
    }

    /**
     * Finds the first LLC segment of the given partition that has at least one replica in CONSUMING state.
     *
     * <p>Keys that are not LLC segment names are skipped instead of being parsed, consistent with how
     * {@code getNumPartitionsFromIdealState} treats the same map.
     *
     * @param instanceStatesMap segment name to (instance to state) map, as stored in the IdealState record
     * @param partitionId partition to look for
     * @return name of the matching segment, or {@code null} if none is found
     */
    @Nullable
    private static String findConsumingSegmentForPartition(Map<String, Map<String, String>> instanceStatesMap, int partitionId) {
        for (Map.Entry<String, Map<String, String>> segmentEntry : instanceStatesMap.entrySet()) {
            String candidateName = segmentEntry.getKey();
            if (!LLCSegmentName.isLowLevelConsumerSegmentName(candidateName)) {
                continue;
            }
            LLCSegmentName candidate = new LLCSegmentName(candidateName);
            if (candidate.getPartitionId() == partitionId && segmentEntry.getValue().containsValue("CONSUMING")) {
                return candidate.getSegmentName();
            }
        }
        return null;
    }

    /**
     * Builds the name of the segment that follows the given one: same table name and partition id, sequence
     * number incremented by one, stamped with the given time.
     *
     * @param lLCSegmentName name of the current segment
     * @param j time value passed as the last constructor argument of the new segment name
     * @return the segment name with the next sequence number
     */
    private LLCSegmentName getNextLLCSegmentName(LLCSegmentName lLCSegmentName, long j) {
        String tableName = lLCSegmentName.getTableName();
        int partitionId = lLCSegmentName.getPartitionId();
        int nextSequenceNumber = lLCSegmentName.getSequenceNumber() + 1;
        return new LLCSegmentName(tableName, partitionId, nextSequenceNumber, j);
    }

    /**
     * Creates the segment ZK metadata for the first segment (sequence number 0) of a partition.
     *
     * <p>The start offset is fetched from the stream using the offset criteria of the stream config.
     *
     * @param tableConfig table config of the realtime table
     * @param partitionLevelStreamConfig stream config providing the offset criteria
     * @param i id of the partition to set up
     * @param j time value used for the segment name and passed on to the metadata creation
     * @param instancePartitions instance partitions passed on to the metadata creation
     * @param i2 number of partitions, passed on to the metadata creation
     * @param i3 number of replicas, passed on to the metadata creation
     * @return name of the newly created segment
     */
    private String setupNewPartition(TableConfig tableConfig, PartitionLevelStreamConfig partitionLevelStreamConfig, int i, long j, InstancePartitions instancePartitions, int i2, int i3) {
        String tableNameWithType = tableConfig.getTableName();
        LOGGER.info("Setting up new partition: {} for table: {}", Integer.valueOf(i), tableNameWithType);

        String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
        LLCSegmentName firstSegment = new LLCSegmentName(rawTableName, i, 0, j);
        String firstSegmentName = firstSegment.getSegmentName();

        // No committing segment precedes the first segment of a partition, hence the null segment name.
        long startOffset = getPartitionOffset(partitionLevelStreamConfig, partitionLevelStreamConfig.getOffsetCriteria(), i);
        CommittingSegmentDescriptor descriptor = new CommittingSegmentDescriptor(null, new StreamPartitionMsgOffset(startOffset), 0L);
        createNewSegmentZKMetadata(tableConfig, partitionLevelStreamConfig, firstSegment, j, descriptor, null, instancePartitions, i2, i3);
        return firstSegmentName;
    }

    /**
     * Returns the current wall-clock time in milliseconds, as reported by {@link System#currentTimeMillis()}.
     *
     * <p>Kept as a separate, non-private method (marked {@code @VisibleForTesting}) so the time source is reachable
     * from tests.
     *
     * @return current time in milliseconds since the epoch
     */
    @VisibleForTesting
    long getCurrentTimeMs() {
        return System.currentTimeMillis();
    }

    /**
     * Derives the number of partitions from the segment names in the given IdealState: the highest partition id
     * among LLC segment names, plus one.
     *
     * @param idealState IdealState whose record map-field keys are the segment names to inspect
     * @return highest LLC partition id + 1, or 0 if no key is an LLC segment name
     */
    private int getNumPartitionsFromIdealState(IdealState idealState) {
        int numPartitions = 0;
        for (String segmentName : idealState.getRecord().getMapFields().keySet()) {
            // Only LLC segment names carry a partition id; skip everything else.
            if (!LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
                continue;
            }
            int partitionsUpToThisSegment = new LLCSegmentName(segmentName).getPartitionId() + 1;
            if (partitionsUpToThisSegment > numPartitions) {
                numPartitions = partitionsUpToThisSegment;
            }
        }
        return numPartitions;
    }

    /**
     * Determines the number of replicas to use for consuming segments.
     *
     * @param tableConfig table config whose validation config supplies the replicas-per-partition number
     * @param instancePartitions instance partitions supplying the number of replica groups
     * @return the replicas-per-partition number from the table config when there is exactly one replica group,
     *         otherwise the number of replica groups
     */
    private int getNumReplicas(TableConfig tableConfig, InstancePartitions instancePartitions) {
        if (instancePartitions.getNumReplicaGroups() == 1) {
            return tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
        }
        return instancePartitions.getNumReplicaGroups();
    }

    /**
     * Computes a ceiling division of a partition count by the number of instances returned by
     * {@code instancePartitions.getInstances(0, 0)}.
     *
     * <p>With exactly one replica group the dividend is {@code i * i2}; otherwise it is {@code i}.
     *
     * @param instancePartitions instance partitions supplying the replica-group count and the instance list
     * @param i first count (callers in this file pass the number of partitions — NOTE(review): confirm)
     * @param i2 multiplier applied only when there is a single replica group (presumably the number of replicas)
     * @return the dividend divided by the instance count, rounded up
     */
    private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions, int i, int i2) {
        boolean singleReplicaGroup = instancePartitions.getNumReplicaGroups() == 1;
        int numInstances = instancePartitions.getInstances(0, 0).size();
        int total = singleReplicaGroup ? i * i2 : i;
        // Ceiling division: add (divisor - 1) before dividing.
        return (total + numInstances - 1) / numInstances;
    }

    // Static initializer: records the class's assertion status and creates the class logger.
    // NOTE(review): `$assertionsDisabled` looks like the compiler-generated assertion flag surfaced by a
    // decompiler rather than a hand-written field — confirm before editing it by hand.
    static {
        // True when assertions are NOT enabled for this class (negation of desiredAssertionStatus()).
        $assertionsDisabled = !PinotLLCRealtimeSegmentManager.class.desiredAssertionStatus();
        // Logger named after this class.
        LOGGER = LoggerFactory.getLogger(PinotLLCRealtimeSegmentManager.class);
    }
}
