/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.heartbeat.NoOpHeartbeatManager;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.PartitionTrackerFactory;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
import org.apache.flink.runtime.jobmaster.EstablishedResourceManagerConnection;
import org.apache.flink.runtime.jobmaster.ExecutionGraphException;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobMasterConfiguration;
import org.apache.flink.runtime.jobmaster.JobMasterException;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterService;
import org.apache.flink.runtime.jobmaster.ResourceManagerAddress;
import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulerFactory;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.registration.RegisteredRpcConnection;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;

public class JobMaster
extends FencedRpcEndpoint<JobMasterId>
implements JobMasterGateway,
JobMasterService {
    /** Name prefix passed to {@code AkkaRpcServiceUtils.createRandomName} when the endpoint is constructed. */
    public static final String JOB_MANAGER_NAME = "jobmanager";
    // ---- Collaborators that are assigned exactly once, in the constructor. ----
    private final JobMasterConfiguration jobMasterConfiguration;
    // Resource id handed to the heartbeat managers and reported back in JMTMRegistrationSuccess.
    private final ResourceID resourceId;
    private final JobGraph jobGraph;
    // Taken from jobMasterConfiguration.getRpcTimeout().
    private final Time rpcTimeout;
    private final HighAvailabilityServices highAvailabilityServices;
    private final BlobWriter blobWriter;
    private final HeartbeatServices heartbeatServices;
    private final JobManagerJobMetricGroupFactory jobMetricGroupFactory;
    private final ScheduledExecutorService scheduledExecutorService;
    private final OnCompletionActions jobCompletionActions;
    private final FatalErrorHandler fatalErrorHandler;
    // Class loader used to deserialize user-supplied payloads (operator events, aggregate functions, ...).
    private final ClassLoader userCodeLoader;
    private final SlotPool slotPool;
    // Slot-pool backed scheduler created by the SchedulerFactory; distinct from schedulerNG below.
    private final Scheduler scheduler;
    private final SchedulerNGFactory schedulerNGFactory;
    private final BackPressureStatsTracker backPressureStatsTracker;
    private final LeaderRetrievalService resourceManagerLeaderRetriever;
    // TaskManagers that completed registerTaskManager, keyed by ResourceID; value is (location, gateway).
    private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;
    private final ShuffleMaster<?> shuffleMaster;
    // ---- Mutable state. ----
    // Both heartbeat managers are no-op instances until startHeartbeatServices() installs real ones.
    private HeartbeatManager<AccumulatorReport, AllocatedSlotReport> taskManagerHeartbeatManager;
    private HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
    // Created in the constructor; replaced through assignScheduler() when the scheduler is reset.
    private SchedulerNG schedulerNG;
    // Null until startScheduling() registers a listener with the scheduler.
    @Nullable
    private JobManagerJobStatusListener jobStatusListener;
    @Nullable
    private JobManagerJobMetricGroup jobManagerJobMetricGroup;
    // Cleared in suspendExecution() once the leader retriever has been stopped.
    @Nullable
    private ResourceManagerAddress resourceManagerAddress;
    @Nullable
    private ResourceManagerConnection resourceManagerConnection;
    @Nullable
    private EstablishedResourceManagerConnection establishedResourceManagerConnection;
    // Accumulator state for updateGlobalAggregate(), keyed by aggregate name.
    private Map<String, Object> accumulators;
    // Assigned once in the constructor via the PartitionTrackerFactory.
    private final JobMasterPartitionTracker partitionTracker;

    /**
     * Creates the JobMaster endpoint for a single job and wires up its collaborators.
     *
     * <p>The constructor null-checks most arguments, creates the slot pool, the slot-pool scheduler,
     * the partition tracker and the initial {@link SchedulerNG}, and installs no-op heartbeat
     * managers. It does not start any service; that happens in {@code startJobMasterServices()}.
     *
     * @throws Exception if a collaborator cannot be created, e.g. when {@code createScheduler} fails
     */
    public JobMaster(RpcService rpcService, JobMasterConfiguration jobMasterConfiguration, ResourceID resourceId, JobGraph jobGraph, HighAvailabilityServices highAvailabilityService, SlotPoolFactory slotPoolFactory, SchedulerFactory schedulerFactory, JobManagerSharedServices jobManagerSharedServices, HeartbeatServices heartbeatServices, JobManagerJobMetricGroupFactory jobMetricGroupFactory, OnCompletionActions jobCompletionActions, FatalErrorHandler fatalErrorHandler, ClassLoader userCodeLoader, SchedulerNGFactory schedulerNGFactory, ShuffleMaster<?> shuffleMaster, PartitionTrackerFactory partitionTrackerFactory) throws Exception {
        // NOTE(review): the trailing null is presumably the initial fencing token; startJobExecution()
        // later installs a real one through setNewFencingToken() - confirm against FencedRpcEndpoint.
        super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME), null);
        this.jobMasterConfiguration = (JobMasterConfiguration)Preconditions.checkNotNull((Object)jobMasterConfiguration);
        this.resourceId = (ResourceID)Preconditions.checkNotNull((Object)resourceId);
        this.jobGraph = (JobGraph)Preconditions.checkNotNull((Object)jobGraph);
        this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
        this.highAvailabilityServices = (HighAvailabilityServices)Preconditions.checkNotNull((Object)highAvailabilityService);
        // jobManagerSharedServices itself is not null-checked; a null value fails here with an NPE.
        this.blobWriter = jobManagerSharedServices.getBlobWriter();
        this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService();
        this.jobCompletionActions = (OnCompletionActions)Preconditions.checkNotNull((Object)jobCompletionActions);
        this.fatalErrorHandler = (FatalErrorHandler)Preconditions.checkNotNull((Object)fatalErrorHandler);
        this.userCodeLoader = (ClassLoader)Preconditions.checkNotNull((Object)userCodeLoader);
        this.schedulerNGFactory = (SchedulerNGFactory)Preconditions.checkNotNull((Object)schedulerNGFactory);
        this.heartbeatServices = (HeartbeatServices)Preconditions.checkNotNull((Object)heartbeatServices);
        this.jobMetricGroupFactory = (JobManagerJobMetricGroupFactory)Preconditions.checkNotNull((Object)jobMetricGroupFactory);
        String jobName = jobGraph.getName();
        JobID jid = jobGraph.getJobID();
        this.log.info("Initializing job {} ({}).", (Object)jobName, (Object)jid);
        this.resourceManagerLeaderRetriever = this.highAvailabilityServices.getResourceManagerLeaderRetriever();
        this.slotPool = ((SlotPoolFactory)Preconditions.checkNotNull((Object)slotPoolFactory)).createSlotPool(jobGraph.getJobID());
        this.scheduler = ((SchedulerFactory)Preconditions.checkNotNull((Object)schedulerFactory)).createScheduler(this.slotPool);
        this.registeredTaskManagers = new HashMap<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>>(4);
        // The partition tracker resolves TaskExecutor gateways lazily through this lookup; it yields
        // an empty Optional for TaskManagers that are not (or no longer) registered.
        this.partitionTracker = ((PartitionTrackerFactory)Preconditions.checkNotNull((Object)partitionTrackerFactory)).create(resourceID -> {
            Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerInfo = this.registeredTaskManagers.get(resourceID);
            if (taskManagerInfo == null) {
                return Optional.empty();
            }
            return Optional.of(taskManagerInfo.f1);
        });
        this.backPressureStatsTracker = (BackPressureStatsTracker)Preconditions.checkNotNull((Object)jobManagerSharedServices.getBackPressureStatsTracker());
        this.shuffleMaster = (ShuffleMaster)Preconditions.checkNotNull(shuffleMaster);
        // createScheduler() reads the fields assigned above (slot-pool scheduler, partition tracker,
        // shuffle master, ...), so it has to run after them.
        this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
        this.schedulerNG = this.createScheduler(this.jobManagerJobMetricGroup);
        this.jobStatusListener = null;
        this.resourceManagerConnection = null;
        this.establishedResourceManagerConnection = null;
        this.accumulators = new HashMap<String, Object>();
        // Placeholders until startHeartbeatServices() creates the real heartbeat managers.
        this.taskManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();
        this.resourceManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();
    }

    /**
     * Builds a {@link SchedulerNG} for this job through the configured {@link SchedulerNGFactory}.
     *
     * @param jobManagerJobMetricGroup metric group handed to the new scheduler
     * @return the scheduler produced by the factory
     * @throws Exception propagated from the factory call
     */
    private SchedulerNG createScheduler(JobManagerJobMetricGroup jobManagerJobMetricGroup) throws Exception {
        // Note: the same scheduledExecutorService is passed for both executor arguments.
        return schedulerNGFactory.createInstance(
                log,
                jobGraph,
                backPressureStatsTracker,
                scheduledExecutorService,
                jobMasterConfiguration.getConfiguration(),
                scheduler,
                scheduledExecutorService,
                userCodeLoader,
                highAvailabilityServices.getCheckpointRecoveryFactory(),
                rpcTimeout,
                blobWriter,
                jobManagerJobMetricGroup,
                jobMasterConfiguration.getSlotRequestTimeout(),
                shuffleMaster,
                partitionTracker);
    }

    /**
     * Invokes the no-argument {@code start()} of the endpoint and then submits
     * {@link #startJobExecution(JobMasterId)} through {@code callAsyncWithoutFencing} using
     * {@code RpcUtils.INF_TIMEOUT}.
     *
     * @param newJobMasterId id under which the job execution is started
     * @return future completed with the result of {@code startJobExecution}
     */
    @Override
    public CompletableFuture<Acknowledge> start(JobMasterId newJobMasterId) throws Exception {
        start();
        final CompletableFuture<Acknowledge> startFuture =
                callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
        return startFuture;
    }

    /**
     * Submits {@link #suspendExecution(Exception)} through {@code callAsyncWithoutFencing} and calls
     * {@code stop()} once that call has completed, whether it succeeded or failed.
     *
     * @param cause reason for the suspension
     * @return future carrying the outcome of the suspension
     */
    @Override
    public CompletableFuture<Acknowledge> suspend(Exception cause) {
        return callAsyncWithoutFencing(() -> suspendExecution(cause), RpcUtils.INF_TIMEOUT)
                .whenComplete((ack, failure) -> stop());
    }

    /**
     * Shutdown hook: disconnects every registered TaskManager, suspends the job execution and
     * closes the slot pool.
     *
     * @return an already completed future
     */
    @Override
    public CompletableFuture<Void> onStop() {
        final String jobName = jobGraph.getName();
        final JobID jobId = jobGraph.getJobID();
        log.info("Stopping the JobMaster for job {}({}).", jobName, jobId);

        // Work on a snapshot of the keys: disconnectTaskManager() removes entries from the map.
        final HashSet<ResourceID> snapshot = new HashSet<ResourceID>(registeredTaskManagers.keySet());
        final FlinkException stopCause =
                new FlinkException("Stopping JobMaster for job " + jobName + '(' + jobId + ").");
        snapshot.forEach(taskManagerId -> disconnectTaskManager(taskManagerId, stopCause));

        suspendExecution(new FlinkException("JobManager is shutting down."));
        slotPool.close();
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Asks the scheduler to cancel the job.
     *
     * @param timeout not used by this implementation
     * @return an already completed acknowledgement
     */
    @Override
    public CompletableFuture<Acknowledge> cancel(Time timeout) {
        schedulerNG.cancel();
        final Acknowledge ack = Acknowledge.get();
        return CompletableFuture.completedFuture(ack);
    }

    /**
     * Forwards a task execution state update to the scheduler.
     *
     * @param taskExecutionState the reported state; must not be null
     * @return an acknowledgement if the scheduler accepted the update, otherwise a future failed
     *     with an {@link ExecutionGraphException}
     */
    @Override
    public CompletableFuture<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState) {
        Preconditions.checkNotNull(taskExecutionState, "taskExecutionState");

        final boolean accepted = schedulerNG.updateTaskExecutionState(taskExecutionState);
        if (!accepted) {
            final String message = "The execution attempt " + taskExecutionState.getID() + " was not found.";
            return FutureUtils.completedExceptionally(new ExecutionGraphException(message));
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /**
     * Requests the next input split for the given vertex and execution attempt from the scheduler.
     *
     * @return a completed future with the split, or a future failed with the {@link IOException}
     *     raised by the scheduler
     */
    @Override
    public CompletableFuture<SerializedInputSplit> requestNextInputSplit(JobVertexID vertexID, ExecutionAttemptID executionAttempt) {
        final SerializedInputSplit nextSplit;
        try {
            nextSplit = schedulerNG.requestNextInputSplit(vertexID, executionAttempt);
        } catch (IOException e) {
            log.warn("Error while requesting next input split", e);
            return FutureUtils.completedExceptionally(e);
        }
        return CompletableFuture.completedFuture(nextSplit);
    }

    /**
     * Looks up the state of the given result partition through the scheduler.
     *
     * @return a completed future with the state, or a future failed with the
     *     {@link PartitionProducerDisposedException} raised by the scheduler
     */
    @Override
    public CompletableFuture<ExecutionState> requestPartitionState(IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId) {
        final ExecutionState producerState;
        try {
            producerState = schedulerNG.requestPartitionState(intermediateResultId, resultPartitionId);
        } catch (PartitionProducerDisposedException e) {
            log.info("Error while requesting partition state", e);
            return FutureUtils.completedExceptionally(e);
        }
        return CompletableFuture.completedFuture(producerState);
    }

    /**
     * Delegates to the scheduler's {@code scheduleOrUpdateConsumers} for the given partition.
     *
     * @param partitionID partition passed on to the scheduler
     * @param timeout not used by this implementation
     * @return an already completed acknowledgement
     */
    @Override
    public CompletableFuture<Acknowledge> scheduleOrUpdateConsumers(ResultPartitionID partitionID, Time timeout) {
        schedulerNG.scheduleOrUpdateConsumers(partitionID);
        final Acknowledge ack = Acknowledge.get();
        return CompletableFuture.completedFuture(ack);
    }

    /**
     * Removes all bookkeeping for a TaskManager: heartbeat monitoring, slots in the slot pool,
     * tracked partitions and the registration entry. If the TaskManager was registered, it is also
     * told to disconnect from this JobManager.
     *
     * @param resourceID id of the TaskManager to disconnect
     * @param cause reason for the disconnect; its message is logged
     * @return an already completed acknowledgement
     */
    @Override
    public CompletableFuture<Acknowledge> disconnectTaskManager(ResourceID resourceID, Exception cause) {
        log.debug("Disconnect TaskExecutor {} because: {}", resourceID, cause.getMessage());

        taskManagerHeartbeatManager.unmonitorTarget(resourceID);
        slotPool.releaseTaskManager(resourceID, cause);
        partitionTracker.stopTrackingPartitionsFor(resourceID);

        // Only a TaskManager that was actually registered has a gateway to notify.
        Optional.ofNullable(registeredTaskManagers.remove(resourceID))
                .ifPresent(registration -> registration.f1.disconnectJobManager(jobGraph.getJobID(), cause));

        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /** Passes a checkpoint acknowledgement on to the scheduler unchanged. */
    @Override
    public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot checkpointState) {
        schedulerNG.acknowledgeCheckpoint(
                jobID,
                executionAttemptID,
                checkpointId,
                checkpointMetrics,
                checkpointState);
    }

    /**
     * Passes a declined-checkpoint message on to the scheduler unchanged.
     *
     * @param decline the decline message
     */
    @Override
    public void declineCheckpoint(DeclineCheckpoint decline) {
        schedulerNG.declineCheckpoint(decline);
    }

    /**
     * Deserializes an operator event with the user code class loader and delivers it to the
     * scheduler.
     *
     * @return an acknowledgement, or a future failed with whatever exception deserialization or
     *     delivery raised
     */
    @Override
    public CompletableFuture<Acknowledge> sendOperatorEventToCoordinator(ExecutionAttemptID task, OperatorID operatorID, SerializedValue<OperatorEvent> serializedEvent) {
        try {
            final OperatorEvent event = serializedEvent.deserializeValue(userCodeLoader);
            schedulerNG.deliverOperatorEventToCoordinator(task, operatorID, event);
            final Acknowledge ack = Acknowledge.get();
            return CompletableFuture.completedFuture(ack);
        } catch (Exception failure) {
            return FutureUtils.completedExceptionally(failure);
        }
    }

    /**
     * Asks the scheduler for the location of the key-value state registered under the given name.
     *
     * @return a completed future with the location, or a future failed with the
     *     {@link FlinkJobNotFoundException} or {@link UnknownKvStateLocation} raised by the scheduler
     */
    @Override
    public CompletableFuture<KvStateLocation> requestKvStateLocation(JobID jobId, String registrationName) {
        final KvStateLocation location;
        try {
            location = schedulerNG.requestKvStateLocation(jobId, registrationName);
        } catch (FlinkJobNotFoundException | UnknownKvStateLocation e) {
            log.info("Error while request key-value state location", e);
            return FutureUtils.completedExceptionally(e);
        }
        return CompletableFuture.completedFuture(location);
    }

    /**
     * Forwards a key-value state registration to the scheduler.
     *
     * @return an acknowledgement, or a future failed with the {@link FlinkJobNotFoundException}
     *     raised by the scheduler
     */
    @Override
    public CompletableFuture<Acknowledge> notifyKvStateRegistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId, InetSocketAddress kvStateServerAddress) {
        try {
            schedulerNG.notifyKvStateRegistered(
                    jobId, jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);
        } catch (FlinkJobNotFoundException e) {
            log.info("Error while receiving notification about key-value state registration", e);
            return FutureUtils.completedExceptionally(e);
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /**
     * Forwards a key-value state de-registration to the scheduler.
     *
     * @return an acknowledgement, or a future failed with the {@link FlinkJobNotFoundException}
     *     raised by the scheduler
     */
    @Override
    public CompletableFuture<Acknowledge> notifyKvStateUnregistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) {
        try {
            schedulerNG.notifyKvStateUnregistered(jobId, jobVertexId, keyGroupRange, registrationName);
        } catch (FlinkJobNotFoundException e) {
            log.info("Error while receiving notification about key-value state de-registration", e);
            return FutureUtils.completedExceptionally(e);
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /**
     * Offers slots of a registered TaskManager to the slot pool.
     *
     * @param taskManagerId id of the offering TaskManager
     * @param slots the offered slots
     * @param timeout not used by this implementation
     * @return a completed future with the slot pool's answer, or a failed future if the
     *     TaskManager is not registered
     */
    @Override
    public CompletableFuture<Collection<SlotOffer>> offerSlots(ResourceID taskManagerId, Collection<SlotOffer> slots, Time timeout) {
        final Tuple2<TaskManagerLocation, TaskExecutorGateway> registration = registeredTaskManagers.get(taskManagerId);
        if (registration == null) {
            return FutureUtils.completedExceptionally(new Exception("Unknown TaskManager " + taskManagerId));
        }

        // Wrap the raw gateway together with the current fencing token.
        final RpcTaskManagerGateway fencedGateway = new RpcTaskManagerGateway(registration.f1, getFencingToken());
        final Collection<SlotOffer> poolAnswer = slotPool.offerSlots(registration.f0, fencedGateway, slots);
        return CompletableFuture.completedFuture(poolAnswer);
    }

    /**
     * Fails the given allocation on behalf of a TaskManager.
     *
     * <p>Requests from TaskManagers that are not registered are ignored with a warning.
     *
     * @param taskManagerId id of the TaskManager reporting the failure
     * @param allocationId allocation to fail
     * @param cause reason for the failure
     */
    @Override
    public void failSlot(ResourceID taskManagerId, AllocationID allocationId, Exception cause) {
        if (!registeredTaskManagers.containsKey(taskManagerId)) {
            // Parameterized message: same rendered text, but no string building when WARN is disabled.
            log.warn("Cannot fail slot {} because the TaskManager {} is unknown.", allocationId, taskManagerId);
            return;
        }
        internalFailAllocation(allocationId, cause);
    }

    /**
     * Fails an allocation in the slot pool. If the slot pool reports a TaskManager id in return and
     * no partitions are tracked for that TaskManager, the TaskManager is released.
     *
     * @param allocationId allocation to fail
     * @param cause reason for the failure
     */
    private void internalFailAllocation(AllocationID allocationId, Exception cause) {
        final Optional<ResourceID> reportedTaskManager = slotPool.failAllocation(allocationId, cause);
        if (!reportedTaskManager.isPresent()) {
            return;
        }

        final ResourceID taskManagerId = reportedTaskManager.get();
        final boolean stillHoldsPartitions = partitionTracker.isTrackingPartitionsFor(taskManagerId);
        if (!stillHoldsPartitions) {
            releaseEmptyTaskManager(taskManagerId);
        }
    }

    /**
     * Disconnects the given TaskManager with a "no more slots" cause.
     *
     * @param resourceId id of the TaskManager to disconnect
     */
    private void releaseEmptyTaskManager(ResourceID resourceId) {
        final String reason = String.format("No more slots registered at JobMaster %s.", resourceId);
        disconnectTaskManager(resourceId, new FlinkException(reason));
    }

    /**
     * Registers a TaskManager at this JobMaster.
     *
     * <p>The unresolved location is resolved first; if that fails the registration is declined.
     * A TaskManager that is already registered is answered with a success response right away.
     * Otherwise a connection to the TaskManager is opened and, on the main thread executor, the
     * TaskManager is added to the slot pool, stored in the registration map and monitored by the
     * heartbeat manager.
     *
     * @param taskManagerRpcAddress RPC address used to connect to the TaskManager
     * @param unresolvedTaskManagerLocation location reported by the TaskManager
     * @param timeout not used by this implementation
     * @return future with a {@link JMTMRegistrationSuccess} or a {@link RegistrationResponse.Decline}
     */
    @Override
    public CompletableFuture<RegistrationResponse> registerTaskManager(String taskManagerRpcAddress, UnresolvedTaskManagerLocation unresolvedTaskManagerLocation, Time timeout) {
        TaskManagerLocation taskManagerLocation;
        try {
            taskManagerLocation = TaskManagerLocation.fromUnresolvedLocation(unresolvedTaskManagerLocation);
        }
        catch (Throwable resolutionFailure) {
            String errMsg = String.format("Could not accept TaskManager registration. TaskManager address %s cannot be resolved. %s", unresolvedTaskManagerLocation.getExternalAddress(), resolutionFailure.getMessage());
            // Pass the throwable along so that the stack trace is not lost.
            this.log.error(errMsg, resolutionFailure);
            return CompletableFuture.completedFuture(new RegistrationResponse.Decline(errMsg));
        }
        ResourceID taskManagerId = taskManagerLocation.getResourceID();
        if (this.registeredTaskManagers.containsKey(taskManagerId)) {
            // Repeated registration of a known TaskManager: simply confirm again.
            JMTMRegistrationSuccess response = new JMTMRegistrationSuccess(this.resourceId);
            return CompletableFuture.completedFuture(response);
        }
        return this.getRpcService().connect(taskManagerRpcAddress, TaskExecutorGateway.class).handleAsync((taskExecutorGateway, throwable) -> {
            if (throwable != null) {
                return new RegistrationResponse.Decline(throwable.getMessage());
            }
            this.slotPool.registerTaskManager(taskManagerId);
            this.registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));
            this.taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<AllocatedSlotReport>(){

                @Override
                public void receiveHeartbeat(ResourceID resourceID, AllocatedSlotReport payload) {
                    // Intentionally empty: this target only sends heartbeat requests.
                }

                @Override
                public void requestHeartbeat(ResourceID resourceID, AllocatedSlotReport allocatedSlotReport) {
                    taskExecutorGateway.heartbeatFromJobManager(resourceID, allocatedSlotReport);
                }
            });
            return new JMTMRegistrationSuccess(this.resourceId);
        }, (Executor)this.getMainThreadExecutor());
    }

    /**
     * Triggers a reconnect to the ResourceManager, but only if the given id belongs to the
     * ResourceManager this JobMaster currently has an address for.
     *
     * @param resourceManagerId id of the ResourceManager requesting the disconnect
     * @param cause reason passed on to the reconnect
     */
    @Override
    public void disconnectResourceManager(ResourceManagerId resourceManagerId, Exception cause) {
        if (!isConnectingToResourceManager(resourceManagerId)) {
            return;
        }
        reconnectToResourceManager(cause);
    }

    /**
     * Checks whether the known ResourceManager address carries the given id.
     *
     * @param resourceManagerId id to compare against
     * @return {@code false} if no address is known, otherwise the result of the id comparison
     */
    private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerId) {
        if (resourceManagerAddress == null) {
            return false;
        }
        return resourceManagerAddress.getResourceManagerId().equals(resourceManagerId);
    }

    /**
     * Hands a heartbeat from a TaskManager, including its accumulator report, to the TaskManager
     * heartbeat manager.
     *
     * @param resourceID id of the sending TaskManager
     * @param accumulatorReport payload of the heartbeat
     */
    @Override
    public void heartbeatFromTaskManager(ResourceID resourceID, AccumulatorReport accumulatorReport) {
        taskManagerHeartbeatManager.receiveHeartbeat(resourceID, accumulatorReport);
    }

    /**
     * Hands a heartbeat request from the ResourceManager to the ResourceManager heartbeat manager.
     *
     * @param resourceID id of the requesting ResourceManager
     */
    @Override
    public void heartbeatFromResourceManager(ResourceID resourceID) {
        // The ResourceManager heartbeat carries no payload.
        final Void noPayload = null;
        resourceManagerHeartbeatManager.requestHeartbeat(resourceID, noPayload);
    }

    /**
     * Returns the job details reported by the scheduler.
     *
     * @param timeout not used by this implementation
     * @return an already completed future with the details
     */
    @Override
    public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
        final JobDetails details = schedulerNG.requestJobDetails();
        return CompletableFuture.completedFuture(details);
    }

    /**
     * Returns the job status reported by the scheduler.
     *
     * @param timeout not used by this implementation
     * @return an already completed future with the status
     */
    @Override
    public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
        final JobStatus status = schedulerNG.requestJobStatus();
        return CompletableFuture.completedFuture(status);
    }

    /**
     * Returns the archived execution graph reported by the scheduler.
     *
     * @param timeout not used by this implementation
     * @return an already completed future with the archived graph
     */
    @Override
    public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
        final ArchivedExecutionGraph archivedGraph = schedulerNG.requestJob();
        return CompletableFuture.completedFuture(archivedGraph);
    }

    /**
     * Delegates a savepoint request to the scheduler.
     *
     * @param targetDirectory savepoint target, may be null
     * @param cancelJob passed through to the scheduler
     * @param timeout not used by this implementation
     * @return the future returned by the scheduler
     */
    @Override
    public CompletableFuture<String> triggerSavepoint(@Nullable String targetDirectory, boolean cancelJob, Time timeout) {
        final CompletableFuture<String> savepointFuture = schedulerNG.triggerSavepoint(targetDirectory, cancelJob);
        return savepointFuture;
    }

    /**
     * Delegates a stop-with-savepoint request to the scheduler.
     *
     * @param targetDirectory savepoint target, may be null
     * @param advanceToEndOfEventTime passed through to the scheduler
     * @param timeout not used by this implementation
     * @return the future returned by the scheduler
     */
    @Override
    public CompletableFuture<String> stopWithSavepoint(@Nullable String targetDirectory, boolean advanceToEndOfEventTime, Time timeout) {
        final CompletableFuture<String> savepointFuture =
                schedulerNG.stopWithSavepoint(targetDirectory, advanceToEndOfEventTime);
        return savepointFuture;
    }

    /**
     * Requests back pressure statistics for a job vertex from the scheduler.
     *
     * @return a completed future wrapping the statistics ({@code null} inside the response when the
     *     scheduler has none), or a future failed with the {@link FlinkException} it raised
     */
    @Override
    public CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(JobVertexID jobVertexId) {
        final Optional<OperatorBackPressureStats> stats;
        try {
            stats = schedulerNG.requestOperatorBackPressureStats(jobVertexId);
        } catch (FlinkException e) {
            log.info("Error while requesting operator back pressure stats", e);
            return FutureUtils.completedExceptionally(e);
        }
        final OperatorBackPressureStatsResponse response = OperatorBackPressureStatsResponse.of(stats.orElse(null));
        return CompletableFuture.completedFuture(response);
    }

    /**
     * Fails the given allocation; unlike {@code failSlot} there is no TaskManager check.
     *
     * @param allocationID allocation to fail
     * @param cause reason for the failure
     */
    @Override
    public void notifyAllocationFailure(AllocationID allocationID, Exception cause) {
        internalFailAllocation(allocationID, cause);
    }

    /**
     * Folds a value into the named global aggregate and returns the aggregate's current result.
     *
     * <p>The aggregate function is deserialized with the user code class loader on every call. The
     * accumulator is kept per aggregate name and created on first use.
     *
     * @param aggregateName name under which the accumulator is stored
     * @param aggregand value to add to the accumulator
     * @param serializedAggregateFunction serialized {@link AggregateFunction}
     * @return a completed future with the function's result, or a future failed with the
     *     deserialization exception
     */
    @Override
    public CompletableFuture<Object> updateGlobalAggregate(String aggregateName, Object aggregand, byte[] serializedAggregateFunction) {
        AggregateFunction<Object, Object, Object> aggregateFunction;
        try {
            aggregateFunction = InstantiationUtil.deserializeObject(serializedAggregateFunction, this.userCodeLoader);
        }
        catch (Exception e) {
            // Log the cause as well; the message alone gives no hint about what went wrong.
            this.log.error("Error while attempting to deserialize user AggregateFunction.", e);
            return FutureUtils.completedExceptionally(e);
        }
        Object accumulator = this.accumulators.get(aggregateName);
        if (accumulator == null) {
            accumulator = aggregateFunction.createAccumulator();
        }
        accumulator = aggregateFunction.add(aggregand, accumulator);
        this.accumulators.put(aggregateName, accumulator);
        return CompletableFuture.completedFuture(aggregateFunction.getResult(accumulator));
    }

    /**
     * Deserializes a coordination request with the user code class loader and hands it to the
     * scheduler.
     *
     * @param operatorId operator the request is addressed to
     * @param serializedRequest the serialized request
     * @param timeout not used by this implementation
     * @return the scheduler's response future, or a future failed with whatever exception
     *     deserialization or delivery raised
     */
    @Override
    public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(OperatorID operatorId, SerializedValue<CoordinationRequest> serializedRequest, Time timeout) {
        try {
            final CoordinationRequest decodedRequest = serializedRequest.deserializeValue(userCodeLoader);
            final CompletableFuture<CoordinationResponse> responseFuture =
                    schedulerNG.deliverCoordinationRequestToCoordinator(operatorId, decodedRequest);
            return responseFuture;
        } catch (Exception failure) {
            return FutureUtils.completedExceptionally(failure);
        }
    }

    /**
     * Starts the job execution under the given JobMasterId. Must run in the main thread.
     *
     * <p>If the id equals the current fencing token the call is a no-op apart from a log message.
     * Otherwise the fencing token is replaced, the JobMaster services are started and the scheduler
     * is reset and started.
     *
     * @param newJobMasterId new fencing token; must not be null
     * @return an acknowledgement
     * @throws Exception if starting the services or the scheduler fails
     */
    private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
        validateRunsInMainThread();
        Preconditions.checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

        final boolean alreadyStarted = Objects.equals(getFencingToken(), newJobMasterId);
        if (alreadyStarted) {
            log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);
        } else {
            setNewFencingToken(newJobMasterId);
            startJobMasterServices();
            log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
            resetAndStartScheduler();
        }
        return Acknowledge.get();
    }

    /**
     * Starts the services the JobMaster depends on, in this order: heartbeat services, slot pool,
     * slot-pool scheduler, ResourceManager (re)connection and the ResourceManager leader retriever.
     *
     * @throws Exception if one of the services fails to start
     */
    private void startJobMasterServices() throws Exception {
        startHeartbeatServices();

        slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
        scheduler.start(getMainThreadExecutor());

        final FlinkException startupCause = new FlinkException("Starting JobMaster component.");
        reconnectToResourceManager(startupCause);

        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    }

    /**
     * Installs a new fencing token. If a token is already set, the running execution is suspended
     * first.
     *
     * @param newJobMasterId the token to install
     */
    private void setNewFencingToken(JobMasterId newJobMasterId) {
        final JobMasterId previousId = getFencingToken();
        if (previousId != null) {
            log.info("Restarting old job with JobMasterId {}. The new JobMasterId is {}.", previousId, newJobMasterId);
            final String reason = "Old job with JobMasterId " + previousId + " is restarted with a new JobMasterId " + newJobMasterId + '.';
            suspendExecution(new FlinkException(reason));
        }
        setFencingToken(newJobMasterId);
    }

    /**
     * Suspends the job execution and releases everything tied to the current fencing token.
     * Must run in the main thread.
     *
     * <p>Does nothing if no fencing token is set. Otherwise the token is cleared, the
     * ResourceManager leader retriever is stopped, the scheduler and slot pool are suspended, the
     * ResourceManager connection is closed and the heartbeat services are stopped.
     *
     * @param cause reason for the suspension, passed on to the scheduler and the connection teardown
     * @return an acknowledgement
     */
    private Acknowledge suspendExecution(Exception cause) {
        this.validateRunsInMainThread();
        if (this.getFencingToken() == null) {
            this.log.debug("Job has already been suspended or shutdown.");
            return Acknowledge.get();
        }
        // Clearing the token first makes a repeated call hit the early return above.
        this.setFencingToken(null);
        try {
            this.resourceManagerLeaderRetriever.stop();
            // Only reached when stop() succeeded; on failure the old address is kept.
            this.resourceManagerAddress = null;
        }
        catch (Throwable t) {
            // Best effort: a failing retriever must not prevent the rest of the teardown.
            this.log.warn("Failed to stop resource manager leader retriever when suspending.", t);
        }
        this.suspendAndClearSchedulerFields(cause);
        this.slotPool.suspend();
        this.closeResourceManagerConnection(cause);
        this.stopHeartbeatServices();
        return Acknowledge.get();
    }

    /** Stops the TaskManager heartbeat manager and then the ResourceManager heartbeat manager. */
    private void stopHeartbeatServices() {
        taskManagerHeartbeatManager.stop();
        resourceManagerHeartbeatManager.stop();
    }

    /**
     * Replaces both heartbeat managers with fresh instances from the heartbeat services: a sender
     * for TaskManagers and a plain manager for the ResourceManager. Both use the main thread
     * executor.
     */
    private void startHeartbeatServices() {
        taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
                resourceId,
                new TaskManagerHeartbeatListener(),
                getMainThreadExecutor(),
                log);

        resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
                resourceId,
                new ResourceManagerHeartbeatListener(),
                getMainThreadExecutor(),
                log);
    }

    /**
     * Swaps in a new scheduler and its metric group. Must run in the main thread.
     *
     * <p>Fails with an {@link IllegalStateException} unless the current scheduler reports a
     * terminal job status and the current metric group field has been cleared.
     *
     * @param newScheduler scheduler to install
     * @param newJobManagerJobMetricGroup metric group to install
     */
    private void assignScheduler(SchedulerNG newScheduler, JobManagerJobMetricGroup newJobManagerJobMetricGroup) {
        validateRunsInMainThread();

        final boolean oldSchedulerDone = schedulerNG.requestJobStatus().isTerminalState();
        Preconditions.checkState(oldSchedulerDone);
        Preconditions.checkState(jobManagerJobMetricGroup == null);

        schedulerNG = newScheduler;
        jobManagerJobMetricGroup = newJobManagerJobMetricGroup;
    }

    /**
     * Makes sure a usable scheduler is installed and then starts scheduling.
     *
     * <p>If the current scheduler still reports {@link JobStatus#CREATED} it is reused and only
     * gets the main thread executor set. Otherwise the current scheduler is suspended, a
     * replacement (with a new metric group) is created, and the replacement is assigned once the
     * old scheduler's termination future completes.
     *
     * <p>The future that assigns and starts the scheduler is observed: any failure raised by
     * {@code assignScheduler} or {@code startScheduling} is forwarded to
     * {@link #handleJobMasterError(Throwable)} instead of being dropped with the discarded future.
     *
     * @throws Exception if one of the synchronous steps performed here fails
     */
    private void resetAndStartScheduler() throws Exception {
        CompletionStage<Object> schedulerAssignedFuture;
        this.validateRunsInMainThread();
        if (this.schedulerNG.requestJobStatus() == JobStatus.CREATED) {
            schedulerAssignedFuture = CompletableFuture.completedFuture(null);
            this.schedulerNG.setMainThreadExecutor(this.getMainThreadExecutor());
        } else {
            this.suspendAndClearSchedulerFields(new FlinkException("ExecutionGraph is being reset in order to be rescheduled."));
            JobManagerJobMetricGroup newJobManagerJobMetricGroup = this.jobMetricGroupFactory.create(this.jobGraph);
            SchedulerNG newScheduler = this.createScheduler(newJobManagerJobMetricGroup);
            // The outcome of the old scheduler's termination is deliberately ignored; the new
            // scheduler is installed either way.
            schedulerAssignedFuture = this.schedulerNG.getTerminationFuture().handle((ignored, throwable) -> {
                newScheduler.setMainThreadExecutor(this.getMainThreadExecutor());
                this.assignScheduler(newScheduler, newJobManagerJobMetricGroup);
                return null;
            });
        }
        schedulerAssignedFuture.thenRun(this::startScheduling).whenComplete((ignored, failure) -> {
            if (failure != null) {
                // Dependent stages wrap the original error in a CompletionException; unwrap it so
                // that the fatal-error check in handleJobMasterError sees the real cause.
                this.handleJobMasterError(ExceptionUtils.stripCompletionException(failure));
            }
        });
    }

    /**
     * Registers a fresh job status listener at the scheduler and starts scheduling.
     *
     * <p>Fails the state check if a job status listener is still set.
     */
    private void startScheduling() {
        Preconditions.checkState(jobStatusListener == null);
        jobStatusListener = new JobManagerJobStatusListener();
        schedulerNG.registerJobStatusListener(jobStatusListener);
        schedulerNG.startScheduling();
    }

    /**
     * Suspends the scheduler and afterwards resets the scheduler-related fields.
     *
     * @param cause reason handed to the scheduler when it is suspended
     */
    private void suspendAndClearSchedulerFields(Exception cause) {
        suspendScheduler(cause);
        clearSchedulerFields();
    }

    /**
     * Suspends the scheduler, then closes the job metric group and stops the job status
     * listener if they are set.
     *
     * @param cause reason passed to {@code schedulerNG.suspend}
     */
    private void suspendScheduler(Exception cause) {
        schedulerNG.suspend(cause);

        final boolean hasMetricGroup = jobManagerJobMetricGroup != null;
        if (hasMetricGroup) {
            jobManagerJobMetricGroup.close();
        }

        final boolean hasListener = jobStatusListener != null;
        if (hasListener) {
            jobStatusListener.stop();
        }
    }

    /** Resets the job status listener and the job metric group references to {@code null}. */
    private void clearSchedulerFields() {
        jobStatusListener = null;
        jobManagerJobMetricGroup = null;
    }

    /**
     * Dispatches an error of this JobMaster.
     *
     * <p>Errors classified as JVM-fatal by {@code ExceptionUtils.isJvmFatalError} are logged and
     * given to the fatal error handler; every other error is reported through
     * {@code jobCompletionActions.jobMasterFailed}.
     *
     * @param cause the error to dispatch
     */
    private void handleJobMasterError(Throwable cause) {
        if (!ExceptionUtils.isJvmFatalError(cause)) {
            jobCompletionActions.jobMasterFailed(cause);
            return;
        }
        log.error("Fatal error occurred on JobManager.", cause);
        fatalErrorHandler.onFatalError(cause);
    }

    /**
     * Reacts to a job status change; only globally terminal states trigger any work.
     *
     * <p>Must run in the main thread.
     *
     * @param newJobStatus the status the job switched to
     * @param timestamp time of the status change; not used by this method
     * @param error error associated with the status change, may be {@code null}; not used by this
     *     method
     */
    private void jobStatusChanged(JobStatus newJobStatus, long timestamp, @Nullable Throwable error) {
        this.validateRunsInMainThread();
        if (newJobStatus.isGloballyTerminalState()) {
            // Scheduled via runAsync: for every registered TaskManager key, stop tracking its
            // partitions - "release or promote" for FINISHED jobs, plain release otherwise.
            this.runAsync(() -> this.registeredTaskManagers.keySet().forEach(newJobStatus == JobStatus.FINISHED ? this.partitionTracker::stopTrackingAndReleaseOrPromotePartitionsFor : this.partitionTracker::stopTrackingAndReleasePartitionsFor));
            // The archived graph is requested here, before handing off to the other executor.
            ArchivedExecutionGraph archivedExecutionGraph = this.schedulerNG.requestJob();
            // The completion callback is executed on the scheduled executor service.
            this.scheduledExecutorService.execute(() -> this.jobCompletionActions.jobReachedGloballyTerminalState(archivedExecutionGraph));
        }
    }

    /**
     * Records the address of a newly announced ResourceManager leader and reconnects to it.
     *
     * @param newResourceManagerAddress address of the new leader
     * @param resourceManagerId leader id of the new leader
     */
    private void notifyOfNewResourceManagerLeader(String newResourceManagerAddress, ResourceManagerId resourceManagerId) {
        resourceManagerAddress = createResourceManagerAddress(newResourceManagerAddress, resourceManagerId);
        final String reason =
                String.format("ResourceManager leader changed to new address %s", resourceManagerAddress);
        reconnectToResourceManager(new FlinkException(reason));
    }

    /**
     * Builds a {@link ResourceManagerAddress} from an address string and a leader id.
     *
     * @param newResourceManagerAddress address string, may be {@code null}
     * @param resourceManagerId leader id; must not be {@code null} when the address is given
     * @return the combined address, or {@code null} if {@code newResourceManagerAddress} is
     *     {@code null}
     */
    @Nullable
    private ResourceManagerAddress createResourceManagerAddress(@Nullable String newResourceManagerAddress, @Nullable ResourceManagerId resourceManagerId) {
        if (newResourceManagerAddress == null) {
            return null;
        }
        Preconditions.checkNotNull(resourceManagerId);
        return new ResourceManagerAddress(newResourceManagerAddress, resourceManagerId);
    }

    /**
     * Closes the current ResourceManager connection and then tries to connect again.
     *
     * @param cause reason for closing the current connection
     */
    private void reconnectToResourceManager(Exception cause) {
        closeResourceManagerConnection(cause);
        tryConnectToResourceManager();
    }

    /** Connects to the ResourceManager, but only when a ResourceManager address is known. */
    private void tryConnectToResourceManager() {
        if (resourceManagerAddress == null) {
            return;
        }
        connectToResourceManager();
    }

    /**
     * Creates a new {@code ResourceManagerConnection} for the known ResourceManager address and
     * starts it.
     *
     * <p>Asserts that an address is known and that neither a pending nor an established
     * connection exists.
     */
    private void connectToResourceManager() {
        assert (resourceManagerAddress != null);
        assert (resourceManagerConnection == null);
        assert (establishedResourceManagerConnection == null);

        log.info("Connecting to ResourceManager {}", resourceManagerAddress);

        resourceManagerConnection =
                new ResourceManagerConnection(
                        log,
                        jobGraph.getJobID(),
                        resourceId,
                        getAddress(),
                        (JobMasterId) getFencingToken(),
                        resourceManagerAddress.getAddress(),
                        resourceManagerAddress.getResourceManagerId(),
                        scheduledExecutorService);
        resourceManagerConnection.start();
    }

    /**
     * Completes the connection to the ResourceManager after a successful registration.
     *
     * <p>The registration is only accepted when a pending connection exists and its target
     * leader id equals the leader id carried by {@code success}; otherwise it is ignored with a
     * debug message. On acceptance the established connection is stored, the slot pool is
     * connected to the gateway and the ResourceManager is added as a heartbeat target.
     *
     * @param success registration response received from the ResourceManager
     */
    private void establishResourceManagerConnection(JobMasterRegistrationSuccess success) {
        final ResourceManagerId leaderId = success.getResourceManagerId();

        final boolean matchesPendingConnection =
                resourceManagerConnection != null
                        && Objects.equals(resourceManagerConnection.getTargetLeaderId(), leaderId);
        if (!matchesPendingConnection) {
            log.debug("Ignoring resource manager connection to {} because it's duplicated or outdated.", leaderId);
            return;
        }

        log.info("JobManager successfully registered at ResourceManager, leader id: {}.", leaderId);

        final ResourceManagerGateway gateway = (ResourceManagerGateway) resourceManagerConnection.getTargetGateway();
        final ResourceID rmResourceId = success.getResourceManagerResourceId();

        establishedResourceManagerConnection = new EstablishedResourceManagerConnection(gateway, rmResourceId);
        slotPool.connectToResourceManager(gateway);
        resourceManagerHeartbeatManager.monitorTarget(rmResourceId, new HeartbeatTarget<Void>() {

            @Override
            public void receiveHeartbeat(ResourceID resourceID, Void payload) {
                gateway.heartbeatFromJobManager(resourceID);
            }

            @Override
            public void requestHeartbeat(ResourceID resourceID, Void payload) {
                // intentionally empty: no heartbeat is requested through this target
            }
        });
    }

    /**
     * Tears down the ResourceManager connection state of this JobMaster.
     *
     * <p>An established connection, if any, is dissolved and its reference cleared; a pending
     * connection object, if any, is closed and its reference cleared.
     *
     * @param cause reason passed on when dissolving the established connection
     */
    private void closeResourceManagerConnection(Exception cause) {
        final boolean hasEstablishedConnection = establishedResourceManagerConnection != null;
        if (hasEstablishedConnection) {
            dissolveResourceManagerConnection(establishedResourceManagerConnection, cause);
            establishedResourceManagerConnection = null;
        }

        final boolean hasConnection = resourceManagerConnection != null;
        if (hasConnection) {
            resourceManagerConnection.close();
            resourceManagerConnection = null;
        }
    }

    /**
     * Dissolves an established ResourceManager connection.
     *
     * <p>Logs the closing (including the cause object at debug level, only its message
     * otherwise), stops monitoring the ResourceManager's heartbeat, tells the ResourceManager
     * gateway to disconnect this job's manager and disconnects the slot pool.
     *
     * @param establishedResourceManagerConnection connection to dissolve
     * @param cause reason for the disconnect; sent to the ResourceManager
     */
    private void dissolveResourceManagerConnection(EstablishedResourceManagerConnection establishedResourceManagerConnection, Exception cause) {
        final ResourceID rmResourceId = establishedResourceManagerConnection.getResourceManagerResourceID();

        if (!log.isDebugEnabled()) {
            log.info("Close ResourceManager connection {}: {}.", rmResourceId, cause.getMessage());
        } else {
            log.debug("Close ResourceManager connection {}.", rmResourceId, cause);
        }

        resourceManagerHeartbeatManager.unmonitorTarget(rmResourceId);
        establishedResourceManagerConnection
                .getResourceManagerGateway()
                .disconnectJobManager(jobGraph.getJobID(), cause);
        slotPool.disconnectResourceManager();
    }

    /**
     * Returns this endpoint's self gateway typed as {@link JobMasterGateway}.
     *
     * @return the self gateway obtained from {@code getSelfGateway}
     */
    @Override
    public JobMasterGateway getGateway() {
        return getSelfGateway(JobMasterGateway.class);
    }

    /**
     * Heartbeat listener for the ResourceManager. It carries no payload in either direction and
     * reacts to timeouts by reconnecting to the ResourceManager.
     */
    private class ResourceManagerHeartbeatListener
    implements HeartbeatListener<Void, Void> {
        private ResourceManagerHeartbeatListener() {
        }

        /**
         * Reconnects to the ResourceManager if the timed-out id belongs to the currently
         * established connection; timeouts for any other id are only logged.
         */
        @Override
        public void notifyHeartbeatTimeout(ResourceID resourceId) {
            JobMaster.this.validateRunsInMainThread();
            JobMaster.this.log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);

            final EstablishedResourceManagerConnection current = JobMaster.this.establishedResourceManagerConnection;
            if (current == null || !current.getResourceManagerResourceID().equals(resourceId)) {
                return;
            }

            final String message =
                    String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId);
            JobMaster.this.reconnectToResourceManager(new JobMasterException(message));
        }

        /** Does nothing; the payload type is {@code Void}. */
        @Override
        public void reportPayload(ResourceID resourceID, Void payload) {
        }

        /** Always returns {@code null}; the payload type is {@code Void}. */
        @Override
        public Void retrievePayload(ResourceID resourceID) {
            return null;
        }
    }

    /**
     * Heartbeat listener for TaskManagers. Incoming payloads are accumulator reports, outgoing
     * payloads are allocated slot reports. All callbacks must run in the main thread.
     */
    private class TaskManagerHeartbeatListener
    implements HeartbeatListener<AccumulatorReport, AllocatedSlotReport> {
        private TaskManagerHeartbeatListener() {
        }

        /** Disconnects the TaskManager whose heartbeat timed out, using a {@link TimeoutException}. */
        @Override
        public void notifyHeartbeatTimeout(ResourceID resourceID) {
            validateRunsInMainThread();
            final TimeoutException timeout =
                    new TimeoutException("Heartbeat of TaskManager with id " + resourceID + " timed out.");
            disconnectTaskManager(resourceID, timeout);
        }

        /** Forwards every accumulator snapshot contained in the report to the scheduler. */
        @Override
        public void reportPayload(ResourceID resourceID, AccumulatorReport payload) {
            validateRunsInMainThread();
            for (AccumulatorSnapshot accumulatorSnapshot : payload.getAccumulatorSnapshots()) {
                schedulerNG.updateAccumulators(accumulatorSnapshot);
            }
        }

        /** Asks the slot pool for the allocated slot report of the given TaskManager. */
        @Override
        public AllocatedSlotReport retrievePayload(ResourceID resourceID) {
            validateRunsInMainThread();
            return slotPool.createAllocatedSlotReport(resourceID);
        }
    }

    /**
     * Job status listener that relays status changes to {@code JobMaster.jobStatusChanged} via
     * {@code runAsync} for as long as it has not been stopped.
     */
    private class JobManagerJobStatusListener
    implements JobStatusListener {
        /** Cleared by {@link #stop()}; once {@code false}, notifications are dropped. */
        private volatile boolean running = true;

        private JobManagerJobStatusListener() {
        }

        /** Relays the change asynchronously unless this listener has been stopped. */
        @Override
        public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
            if (!running) {
                return;
            }
            JobMaster.this.runAsync(() -> JobMaster.this.jobStatusChanged(newJobStatus, timestamp, error));
        }

        /** Marks this listener as stopped. */
        private void stop() {
            running = false;
        }
    }

    /**
     * Registered RPC connection from this JobMaster to a ResourceManager. It registers the
     * JobManager at the ResourceManager gateway and reports the outcome back to the JobMaster.
     */
    private class ResourceManagerConnection
    extends RegisteredRpcConnection<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess> {
        private final JobID jobID;
        private final ResourceID jobManagerResourceID;
        private final String jobManagerRpcAddress;
        private final JobMasterId jobMasterId;

        /**
         * Creates the connection; the job id, resource id, RPC address and JobMaster id must not
         * be {@code null}.
         */
        ResourceManagerConnection(Logger log, JobID jobID, ResourceID jobManagerResourceID, String jobManagerRpcAddress, JobMasterId jobMasterId, String resourceManagerAddress, ResourceManagerId resourceManagerId, Executor executor) {
            super(log, resourceManagerAddress, resourceManagerId, executor);
            this.jobID = Preconditions.checkNotNull(jobID);
            this.jobManagerResourceID = Preconditions.checkNotNull(jobManagerResourceID);
            this.jobManagerRpcAddress = Preconditions.checkNotNull(jobManagerRpcAddress);
            this.jobMasterId = Preconditions.checkNotNull(jobMasterId);
        }

        /**
         * Builds the retrying registration whose single attempt calls
         * {@code registerJobManager} on the ResourceManager gateway.
         */
        @Override
        protected RetryingRegistration<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() {
            return new RetryingRegistration<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess>(
                    this.log,
                    JobMaster.this.getRpcService(),
                    "ResourceManager",
                    ResourceManagerGateway.class,
                    this.getTargetAddress(),
                    (ResourceManagerId) this.getTargetLeaderId(),
                    JobMaster.this.jobMasterConfiguration.getRetryingRegistrationConfiguration()) {

                @Override
                protected CompletableFuture<RegistrationResponse> invokeRegistration(ResourceManagerGateway gateway, ResourceManagerId fencingToken, long timeoutMillis) {
                    final Time rpcTimeout = Time.milliseconds(timeoutMillis);
                    return gateway.registerJobManager(
                            ResourceManagerConnection.this.jobMasterId,
                            ResourceManagerConnection.this.jobManagerResourceID,
                            ResourceManagerConnection.this.jobManagerRpcAddress,
                            ResourceManagerConnection.this.jobID,
                            rpcTimeout);
                }
            };
        }

        /**
         * Hands the success over to the JobMaster via {@code runAsync}, but only if this object
         * is still the JobMaster's current connection when the callback runs.
         */
        @Override
        protected void onRegistrationSuccess(JobMasterRegistrationSuccess success) {
            JobMaster.this.runAsync(() -> {
                if (this != JobMaster.this.resourceManagerConnection) {
                    return;
                }
                JobMaster.this.establishResourceManagerConnection(success);
            });
        }

        /** Reports a registration failure through the JobMaster's error handling. */
        @Override
        protected void onRegistrationFailure(Throwable failure) {
            JobMaster.this.handleJobMasterError(failure);
        }
    }

    /**
     * Leader retrieval listener for the ResourceManager. Leader changes are forwarded to the
     * JobMaster via {@code runAsync}; errors go through the JobMaster's error handling.
     */
    private class ResourceManagerLeaderListener
    implements LeaderRetrievalListener {
        private ResourceManagerLeaderListener() {
        }

        /** Schedules the notification about the new leader; the session id is converted when the task runs. */
        @Override
        public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
            JobMaster.this.runAsync(() -> {
                final ResourceManagerId leaderId = ResourceManagerId.fromUuidOrNull(leaderSessionID);
                JobMaster.this.notifyOfNewResourceManagerLeader(leaderAddress, leaderId);
            });
        }

        /** Wraps the exception with a descriptive message and hands it to the JobMaster. */
        @Override
        public void handleError(Exception exception) {
            final Exception wrapped =
                    new Exception("Fatal error in the ResourceManager leader service", exception);
            JobMaster.this.handleJobMasterError(wrapped);
        }
    }
}

