package org.apache.samza.zk;

import com.google.common.annotations.VisibleForTesting;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import org.I0Itec.zkclient.IZkDataListener;
import org.apache.samza.SamzaException;
import org.apache.samza.coordinator.LeaderElector;
import org.apache.samza.coordinator.LeaderElectorListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/zk/ZkLeaderElector.class */
/**
 * {@link LeaderElector} implementation that decides leadership from the sorted list of active
 * processor znodes returned by {@link ZkUtils}.
 *
 * <p>On every election round the processor is registered through
 * {@code ZkUtils.registerProcessorAndGetId}, and its position in
 * {@code ZkUtils.getSortedActiveProcessorsZnodes()} is looked up. The processor at index 0 is
 * the leader; every other processor subscribes to data changes on the znode immediately in
 * front of it and re-runs the election when that znode is deleted.
 */
public class ZkLeaderElector implements LeaderElector {
    public static final Logger LOG = LoggerFactory.getLogger(ZkLeaderElector.class);

    /** Exclusive upper bound, in milliseconds, of the random pause taken before re-running an election. */
    private static final int MAX_RETRY_BACKOFF_MS = 1000;

    private final ZkUtils zkUtils;
    private final String processorIdStr;
    private final ZkKeyBuilder keyBuilder;
    private final String hostName;
    private final AtomicBoolean isLeader = new AtomicBoolean(false);
    private final IZkDataListener previousProcessorChangeListener;
    private final Random random = new Random();

    /** Callback invoked when this processor becomes leader; may be {@code null}. */
    private LeaderElectorListener leaderElectorListener;

    /** Id of the predecessor znode currently being watched, or {@code null} when none is watched. */
    private String currentSubscription;

    /**
     * Listener attached to the predecessor znode. A deletion of that znode triggers a new
     * election attempt on the enclosing elector.
     */
    class PreviousProcessorChangeListener implements IZkDataListener {
        PreviousProcessorChangeListener() {
        }

        /** Data changes on the predecessor are only logged; they do not affect the election. */
        public void handleDataChange(String str, Object obj) throws Exception {
            if (ZkLeaderElector.LOG.isDebugEnabled()) {
                ZkLeaderElector.LOG.debug("Data change on path: " + str + " Data: " + obj);
            }
        }

        /** The predecessor znode was deleted, so the election is re-run. */
        public void handleDataDeleted(String str) throws Exception {
            ZkLeaderElector.LOG.info(ZkLeaderElector.this.zLog("Data deleted on path " + str + ". Predecessor went away. So, trying to become leader again..."));
            ZkLeaderElector.this.tryBecomeLeader();
        }
    }

    /**
     * Creates an elector that watches its predecessor with a {@link PreviousProcessorChangeListener}
     * and makes sure the processors path exists.
     *
     * @param str     identifier of this processor
     * @param zkUtils ZooKeeper helper used for all znode operations
     * @throws SamzaException if the local host name cannot be resolved
     */
    public ZkLeaderElector(String str, ZkUtils zkUtils) {
        this.processorIdStr = str;
        this.zkUtils = zkUtils;
        this.keyBuilder = zkUtils.getKeyBuilder();
        this.hostName = getHostName();
        this.previousProcessorChangeListener = new PreviousProcessorChangeListener();
        zkUtils.makeSurePersistentPathsExists(new String[]{this.keyBuilder.getProcessorsPath()});
    }

    /**
     * Test-only constructor that accepts the predecessor listener. Unlike the two-argument
     * constructor it does not create the processors path.
     *
     * @param str             identifier of this processor
     * @param zkUtils         ZooKeeper helper used for all znode operations
     * @param iZkDataListener listener to register on the predecessor znode
     * @throws SamzaException if the local host name cannot be resolved
     */
    @VisibleForTesting
    public ZkLeaderElector(String str, ZkUtils zkUtils, IZkDataListener iZkDataListener) {
        this.processorIdStr = str;
        this.zkUtils = zkUtils;
        this.keyBuilder = zkUtils.getKeyBuilder();
        this.hostName = getHostName();
        this.previousProcessorChangeListener = iZkDataListener;
    }

