/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.LocalJobRunner;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Test;

public class TestLocalRunner
extends TestCase {
    /** Logger shared by the test methods and the nested mapper classes. */
    private static final Log LOG = LogFactory.getLog(TestLocalRunner.class);

    /** Number of records written to each multi-map input file; the index is the file id. */
    private static final int[] INPUT_SIZES = new int[]{50000, 500, 500, 20, 5000, 500};

    /** Number of records {@code StressMapper} emits per input record; the index is the file id. */
    private static final int[] OUTPUT_SIZES = new int[]{1, 500, 500, 500, 500, 500};

    /** {@code StressMapper} sleeps 1 ms whenever its emit index modulo this interval equals 1. */
    private static final int[] SLEEP_INTERVALS = new int[]{10000, 15, 15, 20, 250, 60};

    /**
     * Total number of records the multi-map job is expected to emit. Filled in by the static
     * initializer of this class, so it cannot be declared {@code final}.
     */
    private static int TOTAL_RECORDS = 0;

    /** Directory name of the multi-map input (the path helpers spell this literal out inline). */
    private static final String INPUT_DIR = "multiMapInput";

    /** Directory name of the job output (the path helpers spell this literal out inline). */
    private static final String OUTPUT_DIR = "multiMapOutput";

    /** Value written into each number file; presumably the source of the literal 100 used by the number job. */
    private static final int NUMBER_FILE_VAL = 100;

    /**
     * Creates a text file named after {@code id} inside {@code dirPath} on the local file system.
     * Each line has the form {@code "This is a line in a file: <id> <recordIndex>"}.
     *
     * @param dirPath directory that receives the file
     * @param id numeric id, used both as the file name and inside every line
     * @param numRecords number of lines to write
     * @throws IOException if the file cannot be created or written
     */
    private void createInputFile(Path dirPath, int id, int numRecords) throws IOException {
        final String linePrefix = "This is a line in a file: ";
        Path filePath = new Path(dirPath, "" + id);
        Configuration conf = new Configuration();
        LocalFileSystem fs = FileSystem.getLocal(conf);
        FSDataOutputStream os = fs.create(filePath);
        BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
        try {
            for (int i = 0; i < numRecords; ++i) {
                w.write(linePrefix + id + " " + i + "\n");
            }
        } finally {
            // Close even when a write fails so the underlying stream is not leaked.
            w.close();
        }
    }

    /**
     * Resolves the multi-map input directory.
     *
     * @return {@code multiMapInput} under the directory named by the {@code test.build.data}
     *         system property, or the relative path {@code multiMapInput} when the property is unset
     */
    private Path getInputPath() {
        final String baseDir = System.getProperty("test.build.data");
        return baseDir != null
                ? new Path(new Path(baseDir), "multiMapInput")
                : new Path("multiMapInput");
    }

    /**
     * Resolves the job output directory.
     *
     * @return {@code multiMapOutput} under the directory named by the {@code test.build.data}
     *         system property, or the relative path {@code multiMapOutput} when the property is unset
     */
    private Path getOutputPath() {
        final String baseDir = System.getProperty("test.build.data");
        return baseDir != null
                ? new Path(new Path(baseDir), "multiMapOutput")
                : new Path("multiMapOutput");
    }

    /**
     * Recreates the multi-map input directory from scratch: removes any existing directory and
     * writes one input file per entry of {@code INPUT_SIZES}, using the entry's index as the file id.
     *
     * @return the input directory that was populated
     * @throws IOException if the directory cannot be cleared or a file cannot be written
     */
    private Path createMultiMapsInput() throws IOException {
        Configuration conf = new Configuration();
        LocalFileSystem fs = FileSystem.getLocal(conf);
        Path inputPath = this.getInputPath();
        if (fs.exists(inputPath)) {
            fs.delete(inputPath, true);
        }
        // Drive the loop from the array itself so the file count cannot drift from the size table.
        for (int i = 0; i < INPUT_SIZES.length; ++i) {
            this.createInputFile(inputPath, i, INPUT_SIZES[i]);
        }
        return inputPath;
    }

    /**
     * Checks the single reducer output file {@code part-r-00000} of the multi-map job: its first
     * line must be the key {@code 0}, a tab, and a count equal to {@code TOTAL_RECORDS}.
     *
     * @param outputPath job output directory
     * @throws IOException if the output file cannot be opened or read
     */
    private void verifyOutput(Path outputPath) throws IOException {
        Configuration conf = new Configuration();
        LocalFileSystem fs = FileSystem.getLocal(conf);
        Path outputFile = new Path(outputPath, "part-r-00000");
        FSDataInputStream is = fs.open(outputFile);
        BufferedReader r = new BufferedReader(new InputStreamReader(is));
        try {
            String firstLine = r.readLine();
            // An empty file yields null; report it as an assertion failure rather than an NPE.
            TestLocalRunner.assertNotNull("Reducer output file is empty", firstLine);
            String line = firstLine.trim();
            TestLocalRunner.assertTrue("Line does not have correct key", line.startsWith("0\t"));
            // Skip the two-character "0\t" prefix; the remainder is the count.
            int count = Integer.parseInt(line.substring(2));
            TestLocalRunner.assertEquals("Incorrect count generated!", TOTAL_RECORDS, count);
        } finally {
            // Close even when an assertion fails so the stream is not leaked.
            r.close();
        }
    }

    /**
     * Runs a map-only job whose mapper allocates garbage and requests a collection, then checks
     * that the job reports a positive {@code GC_TIME_MILLIS} counter.
     *
     * @throws Exception if job setup or execution fails
     */
    @Test
    public void testGcCounter() throws Exception {
        Path inputPath = this.getInputPath();
        Path outputPath = this.getOutputPath();
        Configuration conf = new Configuration();
        LocalFileSystem fs = FileSystem.getLocal(conf);

        // Start from clean input and output directories.
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        if (fs.exists(inputPath)) {
            fs.delete(inputPath, true);
        }
        this.createInputFile(inputPath, 0, 20);

        Job job = Job.getInstance();
        job.setMapperClass(GCMapper.class);
        job.setNumReduceTasks(0);
        job.getConfiguration().set("mapreduce.task.io.sort.mb", "25");
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean ret = job.waitForCompletion(true);
        TestLocalRunner.assertTrue("job failed", ret);

        // Counter lives in this same package (org.apache.hadoop.mapreduce), so no import is needed.
        // Declaring the variable as Object would make getValue() unavailable.
        Counter gcCounter = job.getCounters().findCounter(TaskCounter.GC_TIME_MILLIS);
        TestLocalRunner.assertNotNull(gcCounter);
        TestLocalRunner.assertTrue("No time spent in gc", gcCounter.getValue() > 0L);
    }

    /**
     * Runs the stress job with six maps allowed in parallel and one reducer, then verifies the
     * reducer's record count. A watchdog thread interrupts the test thread after two minutes so a
     * hung job produces thread dumps instead of blocking forever.
     *
     * @throws Exception if the job fails to run, is interrupted, or produces wrong output
     */
    @Test(timeout=120000L)
    public void testMultiMaps() throws Exception {
        Job job = Job.getInstance();
        Path inputPath = this.createMultiMapsInput();
        Path outputPath = this.getOutputPath();
        Configuration conf = new Configuration();
        LocalFileSystem fs = FileSystem.getLocal(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        job.setMapperClass(StressMapper.class);
        job.setReducerClass(CountingReducer.class);
        job.setNumReduceTasks(1);
        LocalJobRunner.setLocalMaxRunningMaps(job, 6);
        job.getConfiguration().set("mapreduce.task.io.sort.mb", "25");
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        final Thread toInterrupt = Thread.currentThread();
        Thread interrupter = new Thread(){

            @Override
            public void run() {
                try {
                    Thread.sleep(120000L);
                    toInterrupt.interrupt();
                }
                catch (InterruptedException ignored) {
                    // Being interrupted is how the test cancels this watchdog; just exit.
                }
            }
        };
        LOG.info("Submitting job...");
        job.submit();
        LOG.info("Starting thread to interrupt main thread in 2 minutes");
        interrupter.start();
        try {
            LOG.info("Waiting for job to complete...");
            try {
                job.waitForCompletion(true);
            }
            catch (InterruptedException ie) {
                // The watchdog fired: dump thread stacks ten times, one second apart, then fail.
                LOG.fatal("Interrupted while waiting for job completion", ie);
                for (int i = 0; i < 10; ++i) {
                    LOG.fatal("Dumping stacks");
                    ReflectionUtils.logThreadInfo(LOG, "multimap threads", 0L);
                    Thread.sleep(1000L);
                }
                throw ie;
            }
            LOG.info("Job completed, stopping interrupter");
        }
        finally {
            // Always cancel the watchdog, including when the job throws something other than
            // InterruptedException; otherwise it would interrupt this thread two minutes later.
            interrupter.interrupt();
            try {
                interrupter.join();
            }
            catch (InterruptedException ignored) {
                // Best effort: the watchdog may have fired just before being cancelled.
            }
        }
        LOG.info("Verifying output");
        this.verifyOutput(outputPath);
    }

    /**
     * Configures the stress job with a maximum of -6 running maps and expects the job not to
     * succeed.
     *
     * @throws Exception if job setup or execution throws
     */
    @Test
    public void testInvalidMultiMapParallelism() throws Exception {
        Job job = Job.getInstance();
        Path inputDir = this.createMultiMapsInput();
        Path outputDir = this.getOutputPath();

        // Clear any output left behind by an earlier run.
        LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
        if (localFs.exists(outputDir)) {
            localFs.delete(outputDir, true);
        }

        job.setMapperClass(StressMapper.class);
        job.setReducerClass(CountingReducer.class);
        job.setNumReduceTasks(1);
        LocalJobRunner.setLocalMaxRunningMaps(job, -6);
        FileInputFormat.addInputPath(job, inputDir);
        FileOutputFormat.setOutputPath(job, outputDir);

        boolean succeeded = job.waitForCompletion(true);
        TestLocalRunner.assertFalse("Job succeeded somehow", succeeded);
    }

    /**
     * Runs a job whose input format produces no splits and expects it to complete successfully.
     *
     * @throws Exception if job setup or execution throws
     */
    @Test
    public void testEmptyMaps() throws Exception {
        Job job = Job.getInstance();
        Path outputDir = this.getOutputPath();

        // Clear any output left behind by an earlier run.
        LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
        if (localFs.exists(outputDir)) {
            localFs.delete(outputDir, true);
        }

        job.setInputFormatClass(EmptyInputFormat.class);
        job.setNumReduceTasks(1);
        FileOutputFormat.setOutputPath(job, outputDir);

        boolean succeeded = job.waitForCompletion(true);
        TestLocalRunner.assertTrue("Empty job should work", succeeded);
    }

    /**
     * Resolves the directory holding the number files.
     *
     * @return the {@code numberfiles} directory beneath the multi-map input path
     */
    private Path getNumberDirPath() {
        Path parent = this.getInputPath();
        return new Path(parent, "numberfiles");
    }

    /**
     * Writes a file named {@code file<fileNum>} into the number-file directory containing the
     * decimal text of {@code value} and nothing else (no trailing newline).
     *
     * @param fileNum suffix of the file name
     * @param value number to store in the file
     * @return path of the file that was written
     * @throws IOException if the file cannot be created or written
     */
    private Path makeNumberFile(int fileNum, int value) throws IOException {
        Path workDir = this.getNumberDirPath();
        Path filePath = new Path(workDir, "file" + fileNum);
        Configuration conf = new Configuration();
        LocalFileSystem fs = FileSystem.getLocal(conf);
        FSDataOutputStream os = fs.create(filePath);
        BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
        try {
            w.write("" + value);
        } finally {
            // Close even when the write fails so the underlying stream is not leaked.
            w.close();
        }
        return filePath;
    }

    /**
     * Sums every integer line found in every file of the output directory and asserts that the
     * total equals {@code numMaps * (0 + 1 + ... + 99)}.
     *
     * @param numMaps number of mappers that contributed to the output
     * @throws Exception if an output file cannot be read or a line is not an integer
     */
    private void verifyNumberJob(int numMaps) throws Exception {
        Path outputDir = this.getOutputPath();
        Configuration conf = new Configuration();
        LocalFileSystem fs = FileSystem.getLocal(conf);
        FileStatus[] stats = fs.listStatus(outputDir);
        int valueSum = 0;
        for (FileStatus f : stats) {
            FSDataInputStream istream = fs.open(f.getPath());
            BufferedReader r = new BufferedReader(new InputStreamReader(istream));
            try {
                String line;
                while ((line = r.readLine()) != null) {
                    valueSum += Integer.parseInt(line.trim());
                }
            } finally {
                // Close even when a line fails to parse so the stream is not leaked.
                r.close();
            }
        }
        // Largest value counted per mapper; the expected per-mapper sum is 0 + 1 + ... + maxVal.
        int maxVal = 99;
        int expectedPerMapper = maxVal * (maxVal + 1) / 2;
        int expectedSum = expectedPerMapper * numMaps;
        LOG.info("expected sum: " + expectedSum + ", got " + valueSum);
        TestLocalRunner.assertEquals("Didn't get all our results back", expectedSum, valueSum);
    }

    /**
     * Runs the number job end to end: recreates the number-file directory with one file per map,
     * runs {@code SequenceMapper} with the requested reducer count and parallelism limits, and
     * verifies the summed output.
     *
     * @param numMaps number of input files to create (also passed to the output verification)
     * @param numReduces number of reduce tasks
     * @param parallelMaps maximum number of maps the local runner may run at once
     * @param parallelReduces maximum number of reduces the local runner may run at once
     * @throws Exception if setup, the job, or verification fails
     */
    private void doMultiReducerTest(int numMaps, int numReduces, int parallelMaps, int parallelReduces) throws Exception {
        Path inputDir = this.getNumberDirPath();
        Path outputDir = this.getOutputPath();
        LocalFileSystem localFs = FileSystem.getLocal(new Configuration());

        // Remove leftovers from earlier runs: output first, then input.
        if (localFs.exists(outputDir)) {
            localFs.delete(outputDir, true);
        }
        if (localFs.exists(inputDir)) {
            localFs.delete(inputDir, true);
        }

        int fileNum = 0;
        while (fileNum < numMaps) {
            this.makeNumberFile(fileNum, 100);
            fileNum++;
        }

        Job job = Job.getInstance();
        job.setNumReduceTasks(numReduces);
        job.setMapperClass(SequenceMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, inputDir);
        FileOutputFormat.setOutputPath(job, outputDir);
        LocalJobRunner.setLocalMaxRunningMaps(job, parallelMaps);
        LocalJobRunner.setLocalMaxRunningReduces(job, parallelReduces);

        boolean succeeded = job.waitForCompletion(true);
        TestLocalRunner.assertTrue("Job failed!!", succeeded);
        this.verifyNumberJob(numMaps);
    }

    /** Number job with one map and two reduces, at most one map and one reduce running at a time. */
    @Test
    public void testOneMapMultiReduce() throws Exception {
        final int numMaps = 1;
        final int numReduces = 2;
        final int parallelMaps = 1;
        final int parallelReduces = 1;
        this.doMultiReducerTest(numMaps, numReduces, parallelMaps, parallelReduces);
    }

    /** Number job with one map and two reduces, with both reduces allowed to run in parallel. */
    @Test
    public void testOneMapMultiParallelReduce() throws Exception {
        final int numMaps = 1;
        final int numReduces = 2;
        final int parallelMaps = 1;
        final int parallelReduces = 2;
        this.doMultiReducerTest(numMaps, numReduces, parallelMaps, parallelReduces);
    }

    /** Number job with four maps (two at a time) feeding a single reduce. */
    @Test
    public void testMultiMapOneReduce() throws Exception {
        final int numMaps = 4;
        final int numReduces = 1;
        final int parallelMaps = 2;
        final int parallelReduces = 1;
        this.doMultiReducerTest(numMaps, numReduces, parallelMaps, parallelReduces);
    }

    /** Number job with four maps and four reduces, two of each allowed to run in parallel. */
    @Test
    public void testMultiMapMultiReduce() throws Exception {
        final int numMaps = 4;
        final int numReduces = 4;
        final int parallelMaps = 2;
        final int parallelReduces = 2;
        this.doMultiReducerTest(numMaps, numReduces, parallelMaps, parallelReduces);
    }

    // Pre-computes the record count the multi-map job should emit: for every input file,
    // its number of input records times the number of records emitted per input record.
    static {
        // Iterate over the table itself rather than a hard-coded count so the total stays in
        // step with INPUT_SIZES; a shorter OUTPUT_SIZES fails loudly with an index error.
        for (int i = 0; i < INPUT_SIZES.length; ++i) {
            TOTAL_RECORDS += INPUT_SIZES[i] * OUTPUT_SIZES[i];
        }
    }

    /**
     * Mapper that parses each input value as an integer {@code max} and emits the keys
     * {@code "0"} through {@code "max - 1"}, each paired with {@link NullWritable}.
     */
    public static class SequenceMapper
    extends Mapper<LongWritable, Text, Text, NullWritable> {
        /**
         * Emits one {@link Text} key per integer in {@code [0, max)}.
         *
         * @param k input key (unused)
         * @param v input value holding the decimal text of {@code max}
         * @param c task context that receives the emitted records
         * @throws IOException if writing to the context fails
         * @throws InterruptedException if writing to the context is interrupted
         * @throws NumberFormatException if {@code v} is not a parsable integer
         */
        @Override
        public void map(LongWritable k, Text v, Mapper<LongWritable, Text, Text, NullWritable>.Context c) throws IOException, InterruptedException {
            // The context is fully parameterized so the writes below are type-checked.
            int max = Integer.parseInt(v.toString());
            for (int i = 0; i < max; ++i) {
                c.write(new Text("" + i), NullWritable.get());
            }
        }
    }

    /** Record reader that never yields a record: {@code nextKeyValue()} always returns false. */
    private static class EmptyRecordReader
    extends RecordReader<Object, Object> {
        private EmptyRecordReader() {
            // Nothing to set up.
        }

        /** No-op; this reader has no state to initialize. */
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) {
            // Intentionally empty.
        }

        /** @return always {@code false}, signalling that there are no records */
        @Override
        public boolean nextKeyValue() {
            return false;
        }

        /** @return a fresh placeholder object on every call */
        @Override
        public Object getCurrentKey() {
            Object placeholderKey = new Object();
            return placeholderKey;
        }

        /** @return a fresh placeholder object on every call */
        @Override
        public Object getCurrentValue() {
            Object placeholderValue = new Object();
            return placeholderValue;
        }

        /** @return always {@code 0.0f} */
        @Override
        public float getProgress() {
            return 0.0f;
        }

        /** No-op; there is nothing to release. */
        @Override
        public void close() {
            // Intentionally empty.
        }
    }

    /** Input format that reports no splits and hands out {@code EmptyRecordReader} instances. */
    private static class EmptyInputFormat
    extends InputFormat<Object, Object> {
        private EmptyInputFormat() {
            // Nothing to set up.
        }

        /** @return a new, empty, mutable list of splits */
        @Override
        public List<InputSplit> getSplits(JobContext context) {
            List<InputSplit> noSplits = new ArrayList<InputSplit>();
            return noSplits;
        }

        /** @return a new reader that never produces a record */
        @Override
        public RecordReader<Object, Object> createRecordReader(InputSplit split, TaskAttemptContext context) {
            RecordReader<Object, Object> reader = new EmptyRecordReader();
            return reader;
        }
    }

    /**
     * Mapper that builds and discards a list of 20000 boxed integers for every record, requests a
     * garbage collection, and emits the sum of those integers as the key with the input value.
     */
    private static class GCMapper
    extends Mapper<LongWritable, Text, LongWritable, Text> {
        private GCMapper() {
        }

        /**
         * Builds the throw-away list, sums it, drops it, calls {@code System.gc()}, and writes
         * {@code (sum, val)}.
         *
         * @param key input key (unused)
         * @param val input value, passed through unchanged
         * @param c task context that receives the emitted record
         * @throws IOException if writing to the context fails
         * @throws InterruptedException if writing to the context is interrupted
         */
        @Override
        public void map(LongWritable key, Text val, Mapper<LongWritable, Text, LongWritable, Text>.Context c) throws IOException, InterruptedException {
            ArrayList<Integer> lst = new ArrayList<Integer>();
            for (int i = 0; i < 20000; ++i) {
                lst.add(Integer.valueOf(i));
            }
            int sum = 0;
            for (int x : lst) {
                sum += x;
            }
            // Drop the only reference so the list is collectible before the explicit GC request.
            lst = null;
            System.gc();
            c.write(new LongWritable(sum), val);
        }
    }

    /** Reducer that emits, for each key, the number of values received for that key. */
    private static class CountingReducer
    extends Reducer<LongWritable, Text, LongWritable, LongWritable> {
        private CountingReducer() {
        }

        /**
         * Counts the values for {@code key} and writes {@code (key, count)}.
         *
         * @param key key being reduced
         * @param vals values grouped under {@code key}; only their number matters
         * @param context task context that receives the emitted record
         * @throws IOException if writing to the context fails
         * @throws InterruptedException if writing to the context is interrupted
         */
        @Override
        public void reduce(LongWritable key, Iterable<Text> vals, Reducer<LongWritable, Text, LongWritable, LongWritable>.Context context) throws IOException, InterruptedException {
            long out = 0L;
            for (Text ignored : vals) {
                // The value itself is irrelevant; each one just adds to the count.
                ++out;
            }
            context.write(key, new LongWritable(out));
        }
    }

    /**
     * Mapper that, for every input record, emits {@code OUTPUT_SIZES[threadId]} copies of the
     * value under the key {@code 0}, pausing briefly at intervals taken from
     * {@code SLEEP_INTERVALS[threadId]}. {@code threadId} is the numeric name of the input file.
     */
    private static class StressMapper
    extends Mapper<LongWritable, Text, LongWritable, Text> {
        /** Index into the size and sleep tables, parsed from the input file name in {@code setup}. */
        private int threadId;
        /** Only read by {@code cleanup} for its debug message; nothing visible here writes it. */
        public long exposedState;

        private StressMapper() {
        }

        /**
         * Derives {@code threadId} from the name of the file backing this task's input split.
         *
         * @param context task context; its input split is cast to {@code FileSplit}
         * @throws NumberFormatException if the file name is not a parsable integer
         */
        @Override
        protected void setup(Mapper<LongWritable, Text, LongWritable, Text>.Context context) {
            FileSplit split = (FileSplit)context.getInputSplit();
            Path filePath = split.getPath();
            String name = filePath.getName();
            this.threadId = Integer.parseInt(name);
            LOG.info("Thread " + this.threadId + " : " + context.getInputSplit());
        }

        /**
         * Emits the configured number of {@code (0, val)} records for this input record.
         *
         * @param key input key (unused)
         * @param val input value, emitted repeatedly
         * @param c task context that receives the emitted records
         * @throws IOException if writing to the context fails
         * @throws InterruptedException if a write or the sleep is interrupted
         */
        @Override
        public void map(LongWritable key, Text val, Mapper<LongWritable, Text, LongWritable, Text>.Context c) throws IOException, InterruptedException {
            for (int i = 0; i < OUTPUT_SIZES[this.threadId]; ++i) {
                c.write(new LongWritable(0L), val);
                // Pause for 1 ms whenever the emit index is 1 past a multiple of the interval.
                if (i % SLEEP_INTERVALS[this.threadId] == 1) {
                    Thread.sleep(1L);
                }
            }
        }

        /**
         * Logs {@code exposedState} at debug level.
         *
         * @param context task context (unused)
         */
        @Override
        protected void cleanup(Mapper<LongWritable, Text, LongWritable, Text>.Context context) {
            LOG.debug("Busy loop counter: " + this.exposedState);
        }
    }
}

