package org.apache.samza.clustermanager;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.samza.SamzaException;
import org.apache.samza.clustermanager.ClusterResourceManager;
import org.apache.samza.clustermanager.SamzaApplicationState;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
import org.apache.samza.metrics.ContainerProcessManagerMetrics;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.util.ClassLoaderHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/clustermanager/ContainerProcessManager.class */
/**
 * Receives resource callbacks from a {@link ClusterResourceManager} and keeps the
 * {@link SamzaApplicationState} counters up to date while containers are allocated, complete,
 * are released, or fail.
 *
 * <p>Allocated resources are handed to an {@link AbstractContainerAllocator} that runs on its own
 * thread. Completed resources are classified by exit code; released and failed containers are
 * re-requested, subject to the configured retry count and retry window.
 */
public class ContainerProcessManager implements ClusterResourceManager.Callback {
    private static final Logger log = LoggerFactory.getLogger(ContainerProcessManager.class);
    private static final String ALLOCATOR_THREAD_NAME = "Container Allocator Thread";

    private final boolean hostAffinityEnabled;
    private final SamzaApplicationState state;
    private final ClusterManagerConfig clusterManagerConfig;
    private final JobConfig jobConfig;
    private final AbstractContainerAllocator containerAllocator;
    private final Thread allocatorThread;
    private final ClusterResourceManager clusterResourceManager;
    /** Set by the completion callback once the retry policy is exhausted; read by {@link #shouldShutdown()}. */
    private volatile boolean tooManyFailedContainers = false;
    /** Set by {@link #onError(Throwable)}; rethrown (wrapped) by {@link #shouldShutdown()}. */
    private volatile Throwable exceptionOccurred = null;
    /** Failure bookkeeping (count and last failure time) keyed by Samza container ID. */
    private final Map<String, ResourceFailure> containerFailures = new HashMap<>();
    private final ContainerProcessManagerMetrics metrics;

    /**
     * Creates a manager whose {@link ClusterResourceManager} is obtained from the factory class
     * named in the cluster manager configuration.
     *
     * @param config job configuration
     * @param samzaApplicationState shared application state updated by this manager
     * @param metricsRegistryMap registry used for this manager's metrics
     * @throws SamzaException if the configured factory class cannot be loaded or instantiated
     */
    public ContainerProcessManager(Config config, SamzaApplicationState samzaApplicationState, MetricsRegistryMap metricsRegistryMap) {
        this.state = samzaApplicationState;
        this.clusterManagerConfig = new ClusterManagerConfig(config);
        this.jobConfig = new JobConfig(config);
        this.hostAffinityEnabled = this.clusterManagerConfig.getHostAffinityEnabled();
        // Cannot delegate to the other constructor: the resource manager needs "this" as its callback.
        this.clusterResourceManager = getContainerProcessManagerFactory(this.clusterManagerConfig).getClusterResourceManager(this, samzaApplicationState);
        this.metrics = new ContainerProcessManagerMetrics(config, samzaApplicationState, metricsRegistryMap);
        this.containerAllocator = createAllocator(this.hostAffinityEnabled, this.clusterResourceManager, this.clusterManagerConfig, config, samzaApplicationState);
        this.allocatorThread = new Thread(this.containerAllocator, ALLOCATOR_THREAD_NAME);
        log.info("finished initialization of samza task manager");
    }

    /**
     * Creates a manager that uses the supplied {@link ClusterResourceManager} instead of
     * instantiating one from configuration.
     *
     * @param config job configuration
     * @param samzaApplicationState shared application state updated by this manager
     * @param metricsRegistryMap registry used for this manager's metrics
     * @param clusterResourceManager resource manager to use
     */
    ContainerProcessManager(Config config, SamzaApplicationState samzaApplicationState, MetricsRegistryMap metricsRegistryMap, ClusterResourceManager clusterResourceManager) {
        this.state = samzaApplicationState;
        this.clusterManagerConfig = new ClusterManagerConfig(config);
        this.jobConfig = new JobConfig(config);
        this.hostAffinityEnabled = this.clusterManagerConfig.getHostAffinityEnabled();
        this.clusterResourceManager = clusterResourceManager;
        this.metrics = new ContainerProcessManagerMetrics(config, samzaApplicationState, metricsRegistryMap);
        this.containerAllocator = createAllocator(this.hostAffinityEnabled, this.clusterResourceManager, this.clusterManagerConfig, config, samzaApplicationState);
        this.allocatorThread = new Thread(this.containerAllocator, ALLOCATOR_THREAD_NAME);
        log.info("finished initialization of samza task manager");
    }

