package org.apache.flink.test.streaming.runtime;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.streaming.runtime.util.NoOpIntMap;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Assert;
import org.junit.Test;

/**
 * Integration tests for the stream partitioning strategies of the DataStream API.
 *
 * <p>Each scenario routes a small, fixed set of records through a partitioner into
 * {@link SubtaskIndexAssigner}, which tags every record with the index of the parallel
 * subtask that received it. The tagged records are collected in {@link TestListResultSink}s
 * and compared against the expected subtask assignment.
 */
public class PartitionerITCase extends AbstractTestBase {

    /**
     * Map function that pairs every incoming value with the index of the parallel subtask
     * that processed it, producing {@code (subtaskIndex, value)} tuples.
     */
    private static class SubtaskIndexAssigner
            extends RichMapFunction<Tuple1<String>, Tuple2<Integer, String>> {
        private static final long serialVersionUID = 1L;

        /** Index of the subtask running this instance; read from the runtime context in {@code open}. */
        private int indexOfSubtask;

        /**
         * Captures the index of the current subtask from the runtime context.
         *
         * @param configuration configuration handed to the rich function; passed on to the super class
         * @throws Exception if the super class initialization fails
         */
        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            indexOfSubtask = getRuntimeContext().getIndexOfThisSubtask();
        }

