/*
 * Decompiled with CFR 0.152.
 */
package org.apache.samza.job.yarn;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.samza.clustermanager.SamzaContainerLaunchException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.YarnConfig;
import org.apache.samza.job.CommandBuilder;
import org.apache.samza.job.yarn.LocalizerResourceConfig;
import org.apache.samza.job.yarn.LocalizerResourceMapper;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Launches Samza containers on YARN.
 *
 * <p>For each container this class builds the shell command and environment from a
 * {@link CommandBuilder}, assembles a {@link ContainerLaunchContext} (command,
 * environment, local resources and serialized security tokens) and hands it to the
 * node manager through an {@link NMClient}.
 */
public class YarnContainerRunner {
    private static final Logger log = LoggerFactory.getLogger(YarnContainerRunner.class);
    private final Config config;
    private final YarnConfiguration yarnConfiguration;
    private final NMClient nmClient;
    private final YarnConfig yarnConfig;

    /**
     * Creates a runner and initializes its {@link NMClient} with the given YARN configuration.
     *
     * @param config            job configuration; also used to derive the {@link YarnConfig}
     * @param yarnConfiguration YARN configuration used for the NM client and file system access
     */
    public YarnContainerRunner(Config config, YarnConfiguration yarnConfiguration) {
        this.config = config;
        this.yarnConfiguration = yarnConfiguration;
        this.nmClient = NMClient.createNMClient();
        this.nmClient.init(this.yarnConfiguration);
        this.yarnConfig = new YarnConfig(config);
    }

