/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.util.ArrayList;
import java.util.Arrays;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Assert;
import org.junit.Test;

public class OutputSplitterITCase
extends AbstractTestBase {
    private static ArrayList<Integer> expectedSplitterResult = new ArrayList();

    @Test
    public void testOnMergedDataStream() throws Exception {
        TestListResultSink splitterResultSink1 = new TestListResultSink();
        TestListResultSink splitterResultSink2 = new TestListResultSink();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setBufferTimeout(1L);
        DataStreamSource d1 = env.fromElements((Object[])new Integer[]{0, 2, 4, 6, 8});
        DataStreamSource d2 = env.fromElements((Object[])new Integer[]{1, 3, 5, 7, 9});
        d1 = d1.union(new DataStream[]{d2});
        d1.split((OutputSelector)new OutputSelector<Integer>(){
            private static final long serialVersionUID = 8354166915727490130L;

            public Iterable<String> select(Integer value) {
                ArrayList<String> s = new ArrayList<String>();
                if (value > 4) {
                    s.add(">");
                } else {
                    s.add("<");
                }
                return s;
            }
        }).select(new String[]{">"}).addSink(splitterResultSink1);
        d1.split((OutputSelector)new OutputSelector<Integer>(){
            private static final long serialVersionUID = -6822487543355994807L;

            public Iterable<String> select(Integer value) {
                ArrayList<String> s = new ArrayList<String>();
                if (value % 3 == 0) {
                    s.add("yes");
                } else {
                    s.add("no");
                }
                return s;
            }
        }).select(new String[]{"yes"}).addSink(splitterResultSink2);
        env.execute();
        expectedSplitterResult.clear();
        expectedSplitterResult.addAll(Arrays.asList(5, 6, 7, 8, 9));
        Assert.assertEquals(expectedSplitterResult, splitterResultSink1.getSortedResult());
        expectedSplitterResult.clear();
        expectedSplitterResult.addAll(Arrays.asList(0, 3, 6, 9));
        Assert.assertEquals(expectedSplitterResult, splitterResultSink2.getSortedResult());
    }

    @Test
    public void testOnSingleDataStream() throws Exception {
        TestListResultSink splitterResultSink1 = new TestListResultSink();
        TestListResultSink splitterResultSink2 = new TestListResultSink();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setBufferTimeout(1L);
        DataStreamSource ds = env.fromElements((Object[])new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
        ds.split((OutputSelector)new OutputSelector<Integer>(){
            private static final long serialVersionUID = 2524335410904414121L;

            public Iterable<String> select(Integer value) {
                ArrayList<String> s = new ArrayList<String>();
                if (value % 2 == 0) {
                    s.add("even");
                } else {
                    s.add("odd");
                }
                return s;
            }
        }).select(new String[]{"even"}).addSink(splitterResultSink1);
        ds.split((OutputSelector)new OutputSelector<Integer>(){
            private static final long serialVersionUID = -511693919586034092L;

            public Iterable<String> select(Integer value) {
                ArrayList<String> s = new ArrayList<String>();
                if (value % 4 == 0) {
                    s.add("yes");
                } else {
                    s.add("no");
                }
                return s;
            }
        }).select(new String[]{"yes"}).addSink(splitterResultSink2);
        env.execute();
        expectedSplitterResult.clear();
        expectedSplitterResult.addAll(Arrays.asList(0, 2, 4, 6, 8));
        Assert.assertEquals(expectedSplitterResult, splitterResultSink1.getSortedResult());
        expectedSplitterResult.clear();
        expectedSplitterResult.addAll(Arrays.asList(0, 4, 8));
        Assert.assertEquals(expectedSplitterResult, splitterResultSink2.getSortedResult());
    }
}

