package org.apache.flink.runtime.resourcemanager;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/JobLeaderIdService.class */
/**
 * Keeps one {@link JobLeaderIdListener} per monitored job and exposes the job's current leader
 * id as a {@link CompletableFuture}.
 *
 * <p>The service must be started with a {@link JobLeaderIdActions} instance before jobs can be
 * added. While a job has no known leader, a timeout is scheduled on the {@link ScheduledExecutor};
 * when it fires, {@link JobLeaderIdActions#notifyJobTimeout} is invoked with the id of that
 * timeout, which callers can validate through {@link #isValidTimeout(JobID, UUID)}.
 *
 * <p>NOTE(review): the listener registry is a plain {@link HashMap} and is not synchronized in
 * this class; presumably all public methods are invoked from a single thread - confirm against
 * the callers.
 */
public class JobLeaderIdService {
    private static final Logger LOG = LoggerFactory.getLogger(JobLeaderIdService.class);

    /** Source of the per-job leader retrieval services. */
    private final HighAvailabilityServices highAvailabilityServices;

    /** Executor on which the "job has no leader" timeouts are scheduled. */
    private final ScheduledExecutor scheduledExecutor;

    /** Delay after which a job without a leader is reported via {@code notifyJobTimeout}. */
    private final Time jobTimeout;

    /** Listeners of all currently monitored jobs, keyed by job id. */
    private final Map<JobID, JobLeaderIdListener> jobLeaderIdListeners = new HashMap<>(4);

    /** Callbacks handed to new listeners; {@code null} while the service is not started. */
    private JobLeaderIdActions jobLeaderIdActions = null;

    /**
     * Listener for the leader retrieval service of a single job. It publishes the current leader
     * id through {@link #getLeaderIdFuture()} and manages the timeout that runs while the job has
     * no leader.
     */
    public final class JobLeaderIdListener implements LeaderRetrievalListener {
        /** Guards every write to {@link #timeoutFuture} and {@link #timeoutId}. */
        private final Object timeoutLock;

        private final JobID jobId;
        private final JobLeaderIdActions listenerJobLeaderIdActions;
        private final LeaderRetrievalService leaderRetrievalService;

        /** Completed with the current leader id; replaced by a fresh future when the leader is lost. */
        private volatile CompletableFuture<UUID> leaderIdFuture;

        /** Set to {@code false} by {@link #stop()}; notifications arriving afterwards are only logged. */
        private volatile boolean running;

        /** Handle of the currently scheduled timeout, or {@code null} if none is active. */
        @Nullable
        private volatile ScheduledFuture<?> timeoutFuture;

        /** Id of the currently scheduled timeout, or {@code null} if none is active. */
        @Nullable
        private volatile UUID timeoutId;

        /**
         * Creates the listener, schedules the initial timeout and then starts the leader
         * retrieval service with this listener.
         *
         * @param jobID job to monitor; must not be {@code null}
         * @param jobLeaderIdActions callbacks to notify; must not be {@code null}
         * @param leaderRetrievalService retrieval service of the job; must not be {@code null}
         * @throws Exception if the leader retrieval service cannot be started
         */
        private JobLeaderIdListener(JobID jobID, JobLeaderIdActions jobLeaderIdActions, LeaderRetrievalService leaderRetrievalService) throws Exception {
            this.timeoutLock = new Object();
            this.running = true;
            this.jobId = Preconditions.checkNotNull(jobID);
            this.listenerJobLeaderIdActions = Preconditions.checkNotNull(jobLeaderIdActions);
            this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService);
            this.leaderIdFuture = new CompletableFuture<>();
            // No leader is known yet, so the timeout is armed before retrieval starts.
            activateTimeout();
            leaderRetrievalService.start(this);
        }

        /**
         * Returns the future holding the leader id as of this call. The field is replaced on
         * leader changes, so a later call may return a different future instance.
         */
        public CompletableFuture<UUID> getLeaderIdFuture() {
            return this.leaderIdFuture;
        }

        /** Returns the id of the currently active timeout, or {@code null} if none is active. */
        @Nullable
        public UUID getTimeoutId() {
            return this.timeoutId;
        }

        /**
         * Stops this listener: marks it as not running, stops the leader retrieval service,
         * cancels any active timeout and fails the leader id future if it is still pending.
         *
         * @throws Exception if stopping the leader retrieval service fails; in that case the
         *     timeout is not cancelled and the future is not failed by this call
         */
        public void stop() throws Exception {
            this.running = false;
            this.leaderRetrievalService.stop();
            cancelTimeout();
            this.leaderIdFuture.completeExceptionally(new Exception("Job leader id service has been stopped."));
        }

