package org.apache.samza.storage.blobstore;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.CheckpointId;
import org.apache.samza.config.BlobStoreConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.storage.StorageManagerUtil;
import org.apache.samza.storage.TaskRestoreManager;
import org.apache.samza.storage.blobstore.index.DirIndex;
import org.apache.samza.storage.blobstore.index.SnapshotIndex;
import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics;
import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
import org.apache.samza.storage.blobstore.util.DirDiffUtil;
import org.apache.samza.util.FileUtil;
import org.apache.samza.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/storage/blobstore/BlobStoreRestoreManager.class */
public class BlobStoreRestoreManager implements TaskRestoreManager {
    private static final Logger LOG = LoggerFactory.getLogger(BlobStoreRestoreManager.class);
    private static final Set<String> FILES_TO_IGNORE = ImmutableSet.of(StorageManagerUtil.OFFSET_FILE_NAME_LEGACY, StorageManagerUtil.OFFSET_FILE_NAME_NEW, StorageManagerUtil.SIDE_INPUT_OFFSET_FILE_NAME_LEGACY, StorageManagerUtil.CHECKPOINT_FILE_NAME);
    private final TaskModel taskModel;
    private final String jobName;
    private final String jobId;
    private final ExecutorService executor;
    private final Config config;
    private final StorageConfig storageConfig;
    private final BlobStoreConfig blobStoreConfig;
    private final StorageManagerUtil storageManagerUtil;
    private final BlobStoreUtil blobStoreUtil;
    private final File loggedBaseDir;
    private final File nonLoggedBaseDir;
    private final String taskName;
    private final Set<String> storesToRestore;
    private final BlobStoreRestoreManagerMetrics metrics;
    private BlobStoreManager blobStoreManager;
    private final DirDiffUtil dirDiffUtil = new DirDiffUtil();
    private Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes = new HashMap();

    public BlobStoreRestoreManager(TaskModel taskModel, ExecutorService executorService, Set<String> set, BlobStoreRestoreManagerMetrics blobStoreRestoreManagerMetrics, Config config, File file, File file2, StorageManagerUtil storageManagerUtil, BlobStoreManager blobStoreManager) {
        this.taskModel = taskModel;
        this.jobName = new JobConfig(config).getName().get();
        this.jobId = new JobConfig(config).getJobId();
        this.executor = executorService;
        this.config = config;
        this.storageConfig = new StorageConfig(config);
        this.blobStoreConfig = new BlobStoreConfig(config);
        this.storageManagerUtil = storageManagerUtil;
        this.blobStoreManager = blobStoreManager;
        this.blobStoreUtil = createBlobStoreUtil(blobStoreManager, this.executor, blobStoreRestoreManagerMetrics);
        this.loggedBaseDir = file;
        this.nonLoggedBaseDir = file2;
        this.taskName = taskModel.getTaskName().getTaskName();
        this.storesToRestore = set;
        this.metrics = blobStoreRestoreManagerMetrics;
    }

    public void init(Checkpoint checkpoint) {
        long nanoTime = System.nanoTime();
        LOG.debug("Initializing blob store restore manager for task: {}", this.taskName);
        this.blobStoreManager.init();
        this.prevStoreSnapshotIndexes = this.blobStoreUtil.getStoreSnapshotIndexes(this.jobName, this.jobId, this.taskName, checkpoint, this.storesToRestore);
        this.metrics.getSnapshotIndexNs.set(Long.valueOf(System.nanoTime() - nanoTime));
        LOG.trace("Found previous snapshot index during blob store restore manager init for task: {} to be: {}", this.taskName, this.prevStoreSnapshotIndexes);
        this.metrics.initStoreMetrics(this.storesToRestore);
        deleteUnusedStoresFromBlobStore(this.jobName, this.jobId, this.taskName, this.storageConfig, this.blobStoreConfig, this.prevStoreSnapshotIndexes, this.blobStoreUtil, this.executor);
        this.metrics.initNs.set(Long.valueOf(System.nanoTime() - nanoTime));
    }

    public CompletableFuture<Void> restore() {
        return restoreStores(this.jobName, this.jobId, this.taskModel.getTaskName(), this.storesToRestore, this.prevStoreSnapshotIndexes, this.loggedBaseDir, this.storageConfig, this.metrics, this.storageManagerUtil, this.blobStoreUtil, this.dirDiffUtil, this.executor);
    }

