package org.apache.pinot.controller.helix.core.realtime;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.CopyOption;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.config.ColumnPartitionConfig;
import org.apache.pinot.common.config.SegmentPartitionConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.partition.PartitionAssignment;
import org.apache.pinot.common.partition.StreamPartitionAssignmentGenerator;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.retry.RetryPolicies;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.ControllerLeadershipManager;
import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager;
import org.apache.pinot.controller.util.SegmentCompletionUtils;
import org.apache.pinot.core.realtime.segment.ConsumingSegmentAssignmentStrategy;
import org.apache.pinot.core.realtime.stream.OffsetCriteria;
import org.apache.pinot.core.realtime.stream.PartitionOffsetFetcher;
import org.apache.pinot.core.realtime.stream.StreamConfig;
import org.apache.pinot.core.segment.index.ColumnMetadata;
import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
import org.apache.pinot.filesystem.PinotFS;
import org.apache.pinot.filesystem.PinotFSFactory;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.class */
public class PinotLLCRealtimeSegmentManager {
    // NOTE(review): presumably the sequence number given to the first LLC segment of a partition;
    // its use is not visible in this section — confirm.
    protected static final int STARTING_SEQUENCE_NUMBER = 0;
    // End offset written into the ZK metadata of a newly created (IN_PROGRESS) segment.
    protected static final long END_OFFSET_FOR_CONSUMING_SEGMENTS = Long.MAX_VALUE;
    // Suffix appended to the segment name to form the scratch directory used while reading segment metadata.
    private static final String METADATA_TEMP_DIR_SUFFIX = ".metadata.tmp";
    // NOTE(review): looks like a config-key prefix for the metadata event notifier; not used in this section.
    private static final String METADATA_EVENT_NOTIFIER_PREFIX = "metadata.event.notifier";
    // Maximum wait reported by stop() while in-flight segment metadata commits drain.
    private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30000;
    private final HelixAdmin _helixAdmin;
    private final HelixManager _helixManager;
    private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
    private final PinotHelixResourceManager _helixResourceManager;
    private final String _clusterName;
    private final ControllerConf _controllerConf;
    private final ControllerMetrics _controllerMetrics;
    // Size of the lock array below; read from ControllerConf in the constructor.
    private final int _numIdealStateUpdateLocks;
    // Lock stripes indexed by (table name hash % _numIdealStateUpdateLocks) when updating an ideal state
    // on segment completion.
    private final Lock[] _idealstateUpdateLocks;
    private final TableConfigCache _tableConfigCache;
    private final StreamPartitionAssignmentGenerator _streamPartitionAssignmentGenerator;
    private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
    // Set by stop(); once true, commitSegmentFile/commitSegmentMetadata return false immediately.
    private volatile boolean _isStopping = false;
    // Number of commitSegmentMetadata calls currently in progress; stop() waits for it to reach zero.
    private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
    public static final Logger LOGGER = LoggerFactory.getLogger(PinotLLCRealtimeSegmentManager.class);
    // NOTE(review): not referenced in this section; units are minutes judging by the name — confirm usage.
    private static int MAX_SEGMENT_COMPLETION_TIME_MINS = 10;
    // Singleton instance; assigned once by create() and returned by getInstance().
    private static PinotLLCRealtimeSegmentManager INSTANCE = null;

    /**
     * Returns the controller configuration's "accept split commit" setting.
     *
     * @return the value of {@code ControllerConf.getAcceptSplitCommit()}
     */
    public boolean getIsSplitCommitEnabled() {
        final boolean splitCommitAccepted = _controllerConf.getAcceptSplitCommit();
        return splitCommitAccepted;
    }

    /**
     * Returns the controller VIP URL as generated by the controller configuration.
     *
     * @return the result of {@code ControllerConf.generateVipUrl()}
     */
    public String getControllerVipUrl() {
        final String vipUrl = _controllerConf.generateVipUrl();
        return vipUrl;
    }

    /**
     * Creates the singleton manager, taking the Helix admin, cluster name, Helix manager and property
     * store from the given resource manager.
     *
     * @param pinotHelixResourceManager source of the Helix handles and property store
     * @param controllerConf controller configuration
     * @param controllerMetrics controller metrics sink
     * @throws RuntimeException if the instance has already been created
     */
    public static synchronized void create(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics) {
        final HelixAdmin admin = pinotHelixResourceManager.getHelixAdmin();
        final String clusterName = pinotHelixResourceManager.getHelixClusterName();
        final HelixManager zkManager = pinotHelixResourceManager.getHelixZkManager();
        create(admin, clusterName, zkManager, pinotHelixResourceManager.getPropertyStore(), pinotHelixResourceManager, controllerConf, controllerMetrics);
    }

    /**
     * Instantiates the singleton and then creates the {@code SegmentCompletionManager} bound to it.
     *
     * @throws RuntimeException if the instance has already been created
     */
    private static synchronized void create(HelixAdmin helixAdmin, String str, HelixManager helixManager, ZkHelixPropertyStore zkHelixPropertyStore, PinotHelixResourceManager pinotHelixResourceManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics) {
        if (INSTANCE == null) {
            INSTANCE = new PinotLLCRealtimeSegmentManager(helixAdmin, str, helixManager, zkHelixPropertyStore, pinotHelixResourceManager, controllerConf, controllerMetrics);
            SegmentCompletionManager.create(helixManager, INSTANCE, controllerConf, controllerMetrics);
            return;
        }
        throw new RuntimeException("Instance already created");
    }

    /**
     * Marks the manager as stopping and waits for in-flight segment metadata commits to finish.
     *
     * <p>Polls the in-flight commit counter roughly once per second, for at most
     * {@code MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS}. If the waiting thread is interrupted, the
     * wait is abandoned and the thread's interrupt status is restored so callers can observe it.
     */
    public void stop() {
        _isStopping = true;
        LOGGER.info("Awaiting segment metadata commits: maxWaitTimeMillis = {}", MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS);
        // Use the constant as the budget so the logged limit and the actual wait cannot drift apart.
        long remainingMillis = MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS;
        while (_numCompletingSegments.get() > 0 && remainingMillis > 0) {
            final long sleepMillis = Math.min(1000L, remainingMillis);
            try {
                Thread.sleep(sleepMillis);
                remainingMillis -= sleepMillis;
            } catch (InterruptedException e) {
                // Do not swallow the interrupt: re-assert it before giving up on the wait.
                Thread.currentThread().interrupt();
                LOGGER.info("Interrupted: Remaining wait time {} (out of {})", remainingMillis, MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS);
                return;
            }
        }
        LOGGER.info("Wait completed: Number of completing segments = {}", _numCompletingSegments.get());
    }

    protected PinotLLCRealtimeSegmentManager(HelixAdmin helixAdmin, String str, HelixManager helixManager, ZkHelixPropertyStore zkHelixPropertyStore, PinotHelixResourceManager pinotHelixResourceManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics) {
        this._helixAdmin = helixAdmin;
        this._helixManager = helixManager;
        this._propertyStore = zkHelixPropertyStore;
        this._helixResourceManager = pinotHelixResourceManager;
        this._clusterName = str;
        this._controllerConf = controllerConf;
        this._controllerMetrics = controllerMetrics;
        this._numIdealStateUpdateLocks = controllerConf.getRealtimeSegmentMetadataCommitNumLocks();
        this._idealstateUpdateLocks = new Lock[this._numIdealStateUpdateLocks];
        for (int i = 0; i < this._numIdealStateUpdateLocks; i++) {
            this._idealstateUpdateLocks[i] = new ReentrantLock();
        }
        this._tableConfigCache = new TableConfigCache(this._propertyStore);
        this._streamPartitionAssignmentGenerator = new StreamPartitionAssignmentGenerator(this._helixManager);
        this._flushThresholdUpdateManager = new FlushThresholdUpdateManager();
    }

    /**
     * Returns the singleton manager.
     *
     * @return the instance previously set up by {@code create}
     * @throws RuntimeException if {@code create} has not been called yet
     */
    public static PinotLLCRealtimeSegmentManager getInstance() {
        final PinotLLCRealtimeSegmentManager manager = INSTANCE;
        if (manager != null) {
            return manager;
        }
        throw new RuntimeException("Not yet created");
    }

    /**
     * Asks the {@code ControllerLeadershipManager} singleton whether this controller is the leader.
     *
     * @return the leadership manager's {@code isLeader()} answer
     */
    protected boolean isLeader() {
        final ControllerLeadershipManager leadershipManager = ControllerLeadershipManager.getInstance();
        return leadershipManager.isLeader();
    }

