package org.apache.tez.dag.library.vertexmanager;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;

@InterfaceAudience.Public
@InterfaceStability.Evolving
/* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.class */
public class ShuffleVertexManager extends VertexManagerPlugin {
    /** Config key for the source-completion fraction at which slow-start scheduling begins; read in {@code initialize()}. */
    public static final String TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION = "tez.shuffle-vertex-manager.min-src-fraction";
    /** Default for {@link #TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION}. */
    public static final float TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT = 0.25f;
    /** Config key for the source-completion fraction at which all tasks are scheduled; read in {@code initialize()}. */
    public static final String TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION = "tez.shuffle-vertex-manager.max-src-fraction";
    /** Default for {@link #TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION}. */
    public static final float TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT = 0.75f;
    /** Config key that switches on the parallelism reduction performed by {@code determineParallelismAndApply()}. */
    public static final String TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL = "tez.shuffle-vertex-manager.enable.auto-parallel";
    /** Default for {@link #TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL}. */
    public static final boolean TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL_DEFAULT = false;
    /** Config key for the per-task input size that the auto-parallelism computation divides the expected output by. */
    public static final String TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE = "tez.shuffle-vertex-manager.desired-task-input-size";
    /** Default for {@link #TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE} (100 * 1024 * 1024). */
    public static final long TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT = 104857600;
    /** Config key for the lower bound applied to the parallelism computed by auto-parallelism. */
    public static final String TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM = "tez.shuffle-vertex-manager.min-task-parallelism";
    /** Default for {@link #TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM}. */
    public static final int TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM_DEFAULT = 1;
    private static final Log LOG = LogFactory.getLog(ShuffleVertexManager.class);
    // Slow-start window, loaded from the user payload in initialize().
    float slowStartMinSrcCompletionFraction;
    float slowStartMaxSrcCompletionFraction;
    // Auto-parallelism settings, loaded from the user payload in initialize().
    long desiredTaskInputDataSize;
    int minTaskParallelism;
    boolean enableAutoParallelism;
    // Set from the result of determineParallelismAndApply() in schedulePendingTasks(int).
    boolean parallelismDetermined;
    // Sum of task counts over all SCATTER_GATHER sources; recomputed by updateSourceTaskCount().
    int totalNumBipartiteSourceTasks;
    // Incremented in onSourceTaskCompleted() the first time each SCATTER_GATHER source task is reported.
    int numBipartiteSourceTasksCompleted;
    // Incremented in onVertexManagerEventReceived() when auto-parallelism is enabled.
    int numVertexManagerEventsReceived;
    // Task indices of this vertex that have not yet been passed to scheduleVertexTasks().
    List<Integer> pendingTasks;
    // Size of pendingTasks at the time updatePendingTasks() last rebuilt it.
    int totalTasksToSchedule;
    // Flipped to true at the end of onVertexStarted(); schedulePendingTasks() is a no-op before that.
    private AtomicBoolean onVertexStartedDone;
    // Per-source-vertex bookkeeping, keyed by source vertex name; populated in onVertexStarted().
    private final Map<String, SourceVertexInfo> srcVertexInfo;
    // Latched to true by canScheduleTasks() once no source vertex with tasks is still unconfigured.
    boolean sourceVerticesScheduled;

    // Number of SCATTER_GATHER input edges, counted in onVertexStarted().
    @VisibleForTesting
    int bipartiteSources;
    // Running sum of the output sizes reported through vertex manager events.
    long completedSourceTasksOutputSize;
    // CONFIGURED notifications that arrived before onVertexStarted() finished; replayed there.
    List<VertexStateUpdate> pendingStateUpdates;

    /* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager$CustomShuffleEdgeManager.class */
    /**
     * Edge manager that maps contiguous ranges of source partitions onto a reduced number of
     * destination tasks. Every destination task except the last consumes
     * {@code basePartitionRange} partitions from each source task; the last one consumes
     * {@code remainderRangeForLastShuffler}. The layout is read from the user payload produced by
     * {@link CustomShuffleEdgeManagerConfig#toUserPayload()}.
     */
    public static class CustomShuffleEdgeManager extends EdgeManagerPlugin {
        int numSourceTaskOutputs;
        int numDestinationTasks;
        int basePartitionRange;
        int remainderRangeForLastShuffler;
        int numSourceTasks;