        /**
         * Handles a leader change for the monitored job.
         *
         * @param str address of the new leader, only used for logging; may be {@code null}
         * @param uuid session id of the new leader, or {@code null} if there is no leader
         */
        @Override // org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener
        public void notifyLeaderAddress(@Nullable String str, @Nullable UUID uuid) {
            if (!this.running) {
                LOG.debug("A leader id change {}@{} has been detected after the listener has been stopped.", uuid, str);
                return;
            }

            UUID previousLeaderId = null;
            if (this.leaderIdFuture.isDone()) {
                try {
                    previousLeaderId = this.leaderIdFuture.getNow(null);
                } catch (CompletionException e) {
                    // The future was completed exceptionally; report it and continue as if
                    // there had been no previous leader.
                    handleError(e);
                }
                if (uuid == null) {
                    // A result was already published, but there is no leader anymore.
                    LOG.debug("Job {} no longer has a job leader.", this.jobId);
                    this.leaderIdFuture = new CompletableFuture<>();
                } else {
                    // A result was already published and a (possibly different) leader is announced.
                    LOG.debug("Job {} has a new job leader {}@{}.", this.jobId, uuid, str);
                    this.leaderIdFuture = CompletableFuture.completedFuture(uuid);
                }
            } else if (uuid != null) {
                // Nothing published so far: complete the pending future with the new leader.
                LOG.debug("Job {} has a new job leader {}@{}.", this.jobId, uuid, str);
                this.leaderIdFuture.complete(uuid);
            }

            if (previousLeaderId != null && !previousLeaderId.equals(uuid)) {
                // The previously published leader differs from the new one: report the loss.
                this.listenerJobLeaderIdActions.jobLeaderLostLeadership(this.jobId, new JobMasterId(previousLeaderId));
                if (uuid == null) {
                    // No leader at all: arm the timeout for this job.
                    activateTimeout();
                    // stop() may have run concurrently after the check at the top; do not
                    // leave a timeout behind in that case.
                    if (!this.running) {
                        cancelTimeout();
                    }
                }
            } else if (uuid != null) {
                // A leader is known, so a pending timeout is no longer needed.
                cancelTimeout();
            }
        }

        /**
         * Forwards the error to the {@link JobLeaderIdActions} while running; otherwise only
         * logs it at debug level.
         *
         * @param exc the error to handle
         */
        @Override // org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener
        public void handleError(Exception exc) {
            if (this.running) {
                this.listenerJobLeaderIdActions.handleError(exc);
            } else {
                LOG.debug("An error occurred in the {} after the listener has been stopped.", JobLeaderIdListener.class.getSimpleName(), exc);
            }
        }

        /**
         * Cancels any active timeout and schedules a new one with a fresh random id. When it
         * fires, {@code notifyJobTimeout} is called with the job id and that timeout id.
         */
        private void activateTimeout() {
            synchronized (this.timeoutLock) {
                cancelTimeout();
                final UUID newTimeoutId = UUID.randomUUID();
                this.timeoutId = newTimeoutId;
                this.timeoutFuture = JobLeaderIdService.this.scheduledExecutor.schedule(
                        () -> this.listenerJobLeaderIdActions.notifyJobTimeout(this.jobId, newTimeoutId),
                        JobLeaderIdService.this.jobTimeout.toMilliseconds(),
                        TimeUnit.MILLISECONDS);
            }
        }

