package org.apache.samza.job.yarn;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.YarnConfig;
import org.apache.samza.job.CommandBuilder;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/job/yarn/YarnContainerRunner.class */
/**
 * Launches Samza containers on YARN-allocated {@link Container}s through an {@link NMClient}.
 *
 * <p>For each launch this class builds the shell command and the escaped environment from a
 * {@link CommandBuilder}, registers the job package (plus any resources supplied by
 * {@link LocalizerResourceMapper}) as local resources, attaches the current user's security
 * tokens, and asks the node manager to start the container.
 */
public class YarnContainerRunner {
    private static final Logger log = LoggerFactory.getLogger(YarnContainerRunner.class);
    private final Config config;
    private final YarnConfiguration yarnConfiguration;
    private final NMClient nmClient = NMClient.createNMClient();
    private final YarnConfig yarnConfig;

    /**
     * Creates a runner and initializes its {@link NMClient} with the given YARN configuration.
     *
     * @param config job configuration; used for the framework path, the package path and the
     *               localizer resource configuration
     * @param yarnConfiguration YARN/Hadoop configuration used to initialize the node manager
     *                          client and to resolve the package file system
     */
    public YarnContainerRunner(Config config, YarnConfiguration yarnConfiguration) {
        this.config = config;
        this.yarnConfiguration = yarnConfiguration;
        this.nmClient.init(this.yarnConfiguration);
        this.yarnConfig = new YarnConfig(config);
    }

