package org.apache.flink.runtime.scheduler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.class */
/**
 * {@link ExecutionSlotAllocator} that requests one slot per execution vertex through a
 * {@link SlotProviderStrategy} and keeps track of the requests that have not completed yet.
 *
 * <p>Preferred locations for a request are taken from the scheduling requirements when present;
 * otherwise they are derived from the locations of the vertex's input producers, as reported by
 * the {@link InputsLocationsRetriever}.
 */
public class DefaultExecutionSlotAllocator implements ExecutionSlotAllocator {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutionSlotAllocator.class);

    /**
     * Maximum number of producer location futures gathered for one group of producers. As soon as
     * a group yields more than this many, the group is dropped from the preferred-location
     * computation instead of waiting on all of its futures.
     */
    private static final int MAX_DISTINCT_LOCATIONS_TO_CONSIDER = 8;

    /** Slot assignments whose slot future has not completed yet, keyed by execution vertex. */
    private final Map<ExecutionVertexID, SlotExecutionVertexAssignment> pendingSlotAssignments = new HashMap<>();
    private final SlotProviderStrategy slotProviderStrategy;
    private final InputsLocationsRetriever inputsLocationsRetriever;

    /**
     * Creates an allocator backed by the given collaborators.
     *
     * @param slotProviderStrategy used to request slots and to cancel failed slot requests; must not be null
     * @param inputsLocationsRetriever used to look up producers of a vertex and their locations; must not be null
     */
    public DefaultExecutionSlotAllocator(SlotProviderStrategy slotProviderStrategy, InputsLocationsRetriever inputsLocationsRetriever) {
        this.slotProviderStrategy = Preconditions.checkNotNull(slotProviderStrategy);
        this.inputsLocationsRetriever = Preconditions.checkNotNull(inputsLocationsRetriever);
    }

    /**
     * Requests a slot for every vertex described by the given scheduling requirements.
     *
     * <p>Each request is recorded as pending until its slot future completes. If the future
     * completes exceptionally (including by cancellation), the underlying slot request is
     * cancelled at the slot provider strategy.
     *
     * @param list scheduling requirements, one entry per execution vertex
     * @return one assignment per requirement, in the iteration order of {@code list}
     * @throws IllegalStateException presumably, via {@code Preconditions.checkState}, if one of the
     *     vertices still has a pending slot request
     */
    @Override // org.apache.flink.runtime.scheduler.ExecutionSlotAllocator
    public List<SlotExecutionVertexAssignment> allocateSlotsFor(List<ExecutionVertexSchedulingRequirements> list) {
        validateSchedulingRequirements(list);

        final List<SlotExecutionVertexAssignment> assignments = new ArrayList<>(list.size());
        final Set<AllocationID> allPriorAllocationIds = computeAllPriorAllocationIds(list);

        for (ExecutionVertexSchedulingRequirements requirements : list) {
            final ExecutionVertexID executionVertexId = requirements.getExecutionVertexId();
            final SlotRequestId slotRequestId = new SlotRequestId();
            final SlotSharingGroupId slotSharingGroupId = requirements.getSlotSharingGroupId();

            LOG.debug("Allocate slot with id {} for execution {}", slotRequestId, executionVertexId);

            // The slot is only requested once the preferred locations are known.
            final SlotExecutionVertexAssignment assignment = new SlotExecutionVertexAssignment(
                    executionVertexId,
                    calculatePreferredLocations(executionVertexId, requirements.getPreferredLocations(), this.inputsLocationsRetriever)
                            .thenCompose(preferredLocations ->
                                    this.slotProviderStrategy.allocateSlot(
                                            slotRequestId,
                                            new ScheduledUnit(executionVertexId, slotSharingGroupId, requirements.getCoLocationConstraint()),
                                            SlotProfile.priorAllocation(
                                                    requirements.getTaskResourceProfile(),
                                                    requirements.getPhysicalSlotResourceProfile(),
                                                    preferredLocations,
                                                    Collections.singletonList(requirements.getPreviousAllocationId()),
                                                    allPriorAllocationIds))));

            // Register as pending before attaching the completion callback: if the future is
            // already complete, the callback runs right away and must find the entry to remove.
            this.pendingSlotAssignments.put(executionVertexId, assignment);

            // NOTE(review): assumes getLogicalSlotFuture() returns the very CompletableFuture that
            // was handed to the constructor above - confirm against SlotExecutionVertexAssignment.
            assignment.getLogicalSlotFuture().whenComplete((ignored, failure) -> {
                this.pendingSlotAssignments.remove(executionVertexId);
                if (failure != null) {
                    this.slotProviderStrategy.cancelSlotRequest(slotRequestId, slotSharingGroupId, failure);
                }
            });

            assignments.add(assignment);
        }
        return assignments;
    }

    /**
     * Verifies that none of the given vertices already has a pending slot request.
     *
     * @param collection scheduling requirements about to be processed
     */
    private void validateSchedulingRequirements(Collection<ExecutionVertexSchedulingRequirements> collection) {
        for (ExecutionVertexSchedulingRequirements requirements : collection) {
            final ExecutionVertexID vertexId = requirements.getExecutionVertexId();
            Preconditions.checkState(
                    !this.pendingSlotAssignments.containsKey(vertexId),
                    "BUG: vertex %s tries to allocate a slot when its previous slot request is still pending",
                    vertexId);
        }
    }

    /**
     * Cancels the pending slot future of the given vertex, if there is one. Vertices without a
     * pending assignment are ignored.
     *
     * @param executionVertexID vertex whose pending slot request should be cancelled
     */
    @Override // org.apache.flink.runtime.scheduler.ExecutionSlotAllocator
    public void cancel(ExecutionVertexID executionVertexID) {
        final SlotExecutionVertexAssignment pending = this.pendingSlotAssignments.get(executionVertexID);
        if (pending != null) {
            pending.getLogicalSlotFuture().cancel(false);
        }
    }

    /**
     * Cancels every pending slot request.
     *
     * @return an already completed future
     */
    @Override // org.apache.flink.runtime.scheduler.ExecutionSlotAllocator
    public CompletableFuture<Void> stop() {
        // Iterate over a snapshot: cancelling a future fires the completion callback registered in
        // allocateSlotsFor, which removes the entry from pendingSlotAssignments.
        final List<ExecutionVertexID> pendingVertexIds = new ArrayList<>(this.pendingSlotAssignments.keySet());
        pendingVertexIds.forEach(this::cancel);
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Returns the explicitly given preferred locations if there are any, otherwise the locations
     * derived from the inputs of the vertex.
     */
    private static CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(ExecutionVertexID executionVertexID, Collection<TaskManagerLocation> collection, InputsLocationsRetriever inputsLocationsRetriever) {
        if (collection.isEmpty()) {
            return getPreferredLocationsBasedOnInputs(executionVertexID, inputsLocationsRetriever);
        }
        return CompletableFuture.completedFuture(collection);
    }

    /**
     * Derives preferred locations from the producers of the results consumed by a vertex.
     *
     * <p>The producers are processed group by group, as returned by
     * {@code getConsumedResultPartitionsProducers}. For each group the distinct locations of its
     * producers are gathered; the result is the smallest non-empty set of locations over all
     * groups, with the earlier group winning only when a later one is strictly larger or empty.
     * Groups that yield more than {@link #MAX_DISTINCT_LOCATIONS_TO_CONSIDER} location futures
     * are treated as having no locations.
     *
     * @param executionVertexID vertex whose inputs are inspected
     * @param inputsLocationsRetriever source of producer and location information
     * @return future of the preferred locations; completes with an empty collection when no group
     *     contributes any location
     */
    @VisibleForTesting
    static CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocationsBasedOnInputs(ExecutionVertexID executionVertexID, InputsLocationsRetriever inputsLocationsRetriever) {
        CompletableFuture<Collection<TaskManagerLocation>> preferredLocations = CompletableFuture.completedFuture(Collections.emptyList());
        // Scratch list reused for every producer group.
        final Collection<CompletableFuture<TaskManagerLocation>> locationFutures = new ArrayList<>();

        for (Collection<ExecutionVertexID> producers : inputsLocationsRetriever.getConsumedResultPartitionsProducers(executionVertexID)) {
            for (ExecutionVertexID producer : producers) {
                inputsLocationsRetriever.getTaskManagerLocation(producer).ifPresent(locationFutures::add);
                // Too many futures for this group: drop them all so the group contributes nothing.
                if (locationFutures.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
                    locationFutures.clear();
                    break;
                }
            }

            final CompletableFuture<Collection<TaskManagerLocation>> uniqueLocations =
                    FutureUtils.combineAll(locationFutures).thenApply(HashSet::new);
            preferredLocations = preferredLocations.thenCombine(uniqueLocations, (current, candidate) -> {
                if (candidate.isEmpty()) {
                    return current;
                }
                if (current.isEmpty() || candidate.size() <= current.size()) {
                    return candidate;
                }
                return current;
            });
            locationFutures.clear();
        }
        return preferredLocations;
    }

    /**
     * Collects the non-null previous allocation ids of all given scheduling requirements.
     *
     * @param collection scheduling requirements to inspect
     * @return set of previous allocation ids, without null entries
     */
    @VisibleForTesting
    static Set<AllocationID> computeAllPriorAllocationIds(Collection<ExecutionVertexSchedulingRequirements> collection) {
        return collection.stream()
                .map(ExecutionVertexSchedulingRequirements::getPreviousAllocationId)
                .filter(Objects::nonNull)
                .collect(Collectors.toSet());
    }

    /** Returns the number of slot assignments that are currently pending. */
    @VisibleForTesting
    int getNumberOfPendingSlotAssignments() {
        return this.pendingSlotAssignments.size();
    }
}
