/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.strategy;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.runtime.scheduler.SchedulingTopologyListener;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.InputConsumableDecider;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyUtils;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.topology.Vertex;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SchedulingStrategy} that decides, vertex by vertex, which execution vertices to hand to
 * {@link SchedulerOperations#allocateSlotsAndDeploy}.
 *
 * <p>A vertex is scheduled once the configured {@link InputConsumableDecider} reports its inputs
 * as consumable. Whenever a vertex is selected, the consumers of those of its produced results
 * whose partition type can be pipelined-consumed are examined as further candidates within the
 * same scheduling pass.
 *
 * <p>The strategy registers itself as a {@link SchedulingTopologyListener} so that vertices added
 * to the topology later are considered on the next scheduling pass.
 */
public class VertexwiseSchedulingStrategy
        implements SchedulingStrategy, SchedulingTopologyListener {

    private static final Logger LOG = LoggerFactory.getLogger(VertexwiseSchedulingStrategy.class);

    private final SchedulerOperations schedulerOperations;

    private final SchedulingTopology schedulingTopology;

    /** Vertices reported by the topology listener that have not been examined yet. */
    private final Set<ExecutionVertexID> newVertices = new HashSet<>();

    /** Vertices that were already handed to {@link #schedulerOperations} for deployment. */
    private final Set<ExecutionVertexID> scheduledVertices = new HashSet<>();

    private final InputConsumableDecider inputConsumableDecider;

    /**
     * Creates the strategy and registers it as a listener on the given topology.
     *
     * @param schedulerOperations operations used to deploy the selected vertices
     * @param schedulingTopology topology whose vertices are scheduled by this strategy
     * @param inputConsumableDeciderFactory factory for the decider that judges input consumability
     * @throws NullPointerException if any argument is {@code null}
     */
    public VertexwiseSchedulingStrategy(
            SchedulerOperations schedulerOperations,
            SchedulingTopology schedulingTopology,
            InputConsumableDecider.Factory inputConsumableDeciderFactory) {
        this.schedulerOperations = Preconditions.checkNotNull(schedulerOperations);
        this.schedulingTopology = Preconditions.checkNotNull(schedulingTopology);
        Preconditions.checkNotNull(
                inputConsumableDeciderFactory, "inputConsumableDeciderFactory must not be null");
        // The decider is given a live view on the scheduled set through the method reference.
        this.inputConsumableDecider =
                inputConsumableDeciderFactory.createInstance(
                        schedulingTopology, this.scheduledVertices::contains);
        LOG.info(
                "Using InputConsumableDecider {} for VertexwiseSchedulingStrategy.",
                this.inputConsumableDecider.getClass().getName());
        schedulingTopology.registerSchedulingTopologyListener(this);
    }

    /**
     * Starts scheduling from the source vertices, i.e. the vertices that have no consumed
     * partition groups.
     */
    @Override
    public void startScheduling() {
        Set<ExecutionVertexID> sourceVertices =
                IterableUtils.toStream(this.schedulingTopology.getVertices())
                        .filter(vertex -> vertex.getConsumedPartitionGroups().isEmpty())
                        .map(SchedulingExecutionVertex::getId)
                        .collect(Collectors.toSet());
        this.maybeScheduleVertices(sourceVertices);
    }

    /**
     * Forgets that the given vertices were scheduled and re-evaluates them for scheduling.
     *
     * @param verticesToRestart vertices to restart
     */
    @Override
    public void restartTasks(Set<ExecutionVertexID> verticesToRestart) {
        this.scheduledVertices.removeAll(verticesToRestart);
        this.maybeScheduleVertices(verticesToRestart);
    }

    /**
     * Reacts to a vertex reaching {@link ExecutionState#FINISHED}: the members of every consumer
     * vertex group whose consumed partition group the decider reports as consumable based on
     * finished producers become scheduling candidates. Other state changes are ignored.
     *
     * @param executionVertexId vertex whose state changed
     * @param executionState the new state
     */
    @Override
    public void onExecutionStateChange(
            ExecutionVertexID executionVertexId, ExecutionState executionState) {
        if (executionState != ExecutionState.FINISHED) {
            return;
        }

        SchedulingExecutionVertex executionVertex =
                this.schedulingTopology.getVertex(executionVertexId);
        Set<ExecutionVertexID> consumerVertices =
                IterableUtils.toStream(executionVertex.getProducedResults())
                        .map(SchedulingResultPartition::getConsumerVertexGroups)
                        .flatMap(Collection::stream)
                        .filter(
                                group ->
                                        this.inputConsumableDecider
                                                .isConsumableBasedOnFinishedProducers(
                                                        group.getConsumedPartitionGroup()))
                        .flatMap(IterableUtils::toStream)
                        .collect(Collectors.toSet());
        this.maybeScheduleVertices(consumerVertices);
    }

    /** No action is taken when a partition becomes consumable. */
    @Override
    public void onPartitionConsumable(IntermediateResultPartitionID resultPartitionId) {
    }

    /**
     * Records newly added vertices so that the next scheduling pass examines them.
     *
     * @param schedulingTopology the updated topology; must be the instance this strategy was
     *     created with
     * @param newExecutionVertices vertices that were added to the topology
     * @throws IllegalStateException if a different topology instance is passed
     */
    @Override
    public void notifySchedulingTopologyUpdated(
            SchedulingTopology schedulingTopology, List<ExecutionVertexID> newExecutionVertices) {
        Preconditions.checkState(
                schedulingTopology == this.schedulingTopology,
                "Notified about an update of an unexpected scheduling topology.");
        this.newVertices.addAll(newExecutionVertices);
    }

    /**
     * Runs one scheduling pass over the given candidates plus any pending new vertices, deploys
     * every vertex found schedulable and then marks those vertices as scheduled.
     */
    private void maybeScheduleVertices(Set<ExecutionVertexID> vertices) {
        Set<ExecutionVertexID> allCandidates;
        if (this.newVertices.isEmpty()) {
            allCandidates = vertices;
        } else {
            // Copy so the caller's set is not modified when merging in the pending new vertices.
            allCandidates = new HashSet<>(vertices);
            allCandidates.addAll(this.newVertices);
            this.newVertices.clear();
        }

        Set<ExecutionVertexID> verticesToSchedule = new HashSet<>();
        Set<ExecutionVertexID> nextVertices = allCandidates;
        // Expand level by level until a round yields no further candidates.
        while (!nextVertices.isEmpty()) {
            nextVertices = this.addToScheduleAndGetVertices(nextVertices, verticesToSchedule);
        }

        this.scheduleVerticesOneByOne(verticesToSchedule);
        this.scheduledVertices.addAll(verticesToSchedule);
    }

    /**
     * Adds every schedulable vertex of {@code currentVertices} to {@code verticesToSchedule} and
     * returns the candidates for the next round.
     *
     * @param currentVertices candidates of the current round
     * @param verticesToSchedule accumulator of vertices selected so far; modified by this method
     * @return members of the not-yet-visited consumer vertex groups, restricted to groups whose
     *     result partition type can be pipelined-consumed, of the vertices selected in this round
     */
    private Set<ExecutionVertexID> addToScheduleAndGetVertices(
            Set<ExecutionVertexID> currentVertices, Set<ExecutionVertexID> verticesToSchedule) {
        Set<ExecutionVertexID> nextVertices = new HashSet<>();
        // Per-round cache handed to the decider, keyed by group identity.
        Map<ConsumedPartitionGroup, Boolean> consumableStatusCache = new IdentityHashMap<>();
        // Identity-based set so each consumer group is expanded at most once per round.
        Set<ConsumerVertexGroup> visitedConsumerVertexGroups =
                Collections.newSetFromMap(new IdentityHashMap<>());

        for (ExecutionVertexID currentVertex : currentVertices) {
            if (!this.isVertexSchedulable(
                    currentVertex, consumableStatusCache, verticesToSchedule)) {
                continue;
            }
            verticesToSchedule.add(currentVertex);

            Set<ConsumerVertexGroup> canBePipelinedConsumerVertexGroups =
                    IterableUtils.toStream(
                                    this.schedulingTopology
                                            .getVertex(currentVertex)
                                            .getProducedResults())
                            .map(SchedulingResultPartition::getConsumerVertexGroups)
                            .flatMap(Collection::stream)
                            .filter(
                                    consumerVertexGroup ->
                                            consumerVertexGroup
                                                    .getResultPartitionType()
                                                    .canBePipelinedConsumed())
                            .collect(Collectors.toSet());

            for (ConsumerVertexGroup consumerVertexGroup : canBePipelinedConsumerVertexGroups) {
                // add() returns false when the group was already visited in this round.
                if (visitedConsumerVertexGroups.add(consumerVertexGroup)) {
                    consumerVertexGroup.forEach(nextVertices::add);
                }
            }
        }
        return nextVertices;
    }

    /**
     * Returns whether the vertex is neither already selected in this pass nor already scheduled,
     * and the decider reports its input as consumable.
     */
    private boolean isVertexSchedulable(
            ExecutionVertexID vertex,
            Map<ConsumedPartitionGroup, Boolean> consumableStatusCache,
            Set<ExecutionVertexID> verticesToSchedule) {
        return !verticesToSchedule.contains(vertex)
                && !this.scheduledVertices.contains(vertex)
                && this.inputConsumableDecider.isInputConsumable(
                        this.schedulingTopology.getVertex(vertex),
                        verticesToSchedule,
                        consumableStatusCache);
    }

    /**
     * Sorts the vertices topologically and deploys them one at a time, each in its own
     * single-element deployment call.
     */
    private void scheduleVerticesOneByOne(Set<ExecutionVertexID> verticesToSchedule) {
        if (verticesToSchedule.isEmpty()) {
            return;
        }
        List<ExecutionVertexID> sortedVerticesToSchedule =
                SchedulingStrategyUtils.sortExecutionVerticesInTopologicalOrder(
                        this.schedulingTopology, verticesToSchedule);
        sortedVerticesToSchedule.forEach(
                id ->
                        this.schedulerOperations.allocateSlotsAndDeploy(
                                Collections.singletonList(id)));
    }

    /** Factory that creates {@link VertexwiseSchedulingStrategy} instances. */
    public static class Factory implements SchedulingStrategyFactory {

        private final InputConsumableDecider.Factory inputConsumableDeciderFactory;

        /**
         * @param inputConsumableDeciderFactory decider factory passed on to every created strategy
         */
        public Factory(InputConsumableDecider.Factory inputConsumableDeciderFactory) {
            this.inputConsumableDeciderFactory = inputConsumableDeciderFactory;
        }

        /**
         * Creates a new {@link VertexwiseSchedulingStrategy} for the given operations and
         * topology.
         */
        @Override
        public SchedulingStrategy createInstance(
                SchedulerOperations schedulerOperations, SchedulingTopology schedulingTopology) {
            return new VertexwiseSchedulingStrategy(
                    schedulerOperations, schedulingTopology, this.inputConsumableDeciderFactory);
        }
    }
}

