/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.cluster.protocol.impl;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.security.cert.CertificateException;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
import org.apache.nifi.cluster.protocol.impl.ClusterServiceLocator;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.io.socket.SocketUtils;
import org.apache.nifi.io.socket.multicast.DiscoverableService;
import org.apache.nifi.security.util.CertificateUtils;

public class NodeProtocolSenderImpl
implements NodeProtocolSender {
    private final SocketConfiguration socketConfiguration;
    private final ClusterServiceLocator clusterManagerProtocolServiceLocator;
    private final ProtocolContext<ProtocolMessage> protocolContext;

    public NodeProtocolSenderImpl(ClusterServiceLocator clusterManagerProtocolServiceLocator, SocketConfiguration socketConfiguration, ProtocolContext<ProtocolMessage> protocolContext) {
        if (clusterManagerProtocolServiceLocator == null) {
            throw new IllegalArgumentException("Protocol Service Locator may not be null.");
        }
        if (socketConfiguration == null) {
            throw new IllegalArgumentException("Socket configuration may not be null.");
        }
        if (protocolContext == null) {
            throw new IllegalArgumentException("Protocol Context may not be null.");
        }
        this.clusterManagerProtocolServiceLocator = clusterManagerProtocolServiceLocator;
        this.socketConfiguration = socketConfiguration;
        this.protocolContext = protocolContext;
    }

    @Override
    public ConnectionResponseMessage requestConnection(ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException {
        Socket socket = null;
        try {
            ProtocolMessage response;
            socket = this.createSocket();
            String ncmDn = this.getNCMDN(socket);
            try {
                ProtocolMessageMarshaller<ProtocolMessage> marshaller = this.protocolContext.createMarshaller();
                marshaller.marshal(msg, socket.getOutputStream());
            }
            catch (IOException ioe) {
                throw new ProtocolException("Failed marshalling '" + (Object)((Object)msg.getType()) + "' protocol message due to: " + ioe, ioe);
            }
            try {
                ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = this.protocolContext.createUnmarshaller();
                response = unmarshaller.unmarshal(socket.getInputStream());
            }
            catch (IOException ioe) {
                throw new ProtocolException("Failed unmarshalling '" + (Object)((Object)ProtocolMessage.MessageType.CONNECTION_RESPONSE) + "' protocol message due to: " + ioe, ioe);
            }
            if (ProtocolMessage.MessageType.CONNECTION_RESPONSE == response.getType()) {
                ConnectionResponseMessage connectionResponse = (ConnectionResponseMessage)response;
                connectionResponse.setClusterManagerDN(ncmDn);
                ConnectionResponseMessage connectionResponseMessage = connectionResponse;
                return connectionResponseMessage;
            }
            throw new ProtocolException("Expected message type '" + (Object)((Object)ProtocolMessage.MessageType.CONNECTION_RESPONSE) + "' but found '" + (Object)((Object)response.getType()) + "'");
        }
        finally {
            SocketUtils.closeQuietly((Socket)socket);
        }
    }

    private String getNCMDN(Socket socket) {
        try {
            return CertificateUtils.extractPeerDNFromSSLSocket((Socket)socket);
        }
        catch (CertificateException e) {
            throw new ProtocolException(e);
        }
    }

    @Override
    public void heartbeat(HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException {
        this.sendProtocolMessage(msg);
    }

    @Override
    public void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException {
        this.sendProtocolMessage(msg);
    }

    @Override
    public void notifyControllerStartupFailure(ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
        this.sendProtocolMessage(msg);
    }

    @Override
    public void notifyReconnectionFailure(ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
        this.sendProtocolMessage(msg);
    }

    private Socket createSocket() {
        DiscoverableService service = this.clusterManagerProtocolServiceLocator.getService();
        if (service == null) {
            throw new UnknownServiceAddressException("Cluster Manager's service is not known.  Verify a cluster manager is running.");
        }
        try {
            return SocketUtils.createSocket((InetSocketAddress)service.getServiceAddress(), (SocketConfiguration)this.socketConfiguration);
        }
        catch (IOException ioe) {
            throw new ProtocolException("Failed to create socket due to: " + ioe, ioe);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendProtocolMessage(ProtocolMessage msg) {
        Socket socket = null;
        try {
            socket = this.createSocket();
            try {
                ProtocolMessageMarshaller<ProtocolMessage> marshaller = this.protocolContext.createMarshaller();
                marshaller.marshal(msg, socket.getOutputStream());
            }
            catch (IOException ioe) {
                throw new ProtocolException("Failed marshalling '" + (Object)((Object)msg.getType()) + "' protocol message due to: " + ioe, ioe);
            }
        }
        finally {
            SocketUtils.closeQuietly((Socket)socket);
        }
    }

    public SocketConfiguration getSocketConfiguration() {
        return this.socketConfiguration;
    }
}

