/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.io.InitializeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.execution.ExecutionListener;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.DistributionPatternProvider;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGate;
import org.apache.flink.runtime.executiongraph.ExecutionGraphIterator;
import org.apache.flink.runtime.executiongraph.ExecutionGroupEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGroupVertex;
import org.apache.flink.runtime.executiongraph.ExecutionGroupVertexIterator;
import org.apache.flink.runtime.executiongraph.ExecutionSignature;
import org.apache.flink.runtime.executiongraph.ExecutionStage;
import org.apache.flink.runtime.executiongraph.ExecutionStageListener;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
import org.apache.flink.runtime.executiongraph.GraphConversionException;
import org.apache.flink.runtime.executiongraph.InternalJobStatus;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.instance.AllocatedResource;
import org.apache.flink.runtime.instance.DummyInstance;
import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.io.network.channels.ChannelType;
import org.apache.flink.runtime.io.network.gates.GateID;
import org.apache.flink.runtime.jobgraph.AbstractJobInputVertex;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobInputVertex;
import org.apache.flink.runtime.jobgraph.JobOutputVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.taskmanager.ExecutorThreadFactory;
import org.apache.flink.util.StringUtils;

public class ExecutionGraph
implements ExecutionListener {
    private static final Log LOG = LogFactory.getLog(ExecutionGraph.class);
    private final JobID jobID;
    private final String jobName;
    private final ConcurrentMap<ExecutionVertexID, ExecutionVertex> vertexMap = new ConcurrentHashMap<ExecutionVertexID, ExecutionVertex>(1024);
    private final ConcurrentMap<ChannelID, ExecutionEdge> edgeMap = new ConcurrentHashMap<ChannelID, ExecutionEdge>(0x100000);
    private final CopyOnWriteArrayList<ExecutionStage> stages = new CopyOnWriteArrayList();
    private final ExecutorService executorService = Executors.newSingleThreadExecutor(ExecutorThreadFactory.INSTANCE);
    private volatile int indexToCurrentExecutionStage = 0;
    private final Configuration jobConfiguration;
    private final AtomicReference<InternalJobStatus> jobStatus = new AtomicReference<InternalJobStatus>(InternalJobStatus.CREATED);
    private volatile String errorDescription = null;
    private final CopyOnWriteArrayList<JobStatusListener> jobStatusListeners = new CopyOnWriteArrayList();
    private final CopyOnWriteArrayList<ExecutionStageListener> executionStageListeners = new CopyOnWriteArrayList();

    private ExecutionGraph(JobID jobID, String jobName, Configuration jobConfiguration) {
        if (jobID == null) {
            throw new IllegalArgumentException("Argument jobID must not be null");
        }
        this.jobID = jobID;
        this.jobName = jobName;
        this.jobConfiguration = jobConfiguration;
    }

    public ExecutionGraph(JobGraph job, int defaultParallelism) throws GraphConversionException {
        this(job.getJobID(), job.getName(), job.getJobConfiguration());
        try {
            this.constructExecutionGraph(job, defaultParallelism);
        }
        catch (GraphConversionException e) {
            throw e;
        }
        catch (Exception e) {
            throw new GraphConversionException(StringUtils.stringifyException((Throwable)e));
        }
    }

    private void applyUserDefinedSettings(HashMap<AbstractJobVertex, ExecutionGroupVertex> temporaryGroupVertexMap) throws GraphConversionException {
        ExecutionGroupVertex groupVertex;
        for (Map.Entry<AbstractJobVertex, ExecutionGroupVertex> entry : temporaryGroupVertexMap.entrySet()) {
            AbstractJobVertex jobVertex = entry.getKey();
            if (jobVertex.getVertexToShareInstancesWith() == null) continue;
            AbstractJobVertex vertexToShareInstancesWith = jobVertex.getVertexToShareInstancesWith();
            ExecutionGroupVertex groupVertex2 = entry.getValue();
            ExecutionGroupVertex groupVertexToShareInstancesWith = temporaryGroupVertexMap.get(vertexToShareInstancesWith);
            groupVertex2.shareInstancesWith(groupVertexToShareInstancesWith);
        }
        ExecutionGroupVertexIterator it2 = new ExecutionGroupVertexIterator(this, true, -1);
        while (it2.hasNext()) {
            groupVertex = (ExecutionGroupVertex)it2.next();
            if (!groupVertex.isNumberOfMembersUserDefined()) continue;
            groupVertex.createInitialExecutionVertices(groupVertex.getUserDefinedNumberOfMembers());
        }
        it2 = new ExecutionGroupVertexIterator(this, true, -1);
        while (it2.hasNext()) {
            groupVertex = (ExecutionGroupVertex)it2.next();
            for (int i = 0; i < groupVertex.getNumberOfForwardLinks(); ++i) {
                ExecutionGroupEdge edge = groupVertex.getForwardEdge(i);
                if (edge.isChannelTypeUserDefined()) {
                    edge.changeChannelType(edge.getChannelType());
                }
                this.createExecutionEdgesForGroupEdge(edge);
            }
        }
        this.repairInstanceAssignment();
        this.repairInstanceSharing();
        this.repairStages();
    }

    private void constructExecutionGraph(JobGraph jobGraph, int defaultParallelism) throws GraphConversionException {
        HashMap<AbstractJobVertex, ExecutionVertex> temporaryVertexMap = new HashMap<AbstractJobVertex, ExecutionVertex>();
        HashMap<AbstractJobVertex, ExecutionGroupVertex> temporaryGroupVertexMap = new HashMap<AbstractJobVertex, ExecutionGroupVertex>();
        ExecutionStage initialExecutionStage = new ExecutionStage(this, 0);
        this.stages.add(initialExecutionStage);
        AbstractJobVertex[] all = jobGraph.getAllJobVertices();
        for (int i = 0; i < all.length; ++i) {
            if (all[i].getNumberOfSubtasks() == -1) {
                all[i].setNumberOfSubtasks(defaultParallelism);
            }
            ExecutionVertex createdVertex = this.createVertex(all[i], initialExecutionStage);
            temporaryVertexMap.put(all[i], createdVertex);
            temporaryGroupVertexMap.put(all[i], createdVertex.getGroupVertex());
        }
        this.createInitialGroupEdges(temporaryVertexMap);
        this.applyUserDefinedSettings(temporaryGroupVertexMap);
        this.calculateConnectionIDs();
        this.reconstructExecutionPipelines();
    }

    private void createExecutionEdgesForGroupEdge(ExecutionGroupEdge groupEdge) {
        ExecutionGroupVertex source = groupEdge.getSourceVertex();
        int indexOfOutputGate = groupEdge.getIndexOfOutputGate();
        ExecutionGroupVertex target = groupEdge.getTargetVertex();
        int indexOfInputGate = groupEdge.getIndexOfInputGate();
        HashMap<GateID, ArrayList<ExecutionEdge>> inputChannelMap = new HashMap<GateID, ArrayList<ExecutionEdge>>();
        int currentNumberOfSourceNodes = source.getCurrentNumberOfGroupMembers();
        for (int i = 0; i < currentNumberOfSourceNodes; ++i) {
            ExecutionVertex sourceVertex = source.getGroupMember(i);
            ExecutionGate outputGate = sourceVertex.getOutputGate(indexOfOutputGate);
            if (outputGate == null) {
                throw new IllegalStateException("wire: " + sourceVertex.getName() + " has no output gate with index " + indexOfOutputGate);
            }
            if (outputGate.getNumberOfEdges() > 0) {
                throw new IllegalStateException("wire: wire called on source " + sourceVertex.getName() + " (" + i + "), but number of output channels is " + outputGate.getNumberOfEdges() + "!");
            }
            int currentNumberOfTargetNodes = target.getCurrentNumberOfGroupMembers();
            ArrayList<ExecutionEdge> outputChannels = new ArrayList<ExecutionEdge>();
            for (int j = 0; j < currentNumberOfTargetNodes; ++j) {
                ExecutionVertex targetVertex = target.getGroupMember(j);
                ExecutionGate inputGate = targetVertex.getInputGate(indexOfInputGate);
                if (inputGate == null) {
                    throw new IllegalStateException("wire: " + targetVertex.getName() + " has no input gate with index " + indexOfInputGate);
                }
                if (inputGate.getNumberOfEdges() > 0 && i == 0) {
                    throw new IllegalStateException("wire: wire called on target " + targetVertex.getName() + " (" + j + "), but number of input channels is " + inputGate.getNumberOfEdges() + "!");
                }
                if (!DistributionPatternProvider.createWire(groupEdge.getDistributionPattern(), i, j, currentNumberOfSourceNodes, currentNumberOfTargetNodes)) continue;
                ChannelID outputChannelID = new ChannelID();
                ChannelID inputChannelID = new ChannelID();
                ExecutionEdge edge = new ExecutionEdge(outputGate, inputGate, groupEdge, outputChannelID, inputChannelID, outputGate.getNumberOfEdges(), inputGate.getNumberOfEdges());
                this.edgeMap.put(outputChannelID, edge);
                this.edgeMap.put(inputChannelID, edge);
                outputChannels.add(edge);
                ArrayList<ExecutionEdge> inputChannels = (ArrayList<ExecutionEdge>)inputChannelMap.get(inputGate.getGateID());
                if (inputChannels == null) {
                    inputChannels = new ArrayList<ExecutionEdge>();
                    inputChannelMap.put(inputGate.getGateID(), inputChannels);
                }
                inputChannels.add(edge);
            }
            outputGate.replaceAllEdges(outputChannels);
        }
        int currentNumberOfTargetNodes = target.getCurrentNumberOfGroupMembers();
        for (int i = 0; i < currentNumberOfTargetNodes; ++i) {
            ExecutionVertex targetVertex = target.getGroupMember(i);
            ExecutionGate inputGate = targetVertex.getInputGate(indexOfInputGate);
            List inputChannels = (List)inputChannelMap.get(inputGate.getGateID());
            if (inputChannels == null) {
                LOG.error((Object)("Cannot find input channels for gate ID " + inputGate.getGateID()));
                continue;
            }
            inputGate.replaceAllEdges(inputChannels);
        }
    }

    private void createInitialGroupEdges(HashMap<AbstractJobVertex, ExecutionVertex> vertexMap) throws GraphConversionException {
        for (Map.Entry<AbstractJobVertex, ExecutionVertex> entry : vertexMap.entrySet()) {
            AbstractJobVertex sjv = entry.getKey();
            ExecutionVertex sev = entry.getValue();
            ExecutionGroupVertex sgv = sev.getGroupVertex();
            for (int i = 0; i < sjv.getNumberOfForwardConnections(); ++i) {
                JobEdge edge = sjv.getForwardConnection(i);
                AbstractJobVertex tjv = edge.getConnectedVertex();
                ExecutionVertex tev = vertexMap.get(tjv);
                ExecutionGroupVertex tgv = tev.getGroupVertex();
                ChannelType channelType = edge.getChannelType();
                boolean userDefinedChannelType = true;
                if (channelType == null) {
                    userDefinedChannelType = false;
                    channelType = ChannelType.NETWORK;
                }
                DistributionPattern distributionPattern = edge.getDistributionPattern();
                ExecutionGroupEdge groupEdge = sgv.wireTo(tgv, edge.getIndexOfInputGate(), i, channelType, userDefinedChannelType, distributionPattern);
                ExecutionGate outputGate = new ExecutionGate(new GateID(), sev, groupEdge, false);
                sev.insertOutputGate(i, outputGate);
                ExecutionGate inputGate = new ExecutionGate(new GateID(), tev, groupEdge, true);
                tev.insertInputGate(edge.getIndexOfInputGate(), inputGate);
            }
        }
    }

    private ExecutionVertex createVertex(AbstractJobVertex jobVertex, ExecutionStage initialExecutionStage) throws GraphConversionException {
        ClassLoader cl;
        Class<? extends AbstractInvokable> invokableClass = jobVertex.getInvokableClass();
        if (invokableClass == null) {
            throw new GraphConversionException("JobVertex " + jobVertex.getID() + " (" + jobVertex.getName() + ") does not specify a task");
        }
        ExecutionSignature signature = ExecutionSignature.createSignature(jobVertex.getInvokableClass(), jobVertex.getJobGraph().getJobID());
        ExecutionGroupVertex groupVertex = null;
        try {
            groupVertex = new ExecutionGroupVertex(jobVertex.getName(), jobVertex.getID(), this, jobVertex.getNumberOfSubtasks(), jobVertex.getVertexToShareInstancesWith() != null, jobVertex.getNumberOfExecutionRetries(), jobVertex.getConfiguration(), signature, invokableClass);
        }
        catch (Throwable t) {
            throw new GraphConversionException(t);
        }
        if (jobVertex instanceof AbstractJobInputVertex) {
            InputSplit[] inputSplits;
            AbstractJobInputVertex jobInputVertex = (AbstractJobInputVertex)jobVertex;
            if (jobVertex instanceof JobInputVertex) {
                try {
                    cl = LibraryCacheManager.getClassLoader(jobVertex.getJobGraph().getJobID());
                    ((JobInputVertex)jobVertex).initializeInputFormatFromTaskConfig(cl);
                }
                catch (Throwable t) {
                    throw new GraphConversionException("Could not deserialize input format.", t);
                }
            }
            Class<? extends InputSplit> inputSplitType = jobInputVertex.getInputSplitType();
            try {
                inputSplits = jobInputVertex.getInputSplits(jobVertex.getNumberOfSubtasks());
            }
            catch (Throwable t) {
                throw new GraphConversionException("Cannot compute input splits for " + groupVertex.getName(), t);
            }
            if (inputSplits == null) {
                inputSplits = new InputSplit[]{};
            }
            LOG.info((Object)("Job input vertex " + jobVertex.getName() + " generated " + inputSplits.length + " input splits"));
            groupVertex.setInputSplits(inputSplits);
            groupVertex.setInputSplitType(inputSplitType);
        }
        if (jobVertex instanceof JobOutputVertex) {
            JobOutputVertex jobOutputVertex = (JobOutputVertex)jobVertex;
            try {
                cl = LibraryCacheManager.getClassLoader(jobVertex.getJobGraph().getJobID());
                jobOutputVertex.initializeOutputFormatFromTaskConfig(cl);
            }
            catch (Throwable t) {
                throw new GraphConversionException("Could not deserialize output format.", t);
            }
            OutputFormat<?> outputFormat = jobOutputVertex.getOutputFormat();
            if (outputFormat != null && outputFormat instanceof InitializeOnMaster) {
                try {
                    ((InitializeOnMaster)outputFormat).initializeGlobal(jobVertex.getNumberOfSubtasks());
                }
                catch (Throwable t) {
                    throw new GraphConversionException(t);
                }
            }
        }
        initialExecutionStage.addStageMember(groupVertex);
        ExecutionVertex ev = new ExecutionVertex(this, groupVertex, jobVertex.getNumberOfForwardConnections(), jobVertex.getNumberOfBackwardConnections());
        ev.setAllocatedResource(new AllocatedResource(DummyInstance.createDummyInstance(), null));
        return ev;
    }

    public int getNumberOfInputVertices() {
        return this.stages.get(0).getNumberOfInputExecutionVertices();
    }

    public int getNumberOfInputVertices(int stage) {
        if (stage >= this.stages.size()) {
            return 0;
        }
        return this.stages.get(stage).getNumberOfInputExecutionVertices();
    }

    public int getNumberOfOutputVertices() {
        return this.stages.get(0).getNumberOfOutputExecutionVertices();
    }

    public int getNumberOfOutputVertices(int stage) {
        if (stage >= this.stages.size()) {
            return 0;
        }
        return this.stages.get(stage).getNumberOfOutputExecutionVertices();
    }

    public ExecutionVertex getInputVertex(int index) {
        return this.stages.get(0).getInputExecutionVertex(index);
    }

    public ExecutionVertex getOutputVertex(int index) {
        return this.stages.get(0).getOutputExecutionVertex(index);
    }

    public ExecutionVertex getInputVertex(int stage, int index) {
        try {
            ExecutionStage s = this.stages.get(stage);
            if (s == null) {
                return null;
            }
            return s.getInputExecutionVertex(index);
        }
        catch (ArrayIndexOutOfBoundsException e) {
            return null;
        }
    }

    public ExecutionVertex getOutputVertex(int stage, int index) {
        try {
            ExecutionStage s = this.stages.get(stage);
            if (s == null) {
                return null;
            }
            return s.getOutputExecutionVertex(index);
        }
        catch (ArrayIndexOutOfBoundsException e) {
            return null;
        }
    }

    public ExecutionStage getStage(int num) {
        try {
            return this.stages.get(num);
        }
        catch (ArrayIndexOutOfBoundsException e) {
            return null;
        }
    }

    public int getNumberOfStages() {
        return this.stages.size();
    }

    public ExecutionVertex getVertexByChannelID(ChannelID id) {
        ExecutionEdge edge = (ExecutionEdge)this.edgeMap.get(id);
        if (edge == null) {
            return null;
        }
        if (id.equals(edge.getOutputChannelID())) {
            return edge.getOutputGate().getVertex();
        }
        return edge.getInputGate().getVertex();
    }

    public ExecutionEdge getEdgeByID(ChannelID id) {
        return (ExecutionEdge)this.edgeMap.get(id);
    }

    void registerExecutionVertex(ExecutionVertex vertex) {
        if (this.vertexMap.put(vertex.getID(), vertex) != null) {
            throw new IllegalStateException("There is already an execution vertex with ID " + vertex.getID() + " registered");
        }
    }

    public ExecutionVertex getVertexByID(ExecutionVertexID id) {
        return (ExecutionVertex)this.vertexMap.get(id);
    }

    private boolean isCurrentStageCompleted() {
        if (this.indexToCurrentExecutionStage >= this.stages.size()) {
            return true;
        }
        ExecutionGraphIterator it = new ExecutionGraphIterator(this, this.indexToCurrentExecutionStage, true, true);
        while (it.hasNext()) {
            ExecutionVertex vertex = it.next();
            if (vertex.getExecutionState() == ExecutionState.FINISHED) continue;
            return false;
        }
        return true;
    }

    public boolean isExecutionFinished() {
        return this.getJobStatus() == InternalJobStatus.FINISHED;
    }

    public JobID getJobID() {
        return this.jobID;
    }

    public int getIndexOfCurrentExecutionStage() {
        return this.indexToCurrentExecutionStage;
    }

    public ExecutionStage getCurrentExecutionStage() {
        try {
            return this.stages.get(this.indexToCurrentExecutionStage);
        }
        catch (ArrayIndexOutOfBoundsException e) {
            return null;
        }
    }

    public void repairStages() {
        int stageNumber;
        ExecutionGroupEdge edge;
        int i;
        ExecutionGroupVertex groupVertex;
        HashMap<ExecutionGroupVertex, Integer> stageNumbers = new HashMap<ExecutionGroupVertex, Integer>();
        ExecutionGroupVertexIterator it = new ExecutionGroupVertexIterator(this, true, -1);
        while (it.hasNext()) {
            groupVertex = it.next();
            int precedingNumber = 0;
            if (stageNumbers.containsKey(groupVertex)) {
                precedingNumber = (Integer)stageNumbers.get(groupVertex);
            } else {
                stageNumbers.put(groupVertex, precedingNumber);
            }
            for (i = 0; i < groupVertex.getNumberOfForwardLinks(); ++i) {
                edge = groupVertex.getForwardEdge(i);
                if (!stageNumbers.containsKey(edge.getTargetVertex())) {
                    stageNumbers.put(edge.getTargetVertex(), precedingNumber);
                    continue;
                }
                stageNumber = (Integer)stageNumbers.get(edge.getTargetVertex());
                if (stageNumber == precedingNumber) continue;
                stageNumbers.put(edge.getTargetVertex(), Math.max(precedingNumber, stageNumber));
            }
        }
        it = new ExecutionGroupVertexIterator(this, false, -1);
        while (it.hasNext()) {
            groupVertex = it.next();
            int succeedingNumber = (Integer)stageNumbers.get(groupVertex);
            for (i = 0; i < groupVertex.getNumberOfBackwardLinks(); ++i) {
                edge = groupVertex.getBackwardEdge(i);
                stageNumber = (Integer)stageNumbers.get(edge.getSourceVertex());
                if (stageNumber == succeedingNumber) continue;
                throw new IllegalStateException(edge.getSourceVertex() + " and " + edge.getTargetVertex() + " are assigned to different stages");
            }
        }
        this.stages.clear();
        for (Map.Entry entry : stageNumbers.entrySet()) {
            ExecutionGroupVertex groupVertex2 = (ExecutionGroupVertex)entry.getKey();
            int stageNumber2 = (Integer)entry.getValue();
            while (this.stages.size() <= stageNumber2) {
                this.stages.add(null);
            }
            ExecutionStage executionStage = this.stages.get(stageNumber2);
            if (executionStage == null) {
                executionStage = new ExecutionStage(this, stageNumber2);
                this.stages.set(stageNumber2, executionStage);
            }
            executionStage.addStageMember(groupVertex2);
            groupVertex2.setExecutionStage(executionStage);
        }
    }

    public void repairInstanceSharing() {
        LinkedHashSet<AllocatedResource> availableResources = new LinkedHashSet<AllocatedResource>();
        ExecutionGroupVertexIterator it = new ExecutionGroupVertexIterator(this, true, -1);
        while (it.hasNext()) {
            ExecutionGroupVertex groupVertex = (ExecutionGroupVertex)it.next();
            if (groupVertex.getVertexToShareInstancesWith() != null) continue;
            availableResources.clear();
            groupVertex.repairInstanceSharing(availableResources);
        }
    }

    public void repairInstanceAssignment() {
        int j;
        ChannelType channelType;
        int i;
        ExecutionGraphIterator it = new ExecutionGraphIterator(this, true);
        while (it.hasNext()) {
            ExecutionVertex sourceVertex = (ExecutionVertex)it.next();
            for (i = 0; i < sourceVertex.getNumberOfOutputGates(); ++i) {
                ExecutionGate outputGate = sourceVertex.getOutputGate(i);
                channelType = outputGate.getChannelType();
                if (channelType != ChannelType.IN_MEMORY) continue;
                int numberOfOutputChannels = outputGate.getNumberOfEdges();
                for (j = 0; j < numberOfOutputChannels; ++j) {
                    ExecutionEdge outputChannel = outputGate.getEdge(j);
                    outputChannel.getInputGate().getVertex().setAllocatedResource(sourceVertex.getAllocatedResource());
                }
            }
        }
        it = new ExecutionGraphIterator(this, false);
        while (it.hasNext()) {
            ExecutionVertex targetVertex = (ExecutionVertex)it.next();
            for (i = 0; i < targetVertex.getNumberOfInputGates(); ++i) {
                ExecutionGate inputGate = targetVertex.getInputGate(i);
                channelType = inputGate.getChannelType();
                if (channelType != ChannelType.IN_MEMORY) continue;
                int numberOfInputChannels = inputGate.getNumberOfEdges();
                for (j = 0; j < numberOfInputChannels; ++j) {
                    ExecutionEdge inputChannel = inputGate.getEdge(j);
                    inputChannel.getOutputGate().getVertex().setAllocatedResource(targetVertex.getAllocatedResource());
                }
            }
        }
    }

    public ChannelType getChannelType(ExecutionVertex sourceVertex, ExecutionVertex targetVertex) {
        ExecutionGroupVertex targetGroupVertex;
        ExecutionGroupVertex sourceGroupVertex = sourceVertex.getGroupVertex();
        List<ExecutionGroupEdge> edges = sourceGroupVertex.getForwardEdges(targetGroupVertex = targetVertex.getGroupVertex());
        if (edges.size() == 0) {
            return null;
        }
        ExecutionGroupEdge edge = edges.get(0);
        ExecutionGate outputGate = sourceVertex.getOutputGate(edge.getIndexOfOutputGate());
        for (int i = 0; i < outputGate.getNumberOfEdges(); ++i) {
            ExecutionEdge outputChannel = outputGate.getEdge(i);
            if (targetVertex != outputChannel.getInputGate().getVertex()) continue;
            return edge.getChannelType();
        }
        return null;
    }

    public Configuration getJobConfiguration() {
        return this.jobConfiguration;
    }

    private boolean jobHasFinishedStatus() {
        ExecutionGraphIterator it = new ExecutionGraphIterator(this, true);
        while (it.hasNext()) {
            if (((ExecutionVertex)it.next()).getExecutionState() == ExecutionState.FINISHED) continue;
            return false;
        }
        return true;
    }

    private boolean jobHasScheduledStatus() {
        ExecutionGraphIterator it = new ExecutionGraphIterator(this, true);
        while (it.hasNext()) {
            ExecutionState s = ((ExecutionVertex)it.next()).getExecutionState();
            if (s == ExecutionState.CREATED || s == ExecutionState.SCHEDULED || s == ExecutionState.READY) continue;
            return false;
        }
        return true;
    }

    private boolean jobHasFailedOrCanceledStatus() {
        ExecutionGraphIterator it = new ExecutionGraphIterator(this, true);
        while (it.hasNext()) {
            ExecutionState state = ((ExecutionVertex)it.next()).getExecutionState();
            if (state == ExecutionState.CANCELED || state == ExecutionState.FAILED || state == ExecutionState.FINISHED) continue;
            return false;
        }
        return true;
    }

    private static InternalJobStatus determineNewJobStatus(ExecutionGraph eg, ExecutionState latestStateChange) {
        InternalJobStatus currentJobStatus = eg.getJobStatus();
        switch (currentJobStatus) {
            case CREATED: {
                if (eg.jobHasScheduledStatus()) {
                    return InternalJobStatus.SCHEDULED;
                }
                if (latestStateChange == ExecutionState.CANCELED) {
                    if (!eg.jobHasFailedOrCanceledStatus()) break;
                    return InternalJobStatus.CANCELED;
                }
                if (latestStateChange != ExecutionState.FAILED) break;
                return InternalJobStatus.FAILING;
            }
            case SCHEDULED: {
                if (latestStateChange == ExecutionState.RUNNING) {
                    return InternalJobStatus.RUNNING;
                }
                if (latestStateChange == ExecutionState.CANCELED) {
                    if (!eg.jobHasFailedOrCanceledStatus()) break;
                    return InternalJobStatus.CANCELED;
                }
                if (latestStateChange != ExecutionState.FAILED) break;
                return InternalJobStatus.FAILING;
            }
            case RUNNING: {
                if (latestStateChange == ExecutionState.CANCELED) {
                    return InternalJobStatus.CANCELING;
                }
                if (latestStateChange == ExecutionState.FAILED) {
                    ExecutionGraphIterator it = new ExecutionGraphIterator(eg, true);
                    while (it.hasNext()) {
                        ExecutionVertex vertex = (ExecutionVertex)it.next();
                        if (vertex.getExecutionState() != ExecutionState.FAILED) continue;
                        return InternalJobStatus.FAILING;
                    }
                }
                if (!eg.jobHasFinishedStatus()) break;
                return InternalJobStatus.FINISHED;
            }
            case FAILING: {
                if (!eg.jobHasFailedOrCanceledStatus()) break;
                return InternalJobStatus.FAILED;
            }
            case FAILED: {
                LOG.error((Object)"Received update of execute state in job status FAILED");
                break;
            }
            case CANCELING: {
                if (!eg.jobHasFailedOrCanceledStatus()) break;
                return InternalJobStatus.CANCELED;
            }
            case CANCELED: {
                LOG.error((Object)("Received update of execute state in job status CANCELED: " + eg.getJobID()));
                break;
            }
            case FINISHED: {
                LOG.error((Object)("Received update of execute state in job status FINISHED: " + eg.getJobID() + " " + StringUtils.stringifyException((Throwable)new Throwable())));
            }
        }
        return currentJobStatus;
    }

    public InternalJobStatus getJobStatus() {
        return this.jobStatus.get();
    }

    @Override
    public void executionStateChanged(JobID jobID, ExecutionVertexID vertexID, ExecutionState newExecutionState, String optionalMessage) {
        ExecutionVertex vertex = this.getVertexByID(vertexID);
        if (vertex == null) {
            LOG.error((Object)("Cannot find execution vertex with the ID " + vertexID));
            return;
        }
        ExecutionState actualExecutionState = vertex.getExecutionState();
        InternalJobStatus newJobStatus = ExecutionGraph.determineNewJobStatus(this, actualExecutionState);
        if (actualExecutionState == ExecutionState.FINISHED && this.isCurrentStageCompleted()) {
            ++this.indexToCurrentExecutionStage;
            if (this.indexToCurrentExecutionStage < this.stages.size()) {
                Iterator<ExecutionStageListener> it = this.executionStageListeners.iterator();
                ExecutionStage nextExecutionStage = this.getCurrentExecutionStage();
                while (it.hasNext()) {
                    it.next().nextExecutionStageEntered(jobID, nextExecutionStage);
                }
            }
        }
        this.updateJobStatus(newJobStatus, optionalMessage);
    }

    public void updateJobStatus(InternalJobStatus newJobStatus, String optionalMessage) {
        if (this.jobStatus.getAndSet(newJobStatus) == newJobStatus) {
            return;
        }
        if (newJobStatus == InternalJobStatus.FAILING) {
            this.errorDescription = optionalMessage;
        }
        if (newJobStatus == InternalJobStatus.FAILED) {
            optionalMessage = this.errorDescription;
        }
        Iterator<JobStatusListener> it = this.jobStatusListeners.iterator();
        while (it.hasNext()) {
            it.next().jobStatusHasChanged(this, newJobStatus, optionalMessage);
        }
    }

    public void registerJobStatusListener(JobStatusListener jobStatusListener) {
        if (jobStatusListener == null) {
            throw new IllegalArgumentException("Argument jobStatusListener must not be null");
        }
        this.jobStatusListeners.addIfAbsent(jobStatusListener);
    }

    public void unregisterJobStatusListener(JobStatusListener jobStatusListener) {
        if (jobStatusListener == null) {
            throw new IllegalArgumentException("Argument jobStatusListener must not be null");
        }
        this.jobStatusListeners.remove(jobStatusListener);
    }

    public void registerExecutionStageListener(ExecutionStageListener executionStageListener) {
        if (executionStageListener == null) {
            throw new IllegalArgumentException("Argument executionStageListener must not be null");
        }
        this.executionStageListeners.addIfAbsent(executionStageListener);
    }

    public void unregisterExecutionStageListener(ExecutionStageListener executionStageListener) {
        if (executionStageListener == null) {
            throw new IllegalArgumentException("Argument executionStageListener must not be null");
        }
        this.executionStageListeners.remove(executionStageListener);
    }

    public String getJobName() {
        return this.jobName;
    }

    @Override
    public void userThreadStarted(JobID jobID, ExecutionVertexID vertexID, Thread userThread) {
    }

    @Override
    public void userThreadFinished(JobID jobID, ExecutionVertexID vertexID, Thread userThread) {
    }

    private void reconstructExecutionPipelines() {
        Iterator<ExecutionStage> it = this.stages.iterator();
        while (it.hasNext()) {
            it.next().reconstructExecutionPipelines();
        }
    }

    public Iterator<ExecutionStage> iterator() {
        return this.stages.iterator();
    }

    @Override
    public int getPriority() {
        return 1;
    }

    public void executeCommand(Runnable command) {
        this.executorService.execute(command);
    }

    private void calculateConnectionIDs() {
        HashSet<ExecutionGroupVertex> alreadyVisited = new HashSet<ExecutionGroupVertex>();
        ExecutionStage lastStage = this.getStage(this.getNumberOfStages() - 1);
        for (int i = 0; i < lastStage.getNumberOfStageMembers(); ++i) {
            ExecutionGroupVertex groupVertex = lastStage.getStageMember(i);
            int currentConnectionID = 0;
            if (!groupVertex.isOutputVertex()) continue;
            currentConnectionID = groupVertex.calculateConnectionID(currentConnectionID, alreadyVisited);
        }
    }

    public int getRequiredSlots() {
        int maxRequiredSlots = 0;
        for (ExecutionStage stage : this.stages) {
            int requiredSlots = stage.getRequiredSlots();
            if (requiredSlots <= maxRequiredSlots) continue;
            maxRequiredSlots = requiredSlots;
        }
        return maxRequiredSlots;
    }
}

