/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskmanager;

import akka.actor.ActorRef;
import akka.util.Timeout;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StateUtils;
import org.apache.flink.runtime.taskmanager.RuntimeEnvironment;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskInputSplitProvider;
import org.apache.flink.runtime.util.SerializedValue;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;

public class Task
implements Runnable {
    /** Logger shared by all task instances. */
    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
    /** Thread group that the executing threads of all tasks are created in. */
    private static final ThreadGroup TASK_THREADS_GROUP = new ThreadGroup("Flink Task Threads");
    /** Updater used for all compare-and-set transitions of the {@code executionState} field. */
    private static final AtomicReferenceFieldUpdater<Task, ExecutionState> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Task.class, ExecutionState.class, "executionState");
    // ---- Immutable task identity and configuration, copied from the deployment descriptor ----
    private final JobID jobId;
    private final JobVertexID vertexId;
    private final ExecutionAttemptID executionId;
    /** Zero-based index of this subtask; always smaller than {@code parallelism}. */
    private final int subtaskIndex;
    /** Total number of parallel subtasks; always greater than zero. */
    private final int parallelism;
    private final String taskName;
    /** Task name in the form produced by {@code getTaskNameWithSubtask}, e.g. {@code "name (1/4)"}. */
    private final String taskNameWithSubtask;
    private final Configuration jobConfiguration;
    private final Configuration taskConfiguration;
    /** Keys of the JAR files registered at the library cache before the user code is loaded. */
    private final List<BlobKey> requiredJarFiles;
    /** Fully qualified name of the invokable class that is loaded and instantiated in {@code run()}. */
    private final String nameOfInvokableClass;
    // ---- Services handed in through the constructor ----
    private final MemoryManager memoryManager;
    private final IOManager ioManager;
    private final BroadcastVariableManager broadcastVariableManager;
    // ---- Network data structures created in the constructor ----
    /** One result partition per produced-partition descriptor. */
    private final ResultPartition[] producedPartitions;
    /** One writer per produced partition; {@code writers[i]} wraps {@code producedPartitions[i]}. */
    private final ResultPartitionWriter[] writers;
    /** One input gate per input-gate descriptor. */
    private final SingleInputGate[] inputGates;
    /** The same input gates as in {@code inputGates}, keyed by the ID of the result they consume. */
    private final Map<IntermediateDataSetID, SingleInputGate> inputGatesById;
    // ---- Actors and messaging ----
    private final ActorRef taskManager;
    private final ActorRef jobManager;
    /** Listener actors that receive a message on every observed state change. */
    private final List<ActorRef> executionListenerActors;
    private final Timeout actorAskTimeout;
    private final LibraryCacheManager libraryCache;
    private final FileCache fileCache;
    private final NetworkEnvironment network;
    /** The thread that executes this task's {@code run()} method. */
    private final Thread executingThread;
    /** Flag that ensures cancellation of the invokable is triggered at most once. */
    private final AtomicBoolean invokableHasBeenCanceled;
    /** The user code object; {@code null} until {@code run()} has finished setting it up. */
    private volatile AbstractInvokable invokable;
    /** Current state; starts as CREATED and is only changed through {@code STATE_UPDATER}. */
    private volatile ExecutionState executionState = ExecutionState.CREATED;
    /** The error that caused the task to fail, if any. */
    private volatile Throwable failureCause;
    /** Serialized initial operator state from the deployment descriptor; set to {@code null} in {@code run()} once it has been applied. */
    private volatile SerializedValue<StateHandle<?>> operatorState;
    /**
     * Creates the task from its deployment descriptor and the services it runs against.
     *
     * <p>The constructor validates its arguments, builds the produced result partitions (with
     * their writers) and the input gates, and creates (but does not start) the executing thread.
     *
     * @param tdd                descriptor carrying the task's identity, configuration and I/O layout
     * @param memManager         memory manager used by the task
     * @param ioManager          I/O manager used by the task and its result partitions
     * @param networkEnvironment network environment the partitions and gates are created against
     * @param bcVarManager       broadcast variable manager
     * @param taskManagerActor   actor of the hosting task manager
     * @param jobManagerActor    actor of the job manager
     * @param actorAskTimeout    timeout wrapped into the {@link Timeout} used for actor asks
     * @param libraryCache       library cache that supplies the user code class loader
     * @param fileCache          cache used for distributed cache files
     */
    public Task(
            TaskDeploymentDescriptor tdd,
            MemoryManager memManager,
            IOManager ioManager,
            NetworkEnvironment networkEnvironment,
            BroadcastVariableManager bcVarManager,
            ActorRef taskManagerActor,
            ActorRef jobManagerActor,
            FiniteDuration actorAskTimeout,
            LibraryCacheManager libraryCache,
            FileCache fileCache) {
        // Sanity checks on the subtask coordinates.
        Preconditions.checkArgument(tdd.getNumberOfSubtasks() > 0);
        Preconditions.checkArgument(tdd.getIndexInSubtaskGroup() >= 0);
        Preconditions.checkArgument(tdd.getIndexInSubtaskGroup() < tdd.getNumberOfSubtasks());

        // Identity and configuration taken from the descriptor.
        this.jobId = Preconditions.checkNotNull(tdd.getJobID());
        this.vertexId = Preconditions.checkNotNull(tdd.getVertexID());
        this.executionId = Preconditions.checkNotNull(tdd.getExecutionId());
        this.subtaskIndex = tdd.getIndexInSubtaskGroup();
        this.parallelism = tdd.getNumberOfSubtasks();
        this.taskName = Preconditions.checkNotNull(tdd.getTaskName());
        this.taskNameWithSubtask = Task.getTaskNameWithSubtask(this.taskName, this.subtaskIndex, this.parallelism);
        this.jobConfiguration = Preconditions.checkNotNull(tdd.getJobConfiguration());
        this.taskConfiguration = Preconditions.checkNotNull(tdd.getTaskConfiguration());
        this.requiredJarFiles = Preconditions.checkNotNull(tdd.getRequiredJarFiles());
        this.nameOfInvokableClass = Preconditions.checkNotNull(tdd.getInvokableClassName());
        this.operatorState = tdd.getOperatorState();

        // Services and actors.
        this.memoryManager = Preconditions.checkNotNull(memManager);
        this.ioManager = Preconditions.checkNotNull(ioManager);
        this.broadcastVariableManager = Preconditions.checkNotNull(bcVarManager);
        this.jobManager = Preconditions.checkNotNull(jobManagerActor);
        this.taskManager = Preconditions.checkNotNull(taskManagerActor);
        this.actorAskTimeout = new Timeout(Preconditions.checkNotNull(actorAskTimeout));
        this.libraryCache = Preconditions.checkNotNull(libraryCache);
        this.fileCache = Preconditions.checkNotNull(fileCache);
        this.network = Preconditions.checkNotNull(networkEnvironment);
        this.executionListenerActors = new CopyOnWriteArrayList<ActorRef>();

        // Name that the partitions and gates are created with.
        final String ownerName = Task.getTaskNameWithSubtaskAndID(this.taskName, this.subtaskIndex, this.parallelism, this.executionId);
        final List<ResultPartitionDeploymentDescriptor> partitionDescriptors = tdd.getProducedPartitions();
        final List<InputGateDeploymentDescriptor> gateDescriptors = tdd.getInputGates();

        // Produced result partitions and one writer per partition.
        this.producedPartitions = new ResultPartition[partitionDescriptors.size()];
        this.writers = new ResultPartitionWriter[partitionDescriptors.size()];
        for (int p = 0; p < this.producedPartitions.length; p++) {
            final ResultPartitionDeploymentDescriptor descriptor = partitionDescriptors.get(p);
            final ResultPartitionID id = new ResultPartitionID(descriptor.getPartitionId(), this.executionId);
            final ResultPartition partition = new ResultPartition(
                    ownerName,
                    this.jobId,
                    id,
                    descriptor.getPartitionType(),
                    descriptor.getNumberOfSubpartitions(),
                    networkEnvironment.getPartitionManager(),
                    networkEnvironment.getPartitionConsumableNotifier(),
                    ioManager,
                    networkEnvironment.getDefaultIOMode());
            this.producedPartitions[p] = partition;
            this.writers[p] = new ResultPartitionWriter(partition);
        }

        // Input gates, additionally indexed by the ID of the consumed result.
        this.inputGates = new SingleInputGate[gateDescriptors.size()];
        this.inputGatesById = new HashMap<IntermediateDataSetID, SingleInputGate>();
        for (int g = 0; g < this.inputGates.length; g++) {
            final SingleInputGate inputGate = SingleInputGate.create(ownerName, this.jobId, this.executionId, gateDescriptors.get(g), networkEnvironment);
            this.inputGates[g] = inputGate;
            this.inputGatesById.put(inputGate.getConsumedResultId(), inputGate);
        }

        // The thread is only created here; startTaskThread() starts it.
        this.executingThread = new Thread(TASK_THREADS_GROUP, this, this.taskNameWithSubtask);
        this.invokableHasBeenCanceled = new AtomicBoolean(false);
    }

    /** @return the ID of the job this task belongs to */
    public JobID getJobID() {
        return jobId;
    }

    /** @return the ID of the job vertex this task was created for */
    public JobVertexID getJobVertexId() {
        return vertexId;
    }

    /** @return the ID of this execution attempt */
    public ExecutionAttemptID getExecutionId() {
        return executionId;
    }

    /** @return the zero-based index of this subtask among its parallel siblings */
    public int getIndexInSubtaskGroup() {
        return subtaskIndex;
    }

    /** @return the total number of parallel subtasks (the parallelism) */
    public int getNumberOfSubtasks() {
        return parallelism;
    }

    /** @return the plain task name, without subtask information */
    public String getTaskName() {
        return taskName;
    }

    /** @return the task name including the subtask position, e.g. {@code "name (1/4)"} */
    public String getTaskNameWithSubtasks() {
        return taskNameWithSubtask;
    }

    /** @return the job-wide configuration taken from the deployment descriptor */
    public Configuration getJobConfiguration() {
        return jobConfiguration;
    }

    /** @return the task-specific configuration taken from the deployment descriptor */
    public Configuration getTaskConfiguration() {
        return taskConfiguration;
    }

    /** @return the task's internal array of result partition writers (not a copy) */
    public ResultPartitionWriter[] getAllWriters() {
        return writers;
    }

    /** @return the task's internal array of input gates (not a copy) */
    public SingleInputGate[] getAllInputGates() {
        return inputGates;
    }

    /** @return the task's internal array of produced result partitions (not a copy) */
    public ResultPartition[] getProducedPartitions() {
        return producedPartitions;
    }

    /**
     * Looks up the input gate that consumes the given intermediate result.
     *
     * @param id ID of the consumed intermediate data set
     * @return the matching input gate, or {@code null} if no gate is registered under that ID
     */
    public SingleInputGate getInputGateById(IntermediateDataSetID id) {
        final SingleInputGate gate = inputGatesById.get(id);
        return gate;
    }

    /** @return the thread that runs this task's {@link #run()} method */
    public Thread getExecutingThread() {
        return executingThread;
    }

    /** @return the execution state as currently visible in the volatile state field */
    public ExecutionState getExecutionState() {
        return executionState;
    }

    /**
     * Checks whether the task is being canceled, has been canceled, or has failed.
     *
     * <p>The volatile state is read exactly once, so the answer always reflects a single
     * observed state rather than a mix of several reads taken while another thread
     * transitions the task.
     *
     * @return {@code true} if the observed state is CANCELING, CANCELED or FAILED
     */
    public boolean isCanceledOrFailed() {
        final ExecutionState observed = this.executionState;
        return observed == ExecutionState.CANCELING
                || observed == ExecutionState.CANCELED
                || observed == ExecutionState.FAILED;
    }

    /** @return the recorded failure cause, or {@code null} if none has been recorded */
    public Throwable getFailureCause() {
        return failureCause;
    }

    /** Starts the executing thread, which then runs {@link #run()}. */
    public void startTaskThread() {
        executingThread.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * The task's main method, executed by the executing thread.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Move the state from CREATED to DEPLOYING. If the task was already failed or is being
     *       canceled, only the final-state notification is sent and the method returns.</li>
     *   <li>Obtain the user code class loader and instantiate the invokable.</li>
     *   <li>Register at the network environment and request the distributed cache files.</li>
     *   <li>Build the runtime environment, call {@code registerInputOutput()} and apply any
     *       initial operator state.</li>
     *   <li>Move to RUNNING, notify observers and the task manager, and call {@code invoke()}.</li>
     *   <li>Finish the produced partitions and move to FINISHED.</li>
     * </ol>
     * Any {@code Throwable} on the way moves the task to CANCELED or FAILED in the catch block.
     * The finally block releases the task's resources and sends the final-state notification.
     *
     * <p>NOTE(review): this is decompiler output; the labeled blocks stand in for an if/else
     * chain in the original source.
     */
    @Override
    public void run() {
        block41: {
            block39: {
                block40: {
                    // Initial transition: retry until one of the branches below wins.
                    while (true) {
                        ExecutionState current;
                        if ((current = this.executionState) == ExecutionState.CREATED) {
                            // Normal case: CREATED -> DEPLOYING, then continue after block39.
                            if (!STATE_UPDATER.compareAndSet(this, ExecutionState.CREATED, ExecutionState.DEPLOYING)) continue;
                            break block39;
                        }
                        if (current == ExecutionState.FAILED) {
                            // Failed before the thread got going: only report the final state.
                            this.notifyFinalState();
                            return;
                        }
                        // Any state other than CREATED / FAILED / CANCELING is illegal here.
                        if (current != ExecutionState.CANCELING) break block40;
                        // Canceled before the thread got going: CANCELING -> CANCELED.
                        if (STATE_UPDATER.compareAndSet(this, ExecutionState.CANCELING, ExecutionState.CANCELED)) break;
                    }
                    this.notifyFinalState();
                    return;
                }
                throw new IllegalStateException("Invalid state for beginning of task operation");
            }
            // Declared outside the try so that the finally block can clean them up.
            HashMap<String, Future<Path>> distributedCacheEntries = new HashMap<String, Future<Path>>();
            AbstractInvokable invokable = null;
            try {
                // ---- Load the user code ----
                LOG.info("Loading JAR files for task " + this.taskNameWithSubtask);
                ClassLoader userCodeClassLoader = this.createUserCodeClassloader(this.libraryCache);
                invokable = this.loadAndInstantiateInvokable(userCodeClassLoader, this.nameOfInvokableClass);
                // Cancellation check after the class loading step.
                if (this.isCanceledOrFailed()) {
                    throw new CancelTaskException();
                }
                // ---- Register at the network stack and request distributed cache files ----
                LOG.info("Registering task at network: " + this);
                this.network.registerTask(this);
                try {
                    for (Map.Entry entry : DistributedCache.readFileInfoFromConfig((Configuration)this.jobConfiguration)) {
                        LOG.info("Obtaining local cache file for '" + (String)entry.getKey() + '\'');
                        Future<Path> cp = this.fileCache.createTmpFile((String)entry.getKey(), (DistributedCache.DistributedCacheEntry)entry.getValue(), this.jobId);
                        distributedCacheEntries.put((String)entry.getKey(), cp);
                    }
                }
                catch (Exception e) {
                    throw new Exception("Exception while adding files to distributed cache.", e);
                }
                // Cancellation check after the distributed cache step.
                if (this.isCanceledOrFailed()) {
                    throw new CancelTaskException();
                }
                // ---- Set up the environment and initialize the invokable ----
                TaskInputSplitProvider splitProvider = new TaskInputSplitProvider(this.jobManager, this.jobId, this.vertexId, this.executionId, userCodeClassLoader, this.actorAskTimeout);
                RuntimeEnvironment env = new RuntimeEnvironment(this.jobId, this.vertexId, this.executionId, this.taskName, this.taskNameWithSubtask, this.subtaskIndex, this.parallelism, this.jobConfiguration, this.taskConfiguration, userCodeClassLoader, this.memoryManager, this.ioManager, this.broadcastVariableManager, splitProvider, distributedCacheEntries, this.writers, this.inputGates, this.jobManager);
                invokable.setEnvironment(env);
                try {
                    invokable.registerInputOutput();
                }
                catch (Exception e) {
                    throw new Exception("Call to registerInputOutput() of invokable failed", e);
                }
                // Apply initial operator state, if the descriptor carried any.
                SerializedValue<StateHandle<?>> operatorState = this.operatorState;
                if (operatorState != null) {
                    if (invokable instanceof OperatorStateCarrier) {
                        try {
                            // The state is deserialized with the user code class loader.
                            StateHandle<?> state = operatorState.deserializeValue(userCodeClassLoader);
                            OperatorStateCarrier op = (OperatorStateCarrier)((Object)invokable);
                            StateUtils.setOperatorState(op, state);
                        }
                        catch (Exception e) {
                            throw new RuntimeException("Failed to deserialize state handle and setup initial operator state.", e);
                        }
                    } else {
                        throw new IllegalStateException("Found operator state for a non-stateful task invokable");
                    }
                }
                // Drop both references to the serialized state (presumably so it can be garbage collected).
                operatorState = null;
                this.operatorState = null;
                // Publish the invokable BEFORE switching to RUNNING, so that anyone who
                // observes RUNNING also sees a non-null invokable.
                this.invokable = invokable;
                // ---- Actual execution ----
                // A failed CAS means the state was changed concurrently (cancel / external failure).
                if (!STATE_UPDATER.compareAndSet(this, ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                    throw new CancelTaskException();
                }
                this.notifyObservers(ExecutionState.RUNNING, null);
                this.taskManager.tell((Object)new TaskMessages.UpdateTaskExecutionState(new TaskExecutionState(this.jobId, this.executionId, ExecutionState.RUNNING)), ActorRef.noSender());
                // User code runs with the user code class loader as the context class loader.
                this.executingThread.setContextClassLoader(userCodeClassLoader);
                invokable.invoke();
                // invoke() may return normally although the task was canceled in the meantime.
                if (this.isCanceledOrFailed()) {
                    throw new CancelTaskException();
                }
                // ---- Finalization ----
                for (ResultPartition partition : this.producedPartitions) {
                    if (partition == null) continue;
                    partition.finish();
                }
                if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, ExecutionState.FINISHED)) {
                    this.notifyObservers(ExecutionState.FINISHED, null);
                    break block41;
                }
                // Not RUNNING any more: treat as cancellation.
                throw new CancelTaskException();
            }
            catch (Throwable t) {
                // ---- Exception handling: decide between CANCELED and FAILED ----
                try {
                    ExecutionState current;
                    while (true) {
                        if ((current = this.executionState) == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
                            if (t instanceof CancelTaskException) {
                                // The task code itself signalled cancellation.
                                if (!STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) continue;
                                this.cancelInvokable();
                                this.notifyObservers(ExecutionState.CANCELED, null);
                            } else {
                                // Genuine failure: record the cause and report FAILED.
                                if (!STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) continue;
                                this.failureCause = t;
                                this.cancelInvokable();
                                this.notifyObservers(ExecutionState.FAILED, t);
                            }
                            break block41;
                        }
                        if (current == ExecutionState.CANCELING) {
                            // Cancellation was requested externally; complete it.
                            if (!STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) continue;
                            this.notifyObservers(ExecutionState.CANCELED, null);
                            break block41;
                        }
                        if (current == ExecutionState.FAILED) {
                            // Already failed (e.g. externally); nothing more to do here.
                            break block41;
                        }
                        // Unexpected state: force FAILED and log the state that was found.
                        if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) break;
                    }
                    LOG.error("Unexpected state in Task during an exception: " + (Object)((Object)current));
                }
                catch (Throwable tt) {
                    // The handler itself failed: escalate as a fatal error to the task manager.
                    String message = "FATAL - exception in task exception handler";
                    LOG.error(message, tt);
                    this.notifyFatalError(message, tt);
                }
            }
            finally {
                // ---- Resource cleanup; runs on every path that entered the try block ----
                try {
                    LOG.info("Freeing task resources for " + this.taskNameWithSubtask);
                    this.network.unregisterTask(this);
                    // The invokable is null if loading / instantiation failed.
                    if (invokable != null) {
                        this.memoryManager.releaseAll(invokable);
                    }
                    this.libraryCache.unregisterTask(this.jobId, this.executionId);
                    this.removeCachedFiles(distributedCacheEntries, this.fileCache);
                    this.notifyFinalState();
                }
                catch (Throwable t) {
                    // Cleanup failed: escalate as a fatal error to the task manager.
                    String message = "FATAL - exception in task resource cleanup";
                    LOG.error(message, t);
                    this.notifyFatalError(message, t);
                }
            }
        }
    }

    /**
     * Registers this task at the library cache and fetches the class loader for its job.
     *
     * @param libraryCache the library cache manager to register at
     * @return the user code class loader; never {@code null}
     * @throws Exception if the registration fails or no class loader is available for the job
     */
    private ClassLoader createUserCodeClassloader(LibraryCacheManager libraryCache) throws Exception {
        final long registrationStart = System.currentTimeMillis();
        libraryCache.registerTask(this.jobId, this.executionId, this.requiredJarFiles);
        final Long elapsedMillis = Long.valueOf(System.currentTimeMillis() - registrationStart);
        LOG.debug("Register task {} at library cache manager took {} milliseconds", this.executionId, elapsedMillis);

        final ClassLoader loader = libraryCache.getClassLoader(this.jobId);
        if (loader != null) {
            return loader;
        }
        throw new Exception("No user code classloader available.");
    }

    /**
     * Loads the invokable class by name through the given class loader and creates an instance
     * using its no-argument constructor.
     *
     * @param classLoader the class loader to resolve the class with
     * @param className   fully qualified name of the invokable class
     * @return a new instance of the invokable
     * @throws Exception if the class cannot be loaded, is not an {@code AbstractInvokable}, or
     *                   cannot be instantiated; the original error is attached as the cause
     */
    private AbstractInvokable loadAndInstantiateInvokable(ClassLoader classLoader, String className) throws Exception {
        final Class<? extends AbstractInvokable> taskClass;
        try {
            final Class<?> loaded = Class.forName(className, true, classLoader);
            taskClass = loaded.asSubclass(AbstractInvokable.class);
        }
        catch (Throwable loadError) {
            throw new Exception("Could not load the task's invokable class.", loadError);
        }

        try {
            final AbstractInvokable instance = taskClass.newInstance();
            return instance;
        }
        catch (Throwable instantiationError) {
            throw new Exception("Could not instantiate the task's invokable class.", instantiationError);
        }
    }

    /**
     * Removes all distributed cache files that were requested for this task.
     *
     * <p>This is best-effort cleanup: a failure to delete one file is logged and does not stop
     * the removal of the remaining files, and no exception leaves this method.
     *
     * @param entries   the cache entries requested for this task, keyed by their registered name
     * @param fileCache the file cache that holds the temporary files
     */
    private void removeCachedFiles(Map<String, Future<Path>> entries, FileCache fileCache) {
        try {
            // Only the registered names are needed for deletion.
            for (String name : entries.keySet()) {
                try {
                    fileCache.deleteTmpFile(name, this.jobId);
                }
                catch (Exception e) {
                    LOG.error("Distributed Cache could not remove cached file registered under '" + name + "'.", e);
                }
            }
        }
        catch (Throwable t) {
            // Pass the throwable on so that the cause and stack trace are not lost.
            LOG.error("Error while removing cached local files from distributed cache.", t);
        }
    }

    /** Sends a {@code TaskInFinalState} message for this execution attempt to the task manager actor, without a sender. */
    private void notifyFinalState() {
        final TaskMessages.TaskInFinalState finalStateMessage = new TaskMessages.TaskInFinalState(this.executionId);
        this.taskManager.tell(finalStateMessage, ActorRef.noSender());
    }

    /**
     * Sends a {@code FatalError} message to the task manager actor, without a sender.
     *
     * @param message description of the fatal condition
     * @param cause   the error that triggered it
     */
    private void notifyFatalError(String message, Throwable cause) {
        final TaskManagerMessages.FatalError fatalError = new TaskManagerMessages.FatalError(message, cause);
        this.taskManager.tell(fatalError, ActorRef.noSender());
    }

    /**
     * Requests cancellation of this task by moving it towards the CANCELING state.
     * Has no effect beyond a log line if the task is already canceling or in a terminal state.
     */
    public void cancelExecution() {
        final String logLine = "Attempting to cancel task " + this.taskNameWithSubtask;
        LOG.info(logLine);
        cancelOrFailAndCancelInvokable(ExecutionState.CANCELING, null);
    }

    /**
     * Marks this task as failed from outside the task's own thread.
     * Has no effect beyond a log line if the task is already canceling or in a terminal state.
     *
     * @param cause the error to record and report as the failure cause
     */
    public void failExternally(Throwable cause) {
        final String logLine = "Attempting to fail task externally " + this.taskNameWithSubtask;
        LOG.info(logLine);
        cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause);
    }

    /**
     * Moves the task to the given target state and, if the task code is already running,
     * triggers its cancellation on a separate canceler thread.
     *
     * <p>The transition is a compare-and-set loop over the state observed at each iteration:
     * <ul>
     *   <li>terminal or CANCELING: nothing to do, only logged;</li>
     *   <li>CREATED or DEPLOYING: switch state, record the cause and notify observers;</li>
     *   <li>RUNNING: switch state and, if the invokable has not been canceled yet, record the
     *       cause, notify observers and start the canceler thread;</li>
     *   <li>anything else: {@link IllegalStateException}.</li>
     * </ul>
     *
     * @param targetState the state to move to (callers in this class pass CANCELING or FAILED)
     * @param cause       the failure cause, or {@code null} for a cancellation
     */
    private void cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwable cause) {
        for (;;) {
            final ExecutionState observed = this.executionState;

            if (observed.isTerminal() || observed == ExecutionState.CANCELING) {
                LOG.info("Task " + this.taskNameWithSubtask + " is already in state " + observed);
                return;
            }
            else if (observed == ExecutionState.DEPLOYING || observed == ExecutionState.CREATED) {
                // Task code is not running yet; the executing thread reacts to the new state.
                if (STATE_UPDATER.compareAndSet(this, observed, targetState)) {
                    this.failureCause = cause;
                    this.notifyObservers(targetState, cause);
                    return;
                }
                // Lost the race: fall through and retry with the fresh state.
            }
            else if (observed == ExecutionState.RUNNING) {
                if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, targetState)) {
                    // The flag guarantees the invokable is canceled by at most one party.
                    if (this.invokable != null && this.invokableHasBeenCanceled.compareAndSet(false, true)) {
                        this.failureCause = cause;
                        this.notifyObservers(targetState, cause);
                        LOG.info("Triggering cancellation of task code {} ({}).", this.taskNameWithSubtask, this.executionId);

                        final Runnable cancelWork = new TaskCanceler(LOG, this.invokable, this.executingThread, this.taskNameWithSubtask);
                        final String cancelerName = "Canceler for " + this.taskNameWithSubtask;
                        new Thread(this.executingThread.getThreadGroup(), cancelWork, cancelerName).start();
                    }
                    return;
                }
                // Lost the race: fall through and retry with the fresh state.
            }
            else {
                throw new IllegalStateException("Unexpected task state: " + observed);
            }
        }
    }

    /**
     * Adds an actor to the list of listeners that are messaged on state changes.
     *
     * @param listener the actor to add
     */
    public void registerExecutionListener(ActorRef listener) {
        executionListenerActors.add(listener);
    }

    /**
     * Removes an actor from the list of state change listeners.
     *
     * @param listener the actor to remove
     */
    public void unregisterExecutionListener(ActorRef listener) {
        executionListenerActors.remove(listener);
    }

    /**
     * Logs a state change and sends it to every registered execution listener.
     *
     * @param newState the state the task switched to
     * @param error    the error that accompanies the state change, or {@code null}
     */
    private void notifyObservers(ExecutionState newState, Throwable error) {
        final String transition = this.taskNameWithSubtask + " switched to " + newState;
        if (error != null) {
            LOG.info(transition + " with exception.", error);
        } else {
            LOG.info(transition);
        }

        // One message instance is shared by all listeners.
        final TaskMessages.UpdateTaskExecutionState update =
                new TaskMessages.UpdateTaskExecutionState(new TaskExecutionState(this.jobId, this.executionId, newState, error));
        for (ActorRef observer : this.executionListenerActors) {
            observer.tell(update, ActorRef.noSender());
        }
    }

    /**
     * Asks the task's invokable to trigger a checkpoint. The call into the invokable happens
     * on a separate daemon thread named "Checkpoint Trigger".
     *
     * <p>The request is ignored (debug log) if the task is not RUNNING or has no invokable yet,
     * and rejected (error log) if the invokable is not a {@code CheckpointedOperator}.
     *
     * @param checkpointID        ID of the checkpoint to trigger
     * @param checkpointTimestamp timestamp of the checkpoint
     */
    public void triggerCheckpointBarrier(final long checkpointID, final long checkpointTimestamp) {
        final AbstractInvokable target = this.invokable;
        if (this.executionState != ExecutionState.RUNNING || target == null) {
            LOG.debug("Ignoring request to trigger a checkpoint for non-running task.");
            return;
        }
        if (!(target instanceof CheckpointedOperator)) {
            LOG.error("Task received a checkpoint request, but is not a checkpointing task - " + this.taskNameWithSubtask);
            return;
        }

        final CheckpointedOperator statefulOperator = (CheckpointedOperator) target;
        final String displayName = this.taskNameWithSubtask;
        this.executeAsyncCallRunnable(new Runnable() {

            @Override
            public void run() {
                try {
                    statefulOperator.triggerCheckpoint(checkpointID, checkpointTimestamp);
                }
                catch (Throwable failure) {
                    LOG.error("Error while triggering checkpoint for " + displayName, failure);
                }
            }
        }, "Checkpoint Trigger");
    }

    /**
     * Passes a checkpoint confirmation to the task's invokable. The call into the invokable
     * happens on a separate daemon thread named "Checkpoint Confirmation".
     *
     * <p>The notification is ignored (debug log) if the task is not RUNNING or has no invokable
     * yet, and rejected (error log) if the invokable is not a {@code CheckpointCommittingOperator}.
     *
     * @param checkpointID        ID of the confirmed checkpoint
     * @param checkpointTimestamp timestamp of the confirmed checkpoint
     */
    public void confirmCheckpoint(final long checkpointID, final long checkpointTimestamp) {
        final AbstractInvokable target = this.invokable;
        if (this.executionState != ExecutionState.RUNNING || target == null) {
            LOG.debug("Ignoring checkpoint commit notification for non-running task.");
            return;
        }
        if (!(target instanceof CheckpointCommittingOperator)) {
            LOG.error("Task received a checkpoint commit notification, but is not a checkpoint committing task - " + this.taskNameWithSubtask);
            return;
        }

        final CheckpointCommittingOperator committer = (CheckpointCommittingOperator) target;
        final String displayName = this.taskNameWithSubtask;
        this.executeAsyncCallRunnable(new Runnable() {

            @Override
            public void run() {
                try {
                    committer.confirmCheckpoint(checkpointID, checkpointTimestamp);
                }
                catch (Throwable failure) {
                    LOG.error("Error while confirming checkpoint for " + displayName, failure);
                }
            }
        }, "Checkpoint Confirmation");
    }

    /**
     * Reacts to a state report about the producer of a partition that this task consumes.
     *
     * <p>Only handled while the task is RUNNING. A RUNNING producer re-triggers the partition
     * request; a CANCELED, CANCELING or FAILED producer cancels this task; any other producer
     * state, or an unknown result ID, fails this task.
     *
     * @param resultId       ID of the consumed intermediate result, used to find the input gate
     * @param partitionId    ID of the partition the report is about
     * @param partitionState the reported state of the partition's producer
     * @throws IOException          declared by this method; may come from re-triggering the request
     * @throws InterruptedException declared by this method; may come from re-triggering the request
     */
    public void onPartitionStateUpdate(IntermediateDataSetID resultId, IntermediateResultPartitionID partitionId, ExecutionState partitionState) throws IOException, InterruptedException {
        if (this.executionState != ExecutionState.RUNNING) {
            LOG.debug("Ignoring partition state notification for not running task.");
            return;
        }

        final SingleInputGate gate = this.inputGatesById.get(resultId);
        if (gate == null) {
            this.failExternally(new IllegalStateException("Received partition state for unknown input gate " + resultId + ". This is a bug."));
            return;
        }

        if (partitionState == ExecutionState.RUNNING) {
            gate.retriggerPartitionRequest(partitionId);
            return;
        }

        final boolean producerGone = partitionState == ExecutionState.CANCELED
                || partitionState == ExecutionState.CANCELING
                || partitionState == ExecutionState.FAILED;
        if (producerGone) {
            this.cancelExecution();
        } else {
            this.failExternally(new IllegalStateException("Received unexpected partition state " + partitionState + " for partition request. This is a bug."));
        }
    }

    /**
     * Runs the given action on a freshly created daemon thread.
     *
     * @param runnable the action to run
     * @param callName the name given to the new thread
     */
    private void executeAsyncCallRunnable(Runnable runnable, String callName) {
        final Thread asyncCaller = new Thread(runnable, callName);
        asyncCaller.setDaemon(true);
        asyncCaller.start();
    }

    /**
     * Calls {@code cancel()} on the invokable, unless it has not been set yet or its
     * cancellation was already triggered elsewhere. Errors thrown by {@code cancel()} are
     * logged and not propagated.
     */
    private void cancelInvokable() {
        // Read the volatile field once so the null check and the call use the same reference.
        final AbstractInvokable toCancel = this.invokable;
        if (toCancel != null && this.invokableHasBeenCanceled.compareAndSet(false, true)) {
            try {
                toCancel.cancel();
            }
            catch (Throwable t) {
                LOG.error("Error while canceling task " + this.taskNameWithSubtask, t);
            }
        }
    }

    /**
     * Returns the task name with subtask information followed by the current execution state
     * in square brackets.
     */
    @Override
    public String toString() {
        return this.getTaskNameWithSubtasks() + " [" + this.executionState + ']';
    }

    /**
     * Builds a display name of the form {@code "name (i/n)"}, where {@code i} is the one-based
     * subtask position.
     *
     * @param name        the task name
     * @param subtask     the zero-based subtask index
     * @param numSubtasks the total number of subtasks
     * @return the formatted name
     */
    public static String getTaskNameWithSubtask(String name, int subtask, int numSubtasks) {
        final StringBuilder label = new StringBuilder();
        label.append(name);
        label.append(" (");
        label.append(subtask + 1);
        label.append('/');
        label.append(numSubtasks);
        label.append(')');
        return label.toString();
    }

    /**
     * Builds a display name of the form {@code "name (i/n) (id)"}, where {@code i} is the
     * one-based subtask position and {@code id} is the string form of the execution attempt ID.
     *
     * @param name        the task name
     * @param subtask     the zero-based subtask index
     * @param numSubtasks the total number of subtasks
     * @param id          the execution attempt ID
     * @return the formatted name
     */
    public static String getTaskNameWithSubtaskAndID(String name, int subtask, int numSubtasks, ExecutionAttemptID id) {
        final StringBuilder label = new StringBuilder();
        label.append(name);
        label.append(" (").append(subtask + 1).append('/').append(numSubtasks).append(')');
        label.append(" (").append(id).append(')');
        return label.toString();
    }

    /**
     * Runnable that cancels the task code and then keeps interrupting the executing thread
     * until that thread has terminated.
     */
    private static class TaskCanceler
    implements Runnable {
        /** How long to wait for the executing thread after the first interrupt. */
        private static final long FIRST_JOIN_MILLIS = 10000L;
        /** How long to wait for the executing thread after each repeated interrupt. */
        private static final long REPEATED_JOIN_MILLIS = 5000L;

        private final Logger logger;
        private final AbstractInvokable invokable;
        private final Thread executer;
        private final String taskName;

        public TaskCanceler(Logger logger, AbstractInvokable invokable, Thread executer, String taskName) {
            this.logger = logger;
            this.invokable = invokable;
            this.executer = executer;
            this.taskName = taskName;
        }

        /**
         * Calls {@code cancel()} on the invokable, interrupts the executing thread, and then
         * repeats "log stack, interrupt, wait" for as long as the thread is alive.
         */
        @Override
        public void run() {
            try {
                cancelTaskCode();
                executer.interrupt();
                awaitExecuter(FIRST_JOIN_MILLIS);

                while (executer.isAlive()) {
                    logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}", taskName, currentExecuterStack());
                    executer.interrupt();
                    awaitExecuter(REPEATED_JOIN_MILLIS);
                }
            }
            catch (Throwable t) {
                logger.error("Error in the task canceler", t);
            }
        }

        /** Invokes the user-level cancel hook; errors are logged and do not stop the canceler. */
        private void cancelTaskCode() {
            try {
                invokable.cancel();
            }
            catch (Throwable t) {
                logger.error("Error while canceling the task", t);
            }
        }

        /** Waits up to the given time for the executing thread to terminate. */
        private void awaitExecuter(long millis) {
            try {
                executer.join(millis);
            }
            catch (InterruptedException ignored) {
                // Deliberately ignored: the caller re-checks whether the thread is still alive.
            }
        }

        /** Renders the executing thread's current stack trace, one frame per line. */
        private String currentExecuterStack() {
            final StringBuilder frames = new StringBuilder();
            for (StackTraceElement frame : executer.getStackTrace()) {
                frames.append(frame).append('\n');
            }
            return frames.toString();
        }
    }
}

