/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LeaderElectionService {
    private static final Logger log = LoggerFactory.getLogger(LeaderElectionService.class);
    private static final String ELECTION_ROOT = "/loadbalance/leader";
    private final PulsarService pulsar;
    private final ExecutorService executor;
    private boolean stopped = true;
    private final ZooKeeper zkClient;
    private final AtomicReference<LeaderBroker> currentLeader = new AtomicReference();
    private final AtomicBoolean isLeader = new AtomicBoolean();
    private final ObjectMapper jsonMapper;
    private final LeaderListener leaderListener;

    public LeaderElectionService(PulsarService pulsar, LeaderListener leaderListener) {
        this.pulsar = pulsar;
        this.zkClient = pulsar.getZkClient();
        this.executor = pulsar.getExecutor();
        this.leaderListener = leaderListener;
        this.jsonMapper = new ObjectMapper();
    }

    private void elect() {
        try {
            byte[] data = this.zkClient.getData(ELECTION_ROOT, new Watcher(){

                public void process(WatchedEvent event) {
                    log.warn("Type of the event is [{}] and path is [{}]", (Object)event.getType(), (Object)event.getPath());
                    switch (event.getType()) {
                        case NodeDeleted: {
                            log.warn("Election node {} is deleted, attempting re-election...", (Object)event.getPath());
                            if (!event.getPath().equals(LeaderElectionService.ELECTION_ROOT)) break;
                            log.info("This should call elect again...");
                            LeaderElectionService.this.executor.execute(new Runnable(){

                                @Override
                                public void run() {
                                    log.info("Broker [{}] is calling re-election from the thread", (Object)LeaderElectionService.this.pulsar.getSafeWebServiceAddress());
                                    LeaderElectionService.this.elect();
                                }
                            });
                            break;
                        }
                        default: {
                            log.warn("Got something wrong on watch: {}", (Object)event);
                        }
                    }
                }
            }, null);
            LeaderBroker leaderBroker = (LeaderBroker)this.jsonMapper.readValue(data, LeaderBroker.class);
            this.currentLeader.set(leaderBroker);
            this.isLeader.set(false);
            this.leaderListener.brokerIsAFollowerNow();
            log.info("Broker [{}] is the follower now. Waiting for the watch to trigger...", (Object)this.pulsar.getSafeWebServiceAddress());
        }
        catch (KeeperException.NoNodeException nne) {
            try {
                LeaderBroker leaderBroker = new LeaderBroker(this.pulsar.getSafeWebServiceAddress());
                ZkUtils.createFullPathOptimistic((ZooKeeper)this.pulsar.getLocalZkCache().getZooKeeper(), (String)ELECTION_ROOT, (byte[])this.jsonMapper.writeValueAsBytes((Object)leaderBroker), (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, (CreateMode)CreateMode.EPHEMERAL);
                this.currentLeader.set(new LeaderBroker(leaderBroker.getServiceUrl()));
                this.isLeader.set(true);
                log.info("Broker [{}] is the leader now, notifying the listener...", (Object)this.pulsar.getSafeWebServiceAddress());
                this.leaderListener.brokerIsTheLeaderNow();
            }
            catch (KeeperException.NodeExistsException nee) {
                log.warn("Got exception [{}] while creating election node because it already exists. Attempting re-election...", (Object)nee.getMessage());
                this.executor.execute(new Runnable(){

                    @Override
                    public void run() {
                        LeaderElectionService.this.elect();
                    }
                });
            }
            catch (Exception e) {
                log.error("Got exception [{}] while creating the election node", (Object)e.getMessage());
                this.pulsar.getShutdownService().shutdown(-1);
            }
        }
        catch (Exception e) {
            log.error("Could not get the content of [{}], got exception [{}]. Shutting down the broker...", (Object)ELECTION_ROOT, (Object)e);
            this.pulsar.getShutdownService().shutdown(-1);
        }
    }

    public void start() {
        Preconditions.checkState((boolean)this.stopped);
        this.stopped = false;
        log.info("LeaderElectionService started");
        this.elect();
    }

    public void stop() {
        if (this.stopped) {
            return;
        }
        if (this.isLeader()) {
            try {
                this.pulsar.getLocalZkCache().getZooKeeper().delete(ELECTION_ROOT, -1);
            }
            catch (Throwable t) {
                log.warn("Failed to cleanup election root znode: {}", t);
            }
        }
        this.stopped = true;
        log.info("LeaderElectionService stopped");
    }

    public LeaderBroker getCurrentLeader() {
        return this.currentLeader.get();
    }

    public boolean isLeader() {
        return this.isLeader.get();
    }

    public static interface LeaderListener {
        public void brokerIsTheLeaderNow();

        public void brokerIsAFollowerNow();
    }
}

