package org.apache.samza.storage;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.collections4.MapUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.application.ApplicationUtil;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.CheckpointManager;
import org.apache.samza.checkpoint.CheckpointV2;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.RunLoop;
import org.apache.samza.container.SamzaContainerMetrics;
import org.apache.samza.container.TaskInstanceMetrics;
import org.apache.samza.container.TaskName;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.ReadableMetricsRegistry;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeManager;
import org.apache.samza.storage.StorageEngineFactory;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemConsumers;
import org.apache.samza.system.SystemConsumersMetrics;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.chooser.DefaultChooser;
import org.apache.samza.system.chooser.RoundRobinChooserFactory;
import org.apache.samza.table.utils.SerdeUtils;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskInstanceCollector;
import org.apache.samza.util.Clock;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.ScalaJavaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;

/* loaded from: input_file:org/apache/samza/storage/ContainerStorageManager.class */
public class ContainerStorageManager {
    private static final Logger LOG = LoggerFactory.getLogger(ContainerStorageManager.class);
    private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d";
    private static final String SIDEINPUTS_THREAD_NAME = "SideInputs Thread";
    private static final String SIDEINPUTS_METRICS_PREFIX = "side-inputs-";
    private static final int SIDE_INPUT_CHECK_TIMEOUT_SECONDS = 10;
    private static final int SIDE_INPUT_SHUTDOWN_TIMEOUT_SECONDS = 60;
    private static final int RESTORE_THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS = 60;
    private static final int DEFAULT_SIDE_INPUT_ELASTICITY_FACTOR = 1;
    private final Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics;
    private final Map<TaskName, TaskInstanceCollector> taskInstanceCollectors;
    private final Map<TaskName, Map<String, StorageEngine>> inMemoryStores;
    private Map<TaskName, Map<String, StorageEngine>> taskStores;
    private final Map<String, SystemConsumer> storeConsumers;
    private final Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories;
    private final Map<String, SystemStream> changelogSystemStreams;
    private final Map<String, Serde<Object>> serdes;
    private final SystemAdmins systemAdmins;
    private final Clock clock;
    private final Map<String, StateBackendFactory> restoreStateBackendFactories;
    private final StreamMetadataCache streamMetadataCache;
    private final SamzaContainerMetrics samzaContainerMetrics;
    private final CheckpointManager checkpointManager;
    private final ContainerModel containerModel;
    private final JobContext jobContext;
    private final ContainerContext containerContext;
    private final File loggedStoreBaseDirectory;
    private final File nonLoggedStoreBaseDirectory;
    private final Set<Path> storeDirectoryPaths;
    private final boolean hasSideInputs;
    private final Map<TaskName, Map<String, StorageEngine>> sideInputStores;
    private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputStoreSSPs;
    private final Set<String> sideInputStoreNames;
    private final Map<SystemStreamPartition, TaskSideInputHandler> sspSideInputHandlers;
    private SystemConsumers sideInputSystemConsumers;
    private RunLoop sideInputRunLoop;
    private final ExecutorService restoreExecutor;
    private final Config config;
    private volatile boolean shouldShutdown = false;
    private final ExecutorService sideInputsExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDEINPUTS_THREAD_NAME).build());
    private volatile Throwable sideInputException = null;
    private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
    private boolean isStarted = false;
    private final Map<TaskName, CountDownLatch> sideInputTaskLatches = new HashMap();

    public ContainerStorageManager(CheckpointManager checkpointManager, ContainerModel containerModel, StreamMetadataCache streamMetadataCache, SystemAdmins systemAdmins, Map<String, SystemStream> map, Map<String, Set<SystemStream>> map2, Map<String, StorageEngineFactory<Object, Object>> map3, Map<String, SystemFactory> map4, Map<String, Serde<Object>> map5, Config config, Map<TaskName, TaskInstanceMetrics> map6, SamzaContainerMetrics samzaContainerMetrics, JobContext jobContext, ContainerContext containerContext, Map<String, StateBackendFactory> map7, Map<TaskName, TaskInstanceCollector> map8, File file, File file2, SerdeManager serdeManager, Clock clock) {
        this.checkpointManager = checkpointManager;
        this.containerModel = containerModel;
        this.taskSideInputStoreSSPs = getTaskSideInputSSPs(containerModel, map2, map);
        this.sideInputStoreNames = getSideInputStores(containerModel, map2, map);
        this.hasSideInputs = this.taskSideInputStoreSSPs.values().stream().flatMap(map9 -> {
            return map9.values().stream();
        }).flatMap((v0) -> {
            return v0.stream();
        }).findAny().isPresent();
        this.changelogSystemStreams = getActiveTaskChangelogSystemStreams(containerModel, map);
        LOG.info("Starting with changelogSystemStreams = {} taskSideInputStoreSSPs = {}", this.changelogSystemStreams, this.taskSideInputStoreSSPs);
        this.clock = clock;
        this.restoreStateBackendFactories = map7;
        this.storageEngineFactories = map3;
        this.serdes = map5;
        this.loggedStoreBaseDirectory = file;
        this.nonLoggedStoreBaseDirectory = file2;
        if (file != null && file.equals(file2)) {
            LOG.warn("Logged and non-logged store base directory are configured to same path: {}. It is recommended to configurethem separately to ensure clean up of non-logged store data doesn't accidentally impact logged store data.", file);
        }
        this.config = config;
        this.taskInstanceMetrics = map6;
        this.samzaContainerMetrics = samzaContainerMetrics;
        this.jobContext = jobContext;
        this.containerContext = containerContext;
        this.taskInstanceCollectors = map8;
        this.storeDirectoryPaths = new HashSet();
        this.streamMetadataCache = streamMetadataCache;
        this.systemAdmins = systemAdmins;
        this.sideInputStores = createTaskStores(this.sideInputStoreNames, containerModel, jobContext, containerContext, map3, map5, map6, map8);
        StorageConfig storageConfig = new StorageConfig(config);
        this.inMemoryStores = createTaskStores((Set) map3.keySet().stream().filter(str -> {
            Optional<String> storageFactoryClassName = storageConfig.getStorageFactoryClassName(str);
            return storageFactoryClassName.isPresent() && storageFactoryClassName.get().equals(StorageConfig.INMEMORY_KV_STORAGE_ENGINE_FACTORY);
        }).collect(Collectors.toSet()), this.containerModel, jobContext, containerContext, map3, map5, map6, map8);
        this.storeConsumers = createStoreIndexedMap(this.changelogSystemStreams, createConsumers((Set) this.changelogSystemStreams.values().stream().map((v0) -> {
            return v0.getSystem();
        }).collect(Collectors.toSet()), map4, config, this.samzaContainerMetrics.mo47registry()));
        JobConfig jobConfig = new JobConfig(config);
        this.restoreExecutor = Executors.newFixedThreadPool(Math.min(Math.max(containerModel.getTasks().size() * map7.size() * 2, jobConfig.getRestoreThreadPoolSize()), jobConfig.getRestoreThreadPoolMaxSize()), new ThreadFactoryBuilder().setDaemon(true).setNameFormat(RESTORE_THREAD_NAME).build());
        this.sspSideInputHandlers = createSideInputHandlers(clock);
        if (this.hasSideInputs) {
            Set set = (Set) this.taskSideInputStoreSSPs.values().stream().flatMap(map10 -> {
                return map10.values().stream();
            }).flatMap((v0) -> {
                return v0.stream();
            }).map((v0) -> {
                return v0.getSystemStream();
            }).collect(Collectors.toSet());
            Map<String, SystemConsumer> createConsumers = createConsumers((Set) set.stream().map((v0) -> {
                return v0.getSystem();
            }).collect(Collectors.toSet()), map4, config, this.samzaContainerMetrics.mo47registry());
            scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> streamMetadata = streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(set).toSet(), false);
            SystemConsumersMetrics systemConsumersMetrics = new SystemConsumersMetrics(samzaContainerMetrics.mo47registry(), SIDEINPUTS_METRICS_PREFIX);
            this.sideInputSystemConsumers = new SystemConsumers(DefaultChooser.apply(streamMetadata, new RoundRobinChooserFactory(), config, systemConsumersMetrics.mo47registry(), systemAdmins), ScalaJavaUtil.toScalaMap(createConsumers), systemAdmins, serdeManager, systemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(), 50, ScalaJavaUtil.toScalaFunction(() -> {
                return Long.valueOf(System.nanoTime());
            }), 1, new ApplicationConfig(config).getRunId());
        }
    }

    @VisibleForTesting
    Map<String, SystemStream> getActiveTaskChangelogSystemStreams(ContainerModel containerModel, Map<String, SystemStream> map) {
        if (MapUtils.invertMap(map).size() != map.size()) {
            throw new SamzaException("Two stores cannot have the same changelog system-stream");
        }
        HashMap hashMap = new HashMap();
        map.forEach((str, systemStream) -> {
            containerModel.getTasks().forEach((taskName, taskModel) -> {
            });
        });
        getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> {
            map.forEach((str2, systemStream2) -> {
                hashMap.remove(new SystemStreamPartition(systemStream2, taskModel.getChangelogPartition()));
            });
        });
        return (Map) MapUtils.invertMap(hashMap).entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return ((SystemStreamPartition) entry.getValue()).getSystemStream();
        }));
    }

    @VisibleForTesting
    Set<String> getSideInputStores(ContainerModel containerModel, Map<String, Set<SystemStream>> map, Map<String, SystemStream> map2) {
        HashSet hashSet = new HashSet(map.keySet());
        if (getTasks(containerModel, TaskMode.Standby).size() > 0) {
            hashSet.addAll(map2.keySet());
        }
        return hashSet;
    }

    @VisibleForTesting
    Map<TaskName, Map<String, Set<SystemStreamPartition>>> getTaskSideInputSSPs(ContainerModel containerModel, Map<String, Set<SystemStream>> map, Map<String, SystemStream> map2) {
        HashMap hashMap = new HashMap();
        containerModel.getTasks().forEach((taskName, taskModel) -> {
            hashMap.putIfAbsent(taskName, new HashMap());
            map.keySet().forEach(str -> {
                ((Map) hashMap.get(taskName)).put(str, (Set) taskModel.getSystemStreamPartitions().stream().filter(systemStreamPartition -> {
                    return ((Set) map.get(str)).contains(systemStreamPartition.getSystemStream());
                }).collect(Collectors.toSet()));
            });
        });
        getTasks(containerModel, TaskMode.Standby).forEach((taskName2, taskModel2) -> {
            hashMap.putIfAbsent(taskName2, new HashMap());
            map2.forEach((str, systemStream) -> {
                ((Map) hashMap.get(taskName2)).put(str, Collections.singleton(new SystemStreamPartition(systemStream, taskModel2.getChangelogPartition())));
            });
        });
        return hashMap;
    }

    private static Map<String, SystemConsumer> createConsumers(Set<String> set, Map<String, SystemFactory> map, Config config, MetricsRegistry metricsRegistry) {
        HashMap hashMap = new HashMap();
        for (String str : set) {
            SystemFactory systemFactory = map.get(str);
            if (systemFactory == null) {
                throw new SamzaException("System " + str + " does not exist in config");
            }
            hashMap.put(str, systemFactory.getConsumer(str, config, metricsRegistry));
        }
        return hashMap;
    }

    private static Map<String, SystemConsumer> createStoreIndexedMap(Map<String, SystemStream> map, Map<String, SystemConsumer> map2) {
        HashMap hashMap = new HashMap();
        for (String str : map.keySet()) {
            hashMap.put(str, map2.get(map.get(str).getSystem()));
        }
        return hashMap;
    }

    private Map<String, TaskRestoreManager> createTaskRestoreManagers(Map<String, StateBackendFactory> map, Map<String, Set<String>> map2, Clock clock, SamzaContainerMetrics samzaContainerMetrics, TaskName taskName, TaskModel taskModel) {
        HashMap hashMap = new HashMap();
        ReadableMetricsRegistry mo47registry = this.taskInstanceMetrics.get(taskName) != null ? this.taskInstanceMetrics.get(taskName).mo47registry() : new MetricsRegistryMap();
        map2.forEach((str, set) -> {
            StateBackendFactory stateBackendFactory = (StateBackendFactory) map.get(str);
            if (stateBackendFactory == null) {
                throw new SamzaException(String.format("Required restore state backend factory: %s not found in configured factories %s", str, String.join(", ", map.keySet())));
            }
            hashMap.put(str, stateBackendFactory.getRestoreManager(this.jobContext, this.containerContext, taskModel, this.restoreExecutor, mo47registry, set, this.config, clock, this.loggedStoreBaseDirectory, this.nonLoggedStoreBaseDirectory, new KafkaChangelogRestoreParams(this.storeConsumers, this.inMemoryStores.get(taskName), this.systemAdmins.getSystemAdmins(), this.storageEngineFactories, this.serdes, this.taskInstanceCollectors.get(taskName))));
        });
        samzaContainerMetrics.addStoresRestorationGauge(taskName);
        return hashMap;
    }

    @VisibleForTesting
    Map<String, Set<String>> getBackendFactoryStoreNames(Checkpoint checkpoint, Set<String> set, StorageConfig storageConfig) {
        HashMap hashMap = new HashMap();
        if (checkpoint != null && checkpoint.getVersion() == 1) {
            Set set2 = (Set) set.stream().filter(str -> {
                return storageConfig.getChangelogStream(str).isPresent();
            }).collect(Collectors.toSet());
            if (!set2.isEmpty()) {
                hashMap.put(StorageConfig.KAFKA_STATE_BACKEND_FACTORY, set2);
            }
            if (set.size() > set2.size()) {
                LOG.info("non-Side input stores: {}, do not have a configured store changelogs for checkpoint V1,restore for the store will be skipped", (Set) set.stream().filter(str2 -> {
                    return !set2.contains(str2);
                }).collect(Collectors.toSet()));
            }
        } else {
            if (checkpoint != null && checkpoint.getVersion() != 2) {
                throw new SamzaException(String.format("Unsupported checkpoint version %s", Short.valueOf(checkpoint.getVersion())));
            }
            Map emptyMap = checkpoint == null ? Collections.emptyMap() : ((CheckpointV2) checkpoint).getStateCheckpointMarkers();
            set.forEach(str3 -> {
                String str3;
                List<String> storeRestoreFactories = storageConfig.getStoreRestoreFactories(str3);
                if (storeRestoreFactories.isEmpty()) {
                    LOG.info("non-Side input store: {}, does not have a configured restore factories nor store changelogs,restore for the store will be skipped", str3);
                    return;
                }
                Optional<String> findFirst = storeRestoreFactories.stream().filter(str4 -> {
                    return emptyMap.containsKey(str4) && ((Map) emptyMap.get(str4)).containsKey(str3);
                }).findFirst();
                if (findFirst.isPresent()) {
                    str3 = findFirst.get();
                } else {
                    str3 = storeRestoreFactories.get(0);
                    LOG.warn("No matching checkpoints found for configured factories: {}, defaulting to using the first configured factory with no checkpoints", storeRestoreFactories);
                }
                if (!hashMap.containsKey(str3)) {
                    hashMap.put(str3, new HashSet());
                }
                ((Set) hashMap.get(str3)).add(str3);
            });
        }
        return hashMap;
    }

    private static Map<TaskName, TaskModel> getTasks(ContainerModel containerModel, TaskMode taskMode) {
        return (Map) containerModel.getTasks().entrySet().stream().filter(entry -> {
            return ((TaskModel) entry.getValue()).getTaskMode().equals(taskMode);
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
    }

    private Map<TaskName, Map<String, StorageEngine>> createTaskStores(Set<String> set, ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext, Map<String, StorageEngineFactory<Object, Object>> map, Map<String, Serde<Object>> map2, Map<TaskName, TaskInstanceMetrics> map3, Map<TaskName, TaskInstanceCollector> map4) {
        HashMap hashMap = new HashMap();
        StorageConfig storageConfig = new StorageConfig(this.config);
        for (Map.Entry entry : containerModel.getTasks().entrySet()) {
            TaskName taskName = (TaskName) entry.getKey();
            TaskModel taskModel = (TaskModel) entry.getValue();
            if (!hashMap.containsKey(taskName)) {
                hashMap.put(taskName, new HashMap());
            }
            for (String str : set) {
                File taskStoreDir = this.storageManagerUtil.getTaskStoreDir(((this.changelogSystemStreams.containsKey(str) || !storageConfig.getStoreBackupFactories(str).isEmpty()) || this.sideInputStoreNames.contains(str)) ? this.loggedStoreBaseDirectory : this.nonLoggedStoreBaseDirectory, str, taskName, taskModel.getTaskMode());
                this.storeDirectoryPaths.add(taskStoreDir.toPath());
                ((Map) hashMap.get(taskName)).put(str, createStore(str, taskStoreDir, taskModel, jobContext, containerContext, map, map2, map3.get(taskName) != null ? map3.get(taskName).mo47registry() : new MetricsRegistryMap(), map4.get(taskName), StorageEngineFactory.StoreMode.ReadWrite, this.changelogSystemStreams, this.config));
                LOG.info("Created task store {} in read-write mode for task {} in path {}", new Object[]{str, taskName, taskStoreDir.getAbsolutePath()});
            }
        }
        return hashMap;
    }

    public static StorageEngine createStore(String str, File file, TaskModel taskModel, JobContext jobContext, ContainerContext containerContext, Map<String, StorageEngineFactory<Object, Object>> map, Map<String, Serde<Object>> map2, MetricsRegistry metricsRegistry, MessageCollector messageCollector, StorageEngineFactory.StoreMode storeMode, Map<String, SystemStream> map3, Config config) {
        StorageConfig storageConfig = new StorageConfig(config);
        SystemStreamPartition systemStreamPartition = map3.containsKey(str) ? new SystemStreamPartition(map3.get(str), taskModel.getChangelogPartition()) : null;
        Optional<String> storageKeySerde = storageConfig.getStorageKeySerde(str);
        Serde<Object> serde = null;
        if (storageKeySerde.isPresent()) {
            serde = map2.get(storageKeySerde.get());
        }
        Optional<String> storageMsgSerde = storageConfig.getStorageMsgSerde(str);
        Serde<Object> serde2 = null;
        if (storageMsgSerde.isPresent()) {
            serde2 = map2.get(storageMsgSerde.get());
        }
        return map.get(str).getStorageEngine(str, file, serde, serde2, messageCollector, metricsRegistry, systemStreamPartition, jobContext, containerContext, storeMode);
    }

    private Map<TaskName, Map<String, SideInputsProcessor>> createSideInputProcessors(StorageConfig storageConfig, ContainerModel containerModel, Map<TaskName, TaskInstanceMetrics> map) {
        HashMap hashMap = new HashMap();
        containerModel.getTasks().forEach((taskName, taskModel) -> {
            SideInputsProcessor sideInputsProcessor;
            hashMap.put(taskName, new HashMap());
            taskModel.getTaskMode();
            for (String str : this.taskSideInputStoreSSPs.get(taskName).keySet()) {
                Optional<String> sideInputsProcessorSerializedInstance = storageConfig.getSideInputsProcessorSerializedInstance(str);
                if (sideInputsProcessorSerializedInstance.isPresent()) {
                    sideInputsProcessor = (SideInputsProcessor) SerdeUtils.deserialize("Side Inputs Processor", sideInputsProcessorSerializedInstance.get());
                    LOG.info("Using serialized side-inputs-processor for store: {}, task: {}", str, taskName);
                } else if (storageConfig.getSideInputsProcessorFactory(str).isPresent()) {
                    sideInputsProcessor = ((SideInputsProcessorFactory) ReflectionUtil.getObj(storageConfig.getSideInputsProcessorFactory(str).get(), SideInputsProcessorFactory.class)).getSideInputsProcessor(storageConfig, ((TaskInstanceMetrics) map.get(taskName)).mo47registry());
                    LOG.info("Using side-inputs-processor from factory: {} for store: {}, task: {}", new Object[]{storageConfig.getSideInputsProcessorFactory(str).get(), str, taskName});
                } else {
                    final Serde<Object> serde = this.serdes.get(storageConfig.getStorageKeySerde(str).orElseThrow(() -> {
                        return new SamzaException("Could not find storage key serde for store: " + str);
                    }));
                    final Serde<Object> serde2 = this.serdes.get(storageConfig.getStorageMsgSerde(str).orElseThrow(() -> {
                        return new SamzaException("Could not find storage msg serde for store: " + str);
                    }));
                    sideInputsProcessor = new SideInputsProcessor() { // from class: org.apache.samza.storage.ContainerStorageManager.1
                        public Collection<Entry<?, ?>> process(IncomingMessageEnvelope incomingMessageEnvelope, KeyValueStore keyValueStore) {
                            if (incomingMessageEnvelope.getKey() == null) {
                                return ImmutableList.of();
                            }
                            return ImmutableList.of(new Entry(serde.fromBytes((byte[]) incomingMessageEnvelope.getKey()), incomingMessageEnvelope.getMessage() == null ? null : serde2.fromBytes((byte[]) incomingMessageEnvelope.getMessage())));
                        }
                    };
                    LOG.info("Using identity side-inputs-processor for store: {}, task: {}", str, taskName);
                }
                ((Map) hashMap.get(taskName)).put(str, sideInputsProcessor);
            }
        });
        return hashMap;
    }

    private Map<SystemStreamPartition, TaskSideInputHandler> createSideInputHandlers(Clock clock) {
        Map<TaskName, Map<String, SideInputsProcessor>> createSideInputProcessors = createSideInputProcessors(new StorageConfig(this.config), this.containerModel, this.taskInstanceMetrics);
        HashMap hashMap = new HashMap();
        if (this.hasSideInputs) {
            this.containerModel.getTasks().forEach((taskName, taskModel) -> {
                Map<String, StorageEngine> map = this.sideInputStores.get(taskName);
                HashMap hashMap2 = new HashMap();
                boolean z = false;
                for (String str : map.keySet()) {
                    Set<SystemStreamPartition> set = this.taskSideInputStoreSSPs.get(taskName).get(str);
                    z = z || !set.isEmpty();
                    hashMap2.put(str, set);
                }
                if (z) {
                    CountDownLatch countDownLatch = new CountDownLatch(1);
                    this.sideInputTaskLatches.put(taskName, countDownLatch);
                    TaskSideInputHandler taskSideInputHandler = new TaskSideInputHandler(taskName, taskModel.getTaskMode(), this.loggedStoreBaseDirectory, map, hashMap2, (Map) createSideInputProcessors.get(taskName), this.systemAdmins, this.streamMetadataCache, countDownLatch, clock);
                    hashMap2.values().stream().flatMap((v0) -> {
                        return v0.stream();
                    }).forEach(systemStreamPartition -> {
                        hashMap.put(systemStreamPartition, taskSideInputHandler);
                    });
                    LOG.info("Created TaskSideInputHandler for task {}, taskSideInputStores {} and loggedStoreBaseDirectory {}", new Object[]{taskName, map, this.loggedStoreBaseDirectory});
                }
            });
        }
        return hashMap;
    }

    private Set<TaskSideInputHandler> getSideInputHandlers() {
        return (Set) this.sspSideInputHandlers.values().stream().collect(Collectors.toSet());
    }

    public void start() throws SamzaException, InterruptedException {
        restoreStores();
        try {
            this.restoreExecutor.shutdown();
            if (this.restoreExecutor.awaitTermination(60L, TimeUnit.MILLISECONDS)) {
                this.restoreExecutor.shutdownNow();
            }
        } catch (Exception e) {
            LOG.error(e.getMessage());
        }
        if (this.hasSideInputs) {
            startSideInputs();
        }
        this.isStarted = true;
    }

    private void restoreStores() throws InterruptedException {
        LOG.info("Store Restore started");
        Set<TaskName> keySet = getTasks(this.containerModel, TaskMode.Active).keySet();
        Set<String> set = (Set) this.storageEngineFactories.keySet().stream().filter(str -> {
            return !this.sideInputStoreNames.contains(str);
        }).collect(Collectors.toSet());
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        this.containerModel.getTasks().forEach((taskName, taskModel) -> {
            Checkpoint checkpoint = null;
            if (this.checkpointManager != null && keySet.contains(taskName)) {
                checkpoint = this.checkpointManager.readLastCheckpoint(taskName);
                LOG.info("Obtained checkpoint: {} for state restore for taskName: {}", checkpoint, taskName);
            }
            hashMap2.put(taskName, checkpoint);
            hashMap.put(taskName, createTaskRestoreManagers(this.restoreStateBackendFactories, getBackendFactoryStoreNames(checkpoint, set, new StorageConfig(this.config)), this.clock, this.samzaContainerMetrics, taskName, taskModel));
        });
        hashMap.forEach((taskName2, map) -> {
            map.forEach((str2, taskRestoreManager) -> {
                taskRestoreManager.init((Checkpoint) hashMap2.get(taskName2));
            });
        });
        this.storeConsumers.values().stream().distinct().forEach((v0) -> {
            v0.start();
        });
        ArrayList arrayList = new ArrayList();
        hashMap.forEach((taskName3, map2) -> {
            map2.forEach((str2, taskRestoreManager) -> {
                long currentTimeMillis = System.currentTimeMillis();
                String taskName3 = taskName3.getTaskName();
                LOG.info("Starting restore for state for task: {}", taskName3);
                arrayList.add(taskRestoreManager.restore().handle((r13, th) -> {
                    Gauge<Object> orDefault;
                    try {
                        taskRestoreManager.close();
                    } catch (Exception e) {
                        Logger logger = LOG;
                        Object[] objArr = new Object[3];
                        objArr[0] = taskName3;
                        objArr[1] = th != null ? "unsuccessful" : "successful";
                        objArr[2] = e;
                        logger.error("Error closing restore manager for task: {} after {} restore", objArr);
                    }
                    long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                    if (this.samzaContainerMetrics != null && (orDefault = this.samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(taskName3, null)) != null) {
                        orDefault.set(Long.valueOf(currentTimeMillis2));
                    }
                    if (th == null) {
                        return null;
                    }
                    String format = String.format("Error restoring state for task: %s", taskName3);
                    LOG.error(format, th);
                    throw new SamzaException(format, th);
                }));
            });
        });
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            try {
                ((Future) it.next()).get();
            } catch (InterruptedException e) {
                LOG.warn("Received an interrupt during store restoration. Interrupting the restore executor to exit prematurely without restoring full state.");
                this.restoreExecutor.shutdownNow();
                throw e;
            } catch (Exception e2) {
                LOG.error("Exception when restoring state.", e2);
                throw new SamzaException("Exception when restoring state.", e2);
            }
        }
        this.storeConsumers.values().stream().distinct().forEach((v0) -> {
            v0.stop();
        });
        this.taskStores = createTaskStores(set, this.containerModel, this.jobContext, this.containerContext, this.storageEngineFactories, this.serdes, this.taskInstanceMetrics, this.taskInstanceCollectors);
        this.inMemoryStores.forEach((taskName4, map3) -> {
            if (!this.taskStores.containsKey(taskName4)) {
                this.taskStores.put(taskName4, new HashMap());
            }
            this.taskStores.get(taskName4).putAll(map3);
        });
        this.sideInputStores.forEach((taskName5, map4) -> {
            if (!this.taskStores.containsKey(taskName5)) {
                this.taskStores.put(taskName5, new HashMap());
            }
            this.taskStores.get(taskName5).putAll(map4);
        });
        LOG.info("Store Restore complete");
    }

    private void startSideInputs() {
        LOG.info("SideInput Restore started");
        getSideInputHandlers().forEach((v0) -> {
            v0.init();
        });
        Map map = (Map) this.sspSideInputHandlers.values().stream().distinct().collect(Collectors.toMap((v0) -> {
            return v0.getTaskName();
        }, Function.identity()));
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        this.taskSideInputStoreSSPs.forEach((taskName, map2) -> {
            Set set = (Set) this.taskSideInputStoreSSPs.get(taskName).values().stream().flatMap((v0) -> {
                return v0.stream();
            }).collect(Collectors.toSet());
            if (set.isEmpty()) {
                return;
            }
            hashMap.put(taskName, new TaskInstanceMetrics(SIDEINPUTS_METRICS_PREFIX + this.taskInstanceMetrics.get(taskName).source(), this.taskInstanceMetrics.get(taskName).mo47registry(), SIDEINPUTS_METRICS_PREFIX));
            hashMap2.put(taskName, new SideInputTask(taskName, set, (TaskSideInputHandler) map.get(taskName), (TaskInstanceMetrics) hashMap.get(taskName)));
        });
        for (SystemStreamPartition systemStreamPartition : this.sspSideInputHandlers.keySet()) {
            String startingOffset = this.sspSideInputHandlers.get(systemStreamPartition).getStartingOffset(systemStreamPartition);
            if (startingOffset == null) {
                throw new SamzaException("No starting offset could be obtained for SideInput SystemStreamPartition : " + systemStreamPartition + ". Consumer cannot start.");
            }
            this.sideInputSystemConsumers.register(systemStreamPartition, startingOffset);
            this.taskInstanceMetrics.get(this.sspSideInputHandlers.get(systemStreamPartition).getTaskName()).addOffsetGauge(systemStreamPartition, ScalaJavaUtil.toScalaFunction(() -> {
                return this.sspSideInputHandlers.get(systemStreamPartition).getLastProcessedOffset(systemStreamPartition);
            }));
            ((TaskInstanceMetrics) hashMap.get(this.sspSideInputHandlers.get(systemStreamPartition).getTaskName())).addOffsetGauge(systemStreamPartition, ScalaJavaUtil.toScalaFunction(() -> {
                return this.sspSideInputHandlers.get(systemStreamPartition).getLastProcessedOffset(systemStreamPartition);
            }));
        }
        this.sideInputSystemConsumers.start();
        TaskConfig taskConfig = new TaskConfig(this.config);
        this.sideInputRunLoop = new RunLoop(hashMap2, null, this.sideInputSystemConsumers, 1, -1L, taskConfig.getCommitMs(), taskConfig.getCallbackTimeoutMs(), taskConfig.getDrainCallbackTimeoutMs(), this.config.getLong("container.disk.quota.delay.max.ms", TimeUnit.SECONDS.toMillis(1L)), taskConfig.getMaxIdleMs(), new SamzaContainerMetrics(SIDEINPUTS_METRICS_PREFIX + this.samzaContainerMetrics.source(), this.samzaContainerMetrics.mo47registry(), SIDEINPUTS_METRICS_PREFIX), System::nanoTime, false, 1, new ApplicationConfig(this.config).getRunId(), ApplicationUtil.isHighLevelApiJob(this.config));
        try {
            this.sideInputsExecutor.submit(() -> {
                try {
                    this.sideInputRunLoop.run();
                } catch (Exception e) {
                    LOG.error("Exception in reading sideInputs", e);
                    this.sideInputException = e;
                }
            });
            while (!this.shouldShutdown && this.sideInputException == null && !awaitSideInputTasks()) {
                LOG.debug("Waiting for SideInput bootstrap to complete");
            }
            if (this.sideInputException != null) {
                throw new SamzaException("Exception in restoring sideInputs", this.sideInputException);
            }
            LOG.info("SideInput Restore complete");
        } catch (InterruptedException e) {
            LOG.warn("Received an interrupt during side inputs store restoration. Exiting prematurely without completing store restore.");
            this.shouldShutdown = true;
            throw new SamzaException("Side inputs read was interrupted", e);
        }
    }

    private boolean awaitSideInputTasks() throws InterruptedException {
        long currentTimeMillis = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
        for (CountDownLatch countDownLatch : this.sideInputTaskLatches.values()) {
            long currentTimeMillis2 = currentTimeMillis - System.currentTimeMillis();
            if (currentTimeMillis2 <= 0 || !countDownLatch.await(currentTimeMillis2, TimeUnit.MILLISECONDS)) {
                return false;
            }
        }
        return true;
    }

    public Optional<StorageEngine> getStore(TaskName taskName, String str) {
        if (this.isStarted) {
            return Optional.ofNullable(this.taskStores.get(taskName).get(str));
        }
        throw new SamzaException(String.format("Attempting to access store %s for task %s before ContainerStorageManager is started.", str, taskName));
    }

    public Map<String, StorageEngine> getAllStores(TaskName taskName) {
        if (this.isStarted) {
            return this.taskStores.get(taskName);
        }
        throw new SamzaException(String.format("Attempting to access stores for task %s before ContainerStorageManager is started.", taskName));
    }

    public Set<Path> getStoreDirectoryPaths() {
        return this.storeDirectoryPaths;
    }

    @VisibleForTesting
    public void stopStores() {
        this.taskStores.forEach((taskName, map) -> {
            map.forEach((str, storageEngine) -> {
                storageEngine.stop();
            });
        });
    }

    public void shutdown() {
        if (this.taskStores != null) {
            this.containerModel.getTasks().forEach((taskName, taskModel) -> {
                this.taskStores.get(taskName).entrySet().stream().filter(entry -> {
                    return !this.sideInputStoreNames.contains(entry.getKey());
                }).forEach(entry2 -> {
                    ((StorageEngine) entry2.getValue()).stop();
                });
            });
        }
        this.shouldShutdown = true;
        if (this.hasSideInputs) {
            this.sideInputRunLoop.shutdown();
            this.sideInputsExecutor.shutdown();
            try {
                this.sideInputsExecutor.awaitTermination(60L, TimeUnit.SECONDS);
                this.sideInputSystemConsumers.stop();
                getSideInputHandlers().forEach((v0) -> {
                    v0.stop();
                });
            } catch (InterruptedException e) {
                throw new SamzaException("Exception while shutting down sideInputs", e);
            }
        }
        LOG.info("Shutdown complete");
    }
}
