/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixAdmin;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Deletes segments of a table asynchronously.
 *
 * <p>A deletion request is queued on a single-thread scheduled executor. When it runs, every
 * segment that no longer appears in the table's Helix ideal state and external view has its
 * record removed from the property store and its file moved from the data directory into the
 * {@code Deleted_Segments} directory. Segments that are still referenced by Helix, or whose
 * property store record could not be removed, are re-queued with a doubled delay (capped at
 * {@link #MAX_DELETION_DELAY_SECONDS}).
 *
 * <p>Files parked under {@code Deleted_Segments} are purged by
 * {@link #removeAgedDeletedSegments(int)}.
 */
public class SegmentDeletionManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(SegmentDeletionManager.class);
    /** Upper bound, in seconds, of the delay between two deletion attempts. */
    private static final long MAX_DELETION_DELAY_SECONDS = 300L;
    /** Delay, in seconds, before the first deletion attempt. */
    private static final long DEFAULT_DELETION_DELAY_SECONDS = 2L;
    /** Name of the directory (relative to the data dir) that deleted segment files are moved into. */
    private static final String DELETED_SEGMENTS = "Deleted_Segments";

    private final ScheduledExecutorService _executorService;
    private final String _dataDir;
    private final String _helixClusterName;
    private final HelixAdmin _helixAdmin;
    private final ZkHelixPropertyStore<ZNRecord> _propertyStore;

    /**
     * Creates the manager and starts the single worker thread that performs the deletions.
     *
     * @param dataDir base URI/path of the segment store; may be {@code null}, in which case only
     *                property store records are removed and no files are touched
     * @param helixAdmin used to read the ideal state and external view of the table
     * @param helixClusterName name of the Helix cluster the tables belong to
     * @param propertyStore property store holding the per-segment records
     */
    public SegmentDeletionManager(String dataDir, HelixAdmin helixAdmin, String helixClusterName,
            ZkHelixPropertyStore<ZNRecord> propertyStore) {
        _dataDir = dataDir;
        _helixAdmin = helixAdmin;
        _helixClusterName = helixClusterName;
        _propertyStore = propertyStore;
        _executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable);
                thread.setName("PinotHelixResourceManagerExecutorService");
                return thread;
            }
        });
    }

    /**
     * Shuts the executor down immediately; deletions that are still queued are dropped.
     */
    public void stop() {
        _executorService.shutdownNow();
    }

    /**
     * Schedules the deletion of the given segments after the default initial delay.
     *
     * @param tableName Helix resource name of the table
     * @param segmentIds names of the segments to delete
     */
    public void deleteSegments(String tableName, Collection<String> segmentIds) {
        deleteSegmentsWithDelay(tableName, segmentIds, DEFAULT_DELETION_DELAY_SECONDS);
    }

    /**
     * Schedules a deletion attempt for the given segments.
     *
     * @param tableName Helix resource name of the table
     * @param segmentIds names of the segments to delete
     * @param deletionDelaySeconds how long to wait before the attempt, in seconds
     */
    protected void deleteSegmentsWithDelay(final String tableName, final Collection<String> segmentIds,
            final long deletionDelaySeconds) {
        _executorService.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    SegmentDeletionManager.this.deleteSegmentFromPropertyStoreAndLocal(tableName, segmentIds,
                            deletionDelaySeconds);
                } catch (RuntimeException e) {
                    // The ScheduledFuture is discarded, so an exception escaping from here would
                    // otherwise vanish without a trace.
                    LOGGER.error("Caught exception while deleting segments from table {}", tableName, e);
                }
            }
        }, deletionDelaySeconds, TimeUnit.SECONDS);
    }

    /**
     * Performs one deletion attempt.
     *
     * <p>Segments without any entry in the ideal state and the external view are removed from the
     * property store and then from the segment store. All other segments, and those whose
     * property store record could not be removed, are re-scheduled with twice the given delay,
     * capped at {@link #MAX_DELETION_DELAY_SECONDS}. Nothing is done when the table has no ideal
     * state or no external view.
     *
     * @param tableName Helix resource name of the table
     * @param segmentIds names of the segments to delete
     * @param deletionDelay delay, in seconds, that preceded this attempt; basis for the back-off
     */
    protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String tableName,
            Collection<String> segmentIds, long deletionDelay) {
        ExternalView externalView = _helixAdmin.getResourceExternalView(_helixClusterName, tableName);
        IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName);
        if (externalView == null || idealState == null) {
            LOGGER.warn("Resource: {} is not set up in idealState or ExternalView, won't do anything", tableName);
            return;
        }

        List<String> segmentsToDelete = new ArrayList<>(segmentIds.size());
        Collection<String> segmentsToRetryLater = new HashSet<>(segmentIds.size());
        try {
            for (String segmentId : segmentIds) {
                Map<String, String> externalViewStates = externalView.getStateMap(segmentId);
                Map<String, String> idealStateStates = idealState.getInstanceStateMap(segmentId);
                boolean goneFromExternalView = externalViewStates == null || externalViewStates.isEmpty();
                boolean goneFromIdealState = idealStateStates == null || idealStateStates.isEmpty();
                if (goneFromExternalView && goneFromIdealState) {
                    segmentsToDelete.add(segmentId);
                } else {
                    // Still referenced by Helix: try again later.
                    segmentsToRetryLater.add(segmentId);
                }
            }
        } catch (Exception e) {
            LOGGER.warn("Caught exception while checking helix states for table {}", tableName, e);
            // The Helix state could not be inspected: fall back to deleting every requested segment.
            segmentsToDelete.clear();
            segmentsToDelete.addAll(segmentIds);
            segmentsToRetryLater.clear();
        }

        if (!segmentsToDelete.isEmpty()) {
            List<String> propStorePathList = new ArrayList<>(segmentsToDelete.size());
            for (String segmentId : segmentsToDelete) {
                propStorePathList.add(ZKMetadataProvider.constructPropertyStorePathForSegment(tableName, segmentId));
            }

            boolean[] removed = _propertyStore.remove(propStorePathList, AccessOption.PERSISTENT);
            List<String> propStoreFailedSegs = new ArrayList<>(segmentsToDelete.size());
            for (int i = 0; i < removed.length; i++) {
                String propStorePath = propStorePathList.get(i);
                // A failed remove only counts as a failure when the record is really still there.
                if (!removed[i] && _propertyStore.exists(propStorePath, AccessOption.PERSISTENT)) {
                    String segmentId = segmentsToDelete.get(i);
                    LOGGER.info("Could not delete {} from propertystore", propStorePath);
                    segmentsToRetryLater.add(segmentId);
                    propStoreFailedSegs.add(segmentId);
                }
            }
            segmentsToDelete.removeAll(propStoreFailedSegs);
            removeSegmentsFromStore(tableName, segmentsToDelete);
        }

        // Only spell out the segment names when the list is short.
        Object deletedSegmentsForLog = segmentsToDelete.size() <= 5 ? segmentsToDelete : "";
        LOGGER.info("Deleted {} segments from table {}:{}", segmentsToDelete.size(), tableName,
                deletedSegmentsForLog);

        if (!segmentsToRetryLater.isEmpty()) {
            long effectiveDeletionDelay = Math.min(deletionDelay * 2L, MAX_DELETION_DELAY_SECONDS);
            LOGGER.info("Postponing deletion of {} segments from table {}", segmentsToRetryLater.size(), tableName);
            deleteSegmentsWithDelay(tableName, segmentsToRetryLater, effectiveDeletionDelay);
        }
    }

    /**
     * Moves the files of the given segments out of the table's directory in the segment store.
     *
     * @param tableNameWithType table name including the type suffix
     * @param segments names of the segments to remove
     */
    public void removeSegmentsFromStore(String tableNameWithType, List<String> segments) {
        for (String segment : segments) {
            removeSegmentFromStore(tableNameWithType, segment);
        }
    }

    /**
     * Moves one segment file from {@code <dataDir>/<rawTableName>/<segment>} to
     * {@code <dataDir>/Deleted_Segments/<rawTableName>/<segment>} and touches the moved file.
     *
     * <p>Does nothing for high-level-consumer segment names or when no data dir is configured.
     * Failures are logged and not propagated.
     *
     * @param tableNameWithType table name including the type suffix
     * @param segmentId name of the segment to remove
     */
    protected void removeSegmentFromStore(String tableNameWithType, String segmentId) {
        if (SegmentName.isHighLevelConsumerSegmentName(segmentId)) {
            return;
        }
        if (_dataDir == null) {
            LOGGER.info("dataDir is not configured, won't delete segment {} from disk", segmentId);
            return;
        }

        String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
        String encodedSegmentId = URIUtils.encode(segmentId);
        URI fileToMoveURI = URIUtils.getUri(_dataDir, rawTableName, encodedSegmentId);
        URI deletedSegmentDestURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS, rawTableName, encodedSegmentId);
        PinotFS pinotFS = PinotFSFactory.create(fileToMoveURI.getScheme());
        try {
            if (!pinotFS.exists(fileToMoveURI)) {
                LOGGER.warn("Failed to find local segment file for segment {}", fileToMoveURI.toString());
                return;
            }
            if (pinotFS.move(fileToMoveURI, deletedSegmentDestURI, true)) {
                // NOTE(review): presumably touch() refreshes the modification time so that
                // removeAgedDeletedSegments() measures retention from the deletion - confirm.
                pinotFS.touch(deletedSegmentDestURI);
                LOGGER.info("Moved segment {} from {} to {}", segmentId, fileToMoveURI.toString(),
                        deletedSegmentDestURI.toString());
            } else {
                LOGGER.warn("Failed to move segment {} from {} to {}", segmentId, fileToMoveURI.toString(),
                        deletedSegmentDestURI.toString());
            }
        } catch (IOException e) {
            LOGGER.warn("Could not move segment {} from {} to {}", segmentId, fileToMoveURI.toString(),
                    deletedSegmentDestURI.toString(), e);
        }
    }

    /**
     * Purges files under {@code <dataDir>/Deleted_Segments} whose last-modified time is older
     * than the retention period, and removes a table directory once every file listed in it has
     * been deleted. Does nothing when no data dir is configured. I/O failures are logged and not
     * propagated.
     *
     * @param retentionInDays number of days a deleted segment file is kept
     */
    public void removeAgedDeletedSegments(int retentionInDays) {
        if (_dataDir == null) {
            LOGGER.info("dataDir is not configured, won't delete any expired segments from deleted directory.");
            return;
        }

        URI deletedDirURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS);
        PinotFS pinotFS = PinotFSFactory.create(deletedDirURI.getScheme());
        try {
            if (!pinotFS.isDirectory(deletedDirURI)) {
                LOGGER.warn("Deleted segment directory {} does not exist or it is not directory.",
                        deletedDirURI.toString());
                return;
            }
            String[] tableNameDirs = pinotFS.listFiles(deletedDirURI, false);
            if (tableNameDirs == null) {
                LOGGER.warn("Deleted segment directory {} does not exist.", deletedDirURI.toString());
                return;
            }

            // One cutoff for the whole run: files last modified before it have expired.
            long cutoffMillis = DateTime.now().minusDays(retentionInDays).getMillis();
            for (String tableNameDir : tableNameDirs) {
                // The explicit empty array keeps the call bound to the (String, String...) overload.
                URI tableNameURI = URIUtils.getUri(tableNameDir, new String[0]);
                String[] targetFiles = pinotFS.listFiles(tableNameURI, false);
                if (targetFiles == null) {
                    LOGGER.warn("Could not list files under deleted table directory {}, skipping.", tableNameDir);
                    continue;
                }

                int numFilesDeleted = 0;
                for (String targetFile : targetFiles) {
                    URI targetURI = URIUtils.getUri(targetFile, new String[0]);
                    if (pinotFS.lastModified(targetURI) >= cutoffMillis) {
                        continue;
                    }
                    if (pinotFS.delete(targetURI, true)) {
                        numFilesDeleted++;
                    } else {
                        LOGGER.warn("Cannot remove file {} from deleted directory.", targetURI.toString());
                    }
                }

                // Drop the table directory only when nothing is left in it (also true when it was empty).
                if (numFilesDeleted == targetFiles.length && !pinotFS.delete(tableNameURI, false)) {
                    LOGGER.warn("The directory {} cannot be removed.", tableNameDir);
                }
            }
        } catch (IOException e) {
            LOGGER.error("Had trouble deleting directories: {}", deletedDirURI.toString(), e);
        }
    }
}