    /**
     * Builds the launch command and environment for a Samza container and starts it
     * on the given YARN container.
     *
     * @param samzaContainerId Samza-level container ID, used for logging
     * @param container        YARN container to start
     * @param cmdBuilder       builder supplying the command and environment variables;
     *                         its command path is set by this method
     * @throws SamzaContainerLaunchException if the package status cannot be read, the
     *                                       credentials cannot be serialized, or the node
     *                                       manager rejects the start request
     */
    public void runContainer(String samzaContainerId, Container container, CommandBuilder cmdBuilder) throws SamzaContainerLaunchException {
        String containerIdStr = ConverterUtils.toString(container.getId());
        log.info("Got available container ID ({}) for container: {}", samzaContainerId, container);

        // By default the command runs out of the localized job package. When a framework
        // path is configured the command runs from there instead and JOB_LIB_DIR is
        // exported so that it points at the job package's lib directory.
        String jobLib = "";
        String cmdPath = "./__package/";
        String fwkPath = JobConfig.getFwkPath(this.config);
        if (fwkPath != null && !fwkPath.isEmpty()) {
            cmdPath = fwkPath;
            jobLib = "export JOB_LIB_DIR=./__package/lib";
        }
        log.info("In runContainer in util: fwkPath= {};cmdPath={};jobLib={}", fwkPath, cmdPath, jobLib);

        cmdBuilder.setCommandPath(cmdPath);
        String command = cmdBuilder.buildCommand();
        log.info("Container ID {} using command {}", samzaContainerId, command);

        Map<String, String> env = this.getEscapedEnvironmentVariablesMap(cmdBuilder);
        env.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), Util.envVarEscape(container.getId().toString()));
        this.printContainerEnvironmentVariables(samzaContainerId, env);
        log.info("Samza FWK path: {}; env={}", command, env);

        Path packagePath = new Path(this.yarnConfig.getPackagePath());
        log.info("Starting container ID {} using package path {}", samzaContainerId, packagePath);
        this.startContainer(packagePath, container, env, this.getFormattedCommand("<LOG_DIR>", jobLib, command, "stdout", "stderr"));

        log.info("Claimed container ID {} for container {} on node {} (http://{}/node/containerlogs/{}).", new Object[]{samzaContainerId, containerIdStr, container.getNodeId().getHost(), container.getNodeHttpAddress(), containerIdStr});
        log.info("Started container ID {}", samzaContainerId);
    }

    /**
     * Assembles the launch context for the container and asks the node manager to start it.
     *
     * @param packagePath location of the job package archive
     * @param container   YARN container to start
     * @param env         environment variables for the container process
     * @param cmd         shell command the container executes
     * @throws SamzaContainerLaunchException on any I/O or YARN failure; the underlying
     *                                       exception is attached as the cause
     */
    private void startContainer(Path packagePath, Container container, Map<String, String> env, String cmd) throws SamzaContainerLaunchException {
        log.info("starting container {} {} {} {}", new Object[]{packagePath, container, env, cmd});

        LocalResource packageResource = this.buildPackageResource(packagePath);
        ByteBuffer allTokens = this.serializeContainerTokens();

        Map<String, LocalResource> localResourceMap = new HashMap<String, LocalResource>();
        localResourceMap.put("__package", packageResource);
        LocalizerResourceMapper resourceMapper = new LocalizerResourceMapper(new LocalizerResourceConfig(this.config), this.yarnConfiguration);
        localResourceMap.putAll(resourceMapper.getResourceMap());

        ContainerLaunchContext context = this.buildLaunchContext(env, allTokens, cmd, localResourceMap);
        try {
            this.nmClient.startContainer(container, context);
        }
        catch (YarnException ye) {
            log.error("Received YarnException when starting container: " + container.getId(), ye);
            throw new SamzaContainerLaunchException("Received YarnException when starting container: " + container.getId(), ye);
        }
        catch (IOException ioe) {
            log.error("Received IOException when starting container: " + container.getId(), ioe);
            throw new SamzaContainerLaunchException("Received IOException when starting container: " + container.getId(), ioe);
        }
    }

    /**
     * Describes the job package as an application-visible ARCHIVE local resource,
     * using the size and modification time reported by the file system.
     */
    private LocalResource buildPackageResource(Path packagePath) throws SamzaContainerLaunchException {
        LocalResource packageResource = Records.newRecord(LocalResource.class);
        URL packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath);
        FileStatus fileStatus;
        try {
            fileStatus = packagePath.getFileSystem(this.yarnConfiguration).getFileStatus(packagePath);
        }
        catch (IOException ioe) {
            log.error("IO Exception when accessing the package status from the filesystem", ioe);
            // Keep the original IOException as the cause so callers can see why the lookup failed.
            throw new SamzaContainerLaunchException("IO Exception when accessing the package status from the filesystem", ioe);
        }
        packageResource.setResource(packageUrl);
        log.info("set package Resource in YarnContainerRunner for {}", packageUrl);
        packageResource.setSize(fileStatus.getLen());
        packageResource.setTimestamp(fileStatus.getModificationTime());
        packageResource.setType(LocalResourceType.ARCHIVE);
        packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
        return packageResource;
    }

    /**
     * Serializes the current user's credentials for the container, leaving out any
     * token whose identifier kind is {@link AMRMTokenIdentifier#KIND_NAME}.
     */
    private ByteBuffer serializeContainerTokens() throws SamzaContainerLaunchException {
        try {
            Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();

            // The AM->RM token must be stripped BEFORE the credentials are written out;
            // removing it after serialization would leave it in the bytes sent to the container.
            Iterator<Token<? extends TokenIdentifier>> iter = credentials.getAllTokens().iterator();
            while (iter.hasNext()) {
                TokenIdentifier identifier = iter.next().decodeIdentifier();
                if (identifier != null && identifier.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
                    iter.remove();
                }
            }

            DataOutputBuffer dob = new DataOutputBuffer();
            credentials.writeTokenStorageToStream(dob);
            // Only the first getLength() bytes of the backing array hold valid data.
            return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
        }
        catch (IOException ioe) {
            log.error("IOException when writing credentials.", ioe);
            throw new SamzaContainerLaunchException("IO Exception when writing credentials to output buffer", ioe);
        }
    }

    /**
     * Creates the launch context carrying the environment, tokens, single launch
     * command and local resources for a container.
     */
    private ContainerLaunchContext buildLaunchContext(Map<String, String> env, ByteBuffer allTokens, String cmd, Map<String, LocalResource> localResourceMap) {
        ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
        context.setEnvironment(env);
        // Hand over a duplicate so the context has its own buffer position/limit.
        context.setTokens(allTokens.duplicate());
        List<String> commands = new ArrayList<String>();
        commands.add(cmd);
        context.setCommands(commands);
        context.setLocalResources(localResourceMap);
        log.debug("setting localResourceMap to {}", localResourceMap);
        log.debug("setting context to {}", context);
        return context;
    }

    /**
     * Logs every environment variable of the container, one {@code key=value} pair per line.
     */
    private void printContainerEnvironmentVariables(String samzaContainerId, Map<String, String> env) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : env.entrySet()) {
            sb.append(String.format("\n%s=%s", entry.getKey(), entry.getValue()));
        }
        log.info("Container ID {} using environment variables: {}", samzaContainerId, sb.toString());
    }

    /**
     * Returns a new mutable map holding the builder's environment variables with each
     * value passed through {@code Util.envVarEscape}.
     */
    private Map<String, String> getEscapedEnvironmentVariablesMap(CommandBuilder cmdBuilder) {
        Map<String, String> env = new HashMap<String, String>();
        for (Map.Entry<String, String> entry : cmdBuilder.buildEnvironment().entrySet()) {
            env.put(entry.getKey(), Util.envVarEscape(entry.getValue()));
        }
        return env;
    }

    /**
     * Builds the shell line executed in the container: exports {@code SAMZA_LOG_DIR},
     * optionally runs the {@code jobLib} statement, symlinks {@code logs} to the log
     * directory and finally {@code exec}s the command with stdout/stderr redirected
     * into that directory.
     *
     * @param logDirExpansionVar placeholder or path used as the log directory
     * @param jobLib             extra shell statement to chain in, or an empty string for none
     * @param command            command to exec
     * @param stdOut             file name under {@code logs/} receiving standard output
     * @param stdErr             file name under {@code logs/} receiving standard error
     * @return the formatted shell command
     */
    private String getFormattedCommand(String logDirExpansionVar, String jobLib, String command, String stdOut, String stdErr) {
        if (!jobLib.isEmpty()) {
            jobLib = "&& " + jobLib;
        }
        return String.format("export SAMZA_LOG_DIR=%s %s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s", logDirExpansionVar, jobLib, logDirExpansionVar, command, stdOut, stdErr);
    }
}

