package org.apache.flink.runtime.scheduler.strategy;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.DeploymentOption;
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.class */
public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
    private final SchedulerOperations schedulerOperations;
    private final SchedulingTopology schedulingTopology;
    private final DeploymentOption deploymentOption = new DeploymentOption(false);
    private final Map<IntermediateDataSetID, Set<SchedulingResultPartition>> correlatedResultPartitions = new HashMap();
    private final Map<IntermediateResultPartitionID, Set<SchedulingPipelinedRegion>> partitionConsumerRegions = new HashMap();

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy$Factory.class */
    public static class Factory implements SchedulingStrategyFactory {
        @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory
        public SchedulingStrategy createInstance(SchedulerOperations schedulerOperations, SchedulingTopology schedulingTopology) {
            return new PipelinedRegionSchedulingStrategy(schedulerOperations, schedulingTopology);
        }
    }

    public PipelinedRegionSchedulingStrategy(SchedulerOperations schedulerOperations, SchedulingTopology schedulingTopology) {
        this.schedulerOperations = (SchedulerOperations) Preconditions.checkNotNull(schedulerOperations);
        this.schedulingTopology = (SchedulingTopology) Preconditions.checkNotNull(schedulingTopology);
        init();
    }

    private void init() {
        for (SchedulingPipelinedRegion schedulingPipelinedRegion : this.schedulingTopology.getAllPipelinedRegions()) {
            for (SchedulingResultPartition schedulingResultPartition : schedulingPipelinedRegion.getConsumedResults()) {
                Preconditions.checkState(schedulingResultPartition.getResultType() == ResultPartitionType.BLOCKING);
                this.partitionConsumerRegions.computeIfAbsent(schedulingResultPartition.getId(), intermediateResultPartitionID -> {
                    return new HashSet();
                }).add(schedulingPipelinedRegion);
                this.correlatedResultPartitions.computeIfAbsent(schedulingResultPartition.getResultId(), intermediateDataSetID -> {
                    return new HashSet();
                }).add(schedulingResultPartition);
            }
        }
    }

    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void startScheduling() {
        maybeScheduleRegions((Set) IterableUtils.toStream(this.schedulingTopology.getAllPipelinedRegions()).filter(schedulingPipelinedRegion -> {
            return !schedulingPipelinedRegion.getConsumedResults().iterator().hasNext();
        }).collect(Collectors.toSet()));
    }

    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void restartTasks(Set<ExecutionVertexID> set) {
        Stream<ExecutionVertexID> stream = set.stream();
        SchedulingTopology schedulingTopology = this.schedulingTopology;
        schedulingTopology.getClass();
        maybeScheduleRegions((Set) stream.map((v1) -> {
            return r1.getPipelinedRegionOfVertex(v1);
        }).collect(Collectors.toSet()));
    }

    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void onExecutionStateChange(ExecutionVertexID executionVertexID, ExecutionState executionState) {
        if (executionState == ExecutionState.FINISHED) {
            maybeScheduleRegions((Set) ((Set) IterableUtils.toStream(this.schedulingTopology.getVertex(executionVertexID).getProducedResults()).filter(schedulingResultPartition -> {
                return this.partitionConsumerRegions.containsKey(schedulingResultPartition.getId());
            }).filter(schedulingResultPartition2 -> {
                return schedulingResultPartition2.getState() == ResultPartitionState.CONSUMABLE;
            }).flatMap(schedulingResultPartition3 -> {
                return this.correlatedResultPartitions.get(schedulingResultPartition3.getResultId()).stream();
            }).collect(Collectors.toSet())).stream().flatMap(schedulingResultPartition4 -> {
                return this.partitionConsumerRegions.get(schedulingResultPartition4.getId()).stream();
            }).collect(Collectors.toSet()));
        }
    }

    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void onPartitionConsumable(IntermediateResultPartitionID intermediateResultPartitionID) {
    }

    private void maybeScheduleRegions(Set<SchedulingPipelinedRegion> set) {
        Iterator<SchedulingPipelinedRegion> it = SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(this.schedulingTopology, set).iterator();
        while (it.hasNext()) {
            maybeScheduleRegion(it.next());
        }
    }

    private void maybeScheduleRegion(SchedulingPipelinedRegion schedulingPipelinedRegion) {
        if (areRegionInputsAllConsumable(schedulingPipelinedRegion)) {
            Preconditions.checkState(areRegionVerticesAllInCreatedState(schedulingPipelinedRegion), "BUG: trying to schedule a region which is not in CREATED state");
            this.schedulerOperations.allocateSlotsAndDeploy(SchedulingStrategyUtils.createExecutionVertexDeploymentOptionsInTopologicalOrder(this.schedulingTopology, (Set) IterableUtils.toStream(schedulingPipelinedRegion.getVertices()).map((v0) -> {
                return v0.getId();
            }).collect(Collectors.toSet()), executionVertexID -> {
                return this.deploymentOption;
            }));
        }
    }

    private boolean areRegionInputsAllConsumable(SchedulingPipelinedRegion schedulingPipelinedRegion) {
        return IterableUtils.toStream(schedulingPipelinedRegion.getConsumedResults()).allMatch(schedulingResultPartition -> {
            return schedulingResultPartition.getState() == ResultPartitionState.CONSUMABLE;
        });
    }

    private boolean areRegionVerticesAllInCreatedState(SchedulingPipelinedRegion schedulingPipelinedRegion) {
        return IterableUtils.toStream(schedulingPipelinedRegion.getVertices()).allMatch(schedulingExecutionVertex -> {
            return schedulingExecutionVertex.getState() == ExecutionState.CREATED;
        });
    }
}