    /**
     * Reports whether the Helix manager is currently connected.
     *
     * @return the Helix manager's {@code isConnected()} answer
     */
    protected boolean isConnected() {
        final boolean connected = _helixManager.isConnected();
        return connected;
    }

    /**
     * Sets up a new low-level-consumer realtime table: computes the stream partition count, verifies
     * that no non-HLC segments already exist for the table, resets the table's flush threshold
     * updater, builds the ideal state and writes it to Helix.
     *
     * @param tableConfig config of the table being set up
     * @param idealState ideal state passed on to {@code setupTable}
     * @throws InvalidConfigException propagated from the partition-count / table setup steps
     * @throws RuntimeException if low-level segments already exist, or the Helix connection is lost
     */
    public void setupNewTable(TableConfig tableConfig, IdealState idealState) throws InvalidConfigException {
        final String tableName = tableConfig.getTableName();
        final int partitionCount = getPartitionCount(new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs()));

        final List<String> existingSegments = getExistingSegments(tableName);
        if (existingSegments != null) {
            for (String existingSegment : existingSegments) {
                if (!SegmentName.isHighLevelConsumerSegmentName(existingSegment)) {
                    // Report the table NAME; the original appended the table type, which does not
                    // identify the offending table.
                    throw new RuntimeException("Low-level segments already exist for table " + tableName);
                }
            }
        }

