/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotPool;
import org.apache.flink.runtime.instance.SlotPoolGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmaster.ExecutionGraphException;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.registration.RegisteredRpcConnection;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

public class JobMaster
extends RpcEndpoint<JobMasterGateway> {
    public static final String JOB_MANAGER_NAME = "jobmanager";
    public static final String ARCHIVE_NAME = "archive";
    private static final AtomicReferenceFieldUpdater<JobMaster, UUID> LEADER_ID_UPDATER = AtomicReferenceFieldUpdater.newUpdater(JobMaster.class, UUID.class, "leaderSessionID");
    private final ResourceID resourceId;
    private final JobGraph jobGraph;
    private final Configuration configuration;
    private final Time rpcTimeout;
    private final HighAvailabilityServices highAvailabilityServices;
    private final BlobLibraryCacheManager libraryCacheManager;
    private final MetricGroup jobManagerMetricGroup;
    private final MetricGroup jobMetricGroup;
    private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager;
    private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
    private final Executor executor;
    private final OnCompletionActions jobCompletionActions;
    private final FatalErrorHandler errorHandler;
    private final ClassLoader userCodeLoader;
    private final ExecutionGraph executionGraph;
    private final SlotPool slotPool;
    private final SlotPoolGateway slotPoolGateway;
    private volatile UUID leaderSessionID;
    private LeaderRetrievalService resourceManagerLeaderRetriever;
    private ResourceManagerConnection resourceManagerConnection;
    private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;

    public JobMaster(RpcService rpcService, ResourceID resourceId, JobGraph jobGraph, Configuration configuration, HighAvailabilityServices highAvailabilityService, HeartbeatServices heartbeatServices, ScheduledExecutorService executor, BlobLibraryCacheManager libraryCacheManager, RestartStrategyFactory restartStrategyFactory, Time rpcAskTimeout, @Nullable JobManagerMetricGroup jobManagerMetricGroup, OnCompletionActions jobCompletionActions, FatalErrorHandler errorHandler, ClassLoader userCodeLoader) throws Exception {
        super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME));
        this.resourceId = (ResourceID)Preconditions.checkNotNull((Object)resourceId);
        this.jobGraph = (JobGraph)Preconditions.checkNotNull((Object)jobGraph);
        this.configuration = (Configuration)Preconditions.checkNotNull((Object)configuration);
        this.rpcTimeout = rpcAskTimeout;
        this.highAvailabilityServices = (HighAvailabilityServices)Preconditions.checkNotNull((Object)highAvailabilityService);
        this.libraryCacheManager = (BlobLibraryCacheManager)Preconditions.checkNotNull((Object)libraryCacheManager);
        this.executor = (Executor)Preconditions.checkNotNull((Object)executor);
        this.jobCompletionActions = (OnCompletionActions)Preconditions.checkNotNull((Object)jobCompletionActions);
        this.errorHandler = (FatalErrorHandler)Preconditions.checkNotNull((Object)errorHandler);
        this.userCodeLoader = (ClassLoader)Preconditions.checkNotNull((Object)userCodeLoader);
        this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(resourceId, new TaskManagerHeartbeatListener(), rpcService.getScheduledExecutor(), this.log);
        this.resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(resourceId, new ResourceManagerHeartbeatListener(), rpcService.getScheduledExecutor(), this.log);
        String jobName = jobGraph.getName();
        JobID jid = jobGraph.getJobID();
        if (jobManagerMetricGroup != null) {
            this.jobManagerMetricGroup = jobManagerMetricGroup;
            this.jobMetricGroup = jobManagerMetricGroup.addJob(jobGraph);
        } else {
            this.jobManagerMetricGroup = new UnregisteredMetricsGroup();
            this.jobMetricGroup = new UnregisteredMetricsGroup();
        }
        this.log.info("Initializing job {} ({}).", (Object)jobName, (Object)jid);
        RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration = ((ExecutionConfig)jobGraph.getSerializedExecutionConfig().deserializeValue(userCodeLoader)).getRestartStrategy();
        RestartStrategy restartStrategy = restartStrategyConfiguration != null ? RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration) : restartStrategyFactory.createRestartStrategy();
        this.log.info("Using restart strategy {} for {} ({}).", new Object[]{restartStrategy, jobName, jid});
        CheckpointRecoveryFactory checkpointRecoveryFactory = this.highAvailabilityServices.getCheckpointRecoveryFactory();
        this.resourceManagerLeaderRetriever = this.highAvailabilityServices.getResourceManagerLeaderRetriever();
        this.slotPool = new SlotPool(rpcService, jobGraph.getJobID());
        this.slotPoolGateway = (SlotPoolGateway)this.slotPool.getSelf();
        this.executionGraph = ExecutionGraphBuilder.buildGraph(null, jobGraph, configuration, executor, executor, this.slotPool.getSlotProvider(), userCodeLoader, checkpointRecoveryFactory, rpcAskTimeout, restartStrategy, this.jobMetricGroup, -1, this.log);
        this.executionGraph.registerJobStatusListener(new JobManagerJobStatusListener());
        this.registeredTaskManagers = new HashMap<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>>(4);
    }

    @Override
    public void start() {
        throw new UnsupportedOperationException("Should never call start() without leader ID");
    }

    public void start(UUID leaderSessionID) throws Exception {
        if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) {
            super.start();
            this.log.info("JobManager started as leader {} for job {}", (Object)leaderSessionID, (Object)this.jobGraph.getJobID());
            ((JobMasterGateway)this.getSelf()).startJobExecution();
        } else {
            this.log.warn("Job already started with leader ID {}, ignoring this start request.", (Object)leaderSessionID);
        }
    }

    @Override
    public void shutDown() throws Exception {
        this.taskManagerHeartbeatManager.stop();
        this.resourceManagerHeartbeatManager.stop();
        ((JobMasterGateway)this.getSelf()).suspendExecution(new Exception("JobManager is shutting down."));
        super.shutDown();
    }

    @RpcMethod
    public void startJobExecution() {
        if (this.leaderSessionID == null) {
            this.log.info("Aborting job startup - JobManager lost leader status");
            return;
        }
        this.log.info("Starting execution of job {} ({})", (Object)this.jobGraph.getName(), (Object)this.jobGraph.getJobID());
        try {
            this.log.debug("Staring SlotPool component");
            this.slotPool.start(this.leaderSessionID, this.getAddress());
        }
        catch (Exception e) {
            this.log.error("Faild to start job {} ({})", new Object[]{this.jobGraph.getName(), this.jobGraph.getJobID(), e});
            this.handleFatalError(new Exception("Could not start job execution: Failed to start the slot pool.", e));
        }
        try {
            this.resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
        }
        catch (Throwable t) {
            this.log.error("Failed to start job {} ({})", new Object[]{this.jobGraph.getName(), this.jobGraph.getJobID(), t});
            this.handleFatalError(new Exception("Could not start job execution: Failed to start leader service for Resource Manager", t));
            return;
        }
        this.executor.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    JobMaster.this.executionGraph.scheduleForExecution();
                }
                catch (Throwable t) {
                    JobMaster.this.executionGraph.failGlobal(t);
                }
            }
        });
    }

    @RpcMethod
    public void suspendExecution(Throwable cause) {
        if (this.leaderSessionID == null) {
            this.log.debug("Job has already been suspended or shutdown.");
            return;
        }
        this.leaderSessionID = null;
        try {
            this.resourceManagerLeaderRetriever.stop();
        }
        catch (Throwable t) {
            this.log.warn("Failed to stop resource manager leader retriever when suspending.", t);
        }
        this.executionGraph.suspend(cause);
        ((StartStoppable)this.getSelf()).stop();
        this.slotPoolGateway.suspend();
        this.closeResourceManagerConnection(new Exception("Execution was suspended.", cause));
    }

    @RpcMethod
    public Acknowledge updateTaskExecutionState(UUID leaderSessionID, TaskExecutionState taskExecutionState) throws Exception {
        Preconditions.checkNotNull((Object)taskExecutionState, (String)"taskExecutionState");
        this.validateLeaderSessionId(leaderSessionID);
        if (this.executionGraph.updateState(taskExecutionState)) {
            return Acknowledge.get();
        }
        throw new ExecutionGraphException("The execution attempt " + (Object)((Object)taskExecutionState.getID()) + " was not found.");
    }

    @RpcMethod
    public SerializedInputSplit requestNextInputSplit(UUID leaderSessionID, JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws Exception {
        this.validateLeaderSessionId(leaderSessionID);
        Execution execution = this.executionGraph.getRegisteredExecutions().get((Object)executionAttempt);
        if (execution == null) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Can not find Execution for attempt {}.", (Object)executionAttempt);
            }
            throw new Exception("Can not find Execution for attempt " + (Object)((Object)executionAttempt));
        }
        ExecutionJobVertex vertex = this.executionGraph.getJobVertex(vertexID);
        if (vertex == null) {
            this.log.error("Cannot find execution vertex for vertex ID {}.", (Object)vertexID);
            throw new Exception("Cannot find execution vertex for vertex ID " + (Object)((Object)vertexID));
        }
        InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
        if (splitAssigner == null) {
            this.log.error("No InputSplitAssigner for vertex ID {}.", (Object)vertexID);
            throw new Exception("No InputSplitAssigner for vertex ID " + (Object)((Object)vertexID));
        }
        SimpleSlot slot = execution.getAssignedResource();
        int taskId = execution.getVertex().getParallelSubtaskIndex();
        String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
        InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
        if (this.log.isDebugEnabled()) {
            this.log.debug("Send next input split {}.", (Object)nextInputSplit);
        }
        try {
            byte[] serializedInputSplit = InstantiationUtil.serializeObject((Object)nextInputSplit);
            return new SerializedInputSplit(serializedInputSplit);
        }
        catch (Exception ex) {
            this.log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), (Object)ex);
            IOException reason = new IOException("Could not serialize the next input split of class " + nextInputSplit.getClass() + ".", ex);
            vertex.fail(reason);
            throw reason;
        }
    }

    @RpcMethod
    public ExecutionState requestPartitionState(UUID leaderSessionID, IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId) throws Exception {
        this.validateLeaderSessionId(leaderSessionID);
        Execution execution = this.executionGraph.getRegisteredExecutions().get((Object)resultPartitionId.getProducerId());
        if (execution != null) {
            return execution.getState();
        }
        IntermediateResult intermediateResult = this.executionGraph.getAllIntermediateResults().get((Object)intermediateResultId);
        if (intermediateResult != null) {
            Execution producerExecution = intermediateResult.getPartitionById(resultPartitionId.getPartitionId()).getProducer().getCurrentExecutionAttempt();
            if (producerExecution.getAttemptId() == resultPartitionId.getProducerId()) {
                return producerExecution.getState();
            }
            throw new PartitionProducerDisposedException(resultPartitionId);
        }
        throw new IllegalArgumentException("Intermediate data set with ID " + (Object)((Object)intermediateResultId) + " not found.");
    }

    @RpcMethod
    public Acknowledge scheduleOrUpdateConsumers(UUID leaderSessionID, ResultPartitionID partitionID) throws Exception {
        this.validateLeaderSessionId(leaderSessionID);
        this.executionGraph.scheduleOrUpdateConsumers(partitionID);
        return Acknowledge.get();
    }

    @RpcMethod
    public void disconnectTaskManager(ResourceID resourceID, Exception cause) {
        this.taskManagerHeartbeatManager.unmonitorTarget(resourceID);
        this.slotPoolGateway.releaseTaskManager(resourceID);
        Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerConnection = this.registeredTaskManagers.remove(resourceID);
        if (taskManagerConnection != null) {
            ((TaskExecutorGateway)taskManagerConnection.f1).disconnectJobManager(this.jobGraph.getJobID(), cause);
        }
    }

    @RpcMethod
    public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, SubtaskState checkpointState) throws CheckpointException {
        final CheckpointCoordinator checkpointCoordinator = this.executionGraph.getCheckpointCoordinator();
        final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
        if (checkpointCoordinator != null) {
            this.getRpcService().execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
                    }
                    catch (Throwable t) {
                        JobMaster.this.log.warn("Error while processing checkpoint acknowledgement message");
                    }
                }
            });
        } else {
            this.log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator", (Object)this.jobGraph.getJobID());
        }
    }

    @RpcMethod
    public void declineCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointID, Throwable reason) {
        final DeclineCheckpoint decline = new DeclineCheckpoint(jobID, executionAttemptID, checkpointID, reason);
        final CheckpointCoordinator checkpointCoordinator = this.executionGraph.getCheckpointCoordinator();
        if (checkpointCoordinator != null) {
            this.getRpcService().execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        checkpointCoordinator.receiveDeclineMessage(decline);
                    }
                    catch (Exception e) {
                        JobMaster.this.log.error("Error in CheckpointCoordinator while processing {}", (Object)decline, (Object)e);
                    }
                }
            });
        } else {
            this.log.error("Received DeclineCheckpoint message for job {} with no CheckpointCoordinator", (Object)this.jobGraph.getJobID());
        }
    }

    @RpcMethod
    public KvStateLocation lookupKvStateLocation(String registrationName) throws Exception {
        KvStateLocationRegistry registry;
        KvStateLocation location;
        if (this.log.isDebugEnabled()) {
            this.log.debug("Lookup key-value state for job {} with registration name {}.", (Object)this.jobGraph.getJobID(), (Object)registrationName);
        }
        if ((location = (registry = this.executionGraph.getKvStateLocationRegistry()).getKvStateLocation(registrationName)) != null) {
            return location;
        }
        throw new UnknownKvStateLocation(registrationName);
    }

    @RpcMethod
    public void notifyKvStateRegistered(JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId, KvStateServerAddress kvStateServerAddress) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Key value state registered for job {} under name {}.", (Object)this.jobGraph.getJobID(), (Object)registrationName);
        }
        try {
            this.executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);
        }
        catch (Exception e) {
            this.log.error("Failed to notify KvStateRegistry about registration {}.", (Object)registrationName);
        }
    }

    @RpcMethod
    public void notifyKvStateUnregistered(JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Key value state unregistered for job {} under name {}.", (Object)this.jobGraph.getJobID(), (Object)registrationName);
        }
        try {
            this.executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(jobVertexId, keyGroupRange, registrationName);
        }
        catch (Exception e) {
            this.log.error("Failed to notify KvStateRegistry about registration {}.", (Object)registrationName);
        }
    }

    @RpcMethod
    public ClassloadingProps requestClassloadingProps() throws Exception {
        return new ClassloadingProps(this.libraryCacheManager.getBlobServerPort(), this.executionGraph.getRequiredJarFiles(), this.executionGraph.getRequiredClasspaths());
    }

    @RpcMethod
    public Future<Iterable<SlotOffer>> offerSlots(ResourceID taskManagerId, Iterable<SlotOffer> slots, UUID leaderId) throws Exception {
        this.validateLeaderSessionId(leaderId);
        Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = this.registeredTaskManagers.get(taskManagerId);
        if (taskManager == null) {
            throw new Exception("Unknown TaskManager " + taskManagerId);
        }
        JobID jid = this.jobGraph.getJobID();
        TaskManagerLocation taskManagerLocation = (TaskManagerLocation)taskManager.f0;
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)taskManager.f1;
        ArrayList<Tuple2<AllocatedSlot, SlotOffer>> slotsAndOffers = new ArrayList<Tuple2<AllocatedSlot, SlotOffer>>();
        RpcTaskManagerGateway rpcTaskManagerGateway = new RpcTaskManagerGateway(taskExecutorGateway, leaderId);
        for (SlotOffer slotOffer : slots) {
            AllocatedSlot slot = new AllocatedSlot(slotOffer.getAllocationId(), jid, taskManagerLocation, slotOffer.getSlotIndex(), slotOffer.getResourceProfile(), rpcTaskManagerGateway);
            slotsAndOffers.add((Tuple2<AllocatedSlot, SlotOffer>)new Tuple2((Object)slot, (Object)slotOffer));
        }
        return this.slotPoolGateway.offerSlots(slotsAndOffers);
    }

    @RpcMethod
    public void failSlot(ResourceID taskManagerId, AllocationID allocationId, UUID leaderId, Exception cause) throws Exception {
        this.validateLeaderSessionId(this.leaderSessionID);
        if (!this.registeredTaskManagers.containsKey(taskManagerId)) {
            throw new Exception("Unknown TaskManager " + taskManagerId);
        }
        this.slotPoolGateway.failAllocation(allocationId, cause);
    }

    @RpcMethod
    public Future<RegistrationResponse> registerTaskManager(final String taskManagerRpcAddress, final TaskManagerLocation taskManagerLocation, final UUID leaderId) throws Exception {
        if (!this.leaderSessionID.equals(leaderId)) {
            this.log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did not equal the received leader session ID {}.", new Object[]{taskManagerLocation.getResourceID(), taskManagerRpcAddress, this.leaderSessionID, leaderId});
            throw new Exception("Leader id not match, expected: " + this.leaderSessionID + ", actual: " + leaderId);
        }
        final ResourceID taskManagerId = taskManagerLocation.getResourceID();
        if (this.registeredTaskManagers.containsKey(taskManagerId)) {
            JMTMRegistrationSuccess response = new JMTMRegistrationSuccess(this.resourceId, this.libraryCacheManager.getBlobServerPort());
            return FlinkCompletableFuture.completed(response);
        }
        return this.getRpcService().execute(new Callable<TaskExecutorGateway>(){

            @Override
            public TaskExecutorGateway call() throws Exception {
                return JobMaster.this.getRpcService().connect(taskManagerRpcAddress, TaskExecutorGateway.class).get(JobMaster.this.rpcTimeout.getSize(), JobMaster.this.rpcTimeout.getUnit());
            }
        }).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>(){

            @Override
            public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
                if (throwable != null) {
                    return new RegistrationResponse.Decline(throwable.getMessage());
                }
                if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
                    JobMaster.this.log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did not equal the received leader session ID {}.", new Object[]{taskManagerId, taskManagerRpcAddress, JobMaster.this.leaderSessionID, leaderId});
                    return new RegistrationResponse.Decline("Invalid leader session id");
                }
                JobMaster.this.slotPoolGateway.registerTaskManager(taskManagerId);
                JobMaster.this.registeredTaskManagers.put(taskManagerId, Tuple2.of((Object)taskManagerLocation, (Object)taskExecutorGateway));
                JobMaster.this.taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>(){

                    @Override
                    public void receiveHeartbeat(ResourceID resourceID, Void payload) {
                    }

                    @Override
                    public void requestHeartbeat(ResourceID resourceID, Void payload) {
                        taskExecutorGateway.heartbeatFromJobManager(resourceID);
                    }
                });
                return new JMTMRegistrationSuccess(JobMaster.this.resourceId, JobMaster.this.libraryCacheManager.getBlobServerPort());
            }
        }, this.getMainThreadExecutor());
    }

    @RpcMethod
    public void disconnectResourceManager(UUID jobManagerLeaderId, UUID resourceManagerLeaderId, Exception cause) throws Exception {
        this.validateLeaderSessionId(jobManagerLeaderId);
        if (this.resourceManagerConnection != null && this.resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderId)) {
            this.closeResourceManagerConnection(cause);
        }
    }

    @RpcMethod
    public void heartbeatFromTaskManager(ResourceID resourceID) {
        this.taskManagerHeartbeatManager.receiveHeartbeat(resourceID, null);
    }

    @RpcMethod
    public void heartbeatFromResourceManager(ResourceID resourceID) {
        this.resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
    }

    private void handleFatalError(final Throwable cause) {
        this.runAsync(new Runnable(){

            @Override
            public void run() {
                JobMaster.this.log.error("Fatal error occurred on JobManager, cause: {}", (Object)cause.getMessage(), (Object)cause);
                try {
                    JobMaster.this.shutDown();
                }
                catch (Exception e) {
                    cause.addSuppressed(e);
                }
                JobMaster.this.errorHandler.onFatalError(cause);
            }
        });
    }

    private void jobStatusChanged(JobStatus newJobStatus, long timestamp, Throwable error) {
        this.validateRunsInMainThread();
        JobID jobID = this.executionGraph.getJobID();
        String jobName = this.executionGraph.getJobName();
        this.log.info("Status of job {} ({}) changed to {}.", new Object[]{jobID, jobName, newJobStatus, error});
        if (newJobStatus.isGloballyTerminalState()) {
            switch (newJobStatus) {
                case FINISHED: {
                    try {
                        Map<String, Object> accumulatorResults = this.executionGraph.getAccumulators();
                        JobExecutionResult result = new JobExecutionResult(jobID, 0L, accumulatorResults);
                        this.jobCompletionActions.jobFinished(result);
                    }
                    catch (Exception e) {
                        this.log.error("Cannot fetch final accumulators for job {} ({})", new Object[]{jobName, jobID, e});
                        JobExecutionException exception = new JobExecutionException(jobID, "Failed to retrieve accumulator results. The job is registered as 'FINISHED (successful), but this notification describes a failure, since the resulting accumulators could not be fetched.", e);
                        this.jobCompletionActions.jobFailed(exception);
                    }
                    break;
                }
                case CANCELED: {
                    JobExecutionException exception = new JobExecutionException(jobID, "Job was cancelled.", new Exception("The job was cancelled"));
                    this.jobCompletionActions.jobFailed(exception);
                    break;
                }
                case FAILED: {
                    Throwable unpackedError = SerializedThrowable.get(error, this.userCodeLoader);
                    JobExecutionException exception = new JobExecutionException(jobID, "Job execution failed.", unpackedError);
                    this.jobCompletionActions.jobFailed(exception);
                    break;
                }
                default: {
                    throw new IllegalStateException(newJobStatus.toString());
                }
            }
        }
    }

    private void notifyOfNewResourceManagerLeader(String resourceManagerAddress, UUID resourceManagerLeaderId) {
        if (this.resourceManagerConnection != null) {
            if (resourceManagerAddress != null) {
                if (resourceManagerAddress.equals(this.resourceManagerConnection.getTargetAddress()) && resourceManagerLeaderId.equals(this.resourceManagerConnection.getTargetLeaderId())) {
                    return;
                }
                this.closeResourceManagerConnection(new Exception("ResourceManager leader changed to new address " + resourceManagerAddress));
                this.log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", (Object)this.resourceManagerConnection.getTargetAddress(), (Object)resourceManagerAddress);
            } else {
                this.log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.", (Object)this.resourceManagerConnection.getTargetAddress());
            }
        }
        if (resourceManagerAddress != null) {
            this.log.info("Attempting to register at ResourceManager {}", (Object)resourceManagerAddress);
            this.resourceManagerConnection = new ResourceManagerConnection(this.log, this.jobGraph.getJobID(), this.resourceId, this.getAddress(), this.leaderSessionID, resourceManagerAddress, resourceManagerLeaderId, this.executor);
            this.resourceManagerConnection.start();
        }
    }

    private void establishResourceManagerConnection(JobMasterRegistrationSuccess success) {
        UUID resourceManagerLeaderId = success.getResourceManagerLeaderId();
        if (this.resourceManagerConnection != null && this.resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderId)) {
            this.log.info("JobManager successfully registered at ResourceManager, leader id: {}.", (Object)resourceManagerLeaderId);
            final ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManagerConnection.getTargetGateway();
            this.slotPoolGateway.connectToResourceManager(resourceManagerLeaderId, resourceManagerGateway);
            this.resourceManagerHeartbeatManager.monitorTarget(success.getResourceManagerResourceId(), new HeartbeatTarget<Void>(){

                @Override
                public void receiveHeartbeat(ResourceID resourceID, Void payload) {
                    resourceManagerGateway.heartbeatFromJobManager(resourceID);
                }

                @Override
                public void requestHeartbeat(ResourceID resourceID, Void payload) {
                }
            });
        }
    }

    private void closeResourceManagerConnection(Exception cause) {
        if (this.resourceManagerConnection != null) {
            this.log.info("Close ResourceManager connection {}.", (Object)this.resourceManagerConnection.getResourceManagerResourceID(), (Object)cause);
            this.resourceManagerHeartbeatManager.unmonitorTarget(this.resourceManagerConnection.getResourceManagerResourceID());
            ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManagerConnection.getTargetGateway();
            resourceManagerGateway.disconnectJobManager(this.resourceManagerConnection.getJobID(), cause);
            this.resourceManagerConnection.close();
            this.resourceManagerConnection = null;
        }
        this.slotPoolGateway.disconnectResourceManager();
    }

    private void validateLeaderSessionId(UUID leaderSessionID) throws LeaderIdMismatchException {
        if (this.leaderSessionID == null || !this.leaderSessionID.equals(leaderSessionID)) {
            throw new LeaderIdMismatchException(this.leaderSessionID, leaderSessionID);
        }
    }

    private class ResourceManagerHeartbeatListener
    implements HeartbeatListener<Void, Void> {
        private ResourceManagerHeartbeatListener() {
        }

        @Override
        public void notifyHeartbeatTimeout(final ResourceID resourceId) {
            JobMaster.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    JobMaster.this.log.info("The heartbeat of ResourceManager with id {} timed out.", (Object)resourceId);
                    JobMaster.this.closeResourceManagerConnection(new TimeoutException("The heartbeat of ResourceManager with id " + resourceId + " timed out."));
                }
            });
        }

        @Override
        public void reportPayload(ResourceID resourceID, Void payload) {
        }

        @Override
        public Future<Void> retrievePayload() {
            return FlinkCompletableFuture.completed(null);
        }
    }

    private class TaskManagerHeartbeatListener
    implements HeartbeatListener<Void, Void> {
        private TaskManagerHeartbeatListener() {
        }

        @Override
        public void notifyHeartbeatTimeout(ResourceID resourceID) {
            JobMaster.this.log.info("Heartbeat of TaskManager with id {} timed out.", (Object)resourceID);
            ((JobMasterGateway)JobMaster.this.getSelf()).disconnectTaskManager(resourceID, new TimeoutException("Heartbeat of TaskManager with id " + resourceID + " timed out."));
        }

        @Override
        public void reportPayload(ResourceID resourceID, Void payload) {
        }

        @Override
        public Future<Void> retrievePayload() {
            return FlinkCompletableFuture.completed(null);
        }
    }

    private class JobManagerJobStatusListener
    implements JobStatusListener {
        private JobManagerJobStatusListener() {
        }

        @Override
        public void jobStatusChanges(JobID jobId, final JobStatus newJobStatus, final long timestamp, final Throwable error) {
            JobMaster.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    JobMaster.this.jobStatusChanged(newJobStatus, timestamp, error);
                }
            });
        }
    }

    private class ResourceManagerConnection
    extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess> {
        private final JobID jobID;
        private final ResourceID jobManagerResourceID;
        private final String jobManagerRpcAddress;
        private final UUID jobManagerLeaderID;
        private ResourceID resourceManagerResourceID;

        ResourceManagerConnection(Logger log, JobID jobID, ResourceID jobManagerResourceID, String jobManagerRpcAddress, UUID jobManagerLeaderID, String resourceManagerAddress, UUID resourceManagerLeaderID, Executor executor) {
            super(log, resourceManagerAddress, resourceManagerLeaderID, executor);
            this.jobID = (JobID)Preconditions.checkNotNull((Object)jobID);
            this.jobManagerResourceID = (ResourceID)Preconditions.checkNotNull((Object)jobManagerResourceID);
            this.jobManagerRpcAddress = (String)Preconditions.checkNotNull((Object)jobManagerRpcAddress);
            this.jobManagerLeaderID = (UUID)Preconditions.checkNotNull((Object)jobManagerLeaderID);
        }

        @Override
        protected RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() {
            return new RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess>(this.log, JobMaster.this.getRpcService(), "ResourceManager", ResourceManagerGateway.class, this.getTargetAddress(), this.getTargetLeaderId()){

                @Override
                protected Future<RegistrationResponse> invokeRegistration(ResourceManagerGateway gateway, UUID leaderId, long timeoutMillis) throws Exception {
                    Time timeout = Time.milliseconds((long)timeoutMillis);
                    return gateway.registerJobManager(leaderId, ResourceManagerConnection.this.jobManagerLeaderID, ResourceManagerConnection.this.jobManagerResourceID, ResourceManagerConnection.this.jobManagerRpcAddress, ResourceManagerConnection.this.jobID, timeout);
                }
            };
        }

        @Override
        protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
            JobMaster.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    ResourceManagerConnection.this.resourceManagerResourceID = success.getResourceManagerResourceId();
                    JobMaster.this.establishResourceManagerConnection(success);
                }
            });
        }

        @Override
        protected void onRegistrationFailure(Throwable failure) {
            JobMaster.this.handleFatalError(failure);
        }

        public ResourceID getResourceManagerResourceID() {
            return this.resourceManagerResourceID;
        }

        public JobID getJobID() {
            return this.jobID;
        }
    }

    private class ResourceManagerLeaderListener
    implements LeaderRetrievalListener {
        private ResourceManagerLeaderListener() {
        }

        @Override
        public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
            JobMaster.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    JobMaster.this.notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID);
                }
            });
        }

        @Override
        public void handleError(Exception exception) {
            JobMaster.this.handleFatalError(new Exception("Fatal error in the ResourceManager leader service", exception));
        }
    }
}

