/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core.retention;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.helix.model.IdealState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
import org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RetentionManager
extends ControllerPeriodicTask<Void> {
    public static final long OLD_LLC_SEGMENTS_RETENTION_IN_MILLIS = TimeUnit.DAYS.toMillis(5L);
    private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy((int)5, (long)1000L, (double)2.0);
    private static final Logger LOGGER = LoggerFactory.getLogger(RetentionManager.class);

    public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
        super("RetentionManager", config.getRetentionControllerFrequencyInSeconds(), config.getRetentionManagerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager, controllerMetrics);
        LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}", (Object)this.getIntervalInSeconds());
    }

    @Override
    protected void processTable(String tableNameWithType) {
        TableConfig tableConfig = this._pinotHelixResourceManager.getTableConfig(tableNameWithType);
        if (tableConfig == null) {
            LOGGER.error("Failed to get table config for table: {}", (Object)tableNameWithType);
            return;
        }
        this.manageRetentionForTable(tableConfig);
        this.manageSegmentLineageCleanupForTable(tableConfig);
    }

    @Override
    protected void postprocess() {
        LOGGER.info("Removing aged deleted segments for all tables");
        this._pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(this._leadControllerManager);
    }

    private void manageRetentionForTable(TableConfig tableConfig) {
        TimeRetentionStrategy retentionStrategy;
        String tableNameWithType = tableConfig.getTableName();
        LOGGER.info("Start managing retention for table: {}", (Object)tableNameWithType);
        SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
        String segmentPushType = IngestionConfigUtils.getBatchSegmentIngestionType((TableConfig)tableConfig);
        if (tableConfig.getTableType() == TableType.OFFLINE && !"APPEND".equalsIgnoreCase(segmentPushType)) {
            LOGGER.info("Segment push type is not APPEND for table: {}, skip managing retention", (Object)tableNameWithType);
            return;
        }
        String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
        String retentionTimeValue = validationConfig.getRetentionTimeValue();
        try {
            retentionStrategy = new TimeRetentionStrategy(TimeUnit.valueOf(retentionTimeUnit.toUpperCase()), Long.parseLong(retentionTimeValue));
        }
        catch (Exception e) {
            LOGGER.warn("Invalid retention time: {} {} for table: {}, skip", new Object[]{retentionTimeUnit, retentionTimeValue, tableNameWithType});
            return;
        }
        if (TableNameBuilder.isOfflineTableResource((String)tableNameWithType)) {
            this.manageRetentionForOfflineTable(tableNameWithType, retentionStrategy);
        } else {
            this.manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy);
        }
    }

    private void manageRetentionForOfflineTable(String offlineTableName, RetentionStrategy retentionStrategy) {
        ArrayList<String> segmentsToDelete = new ArrayList<String>();
        for (SegmentZKMetadata segmentZKMetadata : this._pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
            if (!retentionStrategy.isPurgeable(offlineTableName, segmentZKMetadata)) continue;
            segmentsToDelete.add(segmentZKMetadata.getSegmentName());
        }
        if (!segmentsToDelete.isEmpty()) {
            LOGGER.info("Deleting {} segments from table: {}", (Object)segmentsToDelete.size(), (Object)offlineTableName);
            this._pinotHelixResourceManager.deleteSegments(offlineTableName, segmentsToDelete);
        }
    }

    private void manageRetentionForRealtimeTable(String realtimeTableName, RetentionStrategy retentionStrategy) {
        ArrayList<String> segmentsToDelete = new ArrayList<String>();
        IdealState idealState = this._pinotHelixResourceManager.getHelixAdmin().getResourceIdealState(this._pinotHelixResourceManager.getHelixClusterName(), realtimeTableName);
        for (SegmentZKMetadata segmentZKMetadata : this._pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName)) {
            String segmentName = segmentZKMetadata.getSegmentName();
            if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.IN_PROGRESS) {
                if (!this.shouldDeleteInProgressLLCSegment(segmentName, idealState, segmentZKMetadata)) continue;
                segmentsToDelete.add(segmentName);
                continue;
            }
            if (!retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) continue;
            segmentsToDelete.add(segmentName);
        }
        segmentsToDelete.removeAll(this._pinotHelixResourceManager.getLastLLCCompletedSegments(realtimeTableName));
        if (!segmentsToDelete.isEmpty()) {
            LOGGER.info("Deleting {} segments from table: {}", (Object)segmentsToDelete.size(), (Object)realtimeTableName);
            this._pinotHelixResourceManager.deleteSegments(realtimeTableName, segmentsToDelete);
        }
    }

    private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState idealState, SegmentZKMetadata segmentZKMetadata) {
        if (idealState == null) {
            return false;
        }
        if (System.currentTimeMillis() - segmentZKMetadata.getCreationTime() <= OLD_LLC_SEGMENTS_RETENTION_IN_MILLIS) {
            return false;
        }
        Map stateMap = idealState.getInstanceStateMap(segmentName);
        if (stateMap == null) {
            return true;
        }
        HashSet states = new HashSet(stateMap.values());
        return states.size() == 1 && states.contains("OFFLINE");
    }

    private void manageSegmentLineageCleanupForTable(TableConfig tableConfig) {
        String tableNameWithType = tableConfig.getTableName();
        try {
            DEFAULT_RETRY_POLICY.attempt(() -> {
                ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper.getSegmentLineageZNRecord(this._pinotHelixResourceManager.getPropertyStore(), (String)tableNameWithType);
                if (segmentLineageZNRecord == null) {
                    return true;
                }
                LOGGER.info("Start cleaning up segment lineage for table: {}", (Object)tableNameWithType);
                long cleanupStartTime = System.currentTimeMillis();
                SegmentLineage segmentLineage = SegmentLineage.fromZNRecord((ZNRecord)segmentLineageZNRecord);
                int expectedVersion = segmentLineageZNRecord.getVersion();
                List<String> segmentsForTable = this._pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false);
                ArrayList<String> segmentsToDelete = new ArrayList<String>();
                this._pinotHelixResourceManager.getLineageManager().updateLineageForRetention(tableConfig, segmentLineage, segmentsForTable, segmentsToDelete, this._pinotHelixResourceManager.getConsumingSegments(tableNameWithType));
                if (SegmentLineageAccessHelper.writeSegmentLineage(this._pinotHelixResourceManager.getPropertyStore(), (SegmentLineage)segmentLineage, (int)expectedVersion)) {
                    if (TableNameBuilder.isRealtimeTableResource((String)tableNameWithType)) {
                        segmentsToDelete.removeAll(this._pinotHelixResourceManager.getLastLLCCompletedSegments(tableNameWithType));
                    }
                    if (!segmentsToDelete.isEmpty()) {
                        this._pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentsToDelete);
                        LOGGER.info("Finished cleaning up segment lineage for table: {} in {}ms, deleted segments: {}", new Object[]{tableNameWithType, System.currentTimeMillis() - cleanupStartTime, segmentsToDelete});
                    }
                    return true;
                }
                LOGGER.warn("Failed to write segment lineage back when cleaning up segment lineage for table: {}", (Object)tableNameWithType);
                return false;
            });
        }
        catch (Exception e) {
            String errorMsg = String.format("Failed to clean up the segment lineage. (tableName = %s)", tableNameWithType);
            LOGGER.error(errorMsg, (Throwable)e);
            throw new RuntimeException(errorMsg, e);
        }
        LOGGER.info("Segment lineage metadata clean-up is successfully processed for table: {}", (Object)tableNameWithType);
    }
}

