package org.apache.samza.runtime;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.samza.SamzaException;
import org.apache.samza.application.descriptors.ApplicationDescriptor;
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.container.ContainerHeartbeatMonitor;
import org.apache.samza.container.ExecutionContainerIdManager;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.SamzaContainer;
import org.apache.samza.container.SamzaContainer$;
import org.apache.samza.context.ExternalContext;
import org.apache.samza.context.JobContextImpl;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
import org.apache.samza.diagnostics.DiagnosticsManager;
import org.apache.samza.drain.DrainMonitor;
import org.apache.samza.environment.EnvironmentVariables;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.logging.LoggingContextHolder;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.startpoint.StartpointManager;
import org.apache.samza.task.TaskFactory;
import org.apache.samza.task.TaskFactoryUtil;
import org.apache.samza.util.DiagnosticsUtil;
import org.apache.samza.util.ScalaJavaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import scala.Option;

/* loaded from: input_file:org/apache/samza/runtime/ContainerLaunchUtil.class */
public class ContainerLaunchUtil {
    private static final Logger log = LoggerFactory.getLogger(ContainerLaunchUtil.class);
    private static volatile Throwable containerRunnerException = null;

    public static void run(ApplicationDescriptorImpl<? extends ApplicationDescriptor> applicationDescriptorImpl, String str, JobModel jobModel) {
        Optional ofNullable = Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID));
        Optional ofNullable2 = Optional.ofNullable(System.getenv(EnvironmentVariables.SAMZA_EPOCH_ID));
        JobConfig jobConfig = new JobConfig(jobModel.getConfig());
        run(applicationDescriptorImpl, jobConfig.getName().get(), jobConfig.getJobId(), str, ofNullable, ofNullable2, jobModel);
    }

    public static void run(ApplicationDescriptorImpl<? extends ApplicationDescriptor> applicationDescriptorImpl, String str, String str2, String str3, Optional<String> optional, Optional<String> optional2, JobModel jobModel) {
        Config config = jobModel.getConfig();
        MDC.put("containerName", "samza-container-" + str3);
        MDC.put("jobName", str);
        MDC.put("jobId", str2);
        LoggingContextHolder.INSTANCE.setConfig(jobModel.getConfig());
        DiagnosticsUtil.writeMetadataFile(str, str2, str3, optional, config);
        run(applicationDescriptorImpl, str, str2, str3, optional, optional2, jobModel, config, buildExternalContext(config));
        exitProcess(0);
    }

    @VisibleForTesting
    static void run(ApplicationDescriptorImpl<? extends ApplicationDescriptor> applicationDescriptorImpl, String str, String str2, String str3, Optional<String> optional, Optional<String> optional2, JobModel jobModel, Config config, Optional<ExternalContext> optional3) {
        CoordinatorStreamStore buildCoordinatorStreamStore = buildCoordinatorStreamStore(config, new MetricsRegistryMap());
        buildCoordinatorStreamStore.init();
        int i = 0;
        try {
            try {
                TaskFactory<?> taskFactory = TaskFactoryUtil.getTaskFactory(applicationDescriptorImpl);
                LocalityManager localityManager = new LocalityManager(new NamespaceAwareCoordinatorStreamStore(buildCoordinatorStreamStore, SetContainerHostMapping.TYPE));
                StartpointManager startpointManager = null;
                if (new JobConfig(config).getStartpointEnabled()) {
                    startpointManager = new StartpointManager(buildCoordinatorStreamStore);
                }
                Map<String, MetricsReporter> loadMetricsReporters = loadMetricsReporters(applicationDescriptorImpl, str3, config);
                Optional<DiagnosticsManager> buildDiagnosticsManager = DiagnosticsUtil.buildDiagnosticsManager(str, str2, jobModel, str3, optional, optional2, config);
                MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();
                DrainMonitor drainMonitor = null;
                if (new JobConfig(config).getDrainMonitorEnabled()) {
                    drainMonitor = new DrainMonitor(buildCoordinatorStreamStore, config);
                }
                SamzaContainer apply = SamzaContainer$.MODULE$.apply(str3, jobModel, ScalaJavaUtil.toScalaMap(loadMetricsReporters), metricsRegistryMap, taskFactory, JobContextImpl.fromConfigWithDefaults(config, jobModel), Option.apply(applicationDescriptorImpl.getApplicationContainerContextFactory().orElse(null)), Option.apply(applicationDescriptorImpl.getApplicationTaskContextFactory().orElse(null)), Option.apply(optional3.orElse(null)), localityManager, startpointManager, Option.apply(buildDiagnosticsManager.orElse(null)), drainMonitor);
                ProcessorLifecycleListener createInstance = applicationDescriptorImpl.getProcessorLifecycleListenerFactory().createInstance(new ProcessorContext() { // from class: org.apache.samza.runtime.ContainerLaunchUtil.1
                }, config);
                apply.getClass();
                ClusterBasedProcessorLifecycleListener clusterBasedProcessorLifecycleListener = new ClusterBasedProcessorLifecycleListener(config, createInstance, apply::shutdown);
                apply.setContainerListener(clusterBasedProcessorLifecycleListener);
                ContainerHeartbeatMonitor createContainerHeartbeatMonitor = createContainerHeartbeatMonitor(apply, new NamespaceAwareCoordinatorStreamStore(buildCoordinatorStreamStore, "set-config"), config);
                if (createContainerHeartbeatMonitor != null) {
                    createContainerHeartbeatMonitor.start();
                }
                if (new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()) {
                    optional.ifPresent(str4 -> {
                        new ExecutionContainerIdManager(new NamespaceAwareCoordinatorStreamStore(buildCoordinatorStreamStore, SetExecutionEnvContainerIdMapping.TYPE)).writeExecutionEnvironmentContainerIdMapping(str3, str4);
                    });
                }
                apply.run();
                if (createContainerHeartbeatMonitor != null) {
                    createContainerHeartbeatMonitor.stop();
                }
                if (containerRunnerException == null) {
                    containerRunnerException = clusterBasedProcessorLifecycleListener.getContainerException();
                }
                if (containerRunnerException != null) {
                    log.error("Container stopped with Exception. Exiting process now.", containerRunnerException);
                    i = 1;
                }
            } catch (Throwable th) {
                log.error("Exiting the process due to", th);
                log.error("Container runner exception: ", containerRunnerException);
                i = 1;
                buildCoordinatorStreamStore.close();
                if (1 != 0) {
                    exitProcess(1);
                }
            }
        } finally {
            buildCoordinatorStreamStore.close();
            if (i != 0) {
                exitProcess(i);
            }
        }
    }

    @VisibleForTesting
    static CoordinatorStreamStore buildCoordinatorStreamStore(Config config, MetricsRegistryMap metricsRegistryMap) {
        return new CoordinatorStreamStore(config, metricsRegistryMap);
    }

    @VisibleForTesting
    static void exitProcess(int i) {
        System.exit(i);
    }

    private static Optional<ExternalContext> buildExternalContext(Config config) {
        return Optional.empty();
    }

    private static Map<String, MetricsReporter> loadMetricsReporters(ApplicationDescriptorImpl<? extends ApplicationDescriptor> applicationDescriptorImpl, String str, Config config) {
        HashMap hashMap = new HashMap();
        applicationDescriptorImpl.getMetricsReporterFactories().forEach((str2, metricsReporterFactory) -> {
        });
        return hashMap;
    }

    private static ContainerHeartbeatMonitor createContainerHeartbeatMonitor(SamzaContainer samzaContainer, MetadataStore metadataStore, Config config) {
        if (!new JobConfig(config).getContainerHeartbeatMonitorEnabled()) {
            log.info("Container heartbeat monitor is disabled");
            return null;
        }
        String str = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
        String str2 = System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID);
        if (str2 != null) {
            log.info("Got execution environment container id for container heartbeat monitor: {}", str2);
            return new ContainerHeartbeatMonitor(() -> {
                try {
                    samzaContainer.shutdown();
                    containerRunnerException = new SamzaException("Container shutdown due to expired heartbeat");
                } catch (Exception e) {
                    log.error("Heartbeat monitor failed to shutdown the container gracefully. Exiting process.", e);
                    System.exit(1);
                }
            }, str, str2, metadataStore, config);
        }
        log.warn("Container heartbeat monitor is enabled, but execution environment container id is not set. Container heartbeat monitor will not be created");
        return null;
    }
}
