package org.apache.flink.runtime.jobmanager.scheduler;

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.runtime.AbstractID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceDiedException;
import org.apache.flink.runtime.instance.InstanceListener;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.profiling.ProfilingUtils;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/scheduler/Scheduler.class */
public class Scheduler implements InstanceListener, SlotAvailabilityListener {
    static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
    private final Object globalLock;
    private final ExecutorService executor;
    private final Set<Instance> allInstances;
    private final Queue<Instance> instancesWithAvailableResources;
    private final Queue<QueuedTask> taskQueue;
    private final BlockingQueue<Instance> newlyAvailableInstances;
    private int unconstrainedAssignments;
    private int localizedAssignments;
    private int nonLocalizedAssignments;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.runtime.jobmanager.scheduler.Scheduler$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/scheduler/Scheduler$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$jobmanager$scheduler$Locality = new int[Locality.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$runtime$jobmanager$scheduler$Locality[Locality.UNCONSTRAINED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$jobmanager$scheduler$Locality[Locality.LOCAL.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$jobmanager$scheduler$Locality[Locality.NON_LOCAL.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/scheduler/Scheduler$QueuedTask.class */
    public static final class QueuedTask {
        private final ScheduledUnit task;
        private final SlotAllocationFuture future;

        public QueuedTask(ScheduledUnit scheduledUnit, SlotAllocationFuture slotAllocationFuture) {
            this.task = scheduledUnit;
            this.future = slotAllocationFuture;
        }

        public ScheduledUnit getTask() {
            return this.task;
        }

        public SlotAllocationFuture getFuture() {
            return this.future;
        }
    }

    public Scheduler() {
        this(null);
    }

    public Scheduler(ExecutorService executorService) {
        this.globalLock = new Object();
        this.allInstances = new HashSet();
        this.instancesWithAvailableResources = new SetQueue();
        this.taskQueue = new ArrayDeque();
        this.executor = executorService;
        this.newlyAvailableInstances = new LinkedBlockingQueue();
    }

    public void shutdown() {
        synchronized (this.globalLock) {
            for (Instance instance : this.allInstances) {
                instance.removeSlotListener();
                instance.cancelAndReleaseAllSlots();
            }
            this.allInstances.clear();
            this.instancesWithAvailableResources.clear();
            this.taskQueue.clear();
        }
    }

    public int getNumberOfAvailableSlots() {
        int i = 0;
        synchronized (this.globalLock) {
            Iterator<Instance> it = this.instancesWithAvailableResources.iterator();
            while (it.hasNext()) {
                i += it.next().getNumberOfAvailableSlots();
            }
        }
        return i;
    }

    public int getTotalNumberOfSlots() {
        int i = 0;
        synchronized (this.globalLock) {
            for (Instance instance : this.allInstances) {
                if (instance.isAlive()) {
                    i += instance.getTotalNumberOfSlots();
                }
            }
        }
        return i;
    }

    public SimpleSlot scheduleImmediately(ScheduledUnit scheduledUnit) throws NoResourceAvailableException {
        Object scheduleTask = scheduleTask(scheduledUnit, false);
        if (scheduleTask instanceof SimpleSlot) {
            return (SimpleSlot) scheduleTask;
        }
        throw new RuntimeException();
    }

    public SlotAllocationFuture scheduleQueued(ScheduledUnit scheduledUnit) throws NoResourceAvailableException {
        Object scheduleTask = scheduleTask(scheduledUnit, true);
        if (scheduleTask instanceof SimpleSlot) {
            return new SlotAllocationFuture((SimpleSlot) scheduleTask);
        }
        if (scheduleTask instanceof SlotAllocationFuture) {
            return (SlotAllocationFuture) scheduleTask;
        }
        throw new RuntimeException();
    }

    private Object scheduleTask(ScheduledUnit scheduledUnit, boolean z) throws NoResourceAvailableException {
        SimpleSlot simpleSlot;
        if (scheduledUnit == null) {
            throw new IllegalArgumentException();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Scheduling task " + scheduledUnit);
        }
        ExecutionVertex vertex = scheduledUnit.getTaskToExecute().getVertex();
        synchronized (this.globalLock) {
            SlotSharingGroup slotSharingGroup = scheduledUnit.getSlotSharingGroup();
            if (slotSharingGroup != null) {
                if (z) {
                    throw new IllegalArgumentException("A task with a vertex sharing group was scheduled in a queued fashion.");
                }
                SlotSharingGroupAssignment taskAssignment = slotSharingGroup.getTaskAssignment();
                CoLocationConstraint locationConstraint = scheduledUnit.getLocationConstraint();
                SimpleSlot slotForTask = locationConstraint == null ? taskAssignment.getSlotForTask(vertex) : taskAssignment.getSlotForTask(vertex, locationConstraint);
                Slot slot = null;
                if (slotForTask != null) {
                    try {
                        if (slotForTask.getLocality() != Locality.NON_LOCAL) {
                            updateLocalityCounters(slotForTask.getLocality());
                            return slotForTask;
                        }
                    } catch (NoResourceAvailableException e) {
                        throw e;
                    } catch (Throwable th) {
                        if (slotForTask != null) {
                            slotForTask.releaseSlot();
                        }
                        if (0 != 0) {
                            slot.releaseSlot();
                        }
                        ExceptionUtils.rethrow(th, "An error occurred while allocating a slot in a sharing group");
                    }
                }
                SimpleSlot freeSubSlotForTask = getFreeSubSlotForTask(vertex, (locationConstraint == null || locationConstraint.isUnassigned()) ? vertex.getPreferredLocations() : Collections.singleton(locationConstraint.getLocation()), taskAssignment, locationConstraint);
                if (freeSubSlotForTask == null) {
                    if (slotForTask == null) {
                        if (locationConstraint == null || locationConstraint.isUnassigned()) {
                            throw new NoResourceAvailableException(scheduledUnit, getNumberOfAvailableInstances(), getTotalNumberOfSlots(), getNumberOfAvailableSlots());
                        }
                        throw new NoResourceAvailableException("Could not allocate a slot on instance " + locationConstraint.getLocation() + ", as required by the co-location constraint.");
                    }
                    simpleSlot = slotForTask;
                } else if (slotForTask == null || freeSubSlotForTask.getLocality() == Locality.LOCAL) {
                    if (slotForTask != null) {
                        slotForTask.releaseSlot();
                    }
                    simpleSlot = freeSubSlotForTask;
                } else {
                    freeSubSlotForTask.releaseSlot();
                    simpleSlot = slotForTask;
                }
                if (locationConstraint != null) {
                    if (!locationConstraint.isUnassigned() && simpleSlot.getLocality() != Locality.LOCAL) {
                        throw new NoResourceAvailableException("Could not allocate a slot on instance " + locationConstraint.getLocation() + ", as required by the co-location constraint.");
                    }
                    locationConstraint.setSharedSlot(simpleSlot.getParent());
                }
                updateLocalityCounters(simpleSlot.getLocality());
                return simpleSlot;
            }
            SimpleSlot freeSlotForTask = getFreeSlotForTask(vertex, vertex.getPreferredLocations());
            if (freeSlotForTask != null) {
                updateLocalityCounters(freeSlotForTask.getLocality());
                return freeSlotForTask;
            }
            if (!z) {
                throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots(), getNumberOfAvailableSlots());
            }
            SlotAllocationFuture slotAllocationFuture = new SlotAllocationFuture();
            this.taskQueue.add(new QueuedTask(scheduledUnit, slotAllocationFuture));
            return slotAllocationFuture;
        }
    }

    protected SimpleSlot getFreeSlotForTask(ExecutionVertex executionVertex, Iterable<Instance> iterable) {
        SimpleSlot allocateSimpleSlot;
        while (true) {
            Pair<Instance, Locality> findInstance = findInstance(iterable);
            if (findInstance == null) {
                return null;
            }
            Instance instance = (Instance) findInstance.getLeft();
            Locality locality = (Locality) findInstance.getRight();
            if (LOG.isDebugEnabled()) {
                if (locality == Locality.LOCAL) {
                    LOG.debug("Local assignment: " + executionVertex.getSimpleName() + " --> " + instance);
                } else if (locality == Locality.NON_LOCAL) {
                    LOG.debug("Non-local assignment: " + executionVertex.getSimpleName() + " --> " + instance);
                } else if (locality == Locality.UNCONSTRAINED) {
                    LOG.debug("Unconstrained assignment: " + executionVertex.getSimpleName() + " --> " + instance);
                }
            }
            try {
                allocateSimpleSlot = instance.allocateSimpleSlot(executionVertex.getJobId(), executionVertex.getJobvertexId());
                if (instance.hasResourcesAvailable()) {
                    this.instancesWithAvailableResources.add(instance);
                }
            } catch (InstanceDiedException e) {
                this.allInstances.remove(instance);
                this.instancesWithAvailableResources.remove(instance);
            }
            if (allocateSimpleSlot != null) {
                allocateSimpleSlot.setLocality(locality);
                return allocateSimpleSlot;
            }
            continue;
        }
    }

    protected SimpleSlot getFreeSubSlotForTask(ExecutionVertex executionVertex, Iterable<Instance> iterable, SlotSharingGroupAssignment slotSharingGroupAssignment, CoLocationConstraint coLocationConstraint) {
        AbstractID jobvertexId;
        while (true) {
            Pair<Instance, Locality> findInstance = findInstance(iterable);
            if (findInstance == null) {
                return null;
            }
            Instance instance = (Instance) findInstance.getLeft();
            Locality locality = (Locality) findInstance.getRight();
            if (LOG.isDebugEnabled()) {
                if (locality == Locality.LOCAL) {
                    LOG.debug("Local assignment: " + executionVertex.getSimpleName() + " --> " + instance);
                } else if (locality == Locality.NON_LOCAL) {
                    LOG.debug("Non-local assignment: " + executionVertex.getSimpleName() + " --> " + instance);
                } else if (locality == Locality.UNCONSTRAINED) {
                    LOG.debug("Unconstrained assignment: " + executionVertex.getSimpleName() + " --> " + instance);
                }
            }
            if (coLocationConstraint == null) {
                try {
                    jobvertexId = executionVertex.getJobvertexId();
                } catch (InstanceDiedException e) {
                    this.allInstances.remove(instance);
                    this.instancesWithAvailableResources.remove(instance);
                }
            } else {
                jobvertexId = coLocationConstraint.getGroupId();
            }
            AbstractID abstractID = jobvertexId;
            SimpleSlot addSharedSlotAndAllocateSubSlot = slotSharingGroupAssignment.addSharedSlotAndAllocateSubSlot(instance.allocateSharedSlot(executionVertex.getJobId(), slotSharingGroupAssignment, abstractID), locality, abstractID, coLocationConstraint);
            if (instance.hasResourcesAvailable()) {
                this.instancesWithAvailableResources.add(instance);
            }
            if (addSharedSlotAndAllocateSubSlot != null) {
                return addSharedSlotAndAllocateSubSlot;
            }
        }
    }

    private Pair<Instance, Locality> findInstance(Iterable<Instance> iterable) {
        if (this.instancesWithAvailableResources.isEmpty()) {
            Instance poll = this.newlyAvailableInstances.poll();
            if (poll == null) {
                return null;
            }
            this.instancesWithAvailableResources.add(poll);
        }
        Iterator<Instance> it = iterable == null ? null : iterable.iterator();
        Instance instance = null;
        Locality locality = Locality.UNCONSTRAINED;
        if (it == null || !it.hasNext()) {
            instance = this.instancesWithAvailableResources.poll();
        } else {
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                Instance next = it.next();
                if (next != null && this.instancesWithAvailableResources.remove(next)) {
                    instance = next;
                    locality = Locality.LOCAL;
                    break;
                }
            }
            if (instance == null) {
                instance = this.instancesWithAvailableResources.poll();
                locality = Locality.NON_LOCAL;
            }
        }
        return new ImmutablePair(instance, locality);
    }

    @Override // org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener
    public void newSlotAvailable(Instance instance) {
        this.newlyAvailableInstances.add(instance);
        if (this.executor != null) {
            this.executor.execute(new Runnable() { // from class: org.apache.flink.runtime.jobmanager.scheduler.Scheduler.1
                @Override // java.lang.Runnable
                public void run() {
                    Scheduler.this.handleNewSlot();
                }
            });
        } else {
            handleNewSlot();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleNewSlot() {
        synchronized (this.globalLock) {
            Instance poll = this.newlyAvailableInstances.poll();
            if (poll == null || !poll.hasResourcesAvailable()) {
                return;
            }
            QueuedTask peek = this.taskQueue.peek();
            if (peek != null) {
                ScheduledUnit task = peek.getTask();
                ExecutionVertex vertex = task.getTaskToExecute().getVertex();
                try {
                    SimpleSlot allocateSimpleSlot = poll.allocateSimpleSlot(vertex.getJobId(), vertex.getJobvertexId());
                    if (allocateSimpleSlot != null) {
                        this.taskQueue.poll();
                        if (peek.getFuture() != null) {
                            try {
                                peek.getFuture().setSlot(allocateSimpleSlot);
                            } catch (Throwable th) {
                                LOG.error("Error calling allocation future for task " + vertex.getSimpleName(), th);
                                task.getTaskToExecute().fail(th);
                            }
                        }
                    }
                } catch (InstanceDiedException e) {
                    this.allInstances.remove(poll);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Instance " + poll + " was marked dead asynchronously.");
                    }
                }
            } else {
                this.instancesWithAvailableResources.add(poll);
            }
        }
    }

    private void updateLocalityCounters(Locality locality) {
        switch (AnonymousClass2.$SwitchMap$org$apache$flink$runtime$jobmanager$scheduler$Locality[locality.ordinal()]) {
            case 1:
                this.unconstrainedAssignments++;
                return;
            case ProfilingUtils.DEFAULT_TASKMANAGER_REPORTINTERVAL /* 2 */:
                this.localizedAssignments++;
                return;
            case 3:
                this.nonLocalizedAssignments++;
                return;
            default:
                throw new RuntimeException(locality.name());
        }
    }

    @Override // org.apache.flink.runtime.instance.InstanceListener
    public void newInstanceAvailable(Instance instance) {
        if (instance == null) {
            throw new IllegalArgumentException();
        }
        if (instance.getNumberOfAvailableSlots() <= 0) {
            throw new IllegalArgumentException("The given instance has no resources.");
        }
        if (!instance.isAlive()) {
            throw new IllegalArgumentException("The instance is not alive.");
        }
        synchronized (this.globalLock) {
            if (!this.allInstances.add(instance)) {
                throw new IllegalArgumentException("The instance is already contained.");
            }
            try {
                instance.setSlotAvailabilityListener(this);
            } catch (IllegalStateException e) {
                this.allInstances.remove(instance);
                LOG.error("Scheduler could not attach to the instance as a listener.");
            }
            this.instancesWithAvailableResources.add(instance);
            for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) {
                newSlotAvailable(instance);
            }
        }
    }

    @Override // org.apache.flink.runtime.instance.InstanceListener
    public void instanceDied(Instance instance) {
        if (instance == null) {
            throw new IllegalArgumentException();
        }
        instance.markDead();
        synchronized (this.globalLock) {
            this.allInstances.remove(instance);
            this.instancesWithAvailableResources.remove(instance);
        }
    }

    public int getNumberOfAvailableInstances() {
        int i = 0;
        synchronized (this.globalLock) {
            Iterator<Instance> it = this.allInstances.iterator();
            while (it.hasNext()) {
                if (it.next().isAlive()) {
                    i++;
                }
            }
        }
        return i;
    }

    public int getNumberOfInstancesWithAvailableSlots() {
        return this.instancesWithAvailableResources.size();
    }

    public int getNumberOfUnconstrainedAssignments() {
        return this.unconstrainedAssignments;
    }

    public int getNumberOfLocalizedAssignments() {
        return this.localizedAssignments;
    }

    public int getNumberOfNonLocalizedAssignments() {
        return this.nonLocalizedAssignments;
    }
}
