package org.apache.hadoop.streaming;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.LazyOutputFormat;
import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorCombiner;
import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.metrics2.sink.ganglia.AbstractGangliaSink;
import org.apache.hadoop.streaming.io.IdentifierResolver;
import org.apache.hadoop.streaming.io.InputWriter;
import org.apache.hadoop.streaming.io.OutputReader;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;

/* loaded from: input_file:hadoop-tools-dist-2.7.5.1/share/hadoop/tools/lib/hadoop-streaming-2.7.5.1.jar:org/apache/hadoop/streaming/StreamJob.class */
public class StreamJob implements Tool {
    /** Logger shared by all instances of this class. */
    protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());
    /** Reducer spec value "NONE"; setJobConf sets zero reduce tasks when -reducer equals this text. */
    static final String REDUCE_NONE = "NONE";
    // Command-line parser (created in the no-arg constructor) and the option set built by setupOptions().
    private CommandLineParser parser;
    private Options allOptions;
    // Raw command-line arguments as handed to run()/createJob()/the deprecated constructor.
    protected String[] argv_;
    // Flags populated by parseArgv() from -background, -verbose, -info, -help and -debug.
    protected boolean background_;
    protected boolean verbose_;
    protected boolean detailedUsage_;
    protected boolean printUsage;
    protected int debug_;
    // Environment wrapper created by init().
    protected Environment env_;
    protected String jar_;
    protected boolean localHadoop_;
    // Base configuration (see setConf) and the JobConf derived from it in setJobConf().
    protected Configuration config_;
    protected JobConf jobConf_;
    protected JobClient jc_;
    // Values of every -input option, in command-line order.
    protected ArrayList<String> inputSpecs_;
    protected TreeSet<String> seenPrimary_;
    protected boolean hasSimpleInputSpecs_;
    // Values of every -file option; packageJobJar() may also append the runtime classes directory.
    protected ArrayList<String> packageFiles_;
    // Canonical paths of the packageFiles_ entries that are regular files (filled by postProcessArgs()).
    protected ArrayList<String> shippedCanonFiles_;
    // Values of -output, -mapper, -combiner and -reducer.
    protected String output_;
    protected String mapCmd_;
    protected String comCmd_;
    protected String redCmd_;
    // Comma-joined values of the deprecated -cacheFile / -cacheArchive options.
    protected String cacheFiles;
    protected String cacheArchives;
    // URI forms produced by getURIs().
    protected URI[] fileURIs;
    protected URI[] archiveURIs;
    // Raw values of the correspondingly named command-line options (see parseArgv()).
    protected String inReaderSpec_;
    protected String inputFormatSpec_;
    protected String outputFormatSpec_;
    protected String partitionerSpec_;
    protected String numReduceTasksSpec_;
    protected String additionalConfSpec_;
    protected String mapDebugSpec_;
    protected String reduceDebugSpec_;
    protected String ioSpec_;
    // True when -lazyOutput was given.
    protected boolean lazyOutput_;
    // Space-separated environment entries: seeded in preProcessArgs(), extended by each -cmdenv value.
    protected String addTaskEnvironment_;
    protected boolean outputSingleNode_;
    protected long minRecWrittenToEnableSkip_;
    protected RunningJob running_;
    protected JobID jobId_;
    /** Message text describing the expected scheme://path#linkname form of cache URIs. */
    protected static final String LINK_URI = "You need to specify the uris as scheme://path#linkname,Please specify a different link name for all of your caching URIs";

    /**
     * Builds a job from a command line, keeping a private copy of the arguments.
     *
     * @param strArr command-line arguments; copied so later caller-side changes are not seen
     * @param z      not read by this constructor
     */
    @Deprecated
    public StreamJob(String[] strArr, boolean z) {
        this();
        argv_ = Arrays.copyOf(strArr, strArr.length);
        config_ = new Configuration();
    }

    /**
     * Creates an empty job: a fresh parser, empty argument collections, the
     * full option table and a default {@link Configuration}.
     */
    public StreamJob() {
        parser = new BasicParser();
        printUsage = false;
        inputSpecs_ = new ArrayList<>();
        seenPrimary_ = new TreeSet<>();
        packageFiles_ = new ArrayList<>();
        shippedCanonFiles_ = new ArrayList<>();
        // Populates allOptions, which parseArgv() needs later.
        setupOptions();
        config_ = new Configuration();
    }

    /**
     * Returns the configuration this job currently uses.
     * (The decompiler renamed this method from {@code getConf}.)
     *
     * @return the configuration last assigned to {@code config_}
     */
    @Override // org.apache.hadoop.conf.Configurable
    public Configuration mo2930getConf() {
        return config_;
    }

    /**
     * Replaces the configuration used by this job.
     *
     * @param configuration the configuration to store; stored as-is, not copied
     */
    @Override // org.apache.hadoop.conf.Configurable
    public void setConf(Configuration configuration) {
        config_ = configuration;
    }

    /**
     * Parses the command line and either prints usage or configures, submits
     * and monitors the streaming job.
     *
     * @param strArr command-line arguments; a copy is kept in {@code argv_}
     * @return 0 after printing usage, 1 when an {@link IllegalArgumentException}
     *         was raised while processing, otherwise the result of
     *         {@code submitAndMonitorJob()}
     * @throws Exception any other failure raised while preparing or running the job
     */
    @Override // org.apache.hadoop.util.Tool
    public int run(String[] strArr) throws Exception {
        try {
            argv_ = Arrays.copyOf(strArr, strArr.length);
            init();
            preProcessArgs();
            parseArgv();
            if (!printUsage) {
                postProcessArgs();
                setJobConf();
                return submitAndMonitorJob();
            }
            printUsage(detailedUsage_);
            return 0;
        } catch (IllegalArgumentException badArgs) {
            // fail() has already written the message to stderr; keep only a debug trace here.
            LOG.debug("Error in streaming job", badArgs);
            return 1;
        }
    }

    /**
     * Builds the {@link JobConf} that a streaming command line describes,
     * without submitting anything.
     *
     * @param strArr command-line arguments; stored by reference, not copied
     * @return the job configuration produced by {@code setJobConf()}
     * @throws IOException if argument post-processing or job configuration fails
     */
    public static JobConf createJob(String[] strArr) throws IOException {
        StreamJob job = new StreamJob();
        job.argv_ = strArr;
        job.init();
        job.preProcessArgs();
        job.parseArgv();
        job.postProcessArgs();
        job.setJobConf();
        return job.jobConf_;
    }

    /**
     * Runs the job with the arguments stored in {@code argv_}.
     *
     * @return the exit code returned by {@link #run(String[])}
     * @throws IOException wrapping any exception thrown by {@code run}; the
     *         message is the original exception's message and the original
     *         exception is kept as the cause so its stack trace is not lost
     */
    @Deprecated
    public int go() throws IOException {
        try {
            return run(this.argv_);
        } catch (Exception e) {
            // Chain the cause: the message alone hides where the failure happened.
            throw new IOException(e.getMessage(), e);
        }
    }

    /**
     * Creates the {@code Environment} used by this job.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the
     *         environment cannot be created
     */
    protected void init() {
        Environment environment;
        try {
            environment = new Environment();
        } catch (IOException ioe) {
            throw new RuntimeException(ioe);
        }
        env_ = environment;
    }

    /**
     * Resets the per-run defaults before the command line is parsed:
     * verbose output off and the initial task-environment entry.
     */
    void preProcessArgs() {
        addTaskEnvironment_ = "HADOOP_ROOT_LOGGER=";
        verbose_ = false;
    }

    /**
     * Validates required arguments and normalises the mapper, combiner and
     * reducer commands once parsing is complete.
     *
     * @throws IOException if a packaged file's canonical path cannot be resolved
     * @throws IllegalArgumentException (via {@code fail}) when no -input or no
     *         -output was given
     */
    void postProcessArgs() throws IOException {
        if (inputSpecs_.isEmpty()) {
            fail("Required argument: -input <name>");
        }
        if (null == output_) {
            fail("Required argument: -output ");
        }
        msg("addTaskEnvironment=" + addTaskEnvironment_);

        // Remember the canonical path of every packaged entry that is a regular file.
        for (String packaged : packageFiles_) {
            File candidate = new File(packaged);
            if (candidate.isFile()) {
                shippedCanonFiles_.add(candidate.getCanonicalPath());
            }
        }
        msg("shippedCanonFiles_=" + shippedCanonFiles_);

        mapCmd_ = unqualifyIfLocalPath(mapCmd_);
        comCmd_ = unqualifyIfLocalPath(comCmd_);
        redCmd_ = unqualifyIfLocalPath(redCmd_);
    }

    /**
     * Strips the directory part from a command whose executable is one of the
     * shipped files, leaving any arguments untouched.
     *
     * <p>The executable is the text before the first space. If its canonical
     * path is listed in {@code shippedCanonFiles_}, it is replaced by its bare
     * file name.
     *
     * @param str the command line, or {@code null}
     * @return the possibly shortened command; {@code null} when {@code str} is {@code null}
     * @throws IOException declared by the signature; canonicalisation failures
     *         are handled internally by falling back to the raw executable text
     */
    String unqualifyIfLocalPath(String str) throws IOException {
        String result = str;
        if (str != null) {
            int firstSpace = str.indexOf(" ");
            boolean hasArgs = firstSpace != -1;
            String executable = hasArgs ? str.substring(0, firstSpace) : str;
            String arguments = hasArgs ? str.substring(firstSpace + 1) : "";

            String canonical;
            try {
                canonical = new File(executable).getCanonicalPath();
            } catch (IOException e) {
                // Fall back to the text as written when it cannot be canonicalised.
                canonical = executable;
            }

            boolean shipped = shippedCanonFiles_.contains(canonical);
            msg("shipped: " + shipped + " " + canonical);
            if (shipped) {
                String baseName = new File(executable).getName();
                if (arguments.length() > 0) {
                    result = baseName + " " + arguments;
                } else {
                    result = baseName;
                }
            }
        }
        msg("cmd=" + result);
        return result;
    }

    /**
     * Parses {@code argv_} against {@code allOptions} and stores every
     * recognised option in the corresponding field.
     *
     * <p>Parse errors and unexpected positional arguments are reported through
     * {@code exitUsage}/{@code fail}, both of which throw
     * {@link IllegalArgumentException}. When -help or -info is present only
     * {@code printUsage}/{@code detailedUsage_} are set and the remaining
     * options are ignored.
     *
     * <p>Deprecated options (-file, -dfs, -cacheFile, -cacheArchive, -jobconf)
     * are still honoured but log a warning.
     *
     * @throws IllegalArgumentException if the command line cannot be parsed,
     *         contains unexpected arguments, names an unreadable -file, or
     *         has a -jobconf value that is not of the form {@code n=v}
     */
    void parseArgv() {
        CommandLine commandLine = null;
        try {
            commandLine = this.parser.parse(this.allOptions, this.argv_);
        } catch (Exception e) {
            LOG.error(e.getMessage());
            exitUsage(this.argv_.length > 0 && "-info".equals(this.argv_[0]));
        }
        if (commandLine == null) {
            exitUsage(this.argv_.length > 0 && "-info".equals(this.argv_[0]));
            return;
        }
        List argList = commandLine.getArgList();
        if (argList != null && argList.size() > 0) {
            fail("Found " + argList.size() + " unexpected arguments on the command line " + argList);
        }
        this.detailedUsage_ = commandLine.hasOption("info");
        if (commandLine.hasOption(FsShell.Help.NAME) || this.detailedUsage_) {
            this.printUsage = true;
            return;
        }
        this.verbose_ = commandLine.hasOption("verbose");
        this.background_ = commandLine.hasOption("background");
        this.debug_ = commandLine.hasOption("debug") ? this.debug_ + 1 : this.debug_;
        String[] inputValues = commandLine.getOptionValues("input");
        if (inputValues != null && inputValues.length > 0) {
            this.inputSpecs_.addAll(Arrays.asList(inputValues));
        }
        this.output_ = commandLine.getOptionValue("output");
        this.mapCmd_ = commandLine.getOptionValue("mapper");
        this.comCmd_ = commandLine.getOptionValue("combiner");
        this.redCmd_ = commandLine.getOptionValue("reducer");
        this.lazyOutput_ = commandLine.hasOption("lazyOutput");
        String[] fileValues = commandLine.getOptionValues("file");
        if (fileValues != null && fileValues.length > 0) {
            LOG.warn("-file option is deprecated, please use generic option -files instead.");
            // Comma-separated list of the shipped files, qualified against the local file system.
            StringBuilder qualifiedFiles = new StringBuilder();
            for (String fileValue : fileValues) {
                this.packageFiles_.add(fileValue);
                try {
                    String path = new Path(fileValue).makeQualified(FileSystem.getLocal(this.config_)).toString();
                    if (qualifiedFiles.length() > 0) {
                        qualifiedFiles.append(',');
                    }
                    qualifiedFiles.append(path);
                } catch (Exception e2) {
                    throw new IllegalArgumentException(e2);
                }
            }
            // Append to any "tmpfiles" value already present in the configuration.
            String existingTmpFiles = this.config_.get("tmpfiles", "");
            this.config_.set("tmpfiles", existingTmpFiles.isEmpty()
                    ? qualifiedFiles.toString()
                    : existingTmpFiles + StringUtils.COMMA_STR + qualifiedFiles);
            validate(this.packageFiles_);
        }
        String dfsValue = commandLine.getOptionValue("dfs");
        if (null != dfsValue) {
            LOG.warn("-dfs option is deprecated, please use -fs instead.");
            this.config_.set("fs.default.name", dfsValue);
        }
        this.additionalConfSpec_ = commandLine.getOptionValue("additionalconfspec");
        this.inputFormatSpec_ = commandLine.getOptionValue("inputformat");
        this.outputFormatSpec_ = commandLine.getOptionValue("outputformat");
        this.numReduceTasksSpec_ = commandLine.getOptionValue("numReduceTasks");
        this.partitionerSpec_ = commandLine.getOptionValue("partitioner");
        this.inReaderSpec_ = commandLine.getOptionValue("inputreader");
        this.mapDebugSpec_ = commandLine.getOptionValue("mapdebug");
        this.reduceDebugSpec_ = commandLine.getOptionValue("reducedebug");
        this.ioSpec_ = commandLine.getOptionValue("io");
        String[] archiveValues = commandLine.getOptionValues("cacheArchive");
        if (null != archiveValues && archiveValues.length > 0) {
            LOG.warn("-cacheArchive option is deprecated, please use -archives instead.");
            for (String archiveValue : archiveValues) {
                this.cacheArchives = this.cacheArchives == null ? archiveValue : this.cacheArchives + StringUtils.COMMA_STR + archiveValue;
            }
        }
        String[] cacheFileValues = commandLine.getOptionValues("cacheFile");
        if (null != cacheFileValues && cacheFileValues.length > 0) {
            LOG.warn("-cacheFile option is deprecated, please use -files instead.");
            for (String cacheFileValue : cacheFileValues) {
                this.cacheFiles = this.cacheFiles == null ? cacheFileValue : this.cacheFiles + StringUtils.COMMA_STR + cacheFileValue;
            }
        }
        String[] jobConfValues = commandLine.getOptionValues("jobconf");
        if (null != jobConfValues && jobConfValues.length > 0) {
            LOG.warn("-jobconf option is deprecated, please use -D instead.");
            for (String jobConfValue : jobConfValues) {
                String[] nameAndValue = jobConfValue.split(AbstractGangliaSink.EQUAL, 2);
                if (nameAndValue.length < 2) {
                    // Without this check a value lacking '=' would surface as an
                    // ArrayIndexOutOfBoundsException instead of a usage error.
                    fail("Invalid -jobconf value '" + jobConfValue + "': expected <n>=<v>");
                }
                this.config_.set(nameAndValue[0], nameAndValue[1]);
            }
        }
        String[] cmdEnvValues = commandLine.getOptionValues("cmdenv");
        if (null == cmdEnvValues || cmdEnvValues.length <= 0) {
            return;
        }
        for (String cmdEnvValue : cmdEnvValues) {
            if (this.addTaskEnvironment_.length() > 0) {
                this.addTaskEnvironment_ += " ";
            }
            this.addTaskEnvironment_ += cmdEnvValue;
        }
    }

    /**
     * Prints a diagnostic line to standard output when verbose mode is on.
     *
     * @param str the text to print after the {@code "STREAM: "} prefix
     */
    protected void msg(String str) {
        if (!verbose_) {
            return;
        }
        System.out.println("STREAM: " + str);
    }

    /**
     * Builds a command-line option that takes arguments.
     *
     * @param str  option name
     * @param str2 option description
     * @param str3 display name of the argument
     * @param i    maximum number of argument values
     * @param z    whether the option is required
     * @return the configured option
     */
    private Option createOption(String str, String str2, String str3, int i, boolean z) {
        return OptionBuilder.withArgName(str3)
                .hasArgs(i)
                .withDescription(str2)
                .isRequired(z)
                .create(str);
    }

    /**
     * Builds a flag option that takes no arguments.
     *
     * @param str  option name
     * @param str2 option description
     * @return the configured option
     */
    private Option createBoolOption(String str, String str2) {
        return OptionBuilder.withDescription(str2).create(str);
    }

    /**
     * Checks that every named file can be read.
     *
     * @param list file paths to check
     * @throws IllegalArgumentException (via {@code fail}) for the first path
     *         that {@code FileUtil.canRead} rejects
     */
    private void validate(List<String> list) throws IllegalArgumentException {
        for (String pathName : list) {
            File candidate = new File(pathName);
            if (FileUtil.canRead(candidate)) {
                continue;
            }
            fail("File: " + candidate.getAbsolutePath() + " does not exist, or is not readable.");
        }
    }

    /**
     * Builds the table of streaming command-line options and stores it in
     * {@code allOptions}.
     */
    private void setupOptions() {
        Options options = new Options();

        // Options that take one or more values.
        options.addOption(createOption("input", "DFS input file(s) for the Map step", "path", Integer.MAX_VALUE, false));
        options.addOption(createOption("output", "DFS output directory for the Reduce step", "path", 1, false));
        options.addOption(createOption("mapper", "The streaming command to run", "cmd", 1, false));
        options.addOption(createOption("combiner", "The streaming command to run", "cmd", 1, false));
        options.addOption(createOption("reducer", "The streaming command to run", "cmd", 1, false));
        options.addOption(createOption("file", "File to be shipped in the Job jar file", "file", Integer.MAX_VALUE, false));
        options.addOption(createOption("dfs", "Optional. Override DFS configuration", "<h:p>|local", 1, false));
        options.addOption(createOption("additionalconfspec", "Optional.", "spec", 1, false));
        options.addOption(createOption("inputformat", "Optional.", "spec", 1, false));
        options.addOption(createOption("outputformat", "Optional.", "spec", 1, false));
        options.addOption(createOption("partitioner", "Optional.", "spec", 1, false));
        options.addOption(createOption("numReduceTasks", "Optional.", "spec", 1, false));
        options.addOption(createOption("inputreader", "Optional.", "spec", 1, false));
        options.addOption(createOption("mapdebug", "Optional.", "spec", 1, false));
        options.addOption(createOption("reducedebug", "Optional", "spec", 1, false));
        options.addOption(createOption("jobconf", "(n=v) Optional. Add or override a JobConf property.", "spec", 1, false));
        options.addOption(createOption("cmdenv", "(n=v) Pass env.var to streaming commands.", "spec", 1, false));
        options.addOption(createOption("cacheFile", "File name URI", "fileNameURI", Integer.MAX_VALUE, false));
        options.addOption(createOption("cacheArchive", "File name URI", "fileNameURI", Integer.MAX_VALUE, false));
        options.addOption(createOption("io", "Optional.", "spec", 1, false));

        // Flags.
        options.addOption(createBoolOption("background", "Submit the job and don't wait till it completes."));
        options.addOption(createBoolOption("verbose", "print verbose output"));
        options.addOption(createBoolOption("info", "print verbose output"));
        options.addOption(createBoolOption("debug", "print debug output"));
        options.addOption(createBoolOption(FsShell.Help.NAME, "print this help message"));
        options.addOption(createBoolOption("lazyOutput", "create outputs lazily"));

        this.allOptions = options;
    }

    /**
     * Prints the usage text and then aborts argument processing.
     *
     * @param z {@code true} to print the detailed usage tips as well
     * @throws IllegalArgumentException always, raised by {@code fail("")}
     */
    public void exitUsage(boolean z) {
        final boolean detailed = z;
        printUsage(detailed);
        fail("");
    }

    /**
     * Writes the streaming usage text to standard output.
     *
     * <p>The option summary and the generic-options help are always printed.
     * The short form then points at -info; the detailed form adds usage tips
     * and examples.
     *
     * @param z {@code true} for the detailed form, {@code false} for the short form
     */
    private void printUsage(boolean z) {
        System.out.println("Usage: $HADOOP_PREFIX/bin/hadoop jar hadoop-streaming.jar [options]");
        System.out.println("Options:");
        System.out.println("  -input          <path> DFS input file(s) for the Map step.");
        System.out.println("  -output         <path> DFS output directory for the Reduce step.");
        System.out.println("  -mapper         <cmd|JavaClassName> Optional. Command to be run as mapper.");
        System.out.println("  -combiner       <cmd|JavaClassName> Optional. Command to be run as combiner.");
        System.out.println("  -reducer        <cmd|JavaClassName> Optional. Command to be run as reducer.");
        System.out.println("  -file           <file> Optional. File/dir to be shipped in the Job jar file.\n                  Deprecated. Use generic option \"-files\" instead.");
        System.out.println("  -inputformat    <TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName>\n                  Optional. The input format class.");
        System.out.println("  -outputformat   <TextOutputFormat(default)|JavaClassName>\n                  Optional. The output format class.");
        System.out.println("  -partitioner    <JavaClassName>  Optional. The partitioner class.");
        System.out.println("  -numReduceTasks <num> Optional. Number of reduce tasks.");
        System.out.println("  -inputreader    <spec> Optional. Input recordreader spec.");
        System.out.println("  -cmdenv         <n>=<v> Optional. Pass env.var to streaming commands.");
        System.out.println("  -mapdebug       <cmd> Optional. To run this script when a map task fails.");
        System.out.println("  -reducedebug    <cmd> Optional. To run this script when a reduce task fails.");
        System.out.println("  -io             <identifier> Optional. Format to use for input to and output");
        System.out.println("                  from mapper/reducer commands");
        System.out.println("  -lazyOutput     Optional. Lazily create Output.");
        System.out.println("  -background     Optional. Submit the job and don't wait till it completes.");
        System.out.println("  -verbose        Optional. Print verbose output.");
        System.out.println("  -info           Optional. Print detailed usage.");
        System.out.println("  -help           Optional. Print help message.");
        System.out.println();
        GenericOptionsParser.printGenericCommandUsage(System.out);
        if (!z) {
            // Short form: just point the user at the detailed help.
            System.out.println();
            System.out.println("For more details about these options:");
            System.out.println("Use $HADOOP_PREFIX/bin/hadoop jar hadoop-streaming.jar -info");
            return;
        }
        System.out.println();
        System.out.println("Usage tips:");
        System.out.println("In -input: globbing on <path> is supported and can have multiple -input");
        System.out.println();
        System.out.println("Default Map input format: a line is a record in UTF-8 the key part ends at first");
        System.out.println("  TAB, the rest of the line is the value");
        System.out.println();
        System.out.println("To pass a Custom input format:");
        System.out.println("  -inputformat package.MyInputFormat");
        System.out.println();
        System.out.println("Similarly, to pass a custom output format:");
        System.out.println("  -outputformat package.MyOutputFormat");
        System.out.println();
        System.out.println("The files with extensions .class and .jar/.zip, specified for the -file");
        System.out.println("  argument[s], end up in \"classes\" and \"lib\" directories respectively inside");
        System.out.println("  the working directory when the mapper and reducer are run. All other files");
        System.out.println("  specified for the -file argument[s] end up in the working directory when the");
        System.out.println("  mapper and reducer are run. The location of this working directory is");
        System.out.println("  unspecified.");
        System.out.println();
        System.out.println("To set the number of reduce tasks (num. of output files) as, say 10:");
        System.out.println("  Use -numReduceTasks 10");
        System.out.println("To skip the sort/combine/shuffle/sort/reduce step:");
        System.out.println("  Use -numReduceTasks 0");
        System.out.println("  Map output then becomes a 'side-effect output' rather than a reduce input.");
        System.out.println("  This speeds up processing. This also feels more like \"in-place\" processing");
        System.out.println("  because the input filename and the map input order are preserved.");
        System.out.println("  This is equivalent to -reducer NONE");
        System.out.println();
        System.out.println("To speed up the last maps:");
        System.out.println("  -D mapreduce.map.speculative=true");
        System.out.println("To speed up the last reduces:");
        System.out.println("  -D mapreduce.reduce.speculative=true");
        System.out.println("To name the job (appears in the JobTracker Web UI):");
        System.out.println("  -D mapreduce.job.name='My Job'");
        System.out.println("To change the local temp directory:");
        System.out.println("  -D dfs.data.dir=/tmp/dfs");
        System.out.println("  -D stream.tmpdir=/tmp/streaming");
        System.out.println("Additional local temp directories with -jt local:");
        System.out.println("  -D mapreduce.cluster.local.dir=/tmp/local");
        System.out.println("  -D mapreduce.jobtracker.system.dir=/tmp/system");
        System.out.println("  -D mapreduce.cluster.temp.dir=/tmp/temp");
        System.out.println("To treat tasks with non-zero exit status as SUCCEEDED:");
        System.out.println("  -D stream.non.zero.exit.is.failure=false");
        System.out.println("Use a custom hadoop streaming build along with standard hadoop install:");
        System.out.println("  $HADOOP_PREFIX/bin/hadoop jar /path/my-hadoop-streaming.jar [...]\\");
        System.out.println("    [...] -D stream.shipped.hadoopstreaming=/path/my-hadoop-streaming.jar");
        System.out.println("For more details about jobconf parameters see:");
        System.out.println("  http://wiki.apache.org/hadoop/JobConfFile");
        System.out.println("To set an environment variable in a streaming command:");
        System.out.println("   -cmdenv EXAMPLE_DIR=/home/example/dictionaries/");
        System.out.println();
        System.out.println("Shortcut:");
        System.out.println("   setenv HSTREAMING \"$HADOOP_PREFIX/bin/hadoop jar hadoop-streaming.jar\"");
        System.out.println();
        System.out.println("Example: $HSTREAMING -mapper \"/usr/local/bin/perl5 filter.pl\"");
        System.out.println("           -file /local/filter.pl -input \"/logs/0604*/*\" [...]");
        System.out.println("  Ships a script, invokes the non-shipped perl interpreter. Shipped files go to");
        System.out.println("  the working directory so filter.pl is found by perl. Input files are all the");
        System.out.println("  daily logs for days in month 2006-04");
    }

    /**
     * Reports a fatal argument error on standard error and aborts.
     *
     * @param str the error message; printed and used as the exception message
     * @throws IllegalArgumentException always
     */
    public void fail(String str) {
        final IllegalArgumentException failure = new IllegalArgumentException(str);
        System.err.println(str);
        System.err.println("Try -help for more information");
        throw failure;
    }

    /**
     * Looks up {@code HADOOP_PREFIX} in the job's environment.
     *
     * @return the value of {@code HADOOP_PREFIX}, or {@code "UNDEF"} when it is not set
     */
    protected String getHadoopClientHome() {
        String hadoopPrefix = env_.getProperty("HADOOP_PREFIX");
        return hadoopPrefix != null ? hadoopPrefix : "UNDEF";
    }

    /**
     * Asks {@code StreamUtil} whether the current job configuration uses a
     * local job tracker.
     *
     * @return the result of {@code StreamUtil.isLocalJobTracker} for {@code jobConf_}
     */
    protected boolean isLocalHadoop() {
        final boolean local = StreamUtil.isLocalJobTracker(jobConf_);
        return local;
    }

    /**
     * Returns the cluster nickname.
     *
     * @return always the fixed string {@code "default"}
     */
    @Deprecated
    protected String getClusterNick() {
        final String nick = "default";
        return nick;
    }

    /**
     * Builds the temporary job jar from the packaged files and, unless the
     * job runs against a local job tracker, the streaming runtime classes.
     *
     * <p>The runtime classes location comes from
     * {@code stream.shipped.hadoopstreaming} or, failing that, from the
     * classpath. A directory is added to {@code packageFiles_}; anything else
     * is passed to the jar builder as an archive to merge.
     *
     * @return the absolute path of the jar that was written, or {@code null}
     *         when there is nothing to package
     * @throws IOException if the runtime classes cannot be located, the
     *         temporary file cannot be created, or the jar cannot be built
     */
    protected String packageJobJar() throws IOException {
        ArrayList<String> unjarFiles = new ArrayList<>();

        String runtimeClasses = config_.get("stream.shipped.hadoopstreaming");
        if (runtimeClasses == null) {
            runtimeClasses = StreamUtil.findInClasspath(StreamJob.class.getName());
            if (runtimeClasses == null) {
                throw new IOException("runtime classes not found: " + getClass().getPackage());
            }
        }
        msg("Found runtime classes in: " + runtimeClasses);

        if (!isLocalHadoop()) {
            boolean isDirectory = new File(runtimeClasses).isDirectory();
            if (isDirectory) {
                packageFiles_.add(runtimeClasses);
            } else {
                unjarFiles.add(runtimeClasses);
            }
        }
        if (packageFiles_.isEmpty() && unjarFiles.isEmpty()) {
            return null;
        }

        String tmpDirName = jobConf_.get("stream.tmpdir");
        File tmpDir = null;
        if (tmpDirName != null) {
            tmpDir = new File(tmpDirName);
        }
        File jobJar = File.createTempFile("streamjob", ".jar", tmpDir);
        System.out.println("packageJobJar: " + packageFiles_ + " " + unjarFiles + " " + jobJar + " tmpDir=" + tmpDir);
        // Keep the jar around for inspection only when -debug was given.
        if (debug_ == 0) {
            jobJar.deleteOnExit();
        }

        JarBuilder builder = new JarBuilder();
        if (verbose_) {
            builder.setVerbose(true);
        }
        String jobJarName = jobJar.getAbsolutePath();
        builder.merge(packageFiles_, unjarFiles, jobJarName);
        return jobJarName;
    }

    /**
     * Converts two comma-separated URI lists into {@code archiveURIs} and
     * {@code fileURIs}.
     *
     * @param str  comma-separated archive URIs
     * @param str2 comma-separated file URIs
     */
    protected void getURIs(String str, String str2) {
        String[] archiveNames = StringUtils.getStrings(str);
        String[] fileNames = StringUtils.getStrings(str2);
        // File URIs are converted first, matching the original evaluation order.
        fileURIs = StringUtils.stringToURI(fileNames);
        archiveURIs = StringUtils.stringToURI(archiveNames);
    }

    protected void setJobConf() throws IOException {
        if (this.additionalConfSpec_ != null) {
            LOG.warn("-additionalconfspec option is deprecated, please use -conf instead.");
            this.config_.addResource(new Path(this.additionalConfSpec_));
        }
        this.jobConf_ = new JobConf(this.config_, StreamJob.class);
        for (int i = 0; i < this.inputSpecs_.size(); i++) {
            FileInputFormat.addInputPaths(this.jobConf_, this.inputSpecs_.get(i));
        }
        String name = getClass().getPackage().getName();
        Class cls = null;
        if (this.inReaderSpec_ == null && this.inputFormatSpec_ == null) {
            cls = TextInputFormat.class;
        } else if (this.inputFormatSpec_ != null) {
            if (this.inputFormatSpec_.equals(TextInputFormat.class.getName()) || this.inputFormatSpec_.equals(TextInputFormat.class.getCanonicalName()) || this.inputFormatSpec_.equals(TextInputFormat.class.getSimpleName())) {
                cls = TextInputFormat.class;
            } else if (this.inputFormatSpec_.equals(KeyValueTextInputFormat.class.getName()) || this.inputFormatSpec_.equals(KeyValueTextInputFormat.class.getCanonicalName()) || this.inputFormatSpec_.equals(KeyValueTextInputFormat.class.getSimpleName())) {
                if (this.inReaderSpec_ == null) {
                    cls = KeyValueTextInputFormat.class;
                }
            } else if (this.inputFormatSpec_.equals(SequenceFileInputFormat.class.getName()) || this.inputFormatSpec_.equals(SequenceFileInputFormat.class.getCanonicalName()) || this.inputFormatSpec_.equals(SequenceFileInputFormat.class.getSimpleName())) {
                if (this.inReaderSpec_ == null) {
                    cls = SequenceFileInputFormat.class;
                }
            } else if (this.inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class.getName()) || this.inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class.getCanonicalName()) || this.inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class.getSimpleName())) {
                cls = SequenceFileAsTextInputFormat.class;
            } else {
                Class goodClassOrNull = StreamUtil.goodClassOrNull(this.jobConf_, this.inputFormatSpec_, name);
                if (goodClassOrNull != null) {
                    cls = goodClassOrNull;
                } else {
                    fail("-inputformat : class not found : " + this.inputFormatSpec_);
                }
            }
        }
        if (cls == null) {
            cls = StreamInputFormat.class;
        }
        this.jobConf_.setInputFormat(cls);
        if (this.ioSpec_ != null) {
            this.jobConf_.set("stream.map.input", this.ioSpec_);
            this.jobConf_.set("stream.map.output", this.ioSpec_);
            this.jobConf_.set("stream.reduce.input", this.ioSpec_);
            this.jobConf_.set("stream.reduce.output", this.ioSpec_);
        }
        IdentifierResolver identifierResolver = (IdentifierResolver) ReflectionUtils.newInstance(this.jobConf_.getClass("stream.io.identifier.resolver.class", IdentifierResolver.class, IdentifierResolver.class), this.jobConf_);
        identifierResolver.resolve(this.jobConf_.get("stream.map.input", "text"));
        this.jobConf_.setClass("stream.map.input.writer.class", identifierResolver.getInputWriterClass(), InputWriter.class);
        identifierResolver.resolve(this.jobConf_.get("stream.reduce.input", "text"));
        this.jobConf_.setClass("stream.reduce.input.writer.class", identifierResolver.getInputWriterClass(), InputWriter.class);
        this.jobConf_.set("stream.addenvironment", this.addTaskEnvironment_);
        boolean z = false;
        if (this.mapCmd_ != null) {
            Class goodClassOrNull2 = StreamUtil.goodClassOrNull(this.jobConf_, this.mapCmd_, name);
            if (goodClassOrNull2 != null) {
                this.jobConf_.setMapperClass(goodClassOrNull2);
            } else {
                z = true;
                this.jobConf_.setMapperClass(PipeMapper.class);
                this.jobConf_.setMapRunnerClass(PipeMapRunner.class);
                this.jobConf_.set("stream.map.streamprocessor", URLEncoder.encode(this.mapCmd_, "UTF-8"));
            }
        }
        if (this.comCmd_ != null) {
            Class goodClassOrNull3 = StreamUtil.goodClassOrNull(this.jobConf_, this.comCmd_, name);
            if (goodClassOrNull3 != null) {
                this.jobConf_.setCombinerClass(goodClassOrNull3);
            } else {
                this.jobConf_.setCombinerClass(PipeCombiner.class);
                this.jobConf_.set("stream.combine.streamprocessor", URLEncoder.encode(this.comCmd_, "UTF-8"));
            }
        }
        if (this.numReduceTasksSpec_ != null) {
            this.jobConf_.setNumReduceTasks(Integer.parseInt(this.numReduceTasksSpec_));
        }
        boolean z2 = false;
        if (this.redCmd_ != null) {
            if (this.redCmd_.equals("NONE")) {
                this.jobConf_.setNumReduceTasks(0);
            }
            if (this.jobConf_.getNumReduceTasks() != 0) {
                if (this.redCmd_.compareToIgnoreCase("aggregate") == 0) {
                    this.jobConf_.setReducerClass(ValueAggregatorReducer.class);
                    this.jobConf_.setCombinerClass(ValueAggregatorCombiner.class);
                } else {
                    Class goodClassOrNull4 = StreamUtil.goodClassOrNull(this.jobConf_, this.redCmd_, name);
                    if (goodClassOrNull4 != null) {
                        this.jobConf_.setReducerClass(goodClassOrNull4);
                    } else {
                        z2 = true;
                        this.jobConf_.setReducerClass(PipeReducer.class);
                        this.jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(this.redCmd_, "UTF-8"));
                    }
                }
            }
        }
        identifierResolver.resolve(this.jobConf_.get("stream.map.output", "text"));
        this.jobConf_.setClass("stream.map.output.reader.class", identifierResolver.getOutputReaderClass(), OutputReader.class);
        if (z || this.jobConf_.get("stream.map.output") != null) {
            this.jobConf_.setMapOutputKeyClass(identifierResolver.getOutputKeyClass());
            this.jobConf_.setMapOutputValueClass(identifierResolver.getOutputValueClass());
            if (this.jobConf_.getNumReduceTasks() == 0) {
                this.jobConf_.setOutputKeyClass(identifierResolver.getOutputKeyClass());
                this.jobConf_.setOutputValueClass(identifierResolver.getOutputValueClass());
            }
        }
        identifierResolver.resolve(this.jobConf_.get("stream.reduce.output", "text"));
        this.jobConf_.setClass("stream.reduce.output.reader.class", identifierResolver.getOutputReaderClass(), OutputReader.class);
        if (z2 || this.jobConf_.get("stream.reduce.output") != null) {
            this.jobConf_.setOutputKeyClass(identifierResolver.getOutputKeyClass());
            this.jobConf_.setOutputValueClass(identifierResolver.getOutputValueClass());
        }
        if (this.inReaderSpec_ != null) {
            String[] split = this.inReaderSpec_.split(StringUtils.COMMA_STR);
            String str = split[0];
            Class goodClassOrNull5 = StreamUtil.goodClassOrNull(this.jobConf_, str, name);
            if (goodClassOrNull5 != null) {
                this.jobConf_.set("stream.recordreader.class", goodClassOrNull5.getName());
            } else {
                fail("-inputreader: class not found: " + str);
            }
            for (int i2 = 1; i2 < split.length; i2++) {
                String[] split2 = split[i2].split(AbstractGangliaSink.EQUAL, 2);
                this.jobConf_.set("stream.recordreader." + split2[0], split2.length > 1 ? split2[1] : "");
            }
        }
        FileOutputFormat.setOutputPath(this.jobConf_, new Path(this.output_));
        Class cls2 = null;
        if (this.outputFormatSpec_ != null) {
            Class goodClassOrNull6 = StreamUtil.goodClassOrNull(this.jobConf_, this.outputFormatSpec_, name);
            if (goodClassOrNull6 != null) {
                cls2 = goodClassOrNull6;
            } else {
                fail("-outputformat : class not found : " + this.outputFormatSpec_);
            }
        }
        if (cls2 == null) {
            cls2 = TextOutputFormat.class;
        }
        if (this.lazyOutput_) {
            LazyOutputFormat.setOutputFormatClass(this.jobConf_, cls2);
        } else {
            this.jobConf_.setOutputFormat(cls2);
        }
        if (this.partitionerSpec_ != null) {
            Class goodClassOrNull7 = StreamUtil.goodClassOrNull(this.jobConf_, this.partitionerSpec_, name);
            if (goodClassOrNull7 != null) {
                this.jobConf_.setPartitionerClass(goodClassOrNull7);
            } else {
                fail("-partitioner : class not found : " + this.partitionerSpec_);
            }
        }
        if (this.mapDebugSpec_ != null) {
            this.jobConf_.setMapDebugScript(this.mapDebugSpec_);
        }
        if (this.reduceDebugSpec_ != null) {
            this.jobConf_.setReduceDebugScript(this.reduceDebugSpec_);
        }
        this.jar_ = packageJobJar();
        if (this.jar_ != null) {
            this.jobConf_.setJar(this.jar_);
        }
        if (this.cacheArchives != null || this.cacheFiles != null) {
            getURIs(this.cacheArchives, this.cacheFiles);
            if (!DistributedCache.checkURIs(this.fileURIs, this.archiveURIs)) {
                fail(LINK_URI);
            }
        }
        if (this.cacheArchives != null) {
            DistributedCache.setCacheArchives(this.archiveURIs, this.jobConf_);
        }
        if (this.cacheFiles != null) {
            DistributedCache.setCacheFiles(this.fileURIs, this.jobConf_);
        }
        if (this.verbose_) {
            listJobConfProperties();
        }
        msg("submitting to jobconf: " + getJobTrackerHostPort());
    }

    /**
     * Prints every property of the job configuration through {@code msg}, one
     * {@code key=value} pair per call, framed by a header and a footer line.
     *
     * <p>Entries are first copied into a {@link TreeMap} so they are emitted in the
     * natural (sorted) order of their keys rather than in the configuration's own
     * iteration order.
     */
    protected void listJobConfProperties() {
        msg("==== JobConf properties:");
        // Typed map instead of raw collections: no unchecked casts, and the
        // for-each loops below are well-typed.
        Map<String, String> sortedProperties = new TreeMap<String, String>();
        for (Map.Entry<String, String> property : this.jobConf_) {
            sortedProperties.put(property.getKey(), property.getValue());
        }
        for (Map.Entry<String, String> property : sortedProperties.entrySet()) {
            // NOTE(review): AbstractGangliaSink.EQUAL is used purely as the key/value
            // separator here — presumably "="; consider a local constant to avoid
            // depending on the metrics sink class.
            msg(property.getKey() + AbstractGangliaSink.EQUAL + property.getValue());
        }
        msg("====");
    }

    /**
     * Looks up the job tracker address in the job configuration.
     *
     * @return the value stored under {@code mapreduce.jobtracker.address} in
     *         {@code jobConf_}
     */
    protected String getJobTrackerHostPort() {
        final String jobTrackerAddress = this.jobConf_.get("mapreduce.jobtracker.address");
        return jobTrackerAddress;
    }

    /**
     * Submits the configured job and, unless background mode is enabled, monitors it
     * until it finishes.
     *
     * <p>When a job jar was packaged and {@code isLocalHadoop()} reports true, the jar
     * is first unpacked into the current working directory. The job client is always
     * closed before this method returns.
     *
     * @return an exit code:
     *         <ul>
     *           <li>{@code 0} – job submitted in background mode, or monitored job succeeded</li>
     *           <li>{@code 1} – monitored job was not successful</li>
     *           <li>{@code 2} – {@link FileNotFoundException} while launching (bad input path)</li>
     *           <li>{@code 3} – {@code InvalidJobConfException} while launching</li>
     *           <li>{@code 4} – {@code FileAlreadyExistsException} while launching (output path exists)</li>
     *           <li>{@code 5} – any other {@link IOException} while launching</li>
     *           <li>{@code 6} – interrupted while monitoring the job</li>
     *         </ul>
     * @throws IOException if a step outside the guarded submit/monitor section fails
     *         (unpacking the jar, creating the job client, or closing it)
     */
    public int submitAndMonitorJob() throws IOException {
        if (this.jar_ != null && isLocalHadoop()) {
            RunJar.unJar(new File(this.jar_), new File(".").getAbsoluteFile());
        }
        this.jc_ = new JobClient(this.jobConf_);
        this.running_ = null;
        try {
            this.running_ = this.jc_.submitJob(this.jobConf_);
            this.jobId_ = this.running_.getID();
            if (this.background_) {
                LOG.info("Job is running in background.");
            } else if (!this.jc_.monitorAndPrintJob(this.jobConf_, this.running_)) {
                LOG.error("Job not successful!");
                return 1;
            }
            LOG.info("Output directory: " + this.output_);
            return 0;
        // The specific exceptions must be handled before the general IOException:
        // a preceding catch (IOException) would make these handlers unreachable
        // and the distinct exit codes 2, 3 and 4 could never be returned.
        } catch (FileNotFoundException e) {
            LOG.error("Error launching job , bad input path : " + e.getMessage());
            return 2;
        } catch (InvalidJobConfException e) {
            LOG.error("Error launching job , Invalid job conf : " + e.getMessage());
            return 3;
        } catch (FileAlreadyExistsException e) {
            LOG.error("Error launching job , Output path already exists : " + e.getMessage());
            return 4;
        } catch (IOException e) {
            LOG.error("Error Launching job : " + e.getMessage());
            return 5;
        } catch (InterruptedException e) {
            LOG.error("Error monitoring job : " + e.getMessage());
            return 6;
        } finally {
            this.jc_.close();
        }
    }
}
