/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.strategy;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.DeploymentOption;
import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyUtils;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;

/**
 * {@link SchedulingStrategy} that schedules tasks one pipelined region at a time.
 *
 * <p>A region is handed to {@link SchedulerOperations#allocateSlotsAndDeploy} only once every
 * result partition it consumes is {@link ResultPartitionState#CONSUMABLE}. Regions that consume
 * nothing are scheduled from {@link #startScheduling()}; downstream regions are re-evaluated
 * whenever a producing vertex reaches {@link ExecutionState#FINISHED}.
 */
public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {

    private final SchedulerOperations schedulerOperations;

    private final SchedulingTopology schedulingTopology;

    /** Single deployment option shared by every vertex deployed through this strategy. */
    private final DeploymentOption deploymentOption = new DeploymentOption(false);

    /** Consumed result partitions grouped by the id of the result they belong to. */
    private final Map<IntermediateDataSetID, Set<SchedulingResultPartition>> correlatedResultPartitions =
            new HashMap<>();

    /** For each consumed result partition, the regions that consume it. */
    private final Map<IntermediateResultPartitionID, Set<SchedulingPipelinedRegion>> partitionConsumerRegions =
            new HashMap<>();

    /**
     * Vertex ids of each region, in the iteration order of {@link SchedulingTopology#getVertices()}.
     * Regions are keyed by identity, not by {@code equals}.
     */
    private final Map<SchedulingPipelinedRegion, List<ExecutionVertexID>> regionVerticesSorted =
            new IdentityHashMap<>();

    /**
     * Creates the strategy and eagerly builds its lookup tables from the given topology.
     *
     * @param schedulerOperations callback used to allocate slots and deploy vertices; must not be null
     * @param schedulingTopology topology whose regions are scheduled; must not be null
     */
    public PipelinedRegionSchedulingStrategy(
            SchedulerOperations schedulerOperations, SchedulingTopology schedulingTopology) {
        this.schedulerOperations = Preconditions.checkNotNull(schedulerOperations);
        this.schedulingTopology = Preconditions.checkNotNull(schedulingTopology);
        init();
    }

    /**
     * Populates {@link #partitionConsumerRegions}, {@link #correlatedResultPartitions} and
     * {@link #regionVerticesSorted} from the topology. The state check fails if any partition
     * consumed across a region boundary is not of a blocking result type.
     */
    private void init() {
        for (SchedulingPipelinedRegion region : schedulingTopology.getAllPipelinedRegions()) {
            for (SchedulingResultPartition partition : region.getConsumedResults()) {
                Preconditions.checkState(partition.getResultType().isBlocking());

                partitionConsumerRegions
                        .computeIfAbsent(partition.getId(), pid -> new HashSet<>())
                        .add(region);
                correlatedResultPartitions
                        .computeIfAbsent(partition.getResultId(), rid -> new HashSet<>())
                        .add(partition);
            }
        }

        for (SchedulingExecutionVertex vertex : schedulingTopology.getVertices()) {
            SchedulingPipelinedRegion region =
                    schedulingTopology.getPipelinedRegionOfVertex(vertex.getId());
            regionVerticesSorted
                    .computeIfAbsent(region, r -> new ArrayList<>())
                    .add(vertex.getId());
        }
    }

    /** Schedules every region that consumes no result partitions. */
    @Override
    public void startScheduling() {
        Set<SchedulingPipelinedRegion> sourceRegions =
                IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
                        .filter(region -> !region.getConsumedResults().iterator().hasNext())
                        .collect(Collectors.toSet());
        maybeScheduleRegions(sourceRegions);
    }

    /**
     * Attempts to reschedule the regions that contain the given vertices.
     *
     * @param verticesToRestart ids of the vertices to restart
     */
    @Override
    public void restartTasks(Set<ExecutionVertexID> verticesToRestart) {
        Set<SchedulingPipelinedRegion> regionsToRestart =
                verticesToRestart.stream()
                        .map(schedulingTopology::getPipelinedRegionOfVertex)
                        .collect(Collectors.toSet());
        maybeScheduleRegions(regionsToRestart);
    }

    /**
     * Reacts to a vertex reaching {@link ExecutionState#FINISHED}: collects the consumable
     * partitions it produced that have consumer regions, widens that set to all tracked partitions
     * sharing the same result id, and tries to schedule every region consuming any of them.
     * All other state transitions are ignored.
     *
     * @param executionVertexId id of the vertex whose state changed
     * @param executionState the new state of the vertex
     */
    @Override
    public void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState) {
        if (executionState != ExecutionState.FINISHED) {
            return;
        }

        // Must be parameterized: with a raw Set the lambda below would see Object elements.
        Set<SchedulingResultPartition> finishedPartitions =
                IterableUtils.toStream(schedulingTopology.getVertex(executionVertexId).getProducedResults())
                        .filter(partition -> partitionConsumerRegions.containsKey(partition.getId()))
                        .filter(partition -> partition.getState() == ResultPartitionState.CONSUMABLE)
                        .flatMap(partition -> correlatedResultPartitions.get(partition.getResultId()).stream())
                        .collect(Collectors.toSet());

        Set<SchedulingPipelinedRegion> consumerRegions =
                finishedPartitions.stream()
                        .flatMap(partition -> partitionConsumerRegions.get(partition.getId()).stream())
                        .collect(Collectors.toSet());

        maybeScheduleRegions(consumerRegions);
    }

    /**
     * No-op: this strategy triggers downstream scheduling from
     * {@link #onExecutionStateChange(ExecutionVertexID, ExecutionState)} instead.
     */
    @Override
    public void onPartitionConsumable(IntermediateResultPartitionID resultPartitionId) {
    }

    /** Tries to schedule each of the given regions, visiting them in topological order. */
    private void maybeScheduleRegions(Set<SchedulingPipelinedRegion> regions) {
        List<SchedulingPipelinedRegion> regionsSorted =
                SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(schedulingTopology, regions);
        for (SchedulingPipelinedRegion region : regionsSorted) {
            maybeScheduleRegion(region);
        }
    }

    /**
     * Deploys all vertices of the region if every consumed partition is consumable; otherwise does
     * nothing. The state check fails if the region is schedulable but has a vertex that is not in
     * {@link ExecutionState#CREATED}.
     */
    private void maybeScheduleRegion(SchedulingPipelinedRegion region) {
        if (!areRegionInputsAllConsumable(region)) {
            return;
        }

        Preconditions.checkState(
                areRegionVerticesAllInCreatedState(region),
                "BUG: trying to schedule a region which is not in CREATED state");

        List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
                SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
                        regionVerticesSorted.get(region), id -> deploymentOption);
        schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
    }

    /** Returns true iff every partition consumed by the region is CONSUMABLE (vacuously true if none). */
    private boolean areRegionInputsAllConsumable(SchedulingPipelinedRegion region) {
        for (SchedulingResultPartition partition : region.getConsumedResults()) {
            if (partition.getState() != ResultPartitionState.CONSUMABLE) {
                return false;
            }
        }
        return true;
    }

    /** Returns true iff every vertex of the region is in the CREATED state. */
    private boolean areRegionVerticesAllInCreatedState(SchedulingPipelinedRegion region) {
        for (SchedulingExecutionVertex vertex : region.getVertices()) {
            if (vertex.getState() != ExecutionState.CREATED) {
                return false;
            }
        }
        return true;
    }

    /** Factory producing {@link PipelinedRegionSchedulingStrategy} instances. */
    public static class Factory implements SchedulingStrategyFactory {
        @Override
        public SchedulingStrategy createInstance(
                SchedulerOperations schedulerOperations, SchedulingTopology schedulingTopology) {
            return new PipelinedRegionSchedulingStrategy(schedulerOperations, schedulingTopology);
        }
    }
}

