package org.apache.samza.storage.blobstore;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.CheckpointId;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.storage.StorageManagerUtil;
import org.apache.samza.storage.TaskBackupManager;
import org.apache.samza.storage.blobstore.diff.DirDiff;
import org.apache.samza.storage.blobstore.index.DirIndex;
import org.apache.samza.storage.blobstore.index.SnapshotIndex;
import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
import org.apache.samza.storage.blobstore.util.DirDiffUtil;
import org.apache.samza.util.Clock;
import org.apache.samza.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/storage/blobstore/BlobStoreBackupManager.class */
public class BlobStoreBackupManager implements TaskBackupManager {
    private static final Logger LOG = LoggerFactory.getLogger(BlobStoreBackupManager.class);
    private final JobModel jobModel;
    private final ExecutorService executor;
    private final String jobName;
    private final String jobId;
    private final ContainerModel containerModel;
    private final TaskModel taskModel;
    private final String taskName;
    private final Config config;
    private final Clock clock;
    private final StorageManagerUtil storageManagerUtil;
    private final List<String> storesToBackup;
    private final File loggedStoreBaseDir;
    private final BlobStoreManager blobStoreManager;
    private final BlobStoreUtil blobStoreUtil;
    private final BlobStoreBackupManagerMetrics metrics;
    private volatile CompletableFuture<Map<String, Pair<String, SnapshotIndex>>> prevStoreSnapshotIndexesFuture = CompletableFuture.completedFuture(ImmutableMap.of());

