package org.apache.tez.dag.app.dag.impl;

import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
import org.apache.tez.dag.records.TezTaskID;

/* loaded from: input_file:org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.class */
public class DAGSchedulerMRR implements DAGScheduler {
    private static final Log LOG;
    private final DAG dag;
    private final TaskSchedulerEventHandler taskScheduler;
    private final EventHandler handler;
    private final float minReservedShuffleResource;
    private Vertex currentPartitioner = null;
    private Vertex currentShuffler = null;
    private int currentShufflerDepth = 0;
    int numShuffleTasksScheduled = 0;
    List<TaskAttempt> pendingShuffleTasks = new LinkedList();
    Set<TezTaskID> unassignedShuffleTasks = new HashSet();
    Resource realShufflerResource = null;
    Set<TezTaskID> unassignedPartitionTasks = new HashSet();
    Resource realPartitionerResource = null;
    static final /* synthetic */ boolean $assertionsDisabled;

    public DAGSchedulerMRR(DAG dag, EventHandler eventHandler, TaskSchedulerEventHandler taskSchedulerEventHandler, float f) {
        this.dag = dag;
        this.handler = eventHandler;
        this.taskScheduler = taskSchedulerEventHandler;
        this.minReservedShuffleResource = f;
    }

    @Override // org.apache.tez.dag.app.dag.DAGScheduler
    public void vertexCompleted(Vertex vertex) {
        if (this.currentPartitioner != null) {
            if (vertex != this.currentPartitioner) {
                String str = vertex.getVertexId() + " finished. Expecting current partitioner " + this.currentPartitioner.getVertexId() + " to finish.";
                LOG.fatal(str);
                throw new TezUncheckedException(str);
            }
            LOG.info("Current partitioner " + this.currentPartitioner.getVertexId() + " is completed. " + (this.currentShuffler != null ? this.currentShuffler.getVertexId() + " is new partitioner" : "No current shuffler to replace the partitioner"));
            this.currentPartitioner = this.currentShuffler;
            if (!$assertionsDisabled && !this.unassignedPartitionTasks.isEmpty()) {
                throw new AssertionError();
            }
            this.unassignedPartitionTasks.addAll(this.unassignedShuffleTasks);
            this.unassignedShuffleTasks.clear();
            this.realPartitionerResource = this.realShufflerResource;
            this.realShufflerResource = null;
            this.currentShuffler = null;
            schedulePendingShuffles(this.pendingShuffleTasks.size());
            if (!$assertionsDisabled && !this.pendingShuffleTasks.isEmpty()) {
                throw new AssertionError();
            }
            this.numShuffleTasksScheduled = 0;
        }
    }

    @Override // org.apache.tez.dag.app.dag.DAGScheduler
    public void scheduleTask(DAGEventSchedulerUpdate dAGEventSchedulerUpdate) {
        TaskAttempt attempt = dAGEventSchedulerUpdate.getAttempt();
        Vertex vertex = this.dag.getVertex(attempt.getVertexID());
        int distanceFromRoot = vertex.getDistanceFromRoot();
        LOG.info("Schedule task: " + attempt.getID());
        if (this.currentPartitioner == null) {
            this.currentPartitioner = vertex;
            this.currentShufflerDepth = distanceFromRoot;
            if (!$assertionsDisabled && this.realPartitionerResource != null) {
                throw new AssertionError();
            }
            Resource taskResource = this.currentPartitioner.getTaskResource();
            this.realPartitionerResource = Resource.newInstance(taskResource.getMemory(), taskResource.getVirtualCores());
            LOG.info(vertex.getVertexId() + " is new partitioner at depth " + distanceFromRoot);
        } else if (this.currentShuffler == null && distanceFromRoot > this.currentShufflerDepth) {
            this.currentShuffler = vertex;
            this.currentShufflerDepth = distanceFromRoot;
            if (!$assertionsDisabled && this.realShufflerResource != null) {
                throw new AssertionError();
            }
            Resource taskResource2 = this.currentShuffler.getTaskResource();
            this.realShufflerResource = Resource.newInstance(taskResource2.getMemory(), taskResource2.getVirtualCores());
            LOG.info(vertex.getVertexId() + " is new shuffler at depth " + this.currentShufflerDepth);
        }
        if (this.currentShuffler == vertex) {
            this.pendingShuffleTasks.add(attempt);
            this.unassignedShuffleTasks.add(attempt.getTaskID());
            schedulePendingShuffles(getNumShufflesToSchedule());
            return;
        }
        if (this.currentPartitioner == vertex) {
            this.unassignedPartitionTasks.add(attempt.getTaskID());
        }
        if (this.currentPartitioner == vertex || this.currentShuffler == vertex || distanceFromRoot < this.currentPartitioner.getDistanceFromRoot()) {
            scheduleTaskAttempt(attempt);
        } else {
            String str = vertex.getVertexId() + " is neither the  current partitioner: " + this.currentPartitioner.getVertexId() + " nor the current shuffler: " + this.currentShuffler.getVertexId();
            LOG.fatal(str);
            throw new TezUncheckedException(str);
        }
    }

