package org.apache.flink.runtime.taskmanager;

import akka.actor.ActorRef;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.RuntimeEnvironment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/taskmanager/Task.class */
/**
 * Task-manager-side handle for one execution attempt of a parallel subtask.
 *
 * <p>The class tracks the attempt's {@link ExecutionState} in a volatile field that is only
 * changed through compare-and-set on {@link #STATE_UPDATER}. Every transition method below
 * re-reads the state and retries when its compare-and-set loses a race. After a successful
 * transition the registered listener actors are notified, and for terminal transitions an
 * {@code UnregisterTask} message is sent to the task manager actor.
 */
public class Task {
    /** Performs atomic compare-and-set transitions on the volatile {@link #executionState} field. */
    private static final AtomicReferenceFieldUpdater<Task, ExecutionState> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Task.class, ExecutionState.class, "executionState");
    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
    private final JobID jobId;
    private final JobVertexID vertexId;
    private final int subtaskIndex;
    private final int numberOfSubtasks;
    private final ExecutionAttemptID executionId;
    private final String taskName;
    /** Actor that receives the unregister and state-update messages sent by this task. */
    private final ActorRef taskManager;
    /** Runtime environment of the task; {@code null} until {@link #setEnvironment} is called. */
    private volatile RuntimeEnvironment environment;
    /** Cause recorded when the task is moved to {@code FAILED}; {@code null} otherwise. */
    private volatile Throwable failureCause;
    /** Actors that are told about every state change reported through {@link #notifyObservers}. */
    private final List<ActorRef> executionListenerActors = new CopyOnWriteArrayList<ActorRef>();
    /** Current state; starts as {@code DEPLOYING} and is modified only via {@link #STATE_UPDATER}. */
    private volatile ExecutionState executionState = ExecutionState.DEPLOYING;

    /**
     * Creates a task in state {@code DEPLOYING} without a runtime environment.
     *
     * @param jobID              ID of the job this task belongs to
     * @param jobVertexID        ID of the job vertex this task is a subtask of
     * @param i                  zero-based index of this subtask
     * @param i2                 total number of parallel subtasks
     * @param executionAttemptID ID of this execution attempt
     * @param str                name of the task
     * @param actorRef           task manager actor that receives this task's messages
     */
    public Task(JobID jobID, JobVertexID jobVertexID, int i, int i2, ExecutionAttemptID executionAttemptID, String str, ActorRef actorRef) {
        this.jobId = jobID;
        this.vertexId = jobVertexID;
        this.subtaskIndex = i;
        this.numberOfSubtasks = i2;
        this.executionId = executionAttemptID;
        this.taskName = str;
        this.taskManager = actorRef;
    }

    /** @return the ID of the job this task belongs to */
    public JobID getJobID() {
        return this.jobId;
    }

    /** @return the ID of the job vertex this task is a subtask of */
    public JobVertexID getVertexID() {
        return this.vertexId;
    }

    /** @return the zero-based index of this subtask */
    public int getSubtaskIndex() {
        return this.subtaskIndex;
    }

    /** @return the total number of parallel subtasks */
    public int getNumberOfSubtasks() {
        return this.numberOfSubtasks;
    }

    /** @return the ID of this execution attempt */
    public ExecutionAttemptID getExecutionId() {
        return this.executionId;
    }

    /** @return the current execution state */
    public ExecutionState getExecutionState() {
        return this.executionState;
    }

    /**
     * Sets the runtime environment used by {@link #startExecution()}, the cancel/fail methods
     * and the gate/writer/partition accessors.
     *
     * @param runtimeEnvironment the environment to attach
     */
    public void setEnvironment(RuntimeEnvironment runtimeEnvironment) {
        this.environment = runtimeEnvironment;
    }

    /** @return the attached runtime environment, or {@code null} if none has been set */
    public RuntimeEnvironment getEnvironment() {
        return this.environment;
    }

    /**
     * Checks whether the task is in state {@code CANCELING}, {@code CANCELED} or {@code FAILED}.
     *
     * @return {@code true} if the task is in one of those three states
     */
    public boolean isCanceledOrFailed() {
        // Take a single snapshot of the volatile field. Reading it once per comparison could
        // miss a concurrent CANCELING -> CANCELED transition and wrongly report false.
        final ExecutionState current = this.executionState;
        return current == ExecutionState.CANCELING
                || current == ExecutionState.CANCELED
                || current == ExecutionState.FAILED;
    }

    /**
     * Returns the task name; when debug logging is enabled the execution attempt ID is appended.
     *
     * @return the (possibly decorated) task name
     */
    public String getTaskName() {
        if (LOG.isDebugEnabled()) {
            return this.taskName + " (" + this.executionId + ")";
        }
        return this.taskName;
    }

    /**
     * Returns the task name followed by {@code (index+1/total)}; when debug logging is enabled
     * the execution attempt ID is appended as well.
     *
     * @return the task name with subtask information
     */
    public String getTaskNameWithSubtasks() {
        final String nameWithIndex = this.taskName + " (" + (this.subtaskIndex + 1) + "/" + this.numberOfSubtasks + ")";
        if (LOG.isDebugEnabled()) {
            return nameWithIndex + " (" + this.executionId + ")";
        }
        return nameWithIndex;
    }

    /** @return the cause recorded when the task failed, or {@code null} if none was recorded */
    public Throwable getFailureCause() {
        return this.failureCause;
    }

    /**
     * Attempts the transition {@code RUNNING -> FINISHED}. On success the listeners are
     * notified and the task is unregistered from the task manager.
     *
     * @return {@code true} if the transition happened, {@code false} if the task was not running
     */
    public boolean markAsFinished() {
        if (!STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, ExecutionState.FINISHED)) {
            return false;
        }
        notifyObservers(ExecutionState.FINISHED, null);
        unregisterTask();
        return true;
    }

    /**
     * Moves the task to {@code FAILED} from any state except {@code CANCELED}, records the
     * cause, notifies the listeners with the stringified exception and unregisters the task.
     *
     * <p>NOTE(review): a task that is already {@code FAILED} passes through here again
     * (the compare-and-set FAILED -> FAILED succeeds), so the cause is overwritten and the
     * listeners are notified a second time. Confirm whether callers rely on that.
     *
     * @param th the failure cause
     */
    public void markFailed(Throwable th) {
        ExecutionState current;
        do {
            current = this.executionState;
            if (current == ExecutionState.CANCELED) {
                return;
            }
        } while (!STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED));

        this.failureCause = th;
        notifyObservers(ExecutionState.FAILED, ExceptionUtils.stringifyException(th));
        unregisterTask();
    }

    /**
     * Cancels the task.
     *
     * <ul>
     *   <li>{@code FINISHED}, {@code CANCELED}, {@code CANCELING}, {@code FAILED}: no-op.</li>
     *   <li>{@code DEPLOYING}: switches directly to {@code CANCELED}, notifies and unregisters.</li>
     *   <li>{@code RUNNING}: switches to {@code CANCELING}, notifies and asks the environment to
     *       cancel the execution; errors from the environment are logged, not propagated.</li>
     * </ul>
     *
     * @throws RuntimeException if the task is in any other state
     */
    public void cancelExecution() {
        while (true) {
            final ExecutionState current = this.executionState;

            if (current == ExecutionState.FINISHED
                    || current == ExecutionState.CANCELED
                    || current == ExecutionState.CANCELING
                    || current == ExecutionState.FAILED) {
                return;
            }

            if (current == ExecutionState.DEPLOYING) {
                if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
                    notifyObservers(ExecutionState.CANCELED, null);
                    unregisterTask();
                    return;
                }
            } else if (current == ExecutionState.RUNNING) {
                if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELING)) {
                    notifyObservers(ExecutionState.CANCELING, null);
                    try {
                        this.environment.cancelExecution();
                    } catch (Throwable t) {
                        LOG.error("Error while cancelling the task.", t);
                    }
                    return;
                }
            } else {
                throw new RuntimeException("unexpected state for cancelling: " + current);
            }
            // The compare-and-set lost a race: loop and re-evaluate the new state.
        }
    }

    /**
     * Fails the task from the outside.
     *
     * <ul>
     *   <li>{@code CANCELED}, {@code CANCELING}, {@code FAILED}: no-op.</li>
     *   <li>{@code FINISHED}: switches to {@code FAILED}, records the cause and notifies
     *       (no unregister message is sent on this path).</li>
     *   <li>{@code DEPLOYING}: switches to {@code FAILED}, records the cause, notifies and
     *       unregisters.</li>
     *   <li>{@code RUNNING}: switches to {@code FAILED}, asks the environment to cancel the
     *       execution (errors are logged), records the cause, notifies and unregisters.</li>
     * </ul>
     *
     * @param th the failure cause
     * @throws RuntimeException if the task is in any other state
     */
    public void failExternally(Throwable th) {
        while (true) {
            final ExecutionState current = this.executionState;

            if (current == ExecutionState.CANCELED
                    || current == ExecutionState.CANCELING
                    || current == ExecutionState.FAILED) {
                return;
            }

            if (current == ExecutionState.FINISHED) {
                if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
                    // Record the cause here as well, so getFailureCause() is populated for
                    // every path that ends in FAILED.
                    this.failureCause = th;
                    notifyObservers(ExecutionState.FAILED, null);
                    return;
                }
                // Lost the race against a concurrent transition: re-read the state instead of
                // falling through to the "unexpected state" exception below.
                continue;
            }

            if (current == ExecutionState.DEPLOYING) {
                if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
                    this.failureCause = th;
                    notifyObservers(ExecutionState.FAILED, null);
                    unregisterTask();
                    return;
                }
            } else if (current == ExecutionState.RUNNING) {
                if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
                    try {
                        this.environment.cancelExecution();
                    } catch (Throwable t) {
                        LOG.error("Error while cancelling the task.", t);
                    }
                    this.failureCause = th;
                    notifyObservers(ExecutionState.FAILED, null);
                    unregisterTask();
                    return;
                }
            } else {
                throw new RuntimeException("unexpected state for failing the task: " + current);
            }
            // The compare-and-set lost a race: loop and re-evaluate the new state.
        }
    }

    /**
     * Completes a cancellation by switching the task to {@code CANCELED}, then notifies the
     * listeners and unregisters the task. Does nothing if the task is already {@code CANCELED}
     * or {@code FAILED}. A source state other than {@code RUNNING} or {@code CANCELING} is
     * logged as an error, but the transition is still performed.
     */
    public void cancelingDone() {
        ExecutionState current;
        do {
            current = this.executionState;
            if (current == ExecutionState.CANCELED || current == ExecutionState.FAILED) {
                return;
            }
            if (current != ExecutionState.RUNNING && current != ExecutionState.CANCELING) {
                LOG.error(String.format("Unexpected state transition in Task: %s -> %s", current, ExecutionState.CANCELED));
            }
        } while (!STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED));

        notifyObservers(ExecutionState.CANCELED, null);
        unregisterTask();
    }

    /**
     * Attempts the transition {@code DEPLOYING -> RUNNING} and, on success, starts the
     * environment's executing thread. The environment must have been set beforehand.
     *
     * @return {@code true} if the thread was started, {@code false} if the task was not in
     *         state {@code DEPLOYING}
     */
    public boolean startExecution() {
        LOG.info("Starting execution of task {}", getTaskName());
        if (!STATE_UPDATER.compareAndSet(this, ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
            return false;
        }
        this.environment.getExecutingThread().start();
        return true;
    }

    /**
     * Releases all memory held for this task's invokable. Does nothing when either the memory
     * manager or the environment is {@code null}.
     *
     * @param memoryManager the memory manager to release from; may be {@code null}
     */
    public void unregisterMemoryManager(MemoryManager memoryManager) {
        final RuntimeEnvironment env = this.environment;
        if (memoryManager != null && env != null) {
            memoryManager.releaseAll(env.getInvokable());
        }
    }

    /** Sends an {@code UnregisterTask} message for this execution attempt to the task manager actor. */
    protected void unregisterTask() {
        this.taskManager.tell(new TaskMessages.UnregisterTask(this.executionId), ActorRef.noSender());
    }

    /**
     * Logs the state change and sends an {@code UpdateTaskExecutionState} message to the task
     * manager actor.
     *
     * @param executionState the new state to report
     * @param th             the associated error, passed through to the {@code TaskExecutionState}
     */
    protected void notifyExecutionStateChange(ExecutionState executionState, Throwable th) {
        LOG.info("Update execution state of {} ({}) to {}.", new Object[]{getTaskName(), getExecutionId(), executionState});
        final TaskExecutionState update = new TaskExecutionState(this.jobId, this.executionId, executionState, th);
        this.taskManager.tell(new TaskMessages.UpdateTaskExecutionState(update), ActorRef.noSender());
    }

    /**
     * Registers this task with the given profiler.
     *
     * @param taskManagerProfiler the profiler to register with; must not be {@code null}
     * @param configuration       configuration handed to the profiler
     */
    public void registerProfiler(org.apache.flink.runtime.profiling.TaskManagerProfiler taskManagerProfiler, Configuration configuration) {
        taskManagerProfiler.registerTask(this, configuration);
    }

    /**
     * Unregisters this execution attempt from the given profiler.
     *
     * @param taskManagerProfiler the profiler to unregister from; {@code null} is ignored
     */
    public void unregisterProfiler(org.apache.flink.runtime.profiling.TaskManagerProfiler taskManagerProfiler) {
        if (taskManagerProfiler != null) {
            taskManagerProfiler.unregisterTask(this.executionId);
        }
    }

    /** @return the environment's input gates, or {@code null} if no environment is set */
    public SingleInputGate[] getInputGates() {
        // Read the volatile field once so the null check and the call see the same reference.
        final RuntimeEnvironment env = this.environment;
        return env != null ? env.getAllInputGates() : null;
    }

    /** @return the environment's result partition writers, or {@code null} if no environment is set */
    public ResultPartitionWriter[] getWriters() {
        // Read the volatile field once so the null check and the call see the same reference.
        final RuntimeEnvironment env = this.environment;
        return env != null ? env.getAllWriters() : null;
    }

    /** @return the environment's produced partitions, or {@code null} if no environment is set */
    public ResultPartition[] getProducedPartitions() {
        // Read the volatile field once so the null check and the call see the same reference.
        final RuntimeEnvironment env = this.environment;
        return env != null ? env.getProducedPartitions() : null;
    }

    /**
     * Adds an actor that is told about every subsequent state change.
     *
     * @param actorRef the listener to add
     */
    public void registerExecutionListener(ActorRef actorRef) {
        this.executionListenerActors.add(actorRef);
    }

    /**
     * Removes a previously registered listener actor.
     *
     * @param actorRef the listener to remove
     */
    public void unregisterExecutionListener(ActorRef actorRef) {
        this.executionListenerActors.remove(actorRef);
    }

    /**
     * Logs the state switch (at info level) and sends an {@code ExecutionStateChanged} message
     * to every registered listener actor.
     *
     * @param executionState the state that was switched to
     * @param str            optional message appended to the log line and carried in the
     *                       notification; may be {@code null}
     */
    private void notifyObservers(ExecutionState executionState, String str) {
        if (LOG.isInfoEnabled()) {
            LOG.info(getTaskNameWithSubtasks() + " switched to " + executionState + (str == null ? "" : " : " + str));
        }
        for (ActorRef listener : this.executionListenerActors) {
            listener.tell(new ExecutionGraphMessages.ExecutionStateChanged(this.jobId, this.vertexId, this.taskName, this.numberOfSubtasks, this.subtaskIndex, this.executionId, executionState, System.currentTimeMillis(), str), ActorRef.noSender());
        }
    }

    /** @return the task name with subtask information followed by the current state in brackets */
    @Override
    public String toString() {
        return getTaskNameWithSubtasks() + " [" + this.executionState + ']';
    }
}
