/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.optimizer.iterations;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.PreviewPlanEnvironment;
import org.apache.flink.optimizer.dag.TempMode;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plan.SourcePlanNode;
import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public class ConnectedComponentsCoGroupTest
extends CompilerTestBase {
    private static final String VERTEX_SOURCE = "Vertices";
    private static final String ITERATION_NAME = "Connected Components Iteration";
    private static final String EDGES_SOURCE = "Edges";
    private static final String JOIN_NEIGHBORS_MATCH = "Join Candidate Id With Neighbor";
    private static final String MIN_ID_AND_UPDATE = "Min Id and Update";
    private static final String SINK = "Result";
    private static final boolean PRINT_PLAN = false;
    private final FieldList set0 = new FieldList(0);

    @Test
    public void testWorksetConnectedComponents() {
        Plan plan = ConnectedComponentsCoGroupTest.getConnectedComponentsCoGroupPlan();
        plan.setExecutionConfig(new ExecutionConfig());
        OptimizedPlan optPlan = this.compileNoStats(plan);
        CompilerTestBase.OptimizerPlanNodeResolver or = ConnectedComponentsCoGroupTest.getOptimizerPlanNodeResolver((OptimizedPlan)optPlan);
        SourcePlanNode vertexSource = (SourcePlanNode)or.getNode(VERTEX_SOURCE);
        SourcePlanNode edgesSource = (SourcePlanNode)or.getNode(EDGES_SOURCE);
        SinkPlanNode sink = (SinkPlanNode)or.getNode(SINK);
        WorksetIterationPlanNode iter = (WorksetIterationPlanNode)or.getNode(ITERATION_NAME);
        DualInputPlanNode neighborsJoin = (DualInputPlanNode)or.getNode(JOIN_NEIGHBORS_MATCH);
        DualInputPlanNode cogroup = (DualInputPlanNode)or.getNode(MIN_ID_AND_UPDATE);
        Assert.assertEquals((Object)DriverStrategy.NONE, (Object)sink.getDriverStrategy());
        Assert.assertEquals((Object)DriverStrategy.NONE, (Object)vertexSource.getDriverStrategy());
        Assert.assertEquals((Object)DriverStrategy.NONE, (Object)edgesSource.getDriverStrategy());
        Assert.assertEquals((Object)DriverStrategy.INNER_MERGE, (Object)neighborsJoin.getDriverStrategy());
        Assert.assertEquals((Object)this.set0, (Object)neighborsJoin.getKeysForInput1());
        Assert.assertEquals((Object)this.set0, (Object)neighborsJoin.getKeysForInput2());
        Assert.assertEquals((Object)DriverStrategy.CO_GROUP, (Object)cogroup.getDriverStrategy());
        Assert.assertEquals((Object)this.set0, (Object)cogroup.getKeysForInput1());
        Assert.assertEquals((Object)this.set0, (Object)cogroup.getKeysForInput2());
        Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)sink.getInput().getShipStrategy());
        Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)iter.getInitialSolutionSetInput().getShipStrategy());
        Assert.assertEquals((Object)this.set0, (Object)iter.getInitialSolutionSetInput().getShipStrategyKeys());
        Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)iter.getInitialWorksetInput().getShipStrategy());
        Assert.assertEquals((Object)this.set0, (Object)iter.getInitialWorksetInput().getShipStrategyKeys());
        Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)neighborsJoin.getInput1().getShipStrategy());
        Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)neighborsJoin.getInput2().getShipStrategy());
        Assert.assertEquals((Object)this.set0, (Object)neighborsJoin.getInput2().getShipStrategyKeys());
        Assert.assertTrue((boolean)neighborsJoin.getInput2().getTempMode().isCached());
        Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)cogroup.getInput1().getShipStrategy());
        Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)cogroup.getInput2().getShipStrategy());
        Assert.assertEquals((Object)LocalStrategy.NONE, (Object)sink.getInput().getLocalStrategy());
        Assert.assertEquals((Object)LocalStrategy.NONE, (Object)iter.getInitialSolutionSetInput().getLocalStrategy());
        Assert.assertEquals((Object)LocalStrategy.SORT, (Object)iter.getInitialWorksetInput().getLocalStrategy());
        Assert.assertEquals((Object)LocalStrategy.NONE, (Object)neighborsJoin.getInput1().getLocalStrategy());
        Assert.assertEquals((Object)LocalStrategy.SORT, (Object)neighborsJoin.getInput2().getLocalStrategy());
        Assert.assertEquals((Object)LocalStrategy.SORT, (Object)cogroup.getInput1().getLocalStrategy());
        Assert.assertEquals((Object)LocalStrategy.NONE, (Object)cogroup.getInput2().getLocalStrategy());
        Assert.assertTrue((TempMode.CACHED == neighborsJoin.getInput2().getTempMode() ? 1 : 0) != 0);
        JobGraphGenerator jgg = new JobGraphGenerator();
        jgg.compileJobGraph(optPlan);
    }

    public static Plan getConnectedComponentsCoGroupPlan() {
        PreviewPlanEnvironment env = new PreviewPlanEnvironment();
        env.setAsContext();
        try {
            ConnectedComponentsCoGroupTest.connectedComponentsWithCoGroup(new String[]{DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE, "100"});
        }
        catch (OptimizerPlanEnvironment.ProgramAbortException programAbortException) {
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)"connectedComponentsWithCoGroup failed with an exception");
        }
        return env.getPlan();
    }

    public static void connectedComponentsWithCoGroup(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(Integer.parseInt(args[0]));
        Operator initialVertices = env.readCsvFile(args[1]).types(Long.class).name(VERTEX_SOURCE);
        Operator edges = env.readCsvFile(args[2]).types(Long.class, Long.class).name(EDGES_SOURCE);
        FlatMapOperator verticesWithId = initialVertices.flatMap((FlatMapFunction)new DummyMapFunction());
        DeltaIteration iteration = verticesWithId.iterateDelta((DataSet)verticesWithId, Integer.parseInt(args[4]), new int[]{0}).name(ITERATION_NAME);
        Operator joinWithNeighbors = iteration.getWorkset().join((DataSet)edges).where(new int[]{0}).equalTo(new int[]{0}).with((FlatJoinFunction)new DummyJoinFunction()).name(JOIN_NEIGHBORS_MATCH);
        Operator minAndUpdate = joinWithNeighbors.coGroup((DataSet)iteration.getSolutionSet()).where(new int[]{0}).equalTo(new int[]{0}).with((CoGroupFunction)new DummyCoGroupFunction()).name(MIN_ID_AND_UPDATE);
        iteration.closeWith((DataSet)minAndUpdate, (DataSet)minAndUpdate).writeAsCsv(args[3]).name(SINK);
        env.execute();
    }

    @FunctionAnnotation.ForwardedFieldsFirst(value={"f0->f0"})
    @FunctionAnnotation.ForwardedFieldsSecond(value={"f0->f0"})
    private static class DummyCoGroupFunction
    implements CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private DummyCoGroupFunction() {
        }

        public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<Tuple2<Long, Long>> second, Collector<Tuple2<Long, Long>> out) throws Exception {
        }
    }

    private static class DummyJoinFunction
    implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private DummyJoinFunction() {
        }

        public void join(Tuple2<Long, Long> first, Tuple2<Long, Long> second, Collector<Tuple2<Long, Long>> out) throws Exception {
        }
    }

    private static class DummyMapFunction
    implements FlatMapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
        private DummyMapFunction() {
        }

        public void flatMap(Tuple1<Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
        }
    }
}