    public BlobStoreBackupManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel, ExecutorService executorService, BlobStoreBackupManagerMetrics blobStoreBackupManagerMetrics, Config config, Clock clock, File file, StorageManagerUtil storageManagerUtil, BlobStoreManager blobStoreManager) {
        this.jobModel = jobModel;
        this.jobName = new JobConfig(config).getName().get();
        this.jobId = new JobConfig(config).getJobId();
        this.containerModel = containerModel;
        this.taskModel = taskModel;
        this.taskName = taskModel.getTaskName().getTaskName();
        this.executor = executorService;
        this.config = config;
        this.clock = clock;
        this.storageManagerUtil = storageManagerUtil;
        this.storesToBackup = new StorageConfig(config).getPersistentStoresWithBackupFactory(BlobStoreStateBackendFactory.class.getName());
        this.loggedStoreBaseDir = file;
        this.blobStoreManager = blobStoreManager;
        this.blobStoreUtil = createBlobStoreUtil(blobStoreManager, this.executor, blobStoreBackupManagerMetrics);
        this.metrics = blobStoreBackupManagerMetrics;
        this.metrics.initStoreMetrics(this.storesToBackup);
    }

    public void init(Checkpoint checkpoint) {
        long nanoTime = System.nanoTime();
        LOG.debug("Initializing blob store backup manager for task: {}", this.taskName);
        this.blobStoreManager.init();
        this.prevStoreSnapshotIndexesFuture = CompletableFuture.completedFuture(ImmutableMap.copyOf(this.blobStoreUtil.getStoreSnapshotIndexes(this.jobName, this.jobId, this.taskName, checkpoint, new HashSet(this.storesToBackup))));
        this.metrics.initNs.set(Long.valueOf(System.nanoTime() - nanoTime));
    }

    public Map<String, String> snapshot(CheckpointId checkpointId) {
        return Collections.emptyMap();
    }

    public CompletableFuture<Map<String, String>> upload(CheckpointId checkpointId, Map<String, String> map) {
        long nanoTime = System.nanoTime();
        ((AtomicLong) this.metrics.filesToUpload.getValue()).set(0L);
        ((AtomicLong) this.metrics.bytesToUpload.getValue()).set(0L);
        ((AtomicLong) this.metrics.filesUploaded.getValue()).set(0L);
        ((AtomicLong) this.metrics.bytesUploaded.getValue()).set(0L);
        ((AtomicLong) this.metrics.filesRemaining.getValue()).set(0L);
        ((AtomicLong) this.metrics.bytesRemaining.getValue()).set(0L);
        ((AtomicLong) this.metrics.filesToRetain.getValue()).set(0L);
        ((AtomicLong) this.metrics.bytesToRetain.getValue()).set(0L);
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        this.storesToBackup.forEach(str -> {
            long nanoTime2 = System.nanoTime();
            try {
                SnapshotMetadata snapshotMetadata = new SnapshotMetadata(checkpointId, this.jobName, this.jobId, this.taskName, str);
                File taskStoreDir = this.storageManagerUtil.getTaskStoreDir(this.loggedStoreBaseDir, str, this.taskModel.getTaskName(), this.taskModel.getTaskMode());
                File file = new File(this.storageManagerUtil.getStoreCheckpointDir(taskStoreDir, checkpointId));
                LOG.debug("Got task: {} store: {} storeDir: {} and checkpointDir: {}", new Object[]{this.taskName, str, taskStoreDir, file});
                Map<String, Pair<String, SnapshotIndex>> map2 = this.prevStoreSnapshotIndexesFuture.get(0L, TimeUnit.MILLISECONDS);
                DirIndex dirIndex = map2.containsKey(str) ? ((SnapshotIndex) map2.get(str).getRight()).getDirIndex() : new DirIndex(file.getName(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
                long nanoTime3 = System.nanoTime();
                DirDiff dirDiff = DirDiffUtil.getDirDiff(file, dirIndex, DirDiffUtil.areSameFile(false));
                this.metrics.storeDirDiffNs.get(str).update(System.nanoTime() - nanoTime3);
                DirDiff.Stats stats = DirDiff.getStats(dirDiff);
                updateStoreDiffMetrics(str, stats);
                ((AtomicLong) this.metrics.filesToUpload.getValue()).addAndGet(stats.filesAdded);
                ((AtomicLong) this.metrics.bytesToUpload.getValue()).addAndGet(stats.bytesAdded);
                ((AtomicLong) this.metrics.filesRemaining.getValue()).addAndGet(stats.filesAdded);
                ((AtomicLong) this.metrics.bytesRemaining.getValue()).addAndGet(stats.bytesAdded);
                ((AtomicLong) this.metrics.filesToRetain.getValue()).addAndGet(stats.filesRetained);
                ((AtomicLong) this.metrics.bytesToRetain.getValue()).addAndGet(stats.bytesRetained);
                CompletionStage<U> thenApplyAsync = this.blobStoreUtil.putDir(dirDiff, snapshotMetadata).thenApplyAsync(dirIndex2 -> {
                    LOG.trace("Dir upload complete. Returning new SnapshotIndex for task: {} store: {}.", this.taskName, str);
                    return new SnapshotIndex(this.clock.currentTimeMillis(), snapshotMetadata, dirIndex2, Optional.ofNullable(map2.get(str)).map((v0) -> {
                        return v0.getLeft();
                    }));
                }, this.executor);
                CompletionStage thenComposeAsync = thenApplyAsync.thenComposeAsync(snapshotIndex -> {
                    LOG.info("Uploading Snapshot index: {} for task: {} store: {}", new Object[]{snapshotIndex, this.taskName, str});
                    return this.blobStoreUtil.putSnapshotIndex(snapshotIndex);
                }, this.executor);
                CompletableFuture futureOfPair = FutureUtil.toFutureOfPair(Pair.of(thenComposeAsync.toCompletableFuture(), thenApplyAsync.toCompletableFuture()));
                futureOfPair.whenComplete((pair, th) -> {
                    this.metrics.storeUploadNs.get(str).update(System.nanoTime() - nanoTime2);
                });
                hashMap.put(str, futureOfPair);
                hashMap2.put(str, thenComposeAsync.toCompletableFuture());
            } catch (Exception e) {
                throw new SamzaException(String.format("Error uploading store snapshot to blob store for task: %s, store: %s, checkpointId: %s", this.taskName, str, checkpointId), e);
            }
        });
        this.prevStoreSnapshotIndexesFuture = FutureUtil.toFutureOfMap(hashMap);
        return FutureUtil.toFutureOfMap(hashMap2).whenComplete((map2, th) -> {
            this.metrics.uploadNs.update(System.nanoTime() - nanoTime);
        });
    }

    public CompletableFuture<Void> cleanUp(CheckpointId checkpointId, Map<String, String> map) {
        long nanoTime = System.nanoTime();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        map.forEach((str, str2) -> {
            if (this.storesToBackup.contains(str)) {
                Metadata metadata = new Metadata("snapshot-index", Optional.empty(), this.jobName, this.jobId, this.taskName, str);
                CompletableFuture<SnapshotIndex> snapshotIndex = this.blobStoreUtil.getSnapshotIndex(str2, metadata);
                arrayList.add(snapshotIndex.thenComposeAsync(snapshotIndex2 -> {
                    LOG.info("Removing TTL for index blob: {} and all of its files and sub-dirs for task: {} store :{}", new Object[]{str2, this.taskName, str});
                    return this.blobStoreUtil.removeTTL(str2, snapshotIndex2, metadata);
                }, (Executor) this.executor));
                arrayList2.add(snapshotIndex.thenComposeAsync(snapshotIndex3 -> {
                    LOG.info("Deleting files and dirs to remove for current index blob: {} for task: {} store: {}", new Object[]{str2, this.taskName, str});
                    return this.blobStoreUtil.cleanUpDir(snapshotIndex3.getDirIndex(), metadata);
                }, (Executor) this.executor));
                arrayList3.add(snapshotIndex.thenComposeAsync(snapshotIndex4 -> {
                    if (!snapshotIndex4.getPrevSnapshotIndexBlobId().isPresent()) {
                        return CompletableFuture.completedFuture(null);
                    }
                    String str = snapshotIndex4.getPrevSnapshotIndexBlobId().get();
                    LOG.info("Removing previous snapshot index blob: {} from blob store for task: {} store: {}.", new Object[]{str, this.taskName, str});
                    return this.blobStoreUtil.deleteSnapshotIndexBlob(str, metadata);
                }, (Executor) this.executor));
            }
        });
        return FutureUtil.allOf(arrayList, arrayList2, arrayList3).whenComplete((r9, th) -> {
            this.metrics.cleanupNs.update(System.nanoTime() - nanoTime);
        });
    }

    public void close() {
        this.blobStoreManager.close();
    }

    @VisibleForTesting
    protected BlobStoreUtil createBlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executorService, BlobStoreBackupManagerMetrics blobStoreBackupManagerMetrics) {
        return new BlobStoreUtil(blobStoreManager, executorService, blobStoreBackupManagerMetrics, null);
    }

    private void updateStoreDiffMetrics(String str, DirDiff.Stats stats) {
        this.metrics.storeFilesToUpload.get(str).set(Long.valueOf(stats.filesAdded));
        this.metrics.storeFilesToRetain.get(str).set(Long.valueOf(stats.filesRetained));
        this.metrics.storeFilesToRemove.get(str).set(Long.valueOf(stats.filesRemoved));
        this.metrics.storeSubDirsToUpload.get(str).set(Long.valueOf(stats.subDirsAdded));
        this.metrics.storeSubDirsToRetain.get(str).set(Long.valueOf(stats.subDirsRetained));
        this.metrics.storeSubDirsToRemove.get(str).set(Long.valueOf(stats.subDirsRemoved));
        this.metrics.storeBytesToUpload.get(str).set(Long.valueOf(stats.bytesAdded));
        this.metrics.storeBytesToRetain.get(str).set(Long.valueOf(stats.bytesRetained));
        this.metrics.storeBytesToRemove.get(str).set(Long.valueOf(stats.bytesRemoved));
    }
}
