package org.apache.nifi.cluster.protocol.impl;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.io.socket.SocketUtils;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.util.FormatUtils;

/* loaded from: input_file:org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.class */
/**
 * Socket-based implementation of {@link ClusterManagerProtocolSender}.
 *
 * <p>Each operation opens a new socket to the node identified by the outgoing message,
 * marshals the message onto the socket's output stream using the configured
 * {@link ProtocolContext}, optionally reads a single response message, and always closes
 * the socket before returning or throwing.
 */
public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolSender {
    private final ProtocolContext<ProtocolMessage> protocolContext;
    private final SocketConfiguration socketConfiguration;

    /** Read timeout applied to handshake sockets, in seconds; a negative value means "do not set one". */
    private int handshakeTimeoutSeconds;

    /** Stored by {@link #setBulletinRepository(BulletinRepository)}; not read anywhere in this class. */
    private volatile BulletinRepository bulletinRepository;

    /**
     * Creates a sender. The handshake timeout starts out disabled (-1).
     *
     * @param socketConfiguration configuration used for every socket this sender creates
     * @param protocolContext source of the marshaller and unmarshaller for protocol messages
     * @throws IllegalArgumentException if either argument is {@code null}
     */
    public ClusterManagerProtocolSenderImpl(SocketConfiguration socketConfiguration, ProtocolContext<ProtocolMessage> protocolContext) {
        if (socketConfiguration == null) {
            throw new IllegalArgumentException("Socket configuration may not be null.");
        }
        if (protocolContext == null) {
            throw new IllegalArgumentException("Protocol Context may not be null.");
        }
        this.socketConfiguration = socketConfiguration;
        this.protocolContext = protocolContext;
        this.handshakeTimeoutSeconds = -1;
    }

    /**
     * Stores the given bulletin repository.
     *
     * @param bulletinRepository the repository to remember
     */
    @Override // org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender
    public void setBulletinRepository(BulletinRepository bulletinRepository) {
        this.bulletinRepository = bulletinRepository;
    }

    /**
     * Sends a flow request to the node named in the message and returns its response.
     * The handshake timeout is not applied to this exchange.
     *
     * @param flowRequestMessage the request to send; its node id selects the destination
     * @return the node's flow response
     * @throws ProtocolException if the socket cannot be created, the request cannot be marshalled,
     *         the response cannot be unmarshalled, or the response is not of type FLOW_RESPONSE
     */
    @Override // org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender
    public FlowResponseMessage requestFlow(FlowRequestMessage flowRequestMessage) throws ProtocolException {
        Socket socket = null;
        try {
            socket = createSocket(flowRequestMessage.getNodeId(), false);
            sendMessage(flowRequestMessage, socket);
            return (FlowResponseMessage) receiveMessage(socket, ProtocolMessage.MessageType.FLOW_RESPONSE);
        } finally {
            // socket is still null when creation failed; closeQuietly is relied on to accept that.
            SocketUtils.closeQuietly(socket);
        }
    }

    /**
     * Sends a reconnection request to the node named in the message and returns its response.
     * The handshake timeout (if configured) is applied to the socket.
     *
     * @param reconnectionRequestMessage the request to send; its node id selects the destination
     * @return the node's reconnection response
     * @throws ProtocolException if the socket cannot be created, the request cannot be marshalled,
     *         the response cannot be unmarshalled, or the response is not of type RECONNECTION_RESPONSE
     */
    @Override // org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender
    public ReconnectionResponseMessage requestReconnection(ReconnectionRequestMessage reconnectionRequestMessage) throws ProtocolException {
        Socket socket = null;
        try {
            socket = createSocket(reconnectionRequestMessage.getNodeId(), true);
            sendMessage(reconnectionRequestMessage, socket);
            // The mismatch error now names RECONNECTION_RESPONSE, the type actually being checked.
            return (ReconnectionResponseMessage) receiveMessage(socket, ProtocolMessage.MessageType.RECONNECTION_RESPONSE);
        } finally {
            SocketUtils.closeQuietly(socket);
        }
    }

    /**
     * Sends a disconnect message to the node named in the message. No response is read.
     *
     * @param disconnectMessage the message to send; its node id selects the destination
     * @throws ProtocolException if the socket cannot be created or the message cannot be marshalled
     */
    @Override // org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender
    public void disconnect(DisconnectMessage disconnectMessage) throws ProtocolException {
        Socket socket = null;
        try {
            socket = createSocket(disconnectMessage.getNodeId(), true);
            sendMessage(disconnectMessage, socket);
        } finally {
            SocketUtils.closeQuietly(socket);
        }
    }

    /**
     * Sends a primary-role assignment message to the node named in the message. No response is read.
     *
     * @param primaryRoleAssignmentMessage the message to send; its node id selects the destination
     * @throws ProtocolException if the socket cannot be created or the message cannot be marshalled
     */
    @Override // org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender
    public void assignPrimaryRole(PrimaryRoleAssignmentMessage primaryRoleAssignmentMessage) throws ProtocolException {
        Socket socket = null;
        try {
            socket = createSocket(primaryRoleAssignmentMessage.getNodeId(), true);
            sendMessage(primaryRoleAssignmentMessage, socket);
        } finally {
            SocketUtils.closeQuietly(socket);
        }
    }

    /** Marshals {@code message} onto the socket's output stream, wrapping I/O failures in a ProtocolException. */
    private void sendMessage(ProtocolMessage message, Socket socket) {
        try {
            this.protocolContext.createMarshaller().marshal(message, socket.getOutputStream());
        } catch (IOException e) {
            throw new ProtocolException("Failed marshalling '" + message.getType() + "' protocol message due to: " + e, e);
        }
    }

    /** Reads one message from the socket and verifies that it has the expected type. */
    private ProtocolMessage receiveMessage(Socket socket, ProtocolMessage.MessageType expectedType) {
        final ProtocolMessage response;
        try {
            response = this.protocolContext.createUnmarshaller().unmarshal(socket.getInputStream());
        } catch (IOException e) {
            throw new ProtocolException("Failed unmarshalling '" + expectedType + "' protocol message due to: " + e, e);
        }
        if (expectedType != response.getType()) {
            throw new ProtocolException("Expected message type '" + expectedType + "' but found '" + response.getType() + "'");
        }
        return response;
    }

    /**
     * Applies the handshake timeout to the socket as its read timeout, if one is configured
     * (i.e. the configured value is non-negative).
     */
    private void setConnectionHandshakeTimeoutOnSocket(Socket socket) throws SocketException {
        if (this.handshakeTimeoutSeconds >= 0) {
            // Convert in long arithmetic and clamp: an int multiply by 1000 would overflow for
            // large values and hand setSoTimeout a negative (rejected) timeout.
            long timeoutMillis = TimeUnit.SECONDS.toMillis(this.handshakeTimeoutSeconds);
            socket.setSoTimeout((int) Math.min(timeoutMillis, Integer.MAX_VALUE));
        }
    }

    /**
     * @return the socket configuration supplied at construction
     */
    public SocketConfiguration getSocketConfiguration() {
        return this.socketConfiguration;
    }

    /**
     * @return the handshake timeout in seconds; negative when no timeout is applied
     */
    public int getHandshakeTimeoutSeconds() {
        return this.handshakeTimeoutSeconds;
    }

    /**
     * Sets the handshake timeout from a textual duration, which is converted to whole seconds
     * via {@code FormatUtils.getTimeDuration}.
     *
     * @param str the duration text to parse
     */
    public void setHandshakeTimeout(String str) {
        this.handshakeTimeoutSeconds = (int) FormatUtils.getTimeDuration(str, TimeUnit.SECONDS);
    }

    /** Opens a socket to the given node's socket address and port. */
    private Socket createSocket(NodeIdentifier nodeIdentifier, boolean applyHandshakeTimeout) {
        return createSocket(nodeIdentifier.getSocketAddress(), nodeIdentifier.getSocketPort(), applyHandshakeTimeout);
    }

    /**
     * Opens a socket to {@code hostname:port} using the configured socket settings, optionally
     * applying the handshake timeout.
     *
     * @throws ProtocolException if the socket cannot be created or configured
     */
    private Socket createSocket(String hostname, int port, boolean applyHandshakeTimeout) {
        Socket socket = null;
        try {
            socket = SocketUtils.createSocket(InetSocketAddress.createUnresolved(hostname, port), this.socketConfiguration);
            if (applyHandshakeTimeout) {
                setConnectionHandshakeTimeoutOnSocket(socket);
            }
            return socket;
        } catch (IOException e) {
            // If configuring the timeout failed, the socket already exists and the caller never
            // receives a reference to it, so it must be released here to avoid a leak.
            SocketUtils.closeQuietly(socket);
            throw new ProtocolException("Failed to create socket due to: " + e, e);
        }
    }
}