    @Override // org.apache.tez.dag.app.dag.DAGScheduler
    public void taskScheduled(DAGEventSchedulerUpdateTAAssigned dAGEventSchedulerUpdateTAAssigned) {
        TaskAttempt attempt = dAGEventSchedulerUpdateTAAssigned.getAttempt();
        Vertex vertex = this.dag.getVertex(attempt.getVertexID());
        LOG.info("Task assigned: " + attempt.getID() + " Vertex: Total:" + vertex.getTotalTasks() + " succeeded: " + vertex.getSucceededTasks() + " Resource: " + dAGEventSchedulerUpdateTAAssigned.getContainer().getResource().getMemory());
        if (this.currentPartitioner == vertex) {
            this.unassignedPartitionTasks.remove(attempt.getTaskID());
            Resource resource = dAGEventSchedulerUpdateTAAssigned.getContainer().getResource();
            if (resource.getMemory() > this.realPartitionerResource.getMemory()) {
                this.realPartitionerResource.setMemory(resource.getMemory());
            }
        } else if (this.currentShuffler == vertex) {
            this.unassignedShuffleTasks.remove(attempt.getTaskID());
            Resource resource2 = dAGEventSchedulerUpdateTAAssigned.getContainer().getResource();
            if (resource2.getMemory() > this.realShufflerResource.getMemory()) {
                this.realShufflerResource.setMemory(resource2.getMemory());
            }
        }
        schedulePendingShuffles(getNumShufflesToSchedule());
    }

    @Override // org.apache.tez.dag.app.dag.DAGScheduler
    public void taskSucceeded(DAGEventSchedulerUpdate dAGEventSchedulerUpdate) {
        TaskAttempt attempt = dAGEventSchedulerUpdate.getAttempt();
        Vertex vertex = this.dag.getVertex(attempt.getVertexID());
        LOG.info("Task succeeded: " + attempt.getID() + " Vertex: Total:" + vertex.getTotalTasks() + " succeeded: " + vertex.getSucceededTasks());
        schedulePendingShuffles(getNumShufflesToSchedule());
    }

    int getNumShufflesToSchedule() {
        if (!$assertionsDisabled && this.currentPartitioner == null) {
            throw new AssertionError();
        }
        if (this.pendingShuffleTasks.isEmpty()) {
            return 0;
        }
        if (this.unassignedPartitionTasks.isEmpty()) {
            LOG.info("All partitioners assigned. Scheduling all shufflers.");
            return this.pendingShuffleTasks.size();
        }
        if (!$assertionsDisabled && this.currentShuffler == null) {
            throw new AssertionError();
        }
        Resource totalResources = this.taskScheduler.getTotalResources();
        Resource availableResources = this.taskScheduler.getAvailableResources();
        int memory = totalResources.getMemory();
        int memory2 = availableResources.getMemory();
        int memory3 = this.realPartitionerResource.getMemory();
        int memory4 = this.realShufflerResource.getMemory();
        int i = memory4 * this.numShuffleTasksScheduled;
        int totalTasks = this.currentPartitioner.getTotalTasks();
        int succeededTasks = this.currentPartitioner.getSucceededTasks();
        int i2 = (totalTasks - succeededTasks) * memory3;
        int i3 = memory - i2;
        int min = (int) (memory * Math.min(this.minReservedShuffleResource, succeededTasks / totalTasks));
        if (i3 < min) {
            i3 = min;
        }
        int i4 = i3 - i;
        LOG.info("TotalMem: " + memory + " Headroom: " + memory2 + " PartitionerTaskMem: " + memory3 + " ShufflerTaskMem: " + memory4 + " MaxShuffleMem: " + min + " PartitionerMemNeeded:" + i2 + " ShufflerMemAssigned: " + i + " ShufflerMemLeft: " + i4 + " Pending shufflers: " + this.pendingShuffleTasks.size());
        if (i4 < 0) {
            return 0;
        }
        if (memory4 == 0) {
            return this.pendingShuffleTasks.size();
        }
        int i5 = i4 / memory4;
        if (memory - (i + (memory4 * i5)) >= memory3) {
            return i5;
        }
        LOG.info("Not scheduling more shufflers as it starves partitioners");
        return 0;
    }

    void schedulePendingShuffles(int i) {
        while (!this.pendingShuffleTasks.isEmpty() && i > 0) {
            i--;
            TaskAttempt remove = this.pendingShuffleTasks.remove(0);
            scheduleTaskAttempt(remove);
            if (!remove.getIsRescheduled()) {
                this.numShuffleTasksScheduled++;
            }
        }
    }

    void scheduleTaskAttempt(TaskAttempt taskAttempt) {
        boolean z = false;
        Vertex vertex = this.dag.getVertex(taskAttempt.getVertexID());
        int distanceFromRoot = vertex.getDistanceFromRoot();
        int i = (distanceFromRoot + 1) * 3;
        if (this.currentShuffler == vertex) {
            if (!$assertionsDisabled && this.currentPartitioner == null) {
                throw new AssertionError();
            }
            if (!this.unassignedPartitionTasks.isEmpty()) {
                z = true;
            }
        }
        if (z) {
            i -= 4;
        } else if (taskAttempt.getIsRescheduled()) {
            i -= 2;
        }
        LOG.info("Scheduling " + taskAttempt.getID() + " with depth " + distanceFromRoot + " at priority " + i);
        sendEvent(new TaskAttemptEventSchedule(taskAttempt.getID(), Priority.newInstance(i)));
    }

    void sendEvent(TaskAttemptEventSchedule taskAttemptEventSchedule) {
        this.handler.handle(taskAttemptEventSchedule);
    }

    static {
        $assertionsDisabled = !DAGSchedulerMRR.class.desiredAssertionStatus();
        LOG = LogFactory.getLog(DAGSchedulerMRR.class);
    }
}
