package org.apache.flink.runtime.leaderretrieval;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Objects;
import java.util.UUID;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.shaded.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.class */
/**
 * {@link LeaderRetrievalService} that watches a single ZooKeeper node through a Curator
 * {@link NodeCache} and forwards leader changes to a {@link LeaderRetrievalListener}.
 *
 * <p>The node payload is read as a Java-serialized stream containing a UTF string (the leader
 * address) followed by a {@link UUID} (the leader session ID). A missing node or an empty payload
 * is reported as "no leader", i.e. {@code (null, null)}.
 *
 * <p>The service can be started only once. {@link #stop()} closes both the node cache and the
 * Curator client that was handed to the constructor.
 */
public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService, NodeCacheListener {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderRetrievalService.class);

    /** Curator client backing the cache; closed by {@link #stop()}. */
    private final CuratorFramework client;

    /** Cache of the watched leader node; this object registers itself as its listener. */
    private final NodeCache cache;

    /** Listener to notify; {@code null} until {@link #start(LeaderRetrievalListener)} is called. */
    private volatile LeaderRetrievalListener leaderListener;

    // Last leader information handed to the listener, used to suppress duplicate notifications.
    // NOTE(review): not volatile/synchronized - presumably only touched from the NodeCache
    // listener callback; confirm that callbacks are never delivered concurrently.
    private String lastLeaderAddress;
    private UUID lastLeaderSessionID;

    /**
     * Creates a retrieval service for the given node.
     *
     * @param curatorFramework Curator client used to access ZooKeeper; must not be {@code null}
     * @param str path of the node to watch; passed to {@link NodeCache} unchanged
     * @throws NullPointerException if {@code curatorFramework} is {@code null}
     */
    public ZooKeeperLeaderRetrievalService(CuratorFramework curatorFramework, String str) {
        // Fail fast instead of surfacing a NullPointerException later in start()/stop().
        this.client = Preconditions.checkNotNull(curatorFramework, "CuratorFramework client must not be null.");
        this.cache = new NodeCache(curatorFramework, str);
    }

    /**
     * Registers the listener and starts watching the leader node.
     *
     * @param leaderRetrievalListener listener to notify about leader changes and errors
     * @throws NullPointerException if the listener is {@code null}
     * @throws IllegalStateException if the service has already been started
     * @throws Exception if the node cache cannot be started
     */
    @Override
    public void start(LeaderRetrievalListener leaderRetrievalListener) throws Exception {
        Preconditions.checkNotNull(leaderRetrievalListener, "Listener must not be null.");
        Preconditions.checkState(this.leaderListener == null, "ZooKeeperLeaderRetrievalService can only be started once.");
        LOG.info("Starting ZooKeeperLeaderRetrievalService.");
        // Set the listener before the cache can deliver its first nodeChanged() callback.
        this.leaderListener = leaderRetrievalListener;
        this.cache.getListenable().addListener(this);
        this.cache.start();
    }

    /**
     * Stops watching the leader node and closes the Curator client.
     *
     * @throws Exception if closing the node cache fails; the client is closed regardless
     */
    @Override
    public void stop() throws Exception {
        LOG.info("Stopping ZooKeeperLeaderRetrievalService.");
        try {
            this.cache.close();
        } finally {
            // Always release the client, even when closing the cache throws.
            this.client.close();
        }
    }

    /**
     * Callback of the {@link NodeCache}: re-reads the leader node and notifies the listener if the
     * leader address or session ID differs from the last values that were reported.
     *
     * @throws Exception any failure while reading or decoding the node; the failure is first
     *     reported to the listener via {@code handleError} and then rethrown
     */
    @Override
    public void nodeChanged() throws Exception {
        try {
            LOG.debug("Leader node has changed.");

            // Defaults describe "no leader": node absent or payload empty.
            String leaderAddress = null;
            UUID leaderSessionID = null;

            ChildData currentData = this.cache.getCurrentData();
            byte[] data = currentData == null ? null : currentData.getData();
            if (data != null && data.length > 0) {
                // SECURITY NOTE(review): native Java deserialization of bytes read from ZooKeeper.
                // Safe only while every writer of this node is trusted; consider a plain encoding
                // or an ObjectInputFilter if that assumption does not hold.
                try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
                    leaderAddress = in.readUTF();
                    leaderSessionID = (UUID) in.readObject();
                }
            }

            boolean unchanged = Objects.equals(leaderAddress, this.lastLeaderAddress)
                    && Objects.equals(leaderSessionID, this.lastLeaderSessionID);
            if (!unchanged) {
                LOG.debug("New leader information: Leader={}, session ID={}.", leaderAddress, leaderSessionID);
                this.lastLeaderAddress = leaderAddress;
                this.lastLeaderSessionID = leaderSessionID;
                this.leaderListener.notifyLeaderAddress(leaderAddress, leaderSessionID);
            }
        } catch (Exception e) {
            // Surface the problem to the listener, then propagate it to the caller as before.
            this.leaderListener.handleError(new Exception("Could not handle node changed event.", e));
            throw e;
        }
    }
}
