/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.InfoMessageListenerRpcGateway;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdActions;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistration;
import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceManagerActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

/**
 * Base class for resource manager RPC endpoints.
 *
 * <p>The endpoint registers job managers and task executors, monitors both through
 * heartbeat managers, forwards slot requests to the {@link SlotManager}, and takes part
 * in leader election as a {@link LeaderContender}. Concrete subclasses provide the
 * worker lifecycle hooks declared as abstract methods of this class.
 *
 * @param <WorkerType> type of the worker object stored in each {@link WorkerRegistration}
 */
public abstract class ResourceManager<WorkerType extends Serializable>
extends RpcEndpoint<ResourceManagerGateway>
implements LeaderContender {
    /** Name constant for the resource manager endpoint. */
    public static final String RESOURCE_MANAGER_NAME = "resourcemanager";
    /** Resource id of this resource manager; sent back in registration responses. */
    private final ResourceID resourceId;
    /** Configuration; read for the heartbeat interval reported to registering peers. */
    private final ResourceManagerConfiguration resourceManagerConfiguration;
    /** Registered job managers, keyed by job id. */
    private final Map<JobID, JobManagerRegistration> jobManagerRegistrations;
    /** The same job manager registrations, keyed by the job manager's resource id. */
    private final Map<ResourceID, JobManagerRegistration> jmResourceIdRegistrations;
    /** Service used to look up and track the leader id of each job. */
    private final JobLeaderIdService jobLeaderIdService;
    /** Registered task executors, keyed by their resource id. */
    private final Map<ResourceID, WorkerRegistration<WorkerType>> taskExecutors;
    /** Source of the leader election service obtained in {@code start()}. */
    private final HighAvailabilityServices highAvailabilityServices;
    /** Heartbeat manager monitoring registered task executors. */
    private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager;
    /** Heartbeat manager monitoring registered job managers. */
    private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
    /** Metric registry handed in at construction time. */
    private final MetricRegistry metricRegistry;
    /** Receiver of errors reported through {@code onFatalError}. */
    private final FatalErrorHandler fatalErrorHandler;
    /** Slot manager that handles slot requests and task manager slot reports. */
    private final SlotManager slotManager;
    /** Leader election service; assigned in {@code start()}, so null before that. */
    private LeaderElectionService leaderElectionService;
    /** Current leader session id; null while no leadership is held. */
    private volatile UUID leaderSessionId;
    /** Registered info message listeners, keyed by the address they registered with. */
    private ConcurrentMap<String, InfoMessageListenerRpcGateway> infoMessageListeners;

    /**
     * Creates the endpoint and wires up its collaborators.
     *
     * <p>Every stored service argument is null-checked. Two heartbeat managers are created
     * from {@code heartbeatServices}, one for task managers and one for job managers. All
     * registries start empty and no leader session id is set.
     */
    public ResourceManager(
            RpcService rpcService,
            String resourceManagerEndpointId,
            ResourceID resourceId,
            ResourceManagerConfiguration resourceManagerConfiguration,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            SlotManager slotManager,
            MetricRegistry metricRegistry,
            JobLeaderIdService jobLeaderIdService,
            FatalErrorHandler fatalErrorHandler) {
        super(rpcService, resourceManagerEndpointId);

        // Mandatory collaborators.
        this.resourceId = Preconditions.checkNotNull(resourceId);
        this.resourceManagerConfiguration = Preconditions.checkNotNull(resourceManagerConfiguration);
        this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
        this.slotManager = Preconditions.checkNotNull(slotManager);
        this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
        this.jobLeaderIdService = Preconditions.checkNotNull(jobLeaderIdService);
        this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);

        // One heartbeat manager per kind of peer.
        this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
            resourceId, new TaskManagerHeartbeatListener(), rpcService.getScheduledExecutor(), log);
        this.jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
            resourceId, new JobManagerHeartbeatListener(), rpcService.getScheduledExecutor(), log);

        // Empty registries; leadership has not been granted yet.
        this.jobManagerRegistrations = new HashMap<>(4);
        this.jmResourceIdRegistrations = new HashMap<>(4);
        this.taskExecutors = new HashMap<>(8);
        this.leaderSessionId = null;
        this.infoMessageListeners = new ConcurrentHashMap<>(8);
    }

    /**
     * Starts the endpoint, then the leader election service, then the job leader id
     * service, and finally calls {@link #initialize()}.
     *
     * @throws Exception a {@link ResourceManagerException} wrapping the cause if either
     *     service fails to start, or whatever the superclass start or {@code initialize()}
     *     throws
     */
    @Override
    public void start() throws Exception {
        super.start();

        leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
        try {
            leaderElectionService.start(this);
        } catch (Exception electionStartFailure) {
            throw new ResourceManagerException(
                "Could not start the leader election service.", electionStartFailure);
        }

        try {
            jobLeaderIdService.start(new JobLeaderIdActionsImpl());
        } catch (Exception leaderIdStartFailure) {
            throw new ResourceManagerException(
                "Could not start the job leader id service.", leaderIdStartFailure);
        }

        initialize();
    }

    /**
     * Shuts the resource manager down.
     *
     * <p>Stops both heartbeat managers, closes the slot manager, stops the leader election
     * service, shuts the endpoint down and clears all registration state. Failures of the
     * individual steps are collected (first one wins, later ones are suppressed) so that
     * every step still runs, and are rethrown at the end.
     *
     * @throws Exception if any shutdown step failed
     */
    @Override
    public void shutDown() throws Exception {
        Exception exception = null;

        taskManagerHeartbeatManager.stop();
        jobManagerHeartbeatManager.stop();

        try {
            slotManager.close();
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }

        // The leader election service is only assigned in start(); when shutDown() is
        // reached without a successful start there is nothing to stop, and dereferencing
        // the null field would only surface a misleading NullPointerException.
        if (leaderElectionService != null) {
            try {
                leaderElectionService.stop();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
        }

        try {
            super.shutDown();
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }

        clearState();

        if (exception != null) {
            ExceptionUtils.rethrowException(exception, "Error while shutting the ResourceManager down.");
        }
    }

    /**
     * Registers a job manager for the given job.
     *
     * <p>The request is declined immediately if {@code resourceManagerLeaderId} is not the
     * current leader session id. Otherwise the job is added to the job leader id service
     * if necessary, a gateway to the job manager is connected, and once both the gateway
     * and the job's leader id are available the registration is completed on the executor
     * returned by {@code getMainThreadExecutor()}. Any failure of that asynchronous chain
     * is turned into a {@link RegistrationResponse.Decline}.
     *
     * @param resourceManagerLeaderId leader id the caller believes this resource manager has
     * @param jobManagerLeaderId leader id of the registering job manager
     * @param jobManagerResourceId resource id of the registering job manager
     * @param jobManagerAddress address used to connect to the job manager
     * @param jobId job the job manager is responsible for
     * @return future with the registration response
     */
    @RpcMethod
    public Future<RegistrationResponse> registerJobManager(final UUID resourceManagerLeaderId, final UUID jobManagerLeaderId, final ResourceID jobManagerResourceId, final String jobManagerAddress, final JobID jobId) {
        Preconditions.checkNotNull(resourceManagerLeaderId);
        Preconditions.checkNotNull(jobManagerLeaderId);
        Preconditions.checkNotNull(jobManagerResourceId);
        Preconditions.checkNotNull(jobManagerAddress);
        Preconditions.checkNotNull(jobId);

        // Stale or foreign leader id: decline right away.
        if (!isValid(resourceManagerLeaderId)) {
            log.debug("Discard register job manager message from {}, because the leader id {} did not match the expected leader id {}.", jobManagerAddress, resourceManagerLeaderId, leaderSessionId);
            return FlinkCompletableFuture.completed(new RegistrationResponse.Decline("Resource manager leader id did not match."));
        }

        if (!jobLeaderIdService.containsJob(jobId)) {
            try {
                jobLeaderIdService.addJob(jobId);
            } catch (Exception addFailure) {
                ResourceManagerException wrapped = new ResourceManagerException("Could not add the job " + jobId + " to the job id leader service.", addFailure);
                onFatalErrorAsync(wrapped);
                log.error("Could not add job {} to job leader id service.", jobId, addFailure);
                return FlinkCompletableFuture.completedExceptionally(wrapped);
            }
        }

        log.info("Registering job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId);

        Future<UUID> jobLeaderIdFuture;
        try {
            jobLeaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
        } catch (Exception lookupFailure) {
            ResourceManagerException wrapped = new ResourceManagerException("Cannot obtain the job leader id future to verify the correct job leader.", lookupFailure);
            onFatalErrorAsync(wrapped);
            log.debug("Could not obtain the job leader id future to verify the correct job leader.");
            return FlinkCompletableFuture.completedExceptionally(wrapped);
        }

        Future<JobMasterGateway> gatewayFuture = getRpcService().connect(jobManagerAddress, JobMasterGateway.class);

        Future<RegistrationResponse> registrationFuture = gatewayFuture.thenCombineAsync(jobLeaderIdFuture, new BiFunction<JobMasterGateway, UUID, RegistrationResponse>() {

            @Override
            public RegistrationResponse apply(final JobMasterGateway jobMasterGateway, UUID jobLeaderId) {
                // Leadership may have changed while the futures were pending.
                if (!isValid(resourceManagerLeaderId)) {
                    log.debug("The resource manager leader id changed {}. Discarding job manager registration from {}.", getLeaderSessionId(), jobManagerAddress);
                    return new RegistrationResponse.Decline("Resource manager leader id changed.");
                }
                if (!jobLeaderId.equals(jobManagerLeaderId)) {
                    log.debug("The job manager leader id {} did not match the job leader id {}.", jobManagerLeaderId, jobLeaderId);
                    return new RegistrationResponse.Decline("Job manager leader id did not match.");
                }

                boolean alreadyRegistered = false;
                if (jobManagerRegistrations.containsKey(jobId)) {
                    JobManagerRegistration previous = jobManagerRegistrations.get(jobId);
                    if (previous.getLeaderID().equals(jobLeaderId)) {
                        alreadyRegistered = true;
                        log.debug("Job manager {}@{} was already registered.", jobManagerLeaderId, jobManagerAddress);
                    } else {
                        // A different leader was registered for this job: drop it first.
                        disconnectJobManager(previous.getJobID(), new Exception("New job leader for job " + jobId + " found."));
                    }
                }
                if (!alreadyRegistered) {
                    JobManagerRegistration fresh = new JobManagerRegistration(jobId, jobManagerResourceId, jobLeaderId, jobMasterGateway);
                    jobManagerRegistrations.put(jobId, fresh);
                    jmResourceIdRegistrations.put(jobManagerResourceId, fresh);
                }

                log.info("Registered job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId);

                jobManagerHeartbeatManager.monitorTarget(jobManagerResourceId, new HeartbeatTarget<Void>() {

                    @Override
                    public void receiveHeartbeat(ResourceID resourceID, Void payload) {
                        // Nothing to do: the resource manager only requests heartbeats here.
                    }

                    @Override
                    public void requestHeartbeat(ResourceID resourceID, Void payload) {
                        jobMasterGateway.heartbeatFromResourceManager(resourceID);
                    }
                });

                return new JobMasterRegistrationSuccess(
                    resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds(),
                    getLeaderSessionId(),
                    resourceId);
            }
        }, getMainThreadExecutor());

        // Convert any failure of the chain above into a decline response.
        return registrationFuture.handleAsync(new BiFunction<RegistrationResponse, Throwable, RegistrationResponse>() {

            @Override
            public RegistrationResponse apply(RegistrationResponse registrationResponse, Throwable throwable) {
                if (throwable == null) {
                    return registrationResponse;
                }
                if (log.isDebugEnabled()) {
                    log.debug("Registration of job manager {}@{} failed.", jobManagerLeaderId, jobManagerAddress, throwable);
                } else {
                    log.info("Registration of job manager {}@{} failed.", jobManagerLeaderId, jobManagerAddress);
                }
                return new RegistrationResponse.Decline(throwable.getMessage());
            }
        }, getRpcService().getExecutor());
    }

    /**
     * Registers a task executor.
     *
     * <p>The request is declined immediately if {@code resourceManagerLeaderId} does not
     * equal the current leader session id. Otherwise a gateway to the task executor is
     * connected; once that completes, any previous registration for the same resource id
     * is replaced, the slot report is handed to the slot manager and heartbeat monitoring
     * starts. A failed connection results in a {@link RegistrationResponse.Decline}.
     *
     * @param resourceManagerLeaderId leader id the caller believes this resource manager has
     * @param taskExecutorAddress address used to connect to the task executor
     * @param taskExecutorResourceId resource id of the registering task executor
     * @param slotReport slot report forwarded to the slot manager
     * @return future with the registration response
     */
    @RpcMethod
    public Future<RegistrationResponse> registerTaskExecutor(UUID resourceManagerLeaderId, String taskExecutorAddress, final ResourceID taskExecutorResourceId, final SlotReport slotReport) {
        if (!Objects.equals(leaderSessionId, resourceManagerLeaderId)) {
            log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did not equal the received leader session ID  {}", taskExecutorResourceId, taskExecutorAddress, leaderSessionId, resourceManagerLeaderId);
            return FlinkCompletableFuture.completed(new RegistrationResponse.Decline("Discard registration because the leader id " + resourceManagerLeaderId + " does not match the expected leader id " + leaderSessionId + '.'));
        }

        Future<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);

        return taskExecutorGatewayFuture.handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {

            @Override
            public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
                if (throwable != null) {
                    return new RegistrationResponse.Decline(throwable.getMessage());
                }

                // Properly parameterized types instead of raw WorkerRegistration / Object,
                // so the put into the typed registry below is checked by the compiler.
                WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId);
                if (oldRegistration != null) {
                    log.info("Replacing old instance of worker for ResourceID {}", taskExecutorResourceId);
                    slotManager.unregisterTaskManager(oldRegistration.getInstanceID());
                }

                WorkerType newWorker = workerStarted(taskExecutorResourceId);
                WorkerRegistration<WorkerType> registration = new WorkerRegistration<>(taskExecutorGateway, newWorker);
                taskExecutors.put(taskExecutorResourceId, registration);
                slotManager.registerTaskManager(registration, slotReport);

                taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {

                    @Override
                    public void receiveHeartbeat(ResourceID resourceID, Void payload) {
                        // Nothing to do: the resource manager only requests heartbeats here.
                    }

                    @Override
                    public void requestHeartbeat(ResourceID resourceID, Void payload) {
                        taskExecutorGateway.heartbeatFromResourceManager(resourceID);
                    }
                });

                return new TaskExecutorRegistrationSuccess(
                    registration.getInstanceID(),
                    resourceId,
                    resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds());
            }
        }, getMainThreadExecutor());
    }

    /**
     * Passes a heartbeat received from a task manager on to the task manager heartbeat
     * manager; no payload is attached.
     *
     * @param resourceID resource id of the sending task manager
     */
    @RpcMethod
    public void heartbeatFromTaskManager(ResourceID resourceID) {
        final Void noPayload = null;
        taskManagerHeartbeatManager.receiveHeartbeat(resourceID, noPayload);
    }

    /**
     * Passes a heartbeat received from a job manager on to the job manager heartbeat
     * manager; no payload is attached.
     *
     * @param resourceID resource id of the sending job manager
     */
    @RpcMethod
    public void heartbeatFromJobManager(ResourceID resourceID) {
        final Void noPayload = null;
        jobManagerHeartbeatManager.receiveHeartbeat(resourceID, noPayload);
    }

    /**
     * RPC entry point for disconnecting a task manager; delegates to
     * {@link #closeTaskManagerConnection(ResourceID, Exception)}.
     *
     * @param resourceId resource id of the task manager to disconnect
     * @param cause reason for the disconnect
     */
    @RpcMethod
    public void disconnectTaskManager(ResourceID resourceId, Exception cause) {
        closeTaskManagerConnection(resourceId, cause);
    }

    /**
     * RPC entry point for disconnecting the job manager of a job; delegates to
     * {@link #closeJobManagerConnection(JobID, Exception)}.
     *
     * @param jobId job whose job manager is disconnected
     * @param cause reason for the disconnect
     */
    @RpcMethod
    public void disconnectJobManager(JobID jobId, Exception cause) {
        closeJobManagerConnection(jobId, cause);
    }

    /**
     * Registers a slot request with the slot manager on behalf of a registered job manager.
     *
     * @param jobMasterLeaderID leader id of the requesting job manager
     * @param resourceManagerLeaderID leader id the caller believes this resource manager has
     * @param slotRequest the slot request to register
     * @return an acknowledgement once the request was handed to the slot manager
     * @throws LeaderSessionIDException if either leader id does not match the expected one
     * @throws ResourceManagerException if no job manager is registered for the request's job
     */
    @RpcMethod
    public Acknowledge requestSlot(UUID jobMasterLeaderID, UUID resourceManagerLeaderID, SlotRequest slotRequest) throws ResourceManagerException, LeaderSessionIDException {
        if (!Objects.equals(resourceManagerLeaderID, leaderSessionId)) {
            throw new LeaderSessionIDException(resourceManagerLeaderID, leaderSessionId);
        }

        final JobID jobId = slotRequest.getJobId();
        final JobManagerRegistration registration = jobManagerRegistrations.get(jobId);

        if (registration == null) {
            throw new ResourceManagerException("Could not find registered job manager for job " + jobId + '.');
        }
        if (!Objects.equals(jobMasterLeaderID, registration.getLeaderID())) {
            throw new LeaderSessionIDException(jobMasterLeaderID, registration.getLeaderID());
        }

        log.info("Request slot with profile {} for job {} with allocation id {}.", slotRequest.getResourceProfile(), slotRequest.getJobId(), slotRequest.getAllocationId());
        slotManager.registerSlotRequest(slotRequest);
        return Acknowledge.get();
    }

    /**
     * Handles a task executor's notification that a slot has become available.
     *
     * <p>The slot is freed in the slot manager only if the leader id matches, the task
     * executor owning the slot is registered, and the registration's instance id equals
     * {@code instanceID}. In every other case the message is discarded with a debug log.
     *
     * @param resourceManagerLeaderId leader id the caller believes this resource manager has
     * @param instanceID instance id under which the sender was registered
     * @param slotId slot that became available
     * @param allocationId allocation the slot was assigned to
     */
    @RpcMethod
    public void notifySlotAvailable(UUID resourceManagerLeaderId, InstanceID instanceID, SlotID slotId, AllocationID allocationId) {
        if (!Objects.equals(resourceManagerLeaderId, leaderSessionId)) {
            log.debug("Discarding notify slot available message for slot {}, because the leader id {} did not match the expected leader id {}.", slotId, resourceManagerLeaderId, leaderSessionId);
            return;
        }

        ResourceID taskExecutorId = slotId.getResourceID();
        WorkerRegistration<WorkerType> registration = taskExecutors.get(taskExecutorId);
        if (registration == null) {
            // Message text fixed: the original read "availablemessage" (missing space).
            log.debug("Could not find registration for resource id {}. Discarding the slot available message {}.", taskExecutorId, slotId);
            return;
        }

        if (Objects.equals(registration.getInstanceID(), instanceID)) {
            slotManager.freeSlot(slotId, allocationId);
        } else {
            log.debug("Invalid registration id for slot available message. This indicates an outdated request.");
        }
    }

    /**
     * Registers an info message listener reachable at the given address.
     *
     * <p>A duplicate address only produces a warning. Otherwise a gateway is connected and,
     * on success, stored under the address; a failed connection is logged as a warning.
     *
     * @param address address of the info message listener
     */
    @RpcMethod
    public void registerInfoMessageListener(final String address) {
        if (infoMessageListeners.containsKey(address)) {
            log.warn("Receive a duplicate registration from info message listener on ({})", address);
            return;
        }

        Future<InfoMessageListenerRpcGateway> gatewayFuture =
            getRpcService().connect(address, InfoMessageListenerRpcGateway.class);

        Future<Void> registeredFuture = gatewayFuture.thenAcceptAsync(new AcceptFunction<InfoMessageListenerRpcGateway>() {

            @Override
            public void accept(InfoMessageListenerRpcGateway gateway) {
                log.info("Receive a registration from info message listener on ({})", address);
                infoMessageListeners.put(address, gateway);
            }
        }, getMainThreadExecutor());

        registeredFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {

            @Override
            public Void apply(Throwable failure) {
                log.warn("Receive a registration from unreachable info message listener on ({})", address);
                return null;
            }
        }, getRpcService().getExecutor());
    }

    /**
     * Removes the info message listener registered under the given address, if any.
     *
     * @param address address the listener was registered with
     */
    @RpcMethod
    public void unRegisterInfoMessageListener(String address) {
        infoMessageListeners.remove(address);
    }

    /**
     * Logs the shutdown request and delegates to
     * {@link #shutDownApplication(ApplicationStatus, String)}.
     *
     * @param finalStatus final application status
     * @param optionalDiagnostics diagnostics message passed along with the status
     */
    @RpcMethod
    public void shutDownCluster(ApplicationStatus finalStatus, String optionalDiagnostics) {
        log.info("shut down cluster because application is in {}, diagnostics {}", finalStatus, optionalDiagnostics);
        shutDownApplication(finalStatus, optionalDiagnostics);
    }

    /**
     * Returns the number of currently registered task executors.
     *
     * @param requestLeaderSessionId leader id the caller believes this resource manager has
     * @return size of the task executor registry
     * @throws LeaderIdMismatchException if the given id differs from the current leader session id
     */
    @RpcMethod
    public Integer getNumberOfRegisteredTaskManagers(UUID requestLeaderSessionId) throws LeaderIdMismatchException {
        if (!Objects.equals(leaderSessionId, requestLeaderSessionId)) {
            throw new LeaderIdMismatchException(leaderSessionId, requestLeaderSessionId);
        }
        return taskExecutors.size();
    }

    /**
     * Returns the current leader session id.
     *
     * @return the leader session id, or null while none is set
     */
    @VisibleForTesting
    UUID getLeaderSessionId() {
        final UUID currentSessionId = leaderSessionId;
        return currentSessionId;
    }

    /**
     * Drops all job manager and task executor registrations, clears the job leader id
     * service and resets the leader session id to null. A failure to clear the job leader
     * id service is reported through {@code onFatalError}.
     */
    private void clearState() {
        jobManagerRegistrations.clear();
        jmResourceIdRegistrations.clear();
        taskExecutors.clear();

        try {
            jobLeaderIdService.clear();
        } catch (Exception clearFailure) {
            onFatalError(new ResourceManagerException(
                "Could not properly clear the job leader id service.", clearFailure));
        }

        leaderSessionId = null;
    }

    /**
     * Removes the job manager registered for the given job, stops its heartbeat monitoring
     * and tells the job manager that it was disconnected. If no job manager is registered
     * for the job, only a debug message is logged.
     *
     * @param jobId job whose job manager connection is closed
     * @param cause reason passed on to the job manager
     */
    protected void closeJobManagerConnection(JobID jobId, Exception cause) {
        final JobManagerRegistration removed = jobManagerRegistrations.remove(jobId);
        if (removed == null) {
            log.debug("There was no registered job manager for job {}.", jobId);
            return;
        }

        final ResourceID jobManagerResourceId = removed.getJobManagerResourceID();
        final JobMasterGateway gateway = removed.getJobManagerGateway();
        final UUID jobManagerLeaderId = removed.getLeaderID();

        log.info("Disconnect job manager {}@{} for job {} from the resource manager.", jobManagerLeaderId, gateway.getAddress(), jobId);

        jobManagerHeartbeatManager.unmonitorTarget(jobManagerResourceId);
        jmResourceIdRegistrations.remove(jobManagerResourceId);
        gateway.disconnectResourceManager(jobManagerLeaderId, getLeaderSessionId(), cause);
    }

    /**
     * Stops heartbeat monitoring of the given task manager and, if it was registered,
     * unregisters it from the slot manager and tells the task executor that it was
     * disconnected. An unknown resource id only produces a debug message.
     *
     * @param resourceID resource id of the task manager
     * @param cause reason passed on to the task executor
     */
    protected void closeTaskManagerConnection(ResourceID resourceID, Exception cause) {
        taskManagerHeartbeatManager.unmonitorTarget(resourceID);

        final WorkerRegistration<WorkerType> removed = taskExecutors.remove(resourceID);
        if (removed == null) {
            log.debug("Could not find a registered task manager with the process id {}.", resourceID);
            return;
        }

        log.info("Task manager {} failed because {}.", resourceID, cause);
        slotManager.unregisterTaskManager(removed.getInstanceID());
        removed.getTaskExecutorGateway().disconnectResourceManager(cause);
    }

    /**
     * Checks a leader id against the current leader session id.
     *
     * @param resourceManagerLeaderId leader id to check; may be null
     * @return true if both ids are equal, which includes both being null
     */
    protected boolean isValid(UUID resourceManagerLeaderId) {
        final UUID expected = leaderSessionId;
        return Objects.equals(resourceManagerLeaderId, expected);
    }

    /**
     * Removes a job from the job leader id service and disconnects its job manager if one
     * is registered. A failure to remove the job from the service is logged as a warning
     * and does not prevent the disconnect.
     *
     * @param jobId job to remove
     */
    protected void removeJob(JobID jobId) {
        try {
            jobLeaderIdService.removeJob(jobId);
        } catch (Exception e) {
            log.warn("Could not properly remove the job {} from the job leader id service.", jobId, e);
        }

        if (jobManagerRegistrations.containsKey(jobId)) {
            // Message fixed: the original concatenated the job id directly with
            // "was removed", without a separating space.
            disconnectJobManager(jobId, new Exception("Job " + jobId + " was removed"));
        }
    }

    /**
     * Reacts to a job leader losing its leadership. The registered job manager is
     * disconnected only if its leader id equals {@code oldJobLeaderId}; otherwise the
     * notification is discarded with a debug message.
     *
     * @param jobId job whose leader lost leadership
     * @param oldJobLeaderId leader id that lost leadership
     */
    protected void jobLeaderLostLeadership(JobID jobId, UUID oldJobLeaderId) {
        if (!jobManagerRegistrations.containsKey(jobId)) {
            log.debug("Discard job leader lost leadership for outdated leader {} for job {}.", oldJobLeaderId, jobId);
            return;
        }

        final JobManagerRegistration current = jobManagerRegistrations.get(jobId);
        final boolean sameLeader = Objects.equals(current.getLeaderID(), oldJobLeaderId);
        if (!sameLeader) {
            log.debug("Discarding job leader lost leadership, because a new job leader was found for job {}. ", jobId);
            return;
        }

        disconnectJobManager(jobId, new Exception("Job leader lost leadership."));
    }

    /**
     * Sends an info message to every registered info message listener. The notification
     * is submitted to the RPC service's {@code execute} method rather than sent inline.
     *
     * @param message text of the info message
     */
    public void sendInfoMessage(final String message) {
        final Runnable broadcast = new Runnable() {

            @Override
            public void run() {
                final InfoMessage notification = new InfoMessage(message);
                for (InfoMessageListenerRpcGateway listener : infoMessageListeners.values()) {
                    listener.notifyInfoMessage(notification);
                }
            }
        };
        getRpcService().execute(broadcast);
    }

    /**
     * Schedules {@link #onFatalError(Throwable)} through {@code runAsync}.
     *
     * @param t the fatal error to report
     */
    protected void onFatalErrorAsync(final Throwable t) {
        final Runnable reportError = new Runnable() {

            @Override
            public void run() {
                onFatalError(t);
            }
        };
        runAsync(reportError);
    }

    /**
     * Logs the error and passes it to the configured fatal error handler.
     *
     * @param t the fatal error
     */
    void onFatalError(Throwable t) {
        log.error("Fatal error occurred.", t);
        fatalErrorHandler.onFatalError(t);
    }

    /**
     * Leader election callback: leadership was granted.
     *
     * <p>Scheduled through {@code runAsync}: clears state left over from a previous
     * leader session, stores the new leader session id, starts the slot manager and then
     * confirms the session id to the leader election service via the RPC service's
     * {@code execute} method.
     *
     * @param newLeaderSessionID the granted leader session id
     */
    @Override
    public void grantLeadership(final UUID newLeaderSessionID) {
        final Runnable confirmLeadership = new Runnable() {

            @Override
            public void run() {
                leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
            }
        };

        runAsync(new Runnable() {

            @Override
            public void run() {
                log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID);

                // State from an earlier leader session must not leak into the new one.
                if (leaderSessionId != null) {
                    clearState();
                }

                leaderSessionId = newLeaderSessionID;
                slotManager.start(leaderSessionId, getMainThreadExecutor(), new ResourceManagerActionsImpl());
                getRpcService().execute(confirmLeadership);
            }
        });
    }

    /**
     * Leader election callback: leadership was revoked. Scheduled through
     * {@code runAsync}: clears all registration state, suspends the slot manager and
     * resets the leader session id.
     */
    @Override
    public void revokeLeadership() {
        final Runnable giveUpLeadership = new Runnable() {

            @Override
            public void run() {
                log.info("ResourceManager {} was revoked leadership.", getAddress());
                clearState();
                slotManager.suspend();
                leaderSessionId = null;
            }
        };
        runAsync(giveUpLeadership);
    }

    /**
     * Leader election callback for errors; the error is wrapped in a
     * {@link ResourceManagerException} and reported as fatal.
     *
     * @param exception error reported by the leader election service
     */
    @Override
    public void handleError(Exception exception) {
        final ResourceManagerException wrapped = new ResourceManagerException(
            "Received an error from the LeaderElectionService.", exception);
        onFatalErrorAsync(wrapped);
    }

    /**
     * Subclass initialization hook; called as the last step of {@code start()}, after the
     * leader election and job leader id services have been started.
     *
     * @throws ResourceManagerException if the initialization fails
     */
    protected abstract void initialize() throws ResourceManagerException;

    /**
     * Subclass hook invoked by {@code shutDownCluster}. The first argument is the final
     * application status, the second the diagnostics message that accompanied it.
     */
    protected abstract void shutDownApplication(ApplicationStatus var1, String var2);

    /**
     * Subclass hook invoked when the slot manager's actions ask for a resource to be
     * allocated; the argument is the requested resource profile.
     */
    @VisibleForTesting
    public abstract void startNewWorker(ResourceProfile var1);

    /**
     * Subclass hook invoked when the slot manager's actions ask for a resource to be
     * released; the argument is the instance id of that resource.
     */
    public abstract void stopWorker(InstanceID var1);

    /**
     * Subclass hook invoked while a task executor registers; the argument is the task
     * executor's resource id. The returned worker is stored in the task executor's
     * {@code WorkerRegistration}.
     */
    protected abstract WorkerType workerStarted(ResourceID var1);

    /**
     * Heartbeat listener for job managers. A heartbeat timeout closes the connection to
     * the affected job manager; no payload is exchanged.
     */
    private class JobManagerHeartbeatListener
    implements HeartbeatListener<Void, Void> {
        private JobManagerHeartbeatListener() {
        }

        /** Schedules, through {@code runAsync}, the disconnect of the timed-out job manager. */
        @Override
        public void notifyHeartbeatTimeout(final ResourceID resourceID) {
            runAsync(new Runnable() {

                @Override
                public void run() {
                    log.info("The heartbeat of JobManager with id {} timed out.", resourceID);

                    if (!jmResourceIdRegistrations.containsKey(resourceID)) {
                        return;
                    }
                    final JobManagerRegistration timedOut = jmResourceIdRegistrations.get(resourceID);
                    if (timedOut == null) {
                        return;
                    }

                    closeJobManagerConnection(
                        timedOut.getJobID(),
                        new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out."));
                }
            });
        }

        /** Payloads are not used; nothing to report. */
        @Override
        public void reportPayload(ResourceID resourceID, Void payload) {
        }

        /** Returns an already completed future holding null, as no payload is sent. */
        @Override
        public Future<Void> retrievePayload() {
            return FlinkCompletableFuture.completed(null);
        }
    }

    /**
     * Heartbeat listener for task managers. A heartbeat timeout closes the connection to
     * the affected task manager; no payload is exchanged.
     */
    private class TaskManagerHeartbeatListener
    implements HeartbeatListener<Void, Void> {
        private TaskManagerHeartbeatListener() {
        }

        /** Schedules, through {@code runAsync}, the disconnect of the timed-out task manager. */
        @Override
        public void notifyHeartbeatTimeout(final ResourceID resourceID) {
            final Runnable disconnectTimedOut = new Runnable() {

                @Override
                public void run() {
                    log.info("The heartbeat of TaskManager with id {} timed out.", resourceID);
                    closeTaskManagerConnection(
                        resourceID,
                        new TimeoutException("The heartbeat of TaskManager with id " + resourceID + "  timed out."));
                }
            };
            runAsync(disconnectTimedOut);
        }

        /** Payloads are not used; nothing to report. */
        @Override
        public void reportPayload(ResourceID resourceID, Void payload) {
        }

        /** Returns an already completed future holding null, as no payload is sent. */
        @Override
        public Future<Void> retrievePayload() {
            return FlinkCompletableFuture.completed(null);
        }
    }

    /**
     * Callbacks handed to the job leader id service in {@code start()}.
     *
     * <p>Every callback hands its work over through {@code runAsync} (directly or via
     * {@code onFatalErrorAsync}) instead of running it on the calling thread, because the
     * handlers read and modify the registration maps, which are plain {@code HashMap}s.
     */
    private class JobLeaderIdActionsImpl
    implements JobLeaderIdActions {
        private JobLeaderIdActionsImpl() {
        }

        /**
         * Schedules the handling of a lost job leadership.
         *
         * <p>Previously the handler was invoked directly on the caller's thread, unlike
         * {@link #notifyJobTimeout(JobID, UUID)}; it is now scheduled the same way.
         */
        @Override
        public void jobLeaderLostLeadership(final JobID jobId, final UUID oldJobLeaderId) {
            ResourceManager.this.runAsync(new Runnable() {

                @Override
                public void run() {
                    // Qualified on purpose: the unqualified name would resolve to this
                    // callback method and schedule itself again.
                    ResourceManager.this.jobLeaderLostLeadership(jobId, oldJobLeaderId);
                }
            });
        }

        /** Schedules removal of the job if the timeout is still valid when it is processed. */
        @Override
        public void notifyJobTimeout(final JobID jobId, final UUID timeoutId) {
            ResourceManager.this.runAsync(new Runnable() {

                @Override
                public void run() {
                    if (ResourceManager.this.jobLeaderIdService.isValidTimeout(jobId, timeoutId)) {
                        ResourceManager.this.removeJob(jobId);
                    }
                }
            });
        }

        /** Reports an error of the job leader id service as fatal. */
        @Override
        public void handleError(Throwable error) {
            ResourceManager.this.onFatalErrorAsync(error);
        }
    }

    /**
     * Actions handed to the slot manager when leadership is granted; each one forwards to
     * the corresponding method of the enclosing resource manager.
     */
    private class ResourceManagerActionsImpl
    implements ResourceManagerActions {
        private ResourceManagerActionsImpl() {
        }

        /** Forwards to {@code stopWorker}. */
        @Override
        public void releaseResource(InstanceID instanceId) {
            stopWorker(instanceId);
        }

        /** Forwards to {@code startNewWorker}. */
        @Override
        public void allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException {
            startNewWorker(resourceProfile);
        }

        /** Only logs the failed allocation; no further action is taken here. */
        @Override
        public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
            log.info("Slot request with allocation id {} for job {} failed.", allocationId, jobId, cause);
        }
    }
}

