package org.apache.samza.clustermanager;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.SamzaException;
import org.apache.samza.clustermanager.ClusterResourceManager;
import org.apache.samza.clustermanager.SamzaApplicationState;
import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.placement.ContainerPlacementRequestMessage;
import org.apache.samza.diagnostics.DiagnosticsManager;
import org.apache.samza.job.model.LocalityModel;
import org.apache.samza.metrics.ContainerProcessManagerMetrics;
import org.apache.samza.metrics.JvmMetrics;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.util.DiagnosticsUtil;
import org.apache.samza.util.MetricsReporterLoader;
import org.apache.samza.util.ReflectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/clustermanager/ContainerProcessManager.class */
public class ContainerProcessManager implements ClusterResourceManager.Callback {
    private static final Logger LOG = LoggerFactory.getLogger(ContainerProcessManager.class);
    // Source name under which metrics reporters are loaded/registered and the diagnostics manager is built.
    private static final String METRICS_SOURCE_NAME = "ApplicationMaster";
    // Despite the "SYS_PROPERTY" suffix, this name is looked up with System.getenv(...) in the public constructor.
    private static final String EXEC_ENV_CONTAINER_ID_SYS_PROPERTY = "CONTAINER_ID";
    // Cached value of ClusterManagerConfig#getHostAffinityEnabled(), read once at construction time.
    private final boolean hostAffinityEnabled;
    // Shared application state (processor counters, pending/running processor maps, job status) mutated by the callbacks.
    private final SamzaApplicationState state;
    private final ClusterManagerConfig clusterManagerConfig;
    private final JobConfig jobConfig;
    // Receives allocated resources and issues resource requests; it is the Runnable executed by allocatorThread.
    private final ContainerAllocator containerAllocator;
    // Thread named "Container Allocator Thread"; started in start() and joined in stop().
    private final Thread allocatorThread;
    // Delegate for container stop / launch-success / launch-failure / stop-failure handling and placement actions.
    private final ContainerManager containerManager;
    // Empty when the instance is created through the @VisibleForTesting constructor.
    private final Optional<DiagnosticsManager> diagnosticsManager;
    // Used to read the last known host of each processor.
    private final LocalityManager localityManager;
    private final ClusterResourceManager clusterResourceManager;
    // Set by onResourceCompletedWithUnknownStatus(...) and read by shouldShutdown().
    private volatile boolean jobFailureCriteriaMet;
    // Set by onError(...); when non-null, shouldShutdown() logs it and rethrows it wrapped in a SamzaException.
    private volatile Throwable exceptionOccurred;
    // Failure bookkeeping keyed by processor ID; entries are written on retried failures and removed on a clean exit.
    private final Map<String, ProcessorFailure> processorFailures;
    // When true (and application-master high availability is enabled), start() stops the already-running processors.
    private final boolean restartContainers;
    // The three metrics fields below are only assigned by the public constructor; they stay null when the
    // @VisibleForTesting constructor is used, which is why start()/stop() null-check jvmMetrics and metricsReporters.
    private ContainerProcessManagerMetrics containerProcessManagerMetrics;
    private JvmMetrics jvmMetrics;
    private Map<String, MetricsReporter> metricsReporters;

