package org.apache.flink.runtime.scheduler.strategy;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.scheduler.DeploymentOption;
import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.class */
public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
    private static final Predicate<SchedulingExecutionVertex> IS_IN_CREATED_EXECUTION_STATE = schedulingExecutionVertex -> {
        return ExecutionState.CREATED == schedulingExecutionVertex.getState();
    };
    private final SchedulerOperations schedulerOperations;
    private final SchedulingTopology schedulingTopology;
    private final Map<ExecutionVertexID, DeploymentOption> deploymentOptions = new HashMap();
    private final InputDependencyConstraintChecker inputConstraintChecker = new InputDependencyConstraintChecker();

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy$Factory.class */
    public static class Factory implements SchedulingStrategyFactory {
        @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory
        public SchedulingStrategy createInstance(SchedulerOperations schedulerOperations, SchedulingTopology schedulingTopology, JobGraph jobGraph) {
            return new LazyFromSourcesSchedulingStrategy(schedulerOperations, schedulingTopology);
        }
    }

    public LazyFromSourcesSchedulingStrategy(SchedulerOperations schedulerOperations, SchedulingTopology schedulingTopology) {
        this.schedulerOperations = (SchedulerOperations) Preconditions.checkNotNull(schedulerOperations);
        this.schedulingTopology = (SchedulingTopology) Preconditions.checkNotNull(schedulingTopology);
    }

    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void startScheduling() {
        DeploymentOption deploymentOption = new DeploymentOption(true);
        DeploymentOption deploymentOption2 = new DeploymentOption(false);
        for (SchedulingExecutionVertex schedulingExecutionVertex : this.schedulingTopology.getVertices()) {
            DeploymentOption deploymentOption3 = deploymentOption2;
            for (SchedulingResultPartition schedulingResultPartition : schedulingExecutionVertex.getProducedResultPartitions()) {
                if (schedulingResultPartition.getPartitionType().isPipelined()) {
                    deploymentOption3 = deploymentOption;
                }
                this.inputConstraintChecker.addSchedulingResultPartition(schedulingResultPartition);
            }
            this.deploymentOptions.put(schedulingExecutionVertex.getId(), deploymentOption3);
        }
        allocateSlotsAndDeployExecutionVertexIds(getAllVerticesFromTopology());
    }

    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void restartTasks(Set<ExecutionVertexID> set) {
        Stream<ExecutionVertexID> stream = set.stream();
        SchedulingTopology schedulingTopology = this.schedulingTopology;
        schedulingTopology.getClass();
        Stream flatMap = stream.map(schedulingTopology::getVertexOrThrow).flatMap(schedulingExecutionVertex -> {
            return schedulingExecutionVertex.getProducedResultPartitions().stream();
        });
        InputDependencyConstraintChecker inputDependencyConstraintChecker = this.inputConstraintChecker;
        inputDependencyConstraintChecker.getClass();
        flatMap.forEach(inputDependencyConstraintChecker::resetSchedulingResultPartition);
        allocateSlotsAndDeployExecutionVertexIds(set);
    }

    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void onExecutionStateChange(ExecutionVertexID executionVertexID, ExecutionState executionState) {
        if (ExecutionState.FINISHED.equals(executionState)) {
            allocateSlotsAndDeployExecutionVertices((Set) this.schedulingTopology.getVertexOrThrow(executionVertexID).getProducedResultPartitions().stream().flatMap(schedulingResultPartition -> {
                return this.inputConstraintChecker.markSchedulingResultPartitionFinished(schedulingResultPartition).stream();
            }).flatMap(schedulingResultPartition2 -> {
                return schedulingResultPartition2.getConsumers().stream();
            }).collect(Collectors.toSet()));
        }
    }

    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void onPartitionConsumable(ExecutionVertexID executionVertexID, ResultPartitionID resultPartitionID) {
        SchedulingResultPartition resultPartitionOrThrow = this.schedulingTopology.getResultPartitionOrThrow(resultPartitionID.getPartitionId());
        if (resultPartitionOrThrow.getPartitionType().isPipelined()) {
            if (!this.schedulingTopology.getVertexOrThrow(executionVertexID).getProducedResultPartitions().contains(resultPartitionOrThrow)) {
                throw new IllegalStateException("partition " + resultPartitionID + " is not the produced partition of " + executionVertexID);
            }
            allocateSlotsAndDeployExecutionVertices(resultPartitionOrThrow.getConsumers());
        }
    }

    private void allocateSlotsAndDeployExecutionVertexIds(Set<ExecutionVertexID> set) {
        Stream<ExecutionVertexID> stream = set.stream();
        SchedulingTopology schedulingTopology = this.schedulingTopology;
        schedulingTopology.getClass();
        allocateSlotsAndDeployExecutionVertices((Collection) stream.map(schedulingTopology::getVertexOrThrow).collect(Collectors.toList()));
    }

    private void allocateSlotsAndDeployExecutionVertices(Collection<SchedulingExecutionVertex> collection) {
        this.schedulerOperations.allocateSlotsAndDeploy((Collection) collection.stream().filter(isInputConstraintSatisfied().and(IS_IN_CREATED_EXECUTION_STATE)).map((v0) -> {
            return v0.getId();
        }).map(executionVertexID -> {
            return new ExecutionVertexDeploymentOption(executionVertexID, this.deploymentOptions.get(executionVertexID));
        }).collect(Collectors.toSet()));
    }

    private Predicate<SchedulingExecutionVertex> isInputConstraintSatisfied() {
        InputDependencyConstraintChecker inputDependencyConstraintChecker = this.inputConstraintChecker;
        inputDependencyConstraintChecker.getClass();
        return inputDependencyConstraintChecker::check;
    }

    private Set<ExecutionVertexID> getAllVerticesFromTopology() {
        return (Set) StreamSupport.stream(this.schedulingTopology.getVertices().spliterator(), false).map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toSet());
    }
}
