package org.apache.samza.clustermanager;

import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import org.apache.samza.SamzaException;
import org.apache.samza.application.SamzaApplication;
import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
import org.apache.samza.coordinator.NoProcessorJobCoordinatorListener;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.execution.RemoteJobPlanner;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.DiagnosticsUtil;
import org.apache.samza.util.MetricsReporterLoader;
import org.apache.samza.util.ReflectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.class */
public class JobCoordinatorLaunchUtil {
    private static final Logger LOG = LoggerFactory.getLogger(JobCoordinatorLaunchUtil.class);
    private static final String JOB_COORDINATOR_PROCESSOR_ID_PLACEHOLDER = "samza-job-coordinator";

    public static void run(SamzaApplication samzaApplication, Config config) {
        List<JobConfig> prepareJobs = new RemoteJobPlanner(ApplicationDescriptorUtil.getAppDescriptor(samzaApplication, config)).prepareJobs();
        if (prepareJobs.size() != 1) {
            throw new SamzaException("Only support single remote job is supported.");
        }
        Config config2 = prepareJobs.get(0);
        CoordinatorStreamUtil.createCoordinatorStream(config2);
        MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();
        CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config2), metricsRegistryMap);
        coordinatorStreamStore.init();
        MapConfig mapConfig = new MapConfig(new Map[]{CoordinatorStreamUtil.readLaunchConfigFromCoordinatorStream(config2, coordinatorStreamStore), config2});
        CoordinatorStreamUtil.writeConfigToCoordinatorStream((Config) mapConfig, true);
        DiagnosticsUtil.createDiagnosticsStream(mapConfig);
        Optional<String> optionalJobCoordinatorFactoryClassName = new JobCoordinatorConfig(config).getOptionalJobCoordinatorFactoryClassName();
        if (optionalJobCoordinatorFactoryClassName.isPresent()) {
            runJobCoordinator(optionalJobCoordinatorFactoryClassName.get(), metricsRegistryMap, coordinatorStreamStore, mapConfig);
        } else {
            new ClusterBasedJobCoordinator(metricsRegistryMap, coordinatorStreamStore, mapConfig).run();
        }
    }

    private static void runJobCoordinator(String str, MetricsRegistryMap metricsRegistryMap, MetadataStore metadataStore, Config config) {
        JobCoordinator jobCoordinator = ((JobCoordinatorFactory) ReflectionUtil.getObj(str, JobCoordinatorFactory.class)).getJobCoordinator(JOB_COORDINATOR_PROCESSOR_ID_PLACEHOLDER, config, metricsRegistryMap, metadataStore);
        Map<String, MetricsReporter> metricsReporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME);
        metricsReporters.values().forEach(metricsReporter -> {
            metricsReporter.register(CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, metricsRegistryMap);
        });
        metricsReporters.values().forEach((v0) -> {
            v0.start();
        });
        CountDownLatch countDownLatch = new CountDownLatch(1);
        jobCoordinator.setListener(new NoProcessorJobCoordinatorListener(countDownLatch));
        jobCoordinator.start();
        addShutdownHook(jobCoordinator);
        try {
            try {
                countDownLatch.await();
                metricsReporters.values().forEach((v0) -> {
                    v0.stop();
                });
            } catch (InterruptedException e) {
                LOG.error("Error while waiting for coordinator to complete", e);
                throw new SamzaException("Error while waiting for coordinator to complete", e);
            }
        } catch (Throwable th) {
            metricsReporters.values().forEach((v0) -> {
                v0.stop();
            });
            throw th;
        }
    }

    @VisibleForTesting
    static void addShutdownHook(JobCoordinator jobCoordinator) {
        Runtime runtime = Runtime.getRuntime();
        jobCoordinator.getClass();
        runtime.addShutdownHook(new Thread(jobCoordinator::stop, "Samza Job Coordinator Shutdown Hook Thread"));
    }

    private JobCoordinatorLaunchUtil() {
    }
}
