package org.apache.flink.runtime.executiongraph;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.execution.ExecutionListener;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
import org.apache.flink.runtime.io.network.RemoteReceiver;
import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.profiling.ProfilingUtils;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraph.class */
public class ExecutionGraph {
    /** Atomic updater used to compare-and-set the volatile {@code state} field (see transitionState). */
    private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");
    /** Logger of this class. */
    static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
    /** ID of the job; never null (checked in the constructor). */
    private final JobID jobID;
    /** Name of the job; never null (checked in the constructor). */
    private final String jobName;
    /** Configuration of the job; never null (checked in the constructor). */
    private final Configuration jobConfiguration;
    /** Attached job vertices, keyed by their job vertex ID. */
    private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks;
    /** Attached job vertices in the order in which attachJobGraph added them. */
    private final List<ExecutionJobVertex> verticesInCreationOrder;
    /** Intermediate results produced by the attached vertices, keyed by data set ID. */
    private final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults;
    /** Executions registered via registerExecution, keyed by attempt ID. */
    private final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions;
    /** Execution edges; every edge is stored twice, under its output and its input channel ID. */
    private final Map<ChannelID, ExecutionEdge> edges;
    /** Executor for asynchronous actions; may be null, in which case execute(Runnable) runs the action inline. */
    private final ExecutorService executor;
    /** Blob keys handed to the constructor; returned as-is by getRequiredJarFiles. */
    private final List<BlobKey> requiredJarFiles;
    /** Listeners notified on every successful job status transition. */
    private final List<JobStatusListener> jobStatusListeners;
    /** Listeners notified through notifyExecutionChange. */
    private final List<ExecutionListener> executionListeners;
    /** Time (System.currentTimeMillis) at which each job status was entered, indexed by JobStatus ordinal. */
    private final long[] stateTimestamps;
    /** Monitor that waitForJobEnd synchronizes and waits on. */
    private final Object progressLock;
    /** Index into verticesInCreationOrder; waitForJobEnd blocks until it reaches the size of that list. */
    private int nextVertexToFinish;
    /** Current job status; starts as CREATED and is changed only through STATE_UPDATER. */
    private volatile JobStatus state;
    /** Cause recorded by fail(Throwable); null until a failure was recorded. */
    private volatile Throwable failureCause;
    /** Scheduler assigned by scheduleForExecution; null before that. */
    private Scheduler scheduler;
    /** Whether queued scheduling is allowed; defaults to true (set in the constructor). */
    private boolean allowQueuedScheduling;

    /* renamed from: org.apache.flink.runtime.executiongraph.ExecutionGraph$3, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraph$3.class */
    /*
     * Compiler-generated lookup table for a switch over ExecutionState, as recovered by the decompiler.
     * The array is indexed by ExecutionState.ordinal() and holds the integer case label for that constant:
     * 1 = FINISHED, 2 = CANCELED, 3 = FAILED. Every other state keeps the array default of 0.
     */
    static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$execution$ExecutionState = new int[ExecutionState.values().length];

