package org.apache.nifi.cluster.protocol;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.io.socket.SocketUtils;

/* loaded from: input_file:org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.class */
public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
    private final SocketConfiguration socketConfiguration;
    private final ProtocolContext<ProtocolMessage> protocolContext;

    public AbstractNodeProtocolSender(SocketConfiguration socketConfiguration, ProtocolContext<ProtocolMessage> protocolContext) {
        this.socketConfiguration = socketConfiguration;
        this.protocolContext = protocolContext;
    }

    @Override // org.apache.nifi.cluster.protocol.NodeProtocolSender
    public ConnectionResponseMessage requestConnection(ConnectionRequestMessage connectionRequestMessage) throws ProtocolException, UnknownServiceAddressException {
        Socket socket = null;
        try {
            socket = createSocket();
            try {
                this.protocolContext.createMarshaller().marshal(connectionRequestMessage, socket.getOutputStream());
                try {
                    ProtocolMessage unmarshal = this.protocolContext.createUnmarshaller().unmarshal(socket.getInputStream());
                    if (ProtocolMessage.MessageType.CONNECTION_RESPONSE != unmarshal.getType()) {
                        throw new ProtocolException("Expected message type '" + ProtocolMessage.MessageType.CONNECTION_RESPONSE + "' but found '" + unmarshal.getType() + "'");
                    }
                    ConnectionResponseMessage connectionResponseMessage = (ConnectionResponseMessage) unmarshal;
                    SocketUtils.closeQuietly(socket);
                    return connectionResponseMessage;
                } catch (IOException e) {
                    throw new ProtocolException("Failed unmarshalling '" + ProtocolMessage.MessageType.CONNECTION_RESPONSE + "' protocol message from " + socket.getRemoteSocketAddress() + " due to: " + e, e);
                }
            } catch (IOException e2) {
                throw new ProtocolException("Failed marshalling '" + connectionRequestMessage.getType() + "' protocol message due to: " + e2, e2);
            }
        } catch (Throwable th) {
            SocketUtils.closeQuietly(socket);
            throw th;
        }
    }

    @Override // org.apache.nifi.cluster.protocol.NodeProtocolSender
    public HeartbeatResponseMessage heartbeat(HeartbeatMessage heartbeatMessage, String str) throws ProtocolException {
        try {
            String[] split = str.split(":");
            ProtocolMessage sendProtocolMessage = sendProtocolMessage(heartbeatMessage, split[0], Integer.parseInt(split[1]));
            if (ProtocolMessage.MessageType.HEARTBEAT_RESPONSE == sendProtocolMessage.getType()) {
                return (HeartbeatResponseMessage) sendProtocolMessage;
            }
            throw new ProtocolException("Expected message type '" + ProtocolMessage.MessageType.HEARTBEAT_RESPONSE + "' but found '" + sendProtocolMessage.getType() + "'");
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot send heartbeat to address [" + str + "]. Address must be in <hostname>:<port> format");
        }
    }

    private Socket createSocket() {
        InetSocketAddress inetSocketAddress = null;
        try {
            inetSocketAddress = getServiceAddress();
            return SocketUtils.createSocket(inetSocketAddress, this.socketConfiguration);
        } catch (IOException e) {
            if (inetSocketAddress == null) {
                throw new ProtocolException("Failed to create socket due to: " + e, e);
            }
            throw new ProtocolException("Failed to create socket to " + inetSocketAddress + " due to: " + e, e);
        }
    }

    public SocketConfiguration getSocketConfiguration() {
        return this.socketConfiguration;
    }

    private ProtocolMessage sendProtocolMessage(ProtocolMessage protocolMessage, String str, int i) {
        Socket socket = null;
        try {
            try {
                socket = SocketUtils.createSocket(new InetSocketAddress(str, i), this.socketConfiguration);
                try {
                    this.protocolContext.createMarshaller().marshal(protocolMessage, socket.getOutputStream());
                    try {
                        ProtocolMessage unmarshal = this.protocolContext.createUnmarshaller().unmarshal(socket.getInputStream());
                        SocketUtils.closeQuietly(socket);
                        return unmarshal;
                    } catch (IOException e) {
                        throw new ProtocolException("Failed unmarshalling '" + ProtocolMessage.MessageType.CONNECTION_RESPONSE + "' protocol message from " + socket.getRemoteSocketAddress() + " due to: " + e, e);
                    }
                } catch (IOException e2) {
                    throw new ProtocolException("Failed marshalling '" + protocolMessage.getType() + "' protocol message due to: " + e2, e2);
                }
            } catch (IOException e3) {
                throw new ProtocolException("Failed to send message to Cluster Coordinator due to: " + e3, e3);
            }
        } catch (Throwable th) {
            SocketUtils.closeQuietly(socket);
            throw th;
        }
    }

    protected abstract InetSocketAddress getServiceAddress() throws IOException;
}
