/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.cluster;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundPathable;
import org.apache.curator.retry.RetryNTimes;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.controller.cluster.Heartbeater;
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClusterProtocolHeartbeater
implements Heartbeater {
    private static final Logger logger = LoggerFactory.getLogger(ClusterProtocolHeartbeater.class);
    private final NodeProtocolSender protocolSender;
    private final CuratorFramework curatorClient;
    private final String nodesPathPrefix;
    private final String coordinatorPath;
    private volatile String coordinatorAddress;

    public ClusterProtocolHeartbeater(NodeProtocolSender protocolSender, Properties properties) {
        this.protocolSender = protocolSender;
        RetryNTimes retryPolicy = new RetryNTimes(10, 500);
        ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(properties);
        this.curatorClient = CuratorFrameworkFactory.newClient((String)zkConfig.getConnectString(), (int)zkConfig.getSessionTimeoutMillis(), (int)zkConfig.getConnectionTimeoutMillis(), (RetryPolicy)retryPolicy);
        this.curatorClient.start();
        this.nodesPathPrefix = zkConfig.resolvePath("cluster/nodes");
        this.coordinatorPath = this.nodesPathPrefix + "/coordinator";
    }

    @Override
    public String getHeartbeatAddress() throws IOException {
        String curAddress = this.coordinatorAddress;
        if (curAddress != null) {
            return curAddress;
        }
        try {
            byte[] coordinatorAddressBytes = (byte[])((BackgroundPathable)this.curatorClient.getData().usingWatcher(new Watcher(){

                public void process(WatchedEvent event) {
                    ClusterProtocolHeartbeater.this.coordinatorAddress = null;
                }
            })).forPath(this.coordinatorPath);
            String address = this.coordinatorAddress = new String(coordinatorAddressBytes, StandardCharsets.UTF_8);
            logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", (Object)address);
            return address;
        }
        catch (Exception e) {
            throw new IOException("Unable to determine Cluster Coordinator from ZooKeeper", e);
        }
    }

    @Override
    public synchronized void send(HeartbeatMessage heartbeatMessage) throws IOException {
        String heartbeatAddress = this.getHeartbeatAddress();
        try {
            this.protocolSender.heartbeat(heartbeatMessage, heartbeatAddress);
        }
        catch (ProtocolException pe) {
            if (pe.getCause() instanceof IOException) {
                this.coordinatorAddress = null;
            }
            throw pe;
        }
    }

    @Override
    public void close() throws IOException {
        if (this.curatorClient != null) {
            this.curatorClient.close();
        }
        logger.info("ZooKeeper heartbeater closed. Will no longer send Heartbeat messages to ZooKeeper");
    }
}

