package org.apache.samza.storage;

import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.config.Config;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.Serde;
import org.apache.samza.storage.StorageEngineFactory;
import org.apache.samza.system.ChangelogSSPIterator;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.util.Clock;
import org.apache.samza.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;

/* loaded from: input_file:org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.class */
/**
 * {@link TaskRestoreManager} that restores the stores of a single task from their changelog streams.
 *
 * <p>{@link #init(Checkpoint)} cleans and prepares the on-disk store directories, validates the
 * changelog streams, looks up the oldest changelog offsets and registers a starting offset with
 * each store's changelog consumer. {@link #restore()} then replays the changelog into every store
 * that still needs restoring, and {@link #close()} stops the stores that are persisted to disk.
 */
class NonTransactionalStateTaskRestoreManager implements TaskRestoreManager {
    private static final Logger LOG = LoggerFactory.getLogger(NonTransactionalStateTaskRestoreManager.class);

    /** Store name to engine, for every store handled by this manager. */
    private final Map<String, StorageEngine> taskStores;
    /** Names of logged stores that still have to be restored; entries are removed when the changelog has no starting offset. */
    private final Set<String> taskStoresToRestore;
    private final TaskModel taskModel;
    private final Clock clock;
    /** Oldest offset per changelog stream for this task's changelog partition; populated during {@link #init(Checkpoint)}. */
    private Map<SystemStream, String> changeLogOldestOffsets;
    /** Store name to changelog stream. */
    private final Map<String, SystemStream> storeChangelogs;
    private final SystemAdmins systemAdmins;
    private final File loggedStoreBaseDirectory;
    private final File nonLoggedStoreBaseDirectory;
    private final StreamMetadataCache streamMetadataCache;
    /** Store name to the consumer used to read that store's changelog. */
    private final Map<String, SystemConsumer> storeConsumers;
    private final int maxChangeLogStreamPartitions;
    private final Config config;
    private final ExecutorService restoreExecutor;
    /** Offsets read from the offset files of logged store directories that were kept on disk. */
    private final Map<SystemStreamPartition, String> fileOffsets = new HashMap<>();
    private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();

    /**
     * Creates the restore manager and the storage engines for the given store names.
     *
     * @param storeNames names of the stores for which a storage engine is created
     * @param jobContext job context handed to the store factory
     * @param containerContext container context handed to the store factory
     * @param taskModel model of the task whose stores are restored
     * @param restoreExecutor executor on which {@link #restore()} runs
     * @param storeChangelogs store name to changelog stream
     * @param initialStores already-created stores that are added to the managed stores as-is
     * @param storageEngineFactories store name to storage engine factory
     * @param serdes serde name to serde, handed to the store factory
     * @param systemAdmins admins used to validate changelog streams and resolve offsets
     * @param streamMetadataCache cache used to look up changelog stream metadata
     * @param storeConsumers store name to changelog consumer
     * @param metricsRegistry metrics registry handed to the store factory
     * @param messageCollector message collector handed to the store factory
     * @param maxChangeLogStreamPartitions partition count used when validating changelog streams
     * @param loggedStoreBaseDirectory base directory for stores that have a changelog
     * @param nonLoggedStoreBaseDirectory base directory for stores without a changelog
     * @param config job configuration
     * @param clock clock used for the stale-store check
     */
    public NonTransactionalStateTaskRestoreManager(Set<String> storeNames, JobContext jobContext, ContainerContext containerContext, TaskModel taskModel, ExecutorService restoreExecutor, Map<String, SystemStream> storeChangelogs, Map<String, StorageEngine> initialStores, Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories, Map<String, Serde<Object>> serdes, SystemAdmins systemAdmins, StreamMetadataCache streamMetadataCache, Map<String, SystemConsumer> storeConsumers, MetricsRegistry metricsRegistry, MessageCollector messageCollector, int maxChangeLogStreamPartitions, File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory, Config config, Clock clock) {
        this.taskModel = taskModel;
        this.restoreExecutor = restoreExecutor;
        this.clock = clock;
        this.storeChangelogs = storeChangelogs;
        this.systemAdmins = systemAdmins;
        this.loggedStoreBaseDirectory = loggedStoreBaseDirectory;
        this.nonLoggedStoreBaseDirectory = nonLoggedStoreBaseDirectory;
        this.streamMetadataCache = streamMetadataCache;
        this.storeConsumers = storeConsumers;
        this.maxChangeLogStreamPartitions = maxChangeLogStreamPartitions;
        this.config = config;
        // createStoreEngines reads the fields assigned above, so it must run after them.
        this.taskStores = createStoreEngines(storeNames, jobContext, containerContext, storageEngineFactories, serdes, metricsRegistry, messageCollector, initialStores);
        this.taskStoresToRestore = this.taskStores.entrySet().stream()
            .filter(entry -> entry.getValue().getStoreProperties().isLoggedStore())
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
    }

