package org.apache.nifi.cluster.protocol.impl;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.io.socket.SocketUtils;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;

/* loaded from: input_file:org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.class */
public class StandardClusterCoordinationProtocolSender implements ClusterCoordinationProtocolSender {
    private final ProtocolContext<ProtocolMessage> protocolContext;
    private final SocketConfiguration socketConfiguration;
    private int handshakeTimeoutSeconds;

    public StandardClusterCoordinationProtocolSender(SocketConfiguration socketConfiguration, ProtocolContext<ProtocolMessage> protocolContext) {
        if (socketConfiguration == null) {
            throw new IllegalArgumentException("Socket configuration may not be null.");
        }
        if (protocolContext == null) {
            throw new IllegalArgumentException("Protocol Context may not be null.");
        }
        this.socketConfiguration = socketConfiguration;
        this.protocolContext = protocolContext;
        this.handshakeTimeoutSeconds = -1;
    }

    @Override // org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender, org.apache.nifi.cluster.protocol.ProtocolListener
    public void setBulletinRepository(BulletinRepository bulletinRepository) {
    }

    @Override // org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender
    public ReconnectionResponseMessage requestReconnection(ReconnectionRequestMessage reconnectionRequestMessage) throws ProtocolException {
        Socket socket = null;
        try {
            socket = createSocket(reconnectionRequestMessage.getNodeId(), true);
            try {
                this.protocolContext.createMarshaller().marshal(reconnectionRequestMessage, socket.getOutputStream());
                try {
                    ProtocolMessage unmarshal = this.protocolContext.createUnmarshaller().unmarshal(socket.getInputStream());
                    if (ProtocolMessage.MessageType.RECONNECTION_RESPONSE != unmarshal.getType()) {
                        throw new ProtocolException("Expected message type '" + ProtocolMessage.MessageType.FLOW_RESPONSE + "' but found '" + unmarshal.getType() + "'");
                    }
                    ReconnectionResponseMessage reconnectionResponseMessage = (ReconnectionResponseMessage) unmarshal;
                    SocketUtils.closeQuietly(socket);
                    return reconnectionResponseMessage;
                } catch (IOException e) {
                    throw new ProtocolException("Failed unmarshalling '" + ProtocolMessage.MessageType.RECONNECTION_RESPONSE + "' protocol message due to: " + e, e);
                }
            } catch (IOException e2) {
                throw new ProtocolException("Failed marshalling '" + reconnectionRequestMessage.getType() + "' protocol message due to: " + e2, e2);
            }
        } catch (Throwable th) {
            SocketUtils.closeQuietly(socket);
            throw th;
        }
    }

    @Override // org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender
    public void disconnect(DisconnectMessage disconnectMessage) throws ProtocolException {
        Socket socket = null;
        try {
            socket = createSocket(disconnectMessage.getNodeId(), true);
            try {
                this.protocolContext.createMarshaller().marshal(disconnectMessage, socket.getOutputStream());
                SocketUtils.closeQuietly(socket);
            } catch (IOException e) {
                throw new ProtocolException("Failed marshalling '" + disconnectMessage.getType() + "' protocol message due to: " + e, e);
            }
        } catch (Throwable th) {
            SocketUtils.closeQuietly(socket);
            throw th;
        }
    }

    private void setConnectionHandshakeTimeoutOnSocket(Socket socket) throws SocketException {
        if (this.handshakeTimeoutSeconds >= 0) {
            socket.setSoTimeout(this.handshakeTimeoutSeconds * 1000);
        }
    }

    public SocketConfiguration getSocketConfiguration() {
        return this.socketConfiguration;
    }

    public int getHandshakeTimeoutSeconds() {
        return this.handshakeTimeoutSeconds;
    }

    public void setHandshakeTimeout(String str) {
        this.handshakeTimeoutSeconds = (int) FormatUtils.getTimeDuration(str, TimeUnit.SECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Socket createSocket(NodeIdentifier nodeIdentifier, boolean z) {
        return createSocket(nodeIdentifier.getSocketAddress(), nodeIdentifier.getSocketPort(), z);
    }

    private Socket createSocket(String str, int i, boolean z) {
        try {
            Socket createSocket = SocketUtils.createSocket(InetSocketAddress.createUnresolved(str, i), this.socketConfiguration);
            if (z) {
                setConnectionHandshakeTimeoutOnSocket(createSocket);
            }
            return createSocket;
        } catch (IOException e) {
            throw new ProtocolException("Failed to create socket due to: " + e, e);
        }
    }

    @Override // org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender
    public void notifyNodeStatusChange(Set<NodeIdentifier> set, NodeStatusChangeMessage nodeStatusChangeMessage) {
        if (set.isEmpty()) {
            return;
        }
        int min = Math.min(set.size(), NiFiProperties.getInstance().getClusterNodeProtocolThreads());
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            Throwable th = null;
            try {
                try {
                    this.protocolContext.createMarshaller().marshal(nodeStatusChangeMessage, byteArrayOutputStream);
                    final byte[] byteArray = byteArrayOutputStream.toByteArray();
                    if (byteArrayOutputStream != null) {
                        if (0 != 0) {
                            try {
                                byteArrayOutputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            byteArrayOutputStream.close();
                        }
                    }
                    ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(min, new ThreadFactory() { // from class: org.apache.nifi.cluster.protocol.impl.StandardClusterCoordinationProtocolSender.1
                        private final AtomicInteger counter = new AtomicInteger(0);

                        @Override // java.util.concurrent.ThreadFactory
                        public Thread newThread(Runnable runnable) {
                            Thread newThread = Executors.defaultThreadFactory().newThread(runnable);
                            newThread.setDaemon(true);
                            newThread.setName("Notify Cluster of Node Status Change-" + this.counter.incrementAndGet());
                            return newThread;
                        }
                    });
                    for (final NodeIdentifier nodeIdentifier : set) {
                        newFixedThreadPool.submit(new Runnable() { // from class: org.apache.nifi.cluster.protocol.impl.StandardClusterCoordinationProtocolSender.2
                            @Override // java.lang.Runnable
                            public void run() {
                                try {
                                    Socket createSocket = StandardClusterCoordinationProtocolSender.this.createSocket(nodeIdentifier, true);
                                    Throwable th3 = null;
                                    try {
                                        createSocket.getOutputStream().write(byteArray);
                                        if (createSocket != null) {
                                            if (0 != 0) {
                                                try {
                                                    createSocket.close();
                                                } catch (Throwable th4) {
                                                    th3.addSuppressed(th4);
                                                }
                                            } else {
                                                createSocket.close();
                                            }
                                        }
                                    } finally {
                                    }
                                } catch (IOException e) {
                                    throw new ProtocolException("Failed to send Node Status Change message to " + nodeIdentifier, e);
                                }
                            }
                        });
                    }
                    newFixedThreadPool.shutdown();
                    try {
                        newFixedThreadPool.awaitTermination(10L, TimeUnit.DAYS);
                    } catch (InterruptedException e) {
                        throw new ProtocolException(e);
                    }
                } finally {
                }
            } finally {
            }
        } catch (IOException e2) {
            throw new ProtocolException("Failed to marshal NodeStatusChangeMessage", e2);
        }
    }
}
