package org.apache.samza.coordinator.staticresource;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorListener;
import org.apache.samza.coordinator.JobModelHelper;
import org.apache.samza.coordinator.JobModelMonitors;
import org.apache.samza.coordinator.MetadataResourceUtil;
import org.apache.samza.coordinator.StreamPartitionCountMonitorFactory;
import org.apache.samza.coordinator.StreamRegexMonitorFactory;
import org.apache.samza.coordinator.communication.CoordinatorCommunication;
import org.apache.samza.coordinator.communication.JobInfoServingContext;
import org.apache.samza.coordinator.lifecycle.JobRestartSignal;
import org.apache.samza.diagnostics.DiagnosticsManager;
import org.apache.samza.job.JobCoordinatorMetadata;
import org.apache.samza.job.JobMetadataChange;
import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.JobModelUtil;
import org.apache.samza.logging.LoggingContextHolder;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.startpoint.StartpointManager;
import org.apache.samza.storage.ChangelogStreamManager;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.util.DiagnosticsUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.class */
/**
 * {@link JobCoordinator} that calculates a single {@link JobModel} when it is started and then either
 * prepares worker execution for that model or signals that the job needs to be restarted.
 *
 * <p>On {@link #start()} the freshly generated {@link JobCoordinatorMetadata} is compared with the metadata
 * previously stored through the {@link JobCoordinatorMetadataManager}. The outcome of that comparison decides
 * whether the coordinator serves the job model or calls {@link JobRestartSignal#restartJob()}.
 */
