/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Assert;
import org.junit.Test;

public class DirectedOutputITCase
extends AbstractTestBase {
    private static final String TEN = "ten";
    private static final String ODD = "odd";
    private static final String EVEN = "even";
    private static final String NON_SELECTED = "nonSelected";

    @Test
    public void outputSelectorTest() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        TestListResultSink evenSink = new TestListResultSink();
        TestListResultSink oddAndTenSink = new TestListResultSink();
        TestListResultSink evenAndOddSink = new TestListResultSink();
        TestListResultSink allSink = new TestListResultSink();
        SplitStream source = env.generateSequence(1L, 11L).split((OutputSelector)new MyOutputSelector());
        source.select(new String[]{EVEN}).addSink(evenSink);
        source.select(new String[]{ODD, TEN}).addSink(oddAndTenSink);
        source.select(new String[]{EVEN, ODD}).addSink(evenAndOddSink);
        source.addSink(allSink);
        env.execute();
        Assert.assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), evenSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), oddAndTenSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), evenAndOddSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), allSink.getSortedResult());
    }

    static final class MyOutputSelector
    implements OutputSelector<Long> {
        private static final long serialVersionUID = 1L;
        List<String> outputs = new ArrayList<String>();

        MyOutputSelector() {
        }

        public Iterable<String> select(Long value) {
            this.outputs.clear();
            if (value % 2L == 0L) {
                this.outputs.add(DirectedOutputITCase.EVEN);
            } else {
                this.outputs.add(DirectedOutputITCase.ODD);
            }
            if (value == 10L) {
                this.outputs.add(DirectedOutputITCase.TEN);
            }
            if (value == 11L) {
                this.outputs.add(DirectedOutputITCase.NON_SELECTED);
            }
            return this.outputs;
        }
    }
}

