/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.lang.management.ThreadInfo;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionInfo;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationRejection;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.ResourceManagerAddress;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.TaskBackPressureResponse;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.runtime.registration.RegistrationConnectionListener;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
import org.apache.flink.runtime.rest.messages.LogInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
import org.apache.flink.runtime.taskexecutor.BackPressureSampleService;
import org.apache.flink.runtime.taskexecutor.EstablishedResourceManagerConnection;
import org.apache.flink.runtime.taskexecutor.FileType;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.JobLeaderListener;
import org.apache.flink.runtime.taskexecutor.JobLeaderService;
import org.apache.flink.runtime.taskexecutor.JobTable;
import org.apache.flink.runtime.taskexecutor.KvStateService;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorHeartbeatPayload;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationRejection;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskManagerException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException;
import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
import org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider;
import org.apache.flink.runtime.taskexecutor.rpc.RpcKvStateRegistryListener;
import org.apache.flink.runtime.taskexecutor.rpc.RpcPartitionStateChecker;
import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier;
import org.apache.flink.runtime.taskexecutor.rpc.RpcTaskOperatorEventGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotActiveException;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlot;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.runtime.util.JvmUtils;
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
import org.apache.flink.types.SerializableOptional;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OptionalConsumer;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.StringUtils;