    /**
     * Builds the launch command and environment for a container and starts it on the given
     * YARN container.
     *
     * @param str identifier of the container being launched, as used in log messages (distinct
     *            from the YARN container id of {@code container})
     * @param container the YARN container to start the process in
     * @param commandBuilder supplies the command line and the environment variables
     * @throws org.apache.samza.clustermanager.SamzaContainerLaunchException if the package
     *         status cannot be read, the credentials cannot be serialized, or the node manager
     *         rejects the start request
     */
    public void runContainer(String str, Container container, CommandBuilder commandBuilder) throws org.apache.samza.clustermanager.SamzaContainerLaunchException {
        final String samzaContainerId = str;
        String yarnContainerId = ConverterUtils.toString(container.getId());
        log.info("Got available container ID ({}) for container: {}", samzaContainerId, container);

        // Default: run out of the localized job package. When a framework path is configured,
        // run from there instead and export JOB_LIB_DIR pointing at the package's lib directory.
        String jobLibExport = "";
        String commandPath = "./__package/";
        String fwkPath = JobConfig.getFwkPath(this.config);
        if (fwkPath != null && !fwkPath.isEmpty()) {
            commandPath = fwkPath;
            jobLibExport = "export JOB_LIB_DIR=./__package/lib";
        }
        log.info("In runContainer in util: fwkPath= {};cmdPath={};jobLib={}", new Object[]{fwkPath, commandPath, jobLibExport});

        commandBuilder.setCommandPath(commandPath);
        String command = commandBuilder.buildCommand();
        log.info("Container ID {} using command {}", samzaContainerId, command);

        Map<String, String> environment = getEscapedEnvironmentVariablesMap(commandBuilder);
        environment.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), Util.envVarEscape(container.getId().toString()));
        printContainerEnvironmentVariables(samzaContainerId, environment);
        log.info("Samza FWK path: {}; env={}", command, environment);

        Path packagePath = new Path(this.yarnConfig.getPackagePath());
        log.info("Starting container ID {} using package path {}", samzaContainerId, packagePath);
        startContainer(packagePath, container, environment, getFormattedCommand("<LOG_DIR>", jobLibExport, command, "stdout", "stderr"));

        log.info("Claimed container ID {} for container {} on node {} (http://{}/node/containerlogs/{}).", new Object[]{samzaContainerId, yarnContainerId, container.getNodeId().getHost(), container.getNodeHttpAddress(), yarnContainerId});
        log.info("Started container ID {}", samzaContainerId);
    }

    /**
     * Assembles the {@link ContainerLaunchContext} (environment, tokens, command, local
     * resources) and submits it to the node manager.
     *
     * @param packagePath location of the job package archive
     * @param container the YARN container to start
     * @param environment already-escaped environment variables for the process
     * @param command the complete shell command line to execute
     * @throws org.apache.samza.clustermanager.SamzaContainerLaunchException on any failure to
     *         prepare or submit the launch request
     */
    private void startContainer(Path packagePath, Container container, Map<String, String> environment, String command) throws org.apache.samza.clustermanager.SamzaContainerLaunchException {
        log.info("starting container {} {} {} {}", new Object[]{packagePath, container, environment, command});

        LocalResource packageResource = buildPackageResource(packagePath);
        ByteBuffer tokens = serializeContainerTokens();

        Map<String, LocalResource> localResources = new HashMap<>();
        localResources.put("__package", packageResource);
        localResources.putAll(new LocalizerResourceMapper(new LocalizerResourceConfig(this.config), this.yarnConfiguration).getResourceMap());

        ArrayList<String> commands = new ArrayList<>();
        commands.add(command);

        ContainerLaunchContext launchContext = (ContainerLaunchContext) Records.newRecord(ContainerLaunchContext.class);
        launchContext.setEnvironment(environment);
        launchContext.setTokens(tokens.duplicate());
        launchContext.setCommands(commands);
        launchContext.setLocalResources(localResources);
        log.debug("setting localResourceMap to {}", localResources);
        log.debug("setting context to {}", launchContext);

        try {
            this.nmClient.startContainer(container, launchContext);
        } catch (IOException e) {
            String message = "Received IOException when starting container: " + container.getId();
            log.error(message, e);
            throw new org.apache.samza.clustermanager.SamzaContainerLaunchException(message, e);
        } catch (YarnException e) {
            String message = "Received YarnException when starting container: " + container.getId();
            log.error(message, e);
            throw new org.apache.samza.clustermanager.SamzaContainerLaunchException(message, e);
        }
    }

    /**
     * Describes the job package as an application-visible ARCHIVE local resource, using the
     * size and modification time reported by the package's file system.
     *
     * @param packagePath location of the job package archive
     * @return the populated local resource record
     * @throws org.apache.samza.clustermanager.SamzaContainerLaunchException if the file status
     *         of the package cannot be read
     */
    private LocalResource buildPackageResource(Path packagePath) throws org.apache.samza.clustermanager.SamzaContainerLaunchException {
        LocalResource packageResource = (LocalResource) Records.newRecord(LocalResource.class);
        URL packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath);

        FileStatus packageStatus;
        try {
            packageStatus = packagePath.getFileSystem(this.yarnConfiguration).getFileStatus(packagePath);
        } catch (IOException e) {
            log.error("IO Exception when accessing the package status from the filesystem", e);
            // Keep the cause so callers can see why the package was unreachable.
            throw new org.apache.samza.clustermanager.SamzaContainerLaunchException("IO Exception when accessing the package status from the filesystem", e);
        }

        packageResource.setResource(packageUrl);
        log.info("set package Resource in YarnContainerRunner for {}", packageUrl);
        packageResource.setSize(packageStatus.getLen());
        packageResource.setTimestamp(packageStatus.getModificationTime());
        packageResource.setType(LocalResourceType.ARCHIVE);
        packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
        return packageResource;
    }

    /**
     * Serializes the current user's tokens for the container, excluding the AMRM token.
     *
     * @return a buffer holding the serialized token storage
     * @throws org.apache.samza.clustermanager.SamzaContainerLaunchException if the current user,
     *         a token identifier, or the token storage cannot be read or written
     */
    private ByteBuffer serializeContainerTokens() throws org.apache.samza.clustermanager.SamzaContainerLaunchException {
        try {
            Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();

            // The AMRM token must be removed BEFORE the credentials are serialized; removing it
            // afterwards leaves the token in the bytes that are shipped to the container.
            Iterator<Token<? extends TokenIdentifier>> tokenIterator = credentials.getAllTokens().iterator();
            while (tokenIterator.hasNext()) {
                TokenIdentifier identifier = tokenIterator.next().decodeIdentifier();
                if (identifier != null && identifier.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
                    tokenIterator.remove();
                }
            }

            DataOutputBuffer tokenBuffer = new DataOutputBuffer();
            credentials.writeTokenStorageToStream(tokenBuffer);
            // getData() exposes the whole backing array; only the first getLength() bytes are valid.
            return ByteBuffer.wrap(tokenBuffer.getData(), 0, tokenBuffer.getLength());
        } catch (IOException e) {
            log.error("IOException when writing credentials.", e);
            throw new org.apache.samza.clustermanager.SamzaContainerLaunchException("IO Exception when writing credentials to output buffer", e);
        }
    }

    /**
     * Logs every environment variable of the container, one {@code key=value} pair per line.
     *
     * @param containerId identifier used in the log message
     * @param environment the environment variables to print
     */
    private void printContainerEnvironmentVariables(String containerId, Map<String, String> environment) {
        StringBuilder rendered = new StringBuilder();
        for (Map.Entry<String, String> variable : environment.entrySet()) {
            rendered.append('\n').append(variable.getKey()).append('=').append(variable.getValue());
        }
        log.info("Container ID {} using environment variables: {}", containerId, rendered.toString());
    }

    /**
     * Copies the builder's environment into a new mutable map, escaping every value with
     * {@link Util#envVarEscape}.
     *
     * @param commandBuilder source of the raw environment variables
     * @return a new map from variable name to escaped value
     */
    private Map<String, String> getEscapedEnvironmentVariablesMap(CommandBuilder commandBuilder) {
        Map<String, String> escaped = new HashMap<>();
        for (Map.Entry<String, String> variable : commandBuilder.buildEnvironment().entrySet()) {
            escaped.put(variable.getKey(), Util.envVarEscape(variable.getValue()));
        }
        return escaped;
    }

    /**
     * Wraps the command in the shell line that exports {@code SAMZA_LOG_DIR}, optionally runs
     * an extra export, links {@code logs} to the log directory and then {@code exec}s the
     * command with stdout/stderr redirected into that directory.
     *
     * @param logDirExpansionVar log directory (or the placeholder that stands for it)
     * @param jobLib extra shell statement to chain after the first export; may be empty
     * @param command the command to exec
     * @param stdOut file name under {@code logs/} that receives standard output
     * @param stdErr file name under {@code logs/} that receives standard error
     * @return the complete shell command line
     */
    private String getFormattedCommand(String logDirExpansionVar, String jobLib, String command, String stdOut, String stdErr) {
        String chainedJobLib = jobLib.isEmpty() ? jobLib : "&& " + jobLib;
        return String.format("export SAMZA_LOG_DIR=%s %s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s", logDirExpansionVar, chainedJobLib, logDirExpansionVar, command, stdOut, stdErr);
    }
}
