package org.apache.samza.storage;

import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.CheckpointId;
import org.apache.samza.checkpoint.CheckpointManager;
import org.apache.samza.checkpoint.CheckpointV1;
import org.apache.samza.checkpoint.CheckpointV2;
import org.apache.samza.checkpoint.kafka.KafkaChangelogSSPOffset;
import org.apache.samza.config.Config;
import org.apache.samza.container.TaskInstanceMetrics;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/storage/TaskStorageCommitManager.class */
/**
 * Drives the commit of local store state for a single task: flushing and checkpointing the
 * task's storage engines, delegating snapshot / upload / clean-up to the configured
 * {@link TaskBackupManager}s, writing checkpoint and changelog-offset files into the store
 * directories, and removing store checkpoint directories that belong to older checkpoints.
 *
 * <p>{@link #init()} must be called before {@link #snapshot(CheckpointId)} or
 * {@link #writeCheckpointToStoreDirectories(Checkpoint)}; those methods need the task's storage
 * engines, which are only looked up in {@code init()}.
 */
public class TaskStorageCommitManager {
    private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);

    private final TaskName taskName;
    private final CheckpointManager checkpointManager;
    private final ContainerStorageManager containerStorageManager;
    private final Map<String, TaskBackupManager> stateBackendToBackupManager;
    private final Partition taskChangelogPartition;
    private final StorageManagerUtil storageManagerUtil;
    private final ExecutorService backupExecutor;
    private final File durableStoreBaseDir;
    private final Map<String, SystemStream> storeChangelogs;
    private final TaskInstanceMetrics metrics;

    /** Store name to storage engine for this task; {@code null} until {@link #init()} has run. */
    private Map<String, StorageEngine> storageEngines;

    /**
     * Creates a commit manager for one task. No work is done here beyond storing the collaborators.
     *
     * @param taskName the task whose stores are committed
     * @param map state backend name to the backup manager responsible for that backend
     * @param containerStorageManager source of the task's storage engines (queried in {@link #init()})
     * @param map2 store name to the changelog system stream of that store
     * @param partition changelog partition of this task
     * @param checkpointManager used to read the last checkpoint in {@link #init()}; may be {@code null}
     * @param config not used by this class
     * @param executorService executor on which old checkpoint directories are deleted after clean-up
     * @param storageManagerUtil helper used to resolve store directories and write checkpoint/offset files
     * @param file base directory of the durable stores
     * @param taskInstanceMetrics metrics sink; the store checkpoint duration is reported to it
     */
    public TaskStorageCommitManager(TaskName taskName, Map<String, TaskBackupManager> map, ContainerStorageManager containerStorageManager, Map<String, SystemStream> map2, Partition partition, CheckpointManager checkpointManager, Config config, ExecutorService executorService, StorageManagerUtil storageManagerUtil, File file, TaskInstanceMetrics taskInstanceMetrics) {
        this.taskName = taskName;
        this.containerStorageManager = containerStorageManager;
        this.stateBackendToBackupManager = map;
        this.taskChangelogPartition = partition;
        this.checkpointManager = checkpointManager;
        this.backupExecutor = executorService;
        this.durableStoreBaseDir = file;
        this.storeChangelogs = map2;
        this.storageManagerUtil = storageManagerUtil;
        this.metrics = taskInstanceMetrics;
    }

    /**
     * Looks up the task's storage engines and initialises every backup manager with the last
     * checkpoint read from the checkpoint manager, or with {@code null} when no checkpoint manager
     * was supplied.
     */
    public void init() {
        storageEngines = containerStorageManager.getAllStores(taskName);

        final Checkpoint lastCheckpoint;
        if (checkpointManager != null) {
            lastCheckpoint = checkpointManager.readLastCheckpoint(taskName);
            LOG.debug("Last checkpoint on start for task: {} is: {}", taskName, lastCheckpoint);
        } else {
            lastCheckpoint = null;
        }
        stateBackendToBackupManager.values().forEach(backupManager -> backupManager.init(lastCheckpoint));
    }

    /**
     * Flushes all storage engines, checkpoints every durable store that is persisted to disk, and
     * then asks each backup manager for its snapshot of the given checkpoint.
     *
     * @param checkpointId id of the checkpoint being committed
     * @return state backend name mapped to the snapshot returned by that backend's backup manager
     * @throws SamzaException if {@link #init()} has not been called
     */
    public Map<String, Map<String, String>> snapshot(CheckpointId checkpointId) {
        ensureInitialized("snapshot");

        storageEngines.values().forEach(StorageEngine::flush);
        LOG.debug("Flushed all storage engines for taskName: {}, checkpoint id: {}", taskName, checkpointId);

        long checkpointStartNs = System.nanoTime();
        storageEngines.forEach((storeName, engine) -> {
            if (isDurableAndPersistedToDisk(engine)) {
                engine.checkpoint(checkpointId);
            }
        });
        long checkpointNs = System.nanoTime() - checkpointStartNs;
        metrics.storeCheckpointNs().update(checkpointNs);
        LOG.debug("Checkpointed all storage engines for taskName: {}, checkpoint id: {} in {} ns", taskName, checkpointId, checkpointNs);

        Map<String, Map<String, String>> backendToSnapshot = new HashMap<>();
        stateBackendToBackupManager.forEach((backendName, backupManager) -> {
            Map<String, String> backendSnapshot = backupManager.snapshot(checkpointId);
            LOG.debug("Created snapshot for taskName: {}, checkpoint id: {}, state backend: {}. Snapshot SCMs: {}", taskName, checkpointId, backendName, backendSnapshot);
            backendToSnapshot.put(backendName, backendSnapshot);
        });
        return backendToSnapshot;
    }

    /**
     * Starts an upload on every backup manager, handing each one the snapshot recorded for its
     * state backend (or an empty map when {@code map} has no entry for that backend).
     *
     * @param checkpointId id of the checkpoint being committed
     * @param map state backend name to the snapshot previously produced for it, typically the
     *            result of {@link #snapshot(CheckpointId)}
     * @return a future of state backend name mapped to the upload result of that backend
     * @throws SamzaException if a backup manager throws while the upload is being started
     */
    public CompletableFuture<Map<String, Map<String, String>>> upload(CheckpointId checkpointId, Map<String, Map<String, String>> map) {
        Map<String, CompletableFuture<Map<String, String>>> backendToUpload = new HashMap<>();
        stateBackendToBackupManager.forEach((backendName, backupManager) -> {
            try {
                Map<String, String> backendSnapshot = map.getOrDefault(backendName, Collections.emptyMap());
                LOG.debug("Starting upload for taskName: {}, checkpoint id: {}, state backend snapshot SCM: {}", taskName, checkpointId, backendSnapshot);
                CompletableFuture<Map<String, String>> uploadFuture = backupManager.upload(checkpointId, backendSnapshot);
                // Logging hangs off a separate stage; the future handed to the caller is the upload itself.
                uploadFuture.thenAccept(uploaded ->
                        LOG.debug("Finished upload for taskName: {}, checkpoint id: {}, state backend: {}. Upload SCMs: {}", taskName, checkpointId, backendName, uploaded));
                backendToUpload.put(backendName, uploadFuture);
            } catch (Exception e) {
                throw new SamzaException(String.format("Error backing up local state for taskName: %s, checkpoint id: %s, state backend: %s", taskName, checkpointId, backendName), e);
            }
        });
        return FutureUtil.toFutureOfMap(backendToUpload);
    }

    /**
     * Persists the given checkpoint next to the local store data.
     *
     * <p>For a {@link CheckpointV1} the changelog offsets are written via
     * {@link #writeChangelogOffsetFiles(Map)}. For a {@link CheckpointV2} the checkpoint file is
     * written, for every durable store persisted to disk, into both the task store directory and
     * the store checkpoint directory of the checkpoint's id.
     *
     * @param checkpoint the checkpoint to persist
     * @throws SamzaException if the checkpoint is neither V1 nor V2, if {@link #init()} has not
     *                        been called, or if writing a file fails
     */
    public void writeCheckpointToStoreDirectories(Checkpoint checkpoint) {
        if (checkpoint instanceof CheckpointV1) {
            LOG.debug("Writing CheckpointV1 to store and checkpoint directories for taskName: {} with checkpoint: {}", taskName, checkpoint);
            writeChangelogOffsetFiles(checkpoint.getOffsets());
            return;
        }
        if (!(checkpoint instanceof CheckpointV2)) {
            throw new SamzaException("Unsupported checkpoint version: " + checkpoint.getVersion());
        }

        LOG.debug("Writing CheckpointV2 to store and checkpoint directories for taskName: {} with checkpoint: {}", taskName, checkpoint);
        ensureInitialized("write checkpoint files");
        CheckpointV2 checkpointV2 = (CheckpointV2) checkpoint;
        storageEngines.forEach((storeName, engine) -> {
            if (!isDurableAndPersistedToDisk(engine)) {
                return;
            }
            try {
                File taskStoreDir = storageManagerUtil.getTaskStoreDir(durableStoreBaseDir, storeName, taskName, TaskMode.Active);
                storageManagerUtil.writeCheckpointV2File(taskStoreDir, checkpointV2);

                File storeCheckpointDir = Paths.get(storageManagerUtil.getStoreCheckpointDir(taskStoreDir, checkpointV2.getCheckpointId())).toFile();
                storageManagerUtil.writeCheckpointV2File(storeCheckpointDir, checkpointV2);
            } catch (Exception e) {
                throw new SamzaException(String.format("Write checkpoint file failed for task: %s, storeName: %s, checkpointId: %s", taskName, storeName, checkpointV2.getCheckpointId()), e);
            }
        });
    }

    /**
     * Asks the backup manager of every known state backend in {@code map} to clean up after the
     * given checkpoint and, once all of them have completed, deletes the local store checkpoint
     * directories that do not belong to {@code checkpointId} on the backup executor. Entries whose
     * key is not a configured state backend are logged and skipped.
     *
     * @param checkpointId id of the latest checkpoint; when {@code null} no directories are deleted
     * @param map state backend name to the markers passed to that backend's clean-up
     * @return a future that completes after the directory deletion has run
     */
    public CompletableFuture<Void> cleanUp(CheckpointId checkpointId, Map<String, Map<String, String>> map) {
        ArrayList<CompletableFuture<Void>> cleanUpFutures = new ArrayList<>();
        map.forEach((factoryName, scms) -> {
            if (stateBackendToBackupManager.containsKey(factoryName)) {
                LOG.debug("Cleaning up commit for factory: {} for task: {}", factoryName, taskName);
                cleanUpFutures.add(stateBackendToBackupManager.get(factoryName).cleanUp(checkpointId, scms));
            } else {
                LOG.warn("Ignored cleanup for scm: {} due to unknown factory: {} ", scms, factoryName);
            }
        });
        return FutureUtil.allOf(cleanUpFutures)
                .thenAcceptAsync(ignored -> deleteOldCheckpointDirs(checkpointId), backupExecutor);
    }

    /**
     * Deletes, under every child of the durable store base directory, the entries named
     * {@code <taskStoreDirName>-*} whose name does not contain the serialized form of
     * {@code latestCheckpointId}. Does nothing when the id is {@code null}.
     *
     * @throws SamzaException if a directory cannot be deleted; remaining directories are not attempted
     */
    private void deleteOldCheckpointDirs(CheckpointId latestCheckpointId) {
        if (latestCheckpointId == null) {
            return;
        }
        LOG.debug("Deleting checkpoints older than checkpoint id: {}", latestCheckpointId);

        File[] storeDirs = durableStoreBaseDir.listFiles();
        if (storeDirs == null) {
            // Base directory is missing or not listable: nothing to clean.
            return;
        }

        String latestSerializedId = latestCheckpointId.serialize();
        for (File storeDir : storeDirs) {
            String taskStoreDirName = storageManagerUtil.getTaskStoreDir(durableStoreBaseDir, storeDir.getName(), taskName, TaskMode.Active).getName();
            // The cast picks the FileFilter overload of listFiles; it is required for the call to resolve.
            File[] checkpointDirs = storeDir.listFiles((FileFilter) new WildcardFileFilter(taskStoreDirName + "-*"));
            if (checkpointDirs == null) {
                continue;
            }
            for (File checkpointDir : checkpointDirs) {
                if (checkpointDir.getName().contains(latestSerializedId)) {
                    continue;
                }
                try {
                    FileUtils.deleteDirectory(checkpointDir);
                } catch (IOException e) {
                    throw new SamzaException(String.format("Unable to delete checkpoint directory: %s", checkpointDir.getName()), e);
                }
            }
        }
    }

    /** Closes every non-null backup manager. */
    public void close() {
        LOG.debug("Stopping backup managers for task {}.", taskName);
        stateBackendToBackupManager.values().forEach(backupManager -> {
            if (backupManager != null) {
                backupManager.close();
            }
        });
    }

    /**
     * Writes (or deletes) the changelog OFFSET file of every store that has a changelog, is present
     * in {@code map}, and is a durable store persisted to disk.
     *
     * <p>When the parsed changelog offset is non-null it is written to both the task store
     * directory and the store checkpoint directory; when it is {@code null} the OFFSET file in the
     * task store directory is deleted instead.
     *
     * @param map changelog system stream partition to its serialized {@link KafkaChangelogSSPOffset}
     * @throws SamzaException if {@link #init()} has not been called or an offset cannot be stored
     */
    @VisibleForTesting
    void writeChangelogOffsetFiles(Map<SystemStreamPartition, String> map) {
        ensureInitialized("write changelog offset files");

        storeChangelogs.forEach((storeName, changelog) -> {
            SystemStreamPartition changelogSsp = new SystemStreamPartition(changelog.getSystem(), changelog.getStream(), taskChangelogPartition);
            if (!map.containsKey(changelogSsp) || !storageEngines.containsKey(storeName)) {
                return;
            }
            if (!isDurableAndPersistedToDisk(storageEngines.get(storeName))) {
                return;
            }

            LOG.debug("Writing changelog offset for taskName {} store {} changelog {}.", taskName, storeName, changelog);
            File taskStoreDir = storageManagerUtil.getTaskStoreDir(durableStoreBaseDir, storeName, taskName, TaskMode.Active);
            try {
                KafkaChangelogSSPOffset sspOffset = KafkaChangelogSSPOffset.fromString(map.get(changelogSsp));
                String changelogOffset = sspOffset.getChangelogOffset();
                if (changelogOffset == null) {
                    LOG.debug("Deleting OFFSET file for taskName {} store {} changelog ssp {} since the newestOffset is null.", taskName, storeName, changelogSsp);
                    storageManagerUtil.deleteOffsetFile(taskStoreDir);
                } else {
                    writeChangelogOffsetFile(storeName, changelogSsp, changelogOffset, taskStoreDir);
                    File storeCheckpointDir = Paths.get(storageManagerUtil.getStoreCheckpointDir(taskStoreDir, sspOffset.getCheckpointId())).toFile();
                    writeChangelogOffsetFile(storeName, changelogSsp, changelogOffset, storeCheckpointDir);
                }
            } catch (IOException e) {
                throw new SamzaException(String.format("Error storing offset for taskName %s store %s changelog %s.", taskName, storeName, changelog), e);
            }
        });
        LOG.debug("Done writing OFFSET files for logged persistent key value stores for task {}", taskName);
    }

    /**
     * Writes a single-entry OFFSET file mapping {@code systemStreamPartition} to {@code str2}
     * into {@code file}.
     *
     * @param str store name, used for logging only
     * @param systemStreamPartition changelog partition the offset belongs to
     * @param str2 the offset to store
     * @param file directory the OFFSET file is written to
     * @throws IOException if the underlying write fails
     */
    @VisibleForTesting
    void writeChangelogOffsetFile(String str, SystemStreamPartition systemStreamPartition, String str2, File file) throws IOException {
        LOG.debug("Storing newest offset {} for taskName {} store {} changelog ssp {} in OFFSET file at path: {}.", str2, taskName, str, systemStreamPartition, file);
        // NOTE(review): the trailing 'false' is passed through as in the original; presumably it
        // marks this as a non-side-input offset file - confirm against StorageManagerUtil.
        storageManagerUtil.writeOffsetFile(file, Collections.singletonMap(systemStreamPartition, str2), false);
    }

    /** Fails fast with a descriptive error when the storage engines have not been loaded by init(). */
    private void ensureInitialized(String operation) {
        if (storageEngines == null) {
            throw new SamzaException(String.format("Storage engines are not initialized for task %s; init() must be called before attempting to %s", taskName, operation));
        }
    }

    /** Returns true when the engine's store is both durable and persisted to disk. */
    private static boolean isDurableAndPersistedToDisk(StorageEngine engine) {
        return engine.getStoreProperties().isDurableStore() && engine.getStoreProperties().isPersistedToDisk();
    }
}
