package org.apache.flink.runtime.executiongraph;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraph.class */
public class ExecutionGraph implements Serializable {
    private static final long serialVersionUID = 42;
    private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");
    static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
    private static final int NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN = 1;
    private final SerializableObject progressLock;
    private final JobID jobID;
    private final String jobName;
    private final Configuration jobConfiguration;
    private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks;
    private final List<ExecutionJobVertex> verticesInCreationOrder;
    private final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults;
    private final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions;
    private final List<BlobKey> requiredJarFiles;
    private final List<ActorRef> jobStatusListenerActors;
    private final List<ActorRef> executionListenerActors;
    private final long[] stateTimestamps;
    private final FiniteDuration timeout;
    private int numberOfRetriesLeft;
    private long delayBeforeRetrying;
    private boolean allowQueuedScheduling;
    private ScheduleMode scheduleMode;
    private boolean snapshotCheckpointsEnabled;
    private volatile JobStatus state;
    private volatile Throwable failureCause;
    private volatile int numFinishedJobVertices;
    private Scheduler scheduler;
    private ClassLoader userClassLoader;
    private CheckpointCoordinator checkpointCoordinator;
    private ExecutionConfig executionConfig;

    ExecutionGraph(JobID jobID, String str, Configuration configuration, FiniteDuration finiteDuration) {
        this(jobID, str, configuration, finiteDuration, new ArrayList(), ExecutionGraph.class.getClassLoader());
    }

    public ExecutionGraph(JobID jobID, String str, Configuration configuration, FiniteDuration finiteDuration, List<BlobKey> list, ClassLoader classLoader) {
        this.progressLock = new SerializableObject();
        this.allowQueuedScheduling = false;
        this.scheduleMode = ScheduleMode.FROM_SOURCES;
        this.state = JobStatus.CREATED;
        if (jobID == null || str == null || configuration == null || classLoader == null) {
            throw new NullPointerException();
        }
        this.jobID = jobID;
        this.jobName = str;
        this.jobConfiguration = configuration;
        this.userClassLoader = classLoader;
        this.tasks = new ConcurrentHashMap<>();
        this.intermediateResults = new ConcurrentHashMap<>();
        this.verticesInCreationOrder = new ArrayList();
        this.currentExecutions = new ConcurrentHashMap<>();
        this.jobStatusListenerActors = new CopyOnWriteArrayList();
        this.executionListenerActors = new CopyOnWriteArrayList();
        this.stateTimestamps = new long[JobStatus.values().length];
        this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
        this.requiredJarFiles = list;
        this.timeout = finiteDuration;
    }

    public void setNumberOfRetriesLeft(int i) {
        if (i < -1) {
            throw new IllegalArgumentException();
        }
        this.numberOfRetriesLeft = i;
    }

    public int getNumberOfRetriesLeft() {
        return this.numberOfRetriesLeft;
    }

    public void setDelayBeforeRetrying(long j) {
        if (j < 0) {
            throw new IllegalArgumentException("Delay before retry must be non-negative.");
        }
        this.delayBeforeRetrying = j;
    }

    public long getDelayBeforeRetrying() {
        return this.delayBeforeRetrying;
    }

    public boolean isQueuedSchedulingAllowed() {
        return this.allowQueuedScheduling;
    }

    public void setQueuedSchedulingAllowed(boolean z) {
        this.allowQueuedScheduling = z;
    }

    public void setScheduleMode(ScheduleMode scheduleMode) {
        this.scheduleMode = scheduleMode;
    }

    public ScheduleMode getScheduleMode() {
        return this.scheduleMode;
    }

    public void enableSnaphotCheckpointing(long j, long j2, List<ExecutionJobVertex> list, List<ExecutionJobVertex> list2, List<ExecutionJobVertex> list3, ActorSystem actorSystem) {
        if (j < 10 || j2 < 10) {
            throw new IllegalArgumentException();
        }
        if (this.state != JobStatus.CREATED) {
            throw new IllegalStateException("Job must be in CREATED state");
        }
        ExecutionVertex[] collectExecutionVertices = collectExecutionVertices(list);
        ExecutionVertex[] collectExecutionVertices2 = collectExecutionVertices(list2);
        ExecutionVertex[] collectExecutionVertices3 = collectExecutionVertices(list3);
        disableSnaphotCheckpointing();
        this.snapshotCheckpointsEnabled = true;
        this.checkpointCoordinator = new CheckpointCoordinator(this.jobID, 1, j2, collectExecutionVertices, collectExecutionVertices2, collectExecutionVertices3, this.userClassLoader);
        registerJobStatusListener(this.checkpointCoordinator.createJobStatusListener(actorSystem, j));
    }