        static {
            // Each entry is filled in its own try block; a NoSuchFieldError (enum constant missing at
            // runtime) is deliberately ignored, which leaves that entry at 0.
            try {
                $SwitchMap$org$apache$flink$runtime$execution$ExecutionState[ExecutionState.FINISHED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$execution$ExecutionState[ExecutionState.CANCELED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$execution$ExecutionState[ExecutionState.FAILED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /**
     * Creates an execution graph with an empty list of required jar files and without an executor,
     * so that actions passed to {@link #execute(Runnable)} run in the calling thread.
     *
     * @param jobID the ID of the job, must not be null
     * @param str the name of the job, must not be null
     * @param configuration the job configuration, must not be null
     * @throws NullPointerException if any argument is null
     */
    public ExecutionGraph(JobID jobID, String str, Configuration configuration) {
        this(jobID, str, configuration, new ArrayList<BlobKey>(), null);
    }

    /**
     * Creates an execution graph in state {@code CREATED} with no vertices attached.
     *
     * @param jobID the ID of the job, must not be null
     * @param str the name of the job, must not be null
     * @param configuration the job configuration, must not be null
     * @param list the blob keys of the required jar files; stored without copying or null check
     * @param executorService executor for asynchronous actions; may be null
     * @throws NullPointerException if {@code jobID}, {@code str} or {@code configuration} is null
     */
    public ExecutionGraph(JobID jobID, String str, Configuration configuration, List<BlobKey> list, ExecutorService executorService) {
        if (jobID == null || str == null || configuration == null) {
            throw new NullPointerException();
        }

        // job identity and externally supplied collaborators
        this.jobID = jobID;
        this.jobName = str;
        this.jobConfiguration = configuration;
        this.requiredJarFiles = list;
        this.executor = executorService;

        // graph structure
        this.tasks = new ConcurrentHashMap<>();
        this.intermediateResults = new ConcurrentHashMap<>();
        this.verticesInCreationOrder = new ArrayList<>();
        this.currentExecutions = new ConcurrentHashMap<>();
        this.edges = new HashMap<>();

        // listeners
        this.jobStatusListeners = new CopyOnWriteArrayList<>();
        this.executionListeners = new CopyOnWriteArrayList<>();

        // status bookkeeping
        this.progressLock = new Object();
        this.allowQueuedScheduling = true;
        this.state = JobStatus.CREATED;
        this.stateTimestamps = new long[JobStatus.values().length];
        this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
    }

    /**
     * Attaches the given job vertices to this graph. For every job vertex an {@link ExecutionJobVertex}
     * is created, connected to its predecessors through the already known intermediate results, and
     * registered together with the intermediate results it produces.
     *
     * <p>The debug message describes the list as topologically sorted; the vertices are appended to the
     * creation-order list in exactly the order given.
     *
     * @param list the job vertices to attach
     * @throws JobException if a job vertex ID or an intermediate data set ID is already registered
     */
    public void attachJobGraph(List<AbstractJobVertex> list) throws JobException {
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d vertices and %d intermediate results.", list.size(), tasks.size(), intermediateResults.size()));
        }

        // all vertices attached by one call share the same creation timestamp
        final long createTimestamp = System.currentTimeMillis();

        for (AbstractJobVertex jobVertex : list) {
            ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, createTimestamp);
            ejv.connectToPredecessors(intermediateResults);

            ExecutionJobVertex previousTask = tasks.putIfAbsent(jobVertex.getID(), ejv);
            if (previousTask != null) {
                // argument order matches the labels: the registered vertex is "previous", ours is "new"
                throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]", jobVertex.getID(), previousTask, ejv));
            }

            for (IntermediateResult result : ejv.getProducedDataSets()) {
                IntermediateResult previousResult = intermediateResults.putIfAbsent(result.getId(), result);
                if (previousResult != null) {
                    throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]", result.getId(), previousResult, result));
                }
            }

            verticesInCreationOrder.add(ejv);
        }
    }

    /**
     * Returns the list of required jar file blob keys that was passed to the constructor.
     * The internal list itself is returned, not a copy.
     */
    public List<BlobKey> getRequiredJarFiles() {
        return requiredJarFiles;
    }

    /** Returns the ID of the job this graph belongs to. */
    public JobID getJobID() {
        return jobID;
    }

    /** Returns the name of the job. */
    public String getJobName() {
        return jobName;
    }

    /** Returns the configuration of the job. */
    public Configuration getJobConfiguration() {
        return jobConfiguration;
    }

    /** Returns the current status of the job (a read of the volatile state field). */
    public JobStatus getState() {
        return state;
    }

    /**
     * Looks up the execution job vertex registered under the given ID.
     *
     * @param jobVertexID the ID of the job vertex
     * @return the vertex, or null if no vertex with that ID has been attached
     */
    public ExecutionJobVertex getJobVertex(JobVertexID jobVertexID) {
        return tasks.get(jobVertexID);
    }

    /** Returns an unmodifiable view of all attached job vertices, keyed by their ID. */
    public Map<JobVertexID, ExecutionJobVertex> getAllVertices() {
        final Map<JobVertexID, ExecutionJobVertex> readOnlyView = Collections.unmodifiableMap(tasks);
        return readOnlyView;
    }

    /**
     * Returns a read-only iterable over the job vertices in the order in which they were attached.
     *
     * <p>The number of vertices is captured when this method is called: iterators obtained from the
     * result visit exactly that many vertices, even if more vertices are attached afterwards.
     * The iterators do not support {@code remove()}.
     */
    public Iterable<ExecutionJobVertex> getVerticesTopologically() {
        // fixed upper bound shared by all iterators created from the returned iterable
        final int numVertices = verticesInCreationOrder.size();

        return new Iterable<ExecutionJobVertex>() {
            @Override
            public Iterator<ExecutionJobVertex> iterator() {
                return new Iterator<ExecutionJobVertex>() {
                    /** Index of the element the next call to next() returns. */
                    private int nextIndex = 0;

                    @Override
                    public boolean hasNext() {
                        return nextIndex < numVertices;
                    }

                    @Override
                    public ExecutionJobVertex next() {
                        if (!hasNext()) {
                            throw new NoSuchElementException();
                        }
                        return verticesInCreationOrder.get(nextIndex++);
                    }

                    @Override
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
            }
        };
    }

    /** Returns an unmodifiable view of all registered intermediate results, keyed by data set ID. */
    public Map<IntermediateDataSetID, IntermediateResult> getAllIntermediateResults() {
        final Map<IntermediateDataSetID, IntermediateResult> readOnlyView = Collections.unmodifiableMap(intermediateResults);
        return readOnlyView;
    }

    /**
     * Returns an iterable over execution vertices. Every iterator it hands out is an
     * {@code AllVerticesIterator} wrapped around a fresh iterator from
     * {@link #getVerticesTopologically()}.
     */
    public Iterable<ExecutionVertex> getAllExecutionVertices() {
        return new Iterable<ExecutionVertex>() {
            @Override
            public Iterator<ExecutionVertex> iterator() {
                final Iterator<ExecutionJobVertex> jobVertices = getVerticesTopologically().iterator();
                return new AllVerticesIterator(jobVertices);
            }
        };
    }

    /**
     * Returns the timestamp recorded for the given job status.
     *
     * @param jobStatus the status to look up
     * @return the recorded {@code System.currentTimeMillis()} value, or 0 if none was recorded
     */
    public long getStatusTimestamp(JobStatus jobStatus) {
        final int slot = jobStatus.ordinal();
        return stateTimestamps[slot];
    }

    /** Returns whether queued scheduling is allowed for this job (true unless changed via the setter). */
    public boolean isQueuedSchedulingAllowed() {
        return allowQueuedScheduling;
    }

    /**
     * Sets whether queued scheduling is allowed. The flag is passed on to the vertices when the
     * job is scheduled.
     *
     * @param z true to allow queued scheduling
     */
    public void setQueuedSchedulingAllowed(boolean z) {
        allowQueuedScheduling = z;
    }

    /**
     * Starts the job: moves it from {@code CREATED} to {@code RUNNING}, remembers the scheduler and
     * schedules all input vertices.
     *
     * @param scheduler the scheduler to use, must not be null
     * @throws IllegalArgumentException if the scheduler is null, or differs from one assigned earlier
     * @throws IllegalStateException if the job is not in state {@code CREATED}
     * @throws JobException declared for the scheduling of the vertices
     */
    public void scheduleForExecution(Scheduler scheduler) throws JobException {
        if (scheduler == null) {
            throw new IllegalArgumentException("Scheduler must not be null.");
        }
        if (this.scheduler != null && this.scheduler != scheduler) {
            throw new IllegalArgumentException("Cannot use different schedulers for the same job");
        }
        if (!transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
            throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
        }

        this.scheduler = scheduler;

        // only the input vertices are scheduled here
        for (ExecutionJobVertex vertex : this.tasks.values()) {
            if (vertex.getJobVertex().isInputVertex()) {
                vertex.scheduleAll(scheduler, this.allowQueuedScheduling);
            }
        }
    }

    /**
     * Cancels the job. If the job is {@code RUNNING} or {@code CREATED}, it is moved to
     * {@code CANCELLING} and every job vertex is cancelled in creation order. In any other state
     * the call has no effect.
     */
    public void cancel() {
        while (true) {
            final JobStatus current = state;
            final boolean cancellable = current == JobStatus.RUNNING || current == JobStatus.CREATED;
            if (!cancellable) {
                return;
            }
            if (transitionState(current, JobStatus.CANCELLING)) {
                for (ExecutionJobVertex vertex : verticesInCreationOrder) {
                    vertex.cancel();
                }
                return;
            }
            // the state changed concurrently between the read and the compare-and-set: re-evaluate
        }
    }

    /**
     * Fails the job. Unless the job is already {@code FAILING} or {@code FAILED}, it is moved to
     * {@code FAILING} from whatever state it is in, the cause is recorded, and every job vertex is
     * cancelled in creation order.
     *
     * <p>Note that the cause is stored only after the state transition succeeded, so a concurrent
     * reader may briefly observe {@code FAILING} without a recorded cause.
     *
     * @param th the cause of the failure; may be null
     */
    public void fail(Throwable th) {
        while (true) {
            final JobStatus current = state;
            if (current == JobStatus.FAILED || current == JobStatus.FAILING) {
                // somebody else already initiated the failure
                return;
            }
            if (transitionState(current, JobStatus.FAILING, th)) {
                failureCause = th;
                for (ExecutionJobVertex vertex : verticesInCreationOrder) {
                    vertex.cancel();
                }
                return;
            }
            // lost the race on the state field: read the state again and retry
        }
    }

    /**
     * Blocks until the progress index has reached the number of attached job vertices.
     *
     * <p>The timeout bounds each individual wait on the progress monitor only. After a wait returns,
     * the condition is checked again and the method waits anew, so it never returns before the
     * condition holds.
     *
     * @param j the timeout in milliseconds for each wait; 0 waits without timeout
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitForJobEnd(long j) throws InterruptedException {
        synchronized (progressLock) {
            for (;;) {
                if (nextVertexToFinish >= verticesInCreationOrder.size()) {
                    return;
                }
                progressLock.wait(j);
            }
        }
    }

    /**
     * Blocks until the job has ended, waiting without a timeout.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitForJobEnd() throws InterruptedException {
        final long noTimeout = 0L;
        waitForJobEnd(noTimeout);
    }

    /**
     * Attempts the given state transition without an associated error.
     *
     * @param jobStatus the expected current status
     * @param jobStatus2 the new status
     * @return true if the transition happened, false if the current status was not the expected one
     */
    private boolean transitionState(JobStatus jobStatus, JobStatus jobStatus2) {
        final Throwable noError = null;
        return transitionState(jobStatus, jobStatus2, noError);
    }

    /**
     * Atomically moves the job from the expected status to the new status. On success the timestamp
     * of the new status is recorded and the job status listeners are notified.
     *
     * @param jobStatus the expected current status
     * @param jobStatus2 the new status
     * @param th the error to report to the listeners; may be null
     * @return true if the transition happened, false if the current status was not the expected one
     */
    private boolean transitionState(JobStatus jobStatus, JobStatus jobStatus2, Throwable th) {
        final boolean swapped = STATE_UPDATER.compareAndSet(this, jobStatus, jobStatus2);
        if (swapped) {
            stateTimestamps[jobStatus2.ordinal()] = System.currentTimeMillis();
            notifyJobStatusChange(jobStatus2, th);
        }
        return swapped;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Records that the given job vertex has reached a final state and, once all vertices are final,
     * moves the job into its terminal status and wakes up threads blocked in {@code waitForJobEnd}.
     *
     * <p>The decompiler could not emit this method; the body was reconstructed from the basic-block
     * notes it left behind (vertex identity check, advance loop over vertices in final state, the
     * RUNNING/CANCELLING/FAILING transitions, the unexpected-state failure and the final notifyAll).
     * NOTE(review): the early return for an already completed progress index is not covered by those
     * notes and should be confirmed against the bytecode.
     *
     * @param jobVertex the job vertex that reached a final state
     */
    public void jobVertexInFinalState(ExecutionJobVertex jobVertex) {
        synchronized (progressLock) {
            int nextPos = nextVertexToFinish;
            if (nextPos >= verticesInCreationOrder.size()) {
                // every vertex was already accounted for; nothing left to do
                return;
            }

            // progress is tracked in creation order: only the vertex we are currently waiting for
            // can advance the index
            if (verticesInCreationOrder.get(nextPos) != jobVertex) {
                return;
            }

            // skip over this vertex and all directly following vertices that are already final
            do {
                nextPos++;
            } while (nextPos < verticesInCreationOrder.size() && verticesInCreationOrder.get(nextPos).isInFinalState());

            nextVertexToFinish = nextPos;

            if (nextPos != verticesInCreationOrder.size()) {
                return;
            }

            // all vertices are final: move the job into the matching terminal status
            while (true) {
                final JobStatus current = state;
                if (current == JobStatus.RUNNING && transitionState(current, JobStatus.FINISHED)) {
                    break;
                }
                if (current == JobStatus.CANCELLING && transitionState(current, JobStatus.CANCELED)) {
                    break;
                }
                if (current == JobStatus.FAILING && transitionState(current, JobStatus.FAILED, failureCause)) {
                    break;
                }
                if (current == JobStatus.CANCELED || current == JobStatus.CREATED || current == JobStatus.FINISHED) {
                    // unexpected: fail() moves the job to FAILING, the next iteration completes it
                    fail(new Exception("ExecutionGraph went into final state from state " + current));
                }
            }

            // wake up everybody blocked in waitForJobEnd
            progressLock.notifyAll();
        }
    }

    /**
     * Applies a state update reported for a task to the matching registered execution.
     *
     * <p>Only {@code FINISHED}, {@code CANCELED} and {@code FAILED} are accepted. Any other reported
     * state fails the execution.
     *
     * @param taskExecutionState the reported state, carrying the execution attempt ID
     * @return true if the update was applied; false if no execution is registered under the ID or
     *         the reported state is not one of the accepted states
     */
    public boolean updateState(TaskExecutionState taskExecutionState) {
        final Execution execution = currentExecutions.get(taskExecutionState.getID());
        if (execution == null) {
            return false;
        }

        // switch on the enum itself instead of on a synthetic ordinal table with magic case labels
        switch (taskExecutionState.getExecutionState()) {
            case FINISHED:
                execution.markFinished();
                return true;
            case CANCELED:
                execution.cancelingComplete();
                return true;
            case FAILED:
                execution.markFailed(taskExecutionState.getError());
                return true;
            default:
                execution.fail(new Exception("TaskManager sent illegal state update: " + taskExecutionState.getExecutionState()));
                return false;
        }
    }

    /**
     * Resolves the other end of the channel with the given ID and, if the receiving vertex has not
     * been scheduled yet, schedules it.
     *
     * <p>If the ID is the input channel ID of its edge, the request is answered from the state of the
     * producing vertex; otherwise from the state of the target vertex. A vertex that runs on the
     * caller's instance is answered with the local channel ID, a vertex on another instance with a
     * {@link RemoteReceiver}. Unknown channels and inconsistent vertex states fail the job.
     *
     * @param instanceConnectionInfo the connection info of the instance that issued the lookup
     * @param channelID the ID of the channel whose counterpart is requested
     * @return the lookup response
     */
    public ConnectionInfoLookupResponse lookupConnectionInfoAndDeployReceivers(InstanceConnectionInfo instanceConnectionInfo, ChannelID channelID) {
        final ExecutionEdge edge = edges.get(channelID);
        if (edge == null) {
            LOG.error("Cannot find execution edge associated with ID " + channelID);
            fail(new Exception("Channels are not correctly registered"));
            return ConnectionInfoLookupResponse.createReceiverNotFound();
        }

        if (channelID.equals(edge.getInputChannelId())) {
            // ---- request came from an input channel: look backwards at the producer ----
            final ExecutionVertex sender = edge.getSource().getProducer();
            final ExecutionState senderState = sender.getExecutionState();

            if (senderState == ExecutionState.RUNNING) {
                final Instance senderInstance = sender.getCurrentAssignedResource().getInstance();
                if (senderInstance.getInstanceConnectionInfo().equals(instanceConnectionInfo)) {
                    // producer lives on the requesting instance: answer with the local channel
                    return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getOutputChannelId());
                }
                final InstanceConnectionInfo remoteInfo = senderInstance.getInstanceConnectionInfo();
                final InetSocketAddress remoteAddress = new InetSocketAddress(remoteInfo.address(), remoteInfo.dataPort());
                return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(remoteAddress, edge.getSource().getIntermediateResult().getConnectionIndex()));
            }

            if (senderState == ExecutionState.FINISHED) {
                LOG.error("Receiver " + sender + " set to FINISHED even though data is pending");
                fail(new Exception("Channels are not correctly registered"));
                return ConnectionInfoLookupResponse.createReceiverNotFound();
            }

            if (senderState == ExecutionState.FAILED || senderState == ExecutionState.CANCELED || senderState == ExecutionState.CANCELING) {
                return ConnectionInfoLookupResponse.createJobIsAborting();
            }

            LOG.error("Channel lookup (backwards) - sender " + sender + " found in inconsistent state " + senderState);
            fail(new Exception("Channels are not correctly registered"));
            return ConnectionInfoLookupResponse.createReceiverNotFound();
        }

        // ---- request came from an output channel: look forward at the receiver ----
        final ExecutionVertex receiver = edge.getTarget();
        final ExecutionState receiverState = receiver.getExecutionState();

        if (receiverState == ExecutionState.RUNNING) {
            final Instance receiverInstance = receiver.getCurrentAssignedResource().getInstance();
            if (receiverInstance.getInstanceConnectionInfo().equals(instanceConnectionInfo)) {
                // receiver lives on the requesting instance: answer with the local channel
                return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getInputChannelId());
            }
            final InstanceConnectionInfo remoteInfo = receiverInstance.getInstanceConnectionInfo();
            final InetSocketAddress remoteAddress = new InetSocketAddress(remoteInfo.address(), remoteInfo.dataPort());
            return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(remoteAddress, edge.getSource().getIntermediateResult().getConnectionIndex()));
        }

