/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.highavailability.nonha.embedded;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EmbeddedLeaderService {
    /** Logger of this service; it is also passed to the grant and notify runnables created by this class. */
    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedLeaderService.class);
    /** Monitor guarding the mutable leader-election state; the state-mutating methods of this service synchronize on it. */
    private final Object lock = new Object();
    /** Executor on which the grant, revoke and leader-notification runnables are run. */
    private final Executor notificationExecutor;
    /** Election services that were registered through addContender and have not been removed since. */
    private final Set<EmbeddedLeaderElectionService> allLeaderContenders;
    /** Retrieval services that were registered through addListener and have not been removed since. */
    private final Set<EmbeddedLeaderRetrievalService> listeners;
    /** Contender that was offered leadership by updateLeader and has not confirmed it yet; null otherwise. */
    private EmbeddedLeaderElectionService currentLeaderProposed;
    /** Contender whose leadership confirmation was accepted by confirmLeader; null otherwise. */
    private EmbeddedLeaderElectionService currentLeaderConfirmed;
    /**
     * Session ID generated for the most recent leadership proposal; reset to null when the leader is
     * removed or revoked and on shutdown. Volatile because the election service's hasLeadership
     * compares against it without holding the lock.
     */
    private volatile UUID currentLeaderSessionId;
    /** Address supplied with the accepted leadership confirmation; null while no leader is confirmed. */
    private String currentLeaderAddress;
    /** Set to true at the end of shutdownInternally; never reset afterwards. */
    private boolean shutdown;

    /**
     * Creates a leader service with no registered contenders and no registered listeners.
     *
     * @param notificationsDispatcher executor on which contender and listener callbacks are run;
     *     must not be null
     */
    public EmbeddedLeaderService(Executor notificationsDispatcher) {
        notificationExecutor = Preconditions.checkNotNull(notificationsDispatcher);
        allLeaderContenders = new HashSet<>();
        listeners = new HashSet<>();
    }

    /**
     * Shuts this service down: under the lock, all registered election and retrieval services are
     * shut down and the leader state is reset. Calling it again after a completed shutdown is a no-op.
     */
    public void shutdown() {
        synchronized (lock) {
            final Exception cause = new Exception("Leader election service is shutting down");
            shutdownInternally(cause);
        }
    }

    /**
     * Reports whether this service has been shut down.
     *
     * @return the value of the shutdown flag, read while holding the lock
     */
    @VisibleForTesting
    public boolean isShutdown() {
        synchronized (lock) {
            return shutdown;
        }
    }

    /**
     * Logs the given error and then shuts the whole service down, passing the error on as the cause
     * of the shutdown exception.
     *
     * @param error the failure that makes continuing impossible
     */
    private void fatalError(Throwable error) {
        LOG.error("Embedded leader election service encountered a fatal error. Shutting down service.", error);
        synchronized (lock) {
            final Exception shutdownCause =
                    new Exception("Leader election service is shutting down after a fatal error", error);
            shutdownInternally(shutdownCause);
        }
    }

    /**
     * Resets the leader state and shuts down every registered election and retrieval service.
     * Must be called while holding {@code lock}; does nothing if the service is already shut down.
     *
     * <p>Shutting down an election service invokes contender code, which may throw. A failure of
     * one participant must not leave the service half shut down, so every participant is shut down
     * regardless of earlier failures, the registries are always cleared and the shutdown flag is
     * always set. The first {@link RuntimeException} is rethrown afterwards with any later ones
     * attached as suppressed exceptions.
     *
     * @param exceptionForHandlers exception describing the reason for the shutdown; passed to each
     *     participant's {@code shutdown} method
     */
    @GuardedBy("lock")
    private void shutdownInternally(Exception exceptionForHandlers) {
        assert Thread.holdsLock(lock);
        if (shutdown) {
            return;
        }

        currentLeaderProposed = null;
        currentLeaderConfirmed = null;
        currentLeaderSessionId = null;
        currentLeaderAddress = null;

        RuntimeException firstFailure = null;
        try {
            for (EmbeddedLeaderElectionService contenderService : allLeaderContenders) {
                try {
                    contenderService.shutdown(exceptionForHandlers);
                } catch (RuntimeException e) {
                    // keep going so the remaining participants are still shut down
                    if (firstFailure == null) {
                        firstFailure = e;
                    } else {
                        firstFailure.addSuppressed(e);
                    }
                }
            }
            for (EmbeddedLeaderRetrievalService retrievalService : listeners) {
                try {
                    retrievalService.shutdown(exceptionForHandlers);
                } catch (RuntimeException e) {
                    if (firstFailure == null) {
                        firstFailure = e;
                    } else {
                        firstFailure.addSuppressed(e);
                    }
                }
            }
        } finally {
            // runs even if an Error escapes, so the service never stays in a half-shut-down state
            allLeaderContenders.clear();
            listeners.clear();
            shutdown = true;
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Creates a new, not yet started leader election service that is backed by this service.
     *
     * @return a fresh election service; it takes part in the election once it is started
     * @throws IllegalStateException if this service has already been shut down
     */
    public LeaderElectionService createLeaderElectionService() {
        // The shutdown flag is not volatile and is written under the lock, so it has to be read
        // under the lock as well to be reliably visible to the calling thread.
        synchronized (lock) {
            Preconditions.checkState(!shutdown, "leader election service is shut down");
        }
        return new EmbeddedLeaderElectionService();
    }

    /**
     * Creates a new, not yet started leader retrieval service that is backed by this service.
     *
     * @return a fresh retrieval service; it receives leader notifications once it is started
     * @throws IllegalStateException if this service has already been shut down
     */
    public LeaderRetrievalService createLeaderRetrievalService() {
        // The shutdown flag is not volatile and is written under the lock, so it has to be read
        // under the lock as well to be reliably visible to the calling thread.
        synchronized (lock) {
            Preconditions.checkState(!shutdown, "leader election service is shut down");
        }
        return new EmbeddedLeaderRetrievalService();
    }

    /**
     * Registers the given election service together with its contender and triggers a leader
     * update. Callback for {@code EmbeddedLeaderElectionService#start}.
     *
     * <p>Any failure after the precondition checks, including a failed leader update future, is
     * escalated through {@code fatalError}.
     *
     * @param service the election service being started
     * @param contender the contender that the service represents
     * @throws IllegalStateException if this service is shut down or {@code service} is already running
     */
    private void addContender(EmbeddedLeaderElectionService service, LeaderContender contender) {
        synchronized (lock) {
            Preconditions.checkState(!shutdown, "leader election service is shut down");
            Preconditions.checkState(!service.running, "leader election service is already started");
            try {
                final boolean newlyRegistered = allLeaderContenders.add(service);
                if (!newlyRegistered) {
                    throw new IllegalStateException("leader election service was added to this service multiple times");
                }
                service.contender = contender;
                service.running = true;
                updateLeader().whenComplete((ignored, failure) -> {
                    if (failure != null) {
                        fatalError(failure);
                    }
                });
            } catch (Throwable t) {
                fatalError(t);
            }
        }
    }

    /**
     * Unregisters the given election service and triggers a leader update. Callback for
     * {@code EmbeddedLeaderElectionService#stop}.
     *
     * <p>Does nothing if the election service is not running or this service is shut down. If the
     * removed service was the proposed or confirmed leader, the corresponding leader state is
     * cleared before the update. Failures are escalated through {@code fatalError}.
     *
     * @param service the election service being stopped
     */
    private void removeContender(EmbeddedLeaderElectionService service) {
        synchronized (lock) {
            final boolean active = service.running && !shutdown;
            if (!active) {
                return;
            }
            try {
                final boolean wasRegistered = allLeaderContenders.remove(service);
                if (!wasRegistered) {
                    throw new IllegalStateException("leader election service does not belong to this service");
                }

                service.contender = null;
                service.running = false;
                service.isLeader = false;

                if (currentLeaderConfirmed == service) {
                    currentLeaderConfirmed = null;
                    currentLeaderSessionId = null;
                    currentLeaderAddress = null;
                }
                if (currentLeaderProposed == service) {
                    currentLeaderProposed = null;
                    currentLeaderSessionId = null;
                }

                updateLeader().whenComplete((ignored, failure) -> {
                    if (failure != null) {
                        fatalError(failure);
                    }
                });
            } catch (Throwable t) {
                fatalError(t);
            }
        }
    }

    /**
     * Handles a leadership confirmation. Callback for
     * {@code EmbeddedLeaderElectionService#confirmLeadership}.
     *
     * <p>The confirmation is accepted only if it comes from the currently proposed leader and
     * carries the current session ID; the service then becomes the confirmed leader and all
     * listeners are notified. Any other confirmation is logged as stale and ignored. Does nothing
     * if the election service is not running or this service is shut down.
     *
     * @param service the election service confirming its leadership
     * @param leaderSessionId session ID the confirmation refers to
     * @param leaderAddress address under which the leader is reachable
     */
    private void confirmLeader(EmbeddedLeaderElectionService service, UUID leaderSessionId, String leaderAddress) {
        synchronized (lock) {
            final boolean active = service.running && !shutdown;
            if (!active) {
                return;
            }
            try {
                final boolean matchesPendingGrant =
                        service == currentLeaderProposed && currentLeaderSessionId.equals(leaderSessionId);
                if (!matchesPendingGrant) {
                    LOG.debug("Received confirmation of leadership for a stale leadership grant. Ignoring.");
                    return;
                }

                LOG.info("Received confirmation of leadership for leader {} , session={}", leaderAddress, leaderSessionId);
                currentLeaderConfirmed = service;
                currentLeaderAddress = leaderAddress;
                currentLeaderProposed = null;
                notifyAllListeners(leaderAddress, leaderSessionId);
            } catch (Throwable t) {
                fatalError(t);
            }
        }
    }

    /**
     * Schedules a leader notification for every registered listener.
     *
     * @param address leader address to announce
     * @param leaderSessionId leader session ID to announce
     * @return the combined future returned by {@code FutureUtils.waitForAll} for the individual
     *     notification futures
     */
    private CompletableFuture<Void> notifyAllListeners(String address, UUID leaderSessionId) {
        final ArrayList<CompletableFuture<Void>> pendingNotifications = new ArrayList<>(listeners.size());
        listeners.forEach(
                retrievalService ->
                        pendingNotifications.add(notifyListener(address, leaderSessionId, retrievalService.listener)));
        return FutureUtils.waitForAll(pendingNotifications);
    }

    /**
     * Elects a new leader if there is currently neither a proposed nor a confirmed one. Must be
     * called while holding {@code lock}.
     *
     * <p>With no contenders registered, all listeners are told that there is no leader. Otherwise
     * the first contender returned by the set's iterator is proposed as leader under a freshly
     * generated session ID and is granted leadership asynchronously.
     *
     * @return future of the scheduled notification or grant; an already completed future if a
     *     leader is proposed or confirmed
     */
    @GuardedBy("lock")
    private CompletableFuture<Void> updateLeader() {
        assert Thread.holdsLock(lock);

        if (currentLeaderConfirmed != null || currentLeaderProposed != null) {
            return CompletableFuture.completedFuture(null);
        }
        if (allLeaderContenders.isEmpty()) {
            return notifyAllListeners(null, null);
        }

        final UUID newSessionId = UUID.randomUUID();
        final EmbeddedLeaderElectionService candidate = allLeaderContenders.iterator().next();

        currentLeaderSessionId = newSessionId;
        currentLeaderProposed = candidate;
        candidate.isLeader = true;

        LOG.info("Proposing leadership to contender {}", candidate.contender.getDescription());
        return execute(new GrantLeadershipCall(candidate.contender, newSessionId, LOG));
    }

    /**
     * Schedules a single leader notification on the notification executor.
     *
     * @param address leader address to announce, may be null
     * @param leaderSessionId leader session ID to announce, may be null
     * @param listener the listener to notify
     * @return future of the scheduled notification
     */
    private CompletableFuture<Void> notifyListener(@Nullable String address, @Nullable UUID leaderSessionId, LeaderRetrievalListener listener) {
        final Runnable notification = new NotifyOfLeaderCall(address, leaderSessionId, listener, LOG);
        return CompletableFuture.runAsync(notification, notificationExecutor);
    }

    /**
     * Registers the given retrieval service together with its listener. Callback for
     * {@code EmbeddedLeaderRetrievalService#start}.
     *
     * <p>If a leader is already confirmed, the new listener is notified about it right away.
     * Failures after the precondition checks are escalated through {@code fatalError}.
     *
     * @param service the retrieval service being started
     * @param listener the listener to notify about leader changes
     * @throws IllegalStateException if this service is shut down or {@code service} is already running
     */
    private void addListener(EmbeddedLeaderRetrievalService service, LeaderRetrievalListener listener) {
        synchronized (lock) {
            Preconditions.checkState(!shutdown, "leader election service is shut down");
            Preconditions.checkState(!service.running, "leader retrieval service is already started");
            try {
                final boolean newlyRegistered = listeners.add(service);
                if (!newlyRegistered) {
                    throw new IllegalStateException("leader retrieval service was added to this service multiple times");
                }
                service.listener = listener;
                service.running = true;

                final boolean leaderKnown = currentLeaderConfirmed != null;
                if (leaderKnown) {
                    notifyListener(currentLeaderAddress, currentLeaderSessionId, listener);
                }
            } catch (Throwable t) {
                fatalError(t);
            }
        }
    }

    /**
     * Unregisters the given retrieval service. Callback for
     * {@code EmbeddedLeaderRetrievalService#stop}.
     *
     * <p>Does nothing if the retrieval service is not running or this service is shut down.
     * Failures are escalated through {@code fatalError}.
     *
     * @param service the retrieval service being stopped
     */
    private void removeListener(EmbeddedLeaderRetrievalService service) {
        synchronized (lock) {
            final boolean active = service.running && !shutdown;
            if (!active) {
                return;
            }
            try {
                final boolean wasRegistered = listeners.remove(service);
                if (!wasRegistered) {
                    throw new IllegalStateException("leader retrieval service does not belong to this service");
                }
                service.listener = null;
                service.running = false;
            } catch (Throwable t) {
                fatalError(t);
            }
        }
    }

    /**
     * Triggers a leader update, which proposes a leader if there is currently none.
     *
     * @return the future of the leader update, or an exceptionally completed future if this
     *     service is shut down
     */
    @VisibleForTesting
    CompletableFuture<Void> grantLeadership() {
        synchronized (lock) {
            return shutdown ? getShutDownFuture() : updateLeader();
        }
    }

    /**
     * Builds the future that is handed out for operations requested after shutdown.
     *
     * @return a future completed exceptionally with a {@code FlinkException}
     */
    private CompletableFuture<Void> getShutDownFuture() {
        final Throwable cause = new FlinkException("EmbeddedLeaderService has been shut down.");
        return FutureUtils.completedExceptionally(cause);
    }

    /**
     * Revokes leadership from the current leader, preferring the confirmed leader over the
     * proposed one. The revocation is sent to the contender and all listeners are told that there
     * is no leader; afterwards the leader state is cleared.
     *
     * @return a future that completes once the revocation call and all listener notifications are
     *     done; an already completed future if there is no leader; an exceptionally completed
     *     future if this service is shut down
     */
    @VisibleForTesting
    CompletableFuture<Void> revokeLeadership() {
        synchronized (lock) {
            if (shutdown) {
                return getShutDownFuture();
            }

            final EmbeddedLeaderElectionService leaderService =
                    currentLeaderConfirmed != null ? currentLeaderConfirmed : currentLeaderProposed;
            if (leaderService == null) {
                return CompletableFuture.completedFuture(null);
            }

            LOG.info("Revoking leadership of {}.", leaderService.contender);
            leaderService.isLeader = false;

            final CompletableFuture<Void> revocation = execute(new RevokeLeadershipCall(leaderService.contender));
            final CompletableFuture<Void> listenerNotifications = notifyAllListeners(null, null);

            currentLeaderProposed = null;
            currentLeaderConfirmed = null;
            currentLeaderAddress = null;
            currentLeaderSessionId = null;

            return CompletableFuture.allOf(revocation, listenerNotifications);
        }
    }

    /**
     * Runs the given action asynchronously on the notification executor.
     *
     * @param runnable the action to run
     * @return future that completes when the action has finished
     */
    private CompletableFuture<Void> execute(Runnable runnable) {
        final Executor executor = notificationExecutor;
        return CompletableFuture.runAsync(runnable, executor);
    }

    /** Runnable that revokes leadership from a contender when it is run. */
    private static class RevokeLeadershipCall implements Runnable {

        /** The contender whose leadership is revoked. */
        @Nonnull
        private final LeaderContender target;

        /**
         * @param contender the contender whose leadership is revoked when this call runs
         */
        RevokeLeadershipCall(@Nonnull LeaderContender contender) {
            target = contender;
        }

        @Override
        public void run() {
            target.revokeLeadership();
        }
    }

    /**
     * Runnable that grants leadership to a contender when it is run. Anything thrown by the
     * contender is logged and reported back to it through {@code handleError}.
     */
    private static class GrantLeadershipCall implements Runnable {

        private final LeaderContender contender;
        private final UUID leaderSessionId;
        private final Logger logger;

        /**
         * @param contender the contender that is granted leadership; must not be null
         * @param leaderSessionId session ID of the granted leadership; must not be null
         * @param logger logger used to report failures of the grant; must not be null
         */
        GrantLeadershipCall(LeaderContender contender, UUID leaderSessionId, Logger logger) {
            this.contender = Preconditions.checkNotNull(contender);
            this.leaderSessionId = Preconditions.checkNotNull(leaderSessionId);
            this.logger = Preconditions.checkNotNull(logger);
        }

        @Override
        public void run() {
            try {
                contender.grantLeadership(leaderSessionId);
            } catch (Throwable t) {
                logger.warn("Error granting leadership to contender", t);
                // handleError takes an Exception, so anything else is wrapped
                final Exception reported;
                if (t instanceof Exception) {
                    reported = (Exception) t;
                } else {
                    reported = new Exception(t);
                }
                contender.handleError(reported);
            }
        }
    }

    /**
     * Runnable that tells a listener about the leader address and session ID when it is run.
     * Anything thrown by the listener is logged and reported back to it through {@code handleError}.
     */
    private static class NotifyOfLeaderCall implements Runnable {

        @Nullable
        private final String address;
        @Nullable
        private final UUID leaderSessionId;
        private final LeaderRetrievalListener listener;
        private final Logger logger;

        /**
         * @param address leader address to announce, may be null
         * @param leaderSessionId leader session ID to announce, may be null
         * @param listener the listener to notify; must not be null
         * @param logger logger used to report failures of the notification; must not be null
         */
        NotifyOfLeaderCall(@Nullable String address, @Nullable UUID leaderSessionId, LeaderRetrievalListener listener, Logger logger) {
            this.address = address;
            this.leaderSessionId = leaderSessionId;
            this.listener = Preconditions.checkNotNull(listener);
            this.logger = Preconditions.checkNotNull(logger);
        }

        @Override
        public void run() {
            try {
                listener.notifyLeaderAddress(address, leaderSessionId);
            } catch (Throwable t) {
                logger.warn("Error notifying leader listener about new leader", t);
                // handleError takes an Exception, so anything else is wrapped
                final Exception reported;
                if (t instanceof Exception) {
                    reported = (Exception) t;
                } else {
                    reported = new Exception(t);
                }
                listener.handleError(reported);
            }
        }
    }

    /**
     * Leader retrieval service handed out by the enclosing service. Starting and stopping it
     * registers and unregisters its listener with the enclosing service.
     */
    private class EmbeddedLeaderRetrievalService implements LeaderRetrievalService {

        /** Listener registered on start; null while the service is not running. */
        volatile LeaderRetrievalListener listener;
        /** True between a successful start and the following stop or shutdown. */
        volatile boolean running;

        private EmbeddedLeaderRetrievalService() {
        }

        /**
         * Registers the listener with the enclosing service.
         *
         * @param listener the listener to notify about leader changes; must not be null
         */
        @Override
        public void start(LeaderRetrievalListener listener) throws Exception {
            Preconditions.checkNotNull(listener);
            addListener(this, listener);
        }

        /** Unregisters this service from the enclosing service. */
        @Override
        public void stop() throws Exception {
            removeListener(this);
        }

        /**
         * Marks this service as stopped and drops its listener; no-op if it is not running.
         *
         * @param cause reason for the shutdown (not used by this method)
         */
        public void shutdown(Exception cause) {
            if (!running) {
                return;
            }
            running = false;
            listener = null;
        }
    }

    /**
     * Leader election service handed out by the enclosing service. Starting and stopping it
     * registers and unregisters its contender with the enclosing service; leadership confirmations
     * are forwarded to the enclosing service.
     */
    private class EmbeddedLeaderElectionService implements LeaderElectionService {

        /** Contender registered on start; null while the service is not running. */
        volatile LeaderContender contender;
        /** True while this service is the proposed or confirmed leader. */
        volatile boolean isLeader;
        /** True between a successful start and the following stop or shutdown. */
        volatile boolean running;

        private EmbeddedLeaderElectionService() {
        }

        /**
         * Registers the contender with the enclosing service.
         *
         * @param contender the contender taking part in the election; must not be null
         */
        @Override
        public void start(LeaderContender contender) throws Exception {
            Preconditions.checkNotNull(contender);
            addContender(this, contender);
        }

        /** Unregisters this service from the enclosing service. */
        @Override
        public void stop() throws Exception {
            removeContender(this);
        }

        /**
         * Forwards the leadership confirmation to the enclosing service.
         *
         * @param leaderSessionID session ID of the confirmed leadership; must not be null
         * @param leaderAddress address of the confirming leader; must not be null
         */
        @Override
        public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
            Preconditions.checkNotNull(leaderSessionID);
            Preconditions.checkNotNull(leaderAddress);
            confirmLeader(this, leaderSessionID, leaderAddress);
        }

        /**
         * Checks whether this service is flagged as leader and the given session ID equals the
         * current leader session ID of the enclosing service. Reads volatile state without locking.
         *
         * @param leaderSessionId session ID to check
         * @return true if both conditions hold
         */
        @Override
        public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
            return isLeader && leaderSessionId.equals(currentLeaderSessionId);
        }

        /**
         * Marks this service as stopped and revokes leadership from its contender; no-op if it is
         * not running.
         *
         * @param cause reason for the shutdown (not used by this method)
         */
        void shutdown(Exception cause) {
            if (!running) {
                return;
            }
            running = false;
            isLeader = false;
            final LeaderContender revokedContender = contender;
            try {
                revokedContender.revokeLeadership();
            } finally {
                // drop the reference even if the contender's callback throws, so a stopped
                // service never keeps holding on to its contender
                contender = null;
            }
        }
    }
}