    /**
     * Prepares the store directories and registers changelog starting offsets.
     *
     * @param checkpoint not used by this implementation
     */
    public void init(Checkpoint checkpoint) {
        cleanBaseDirsAndReadOffsetFiles();
        setupBaseDirs();
        validateChangelogStreams();
        getOldestChangeLogOffsets();
        registerStartingOffsets();
    }

    /**
     * Deletes store directories that must not be reused and records the checkpointed offset of
     * every logged store directory that is kept.
     */
    private void cleanBaseDirsAndReadOffsetFiles() {
        LOG.debug("Cleaning base directories for stores.");
        StorageConfig storageConfig = new StorageConfig(this.config);
        FileUtil fileUtil = new FileUtil();
        for (Map.Entry<String, StorageEngine> entry : this.taskStores.entrySet()) {
            String storeName = entry.getKey();
            if (entry.getValue().getStoreProperties().isLoggedStore()) {
                cleanLoggedStoreDirOrReadOffset(storeName, storageConfig, fileUtil);
            } else {
                cleanNonLoggedStoreDir(storeName, fileUtil);
            }
        }
    }

    /** Removes the directory of a non-logged store if it exists. */
    private void cleanNonLoggedStoreDir(String storeName, FileUtil fileUtil) {
        File storeDir = this.storageManagerUtil.getTaskStoreDir(this.nonLoggedStoreBaseDirectory, storeName, this.taskModel.getTaskName(), this.taskModel.getTaskMode());
        LOG.info("Got non logged storage partition directory as " + storeDir.toPath().toString());
        if (storeDir.exists()) {
            LOG.info("Deleting non logged storage partition directory " + storeDir.toPath().toString());
            fileUtil.rm(storeDir);
        }
    }

    /**
     * Removes the directory of a logged store when it is invalid or configured to be cleaned on
     * start; otherwise reads its offset file into {@link #fileOffsets}.
     */
    private void cleanLoggedStoreDirOrReadOffset(String storeName, StorageConfig storageConfig, FileUtil fileUtil) {
        File storeDir = this.storageManagerUtil.getTaskStoreDir(this.loggedStoreBaseDirectory, storeName, this.taskModel.getTaskName(), this.taskModel.getTaskMode());
        LOG.info("Got logged storage partition directory as " + storeDir.toPath().toString());
        if (!isLoggedStoreValid(storeName, storeDir) || storageConfig.cleanLoggedStoreDirsOnStart(storeName)) {
            LOG.info("Deleting logged storage partition directory " + storeDir.toPath().toString());
            fileUtil.rm(storeDir);
            return;
        }
        SystemStreamPartition changelogSsp = new SystemStreamPartition(this.storeChangelogs.get(storeName), this.taskModel.getChangelogPartition());
        Map<SystemStreamPartition, String> offsetsFromFile = this.storageManagerUtil.readOffsetFile(storeDir, Collections.singleton(changelogSsp), false);
        LOG.info("Read offset {} for the store {} from logged storage partition directory {}", offsetsFromFile, storeName, storeDir);
        if (offsetsFromFile.containsKey(changelogSsp)) {
            this.fileOffsets.put(changelogSsp, offsetsFromFile.get(changelogSsp));
        }
    }

    /**
     * Decides whether the on-disk contents of a logged store can be reused.
     *
     * @param storeName name of the store
     * @param storeDir directory of the store
     * @return {@code true} only if the store has a changelog, is persisted to disk, has a valid
     *     offset file and is not stale with respect to the changelog delete retention
     */
    private boolean isLoggedStoreValid(String storeName, File storeDir) {
        long changeLogDeleteRetentionInMs = new StorageConfig(this.config).getChangeLogDeleteRetentionInMs(storeName);
        if (!this.storeChangelogs.containsKey(storeName)) {
            return false;
        }
        SystemStreamPartition changelogSsp = new SystemStreamPartition(this.storeChangelogs.get(storeName), this.taskModel.getChangelogPartition());
        return this.taskStores.get(storeName).getStoreProperties().isPersistedToDisk()
            && this.storageManagerUtil.isOffsetFileValid(storeDir, Collections.singleton(changelogSsp), false)
            && !this.storageManagerUtil.isStaleStore(storeDir, changeLogDeleteRetentionInMs, this.clock.currentTimeMillis(), false);
    }

