/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.lib.input;

import java.io.BufferedReader;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.HadoopTestCase;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link MultipleInputs}.
 *
 * <p>{@link #testDoMultipleInputs()} runs a real job that reads two input
 * directories, each with its own input format and mapper, and checks the
 * reducer output. The two {@code testAddInputPath*} tests only verify that the
 * per-path input format / mapper registrations can be read back from the job.
 */
public class TestMultipleInputs
extends HadoopTestCase {
    private static final Path ROOT_DIR = new Path("testing/mo");
    private static final Path IN1_DIR = new Path(ROOT_DIR, "input1");
    private static final Path IN2_DIR = new Path(ROOT_DIR, "input2");
    private static final Path OUT_DIR = new Path(ROOT_DIR, "output");
    /** Constant value emitted by both mappers; only the keys matter to the reducer. */
    static final Text blah = new Text("blah");

    /**
     * Configures the {@link HadoopTestCase} harness.
     *
     * @throws IOException if the superclass constructor fails
     */
    public TestMultipleInputs() throws IOException {
        // NOTE(review): the literals are presumably inlined HadoopTestCase mode
        // constants (local MR / local FS) followed by the task-tracker and
        // data-node counts -- confirm against HadoopTestCase before replacing them.
        super(1, 4, 1, 1);
    }

    /**
     * Resolves a test directory for the file system in use.
     *
     * @param dir the relative test directory
     * @return {@code dir} unchanged when not running on the local file system;
     *         otherwise {@code dir} re-rooted under the {@code test.build.data}
     *         system property (default {@code /tmp}), with spaces in that root
     *         replaced by {@code '+'}
     */
    private Path getDir(Path dir) {
        if (this.isLocalFS()) {
            String localPathRoot = System.getProperty("test.build.data", "/tmp").replace(' ', '+');
            dir = new Path(localPathRoot, dir);
        }
        return dir;
    }

    /**
     * Recreates an empty test root containing the two input directories.
     *
     * @throws Exception if the superclass set-up fails or a directory cannot be created
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        Path rootDir = this.getDir(ROOT_DIR);
        Path in1Dir = this.getDir(IN1_DIR);
        Path in2Dir = this.getDir(IN2_DIR);
        JobConf conf = this.createJobConf();
        FileSystem fs = FileSystem.get(conf);
        // Start from a clean slate; the result is ignored because the
        // directory may simply not exist yet.
        fs.delete(rootDir, true);
        if (!fs.mkdirs(in1Dir)) {
            throw new IOException("Mkdirs failed to create " + in1Dir.toString());
        }
        if (!fs.mkdirs(in2Dir)) {
            throw new IOException("Mkdirs failed to create " + in2Dir.toString());
        }
    }

    /**
     * Writes {@code contents} to {@code file}, always closing the stream.
     *
     * @param fs       file system to create the file on
     * @param file     path of the file to create
     * @param contents text to write via {@code writeBytes}
     * @throws IOException if the file cannot be created or written
     */
    private static void writeFile(FileSystem fs, Path file, String contents) throws IOException {
        FSDataOutputStream out = fs.create(file);
        try {
            out.writeBytes(contents);
        }
        finally {
            out.close();
        }
    }

    /**
     * Runs a job over two inputs with different formats and mappers and checks
     * that every key {@code a}..{@code e} is seen exactly twice by the reducer
     * (once from each input).
     *
     * @throws IOException on file system or job submission failure
     */
    @Test
    public void testDoMultipleInputs() throws IOException {
        Path in1Dir = this.getDir(IN1_DIR);
        Path in2Dir = this.getDir(IN2_DIR);
        Path outDir = this.getDir(OUT_DIR);
        JobConf conf = this.createJobConf();
        FileSystem fs = FileSystem.get(conf);
        fs.delete(outDir, true);

        // Input 1: plain lines, read with TextInputFormat (the line is the value).
        writeFile(fs, new Path(in1Dir, "part-0"), "a\nb\nc\nd\ne");
        // Input 2: tab-separated key/value lines, read with KeyValueTextInputFormat.
        writeFile(fs, new Path(in2Dir, "part-0"), "a\tblah\nb\tblah\nc\tblah\nd\tblah\ne\tblah");

        Job job = Job.getInstance(conf);
        job.setJobName("mi");
        MultipleInputs.addInputPath(job, in1Dir, TextInputFormat.class, MapClass.class);
        MultipleInputs.addInputPath(job, in2Dir, KeyValueTextInputFormat.class, KeyValueMapClass.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setReducerClass(ReducerClass.class);
        FileOutputFormat.setOutputPath(job, outDir);

        boolean success;
        try {
            success = job.waitForCompletion(true);
        }
        catch (InterruptedException ie) {
            // Restore the interrupt flag so code further up the stack can see it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(ie);
        }
        catch (ClassNotFoundException cnfe) {
            throw new RuntimeException(cnfe);
        }
        if (!success) {
            throw new RuntimeException("Job failed!");
        }

        BufferedReader output = new BufferedReader(
                new InputStreamReader(fs.open(new Path(outDir, "part-r-00000")), "UTF-8"));
        try {
            // assertEquals reports the actual line on mismatch and fails cleanly
            // (instead of throwing NullPointerException) when the output is too short.
            assertEquals("a 2", output.readLine());
            assertEquals("b 2", output.readLine());
            assertEquals("c 2", output.readLine());
            assertEquals("d 2", output.readLine());
            assertEquals("e 2", output.readLine());
        }
        finally {
            output.close();
        }
    }

    /**
     * Verifies that input formats registered per path can be read back.
     *
     * @throws IOException if the job cannot be instantiated
     */
    @Test
    public void testAddInputPathWithFormat() throws IOException {
        Job job = Job.getInstance();
        MultipleInputs.addInputPath(job, new Path("/foo"), TextInputFormat.class);
        MultipleInputs.addInputPath(job, new Path("/bar"), KeyValueTextInputFormat.class);
        Map<Path, InputFormat> inputs = MultipleInputs.getInputFormatMap(job);
        assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
        assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar")).getClass());
    }

    /**
     * Verifies that input formats and mappers registered per path can both be
     * read back.
     *
     * @throws IOException if the job cannot be instantiated
     */
    @Test
    public void testAddInputPathWithMapper() throws IOException {
        Job job = Job.getInstance();
        MultipleInputs.addInputPath(job, new Path("/foo"), TextInputFormat.class, MapClass.class);
        MultipleInputs.addInputPath(job, new Path("/bar"), KeyValueTextInputFormat.class, KeyValueMapClass.class);
        Map<Path, InputFormat> inputs = MultipleInputs.getInputFormatMap(job);
        Map<Path, Class<? extends Mapper>> maps = MultipleInputs.getMapperTypeMap(job);
        assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
        assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar")).getClass());
        assertEquals(MapClass.class, maps.get(new Path("/foo")));
        assertEquals(KeyValueMapClass.class, maps.get(new Path("/bar")));
    }

    /**
     * Counts the values received for each key and writes a single line
     * {@code "<key> <count>"} with a {@link NullWritable} output key.
     */
    static class ReducerClass
    extends Reducer<Text, Text, NullWritable, Text> {
        /** Number of values seen for the key currently being reduced. */
        int count = 0;

        /**
         * @param key    the grouped map output key
         * @param values all values emitted for {@code key}; only their number is used
         * @param ctx    context the result line is written to
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Context ctx) throws IOException, InterruptedException {
            this.count = 0;
            for (Text ignored : values) {
                ++this.count;
            }
            ctx.write(NullWritable.get(), new Text(key.toString() + " " + this.count));
        }
    }

    /**
     * Mapper for key/value text input: forwards the incoming key paired with
     * the constant {@link #blah} value; the incoming value is discarded.
     */
    static class KeyValueMapClass
    extends Mapper<Text, Text, Text, Text> {
        @Override
        public void map(Text key, Text value, Context ctx) throws IOException, InterruptedException {
            ctx.write(key, blah);
        }
    }

    /**
     * Mapper for plain text input: uses the incoming value as the output key,
     * paired with the constant {@link #blah} value; the incoming key is discarded.
     */
    static class MapClass
    extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context ctx) throws IOException, InterruptedException {
            ctx.write(value, blah);
        }
    }
}

