/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.iterative;

import java.util.List;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.Collector;

public class IterationTerminationWithTwoTails
extends JavaProgramTestBase {
    private static final String EXPECTED = "22\n";

    protected void testProgram() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        Operator initialInput = env.fromElements((Object[])new String[]{"1", "2", "3", "4", "5"}).name("input");
        IterativeDataSet iteration = (IterativeDataSet)initialInput.iterate(5).name("Loop");
        Operator sumReduce = iteration.reduceGroup((GroupReduceFunction)new SumReducer()).name("Compute sum (GroupReduce");
        Operator terminationFilter = iteration.filter((FilterFunction)new TerminationFilter()).name("Compute termination criterion (Map)");
        List result = iteration.closeWith((DataSet)sumReduce, (DataSet)terminationFilter).collect();
        IterationTerminationWithTwoTails.containsResultAsText((List)result, (String)EXPECTED);
    }

    private static class TerminationFilter
    implements FilterFunction<String> {
        private static final long serialVersionUID = 1L;

        private TerminationFilter() {
        }

        public boolean filter(String value) throws Exception {
            return Integer.parseInt(value) < 21;
        }
    }

    private static final class SumReducer
    implements GroupReduceFunction<String, String> {
        private static final long serialVersionUID = 1L;

        private SumReducer() {
        }

        public void reduce(Iterable<String> values, Collector<String> out) throws Exception {
            int sum = 0;
            for (String value : values) {
                sum += Integer.parseInt(value) + 1;
            }
            out.collect((Object)("" + sum));
        }
    }
}

