package org.apache.flink.runtime.dispatcher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
import org.apache.flink.runtime.jobmaster.factories.DefaultJobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.CheckedSupplier;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.flink.util.function.FunctionWithException;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/Dispatcher.class */
public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> implements DispatcherGateway, LeaderContender, SubmittedJobGraphStore.SubmittedJobGraphListener {
    /** Name constant for the dispatcher. */
    public static final String DISPATCHER_NAME = "dispatcher";
    /** Configuration handed to every job manager runner and used when disposing savepoints. */
    private final Configuration configuration;
    /** Store for submitted job graphs; started in start() and stopped in postStop(). */
    private final SubmittedJobGraphStore submittedJobGraphStore;
    /** Registry obtained from the high-availability services; consulted in submitJob and cleared in cleanUpJobData. */
    private final RunningJobsRegistry runningJobsRegistry;
    private final HighAvailabilityServices highAvailabilityServices;
    /** Gateway used for the resource overview and the task manager metric query service paths. */
    private final ResourceManagerGateway resourceManagerGateway;
    /** Services created from the configuration in the constructor and shut down in postStop(). */
    private final JobManagerSharedServices jobManagerSharedServices;
    private final HeartbeatServices heartbeatServices;
    private final BlobServer blobServer;
    /** Receiver of the errors reported through onFatalError. */
    private final FatalErrorHandler fatalErrorHandler;
    /**
     * Runner future per job id. Entries are added by runJob and removed by removeJob, or by runJob's
     * completion callback when the runner future fails.
     */
    private final Map<JobID, CompletableFuture<JobManagerRunner>> jobManagerRunnerFutures;
    /** Leader election service obtained from the high-availability services. */
    private final LeaderElectionService leaderElectionService;
    /** Store that is queried for jobs which no longer have a runner. */
    private final ArchivedExecutionGraphStore archivedExecutionGraphStore;
    private final JobManagerRunnerFactory jobManagerRunnerFactory;
    private final JobManagerMetricGroup jobManagerMetricGroup;
    private final HistoryServerArchivist historyServerArchivist;

    /** When null, requestMetricQueryServicePaths answers with an empty collection. */
    @Nullable
    private final String metricQueryServicePath;

    /** When null, requestRestAddress fails with a DispatcherException. */
    @Nullable
    protected final String restAddress;
    /**
     * Termination future per job id, registered by registerJobManagerRunnerTerminationFuture and removed
     * again once that future has completed.
     */
    private final Map<JobID, CompletableFuture<Void>> jobManagerTerminationFutures;
    /** Starts as a completed future; grantLeadership replaces it with its latest confirmation future. */
    private CompletableFuture<Void> recoveryOperation;

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/Dispatcher$DefaultJobManagerRunnerFactory.class */
    public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
        INSTANCE;

        @Override // org.apache.flink.runtime.dispatcher.Dispatcher.JobManagerRunnerFactory
        public JobManagerRunner createJobManagerRunner(ResourceID resourceID, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
            return new JobManagerRunner(resourceID, jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, blobServer, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
        }
    }

    /** Factory the dispatcher uses to obtain a {@link JobManagerRunner} for a job graph. */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/Dispatcher$JobManagerRunnerFactory.class */
    public interface JobManagerRunnerFactory {

        /**
         * Creates the runner for {@code jobGraph}.
         *
         * @return the created runner
         * @throws Exception if the runner cannot be created
         */
        JobManagerRunner createJobManagerRunner(
                ResourceID resourceID,
                JobGraph jobGraph,
                Configuration configuration,
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
                BlobServer blobServer,
                JobManagerSharedServices jobManagerSharedServices,
                JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
                FatalErrorHandler fatalErrorHandler) throws Exception;
    }

    /**
     * Creates the dispatcher and wires up its collaborators.
     *
     * <p>All mandatory arguments are null-checked before the job manager shared services are created.
     * Those services have to be shut down again (see {@code postStop}), so they must not be created
     * when a later argument check would abort construction.
     *
     * @param rpcService RPC service passed to the endpoint super constructor
     * @param str endpoint id passed to the endpoint super constructor
     * @param configuration configuration used for the shared services and for every job manager runner
     * @param highAvailabilityServices source of the running jobs registry and the leader election service
     * @param submittedJobGraphStore store of submitted job graphs
     * @param resourceManagerGateway gateway to the resource manager
     * @param blobServer blob server shared with the job manager runners
     * @param heartbeatServices heartbeat services handed to the job manager runners
     * @param jobManagerMetricGroup metric group of this dispatcher
     * @param str2 metric query service path, may be null
     * @param archivedExecutionGraphStore store for execution graphs of terminated jobs
     * @param jobManagerRunnerFactory factory for job manager runners
     * @param fatalErrorHandler handler for unrecoverable errors
     * @param str3 REST address, may be null
     * @param historyServerArchivist archivist for terminated jobs
     * @throws Exception if the shared services or the high-availability components cannot be obtained
     */
    public Dispatcher(RpcService rpcService, String str, Configuration configuration, HighAvailabilityServices highAvailabilityServices, SubmittedJobGraphStore submittedJobGraphStore, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String str2, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nullable String str3, HistoryServerArchivist historyServerArchivist) throws Exception {
        super(rpcService, str);
        this.recoveryOperation = CompletableFuture.completedFuture(null);

        // Argument validation first: nothing below this group may run with a missing collaborator.
        this.configuration = Preconditions.checkNotNull(configuration);
        this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
        this.resourceManagerGateway = Preconditions.checkNotNull(resourceManagerGateway);
        this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices);
        this.blobServer = Preconditions.checkNotNull(blobServer);
        this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
        this.submittedJobGraphStore = Preconditions.checkNotNull(submittedJobGraphStore);
        this.jobManagerMetricGroup = Preconditions.checkNotNull(jobManagerMetricGroup);
        this.historyServerArchivist = Preconditions.checkNotNull(historyServerArchivist);
        this.archivedExecutionGraphStore = Preconditions.checkNotNull(archivedExecutionGraphStore);
        this.jobManagerRunnerFactory = Preconditions.checkNotNull(jobManagerRunnerFactory);

        // Optional values.
        this.metricQueryServicePath = str2;
        this.restAddress = str3;

        // Components derived from the validated arguments.
        this.jobManagerSharedServices = JobManagerSharedServices.fromConfiguration(configuration, this.blobServer);
        this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
        this.leaderElectionService = highAvailabilityServices.getDispatcherLeaderElectionService();

        this.jobManagerRunnerFutures = new HashMap<>(16);
        this.jobManagerTerminationFutures = new HashMap<>(2);
    }

    /**
     * Terminates all job manager runners and afterwards stops the shared services, the job graph
     * store and the leader election service, then closes the metric group.
     *
     * <p>Each of the three stop calls is attempted even if an earlier one failed; the failures are
     * merged through {@code ExceptionUtils.firstOrSuppressed} and rethrown after the metric group has
     * been closed.
     *
     * @return future returned by {@code FutureUtils.runAfterwards} for the shutdown action
     */
    @Override // org.apache.flink.runtime.rpc.RpcEndpoint
    public CompletableFuture<Void> postStop() {
        this.log.info("Stopping dispatcher {}.", getAddress());
        final CompletableFuture<Void> runnersTerminated = terminateJobManagerRunnersAndGetTerminationFuture();

        return FutureUtils.runAfterwards(runnersTerminated, () -> {
            Exception collectedFailure = null;

            try {
                this.jobManagerSharedServices.shutdown();
            } catch (Exception sharedServicesFailure) {
                collectedFailure = ExceptionUtils.firstOrSuppressed(sharedServicesFailure, collectedFailure);
            }

            try {
                this.submittedJobGraphStore.stop();
            } catch (Exception jobGraphStoreFailure) {
                collectedFailure = ExceptionUtils.firstOrSuppressed(jobGraphStoreFailure, collectedFailure);
            }

            try {
                this.leaderElectionService.stop();
            } catch (Exception leaderElectionFailure) {
                collectedFailure = ExceptionUtils.firstOrSuppressed(leaderElectionFailure, collectedFailure);
            }

            this.jobManagerMetricGroup.close();

            if (collectedFailure == null) {
                this.log.info("Stopped dispatcher {}.", getAddress());
            } else {
                throw collectedFailure;
            }
        });
    }

    /**
     * Starts the endpoint via the super class, then the job graph store and finally the leader
     * election service.
     *
     * @throws Exception if any of the three start calls fails
     */
    @Override // org.apache.flink.runtime.rpc.RpcEndpoint
    public void start() throws Exception {
        super.start();
        // This dispatcher is passed as the store's listener (it implements SubmittedJobGraphListener).
        this.submittedJobGraphStore.start(this);
        // This dispatcher is passed as the contender (it implements LeaderContender).
        this.leaderElectionService.start(this);
    }

    /**
     * Submits a job: rejects duplicates, otherwise persists and runs the job graph once a possibly
     * still terminating job manager for the same job id is gone.
     *
     * @param jobGraph job to submit
     * @param time RPC timeout (not read by this method)
     * @return future completed with an acknowledgement, or exceptionally with a
     *     {@link JobSubmissionException} (duplicate or failed submission) or a {@link FlinkException}
     *     (scheduling status could not be read)
     */
    @Override // org.apache.flink.runtime.dispatcher.DispatcherGateway
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time time) {
        final JobID jobID = jobGraph.getJobID();
        this.log.info("Submitting job {} ({}).", jobID, jobGraph.getName());

        final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus;
        try {
            jobSchedulingStatus = this.runningJobsRegistry.getJobSchedulingStatus(jobID);
        } catch (IOException e) {
            return FutureUtils.completedExceptionally(new FlinkException(String.format("Failed to retrieve job scheduling status for job %s.", jobID), e));
        }

        // A job counts as already submitted when the registry reports DONE or a runner is registered.
        final boolean alreadySubmitted = jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE
            || this.jobManagerRunnerFutures.containsKey(jobID);
        if (alreadySubmitted) {
            return FutureUtils.completedExceptionally(new JobSubmissionException(jobID, String.format("Job has already been submitted and is in state %s.", jobSchedulingStatus)));
        }

        final CompletableFuture<Acknowledge> persistAndRunFuture =
            waitForTerminatingJobManager(jobID, jobGraph, this::persistAndRunJob)
                .thenApply(ignored -> Acknowledge.get());

        return persistAndRunFuture.exceptionally((Throwable failure) -> {
            final Throwable strippedFailure = ExceptionUtils.stripCompletionException(failure);
            this.log.error("Failed to submit job {}.", jobID, strippedFailure);
            throw new CompletionException(new JobSubmissionException(jobID, "Failed to submit job.", strippedFailure));
        });
    }

    /**
     * Stores the job graph in the submitted job graph store and then runs the job. If running the
     * job fails, the job graph is removed from the store again.
     *
     * @param jobGraph job to persist and run
     * @return future of the run operation
     * @throws Exception if the job graph cannot be stored
     */
    private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
        final SubmittedJobGraph persistedJobGraph = new SubmittedJobGraph(jobGraph, null);
        this.submittedJobGraphStore.putJobGraph(persistedJobGraph);

        final CompletableFuture<Void> runFuture = runJob(jobGraph);
        return runFuture.whenComplete(BiConsumerWithException.unchecked((ignored, failure) -> {
            if (failure == null) {
                return;
            }
            this.submittedJobGraphStore.removeJobGraph(jobGraph.getJobID());
        }));
    }

    /**
     * Creates and registers the job manager runner for the given job.
     *
     * <p>The runner future is stored in {@code jobManagerRunnerFutures} right away; if it completes
     * exceptionally the entry is removed again on the main thread executor.
     *
     * @param jobGraph job to run; no runner may be registered for its job id yet
     * @return future that completes once the runner has been created and started
     * @throws IllegalStateException presumably, via {@code Preconditions.checkState}, when a runner is
     *     already registered for the job id
     */
    private CompletableFuture<Void> runJob(JobGraph jobGraph) {
        final JobID jobId = jobGraph.getJobID();
        Preconditions.checkState(!this.jobManagerRunnerFutures.containsKey(jobId));

        final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
        this.jobManagerRunnerFutures.put(jobId, jobManagerRunnerFuture);

        return jobManagerRunnerFuture
            .thenApply(FunctionUtils.nullFn())
            .whenCompleteAsync(
                (ignored, failure) -> {
                    if (failure != null) {
                        this.jobManagerRunnerFutures.remove(jobGraph.getJobID());
                    }
                },
                getMainThreadExecutor());
    }

    /**
     * Creates the job manager runner on the RPC service's executor and starts it once it exists.
     *
     * @param jobGraph job the runner is created for
     * @return future of the created and started runner
     */
    private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
        final RpcService rpcService = getRpcService();

        final CompletableFuture<JobManagerRunner> createdRunnerFuture = CompletableFuture.supplyAsync(
            CheckedSupplier.unchecked(() ->
                this.jobManagerRunnerFactory.createJobManagerRunner(
                    ResourceID.generate(),
                    jobGraph,
                    this.configuration,
                    rpcService,
                    this.highAvailabilityServices,
                    this.heartbeatServices,
                    this.blobServer,
                    this.jobManagerSharedServices,
                    new DefaultJobManagerJobMetricGroupFactory(this.jobManagerMetricGroup),
                    this.fatalErrorHandler)),
            rpcService.getExecutor());

        return createdRunnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner));
    }

    /**
     * Registers the result handling for the given runner and starts it.
     *
     * <p>When the runner's result future completes, the callback (run on the main thread executor)
     * reacts only if this runner is still the one registered for the job: a result is treated as a
     * globally terminal job, a {@link JobNotFinishedException} as an unfinished job, and any other
     * failure as a failed job master.
     *
     * @param jobManagerRunner runner to start
     * @return the same runner
     * @throws Exception if starting the runner fails
     */
    private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
        final JobID jobID = jobManagerRunner.getJobGraph().getJobID();
        jobManagerRunner.getResultFuture().whenCompleteAsync((archivedExecutionGraph, th) -> {
            // removeJob() drops the map entry, so the lookup may legitimately return null by the time
            // this callback runs; dereferencing it unconditionally would throw a NullPointerException.
            final CompletableFuture<JobManagerRunner> registeredRunnerFuture = this.jobManagerRunnerFutures.get(jobID);
            final JobManagerRunner registeredRunner =
                registeredRunnerFuture != null ? registeredRunnerFuture.getNow(null) : null;

            // Identity comparison on purpose: only the very same runner instance is "current".
            if (jobManagerRunner != registeredRunner) {
                this.log.debug("There is a newer JobManagerRunner for the job {}.", jobID);
                return;
            }
            if (archivedExecutionGraph != null) {
                jobReachedGloballyTerminalState(archivedExecutionGraph);
                return;
            }
            final Throwable strippedFailure = ExceptionUtils.stripCompletionException(th);
            if (strippedFailure instanceof JobNotFinishedException) {
                jobNotFinished(jobID);
            } else {
                jobMasterFailed(jobID, strippedFailure);
            }
        }, getMainThreadExecutor());
        jobManagerRunner.start();
        return jobManagerRunner;
    }

    /**
     * Lists the ids of all jobs that currently have a job manager runner registered.
     *
     * @param time RPC timeout (not read by this method)
     * @return completed future holding an unmodifiable snapshot of the registered job ids
     */
    @Override // org.apache.flink.runtime.dispatcher.DispatcherGateway
    public CompletableFuture<Collection<JobID>> listJobs(Time time) {
        // Copy the key set so the result is detached from later changes to the runner map.
        return CompletableFuture.completedFuture(
            Collections.unmodifiableSet(new HashSet<>(this.jobManagerRunnerFutures.keySet())));
    }

    /**
     * Disposes the savepoint at the given path on the shared services' scheduled executor.
     *
     * @param str savepoint path
     * @param time RPC timeout (not read by this method)
     * @return future completed with an acknowledgement, or exceptionally with a {@link FlinkException}
     *     wrapped in a {@link CompletionException} if disposal fails
     */
    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Acknowledge> disposeSavepoint(String str, Time time) {
        // Captured on the calling thread because the disposal itself runs on another executor.
        final ClassLoader callerClassLoader = Thread.currentThread().getContextClassLoader();

        return CompletableFuture.supplyAsync(
            () -> {
                this.log.info("Disposing savepoint {}.", str);
                try {
                    Checkpoints.disposeSavepoint(str, this.configuration, callerClassLoader, this.log);
                } catch (IOException | FlinkException disposalFailure) {
                    throw new CompletionException(new FlinkException(String.format("Could not dispose savepoint %s.", str), disposalFailure));
                }
                return Acknowledge.get();
            },
            this.jobManagerSharedServices.getScheduledExecutorService());
    }

    /**
     * Forwards a cancel request to the job master of the given job.
     *
     * @param jobID job to cancel
     * @param time timeout passed on to the job master
     * @return future of the job master's acknowledgement; fails if no runner is registered for the job
     */
    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Acknowledge> cancelJob(JobID jobID, Time time) {
        final CompletableFuture<JobMasterGateway> gatewayFuture = getJobMasterGatewayFuture(jobID);
        return gatewayFuture.thenCompose(gateway -> gateway.cancel(time));
    }

    /**
     * Forwards a stop request to the job master of the given job.
     *
     * @param jobID job to stop
     * @param time timeout passed on to the job master
     * @return future of the job master's acknowledgement; fails if no runner is registered for the job
     */
    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Acknowledge> stopJob(JobID jobID, Time time) {
        final CompletableFuture<JobMasterGateway> gatewayFuture = getJobMasterGatewayFuture(jobID);
        return gatewayFuture.thenCompose(gateway -> gateway.stop(time));
    }

    /**
     * Forwards a rescale request to the job master of the given job.
     *
     * @param jobID job to rescale
     * @param i value passed as the first argument of the job master's {@code rescaleJob}
     * @param rescalingBehaviour behaviour passed on to the job master
     * @param time timeout passed on to the job master
     * @return future of the job master's acknowledgement; fails if no runner is registered for the job
     */
    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Acknowledge> rescaleJob(JobID jobID, int i, RescalingBehaviour rescalingBehaviour, Time time) {
        final CompletableFuture<JobMasterGateway> gatewayFuture = getJobMasterGatewayFuture(jobID);
        return gatewayFuture.thenCompose(gateway -> gateway.rescaleJob(i, rescalingBehaviour, time));
    }

    /**
     * Returns the REST address this dispatcher was created with.
     *
     * @param time RPC timeout (not read by this method)
     * @return completed future with the address, or a future failed with a {@code DispatcherException}
     *     when no REST address was configured
     */
    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<String> requestRestAddress(Time time) {
        if (this.restAddress == null) {
            return FutureUtils.completedExceptionally(new DispatcherException("The Dispatcher has not been started with a REST endpoint."));
        }
        return CompletableFuture.completedFuture(this.restAddress);
    }

    /**
     * Builds the cluster overview from the resource manager's resource overview, the status of every
     * job with a registered runner and the overview of jobs held in the archived execution graph store.
     *
     * <p>Job masters that do not answer successfully are left out of the job counts.
     *
     * @param time timeout passed on to the resource manager and the job masters
     * @return future of the combined overview
     */
    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<ClusterOverview> requestClusterOverview(Time time) {
        final CompletableFuture<ResourceOverview> resourceOverviewFuture = this.resourceManagerGateway.requestResourceOverview(time);

        final List<CompletableFuture<Optional<JobStatus>>> optionalJobStatuses =
            queryJobMastersForInformation(jobMasterGateway -> jobMasterGateway.requestJobStatus(time));
        final CompletableFuture<Collection<Optional<JobStatus>>> allOptionalJobStatuses = FutureUtils.combineAll(optionalJobStatuses);
        final CompletableFuture<Collection<JobStatus>> runningJobStatuses = allOptionalJobStatuses.thenApply(this::flattenOptionalCollection);

        final JobsOverview storedJobsOverview = this.archivedExecutionGraphStore.getStoredJobsOverview();

        return runningJobStatuses.thenCombine(
            resourceOverviewFuture,
            (Collection<JobStatus> jobStatuses, ResourceOverview resourceOverview) ->
                new ClusterOverview(resourceOverview, JobsOverview.create(jobStatuses).combine(storedJobsOverview)));
    }

    /**
     * Collects the details of every job with a registered runner and appends the details available
     * from the archived execution graph store.
     *
     * <p>Job masters that do not answer successfully are left out of the result.
     *
     * @param time timeout passed on to the job masters
     * @return future of the combined job details, running jobs first
     */
    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time time) {
        final List<CompletableFuture<Optional<JobDetails>>> optionalJobDetails =
            queryJobMastersForInformation(jobMasterGateway -> jobMasterGateway.requestJobDetails(time));
        final CompletableFuture<Collection<Optional<JobDetails>>> allOptionalJobDetails = FutureUtils.combineAll(optionalJobDetails);
        final CompletableFuture<Collection<JobDetails>> runningJobDetailsFuture = allOptionalJobDetails.thenApply(this::flattenOptionalCollection);

        final Collection<JobDetails> storedJobDetails = this.archivedExecutionGraphStore.getAvailableJobDetails();

        return runningJobDetailsFuture.thenApply((Collection<JobDetails> runningJobDetails) -> {
            final Collection<JobDetails> allJobDetails = new ArrayList<>(storedJobDetails.size() + runningJobDetails.size());
            allJobDetails.addAll(runningJobDetails);
            allJobDetails.addAll(storedJobDetails);
            return new MultipleJobsDetails(allJobDetails);
        });
    }

    /**
     * Requests the status of a job from its job master, falling back to the archived execution
     * graph store when that request fails.
     *
     * @param jobID job to query
     * @param time timeout passed on to the job master
     * @return future of the job status; fails with the original (stripped) failure when the store has
     *     no details for the job either
     */
    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<JobStatus> requestJobStatus(JobID jobID, Time time) {
        final CompletableFuture<JobStatus> jobStatusFuture = getJobMasterGatewayFuture(jobID)
            .thenCompose(jobMasterGateway -> jobMasterGateway.requestJobStatus(time));

        return jobStatusFuture.exceptionally((Throwable failure) -> {
            final JobDetails storedJobDetails = this.archivedExecutionGraphStore.getAvailableJobDetails(jobID);
            if (storedJobDetails == null) {
                throw new CompletionException(ExceptionUtils.stripCompletionException(failure));
            }
            return storedJobDetails.getStatus();
        });
    }

    /**
     * Forwards a back pressure statistics request to the job master of the given job.
     *
     * @param jobID job owning the vertex
     * @param jobVertexID vertex to query
     * @return future of the job master's response; fails if no runner is registered for the job
     */
    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(JobID jobID, JobVertexID jobVertexID) {
        final CompletableFuture<JobMasterGateway> gatewayFuture = getJobMasterGatewayFuture(jobID);
        return gatewayFuture.thenCompose(gateway -> gateway.requestOperatorBackPressureStats(jobVertexID));
    }

    /**
     * Requests the execution graph of a job from its job master, falling back to the archived
     * execution graph store when that request fails.
     *
     * @param jobID job to query
     * @param time timeout passed on to the job master
     * @return future of the execution graph; fails with the original (stripped) failure when the store
     *     has no graph for the job either
     */
    @Override // org.apache.flink.runtime.dispatcher.DispatcherGateway, org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobID, Time time) {
        final CompletableFuture<ArchivedExecutionGraph> executionGraphFuture = getJobMasterGatewayFuture(jobID)
            .thenCompose(jobMasterGateway -> jobMasterGateway.requestJob(time));

        return executionGraphFuture.exceptionally((Throwable failure) -> {
            final ArchivedExecutionGraph storedExecutionGraph = this.archivedExecutionGraphStore.get(jobID);
            if (storedExecutionGraph == null) {
                throw new CompletionException(ExceptionUtils.stripCompletionException(failure));
            }
            return storedExecutionGraph;
        });
    }

    /**
     * Returns the result of a job.
     *
     * <p>If a runner is registered for the job, the result is derived from the runner's result
     * future. Otherwise the archived execution graph store is consulted.
     *
     * @param jobID job to query
     * @param time RPC timeout (not read by this method)
     * @return future of the job result; fails with a {@link FlinkJobNotFoundException} when the job is
     *     neither registered nor archived
     */
    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<JobResult> requestJobResult(JobID jobID, Time time) {
        final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = this.jobManagerRunnerFutures.get(jobID);
        if (jobManagerRunnerFuture != null) {
            return jobManagerRunnerFuture
                .thenCompose(JobManagerRunner::getResultFuture)
                .thenApply(JobResult::createFrom);
        }

        final ArchivedExecutionGraph storedExecutionGraph = this.archivedExecutionGraphStore.get(jobID);
        if (storedExecutionGraph == null) {
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobID));
        }
        return CompletableFuture.completedFuture(JobResult.createFrom(storedExecutionGraph));
    }

    /**
     * Returns the metric query service path of this dispatcher, if one was configured.
     *
     * @param time RPC timeout (not read by this method)
     * @return completed future with a singleton collection holding the path, or an empty collection
     *     when no path was configured
     */
    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time time) {
        if (this.metricQueryServicePath == null) {
            return CompletableFuture.completedFuture(Collections.emptyList());
        }
        return CompletableFuture.completedFuture(Collections.singleton(this.metricQueryServicePath));
    }

    /**
     * Delegates the request for the task managers' metric query service paths to the resource manager.
     *
     * @param time timeout passed on to the resource manager
     * @return the resource manager's future
     */
    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time time) {
        final CompletableFuture<Collection<Tuple2<ResourceID, String>>> taskManagerPaths =
            this.resourceManagerGateway.requestTaskManagerMetricQueryServicePaths(time);
        return taskManagerPaths;
    }

    /**
     * Returns the port reported by the blob server.
     *
     * @param time RPC timeout (not read by this method)
     * @return completed future holding the port
     */
    @Override // org.apache.flink.runtime.dispatcher.DispatcherGateway
    public CompletableFuture<Integer> getBlobServerPort(Time time) {
        final int blobServerPort = this.blobServer.getPort();
        return CompletableFuture.completedFuture(blobServerPort);
    }

    /**
     * Forwards a savepoint request to the job master of the given job.
     *
     * @param jobID job to take the savepoint of
     * @param str value passed as the first argument of the job master's {@code triggerSavepoint}
     * @param z value passed as the second argument of the job master's {@code triggerSavepoint}
     * @param time timeout passed on to the job master
     * @return future of the job master's answer; fails if no runner is registered for the job
     */
    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<String> triggerSavepoint(JobID jobID, String str, boolean z, Time time) {
        final CompletableFuture<JobMasterGateway> gatewayFuture = getJobMasterGatewayFuture(jobID);
        return gatewayFuture.thenCompose(gateway -> gateway.triggerSavepoint(str, z, time));
    }

    /**
     * Calls {@code shutDown()} on this endpoint and acknowledges the request.
     *
     * @return completed future holding the acknowledgement
     */
    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Acknowledge> shutDownCluster() {
        shutDown();
        final Acknowledge acknowledgement = Acknowledge.get();
        return CompletableFuture.completedFuture(acknowledgement);
    }

    /**
     * Removes the job and registers the resulting termination future under the job id.
     *
     * @param jobID job to remove
     * @param z passed through to {@code removeJob}, which hands it to {@code cleanUpJobData}
     */
    private void removeJobAndRegisterTerminationFuture(JobID jobID, boolean z) {
        final CompletableFuture<Void> terminationFuture = removeJob(jobID, z);
        registerJobManagerRunnerTerminationFuture(jobID, terminationFuture);
    }

    /**
     * Registers a termination future for the job and arranges for it to be deregistered once it has
     * completed.
     *
     * <p>The deregistration runs on the unfenced main thread executor. If, by then, a different future
     * is registered under the same job id, that future is put back so only this call's own
     * registration is dropped.
     *
     * @param jobID job the future belongs to; must not have a termination future registered yet
     * @param completableFuture termination future to register
     */
    private void registerJobManagerRunnerTerminationFuture(JobID jobID, CompletableFuture<Void> completableFuture) {
        Preconditions.checkState(!this.jobManagerTerminationFutures.containsKey(jobID));
        this.jobManagerTerminationFutures.put(jobID, completableFuture);

        final Runnable deregister = () -> {
            final CompletableFuture<Void> registered = this.jobManagerTerminationFutures.remove(jobID);
            // Identity comparison: restore the entry only when it is somebody else's future.
            if (registered != null && registered != completableFuture) {
                this.jobManagerTerminationFutures.put(jobID, registered);
            }
        };
        completableFuture.thenRunAsync(deregister, getUnfencedMainThreadExecutor());
    }

    /**
     * Unregisters the job's runner, closes it if there is one and then cleans up the job's data on
     * the RPC service's executor.
     *
     * @param jobID job to remove
     * @param z passed through to {@code cleanUpJobData}
     * @return future that completes when the clean-up has run
     */
    private CompletableFuture<Void> removeJob(JobID jobID, boolean z) {
        final CompletableFuture<JobManagerRunner> runnerFuture = this.jobManagerRunnerFutures.remove(jobID);

        final CompletableFuture<Void> runnerTerminated;
        if (runnerFuture == null) {
            runnerTerminated = CompletableFuture.completedFuture(null);
        } else {
            runnerTerminated = runnerFuture.thenCompose(JobManagerRunner::closeAsync);
        }

        return runnerTerminated.thenRunAsync(() -> cleanUpJobData(jobID, z), getRpcService().getExecutor());
    }

    /**
     * Cleans up the data kept for a job.
     *
     * <p>The job is always removed from the metric group. With {@code z == true} the job graph is
     * removed from the job graph store and the job is cleared from the running jobs registry; with
     * {@code z == false} the job graph is only released. Failures of these calls are logged and do
     * not abort the clean-up. The blob server is finally asked to clean up the job; its flag is true
     * only if the job graph was removed successfully.
     *
     * @param jobID job to clean up
     * @param z whether the job graph is removed (true) or merely released (false)
     */
    private void cleanUpJobData(JobID jobID, boolean z) {
        this.jobManagerMetricGroup.removeJob(jobID);

        boolean jobGraphRemoved = false;
        if (!z) {
            try {
                this.submittedJobGraphStore.releaseJobGraph(jobID);
            } catch (Exception releaseFailure) {
                this.log.warn("Could not properly release job {} from submitted job graph store.", jobID, releaseFailure);
            }
        } else {
            try {
                this.submittedJobGraphStore.removeJobGraph(jobID);
                jobGraphRemoved = true;
            } catch (Exception removalFailure) {
                this.log.warn("Could not properly remove job {} from submitted job graph store.", jobID, removalFailure);
            }
            try {
                this.runningJobsRegistry.clearJob(jobID);
            } catch (IOException registryFailure) {
                this.log.warn("Could not properly remove job {} from the running jobs registry.", jobID, registryFailure);
            }
        }

        this.blobServer.cleanupJob(jobID, jobGraphRemoved);
    }

    /**
     * Removes every job that currently has a runner registered and registers the termination future
     * of each; the job graphs are released rather than removed.
     */
    private void terminateJobManagerRunners() {
        this.log.info("Stopping all currently running jobs of dispatcher {}.", getAddress());

        // Iterate over a copy: removing a job modifies jobManagerRunnerFutures.
        final HashSet<JobID> jobsToRemove = new HashSet<>(this.jobManagerRunnerFutures.keySet());
        for (JobID jobId : jobsToRemove) {
            removeJobAndRegisterTerminationFuture(jobId, false);
        }
    }

    /**
     * Terminates all job manager runners and combines the registered termination futures.
     *
     * @return the future {@code FutureUtils.completeAll} builds from the registered termination futures
     */
    private CompletableFuture<Void> terminateJobManagerRunnersAndGetTerminationFuture() {
        terminateJobManagerRunners();
        final Collection<CompletableFuture<Void>> terminationFutures = this.jobManagerTerminationFutures.values();
        return FutureUtils.completeAll(terminationFutures);
    }

    /**
     * Recovers the job graphs of all job ids known to the submitted job graph store.
     *
     * <p>If recovery fails, every listed job graph is released again; failures while releasing are
     * attached to the recovery failure as suppressed exceptions before it is rethrown.
     *
     * @return the recovered job graphs
     * @throws Exception if the job ids cannot be read or a job graph cannot be recovered
     */
    @VisibleForTesting
    Collection<JobGraph> recoverJobs() throws Exception {
        this.log.info("Recovering all persisted jobs.");
        final Collection<JobID> persistedJobIds = this.submittedJobGraphStore.getJobIds();

        try {
            return recoverJobGraphs(persistedJobIds);
        } catch (Exception recoveryFailure) {
            for (JobID persistedJobId : persistedJobIds) {
                try {
                    this.submittedJobGraphStore.releaseJobGraph(persistedJobId);
                } catch (Exception releaseFailure) {
                    recoveryFailure.addSuppressed(releaseFailure);
                }
            }
            throw recoveryFailure;
        }
    }

    /**
     * Recovers the job graph of every given job id, in iteration order.
     *
     * @param collection ids of the jobs to recover
     * @return the recovered job graphs
     * @throws FlinkJobNotFoundException if no job graph can be recovered for one of the ids
     * @throws Exception if recovering a job graph fails
     */
    @Nonnull
    private Collection<JobGraph> recoverJobGraphs(Collection<JobID> collection) throws Exception {
        final List<JobGraph> recoveredJobGraphs = new ArrayList<>(collection.size());
        for (JobID jobID : collection) {
            final JobGraph recoveredJobGraph = recoverJob(jobID);
            if (recoveredJobGraph == null) {
                throw new FlinkJobNotFoundException(jobID);
            }
            recoveredJobGraphs.add(recoveredJobGraph);
        }
        return recoveredJobGraphs;
    }

    /**
     * Recovers the job graph of a single job from the submitted job graph store.
     *
     * @param jobID job to recover
     * @return the job graph, or null if the store returned nothing for the id
     * @throws Exception if the store fails to recover the job graph
     */
    @Nullable
    private JobGraph recoverJob(JobID jobID) throws Exception {
        this.log.debug("Recover job {}.", jobID);
        final SubmittedJobGraph submittedJobGraph = this.submittedJobGraphStore.recoverJobGraph(jobID);
        return submittedJobGraph == null ? null : submittedJobGraph.getJobGraph();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reports an unrecoverable error to the fatal error handler this dispatcher was created with.
     *
     * @param th the error to report
     */
    public void onFatalError(Throwable th) {
        final FatalErrorHandler handler = this.fatalErrorHandler;
        handler.onFatalError(th);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Handles a job that has reached a globally terminal state: the execution graph is archived and
     * the job is removed, including its job graph.
     *
     * @param archivedExecutionGraph final execution graph of the job; its state must be globally terminal
     */
    public void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) {
        final JobID jobId = archivedExecutionGraph.getJobID();
        final JobStatus terminalState = archivedExecutionGraph.getState();

        Preconditions.checkArgument(
            terminalState.isGloballyTerminalState(),
            "Job %s is in state %s which is not globally terminal.",
            jobId,
            terminalState);
        this.log.info("Job {} reached globally terminal state {}.", jobId, terminalState);

        archiveExecutionGraph(archivedExecutionGraph);
        removeJobAndRegisterTerminationFuture(jobId, true);
    }

    /**
     * Stores the execution graph in the archived execution graph store and hands it to the history
     * server archivist. Failures of either step are only logged.
     *
     * @param archivedExecutionGraph execution graph to archive
     */
    private void archiveExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph) {
        try {
            this.archivedExecutionGraphStore.put(archivedExecutionGraph);
        } catch (IOException storeFailure) {
            this.log.info(
                "Could not store completed job {}({}).",
                archivedExecutionGraph.getJobName(),
                archivedExecutionGraph.getJobID(),
                storeFailure);
        }

        final CompletableFuture<Acknowledge> historyServerFuture =
            this.historyServerArchivist.archiveExecutionGraph(archivedExecutionGraph);
        historyServerFuture.whenComplete((ignored, archiveFailure) -> {
            if (archiveFailure == null) {
                return;
            }
            this.log.info(
                "Could not archive completed job {}({}) to the history server.",
                archivedExecutionGraph.getJobName(),
                archivedExecutionGraph.getJobID(),
                archiveFailure);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Handles a job whose job manager stopped without finishing it: the job is removed, but its job
     * graph is only released.
     *
     * @param jobID the unfinished job
     */
    public void jobNotFinished(JobID jobID) {
        this.log.info("Job {} was not finished by JobManager.", jobID);
        final boolean removeJobGraph = false;
        removeJobAndRegisterTerminationFuture(jobID, removeJobGraph);
    }

    /**
     * Reports a failed job master as a fatal error.
     *
     * @param jobID job whose job master failed
     * @param th cause of the failure
     */
    private void jobMasterFailed(JobID jobID, Throwable th) {
        final String message = String.format("JobMaster for job %s failed.", jobID);
        onFatalError(new FlinkException(message, th));
    }

    /**
     * Returns the future gateway of the job master leading the given job.
     *
     * <p>Once the gateway is available, the main thread executor re-checks that a runner is still
     * registered for the job, so a gateway of a job removed in the meantime is not handed out.
     *
     * @param jobID job whose job master gateway is requested
     * @return future of the gateway; fails with a {@link FlinkJobNotFoundException} when no runner is
     *     registered for the job, either now or at the time of the re-check
     */
    private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobID) {
        final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = this.jobManagerRunnerFutures.get(jobID);
        if (jobManagerRunnerFuture == null) {
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobID));
        }

        final CompletableFuture<JobMasterGateway> leaderGatewayFuture =
            jobManagerRunnerFuture.thenCompose(JobManagerRunner::getLeaderGatewayFuture);

        return leaderGatewayFuture.thenApplyAsync(
            (JobMasterGateway jobMasterGateway) -> {
                if (this.jobManagerRunnerFutures.containsKey(jobID)) {
                    return jobMasterGateway;
                }
                throw new CompletionException(new FlinkJobNotFoundException(jobID));
            },
            getMainThreadExecutor());
    }

    /**
     * Unwraps the present values of a collection of optionals, dropping the empty ones.
     *
     * @param collection optionals to unwrap
     * @param <T> value type
     * @return list of the present values in encounter order
     */
    private <T> List<T> flattenOptionalCollection(Collection<Optional<T>> collection) {
        return collection.stream()
            .filter(Optional::isPresent)
            .map(Optional::get)
            .collect(Collectors.toList());
    }

    /**
     * Sends the given query to the job master of every job with a registered runner.
     *
     * <p>Each returned future always completes normally: a failed lookup or query, as well as a null
     * answer, yields an empty optional.
     *
     * @param function query to run against each job master gateway
     * @param <T> type of the queried information
     * @return one future per registered job, in the iteration order of the runner map's key set
     */
    @Nonnull
    private <T> List<CompletableFuture<Optional<T>>> queryJobMastersForInformation(Function<JobMasterGateway, CompletableFuture<T>> function) {
        final List<CompletableFuture<Optional<T>>> optionalJobInformation = new ArrayList<>(this.jobManagerRunnerFutures.size());

        for (JobID jobId : this.jobManagerRunnerFutures.keySet()) {
            final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);

            final CompletableFuture<Optional<T>> optionalInformation = jobMasterGatewayFuture
                .thenCompose(function::apply)
                // The failure is deliberately dropped; on failure the value is null, hence empty.
                .handle((T information, Throwable ignoredFailure) -> Optional.ofNullable(information));

            optionalJobInformation.add(optionalInformation);
        }

        return optionalJobInformation;
    }

    /**
     * Reacts to being granted leadership; the work is scheduled via {@code runAsyncWithoutFencing}.
     *
     * <p>The steps are chained behind the previous recovery operation:
     * <ol>
     *   <li>recover the persisted jobs on the RPC service's executor;</li>
     *   <li>on the unfenced main thread executor, try to accept the leadership and run the jobs;</li>
     *   <li>on the RPC service's executor, either confirm the leader session id or, if leadership
     *       was not accepted, release the recovered job graphs again.</li>
     * </ol>
     * Any failure in this chain is reported as a fatal error. The resulting future becomes the new
     * recovery operation.
     *
     * @param uuid leader session id of the granted leadership
     */
    @Override // org.apache.flink.runtime.leaderelection.LeaderContender
    public void grantLeadership(UUID uuid) {
        runAsyncWithoutFencing(() -> {
            this.log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), uuid);

            final CompletableFuture<Collection<JobGraph>> recoveredJobsFuture = this.recoveryOperation.thenApplyAsync(
                FunctionUtils.uncheckedFunction(ignored -> recoverJobs()),
                getRpcService().getExecutor());

            final CompletableFuture<Boolean> leadershipAcceptedFuture = recoveredJobsFuture.thenComposeAsync(
                (Collection<JobGraph> recoveredJobs) -> tryAcceptLeadershipAndRunJobs(uuid, recoveredJobs),
                getUnfencedMainThreadExecutor());

            final CompletableFuture<Void> confirmationFuture = leadershipAcceptedFuture.thenCombineAsync(
                recoveredJobsFuture,
                BiFunctionWithException.unchecked((Boolean leadershipAccepted, Collection<JobGraph> recoveredJobs) -> {
                    if (leadershipAccepted) {
                        this.leaderElectionService.confirmLeaderSessionID(uuid);
                    } else {
                        for (JobGraph recoveredJob : recoveredJobs) {
                            this.submittedJobGraphStore.releaseJobGraph(recoveredJob.getJobID());
                        }
                    }
                    return null;
                }),
                getRpcService().getExecutor());

            confirmationFuture.whenComplete((Void ignored, Throwable failure) -> {
                if (failure != null) {
                    onFatalError(ExceptionUtils.stripCompletionException(failure));
                }
            });

            this.recoveryOperation = confirmationFuture;
        });
    }

    /**
     * Accepts the leadership for the given session id, if it is still held, and starts the given
     * job graphs.
     *
     * @param uuid leader session id to accept
     * @param collection job graphs to run once the leadership has been accepted
     * @return future completed with {@code false} if the leader election service no longer
     *     reports leadership for {@code uuid}; otherwise a future yielding {@code true} once all
     *     run futures have completed
     */
    private CompletableFuture<Boolean> tryAcceptLeadershipAndRunJobs(UUID uuid, Collection<JobGraph> collection) {
        final DispatcherId dispatcherId = DispatcherId.fromUuid(uuid);

        if (this.leaderElectionService.hasLeadership(uuid)) {
            this.log.debug("Dispatcher {} accepted leadership with fencing token {}. Start recovered jobs.", getAddress(), dispatcherId);
            setNewFencingToken(dispatcherId);

            final Collection<CompletableFuture<?>> runFutures = new ArrayList<>(collection.size());
            for (JobGraph recoveredJob : collection) {
                final CompletableFuture<?> runFuture = waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob, this::runJob);
                runFutures.add(runFuture);
            }

            return FutureUtils.waitForAll(runFutures).thenApply(ignored -> true);
        } else {
            this.log.debug("Dispatcher {} lost leadership before accepting it. Stop recovering jobs for fencing token {}.", getAddress(), dispatcherId);
            return CompletableFuture.completedFuture(false);
        }
    }

    /**
     * Runs the given action for a job graph after the termination future of the job id has
     * completed.
     *
     * <p>If the termination future fails, the failure is wrapped in a
     * {@code DispatcherException} and the action is not executed. Otherwise, on the main thread
     * executor, the job's entry is removed from {@code jobManagerTerminationFutures} and the
     * action is applied.
     *
     * @param jobID id of the job to wait for
     * @param jobGraph job graph handed to the action
     * @param functionWithException action to execute after termination
     * @return future of the action's result
     */
    private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobID, JobGraph jobGraph, FunctionWithException<JobGraph, CompletableFuture<Void>, ?> functionWithException) {
        final CompletableFuture<Void> terminationFuture = getJobTerminationFuture(jobID)
            .exceptionally((Throwable failure) -> {
                throw new CompletionException(
                    new DispatcherException(
                        String.format("Termination of previous JobManager for job %s failed. Cannot submit job under the same job id.", jobID),
                        failure));
            });

        return terminationFuture.thenComposeAsync(
            FunctionUtils.uncheckedFunction(ignored -> {
                this.jobManagerTerminationFutures.remove(jobID);
                return functionWithException.apply(jobGraph);
            }),
            getMainThreadExecutor());
    }

    /**
     * Returns the termination future registered for the given job.
     *
     * @param jobID id of the job
     * @return an exceptionally completed future if the job still has an entry in
     *     {@code jobManagerRunnerFutures}; otherwise the future stored in
     *     {@code jobManagerTerminationFutures}, or an already completed future if none is stored
     */
    CompletableFuture<Void> getJobTerminationFuture(JobID jobID) {
        if (this.jobManagerRunnerFutures.containsKey(jobID)) {
            final String message = String.format("Job with job id %s is still running.", jobID);
            return FutureUtils.completedExceptionally(new DispatcherException(message));
        }
        return this.jobManagerTerminationFutures.getOrDefault(jobID, CompletableFuture.completedFuture(null));
    }

    /**
     * Exposes the current recovery operation future for tests.
     *
     * @return the future currently stored in {@code recoveryOperation}
     */
    @VisibleForTesting
    CompletableFuture<Void> getRecoveryOperation() {
        return recoveryOperation;
    }

    /**
     * Installs a new fencing token, clearing the dispatcher state first if a token was already
     * set.
     *
     * @param dispatcherId new fencing token; may be {@code null}
     */
    private void setNewFencingToken(@Nullable DispatcherId dispatcherId) {
        final boolean hasPreviousToken = getFencingToken() != null;
        if (hasPreviousToken) {
            clearDispatcherState();
        }

        setFencingToken(dispatcherId);
    }

    /**
     * Clears the dispatcher state by terminating the job manager runners.
     */
    private void clearDispatcherState() {
        this.terminateJobManagerRunners();
    }

    /**
     * Callback invoked when this dispatcher's leadership is revoked.
     *
     * <p>Schedules, via {@code runAsyncWithoutFencing}, a task that logs the revocation and
     * resets the fencing token to {@code null}.
     */
    @Override // org.apache.flink.runtime.leaderelection.LeaderContender
    public void revokeLeadership() {
        runAsyncWithoutFencing(() -> {
            final String address = getAddress();
            this.log.info("Dispatcher {} was revoked leadership.", address);

            // A null token marks that no leadership session is active.
            setNewFencingToken(null);
        });
    }

    /**
     * Reports an error received from the leader election service as a fatal error.
     *
     * @param exc the error to report; becomes the cause of the raised {@code DispatcherException}
     */
    @Override // org.apache.flink.runtime.leaderelection.LeaderContender
    public void handleError(Exception exc) {
        final DispatcherException fatalError =
            new DispatcherException("Received an error from the LeaderElectionService.", exc);
        onFatalError(fatalError);
    }

    /**
     * Callback invoked when a job graph was added to the submitted job graph store.
     *
     * <p>Nothing happens if the job already has an entry in {@code jobManagerRunnerFutures}.
     * Otherwise the following steps are chained onto the current {@code recoveryOperation}:
     * <ol>
     *   <li>recover the job graph on the RPC service's executor; a {@code null} result is
     *       represented as an empty {@link Optional},</li>
     *   <li>if a job graph was recovered, try to run it on the unfenced main thread executor with
     *       the fencing token captured when this callback ran,</li>
     *   <li>if the job was not started, release its job graph on the RPC service's executor.</li>
     * </ol>
     * A failure of this chain is reported through {@code onFatalError}. The resulting future
     * becomes the new {@code recoveryOperation}.
     *
     * @param jobID id of the added job
     */
    @Override // org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
    public void onAddedJobGraph(JobID jobID) {
        runAsync(() -> {
            if (!this.jobManagerRunnerFutures.containsKey(jobID)) {
                // recoverJob may yield null, hence the Optional wrapper.
                final CompletableFuture<Optional<JobGraph>> recoveredJobFuture = this.recoveryOperation.thenApplyAsync(
                    FunctionUtils.uncheckedFunction(ignored -> Optional.ofNullable(recoverJob(jobID))),
                    getRpcService().getExecutor());

                // Capture the token now; the stages below run asynchronously.
                final DispatcherId dispatcherId = getFencingToken();

                final CompletableFuture<Void> submissionFuture = recoveredJobFuture.thenComposeAsync(
                    (Optional<JobGraph> jobGraphOptional) -> jobGraphOptional.map(
                        FunctionUtils.uncheckedFunction(jobGraph -> tryRunRecoveredJobGraph(jobGraph, dispatcherId).thenAcceptAsync(
                            FunctionUtils.uncheckedConsumer((Boolean isRecoveredJobRunning) -> {
                                if (!isRecoveredJobRunning) {
                                    this.submittedJobGraphStore.releaseJobGraph(jobID);
                                }
                            }),
                            getRpcService().getExecutor())))
                        .orElse(CompletableFuture.completedFuture(null)),
                    getUnfencedMainThreadExecutor());

                submissionFuture.whenComplete((Void ignored, Throwable failure) -> {
                    if (failure != null) {
                        onFatalError(
                            new DispatcherException(
                                String.format("Could not start the added job %s", jobID),
                                ExceptionUtils.stripCompletionException(failure)));
                    }
                });

                this.recoveryOperation = submissionFuture;
            }
        });
    }

    /**
     * Tries to run a recovered job graph under the given fencing token.
     *
     * @param jobGraph recovered job graph
     * @param dispatcherId fencing token whose leadership must still be held
     * @return future yielding {@code true} if the job is already running or was started, and
     *     {@code false} if leadership is no longer held for the token or the job's scheduling
     *     status is {@code DONE}
     * @throws Exception declared by the method; propagated from the calls it makes
     */
    private CompletableFuture<Boolean> tryRunRecoveredJobGraph(JobGraph jobGraph, DispatcherId dispatcherId) throws Exception {
        if (!this.leaderElectionService.hasLeadership(dispatcherId.toUUID())) {
            return CompletableFuture.completedFuture(false);
        }

        final JobID jobId = jobGraph.getJobID();

        if (this.jobManagerRunnerFutures.containsKey(jobId)) {
            this.log.debug("Ignore added JobGraph because the job {} is already running.", jobId);
            return CompletableFuture.completedFuture(true);
        }

        if (this.runningJobsRegistry.getJobSchedulingStatus(jobId) == RunningJobsRegistry.JobSchedulingStatus.DONE) {
            this.log.debug("Ignore added JobGraph because the job {} has already been completed.", jobId);
            return CompletableFuture.completedFuture(false);
        }

        return waitForTerminatingJobManager(jobId, jobGraph, this::runJob).thenApply(ignored -> true);
    }

    /**
     * Callback invoked when a job graph was removed from the submitted job graph store.
     *
     * <p>Schedules, via {@code runAsync}, a call to
     * {@code removeJobAndRegisterTerminationFuture(jobID, false)}. Any exception thrown by that
     * call is reported through {@code onFatalError}.
     *
     * @param jobID id of the removed job
     */
    @Override // org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
    public void onRemovedJobGraph(JobID jobID) {
        runAsync(() -> {
            try {
                removeJobAndRegisterTerminationFuture(jobID, false);
            } catch (Exception removalFailure) {
                final String message = String.format("Could not remove job %s.", jobID);
                onFatalError(new DispatcherException(message, removalFailure));
            }
        });
    }
}