public class StaticResourceJobCoordinator implements JobCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);

    // Collaborators supplied through the constructor.
    private final JobModelHelper jobModelHelper;
    private final JobInfoServingContext jobModelServingContext;
    private final CoordinatorCommunication coordinatorCommunication;
    private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
    private final StreamPartitionCountMonitorFactory streamPartitionCountMonitorFactory;
    private final StreamRegexMonitorFactory streamRegexMonitorFactory;
    private final Optional<StartpointManager> startpointManager;
    private final ChangelogStreamManager changelogStreamManager;
    private final JobRestartSignal jobRestartSignal;
    private final MetricsRegistry metrics;
    private final SystemAdmins systemAdmins;
    private final String processorId;
    private final Optional<String> executionEnvContainerId;
    private final Optional<String> samzaEpochId;
    private final Config config;

    // State that is only populated by start() when worker execution is prepared; empty otherwise.
    private volatile Optional<JobCoordinatorListener> jobCoordinatorListener = Optional.empty();
    private volatile Optional<DiagnosticsManager> currentDiagnosticsManager = Optional.empty();
    private volatile Optional<JobModel> currentJobModel = Optional.empty();
    private volatile Optional<JobModelMonitors> currentJobModelMonitors = Optional.empty();

    // Set to true at the very end of the successful "prepare" path in start(); stop() uses it to decide
    // whether the job-model-dependent components were started and therefore need to be stopped.
    private final AtomicBoolean jobPreparationComplete = new AtomicBoolean(false);

    /**
     * Creates a coordinator from its collaborators. No component is started here; see {@link #start()}.
     *
     * <p>NOTE(review): the decompiler reported "Access modifiers changed from: package-private" for this
     * constructor. It is kept {@code public} so that existing callers of this file keep compiling.
     *
     * @param str processor id returned by {@link #getProcessorId()} and passed to the listener
     * @param jobModelHelper used to calculate the job model
     * @param jobInfoServingContext receives the job model once worker execution is prepared
     * @param coordinatorCommunication started/stopped together with the prepared job model
     * @param jobCoordinatorMetadataManager generates, reads, compares and writes coordinator metadata
     * @param streamPartitionCountMonitorFactory builds the partition count monitor
     * @param streamRegexMonitorFactory builds the (optional) stream regex monitor
     * @param optional startpoint manager, if any
     * @param changelogStreamManager source of the changelog partition mapping used for the job model
     * @param jobRestartSignal invoked whenever the job must be restarted
     * @param metricsRegistry metrics registry handed to {@link MetadataResourceUtil}
     * @param systemAdmins started in {@link #start()} and stopped in {@link #stop()}
     * @param optional2 execution environment container id forwarded to the diagnostics manager builder
     * @param optional3 Samza epoch id forwarded to the diagnostics manager builder
     * @param config job configuration
     */
    public StaticResourceJobCoordinator(String str, JobModelHelper jobModelHelper, JobInfoServingContext jobInfoServingContext, CoordinatorCommunication coordinatorCommunication, JobCoordinatorMetadataManager jobCoordinatorMetadataManager, StreamPartitionCountMonitorFactory streamPartitionCountMonitorFactory, StreamRegexMonitorFactory streamRegexMonitorFactory, Optional<StartpointManager> optional, ChangelogStreamManager changelogStreamManager, JobRestartSignal jobRestartSignal, MetricsRegistry metricsRegistry, SystemAdmins systemAdmins, Optional<String> optional2, Optional<String> optional3, Config config) {
        this.jobModelHelper = jobModelHelper;
        this.jobModelServingContext = jobInfoServingContext;
        this.coordinatorCommunication = coordinatorCommunication;
        this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
        this.streamPartitionCountMonitorFactory = streamPartitionCountMonitorFactory;
        this.streamRegexMonitorFactory = streamRegexMonitorFactory;
        this.startpointManager = optional;
        this.changelogStreamManager = changelogStreamManager;
        this.jobRestartSignal = jobRestartSignal;
        this.metrics = metricsRegistry;
        this.systemAdmins = systemAdmins;
        this.processorId = str;
        this.executionEnvContainerId = optional2;
        this.samzaEpochId = optional3;
        this.config = config;
    }

    /**
     * Starts the system admins and the startpoint manager (if present), calculates the job model and then
     * takes one of two paths based on the detected metadata changes:
     * <ul>
     *   <li>no changes, or the changes include {@link JobMetadataChange#NEW_DEPLOYMENT}: prepare worker
     *       execution, remember the job-model-dependent components, start them and notify the listener;</li>
     *   <li>any other set of changes: call {@link JobRestartSignal#restartJob()} and start nothing else.</li>
     * </ul>
     *
     * @throws SamzaException wrapping any exception raised after the system admins and startpoint manager
     *     have been started
     */
    @Override
    public void start() {
        LOG.info("Starting job coordinator");
        this.systemAdmins.start();
        this.startpointManager.ifPresent(StartpointManager::start);
        try {
            JobModel jobModel = newJobModel();
            doSetLoggingContextConfig(jobModel.getConfig());
            // Monitors and diagnostics are built before the metadata check but only started on the prepare path.
            JobModelMonitors monitors = jobModelMonitors(jobModel);
            Optional<DiagnosticsManager> diagnostics = diagnosticsManager(jobModel);
            JobCoordinatorMetadata newMetadata =
                this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, jobModel.getConfig());
            Set<JobMetadataChange> metadataChanges = checkForMetadataChanges(newMetadata);

            boolean restartRequired =
                !metadataChanges.isEmpty() && !metadataChanges.contains(JobMetadataChange.NEW_DEPLOYMENT);
            if (restartRequired) {
                LOG.info("Triggering job restart");
                this.jobRestartSignal.restartJob();
                return;
            }

            prepareWorkerExecution(jobModel, newMetadata, metadataChanges);
            // Keep references so that stop() and getJobModel() can reach these components later.
            this.currentDiagnosticsManager = diagnostics;
            this.currentJobModelMonitors = Optional.of(monitors);
            this.currentJobModel = Optional.of(jobModel);
            this.coordinatorCommunication.start();
            this.jobCoordinatorListener.ifPresent(listener -> listener.onNewJobModel(this.processorId, jobModel));
            this.currentDiagnosticsManager.ifPresent(DiagnosticsManager::start);
            monitors.start();
            this.jobPreparationComplete.set(true);
        } catch (Exception e) {
            LOG.error("Error while running job coordinator; exiting", e);
            throw new SamzaException("Error while running job coordinator", e);
        }
    }

    /**
     * Notifies the listener that the job model expired, stops the components started by {@link #start()}
     * (only if preparation completed), then stops the startpoint manager (if present) and the system admins.
     *
     * <p>The listener's {@code onCoordinatorStop} callback runs in a {@code finally} block, so it is invoked
     * even when one of the preceding steps throws.
     */
    @Override
    public void stop() {
        try {
            this.jobCoordinatorListener.ifPresent(JobCoordinatorListener::onJobModelExpired);
            if (this.jobPreparationComplete.get()) {
                this.currentDiagnosticsManager.ifPresent(StaticResourceJobCoordinator::quietlyStop);
                this.currentJobModelMonitors.ifPresent(JobModelMonitors::stop);
                this.coordinatorCommunication.stop();
            }
            this.startpointManager.ifPresent(StartpointManager::stop);
            this.systemAdmins.stop();
        } finally {
            this.jobCoordinatorListener.ifPresent(JobCoordinatorListener::onCoordinatorStop);
        }
    }

    /**
     * @return the processor id given at construction time
     */
    @Override
    public String getProcessorId() {
        return this.processorId;
    }

    /**
     * Registers the listener to notify about job model and coordinator lifecycle events.
     *
     * @param jobCoordinatorListener listener to use; {@code null} clears the current listener
     */
    @Override
    public void setListener(JobCoordinatorListener jobCoordinatorListener) {
        this.jobCoordinatorListener = Optional.ofNullable(jobCoordinatorListener);
    }

    /**
     * @return the job model stored by {@link #start()}, or {@code null} if none has been stored
     */
    @Override
    public JobModel getJobModel() {
        return this.currentJobModel.orElse(null);
    }

    /**
     * Calculates a job model from the configuration and the changelog partition mapping.
     */
    private JobModel newJobModel() {
        return this.jobModelHelper.newJobModel(this.config, this.changelogStreamManager.readPartitionMapping());
    }

    /**
     * Builds (but does not start) the monitors for the given job model. Both monitor callbacks ignore their
     * arguments and simply request a job restart. The regex monitor is passed as {@code null} when the
     * factory returns an empty result.
     */
    private JobModelMonitors jobModelMonitors(JobModel jobModel) {
        return new JobModelMonitors(
            this.streamPartitionCountMonitorFactory.build(jobModel.getConfig(),
                unused -> this.jobRestartSignal.restartJob()),
            this.streamRegexMonitorFactory.build(jobModel, jobModel.getConfig(),
                (unused1, unused2, unused3) -> this.jobRestartSignal.restartJob()).orElse(null));
    }

    /**
     * Publishes the given config to the {@link LoggingContextHolder}. Separate method so tests can override it.
     */
    @VisibleForTesting
    void doSetLoggingContextConfig(Config config) {
        LoggingContextHolder.INSTANCE.setConfig(config);
    }

    /**
     * Builds the diagnostics manager for the coordinator container.
     *
     * @throws ConfigException if the job name is missing from the configuration
     */
    private Optional<DiagnosticsManager> diagnosticsManager(JobModel jobModel) {
        JobConfig jobConfig = new JobConfig(this.config);
        String jobName = jobConfig.getName().orElseThrow(() -> new ConfigException("Missing job name"));
        return buildDiagnosticsManager(jobName, jobConfig.getJobId(), jobModel,
            CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, this.executionEnvContainerId, this.samzaEpochId,
            this.config);
    }

    /**
     * Performs the steps needed before workers can use the job model: persists the new metadata (only when
     * something changed), publishes the job model to the serving context, creates metadata resources, and
     * fans out startpoints (only when a startpoint manager is present and something changed).
     *
     * @param jobModel job model to publish
     * @param jobCoordinatorMetadata metadata to write when {@code set} is not empty
     * @param set detected metadata changes
     * @throws IOException propagated from the steps above
     */
    private void prepareWorkerExecution(JobModel jobModel, JobCoordinatorMetadata jobCoordinatorMetadata, Set<JobMetadataChange> set) throws IOException {
        boolean metadataChanged = !set.isEmpty();
        if (metadataChanged) {
            this.jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(jobCoordinatorMetadata);
        }
        this.jobModelServingContext.setJobModel(jobModel);
        metadataResourceUtil(jobModel).createResources();
        // An explicit isPresent()/get() is used instead of ifPresent() because this method declares IOException.
        if (metadataChanged && this.startpointManager.isPresent()) {
            this.startpointManager.get().fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
        }
    }

    /**
     * Creates the {@link MetadataResourceUtil} for the given job model. Separate method so tests can override it.
     */
    @VisibleForTesting
    MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
        return new MetadataResourceUtil(jobModel, this.metrics, this.config);
    }

    /**
     * Delegates to {@link DiagnosticsUtil#buildDiagnosticsManager} with the arguments unchanged and in the
     * same order. Separate method so tests can override it.
     */
    @VisibleForTesting
    Optional<DiagnosticsManager> buildDiagnosticsManager(String str, String str2, JobModel jobModel, String str3, Optional<String> optional, Optional<String> optional2, Config config) {
        return DiagnosticsUtil.buildDiagnosticsManager(str, str2, jobModel, str3, optional, optional2, config);
    }

    /**
     * Compares the given metadata with the metadata currently stored by the metadata manager.
     *
     * @return the set of detected changes; empty when nothing changed
     */
    private Set<JobMetadataChange> checkForMetadataChanges(JobCoordinatorMetadata jobCoordinatorMetadata) {
        JobCoordinatorMetadata previousMetadata = this.jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
        return this.jobCoordinatorMetadataManager.checkForMetadataChanges(jobCoordinatorMetadata, previousMetadata);
    }

    /**
     * Stops the diagnostics manager without propagating {@link InterruptedException}, so that the remaining
     * shutdown steps in {@link #stop()} still run.
     */
    private static void quietlyStop(DiagnosticsManager diagnosticsManager) {
        try {
            diagnosticsManager.stop();
        } catch (InterruptedException e) {
            LOG.error("Exception while stopping diagnostics manager", e);
            // Catching InterruptedException clears the thread's interrupt status; restore it so code further
            // up the stack can still observe that this thread was asked to stop.
            Thread.currentThread().interrupt();
        }
    }
}
