package org.apache.samza.coordinator.staticresource;

import java.util.Optional;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.grouper.task.TaskAssignmentManager;
import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
import org.apache.samza.coordinator.JobModelCalculator;
import org.apache.samza.coordinator.JobModelHelper;
import org.apache.samza.coordinator.StreamPartitionCountMonitorFactory;
import org.apache.samza.coordinator.StreamRegexMonitorFactory;
import org.apache.samza.coordinator.communication.CoordinatorCommunication;
import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
import org.apache.samza.coordinator.communication.JobInfoServingContext;
import org.apache.samza.coordinator.lifecycle.JobRestartSignal;
import org.apache.samza.coordinator.lifecycle.JobRestartSignalFactory;
import org.apache.samza.coordinator.lifecycle.JobRestartSignalFactoryContext;
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
import org.apache.samza.environment.EnvironmentVariables;
import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.startpoint.StartpointManager;
import org.apache.samza.storage.ChangelogStreamManager;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.SystemClock;

/* loaded from: input_file:org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinatorFactory.class */
/**
 * {@link JobCoordinatorFactory} that assembles a {@link StaticResourceJobCoordinator}.
 *
 * <p>All collaborators are created here from the supplied {@link Config}, {@link MetricsRegistry}
 * and {@link MetadataStore}; each metadata-backed manager gets its own
 * {@link NamespaceAwareCoordinatorStreamStore} view over the same underlying store, keyed by the
 * corresponding coordinator-stream message type.
 */
public class StaticResourceJobCoordinatorFactory implements JobCoordinatorFactory {
    /**
     * Builds a {@link StaticResourceJobCoordinator} and everything it depends on.
     *
     * @param str identifier forwarded unchanged as the first constructor argument of
     *     {@link StaticResourceJobCoordinator} (NOTE(review): presumably the processor id — confirm
     *     against {@link JobCoordinatorFactory})
     * @param config job configuration used to derive {@link JobConfig}, {@link JobCoordinatorConfig},
     *     the communication context, the restart-signal factory context and the {@link SystemAdmins}
     * @param metricsRegistry registry handed to the communication context, the metadata manager, the
     *     monitor factories and the coordinator itself
     * @param metadataStore backing store wrapped in namespace-aware views for each manager
     * @return the fully wired job coordinator
     */
    @Override
    public JobCoordinator getJobCoordinator(String str, Config config, MetricsRegistry metricsRegistry, MetadataStore metadataStore) {
        JobInfoServingContext jobInfoServingContext = new JobInfoServingContext();
        JobConfig jobConfig = new JobConfig(config);

        CoordinatorCommunicationContext communicationContext =
            new CoordinatorCommunicationContext(jobInfoServingContext, config, metricsRegistry);
        CoordinatorCommunication coordinatorCommunication =
            new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(communicationContext);

        JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new JobCoordinatorMetadataManager(
            new NamespaceAwareCoordinatorStreamStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
            JobCoordinatorMetadataManager.ClusterType.NON_YARN,
            metricsRegistry);
        ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(
            new NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));

        // The restart-signal factory class is configurable, so it is instantiated reflectively.
        JobRestartSignalFactory jobRestartSignalFactory = (JobRestartSignalFactory) ReflectionUtil.getObj(
            new JobCoordinatorConfig(config).getJobRestartSignalFactory(), JobRestartSignalFactory.class);
        JobRestartSignal jobRestartSignal = jobRestartSignalFactory.build(new JobRestartSignalFactoryContext(config));

        // Only create a StartpointManager when startpoints are enabled in the job config.
        Optional<StartpointManager> startpointManager = jobConfig.getStartpointEnabled()
            ? Optional.of(new StartpointManager(metadataStore))
            : Optional.empty();

        SystemAdmins systemAdmins = new SystemAdmins(config, StaticResourceJobCoordinator.class.getSimpleName());
        // NOTE(review): the literal 0 is presumably a cache TTL (i.e. no caching) — confirm against StreamMetadataCache.
        StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());

        JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, streamMetadataCache);
        StreamPartitionCountMonitorFactory streamPartitionCountMonitorFactory =
            new StreamPartitionCountMonitorFactory(streamMetadataCache, metricsRegistry);
        StreamRegexMonitorFactory streamRegexMonitorFactory =
            new StreamRegexMonitorFactory(streamMetadataCache, metricsRegistry);

        // Both values come from the process environment and are empty when the variable is unset.
        Optional<String> executionEnvContainerId =
            Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID));
        Optional<String> samzaEpochId = Optional.ofNullable(System.getenv(EnvironmentVariables.SAMZA_EPOCH_ID));

        return new StaticResourceJobCoordinator(
            str,
            jobModelHelper,
            jobInfoServingContext,
            coordinatorCommunication,
            jobCoordinatorMetadataManager,
            streamPartitionCountMonitorFactory,
            streamRegexMonitorFactory,
            startpointManager,
            changelogStreamManager,
            jobRestartSignal,
            metricsRegistry,
            systemAdmins,
            executionEnvContainerId,
            samzaEpochId,
            config);
    }

    /**
     * Creates the {@link JobModelHelper} together with the locality, task-assignment and
     * task-partition-assignment managers it needs.
     *
     * @param metadataStore backing store; each manager receives its own namespace-aware view of it
     * @param streamMetadataCache cache passed through to the helper
     * @return a helper configured with {@link JobModelCalculator#INSTANCE}
     */
    private static JobModelHelper buildJobModelHelper(MetadataStore metadataStore, StreamMetadataCache streamMetadataCache) {
        LocalityManager localityManager = new LocalityManager(
            new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE));
        TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(
            new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskContainerMapping.TYPE),
            new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskModeMapping.TYPE));
        TaskPartitionAssignmentManager taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(
            new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskPartitionMapping.TYPE));
        return new JobModelHelper(
            localityManager,
            taskAssignmentManager,
            taskPartitionAssignmentManager,
            streamMetadataCache,
            JobModelCalculator.INSTANCE);
    }
}
