/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.lib.join;

import java.io.IOException;
import junit.extensions.TestSetup;
import junit.framework.Assert;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.Utils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;
import org.apache.hadoop.mapreduce.lib.join.TupleWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class TestJoinDatamerge
extends TestCase {
    private static MiniDFSCluster cluster = null;

    /**
     * Builds the suite for this class, wrapped in a {@link TestSetup} decorator
     * whose {@code setUp} starts a two-datanode {@link MiniDFSCluster} and whose
     * {@code tearDown} shuts it down again. The cluster is stored in the static
     * {@code cluster} field so every test method can reach it.
     *
     * @return the decorated test suite
     */
    public static Test suite() {
        Test allTests = new TestSuite(TestJoinDatamerge.class);
        return new TestSetup(allTests) {

            protected void setUp() throws Exception {
                MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new Configuration());
                cluster = builder.numDataNodes(2).build();
            }

            protected void tearDown() throws Exception {
                // Nothing to stop if setUp never got as far as creating the cluster.
                if (cluster == null) {
                    return;
                }
                cluster.shutdown();
            }
        };
    }

    /**
     * Fills {@code src} with {@code srcs} paths under {@code testdir} and opens
     * one {@code IntWritable -> IntWritable} sequence-file writer per path.
     *
     * <p>File names are {@code Integer.toString(i + 10, 36)}, i.e. "a", "b",
     * "c", ... for i = 0, 1, 2, ...
     *
     * <p>If opening any writer fails, the writers that were already opened are
     * closed (best effort) before the failure is propagated, so a partial
     * failure does not leak open files.
     *
     * @param testdir directory in which the source files are created
     * @param conf    configuration used to resolve the file system and writers
     * @param srcs    number of source files to create
     * @param src     output array, at least {@code srcs} long, receiving the paths
     * @return the opened writers, index-aligned with {@code src}
     * @throws IOException if a writer cannot be created
     */
    private static SequenceFile.Writer[] createWriters(Path testdir, Configuration conf, int srcs, Path[] src) throws IOException {
        for (int i = 0; i < srcs; ++i) {
            src[i] = new Path(testdir, Integer.toString(i + 10, 36));
        }
        SequenceFile.Writer[] out = new SequenceFile.Writer[srcs];
        boolean allOpened = false;
        try {
            for (int i = 0; i < srcs; ++i) {
                out[i] = new SequenceFile.Writer(testdir.getFileSystem(conf), conf, src[i], IntWritable.class, IntWritable.class);
            }
            allOpened = true;
            return out;
        } finally {
            if (!allOpened) {
                for (SequenceFile.Writer opened : out) {
                    if (opened == null) {
                        continue;
                    }
                    try {
                        opened.close();
                    } catch (IOException ignored) {
                        // Best-effort cleanup: the original failure is the one worth reporting.
                    }
                }
            }
        }
    }

    /**
     * Creates {@code srcs} sequence files under {@code testdir} and fills them
     * with the data set the simple join checkers expect.
     *
     * <p>For each {@code k} in {@code [0, 2 * srcs]} and each source {@code i}:
     * the key is {@code k * srcs} when {@code k} is a multiple of {@code srcs}
     * (so every source shares that key) and {@code k * srcs + i} otherwise; the
     * value is {@code 10 * k + i}. When {@code i == k} the record is written a
     * second time, producing a duplicate in that source.
     *
     * @param testdir directory in which the source files are created
     * @param conf    configuration passed through to the writers
     * @param srcs    number of source files
     * @return the paths of the files that were written
     * @throws IOException if creating, appending to, or closing a writer fails
     */
    private static Path[] writeSimpleSrc(Path testdir, Configuration conf, int srcs) throws IOException {
        final Path[] src = new Path[srcs];
        SequenceFile.Writer[] out = null;
        try {
            out = createWriters(testdir, conf, srcs, src);
            final int capacity = srcs * 2 + 1;
            final IntWritable key = new IntWritable();
            final IntWritable val = new IntWritable();
            for (int k = 0; k < capacity; ++k) {
                for (int i = 0; i < srcs; ++i) {
                    int keyValue = k * srcs;
                    if (k % srcs != 0) {
                        keyValue += i;
                    }
                    key.set(keyValue);
                    val.set(10 * k + i);
                    out[i].append(key, val);
                    if (i == k) {
                        // Duplicate record on the "diagonal".
                        out[i].append(key, val);
                    }
                }
            }
        } finally {
            if (out != null) {
                for (int i = 0; i < srcs; ++i) {
                    if (out[i] != null) {
                        out[i].close();
                    }
                }
            }
        }
        return src;
    }

    /**
     * Formats a key/value pair as {@code (key,value)} for assertion messages.
     *
     * @param key the record key
     * @param val the record value
     * @return the pair rendered as {@code "(" + key + "," + val + ")"}
     */
    private static String stringify(IntWritable key, Writable val) {
        return "(" + key + "," + val + ")";
    }

    /**
     * Runs a join job of the given type over the simple source data set and
     * checks it with the supplied mapper/reducer checkers.
     *
     * <p>Writes the sources under {@code /<jointype>} on the mini cluster,
     * configures a {@link CompositeInputFormat} join expression over them, runs
     * the job to completion and asserts that it succeeded. For the
     * {@code "outer"} join the job output is additionally cross-checked against
     * the source files. The working directory is deleted afterwards.
     *
     * @param jointype join operator name used in the join expression
     * @param map      mapper class that validates each joined tuple
     * @param reduce   reducer class that validates per-key occurrence counts
     * @throws Exception if writing the sources or running the job fails
     */
    private static void joinAs(String jointype, Class<? extends SimpleCheckerMapBase<?>> map, Class<? extends SimpleCheckerReduceBase> reduce) throws Exception {
        // Single source of truth: the number of files written and the value the
        // checkers read from "testdatamerge.sources" must always agree.
        final int srcs = 4;
        Configuration conf = new Configuration();
        Path base = cluster.getFileSystem().makeQualified(new Path("/" + jointype));
        Path[] src = writeSimpleSrc(base, conf, srcs);
        conf.set("mapreduce.join.expr", CompositeInputFormat.compose(jointype, SequenceFileInputFormat.class, src));
        conf.setInt("testdatamerge.sources", srcs);
        Job job = Job.getInstance(conf);
        job.setInputFormatClass(CompositeInputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(base, "out"));
        job.setMapperClass(map);
        job.setReducerClass(reduce);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        job.waitForCompletion(true);
        assertTrue("Job failed", job.isSuccessful());
        if ("outer".equals(jointype)) {
            checkOuterConsistency(job, src);
        }
        base.getFileSystem(conf).delete(base, true);
    }

    /**
     * Runs the simple data set through an {@code inner} join and validates it
     * with the inner-join map and reduce checkers.
     */
    public void testSimpleInnerJoin() throws Exception {
        joinAs("inner", InnerJoinMapChecker.class, InnerJoinReduceChecker.class);
    }

    /**
     * Runs the simple data set through an {@code outer} join and validates it
     * with the outer-join map and reduce checkers.
     */
    public void testSimpleOuterJoin() throws Exception {
        joinAs("outer", OuterJoinMapChecker.class, OuterJoinReduceChecker.class);
    }

    /**
     * Cross-checks the output of an outer-join job against its source files.
     *
     * <p>Asserts that the job produced exactly one non-empty output file, then
     * for every {@code (key, count)} record in it asserts that {@code count}
     * equals {@link #countProduct} of that key over {@code src}.
     *
     * <p>The reader is closed in a {@code finally} block so it is released even
     * when an assertion fails partway through the file.
     *
     * @param job the completed job whose output is inspected
     * @param src the source files the job joined
     * @throws IOException if listing or reading the files fails
     */
    private static void checkOuterConsistency(Job job, Path[] src) throws IOException {
        Path outf = FileOutputFormat.getOutputPath(job);
        FileStatus[] outlist = cluster.getFileSystem().listStatus(outf, (PathFilter)new Utils.OutputFileUtils.OutputFilesFilter());
        assertEquals("number of part files is more than 1. It is" + outlist.length, 1, outlist.length);
        assertTrue("output file with zero length" + outlist[0].getLen(), 0L < outlist[0].getLen());
        SequenceFile.Reader r = new SequenceFile.Reader(cluster.getFileSystem(), outlist[0].getPath(), job.getConfiguration());
        try {
            IntWritable k = new IntWritable();
            IntWritable v = new IntWritable();
            while (r.next(k, v)) {
                assertEquals("counts does not match", v.get(), countProduct(k, src, job.getConfiguration()));
            }
        } finally {
            r.close();
        }
    }

    /**
     * Computes the product, over all sources that contain {@code key}, of the
     * number of records with that key in each source.
     *
     * <p>Sources in which the key does not occur are skipped rather than
     * multiplying by zero, so the result is {@code 1} when no source contains
     * the key.
     *
     * <p>Each reader is closed in a {@code finally} block so a read failure
     * does not leak the open file.
     *
     * @param key  key to look for
     * @param src  source sequence files to scan in full
     * @param conf configuration used to open the readers
     * @return the product of the non-zero per-source occurrence counts
     * @throws IOException if a source file cannot be read
     */
    private static int countProduct(IntWritable key, Path[] src, Configuration conf) throws IOException {
        int product = 1;
        for (Path p : src) {
            int count = 0;
            SequenceFile.Reader r = new SequenceFile.Reader(cluster.getFileSystem(), p, conf);
            try {
                IntWritable k = new IntWritable();
                IntWritable v = new IntWritable();
                while (r.next(k, v)) {
                    if (k.equals(key)) {
                        ++count;
                    }
                }
            } finally {
                r.close();
            }
            if (count != 0) {
                product *= count;
            }
        }
        return product;
    }

    /**
     * Runs the simple data set through an {@code override} join and validates
     * it with the override map and reduce checkers.
     */
    public void testSimpleOverride() throws Exception {
        joinAs("override", OverrideMapChecker.class, OverrideReduceChecker.class);
    }

    /**
     * Exercises a nested join expression of the form
     * {@code outer(inner(S0,S1,S2),outer(F,S0,S1,S2,F))}, where {@code Si} are
     * real sequence-file sources and {@code F} are {@code Fake_IF} inputs.
     *
     * <p>Source {@code i} holds the keys {@code (i + 2) * (j + 1)} for
     * {@code j} in {@code [0, ITEMS)}, each with value {@code i}. The map-only
     * job writes the joined tuples, which are then read back and checked:
     * the fake inputs never contribute, each real source contributes exactly
     * when it contains the key, and the inner tuple is present only when all
     * sources contain the key.
     *
     * <p>Writers and the result reader are closed in {@code finally} blocks so
     * they are released even when an append or an assertion fails.
     */
    public void testNestedJoin() throws Exception {
        final int SOURCES = 3;
        final int ITEMS = 16;
        Configuration conf = new Configuration();
        Path base = cluster.getFileSystem().makeQualified(new Path("/nested"));
        int[][] source = new int[SOURCES][];
        for (int i = 0; i < SOURCES; ++i) {
            source[i] = new int[ITEMS];
            for (int j = 0; j < ITEMS; ++j) {
                source[i][j] = (i + 2) * (j + 1);
            }
        }
        Path[] src = new Path[SOURCES];
        SequenceFile.Writer[] out = createWriters(base, conf, SOURCES, src);
        IntWritable k = new IntWritable();
        try {
            for (int i = 0; i < SOURCES; ++i) {
                IntWritable v = new IntWritable();
                v.set(i);
                for (int j = 0; j < ITEMS; ++j) {
                    k.set(source[i][j]);
                    out[i].append(k, v);
                }
                out[i].close();
                // Mark as closed so the cleanup below does not close it twice.
                out[i] = null;
            }
        } finally {
            for (SequenceFile.Writer w : out) {
                if (w != null) {
                    w.close();
                }
            }
        }
        out = null;

        // Build: outer(inner(S0,..,Sn-1),outer(F,S0,..,Sn-1,F))
        StringBuilder sb = new StringBuilder();
        sb.append("outer(inner(");
        for (int i = 0; i < SOURCES; ++i) {
            sb.append(CompositeInputFormat.compose(SequenceFileInputFormat.class, src[i].toString()));
            if (i + 1 != SOURCES) {
                sb.append(",");
            }
        }
        sb.append("),outer(");
        sb.append(CompositeInputFormat.compose(MapReduceTestUtil.Fake_IF.class, "foobar"));
        sb.append(",");
        for (int i = 0; i < SOURCES; ++i) {
            sb.append(CompositeInputFormat.compose(SequenceFileInputFormat.class, src[i].toString()));
            sb.append(",");
        }
        sb.append(CompositeInputFormat.compose(MapReduceTestUtil.Fake_IF.class, "raboof") + "))");
        conf.set("mapreduce.join.expr", sb.toString());
        MapReduceTestUtil.Fake_IF.setKeyClass(conf, IntWritable.class);
        MapReduceTestUtil.Fake_IF.setValClass(conf, IntWritable.class);

        Job job = Job.getInstance(conf);
        Path outf = new Path(base, "out");
        FileOutputFormat.setOutputPath(job, outf);
        job.setInputFormatClass(CompositeInputFormat.class);
        job.setMapperClass(Mapper.class);
        job.setReducerClass(Reducer.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(TupleWritable.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.waitForCompletion(true);
        assertTrue("Job failed", job.isSuccessful());

        FileStatus[] outlist = cluster.getFileSystem().listStatus(outf, (PathFilter)new Utils.OutputFileUtils.OutputFilesFilter());
        assertEquals(1, outlist.length);
        assertTrue(0L < outlist[0].getLen());
        SequenceFile.Reader r = new SequenceFile.Reader(cluster.getFileSystem(), outlist[0].getPath(), conf);
        try {
            TupleWritable v = new TupleWritable();
            while (r.next(k, v)) {
                // Positions 0 and SOURCES + 1 of the second outer join are the fake inputs.
                assertFalse(((TupleWritable)v.get(1)).has(0));
                assertFalse(((TupleWritable)v.get(1)).has(SOURCES + 1));
                boolean chk = true;
                int ki = k.get();
                // i is the multiplier (sourceIndex + 2); source i - 2 contains ki
                // exactly when ki is a multiple of i no larger than i * ITEMS.
                for (int i = 2; i < SOURCES + 2; ++i) {
                    if (ki % i == 0 && ki <= i * ITEMS) {
                        assertEquals(i - 2, ((IntWritable)((TupleWritable)v.get(1)).get(i - 1)).get());
                    } else {
                        chk = false;
                    }
                }
                if (chk) {
                    // Key present in every source: the inner join must be fully populated.
                    assertTrue(v.has(0));
                    for (int i = 0; i < SOURCES; ++i) {
                        assertTrue(((TupleWritable)v.get(0)).has(i));
                    }
                } else {
                    // Key missing from at least one source: no inner-join tuple.
                    assertFalse(v.has(0));
                }
            }
        } finally {
            r.close();
        }
        base.getFileSystem(conf).delete(base, true);
    }

    /**
     * Runs an {@code outer} join over three {@code Fake_IF} inputs keyed by
     * {@code IncomparableKey} and asserts only that the job completes
     * successfully; no output content is inspected.
     */
    public void testEmptyJoin() throws Exception {
        final Configuration conf = new Configuration();
        final Path base = cluster.getFileSystem().makeQualified(new Path("/empty"));
        // NOTE(review): "i1" and "i2" are relative paths while "i0" is under base;
        // presumably irrelevant because Fake_IF is used, but worth confirming.
        final Path[] src = {
            new Path(base, "i0"),
            new Path("i1"),
            new Path("i2"),
        };
        final String joinExpr = CompositeInputFormat.compose("outer", MapReduceTestUtil.Fake_IF.class, src);
        conf.set("mapreduce.join.expr", joinExpr);
        MapReduceTestUtil.Fake_IF.setKeyClass(conf, MapReduceTestUtil.IncomparableKey.class);

        final Job job = Job.getInstance(conf);
        job.setInputFormatClass(CompositeInputFormat.class);
        final Path outDir = new Path(base, "out");
        FileOutputFormat.setOutputPath(job, outDir);
        job.setMapperClass(Mapper.class);
        job.setReducerClass(Reducer.class);
        job.setOutputKeyClass(MapReduceTestUtil.IncomparableKey.class);
        job.setOutputValueClass(NullWritable.class);

        job.waitForCompletion(true);
        assertTrue(job.isSuccessful());
        base.getFileSystem(conf).delete(base, true);
    }

    /**
     * Reduce-side occurrence checker used by the {@code override} join test.
     */
    private static class OverrideReduceChecker
    extends SimpleCheckerReduceBase {
        private OverrideReduceChecker() {
        }

        /**
         * Expects two occurrences for non-zero keys below {@code srcs * srcs}
         * that are multiples of {@code srcs + 1}, and one occurrence for every
         * other key.
         *
         * @param key the reduced key
         * @param occ the number of occurrences counted for that key
         * @return {@code true} if {@code occ} matches the expected count
         */
        @Override
        public boolean verify(int key, int occ) {
            final boolean duplicated = key < this.srcs * this.srcs
                    && key % (this.srcs + 1) == 0
                    && key != 0;
            final int expected = duplicated ? 2 : 1;
            return expected == occ;
        }
    }

    /**
     * Map-side tuple checker used by the {@code override} join test. Receives
     * a single {@code IntWritable} value per key and validates it against the
     * formula the simple data set was generated with.
     */
    private static class OverrideMapChecker
    extends SimpleCheckerMapBase<IntWritable> {
        private OverrideMapChecker() {
        }

        /**
         * Asserts the value is the expected one for the key, then emits
         * {@code (key, 1)}.
         *
         * <p>For keys that are multiples of {@code srcs * srcs} the value must
         * equal {@code key * 10 / srcs + srcs - 1}; for other keys, with
         * {@code i = key % srcs}, it must satisfy
         * {@code srcs * (val - i) == 10 * (key - i)}.
         */
        @Override
        public void map(IntWritable key, IntWritable val, Mapper.Context context) throws IOException, InterruptedException {
            final int keyInt = key.get();
            final int valInt = val.get();
            final String message = "Unexpected tuple: " + TestJoinDatamerge.stringify(key, val);
            final boolean asExpected;
            if (keyInt % (this.srcs * this.srcs) == 0) {
                asExpected = valInt == keyInt * 10 / this.srcs + this.srcs - 1;
            } else {
                final int srcIndex = keyInt % this.srcs;
                asExpected = this.srcs * (valInt - srcIndex) == 10 * (keyInt - srcIndex);
            }
            Assert.assertTrue(message, asExpected);
            context.write(key, one);
            // Clobber the inputs after emitting; presumably to show that user-side
            // mutation does not disturb the rest of the join — confirm intent.
            key.set(-1);
            val.set(0);
        }
    }

    /**
     * Reduce-side occurrence checker used by the {@code outer} join test.
     */
    private static class OuterJoinReduceChecker
    extends SimpleCheckerReduceBase {
        private OuterJoinReduceChecker() {
        }

        /**
         * Expects two occurrences for keys below {@code srcs * srcs} that are
         * multiples of {@code srcs + 1} (zero included), and one occurrence
         * for every other key.
         *
         * @param key the reduced key
         * @param occ the number of occurrences counted for that key
         * @return {@code true} if {@code occ} matches the expected count
         */
        @Override
        public boolean verify(int key, int occ) {
            final boolean duplicated = key < this.srcs * this.srcs
                    && key % (this.srcs + 1) == 0;
            final int expected = duplicated ? 2 : 1;
            return expected == occ;
        }
    }

    /**
     * Map-side tuple checker used by the {@code outer} join test.
     */
    private static class OuterJoinMapChecker
    extends SimpleCheckerMapBase<TupleWritable> {
        private OuterJoinMapChecker() {
        }

        /**
         * Validates the joined tuple for {@code key}, then emits {@code (key, 1)}.
         *
         * <p>When the key is a multiple of {@code srcs * srcs}, every position
         * {@code i} must hold an {@code IntWritable} with
         * {@code (val - i) * srcs == 10 * key}. Otherwise only position
         * {@code key % srcs} may be populated, and it must satisfy
         * {@code srcs * (val - i) == 10 * (key - i)}; all other positions must
         * be absent.
         */
        @Override
        public void map(IntWritable key, TupleWritable val, Mapper.Context context) throws IOException, InterruptedException {
            final int keyInt = key.get();
            final String message = "Unexpected tuple: " + TestJoinDatamerge.stringify(key, val);
            if (keyInt % (this.srcs * this.srcs) == 0) {
                // Key shared by all sources: every slot is filled.
                for (int pos = 0; pos < val.size(); ++pos) {
                    Writable element = val.get(pos);
                    Assert.assertTrue(message, element instanceof IntWritable);
                    int elementInt = ((IntWritable)element).get();
                    Assert.assertTrue(message, (elementInt - pos) * this.srcs == 10 * keyInt);
                }
            } else {
                // Key owned by a single source: only that slot is filled.
                for (int pos = 0; pos < val.size(); ++pos) {
                    if (pos != keyInt % this.srcs) {
                        Assert.assertFalse(message, val.has(pos));
                    } else {
                        Writable element = val.get(pos);
                        Assert.assertTrue(message, element instanceof IntWritable);
                        int elementInt = ((IntWritable)element).get();
                        Assert.assertTrue(message, this.srcs * (elementInt - pos) == 10 * (keyInt - pos));
                    }
                }
            }
            context.write(key, one);
            // Clobber the inputs after emitting; presumably to show that user-side
            // mutation does not disturb the rest of the join — confirm intent.
            key.set(-1);
            if (val.has(0)) {
                ((IntWritable)val.get(0)).set(0);
            }
        }
    }

    /**
     * Reduce-side occurrence checker used by the {@code inner} join test.
     */
    private static class InnerJoinReduceChecker
    extends SimpleCheckerReduceBase {
        private InnerJoinReduceChecker() {
        }

        /**
         * Key {@code 0} must occur twice; any other key must be a multiple of
         * {@code srcs * srcs} and occur exactly once.
         *
         * @param key the reduced key
         * @param occ the number of occurrences counted for that key
         * @return {@code true} if the key/count pair is acceptable
         */
        @Override
        public boolean verify(int key, int occ) {
            if (key == 0) {
                return occ == 2;
            }
            return key % (this.srcs * this.srcs) == 0 && occ == 1;
        }
    }

    /**
     * Map-side tuple checker used by the {@code inner} join test.
     */
    private static class InnerJoinMapChecker
    extends SimpleCheckerMapBase<TupleWritable> {
        private InnerJoinMapChecker() {
        }

        /**
         * Asserts that the key is a multiple of {@code srcs * srcs} and that
         * every position {@code i} of the tuple holds a value with
         * {@code (val - i) * srcs == 10 * key}, then emits {@code (key, 1)}.
         */
        @Override
        public void map(IntWritable key, TupleWritable val, Mapper.Context context) throws IOException, InterruptedException {
            final int keyInt = key.get();
            final String message = "Unexpected tuple: " + TestJoinDatamerge.stringify(key, val);
            final boolean sharedKey = keyInt % (this.srcs * this.srcs) == 0;
            Assert.assertTrue(message, sharedKey);
            for (int pos = 0; pos < val.size(); ++pos) {
                IntWritable element = (IntWritable)val.get(pos);
                int elementInt = element.get();
                Assert.assertTrue(message, (elementInt - pos) * this.srcs == 10 * keyInt);
            }
            context.write(key, one);
            // Clobber the inputs after emitting; presumably to show that user-side
            // mutation does not disturb the rest of the join — confirm intent.
            key.set(-1);
            if (val.has(0)) {
                ((IntWritable)val.get(0)).set(0);
            }
        }
    }

    /**
     * Base reducer for the simple join tests. Sums the values seen for each
     * key, asks the subclass whether that total is acceptable, and writes
     * {@code (key, total)}.
     */
    private static abstract class SimpleCheckerReduceBase
    extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        protected static final IntWritable one = new IntWritable(1);
        /** Number of join sources, read from {@code testdatamerge.sources} in {@link #setup}. */
        int srcs;

        private SimpleCheckerReduceBase() {
        }

        /**
         * Reads the source count from the job configuration and asserts that
         * it is positive.
         */
        @Override
        public void setup(Reducer.Context context) {
            Configuration jobConf = context.getConfiguration();
            this.srcs = jobConf.getInt("testdatamerge.sources", 0);
            Assert.assertTrue("Invalid src count: " + this.srcs, this.srcs > 0);
        }

        /**
         * Adds up all values for {@code key}, asserts the total via
         * {@link #verify}, and emits the key with that total.
         */
        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values, Reducer.Context context) throws IOException, InterruptedException {
            int total = 0;
            for (IntWritable each : values) {
                total += each.get();
            }
            final String message = "Bad count for " + key.get();
            final boolean acceptable = this.verify(key.get(), total);
            Assert.assertTrue(message, acceptable);
            context.write(key, new IntWritable(total));
        }

        /**
         * Decides whether a key was seen the expected number of times.
         *
         * @param var1 the key
         * @param var2 the summed occurrence count for that key
         * @return {@code true} if the count is what the join type should produce
         */
        public abstract boolean verify(int var1, int var2);
    }

    /**
     * Base mapper for the simple join tests. Provides the shared {@code one}
     * output value and loads the source count that subclasses use when
     * validating tuples.
     *
     * @param <V> the joined value type delivered to the mapper
     */
    private static abstract class SimpleCheckerMapBase<V extends Writable>
    extends Mapper<IntWritable, V, IntWritable, IntWritable> {
        protected static final IntWritable one = new IntWritable(1);
        /** Number of join sources, read from {@code testdatamerge.sources} in {@link #setup}. */
        int srcs;

        private SimpleCheckerMapBase() {
        }

        /**
         * Reads the source count from the job configuration and asserts that
         * it is positive.
         */
        @Override
        public void setup(Mapper.Context context) {
            Configuration jobConf = context.getConfiguration();
            this.srcs = jobConf.getInt("testdatamerge.sources", 0);
            Assert.assertTrue("Invalid src count: " + this.srcs, this.srcs > 0);
        }
    }
}