        if (receiverState == ExecutionState.DEPLOYING || receiverState == ExecutionState.SCHEDULED) {
            return ConnectionInfoLookupResponse.createReceiverNotReady();
        }

        if (receiverState == ExecutionState.CREATED) {
            // the receiver has not been scheduled yet: do that now, without queued scheduling
            try {
                edge.getTarget().scheduleForExecution(scheduler, false);
                return ConnectionInfoLookupResponse.createReceiverNotReady();
            } catch (JobException schedulingFailure) {
                fail(new Exception("Cannot schedule the receivers, not enough resources", schedulingFailure));
                return ConnectionInfoLookupResponse.createJobIsAborting();
            }
        }

        if (receiverState == ExecutionState.CANCELED || receiverState == ExecutionState.CANCELING || receiverState == ExecutionState.FAILED) {
            return ConnectionInfoLookupResponse.createJobIsAborting();
        }

        final String message = "Channel lookup (forward) - receiver " + receiver + " found in inconsistent state " + receiverState;
        LOG.error(message);
        fail(new Exception(message));
        return ConnectionInfoLookupResponse.createReceiverNotFound();
    }

    /** Returns an unmodifiable view of the currently registered executions, keyed by attempt ID. */
    public Map<ExecutionAttemptID, Execution> getRegisteredExecutions() {
        final Map<ExecutionAttemptID, Execution> readOnlyView = Collections.unmodifiableMap(currentExecutions);
        return readOnlyView;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers the execution under its attempt ID. If the ID is already taken, the existing
     * registration is kept and the job is failed.
     *
     * @param execution the execution to register
     */
    public void registerExecution(Execution execution) {
        final Execution alreadyRegistered = currentExecutions.putIfAbsent(execution.getAttemptId(), execution);
        if (alreadyRegistered == null) {
            return;
        }
        fail(new Exception("Trying to register execution " + execution + " for already used ID " + execution.getAttemptId()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes the registration for the attempt ID of the given execution. If a different execution
     * was registered under that ID, it is still removed and the job is failed.
     *
     * @param execution the execution to de-register
     */
    public void deregisterExecution(Execution execution) {
        final Execution removed = currentExecutions.remove(execution.getAttemptId());
        if (removed != null && removed != execution) {
            fail(new Exception("De-registering execution " + execution + " failed. Found for same ID execution " + removed));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers the edge under both of its channel IDs, so that it can be found from either end.
     *
     * @param executionEdge the edge to register
     */
    public void registerExecutionEdge(ExecutionEdge executionEdge) {
        final ChannelID inputId = executionEdge.getInputChannelId();
        final ChannelID outputId = executionEdge.getOutputChannelId();
        edges.put(outputId, executionEdge);
        edges.put(inputId, executionEdge);
    }

    /**
     * Adds a listener that is notified about every job status change from now on.
     *
     * @param jobStatusListener the listener to add
     */
    public void registerJobStatusListener(JobStatusListener jobStatusListener) {
        jobStatusListeners.add(jobStatusListener);
    }

    /**
     * Adds a listener that is notified about execution state changes from now on.
     *
     * @param executionListener the listener to add
     */
    public void registerExecutionListener(ExecutionListener executionListener) {
        executionListeners.add(executionListener);
    }

    /**
     * Informs all job status listeners about the new status. Errors thrown by a listener are logged
     * and do not prevent the remaining listeners from being notified.
     *
     * @param jobStatus the status the job has just entered
     * @param th the error associated with the change; may be null
     */
    private void notifyJobStatusChange(JobStatus jobStatus, Throwable th) {
        if (jobStatusListeners.isEmpty()) {
            return;
        }

        // render the error once, not once per listener
        final String message = (th != null) ? ExceptionUtils.stringifyException(th) : null;

        for (JobStatusListener listener : jobStatusListeners) {
            try {
                listener.jobStatusHasChanged(this, jobStatus, message);
            } catch (Throwable listenerError) {
                LOG.error("Notification of job status change caused an error.", listenerError);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Informs all execution listeners about a state change of an execution. Errors thrown by a
     * listener are logged and do not prevent the remaining listeners from being notified.
     * An execution that changed to {@code FAILED} additionally fails the whole job.
     *
     * @param jobVertexID the ID of the job vertex the execution belongs to
     * @param i the index passed through to the listeners (presumably the subtask index; confirm with callers)
     * @param executionAttemptID the ID of the execution attempt
     * @param executionState the state the execution has just entered
     * @param th the error associated with the change; may be null
     */
    public void notifyExecutionChange(JobVertexID jobVertexID, int i, ExecutionAttemptID executionAttemptID, ExecutionState executionState, Throwable th) {
        if (!executionListeners.isEmpty()) {
            // render the error once, not once per listener
            final String message = (th != null) ? ExceptionUtils.stringifyException(th) : null;

            for (ExecutionListener listener : executionListeners) {
                try {
                    listener.executionStateChanged(jobID, jobVertexID, i, executionAttemptID, executionState, message);
                } catch (Throwable listenerError) {
                    LOG.error("Notification of execution state change caused an error.", listenerError);
                }
            }
        }

        // a single failed execution takes the whole job down
        if (executionState == ExecutionState.FAILED) {
            fail(th);
        }
    }

    /**
     * Runs the given action: on the executor if one was supplied to the constructor, otherwise
     * directly in the calling thread.
     *
     * <p>When run on the executor, the returned {@code Future} is not kept, so nobody would ever see
     * an exception captured in it. The action is therefore wrapped so that any unchecked exception or
     * error is logged before it is rethrown.
     *
     * @param runnable the action to run
     */
    public void execute(final Runnable runnable) {
        if (this.executor == null) {
            // no executor: run synchronously, failures propagate to the caller
            runnable.run();
            return;
        }

        this.executor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    runnable.run();
                } catch (RuntimeException | Error failure) {
                    LOG.error("Asynchronous action of the execution graph failed.", failure);
                    throw failure;
                }
            }
        });
    }
}
