/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.hash;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.GenericPairComparator;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase;
import org.apache.flink.runtime.operators.hash.ReOpenableMutableHashTable;
import org.apache.flink.runtime.operators.hash.ReusingBuildFirstReOpenableHashJoinIterator;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
import org.apache.flink.runtime.operators.testutils.UnionIterator;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ReusingReOpenableHashTableITCase {
    // Same value as the page-size literal passed to the MemoryManager in beforeTest().
    private static final int PAGE_SIZE = 8192;
    // 0x7D0000 == 8,192,000 == 1000 * PAGE_SIZE; same value as the memory-size literal in beforeTest().
    private static final long MEMORY_SIZE = 0x7D0000L;
    // Same value as the seed literal used for the build-side TupleGenerator in the doTest-based tests.
    private static final long SEED1 = 561349061987311L;
    // Same value as the seed literal used for the probe-side TupleGenerator in the doTest-based tests.
    private static final long SEED2 = 231434613412342L;
    // Same value as the literal 3 that bounds the probe loops in this class.
    private static final int NUM_PROBES = 3;
    // Task handed to the hash join iterator constructed in doTest().
    private final AbstractInvokable parentTask = new DummyInvokable();
    // Created in beforeTest(), shut down and cleared in afterTest().
    private IOManager ioManager;
    // Created in beforeTest(), verified empty and shut down in afterTest().
    private MemoryManager memoryManager;
    // (Integer, String) tuple utilities used by doTest(); initialised in beforeTest().
    private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
    private TypeComparator<Tuple2<Integer, String>> record1Comparator;
    private TypeComparator<Tuple2<Integer, String>> record2Comparator;
    private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
    // Owner object passed to MemoryManager.allocatePages() in the spilling tests.
    private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
    // (Integer, Integer) tuple utilities used by the spilling tests; initialised in beforeTest().
    // NOTE: despite the "Accesssor" names (sic), these two fields hold serializers.
    private TypeSerializer<Tuple2<Integer, Integer>> recordBuildSideAccesssor;
    private TypeSerializer<Tuple2<Integer, Integer>> recordProbeSideAccesssor;
    private TypeComparator<Tuple2<Integer, Integer>> recordBuildSideComparator;
    private TypeComparator<Tuple2<Integer, Integer>> recordProbeSideComparator;
    private TypePairComparator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> pactRecordComparator;

    /**
     * Creates the serializers, comparators, memory manager and I/O manager used by the tests.
     *
     * <p>The (Integer, String) utilities feed {@code doTest}; the (Integer, Integer) utilities
     * feed the spilling tests that drive the hash table directly.
     */
    @Before
    public void beforeTest() {
        // (Integer, String) tuples
        this.recordSerializer = TestData.getIntStringTupleSerializer();
        this.record1Comparator = TestData.getIntStringTupleComparator();
        this.record2Comparator = TestData.getIntStringTupleComparator();
        this.recordPairComparator = new GenericPairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>>(this.record1Comparator, this.record2Comparator);

        // (Integer, Integer) tuples
        this.recordBuildSideAccesssor = TestData.getIntIntTupleSerializer();
        this.recordProbeSideAccesssor = TestData.getIntIntTupleSerializer();
        this.recordBuildSideComparator = TestData.getIntIntTupleComparator();
        this.recordProbeSideComparator = TestData.getIntIntTupleComparator();
        this.pactRecordComparator = new GenericPairComparator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>(this.recordBuildSideComparator, this.recordProbeSideComparator);

        // Named constants instead of the repeated literals (identical values).
        this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
        this.ioManager = new IOManagerAsync();
    }

    /**
     * Shuts down the I/O manager and the memory manager and then reports any problem that was
     * detected while doing so.
     *
     * <p>Both managers are shut down and cleared before any assertion fires, so a failed check
     * on one of them can no longer prevent the other from being shut down.
     */
    @After
    public void afterTest() {
        boolean ioShutDownProperly = true;
        if (this.ioManager != null) {
            this.ioManager.shutdown();
            ioShutDownProperly = this.ioManager.isProperlyShutDown();
            this.ioManager = null;
        }

        boolean allMemoryReturned = true;
        if (this.memoryManager != null) {
            // Check for leaks first, then shut down regardless of the outcome.
            allMemoryReturned = this.memoryManager.verifyEmpty();
            this.memoryManager.shutdown();
            this.memoryManager = null;
        }

        // Assertions come last and keep the original order and messages.
        Assert.assertTrue("I/O manager failed to properly shut down.", ioShutDownProperly);
        Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.", allMemoryReturned);
    }

    /**
     * Runs {@code doTest} with 1000 build and 1000 probe records, where the build side uses
     * {@code KeyMode.RANDOM} and the probe side uses {@code KeyMode.SORTED}.
     *
     * <p>NOTE(review): the generator arguments (200 / 0 and 1024) are presumably the key range
     * and the value length; confirm against {@code TestData.TupleGenerator}.
     */
    @Test
    public void testOverflow() {
        final int buildSize = 1000;
        final int probeSize = 1000;
        try {
            TestData.TupleGenerator bgen = new TestData.TupleGenerator(SEED1, 200, 1024, TestData.TupleGenerator.KeyMode.RANDOM, TestData.TupleGenerator.ValueMode.FIX_LENGTH);
            TestData.TupleGenerator pgen = new TestData.TupleGenerator(SEED2, 0, 1024, TestData.TupleGenerator.KeyMode.SORTED, TestData.TupleGenerator.ValueMode.FIX_LENGTH);
            TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
            TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
            this.doTest(buildInput, probeInput, bgen, pgen);
        } catch (Exception e) {
            // Same failure type and message as Assert.fail(...), but with the original
            // exception chained as the cause so its stack trace shows up in the test report.
            AssertionError failure = new AssertionError("An exception occurred during the test: " + e.getMessage());
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Runs {@code doTest} with 1000 build and 1000 probe records, both generated with
     * {@code KeyMode.SORTED} and a third generator argument of 1024.
     *
     * <p>NOTE(review): per the test name, this input size is presumably large enough to make
     * the hash table spill; the third generator argument is presumably the value length.
     * Confirm against {@code TestData.TupleGenerator}.
     */
    @Test
    public void testDoubleProbeSpilling() {
        final int buildSize = 1000;
        final int probeSize = 1000;
        try {
            TestData.TupleGenerator bgen = new TestData.TupleGenerator(SEED1, 0, 1024, TestData.TupleGenerator.KeyMode.SORTED, TestData.TupleGenerator.ValueMode.FIX_LENGTH);
            TestData.TupleGenerator pgen = new TestData.TupleGenerator(SEED2, 0, 1024, TestData.TupleGenerator.KeyMode.SORTED, TestData.TupleGenerator.ValueMode.FIX_LENGTH);
            TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
            TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
            this.doTest(buildInput, probeInput, bgen, pgen);
        } catch (Exception e) {
            // Same failure type and message as Assert.fail(...), but with the original
            // exception chained as the cause so its stack trace shows up in the test report.
            AssertionError failure = new AssertionError("An exception occurred during the test: " + e.getMessage());
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Runs {@code doTest} with 1000 build and 1000 probe records, both generated with
     * {@code KeyMode.SORTED} and a third generator argument of 28.
     *
     * <p>NOTE(review): per the test name, this input is presumably small enough to stay in
     * memory; the third generator argument is presumably the value length. Confirm against
     * {@code TestData.TupleGenerator}.
     */
    @Test
    public void testDoubleProbeInMemory() {
        final int buildSize = 1000;
        final int probeSize = 1000;
        try {
            TestData.TupleGenerator bgen = new TestData.TupleGenerator(SEED1, 0, 28, TestData.TupleGenerator.KeyMode.SORTED, TestData.TupleGenerator.ValueMode.FIX_LENGTH);
            TestData.TupleGenerator pgen = new TestData.TupleGenerator(SEED2, 0, 28, TestData.TupleGenerator.KeyMode.SORTED, TestData.TupleGenerator.ValueMode.FIX_LENGTH);
            TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
            TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
            this.doTest(buildInput, probeInput, bgen, pgen);
        } catch (Exception e) {
            // Same failure type and message as Assert.fail(...), but with the original
            // exception chained as the cause so its stack trace shows up in the test report.
            AssertionError failure = new AssertionError("An exception occurred during the test: " + e.getMessage());
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Joins the given inputs through a {@link ReusingBuildFirstReOpenableHashJoinIterator} and
     * checks the result of the initial probe plus {@code NUM_PROBES} re-opened probes.
     *
     * <p>The expected matches are computed up front with
     * {@code NonReusingHashJoinIteratorITCase.joinTuples}; one copy of that map is prepared for
     * every probe pass. After each pass every per-key collection of the corresponding map must
     * be empty (the {@code TupleMatchRemovingJoin} presumably removes each match it is called
     * with; confirm against its implementation).
     *
     * @param buildInput iterator over the build side; consumed once for the expected result
     *                   and reset before the join runs
     * @param probeInput iterator over the probe side; reset before every probe pass
     * @param bgen       generator behind {@code buildInput}; reset together with it
     * @param pgen       generator behind {@code probeInput}; reset together with it
     * @throws Exception if the join iterator fails
     */
    private void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGeneratorIterator probeInput, TestData.TupleGenerator bgen, TestData.TupleGenerator pgen) throws Exception {
        Map<Integer, Collection<NonReusingHashJoinIteratorITCase.TupleMatch>> expectedFirstMatchesMap = NonReusingHashJoinIteratorITCase.joinTuples(NonReusingHashJoinIteratorITCase.collectTupleData(buildInput), NonReusingHashJoinIteratorITCase.collectTupleData(probeInput));

        // One independent copy of the expected matches (and one matcher) per re-opened probe.
        List<Map<Integer, Collection<NonReusingHashJoinIteratorITCase.TupleMatch>>> expectedNMatchesMapList = new ArrayList<Map<Integer, Collection<NonReusingHashJoinIteratorITCase.TupleMatch>>>(NUM_PROBES);
        NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin[] nMatcher = new NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin[NUM_PROBES];
        for (int i = 0; i < NUM_PROBES; ++i) {
            Map<Integer, Collection<NonReusingHashJoinIteratorITCase.TupleMatch>> tmp = ReusingReOpenableHashTableITCase.deepCopy(expectedFirstMatchesMap);
            expectedNMatchesMapList.add(tmp);
            nMatcher[i] = new NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin(tmp);
        }
        NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin firstMatcher = new NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin(expectedFirstMatchesMap);
        DiscardingOutputCollector collector = new DiscardingOutputCollector();

        // The inputs were consumed while computing the expected result; rewind them.
        bgen.reset();
        pgen.reset();
        buildInput.reset();
        probeInput.reset();

        ReusingBuildFirstReOpenableHashJoinIterator iterator = new ReusingBuildFirstReOpenableHashJoinIterator((MutableObjectIterator)buildInput, (MutableObjectIterator)probeInput, this.recordSerializer, this.record1Comparator, this.recordSerializer, this.record2Comparator, this.recordPairComparator, this.memoryManager, this.ioManager, this.parentTask, 1.0, false, false, true);
        try {
            iterator.open();
            while (iterator.callWithNextKey((FlatJoinFunction)firstMatcher, collector)) {
                // all work happens inside callWithNextKey
            }
            assertAllMatchesConsumed(expectedFirstMatchesMap);

            for (int i = 0; i < NUM_PROBES; ++i) {
                pgen.reset();
                probeInput.reset();
                iterator.reopenProbe((MutableObjectIterator)probeInput);
                while (iterator.callWithNextKey((FlatJoinFunction)nMatcher[i], collector)) {
                    // all work happens inside callWithNextKey
                }
                assertAllMatchesConsumed(expectedNMatchesMapList.get(i));
            }
        } finally {
            // Close even when a check above fails, so the iterator's resources are released
            // before afterTest() verifies the memory manager.
            iterator.close();
        }
    }

    /** Fails the test if any per-key collection in the given map still contains entries. */
    private static void assertAllMatchesConsumed(Map<Integer, Collection<NonReusingHashJoinIteratorITCase.TupleMatch>> expectedMatches) {
        for (Map.Entry<Integer, Collection<NonReusingHashJoinIteratorITCase.TupleMatch>> entry : expectedMatches.entrySet()) {
            if (!entry.getValue().isEmpty()) {
                Assert.fail("Collection for key " + entry.getKey() + " is not empty");
            }
        }
    }

    /**
     * Builds the probe-side input for the spilling tests as the union of three iterators: a
     * {@link UniformIntTupleGenerator} over {@code numKeys} keys and two
     * {@code ConstantIntIntTuplesIterator}s for the two repeated keys.
     *
     * <p>NOTE(review): the constant iterators are created with (key, 17 or 23, 5); the last
     * argument is presumably the number of records emitted. The spilling tests' expected
     * counts rely on it being 5 extra probe records per repeated key.
     *
     * @param numKeys         first argument of the uniform generator
     * @param probeValsPerKey second argument of the uniform generator
     * @param repeatedValue1  key of the first constant iterator
     * @param repeatedValue2  key of the second constant iterator
     * @return an iterator over the union of the three inputs
     */
    private MutableObjectIterator<Tuple2<Integer, Integer>> getProbeInput(int numKeys, int probeValsPerKey, int repeatedValue1, int repeatedValue2) {
        // Element type matches what UnionIterator<Tuple2<Integer, Integer>> consumes.
        List<MutableObjectIterator<Tuple2<Integer, Integer>>> probes = new ArrayList<MutableObjectIterator<Tuple2<Integer, Integer>>>();
        probes.add(new UniformIntTupleGenerator(numKeys, probeValsPerKey, true));
        probes.add(new TestData.ConstantIntIntTuplesIterator(repeatedValue1, 17, 5));
        probes.add(new TestData.ConstantIntIntTuplesIterator(repeatedValue2, 23, 5));
        return new UnionIterator<Tuple2<Integer, Integer>>(probes);
    }

    /**
     * Drives a {@link ReOpenableMutableHashTable} directly: the build side is opened once and
     * then probed {@code NUM_PROBES} times with a freshly created probe input, counting for
     * every probe record how many build-side records it matches.
     *
     * <p>The build side is the union of a uniform generator and two constant-key iterators;
     * the probe side comes from {@code getProbeInput}. After all passes every key must have
     * accumulated {@code probeValues * buildValues * NUM_PROBES} matches.
     *
     * <p>NOTE(review): per the test name, the two heavily repeated keys are presumably meant
     * to provoke massive hash collisions and spilling; nothing here checks that spilling
     * actually took place.
     *
     * @throws IOException if the hash table fails while reading or spilling data
     */
    @Test
    public void testSpillingHashJoinWithMassiveCollisions() throws IOException {
        final int REPEATED_VALUE_1 = 40559;
        final int REPEATED_VALUE_2 = 92882;
        final int REPEATED_VALUE_COUNT_BUILD = 200000;
        // Must match the record count that getProbeInput() gives its constant iterators.
        final int REPEATED_VALUE_COUNT_PROBE = 5;
        final int NUM_KEYS = 1000000;
        final int BUILD_VALS_PER_KEY = 3;
        final int PROBE_VALS_PER_KEY = 10;

        List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<MutableObjectIterator<Tuple2<Integer, Integer>>>();
        builds.add(new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false));
        builds.add(new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD));
        builds.add(new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD));
        MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<Tuple2<Integer, Integer>>(builds);

        // Left raw on purpose: the segment type is not imported by this file.
        List memSegments;
        try {
            memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896);
        } catch (MemoryAllocationException maex) {
            Assert.fail("Memory for the Join could not be provided.");
            return;
        }

        // key -> number of (probe record, build record) pairs seen across all probe passes
        Map<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
        ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>(this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator, memSegments, this.ioManager, true);
        try {
            for (int probe = 0; probe < NUM_PROBES; ++probe) {
                MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = this.getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
                if (probe == 0) {
                    join.open(buildInput, probeInput);
                } else {
                    join.reopenProbe(probeInput);
                }

                Tuple2<Integer, Integer> recordReuse = new Tuple2<Integer, Integer>();
                while (join.nextRecord()) {
                    long numBuildValues = 0L;
                    Integer key = join.getCurrentProbeRecord().f0;
                    MutableObjectIterator<Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();

                    Tuple2<Integer, Integer> record = buildSide.next(recordReuse);
                    if (record != null) {
                        numBuildValues = 1L;
                        Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
                    } else {
                        Assert.fail("No build side values found for a probe key.");
                    }
                    while ((record = buildSide.next(recordReuse)) != null) {
                        ++numBuildValues;
                        Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
                    }

                    Long contained = map.get(key);
                    map.put(key, contained == null ? numBuildValues : contained + numBuildValues);
                }
            }
        } finally {
            // Close the table and hand its memory back even when a check above fails, so that
            // afterTest() does not additionally report a memory leak.
            join.close();
            this.memoryManager.release(join.getFreedMemory());
        }

        Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size());

        // 3 * (10 + 5) * (3 + 200000) == 9000135 for the two repeated keys, 3 * 10 * 3 == 90 otherwise.
        final long expectedForRepeatedKey = (long) (PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES;
        final long expectedForUniformKey = (long) PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES;
        for (Map.Entry<Integer, Long> entry : map.entrySet()) {
            long val = entry.getValue();
            int key = entry.getKey();
            long expected = (key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) ? expectedForRepeatedKey : expectedForUniformKey;
            Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, expected, val);
        }
    }

    /**
     * Drives a {@link ReOpenableMutableHashTable} directly: the build side is opened once and
     * then probed {@code NUM_PROBES} times with a freshly created probe input, counting for
     * every probe record how many build-side records it matches.
     *
     * <p>The build side is the union of a uniform generator and two constant-key iterators;
     * the probe side comes from {@code getProbeInput}. After all passes every key must have
     * accumulated {@code probeValues * buildValues * NUM_PROBES} matches.
     *
     * <p>NOTE(review): per the test name, this data set is presumably meant to force two
     * levels of recursive partitioning; nothing here checks that this actually took place.
     * The inputs and sizes are the same as in {@code testSpillingHashJoinWithMassiveCollisions}.
     *
     * @throws IOException if the hash table fails while reading or spilling data
     */
    @Test
    public void testSpillingHashJoinWithTwoRecursions() throws IOException {
        final int REPEATED_VALUE_1 = 40559;
        final int REPEATED_VALUE_2 = 92882;
        final int REPEATED_VALUE_COUNT_BUILD = 200000;
        // Must match the record count that getProbeInput() gives its constant iterators.
        final int REPEATED_VALUE_COUNT_PROBE = 5;
        final int NUM_KEYS = 1000000;
        final int BUILD_VALS_PER_KEY = 3;
        final int PROBE_VALS_PER_KEY = 10;

        List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<MutableObjectIterator<Tuple2<Integer, Integer>>>();
        builds.add(new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false));
        builds.add(new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD));
        builds.add(new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD));
        MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<Tuple2<Integer, Integer>>(builds);

        // Left raw on purpose: the segment type is not imported by this file.
        List memSegments;
        try {
            memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896);
        } catch (MemoryAllocationException maex) {
            Assert.fail("Memory for the Join could not be provided.");
            return;
        }

        // key -> number of (probe record, build record) pairs seen across all probe passes
        Map<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
        ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>(this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator, memSegments, this.ioManager, true);
        try {
            for (int probe = 0; probe < NUM_PROBES; ++probe) {
                MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = this.getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
                if (probe == 0) {
                    join.open(buildInput, probeInput);
                } else {
                    join.reopenProbe(probeInput);
                }

                Tuple2<Integer, Integer> recordReuse = new Tuple2<Integer, Integer>();
                while (join.nextRecord()) {
                    long numBuildValues = 0L;
                    Integer key = join.getCurrentProbeRecord().f0;
                    MutableObjectIterator<Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();

                    Tuple2<Integer, Integer> record = buildSide.next(recordReuse);
                    if (record != null) {
                        numBuildValues = 1L;
                        Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
                    } else {
                        Assert.fail("No build side values found for a probe key.");
                    }
                    while ((record = buildSide.next(recordReuse)) != null) {
                        ++numBuildValues;
                        Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
                    }

                    Long contained = map.get(key);
                    map.put(key, contained == null ? numBuildValues : contained + numBuildValues);
                }
            }
        } finally {
            // Close the table and hand its memory back even when a check above fails, so that
            // afterTest() does not additionally report a memory leak.
            join.close();
            this.memoryManager.release(join.getFreedMemory());
        }

        Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size());

        // 3 * (10 + 5) * (3 + 200000) == 9000135 for the two repeated keys, 3 * 10 * 3 == 90 otherwise.
        final long expectedForRepeatedKey = (long) (PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES;
        final long expectedForUniformKey = (long) PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES;
        for (Map.Entry<Integer, Long> entry : map.entrySet()) {
            long val = entry.getValue();
            int key = entry.getKey();
            long expected = (key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) ? expectedForRepeatedKey : expectedForUniformKey;
            Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, expected, val);
        }
    }

    /**
     * Copies the given map of expected matches into a new {@link HashMap} whose values are
     * new {@link ArrayList}s.
     *
     * <p>The map and the per-key collections are fresh objects, so removing matches from the
     * copy leaves the argument untouched. The {@code TupleMatch} elements themselves are not
     * cloned; both maps refer to the same instances.
     *
     * @param expectedSecondMatchesMap the map to copy
     * @return a new map with the same keys and a new list of the same matches for every key
     */
    static Map<Integer, Collection<NonReusingHashJoinIteratorITCase.TupleMatch>> deepCopy(Map<Integer, Collection<NonReusingHashJoinIteratorITCase.TupleMatch>> expectedSecondMatchesMap) {
        Map<Integer, Collection<NonReusingHashJoinIteratorITCase.TupleMatch>> result = new HashMap<Integer, Collection<NonReusingHashJoinIteratorITCase.TupleMatch>>(expectedSecondMatchesMap.size());
        for (Map.Entry<Integer, Collection<NonReusingHashJoinIteratorITCase.TupleMatch>> source : expectedSecondMatchesMap.entrySet()) {
            Collection<NonReusingHashJoinIteratorITCase.TupleMatch> copiedMatches = new ArrayList<NonReusingHashJoinIteratorITCase.TupleMatch>(source.getValue());
            result.put(source.getKey(), copiedMatches);
        }
        return result;
    }
}

