/*
 * Decompiled with CFR 0.152.
 */
package org.apache.samza.job.yarn;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.clustermanager.ClusterResourceManager;
import org.apache.samza.clustermanager.SamzaApplicationState;
import org.apache.samza.clustermanager.SamzaContainerLaunchException;
import org.apache.samza.clustermanager.SamzaResource;
import org.apache.samza.clustermanager.SamzaResourceRequest;
import org.apache.samza.clustermanager.SamzaResourceStatus;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.YarnConfig;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.job.CommandBuilder;
import org.apache.samza.job.yarn.FileSystemImplConfig;
import org.apache.samza.job.yarn.JobContext;
import org.apache.samza.job.yarn.SamzaAppMasterMetrics;
import org.apache.samza.job.yarn.SamzaYarnAppMasterLifecycle;
import org.apache.samza.job.yarn.SamzaYarnAppMasterService;
import org.apache.samza.job.yarn.YarnAppState;
import org.apache.samza.job.yarn.YarnContainer;
import org.apache.samza.job.yarn.YarnContainerRunner;
import org.apache.samza.job.yarn.YarnJobUtil;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.ReadableMetricsRegistry;
import org.apache.samza.util.hadoop.HttpFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class YarnClusterResourceManager
extends ClusterResourceManager
implements AMRMClientAsync.CallbackHandler {
    private final String INVALID_YARN_CONTAINER_ID = "-1";
    private final AMRMClientAsync<AMRMClient.ContainerRequest> amClient;
    private final YarnContainerRunner yarnContainerRunner;
    private final YarnConfiguration hConfig;
    private final YarnAppState state;
    private final SamzaYarnAppMasterLifecycle lifecycle;
    private final SamzaYarnAppMasterService service;
    private final YarnConfig yarnConfig;
    private final ConcurrentHashMap<SamzaResource, Container> allocatedResources = new ConcurrentHashMap();
    private final ConcurrentHashMap<SamzaResourceRequest, AMRMClient.ContainerRequest> requestsMap = new ConcurrentHashMap();
    private final SamzaAppMasterMetrics metrics;
    final AtomicBoolean started = new AtomicBoolean(false);
    private final Object lock = new Object();
    private static final Logger log = LoggerFactory.getLogger(YarnClusterResourceManager.class);

    public YarnClusterResourceManager(Config config, JobModelManager jobModelManager, ClusterResourceManager.Callback callback, SamzaApplicationState samzaAppState) {
        super(callback);
        YarnConfig yarnConfig;
        this.hConfig = new YarnConfiguration();
        this.hConfig.set("fs.http.impl", HttpFileSystem.class.getName());
        FileSystemImplConfig fsImplConfig = new FileSystemImplConfig(config);
        fsImplConfig.getSchemes().forEach(scheme -> fsImplConfig.getSchemeConfig((String)scheme).forEach((confKey, confValue) -> this.hConfig.set(confKey, confValue)));
        MetricsRegistryMap registry = new MetricsRegistryMap();
        this.metrics = new SamzaAppMasterMetrics(config, samzaAppState, (ReadableMetricsRegistry)registry);
        String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString());
        ContainerId containerId = ConverterUtils.toContainerId((String)containerIdStr);
        String nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.toString());
        String nodePortString = System.getenv(ApplicationConstants.Environment.NM_PORT.toString());
        String nodeHttpPortString = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString());
        int nodePort = Integer.parseInt(nodePortString);
        int nodeHttpPort = Integer.parseInt(nodeHttpPortString);
        this.yarnConfig = yarnConfig = new YarnConfig(config);
        int interval = yarnConfig.getAMPollIntervalMs();
        this.amClient = AMRMClientAsync.createAMRMClientAsync((int)interval, (AMRMClientAsync.CallbackHandler)this);
        this.state = new YarnAppState(-1, containerId, nodeHostString, nodePort, nodeHttpPort);
        log.info("Initialized YarnAppState: {}", (Object)this.state.toString());
        this.service = new SamzaYarnAppMasterService(config, samzaAppState, this.state, (ReadableMetricsRegistry)registry, this.hConfig);
        log.info("ContainerID str {}, Nodehost  {} , Nodeport  {} , NodeHttpport {}", new Object[]{containerIdStr, nodeHostString, nodePort, nodeHttpPort});
        ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
        this.lifecycle = new SamzaYarnAppMasterLifecycle(clusterManagerConfig.getContainerMemoryMb(), clusterManagerConfig.getNumCores(), samzaAppState, this.state, this.amClient);
        this.yarnContainerRunner = new YarnContainerRunner(config, this.hConfig);
    }

    public void start() {
        if (!this.started.compareAndSet(false, true)) {
            log.info("Attempting to start an already started ContainerManager");
            return;
        }
        this.metrics.start();
        this.service.onInit();
        log.info("Starting YarnContainerManager.");
        this.amClient.init((Configuration)this.hConfig);
        this.amClient.start();
        this.lifecycle.onInit();
        if (this.lifecycle.shouldShutdown()) {
            this.clusterManagerCallback.onError((Throwable)new SamzaException("Invalid resource request."));
        }
        log.info("Finished starting YarnContainerManager");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void requestResources(SamzaResourceRequest resourceRequest) {
        AMRMClient.ContainerRequest issuedRequest;
        boolean DEFAULT_PRIORITY = false;
        log.info("Requesting resources on  " + resourceRequest.getPreferredHost() + " for container " + resourceRequest.getContainerID());
        int memoryMb = resourceRequest.getMemoryMB();
        int cpuCores = resourceRequest.getNumCores();
        String containerLabel = this.yarnConfig.getContainerLabel();
        String preferredHost = resourceRequest.getPreferredHost();
        Resource capability = Resource.newInstance((int)memoryMb, (int)cpuCores);
        Priority priority = Priority.newInstance((int)0);
        if (preferredHost.equals("ANY_HOST")) {
            log.info("Making a request for ANY_HOST " + preferredHost);
            issuedRequest = new AMRMClient.ContainerRequest(capability, null, null, priority, true, containerLabel);
        } else {
            log.info("Making a preferred host request on " + preferredHost);
            issuedRequest = new AMRMClient.ContainerRequest(capability, new String[]{preferredHost}, null, priority, true, containerLabel);
        }
        Object object = this.lock;
        synchronized (object) {
            this.requestsMap.put(resourceRequest, issuedRequest);
            this.amClient.addContainerRequest(issuedRequest);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void releaseResources(SamzaResource resource) {
        log.info("Release resource invoked {} ", (Object)resource);
        Object object = this.lock;
        synchronized (object) {
            Container container = this.allocatedResources.get(resource);
            if (container == null) {
                log.info("Resource {} already released. ", (Object)resource);
                return;
            }
            this.amClient.releaseAssignedContainer(container.getId());
            this.allocatedResources.remove(resource);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) throws SamzaContainerLaunchException {
        String containerIDStr = (String)builder.buildEnvironment().get(ShellCommandConfig.ENV_CONTAINER_ID());
        log.info("Received launch request for {} on hostname {}", (Object)containerIDStr, (Object)resource.getHost());
        Object object = this.lock;
        synchronized (object) {
            Container container = this.allocatedResources.get(resource);
            if (container == null) {
                log.info("Resource {} already released. ", (Object)resource);
                return;
            }
            this.state.runningYarnContainers.put(containerIDStr, new YarnContainer(container));
            this.yarnContainerRunner.runContainer(containerIDStr, container, builder);
        }
    }

    private String getIDForContainer(String lookupContainerId) {
        String samzaContainerID = "-1";
        for (Map.Entry<String, YarnContainer> entry : this.state.runningYarnContainers.entrySet()) {
            String key = entry.getKey();
            YarnContainer yarnContainer = entry.getValue();
            String yarnContainerId = yarnContainer.id().toString();
            if (!yarnContainerId.equals(lookupContainerId)) continue;
            return key;
        }
        return samzaContainerID;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancelResourceRequest(SamzaResourceRequest request) {
        log.info("Cancelling request {} ", (Object)request);
        Object object = this.lock;
        synchronized (object) {
            AMRMClient.ContainerRequest containerRequest = this.requestsMap.get(request);
            if (containerRequest == null) {
                log.info("Cancellation of {} already done. ", (Object)request);
                return;
            }
            this.requestsMap.remove(request);
            this.amClient.removeContainerRequest(containerRequest);
        }
    }

    public void stop(SamzaApplicationState.SamzaAppStatus status2) {
        log.info("Stopping AM client ");
        this.lifecycle.onShutdown(status2);
        this.amClient.stop();
        log.info("Stopping the AM service ");
        this.service.onShutdown();
        this.metrics.stop();
        if (status2 != SamzaApplicationState.SamzaAppStatus.UNDEFINED) {
            this.cleanupStagingDir();
        }
    }

    private void cleanupStagingDir() {
        String yarnJobStagingDirectory = this.yarnConfig.getYarnJobStagingDirectory();
        if (yarnJobStagingDirectory != null) {
            JobContext context = new JobContext();
            context.setAppStagingDir(new Path(yarnJobStagingDirectory));
            FileSystem fs = null;
            try {
                fs = FileSystem.get((Configuration)this.hConfig);
            }
            catch (IOException e) {
                log.error("Unable to clean up file system: {}", (Throwable)e);
                return;
            }
            if (fs != null) {
                YarnJobUtil.cleanupStagingDir(context, fs);
            }
        }
    }

    public void onContainersCompleted(List<ContainerStatus> statuses) {
        ArrayList<SamzaResourceStatus> samzaResrcStatuses = new ArrayList<SamzaResourceStatus>();
        for (ContainerStatus status2 : statuses) {
            log.info("Container completed from RM " + status2);
            SamzaResourceStatus samzaResrcStatus = new SamzaResourceStatus(status2.getContainerId().toString(), status2.getDiagnostics(), status2.getExitStatus());
            samzaResrcStatuses.add(samzaResrcStatus);
            String completedContainerID = this.getIDForContainer(status2.getContainerId().toString());
            log.info("Completed container had ID: {}", (Object)completedContainerID);
            if (completedContainerID.equals("-1") || !this.state.runningYarnContainers.containsKey(completedContainerID)) continue;
            log.info("Removing container ID {} from completed containers", (Object)completedContainerID);
            this.state.runningYarnContainers.remove(completedContainerID);
            if (status2.getExitStatus() == 0) continue;
            this.state.failedContainersStatus.put(status2.getContainerId().toString(), status2);
        }
        this.clusterManagerCallback.onResourcesCompleted(samzaResrcStatuses);
    }

    public void onContainersAllocated(List<Container> containers) {
        ArrayList<SamzaResource> resources = new ArrayList<SamzaResource>();
        for (Container container : containers) {
            log.info("Container allocated from RM on " + container.getNodeId().getHost());
            String id = container.getId().toString();
            String host = container.getNodeId().getHost();
            int memory = container.getResource().getMemory();
            int numCores = container.getResource().getVirtualCores();
            SamzaResource resource = new SamzaResource(numCores, memory, host, id);
            this.allocatedResources.put(resource, container);
            resources.add(resource);
        }
        this.clusterManagerCallback.onResourcesAvailable(resources);
    }

    public void onShutdownRequest() {
    }

    public void onNodesUpdated(List<NodeReport> updatedNodes) {
    }

    public float getProgress() {
        return 0.0f;
    }

    public void onError(Throwable e) {
        log.error("Exception in the Yarn callback {}", e);
        this.clusterManagerCallback.onError(e);
    }
}