    public void disableSnaphotCheckpointing() {
        if (this.state != JobStatus.CREATED) {
            throw new IllegalStateException("Job must be in CREATED state");
        }
        this.snapshotCheckpointsEnabled = false;
        if (this.checkpointCoordinator != null) {
            this.checkpointCoordinator.shutdown();
            this.checkpointCoordinator = null;
        }
    }

    public boolean isSnapshotCheckpointsEnabled() {
        return this.snapshotCheckpointsEnabled;
    }

    public CheckpointCoordinator getCheckpointCoordinator() {
        return this.checkpointCoordinator;
    }

    private ExecutionVertex[] collectExecutionVertices(List<ExecutionJobVertex> list) {
        if (list.size() == 1) {
            ExecutionJobVertex executionJobVertex = list.get(0);
            if (executionJobVertex.getGraph() != this) {
                throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph");
            }
            return executionJobVertex.getTaskVertices();
        }
        ArrayList arrayList = new ArrayList();
        for (ExecutionJobVertex executionJobVertex2 : list) {
            if (executionJobVertex2.getGraph() != this) {
                throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph");
            }
            arrayList.addAll(Arrays.asList(executionJobVertex2.getTaskVertices()));
        }
        return (ExecutionVertex[]) arrayList.toArray(new ExecutionVertex[arrayList.size()]);
    }

    public List<BlobKey> getRequiredJarFiles() {
        return this.requiredJarFiles;
    }

    public Scheduler getScheduler() {
        return this.scheduler;
    }

    public JobID getJobID() {
        return this.jobID;
    }

    public String getJobName() {
        return this.jobName;
    }

    public Configuration getJobConfiguration() {
        return this.jobConfiguration;
    }

    public ClassLoader getUserClassLoader() {
        return this.userClassLoader;
    }

    public JobStatus getState() {
        return this.state;
    }

    public Throwable getFailureCause() {
        return this.failureCause;
    }

    public ExecutionJobVertex getJobVertex(JobVertexID jobVertexID) {
        return this.tasks.get(jobVertexID);
    }

    public Map<JobVertexID, ExecutionJobVertex> getAllVertices() {
        return Collections.unmodifiableMap(this.tasks);
    }