        public CustomShuffleEdgeManager(EdgeManagerPluginContext edgeManagerPluginContext) {
            super(edgeManagerPluginContext);
        }

        /**
         * Loads the partition layout from the context's user payload and caches the source task count.
         *
         * @throws RuntimeException if the payload is missing, empty or cannot be parsed
         * @throws IllegalStateException if the payload's destination task count disagrees with the context
         */
        public void initialize() {
            UserPayload userPayload = getContext().getUserPayload();
            if (userPayload == null || userPayload.getPayload() == null || userPayload.getPayload().limit() == 0) {
                throw new RuntimeException("Could not initialize CustomShuffleEdgeManager from provided user payload");
            }
            CustomShuffleEdgeManagerConfig config;
            try {
                config = CustomShuffleEdgeManagerConfig.fromUserPayload(userPayload);
            } catch (InvalidProtocolBufferException e) {
                throw new RuntimeException("Could not initialize CustomShuffleEdgeManager from provided user payload", e);
            }
            this.numSourceTaskOutputs = config.numSourceTaskOutputs;
            this.numDestinationTasks = config.numDestinationTasks;
            this.basePartitionRange = config.basePartitionRange;
            this.remainderRangeForLastShuffler = config.remainderRangeForLastShuffler;
            this.numSourceTasks = getContext().getSourceVertexNumTasks();
            Preconditions.checkState(this.numDestinationTasks == getContext().getDestinationVertexNumTasks());
        }

        /** Returns the number of physical inputs of a destination task: one per source task per partition in its range. */
        public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
            return this.numSourceTasks * partitionRangeFor(destinationTaskIndex);
        }

