package org.apache.flink.test.streaming.runtime;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.BlockingSink;
import org.apache.flink.test.util.IdentityMapFunction;
import org.apache.flink.test.util.InfiniteIntegerSource;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/streaming/runtime/BackPressureITCase.class */
public class BackPressureITCase extends TestLogger {
    private static final JobID TEST_JOB_ID = new JobID();
    private static final int NUM_TASKS = 3;
    private static final int BACK_PRESSURE_REQUEST_INTERVAL_MS = 5;
    private static final int TASKS_BECOMING_BACK_PRESSURED_TIMEOUT_MS = 15000;
    private static final int MAX_BACK_PRESSURE_RATIO = 1;
    private TestingMiniCluster testingMiniCluster;
    private DispatcherGateway dispatcherGateway;

    @Before
    public void setUp() throws Exception {
        Configuration configuration = new Configuration();
        configuration.addAll(createBackPressureSamplingConfiguration());
        configuration.addAll(createNetworkBufferConfiguration());
        this.testingMiniCluster = new TestingMiniCluster(new TestingMiniClusterConfiguration.Builder().setNumSlotsPerTaskManager(NUM_TASKS).setConfiguration(configuration).build());
        this.testingMiniCluster.start();
        this.dispatcherGateway = (DispatcherGateway) this.testingMiniCluster.getDispatcherGatewayFuture().get();
    }

    private static Configuration createBackPressureSamplingConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL, 1000);
        configuration.setInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES, MAX_BACK_PRESSURE_RATIO);
        configuration.setInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL, Integer.MAX_VALUE);
        return configuration;
    }

    private static Configuration createNetworkBufferConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "32kb");
        configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "96kb");
        configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, "96kb");
        return configuration;
    }

    @Test
    public void operatorsBecomeBackPressured() throws Exception {
        StreamExecutionEnvironment parallelism = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(MAX_BACK_PRESSURE_RATIO);
        parallelism.addSource(new InfiniteIntegerSource()).slotSharingGroup("sourceGroup").map(new IdentityMapFunction()).slotSharingGroup("mapGroup").addSink(new BlockingSink()).slotSharingGroup("sinkGroup");
        JobGraph jobGraph = parallelism.getStreamGraph().getJobGraph(TEST_JOB_ID);
        List verticesSortedTopologicallyFromSources = jobGraph.getVerticesSortedTopologicallyFromSources();
        JobVertex jobVertex = (JobVertex) verticesSortedTopologicallyFromSources.get(0);
        JobVertex jobVertex2 = (JobVertex) verticesSortedTopologicallyFromSources.get(MAX_BACK_PRESSURE_RATIO);
        this.testingMiniCluster.submitJob(jobGraph).get();
        assertJobVertexSubtasksAreBackPressured(jobVertex2);
        assertJobVertexSubtasksAreBackPressured(jobVertex);
    }

    private void assertJobVertexSubtasksAreBackPressured(JobVertex jobVertex) throws Exception {
        try {
            CommonTestUtils.waitUntilCondition(isJobVertexBackPressured(jobVertex), Deadline.fromNow(Duration.ofMillis(15000L)), 5L);
        } catch (TimeoutException e) {
            throw new AssertionError(String.format("Subtasks of job vertex %s were not back pressured within timeout", jobVertex), e);
        }
    }

    private SupplierWithException<Boolean, Exception> isJobVertexBackPressured(JobVertex jobVertex) {
        return () -> {
            return (Boolean) ((OperatorBackPressureStatsResponse) this.dispatcherGateway.requestOperatorBackPressureStats(TEST_JOB_ID, jobVertex.getID()).get()).getOperatorBackPressureStats().map(operatorBackPressureStats -> {
                return Boolean.valueOf(isBackPressured(operatorBackPressureStats));
            }).orElse(false);
        };
    }

    private static boolean isBackPressured(OperatorBackPressureStats operatorBackPressureStats) {
        for (int i = 0; i < operatorBackPressureStats.getNumberOfSubTasks(); i += MAX_BACK_PRESSURE_RATIO) {
            if (operatorBackPressureStats.getBackPressureRatio(i) != 1.0d) {
                return false;
            }
        }
        return true;
    }

    @After
    public void tearDown() throws Exception {
        this.testingMiniCluster.close();
    }
}
