package org.apache.samza.storage;

import java.io.File;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.CheckpointManager;
import org.apache.samza.config.Config;
import org.apache.samza.config.SerializerConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.SystemConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.SamzaContainerMetrics;
import org.apache.samza.context.ContainerContextImpl;
import org.apache.samza.context.JobContextImpl;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;
import org.apache.samza.storage.blobstore.index.DirIndex;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.StreamUtil;
import org.apache.samza.util.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/storage/StorageRecovery.class */
public class StorageRecovery {
    private static final Logger LOG = LoggerFactory.getLogger(StorageRecovery.class);
    private final Config jobConfig;
    private final File storeBaseDir;
    private final SystemAdmins systemAdmins;
    private JobModel jobModel;
    private final Map<String, SystemStream> changeLogSystemStreams = new HashMap();
    private final Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories = new HashMap();
    private final Map<String, ContainerStorageManager> containerStorageManagers = new HashMap();
    private int maxPartitionNumber = 0;
    private Map<String, ContainerModel> containers = new HashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    public StorageRecovery(Config config, String str) {
        this.jobConfig = config;
        this.storeBaseDir = new File(str, "state");
        this.systemAdmins = new SystemAdmins(config);
    }

    private void setup() {
        LOG.info("setting up the recovery...");
        getContainerModels();
        getChangeLogSystemStreamsAndStorageFactories();
        getChangeLogMaxPartitionNumber();
        getContainerStorageManagers();
    }

    public void run() {
        setup();
        LOG.info("start recovering...");
        this.systemAdmins.start();
        this.containerStorageManagers.forEach((str, containerStorageManager) -> {
            try {
                containerStorageManager.start();
            } catch (InterruptedException e) {
                LOG.warn("Received an interrupt during store restoration for container {}. Proceeding with the next container", str);
            }
        });
        this.containerStorageManagers.forEach((str2, containerStorageManager2) -> {
            containerStorageManager2.shutdown();
        });
        this.systemAdmins.stop();
        LOG.info("successfully recovered in " + this.storeBaseDir.toString());
    }

    private void getContainerModels() {
        MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();
        CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(this.jobConfig, metricsRegistryMap);
        coordinatorStreamStore.init();
        try {
            JobModel jobModel = JobModelManager.apply(CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore), new ChangelogStreamManager(coordinatorStreamStore).readPartitionMapping(), coordinatorStreamStore, metricsRegistryMap).jobModel();
            this.jobModel = jobModel;
            this.containers = jobModel.getContainers();
            coordinatorStreamStore.close();
        } catch (Throwable th) {
            coordinatorStreamStore.close();
            throw th;
        }
    }

    private void getChangeLogSystemStreamsAndStorageFactories() {
        StorageConfig storageConfig = new StorageConfig(this.jobConfig);
        List<String> storeNames = storageConfig.getStoreNames();
        LOG.info("Got store names: " + storeNames.toString());
        for (String str : storeNames) {
            Optional<String> changelogStream = storageConfig.getChangelogStream(str);
            LOG.info("stream name for " + str + " is " + changelogStream.orElse(null));
            changelogStream.ifPresent(str2 -> {
                this.changeLogSystemStreams.put(str, StreamUtil.getSystemStreamFromNames(str2));
            });
            Optional<String> storageFactoryClassName = storageConfig.getStorageFactoryClassName(str);
            if (!storageFactoryClassName.isPresent()) {
                throw new SamzaException("Missing storage factory for " + str + ".");
            }
            this.storageEngineFactories.put(str, (StorageEngineFactory) ReflectionUtil.getObj(storageFactoryClassName.get(), StorageEngineFactory.class));
        }
    }

    private void getChangeLogMaxPartitionNumber() {
        int i = 0;
        Iterator<ContainerModel> it = this.containers.values().iterator();
        while (it.hasNext()) {
            Iterator it2 = it.next().getTasks().values().iterator();
            while (it2.hasNext()) {
                i = Math.max(i, ((TaskModel) it2.next()).getChangelogPartition().getPartitionId());
            }
        }
        this.maxPartitionNumber = i + 1;
    }

    private Map<String, Serde<Object>> getSerdes() {
        HashMap hashMap = new HashMap();
        SerializerConfig serializerConfig = new SerializerConfig(this.jobConfig);
        serializerConfig.getSerdeNames().forEach(str -> {
            hashMap.put(str, ((SerdeFactory) ReflectionUtil.getObj(serializerConfig.getSerdeFactoryClass(str).orElseGet(() -> {
                return SerializerConfig.getPredefinedSerdeFactoryName(str);
            }), SerdeFactory.class)).getSerde(str, serializerConfig));
        });
        return hashMap;
    }

    private void getContainerStorageManagers() {
        Map map = (Map) new StorageConfig(this.jobConfig).getRestoreFactories().stream().collect(Collectors.toMap(str -> {
            return str;
        }, str2 -> {
            return (StateBackendFactory) ReflectionUtil.getObj(str2, StateBackendFactory.class);
        }));
        StreamMetadataCache streamMetadataCache = new StreamMetadataCache(this.systemAdmins, 5000, SystemClock.instance());
        Map<String, SystemFactory> systemFactories = new SystemConfig(this.jobConfig).getSystemFactories();
        CheckpointManager orElse = new TaskConfig(this.jobConfig).getCheckpointManager(new MetricsRegistryMap()).orElse(null);
        for (ContainerModel containerModel : this.containers.values()) {
            this.containerStorageManagers.put(containerModel.getId(), new ContainerStorageManager(orElse, containerModel, streamMetadataCache, this.systemAdmins, this.changeLogSystemStreams, new HashMap(), this.storageEngineFactories, systemFactories, getSerdes(), this.jobConfig, new HashMap(), new SamzaContainerMetrics(containerModel.getId(), new MetricsRegistryMap(), DirIndex.ROOT_DIR_NAME), JobContextImpl.fromConfigWithDefaults(this.jobConfig, this.jobModel), new ContainerContextImpl(containerModel, new MetricsRegistryMap()), map, new HashMap(), this.storeBaseDir, this.storeBaseDir, null, SystemClock.instance()));
        }
    }
}