        /**
         * Tags the value with the subtask index captured in {@code open}.
         *
         * @param value single-field tuple carrying the record
         * @return tuple of {@code (subtaskIndex, value.f0)}
         */
        public Tuple2<Integer, String> map(Tuple1<String> value) throws Exception {
            return new Tuple2<>(indexOfSubtask, value.f0);
        }
    }

    /**
     * Connects a source directly to a map through {@code forward()} and expects building or
     * executing the job to throw {@link UnsupportedOperationException}.
     *
     * <p>NOTE(review): presumably the source runs with parallelism 1 while the map uses the
     * environment's higher default parallelism - confirm against the test environment setup.
     */
    @Test(expected = UnsupportedOperationException.class)
    public void testForwardFailsLowToHighParallelism() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).forward().map(new NoOpIntMap());
        env.execute();
    }

    /**
     * Connects a map running at the environment's default parallelism through {@code forward()}
     * to a map explicitly set to parallelism 1, and expects building or executing the job to
     * throw {@link UnsupportedOperationException}.
     */
    @Test(expected = UnsupportedOperationException.class)
    public void testForwardFailsHightToLowParallelism() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3)
                .map(new NoOpIntMap())
                .forward()
                .map(new NoOpIntMap())
                .setParallelism(1);
        env.execute();
    }

    /**
     * Runs one job with parallelism 3 that sends the same seven records through key-by,
     * custom, broadcast, rebalance, forward and global partitioning, then verifies the
     * subtask assignment observed for each strategy.
     */
    @Test
    public void partitionerTest() {
        TestListResultSink<Tuple2<Integer, String>> hashSink = new TestListResultSink<>();
        TestListResultSink<Tuple2<Integer, String>> customSink = new TestListResultSink<>();
        TestListResultSink<Tuple2<Integer, String>> broadcastSink = new TestListResultSink<>();
        TestListResultSink<Tuple2<Integer, String>> forwardSink = new TestListResultSink<>();
        TestListResultSink<Tuple2<Integer, String>> rebalanceSink = new TestListResultSink<>();
        TestListResultSink<Tuple2<Integer, String>> globalSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        DataStreamSource<Tuple1<String>> source = env.fromElements(
                new Tuple1<>("a"),
                new Tuple1<>("b"),
                new Tuple1<>("b"),
                new Tuple1<>("a"),
                new Tuple1<>("a"),
                new Tuple1<>("c"),
                new Tuple1<>("a"));

        // Partition by the value of tuple field 0.
        source.keyBy(0).map(new SubtaskIndexAssigner()).addSink(hashSink);

        // Custom partitioner on field 0: "c" goes to partition 2, everything else to partition 0.
        source.partitionCustom(new Partitioner<String>() {
            private static final long serialVersionUID = 1L;

            public int partition(String key, int numPartitions) {
                return "c".equals(key) ? 2 : 0;
            }
        }, 0).map(new SubtaskIndexAssigner()).addSink(customSink);

        source.broadcast().map(new SubtaskIndexAssigner()).addSink(broadcastSink);

        source.rebalance().map(new SubtaskIndexAssigner()).addSink(rebalanceSink);

        // An identity map is placed between the source and forward(), so forward() connects
        // two map operators rather than the source and a map.
        source.map(new MapFunction<Tuple1<String>, Tuple1<String>>() {
            private static final long serialVersionUID = 1L;

            public Tuple1<String> map(Tuple1<String> value) throws Exception {
                return value;
            }
        }).forward().map(new SubtaskIndexAssigner()).addSink(forwardSink);

        source.global().map(new SubtaskIndexAssigner()).addSink(globalSink);

        try {
            env.execute();
        } catch (Exception e) {
            // Keep the original exception as the cause so the failure report shows the full stack trace.
            throw new AssertionError("Job execution failed: " + e.getMessage(), e);
        }

        verifyHashPartitioning(hashSink.getResult());
        verifyCustomPartitioning(customSink.getResult());
        verifyBroadcastPartitioning(broadcastSink.getResult());
        // The forward result is checked against the same expectation as the rebalance result.
        // NOTE(review): presumably because the records reach the identity map via a
        // rebalance-style redistribution from the source - confirm.
        verifyRebalancePartitioning(forwardSink.getResult());
        verifyRebalancePartitioning(rebalanceSink.getResult());
        verifyGlobalPartitioning(globalSink.getResult());
    }

    /**
     * Asserts that all records with the same value were processed by the same subtask.
     *
     * @param hashPartitionResult {@code (subtaskIndex, value)} tuples from the key-by branch
     */
    private static void verifyHashPartitioning(List<Tuple2<Integer, String>> hashPartitionResult) {
        // Subtask index recorded for the first occurrence of each value.
        HashMap<String, Integer> subtaskByValue = new HashMap<>();
        for (Tuple2<Integer, String> record : hashPartitionResult) {
            Integer firstSeenSubtask = subtaskByValue.get(record.f1);
            if (firstSeenSubtask == null) {
                subtaskByValue.put(record.f1, record.f0);
            } else if (!Objects.equals(firstSeenSubtask, record.f0)) {
                Assert.fail("Value '" + record.f1 + "' was processed by subtask " + firstSeenSubtask
                        + " and by subtask " + record.f0);
            }
        }
    }

    /**
     * Asserts that records with value {@code "c"} were processed by subtask 2 and all other
     * records by subtask 0, matching the custom partitioner used in {@code partitionerTest}.
     *
     * @param customPartitionResult {@code (subtaskIndex, value)} tuples from the custom branch
     */
    private static void verifyCustomPartitioning(List<Tuple2<Integer, String>> customPartitionResult) {
        for (Tuple2<Integer, String> record : customPartitionResult) {
            Integer expectedSubtask = "c".equals(record.f1) ? 2 : 0;
            Assert.assertEquals(expectedSubtask, record.f0);
        }
    }

    /**
     * Asserts that every one of the subtasks 0, 1 and 2 saw each distinct value. The comparison
     * is set based, so duplicate tuples collapse and occurrence counts are not checked.
     *
     * @param broadcastPartitionResult {@code (subtaskIndex, value)} tuples from the broadcast branch
     */
    private static void verifyBroadcastPartitioning(List<Tuple2<Integer, String>> broadcastPartitionResult) {
        List<Tuple2<Integer, String>> expected = Arrays.asList(
                new Tuple2<>(0, "a"), new Tuple2<>(0, "b"), new Tuple2<>(0, "b"), new Tuple2<>(0, "a"),
                new Tuple2<>(0, "a"), new Tuple2<>(0, "c"), new Tuple2<>(0, "a"),
                new Tuple2<>(1, "a"), new Tuple2<>(1, "b"), new Tuple2<>(1, "b"), new Tuple2<>(1, "a"),
                new Tuple2<>(1, "a"), new Tuple2<>(1, "c"), new Tuple2<>(1, "a"),
                new Tuple2<>(2, "a"), new Tuple2<>(2, "b"), new Tuple2<>(2, "b"), new Tuple2<>(2, "a"),
                new Tuple2<>(2, "a"), new Tuple2<>(2, "c"), new Tuple2<>(2, "a"));
        Assert.assertEquals(new HashSet<>(expected), new HashSet<>(broadcastPartitionResult));
    }

    /**
     * Asserts that the records were assigned to subtasks 0, 1, 2, 0, 1, 2, 0 in input order.
     * The comparison is set based, so duplicate tuples collapse and occurrence counts are not
     * checked.
     *
     * @param rebalancePartitionResult {@code (subtaskIndex, value)} tuples from the rebalance or forward branch
     */
    private static void verifyRebalancePartitioning(List<Tuple2<Integer, String>> rebalancePartitionResult) {
        List<Tuple2<Integer, String>> expected = Arrays.asList(
                new Tuple2<>(0, "a"), new Tuple2<>(1, "b"), new Tuple2<>(2, "b"), new Tuple2<>(0, "a"),
                new Tuple2<>(1, "a"), new Tuple2<>(2, "c"), new Tuple2<>(0, "a"));
        Assert.assertEquals(new HashSet<>(expected), new HashSet<>(rebalancePartitionResult));
    }

    /**
     * Asserts that every record was processed by subtask 0. The comparison is set based, so
     * duplicate tuples collapse and occurrence counts are not checked.
     *
     * @param globalPartitionResult {@code (subtaskIndex, value)} tuples from the global branch
     */
    private static void verifyGlobalPartitioning(List<Tuple2<Integer, String>> globalPartitionResult) {
        List<Tuple2<Integer, String>> expected = Arrays.asList(
                new Tuple2<>(0, "a"), new Tuple2<>(0, "b"), new Tuple2<>(0, "b"), new Tuple2<>(0, "a"),
                new Tuple2<>(0, "a"), new Tuple2<>(0, "c"), new Tuple2<>(0, "a"));
        Assert.assertEquals(new HashSet<>(expected), new HashSet<>(globalPartitionResult));
    }
}