    /**
     * Picks the allocator implementation: host-aware when host affinity is enabled, plain otherwise.
     */
    private static AbstractContainerAllocator createAllocator(boolean hostAffinityEnabled, ClusterResourceManager resourceManager, ClusterManagerConfig clusterManagerConfig, Config config, SamzaApplicationState state) {
        if (hostAffinityEnabled) {
            return new HostAwareContainerAllocator(resourceManager, clusterManagerConfig.getContainerRequestTimeout(), config, state);
        }
        return new ContainerAllocator(resourceManager, config, state);
    }

    /**
     * Reports whether the caller should shut this manager down.
     *
     * @return {@code true} when too many containers have failed, when the number of completed
     *     containers equals the configured container count, or when the allocator thread is not alive
     * @throws SamzaException wrapping the throwable previously reported through {@link #onError(Throwable)}
     */
    public boolean shouldShutdown() {
        log.debug(" TaskManager state: Completed containers: {}, Configured containers: {}, Is there too many FailedContainers: {}, Is AllocatorThread alive: {} ",
            new Object[] {
                Integer.valueOf(this.state.completedContainers.get()),
                this.state.containerCount,
                this.tooManyFailedContainers ? "yes" : "no",
                this.allocatorThread.isAlive() ? "yes" : "no"
            });

        if (this.exceptionOccurred != null) {
            log.error("Exception in ContainerProcessManager", this.exceptionOccurred);
            throw new SamzaException(this.exceptionOccurred);
        }
        return this.tooManyFailedContainers
            || this.state.completedContainers.get() == this.state.containerCount.get()
            || !this.allocatorThread.isAlive();
    }

    /**
     * Starts metrics and the cluster resource manager, publishes the configured container count
     * to the shared state, issues the initial resource requests and starts the allocator thread.
     */
    public void start() {
        this.metrics.start();
        log.info("Starting Container Process Manager");
        this.clusterResourceManager.start();

        log.info("Starting the Samza task manager");
        int containerCount = this.jobConfig.getContainerCount();
        this.state.containerCount.set(containerCount);
        this.state.neededContainers.set(containerCount);

        this.containerAllocator.requestResources(this.state.jobModelManager.jobModel().getAllContainerLocality());

        log.info("Starting the container allocator thread");
        this.allocatorThread.start();
    }

    /**
     * Stops the allocator and waits for its thread, then stops metrics and the cluster resource
     * manager. Failures while stopping metrics or the resource manager are logged and do not
     * prevent the remaining steps from running.
     */
    public void stop() {
        log.info("Invoked stop of the Samza container process manager");
        this.containerAllocator.stop();
        try {
            this.allocatorThread.join();
        } catch (InterruptedException e) {
            log.error("Allocator Thread join() threw an interrupted exception", e);
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
        }

        if (this.metrics != null) {
            try {
                this.metrics.stop();
            } catch (Throwable th) {
                // No "{}" placeholder: SLF4J treats a trailing Throwable as the exception to log,
                // so a placeholder would be printed literally.
                log.error("Exception while stopping metrics", th);
            }
            log.info("Stopped metrics reporters");
        }

        if (this.clusterResourceManager != null) {
            try {
                this.clusterResourceManager.stop(this.state.status);
            } catch (Throwable th) {
                log.error("Exception while stopping cluster resource manager", th);
            }
            log.info("Stopped cluster resource manager");
        }

        log.info("Finished stop of Container process manager");
    }

    /**
     * Hands a newly allocated resource to the container allocator.
     *
     * @param samzaResource the allocated resource
     */
    public void onResourceAllocated(SamzaResource samzaResource) {
        log.info("Container allocated from RM on " + samzaResource.getHost());
        this.containerAllocator.addResource(samzaResource);
    }

