/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CombiningUnilateralSortMergerITCase {
    /** Logger for the debug progress messages emitted by the tests. */
    private static final Logger LOG = LoggerFactory.getLogger(CombiningUnilateralSortMergerITCase.class);
    /** Fixed seed; the same value is handed to the tuple generator in testSortAndValidate. */
    private static final long SEED = 649180756312423613L;
    /** Same value as the number of keys pre-registered in testSortAndValidate's count table. */
    private static final int KEY_MAX = 1000;
    /** Presumably the length of the generated string values -- confirm against TestData.TupleGenerator. */
    private static final int VALUE_LENGTH = 118;
    /** Same value as the number of records emitted in testSortAndValidate. */
    private static final int NUM_PAIRS = 50000;
    /** 0x10000000 = 268,435,456; same value as the size given to the MemoryManager in beforeTest (presumably bytes). */
    public static final int MEMORY_SIZE = 0x10000000;
    /** Dummy invokable passed to every sort merger created by the tests. */
    private final AbstractInvokable parentTask = new DummyInvokable();
    /** I/O manager, created in beforeTest and shut down in afterTest. */
    private IOManager ioManager;
    /** Memory manager, created in beforeTest; verified empty and shut down in afterTest. */
    private MemoryManager memoryManager;
    /** Serializer factory for the (Integer, String) records of testSortAndValidate. */
    private TypeSerializerFactory<Tuple2<Integer, String>> serializerFactory1;
    /** Serializer factory for the (Integer, Integer) records of the combine tests. */
    private TypeSerializerFactory<Tuple2<Integer, Integer>> serializerFactory2;
    /** Comparator for the (Integer, String) records of testSortAndValidate. */
    private TypeComparator<Tuple2<Integer, String>> comparator1;
    /** Comparator for the (Integer, Integer) records of the combine tests. */
    private TypeComparator<Tuple2<Integer, Integer>> comparator2;

    /**
     * Creates the memory manager, the I/O manager and the serializer/comparator pairs
     * that the individual tests hand to the sort merger.
     */
    @Before
    public void beforeTest() {
        // Runtime services; the memory manager is created first, then the I/O manager.
        memoryManager = new MemoryManager(MEMORY_SIZE, 1);
        ioManager = new IOManagerAsync();

        // (Integer, String) records.
        comparator1 = TestData.getIntStringTupleComparator();
        serializerFactory1 = TestData.getIntStringTupleSerializerFactory();

        // (Integer, Integer) records.
        comparator2 = TestData.getIntIntTupleComparator();
        serializerFactory2 = TestData.getIntIntTupleSerializerFactory();
    }

    /**
     * Shuts down the I/O manager and the memory manager created in {@code beforeTest}.
     *
     * <p>Fails the test if the I/O manager reports that it was not properly shut down, or
     * if the memory manager reports that it is not empty. The memory manager is shut down
     * in a {@code finally} block so that it is released even when one of these checks
     * fails.
     */
    @After
    public void afterTest() {
        try {
            // Guarded like the memory manager below: the field stays null when
            // beforeTest failed before the I/O manager was created.
            if (this.ioManager != null) {
                this.ioManager.shutdown();
                if (!this.ioManager.isProperlyShutDown()) {
                    Assert.fail("I/O Manager was not properly shut down.");
                }
            }
            if (this.memoryManager != null) {
                Assert.assertTrue(
                        "Memory leak: not all segments have been returned to the memory manager.",
                        this.memoryManager.verifyEmpty());
            }
        } finally {
            if (this.memoryManager != null) {
                this.memoryManager.shutdown();
                this.memoryManager = null;
            }
        }
    }

    /**
     * Emits {@code noKeyCnt} rounds of {@code noKeys} distinct keys, each record carrying
     * the value 1, through a combining sort merger, and verifies that reducing the merger's
     * output per key yields {@code noKeyCnt} for every key and exactly {@code noKeys} keys.
     *
     * <p>The merger is closed in a {@code finally} block so that it is released even when
     * an assertion fails.
     *
     * @throws Exception if the merger, the reader or the iterator fails
     */
    @Test
    public void testCombine() throws Exception {
        final int noKeys = 100;
        final int noKeyCnt = 10000;
        final TestData.MockTuple2Reader<Tuple2<Integer, Integer>> reader = TestData.getIntIntTupleReader();

        LOG.debug("initializing sortmerger");
        final TestCountCombiner comb = new TestCountCombiner();
        // NOTE(review): the trailing arguments (0.25, 64, 0.7f, false) are presumably memory
        // fraction, max file handles, spilling threshold and large-record handling --
        // confirm against the CombiningUnilateralSortMerger constructor.
        final CombiningUnilateralSortMerger<Tuple2<Integer, Integer>> merger =
                new CombiningUnilateralSortMerger<Tuple2<Integer, Integer>>(
                        comb, this.memoryManager, this.ioManager, reader, this.parentTask,
                        this.serializerFactory2, this.comparator2, 0.25, 64, 0.7f, false);
        try {
            // One record instance is re-emitted with a changing key and a constant value of 1.
            final Tuple2<Integer, Integer> rec = new Tuple2<Integer, Integer>();
            rec.setField(1, 1);
            for (int i = 0; i < noKeyCnt; ++i) {
                for (int j = 0; j < noKeys; ++j) {
                    rec.setField(j, 0);
                    reader.emit(rec);
                }
            }
            reader.close();

            final MutableObjectIterator<Tuple2<Integer, Integer>> iterator = merger.getIterator();
            final Iterator<Integer> result = CombiningUnilateralSortMergerITCase.getReducingIterator(
                    iterator, this.serializerFactory2.getSerializer(), this.comparator2.duplicate());

            int groups = 0;
            while (result.hasNext()) {
                Assert.assertEquals(noKeyCnt, result.next().intValue());
                ++groups;
            }
            // Without this check an empty result would let the loop above pass vacuously.
            Assert.assertEquals("Unexpected number of key groups.", noKeys, groups);
        } finally {
            merger.close();
        }
        // Only consistency is required here: the combiner must be closed exactly if it was opened.
        Assert.assertTrue(comb.opened == comb.closed);
    }

    /**
     * Same scenario as {@code testCombine}, but with a different merger configuration
     * (0.01, 64, 0.005f, true): emits {@code noKeyCnt} rounds of {@code noKeys} distinct
     * keys with value 1 and verifies that reducing the merger's output per key yields
     * {@code noKeyCnt} for every key and exactly {@code noKeys} keys.
     *
     * <p>The merger is closed in a {@code finally} block so that it is released even when
     * an assertion fails.
     *
     * @throws Exception if the merger, the reader or the iterator fails
     */
    @Test
    public void testCombineSpilling() throws Exception {
        final int noKeys = 100;
        final int noKeyCnt = 10000;
        final TestData.MockTuple2Reader<Tuple2<Integer, Integer>> reader = TestData.getIntIntTupleReader();

        LOG.debug("initializing sortmerger");
        final TestCountCombiner comb = new TestCountCombiner();
        // NOTE(review): the small fractions (0.01 and 0.005f) are presumably chosen to force
        // spilling, as the test name suggests -- confirm against the
        // CombiningUnilateralSortMerger constructor.
        final CombiningUnilateralSortMerger<Tuple2<Integer, Integer>> merger =
                new CombiningUnilateralSortMerger<Tuple2<Integer, Integer>>(
                        comb, this.memoryManager, this.ioManager, reader, this.parentTask,
                        this.serializerFactory2, this.comparator2, 0.01, 64, 0.005f, true);
        try {
            // One record instance is re-emitted with a changing key and a constant value of 1.
            final Tuple2<Integer, Integer> rec = new Tuple2<Integer, Integer>();
            rec.setField(1, 1);
            for (int i = 0; i < noKeyCnt; ++i) {
                for (int j = 0; j < noKeys; ++j) {
                    rec.setField(j, 0);
                    reader.emit(rec);
                }
            }
            reader.close();

            final MutableObjectIterator<Tuple2<Integer, Integer>> iterator = merger.getIterator();
            final Iterator<Integer> result = CombiningUnilateralSortMergerITCase.getReducingIterator(
                    iterator, this.serializerFactory2.getSerializer(), this.comparator2.duplicate());

            int groups = 0;
            while (result.hasNext()) {
                Assert.assertEquals(noKeyCnt, result.next().intValue());
                ++groups;
            }
            // Without this check an empty result would let the loop above pass vacuously.
            Assert.assertEquals("Unexpected number of key groups.", noKeys, groups);
        } finally {
            merger.close();
        }
        // Only consistency is required here: the combiner must be closed exactly if it was opened.
        Assert.assertTrue(comb.opened == comb.closed);
    }

    /**
     * Emits {@code NUM_PAIRS} generated records (values replaced by the string "1") through
     * a combining sort merger and verifies that
     * <ul>
     *   <li>the merger's output is ordered by key according to an ascending
     *       {@code IntComparator}, and</li>
     *   <li>for every key, the counts carried by the output records add up to the number of
     *       records emitted for that key.</li>
     * </ul>
     *
     * <p>The merger is closed in a {@code finally} block so that it is released even when
     * an assertion fails.
     *
     * @throws Exception if the merger, the reader, the generator or the iterator fails
     */
    @Test
    public void testSortAndValidate() throws Exception {
        // Per-key balance: incremented for every emitted record, decremented by the counts
        // read back from the merger. Every balance must end at zero.
        final Hashtable<Integer, Integer> countTable = new Hashtable<Integer, Integer>(KEY_MAX);
        for (int i = 1; i <= KEY_MAX; ++i) {
            countTable.put(i, 0);
        }
        final IntComparator keyComparator = new IntComparator(true);
        final TestData.MockTuple2Reader<Tuple2<Integer, String>> reader = TestData.getIntStringTupleReader();

        LOG.debug("initializing sortmerger");
        final TestCountCombiner2 comb = new TestCountCombiner2();
        final CombiningUnilateralSortMerger<Tuple2<Integer, String>> merger =
                new CombiningUnilateralSortMerger<Tuple2<Integer, String>>(
                        comb, this.memoryManager, this.ioManager, reader, this.parentTask,
                        this.serializerFactory1, this.comparator1, 0.25, 2, 0.7f, false);
        try {
            LOG.debug("emitting data");
            final TestData.TupleGenerator generator = new TestData.TupleGenerator(
                    SEED, KEY_MAX, VALUE_LENGTH,
                    TestData.TupleGenerator.KeyMode.RANDOM, TestData.TupleGenerator.ValueMode.FIX_LENGTH);
            Tuple2<Integer, String> rec = new Tuple2<Integer, String>();
            for (int i = 0; i < NUM_PAIRS; ++i) {
                rec = generator.next(rec);
                Assert.assertTrue(rec != null);
                final Integer key = rec.f0;
                // The combiner parses the value as a count, so every record counts as one.
                rec.setField("1", 1);
                reader.emit(rec);
                countTable.put(key, countTable.get(key) + 1);
            }
            reader.close();

            final MutableObjectIterator<Tuple2<Integer, String>> iterator = merger.getIterator();
            LOG.debug("checking results");

            Tuple2<Integer, String> current = iterator.next(new Tuple2<Integer, String>());
            Assert.assertTrue(current != null);
            // Remember the previous KEY rather than the previous record: the record object is
            // handed back to iterator.next() as the reuse instance, so a retained reference
            // could end up pointing at the current record and make the order check vacuous.
            Integer previousKey = current.f0;
            countTable.put(previousKey, countTable.get(previousKey) - Integer.parseInt(current.f1));

            while ((current = iterator.next(current)) != null) {
                final Integer currentKey = current.f0;
                Assert.assertTrue(keyComparator.compare(previousKey, currentKey) <= 0);
                countTable.put(currentKey, countTable.get(currentKey) - Integer.parseInt(current.f1));
                previousKey = currentKey;
            }

            for (Integer cnt : countTable.values()) {
                Assert.assertTrue(cnt == 0);
            }
        } finally {
            merger.close();
        }
        // Only consistency is required here: the combiner must be closed exactly if it was opened.
        Assert.assertTrue(comb.opened == comb.closed);
    }

    /**
     * Wraps the given record stream in an iterator that yields one value per key group:
     * the sum of the second tuple field over all records of that group.
     *
     * @param data the records to group
     * @param serializer serializer handed to the key-grouped iterator
     * @param comparator comparator handed to the key-grouped iterator
     * @return an iterator over the per-group sums; {@code remove()} is unsupported
     */
    private static Iterator<Integer> getReducingIterator(MutableObjectIterator<Tuple2<Integer, Integer>> data, TypeSerializer<Tuple2<Integer, Integer>> serializer, TypeComparator<Tuple2<Integer, Integer>> comparator) {
        final ReusingKeyGroupedIterator<Tuple2<Integer, Integer>> groups =
                new ReusingKeyGroupedIterator<Tuple2<Integer, Integer>>(data, serializer, comparator);

        return new Iterator<Integer>() {
            /** True while a key group has been advanced to but not yet consumed by next(). */
            private boolean groupPending;

            @Override
            public boolean hasNext() {
                if (!this.groupPending) {
                    try {
                        this.groupPending = groups.nextKey();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
                return this.groupPending;
            }

            @Override
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                // Mark the group as consumed before draining its values.
                this.groupPending = false;
                int sum = 0;
                for (Iterator<Tuple2<Integer, Integer>> values = groups.getValues(); values.hasNext(); ) {
                    sum += values.next().f1;
                }
                return sum;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    /**
     * Combiner for (Integer, String) records whose string field holds a decimal count.
     * Records whether {@code open} and {@code close} have been called.
     */
    public static class TestCountCombiner2
    extends RichGroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>
    implements GroupCombineFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
        private static final long serialVersionUID = 1L;
        /** Set to true by {@code open}. */
        public volatile boolean opened = false;
        /** Set to true by {@code close}. */
        public volatile boolean closed = false;

        /**
         * Parses and sums the string field of all given records and emits one new record
         * carrying the key of the last record seen and the sum rendered as a string.
         */
        public void combine(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) {
            Tuple2<Integer, String> last = new Tuple2<Integer, String>();
            int total = 0;
            for (Tuple2<Integer, String> value : values) {
                last = value;
                total += Integer.parseInt(value.f1);
            }
            out.collect(new Tuple2<Integer, String>(last.f0, String.valueOf(total)));
        }

        /** Intentionally empty. */
        public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) {
        }

        /** Records that the function has been opened. */
        public void open(Configuration parameters) throws Exception {
            this.opened = true;
        }

        /** Records that the function has been closed. */
        public void close() throws Exception {
            this.closed = true;
        }
    }

    /**
     * Combiner for (Integer, Integer) records that sums the second field.
     * Records whether {@code open} and {@code close} have been called.
     */
    public static class TestCountCombiner
    extends RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>
    implements GroupCombineFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;
        /** Sum computed by the most recent {@code combine} call. */
        private Integer count = 0;
        /** Set to true by {@code open}. */
        public volatile boolean opened = false;
        /** Set to true by {@code close}. */
        public volatile boolean closed = false;

        /**
         * Sums the second field of all given records, writes the sum into the second field
         * of the last record seen and emits that record.
         */
        public void combine(Iterable<Tuple2<Integer, Integer>> values, Collector<Tuple2<Integer, Integer>> out) {
            Tuple2<Integer, Integer> last = new Tuple2<Integer, Integer>();
            int total = 0;
            for (Tuple2<Integer, Integer> value : values) {
                last = value;
                total += value.f1;
            }
            this.count = total;
            // The last input record is reused as the output record.
            last.setField(this.count, 1);
            out.collect(last);
        }

        /** Intentionally empty. */
        public void reduce(Iterable<Tuple2<Integer, Integer>> values, Collector<Tuple2<Integer, Integer>> out) {
        }

        /** Records that the function has been opened. */
        public void open(Configuration parameters) throws Exception {
            this.opened = true;
        }

        /** Records that the function has been closed. */
        public void close() throws Exception {
            this.closed = true;
        }
    }
}

