/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.cluster.coordination.node;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundPathable;
import org.apache.curator.retry.RetryNTimes;
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.protocol.AbstractNodeProtocolSender;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.framework.cluster.zookeeper.ZooKeeperClientConfig;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.util.NiFiProperties;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CuratorNodeProtocolSender
extends AbstractNodeProtocolSender {
    private static final Logger logger = LoggerFactory.getLogger(CuratorNodeProtocolSender.class);
    private final String coordinatorPath;
    private final ZooKeeperClientConfig zkConfig;
    private InetSocketAddress coordinatorAddress;

    public CuratorNodeProtocolSender(SocketConfiguration socketConfig, ProtocolContext<ProtocolMessage> protocolContext, NiFiProperties nifiProperties) {
        super(socketConfig, protocolContext);
        this.zkConfig = ZooKeeperClientConfig.createConfig((NiFiProperties)nifiProperties);
        this.coordinatorPath = this.zkConfig.resolvePath("cluster/nodes/coordinator");
    }

    protected synchronized InetSocketAddress getServiceAddress() throws IOException {
        if (this.coordinatorAddress != null) {
            return this.coordinatorAddress;
        }
        RetryNTimes retryPolicy = new RetryNTimes(0, 0);
        curatorClient.start();
        try (CuratorFramework curatorClient = CuratorFrameworkFactory.newClient((String)this.zkConfig.getConnectString(), (int)this.zkConfig.getSessionTimeoutMillis(), (int)this.zkConfig.getConnectionTimeoutMillis(), (RetryPolicy)retryPolicy);){
            InetSocketAddress socketAddress;
            int port;
            byte[] coordinatorAddressBytes = (byte[])((BackgroundPathable)curatorClient.getData().usingWatcher(new Watcher(){

                public void process(WatchedEvent event) {
                    CuratorNodeProtocolSender.this.coordinatorAddress = null;
                }
            })).forPath(this.coordinatorPath);
            if (coordinatorAddressBytes == null || coordinatorAddressBytes.length == 0) {
                throw new NoClusterCoordinatorException("No node has yet been elected Cluster Coordinator. Cannot establish connection to cluster yet.");
            }
            String address = new String(coordinatorAddressBytes, StandardCharsets.UTF_8);
            String[] splits = address.split(":");
            if (splits.length != 2) {
                String message = String.format("Attempted to determine Cluster Coordinator address. Zookeeper indicates that address is %s, but this is not in the expected format of <hostname>:<port>", address);
                logger.error(message);
                throw new ProtocolException(message);
            }
            logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", (Object)address);
            String hostname = splits[0];
            try {
                port = Integer.parseInt(splits[1]);
                if (port < 1 || port > 65535) {
                    throw new NumberFormatException("Port must be in the range of 1 - 65535 but got " + port);
                }
            }
            catch (NumberFormatException nfe) {
                String message = String.format("Attempted to determine Cluster Coordinator address. Zookeeper indicates that address is %s, but the port is not a valid port number", address);
                logger.error(message);
                throw new ProtocolException(message);
            }
            this.coordinatorAddress = socketAddress = InetSocketAddress.createUnresolved(hostname, port);
            InetSocketAddress inetSocketAddress = socketAddress;
            return inetSocketAddress;
        }
    }
}

