package org.apache.flink.runtime.dispatcher.runner;

import java.util.Collection;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobGraphWriter;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/runtime/dispatcher/runner/AbstractDispatcherLeaderProcess.class */
public abstract class AbstractDispatcherLeaderProcess implements DispatcherLeaderProcess {
    private final UUID leaderSessionId;
    private final FatalErrorHandler fatalErrorHandler;

    @Nullable
    private DispatcherGatewayService dispatcherService;
    protected final Logger log = LoggerFactory.getLogger(getClass());
    private final Object lock = new Object();
    private final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = new CompletableFuture<>();
    private final CompletableFuture<String> leaderAddressFuture = this.dispatcherGatewayFuture.thenApply((v0) -> {
        return v0.getAddress();
    });
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
    private final CompletableFuture<ApplicationStatus> shutDownFuture = new CompletableFuture<>();
    private State state = State.CREATED;

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/runner/AbstractDispatcherLeaderProcess$DispatcherGatewayService.class */
    public interface DispatcherGatewayService extends AutoCloseableAsync {
        DispatcherGateway getGateway();

        CompletableFuture<Void> onRemovedJobGraph(JobID jobID);

        CompletableFuture<ApplicationStatus> getShutDownFuture();

        CompletableFuture<Void> getTerminationFuture();
    }

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/runner/AbstractDispatcherLeaderProcess$DispatcherGatewayServiceFactory.class */
    public interface DispatcherGatewayServiceFactory {
        DispatcherGatewayService create(DispatcherId dispatcherId, Collection<JobGraph> collection, JobGraphWriter jobGraphWriter);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/runner/AbstractDispatcherLeaderProcess$State.class */
    public enum State {
        CREATED,
        RUNNING,
        STOPPED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractDispatcherLeaderProcess(UUID uuid, FatalErrorHandler fatalErrorHandler) {
        this.leaderSessionId = uuid;
        this.fatalErrorHandler = fatalErrorHandler;
    }

    @VisibleForTesting
    State getState() {
        State state;
        synchronized (this.lock) {
            state = this.state;
        }
        return state;
    }

    @Override // org.apache.flink.runtime.dispatcher.runner.DispatcherLeaderProcess
    public final void start() {
        runIfStateIs(State.CREATED, this::startInternal);
    }

    private void startInternal() {
        this.log.info("Start {}.", getClass().getSimpleName());
        this.state = State.RUNNING;
        onStart();
    }

    @Override // org.apache.flink.runtime.dispatcher.runner.DispatcherLeaderProcess
    public final UUID getLeaderSessionId() {
        return this.leaderSessionId;
    }

    @Override // org.apache.flink.runtime.dispatcher.runner.DispatcherLeaderProcess
    public final CompletableFuture<DispatcherGateway> getDispatcherGateway() {
        return this.dispatcherGatewayFuture;
    }

    @Override // org.apache.flink.runtime.dispatcher.runner.DispatcherLeaderProcess
    public final CompletableFuture<String> getLeaderAddressFuture() {
        return this.leaderAddressFuture;
    }

    @Override // org.apache.flink.runtime.dispatcher.runner.DispatcherLeaderProcess
    public CompletableFuture<ApplicationStatus> getShutDownFuture() {
        return this.shutDownFuture;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Optional<DispatcherGatewayService> getDispatcherService() {
        return Optional.ofNullable(this.dispatcherService);
    }

    public final CompletableFuture<Void> closeAsync() {
        runIfStateIsNot(State.STOPPED, this::closeInternal);
        return this.terminationFuture;
    }

    private void closeInternal() {
        this.log.info("Stopping {}.", getClass().getSimpleName());
        this.state = State.STOPPED;
        FutureUtils.forward(FutureUtils.composeAfterwards(closeDispatcherService(), this::onClose), this.terminationFuture);
    }

    private CompletableFuture<Void> closeDispatcherService() {
        return this.dispatcherService != null ? this.dispatcherService.closeAsync() : FutureUtils.completedVoidFuture();
    }

    protected abstract void onStart();

    protected CompletableFuture<Void> onClose() {
        return FutureUtils.completedVoidFuture();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final void completeDispatcherSetup(DispatcherGatewayService dispatcherGatewayService) {
        runIfStateIs(State.RUNNING, () -> {
            completeDispatcherSetupInternal(dispatcherGatewayService);
        });
    }

    private void completeDispatcherSetupInternal(DispatcherGatewayService dispatcherGatewayService) {
        Preconditions.checkState(this.dispatcherService == null, "The DispatcherGatewayService can only be set once.");
        this.dispatcherService = dispatcherGatewayService;
        this.dispatcherGatewayFuture.complete(dispatcherGatewayService.getGateway());
        FutureUtils.forward(dispatcherGatewayService.getShutDownFuture(), this.shutDownFuture);
        handleUnexpectedDispatcherServiceTermination(dispatcherGatewayService);
    }

    private void handleUnexpectedDispatcherServiceTermination(DispatcherGatewayService dispatcherGatewayService) {
        dispatcherGatewayService.getTerminationFuture().whenComplete((r6, th) -> {
            runIfStateIs(State.RUNNING, () -> {
                handleError(new FlinkException("Unexpected termination of DispatcherService.", th));
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final <V> Optional<V> supplyUnsynchronizedIfRunning(Supplier<V> supplier) {
        synchronized (this.lock) {
            if (this.state == State.RUNNING) {
                return Optional.of(supplier.get());
            }
            return Optional.empty();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final <V> Optional<V> supplyIfRunning(Supplier<V> supplier) {
        synchronized (this.lock) {
            if (this.state != State.RUNNING) {
                return Optional.empty();
            }
            return Optional.of(supplier.get());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final void runIfStateIs(State state, Runnable runnable) {
        state.getClass();
        runIfState((v1) -> {
            return r1.equals(v1);
        }, runnable);
    }

    private void runIfStateIsNot(State state, Runnable runnable) {
        runIfState(state2 -> {
            return !state.equals(state2);
        }, runnable);
    }

    private void runIfState(Predicate<State> predicate, Runnable runnable) {
        synchronized (this.lock) {
            if (predicate.test(this.state)) {
                runnable.run();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final <T> Void onErrorIfRunning(T t, Throwable th) {
        synchronized (this.lock) {
            if (this.state != State.RUNNING) {
                return null;
            }
            if (th == null) {
                return null;
            }
            handleError(th);
            return null;
        }
    }

    private void handleError(Throwable th) {
        closeAsync();
        this.fatalErrorHandler.onFatalError(th);
    }
}
