/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.misc;

import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.PartitionOperator;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.examples.java.clustering.KMeans;
import org.apache.flink.examples.java.clustering.util.KMeansData;
import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class SuccessAfterNetworkBuffersFailureITCase
extends TestLogger {
    private static final int PARALLELISM = 16;
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(SuccessAfterNetworkBuffersFailureITCase.getConfiguration()).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(8).build());

    private static Configuration getConfiguration() {
        Configuration config = new Configuration();
        config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "80m");
        config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 800);
        return config;
    }

    @Test
    public void testSuccessfulProgramAfterFailure() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        SuccessAfterNetworkBuffersFailureITCase.runConnectedComponents(env);
        try {
            SuccessAfterNetworkBuffersFailureITCase.runKMeans(env);
            Assert.fail((String)"This program execution should have failed.");
        }
        catch (JobExecutionException e) {
            Assert.assertTrue((boolean)e.getCause().getMessage().contains("Insufficient number of network buffers"));
        }
        SuccessAfterNetworkBuffersFailureITCase.runConnectedComponents(env);
    }

    private static void runConnectedComponents(ExecutionEnvironment env) throws Exception {
        env.setParallelism(16);
        env.getConfig().disableSysoutLogging();
        PartitionOperator vertices = ConnectedComponentsData.getDefaultVertexDataSet((ExecutionEnvironment)env).rebalance();
        FlatMapOperator edges = ConnectedComponentsData.getDefaultEdgeDataSet((ExecutionEnvironment)env).rebalance().flatMap((FlatMapFunction)new ConnectedComponents.UndirectEdge());
        MapOperator verticesWithInitialId = vertices.map((MapFunction)new ConnectedComponents.DuplicateValue());
        DeltaIteration iteration = verticesWithInitialId.iterateDelta((DataSet)verticesWithInitialId, 100, new int[]{0});
        JoinOperator.EquiJoin changes = iteration.getWorkset().join((DataSet)edges).where(new int[]{0}).equalTo(new int[]{0}).with((JoinFunction)new ConnectedComponents.NeighborWithComponentIDJoin()).groupBy(new int[]{0}).aggregate(Aggregations.MIN, 1).join((DataSet)iteration.getSolutionSet()).where(new int[]{0}).equalTo(new int[]{0}).with((FlatJoinFunction)new ConnectedComponents.ComponentIdFilter());
        DataSet result = iteration.closeWith((DataSet)changes, (DataSet)changes);
        result.output((OutputFormat)new DiscardingOutputFormat());
        env.execute();
    }

    private static void runKMeans(ExecutionEnvironment env) throws Exception {
        env.setParallelism(16);
        env.getConfig().disableSysoutLogging();
        PartitionOperator points = KMeansData.getDefaultPointDataSet((ExecutionEnvironment)env).rebalance();
        PartitionOperator centroids = KMeansData.getDefaultCentroidDataSet((ExecutionEnvironment)env).rebalance();
        IterativeDataSet loop = centroids.iterate(20);
        MapOperator newCentroids = ((MapOperator)points.map((MapFunction)new KMeans.SelectNearestCenter()).withBroadcastSet((DataSet)loop, "centroids")).rebalance().map((MapFunction)new KMeans.CountAppender()).groupBy(new int[]{0}).reduce((ReduceFunction)new KMeans.CentroidAccumulator()).rebalance().map((MapFunction)new KMeans.CentroidAverager());
        DataSet finalCentroids = loop.closeWith((DataSet)newCentroids);
        SingleInputUdfOperator clusteredPoints = points.map((MapFunction)new KMeans.SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
        clusteredPoints.output((OutputFormat)new DiscardingOutputFormat());
        env.execute("KMeans Example");
    }
}