    public Iterable<ExecutionJobVertex> getVerticesTopologically() {
        final int size = this.verticesInCreationOrder.size();
        return new Iterable<ExecutionJobVertex>() { // from class: org.apache.flink.runtime.executiongraph.ExecutionGraph.1
            @Override // java.lang.Iterable
            public Iterator<ExecutionJobVertex> iterator() {
                return new Iterator<ExecutionJobVertex>() { // from class: org.apache.flink.runtime.executiongraph.ExecutionGraph.1.1
                    private int pos = 0;

                    @Override // java.util.Iterator
                    public boolean hasNext() {
                        return this.pos < size;
                    }

                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.Iterator
                    public ExecutionJobVertex next() {
                        if (!hasNext()) {
                            throw new NoSuchElementException();
                        }
                        List list = ExecutionGraph.this.verticesInCreationOrder;
                        int i = this.pos;
                        this.pos = i + 1;
                        return (ExecutionJobVertex) list.get(i);
                    }

                    @Override // java.util.Iterator
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
            }
        };
    }

    public Map<IntermediateDataSetID, IntermediateResult> getAllIntermediateResults() {
        return Collections.unmodifiableMap(this.intermediateResults);
    }

    public Iterable<ExecutionVertex> getAllExecutionVertices() {
        return new Iterable<ExecutionVertex>() { // from class: org.apache.flink.runtime.executiongraph.ExecutionGraph.2
            @Override // java.lang.Iterable
            public Iterator<ExecutionVertex> iterator() {
                return new AllVerticesIterator(ExecutionGraph.this.getVerticesTopologically().iterator());
            }
        };
    }

    public long getStatusTimestamp(JobStatus jobStatus) {
        return this.stateTimestamps[jobStatus.ordinal()];
    }

    public void attachJobGraph(List<JobVertex> list) throws JobException {
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d vertices and %d intermediate results.", Integer.valueOf(list.size()), Integer.valueOf(this.tasks.size()), Integer.valueOf(this.intermediateResults.size())));
        }
        long currentTimeMillis = System.currentTimeMillis();
        for (JobVertex jobVertex : list) {
            ExecutionJobVertex executionJobVertex = new ExecutionJobVertex(this, jobVertex, 1, this.timeout, currentTimeMillis);
            executionJobVertex.connectToPredecessors(this.intermediateResults);
            ExecutionJobVertex putIfAbsent = this.tasks.putIfAbsent(jobVertex.getID(), executionJobVertex);
            if (putIfAbsent != null) {
                throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]", jobVertex.getID(), executionJobVertex, putIfAbsent));
            }
            for (IntermediateResult intermediateResult : executionJobVertex.getProducedDataSets()) {
                IntermediateResult putIfAbsent2 = this.intermediateResults.putIfAbsent(intermediateResult.getId(), intermediateResult);
                if (putIfAbsent2 != null) {
                    throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]", intermediateResult.getId(), intermediateResult, putIfAbsent2));
                }
            }
            this.verticesInCreationOrder.add(executionJobVertex);
        }
    }

    public void scheduleForExecution(Scheduler scheduler) throws JobException {
        if (scheduler == null) {
            throw new IllegalArgumentException("Scheduler must not be null.");
        }
        if (this.scheduler != null && this.scheduler != scheduler) {
            throw new IllegalArgumentException("Cannot use different schedulers for the same job");
        }
        if (!transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
            throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
        }
        this.scheduler = scheduler;
        switch (this.scheduleMode) {
            case FROM_SOURCES:
                for (ExecutionJobVertex executionJobVertex : this.tasks.values()) {
                    if (executionJobVertex.getJobVertex().isInputVertex()) {
                        executionJobVertex.scheduleAll(scheduler, this.allowQueuedScheduling);
                    }
                }
                return;
            case ALL:
                Iterator<ExecutionJobVertex> it = getVerticesTopologically().iterator();
                while (it.hasNext()) {
                    it.next().scheduleAll(scheduler, this.allowQueuedScheduling);
                }
                return;
            case BACKTRACKING:
                throw new JobException("BACKTRACKING is currently not supported as schedule mode.");
            default:
                return;
        }
    }

    public void cancel() {
        JobStatus jobStatus;
        do {
            jobStatus = this.state;
            if (jobStatus != JobStatus.RUNNING && jobStatus != JobStatus.CREATED) {
                return;
            }
        } while (!transitionState(jobStatus, JobStatus.CANCELLING));
        Iterator<ExecutionJobVertex> it = this.verticesInCreationOrder.iterator();
        while (it.hasNext()) {
            it.next().cancel();
        }
    }

    public void fail(Throwable th) {
        JobStatus jobStatus;
        do {
            jobStatus = this.state;
            if (jobStatus == JobStatus.FAILED || jobStatus == JobStatus.FAILING) {
                return;
            }
        } while (!transitionState(jobStatus, JobStatus.FAILING, th));
        this.failureCause = th;
        if (this.verticesInCreationOrder.isEmpty()) {
            transitionState(JobStatus.FAILING, JobStatus.FAILED, th);
            return;
        }
        Iterator<ExecutionJobVertex> it = this.verticesInCreationOrder.iterator();
        while (it.hasNext()) {
            it.next().cancel();
        }
    }

    public void restart() {
        try {
            if (this.state == JobStatus.FAILED && !transitionState(JobStatus.FAILED, JobStatus.RESTARTING)) {
                throw new IllegalStateException("Execution Graph left the state FAILED while trying to restart.");
            }
            synchronized (this.progressLock) {
                if (this.state != JobStatus.RESTARTING) {
                    throw new IllegalStateException("Can only restart job from state restarting.");
                }
                if (this.scheduler == null) {
                    throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
                }
                this.currentExecutions.clear();
                Iterator<ExecutionJobVertex> it = this.verticesInCreationOrder.iterator();
                while (it.hasNext()) {
                    it.next().resetForNewExecution();
                }
                for (int i = 0; i < this.stateTimestamps.length; i++) {
                    this.stateTimestamps[i] = 0;
                }
                this.numFinishedJobVertices = 0;
                transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
                if (this.checkpointCoordinator != null) {
                    this.checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false);
                }
            }
            scheduleForExecution(this.scheduler);
        } catch (Throwable th) {
            fail(th);
        }
    }

    public void prepareForArchiving() {
        if (!this.state.isTerminalState()) {
            throw new IllegalStateException("Can only archive the job from a terminal state");
        }
        try {
            this.executionConfig = (ExecutionConfig) InstantiationUtil.readObjectFromConfig(this.jobConfiguration, "runtime.config", this.userClassLoader);
        } catch (Exception e) {
            LOG.warn("Error deserializing the execution config while archiving the execution graph", e);
        }
        this.userClassLoader = null;
        this.scheduler = null;
        this.checkpointCoordinator = null;
        Iterator<ExecutionJobVertex> it = this.verticesInCreationOrder.iterator();
        while (it.hasNext()) {
            it.next().prepareForArchiving();
        }
        this.intermediateResults.clear();
        this.currentExecutions.clear();
        this.requiredJarFiles.clear();
        this.jobStatusListenerActors.clear();
        this.executionListenerActors.clear();
    }

    public ExecutionConfig getExecutionConfig() {
        return this.executionConfig;
    }

    public void waitUntilFinished() throws InterruptedException {
        synchronized (this.progressLock) {
            while (!this.state.isTerminalState()) {
                this.progressLock.wait();
            }
        }
    }

    private boolean transitionState(JobStatus jobStatus, JobStatus jobStatus2) {
        return transitionState(jobStatus, jobStatus2, null);
    }

    private boolean transitionState(JobStatus jobStatus, JobStatus jobStatus2, Throwable th) {
        if (!STATE_UPDATER.compareAndSet(this, jobStatus, jobStatus2)) {
            return false;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} switched from {} to {}.", new Object[]{getJobName(), jobStatus, jobStatus2});
        }
        this.stateTimestamps[jobStatus2.ordinal()] = System.currentTimeMillis();
        notifyJobStatusChange(jobStatus2, th);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void jobVertexInFinalState(ExecutionJobVertex executionJobVertex) {
        synchronized (this.progressLock) {
            if (this.numFinishedJobVertices >= this.verticesInCreationOrder.size()) {
                throw new IllegalStateException("All vertices are already finished, cannot transition vertex to finished.");
            }
            this.numFinishedJobVertices++;
            if (this.numFinishedJobVertices == this.verticesInCreationOrder.size()) {
                while (true) {
                    JobStatus jobStatus = this.state;
                    if (jobStatus == JobStatus.RUNNING) {
                        if (transitionState(jobStatus, JobStatus.FINISHED)) {
                            postRunCleanup();
                            break;
                        }
                    } else if (jobStatus == JobStatus.CANCELLING) {
                        if (transitionState(jobStatus, JobStatus.CANCELED)) {
                            postRunCleanup();
                            break;
                        }
                    } else if (jobStatus != JobStatus.FAILING) {
                        fail(new Exception("ExecutionGraph went into final state from state " + jobStatus));
                    } else if (this.numberOfRetriesLeft > 0 && transitionState(jobStatus, JobStatus.RESTARTING)) {
                        this.numberOfRetriesLeft--;
                        Futures.future(new Callable<Object>() { // from class: org.apache.flink.runtime.executiongraph.ExecutionGraph.3
                            @Override // java.util.concurrent.Callable
                            public Object call() throws Exception {
                                try {
                                    ExecutionGraph.LOG.info("Delaying retry of job execution for {} ms ...", Long.valueOf(ExecutionGraph.this.delayBeforeRetrying));
                                    Thread.sleep(ExecutionGraph.this.delayBeforeRetrying);
                                } catch (InterruptedException e) {
                                }
                                ExecutionGraph.this.restart();
                                return null;
                            }
                        }, AkkaUtils.globalExecutionContext());
                        break;
                    } else if (this.numberOfRetriesLeft <= 0 && transitionState(jobStatus, JobStatus.FAILED, this.failureCause)) {
                        postRunCleanup();
                        break;
                    }
                }
                this.progressLock.notifyAll();
            }
        }
    }

    private void postRunCleanup() {
        try {
            CheckpointCoordinator checkpointCoordinator = this.checkpointCoordinator;
            this.checkpointCoordinator = null;
            if (checkpointCoordinator != null) {
                checkpointCoordinator.shutdown();
            }
        } catch (Exception e) {
            LOG.error("Error while cleaning up after execution", e);
        }
    }

    public boolean updateState(TaskExecutionState taskExecutionState) {
        Execution execution = this.currentExecutions.get(taskExecutionState.getID());
        if (execution == null) {
            return false;
        }
        switch (taskExecutionState.getExecutionState()) {
            case RUNNING:
                return execution.switchToRunning();
            case FINISHED:
                execution.markFinished();
                return true;
            case CANCELED:
                execution.cancelingComplete();
                return true;
            case FAILED:
                execution.markFailed(taskExecutionState.getError(this.userClassLoader));
                return true;
            default:
                execution.fail(new Exception("TaskManager sent illegal state update: " + taskExecutionState.getExecutionState()));
                return false;
        }
    }

    public void scheduleOrUpdateConsumers(ResultPartitionID resultPartitionID) {
        Execution execution = this.currentExecutions.get(resultPartitionID.getProducerId());
        if (execution == null) {
            fail(new IllegalStateException("Cannot find execution for execution ID " + resultPartitionID.getPartitionId()));
        } else if (execution.getVertex() == null) {
            fail(new IllegalStateException("Execution with execution ID " + resultPartitionID.getPartitionId() + " has no vertex assigned."));
        } else {
            execution.getVertex().scheduleOrUpdateConsumers(resultPartitionID);
        }
    }

    public Map<ExecutionAttemptID, Execution> getRegisteredExecutions() {
        return Collections.unmodifiableMap(this.currentExecutions);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void registerExecution(Execution execution) {
        if (this.currentExecutions.putIfAbsent(execution.getAttemptId(), execution) != null) {
            fail(new Exception("Trying to register execution " + execution + " for already used ID " + execution.getAttemptId()));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void deregisterExecution(Execution execution) {
        Execution remove = this.currentExecutions.remove(execution.getAttemptId());
        if (remove == null || remove == execution) {
            return;
        }
        fail(new Exception("De-registering execution " + execution + " failed. Found for same ID execution " + remove));
    }

    public void registerJobStatusListener(ActorRef actorRef) {
        if (actorRef != null) {
            this.jobStatusListenerActors.add(actorRef);
        }
    }

    public void registerExecutionListener(ActorRef actorRef) {
        if (actorRef != null) {
            this.executionListenerActors.add(actorRef);
        }
    }

    private void notifyJobStatusChange(JobStatus jobStatus, Throwable th) {
        if (this.jobStatusListenerActors.size() > 0) {
            ExecutionGraphMessages.JobStatusChanged jobStatusChanged = new ExecutionGraphMessages.JobStatusChanged(this.jobID, jobStatus, System.currentTimeMillis(), th);
            Iterator<ActorRef> it = this.jobStatusListenerActors.iterator();
            while (it.hasNext()) {
                it.next().tell(jobStatusChanged, ActorRef.noSender());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void notifyExecutionChange(JobVertexID jobVertexID, int i, ExecutionAttemptID executionAttemptID, ExecutionState executionState, Throwable th) {
        ExecutionJobVertex jobVertex = getJobVertex(jobVertexID);
        if (this.executionListenerActors.size() > 0) {
            ExecutionGraphMessages.ExecutionStateChanged executionStateChanged = new ExecutionGraphMessages.ExecutionStateChanged(this.jobID, jobVertexID, jobVertex.getJobVertex().getName(), jobVertex.getParallelism(), i, executionAttemptID, executionState, System.currentTimeMillis(), th == null ? null : ExceptionUtils.stringifyException(th));
            Iterator<ActorRef> it = this.executionListenerActors.iterator();
            while (it.hasNext()) {
                it.next().tell(executionStateChanged, ActorRef.noSender());
            }
        }
        if (executionState == ExecutionState.FAILED) {
            fail(th);
        }
    }
}
