package org.apache.flink.runtime.scheduler.slowtaskdetector;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SlowTaskDetectorOptions;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.class */
public class ExecutionTimeBasedSlowTaskDetector implements SlowTaskDetector {
    private final long checkIntervalMillis;
    private final long baselineLowerBoundMillis;
    private final double baselineRatio;
    private final double baselineMultiplier;
    private ScheduledFuture<?> scheduledDetectionFuture;

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector$ExecutionTimeWithInputBytes.class */
    public static class ExecutionTimeWithInputBytes implements Comparable<ExecutionTimeWithInputBytes> {
        private final long executionTime;
        private final long inputBytes;

        public ExecutionTimeWithInputBytes(long j, long j2) {
            this.executionTime = j;
            this.inputBytes = j2;
        }

        public long getExecutionTime() {
            return this.executionTime;
        }

        public long getInputBytes() {
            return this.inputBytes;
        }

        @Override // java.lang.Comparable
        public int compareTo(ExecutionTimeWithInputBytes executionTimeWithInputBytes) {
            if (this.inputBytes != -1 && executionTimeWithInputBytes.getInputBytes() != -1) {
                return Double.compare(this.executionTime / Math.max(this.inputBytes, Double.MIN_VALUE), executionTimeWithInputBytes.getExecutionTime() / Math.max(executionTimeWithInputBytes.getInputBytes(), Double.MIN_VALUE));
            }
            if ((this.inputBytes == -1 && executionTimeWithInputBytes.getInputBytes() == -1) || this.executionTime == 0 || executionTimeWithInputBytes.executionTime == 0) {
                return (int) (this.executionTime - executionTimeWithInputBytes.getExecutionTime());
            }
            throw new IllegalArgumentException("Both compared elements should be NUM_BYTES_UNKNOWN.");
        }
    }

    public ExecutionTimeBasedSlowTaskDetector(Configuration configuration) {
        this.checkIntervalMillis = ((Duration) configuration.get(SlowTaskDetectorOptions.CHECK_INTERVAL)).toMillis();
        Preconditions.checkArgument(this.checkIntervalMillis > 0, "The configuration {} should be positive, but is {}.", new Object[]{SlowTaskDetectorOptions.CHECK_INTERVAL.key(), Long.valueOf(this.checkIntervalMillis)});
        this.baselineLowerBoundMillis = ((Duration) configuration.get(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_LOWER_BOUND)).toMillis();
        Preconditions.checkArgument(this.baselineLowerBoundMillis >= 0, "The configuration {} cannot be negative, but is {}.", new Object[]{SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_LOWER_BOUND.key(), Long.valueOf(this.baselineLowerBoundMillis)});
        this.baselineRatio = configuration.getDouble(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_RATIO);
        Preconditions.checkArgument(this.baselineRatio >= 0.0d && this.baselineRatio < 1.0d, "The configuration {} should be in [0, 1), but is {}.", new Object[]{SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_RATIO.key(), Double.valueOf(this.baselineRatio)});
        this.baselineMultiplier = configuration.getDouble(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_MULTIPLIER);
        Preconditions.checkArgument(this.baselineMultiplier > 0.0d, "The configuration {} should be positive, but is {}.", new Object[]{SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_MULTIPLIER.key(), Double.valueOf(this.baselineMultiplier)});
    }

    @Override // org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetector
    public void start(ExecutionGraph executionGraph, SlowTaskDetectorListener slowTaskDetectorListener, ComponentMainThreadExecutor componentMainThreadExecutor) {
        scheduleTask(executionGraph, slowTaskDetectorListener, componentMainThreadExecutor);
    }

    private void scheduleTask(ExecutionGraph executionGraph, SlowTaskDetectorListener slowTaskDetectorListener, ComponentMainThreadExecutor componentMainThreadExecutor) {
        this.scheduledDetectionFuture = componentMainThreadExecutor.schedule(() -> {
            slowTaskDetectorListener.notifySlowTasks(findSlowTasks(executionGraph));
            scheduleTask(executionGraph, slowTaskDetectorListener, componentMainThreadExecutor);
        }, this.checkIntervalMillis, TimeUnit.MILLISECONDS);
    }