public class TaskExecutor
extends RpcEndpoint
implements TaskExecutorGateway {
    public static final String TASK_MANAGER_NAME = "taskmanager";
    private final HighAvailabilityServices haServices;
    private final TaskManagerServices taskExecutorServices;
    private final TaskManagerConfiguration taskManagerConfiguration;
    private final FatalErrorHandler fatalErrorHandler;
    private final BlobCacheService blobCacheService;
    private final LibraryCacheManager libraryCacheManager;
    @Nullable
    private final String metricQueryServiceAddress;
    private final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation;
    private final TaskManagerMetricGroup taskManagerMetricGroup;
    private final TaskExecutorLocalStateStoresManager localStateStoresManager;
    private final ExternalResourceInfoProvider externalResourceInfoProvider;
    private final ShuffleEnvironment<?, ?> shuffleEnvironment;
    private final KvStateService kvStateService;
    private final Executor ioExecutor;
    private final TaskSlotTable<Task> taskSlotTable;
    private final JobTable jobTable;
    private final JobLeaderService jobLeaderService;
    private final LeaderRetrievalService resourceManagerLeaderRetriever;
    private final HardwareDescription hardwareDescription;
    private FileCache fileCache;
    private final HeartbeatManager<AllocatedSlotReport, AccumulatorReport> jobManagerHeartbeatManager;
    private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload> resourceManagerHeartbeatManager;
    private final TaskExecutorPartitionTracker partitionTracker;
    private final BackPressureSampleService backPressureSampleService;
    @Nullable
    private ResourceManagerAddress resourceManagerAddress;
    @Nullable
    private EstablishedResourceManagerConnection establishedResourceManagerConnection;
    @Nullable
    private TaskExecutorToResourceManagerConnection resourceManagerConnection;
    @Nullable
    private UUID currentRegistrationTimeoutId;
    private Map<JobID, Collection<CompletableFuture<ExecutionState>>> taskResultPartitionCleanupFuturesPerJob = new HashMap<JobID, Collection<CompletableFuture<ExecutionState>>>(8);

    public TaskExecutor(RpcService rpcService, TaskManagerConfiguration taskManagerConfiguration, HighAvailabilityServices haServices, TaskManagerServices taskExecutorServices, ExternalResourceInfoProvider externalResourceInfoProvider, HeartbeatServices heartbeatServices, TaskManagerMetricGroup taskManagerMetricGroup, @Nullable String metricQueryServiceAddress, BlobCacheService blobCacheService, FatalErrorHandler fatalErrorHandler, TaskExecutorPartitionTracker partitionTracker, BackPressureSampleService backPressureSampleService) {
        super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));
        Preconditions.checkArgument((taskManagerConfiguration.getNumberSlots() > 0 ? 1 : 0) != 0, (Object)"The number of slots has to be larger than 0.");
        this.taskManagerConfiguration = (TaskManagerConfiguration)Preconditions.checkNotNull((Object)taskManagerConfiguration);
        this.taskExecutorServices = (TaskManagerServices)Preconditions.checkNotNull((Object)taskExecutorServices);
        this.haServices = (HighAvailabilityServices)Preconditions.checkNotNull((Object)haServices);
        this.fatalErrorHandler = (FatalErrorHandler)Preconditions.checkNotNull((Object)fatalErrorHandler);
        this.partitionTracker = partitionTracker;
        this.taskManagerMetricGroup = (TaskManagerMetricGroup)Preconditions.checkNotNull((Object)taskManagerMetricGroup);
        this.blobCacheService = (BlobCacheService)Preconditions.checkNotNull((Object)blobCacheService);
        this.metricQueryServiceAddress = metricQueryServiceAddress;
        this.backPressureSampleService = (BackPressureSampleService)Preconditions.checkNotNull((Object)backPressureSampleService);
        this.externalResourceInfoProvider = (ExternalResourceInfoProvider)Preconditions.checkNotNull((Object)externalResourceInfoProvider);
        this.libraryCacheManager = taskExecutorServices.getLibraryCacheManager();
        this.taskSlotTable = taskExecutorServices.getTaskSlotTable();
        this.jobTable = taskExecutorServices.getJobTable();
        this.jobLeaderService = taskExecutorServices.getJobLeaderService();
        this.unresolvedTaskManagerLocation = taskExecutorServices.getUnresolvedTaskManagerLocation();
        this.localStateStoresManager = taskExecutorServices.getTaskManagerStateStore();
        this.shuffleEnvironment = taskExecutorServices.getShuffleEnvironment();
        this.kvStateService = taskExecutorServices.getKvStateService();
        this.ioExecutor = taskExecutorServices.getIOExecutor();
        this.resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
        this.hardwareDescription = HardwareDescription.extractFromSystem(taskExecutorServices.getManagedMemorySize());
        this.resourceManagerAddress = null;
        this.resourceManagerConnection = null;
        this.currentRegistrationTimeoutId = null;
        ResourceID resourceId = taskExecutorServices.getUnresolvedTaskManagerLocation().getResourceID();
        this.jobManagerHeartbeatManager = this.createJobManagerHeartbeatManager(heartbeatServices, resourceId);
        this.resourceManagerHeartbeatManager = this.createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
    }

    private HeartbeatManager<Void, TaskExecutorHeartbeatPayload> createResourceManagerHeartbeatManager(HeartbeatServices heartbeatServices, ResourceID resourceId) {
        return heartbeatServices.createHeartbeatManager(resourceId, new ResourceManagerHeartbeatListener(), this.getMainThreadExecutor(), this.log);
    }

    private HeartbeatManager<AllocatedSlotReport, AccumulatorReport> createJobManagerHeartbeatManager(HeartbeatServices heartbeatServices, ResourceID resourceId) {
        return heartbeatServices.createHeartbeatManager(resourceId, new JobManagerHeartbeatListener(), this.getMainThreadExecutor(), this.log);
    }

    @Override
    public CompletableFuture<Boolean> canBeReleased() {
        return CompletableFuture.completedFuture(this.shuffleEnvironment.getPartitionsOccupyingLocalResources().isEmpty());
    }

    @Override
    public CompletableFuture<Collection<LogInfo>> requestLogList(Time timeout) {
        return CompletableFuture.supplyAsync(() -> {
            String logDir = this.taskManagerConfiguration.getTaskManagerLogDir();
            if (logDir != null) {
                File[] logFiles = new File(logDir).listFiles();
                if (logFiles == null) {
                    throw new CompletionException(new FlinkException(String.format("There isn't a log file in TaskExecutor\u2019s log dir %s.", logDir)));
                }
                return Arrays.stream(logFiles).filter(File::isFile).map(logFile -> new LogInfo(logFile.getName(), logFile.length())).collect(Collectors.toList());
            }
            return Collections.emptyList();
        }, this.ioExecutor);
    }

    @Override
    public void onStart() throws Exception {
        try {
            this.startTaskExecutorServices();
        }
        catch (Throwable t) {
            TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", this.getAddress()), t);
            this.onFatalError(exception);
            throw exception;
        }
        this.startRegistrationTimeout();
    }

    private void startTaskExecutorServices() throws Exception {
        try {
            this.resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
            this.taskSlotTable.start(new SlotActionsImpl(), this.getMainThreadExecutor());
            this.jobLeaderService.start(this.getAddress(), this.getRpcService(), this.haServices, new JobLeaderListenerImpl());
            this.fileCache = new FileCache(this.taskManagerConfiguration.getTmpDirectories(), this.blobCacheService.getPermanentBlobService());
        }
        catch (Exception e) {
            this.handleStartTaskExecutorServicesException(e);
        }
    }

    private void handleStartTaskExecutorServicesException(Exception e) throws Exception {
        try {
            this.stopTaskExecutorServices();
        }
        catch (Exception inner) {
            e.addSuppressed(inner);
        }
        throw e;
    }

    @Override
    public CompletableFuture<Void> onStop() {
        this.log.info("Stopping TaskExecutor {}.", (Object)this.getAddress());
        Throwable jobManagerDisconnectThrowable = null;
        FlinkException cause = new FlinkException("The TaskExecutor is shutting down.");
        this.closeResourceManagerConnection((Exception)((Object)cause));
        for (JobTable.Job job : this.jobTable.getJobs()) {
            try {
                this.closeJob(job, (Exception)((Object)cause));
            }
            catch (Throwable t) {
                jobManagerDisconnectThrowable = ExceptionUtils.firstOrSuppressed((Throwable)t, jobManagerDisconnectThrowable);
            }
        }
        Preconditions.checkState((boolean)this.jobTable.isEmpty());
        Throwable throwableBeforeTasksCompletion = jobManagerDisconnectThrowable;
        return FutureUtils.runAfterwards(this.taskSlotTable.closeAsync(), this::stopTaskExecutorServices).handle((ignored, throwable) -> {
            this.handleOnStopException(throwableBeforeTasksCompletion, (Throwable)throwable);
            return null;
        });
    }

    private void handleOnStopException(Throwable throwableBeforeTasksCompletion, Throwable throwableAfterTasksCompletion) {
        Throwable throwable = throwableBeforeTasksCompletion != null ? ExceptionUtils.firstOrSuppressed((Throwable)throwableBeforeTasksCompletion, (Throwable)throwableAfterTasksCompletion) : throwableAfterTasksCompletion;
        if (throwable != null) {
            throw new CompletionException(new FlinkException("Error while shutting the TaskExecutor down.", throwable));
        }
        this.log.info("Stopped TaskExecutor {}.", (Object)this.getAddress());
    }

    private void stopTaskExecutorServices() throws Exception {
        Exception exception = null;
        try {
            this.jobLeaderService.stop();
        }
        catch (Exception e) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, exception);
        }
        try {
            this.resourceManagerLeaderRetriever.stop();
        }
        catch (Exception e) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
        }
        try {
            this.taskExecutorServices.shutDown();
        }
        catch (Exception e) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
        }
        try {
            this.fileCache.shutdown();
        }
        catch (Exception e) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
        }
        this.taskManagerMetricGroup.close();
        ExceptionUtils.tryRethrowException((Exception)exception);
    }

    @Override
    public CompletableFuture<TaskBackPressureResponse> requestTaskBackPressure(ExecutionAttemptID executionAttemptId, int requestId, @RpcTimeout Time timeout) {
        Task task = this.taskSlotTable.getTask(executionAttemptId);
        if (task == null) {
            return FutureUtils.completedExceptionally(new IllegalStateException(String.format("Cannot request back pressure of task %s. Task is not known to the task manager.", new Object[]{executionAttemptId})));
        }
        CompletableFuture<Double> backPressureRatioFuture = this.backPressureSampleService.sampleTaskBackPressure(task);
        return backPressureRatioFuture.thenApply(backPressureRatio -> new TaskBackPressureResponse(requestId, executionAttemptId, (double)backPressureRatio));
    }

    @Override
    public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
        try {
            boolean taskAdded;
            MemoryManager memoryManager;
            TaskInformation taskInformation;
            JobInformation jobInformation;
            JobID jobId = tdd.getJobId();
            ExecutionAttemptID executionAttemptID = tdd.getExecutionAttemptId();
            JobTable.Connection jobManagerConnection = this.jobTable.getConnection(jobId).orElseThrow(() -> {
                String message = "Could not submit task because there is no JobManager associated for the job " + jobId + '.';
                this.log.debug(message);
                return new TaskSubmissionException(message);
            });
            if (!Objects.equals((Object)jobManagerConnection.getJobMasterId(), (Object)jobMasterId)) {
                String message = "Rejecting the task submission because the job manager leader id " + (Object)((Object)jobMasterId) + " does not match the expected job manager leader id " + (Object)((Object)jobManagerConnection.getJobMasterId()) + '.';
                this.log.debug(message);
                throw new TaskSubmissionException(message);
            }
            if (!this.taskSlotTable.tryMarkSlotActive(jobId, tdd.getAllocationId())) {
                String message = "No task slot allocated for job ID " + jobId + " and allocation ID " + (Object)((Object)tdd.getAllocationId()) + '.';
                this.log.debug(message);
                throw new TaskSubmissionException(message);
            }
            try {
                tdd.loadBigData(this.blobCacheService.getPermanentBlobService());
            }
            catch (IOException | ClassNotFoundException e) {
                throw new TaskSubmissionException("Could not re-integrate offloaded TaskDeploymentDescriptor data.", e);
            }
            try {
                jobInformation = (JobInformation)tdd.getSerializedJobInformation().deserializeValue(this.getClass().getClassLoader());
                taskInformation = (TaskInformation)tdd.getSerializedTaskInformation().deserializeValue(this.getClass().getClassLoader());
            }
            catch (IOException | ClassNotFoundException e) {
                throw new TaskSubmissionException("Could not deserialize the job or task information.", e);
            }
            if (!jobId.equals((Object)jobInformation.getJobId())) {
                throw new TaskSubmissionException("Inconsistent job ID information inside TaskDeploymentDescriptor (" + tdd.getJobId() + " vs. " + jobInformation.getJobId() + ")");
            }
            TaskMetricGroup taskMetricGroup = this.taskManagerMetricGroup.addTaskForJob(jobInformation.getJobId(), jobInformation.getJobName(), taskInformation.getJobVertexId(), tdd.getExecutionAttemptId(), taskInformation.getTaskName(), tdd.getSubtaskIndex(), tdd.getAttemptNumber());
            RpcInputSplitProvider inputSplitProvider = new RpcInputSplitProvider(jobManagerConnection.getJobManagerGateway(), taskInformation.getJobVertexId(), tdd.getExecutionAttemptId(), this.taskManagerConfiguration.getTimeout());
            RpcTaskOperatorEventGateway taskOperatorEventGateway = new RpcTaskOperatorEventGateway(jobManagerConnection.getJobManagerGateway(), executionAttemptID, t -> this.runAsync(() -> this.failTask(executionAttemptID, (Throwable)t)));
            TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
            CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
            GlobalAggregateManager aggregateManager = jobManagerConnection.getGlobalAggregateManager();
            LibraryCacheManager.ClassLoaderHandle classLoaderHandle = jobManagerConnection.getClassLoaderHandle();
            ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier();
            PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();
            TaskLocalStateStore localStateStore = this.localStateStoresManager.localStateStoreForSubtask(jobId, tdd.getAllocationId(), taskInformation.getJobVertexId(), tdd.getSubtaskIndex());
            JobManagerTaskRestore taskRestore = tdd.getTaskRestore();
            TaskStateManagerImpl taskStateManager = new TaskStateManagerImpl(jobId, tdd.getExecutionAttemptId(), localStateStore, taskRestore, checkpointResponder);
            try {
                memoryManager = this.taskSlotTable.getTaskMemoryManager(tdd.getAllocationId());
            }
            catch (SlotNotFoundException e) {
                throw new TaskSubmissionException("Could not submit task.", e);
            }
            Task task = new Task(jobInformation, taskInformation, tdd.getExecutionAttemptId(), tdd.getAllocationId(), tdd.getSubtaskIndex(), tdd.getAttemptNumber(), tdd.getProducedPartitions(), tdd.getInputGates(), tdd.getTargetSlotNumber(), memoryManager, this.taskExecutorServices.getIOManager(), this.taskExecutorServices.getShuffleEnvironment(), this.taskExecutorServices.getKvStateService(), this.taskExecutorServices.getBroadcastVariableManager(), this.taskExecutorServices.getTaskEventDispatcher(), this.externalResourceInfoProvider, taskStateManager, taskManagerActions, inputSplitProvider, checkpointResponder, taskOperatorEventGateway, aggregateManager, classLoaderHandle, this.fileCache, this.taskManagerConfiguration, taskMetricGroup, resultPartitionConsumableNotifier, partitionStateChecker, this.getRpcService().getExecutor());
            taskMetricGroup.gauge("isBackPressured", task::isBackPressured);
            this.log.info("Received task {}.", (Object)task.getTaskInfo().getTaskNameWithSubtasks());
            try {
                taskAdded = this.taskSlotTable.addTask(task);
            }
            catch (SlotNotActiveException | SlotNotFoundException e) {
                throw new TaskSubmissionException("Could not submit task.", e);
            }
            if (taskAdded) {
                task.startTaskThread();
                this.setupResultPartitionBookkeeping(tdd.getJobId(), tdd.getProducedPartitions(), task.getTerminationFuture());
                return CompletableFuture.completedFuture(Acknowledge.get());
            }
            String message = "TaskManager already contains a task for id " + (Object)((Object)task.getExecutionId()) + '.';
            this.log.debug(message);
            throw new TaskSubmissionException(message);
        }
        catch (TaskSubmissionException e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

    private void setupResultPartitionBookkeeping(JobID jobId, Collection<ResultPartitionDeploymentDescriptor> producedResultPartitions, CompletableFuture<ExecutionState> terminationFuture) {
        Set partitionsRequiringRelease = this.filterPartitionsRequiringRelease(producedResultPartitions).peek(rpdd -> this.partitionTracker.startTrackingPartition(jobId, TaskExecutorPartitionInfo.from(rpdd))).map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor).map(ShuffleDescriptor::getResultPartitionID).collect(Collectors.toSet());
        CompletionStage taskTerminationWithResourceCleanupFuture = terminationFuture.thenApplyAsync(executionState -> {
            if (executionState != ExecutionState.FINISHED) {
                this.partitionTracker.stopTrackingPartitions(partitionsRequiringRelease);
            }
            return executionState;
        }, (Executor)this.getMainThreadExecutor());
        this.taskResultPartitionCleanupFuturesPerJob.compute(jobId, (arg_0, arg_1) -> TaskExecutor.lambda$setupResultPartitionBookkeeping$9((CompletableFuture)taskTerminationWithResourceCleanupFuture, arg_0, arg_1));
    }

    private Stream<ResultPartitionDeploymentDescriptor> filterPartitionsRequiringRelease(Collection<ResultPartitionDeploymentDescriptor> producedResultPartitions) {
        return producedResultPartitions.stream().filter(d -> d.getPartitionType().isBlocking()).filter(d -> d.getShuffleDescriptor().storesLocalResourcesOn().isPresent());
    }

    @Override
    public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
        Task task = this.taskSlotTable.getTask(executionAttemptID);
        if (task != null) {
            try {
                task.cancelExecution();
                return CompletableFuture.completedFuture(Acknowledge.get());
            }
            catch (Throwable t) {
                return FutureUtils.completedExceptionally(new TaskException("Cannot cancel task for execution " + (Object)((Object)executionAttemptID) + '.', t));
            }
        }
        String message = "Cannot find task to stop for execution " + (Object)((Object)executionAttemptID) + '.';
        this.log.debug(message);
        return FutureUtils.completedExceptionally(new TaskException(message));
    }

    @Override
    public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
        Task task = this.taskSlotTable.getTask(executionAttemptID);
        if (task != null) {
            for (PartitionInfo partitionInfo : partitionInfos) {
                FutureUtils.assertNoException(CompletableFuture.runAsync(() -> {
                    try {
                        if (!this.shuffleEnvironment.updatePartitionInfo(executionAttemptID, partitionInfo)) {
                            this.log.debug("Discard update for input gate partition {} of result {} in task {}. The partition is no longer available.", new Object[]{partitionInfo.getShuffleDescriptor().getResultPartitionID(), partitionInfo.getIntermediateDataSetID(), executionAttemptID});
                        }
                    }
                    catch (IOException | InterruptedException e) {
                        this.log.error("Could not update input data location for task {}. Trying to fail task.", (Object)task.getTaskInfo().getTaskName(), (Object)e);
                        task.failExternally(e);
                    }
                }, this.getRpcService().getExecutor()));
            }
            return CompletableFuture.completedFuture(Acknowledge.get());
        }
        this.log.debug("Discard update for input partitions of task {}. Task is no longer running.", (Object)executionAttemptID);
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    @Override
    public void releaseOrPromotePartitions(JobID jobId, Set<ResultPartitionID> partitionToRelease, Set<ResultPartitionID> partitionsToPromote) {
        try {
            this.partitionTracker.stopTrackingAndReleaseJobPartitions(partitionToRelease);
            this.partitionTracker.promoteJobPartitions(partitionsToPromote);
            this.closeJobManagerConnectionIfNoAllocatedResources(jobId);
        }
        catch (Throwable t) {
            this.onFatalError(t);
        }
    }

    @Override
    public CompletableFuture<Acknowledge> releaseClusterPartitions(Collection<IntermediateDataSetID> dataSetsToRelease, Time timeout) {
        this.partitionTracker.stopTrackingAndReleaseClusterPartitions(dataSetsToRelease);
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    @Override
    public void heartbeatFromJobManager(ResourceID resourceID, AllocatedSlotReport allocatedSlotReport) {
        this.jobManagerHeartbeatManager.requestHeartbeat(resourceID, allocatedSlotReport);
    }

    @Override
    public void heartbeatFromResourceManager(ResourceID resourceID) {
        this.resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
    }

    @Override
    public CompletableFuture<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp, CheckpointOptions checkpointOptions) {
        this.log.debug("Trigger checkpoint {}@{} for {}.", new Object[]{checkpointId, checkpointTimestamp, executionAttemptID});
        CheckpointType checkpointType = checkpointOptions.getCheckpointType();
        if (!(checkpointType.getPostCheckpointAction() != CheckpointType.PostCheckpointAction.TERMINATE || checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
            throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
        }
        Task task = this.taskSlotTable.getTask(executionAttemptID);
        if (task != null) {
            task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }
        String message = "TaskManager received a checkpoint request for unknown task " + (Object)((Object)executionAttemptID) + '.';
        this.log.debug(message);
        return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
    }

    @Override
    public CompletableFuture<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
        this.log.debug("Confirm checkpoint {}@{} for {}.", new Object[]{checkpointId, checkpointTimestamp, executionAttemptID});
        Task task = this.taskSlotTable.getTask(executionAttemptID);
        if (task != null) {
            task.notifyCheckpointComplete(checkpointId);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }
        String message = "TaskManager received a checkpoint confirmation for unknown task " + (Object)((Object)executionAttemptID) + '.';
        this.log.debug(message);
        return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE));
    }

    @Override
    public CompletableFuture<Acknowledge> abortCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
        this.log.debug("Abort checkpoint {}@{} for {}.", new Object[]{checkpointId, checkpointTimestamp, executionAttemptID});
        Task task = this.taskSlotTable.getTask(executionAttemptID);
        if (task != null) {
            task.notifyCheckpointAborted(checkpointId);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }
        String message = "TaskManager received an aborted checkpoint for unknown task " + (Object)((Object)executionAttemptID) + '.';
        this.log.debug(message);
        return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE));
    }

    @Override
    public CompletableFuture<Acknowledge> requestSlot(SlotID slotId, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile, String targetAddress, ResourceManagerId resourceManagerId, Time timeout) {
        JobTable.Job job;
        this.log.info("Receive slot request {} for job {} from resource manager with leader id {}.", new Object[]{allocationId, jobId, resourceManagerId});
        if (!this.isConnectedToResourceManager(resourceManagerId)) {
            String message = String.format("TaskManager is not connected to the resource manager %s.", new Object[]{resourceManagerId});
            this.log.debug(message);
            return FutureUtils.completedExceptionally(new TaskManagerException(message));
        }
        try {
            this.allocateSlot(slotId, jobId, allocationId, resourceProfile);
        }
        catch (SlotAllocationException sae) {
            return FutureUtils.completedExceptionally(sae);
        }
        try {
            job = this.jobTable.getOrCreateJob(jobId, () -> this.registerNewJobAndCreateServices(jobId, targetAddress));
        }
        catch (Exception e) {
            try {
                this.taskSlotTable.freeSlot(allocationId);
            }
            catch (SlotNotFoundException slotNotFoundException) {
                this.onFatalError(slotNotFoundException);
            }
            this.localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
            if (!this.taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
                this.onFatalError(new Exception("Could not free slot " + slotId));
            }
            return FutureUtils.completedExceptionally(new SlotAllocationException("Could not create new job.", e));
        }
        if (job.isConnected()) {
            this.offerSlotsToJobManager(jobId);
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    private TaskExecutorJobServices registerNewJobAndCreateServices(JobID jobId, String targetAddress) throws Exception {
        this.jobLeaderService.addJob(jobId, targetAddress);
        PermanentBlobCache permanentBlobService = this.blobCacheService.getPermanentBlobService();
        permanentBlobService.registerJob(jobId);
        return TaskExecutorJobServices.create(this.libraryCacheManager.registerClassLoaderLease(jobId), () -> permanentBlobService.releaseJob(jobId));
    }

    /*
     * Enabled aggressive block sorting
     */
    private void allocateSlot(SlotID slotId, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile) throws SlotAllocationException {
        if (!this.taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
            if (this.taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) return;
            String message = "The slot " + slotId + " has already been allocated for a different job.";
            this.log.info(message);
            AllocationID allocationID = this.taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
            throw new SlotOccupiedException(message, allocationID, this.taskSlotTable.getOwningJob(allocationID));
        }
        if (this.taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, resourceProfile, this.taskManagerConfiguration.getTimeout())) {
            this.log.info("Allocated slot for {}.", (Object)allocationId);
            return;
        }
        this.log.info("Could not allocate slot for {}.", (Object)allocationId);
        throw new SlotAllocationException("Could not allocate slot.");
    }

    @Override
    public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
        this.freeSlotInternal(allocationId, cause);
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    @Override
    public CompletableFuture<TransientBlobKey> requestFileUploadByType(FileType fileType, Time timeout) {
        String filePath;
        switch (fileType) {
            case LOG: {
                filePath = this.taskManagerConfiguration.getTaskManagerLogPath();
                break;
            }
            case STDOUT: {
                filePath = this.taskManagerConfiguration.getTaskManagerStdoutPath();
                break;
            }
            default: {
                filePath = null;
            }
        }
        return this.requestFileUploadByFilePath(filePath, fileType.toString());
    }

    @Override
    public CompletableFuture<TransientBlobKey> requestFileUploadByName(String fileName, Time timeout) {
        String logDir = this.taskManagerConfiguration.getTaskManagerLogDir();
        String filePath = StringUtils.isNullOrWhitespaceOnly((String)logDir) || StringUtils.isNullOrWhitespaceOnly((String)fileName) ? null : new File(logDir, new File(fileName).getName()).getPath();
        return this.requestFileUploadByFilePath(filePath, fileName);
    }

    @Override
    public CompletableFuture<SerializableOptional<String>> requestMetricQueryServiceAddress(Time timeout) {
        return CompletableFuture.completedFuture(SerializableOptional.ofNullable((Serializable)((Object)this.metricQueryServiceAddress)));
    }

    @Override
    public void disconnectJobManager(JobID jobId, Exception cause) {
        this.jobTable.getConnection(jobId).ifPresent(jobManagerConnection -> this.disconnectAndTryReconnectToJobManager((JobTable.Connection)jobManagerConnection, cause));
    }

    private void disconnectAndTryReconnectToJobManager(JobTable.Connection jobManagerConnection, Exception cause) {
        this.disconnectJobManagerConnection(jobManagerConnection, cause);
        this.jobLeaderService.reconnect(jobManagerConnection.getJobId());
    }

    @Override
    public void disconnectResourceManager(Exception cause) {
        if (this.isRunning()) {
            this.reconnectToResourceManager(cause);
        }
    }

    @Override
    public CompletableFuture<Acknowledge> sendOperatorEventToTask(ExecutionAttemptID executionAttemptID, OperatorID operatorId, SerializedValue<OperatorEvent> evt) {
        this.log.debug("Operator event for {} - {}", (Object)executionAttemptID, (Object)operatorId);
        Task task = this.taskSlotTable.getTask(executionAttemptID);
        if (task == null) {
            return FutureUtils.completedExceptionally((Throwable)((Object)new TaskNotRunningException("Task " + executionAttemptID.toHexString() + " not running on TaskManager")));
        }
        try {
            task.deliverOperatorEvent(operatorId, evt);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }
        catch (Throwable t) {
            ExceptionUtils.rethrowIfFatalError((Throwable)t);
            return FutureUtils.completedExceptionally(t);
        }
    }

    @Override
    public CompletableFuture<ThreadDumpInfo> requestThreadDump(Time timeout) {
        Collection<ThreadInfo> threadDump = JvmUtils.createThreadDump();
        Collection threadInfos = threadDump.stream().map(threadInfo -> ThreadDumpInfo.ThreadInfo.create(threadInfo.getThreadName(), threadInfo.toString())).collect(Collectors.toList());
        return CompletableFuture.completedFuture(ThreadDumpInfo.create(threadInfos));
    }

    private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {
        this.resourceManagerAddress = this.createResourceManagerAddress(newLeaderAddress, newResourceManagerId);
        this.reconnectToResourceManager((Exception)((Object)new FlinkException(String.format("ResourceManager leader changed to new address %s", this.resourceManagerAddress))));
    }

    @Nullable
    private ResourceManagerAddress createResourceManagerAddress(@Nullable String newLeaderAddress, @Nullable ResourceManagerId newResourceManagerId) {
        if (newLeaderAddress == null) {
            return null;
        }
        assert (newResourceManagerId != null);
        return new ResourceManagerAddress(newLeaderAddress, newResourceManagerId);
    }

    private void reconnectToResourceManager(Exception cause) {
        this.closeResourceManagerConnection(cause);
        this.startRegistrationTimeout();
        this.tryConnectToResourceManager();
    }

    private void tryConnectToResourceManager() {
        if (this.resourceManagerAddress != null) {
            this.connectToResourceManager();
        }
    }

    private void connectToResourceManager() {
        assert (this.resourceManagerAddress != null);
        assert (this.establishedResourceManagerConnection == null);
        assert (this.resourceManagerConnection == null);
        this.log.info("Connecting to ResourceManager {}.", (Object)this.resourceManagerAddress);
        TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(this.getAddress(), this.getResourceID(), this.unresolvedTaskManagerLocation.getDataPort(), this.hardwareDescription, this.taskManagerConfiguration.getDefaultSlotResourceProfile(), this.taskManagerConfiguration.getTotalResourceProfile());
        this.resourceManagerConnection = new TaskExecutorToResourceManagerConnection(this.log, this.getRpcService(), this.taskManagerConfiguration.getRetryingRegistrationConfiguration(), this.resourceManagerAddress.getAddress(), this.resourceManagerAddress.getResourceManagerId(), this.getMainThreadExecutor(), new ResourceManagerRegistrationListener(), taskExecutorRegistration);
        this.resourceManagerConnection.start();
    }

    private void establishResourceManagerConnection(final ResourceManagerGateway resourceManagerGateway, ResourceID resourceManagerResourceId, InstanceID taskExecutorRegistrationId, ClusterInformation clusterInformation) {
        CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(this.getResourceID(), taskExecutorRegistrationId, this.taskSlotTable.createSlotReport(this.getResourceID()), this.taskManagerConfiguration.getTimeout());
        slotReportResponseFuture.whenCompleteAsync((acknowledge, throwable) -> {
            if (throwable != null) {
                this.reconnectToResourceManager(new TaskManagerException("Failed to send initial slot report to ResourceManager.", (Throwable)throwable));
            }
        }, (Executor)this.getMainThreadExecutor());
        this.resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<TaskExecutorHeartbeatPayload>(){

            @Override
            public void receiveHeartbeat(ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {
                resourceManagerGateway.heartbeatFromTaskManager(resourceID, heartbeatPayload);
            }

            @Override
            public void requestHeartbeat(ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {
            }
        });
        InetSocketAddress blobServerAddress = new InetSocketAddress(clusterInformation.getBlobServerHostname(), clusterInformation.getBlobServerPort());
        this.blobCacheService.setBlobServerAddress(blobServerAddress);
        this.establishedResourceManagerConnection = new EstablishedResourceManagerConnection(resourceManagerGateway, resourceManagerResourceId, taskExecutorRegistrationId);
        this.stopRegistrationTimeout();
    }

    private void closeResourceManagerConnection(Exception cause) {
        if (this.establishedResourceManagerConnection != null) {
            ResourceID resourceManagerResourceId = this.establishedResourceManagerConnection.getResourceManagerResourceId();
            if (this.log.isDebugEnabled()) {
                this.log.debug("Close ResourceManager connection {}.", (Object)resourceManagerResourceId, (Object)cause);
            } else {
                this.log.info("Close ResourceManager connection {}.", (Object)resourceManagerResourceId);
            }
            this.resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerResourceId);
            ResourceManagerGateway resourceManagerGateway = this.establishedResourceManagerConnection.getResourceManagerGateway();
            resourceManagerGateway.disconnectTaskManager(this.getResourceID(), cause);
            this.establishedResourceManagerConnection = null;
            this.partitionTracker.stopTrackingAndReleaseAllClusterPartitions();
        }
        if (this.resourceManagerConnection != null) {
            if (!this.resourceManagerConnection.isConnected()) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Terminating registration attempts towards ResourceManager {}.", (Object)this.resourceManagerConnection.getTargetAddress(), (Object)cause);
                } else {
                    this.log.info("Terminating registration attempts towards ResourceManager {}.", (Object)this.resourceManagerConnection.getTargetAddress());
                }
            }
            this.resourceManagerConnection.close();
            this.resourceManagerConnection = null;
        }
    }

    private void startRegistrationTimeout() {
        Time maxRegistrationDuration = this.taskManagerConfiguration.getMaxRegistrationDuration();
        if (maxRegistrationDuration != null) {
            UUID newRegistrationTimeoutId;
            this.currentRegistrationTimeoutId = newRegistrationTimeoutId = UUID.randomUUID();
            this.scheduleRunAsync(() -> this.registrationTimeout(newRegistrationTimeoutId), maxRegistrationDuration);
        }
    }

    private void stopRegistrationTimeout() {
        this.currentRegistrationTimeoutId = null;
    }

    private void registrationTimeout(@Nonnull UUID registrationTimeoutId) {
        if (registrationTimeoutId.equals(this.currentRegistrationTimeoutId)) {
            Time maxRegistrationDuration = this.taskManagerConfiguration.getMaxRegistrationDuration();
            this.onFatalError(new RegistrationTimeoutException(String.format("Could not register at the ResourceManager within the specified maximum registration duration %s. This indicates a problem with this instance. Terminating now.", maxRegistrationDuration)));
        }
    }

    private void offerSlotsToJobManager(JobID jobId) {
        this.jobTable.getConnection(jobId).ifPresent(this::internalOfferSlotsToJobManager);
    }

    private void internalOfferSlotsToJobManager(JobTable.Connection jobManagerConnection) {
        JobID jobId = jobManagerConnection.getJobId();
        if (this.taskSlotTable.hasAllocatedSlots(jobId)) {
            this.log.info("Offer reserved slots to the leader of job {}.", (Object)jobId);
            JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();
            Iterator<TaskSlot<Task>> reservedSlotsIterator = this.taskSlotTable.getAllocatedSlots(jobId);
            JobMasterId jobMasterId = jobManagerConnection.getJobMasterId();
            HashSet<SlotOffer> reservedSlots = new HashSet<SlotOffer>(2);
            while (reservedSlotsIterator.hasNext()) {
                SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();
                reservedSlots.add(offer);
            }
            CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(this.getResourceID(), reservedSlots, this.taskManagerConfiguration.getTimeout());
            acceptedSlotsFuture.whenCompleteAsync(this.handleAcceptedSlotOffers(jobId, jobMasterGateway, jobMasterId, reservedSlots), (Executor)this.getMainThreadExecutor());
        } else {
            this.log.debug("There are no unassigned slots for the job {}.", (Object)jobId);
        }
    }

    @Nonnull
    private BiConsumer<Iterable<SlotOffer>, Throwable> handleAcceptedSlotOffers(JobID jobId, JobMasterGateway jobMasterGateway, JobMasterId jobMasterId, Collection<SlotOffer> offeredSlots) {
        return (acceptedSlots, throwable) -> {
            if (throwable != null) {
                if (throwable instanceof TimeoutException) {
                    this.log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering.");
                    this.offerSlotsToJobManager(jobId);
                } else {
                    this.log.warn("Slot offering to JobManager failed. Freeing the slots and returning them to the ResourceManager.", throwable);
                    for (SlotOffer reservedSlot : offeredSlots) {
                        this.freeSlotInternal(reservedSlot.getAllocationId(), (Throwable)throwable);
                    }
                }
            } else if (this.isJobManagerConnectionValid(jobId, jobMasterId)) {
                for (SlotOffer acceptedSlot : acceptedSlots) {
                    try {
                        if (!this.taskSlotTable.markSlotActive(acceptedSlot.getAllocationId())) {
                            String message = "Could not mark slot " + jobId + " active.";
                            this.log.debug(message);
                            jobMasterGateway.failSlot(this.getResourceID(), acceptedSlot.getAllocationId(), (Exception)((Object)new FlinkException(message)));
                        }
                    }
                    catch (SlotNotFoundException e) {
                        String message = "Could not mark slot " + jobId + " active.";
                        jobMasterGateway.failSlot(this.getResourceID(), acceptedSlot.getAllocationId(), (Exception)((Object)new FlinkException(message)));
                    }
                    offeredSlots.remove(acceptedSlot);
                }
                Exception e = new Exception("The slot was rejected by the JobManager.");
                for (SlotOffer rejectedSlot : offeredSlots) {
                    this.freeSlotInternal(rejectedSlot.getAllocationId(), e);
                }
            } else {
                this.log.debug("Discard offer slot response since there is a new leader for the job {}.", (Object)jobId);
            }
        };
    }

    private void establishJobManagerConnection(JobTable.Job job, final JobMasterGateway jobMasterGateway, JMTMRegistrationSuccess registrationSuccess) {
        JobID jobId = job.getJobId();
        Optional<JobTable.Connection> connection = job.asConnection();
        if (connection.isPresent()) {
            JobTable.Connection oldJobManagerConnection = connection.get();
            if (Objects.equals((Object)oldJobManagerConnection.getJobMasterId(), jobMasterGateway.getFencingToken())) {
                this.log.debug("Ignore JobManager gained leadership message for {} because we are already connected to it.", jobMasterGateway.getFencingToken());
                return;
            }
            this.disconnectJobManagerConnection(oldJobManagerConnection, new Exception("Found new job leader for job id " + jobId + '.'));
        }
        this.log.info("Establish JobManager connection for job {}.", (Object)jobId);
        ResourceID jobManagerResourceID = registrationSuccess.getResourceID();
        JobTable.Connection establishedConnection = this.associateWithJobManager(job, jobManagerResourceID, jobMasterGateway);
        this.jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, new HeartbeatTarget<AccumulatorReport>(){

            @Override
            public void receiveHeartbeat(ResourceID resourceID, AccumulatorReport payload) {
                jobMasterGateway.heartbeatFromTaskManager(resourceID, payload);
            }

            @Override
            public void requestHeartbeat(ResourceID resourceID, AccumulatorReport payload) {
            }
        });
        this.internalOfferSlotsToJobManager(establishedConnection);
    }

    private void closeJob(JobTable.Job job, Exception cause) {
        job.asConnection().ifPresent(jobManagerConnection -> this.disconnectJobManagerConnection((JobTable.Connection)jobManagerConnection, cause));
        job.close();
    }

    private void disconnectJobManagerConnection(JobTable.Connection jobManagerConnection, Exception cause) {
        JobID jobId = jobManagerConnection.getJobId();
        if (this.log.isDebugEnabled()) {
            this.log.debug("Close JobManager connection for job {}.", (Object)jobId, (Object)cause);
        } else {
            this.log.info("Close JobManager connection for job {}.", (Object)jobId);
        }
        Iterator<Task> tasks = this.taskSlotTable.getTasks(jobId);
        FlinkException failureCause = new FlinkException("JobManager responsible for " + jobId + " lost the leadership.", (Throwable)cause);
        while (tasks.hasNext()) {
            tasks.next().failExternally(failureCause);
        }
        Set<AllocationID> activeSlotAllocationIDs = this.taskSlotTable.getActiveTaskAllocationIdsPerJob(jobId);
        FlinkException freeingCause = new FlinkException("Slot could not be marked inactive.");
        for (AllocationID activeSlotAllocationID : activeSlotAllocationIDs) {
            try {
                if (this.taskSlotTable.markSlotInactive(activeSlotAllocationID, this.taskManagerConfiguration.getTimeout())) continue;
                this.freeSlotInternal(activeSlotAllocationID, freeingCause);
            }
            catch (SlotNotFoundException e) {
                this.log.debug("Could not mark the slot {} inactive.", (Object)jobId, (Object)e);
            }
        }
        try {
            this.jobManagerHeartbeatManager.unmonitorTarget(jobManagerConnection.getResourceId());
            this.disassociateFromJobManager(jobManagerConnection, cause);
        }
        catch (IOException e) {
            this.log.warn("Could not properly disassociate from JobManager {}.", (Object)jobManagerConnection.getJobManagerGateway().getAddress(), (Object)e);
        }
        jobManagerConnection.disconnect();
    }

    private JobTable.Connection associateWithJobManager(JobTable.Job job, ResourceID resourceID, JobMasterGateway jobMasterGateway) {
        Preconditions.checkNotNull((Object)resourceID);
        Preconditions.checkNotNull((Object)jobMasterGateway);
        TaskManagerActionsImpl taskManagerActions = new TaskManagerActionsImpl(jobMasterGateway);
        RpcCheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway);
        RpcGlobalAggregateManager aggregateManager = new RpcGlobalAggregateManager(jobMasterGateway);
        RpcResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(jobMasterGateway, this.getRpcService().getExecutor(), this.taskManagerConfiguration.getTimeout());
        RpcPartitionStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobMasterGateway);
        this.registerQueryableState(job.getJobId(), jobMasterGateway);
        return job.connect(resourceID, jobMasterGateway, taskManagerActions, checkpointResponder, aggregateManager, resultPartitionConsumableNotifier, partitionStateChecker);
    }

    private void disassociateFromJobManager(JobTable.Connection jobManagerConnection, Exception cause) throws IOException {
        KvStateClientProxy kvStateClientProxy;
        Preconditions.checkNotNull((Object)jobManagerConnection);
        JobID jobId = jobManagerConnection.getJobId();
        this.scheduleResultPartitionCleanup(jobId);
        KvStateRegistry kvStateRegistry = this.kvStateService.getKvStateRegistry();
        if (kvStateRegistry != null) {
            kvStateRegistry.unregisterListener(jobId);
        }
        if ((kvStateClientProxy = this.kvStateService.getKvStateClientProxy()) != null) {
            kvStateClientProxy.updateKvStateLocationOracle(jobManagerConnection.getJobId(), null);
        }
        JobMasterGateway jobManagerGateway = jobManagerConnection.getJobManagerGateway();
        jobManagerGateway.disconnectTaskManager(this.getResourceID(), cause);
    }

    private void handleRejectedJobManagerConnection(JobID jobId, String targetAddress, JMTMRegistrationRejection rejection) {
        this.log.info("The JobManager under {} rejected the registration for job {}: {}. Releasing all job related resources.", new Object[]{targetAddress, jobId, rejection.getReason()});
        this.releaseJobResources(jobId, (Exception)((Object)new FlinkException(String.format("JobManager %s has rejected the registration.", jobId))));
    }

    private void releaseJobResources(JobID jobId, Exception cause) {
        Set<AllocationID> allocationIds;
        this.log.debug("Releasing job resources for job {}.", (Object)jobId, (Object)cause);
        if (this.partitionTracker.isTrackingPartitionsFor(jobId)) {
            this.partitionTracker.stopTrackingAndReleaseJobPartitionsFor(jobId);
        }
        if (!(allocationIds = this.taskSlotTable.getAllocationIdsPerJob(jobId)).isEmpty()) {
            for (AllocationID allocationId : allocationIds) {
                this.freeSlotInternal(allocationId, cause);
            }
        }
        this.jobLeaderService.removeJob(jobId);
        this.jobTable.getJob(jobId).ifPresent(job -> this.closeJob((JobTable.Job)job, cause));
    }

    private void scheduleResultPartitionCleanup(JobID jobId) {
        Collection<CompletableFuture<ExecutionState>> taskTerminationFutures = this.taskResultPartitionCleanupFuturesPerJob.remove(jobId);
        if (taskTerminationFutures != null) {
            FutureUtils.waitForAll(taskTerminationFutures).thenRunAsync(() -> this.partitionTracker.stopTrackingAndReleaseJobPartitionsFor(jobId), this.getMainThreadExecutor());
        }
    }

    private void registerQueryableState(JobID jobId, JobMasterGateway jobMasterGateway) {
        KvStateClientProxy kvStateProxy;
        KvStateServer kvStateServer = this.kvStateService.getKvStateServer();
        KvStateRegistry kvStateRegistry = this.kvStateService.getKvStateRegistry();
        if (kvStateServer != null && kvStateRegistry != null) {
            kvStateRegistry.registerListener(jobId, new RpcKvStateRegistryListener(jobMasterGateway, kvStateServer.getServerAddress()));
        }
        if ((kvStateProxy = this.kvStateService.getKvStateClientProxy()) != null) {
            kvStateProxy.updateKvStateLocationOracle(jobId, jobMasterGateway);
        }
    }

    private void failTask(ExecutionAttemptID executionAttemptID, Throwable cause) {
        Task task = this.taskSlotTable.getTask(executionAttemptID);
        if (task != null) {
            try {
                task.failExternally(cause);
            }
            catch (Throwable t) {
                this.log.error("Could not fail task {}.", (Object)executionAttemptID, (Object)t);
            }
        } else {
            this.log.debug("Cannot find task to fail for execution {}.", (Object)executionAttemptID);
        }
    }

    private void updateTaskExecutionState(JobMasterGateway jobMasterGateway, TaskExecutionState taskExecutionState) {
        ExecutionAttemptID executionAttemptID = taskExecutionState.getID();
        CompletableFuture<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState(taskExecutionState);
        futureAcknowledge.whenCompleteAsync((ack, throwable) -> {
            if (throwable != null) {
                this.failTask(executionAttemptID, (Throwable)throwable);
            }
        }, (Executor)this.getMainThreadExecutor());
    }

    private void unregisterTaskAndNotifyFinalState(JobMasterGateway jobMasterGateway, ExecutionAttemptID executionAttemptID) {
        Task task = this.taskSlotTable.removeTask(executionAttemptID);
        if (task != null) {
            if (!task.getExecutionState().isTerminal()) {
                try {
                    task.failExternally(new IllegalStateException("Task is being remove from TaskManager."));
                }
                catch (Exception e) {
                    this.log.error("Could not properly fail task.", (Throwable)e);
                }
            }
            this.log.info("Un-registering task and sending final execution state {} to JobManager for task {} {}.", new Object[]{task.getExecutionState(), task.getTaskInfo().getTaskNameWithSubtasks(), task.getExecutionId()});
            AccumulatorSnapshot accumulatorSnapshot = task.getAccumulatorRegistry().getSnapshot();
            this.updateTaskExecutionState(jobMasterGateway, new TaskExecutionState(task.getJobID(), task.getExecutionId(), task.getExecutionState(), task.getFailureCause(), accumulatorSnapshot, task.getMetricGroup().getIOMetricGroup().createSnapshot()));
        } else {
            this.log.error("Cannot find task with ID {} to unregister.", (Object)executionAttemptID);
        }
    }

    private void freeSlotInternal(AllocationID allocationId, Throwable cause) {
        Preconditions.checkNotNull((Object)((Object)allocationId));
        this.log.debug("Free slot with allocation id {} because: {}", (Object)allocationId, (Object)cause.getMessage());
        try {
            JobID jobId = this.taskSlotTable.getOwningJob(allocationId);
            int slotIndex = this.taskSlotTable.freeSlot(allocationId, cause);
            if (slotIndex != -1) {
                if (this.isConnectedToResourceManager()) {
                    ResourceManagerGateway resourceManagerGateway = this.establishedResourceManagerConnection.getResourceManagerGateway();
                    resourceManagerGateway.notifySlotAvailable(this.establishedResourceManagerConnection.getTaskExecutorRegistrationId(), new SlotID(this.getResourceID(), slotIndex), allocationId);
                }
                if (jobId != null) {
                    this.closeJobManagerConnectionIfNoAllocatedResources(jobId);
                }
            }
        }
        catch (SlotNotFoundException e) {
            this.log.debug("Could not free slot for allocation id {}.", (Object)allocationId, (Object)e);
        }
        this.localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
    }

    private void closeJobManagerConnectionIfNoAllocatedResources(JobID jobId) {
        if (this.taskSlotTable.getAllocationIdsPerJob(jobId).isEmpty() && !this.partitionTracker.isTrackingPartitionsFor(jobId)) {
            FlinkException cause = new FlinkException("TaskExecutor " + this.getAddress() + " has no more allocated slots for job " + jobId + '.');
            this.releaseJobResources(jobId, (Exception)((Object)cause));
        }
    }

    private void timeoutSlot(AllocationID allocationId, UUID ticket) {
        Preconditions.checkNotNull((Object)((Object)allocationId));
        Preconditions.checkNotNull((Object)ticket);
        if (this.taskSlotTable.isValidTimeout(allocationId, ticket)) {
            this.freeSlotInternal(allocationId, new Exception("The slot " + (Object)((Object)allocationId) + " has timed out."));
        } else {
            this.log.debug("Received an invalid timeout for allocation id {} with ticket {}.", (Object)allocationId, (Object)ticket);
        }
    }

    private void syncSlotsWithSnapshotFromJobMaster(JobMasterGateway jobMasterGateway, AllocatedSlotReport allocatedSlotReport) {
        this.failNoLongerAllocatedSlots(allocatedSlotReport, jobMasterGateway);
        this.freeNoLongerUsedSlots(allocatedSlotReport);
    }

    private void failNoLongerAllocatedSlots(AllocatedSlotReport allocatedSlotReport, JobMasterGateway jobMasterGateway) {
        for (AllocatedSlotInfo allocatedSlotInfo : allocatedSlotReport.getAllocatedSlotInfos()) {
            AllocationID allocationId = allocatedSlotInfo.getAllocationId();
            if (this.taskSlotTable.isAllocated(allocatedSlotInfo.getSlotIndex(), allocatedSlotReport.getJobId(), allocationId)) continue;
            jobMasterGateway.failSlot(this.getResourceID(), allocationId, (Exception)((Object)new FlinkException(String.format("Slot %s on TaskExecutor %s is not allocated by job %s.", allocatedSlotInfo.getSlotIndex(), this.getResourceID(), allocatedSlotReport.getJobId()))));
        }
    }

    private void freeNoLongerUsedSlots(AllocatedSlotReport allocatedSlotReport) {
        Set<AllocationID> activeSlots = this.taskSlotTable.getActiveTaskAllocationIdsPerJob(allocatedSlotReport.getJobId());
        Set reportedSlots = allocatedSlotReport.getAllocatedSlotInfos().stream().map(AllocatedSlotInfo::getAllocationId).collect(Collectors.toSet());
        Sets.SetView difference = Sets.difference(activeSlots, reportedSlots);
        for (AllocationID allocationID : difference) {
            this.freeSlotInternal(allocationID, new FlinkException(String.format("%s is no longer allocated by job %s.", new Object[]{allocationID, allocatedSlotReport.getJobId()})));
        }
    }

    private boolean isConnectedToResourceManager() {
        return this.establishedResourceManagerConnection != null;
    }

    private boolean isConnectedToResourceManager(ResourceManagerId resourceManagerId) {
        return this.establishedResourceManagerConnection != null && this.resourceManagerAddress != null && this.resourceManagerAddress.getResourceManagerId().equals((Object)resourceManagerId);
    }

    private boolean isJobManagerConnectionValid(JobID jobId, JobMasterId jobMasterId) {
        return this.jobTable.getConnection(jobId).map(jmConnection -> Objects.equals((Object)jmConnection.getJobMasterId(), (Object)jobMasterId)).orElse(false);
    }

    private CompletableFuture<TransientBlobKey> requestFileUploadByFilePath(String filePath, String fileTag) {
        this.log.debug("Received file upload request for file {}", (Object)fileTag);
        if (!StringUtils.isNullOrWhitespaceOnly((String)filePath)) {
            return CompletableFuture.supplyAsync(() -> {
                File file = new File(filePath);
                if (file.exists()) {
                    try {
                        return this.putTransientBlobStream(new FileInputStream(file), fileTag).get();
                    }
                    catch (Exception e) {
                        this.log.debug("Could not upload file {}.", (Object)fileTag, (Object)e);
                        throw new CompletionException(new FlinkException("Could not upload file " + fileTag + '.', (Throwable)e));
                    }
                }
                this.log.debug("The file {} does not exist on the TaskExecutor {}.", (Object)fileTag, (Object)this.getResourceID());
                throw new CompletionException(new FlinkException("The file " + fileTag + " does not exist on the TaskExecutor."));
            }, this.ioExecutor);
        }
        this.log.debug("The file {} is unavailable on the TaskExecutor {}.", (Object)fileTag, (Object)this.getResourceID());
        return FutureUtils.completedExceptionally(new FlinkException("The file " + fileTag + " is not available on the TaskExecutor."));
    }

    private CompletableFuture<TransientBlobKey> putTransientBlobStream(InputStream inputStream, String fileTag) {
        TransientBlobKey transientBlobKey;
        TransientBlobCache transientBlobService = this.blobCacheService.getTransientBlobService();
        try {
            transientBlobKey = transientBlobService.putTransient(inputStream);
        }
        catch (IOException e) {
            this.log.debug("Could not upload file {}.", (Object)fileTag, (Object)e);
            return FutureUtils.completedExceptionally(new FlinkException("Could not upload file " + fileTag + '.', (Throwable)e));
        }
        return CompletableFuture.completedFuture(transientBlobKey);
    }

    public ResourceID getResourceID() {
        return this.unresolvedTaskManagerLocation.getResourceID();
    }

    void onFatalError(Throwable t) {
        try {
            this.log.error("Fatal error occurred in TaskExecutor {}.", (Object)this.getAddress(), (Object)t);
        }
        catch (Throwable throwable) {
            // empty catch block
        }
        this.fatalErrorHandler.onFatalError(t);
    }

    @VisibleForTesting
    TaskExecutorToResourceManagerConnection getResourceManagerConnection() {
        return this.resourceManagerConnection;
    }

    @VisibleForTesting
    HeartbeatManager<Void, TaskExecutorHeartbeatPayload> getResourceManagerHeartbeatManager() {
        return this.resourceManagerHeartbeatManager;
    }

    private static /* synthetic */ Collection lambda$setupResultPartitionBookkeeping$9(CompletableFuture taskTerminationWithResourceCleanupFuture, JobID ignored, Collection completableFutures) {
        if (completableFutures == null) {
            completableFutures = new ArrayList<CompletableFuture>(4);
        }
        completableFutures.add(taskTerminationWithResourceCleanupFuture);
        return completableFutures;
    }

    @VisibleForTesting
    static final class TaskExecutorJobServices
    implements JobTable.JobServices {
        private final LibraryCacheManager.ClassLoaderLease classLoaderLease;
        private final Runnable closeHook;

        private TaskExecutorJobServices(LibraryCacheManager.ClassLoaderLease classLoaderLease, Runnable closeHook) {
            this.classLoaderLease = classLoaderLease;
            this.closeHook = closeHook;
        }

        @Override
        public LibraryCacheManager.ClassLoaderHandle getClassLoaderHandle() {
            return this.classLoaderLease;
        }

        @Override
        public void close() {
            this.classLoaderLease.release();
            this.closeHook.run();
        }

        @VisibleForTesting
        static TaskExecutorJobServices create(LibraryCacheManager.ClassLoaderLease classLoaderLease, Runnable closeHook) {
            return new TaskExecutorJobServices(classLoaderLease, closeHook);
        }
    }

    private class ResourceManagerHeartbeatListener
    implements HeartbeatListener<Void, TaskExecutorHeartbeatPayload> {
        private ResourceManagerHeartbeatListener() {
        }

        @Override
        public void notifyHeartbeatTimeout(ResourceID resourceId) {
            TaskExecutor.this.validateRunsInMainThread();
            if (TaskExecutor.this.establishedResourceManagerConnection != null && TaskExecutor.this.establishedResourceManagerConnection.getResourceManagerResourceId().equals(resourceId)) {
                TaskExecutor.this.log.info("The heartbeat of ResourceManager with id {} timed out.", (Object)resourceId);
                TaskExecutor.this.reconnectToResourceManager(new TaskManagerException(String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId)));
            } else {
                TaskExecutor.this.log.debug("Received heartbeat timeout for outdated ResourceManager id {}. Ignoring the timeout.", (Object)resourceId);
            }
        }

        @Override
        public void reportPayload(ResourceID resourceID, Void payload) {
        }

        @Override
        public TaskExecutorHeartbeatPayload retrievePayload(ResourceID resourceID) {
            TaskExecutor.this.validateRunsInMainThread();
            return new TaskExecutorHeartbeatPayload(TaskExecutor.this.taskSlotTable.createSlotReport(TaskExecutor.this.getResourceID()), TaskExecutor.this.partitionTracker.createClusterPartitionReport());
        }
    }

    private class JobManagerHeartbeatListener
    implements HeartbeatListener<AllocatedSlotReport, AccumulatorReport> {
        private JobManagerHeartbeatListener() {
        }

        @Override
        public void notifyHeartbeatTimeout(ResourceID resourceID) {
            TaskExecutor.this.validateRunsInMainThread();
            TaskExecutor.this.log.info("The heartbeat of JobManager with id {} timed out.", (Object)resourceID);
            TaskExecutor.this.jobTable.getConnection(resourceID).ifPresent(jobManagerConnection -> TaskExecutor.this.disconnectAndTryReconnectToJobManager(jobManagerConnection, new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out.")));
        }

        @Override
        public void reportPayload(ResourceID resourceID, AllocatedSlotReport allocatedSlotReport) {
            TaskExecutor.this.validateRunsInMainThread();
            OptionalConsumer.of(TaskExecutor.this.jobTable.getConnection(allocatedSlotReport.getJobId())).ifPresent(jobManagerConnection -> TaskExecutor.this.syncSlotsWithSnapshotFromJobMaster(jobManagerConnection.getJobManagerGateway(), allocatedSlotReport)).ifNotPresent(() -> TaskExecutor.this.log.debug("Ignoring allocated slot report from job {} because there is no active leader.", (Object)allocatedSlotReport.getJobId()));
        }

        @Override
        public AccumulatorReport retrievePayload(ResourceID resourceID) {
            TaskExecutor.this.validateRunsInMainThread();
            return TaskExecutor.this.jobTable.getConnection(resourceID).map(jobManagerConnection -> {
                JobID jobId = jobManagerConnection.getJobId();
                ArrayList<AccumulatorSnapshot> accumulatorSnapshots = new ArrayList<AccumulatorSnapshot>(16);
                Iterator allTasks = TaskExecutor.this.taskSlotTable.getTasks(jobId);
                while (allTasks.hasNext()) {
                    Task task = (Task)allTasks.next();
                    accumulatorSnapshots.add(task.getAccumulatorRegistry().getSnapshot());
                }
                return new AccumulatorReport(accumulatorSnapshots);
            }).orElseGet(() -> new AccumulatorReport(Collections.emptyList()));
        }
    }

    private class SlotActionsImpl
    implements SlotActions {
        private SlotActionsImpl() {
        }

        @Override
        public void freeSlot(AllocationID allocationId) {
            TaskExecutor.this.runAsync(() -> TaskExecutor.this.freeSlotInternal(allocationId, new FlinkException("TaskSlotTable requested freeing the TaskSlot " + (Object)((Object)allocationId) + '.')));
        }

        @Override
        public void timeoutSlot(AllocationID allocationId, UUID ticket) {
            TaskExecutor.this.runAsync(() -> TaskExecutor.this.timeoutSlot(allocationId, ticket));
        }
    }

    private final class TaskManagerActionsImpl
    implements TaskManagerActions {
        private final JobMasterGateway jobMasterGateway;

        private TaskManagerActionsImpl(JobMasterGateway jobMasterGateway) {
            this.jobMasterGateway = (JobMasterGateway)Preconditions.checkNotNull((Object)jobMasterGateway);
        }

        @Override
        public void notifyFatalError(String message, Throwable cause) {
            try {
                TaskExecutor.this.log.error(message, cause);
            }
            catch (Throwable throwable) {
                // empty catch block
            }
            TaskExecutor.this.fatalErrorHandler.onFatalError(cause);
        }

        @Override
        public void failTask(ExecutionAttemptID executionAttemptID, Throwable cause) {
            TaskExecutor.this.runAsync(() -> TaskExecutor.this.failTask(executionAttemptID, cause));
        }

        @Override
        public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
            if (taskExecutionState.getExecutionState().isTerminal()) {
                TaskExecutor.this.runAsync(() -> TaskExecutor.this.unregisterTaskAndNotifyFinalState(this.jobMasterGateway, taskExecutionState.getID()));
            } else {
                TaskExecutor.this.updateTaskExecutionState(this.jobMasterGateway, taskExecutionState);
            }
        }
    }

    private final class ResourceManagerRegistrationListener
    implements RegistrationConnectionListener<TaskExecutorToResourceManagerConnection, TaskExecutorRegistrationSuccess, TaskExecutorRegistrationRejection> {
        private ResourceManagerRegistrationListener() {
        }

        @Override
        public void onRegistrationSuccess(TaskExecutorToResourceManagerConnection connection, TaskExecutorRegistrationSuccess success) {
            ResourceID resourceManagerId = success.getResourceManagerId();
            InstanceID taskExecutorRegistrationId = success.getRegistrationId();
            ClusterInformation clusterInformation = success.getClusterInformation();
            ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)connection.getTargetGateway();
            TaskExecutor.this.runAsync(() -> {
                if (TaskExecutor.this.resourceManagerConnection == connection) {
                    try {
                        TaskExecutor.this.establishResourceManagerConnection(resourceManagerGateway, resourceManagerId, taskExecutorRegistrationId, clusterInformation);
                    }
                    catch (Throwable t) {
                        TaskExecutor.this.log.error("Establishing Resource Manager connection in Task Executor failed", t);
                    }
                }
            });
        }

        @Override
        public void onRegistrationFailure(Throwable failure) {
            TaskExecutor.this.onFatalError(failure);
        }

        @Override
        public void onRegistrationRejection(String targetAddress, TaskExecutorRegistrationRejection rejection) {
            TaskExecutor.this.onFatalError(new FlinkException(String.format("The TaskExecutor's registration at the ResourceManager %s has been rejected: %s", targetAddress, rejection)));
        }
    }

    private final class JobLeaderListenerImpl
    implements JobLeaderListener {
        private JobLeaderListenerImpl() {
        }

        @Override
        public void jobManagerGainedLeadership(JobID jobId, JobMasterGateway jobManagerGateway, JMTMRegistrationSuccess registrationMessage) {
            TaskExecutor.this.runAsync(() -> TaskExecutor.this.jobTable.getJob(jobId).ifPresent(job -> TaskExecutor.this.establishJobManagerConnection(job, jobManagerGateway, registrationMessage)));
        }

        @Override
        public void jobManagerLostLeadership(JobID jobId, JobMasterId jobMasterId) {
            TaskExecutor.this.log.info("JobManager for job {} with leader id {} lost leadership.", (Object)jobId, (Object)jobMasterId);
            TaskExecutor.this.runAsync(() -> TaskExecutor.this.jobTable.getConnection(jobId).ifPresent(jobManagerConnection -> TaskExecutor.this.disconnectJobManagerConnection(jobManagerConnection, new Exception("Job leader for job id " + jobId + " lost leadership."))));
        }

        @Override
        public void handleError(Throwable throwable) {
            TaskExecutor.this.onFatalError(throwable);
        }

        @Override
        public void jobManagerRejectedRegistration(JobID jobId, String targetAddress, JMTMRegistrationRejection rejection) {
            TaskExecutor.this.runAsync(() -> TaskExecutor.this.handleRejectedJobManagerConnection(jobId, targetAddress, rejection));
        }
    }

    private final class ResourceManagerLeaderListener
    implements LeaderRetrievalListener {
        private ResourceManagerLeaderListener() {
        }

        @Override
        public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
            TaskExecutor.this.runAsync(() -> TaskExecutor.this.notifyOfNewResourceManagerLeader(leaderAddress, ResourceManagerId.fromUuidOrNull(leaderSessionID)));
        }

        @Override
        public void handleError(Exception exception) {
            TaskExecutor.this.onFatalError(exception);
        }
    }
}