    /**
     * Handles completion of a single resource: removes it from the running containers (if it is
     * known) and then dispatches on its exit code.
     *
     * <ul>
     *   <li>{@code PREEMPTED}, {@code DISK_FAIL}, {@code ABORTED}: counted as released and re-requested on any host.</li>
     *   <li>{@code 0}: counted as completed; the job is marked SUCCEEDED once every container has completed.</li>
     *   <li>anything else: counted as failed and re-requested unless the retry policy is exhausted.</li>
     * </ul>
     *
     * @param samzaResourceStatus completion status reported for the resource
     */
    public void onResourceCompleted(SamzaResourceStatus samzaResourceStatus) {
        String resourceID = samzaResourceStatus.getResourceID();
        String containerId = findRunningContainerId(resourceID);
        if (containerId == null) {
            log.info("No matching container id found for " + samzaResourceStatus.toString());
        } else {
            this.state.runningContainers.remove(containerId);
        }

        int exitCode = samzaResourceStatus.getExitCode();
        switch (exitCode) {
            case SamzaResourceStatus.PREEMPTED:
            case SamzaResourceStatus.DISK_FAIL:
            case SamzaResourceStatus.ABORTED:
                handleReleasedContainer(resourceID, containerId, exitCode);
                break;
            case 0:
                handleSuccessfulContainer(resourceID, containerId);
                break;
            default:
                handleFailedContainer(samzaResourceStatus, resourceID, containerId, exitCode);
                break;
        }
    }

    /**
     * Returns the Samza container ID of the running container backed by the given resource, or
     * {@code null} when no running container matches.
     */
    private String findRunningContainerId(String resourceID) {
        for (Map.Entry<String, SamzaResource> running : this.state.runningContainers.entrySet()) {
            if (running.getValue().getResourceID().equals(resourceID)) {
                log.info("Matching container ID found " + running.getKey() + " " + running.getValue());
                return running.getKey();
            }
        }
        return null;
    }

    /** Accounts for a released/lost/preempted container and, if it was known, requests a replacement on any host. */
    private void handleReleasedContainer(String resourceID, String containerId, int exitCode) {
        log.info("Got an exit code of {}. This means that container {} was killed by YARN, either due to being released by the application master or being 'lost' due to node failures etc. or due to preemption by the RM", Integer.valueOf(exitCode), resourceID);
        this.state.releasedContainers.incrementAndGet();
        if (containerId == null) {
            return;
        }
        log.info("Released container {} was assigned task group ID {}. Requesting a refactor container for the task group.", resourceID, containerId);
        this.state.neededContainers.incrementAndGet();
        this.state.jobHealthy.set(false);
        this.containerAllocator.requestResource(containerId, ResourceRequestState.ANY_HOST);
    }

    /** Accounts for a container that exited with code 0 and marks the job SUCCEEDED once all containers have completed. */
    private void handleSuccessfulContainer(String resourceID, String containerId) {
        log.info("Container {} completed successfully.", resourceID);
        this.state.completedContainers.incrementAndGet();
        if (containerId != null) {
            this.state.finishedContainers.incrementAndGet();
            this.containerFailures.remove(containerId);
        }
        if (this.state.completedContainers.get() == this.state.containerCount.get()) {
            log.info("Setting job status to SUCCEEDED, since all containers have been marked as completed.");
            this.state.status = SamzaApplicationState.SamzaAppStatus.SUCCEEDED;
        }
    }

    /** Accounts for a container that exited abnormally and re-requests it unless the retry policy is exhausted. */
    private void handleFailedContainer(SamzaResourceStatus samzaResourceStatus, String resourceID, String containerId, int exitCode) {
        log.info("Container failed for some reason. Let's start it again");
        log.info("Container " + resourceID + " failed with exit code . " + exitCode + " - " + samzaResourceStatus.getDiagnostics() + " containerID is " + containerId);
        this.state.failedContainers.incrementAndGet();
        this.state.failedContainersStatus.put(resourceID, samzaResourceStatus);
        this.state.jobHealthy.set(false);
        if (containerId == null) {
            return;
        }

        this.state.neededContainers.incrementAndGet();
        String lastSeenOn = this.state.jobModelManager.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY);
        if (!this.hostAffinityEnabled || lastSeenOn == null) {
            lastSeenOn = ResourceRequestState.ANY_HOST;
        }
        log.info("Container was last seen on " + lastSeenOn);

