/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.client.mapreduce;

import com.hazelcast.client.mapreduce.AbstractClientMapReduceJobTest;
import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.config.Config;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.MultiMap;
import com.hazelcast.mapreduce.Collator;
import com.hazelcast.mapreduce.Context;
import com.hazelcast.mapreduce.Job;
import com.hazelcast.mapreduce.JobCompletableFuture;
import com.hazelcast.mapreduce.JobTracker;
import com.hazelcast.mapreduce.KeyPredicate;
import com.hazelcast.mapreduce.KeyValueSource;
import com.hazelcast.mapreduce.Mapper;
import com.hazelcast.mapreduce.Reducer;
import com.hazelcast.mapreduce.ReducerFactory;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.SlowTest;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import org.junit.After;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(value=HazelcastParallelClassRunner.class)
@Category(value={SlowTest.class})
@Ignore
public class ClientMultiMapReduceTest
extends AbstractClientMapReduceJobTest {
    private final TestHazelcastFactory hazelcastFactory = new TestHazelcastFactory();

    /**
     * Runs after every test and asks the shared factory to terminate all
     * instances it has created.
     */
    @After
    public void tearDown() {
        hazelcastFactory.terminateAll();
    }

    /**
     * Submits a job whose mapper always throws and checks that the failure
     * reaches the client.
     * <p>
     * The test passes only when {@code future.get()} throws an
     * {@link ExecutionException} (declared through {@code expected}) whose cause
     * is the {@link NullPointerException} raised by {@link ExceptionThrowingMapper}.
     *
     * @throws Exception the expected {@link ExecutionException}, rethrown so JUnit can match it
     */
    @Test(expected=ExecutionException.class)
    public void testExceptionDistribution() throws Exception {
        Config config = buildConfig();
        HazelcastInstance h1 = hazelcastFactory.newHazelcastInstance(config);
        HazelcastInstance h2 = hazelcastFactory.newHazelcastInstance(config);
        HazelcastInstance h3 = hazelcastFactory.newHazelcastInstance(config);
        assertClusterSizeEventually(3, h1);
        assertClusterSizeEventually(3, h2);
        assertClusterSizeEventually(3, h3);
        HazelcastInstance client = hazelcastFactory.newHazelcastClient();
        MultiMap<Integer, Integer> m1 = client.getMultiMap(randomString());
        for (int i = 0; i < 100; ++i) {
            m1.put(i / 2, i);
        }
        JobTracker tracker = client.getJobTracker("default");
        Job<Integer, Integer> job = tracker.newJob(integerKvSource(m1));
        JobCompletableFuture<Map<String, List<Integer>>> future = job.mapper(new ExceptionThrowingMapper()).submit();
        try {
            future.get();
            Assert.fail("job with a throwing mapper completed without an exception");
        }
        catch (ExecutionException e) {
            // Only the wrapped mapper failure is of interest; any other exception type
            // propagates unchanged and fails the test through the 'expected' check.
            Assert.assertTrue("unexpected cause: " + e.getCause(), e.getCause() instanceof NullPointerException);
            throw e;
        }
    }

    /**
     * Cancels a job right after submitting it and expects {@code future.get()}
     * to throw a {@link CancellationException}.
     * <p>
     * {@link TimeConsumingMapper} sleeps for one second per entry, so the job is
     * presumably still in progress when {@code cancel(true)} is called.
     *
     * @throws Exception the expected {@link CancellationException}
     */
    @Test(timeout=60000L, expected=CancellationException.class)
    public void testInProcessCancellation() throws Exception {
        Config config = buildConfig();
        HazelcastInstance h1 = hazelcastFactory.newHazelcastInstance(config);
        HazelcastInstance h2 = hazelcastFactory.newHazelcastInstance(config);
        HazelcastInstance h3 = hazelcastFactory.newHazelcastInstance(config);
        assertClusterSizeEventually(3, h1);
        assertClusterSizeEventually(3, h2);
        assertClusterSizeEventually(3, h3);
        HazelcastInstance client = hazelcastFactory.newHazelcastClient();
        MultiMap<Integer, Integer> m1 = client.getMultiMap(randomString());
        for (int i = 0; i < 100; ++i) {
            m1.put(i / 2, i);
        }
        JobTracker tracker = client.getJobTracker("default");
        Job<Integer, Integer> job = tracker.newJob(integerKvSource(m1));
        JobCompletableFuture<Map<String, List<Integer>>> future = job.mapper(new TimeConsumingMapper()).submit();
        future.cancel(true);
        // The exception thrown here is what the 'expected' attribute matches; no
        // catch-and-rethrow is needed to let it propagate.
        future.get();
        Assert.fail("get() on a cancelled job returned normally");
    }

    /**
     * Runs a mapper-only job over 100 single-valued keys and checks the grouped
     * result.
     * <p>
     * {@link TestMapper} emits each entry under its own key, so the result must
     * contain 100 groups holding exactly one value each.
     *
     * @throws Exception if cluster setup or the job itself fails
     */
    @Test(timeout=120000L)
    public void testMapper() throws Exception {
        Config config = buildConfig();
        HazelcastInstance h1 = hazelcastFactory.newHazelcastInstance(config);
        HazelcastInstance h2 = hazelcastFactory.newHazelcastInstance(config);
        HazelcastInstance h3 = hazelcastFactory.newHazelcastInstance(config);
        assertClusterSizeEventually(3, h1);
        assertClusterSizeEventually(3, h2);
        assertClusterSizeEventually(3, h3);
        HazelcastInstance client = hazelcastFactory.newHazelcastClient();
        MultiMap<Integer, Integer> m1 = client.getMultiMap(randomString());
        for (int i = 0; i < 100; ++i) {
            m1.put(i, i);
        }
        JobTracker tracker = client.getJobTracker("default");
        Job<Integer, Integer> job = tracker.newJob(integerKvSource(m1));
        JobCompletableFuture<Map<String, List<Integer>>> future = job.mapper(new TestMapper()).submit();
        // Typed result: iterating a raw Map's values() as List does not type-check.
        Map<String, List<Integer>> result = future.get();
        Assert.assertEquals(100, result.size());
        for (List<Integer> value : result.values()) {
            Assert.assertEquals(1, value.size());
        }
    }

    /**
     * Runs a mapper + reducer job and compares each reduced group with a locally
     * computed sum.
     * <p>
     * {@link GroupingTestMapper} groups entries by {@code key % 4} and
     * {@link TestReducer} sums each group, so group {@code g} must equal the sum
     * of all {@code i} in {@code [0, 100)} with {@code i % 4 == g}.
     *
     * @throws Exception if cluster setup or the job itself fails
     */
    @Test(timeout=120000L)
    public void testMapperReducer() throws Exception {
        Config config = buildConfig();
        HazelcastInstance[] members = new HazelcastInstance[3];
        for (int n = 0; n < members.length; n++) {
            members[n] = hazelcastFactory.newHazelcastInstance(config);
        }
        for (HazelcastInstance member : members) {
            assertClusterSizeEventually(members.length, member);
        }
        HazelcastInstance client = hazelcastFactory.newHazelcastClient();

        MultiMap<Integer, Integer> source = client.getMultiMap(randomString());
        for (int entry = 0; entry < 100; entry++) {
            source.put(entry, entry);
        }

        Job<Integer, Integer> job = client.getJobTracker("default").newJob(integerKvSource(source));
        JobCompletableFuture<Map<String, Integer>> pending = job
                .mapper(new GroupingTestMapper())
                .reducer(new TestReducerFactory())
                .submit();
        Map<String, Integer> reduced = pending.get();

        // Pre-compute the per-group sums the reducers are expected to produce.
        int[] expectedSums = new int[4];
        for (int entry = 0; entry < 100; entry++) {
            expectedSums[entry % 4] += entry;
        }
        for (int group = 0; group < expectedSums.length; group++) {
            int actual = reduced.get(String.valueOf(group)).intValue();
            Assert.assertEquals(expectedSums[group], actual);
        }
    }

    /**
     * Runs a mapper + collator job and checks the collated total.
     * <p>
     * The multimap holds the values {@code 0..99}; {@link GroupingTestCollator}
     * adds up every value of every group, so the job must return the sum of
     * {@code 0..99}.
     *
     * @throws Exception if cluster setup or the job itself fails
     */
    @Test(timeout=120000L)
    public void testMapperCollator() throws Exception {
        Config config = buildConfig();
        HazelcastInstance h1 = hazelcastFactory.newHazelcastInstance(config);
        HazelcastInstance h2 = hazelcastFactory.newHazelcastInstance(config);
        HazelcastInstance h3 = hazelcastFactory.newHazelcastInstance(config);
        assertClusterSizeEventually(3, h1);
        assertClusterSizeEventually(3, h2);
        assertClusterSizeEventually(3, h3);
        HazelcastInstance client = hazelcastFactory.newHazelcastClient();
        MultiMap<Integer, Integer> m1 = client.getMultiMap(randomString());
        for (int i = 0; i < 100; ++i) {
            m1.put(i / 2, i);
        }
        JobTracker tracker = client.getJobTracker("default");
        Job<Integer, Integer> job = tracker.newJob(integerKvSource(m1));
        JobCompletableFuture<Integer> future = job.mapper(new GroupingTestMapper()).submit(new GroupingTestCollator());
        int result = future.get().intValue();
        int expectedResult = 0;
        for (int i = 0; i < 100; ++i) {
            expectedResult += i;
        }
        // Previously both values were computed but never compared, so the test
        // could not fail on a wrong result.
        Assert.assertEquals(expectedResult, result);
    }

    /**
     * Restricts a mapper + collator job to the single key {@code 50} through
     * {@code onKeys} and expects the collated total to be {@code 50}.
     *
     * @throws Exception if cluster setup or the job itself fails
     */
    @Test(timeout=120000L)
    public void testKeyedMapperCollator() throws Exception {
        Config config = buildConfig();
        HazelcastInstance[] members = new HazelcastInstance[3];
        for (int n = 0; n < members.length; n++) {
            members[n] = hazelcastFactory.newHazelcastInstance(config);
        }
        for (HazelcastInstance member : members) {
            assertClusterSizeEventually(members.length, member);
        }
        HazelcastInstance client = hazelcastFactory.newHazelcastClient();

        MultiMap<Integer, Integer> source = client.getMultiMap(randomString());
        for (int entry = 0; entry < 10000; entry++) {
            source.put(entry, entry);
        }

        Integer[] selectedKeys = new Integer[]{50};
        Job<Integer, Integer> job = client.getJobTracker("default").newJob(integerKvSource(source));
        JobCompletableFuture<Integer> pending = job
                .onKeys(selectedKeys)
                .mapper(new TestMapper())
                .submit(new GroupingTestCollator());

        // Key 50 was stored with the single value 50, so that is the whole sum.
        int collated = pending.get().intValue();
        Assert.assertEquals(50, collated);
    }

    /**
     * Filters the job input with {@link TestKeyPredicate}, which accepts only
     * key {@code 50}, and expects the collated total to be {@code 50}.
     *
     * @throws Exception if cluster setup or the job itself fails
     */
    @Test(timeout=120000L)
    public void testKeyPredicateMapperCollator() throws Exception {
        Config config = buildConfig();
        HazelcastInstance[] members = new HazelcastInstance[3];
        for (int n = 0; n < members.length; n++) {
            members[n] = hazelcastFactory.newHazelcastInstance(config);
        }
        for (HazelcastInstance member : members) {
            assertClusterSizeEventually(members.length, member);
        }
        HazelcastInstance client = hazelcastFactory.newHazelcastClient();

        MultiMap<Integer, Integer> source = client.getMultiMap(randomString());
        for (int entry = 0; entry < 10000; entry++) {
            source.put(entry, entry);
        }

        Job<Integer, Integer> job = client.getJobTracker("default").newJob(integerKvSource(source));
        JobCompletableFuture<Integer> pending = job
                .keyPredicate(new TestKeyPredicate())
                .mapper(new TestMapper())
                .submit(new GroupingTestCollator());

        // Only the entry stored under key 50 (value 50) passes the predicate.
        int collated = pending.get().intValue();
        Assert.assertEquals(50, collated);
    }

    /**
     * Runs a mapper + reducer + collator job and checks the collated total.
     * <p>
     * The reducers sum each {@code key % 4} group and {@link TestCollator} adds
     * the group sums together, so the job must return the sum of all stored
     * values {@code 0..99}.
     *
     * @throws Exception if cluster setup or the job itself fails
     */
    @Test(timeout=120000L)
    public void testMapperReducerCollator() throws Exception {
        Config config = buildConfig();
        HazelcastInstance h1 = hazelcastFactory.newHazelcastInstance(config);
        HazelcastInstance h2 = hazelcastFactory.newHazelcastInstance(config);
        HazelcastInstance h3 = hazelcastFactory.newHazelcastInstance(config);
        assertClusterSizeEventually(3, h1);
        assertClusterSizeEventually(3, h2);
        assertClusterSizeEventually(3, h3);
        HazelcastInstance client = hazelcastFactory.newHazelcastClient();
        MultiMap<Integer, Integer> m1 = client.getMultiMap(randomString());
        for (int i = 0; i < 100; ++i) {
            m1.put(i / 2, i);
        }
        JobTracker tracker = client.getJobTracker("default");
        Job<Integer, Integer> job = tracker.newJob(integerKvSource(m1));
        JobCompletableFuture<Integer> future = job.mapper(new GroupingTestMapper()).reducer(new TestReducerFactory()).submit(new TestCollator());
        int result = future.get().intValue();
        int expectedResult = 0;
        for (int i = 0; i < 100; ++i) {
            expectedResult += i;
        }
        // A single comparison suffices: neither operand depends on a loop index.
        Assert.assertEquals(expectedResult, result);
    }

    /**
     * Runs a mapper-only job and collects its result through an
     * {@link ExecutionCallback} instead of blocking on the future.
     * <p>
     * Expects 100 groups with one value each. A failure reported to the callback
     * is recorded and turned into an assertion failure that shows the original
     * throwable.
     *
     * @throws Exception if cluster setup fails or the wait is interrupted
     */
    @Test(timeout=120000L)
    public void testAsyncMapper() throws Exception {
        Config config = buildConfig();
        HazelcastInstance h1 = hazelcastFactory.newHazelcastInstance(config);
        HazelcastInstance h2 = hazelcastFactory.newHazelcastInstance(config);
        HazelcastInstance h3 = hazelcastFactory.newHazelcastInstance(config);
        assertClusterSizeEventually(3, h1);
        assertClusterSizeEventually(3, h2);
        assertClusterSizeEventually(3, h3);
        HazelcastInstance client = hazelcastFactory.newHazelcastClient();
        MultiMap<Integer, Integer> m1 = client.getMultiMap(randomString());
        for (int i = 0; i < 100; ++i) {
            m1.put(i, i);
        }
        final Map<String, List<Integer>> listenerResults = new HashMap<String, List<Integer>>();
        // One-slot holder so the anonymous callback can hand a failure back to the test thread.
        final Throwable[] failure = new Throwable[1];
        // Starts without permits: the acquire() below blocks until the callback releases.
        final Semaphore semaphore = new Semaphore(0);
        JobTracker tracker = client.getJobTracker("default");
        Job<Integer, Integer> job = tracker.newJob(integerKvSource(m1));
        JobCompletableFuture<Map<String, List<Integer>>> future = job.mapper(new TestMapper()).submit();
        future.andThen(new ExecutionCallback<Map<String, List<Integer>>>(){

            @Override
            public void onResponse(Map<String, List<Integer>> response) {
                listenerResults.putAll(response);
                semaphore.release();
            }

            @Override
            public void onFailure(Throwable t) {
                failure[0] = t;
                semaphore.release();
            }
        });
        semaphore.acquire();
        Assert.assertNull("map-reduce job failed: " + failure[0], failure[0]);
        Assert.assertEquals(100, listenerResults.size());
        for (List<Integer> value : listenerResults.values()) {
            Assert.assertEquals(1, value.size());
        }
    }

    /**
     * Runs a mapper-only job restricted to key {@code 50} and collects its
     * result through an {@link ExecutionCallback}.
     * <p>
     * Expects exactly one group holding one value. A failure reported to the
     * callback is recorded and turned into an assertion failure that shows the
     * original throwable.
     *
     * @throws Exception if cluster setup fails or the wait is interrupted
     */
    @Test(timeout=120000L)
    public void testKeyedAsyncMapper() throws Exception {
        Config config = buildConfig();
        HazelcastInstance h1 = hazelcastFactory.newHazelcastInstance(config);
        HazelcastInstance h2 = hazelcastFactory.newHazelcastInstance(config);
        HazelcastInstance h3 = hazelcastFactory.newHazelcastInstance(config);
        assertClusterSizeEventually(3, h1);
        assertClusterSizeEventually(3, h2);
        assertClusterSizeEventually(3, h3);
        HazelcastInstance client = hazelcastFactory.newHazelcastClient();
        MultiMap<Integer, Integer> m1 = client.getMultiMap(randomString());
        for (int i = 0; i < 100; ++i) {
            m1.put(i, i);
        }
        final Map<String, List<Integer>> listenerResults = new HashMap<String, List<Integer>>();
        // One-slot holder so the anonymous callback can hand a failure back to the test thread.
        final Throwable[] failure = new Throwable[1];
        // Starts without permits: the acquire() below blocks until the callback releases.
        final Semaphore semaphore = new Semaphore(0);
        JobTracker tracker = client.getJobTracker("default");
        Job<Integer, Integer> job = tracker.newJob(integerKvSource(m1));
        JobCompletableFuture<Map<String, List<Integer>>> future = job.onKeys(new Integer[]{50}).mapper(new TestMapper()).submit();
        future.andThen(new ExecutionCallback<Map<String, List<Integer>>>(){

            @Override
            public void onResponse(Map<String, List<Integer>> response) {
                listenerResults.putAll(response);
                semaphore.release();
            }

            @Override
            public void onFailure(Throwable t) {
                failure[0] = t;
                semaphore.release();
            }
        });
        semaphore.acquire();
        Assert.assertNull("map-reduce job failed: " + failure[0], failure[0]);
        Assert.assertEquals(1, listenerResults.size());
        for (List<Integer> value : listenerResults.values()) {
            Assert.assertEquals(1, value.size());
        }
    }

    /**
     * Runs a mapper + reducer job and collects the reduced groups through an
     * {@link ExecutionCallback}.
     * <p>
     * Each {@code key % 4} group must equal the locally computed sum for that
     * group. A failure reported to the callback, or a missing group, produces a
     * descriptive assertion failure rather than a {@link NullPointerException}.
     *
     * @throws Exception if cluster setup fails or the wait is interrupted
     */
    @Test(timeout=120000L)
    public void testAsyncMapperReducer() throws Exception {
        Config config = buildConfig();
        HazelcastInstance h1 = hazelcastFactory.newHazelcastInstance(config);
        HazelcastInstance h2 = hazelcastFactory.newHazelcastInstance(config);
        HazelcastInstance h3 = hazelcastFactory.newHazelcastInstance(config);
        assertClusterSizeEventually(3, h1);
        assertClusterSizeEventually(3, h2);
        assertClusterSizeEventually(3, h3);
        HazelcastInstance client = hazelcastFactory.newHazelcastClient();
        MultiMap<Integer, Integer> m1 = client.getMultiMap(randomString());
        for (int i = 0; i < 100; ++i) {
            m1.put(i, i);
        }
        final Map<String, Integer> listenerResults = new HashMap<String, Integer>();
        // One-slot holder so the anonymous callback can hand a failure back to the test thread.
        final Throwable[] failure = new Throwable[1];
        // Starts without permits: the acquire() below blocks until the callback releases.
        final Semaphore semaphore = new Semaphore(0);
        JobTracker tracker = client.getJobTracker("default");
        Job<Integer, Integer> job = tracker.newJob(integerKvSource(m1));
        JobCompletableFuture<Map<String, Integer>> future = job.mapper(new GroupingTestMapper()).reducer(new TestReducerFactory()).submit();
        future.andThen(new ExecutionCallback<Map<String, Integer>>(){

            @Override
            public void onResponse(Map<String, Integer> response) {
                listenerResults.putAll(response);
                semaphore.release();
            }

            @Override
            public void onFailure(Throwable t) {
                failure[0] = t;
                semaphore.release();
            }
        });
        int[] expectedResults = new int[4];
        for (int i = 0; i < 100; ++i) {
            expectedResults[i % 4] += i;
        }
        semaphore.acquire();
        Assert.assertNull("map-reduce job failed: " + failure[0], failure[0]);
        for (int i = 0; i < 4; ++i) {
            Integer actual = listenerResults.get(String.valueOf(i));
            Assert.assertNotNull("no reduced value for group " + i, actual);
            Assert.assertEquals(expectedResults[i], actual.intValue());
        }
    }

    /**
     * Runs a mapper + collator job and receives the collated total through an
     * {@link ExecutionCallback}.
     * <p>
     * The total must equal the sum of the stored values {@code 0..99}. A failure
     * reported to the callback is recorded and turned into an assertion failure
     * that shows the original throwable.
     *
     * @throws Exception if cluster setup fails or the wait is interrupted
     */
    @Test(timeout=120000L)
    public void testAsyncMapperCollator() throws Exception {
        Config config = buildConfig();
        HazelcastInstance h1 = hazelcastFactory.newHazelcastInstance(config);
        HazelcastInstance h2 = hazelcastFactory.newHazelcastInstance(config);
        HazelcastInstance h3 = hazelcastFactory.newHazelcastInstance(config);
        assertClusterSizeEventually(3, h1);
        assertClusterSizeEventually(3, h2);
        assertClusterSizeEventually(3, h3);
        HazelcastInstance client = hazelcastFactory.newHazelcastClient();
        MultiMap<Integer, Integer> m1 = client.getMultiMap(randomString());
        for (int i = 0; i < 100; ++i) {
            m1.put(i / 2, i);
        }
        final int[] result = new int[1];
        // One-slot holder so the anonymous callback can hand a failure back to the test thread.
        final Throwable[] failure = new Throwable[1];
        // Starts without permits: the acquire() below blocks until the callback releases.
        final Semaphore semaphore = new Semaphore(0);
        JobTracker tracker = client.getJobTracker("default");
        Job<Integer, Integer> job = tracker.newJob(integerKvSource(m1));
        JobCompletableFuture<Integer> future = job.mapper(new GroupingTestMapper()).submit(new GroupingTestCollator());
        future.andThen(new ExecutionCallback<Integer>(){

            @Override
            public void onResponse(Integer response) {
                result[0] = response.intValue();
                semaphore.release();
            }

            @Override
            public void onFailure(Throwable t) {
                failure[0] = t;
                semaphore.release();
            }
        });
        int expectedResult = 0;
        for (int i = 0; i < 100; ++i) {
            expectedResult += i;
        }
        semaphore.acquire();
        Assert.assertNull("map-reduce job failed: " + failure[0], failure[0]);
        // A single comparison suffices: neither operand depends on a loop index.
        Assert.assertEquals(expectedResult, result[0]);
    }

    /**
     * Runs a mapper + reducer + collator job and receives the collated total
     * through an {@link ExecutionCallback}.
     * <p>
     * The total must equal the sum of the stored values {@code 0..99}. A failure
     * reported to the callback is recorded and turned into an assertion failure
     * that shows the original throwable.
     *
     * @throws Exception if cluster setup fails or the wait is interrupted
     */
    @Test(timeout=120000L)
    public void testAsyncMapperReducerCollator() throws Exception {
        Config config = buildConfig();
        HazelcastInstance h1 = hazelcastFactory.newHazelcastInstance(config);
        HazelcastInstance h2 = hazelcastFactory.newHazelcastInstance(config);
        HazelcastInstance h3 = hazelcastFactory.newHazelcastInstance(config);
        assertClusterSizeEventually(3, h1);
        assertClusterSizeEventually(3, h2);
        assertClusterSizeEventually(3, h3);
        HazelcastInstance client = hazelcastFactory.newHazelcastClient();
        MultiMap<Integer, Integer> m1 = client.getMultiMap(randomString());
        for (int i = 0; i < 100; ++i) {
            m1.put(i / 2, i);
        }
        final int[] result = new int[1];
        // One-slot holder so the anonymous callback can hand a failure back to the test thread.
        final Throwable[] failure = new Throwable[1];
        // Starts without permits: the acquire() below blocks until the callback releases.
        final Semaphore semaphore = new Semaphore(0);
        JobTracker tracker = client.getJobTracker("default");
        Job<Integer, Integer> job = tracker.newJob(integerKvSource(m1));
        JobCompletableFuture<Integer> future = job.mapper(new GroupingTestMapper()).reducer(new TestReducerFactory()).submit(new TestCollator());
        future.andThen(new ExecutionCallback<Integer>(){

            @Override
            public void onResponse(Integer response) {
                result[0] = response.intValue();
                semaphore.release();
            }

            @Override
            public void onFailure(Throwable t) {
                failure[0] = t;
                semaphore.release();
            }
        });
        int expectedResult = 0;
        for (int i = 0; i < 100; ++i) {
            expectedResult += i;
        }
        semaphore.acquire();
        Assert.assertNull("map-reduce job failed: " + failure[0], failure[0]);
        // A single comparison suffices: neither operand depends on a loop index.
        Assert.assertEquals(expectedResult, result[0]);
    }

    /**
     * Wraps the given multimap in a {@link KeyValueSource} through
     * {@code KeyValueSource.fromMultiMap}.
     *
     * @param m the multimap to read job input from
     * @return the key-value source created for {@code m}
     */
    static KeyValueSource<Integer, Integer> integerKvSource(MultiMap<Integer, Integer> m) {
        KeyValueSource<Integer, Integer> source = KeyValueSource.fromMultiMap(m);
        return source;
    }

    public static class TestCollator
    implements Collator<Map.Entry<String, Integer>, Integer> {
        public Integer collate(Iterable<Map.Entry<String, Integer>> values) {
            int sum = 0;
            for (Map.Entry<String, Integer> entry : values) {
                sum += entry.getValue().intValue();
            }
            return sum;
        }
    }

    public static class GroupingTestCollator
    implements Collator<Map.Entry<String, List<Integer>>, Integer> {
        public Integer collate(Iterable<Map.Entry<String, List<Integer>>> values) {
            int sum = 0;
            for (Map.Entry<String, List<Integer>> entry : values) {
                for (Integer value : entry.getValue()) {
                    sum += value.intValue();
                }
            }
            return sum;
        }
    }

    public static class TestReducerFactory
    implements ReducerFactory<String, Integer, Integer> {
        public Reducer<Integer, Integer> newReducer(String key) {
            return new TestReducer();
        }
    }

    public static class TestReducer
    extends Reducer<Integer, Integer> {
        private int sum = 0;

        public void reduce(Integer value) {
            this.sum += value.intValue();
        }

        public Integer finalizeReduce() {
            return this.sum;
        }
    }

    public static class GroupingTestMapper
    implements Mapper<Integer, Integer, String, Integer> {
        public void map(Integer key, Integer value, Context<String, Integer> collector) {
            collector.emit((Object)String.valueOf(key % 4), (Object)value);
        }
    }

    public static class TestMapper
    implements Mapper<Integer, Integer, String, Integer> {
        public void map(Integer key, Integer value, Context<String, Integer> collector) {
            collector.emit((Object)String.valueOf(key), (Object)value);
        }
    }

    public static class TestKeyPredicate
    implements KeyPredicate<Integer> {
        public boolean evaluate(Integer key) {
            return key == 50;
        }
    }

    public static class TimeConsumingMapper
    implements Mapper<Integer, Integer, String, Integer> {
        public void map(Integer key, Integer value, Context<String, Integer> collector) {
            try {
                Thread.sleep(1000L);
            }
            catch (Exception exception) {
                // empty catch block
            }
            collector.emit((Object)String.valueOf(key), (Object)value);
        }
    }

    public static class ExceptionThrowingMapper
    implements Mapper<Integer, Integer, String, Integer> {
        public void map(Integer key, Integer value, Context<String, Integer> context) {
            throw new NullPointerException("BUMM!");
        }
    }
}