        /** Cancels the active timeout, if any, and clears the timeout handle and id. */
        private void cancelTimeout() {
            synchronized (this.timeoutLock) {
                final ScheduledFuture<?> activeTimeout = this.timeoutFuture;
                if (activeTimeout != null) {
                    activeTimeout.cancel(true);
                }
                this.timeoutFuture = null;
                this.timeoutId = null;
            }
        }
    }

    /**
     * Creates a service that is not yet started.
     *
     * @param highAvailabilityServices provider of the job manager leader retrievers; not {@code null}
     * @param scheduledExecutor executor used to schedule job timeouts; not {@code null}
     * @param time timeout for jobs without a leader; not {@code null}
     */
    public JobLeaderIdService(HighAvailabilityServices highAvailabilityServices, ScheduledExecutor scheduledExecutor, Time time) {
        this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices, "highAvailabilityServices");
        this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor, "scheduledExecutor");
        this.jobTimeout = Preconditions.checkNotNull(time, "jobTimeout");
    }

    /**
     * Starts the service with the given actions. If the service is already started, all
     * currently monitored jobs are stopped and removed first, so that no listener stays bound
     * to the previous actions.
     *
     * @param jobLeaderIdActions callbacks handed to listeners created from now on; not {@code null}
     * @throws Exception if stopping a previously registered listener fails
     */
    public void start(JobLeaderIdActions jobLeaderIdActions) throws Exception {
        if (isStarted()) {
            clear();
        }
        this.jobLeaderIdActions = Preconditions.checkNotNull(jobLeaderIdActions);
    }

    /**
     * Stops all listeners, removes them and marks the service as not started.
     *
     * @throws Exception if stopping a listener fails; the service then remains marked as started
     */
    public void stop() throws Exception {
        clear();
        this.jobLeaderIdActions = null;
    }

    /**
     * Checks whether the service has been started.
     *
     * @return {@code true} if actions have been set via {@link #start} and not yet cleared by
     *     {@link #stop()}; {@code false} otherwise
     */
    public boolean isStarted() {
        // The actions are set by start() and reset to null by stop().
        return this.jobLeaderIdActions != null;
    }

    /**
     * Stops every registered listener and then empties the registry. All listeners are stopped
     * even if some of them fail.
     *
     * @throws Exception the first failure, with later ones added as suppressed; in that case
     *     the registry is left unchanged
     */
    public void clear() throws Exception {
        Exception firstFailure = null;
        for (JobLeaderIdListener listener : this.jobLeaderIdListeners.values()) {
            try {
                listener.stop();
            } catch (Exception e) {
                firstFailure = ExceptionUtils.firstOrSuppressed(e, firstFailure);
            }
        }
        if (firstFailure != null) {
            ExceptionUtils.rethrowException(firstFailure, "Could not properly stop the " + JobLeaderIdService.class.getSimpleName() + '.');
        }
        this.jobLeaderIdListeners.clear();
    }

    /**
     * Starts monitoring the given job. Does nothing if the job is already monitored.
     *
     * @param jobID job to monitor
     * @throws NullPointerException if the service has not been started
     * @throws Exception if the leader retriever cannot be obtained or started
     */
    public void addJob(JobID jobID) throws Exception {
        Preconditions.checkNotNull(this.jobLeaderIdActions);
        LOG.debug("Add job {} to job leader id monitoring.", jobID);
        if (this.jobLeaderIdListeners.containsKey(jobID)) {
            return;
        }
        LeaderRetrievalService leaderRetrievalService = this.highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
        JobLeaderIdListener listener = new JobLeaderIdListener(jobID, this.jobLeaderIdActions, leaderRetrievalService);
        this.jobLeaderIdListeners.put(jobID, listener);
    }

    /**
     * Stops monitoring the given job. Does nothing if the job is not monitored.
     *
     * @param jobID job to remove
     * @throws Exception if stopping the job's listener fails; the job is removed regardless
     */
    public void removeJob(JobID jobID) throws Exception {
        LOG.debug("Remove job {} from job leader id monitoring.", jobID);
        JobLeaderIdListener removedListener = this.jobLeaderIdListeners.remove(jobID);
        if (removedListener != null) {
            removedListener.stop();
        }
    }

    /**
     * Checks whether the given job is monitored.
     *
     * @param jobID job to look up
     * @return {@code true} if a listener is registered for the job
     */
    public boolean containsJob(JobID jobID) {
        return this.jobLeaderIdListeners.containsKey(jobID);
    }

    /**
     * Returns the leader id future of the given job, adding the job to the monitoring first if
     * it is not monitored yet.
     *
     * @param jobID job whose leader id is requested
     * @return the job's leader id future, mapped through {@code JobMasterId::fromUuidOrNull}
     * @throws Exception if the job has to be added and adding it fails
     */
    public CompletableFuture<JobMasterId> getLeaderId(JobID jobID) throws Exception {
        if (!this.jobLeaderIdListeners.containsKey(jobID)) {
            addJob(jobID);
        }
        JobLeaderIdListener listener = this.jobLeaderIdListeners.get(jobID);
        return listener.getLeaderIdFuture().thenApply(JobMasterId::fromUuidOrNull);
    }

    /**
     * Checks whether the given timeout id is the currently active timeout of the given job.
     *
     * @param jobID job the timeout belongs to
     * @param uuid timeout id to validate
     * @return {@code true} if the job is monitored and its current timeout id equals
     *     {@code uuid} (two {@code null} ids compare as equal); {@code false} if the job is not
     *     monitored or the ids differ
     */
    public boolean isValidTimeout(JobID jobID, UUID uuid) {
        JobLeaderIdListener listener = this.jobLeaderIdListeners.get(jobID);
        if (listener == null) {
            return false;
        }
        return Objects.equals(uuid, listener.getTimeoutId());
    }
}
