package org.apache.flink.runtime.leaderretrieval;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Objects;
import java.util.UUID;
import org.apache.flink.shaded.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionState;
import org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.class */
public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService, NodeCacheListener {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderRetrievalService.class);
    private final CuratorFramework client;
    private final NodeCache cache;
    private final Object lock = new Object();
    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() { // from class: org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService.1
        @Override // org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionStateListener
        public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
            ZooKeeperLeaderRetrievalService.this.handleStateChange(connectionState);
        }
    };
    private volatile LeaderRetrievalListener leaderListener = null;
    private String lastLeaderAddress = null;
    private UUID lastLeaderSessionID = null;
    private volatile boolean running = false;

    public ZooKeeperLeaderRetrievalService(CuratorFramework curatorFramework, String str) {
        this.client = (CuratorFramework) Preconditions.checkNotNull(curatorFramework, "CuratorFramework client");
        this.cache = new NodeCache(curatorFramework, str);
    }

    @Override // org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
    public void start(LeaderRetrievalListener leaderRetrievalListener) throws Exception {
        Preconditions.checkNotNull(leaderRetrievalListener, "Listener must not be null.");
        Preconditions.checkState(this.leaderListener == null, "ZooKeeperLeaderRetrievalService can only be started once.");
        LOG.info("Starting ZooKeeperLeaderRetrievalService.");
        synchronized (this.lock) {
            this.leaderListener = leaderRetrievalListener;
            this.cache.getListenable().addListener(this);
            this.cache.start();
            this.client.getConnectionStateListenable().addListener(this.connectionStateListener);
            this.running = true;
        }
    }

    @Override // org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
    public void stop() throws Exception {
        LOG.info("Stopping ZooKeeperLeaderRetrievalService.");
        synchronized (this.lock) {
            if (this.running) {
                this.running = false;
                this.client.getConnectionStateListenable().removeListener(this.connectionStateListener);
                try {
                    this.cache.close();
                } catch (IOException e) {
                    throw new Exception("Could not properly stop the ZooKeeperLeaderRetrievalService.", e);
                }
            }
        }
    }

    @Override // org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.NodeCacheListener
    public void nodeChanged() throws Exception {
        String str;
        UUID uuid;
        synchronized (this.lock) {
            if (this.running) {
                try {
                    LOG.debug("Leader node has changed.");
                    ChildData currentData = this.cache.getCurrentData();
                    if (currentData == null) {
                        str = null;
                        uuid = null;
                    } else {
                        byte[] data = currentData.getData();
                        if (data == null || data.length == 0) {
                            str = null;
                            uuid = null;
                        } else {
                            ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(data));
                            str = objectInputStream.readUTF();
                            uuid = (UUID) objectInputStream.readObject();
                        }
                    }
                    if (!Objects.equals(str, this.lastLeaderAddress) || !Objects.equals(uuid, this.lastLeaderSessionID)) {
                        LOG.debug("New leader information: Leader={}, session ID={}.", str, uuid);
                        this.lastLeaderAddress = str;
                        this.lastLeaderSessionID = uuid;
                        this.leaderListener.notifyLeaderAddress(str, uuid);
                    }
                } catch (Exception e) {
                    this.leaderListener.handleError(new Exception("Could not handle node changed event.", e));
                    throw e;
                }
            } else {
                LOG.debug("Ignoring node change notification since the service has already been stopped.");
            }
        }
    }

    protected void handleStateChange(ConnectionState connectionState) {
        switch (connectionState) {
            case CONNECTED:
                LOG.debug("Connected to ZooKeeper quorum. Leader retrieval can start.");
                return;
            case SUSPENDED:
                LOG.warn("Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.");
                return;
            case RECONNECTED:
                LOG.info("Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.");
                return;
            case LOST:
                LOG.warn("Connection to ZooKeeper lost. Can no longer retrieve the leader from ZooKeeper.");
                return;
            default:
                return;
        }
    }
}