    /** Creates the directory of every store; a logged store directory that already exists is left alone. */
    private void setupBaseDirs() {
        LOG.debug("Setting up base directories for stores.");
        for (Map.Entry<String, StorageEngine> entry : this.taskStores.entrySet()) {
            String storeName = entry.getKey();
            if (entry.getValue().getStoreProperties().isLoggedStore()) {
                File storeDir = this.storageManagerUtil.getTaskStoreDir(this.loggedStoreBaseDirectory, storeName, this.taskModel.getTaskName(), this.taskModel.getTaskMode());
                LOG.info("Using logged storage partition directory: " + storeDir.toPath().toString() + " for store: " + storeName);
                if (!storeDir.exists()) {
                    storeDir.mkdirs();
                }
            } else {
                File storeDir = this.storageManagerUtil.getTaskStoreDir(this.nonLoggedStoreBaseDirectory, storeName, this.taskModel.getTaskName(), this.taskModel.getTaskMode());
                LOG.info("Using non logged storage partition directory: " + storeDir.toPath().toString() + " for store: " + storeName);
                storeDir.mkdirs();
            }
        }
    }

    /** Asks the system admin of every changelog stream to validate it against the expected changelog spec. */
    private void validateChangelogStreams() {
        LOG.info("Validating change log streams: " + this.storeChangelogs);
        for (SystemStream changelog : this.storeChangelogs.values()) {
            StreamSpec changelogSpec = StreamSpec.createChangeLogStreamSpec(changelog.getStream(), changelog.getSystem(), this.maxChangeLogStreamPartitions);
            this.systemAdmins.getSystemAdmin(changelog.getSystem()).validateStream(changelogSpec);
        }
    }