        /** Returns the configured number of outputs per source task, independent of the task index. */
        public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) {
            return this.numSourceTaskOutputs;
        }

        /**
         * Routes one source partition to the single destination task whose range contains it.
         * The destination input index places all inputs of one source task next to each other.
         */
        public void routeDataMovementEventToDestination(DataMovementEvent dataMovementEvent, int sourceTaskIndex, int sourceOutputIndex, Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
            int sourceIndex = dataMovementEvent.getSourceIndex();
            int destinationTaskIndex = sourceIndex / this.basePartitionRange;
            int partitionRange = partitionRangeFor(destinationTaskIndex);
            int targetIndex = (sourceTaskIndex * partitionRange) + (sourceIndex % partitionRange);
            destinationTaskAndInputIndices.put(Integer.valueOf(destinationTaskIndex), Collections.singletonList(Integer.valueOf(targetIndex)));
        }

        /**
         * Reports, for every destination task, the input indices that were fed by the failed source task.
         * Destination tasks with the same range share one unmodifiable index list.
         */
        public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex, Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
            List<Integer> baseIndices = contiguousInputIndices(sourceTaskIndex, this.basePartitionRange);
            if (this.remainderRangeForLastShuffler >= this.basePartitionRange) {
                // All destination tasks have the same layout.
                for (int dest = 0; dest < this.numDestinationTasks; dest++) {
                    destinationTaskAndInputIndices.put(Integer.valueOf(dest), baseIndices);
                }
                return;
            }
            for (int dest = 0; dest < this.numDestinationTasks - 1; dest++) {
                destinationTaskAndInputIndices.put(Integer.valueOf(dest), baseIndices);
            }
            // The last destination task has the shorter remainder range.
            destinationTaskAndInputIndices.put(Integer.valueOf(this.numDestinationTasks - 1), contiguousInputIndices(sourceTaskIndex, this.remainderRangeForLastShuffler));
        }

        /** Maps a failed destination input index back to the source task index that produced it. */
        public int routeInputErrorEventToSource(InputReadErrorEvent inputReadErrorEvent, int destinationTaskIndex, int destinationFailedInputIndex) {
            return destinationFailedInputIndex / partitionRangeFor(destinationTaskIndex);
        }

        /** Returns the number of destination tasks; every source task feeds all of them. */
        public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
            return this.numDestinationTasks;
        }

        /** Returns the number of partitions per source task consumed by the given destination task. */
        private int partitionRangeFor(int destinationTaskIndex) {
            return destinationTaskIndex < this.numDestinationTasks - 1 ? this.basePartitionRange : this.remainderRangeForLastShuffler;
        }

        /** Builds the unmodifiable list of {@code partitionRange} consecutive input indices owned by one source task. */
        private static List<Integer> contiguousInputIndices(int sourceTaskIndex, int partitionRange) {
            int startOffset = sourceTaskIndex * partitionRange;
            List<Integer> indices = Lists.newArrayListWithCapacity(partitionRange);
            for (int offset = 0; offset < partitionRange; offset++) {
                indices.add(Integer.valueOf(startOffset + offset));
            }
            return Collections.unmodifiableList(indices);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager$CustomShuffleEdgeManagerConfig.class */
    /**
     * Immutable partition layout exchanged between {@code determineParallelismAndApply()} and
     * {@link CustomShuffleEdgeManager} as a protobuf-encoded user payload.
     */
    public static class CustomShuffleEdgeManagerConfig {
        final int numSourceTaskOutputs;
        final int numDestinationTasks;
        final int basePartitionRange;
        final int remainderRangeForLastShuffler;

        private CustomShuffleEdgeManagerConfig(int numSourceTaskOutputs, int numDestinationTasks, int basePartitionRange, int remainderRangeForLastShuffler) {
            this.numSourceTaskOutputs = numSourceTaskOutputs;
            this.numDestinationTasks = numDestinationTasks;
            this.basePartitionRange = basePartitionRange;
            this.remainderRangeForLastShuffler = remainderRangeForLastShuffler;
        }

        /** Serializes the four layout values into a {@code ShuffleEdgeManagerConfigPayloadProto} payload. */
        public UserPayload toUserPayload() {
            // The decompiler emitted the bridge-method alias "m180build"; the generated builder's method is build().
            ShuffleUserPayloads.ShuffleEdgeManagerConfigPayloadProto proto = ShuffleUserPayloads.ShuffleEdgeManagerConfigPayloadProto.newBuilder()
                    .setNumSourceTaskOutputs(this.numSourceTaskOutputs)
                    .setNumDestinationTasks(this.numDestinationTasks)
                    .setBasePartitionRange(this.basePartitionRange)
                    .setRemainderRangeForLastShuffler(this.remainderRangeForLastShuffler)
                    .build();
            return UserPayload.create(ByteBuffer.wrap(proto.toByteArray()));
        }

        /**
         * Parses a payload written by {@link #toUserPayload()}.
         *
         * @throws InvalidProtocolBufferException if the payload bytes are not a valid message
         */
        public static CustomShuffleEdgeManagerConfig fromUserPayload(UserPayload userPayload) throws InvalidProtocolBufferException {
            ShuffleUserPayloads.ShuffleEdgeManagerConfigPayloadProto proto = ShuffleUserPayloads.ShuffleEdgeManagerConfigPayloadProto.parseFrom(ByteString.copyFrom(userPayload.getPayload()));
            return new CustomShuffleEdgeManagerConfig(proto.getNumSourceTaskOutputs(), proto.getNumDestinationTasks(), proto.getBasePartitionRange(), proto.getRemainderRangeForLastShuffler());
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager$ShuffleVertexManagerConfigBuilder.class */
    /**
     * Fluent helper that writes ShuffleVertexManager settings into a {@link Configuration} and
     * wraps it as the user payload of a {@link VertexManagerPluginDescriptor}.
     */
    public static final class ShuffleVertexManagerConfigBuilder {
        private final Configuration conf;

        /** Uses the supplied configuration directly (not a copy), or a fresh one without defaults when it is null. */
        private ShuffleVertexManagerConfigBuilder(@Nullable Configuration configuration) {
            this.conf = configuration != null ? configuration : new Configuration(false);
        }

        /** Sets {@code tez.shuffle-vertex-manager.enable.auto-parallel}. */
        public ShuffleVertexManagerConfigBuilder setAutoReduceParallelism(boolean z) {
            conf.setBoolean(TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, z);
            return this;
        }

        /** Sets {@code tez.shuffle-vertex-manager.min-src-fraction}. */
        public ShuffleVertexManagerConfigBuilder setSlowStartMinSrcCompletionFraction(float f) {
            conf.setFloat(TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, f);
            return this;
        }

        /** Sets {@code tez.shuffle-vertex-manager.max-src-fraction}. */
        public ShuffleVertexManagerConfigBuilder setSlowStartMaxSrcCompletionFraction(float f) {
            conf.setFloat(TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, f);
            return this;
        }

        /** Sets {@code tez.shuffle-vertex-manager.desired-task-input-size}. */
        public ShuffleVertexManagerConfigBuilder setDesiredTaskInputSize(long j) {
            conf.setLong(TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, j);
            return this;
        }

        /** Sets {@code tez.shuffle-vertex-manager.min-task-parallelism}. */
        public ShuffleVertexManagerConfigBuilder setMinTaskParallelism(int i) {
            conf.setInt(TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM, i);
            return this;
        }

        /**
         * Creates the plugin descriptor for {@link ShuffleVertexManager} carrying the accumulated configuration.
         *
         * @throws TezUncheckedException if the configuration cannot be serialized
         */
        public VertexManagerPluginDescriptor build() {
            try {
                VertexManagerPluginDescriptor descriptor = VertexManagerPluginDescriptor.create(ShuffleVertexManager.class.getName());
                UserPayload payload = TezUtils.createUserPayloadFromConf(this.conf);
                return descriptor.setUserPayload(payload);
            } catch (IOException ioe) {
                throw new TezUncheckedException(ioe);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager$SourceVertexInfo.class */
    /** Per-source-vertex state: its edge, whether it has been reported CONFIGURED, and which of its tasks finished. */
    public class SourceVertexInfo {
        EdgeProperty edgeProperty;
        boolean vertexIsConfigured;
        /** Indices of finished tasks; only allocated for SCATTER_GATHER edges, otherwise null. */
        BitSet finishedTaskSet;

        SourceVertexInfo(EdgeProperty edgeProperty) {
            boolean scatterGather = EdgeProperty.DataMovementType.SCATTER_GATHER == edgeProperty.getDataMovementType();
            this.edgeProperty = edgeProperty;
            this.finishedTaskSet = scatterGather ? new BitSet() : null;
        }
    }

    /** Creates the manager with default settings and empty bookkeeping; configuration is loaded later in {@code initialize()}. */
    public ShuffleVertexManager(VertexManagerPluginContext vertexManagerPluginContext) {
        super(vertexManagerPluginContext);

        // Settings (overwritten by initialize()).
        this.enableAutoParallelism = false;
        this.minTaskParallelism = 1;
        this.desiredTaskInputDataSize = TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT;

        // Source-side progress counters.
        this.bipartiteSources = 0;
        this.totalNumBipartiteSourceTasks = 0;
        this.numBipartiteSourceTasksCompleted = 0;
        this.numVertexManagerEventsReceived = 0;
        this.completedSourceTasksOutputSize = 0L;

        // Scheduling state.
        this.parallelismDetermined = false;
        this.sourceVerticesScheduled = false;
        this.totalTasksToSchedule = 0;
        this.pendingTasks = Lists.newLinkedList();
        this.onVertexStartedDone = new AtomicBoolean(false);

        // Source vertex tracking.
        this.srcVertexInfo = Maps.newConcurrentMap();
        this.pendingStateUpdates = Lists.newArrayList();
    }

    /**
     * Registers every input vertex, replays CONFIGURED notifications that arrived early, rebuilds the
     * pending task list, accounts for source tasks that already completed, and then attempts scheduling.
     *
     * @param map already-completed task indices keyed by source vertex name; may be null
     * @throws TezUncheckedException if no input edge is SCATTER_GATHER
     */
    public synchronized void onVertexStarted(Map<String, List<Integer>> map) {
        Map<String, EdgeProperty> inputEdges = getContext().getInputVertexEdgeProperties();
        for (Map.Entry<String, EdgeProperty> input : inputEdges.entrySet()) {
            String sourceName = input.getKey();
            EdgeProperty edge = input.getValue();
            this.srcVertexInfo.put(sourceName, new SourceVertexInfo(edge));
            getContext().registerForVertexStateUpdates(sourceName, EnumSet.of(VertexState.CONFIGURED));
            if (edge.getDataMovementType() == EdgeProperty.DataMovementType.SCATTER_GATHER) {
                this.bipartiteSources++;
            }
        }
        if (this.bipartiteSources == 0) {
            throw new TezUncheckedException("Atleast 1 bipartite source should exist");
        }

        // Replay notifications that were queued before the source vertices were known.
        for (VertexStateUpdate queued : this.pendingStateUpdates) {
            handleVertexStateUpdate(queued);
        }
        this.pendingStateUpdates.clear();

        updatePendingTasks();
        updateSourceTaskCount();
        LOG.info("OnVertexStarted vertex: " + getContext().getVertexName() + " with " + this.totalNumBipartiteSourceTasks + " source tasks and " + this.totalTasksToSchedule + " pending tasks");

        if (map != null) {
            for (Map.Entry<String, List<Integer>> completed : map.entrySet()) {
                for (Integer taskIndex : completed.getValue()) {
                    onSourceTaskCompleted(completed.getKey(), taskIndex);
                }
            }
        }
        // Scheduling is gated on this flag, so nothing above could have scheduled tasks yet.
        this.onVertexStartedDone.set(true);
        schedulePendingTasks();
    }

    /**
     * Records the completion of a source task and re-evaluates scheduling. Only SCATTER_GATHER
     * sources are counted, and a task index that was already recorded is not counted again.
     *
     * @param str name of the source vertex
     * @param num index of the completed task within that vertex
     */
    public synchronized void onSourceTaskCompleted(String str, Integer num) {
        updateSourceTaskCount();
        SourceVertexInfo source = this.srcVertexInfo.get(str);
        if (source.edgeProperty.getDataMovementType() == EdgeProperty.DataMovementType.SCATTER_GATHER) {
            BitSet finished = source.finishedTaskSet;
            if (finished != null && !finished.get(num.intValue())) {
                finished.set(num.intValue());
                this.numBipartiteSourceTasksCompleted++;
            }
        }
        schedulePendingTasks();
    }

    /**
     * Accumulates the output size reported by a source task. Events are ignored entirely when
     * auto-parallelism is disabled.
     *
     * @throws TezUncheckedException if the event payload cannot be parsed
     */
    public synchronized void onVertexManagerEventReceived(VertexManagerEvent vertexManagerEvent) {
        if (!this.enableAutoParallelism) {
            return;
        }
        long reportedSize;
        try {
            ByteString rawPayload = ByteString.copyFrom(vertexManagerEvent.getUserPayload());
            reportedSize = ShuffleUserPayloads.VertexManagerEventPayloadProto.parseFrom(rawPayload).getOutputSize();
        } catch (InvalidProtocolBufferException ipbe) {
            throw new TezUncheckedException(ipbe);
        }
        this.numVertexManagerEventsReceived++;
        this.completedSourceTasksOutputSize += reportedSize;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received info of output size: " + reportedSize + " numInfoReceived: " + this.numVertexManagerEventsReceived + " total output size: " + this.completedSourceTasksOutputSize);
        }
    }

    /**
     * Rebuilds {@code pendingTasks} as the indices {@code 0..n-1}, where {@code n} is this vertex's
     * current task count, and records {@code n} in {@code totalTasksToSchedule}.
     */
    void updatePendingTasks() {
        this.pendingTasks.clear();
        // Query the task count once instead of on every loop iteration.
        int numTasks = getContext().getVertexNumTasks(getContext().getVertexName());
        for (int taskIndex = 0; taskIndex < numTasks; taskIndex++) {
            this.pendingTasks.add(Integer.valueOf(taskIndex));
        }
        this.totalTasksToSchedule = this.pendingTasks.size();
    }

    /** Returns a filtered view of {@code srcVertexInfo} containing only the SCATTER_GATHER sources. */
    Iterable<Map.Entry<String, SourceVertexInfo>> getBipartiteInfo() {
        Predicate<Map.Entry<String, SourceVertexInfo>> isScatterGather = new Predicate<Map.Entry<String, SourceVertexInfo>>() {
            public boolean apply(Map.Entry<String, SourceVertexInfo> candidate) {
                EdgeProperty.DataMovementType movement = candidate.getValue().edgeProperty.getDataMovementType();
                return EdgeProperty.DataMovementType.SCATTER_GATHER == movement;
            }
        };
        return Iterables.filter(this.srcVertexInfo.entrySet(), isScatterGather);
    }

    /** Recomputes {@code totalNumBipartiteSourceTasks} as the sum of the current task counts of all SCATTER_GATHER sources. */
    void updateSourceTaskCount() {
        int bipartiteTaskTotal = 0;
        for (Map.Entry<String, SourceVertexInfo> source : getBipartiteInfo()) {
            bipartiteTaskTotal += getContext().getVertexNumTasks(source.getKey());
        }
        this.totalNumBipartiteSourceTasks = bipartiteTaskTotal;
    }

    /**
     * Decides whether this vertex's parallelism should be reduced and, if so, applies the reduction
     * together with a {@link CustomShuffleEdgeManager} on every SCATTER_GATHER input edge.
     *
     * @return {@code false} when the decision is deferred because too little output has been reported
     *         and the max slow-start threshold has not been reached; {@code true} otherwise, whether
     *         or not the parallelism was actually changed
     */
    @VisibleForTesting
    boolean determineParallelismAndApply() {
        // Nothing to base an estimate on yet; treat the parallelism as settled.
        if (this.numBipartiteSourceTasksCompleted == 0 || this.numVertexManagerEventsReceived == 0) {
            return true;
        }

        int currentParallelism = this.pendingTasks.size();
        float maxThreshold = this.totalNumBipartiteSourceTasks * this.slowStartMaxSrcCompletionFraction;
        boolean belowDesiredSize = this.completedSourceTasksOutputSize < this.desiredTaskInputDataSize;
        if (belowDesiredSize && ((float) this.numBipartiteSourceTasksCompleted) < maxThreshold) {
            LOG.info("Defer scheduling tasks; vertex=" + getContext().getVertexName() + ", totalNumBipartiteSourceTasks=" + this.totalNumBipartiteSourceTasks + ", completedSourceTasksOutputSize=" + this.completedSourceTasksOutputSize + ", numVertexManagerEventsReceived=" + this.numVertexManagerEventsReceived + ", numBipartiteSourceTasksCompleted=" + this.numBipartiteSourceTasksCompleted + ", maxThreshold=" + maxThreshold);
            return false;
        }

        // Extrapolate the reported output to all source tasks, then divide by the desired per-task input (rounding up).
        long expectedTotalOutputSize = (this.totalNumBipartiteSourceTasks * this.completedSourceTasksOutputSize) / this.numVertexManagerEventsReceived;
        int desiredParallelism = (int) (((expectedTotalOutputSize + this.desiredTaskInputDataSize) - 1) / this.desiredTaskInputDataSize);
        desiredParallelism = Math.max(desiredParallelism, this.minTaskParallelism);
        if (desiredParallelism >= currentParallelism) {
            return true;
        }

        // Each new task takes a contiguous range of the original partitions.
        int basePartitionRange = currentParallelism / desiredParallelism;
        if (basePartitionRange <= 1) {
            return true;
        }
        int fullRangeTasks = currentParallelism / basePartitionRange;
        int remainderRange = currentParallelism % basePartitionRange;
        int finalParallelism = remainderRange > 0 ? fullRangeTasks + 1 : fullRangeTasks;
        LOG.info("Reduce auto parallelism for vertex: " + getContext().getVertexName() + " to " + finalParallelism + " from " + currentParallelism + " . Expected output: " + expectedTotalOutputSize + " based on actual output: " + this.completedSourceTasksOutputSize + " from " + this.numVertexManagerEventsReceived + " vertex manager events.  desiredTaskInputSize: " + this.desiredTaskInputDataSize + " max slow start tasks:" + maxThreshold + " num sources completed:" + this.numBipartiteSourceTasksCompleted);
        if (finalParallelism >= currentParallelism) {
            return true;
        }

        // Every bipartite edge gets the same layout; the last task takes the remainder (or a full range when it divides evenly).
        int lastTaskRange = remainderRange > 0 ? remainderRange : basePartitionRange;
        CustomShuffleEdgeManagerConfig layout = new CustomShuffleEdgeManagerConfig(currentParallelism, finalParallelism, basePartitionRange, lastTaskRange);
        Map<String, EdgeManagerPluginDescriptor> edgeManagers = new HashMap<String, EdgeManagerPluginDescriptor>(this.bipartiteSources);
        for (Map.Entry<String, SourceVertexInfo> source : getBipartiteInfo()) {
            EdgeManagerPluginDescriptor descriptor = EdgeManagerPluginDescriptor.create(CustomShuffleEdgeManager.class.getName());
            descriptor.setUserPayload(layout.toUserPayload());
            edgeManagers.put(source.getKey(), descriptor);
        }
        getContext().setVertexParallelism(finalParallelism, (VertexLocationHint) null, edgeManagers, (Map) null);
        updatePendingTasks();
        return true;
    }

    /**
     * Hands up to {@code i} tasks from the front of {@code pendingTasks} to the context for scheduling.
     * With auto-parallelism enabled, the parallelism decision is made first; if it is deferred,
     * nothing is scheduled, and once it is made the context is told that reconfiguration is done.
     *
     * @param i maximum number of tasks to schedule in this call
     */
    void schedulePendingTasks(int i) {
        if (this.enableAutoParallelism && !this.parallelismDetermined) {
            this.parallelismDetermined = determineParallelismAndApply();
            if (!this.parallelismDetermined) {
                // Decision deferred until more source output has been reported.
                return;
            }
            getContext().doneReconfiguringVertex();
        }
        List<VertexManagerPluginContext.TaskWithLocationHint> toSchedule = Lists.newArrayListWithCapacity(i);
        for (int remaining = i; remaining > 0 && !this.pendingTasks.isEmpty(); remaining--) {
            // remove(0) takes the index overload and returns the removed task index.
            Integer taskIndex = this.pendingTasks.remove(0);
            toSchedule.add(new VertexManagerPluginContext.TaskWithLocationHint(taskIndex, (TaskLocationHint) null));
        }
        getContext().scheduleVertexTasks(toSchedule);
    }

    /**
     * Checks that no source vertex with at least one task is still waiting for its CONFIGURED
     * notification. On success the result is latched in {@code sourceVerticesScheduled}.
     *
     * @return {@code true} if scheduling may proceed, {@code false} if some source is not yet configured
     */
    boolean canScheduleTasks() {
        for (Map.Entry<String, SourceVertexInfo> source : this.srcVertexInfo.entrySet()) {
            boolean hasTasks = getContext().getVertexNumTasks(source.getKey()) > 0;
            if (hasTasks && !source.getValue().vertexIsConfigured) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Waiting for vertex: " + source.getKey() + " in vertex: " + getContext().getVertexName());
                }
                return false;
            }
        }
        this.sourceVerticesScheduled = true;
        return true;
    }

    /**
     * Applies the slow-start policy: nothing is scheduled before {@code onVertexStarted} has finished
     * or while a source vertex is unconfigured; everything is scheduled once all bipartite source
     * tasks completed; in between, the number of scheduled tasks grows linearly from 0 at
     * {@code slowStartMinSrcCompletionFraction} to all tasks at {@code slowStartMaxSrcCompletionFraction}.
     */
    void schedulePendingTasks() {
        if (!this.onVertexStartedDone.get()) {
            return;
        }
        int numPendingTasks = this.pendingTasks.size();
        if (numPendingTasks == 0) {
            return;
        }
        if (!this.sourceVerticesScheduled && !canScheduleTasks()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Defer scheduling tasks for vertex:" + getContext().getVertexName() + " as one task needs to be completed per source vertex");
            }
            return;
        }
        if (this.numBipartiteSourceTasksCompleted == this.totalNumBipartiteSourceTasks) {
            LOG.info("All source tasks assigned. Ramping up " + numPendingTasks + " remaining tasks for vertex: " + getContext().getVertexName());
            schedulePendingTasks(numPendingTasks);
            return;
        }

        // The cast is essential: int / int would truncate to 0 until every source task had finished,
        // which would disable the gradual ramp-up altogether.
        float completedSourceTaskFraction = 1.0f;
        if (this.totalNumBipartiteSourceTasks != 0) {
            completedSourceTaskFraction = (float) this.numBipartiteSourceTasksCompleted / this.totalNumBipartiteSourceTasks;
        }

        float tasksFractionToSchedule = 1.0f;
        float fractionRange = this.slowStartMaxSrcCompletionFraction - this.slowStartMinSrcCompletionFraction;
        if (fractionRange > 0.0f) {
            tasksFractionToSchedule = (completedSourceTaskFraction - this.slowStartMinSrcCompletionFraction) / fractionRange;
        } else if (completedSourceTaskFraction < this.slowStartMinSrcCompletionFraction) {
            // min == max: schedule everything as soon as min is reached, nothing before.
            tasksFractionToSchedule = 0.0f;
        }
        tasksFractionToSchedule = Math.max(0.0f, Math.min(1.0f, tasksFractionToSchedule));

        // Subtract the tasks that were already handed out in earlier calls.
        int alreadyScheduled = this.totalTasksToSchedule - numPendingTasks;
        int numTasksToSchedule = ((int) (tasksFractionToSchedule * this.totalTasksToSchedule)) - alreadyScheduled;
        if (numTasksToSchedule > 0) {
            LOG.info("Scheduling " + numTasksToSchedule + " tasks for vertex: " + getContext().getVertexName() + " with totalTasks: " + this.totalTasksToSchedule + ". " + this.numBipartiteSourceTasksCompleted + " source tasks completed out of " + this.totalNumBipartiteSourceTasks + ". SourceTaskCompletedFraction: " + completedSourceTaskFraction + " min: " + this.slowStartMinSrcCompletionFraction + " max: " + this.slowStartMaxSrcCompletionFraction);
            schedulePendingTasks(numTasksToSchedule);
        }
    }

    /**
     * Loads the manager's settings from the user payload and, when auto-parallelism is enabled,
     * announces to the context that the vertex will be reconfigured.
     *
     * @throws TezUncheckedException if the user payload cannot be turned into a configuration
     * @throws IllegalArgumentException if the min fraction is negative or the max fraction is below the min
     */
    public void initialize() {
        Configuration conf;
        try {
            conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
        } catch (IOException e) {
            throw new TezUncheckedException(e);
        }

        this.slowStartMinSrcCompletionFraction = conf.getFloat(TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT);
        this.slowStartMaxSrcCompletionFraction = conf.getFloat(TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT);
        // Compare against a literal zero; the bound must not depend on an unrelated runtime default.
        if (this.slowStartMinSrcCompletionFraction < 0.0f || this.slowStartMaxSrcCompletionFraction < this.slowStartMinSrcCompletionFraction) {
            throw new IllegalArgumentException("Invalid values for slowStartMinSrcCompletionFraction/slowStartMaxSrcCompletionFraction. Min cannot be < 0 and max cannot be < min.");
        }

        this.enableAutoParallelism = conf.getBoolean(TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL_DEFAULT);
        this.desiredTaskInputDataSize = conf.getLong(TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT);
        // Values below 1 are clamped to 1.
        this.minTaskParallelism = Math.max(1, conf.getInt(TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM, TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM_DEFAULT));
        LOG.info("Shuffle Vertex Manager: settings minFrac:" + this.slowStartMinSrcCompletionFraction + " maxFrac:" + this.slowStartMaxSrcCompletionFraction + " auto:" + this.enableAutoParallelism + " desiredTaskIput:" + this.desiredTaskInputDataSize + " minTasks:" + this.minTaskParallelism);

        if (this.enableAutoParallelism) {
            getContext().vertexReconfigurationPlanned();
        }
    }

    /**
     * Marks a known source vertex as configured and re-evaluates scheduling.
     *
     * @throws IllegalArgumentException if the state is not CONFIGURED or the vertex is not a registered source
     * @throws IllegalStateException if the vertex was already marked configured
     */
    private void handleVertexStateUpdate(VertexStateUpdate vertexStateUpdate) {
        VertexState reportedState = vertexStateUpdate.getVertexState();
        String sourceName = vertexStateUpdate.getVertexName();
        String details = reportedState + " for vertex: " + sourceName + " in vertex: " + getContext().getVertexName();

        Preconditions.checkArgument(reportedState == VertexState.CONFIGURED, "Received incorrect state notification : " + details);
        Preconditions.checkArgument(this.srcVertexInfo.containsKey(sourceName), "Received incorrect vertex notification : " + details);

        SourceVertexInfo source = this.srcVertexInfo.get(sourceName);
        Preconditions.checkState(!source.vertexIsConfigured);
        source.vertexIsConfigured = true;
        LOG.info("Received configured notification : " + details);
        schedulePendingTasks();
    }

    /**
     * Receives vertex state notifications. Only CONFIGURED is acted upon: it is handled immediately
     * once {@code onVertexStarted} has finished, and queued for replay before that.
     */
    public synchronized void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) {
        if (vertexStateUpdate.getVertexState() != VertexState.CONFIGURED) {
            return;
        }
        if (!this.onVertexStartedDone.get()) {
            this.pendingStateUpdates.add(vertexStateUpdate);
            return;
        }
        handleVertexStateUpdate(vertexStateUpdate);
    }

    /** Intentionally a no-op: this manager takes no action on root input initialization events. */
    public synchronized void onRootVertexInitialized(String str, InputDescriptor inputDescriptor, List<Event> list) {
    }

    /**
     * Creates a builder for a ShuffleVertexManager plugin descriptor.
     *
     * @param configuration configuration the builder writes into directly; when null the builder
     *        starts from an empty configuration without defaults
     * @return a new builder
     */
    public static ShuffleVertexManagerConfigBuilder createConfigBuilder(@Nullable Configuration configuration) {
        ShuffleVertexManagerConfigBuilder builder = new ShuffleVertexManagerConfigBuilder(configuration);
        return builder;
    }
}
