/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.GroupReduceDriver;
import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests that run a {@link GroupReduceDriver} configured with
 * {@link DriverStrategy#SORTED_GROUP_REDUCE} over records produced by a
 * {@link UniformRecordGenerator} and check the reduced output.
 *
 * <p>Two flavours are covered: input sorted through {@code addInputSorted} and reduced by
 * {@link MockReduceStub}, and input pre-combined through a {@link CombiningUnilateralSortMerger}
 * and reduced by {@link MockCombiningReduceStub}. Every test limits the sort to two file handles.
 * NOTE(review): the "single level" / "multi level" naming presumably refers to how many merge
 * passes the chosen key count forces with that limit - confirm against the sorter.
 */
public class ReduceTaskExternalITCase
extends DriverTestBase<RichGroupReduceFunction<Record, Record>> {
    private static final Logger LOG = LoggerFactory.getLogger(ReduceTaskExternalITCase.class);

    /** Compares records on field 0, read as an {@link IntValue}. */
    private final RecordComparator comparator = new RecordComparator(new int[]{0}, new Class[]{IntValue.class});

    /** Receives every record emitted by the driver under test. */
    private final List<Record> outList = new ArrayList<Record>();

    /**
     * Creates the test case for the given execution configuration.
     *
     * @param config execution configuration handed to the test base
     */
    public ReduceTaskExternalITCase(ExecutionConfig config) {
        // 0x300000 bytes = 3 MiB. NOTE(review): presumably the per-sort memory budget; the
        // meaning of the numeric arguments is defined by DriverTestBase - confirm there.
        super(config, 0L, 1, 0x300000L);
    }

    /** Reduces 8192 keys with 8 values each using {@link MockReduceStub}. */
    @Test
    public void testSingleLevelMergeReduceTask() {
        int keyCnt = 8192;
        int valCnt = 8;
        runSortedReduceAndVerify(keyCnt, valCnt);
    }

    /** Reduces 32768 keys with 8 values each using {@link MockReduceStub}. */
    @Test
    public void testMultiLevelMergeReduceTask() {
        int keyCnt = 32768;
        int valCnt = 8;
        runSortedReduceAndVerify(keyCnt, valCnt);
    }

    /** Combines and reduces 8192 keys with 8 values each using {@link MockCombiningReduceStub}. */
    @Test
    public void testSingleLevelMergeCombiningReduceTask() {
        int keyCnt = 8192;
        int valCnt = 8;
        runCombiningReduceAndVerify(keyCnt, valCnt, true);
    }

    /** Combines and reduces 32768 keys with 8 values each using {@link MockCombiningReduceStub}. */
    @Test
    public void testMultiLevelMergeCombiningReduceTask() {
        int keyCnt = 32768;
        int valCnt = 8;
        runCombiningReduceAndVerify(keyCnt, valCnt, false);
    }

    /**
     * Feeds sorted generator input to a {@link GroupReduceDriver} running {@link MockReduceStub}
     * and verifies that one record per key is emitted whose field 1 equals
     * {@code valCnt - key}.
     *
     * @param keyCnt number of keys passed to the record generator
     * @param valCnt number of values per key passed to the record generator
     */
    private void runSortedReduceAndVerify(int keyCnt, int valCnt) {
        setNumFileHandlesForSort(2);
        addDriverComparator(this.comparator);
        setOutput(this.outList);
        getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
        try {
            addInputSorted(new UniformRecordGenerator(keyCnt, valCnt, false), this.comparator.duplicate());
            GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
            testDriver(testTask, MockReduceStub.class);
        } catch (Exception e) {
            LOG.debug("Exception while running the test task.", e);
            Assert.fail("Exception in Test.");
        }
        // MockReduceStub writes (group size - key) into field 1, so a full group yields valCnt - key.
        assertOutput(keyCnt, valCnt);
        this.outList.clear();
    }

    /**
     * Pre-combines generator input with a {@link CombiningUnilateralSortMerger}, feeds the result
     * to a {@link GroupReduceDriver} running {@link MockCombiningReduceStub}, and verifies that
     * one record per key is emitted whose field 1 equals {@code (1 + ... + (valCnt - 1)) - key}.
     * The sorter is always closed, whether or not the run succeeds.
     *
     * @param keyCnt number of keys passed to the record generator
     * @param valCnt number of values per key passed to the record generator
     * @param sorterFlag forwarded unchanged as the last constructor argument of the sorter.
     *     NOTE(review): its meaning is defined by that constructor and is not visible in this
     *     file - confirm before relying on it.
     */
    private void runCombiningReduceAndVerify(int keyCnt, int valCnt, boolean sorterFlag) {
        addDriverComparator(this.comparator);
        setOutput(this.outList);
        getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);

        CombiningUnilateralSortMerger<Record> sorter = null;
        try {
            sorter = new CombiningUnilateralSortMerger<Record>(
                    new MockCombiningReduceStub(),
                    getMemoryManager(),
                    getIOManager(),
                    new UniformRecordGenerator(keyCnt, valCnt, false),
                    getOwningNepheleTask(),
                    RecordSerializerFactory.get(),
                    this.comparator.duplicate(),
                    this.perSortFractionMem,
                    2,
                    0.8f,
                    sorterFlag);
            addInput(sorter.getIterator());
            GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
            testDriver(testTask, MockCombiningReduceStub.class);
        } catch (Exception e) {
            LOG.debug("Exception while running the test task.", e);
            Assert.fail("Exception in Test.");
        } finally {
            // Release the sorter even when the run failed; it stays null if construction threw.
            if (sorter != null) {
                sorter.close();
            }
        }

        // Sum of 1..valCnt-1: the total the test expects each key's values to add up to.
        int expSum = valCnt * (valCnt - 1) / 2;
        assertOutput(keyCnt, expSum);
        this.outList.clear();
    }

    /**
     * Asserts that the output holds exactly {@code expectedSize} records and that, for each of
     * them, field 1 equals {@code expectedBase} minus field 0.
     *
     * @param expectedSize expected number of output records
     * @param expectedBase value from which each record's key (field 0) is subtracted to obtain the
     *     expected content of field 1
     */
    private void assertOutput(int expectedSize, int expectedBase) {
        Assert.assertTrue(
                "Resultset size was " + this.outList.size() + ". Expected was " + expectedSize,
                this.outList.size() == expectedSize);
        for (Record record : this.outList) {
            int actual = record.getField(1, IntValue.class).getValue();
            int key = record.getField(0, IntValue.class).getValue();
            Assert.assertTrue("Incorrect result", actual == expectedBase - key);
        }
    }

    /**
     * Reduce function that can also act as a combiner. Combining replaces a group by its last
     * record carrying the sum of all field-1 values; reducing does the same but subtracts the
     * key (field 0) from the sum.
     */
    public static class MockCombiningReduceStub
    implements GroupReduceFunction<Record, Record>,
    GroupCombineFunction<Record, Record> {
        private static final long serialVersionUID = 1L;
        private final IntValue key = new IntValue();
        private final IntValue value = new IntValue();
        private final IntValue combineValue = new IntValue();

        /**
         * Emits the last record of the group with field 1 set to the sum of all field-1 values
         * minus the key in field 0.
         *
         * @param records records of one group; must contain at least one record, otherwise the
         *     key lookup fails with a {@link NullPointerException}
         * @param out collector receiving the single result record
         */
        @Override
        public void reduce(Iterable<Record> records, Collector<Record> out) {
            Record element = null;
            int sum = 0;
            for (Record next : records) {
                element = next;
                element.getField(1, this.value);
                sum += this.value.getValue();
            }
            element.getField(0, this.key);
            this.value.setValue(sum - this.key.getValue());
            element.setField(1, this.value);
            out.collect(element);
        }

        /**
         * Emits the last record of the group with field 1 set to the sum of all field-1 values.
         *
         * @param records records of one group; must contain at least one record, otherwise
         *     writing the result fails with a {@link NullPointerException}
         * @param out collector receiving the single combined record
         */
        @Override
        public void combine(Iterable<Record> records, Collector<Record> out) {
            Record element = null;
            int sum = 0;
            for (Record next : records) {
                element = next;
                element.getField(1, this.combineValue);
                sum += this.combineValue.getValue();
            }
            this.combineValue.setValue(sum);
            element.setField(1, this.combineValue);
            out.collect(element);
        }
    }

    /**
     * Reduce function that counts the records of a group and emits the last record with field 1
     * set to that count minus the key in field 0.
     */
    public static class MockReduceStub
    extends RichGroupReduceFunction<Record, Record> {
        private static final long serialVersionUID = 1L;
        private final IntValue key = new IntValue();
        private final IntValue value = new IntValue();

        /**
         * Emits the last record of the group with field 1 set to the group size minus the key.
         *
         * @param records records of one group; must contain at least one record, otherwise the
         *     key lookup fails with a {@link NullPointerException}
         * @param out collector receiving the single result record
         */
        @Override
        public void reduce(Iterable<Record> records, Collector<Record> out) {
            Record element = null;
            int cnt = 0;
            for (Record next : records) {
                element = next;
                ++cnt;
            }
            element.getField(0, this.key);
            this.value.setValue(cnt - this.key.getValue());
            element.setField(1, this.value);
            out.collect(element);
        }
    }
}

