package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionJobVertex.class */
public class ExecutionJobVertex {
    private static final Logger LOG = ExecutionGraph.LOG;
    private final Object stateMonitor;
    private final ExecutionGraph graph;
    private final AbstractJobVertex jobVertex;
    private final ExecutionVertex[] taskVertices;
    private final IntermediateResult[] producedDataSets;
    private final List<IntermediateResult> inputs;
    private final InputSplitAssigner splitAssigner;
    private final int parallelism;
    private final boolean[] finishedSubtasks;
    private volatile int numSubtasksInFinalState;
    private final SlotSharingGroup slotSharingGroup;
    private final CoLocationGroup coLocationGroup;

    public ExecutionJobVertex(ExecutionGraph executionGraph, AbstractJobVertex abstractJobVertex, int i) throws JobException {
        this(executionGraph, abstractJobVertex, i, System.currentTimeMillis());
    }

    public ExecutionJobVertex(ExecutionGraph executionGraph, AbstractJobVertex abstractJobVertex, int i, long j) throws JobException {
        this.stateMonitor = new Object();
        if (executionGraph == null || abstractJobVertex == null) {
            throw new NullPointerException();
        }
        this.graph = executionGraph;
        this.jobVertex = abstractJobVertex;
        int parallelism = abstractJobVertex.getParallelism();
        int i2 = parallelism > 0 ? parallelism : i;
        this.parallelism = i2;
        this.taskVertices = new ExecutionVertex[i2];
        this.inputs = new ArrayList(abstractJobVertex.getInputs().size());
        this.slotSharingGroup = abstractJobVertex.getSlotSharingGroup();
        this.coLocationGroup = abstractJobVertex.getCoLocationGroup();
        if (this.coLocationGroup != null && this.slotSharingGroup == null) {
            throw new JobException("Vertex uses a co-location constraint without using slot sharing");
        }
        this.producedDataSets = new IntermediateResult[abstractJobVertex.getNumberOfProducedIntermediateDataSets()];
        for (int i3 = 0; i3 < abstractJobVertex.getProducedDataSets().size(); i3++) {
            this.producedDataSets[i3] = new IntermediateResult(abstractJobVertex.getProducedDataSets().get(i3).getId(), this, i2);
        }
        for (int i4 = 0; i4 < i2; i4++) {
            this.taskVertices[i4] = new ExecutionVertex(this, i4, this.producedDataSets, j);
        }
        for (IntermediateResult intermediateResult : this.producedDataSets) {
            if (intermediateResult.getNumberOfAssignedPartitions() != this.parallelism) {
                throw new RuntimeException("The intermediate result's partitions were not correctly assiged.");
            }
        }
        try {
            InputSplitSource<?> inputSplitSource = abstractJobVertex.getInputSplitSource();
            if (inputSplitSource != null) {
                this.splitAssigner = inputSplitSource.getInputSplitAssigner(inputSplitSource.createInputSplits(i2));
            } else {
                this.splitAssigner = null;
            }
            this.finishedSubtasks = new boolean[this.parallelism];
        } catch (Throwable th) {
            throw new JobException("Creating the input splits caused an error: " + th.getMessage(), th);
        }
    }

    public ExecutionGraph getGraph() {
        return this.graph;
    }

    public AbstractJobVertex getJobVertex() {
        return this.jobVertex;
    }

    public int getParallelism() {
        return this.parallelism;
    }

    public JobID getJobId() {
        return this.graph.getJobID();
    }

    public JobVertexID getJobVertexId() {
        return this.jobVertex.getID();
    }

    public ExecutionVertex[] getTaskVertices() {
        return this.taskVertices;
    }

    public IntermediateResult[] getProducedDataSets() {
        return this.producedDataSets;
    }

    public InputSplitAssigner getSplitAssigner() {
        return this.splitAssigner;
    }

    public SlotSharingGroup getSlotSharingGroup() {
        return this.slotSharingGroup;
    }

    public CoLocationGroup getCoLocationGroup() {
        return this.coLocationGroup;
    }

    public List<IntermediateResult> getInputs() {
        return this.inputs;
    }

    public boolean isInFinalState() {
        return this.numSubtasksInFinalState == this.parallelism;
    }

    public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> map) throws JobException {
        List<JobEdge> inputs = this.jobVertex.getInputs();
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.", this.jobVertex.getID(), this.jobVertex.getName(), Integer.valueOf(inputs.size())));
        }
        for (int i = 0; i < inputs.size(); i++) {
            JobEdge jobEdge = inputs.get(i);
            if (LOG.isDebugEnabled()) {
                if (jobEdge.getSource() == null) {
                    LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.", Integer.valueOf(i), this.jobVertex.getID(), this.jobVertex.getName(), jobEdge.getSourceId()));
                } else {
                    LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).", Integer.valueOf(i), this.jobVertex.getID(), this.jobVertex.getName(), jobEdge.getSource().getProducer().getID(), jobEdge.getSource().getProducer().getName()));
                }
            }
            IntermediateResult intermediateResult = map.get(jobEdge.getSourceId());
            if (intermediateResult == null) {
                throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID " + jobEdge.getSourceId());
            }
            this.inputs.add(intermediateResult);
            int registerConsumer = intermediateResult.registerConsumer();
            for (int i2 = 0; i2 < this.parallelism; i2++) {
                this.taskVertices[i2].connectSource(i, intermediateResult, jobEdge, registerConsumer);
            }
        }
    }

    public void scheduleAll(Scheduler scheduler, boolean z) throws NoResourceAvailableException {
        for (ExecutionVertex executionVertex : getTaskVertices()) {
            executionVertex.scheduleForExecution(scheduler, z);
        }
    }

    public void cancel() {
        for (ExecutionVertex executionVertex : getTaskVertices()) {
            executionVertex.cancel();
        }
    }

    public void fail(Throwable th) {
        for (ExecutionVertex executionVertex : getTaskVertices()) {
            executionVertex.fail(th);
        }
    }

    public void waitForAllVerticesToReachFinishingState() throws InterruptedException {
        synchronized (this.stateMonitor) {
            while (this.numSubtasksInFinalState < this.parallelism) {
                this.stateMonitor.wait();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void vertexFinished(int i) {
        subtaskInFinalState(i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void vertexCancelled(int i) {
        subtaskInFinalState(i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void vertexFailed(int i, Throwable th) {
        subtaskInFinalState(i);
    }

    private void subtaskInFinalState(int i) {
        synchronized (this.stateMonitor) {
            if (!this.finishedSubtasks[i]) {
                this.finishedSubtasks[i] = true;
                this.numSubtasksInFinalState++;
                if (this.numSubtasksInFinalState == this.parallelism) {
                    this.stateMonitor.notifyAll();
                    this.graph.jobVertexInFinalState(this);
                }
            }
        }
    }

    public void execute(Runnable runnable) {
        this.graph.execute(runnable);
    }
}