        applyRetryPolicy(containerId, resourceID);
        if (this.tooManyFailedContainers) {
            return;
        }
        log.info("Requesting a refactor container ");
        this.containerAllocator.requestResource(containerId, lastSeenOn);
    }

    /**
     * Records the failure of {@code containerId} and flags the job as failed when the configured
     * retry policy is exhausted. A negative retry count disables the policy (no tracking, no limit).
     */
    private void applyRetryPolicy(String containerId, String resourceID) {
        int retryCount = this.clusterManagerConfig.getContainerRetryCount();
        int retryWindowMs = this.clusterManagerConfig.getContainerRetryWindowMs();

        if (retryCount == 0) {
            log.error("Container ID {} ({}) failed, and retry count is set to 0, so shutting down the application master, and marking the job as failed.", containerId, resourceID);
            this.tooManyFailedContainers = true;
            // The message above promises a failed job; record it so stop() reports FAILED,
            // consistent with the retry-exhausted branch below.
            this.state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
            return;
        }
        if (retryCount < 0) {
            return;
        }

        int failCount = 1;
        long lastFailureMs = 0L;
        ResourceFailure previous = this.containerFailures.get(containerId);
        if (previous != null) {
            failCount = previous.getCount() + 1;
            lastFailureMs = previous.getLastFailure().longValue();
        }

        if (failCount < retryCount) {
            log.info("Current fail count for container ID {} is {}.", containerId, Integer.valueOf(failCount));
            this.containerFailures.put(containerId, new ResourceFailure(failCount, Long.valueOf(System.currentTimeMillis())));
            return;
        }

        long sinceLastFailureMs = System.currentTimeMillis() - lastFailureMs;
        if (sinceLastFailureMs < retryWindowMs) {
            log.error("Container ID " + containerId + "(" + resourceID + ") has failed " + failCount + " times, with last failure " + sinceLastFailureMs + "ms ago. This is greater than retry count of " + retryCount + " and window of " + retryWindowMs + "ms , so shutting down the application master, and marking the job as failed.");
            this.tooManyFailedContainers = true;
            this.state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
        } else {
            log.info("Resetting fail count for container ID {} back to 1, since last container failure ({}) for this container ID was outside the bounds of the retry window.", containerId, resourceID);
            this.containerFailures.put(containerId, new ResourceFailure(1, Long.valueOf(System.currentTimeMillis())));
        }
    }

    /**
     * Forwards each newly available resource to {@link #onResourceAllocated(SamzaResource)}.
     *
     * @param list resources made available by the cluster resource manager
     */
    @Override // org.apache.samza.clustermanager.ClusterResourceManager.Callback
    public void onResourcesAvailable(List<SamzaResource> list) {
        for (SamzaResource resource : list) {
            onResourceAllocated(resource);
        }
    }

    /**
     * Forwards each completion status to {@link #onResourceCompleted(SamzaResourceStatus)}.
     *
     * @param list completion statuses reported by the cluster resource manager
     */
    @Override // org.apache.samza.clustermanager.ClusterResourceManager.Callback
    public void onResourcesCompleted(List<SamzaResourceStatus> list) {
        for (SamzaResourceStatus status : list) {
            onResourceCompleted(status);
        }
    }

    /**
     * Records an error reported by the cluster resource manager so that the next call to
     * {@link #shouldShutdown()} rethrows it.
     *
     * @param th the reported error
     */
    @Override // org.apache.samza.clustermanager.ClusterResourceManager.Callback
    public void onError(Throwable th) {
        // No "{}" placeholder: the Throwable is logged as the exception, not as a format argument.
        log.error("Exception occured in callbacks in the Container Manager", th);
        this.exceptionOccurred = th;
    }

    /**
     * Instantiates the {@link ResourceManagerFactory} named by the cluster manager configuration.
     *
     * @throws SamzaException wrapping the reflective failure when the class cannot be loaded or instantiated
     */
    private ResourceManagerFactory getContainerProcessManagerFactory(ClusterManagerConfig clusterManagerConfig) {
        try {
            return (ResourceManagerFactory) ClassLoaderHelper.fromClassName(clusterManagerConfig.getContainerManagerClass());
        } catch (ClassNotFoundException e) {
            log.error("ClassNotFound Exception when creating ContainerManager", e);
            throw new SamzaException(e);
        } catch (IllegalAccessException e) {
            log.error("Illegal access exception when creating ContainerManager", e);
            throw new SamzaException(e);
        } catch (InstantiationException e) {
            log.error("Instantiation exception when creating ContainerManager", e);
            throw new SamzaException(e);
        }
    }
}