    @VisibleForTesting
    Map<ExecutionVertexID, Collection<ExecutionAttemptID>> findSlowTasks(ExecutionGraph executionGraph) {
        long currentTimeMillis = System.currentTimeMillis();
        HashMap hashMap = new HashMap();
        for (ExecutionJobVertex executionJobVertex : getJobVerticesToCheck(executionGraph)) {
            ExecutionTimeWithInputBytes baseline = getBaseline(executionJobVertex, currentTimeMillis);
            for (ExecutionVertex executionVertex : executionJobVertex.getTaskVertices()) {
                if (!executionVertex.getExecutionState().isTerminal()) {
                    List<ExecutionAttemptID> findExecutionsExceedingBaseline = findExecutionsExceedingBaseline(executionVertex.getCurrentExecutions(), baseline, currentTimeMillis);
                    if (!findExecutionsExceedingBaseline.isEmpty()) {
                        hashMap.put(executionVertex.getID(), findExecutionsExceedingBaseline);
                    }
                }
            }
        }
        return hashMap;
    }

    private List<ExecutionJobVertex> getJobVerticesToCheck(ExecutionGraph executionGraph) {
        return (List) IterableUtils.toStream(executionGraph.getVerticesTopologically()).filter((v0) -> {
            return v0.isInitialized();
        }).filter(executionJobVertex -> {
            return executionJobVertex.getAggregateState() != ExecutionState.FINISHED;
        }).filter(executionJobVertex2 -> {
            return getFinishedRatio(executionJobVertex2) >= this.baselineRatio;
        }).collect(Collectors.toList());
    }

    private double getFinishedRatio(ExecutionJobVertex executionJobVertex) {
        Preconditions.checkState(executionJobVertex.getTaskVertices().length > 0);
        return Arrays.stream(executionJobVertex.getTaskVertices()).filter(executionVertex -> {
            return executionVertex.getExecutionState() == ExecutionState.FINISHED;
        }).count() / executionJobVertex.getTaskVertices().length;
    }

    private ExecutionTimeWithInputBytes getBaseline(ExecutionJobVertex executionJobVertex, long j) {
        return new ExecutionTimeWithInputBytes((long) (r0.getExecutionTime() * this.baselineMultiplier), calculateFinishedTaskExecutionTimeMedian(executionJobVertex, j).getInputBytes());
    }

    private ExecutionTimeWithInputBytes calculateFinishedTaskExecutionTimeMedian(ExecutionJobVertex executionJobVertex, long j) {
        int round = (int) Math.round(executionJobVertex.getParallelism() * this.baselineRatio);
        if (round == 0) {
            return new ExecutionTimeWithInputBytes(0L, -1L);
        }
        List list = (List) Arrays.stream(executionJobVertex.getTaskVertices()).flatMap(executionVertex -> {
            return executionVertex.getCurrentExecutions().stream();
        }).filter(execution -> {
            return execution.getState() == ExecutionState.FINISHED;
        }).collect(Collectors.toList());
        Preconditions.checkState(list.size() >= round);
        return (ExecutionTimeWithInputBytes) ((List) list.stream().map(execution2 -> {
            return getExecutionTimeAndInputBytes(execution2, j);
        }).sorted().limit(round).collect(Collectors.toList())).get(round / 2);
    }

    private List<ExecutionAttemptID> findExecutionsExceedingBaseline(Collection<Execution> collection, ExecutionTimeWithInputBytes executionTimeWithInputBytes, long j) {
        return (List) collection.stream().filter(execution -> {
            return (execution.getState().isTerminal() || execution.getState() == ExecutionState.CANCELING || execution.getState() == ExecutionState.CREATED) ? false : true;
        }).filter(execution2 -> {
            ExecutionTimeWithInputBytes executionTimeAndInputBytes = getExecutionTimeAndInputBytes(execution2, j);
            return executionTimeAndInputBytes.getExecutionTime() >= this.baselineLowerBoundMillis && executionTimeAndInputBytes.compareTo(executionTimeWithInputBytes) >= 0;
        }).map((v0) -> {
            return v0.getAttemptId();
        }).collect(Collectors.toList());
    }

    private long getExecutionTime(Execution execution, long j) {
        long stateTimestamp = execution.getStateTimestamp(ExecutionState.DEPLOYING);
        if (stateTimestamp == 0) {
            return 0L;
        }
        return execution.getState() == ExecutionState.FINISHED ? execution.getStateTimestamp(ExecutionState.FINISHED) - stateTimestamp : j - stateTimestamp;
    }

    private long getExecutionInputBytes(Execution execution) {
        return execution.getVertex().getInputBytes();
    }

    private ExecutionTimeWithInputBytes getExecutionTimeAndInputBytes(Execution execution, long j) {
        return new ExecutionTimeWithInputBytes(getExecutionTime(execution, j), getExecutionInputBytes(execution));
    }

    @Override // org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetector
    public void stop() {
        if (this.scheduledDetectionFuture != null) {
            this.scheduledDetectionFuture.cancel(false);
        }
    }
}
