package org.apache.samza.clustermanager;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.samza.SamzaException;
import org.apache.samza.clustermanager.SamzaApplicationState;
import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
import org.apache.samza.clustermanager.container.placement.ContainerPlacementRequestAllocator;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.container.ExecutionContainerIdManager;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.InputStreamsDiscoveredException;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.MetadataResourceUtil;
import org.apache.samza.coordinator.PartitionChangeException;
import org.apache.samza.coordinator.StreamPartitionCountMonitor;
import org.apache.samza.coordinator.StreamPartitionCountMonitorFactory;
import org.apache.samza.coordinator.StreamRegexMonitor;
import org.apache.samza.coordinator.StreamRegexMonitorFactory;
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
import org.apache.samza.drain.DrainUtils;
import org.apache.samza.job.JobCoordinatorMetadata;
import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.JobModelUtil;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.JmxServer;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.startpoint.StartpointManager;
import org.apache.samza.storage.ChangelogStreamManager;
import org.apache.samza.storage.StateBackendAdmin;
import org.apache.samza.storage.StateBackendFactory;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.util.DiagnosticsUtil;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/clustermanager/ClusterBasedJobCoordinator.class */
public class ClusterBasedJobCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(ClusterBasedJobCoordinator.class);
    private static final String METRICS_SOURCE_NAME = "ApplicationMaster";
    private final Config config;
    private final SamzaApplicationState state;
    private final ContainerProcessManager containerProcessManager;
    private final JobModelManager jobModelManager;
    private final ChangelogStreamManager changelogStreamManager;
    private final long jobCoordinatorSleepInterval;
    private final boolean isJmxEnabled;
    private final boolean hasDurableStores;
    private final StreamPartitionCountMonitor partitionMonitor;
    private final Optional<StreamRegexMonitor> inputStreamRegexMonitor;
    private final ContainerPlacementMetadataStore containerPlacementMetadataStore;
    private final ContainerPlacementRequestAllocator containerPlacementRequestAllocator;
    private final Thread containerPlacementRequestAllocatorThread;
    private final MetricsRegistryMap metrics;
    private final MetadataStore metadataStore;
    private final SystemAdmins systemAdmins;
    private final LocalityManager localityManager;
    private JmxServer jmxServer;
    private final AtomicBoolean isStarted = new AtomicBoolean(false);
    private boolean metadataChangedAcrossAttempts = false;
    private volatile Exception coordinatorCallbackException = null;
    private volatile Throwable coordinatorRunException = null;

    public ClusterBasedJobCoordinator(MetricsRegistryMap metricsRegistryMap, MetadataStore metadataStore, Config config) {
        this.metrics = metricsRegistryMap;
        this.metadataStore = metadataStore;
        this.config = config;
        this.changelogStreamManager = new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
        this.jobModelManager = JobModelManager.apply(this.config, this.changelogStreamManager.readPartitionMapping(), metadataStore, metricsRegistryMap);
        this.hasDurableStores = new StorageConfig(this.config).hasDurableStores();
        this.state = new SamzaApplicationState(this.jobModelManager);
        this.systemAdmins = new SystemAdmins(this.config, getClass().getSimpleName());
        this.partitionMonitor = getPartitionCountMonitor(this.config, this.systemAdmins);
        this.inputStreamRegexMonitor = getInputRegexMonitor(this.jobModelManager.jobModel(), this.config, this.systemAdmins);
        this.isJmxEnabled = new ClusterManagerConfig(this.config).getJmxEnabledOnJobCoordinator();
        this.jobCoordinatorSleepInterval = r0.getJobCoordinatorSleepInterval();
        this.localityManager = new LocalityManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE));
        if (isApplicationMasterHighAvailabilityEnabled()) {
            this.state.processorToExecutionId.putAll(new ExecutionContainerIdManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetExecutionEnvContainerIdMapping.TYPE)).readExecutionEnvironmentContainerIdMapping());
            generateAndUpdateJobCoordinatorMetadata(this.jobModelManager.jobModel());
        }
        this.containerPlacementMetadataStore = new ContainerPlacementMetadataStore(metadataStore);
        this.containerProcessManager = createContainerProcessManager();
        this.containerPlacementRequestAllocator = new ContainerPlacementRequestAllocator(this.containerPlacementMetadataStore, this.containerProcessManager, new ApplicationConfig(this.config));
        this.containerPlacementRequestAllocatorThread = new Thread(this.containerPlacementRequestAllocator, "Samza-" + ContainerPlacementRequestAllocator.class.getSimpleName());
    }

    public void run() {
        if (!this.isStarted.compareAndSet(false, true)) {
            LOG.warn("Attempting to start an already started job coordinator. ");
            return;
        }
        if (this.isJmxEnabled) {
            this.jmxServer = new JmxServer();
            this.state.jmxUrl = this.jmxServer.getJmxUrl();
            this.state.jmxTunnelingUrl = this.jmxServer.getTunnelingJmxUrl();
        } else {
            this.jmxServer = null;
        }
        try {
            try {
                LOG.info("Starting cluster based job coordinator");
                DiagnosticsUtil.writeMetadataFile(new JobConfig(this.config).getName().get(), new JobConfig(this.config).getJobId(), METRICS_SOURCE_NAME, Optional.ofNullable(System.getenv("CONTAINER_ID")), this.config);
                JobModel jobModel = this.jobModelManager.jobModel();
                new MetadataResourceUtil(jobModel, this.metrics, this.config).createResources();
                new StorageConfig(this.config).getBackupFactories().forEach(str -> {
                    StateBackendAdmin admin = ((StateBackendFactory) ReflectionUtil.getObj(str, StateBackendFactory.class)).getAdmin(jobModel, this.config);
                    admin.createResources();
                    admin.validateResources();
                });
                if (shouldFanoutStartpoint()) {
                    StartpointManager createStartpointManager = createStartpointManager();
                    createStartpointManager.start();
                    try {
                        createStartpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
                        createStartpointManager.stop();
                    } catch (Throwable th) {
                        createStartpointManager.stop();
                        throw th;
                    }
                }
                Map<TaskName, Integer> readPartitionMapping = this.changelogStreamManager.readPartitionMapping();
                HashMap hashMap = new HashMap();
                Iterator it = jobModel.getContainers().values().iterator();
                while (it.hasNext()) {
                    for (TaskModel taskModel : ((ContainerModel) it.next()).getTasks().values()) {
                        hashMap.put(taskModel.getTaskName(), Integer.valueOf(taskModel.getChangelogPartition().getPartitionId()));
                    }
                }
                this.changelogStreamManager.updatePartitionMapping(readPartitionMapping, hashMap);
                this.containerProcessManager.start();
                this.systemAdmins.start();
                this.partitionMonitor.start();
                this.inputStreamRegexMonitor.ifPresent((v0) -> {
                    v0.start();
                });
                LOG.info("Starting the container placement handler thread");
                this.containerPlacementMetadataStore.start();
                this.containerPlacementRequestAllocatorThread.start();
                boolean z = false;
                while (!this.containerProcessManager.shouldShutdown() && !checkAndThrowException() && !z && checkcontainerPlacementRequestAllocatorThreadIsAlive()) {
                    try {
                        Thread.sleep(this.jobCoordinatorSleepInterval);
                    } catch (InterruptedException e) {
                        z = true;
                        LOG.error("Interrupted in job coordinator loop", e);
                        Thread.currentThread().interrupt();
                    }
                }
            } catch (Throwable th2) {
                LOG.error("Exception thrown in the JobCoordinator loop", th2);
                this.coordinatorRunException = th2;
                throw new SamzaException(th2);
            }
        } finally {
            onShutDown();
        }
    }

    private boolean checkAndThrowException() throws Exception {
        if (this.coordinatorCallbackException != null) {
            throw this.coordinatorCallbackException;
        }
        return false;
    }

    private boolean checkcontainerPlacementRequestAllocatorThreadIsAlive() {
        if (this.containerPlacementRequestAllocatorThread.isAlive()) {
            return true;
        }
        LOG.info("{} thread is dead issuing a shutdown", this.containerPlacementRequestAllocatorThread.getName());
        return false;
    }

    @VisibleForTesting
    void generateAndUpdateJobCoordinatorMetadata(JobModel jobModel) {
        JobCoordinatorMetadataManager createJobCoordinatorMetadataManager = createJobCoordinatorMetadataManager();
        JobCoordinatorMetadata readJobCoordinatorMetadata = createJobCoordinatorMetadataManager.readJobCoordinatorMetadata();
        JobCoordinatorMetadata generateJobCoordinatorMetadata = createJobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, this.config);
        if (createJobCoordinatorMetadataManager.checkForMetadataChanges(generateJobCoordinatorMetadata, readJobCoordinatorMetadata).isEmpty()) {
            return;
        }
        createJobCoordinatorMetadataManager.writeJobCoordinatorMetadata(generateJobCoordinatorMetadata);
        this.metadataChangedAcrossAttempts = true;
    }

    private void onShutDown() {
        try {
            cleanupDrainNotifications();
            this.partitionMonitor.stop();
            this.inputStreamRegexMonitor.ifPresent((v0) -> {
                v0.stop();
            });
            this.systemAdmins.stop();
            shutDowncontainerPlacementRequestAllocatorAndUtils();
            this.containerProcessManager.stop();
            this.localityManager.close();
            this.metadataStore.close();
        } catch (Throwable th) {
            LOG.error("Exception while stopping cluster based job coordinator", th);
        }
        LOG.info("Stopped cluster based job coordinator");
        if (this.jmxServer != null) {
            try {
                this.jmxServer.stop();
                LOG.info("Stopped Jmx Server");
            } catch (Throwable th2) {
                LOG.error("Exception while stopping jmx server", th2);
            }
        }
    }

    private void cleanupDrainNotifications() {
        if (this.containerProcessManager.isShutdownSuccessful() && this.coordinatorRunException == null) {
            DrainUtils.cleanup(this.metadataStore, this.config);
        }
    }

    private void shutDowncontainerPlacementRequestAllocatorAndUtils() {
        this.containerPlacementRequestAllocator.stop();
        try {
            this.containerPlacementRequestAllocatorThread.join();
            LOG.info("Stopped container placement handler thread");
            this.containerPlacementMetadataStore.stop();
        } catch (InterruptedException e) {
            LOG.error("Container Placement handler thread join threw an interrupted exception", e);
            Thread.currentThread().interrupt();
        }
    }

    private StreamPartitionCountMonitor getPartitionCountMonitor(Config config, SystemAdmins systemAdmins) {
        return new StreamPartitionCountMonitorFactory(new StreamMetadataCache(systemAdmins, 0, SystemClock.instance()), this.metrics).build(config, set -> {
            if (this.hasDurableStores) {
                LOG.error("Input topic partition count changed in a job with durable state. Failing the job. Changed topics: {}", set.toString());
                this.state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
            }
            this.coordinatorCallbackException = new PartitionChangeException("Input topic partition count changes detected for topics: " + set.toString());
        });
    }

    private Optional<StreamRegexMonitor> getInputRegexMonitor(JobModel jobModel, Config config, SystemAdmins systemAdmins) {
        return new StreamRegexMonitorFactory(new StreamMetadataCache(systemAdmins, 0, SystemClock.instance()), this.metrics).build(jobModel, config, (set, set2, map) -> {
            if (this.hasDurableStores) {
                LOG.error("New input system-streams discovered. Failing the job. New input streams: {} Existing input streams: {}", set2, set);
                this.state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
            }
            this.coordinatorCallbackException = new InputStreamsDiscoveredException("New input streams discovered: " + set2);
        });
    }

    @VisibleForTesting
    SamzaApplicationState.SamzaAppStatus getAppStatus() {
        return this.state.status;
    }

    @VisibleForTesting
    StreamPartitionCountMonitor getPartitionMonitor() {
        return this.partitionMonitor;
    }

    @VisibleForTesting
    StartpointManager createStartpointManager() {
        return new StartpointManager(this.metadataStore);
    }

    @VisibleForTesting
    ContainerProcessManager createContainerProcessManager() {
        return new ContainerProcessManager(this.config, this.state, this.metrics, this.containerPlacementMetadataStore, this.localityManager, this.metadataChangedAcrossAttempts);
    }

    @VisibleForTesting
    JobCoordinatorMetadataManager createJobCoordinatorMetadataManager() {
        return new JobCoordinatorMetadataManager(new NamespaceAwareCoordinatorStreamStore(this.metadataStore, SetJobCoordinatorMetadataMessage.TYPE), JobCoordinatorMetadataManager.ClusterType.YARN, this.metrics);
    }

    @VisibleForTesting
    boolean isApplicationMasterHighAvailabilityEnabled() {
        return new JobConfig(this.config).getApplicationMasterHighAvailabilityEnabled();
    }

    @VisibleForTesting
    boolean isMetadataChangedAcrossAttempts() {
        return this.metadataChangedAcrossAttempts;
    }

    @VisibleForTesting
    boolean shouldFanoutStartpoint() {
        boolean startpointEnabled = new JobConfig(this.config).getStartpointEnabled();
        return isApplicationMasterHighAvailabilityEnabled() ? startpointEnabled && isMetadataChangedAcrossAttempts() : startpointEnabled;
    }
}
