/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.cluster.protocol;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Objects;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.io.socket.SocketUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractNodeProtocolSender
implements NodeProtocolSender {
    private static final Logger logger = LoggerFactory.getLogger(AbstractNodeProtocolSender.class);
    private final SocketConfiguration socketConfiguration;
    private final ProtocolContext<ProtocolMessage> protocolContext;

    public AbstractNodeProtocolSender(SocketConfiguration socketConfiguration, ProtocolContext<ProtocolMessage> protocolContext) {
        this.socketConfiguration = socketConfiguration;
        this.protocolContext = protocolContext;
    }

    @Override
    public ConnectionResponseMessage requestConnection(ConnectionRequestMessage msg, boolean allowConnectToSelf) throws ProtocolException, UnknownServiceAddressException {
        ProtocolMessage response;
        block10: {
            ConnectionResponseMessage connectionResponseMessage;
            Socket socket = null;
            try {
                ConnectionResponseMessage connectionResponse;
                InetSocketAddress socketAddress;
                try {
                    socketAddress = this.getServiceAddress();
                }
                catch (IOException e) {
                    throw new ProtocolException("Could not determined address of Cluster Coordinator", e);
                }
                if (!allowConnectToSelf) {
                    this.validateNotConnectingToSelf(msg, socketAddress);
                }
                logger.info("Cluster Coordinator is located at {}. Will send Cluster Connection Request to this address", (Object)socketAddress);
                socket = this.createSocket(socketAddress);
                try {
                    ProtocolMessageMarshaller<ProtocolMessage> marshaller = this.protocolContext.createMarshaller();
                    marshaller.marshal(msg, socket.getOutputStream());
                }
                catch (IOException ioe) {
                    throw new ProtocolException("Failed marshalling '" + (Object)((Object)msg.getType()) + "' protocol message due to: " + ioe, ioe);
                }
                try {
                    ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = this.protocolContext.createUnmarshaller();
                    response = unmarshaller.unmarshal(socket.getInputStream());
                }
                catch (IOException ioe) {
                    throw new ProtocolException("Failed unmarshalling '" + (Object)((Object)ProtocolMessage.MessageType.CONNECTION_RESPONSE) + "' protocol message from " + socket.getRemoteSocketAddress() + " due to: " + ioe, ioe);
                }
                if (ProtocolMessage.MessageType.CONNECTION_RESPONSE != response.getType()) break block10;
                connectionResponseMessage = connectionResponse = (ConnectionResponseMessage)response;
            }
            catch (Throwable throwable) {
                SocketUtils.closeQuietly(socket);
                throw throwable;
            }
            SocketUtils.closeQuietly((Socket)socket);
            return connectionResponseMessage;
        }
        throw new ProtocolException("Expected message type '" + (Object)((Object)ProtocolMessage.MessageType.CONNECTION_RESPONSE) + "' but found '" + (Object)((Object)response.getType()) + "'");
    }

    private void validateNotConnectingToSelf(ConnectionRequestMessage msg, InetSocketAddress socketAddress) {
        NodeIdentifier localNodeIdentifier = msg.getConnectionRequest().getProposedNodeIdentifier();
        if (localNodeIdentifier == null) {
            return;
        }
        String localAddress = localNodeIdentifier.getSocketAddress();
        int localPort = localNodeIdentifier.getSocketPort();
        if (Objects.equals(localAddress, socketAddress.getHostString()) && localPort == socketAddress.getPort()) {
            throw new UnknownServiceAddressException("Cluster Coordinator is currently " + socketAddress.getHostString() + ":" + socketAddress.getPort() + ", which is this node, but connecting to self is not allowed at this phase of the lifecycle. This node must wait for a new Cluster Coordinator to be elected before connecting to the cluster.");
        }
    }

    @Override
    public HeartbeatResponseMessage heartbeat(HeartbeatMessage msg, String address) throws ProtocolException {
        int port;
        String hostname;
        try {
            String[] parts = address.split(":");
            hostname = parts[0];
            port = Integer.parseInt(parts[1]);
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Cannot send heartbeat to address [" + address + "]. Address must be in <hostname>:<port> format");
        }
        ProtocolMessage responseMessage = this.sendProtocolMessage(msg, hostname, port);
        if (ProtocolMessage.MessageType.HEARTBEAT_RESPONSE == responseMessage.getType()) {
            return (HeartbeatResponseMessage)responseMessage;
        }
        throw new ProtocolException("Expected message type '" + (Object)((Object)ProtocolMessage.MessageType.HEARTBEAT_RESPONSE) + "' but found '" + (Object)((Object)responseMessage.getType()) + "'");
    }

    @Override
    public ClusterWorkloadResponseMessage clusterWorkload(ClusterWorkloadRequestMessage msg) throws ProtocolException {
        InetSocketAddress serviceAddress;
        try {
            serviceAddress = this.getServiceAddress();
        }
        catch (IOException e) {
            throw new ProtocolException("Failed to getServiceAddress due to " + e, e);
        }
        ProtocolMessage responseMessage = this.sendProtocolMessage(msg, serviceAddress.getHostName(), serviceAddress.getPort());
        if (ProtocolMessage.MessageType.CLUSTER_WORKLOAD_RESPONSE == responseMessage.getType()) {
            return (ClusterWorkloadResponseMessage)responseMessage;
        }
        throw new ProtocolException("Expected message type '" + (Object)((Object)ProtocolMessage.MessageType.CLUSTER_WORKLOAD_RESPONSE) + "' but found '" + (Object)((Object)responseMessage.getType()) + "'");
    }

    private Socket createSocket(InetSocketAddress socketAddress) {
        try {
            return SocketUtils.createSocket((InetSocketAddress)socketAddress, (SocketConfiguration)this.socketConfiguration);
        }
        catch (IOException ioe) {
            if (socketAddress == null) {
                throw new ProtocolException("Failed to create socket due to: " + ioe, ioe);
            }
            throw new ProtocolException("Failed to create socket to " + socketAddress + " due to: " + ioe, ioe);
        }
    }

    public SocketConfiguration getSocketConfiguration() {
        return this.socketConfiguration;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ProtocolMessage sendProtocolMessage(ProtocolMessage msg, String hostname, int port) {
        ProtocolMessage protocolMessage;
        Socket socket = null;
        try {
            ProtocolMessage response;
            try {
                socket = SocketUtils.createSocket((InetSocketAddress)new InetSocketAddress(hostname, port), (SocketConfiguration)this.socketConfiguration);
            }
            catch (IOException e) {
                throw new ProtocolException("Failed to send message to Cluster Coordinator due to: " + e, e);
            }
            try {
                ProtocolMessageMarshaller<ProtocolMessage> marshaller = this.protocolContext.createMarshaller();
                marshaller.marshal(msg, socket.getOutputStream());
            }
            catch (IOException ioe) {
                throw new ProtocolException("Failed marshalling '" + (Object)((Object)msg.getType()) + "' protocol message due to: " + ioe, ioe);
            }
            try {
                ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = this.protocolContext.createUnmarshaller();
                response = unmarshaller.unmarshal(socket.getInputStream());
            }
            catch (IOException ioe) {
                throw new ProtocolException("Failed unmarshalling '" + (Object)((Object)ProtocolMessage.MessageType.CONNECTION_RESPONSE) + "' protocol message from " + socket.getRemoteSocketAddress() + " due to: " + ioe, ioe);
            }
            protocolMessage = response;
        }
        catch (Throwable throwable) {
            SocketUtils.closeQuietly(socket);
            throw throwable;
        }
        SocketUtils.closeQuietly((Socket)socket);
        return protocolMessage;
    }

    protected abstract InetSocketAddress getServiceAddress() throws IOException;
}