    /** Fetches the changelog stream metadata and stores the oldest offsets for this task's changelog partition. */
    // Raw types are confined to the Scala <-> Java collection conversion below.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void getOldestChangeLogOffsets() {
        Map<SystemStream, SystemStreamMetadata> changelogMetadata = (Map) JavaConverters.mapAsJavaMapConverter(this.streamMetadataCache.getStreamMetadata(((scala.collection.mutable.Set) JavaConverters.asScalaSetConverter(new HashSet(this.storeChangelogs.values())).asScala()).toSet(), false)).asJava();
        LOG.info("Got change log stream metadata: {}", changelogMetadata);
        this.changeLogOldestOffsets = getChangeLogOldestOffsetsForPartition(this.taskModel.getChangelogPartition(), changelogMetadata);
        LOG.info("Assigning oldest change log offsets for taskName {} : {}", this.taskModel.getTaskName(), this.changeLogOldestOffsets);
    }

    /**
     * Extracts the oldest offset of the given partition from each stream's metadata.
     *
     * @param partition changelog partition to look up
     * @param map changelog stream to its metadata
     * @return stream to oldest offset, containing only the streams that have metadata for
     *     {@code partition}; the offset value itself may be {@code null}
     */
    private Map<SystemStream, String> getChangeLogOldestOffsetsForPartition(Partition partition, Map<SystemStream, SystemStreamMetadata> map) {
        Map<SystemStream, String> oldestOffsets = new HashMap<>();
        map.forEach((systemStream, metadata) -> {
            SystemStreamMetadata.SystemStreamPartitionMetadata partitionMetadata = metadata.getSystemStreamPartitionMetadata().get(partition);
            if (partitionMetadata != null) {
                oldestOffsets.put(systemStream, partitionMetadata.getOldestOffset());
            }
        });
        return oldestOffsets;
    }

    /**
     * Registers each changelog consumer at its starting offset. Stores whose starting offset is
     * {@code null} are dropped from the set of stores to restore.
     */
    private void registerStartingOffsets() {
        for (Map.Entry<String, SystemStream> entry : this.storeChangelogs.entrySet()) {
            String storeName = entry.getKey();
            SystemStream changelog = entry.getValue();
            SystemStreamPartition changelogSsp = new SystemStreamPartition(changelog, this.taskModel.getChangelogPartition());
            SystemAdmin systemAdmin = this.systemAdmins.getSystemAdmin(changelog.getSystem());
            SystemConsumer systemConsumer = this.storeConsumers.get(storeName);
            String startingOffset = getStartingOffset(changelogSsp, systemAdmin);
            if (startingOffset != null) {
                LOG.info("Registering change log consumer with offset {} for {}", startingOffset, changelogSsp);
                systemConsumer.register(changelogSsp, startingOffset);
            } else {
                LOG.info("Skipping change log restoration for {} because stream appears to be empty (offset was null).", changelogSsp);
                this.taskStoresToRestore.remove(storeName);
            }
        }
    }

    /**
     * Resolves the offset from which the changelog partition should be consumed.
     *
     * @param systemStreamPartition changelog partition
     * @param systemAdmin admin of the changelog's system
     * @return the starting offset computed by {@link StorageManagerUtil} from the offset-file value
     *     (if any) and the oldest changelog offset
     * @throws SamzaException if no oldest offset is known for the partition's stream
     */
    private String getStartingOffset(SystemStreamPartition systemStreamPartition, SystemAdmin systemAdmin) {
        String fileOffset = this.fileOffsets.get(systemStreamPartition);
        SystemStream changelog = systemStreamPartition.getSystemStream();
        if (!this.changeLogOldestOffsets.containsKey(changelog)) {
            throw new SamzaException("Missing a change log offset for " + systemStreamPartition);
        }
        return this.storageManagerUtil.getStartingOffset(systemStreamPartition, systemAdmin, fileOffset, this.changeLogOldestOffsets.get(changelog));
    }

    /**
     * Builds the store map: the initial stores are copied in first, then a store is created in
     * {@code BulkLoad} mode for every requested store name (replacing an initial store of the same name).
     */
    private Map<String, StorageEngine> createStoreEngines(Set<String> storeNames, JobContext jobContext, ContainerContext containerContext, Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories, Map<String, Serde<Object>> serdes, MetricsRegistry metricsRegistry, MessageCollector messageCollector, Map<String, StorageEngine> initialStores) {
        Map<String, StorageEngine> stores = new HashMap<>(initialStores);
        for (String storeName : storeNames) {
            // Stores with a changelog live under the logged base directory, all others under the non-logged one.
            File baseDir = this.storeChangelogs.containsKey(storeName) ? this.loggedStoreBaseDirectory : this.nonLoggedStoreBaseDirectory;
            File storeDir = this.storageManagerUtil.getTaskStoreDir(baseDir, storeName, this.taskModel.getTaskName(), this.taskModel.getTaskMode());
            stores.put(storeName, ContainerStorageManager.createStore(storeName, storeDir, this.taskModel, jobContext, containerContext, storageEngineFactories, serdes, metricsRegistry, messageCollector, StorageEngineFactory.StoreMode.BulkLoad, this.storeChangelogs, this.config));
        }
        return stores;
    }

    /**
     * Restores every store that still needs restoring from its changelog, on the restore executor.
     *
     * @return a future that completes when all stores are restored; it completes exceptionally
     *     with a {@link SamzaException} cause if a restore is interrupted
     */
    public CompletableFuture<Void> restore() {
        return CompletableFuture.runAsync(() -> {
            for (String storeName : this.taskStoresToRestore) {
                LOG.info("Restoring store: {} for task: {}", storeName, this.taskModel.getTaskName());
                SystemConsumer systemConsumer = this.storeConsumers.get(storeName);
                SystemStream changelog = this.storeChangelogs.get(storeName);
                SystemStreamPartition changelogSsp = new SystemStreamPartition(changelog, this.taskModel.getChangelogPartition());
                SystemAdmin systemAdmin = this.systemAdmins.getSystemAdmin(changelog.getSystem());
                try {
                    this.taskStores.get(storeName).restore(new ChangelogSSPIterator(systemConsumer, changelogSsp, (String) null, systemAdmin, false));
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the executor thread; the exception is wrapped below.
                    Thread.currentThread().interrupt();
                    throw new SamzaException(String.format("Interrupted while restoring store: %s for task: %s", storeName, this.taskModel.getTaskName().getTaskName()), e);
                }
            }
        }, this.restoreExecutor);
    }

    /** Stops every store that is persisted to disk and removes it from the managed stores. */
    public void close() {
        // Snapshot first: entries are removed from taskStores while the snapshot is traversed.
        Map<String, StorageEngine> persistentStores = this.taskStores.entrySet().stream()
            .filter(entry -> entry.getValue().getStoreProperties().isPersistedToDisk())
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        persistentStores.forEach((storeName, store) -> {
            store.stop();
            this.taskStores.remove(storeName);
        });
        LOG.info("Stopped persistent stores {}", persistentStores);
    }
}