        _flushThresholdUpdateManager.clearFlushThresholdUpdater(tableConfig);
        if (!isConnected()) {
            throw new RuntimeException("Lost zk connection while setting up new table " + tableName + " isConnected=" + isConnected());
        }
        setTableIdealState(tableName, setupTable(tableConfig, idealState, partitionCount));
    }

    /**
     * Deletes every low-level-consumer segment listed in the table's ideal state.
     *
     * @param str name of the table whose ideal state is read and whose LLC segments are deleted
     */
    public void cleanupLLC(String str) {
        // NOTE(review): the reason for this fixed one-second pause is not visible here — confirm before changing.
        Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);

        final IdealState idealState = HelixHelper.getTableIdealState(_helixManager, str);
        final List<String> llcSegments = new ArrayList<>();
        for (String segment : idealState.getPartitionSet()) {
            if (!SegmentName.isLowLevelConsumerSegmentName(segment)) {
                continue;
            }
            llcSegments.add(segment);
        }

        LOGGER.info("Attempting to remove {} LLC segments of table {}", llcSegments.size(), str);
        _helixResourceManager.deleteSegments(str, llcSegments);
    }

    /**
     * Builds segment partition metadata from the table's segment partition config.
     *
     * @param str table name used to look up the realtime table config
     * @param i number of partitions recorded for every partitioned column
     * @param i2 the single partition id recorded for every partitioned column
     * @return the partition metadata, or {@code null} when there is no property store, no segment
     *     partition config, or the config has no column partition entries
     */
    private SegmentPartitionMetadata getPartitionMetadataFromTableConfig(String str, int i, int i2) {
        if (_propertyStore == null) {
            return null;
        }
        final SegmentPartitionConfig partitionConfig = getRealtimeTableConfig(str).getIndexingConfig().getSegmentPartitionConfig();
        if (partitionConfig == null) {
            return null;
        }
        final Map<String, ColumnPartitionConfig> columnPartitionMap = partitionConfig.getColumnPartitionMap();
        if (columnPartitionMap == null || columnPartitionMap.isEmpty()) {
            return null;
        }

        final Map<String, ColumnPartitionMetadata> metadataByColumn = new HashMap<>();
        for (Map.Entry<String, ColumnPartitionConfig> columnEntry : columnPartitionMap.entrySet()) {
            final String functionName = columnEntry.getValue().getFunctionName();
            metadataByColumn.put(columnEntry.getKey(), new ColumnPartitionMetadata(functionName, i, Collections.singleton(i2)));
        }
        return new SegmentPartitionMetadata(metadataByColumn);
    }

    /**
     * Builds segment partition metadata from a segment's column metadata, keeping only the columns
     * that have a partition function.
     *
     * @param segmentMetadataImpl metadata of the segment
     * @return partition metadata keyed by column name (its map is empty when no column is partitioned)
     */
    private SegmentPartitionMetadata getPartitionMetadataFromSegmentMetadata(SegmentMetadataImpl segmentMetadataImpl) {
        final Map<String, ColumnPartitionMetadata> metadataByColumn = new HashMap<>();
        final Map<String, ColumnMetadata> columnMetadataMap = segmentMetadataImpl.getColumnMetadataMap();
        for (Map.Entry<String, ColumnMetadata> columnEntry : columnMetadataMap.entrySet()) {
            final ColumnMetadata column = columnEntry.getValue();
            if (column.getPartitionFunction() == null) {
                continue;
            }
            final ColumnPartitionMetadata columnPartitionMetadata =
                new ColumnPartitionMetadata(column.getPartitionFunction().toString(), column.getNumPartitions(), column.getPartitions());
            metadataByColumn.put(columnEntry.getKey(), columnPartitionMetadata);
        }
        return new SegmentPartitionMetadata(metadataByColumn);
    }

    /**
     * Lists the child names stored under the table's resource path in the property store.
     *
     * @param str table name
     * @return whatever the property store's {@code getChildNames} returns for that path
     *     (setupNewTable treats a {@code null} result as "no segments")
     */
    protected List<String> getExistingSegments(String str) {
        final String resourcePath = ZKMetadataProvider.constructPropertyStorePathForResource(str);
        return _propertyStore.getChildNames(resourcePath, AccessOption.PERSISTENT);
    }

    /**
     * Writes a segment record to the property store, passing an expected version to the store.
     *
     * @param str property store path of the segment
     * @param zNRecord record to write
     * @param str2 table name (used for logging only)
     * @param i expected zookeeper version number handed to the property store
     * @return {@code true} if the store reported success, {@code false} (after logging) otherwise
     */
    protected boolean writeSegmentToPropertyStore(String str, ZNRecord zNRecord, String str2, int i) {
        if (!_propertyStore.set(str, zNRecord, i, AccessOption.PERSISTENT)) {
            LOGGER.error("Failed to write segment to property store at {} for table {}. Expected zookeeper version number: {}", str, str2, i);
            return false;
        }
        return true;
    }

    /**
     * Writes a segment record to the property store without a version argument.
     *
     * @param str property store path of the segment
     * @param zNRecord record to write
     * @param str2 table name (used for logging only)
     * @return {@code true} if the store reported success, {@code false} (after logging) otherwise
     */
    protected boolean writeSegmentToPropertyStore(String str, ZNRecord zNRecord, String str2) {
        if (_propertyStore.set(str, zNRecord, AccessOption.PERSISTENT)) {
            return true;
        }
        LOGGER.error("Failed to write segment to property store at {} for table {}.", str, str2);
        return false;
    }

    /**
     * Writes several records to the property store in one {@code setChildren} call.
     *
     * <p>On failure the error is logged, the {@code LLC_ZOOKEEPER_UPDATE_FAILURES} meter is bumped and
     * the exception is rethrown unchanged.
     *
     * @param list property store paths
     * @param list2 records, passed alongside {@code list} to {@code setChildren}
     * @param str table name (used for logging only)
     */
    protected void writeSegmentsToPropertyStore(List<String> list, List<ZNRecord> list2, String str) {
        try {
            _propertyStore.setChildren(list, list2, AccessOption.PERSISTENT);
        } catch (Exception writeFailure) {
            LOGGER.error("Failed to update idealstate for table {} for paths {}", str, list, writeFailure);
            _controllerMetrics.addMeteredGlobalValue(ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
            throw writeFailure;
        }
    }

    /**
     * Fetches the table's ideal state through {@code HelixHelper}.
     *
     * @param str table name
     * @return the ideal state returned by {@code HelixHelper.getTableIdealState}
     */
    protected IdealState getTableIdealState(String str) {
        final IdealState idealState = HelixHelper.getTableIdealState(_helixManager, str);
        return idealState;
    }

    /**
     * Stores the given ideal state for the table's resource in this manager's Helix cluster.
     *
     * @param str table (resource) name
     * @param idealState ideal state to store
     */
    protected void setTableIdealState(String str, IdealState idealState) {
        final String cluster = _clusterName;
        _helixAdmin.setResourceIdealState(cluster, str, idealState);
    }

    /**
     * Moves a committed segment file from its reported location to
     * {@code <dataDir>/<table>/<segmentName>} and then removes leftover files in the table directory
     * whose path contains the segment's name prefix.
     *
     * <p>Failure to delete leftover files is logged and does not fail the commit.
     *
     * @param str table name, used as the directory under the controller data dir
     * @param committingSegmentDescriptor supplies the segment name and current segment location
     * @return {@code true} if the move succeeded; {@code false} if the controller is stopping, is not
     *     connected or not leader, or the move threw
     */
    public boolean commitSegmentFile(String str, CommittingSegmentDescriptor committingSegmentDescriptor) {
        if (_isStopping) {
            LOGGER.info("Returning false since the controller is stopping");
            return false;
        }

        final String segmentName = committingSegmentDescriptor.getSegmentName();
        final String segmentLocation = committingSegmentDescriptor.getSegmentLocation();
        final URI sourceUri = ControllerConf.getUriFromPath(segmentLocation);
        final URI dataDirUri = ControllerConf.getUriFromPath(_controllerConf.getDataDir());
        final URI tableDirUri = ControllerConf.getUriFromPath(StringUtil.join("/", new String[]{_controllerConf.getDataDir(), str}));
        final URI destinationUri = ControllerConf.getUriFromPath(StringUtil.join("/", new String[]{tableDirUri.toString(), segmentName}));
        final PinotFS pinotFs = PinotFSFactory.create(dataDirUri.getScheme());

        final boolean stillLeading = isConnected() && isLeader();
        if (!stillLeading) {
            LOGGER.warn("Lost leadership while committing segment file {}, {} for table {}: isLeader={}, isConnected={}", segmentName, segmentLocation, str, isLeader(), isConnected());
            _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
            return false;
        }

        // Step 1: the move itself decides success or failure of the commit.
        try {
            pinotFs.move(sourceUri, destinationUri, true);
        } catch (Exception moveFailure) {
            LOGGER.error("Could not move {} to {}", segmentLocation, segmentName, moveFailure);
            return false;
        }

        // Step 2: best-effort cleanup of leftover files for this segment.
        try {
            for (String candidate : pinotFs.listFiles(tableDirUri, true)) {
                if (candidate.contains(SegmentCompletionUtils.getSegmentNamePrefix(segmentName))) {
                    LOGGER.warn("Deleting " + candidate);
                    pinotFs.delete(new URI(candidate), true);
                }
            }
        } catch (Exception cleanupFailure) {
            LOGGER.warn("Could not delete tmp segment files for {}", tableDirUri, cleanupFailure);
        }
        return true;
    }

    /**
     * Commits segment metadata while tracking the call in the in-flight commit counter that
     * {@code stop()} waits on.
     *
     * @param str table name
     * @param committingSegmentDescriptor descriptor of the segment being committed
     * @return {@code false} immediately if the controller is stopping; otherwise the result of the
     *     internal commit
     */
    public boolean commitSegmentMetadata(String str, CommittingSegmentDescriptor committingSegmentDescriptor) {
        if (_isStopping) {
            LOGGER.info("Returning false since the controller is stopping");
            return false;
        }
        _numCompletingSegments.incrementAndGet();
        try {
            return commitSegmentMetadataInternal(str, committingSegmentDescriptor);
        } finally {
            // Always release the in-flight slot, whether the commit returned or threw.
            _numCompletingSegments.decrementAndGet();
        }
    }

    /**
     * Performs the metadata side of a segment commit:
     * <ol>
     *   <li>marks the committing segment's ZK metadata as done,</li>
     *   <li>creates ZK metadata for the next segment of the same partition (sequence number + 1),</li>
     *   <li>updates the ideal state under the table's lock stripe,</li>
     *   <li>fires the segment-flush notification.</li>
     * </ol>
     *
     * @param str table name (converted to the realtime table name with type)
     * @param committingSegmentDescriptor supplies the committing segment name and next offset
     * @return {@code true} when all steps completed; {@code false} if the table config is missing,
     *     either ZK metadata write was refused, or partition assignment generation raised
     *     {@code InvalidConfigException}
     * @throws IllegalStateException if the committing segment has no CONSUMING instance in the ideal state
     * @throws RuntimeException if no partition assignment could be generated
     */
    private boolean commitSegmentMetadataInternal(String str, CommittingSegmentDescriptor committingSegmentDescriptor) {
        final String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(str);
        final TableConfig tableConfig = getRealtimeTableConfig(realtimeTableName);
        if (tableConfig == null) {
            LOGGER.warn("Did not find table config for table {}", realtimeTableName);
            return false;
        }

        final String committingSegmentName = committingSegmentDescriptor.getSegmentName();
        final long nextOffset = committingSegmentDescriptor.getNextOffset();
        final LLCSegmentName committingLlcSegment = new LLCSegmentName(committingSegmentName);
        final LLCSegmentName newLlcSegment = new LLCSegmentName(committingLlcSegment.getTableName(), committingLlcSegment.getPartitionId(), committingLlcSegment.getSequenceNumber() + 1, System.currentTimeMillis());
        final String newSegmentName = newLlcSegment.getSegmentName();

        final IdealState idealState = getTableIdealState(realtimeTableName);
        Preconditions.checkState(idealState.getInstanceStateMap(committingSegmentName).containsValue("CONSUMING"),
            "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
        final int numPartitions = _streamPartitionAssignmentGenerator.getNumPartitionsFromIdealState(idealState);

        try {
            final PartitionAssignment partitionAssignment = _streamPartitionAssignmentGenerator.generateStreamPartitionAssignment(tableConfig, numPartitions);
            if (partitionAssignment == null) {
                LOGGER.warn("Partition assignment not found for {}", realtimeTableName);
                throw new RuntimeException("Partition assignment not found. Not committing segment");
            }
            if (!updateOldSegmentMetadataZNRecord(realtimeTableName, committingLlcSegment, nextOffset)
                || !createNewSegmentMetadataZNRecord(tableConfig, committingLlcSegment, newLlcSegment, partitionAssignment, committingSegmentDescriptor, false)) {
                return false;
            }

            final Lock lock = _idealstateUpdateLocks[(realtimeTableName.hashCode() & Integer.MAX_VALUE) % _numIdealStateUpdateLocks];
            // Acquire BEFORE the try so that unlock() in finally only ever runs on a held lock.
            lock.lock();
            try {
                updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentName, newSegmentName, partitionAssignment);
                LOGGER.info("Changed {} to ONLINE and created {} in CONSUMING", committingSegmentName, newSegmentName);
            } finally {
                lock.unlock();
            }

            // Notify outside the locked region: a failure here must not trigger a second unlock(),
            // which would replace the real error with IllegalMonitorStateException.
            notifyOnSegmentFlush(realtimeTableName);
            return true;
        } catch (InvalidConfigException e) {
            LOGGER.error("Exception when generating partition assignment for table {} and numPartitions {}", realtimeTableName, numPartitions, e);
            return false;
        }
    }

    /**
     * Finalizes the ZK metadata of the committing segment: sets end offset, DONE status, download
     * URL, and the CRC / time range / index version / doc count / partition metadata read from the
     * segment's own metadata, then writes the record back using the version read earlier.
     *
     * @param str table name with type
     * @param lLCSegmentName committing segment
     * @param j end offset to record for the committing segment
     * @return {@code true} if the record was written; {@code false} if the segment is no longer
     *     IN_PROGRESS, this controller is not connected/leader, or the write was refused
     */
    protected boolean updateOldSegmentMetadataZNRecord(String str, LLCSegmentName lLCSegmentName, long j) {
        final String committingSegment = lLCSegmentName.getSegmentName();
        final Stat znodeStat = new Stat();
        final LLCRealtimeSegmentZKMetadata zkMetadata = getRealtimeSegmentZKMetadata(str, committingSegment, znodeStat);

        final CommonConstants.Segment.Realtime.Status currentStatus = zkMetadata.getStatus();
        if (currentStatus != CommonConstants.Segment.Realtime.Status.IN_PROGRESS) {
            LOGGER.warn("Status of segment metadata {} has already been changed by other controller for table {}: Status={}", committingSegment, str, currentStatus);
            return false;
        }

        zkMetadata.setEndOffset(j);
        zkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
        final String rawTableName = TableNameBuilder.extractRawTableName(str);
        final String downloadUrl = ControllerConf.constructDownloadUrl(rawTableName, committingSegment, _controllerConf.generateVipUrl());
        zkMetadata.setDownloadUrl(downloadUrl);

        // Copy the facts that only the built segment knows.
        final SegmentMetadataImpl segmentMetadata = extractSegmentMetadata(rawTableName, committingSegment);
        zkMetadata.setCrc(Long.valueOf(segmentMetadata.getCrc()));
        zkMetadata.setStartTime(segmentMetadata.getTimeInterval().getStartMillis());
        zkMetadata.setEndTime(segmentMetadata.getTimeInterval().getEndMillis());
        zkMetadata.setTimeUnit(TimeUnit.MILLISECONDS);
        zkMetadata.setIndexVersion(segmentMetadata.getVersion());
        zkMetadata.setTotalRawDocs(segmentMetadata.getTotalRawDocs());
        zkMetadata.setPartitionMetadata(getPartitionMetadataFromSegmentMetadata(segmentMetadata));

        final ZNRecord updatedRecord = zkMetadata.toZNRecord();
        final String segmentZkPath = ZKMetadataProvider.constructPropertyStorePathForSegment(str, committingSegment);

        if (!(isConnected() && isLeader())) {
            LOGGER.warn("Lost leadership while committing segment metadata for {} for table {}: isLeader={}, isConnected={}", committingSegment, str, isLeader(), isConnected());
            _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
            return false;
        }

        if (writeSegmentToPropertyStore(segmentZkPath, updatedRecord, str, znodeStat.getVersion())) {
            return true;
        }
        LOGGER.warn("Fail to write old segment to property store for {} for table {}: isLeader={}, isConnected={}", committingSegment, str, isLeader(), isConnected());
        return false;
    }

    /**
     * Creates and stores the ZK metadata record for a new consuming segment.
     *
     * @param tableConfig config of the table
     * @param lLCSegmentName the committing (previous) segment, or {@code null} if there is none
     * @param lLCSegmentName2 the new segment to create metadata for
     * @param partitionAssignment assignment used for the replica count and number of partitions
     * @param committingSegmentDescriptor supplies the start offset of the new segment
     * @param z when {@code true}, the leadership/connection check is skipped
     * @return {@code true} if the record was written; {@code false} if leadership was lost or the
     *     write was refused
     */
    protected boolean createNewSegmentMetadataZNRecord(TableConfig tableConfig, LLCSegmentName lLCSegmentName, LLCSegmentName lLCSegmentName2, PartitionAssignment partitionAssignment, CommittingSegmentDescriptor committingSegmentDescriptor, boolean z) {
        final String tableNameWithType = tableConfig.getTableName();
        final String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);

        final LLCRealtimeSegmentZKMetadata committingMetadata = lLCSegmentName == null
            ? null
            : getRealtimeSegmentZKMetadata(tableNameWithType, lLCSegmentName.getSegmentName(), null);

        final int numReplicas = partitionAssignment.getInstancesListForPartition(String.valueOf(lLCSegmentName2.getPartitionId())).size();
        final long startOffset = committingSegmentDescriptor.getNextOffset();
        final ZNRecord seedRecord = makeZnRecordForNewSegment(rawTableName, numReplicas, startOffset, lLCSegmentName2, partitionAssignment.getNumPartitions());
        final LLCRealtimeSegmentZKMetadata newMetadata = new LLCRealtimeSegmentZKMetadata(seedRecord);

        _flushThresholdUpdateManager.getFlushThresholdUpdater(tableConfig).updateFlushThreshold(newMetadata, committingMetadata, committingSegmentDescriptor, partitionAssignment);

        final ZNRecord newRecord = newMetadata.toZNRecord();
        final String newSegmentName = lLCSegmentName2.getSegmentName();
        final String newSegmentZkPath = ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, newSegmentName);

        if (!z && !(isLeader() && isConnected())) {
            LOGGER.warn("Lost leadership while committing new segment metadata for {} for table {}: isLeader={}, isConnected={}", newSegmentName, rawTableName, isLeader(), isConnected());
            _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
            return false;
        }

        if (writeSegmentToPropertyStore(newSegmentZkPath, newRecord, tableNameWithType)) {
            return true;
        }
        LOGGER.warn("Fail to write new segment to property store for {} for table {}: isLeader={}, isConnected={}", newSegmentName, rawTableName, isLeader(), isConnected());
        return false;
    }

    /**
     * Looks up the realtime table config through the table config cache.
     *
     * @param str table name; converted to the realtime name with type before the lookup
     * @return the cached table config, or {@code null} if loading it raised an {@code ExecutionException}
     */
    protected TableConfig getRealtimeTableConfig(String str) {
        final String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(str);
        TableConfig tableConfig;
        try {
            tableConfig = _tableConfigCache.getTableConfig(realtimeTableName);
        } catch (ExecutionException loadFailure) {
            LOGGER.warn("Exception happened while loading the table config ({}) from the property store to the cache.", realtimeTableName, loadFailure);
            tableConfig = null;
        }
        return tableConfig;
    }

    /**
     * Returns the segment commit timeout for a table, in milliseconds.
     *
     * <p>The value comes from the {@code realtime.segment.commit.timeoutSeconds} stream config when
     * present and parseable; otherwise the protocol-wide default
     * ({@code SegmentCompletionProtocol.getMaxSegmentCommitTimeMs()}) is used. The default is also
     * returned when there is no property store or the table config cannot be loaded.
     *
     * @param str table name
     * @return commit timeout in milliseconds
     */
    public long getCommitTimeoutMS(String str) {
        final long defaultTimeoutMs = SegmentCompletionProtocol.getMaxSegmentCommitTimeMs();
        if (_propertyStore == null) {
            return defaultTimeoutMs;
        }

        // getRealtimeTableConfig returns null when the cache fails to load the config; fall back to
        // the default instead of failing with a NullPointerException.
        final TableConfig tableConfig = getRealtimeTableConfig(str);
        if (tableConfig == null) {
            LOGGER.warn("Did not find table config for table {}, using default commit timeout {}ms", str, defaultTimeoutMs);
            return defaultTimeoutMs;
        }

        final String timeoutKey = "realtime.segment.commit.timeoutSeconds";
        final Map<String, String> streamConfigs = tableConfig.getIndexingConfig().getStreamConfigs();
        if (streamConfigs == null || !streamConfigs.containsKey(timeoutKey)) {
            return defaultTimeoutMs;
        }

        final String timeoutSeconds = streamConfigs.get(timeoutKey);
        try {
            return TimeUnit.MILLISECONDS.convert(Integer.parseInt(timeoutSeconds), TimeUnit.SECONDS);
        } catch (NumberFormatException e) {
            // Message previously said "flush size", which is not what is being parsed here.
            LOGGER.warn("Failed to parse segment commit timeout of {}", timeoutSeconds, e);
            return defaultTimeoutMs;
        }
    }

    /**
     * Reads the metadata of a committed segment tarball located at
     * {@code <dataDir>/<table>/<segmentName>}.
     *
     * <p>{@code metadata.properties} and {@code creation.meta} are extracted into a scratch directory
     * ({@code <segmentName>.metadata.tmp} next to the tarball), loaded into a
     * {@code SegmentMetadataImpl}, and the scratch directory is removed afterwards regardless of
     * the outcome.
     *
     * @param str raw table name (directory under the controller data dir)
     * @param str2 segment name (tarball file name)
     * @return the segment metadata read from the tarball
     * @throws RuntimeException wrapping any failure to create the scratch directory, extract the
     *     entries or load the metadata
     */
    protected SegmentMetadataImpl extractSegmentMetadata(String str, String str2) {
        final String tableDir = StringUtil.join("/", new String[]{_controllerConf.getDataDir(), str});
        final String segmentTarPath = StringUtil.join("/", new String[]{tableDir, str2});
        final String tempMetadataDirPath = StringUtil.join("/", new String[]{tableDir, str2 + METADATA_TEMP_DIR_SUFFIX});
        final File tempMetadataDir = new File(tempMetadataDirPath);
        try {
            Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create directory: %s", tempMetadataDirPath);
            extractTarGzEntryToDir(segmentTarPath, "metadata.properties", tempMetadataDirPath);
            extractTarGzEntryToDir(segmentTarPath, "creation.meta", tempMetadataDirPath);
            return new SegmentMetadataImpl(tempMetadataDir);
        } catch (Exception e) {
            throw new RuntimeException("Exception extracting and reading segment metadata for " + str2, e);
        } finally {
            FileUtils.deleteQuietly(tempMetadataDir);
        }
    }

    /**
     * Copies one entry of a tar.gz file into {@code destDir} under the entry's own name.
     *
     * <p>Both the file stream and the extracted entry stream are closed here; whether
     * {@code unTarOneFile} also closes its input is not visible from this file, and a second close
     * is harmless.
     */
    private static void extractTarGzEntryToDir(String tarGzPath, String entryName, String destDir) throws Exception {
        try (InputStream tarGzStream = new FileInputStream(new File(tarGzPath));
             InputStream entryStream = TarGzCompressionUtils.unTarOneFile(tarGzStream, entryName)) {
            Preconditions.checkNotNull(entryStream, "%s does not exist", entryName);
            Files.copy(entryStream, FileSystems.getDefault().getPath(destDir, entryName));
        }
    }

    /**
     * Reads a segment's LLC ZK metadata from the property store.
     *
     * @param str table name
     * @param str2 segment name
     * @param stat passed through to the property store read; may be {@code null}
     * @return the segment's metadata
     * @throws RuntimeException if no record exists for the segment
     */
    public LLCRealtimeSegmentZKMetadata getRealtimeSegmentZKMetadata(String str, String str2, Stat stat) {
        final String segmentZkPath = ZKMetadataProvider.constructPropertyStorePathForSegment(str, str2);
        final ZNRecord record = _propertyStore.get(segmentZkPath, stat, AccessOption.PERSISTENT);
        if (record == null) {
            LOGGER.error("Segment metadata not found for table {}, segment {}. (can happen during table drop)", str, str2);
            throw new RuntimeException("Segment metadata not found for table " + str + " segment " + str2);
        }
        return new LLCRealtimeSegmentZKMetadata(record);
    }

    /**
     * Returns the offset of a stream partition for the given criteria; delegates to
     * {@code fetchPartitionOffset}.
     *
     * @param streamConfig stream configuration
     * @param offsetCriteria which offset to look up
     * @param i partition id
     * @return the fetched offset
     */
    protected long getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria, int i) {
        final long offset = fetchPartitionOffset(streamConfig, offsetCriteria, i);
        return offset;
    }

    /**
     * Fetches a stream partition offset through a {@code PartitionOffsetFetcher}, run under a
     * fixed-delay retry policy created with arguments {@code (3, 1000L)}.
     *
     * @param streamConfig stream configuration
     * @param offsetCriteria which offset to look up
     * @param i partition id
     * @return the offset reported by the fetcher
     * @throws RuntimeException if the retry policy gives up; the cause is the fetcher's recorded
     *     exception when there is one, otherwise the exception raised by the retry attempt
     */
    private long fetchPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria, int i) {
        final PartitionOffsetFetcher offsetFetcher = new PartitionOffsetFetcher(offsetCriteria, i, streamConfig);
        try {
            RetryPolicies.fixedDelayRetryPolicy(3, 1000L).attempt(offsetFetcher);
            return offsetFetcher.getOffset();
        } catch (Exception e) {
            // Prefer the fetcher's own exception, but never lose the failure entirely: if the
            // fetcher recorded nothing, surface the exception thrown by the retry attempt.
            final Exception fetcherException = offsetFetcher.getException();
            final Exception cause = fetcherException != null ? fetcherException : e;
            LOGGER.error("Could not get offset for topic {} partition {}, criteria {}", streamConfig.getTopicName(), i, offsetCriteria, cause);
            throw new RuntimeException(cause);
        }
    }

    /**
     * Picks the start offset for a new segment given the offset currently offered by the stream
     * ({@code j}) and the ZK metadata of the previous segment.
     *
     * <ul>
     *   <li>Previous segment IN_PROGRESS: use its start offset when {@code j} is not beyond it;
     *       otherwise keep {@code j} and record data loss.</li>
     *   <li>Previous segment in any other status: use its end offset when {@code j} is before it;
     *       keep {@code j} when equal; keep {@code j} and record data loss when beyond it.</li>
     * </ul>
     *
     * @param str table name (used for metadata lookup, logging and metrics)
     * @param i partition id (logging only)
     * @param lLCSegmentName previous segment whose metadata is consulted
     * @param j offset currently offered by the stream
     * @param i2 sequence number (logging only)
     * @return the start offset to use
     */
    private long getBetterStartOffsetIfNeeded(String str, int i, LLCSegmentName lLCSegmentName, long j, int i2) {
        final LLCRealtimeSegmentZKMetadata prevMetadata = getRealtimeSegmentZKMetadata(str, lLCSegmentName.getSegmentName(), null);
        final CommonConstants.Segment.Realtime.Status prevStatus = prevMetadata.getStatus();
        final long prevStartOffset = prevMetadata.getStartOffset();
        long chosenOffset = j;

        if (prevStatus.equals(CommonConstants.Segment.Realtime.Status.IN_PROGRESS)) {
            if (j <= prevStartOffset) {
                chosenOffset = prevStartOffset;
                // Log the offset actually chosen (the original logged the stream offset here).
                LOGGER.info("Choosing previous segment start offset {} for table {} for partition {}, sequence {}", chosenOffset, str, i, i2);
            } else {
                LOGGER.warn("Data lost from offset {} to {} for table {} partition {} sequence {}", prevStartOffset, j, str, i, i2);
                _controllerMetrics.addMeteredTableValue(str, ControllerMeter.LLC_KAFKA_DATA_LOSS, 1L);
                _controllerMetrics.addMeteredTableValue(str, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
            }
            return chosenOffset;
        }

        final long prevEndOffset = prevMetadata.getEndOffset();
        if (j < prevEndOffset) {
            chosenOffset = prevEndOffset;
            // Log the offset actually chosen (the original logged the stream offset here).
            LOGGER.info("Choosing newer offset {} for table {} for partition {}, sequence {}", chosenOffset, str, i, i2);
        } else if (j > prevEndOffset) {
            LOGGER.warn("Data lost from offset {} to {} for table {} partition {} sequence {}", prevEndOffset, j, str, i, i2);
            _controllerMetrics.addMeteredTableValue(str, ControllerMeter.LLC_KAFKA_DATA_LOSS, 1L);
            _controllerMetrics.addMeteredTableValue(str, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
        } else {
            LOGGER.info("Earliest offset {} is the same as new segment start offset", j);
        }
        return chosenOffset;
    }

    /**
     * Builds the ZNRecord for a brand-new consuming segment.
     *
     * <p>The metadata is stamped with the current wall-clock time as creation time, the given start
     * offset, {@code END_OFFSET_FOR_CONSUMING_SEGMENTS} as end offset and status IN_PROGRESS. Partition
     * metadata derived from the table config is attached only when it is non-null.
     *
     * @param str table name stored in the metadata and used to look up partition metadata
     * @param i number of replicas
     * @param j start offset of the new segment
     * @param lLCSegmentName name of the new segment
     * @param i2 value forwarded to {@code getPartitionMetadataFromTableConfig} together with the partition id
     * @return the ZNRecord form of the new segment metadata
     */
    private ZNRecord makeZnRecordForNewSegment(String str, int i, long j, LLCSegmentName lLCSegmentName, int i2) {
        LLCRealtimeSegmentZKMetadata newSegmentMetadata = new LLCRealtimeSegmentZKMetadata();
        newSegmentMetadata.setCreationTime(System.currentTimeMillis());
        newSegmentMetadata.setStartOffset(j);
        newSegmentMetadata.setEndOffset(END_OFFSET_FOR_CONSUMING_SEGMENTS);
        newSegmentMetadata.setNumReplicas(i);
        newSegmentMetadata.setTableName(str);
        newSegmentMetadata.setSegmentName(lLCSegmentName.getSegmentName());
        newSegmentMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);

        SegmentPartitionMetadata partitionMetadata = getPartitionMetadataFromTableConfig(str, i2, lLCSegmentName.getPartitionId());
        if (partitionMetadata == null) {
            return newSegmentMetadata.toZNRecord();
        }
        newSegmentMetadata.setPartitionMetadata(partitionMetadata);
        return newSegmentMetadata.toZNRecord();
    }

    /**
     * Marks a segment OFFLINE in the ideal state for one instance after that instance reported that it
     * stopped consuming.
     *
     * <p>The update is attempted under an exponential back-off retry policy. On failure the error is
     * logged, the {@code LLC_ZOOKEEPER_UPDATE_FAILURES} global meter is incremented and the exception is
     * rethrown to the caller.
     *
     * @param lLCSegmentName segment that stopped consuming
     * @param str instance on which the segment stopped consuming
     */
    public void segmentStoppedConsuming(LLCSegmentName lLCSegmentName, String str) {
        final String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(lLCSegmentName.getTableName());
        final String stoppedSegment = lLCSegmentName.getSegmentName();
        try {
            HelixHelper.updateIdealState(
                this._helixManager,
                realtimeTableName,
                currentIdealState -> {
                    currentIdealState.setPartitionState(stoppedSegment, str, "OFFLINE");
                    // Logged after the state change so the map reflects the attempted update.
                    Map<String, String> stateByInstance = currentIdealState.getInstanceStateMap(stoppedSegment);
                    LOGGER.info("Attempting to mark {} offline. Current map:{}", stoppedSegment, stateByInstance.toString());
                    return currentIdealState;
                },
                // Scale factor is numerically the float 1.2 widened to double.
                RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2000000476837158d));
            LOGGER.info("Successfully marked {} offline for instance {} since it stopped consuming", stoppedSegment, str);
        } catch (Exception e) {
            // The trailing exception argument is passed alongside the three placeholder values.
            LOGGER.error("Failed to update idealstate for table {} instance {} segment {}", realtimeTableName, str, stoppedSegment, e);
            this._controllerMetrics.addMeteredGlobalValue(ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
            throw e;
        }
    }

    /**
     * Notifies the configured metadata event notifier that a segment of the given table was flushed.
     *
     * <p>A notifier factory is loaded from the {@code METADATA_EVENT_NOTIFIER_PREFIX} subset of the
     * controller configuration on every call, and the realtime table config is passed to the notifier.
     *
     * @param str table name used to look up the realtime table config
     */
    private void notifyOnSegmentFlush(String str) {
        MetadataEventNotifierFactory
            .loadFactory(this._controllerConf.subset(METADATA_EVENT_NOTIFIER_PREFIX))
            .create()
            .notifyOnSegmentFlush(getRealtimeTableConfig(str));
    }

    /**
     * Returns the partition count for the given stream config, as reported by
     * {@code PinotTableIdealStateBuilder.getPartitionCount}.
     *
     * @param streamConfig stream configuration of the table
     * @return partition count for the stream
     */
    protected int getPartitionCount(StreamConfig streamConfig) {
        final int partitionCount = PinotTableIdealStateBuilder.getPartitionCount(streamConfig);
        return partitionCount;
    }

    /**
     * Lists the LLC realtime segments of a table as recorded in the property store.
     *
     * @param str table name
     * @return segment names returned by {@code ZKMetadataProvider.getLLCRealtimeSegments}
     */
    @VisibleForTesting
    protected List<String> getAllSegments(String str) {
        final List<String> llcSegments = ZKMetadataProvider.getLLCRealtimeSegments(this._propertyStore, str);
        return llcSegments;
    }

    /**
     * Reads the realtime segment ZK metadata of one segment from the property store.
     *
     * @param str table name
     * @param str2 segment name
     * @return the metadata returned by {@code ZKMetadataProvider.getRealtimeSegmentZKMetadata}
     */
    @VisibleForTesting
    protected LLCRealtimeSegmentZKMetadata getSegmentMetadata(String str, String str2) {
        final LLCRealtimeSegmentZKMetadata segmentZkMetadata =
            ZKMetadataProvider.getRealtimeSegmentZKMetadata(this._propertyStore, str, str2);
        return segmentZkMetadata;
    }

    /**
     * Collects, for every partition that has at least one segment, the ZK metadata of its two segments
     * with the highest sequence numbers.
     *
     * <p>Each value is a two-element array: index 0 holds the metadata of the segment with the highest
     * sequence number, index 1 the metadata of the runner-up, or {@code null} when the partition has only
     * one segment.
     *
     * @param str table name whose segments are inspected
     * @return map from partition id to {@code [latest, secondLatest]} segment metadata
     */
    @VisibleForTesting
    protected Map<Integer, LLCRealtimeSegmentZKMetadata[]> getLatestMetadata(String str) {
        // Pass 1: track the two highest-sequence segment names per partition, touching only names.
        Map<Integer, LLCSegmentName[]> latestNamesByPartition = new HashMap<>();
        for (String segment : getAllSegments(str)) {
            LLCSegmentName candidate = new LLCSegmentName(segment);
            latestNamesByPartition.compute(Integer.valueOf(candidate.getPartitionId()), (partitionId, latestTwo) -> {
                if (latestTwo == null) {
                    return new LLCSegmentName[]{candidate, null};
                }
                if (candidate.getSequenceNumber() > latestTwo[0].getSequenceNumber()) {
                    // New leader: the old leader becomes the runner-up.
                    latestTwo[1] = latestTwo[0];
                    latestTwo[0] = candidate;
                } else if (latestTwo[1] == null || candidate.getSequenceNumber() > latestTwo[1].getSequenceNumber()) {
                    latestTwo[1] = candidate;
                }
                return latestTwo;
            });
        }

        // Pass 2: fetch metadata only for the (at most two) selected segments of each partition.
        Map<Integer, LLCRealtimeSegmentZKMetadata[]> latestMetadataByPartition = new HashMap<>();
        for (Map.Entry<Integer, LLCSegmentName[]> entry : latestNamesByPartition.entrySet()) {
            LLCSegmentName[] latestTwo = entry.getValue();
            LLCRealtimeSegmentZKMetadata latestMetadata = getSegmentMetadata(str, latestTwo[0].getSegmentName());
            LLCRealtimeSegmentZKMetadata secondLatestMetadata = null;
            if (latestTwo[1] != null) {
                secondLatestMetadata = getSegmentMetadata(str, latestTwo[1].getSegmentName());
            }
            latestMetadataByPartition.put(entry.getKey(), new LLCRealtimeSegmentZKMetadata[]{latestMetadata, secondLatestMetadata});
        }
        return latestMetadataByPartition;
    }

    /**
     * Runs the partition-consumption validation for a table as an ideal state update.
     *
     * <p>The partition count is resolved once from the table's stream config; the three-argument
     * {@code ensureAllPartitionsConsuming} overload is then applied to the ideal state under an
     * exponential back-off retry policy.
     *
     * @param tableConfig config of the table to validate
     */
    public void ensureAllPartitionsConsuming(final TableConfig tableConfig) {
        final String tableNameWithType = tableConfig.getTableName();
        final StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
        final int partitionCount = getPartitionCount(streamConfig);
        HelixHelper.updateIdealState(
            this._helixManager,
            tableNameWithType,
            idealState -> ensureAllPartitionsConsuming(tableConfig, idealState, partitionCount),
            // Scale factor is numerically the float 1.2 widened to double.
            RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2000000476837158d),
            true);
    }

    /**
     * Applies the segment-completion transition to a table's ideal state, under an exponential back-off
     * retry policy.
     *
     * <p>The actual mutation is delegated to the overload that takes an {@code IdealState}.
     *
     * @param str name of the ideal state resource to update
     * @param str2 first segment name forwarded to the overload (set ONLINE there)
     * @param str3 second segment name forwarded to the overload (set CONSUMING there)
     * @param partitionAssignment partition assignment forwarded to the overload
     */
    @VisibleForTesting
    protected void updateIdealStateOnSegmentCompletion(@Nonnull String str, @Nonnull final String str2, @Nonnull final String str3, @Nonnull final PartitionAssignment partitionAssignment) {
        HelixHelper.updateIdealState(
            this._helixManager,
            str,
            idealState -> updateIdealStateOnSegmentCompletion(idealState, str2, str3, partitionAssignment),
            // Scale factor is numerically the float 1.2 widened to double.
            RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2000000476837158d));
    }

    /**
     * Sets up consuming segments for every partition {@code 0..i-1} of a table and records them in the
     * given ideal state.
     *
     * <p>Nothing is done when the ideal state is disabled. Otherwise a stream partition assignment is
     * generated, new segments are created for all partitions using the stream config's offset criteria,
     * and the created segments are added to the ideal state in CONSUMING state.
     *
     * @param tableConfig config of the table being set up
     * @param idealState ideal state to update in place
     * @param i number of stream partitions
     * @return the same {@code idealState} instance
     * @throws InvalidConfigException propagated from partition assignment generation or segment assignment
     */
    private IdealState setupTable(TableConfig tableConfig, IdealState idealState, int i) throws InvalidConfigException {
        final String tableNameWithType = tableConfig.getTableName();
        if (!idealState.isEnabled()) {
            LOGGER.info("Skipping validation for disabled table {}", tableNameWithType);
            return idealState;
        }

        final StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
        final long nowMs = getCurrentTimeMs();
        final PartitionAssignment partitionAssignment =
            this._streamPartitionAssignmentGenerator.generateStreamPartitionAssignment(tableConfig, i);

        // Every partition id in [0, i) is treated as new.
        final Set<Integer> allPartitionIds = new HashSet<>(i);
        int partitionId = 0;
        while (partitionId < i) {
            allPartitionIds.add(Integer.valueOf(partitionId));
            partitionId++;
        }

        final Set<String> consumingSegments = setupNewPartitions(
            tableConfig, streamConfig, streamConfig.getOffsetCriteria(), partitionAssignment, allPartitionIds, nowMs);
        final Map<String, List<String>> instancesBySegment =
            new ConsumingSegmentAssignmentStrategy().assign(consumingSegments, partitionAssignment);
        updateIdealState(idealState, null, consumingSegments, instancesBySegment);
        return idealState;
    }

    /**
     * Tells whether a segment's metadata was modified too recently for the segment to be repaired.
     *
     * <p>A segment is "too soon to correct" while {@code j} is earlier than the metadata's modification
     * time plus {@code MAX_SEGMENT_COMPLETION_TIME_MINS} minutes.
     *
     * @param str table name
     * @param str2 segment name
     * @param j current time in milliseconds
     * @return {@code true} when the correction window has not yet elapsed
     */
    @VisibleForTesting
    protected boolean isTooSoonToCorrect(String str, String str2, long j) {
        // The returned metadata is not needed; presumably the lookup fills in the Stat passed to it.
        Stat metadataStat = new Stat();
        getRealtimeSegmentZKMetadata(str, str2, metadataStat);

        long lastUpdateMs = metadataStat.getMtime();
        long earliestCorrectionMs = lastUpdateMs + TimeUnit.MILLISECONDS.convert(MAX_SEGMENT_COMPLETION_TIME_MINS, TimeUnit.MINUTES);
        boolean tooSoon = j < earliestCorrectionMs;
        if (tooSoon) {
            LOGGER.info("Too soon to correct segment:{} updateTime: {} now:{}", str2, Long.valueOf(lastUpdateMs), Long.valueOf(j));
        }
        return tooSoon;
    }

    /**
     * Checks whether every value of the given instance-to-state map equals the given state.
     *
     * @param map instance name to state
     * @param str state every instance is expected to be in
     * @return {@code true} when all values equal {@code str}; also {@code true} for an empty map
     */
    private boolean isAllInstancesInState(Map<String, String> map, String str) {
        for (String instanceState : map.values()) {
            if (!instanceState.equals(str)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Validates the given ideal state against the latest segment metadata of every partition and repairs
     * it in place so that partitions have a consuming segment.
     *
     * <p>Returns immediately when the ideal state is disabled. Otherwise, for the latest segment of each
     * partition that has metadata:
     * <ul>
     *   <li><b>In ideal state, some instance CONSUMING:</b> if the metadata status is DONE and the
     *       correction window has elapsed, metadata for the next segment is created; the old segment is
     *       moved to ONLINE and the new one to CONSUMING.</li>
     *   <li><b>In ideal state, no instance CONSUMING:</b> if all instances are OFFLINE, or the correction
     *       window has elapsed, a new consuming segment is created starting from the offset chosen by
     *       {@code getBetterStartOffsetIfNeeded}.</li>
     *   <li><b>Not in ideal state:</b> once the correction window has elapsed, the segment (which must be
     *       IN_PROGRESS) is added as CONSUMING and the previous segment, if any, is set ONLINE. This is
     *       skipped when there is no previous segment and the partition assignment had to be read back
     *       from the ideal state.</li>
     * </ul>
     * Partitions in {@code [0, i)} with no metadata at all get a first segment, unless the partition
     * assignment had to be read back from the ideal state.
     *
     * @param tableConfig config of the table being validated
     * @param idealState ideal state to validate and update in place
     * @param i number of stream partitions
     * @return the same {@code idealState} instance
     * @throws IllegalStateException if assigning the consuming segments fails with an
     *         {@code InvalidConfigException}; the original exception is attached as the cause
     */
    @VisibleForTesting
    protected IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, IdealState idealState, int i) {
        String tableName = tableConfig.getTableName();
        StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
        if (!idealState.isEnabled()) {
            LOGGER.info("Skipping validation for disabled table {}", tableName);
            return idealState;
        }
        long currentTimeMs = getCurrentTimeMs();
        Map<Integer, LLCRealtimeSegmentZKMetadata[]> latestMetadata = getLatestMetadata(tableName);

        // Partitions in [0, i) for which no segment metadata exists yet.
        Set<Integer> newPartitions = new HashSet<>(i);
        for (int partitionId = 0; partitionId < i; partitionId++) {
            if (!latestMetadata.containsKey(Integer.valueOf(partitionId))) {
                LOGGER.info("Found partition {} with no segments", Integer.valueOf(partitionId));
                newPartitions.add(Integer.valueOf(partitionId));
            }
        }

        // Prefer a freshly generated assignment; fall back to the one implied by the ideal state.
        PartitionAssignment partitionAssignment;
        boolean assignmentFromIdealState = false;
        try {
            partitionAssignment = this._streamPartitionAssignmentGenerator.generateStreamPartitionAssignment(tableConfig, i);
        } catch (InvalidConfigException e) {
            this._controllerMetrics.addMeteredTableValue(tableName, ControllerMeter.PARTITION_ASSIGNMENT_GENERATION_ERROR, 1L);
            LOGGER.warn("Could not generate partition assignment. Fetching partition assignment from ideal state for repair of table {}", tableName);
            partitionAssignment = this._streamPartitionAssignmentGenerator.getStreamPartitionAssignmentFromIdealState(tableConfig, idealState);
            assignmentFromIdealState = true;
        }

        Set<String> onlineSegments = new HashSet<>();
        Set<String> consumingSegments = new HashSet<>();
        for (Map.Entry<Integer, LLCRealtimeSegmentZKMetadata[]> entry : latestMetadata.entrySet()) {
            int partitionId = entry.getKey().intValue();
            LLCRealtimeSegmentZKMetadata[] latestTwo = entry.getValue();
            LLCRealtimeSegmentZKMetadata latestSegmentMetadata = latestTwo[0];
            String segmentName = latestSegmentMetadata.getSegmentName();
            LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
            if (idealState.getRecord().getMapFields().containsKey(segmentName)) {
                Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
                if (instanceStateMap.values().contains("CONSUMING")) {
                    // Metadata says DONE but the ideal state was never moved past CONSUMING.
                    if (latestSegmentMetadata.getStatus().equals(CommonConstants.Segment.Realtime.Status.DONE) && !isTooSoonToCorrect(tableName, segmentName, currentTimeMs)) {
                        LOGGER.info("{}:Repairing segment for partition {}. Old segment metadata {} has status DONE, but segments are still in CONSUMING state in ideal STATE", new Object[]{tableName, Integer.valueOf(partitionId), segmentName});
                        LLCSegmentName nextSegmentName = makeNextLLCSegmentName(llcSegmentName, partitionId, currentTimeMs);
                        LOGGER.info("{}: Creating new segment metadata for {}", tableName, nextSegmentName.getSegmentName());
                        CommittingSegmentDescriptor descriptor = new CommittingSegmentDescriptor(segmentName, latestSegmentMetadata.getEndOffset(), 0L);
                        if (createNewSegmentMetadataZNRecord(tableConfig, llcSegmentName, nextSegmentName, partitionAssignment, descriptor, false)) {
                            onlineSegments.add(segmentName);
                            consumingSegments.add(nextSegmentName.getSegmentName());
                        }
                    }
                } else if (isAllInstancesInState(instanceStateMap, "OFFLINE") || !isTooSoonToCorrect(tableName, segmentName, currentTimeMs)) {
                    // No instance is consuming the latest segment: create the next consuming segment.
                    LLCSegmentName nextSegmentName = makeNextLLCSegmentName(llcSegmentName, partitionId, currentTimeMs);
                    LOGGER.info("Creating CONSUMING segment {} for {} partition {}", new Object[]{nextSegmentName.getSegmentName(), tableName, Integer.valueOf(partitionId)});
                    long smallestOffset = getPartitionOffset(streamConfig, OffsetCriteria.SMALLEST_OFFSET_CRITERIA, partitionId);
                    LOGGER.info("Found smallest offset {} for table {} for partition {}", new Object[]{Long.valueOf(smallestOffset), tableName, Integer.valueOf(partitionId)});
                    long startOffset = getBetterStartOffsetIfNeeded(tableName, partitionId, llcSegmentName, smallestOffset, nextSegmentName.getSequenceNumber());
                    CommittingSegmentDescriptor descriptor = new CommittingSegmentDescriptor(segmentName, startOffset, 0L);
                    if (createNewSegmentMetadataZNRecord(tableConfig, llcSegmentName, nextSegmentName, partitionAssignment, descriptor, false)) {
                        consumingSegments.add(nextSegmentName.getSegmentName());
                    }
                }
            } else if (!isTooSoonToCorrect(tableName, segmentName, currentTimeMs)) {
                // Metadata exists but the segment never made it into the ideal state.
                Preconditions.checkArgument(latestSegmentMetadata.getStatus().equals(CommonConstants.Segment.Realtime.Status.IN_PROGRESS));
                LOGGER.info("{}:Repairing segment for partition {}. Segment {} not found in idealstate", new Object[]{tableName, Integer.valueOf(partitionId), segmentName});
                LLCRealtimeSegmentZKMetadata previousSegmentMetadata = latestTwo[1];
                // With neither a previous segment nor a freshly generated assignment, the repair is skipped.
                if (previousSegmentMetadata != null || !assignmentFromIdealState) {
                    if (previousSegmentMetadata != null) {
                        onlineSegments.add(previousSegmentMetadata.getSegmentName());
                    }
                    consumingSegments.add(segmentName);
                }
            }
        }

        // Brand-new partitions are only set up when a fresh assignment could be generated.
        if (!assignmentFromIdealState) {
            consumingSegments.addAll(setupNewPartitions(tableConfig, streamConfig, OffsetCriteria.SMALLEST_OFFSET_CRITERIA, partitionAssignment, newPartitions, currentTimeMs));
        }
        try {
            updateIdealState(idealState, onlineSegments, consumingSegments, new ConsumingSegmentAssignmentStrategy().assign(consumingSegments, partitionAssignment));
            return idealState;
        } catch (InvalidConfigException e) {
            // Keep the original failure as the cause so the root problem is not lost.
            throw new IllegalStateException("Caught exception when assigning segments using partition assignment for table " + tableName, e);
        }
    }

    /**
     * Builds the name of the segment that follows the given one: same table name, the given partition id
     * and timestamp, and the sequence number incremented by one.
     *
     * @param lLCSegmentName current segment name
     * @param i partition id of the next segment
     * @param j timestamp value passed to the new segment name
     * @return name of the next segment
     */
    private LLCSegmentName makeNextLLCSegmentName(LLCSegmentName lLCSegmentName, int i, long j) {
        final String rawTableName = lLCSegmentName.getTableName();
        final int nextSequenceNumber = lLCSegmentName.getSequenceNumber() + 1;
        return new LLCSegmentName(rawTableName, i, nextSequenceNumber, j);
    }

    /**
     * Applies segment state changes to an ideal state in place.
     *
     * <p>Every segment in {@code set} is set to ONLINE on all instances it currently has; each such
     * segment must already have a non-empty instance set. Every segment in {@code set2} has its current
     * instance state map cleared (when present) and is then set to CONSUMING on the instances listed for
     * it in {@code map}.
     *
     * @param idealState ideal state to modify
     * @param set segments to move to ONLINE; may be {@code null}
     * @param set2 segments to place in CONSUMING; may be {@code null}
     * @param map instances per segment name, looked up for each entry of {@code set2}
     */
    private void updateIdealState(IdealState idealState, Set<String> set, Set<String> set2, Map<String, List<String>> map) {
        if (set != null) {
            for (String onlineSegment : set) {
                Set<String> currentInstances = idealState.getInstanceSet(onlineSegment);
                Preconditions.checkArgument(CollectionUtils.isNotEmpty(currentInstances));
                for (String instance : currentInstances) {
                    idealState.setPartitionState(onlineSegment, instance, "ONLINE");
                }
            }
        }
        if (set2 != null) {
            for (String consumingSegment : set2) {
                List<String> assignedInstances = map.get(consumingSegment);
                // Drop any stale instance states before writing the new assignment.
                Map<String, String> existingStates = idealState.getInstanceStateMap(consumingSegment);
                if (existingStates != null) {
                    existingStates.clear();
                }
                for (String instance : assignedInstances) {
                    idealState.setPartitionState(consumingSegment, instance, "CONSUMING");
                }
            }
        }
    }

    /**
     * Creates the first segment (sequence number 0) for each of the given partitions.
     *
     * <p>For every partition the start offset is resolved with the given offset criteria, and segment
     * metadata is created for it. Only segments whose metadata creation reported success are returned.
     *
     * @param tableConfig config of the table
     * @param streamConfig stream config used to resolve partition offsets
     * @param offsetCriteria criteria used to pick the start offset of each new segment
     * @param partitionAssignment partition assignment passed to the metadata creation
     * @param set ids of the partitions to set up
     * @param j timestamp used in the new segment names
     * @return names of the segments that were created
     */
    private Set<String> setupNewPartitions(TableConfig tableConfig, StreamConfig streamConfig, OffsetCriteria offsetCriteria, PartitionAssignment partitionAssignment, Set<Integer> set, long j) {
        final String tableNameWithType = tableConfig.getTableName();
        final Set<String> createdSegments = new HashSet<>(set.size());
        final String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
        for (Integer boxedPartitionId : set) {
            final int partitionId = boxedPartitionId.intValue();
            LOGGER.info("Creating CONSUMING segment for {} partition {} with seq {}", tableNameWithType, Integer.valueOf(partitionId), Integer.valueOf(0));
            final long startOffset = getPartitionOffset(streamConfig, offsetCriteria, partitionId);
            LOGGER.info("Found offset {} for table {} for partition {}", Long.valueOf(startOffset), tableNameWithType, Integer.valueOf(partitionId));

            final LLCSegmentName firstSegment = new LLCSegmentName(rawTableName, partitionId, 0, j);
            final CommittingSegmentDescriptor descriptor = new CommittingSegmentDescriptor(null, startOffset, 0L);
            final boolean created = createNewSegmentMetadataZNRecord(tableConfig, null, firstSegment, partitionAssignment, descriptor, true);
            if (created) {
                createdSegments.add(firstSegment.getSegmentName());
            }
        }
        return createdSegments;
    }

    /**
     * Returns the current wall-clock time in milliseconds, as given by {@link System#currentTimeMillis()}.
     *
     * @return current time in milliseconds
     */
    @VisibleForTesting
    protected long getCurrentTimeMs() {
        final long nowMs = System.currentTimeMillis();
        return nowMs;
    }

    /**
     * Moves one segment to ONLINE and places another segment in CONSUMING within the given ideal state.
     *
     * <p>The instances for {@code str2} are computed first, so an assignment failure leaves the ideal
     * state untouched. Then {@code str} is set ONLINE on all of its current instances, any existing
     * instance states of {@code str2} are cleared, and {@code str2} is set CONSUMING on the assigned
     * instances.
     *
     * @param idealState ideal state to update in place
     * @param str segment to set ONLINE
     * @param str2 segment to set CONSUMING
     * @param partitionAssignment partition assignment used to pick instances for {@code str2}
     * @return the same {@code idealState} instance
     * @throws IllegalStateException if segment assignment fails with an {@code InvalidConfigException};
     *         the {@code CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_ERROR} meter is incremented first
     */
    protected IdealState updateIdealStateOnSegmentCompletion(@Nonnull IdealState idealState, @Nonnull String str, @Nonnull String str2, @Nonnull PartitionAssignment partitionAssignment) {
        try {
            Map<String, List<String>> instancesBySegment =
                new ConsumingSegmentAssignmentStrategy().assign(Lists.newArrayList(new String[]{str2}), partitionAssignment);
            List<String> newSegmentInstances = instancesBySegment.get(str2);

            Set<String> completedSegmentInstances = idealState.getInstanceSet(str);
            for (String instance : completedSegmentInstances) {
                idealState.setPartitionState(str, instance, "ONLINE");
            }

            // Drop any stale instance states of the new segment before assigning it.
            Map<String, String> staleStates = idealState.getInstanceStateMap(str2);
            if (staleStates != null) {
                staleStates.clear();
            }
            for (String instance : newSegmentInstances) {
                idealState.setPartitionState(str2, instance, "CONSUMING");
            }
            return idealState;
        } catch (InvalidConfigException e) {
            this._controllerMetrics.addMeteredTableValue(idealState.getResourceName(), ControllerMeter.CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_ERROR, 1L);
            throw new IllegalStateException("Caught exception when updating ideal state on segment completion", e);
        }
    }
}