    public void close() {
        this.blobStoreManager.close();
    }

    @VisibleForTesting
    protected BlobStoreUtil createBlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executorService, BlobStoreRestoreManagerMetrics blobStoreRestoreManagerMetrics) {
        return new BlobStoreUtil(blobStoreManager, executorService, null, blobStoreRestoreManagerMetrics);
    }

    @VisibleForTesting
    static void deleteUnusedStoresFromBlobStore(String str, String str2, String str3, StorageConfig storageConfig, BlobStoreConfig blobStoreConfig, Map<String, Pair<String, SnapshotIndex>> map, BlobStoreUtil blobStoreUtil, ExecutorService executorService) {
        List<String> storesWithBackupFactory = storageConfig.getStoresWithBackupFactory(BlobStoreStateBackendFactory.class.getName());
        List<String> storesWithRestoreFactory = storageConfig.getStoresWithRestoreFactory(BlobStoreStateBackendFactory.class.getName());
        ArrayList arrayList = new ArrayList();
        map.forEach((str4, pair) -> {
            if (storesWithBackupFactory.contains(str4) || storesWithRestoreFactory.contains(str4)) {
                return;
            }
            LOG.info("Removing task: {} store: {} from blob store. It is either no longer used, or is no longer configured to be backed up or restored with blob store.", str3, str4);
            DirIndex dirIndex = ((SnapshotIndex) pair.getRight()).getDirIndex();
            Metadata metadata = new Metadata("snapshot-index", Optional.empty(), str, str2, str3, str4);
            arrayList.add(blobStoreUtil.cleanUpDir(dirIndex, metadata).thenComposeAsync(r7 -> {
                return blobStoreUtil.deleteDir(dirIndex, metadata);
            }, executorService).thenComposeAsync(r72 -> {
                return blobStoreUtil.deleteSnapshotIndexBlob((String) pair.getLeft(), metadata);
            }, executorService));
        });
        FutureUtil.allOf(arrayList).join();
    }

    @VisibleForTesting
    static CompletableFuture<Void> restoreStores(String str, String str2, TaskName taskName, Set<String> set, Map<String, Pair<String, SnapshotIndex>> map, File file, StorageConfig storageConfig, BlobStoreRestoreManagerMetrics blobStoreRestoreManagerMetrics, StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil, DirDiffUtil dirDiffUtil, ExecutorService executorService) {
        long nanoTime = System.nanoTime();
        ArrayList arrayList = new ArrayList();
        LOG.debug("Starting restore for task: {} stores: {}", taskName, set);
        set.forEach(str3 -> {
            if (!map.containsKey(str3)) {
                LOG.info("No checkpointed snapshot index found for task: {} store: {}. Skipping restore.", taskName, str3);
                return;
            }
            Pair pair = (Pair) map.get(str3);
            long nanoTime2 = System.nanoTime();
            SnapshotIndex snapshotIndex = (SnapshotIndex) pair.getRight();
            DirIndex dirIndex = snapshotIndex.getDirIndex();
            DirIndex.Stats stats = DirIndex.getStats(dirIndex);
            ((AtomicLong) blobStoreRestoreManagerMetrics.filesToRestore.getValue()).addAndGet(stats.filesPresent);
            ((AtomicLong) blobStoreRestoreManagerMetrics.bytesToRestore.getValue()).addAndGet(stats.bytesPresent);
            ((AtomicLong) blobStoreRestoreManagerMetrics.filesRemaining.getValue()).addAndGet(stats.filesPresent);
            ((AtomicLong) blobStoreRestoreManagerMetrics.bytesRemaining.getValue()).addAndGet(stats.bytesPresent);
            CheckpointId checkpointId = snapshotIndex.getSnapshotMetadata().getCheckpointId();
            File taskStoreDir = storageManagerUtil.getTaskStoreDir(file, str3, taskName, TaskMode.Active);
            Path path = Paths.get(storageManagerUtil.getStoreCheckpointDir(taskStoreDir, checkpointId), new String[0]);
            LOG.trace("Got task: {} store: {} local store directory: {} and local store checkpoint directory: {}", new Object[]{taskName, str3, taskStoreDir, path});
            try {
                LOG.debug("Deleting local store directory: {}. Will be restored from local store checkpoint directory or remote snapshot.", taskStoreDir);
                FileUtils.deleteDirectory(taskStoreDir);
                if (shouldRestore(taskName.getTaskName(), str3, dirIndex, path, storageConfig, dirDiffUtil)) {
                    deleteCheckpointDirs(taskName, str3, file, storageManagerUtil);
                    blobStoreRestoreManagerMetrics.storePreRestoreNs.get(str3).set(Long.valueOf(System.nanoTime() - nanoTime2));
                    enqueueRestore(str, str2, taskName.toString(), str3, taskStoreDir, dirIndex, nanoTime2, arrayList, blobStoreUtil, dirDiffUtil, blobStoreRestoreManagerMetrics, executorService);
                } else {
                    LOG.debug("Renaming store checkpoint directory: {} to store directory: {} since its contents are identical to the remote snapshot.", path, taskStoreDir);
                    new FileUtil().move(path.toFile(), taskStoreDir);
                    deleteCheckpointDirs(taskName, str3, file, storageManagerUtil);
                }
            } catch (IOException e) {
                throw new SamzaException(String.format("Error deleting store directory: %s", taskStoreDir), e);
            }
        });
        return FutureUtil.allOf(arrayList).whenComplete((r10, th) -> {
            LOG.info("Restore completed for task: {} stores", taskName);
            blobStoreRestoreManagerMetrics.restoreNs.set(Long.valueOf(System.nanoTime() - nanoTime));
        });
    }

    @VisibleForTesting
    static boolean shouldRestore(String str, String str2, DirIndex dirIndex, Path path, StorageConfig storageConfig, DirDiffUtil dirDiffUtil) {
        boolean z;
        if (!Files.exists(path, new LinkOption[0])) {
            LOG.debug("No local store checkpoint directory found at: {}. Queuing for restore from remote snapshot.", path);
            z = true;
        } else if (storageConfig.cleanLoggedStoreDirsOnStart(str2)) {
            LOG.debug("Restoring task: {} store: {} from remote snapshot since the store is configured to be restored on each restart.", str, str2);
            z = true;
        } else if (dirDiffUtil.areSameDir(FILES_TO_IGNORE, false).test(path.toFile(), dirIndex)) {
            z = false;
        } else {
            LOG.error("Local store checkpoint directory: {} contents are not the same as the remote snapshot. Queuing for restore from remote snapshot.", path);
            z = true;
        }
        return z;
    }

    @VisibleForTesting
    static void enqueueRestore(String str, String str2, String str3, String str4, File file, DirIndex dirIndex, long j, List<CompletionStage<Void>> list, BlobStoreUtil blobStoreUtil, DirDiffUtil dirDiffUtil, BlobStoreRestoreManagerMetrics blobStoreRestoreManagerMetrics, ExecutorService executorService) {
        list.add(blobStoreUtil.restoreDir(file, dirIndex, new Metadata(file.getAbsolutePath(), Optional.empty(), str, str2, str3, str4)).thenRunAsync(() -> {
            blobStoreRestoreManagerMetrics.storeRestoreNs.get(str4).set(Long.valueOf(System.nanoTime() - j));
            long nanoTime = System.nanoTime();
            LOG.trace("Comparing restored store directory: {} and remote directory to verify restore.", file);
            if (!dirDiffUtil.areSameDir(FILES_TO_IGNORE, true).test(file, dirIndex)) {
                blobStoreRestoreManagerMetrics.storePostRestoreNs.get(str4).set(Long.valueOf(System.nanoTime() - nanoTime));
                throw new SamzaException(String.format("Restored store directory: %s contents are not the same as the remote snapshot.", file.getAbsolutePath()));
            }
            blobStoreRestoreManagerMetrics.storePostRestoreNs.get(str4).set(Long.valueOf(System.nanoTime() - nanoTime));
            LOG.info("Restore from remote snapshot completed for store: {}", file);
        }, (Executor) executorService));
    }

    private static void deleteCheckpointDirs(TaskName taskName, String str, File file, StorageManagerUtil storageManagerUtil) {
        try {
            for (File file2 : storageManagerUtil.getTaskStoreCheckpointDirs(file, str, taskName, TaskMode.Active)) {
                LOG.debug("Deleting local store checkpoint directory: {} before restore.", file2);
                FileUtils.deleteDirectory(file2);
            }
        } catch (Exception e) {
            throw new SamzaException(String.format("Error deleting checkpoint directory for task: %s store: %s.", taskName, str), e);
        }
    }
}