    /**
     * Builds a fully wired manager: the cluster resource manager and fault domain manager are created
     * reflectively from the factories named in the cluster-manager config, metrics and diagnostics are set up,
     * and the container manager, allocator and allocator thread are created (the thread is not started here).
     *
     * @param config job configuration
     * @param samzaApplicationState shared application state updated by this manager's callbacks
     * @param metricsRegistryMap registry handed to the metrics objects and registered with every metrics reporter
     * @param containerPlacementMetadataStore store passed through to the {@link ContainerManager}
     * @param localityManager source of processor locality; must not be null
     * @param z whether processors that are already running should be stopped in {@link #start()} when
     *          application-master high availability is enabled
     * @throws NullPointerException if {@code localityManager} is null or a factory returns a null manager
     * @throws SamzaException if one of the factories cannot be instantiated
     */
    public ContainerProcessManager(Config config, SamzaApplicationState samzaApplicationState, MetricsRegistryMap metricsRegistryMap, ContainerPlacementMetadataStore containerPlacementMetadataStore, LocalityManager localityManager, boolean z) {
        Preconditions.checkNotNull(localityManager, "Locality manager cannot be null");
        this.jobFailureCriteriaMet = false;
        this.exceptionOccurred = null;
        this.processorFailures = new HashMap<>();
        this.state = samzaApplicationState;
        this.clusterManagerConfig = new ClusterManagerConfig(config);
        this.jobConfig = new JobConfig(config);
        this.hostAffinityEnabled = this.clusterManagerConfig.getHostAffinityEnabled();

        // NOTE: `this` is handed to the resource manager factory as the callback before construction has finished.
        ResourceManagerFactory resourceManagerFactory = getContainerProcessManagerFactory(this.clusterManagerConfig);
        this.clusterResourceManager = Preconditions.checkNotNull(resourceManagerFactory.getClusterResourceManager(this, samzaApplicationState));
        FaultDomainManagerFactory faultDomainManagerFactory = getFaultDomainManagerFactory(this.clusterManagerConfig);
        FaultDomainManager faultDomainManager = Preconditions.checkNotNull(faultDomainManagerFactory.getFaultDomainManager(config, metricsRegistryMap));

        this.containerProcessManagerMetrics = new ContainerProcessManagerMetrics(config, samzaApplicationState, metricsRegistryMap);
        this.jvmMetrics = new JvmMetrics(metricsRegistryMap);
        this.metricsReporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), METRICS_SOURCE_NAME);

        // Reuse the JobConfig created above instead of instantiating it again for the name and the job id.
        this.diagnosticsManager = DiagnosticsUtil.buildDiagnosticsManager(
                this.jobConfig.getName().get(),
                this.jobConfig.getJobId(),
                samzaApplicationState.jobModelManager.jobModel(),
                METRICS_SOURCE_NAME,
                Optional.ofNullable(System.getenv(EXEC_ENV_CONTAINER_ID_SYS_PROPERTY)),
                Optional.empty(),
                config);
        this.localityManager = localityManager;
        this.metricsReporters.values().forEach(reporter -> reporter.register(METRICS_SOURCE_NAME, metricsRegistryMap));

        this.containerManager = new ContainerManager(containerPlacementMetadataStore, samzaApplicationState, this.clusterResourceManager, this.hostAffinityEnabled, this.jobConfig.getStandbyTasksEnabled(), localityManager, faultDomainManager, config);
        this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, samzaApplicationState, this.hostAffinityEnabled, this.containerManager);
        this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
        this.restartContainers = z;
        LOG.info("Finished container process manager initialization.");
    }

    /**
     * Test-only constructor that accepts pre-built collaborators instead of creating them from factories.
     * No diagnostics manager is configured and the metrics fields are left unassigned.
     *
     * @param clusterManagerConfig cluster-manager configuration, also used to derive the {@link JobConfig}
     * @param samzaApplicationState shared application state
     * @param metricsRegistryMap not used by this constructor
     * @param clusterResourceManager resource manager to delegate to
     * @param optional allocator to use; when empty a {@link ContainerAllocator} is created from the other arguments
     * @param containerManager container manager to delegate to
     * @param localityManager source of processor locality
     * @param z value for the restart-containers flag consulted by {@link #start()}
     */
    @VisibleForTesting
    ContainerProcessManager(ClusterManagerConfig clusterManagerConfig, SamzaApplicationState samzaApplicationState, MetricsRegistryMap metricsRegistryMap, ClusterResourceManager clusterResourceManager, Optional<ContainerAllocator> optional, ContainerManager containerManager, LocalityManager localityManager, boolean z) {
        this.jobFailureCriteriaMet = false;
        this.exceptionOccurred = null;
        this.processorFailures = new HashMap<>();

        this.state = samzaApplicationState;
        this.clusterManagerConfig = clusterManagerConfig;
        this.jobConfig = new JobConfig(clusterManagerConfig);
        this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();

        this.clusterResourceManager = clusterResourceManager;
        this.containerManager = containerManager;
        this.diagnosticsManager = Optional.empty();
        this.localityManager = localityManager;

        // Only build a default allocator when the caller did not supply one.
        this.containerAllocator = optional.orElseGet(() ->
                new ContainerAllocator(this.clusterResourceManager, clusterManagerConfig, samzaApplicationState, this.hostAffinityEnabled, this.containerManager));
        this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
        this.restartContainers = z;
        LOG.info("Finished container process manager initialization");
    }

    /**
     * Reports whether the manager has reached a terminal condition.
     *
     * @return {@code true} when the job failure criteria are met, when the number of completed processors
     *         equals the configured processor count, or when the allocator thread is no longer alive
     * @throws SamzaException wrapping the error previously reported through {@link #onError(Throwable)}, if any
     */
    public boolean shouldShutdown() {
        LOG.debug("ContainerProcessManager state: Completed containers: {}, Configured containers: {}, Are there too many failed containers: {}, Is allocator thread alive: {}",
                state.completedProcessors.get(),
                state.processorCount,
                jobFailureCriteriaMet ? "yes" : "no",
                allocatorThread.isAlive() ? "yes" : "no");

        final Throwable reportedError = exceptionOccurred;
        if (reportedError != null) {
            LOG.error("Exception in container process manager", reportedError);
            throw new SamzaException(reportedError);
        }

        if (jobFailureCriteriaMet) {
            return true;
        }
        if (state.completedProcessors.get() == state.processorCount.get()) {
            return true;
        }
        return !allocatorThread.isAlive();
    }

    /**
     * @return {@code true} only when the application status recorded in the shared state is {@code SUCCEEDED}
     */
    public boolean isShutdownSuccessful() {
        final SamzaApplicationState.SamzaAppStatus currentStatus = state.status;
        return currentStatus == SamzaApplicationState.SamzaAppStatus.SUCCEEDED;
    }

    /**
     * Starts metrics, diagnostics and the cluster resource manager, requests a resource for every processor in
     * the job model (on its last known host when one is recorded), and finally starts the allocator thread.
     *
     * <p>When application-master high availability is enabled, processors already present in
     * {@code state.runningProcessors} are not requested again; they are additionally stopped through the
     * cluster resource manager when the restart-containers flag is set.
     */
    public void start() {
        LOG.info("Starting the container process manager");

        final int containerRetryCount = clusterManagerConfig.getContainerRetryCount();
        if (containerRetryCount > -1) {
            LOG.info("Max retries on restarting failed containers: {}", containerRetryCount);
        } else {
            LOG.info("Infinite retries on restarting failed containers");
        }

        // The metrics fields are only populated by the public constructor, hence the null checks.
        if (jvmMetrics != null) {
            jvmMetrics.start();
        }
        if (metricsReporters != null) {
            metricsReporters.values().forEach(MetricsReporter::start);
        }
        diagnosticsManager.ifPresent(DiagnosticsManager::start);

        final int processorCount = state.jobModelManager.jobModel().getContainers().size();
        state.processorCount.set(processorCount);
        state.neededProcessors.set(processorCount);

        LOG.info("Starting the cluster resource manager");
        clusterResourceManager.start();

        // Map every processor to its last known host; the value is null when no non-blank host is recorded,
        // so a plain loop is used (Collectors.toMap rejects null values).
        final LocalityModel localityModel = localityManager.readLocality();
        final Map<String, String> processorToPreferredHost = new HashMap<>();
        for (String processorId : state.jobModelManager.jobModel().getContainers().keySet()) {
            final String lastKnownHost = Optional.ofNullable(localityModel.getProcessorLocality(processorId))
                    .map(locality -> locality.host())
                    .filter(StringUtils::isNotBlank)
                    .orElse(null);
            processorToPreferredHost.put(processorId, lastKnownHost);
        }

        if (jobConfig.getApplicationMasterHighAvailabilityEnabled()) {
            state.runningProcessors.forEach((processorId, runningResource) -> {
                LOG.info("Not requesting container for processorId: {} since its already running as containerId: {}", processorId, runningResource.getContainerId());
                processorToPreferredHost.remove(processorId);
                if (restartContainers) {
                    clusterResourceManager.stopStreamProcessor(runningResource);
                }
            });
        }

        containerAllocator.requestResources(processorToPreferredHost);

        LOG.info("Starting the container allocator thread");
        allocatorThread.start();
        // Previously this line repeated the "Starting ..." message, making a completed start indistinguishable
        // from a start in progress in the logs.
        LOG.info("Started the container process manager");
    }

    /**
     * Stops the allocator (and waits for its thread), the diagnostics manager, the metrics reporters and JVM
     * metrics, and finally the cluster resource manager, passing it the current application status.
     *
     * <p>Each step is best-effort: failures are logged and the remaining steps still run. If the calling thread
     * is interrupted while waiting, its interrupt flag is restored so that callers can observe the interruption.
     */
    public void stop() {
        LOG.info("Stopping the container process manager");

        containerAllocator.stop();
        try {
            allocatorThread.join();
            LOG.info("Stopped container allocator");
        } catch (InterruptedException e) {
            LOG.error("Allocator thread join threw an interrupted exception", e);
            Thread.currentThread().interrupt();
        }

        if (diagnosticsManager.isPresent()) {
            try {
                diagnosticsManager.get().stop();
            } catch (InterruptedException e) {
                LOG.error("InterruptedException while stopping diagnosticsManager", e);
                // Restore the interrupt flag, consistent with the allocator join above; catching the
                // exception clears the flag and it would otherwise be lost to callers.
                Thread.currentThread().interrupt();
            }
        }

        try {
            // The metrics fields are only populated by the public constructor, hence the null checks.
            if (metricsReporters != null) {
                metricsReporters.values().forEach(MetricsReporter::stop);
            }
            if (jvmMetrics != null) {
                jvmMetrics.stop();
            }
            LOG.info("Stopped containerProcessManagerMetrics reporters");
        } catch (Throwable th) {
            LOG.error("Exception while stopping containerProcessManagerMetrics", th);
        }

        try {
            clusterResourceManager.stop(state.status);
            LOG.info("Stopped the cluster resource manager");
        } catch (Throwable th) {
            LOG.error("Exception while stopping cluster resource manager", th);
        }

        LOG.info("Stopped the container process manager");
    }

    /**
     * Logs a newly allocated resource and hands it to the container allocator.
     *
     * @param samzaResource the resource that was allocated
     */
    public void onResourceAllocated(SamzaResource samzaResource) {
        final String allocatedContainerId = samzaResource.getContainerId();
        final String allocatedHost = samzaResource.getHost();
        LOG.info("Container ID: {} allocated from RM on host: {}", allocatedContainerId, allocatedHost);
        containerAllocator.addResource(samzaResource);
    }

    /**
     * Handles the completion of a single container.
     *
     * <p>If the container does not belong to a running processor the notification is counted as redundant and,
     * for a non-zero exit code, the resource is released. Otherwise the processor is removed from the running
     * set and handled according to its exit code: preempted / disk-failed / aborted containers are re-requested
     * on any host, exit code 0 is recorded as a successful completion, and every other exit code is delegated
     * to {@link #onResourceCompletedWithUnknownStatus}. A processor stop event is reported to the diagnostics
     * manager when one is configured.
     *
     * @param samzaResourceStatus completion status reported for the container
     */
    public void onResourceCompleted(SamzaResourceStatus samzaResourceStatus) {
        final String containerId = samzaResourceStatus.getContainerId();
        final Pair<String, String> processorAndHost = getRunningProcessor(containerId);
        final String processorId = processorAndHost.getKey();
        final String processorHost = processorAndHost.getValue();

        if (processorId == null) {
            LOG.info("No running Processor ID found for Container ID: {} with Status: {}. Ignoring redundant notification.", containerId, samzaResourceStatus.toString());
            state.redundantNotifications.incrementAndGet();
            if (samzaResourceStatus.getExitCode() != 0) {
                containerAllocator.releaseResource(containerId);
            }
            return;
        }

        state.runningProcessors.remove(processorId);
        final int exitCode = samzaResourceStatus.getExitCode();

        final boolean releasedByClusterManager = exitCode == SamzaResourceStatus.PREEMPTED
                || exitCode == SamzaResourceStatus.DISK_FAIL
                || exitCode == SamzaResourceStatus.ABORTED;

        if (releasedByClusterManager) {
            LOG.info("Container ID: {} for Processor ID: {} was released with an exit code: {}. This means that the container was killed by YARN, either due to being released by the application master or being 'lost' due to node failures etc. or due to preemption by the RM.Requesting a new container for the processor.", containerId, processorId, exitCode);
            state.releasedContainers.incrementAndGet();
            state.neededProcessors.incrementAndGet();
            state.jobHealthy.set(false);
            handleContainerStop(processorId, samzaResourceStatus.getContainerId(), ResourceRequestState.ANY_HOST, exitCode, Duration.ZERO);
        } else if (exitCode == 0) {
            LOG.info("Container ID: {} for Processor ID: {} completed successfully.", containerId, processorId);
            state.completedProcessors.incrementAndGet();
            state.finishedProcessors.incrementAndGet();
            processorFailures.remove(processorId);
            if (state.completedProcessors.get() == state.processorCount.get()) {
                LOG.info("Setting job status to SUCCEEDED since all containers have been marked as completed.");
                state.status = SamzaApplicationState.SamzaAppStatus.SUCCEEDED;
            }
        } else {
            onResourceCompletedWithUnknownStatus(samzaResourceStatus, containerId, processorId, exitCode);
        }

        if (diagnosticsManager.isPresent()) {
            diagnosticsManager.get().addProcessorStopEvent(processorId, samzaResourceStatus.getContainerId(), processorHost, exitCode);
        }
    }

    /**
     * Forwards every resource in the batch, in list order, to {@link #onResourceAllocated(SamzaResource)}.
     *
     * @param list resources made available by the cluster resource manager
     */
    @Override
    public void onResourcesAvailable(List<SamzaResource> list) {
        for (SamzaResource availableResource : list) {
            onResourceAllocated(availableResource);
        }
    }

    /**
     * Forwards every completion status in the batch, in list order, to
     * {@link #onResourceCompleted(SamzaResourceStatus)}.
     *
     * @param list completion statuses reported by the cluster resource manager
     */
    @Override
    public void onResourcesCompleted(List<SamzaResourceStatus> list) {
        for (SamzaResourceStatus completedStatus : list) {
            onResourceCompleted(completedStatus);
        }
    }

    /**
     * Moves the processor launched on the given resource from the pending set to the running set, marks the
     * job healthy once no more processors are needed, and notifies the container manager.
     * Notifications for containers without a pending processor are logged and ignored.
     *
     * @param samzaResource the resource on which the processor was launched
     */
    @Override
    public void onStreamProcessorLaunchSuccess(SamzaResource samzaResource) {
        final String containerId = samzaResource.getContainerId();
        final String hostName = samzaResource.getHost();
        final String processorId = getPendingProcessorId(containerId);
        LOG.info("Successfully started Processor ID: {} on Container ID: {} on host: {}", processorId, containerId, hostName);

        if (processorId != null) {
            LOG.info("Moving Processor ID: {} on Container ID: {} on host: {} from pending to running state.", processorId, containerId, hostName);
            state.pendingProcessors.remove(processorId);
            state.runningProcessors.put(processorId, samzaResource);

            final int stillNeeded = state.neededProcessors.decrementAndGet();
            if (stillNeeded == 0) {
                state.jobHealthy.set(true);
            }
            containerManager.handleContainerLaunchSuccess(processorId, hostName);
        } else {
            LOG.warn("Did not find a pending Processor ID for Container ID: {} on host: {}. Ignoring invalid/redundant notification.", containerId, hostName);
        }
    }

    /**
     * Releases a resource on which the processor launch failed and lets the container manager decide how to
     * proceed with the affected pending processor.
     *
     * @param samzaResource the resource on which the launch failed
     * @param th the launch failure
     */
    @Override
    public void onStreamProcessorLaunchFailure(SamzaResource samzaResource, Throwable th) {
        final String failedContainerId = samzaResource.getContainerId();
        final String failedHost = samzaResource.getHost();
        // May be null when no pending processor matches the container; it is passed on unchanged.
        final String processorId = getPendingProcessorId(failedContainerId);

        LOG.error("Launch failed for pending Processor ID: {} on Container ID: {} on host: {} with exception: {}", processorId, failedContainerId, failedHost, th);
        LOG.info("Releasing un-startable Container ID: {} for pending Processor ID: {}", failedContainerId, processorId);

        clusterResourceManager.releaseResources(samzaResource);
        containerManager.handleContainerLaunchFail(processorId, failedContainerId, failedHost, containerAllocator);
    }

    /**
     * Logs a failed attempt to stop a processor and forwards it to the container manager.
     *
     * @param samzaResource the resource whose processor could not be stopped
     * @param th the stop failure
     */
    @Override
    public void onStreamProcessorStopFailure(SamzaResource samzaResource, Throwable th) {
        final String containerId = samzaResource.getContainerId();
        final String hostName = samzaResource.getHost();
        // Null when the container does not match any running processor; it is passed on unchanged.
        final String processorId = getRunningProcessor(containerId).getKey();

        LOG.warn("Stop failed for running Processor ID: {} on Container ID: {} on host: {} with exception: {}", processorId, containerId, hostName, th);
        containerManager.handleContainerStopFail(processorId, containerId, hostName, containerAllocator);
    }

    /**
     * Records an error reported by the cluster resource manager; the next call to {@link #shouldShutdown()}
     * rethrows it wrapped in a {@link SamzaException}.
     *
     * @param th the reported error
     */
    @Override
    public void onError(Throwable th) {
        LOG.error("Exception occurred in callbacks in the Cluster Resource Manager", th);
        exceptionOccurred = th;
    }

    /**
     * @return the current value of the job-failure flag
     */
    @VisibleForTesting
    boolean getJobFailureCriteriaMet() {
        return jobFailureCriteriaMet;
    }

    /**
     * @return the live (not copied) map of failure records keyed by processor ID
     */
    @VisibleForTesting
    Map<String, ProcessorFailure> getProcessorFailures() {
        return processorFailures;
    }

    /**
     * Handles a container that exited with a code other than success, preemption, disk failure or abort.
     *
     * <p>The failure is counted, the job is marked unhealthy, and the retry policy from the cluster-manager
     * config is applied:
     * <ul>
     *   <li>retry count {@code 0}: never retry; either the job is failed or the processor is recorded as failed,
     *       depending on {@code shouldFailJobAfterContainerRetries()};</li>
     *   <li>retry count {@code > 0}: retry until the processor has failed more than that many times within the
     *       retry window, then fail the job or record the processor as failed;</li>
     *   <li>negative retry count: always retry.</li>
     * </ul>
     * A retry is requested on the processor's last known host when host affinity is enabled and a host is
     * recorded, otherwise on any host.
     *
     * @param samzaResourceStatus completion status of the failed container
     * @param str ID of the failed container
     * @param str2 ID of the processor that was running in the container
     * @param i exit code of the container
     */
    @VisibleForTesting
    void onResourceCompletedWithUnknownStatus(SamzaResourceStatus samzaResourceStatus, String str, String str2, int i) {
        final String containerId = str;
        final String processorId = str2;
        final int exitCode = i;

        LOG.info("Container ID: {} for Processor ID: {} failed with exit code: {}.", containerId, processorId, exitCode);
        final Instant now = Instant.now();
        state.failedContainers.incrementAndGet();
        state.jobHealthy.set(false);
        state.neededProcessors.incrementAndGet();

        String lastSeenOn = Optional.ofNullable(localityManager.readLocality().getProcessorLocality(processorId))
                .map(locality -> locality.host())
                .orElse(null);
        if (!hostAffinityEnabled || StringUtils.isBlank(lastSeenOn)) {
            lastSeenOn = ResourceRequestState.ANY_HOST;
        }
        LOG.info("Container ID: {} for Processor ID: {} was last seen on host {}.", containerId, processorId, lastSeenOn);

        final int retryCount = clusterManagerConfig.getContainerRetryCount();
        final int retryWindowMs = clusterManagerConfig.getContainerRetryWindowMs();
        // Stays true for a negative retry count, i.e. unlimited retries.
        boolean retryContainerRequest = true;

        if (retryCount == 0) {
            jobFailureCriteriaMet = clusterManagerConfig.shouldFailJobAfterContainerRetries();
            if (jobFailureCriteriaMet) {
                LOG.error("Processor ID: {} (current Container ID: {}) failed, and retry count is set to 0, so shutting down the application master and marking the job as failed.", processorId, containerId);
                // Actually mark the job as failed, as the message above states and as the
                // retries-exhausted branch below already does.
                state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
            } else {
                LOG.error("Processor ID: {} (current Container ID: {}) failed, and retry count is set to 0, but the job will continue to run with the failed container.", processorId, containerId);
                state.failedProcessors.put(processorId, samzaResourceStatus);
            }
            retryContainerRequest = false;
        } else if (retryCount > 0) {
            int currentFailCount;
            long durationSinceLastRetryMs;
            if (processorFailures.containsKey(processorId)) {
                final ProcessorFailure previousFailure = processorFailures.get(processorId);
                currentFailCount = previousFailure.getCount() + 1;
                // The previous retry was attempted after the delay recorded with the previous failure.
                final Instant retryAttemptedAt = previousFailure.getLastFailure().plus(getRetryDelay(processorId));
                durationSinceLastRetryMs = now.toEpochMilli() - retryAttemptedAt.toEpochMilli();
                if (durationSinceLastRetryMs < 0) {
                    LOG.warn("Last failure at: {} with a retry attempted at: {} which is supposed to be before current time of: {}", previousFailure.getLastFailure(), retryAttemptedAt, now);
                }
            } else {
                currentFailCount = 1;
                durationSinceLastRetryMs = 0;
            }

            if (durationSinceLastRetryMs >= retryWindowMs) {
                LOG.info("Resetting failure count for Processor ID: {} back to 1, since last failure (for Container ID: {}) was outside the bounds of the retry window.", processorId, containerId);
                currentFailCount = 1;
            }

            if (currentFailCount > retryCount) {
                LOG.error("Processor ID: {} (current Container ID: {}) has failed {} times, with last failure {} ms ago. This is greater than retry count of {} and window of {} ms, ", processorId, containerId, currentFailCount, durationSinceLastRetryMs, retryCount, retryWindowMs);
                retryContainerRequest = false;
                if (clusterManagerConfig.shouldFailJobAfterContainerRetries()) {
                    jobFailureCriteriaMet = true;
                    LOG.error("Shutting down the application master and marking the job as failed after max retry attempts.");
                    state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
                } else {
                    LOG.warn("Processor ID: {} with Container ID: {} failed after all retry attempts. Job will continue to run without this container.", processorId, containerId);
                    state.failedProcessors.put(processorId, samzaResourceStatus);
                }
            } else {
                LOG.info("Current failure count for Processor ID: {} is {}.", processorId, currentFailCount);
                // Only the last permitted retry on a preferred host is delayed.
                Duration lastRetryDelay = Duration.ZERO;
                if (!ResourceRequestState.ANY_HOST.equals(lastSeenOn) && currentFailCount == retryCount) {
                    lastRetryDelay = Duration.ofMillis(clusterManagerConfig.getContainerPreferredHostLastRetryDelayMs());
                }
                processorFailures.put(processorId, new ProcessorFailure(currentFailCount, now, lastRetryDelay));
                retryContainerRequest = true;
            }
        }

        if (retryContainerRequest) {
            final Duration retryDelay = getRetryDelay(processorId);
            if (!retryDelay.isZero()) {
                LOG.info("Adding a delay of: {} seconds on the last container retry request for preferred host: {}", retryDelay.getSeconds(), lastSeenOn);
            }
            handleContainerStop(processorId, samzaResourceStatus.getContainerId(), lastSeenOn, exitCode, retryDelay);
        }
    }

    /**
     * Passes a container placement request on to the container manager together with this manager's allocator.
     *
     * @param containerPlacementRequestMessage the placement request to register
     */
    public void registerContainerPlacementAction(ContainerPlacementRequestMessage containerPlacementRequestMessage) {
        final ContainerAllocator allocator = containerAllocator;
        containerManager.registerContainerPlacementAction(containerPlacementRequestMessage, allocator);
    }

    /**
     * Looks up the retry delay stored with the processor's last recorded failure.
     *
     * @param str processor ID
     * @return the recorded delay, or {@link Duration#ZERO} when no failure is recorded for the processor
     */
    private Duration getRetryDelay(String str) {
        if (!processorFailures.containsKey(str)) {
            return Duration.ZERO;
        }
        return processorFailures.get(str).getLastRetryDelay();
    }

    /**
     * Reflectively instantiates the resource manager factory named by
     * {@code clusterManagerConfig.getContainerManagerClass()}.
     *
     * @param clusterManagerConfig config supplying the factory class name
     * @return the instantiated factory
     * @throws SamzaException wrapping any exception raised during instantiation
     */
    private ResourceManagerFactory getContainerProcessManagerFactory(ClusterManagerConfig clusterManagerConfig) {
        try {
            final String factoryClassName = clusterManagerConfig.getContainerManagerClass();
            return ReflectionUtil.getObj(factoryClassName, ResourceManagerFactory.class);
        } catch (Exception e) {
            LOG.error("Error creating the cluster resource manager.", e);
            throw new SamzaException(e);
        }
    }

    /**
     * Reflectively instantiates the fault domain manager factory named by
     * {@code clusterManagerConfig.getFaultDomainManagerClass()}.
     *
     * @param clusterManagerConfig config supplying the factory class name
     * @return the instantiated factory
     * @throws SamzaException wrapping any exception raised during instantiation
     */
    private FaultDomainManagerFactory getFaultDomainManagerFactory(ClusterManagerConfig clusterManagerConfig) {
        try {
            final String factoryClassName = clusterManagerConfig.getFaultDomainManagerClass();
            return ReflectionUtil.getObj(factoryClassName, FaultDomainManagerFactory.class);
        } catch (Exception e) {
            LOG.error("Error creating the fault domain manager.", e);
            throw new SamzaException(e);
        }
    }

    /**
     * Finds the pending processor whose resource has the given container ID.
     *
     * @param str container ID to look for
     * @return the ID of the first matching pending processor, or {@code null} when none matches
     */
    private String getPendingProcessorId(String str) {
        for (Map.Entry<String, SamzaResource> pending : state.pendingProcessors.entrySet()) {
            final SamzaResource resource = pending.getValue();
            if (!resource.getContainerId().equals(str)) {
                continue;
            }
            final String processorId = pending.getKey();
            LOG.info("Container ID: {} matched pending Processor ID: {} on host: {}", str, processorId, resource.getHost());
            return processorId;
        }
        return null;
    }

    /**
     * Finds the running processor whose resource has the given container ID.
     *
     * @param str container ID to look for
     * @return a pair of (processor ID, host) for the first match, or a pair of two {@code null}s when no
     *         running processor matches
     */
    private Pair<String, String> getRunningProcessor(String str) {
        for (Map.Entry<String, SamzaResource> running : state.runningProcessors.entrySet()) {
            final SamzaResource resource = running.getValue();
            if (!resource.getContainerId().equals(str)) {
                continue;
            }
            final String processorId = running.getKey();
            LOG.info("Container ID: {} matched running Processor ID: {} on host: {}", str, processorId, resource.getHost());
            return new ImmutablePair<>(processorId, resource.getHost());
        }
        return new ImmutablePair<>(null, null);
    }

    /**
     * Delegates a container stop to the container manager, adding this manager's allocator to the arguments.
     *
     * @param str processor ID
     * @param str2 container ID
     * @param str3 host on which a replacement should be requested
     * @param i exit code of the stopped container
     * @param duration delay passed through to the container manager
     */
    private void handleContainerStop(String str, String str2, String str3, int i, Duration duration) {
        final ContainerAllocator allocator = containerAllocator;
        containerManager.handleContainerStop(str, str2, str3, i, duration, allocator);
    }
}