    /**
     * Resolves the local host name.
     *
     * @return the host name reported by {@link InetAddress#getLocalHost()}
     * @throws SamzaException wrapping the {@link UnknownHostException} if resolution fails
     */
    private String getHostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            LOG.error("Failed to fetch hostname of the processor", e);
            throw new SamzaException(e);
        }
    }

    /**
     * Sets the callback to invoke when this processor becomes leader.
     *
     * @param leaderElectorListener the callback; {@code null} disables the notification
     */
    @Override // org.apache.samza.coordinator.LeaderElector
    public void setLeaderElectorListener(LeaderElectorListener leaderElectorListener) {
        this.leaderElectorListener = leaderElectorListener;
    }

    /**
     * Runs election rounds until this processor is either the leader or is watching a
     * predecessor znode that still exists.
     *
     * @throws SamzaException if this processor is not found among the active processors
     */
    @Override // org.apache.samza.coordinator.LeaderElector
    public void tryBecomeLeader() {
        // Iterate instead of recursing so that repeated predecessor churn cannot grow the stack.
        while (!runElectionRound()) {
            pauseBeforeRetry();
            LOG.info(zLog("Predecessor doesn't exist anymore. Trying to become leader again..."));
        }
    }

    /**
     * Performs a single election round.
     *
     * @return {@code true} when the round settled (leader, or non-leader with a live predecessor);
     *         {@code false} when the predecessor vanished and the round must be repeated
     * @throws SamzaException if this processor is not found among the active processors
     */
    private boolean runElectionRound() {
        String registeredPath = this.zkUtils.registerProcessorAndGetId(new ProcessorData(this.hostName, this.processorIdStr));
        List<String> activeProcessors = this.zkUtils.getSortedActiveProcessorsZnodes();
        if (LOG.isDebugEnabled()) {
            LOG.debug(zLog("Current active processors - " + activeProcessors));
        }
        int position = activeProcessors.indexOf(ZkKeyBuilder.parseIdFromPath(registeredPath));
        LOG.info("tryBecomeLeader: index = " + position + " for path=" + registeredPath + " out of " + Arrays.toString(activeProcessors.toArray()));
        if (activeProcessors.isEmpty() || position == -1) {
            throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!");
        }

        if (position == 0) {
            this.isLeader.set(true);
            LOG.info(zLog("Eligible to become the leader!"));
            if (this.leaderElectorListener != null) {
                this.leaderElectorListener.onBecomingLeader();
            }
            return true;
        }

        this.isLeader.set(false);
        LOG.info("Index = " + position + " Not eligible to be a leader yet!");
        watchPredecessor(activeProcessors.get(position - 1));

        // The predecessor may have disappeared between listing the znodes and subscribing, in
        // which case no deletion event will arrive and the round has to be repeated.
        if (this.zkUtils.exists(processorPath(this.currentSubscription))) {
            LOG.info(zLog("Predecessor still exists. Current subscription is valid. Continuing as non-leader."));
            return true;
        }
        return false;
    }

    /**
     * Moves the data-change subscription to {@code predecessor} if it is not already there.
     *
     * @param predecessor id of the znode directly in front of this processor
     */
    private void watchPredecessor(String predecessor) {
        if (predecessor.equals(this.currentSubscription)) {
            return;
        }
        if (this.currentSubscription != null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(zLog("Unsubscribing data change for " + this.currentSubscription));
            }
            this.zkUtils.unsubscribeDataChanges(processorPath(this.currentSubscription), this.previousProcessorChangeListener);
        }
        this.currentSubscription = predecessor;
        LOG.info(zLog("Subscribing data change for " + predecessor));
        this.zkUtils.subscribeDataChanges(processorPath(this.currentSubscription), this.previousProcessorChangeListener);
    }

    /** Sleeps for a random time below {@link #MAX_RETRY_BACKOFF_MS} before the next election round. */
    private void pauseBeforeRetry() {
        try {
            Thread.sleep(this.random.nextInt(MAX_RETRY_BACKOFF_MS));
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of clearing it, so the interruption stays
            // visible to the code that runs after this method returns.
            Thread.currentThread().interrupt();
        }
    }

    /** Builds the full znode path of the processor with the given id. */
    private String processorPath(String processorZnodeId) {
        return this.keyBuilder.getProcessorsPath() + "/" + processorZnodeId;
    }

    /** Clears the local leader flag if it is currently set. */
    @Override // org.apache.samza.coordinator.LeaderElector
    public void resignLeadership() {
        this.isLeader.compareAndSet(true, false);
    }

    /**
     * @return {@code true} if the most recent election round made this processor the leader and
     *         leadership has not been resigned since
     */
    @Override // org.apache.samza.coordinator.LeaderElector
    public boolean amILeader() {
        return this.isLeader.get();
    }

    /**
     * Prefixes a log message with this processor's id. Also used by the inner listener class.
     *
     * @param str the message to decorate
     * @return the message in the form {@code [Processor-<id>] <message>}
     */
    public String zLog(String str) {
        return String.format("[Processor-%s] %s", this.processorIdStr, str);
    }
}
