package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.class */
/**
 * {@link PartitionReleaseStrategy} that decides on partition release at the granularity of
 * pipelined regions.
 *
 * <p>Every execution vertex of the topology is associated with the
 * {@link PipelinedRegionExecutionView} of the pipelined region that contains it. When a vertex is
 * reported as finished and its region's view reports {@code isFinished()}, the result partitions
 * consumed by that region are inspected, and those whose consumer vertices all belong to finished
 * regions are returned as releasable.
 */
public class RegionPartitionReleaseStrategy implements PartitionReleaseStrategy {

    /** Topology used to look up pipelined regions, result partitions and their consumers. */
    private final SchedulingTopology schedulingTopology;

    /**
     * Execution view of the enclosing pipelined region, keyed by execution vertex. All vertices of
     * one region share the same view instance.
     */
    private final Map<ExecutionVertexID, PipelinedRegionExecutionView> regionExecutionViewByVertex =
            new HashMap<>();

    /** Factory that creates {@link RegionPartitionReleaseStrategy} instances. */
    public static class Factory implements PartitionReleaseStrategy.Factory {

        /**
         * Creates a new strategy bound to the given topology.
         *
         * @param schedulingTopology topology the strategy operates on; must not be {@code null}
         * @return a new {@link RegionPartitionReleaseStrategy}
         */
        @Override
        public PartitionReleaseStrategy createInstance(SchedulingTopology schedulingTopology) {
            return new RegionPartitionReleaseStrategy(schedulingTopology);
        }
    }

    /**
     * Creates the strategy and eagerly builds the vertex-to-region-view index from all pipelined
     * regions of the topology.
     *
     * @param schedulingTopology topology the strategy operates on; must not be {@code null}
     */
    public RegionPartitionReleaseStrategy(SchedulingTopology schedulingTopology) {
        this.schedulingTopology = Preconditions.checkNotNull(schedulingTopology);
        initRegionExecutionViewByVertex();
    }

    /** Creates one execution view per pipelined region and registers it for each of its vertices. */
    private void initRegionExecutionViewByVertex() {
        for (SchedulingPipelinedRegion region : schedulingTopology.getAllPipelinedRegions()) {
            // A single view instance is shared by all vertices of the region.
            PipelinedRegionExecutionView regionView = new PipelinedRegionExecutionView(region);
            for (SchedulingExecutionVertex vertex : region.getVertices()) {
                regionExecutionViewByVertex.put(vertex.getId(), regionView);
            }
        }
    }

    /**
     * Records that the given vertex finished and determines which partitions can be released.
     *
     * @param executionVertexID id of the vertex that finished
     * @return ids of the partitions consumed by the vertex's pipelined region whose consumers all
     *     belong to finished regions, or an empty list if the vertex's region is not finished
     * @throws IllegalStateException if no region view is registered for the vertex
     */
    @Override
    public List<IntermediateResultPartitionID> vertexFinished(ExecutionVertexID executionVertexID) {
        PipelinedRegionExecutionView regionView =
                getPipelinedRegionExecutionViewForVertex(executionVertexID);
        regionView.vertexFinished(executionVertexID);

        if (!regionView.isFinished()) {
            return Collections.emptyList();
        }

        // Only partitions consumed by the region that just finished are candidates for release.
        SchedulingPipelinedRegion finishedRegion =
                schedulingTopology.getPipelinedRegionOfVertex(executionVertexID);
        return filterReleasablePartitions(finishedRegion.getConsumedResults());
    }

    /**
     * Records that the given vertex is no longer finished by forwarding the notification to the
     * execution view of its pipelined region.
     *
     * @param executionVertexID id of the vertex that is no longer finished
     * @throws IllegalStateException if no region view is registered for the vertex
     */
    @Override
    public void vertexUnfinished(ExecutionVertexID executionVertexID) {
        getPipelinedRegionExecutionViewForVertex(executionVertexID)
                .vertexUnfinished(executionVertexID);
    }

    /**
     * Looks up the execution view of the pipelined region containing the given vertex.
     *
     * @param executionVertexID id of the vertex to look up
     * @return the region execution view registered for the vertex
     * @throws IllegalStateException if the vertex was not registered during initialization
     */
    private PipelinedRegionExecutionView getPipelinedRegionExecutionViewForVertex(
            ExecutionVertexID executionVertexID) {
        PipelinedRegionExecutionView regionView =
                regionExecutionViewByVertex.get(executionVertexID);
        Preconditions.checkState(
                regionView != null,
                "PipelinedRegionExecutionView not found for execution vertex %s",
                executionVertexID);
        return regionView;
    }

    /**
     * Reduces the given partitions to the ids of those whose consumer regions are all finished.
     *
     * @param iterable candidate result partitions
     * @return ids of the candidates that pass {@link #areConsumerRegionsFinished}
     */
    private List<IntermediateResultPartitionID> filterReleasablePartitions(
            Iterable<? extends SchedulingResultPartition> iterable) {
        return IterableUtils.toStream(iterable)
                .map(SchedulingResultPartition::getId)
                .filter(this::areConsumerRegionsFinished)
                .collect(Collectors.toList());
    }

    /**
     * Checks whether every consumer vertex of the given partition belongs to a finished region.
     * A partition without consumers trivially satisfies the check ({@code allMatch} on an empty
     * stream).
     *
     * @param intermediateResultPartitionID id of the partition to check
     * @return {@code true} if the regions of all consumer vertices report finished
     */
    private boolean areConsumerRegionsFinished(
            IntermediateResultPartitionID intermediateResultPartitionID) {
        SchedulingResultPartition partition =
                schedulingTopology.getResultPartition(intermediateResultPartitionID);
        return IterableUtils.toStream(partition.getConsumers())
                .map(SchedulingExecutionVertex::getId)
                .allMatch(this::isRegionOfVertexFinished);
    }

    /**
     * Tells whether the pipelined region containing the given vertex reports finished.
     *
     * @param executionVertexID id of a vertex in the region of interest
     * @return the {@code isFinished()} state of the vertex's region execution view
     * @throws IllegalStateException if no region view is registered for the vertex
     */
    private boolean isRegionOfVertexFinished(ExecutionVertexID executionVertexID) {
        return getPipelinedRegionExecutionViewForVertex(executionVertexID).isFinished();
    }
}
