/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.BlockingSink;
import org.apache.flink.test.util.IdentityMapFunction;
import org.apache.flink.test.util.InfiniteIntegerSource;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class BackPressureITCase
extends TestLogger {
    private static final JobID TEST_JOB_ID = new JobID();
    private static final int NUM_TASKS = 3;
    private static final int BACK_PRESSURE_REQUEST_INTERVAL_MS = 5;
    private static final int TASKS_BECOMING_BACK_PRESSURED_TIMEOUT_MS = 15000;
    private static final int MAX_BACK_PRESSURE_RATIO = 1;
    private TestingMiniCluster testingMiniCluster;
    private DispatcherGateway dispatcherGateway;

    @Before
    public void setUp() throws Exception {
        Configuration configuration = new Configuration();
        configuration.addAll(BackPressureITCase.createBackPressureSamplingConfiguration());
        configuration.addAll(BackPressureITCase.createNetworkBufferConfiguration());
        TestingMiniClusterConfiguration testingMiniClusterConfiguration = new TestingMiniClusterConfiguration.Builder().setNumSlotsPerTaskManager(3).setConfiguration(configuration).build();
        this.testingMiniCluster = new TestingMiniCluster(testingMiniClusterConfiguration);
        this.testingMiniCluster.start();
        this.dispatcherGateway = (DispatcherGateway)this.testingMiniCluster.getDispatcherGatewayFuture().get();
    }

    private static Configuration createBackPressureSamplingConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL, 1000);
        configuration.setInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES, 1);
        configuration.setInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL, Integer.MAX_VALUE);
        return configuration;
    }

    private static Configuration createNetworkBufferConfiguration() {
        Configuration configuration = new Configuration();
        int memorySegmentSizeKb = 32;
        String networkBuffersMemory = "96kb";
        configuration.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "32kb");
        configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "96kb");
        configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, "96kb");
        return configuration;
    }

    @Test
    public void operatorsBecomeBackPressured() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        env.addSource((SourceFunction)new InfiniteIntegerSource()).slotSharingGroup("sourceGroup").map(new IdentityMapFunction()).slotSharingGroup("mapGroup").addSink(new BlockingSink()).slotSharingGroup("sinkGroup");
        JobGraph jobGraph = env.getStreamGraph().getJobGraph(TEST_JOB_ID);
        List vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        JobVertex sourceJobVertex = (JobVertex)vertices.get(0);
        JobVertex mapJobVertex = (JobVertex)vertices.get(1);
        this.testingMiniCluster.submitJob(jobGraph).get();
        this.assertJobVertexSubtasksAreBackPressured(mapJobVertex);
        this.assertJobVertexSubtasksAreBackPressured(sourceJobVertex);
    }

    private void assertJobVertexSubtasksAreBackPressured(JobVertex jobVertex) throws Exception {
        try {
            Deadline timeout = Deadline.fromNow((Duration)Duration.ofMillis(15000L));
            CommonTestUtils.waitUntilCondition(this.isJobVertexBackPressured(jobVertex), (Deadline)timeout, (long)5L);
        }
        catch (TimeoutException e) {
            String errorMessage = String.format("Subtasks of job vertex %s were not back pressured within timeout", jobVertex);
            throw new AssertionError(errorMessage, e);
        }
    }

    private SupplierWithException<Boolean, Exception> isJobVertexBackPressured(JobVertex sourceJobVertex) {
        return () -> {
            OperatorBackPressureStatsResponse backPressureStatsResponse = (OperatorBackPressureStatsResponse)this.dispatcherGateway.requestOperatorBackPressureStats(TEST_JOB_ID, sourceJobVertex.getID()).get();
            return backPressureStatsResponse.getOperatorBackPressureStats().map(backPressureStats -> BackPressureITCase.isBackPressured(backPressureStats)).orElse(false);
        };
    }

    private static boolean isBackPressured(OperatorBackPressureStats backPressureStats) {
        for (int i = 0; i < backPressureStats.getNumberOfSubTasks(); ++i) {
            double subtaskBackPressureRatio = backPressureStats.getBackPressureRatio(i);
            if (subtaskBackPressureRatio == 1.0) continue;
            return false;
        }
        return true;
    }

    @After
    public void tearDown() throws Exception {
        this.testingMiniCluster.close();
    }
}

