package org.apache.nifi.cluster.protocol.impl;

import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLSocket;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.ProtocolListener;
import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.OffloadMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.io.socket.ServerSocketConfiguration;
import org.apache.nifi.io.socket.SocketListener;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.security.util.CertificateUtils;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.class */
/**
 * A {@link ProtocolListener} built on top of {@link SocketListener}. Each request handed to
 * {@link #dispatchRequest(Socket)} is unmarshalled into a {@link ProtocolMessage}, routed to the
 * first registered {@link ProtocolHandler} that reports it can handle the message, and any
 * non-null response is marshalled back over the same socket.
 *
 * <p>Handlers are kept in a {@link CopyOnWriteArrayList}, so they may be added or removed while
 * requests are being dispatched.
 */
public class SocketProtocolListener extends SocketListener implements ProtocolListener {
    private static final Logger logger = LoggerFactory.getLogger(SocketProtocolListener.class);

    /**
     * Second argument handed to {@code CopyingInputStream} when debug logging is enabled.
     * NOTE(review): presumably the maximum number of message bytes retained for logging (1 MiB) -
     * confirm against CopyingInputStream.
     */
    private static final int MAX_DEBUG_MESSAGE_BYTES = 1048576;

    private final ProtocolContext<ProtocolMessage> protocolContext;
    private final Collection<ProtocolHandler> handlers = new CopyOnWriteArrayList<>();
    private volatile BulletinRepository bulletinRepository;

    /**
     * Creates a listener.
     *
     * @param i first argument forwarded unchanged to the {@link SocketListener} constructor
     *     (its meaning is defined there; TODO confirm - likely a thread count)
     * @param i2 second argument forwarded unchanged to the {@link SocketListener} constructor
     *     (its meaning is defined there; TODO confirm - likely a port)
     * @param serverSocketConfiguration socket configuration forwarded to {@link SocketListener}
     * @param protocolContext factory for the marshaller/unmarshaller used on every request
     * @throws IllegalArgumentException if {@code protocolContext} is {@code null}
     */
    public SocketProtocolListener(int i, int i2, ServerSocketConfiguration serverSocketConfiguration, ProtocolContext<ProtocolMessage> protocolContext) {
        super(i, i2, serverSocketConfiguration);
        if (protocolContext == null) {
            throw new IllegalArgumentException("Protocol Context may not be null.");
        }
        this.protocolContext = protocolContext;
    }

    /**
     * Sets the repository that receives a bulletin whenever a request fails to process.
     *
     * @param bulletinRepository the repository to report to; may be {@code null} to disable bulletins
     */
    @Override
    public void setBulletinRepository(BulletinRepository bulletinRepository) {
        this.bulletinRepository = bulletinRepository;
    }

    /**
     * Starts the underlying socket listener.
     *
     * @throws IllegalStateException if the listener is already running
     * @throws IOException if the underlying listener fails to start
     */
    @Override
    public void start() throws IOException {
        if (super.isRunning()) {
            throw new IllegalStateException("Instance is already started.");
        }
        super.start();
    }

    /**
     * Stops the underlying socket listener.
     *
     * @throws IOException if the listener is not running, or if the underlying listener fails to stop
     */
    @Override
    public void stop() throws IOException {
        if (!super.isRunning()) {
            throw new IOException("Instance is already stopped.");
        }
        super.stop();
    }

    /**
     * @return a read-only view of the registered handlers
     */
    @Override
    public Collection<ProtocolHandler> getHandlers() {
        return Collections.unmodifiableCollection(this.handlers);
    }

    /**
     * Registers a handler. Handlers are consulted in registration order.
     *
     * @param protocolHandler the handler to add
     * @throws NullPointerException if {@code protocolHandler} is {@code null}
     */
    @Override
    public void addHandler(ProtocolHandler protocolHandler) {
        if (protocolHandler == null) {
            throw new NullPointerException("Protocol handler may not be null.");
        }
        this.handlers.add(protocolHandler);
    }

    /**
     * Unregisters a handler.
     *
     * @param protocolHandler the handler to remove
     * @return {@code true} if the handler was registered and has been removed
     */
    @Override
    public boolean removeHandler(ProtocolHandler protocolHandler) {
        return this.handlers.remove(protocolHandler);
    }

    /**
     * Reads one protocol message from the socket, dispatches it to the first handler able to
     * process it and writes the handler's response (if any) back to the socket.
     *
     * <p>{@link IOException}s and {@link ProtocolException}s are not propagated: they are logged
     * at WARN level and, when a bulletin repository has been set, reported as a bulletin.
     *
     * <p>NOTE(review): this looks like the implementation of a dispatch hook declared by
     * {@link SocketListener}; the superclass is not visible here, so {@code @Override} is not added.
     *
     * @param socket the connected socket carrying the request
     */
    public void dispatchRequest(Socket socket) {
        String hostname = null;
        try {
            final StopWatch stopWatch = new StopWatch(true);
            hostname = socket.getInetAddress().getHostName();
            final String requestId = UUID.randomUUID().toString();
            logger.debug("Received request {} from {}", requestId, hostname);

            final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = this.protocolContext.createUnmarshaller();
            // Counts the bytes consumed so the request length can be reported once processing finishes.
            final ByteCountingInputStream countingIn = new ByteCountingInputStream(socket.getInputStream());
            InputStream messageIn = countingIn;
            if (logger.isDebugEnabled()) {
                // Only pay for keeping a copy of the raw message when it will actually be logged.
                messageIn = new CopyingInputStream(messageIn, MAX_DEBUG_MESSAGE_BYTES);
            }

            final ProtocolMessage request;
            try {
                request = unmarshaller.unmarshal(messageIn);
            } finally {
                // Log whatever was read exactly once, whether or not unmarshalling succeeded.
                if (logger.isDebugEnabled() && messageIn instanceof CopyingInputStream) {
                    final byte[] received = ((CopyingInputStream) messageIn).getBytesRead();
                    // NOTE(review): assumes the wire format is UTF-8 text; the previous code used the
                    // platform default charset - confirm against the marshaller configuration.
                    logger.debug("Received message: {}", new String(received, StandardCharsets.UTF_8));
                }
            }

            final Set<String> certificateIdentities = getCertificateIdentities(socket);

            final Collection<ProtocolHandler> candidates = getHandlers();
            final ProtocolHandler handler = findHandler(candidates, request);
            if (handler == null) {
                logger.error("Received request of type {} but none of the following Protocol Handlers were able to process the request: {}", request.getType(), candidates);
                throw new ProtocolException("No handler assigned to handle message type: " + request.getType());
            }

            final ProtocolMessage response = handler.handle(request, certificateIdentities);
            if (response != null) {
                try {
                    logger.debug("Sending response for request {}", requestId);
                    this.protocolContext.createMarshaller().marshal(response, socket.getOutputStream());
                } catch (IOException e) {
                    throw new ProtocolException("Failed marshalling protocol message in response to message type: " + request.getType() + " due to " + e, e);
                }
            }

            stopWatch.stop();
            final NodeIdentifier nodeIdentifier = getNodeIdentifier(request);
            // Prefer the node identifier carried by the message; fall back to the socket's host name.
            final String from = nodeIdentifier == null ? hostname : nodeIdentifier.toString();
            logger.info("Finished processing request {} (type={}, length={} bytes) from {} in {}",
                    requestId, request.getType(), countingIn.getBytesRead(), from, stopWatch.getDuration());
        } catch (IOException | ProtocolException e) {
            logger.warn("Failed processing protocol message from {} due to {}", hostname, e, e);
            // Read the volatile field once so a concurrent setBulletinRepository(null) cannot cause an NPE.
            final BulletinRepository repository = this.bulletinRepository;
            if (repository != null) {
                repository.addBulletin(BulletinFactory.createBulletin("Clustering", "WARNING", String.format("Failed to process protocol message from %s due to: %s", hostname, e.toString())));
            }
        }
    }

    /**
     * Returns the first handler in {@code candidates} whose {@code canHandle} accepts the message.
     *
     * @param candidates handlers to consult, in iteration order
     * @param message the message to be handled
     * @return the matching handler, or {@code null} if none accepts the message
     */
    private ProtocolHandler findHandler(Collection<ProtocolHandler> candidates, ProtocolMessage message) {
        for (ProtocolHandler candidate : candidates) {
            if (candidate.canHandle(message)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Extracts the node identifier carried by the given message, for the message types that have one.
     *
     * @param protocolMessage the message to inspect; may be {@code null}
     * @return the node identifier, or {@code null} if the message is {@code null} or of a type
     *     not listed in the switch below
     */
    private NodeIdentifier getNodeIdentifier(ProtocolMessage protocolMessage) {
        if (protocolMessage == null) {
            return null;
        }
        switch (protocolMessage.getType()) {
            case CONNECTION_REQUEST:
                return ((ConnectionRequestMessage) protocolMessage).getConnectionRequest().getProposedNodeIdentifier();
            case HEARTBEAT:
                return ((HeartbeatMessage) protocolMessage).getHeartbeat().getNodeIdentifier();
            case OFFLOAD_REQUEST:
                return ((OffloadMessage) protocolMessage).getNodeId();
            case DISCONNECTION_REQUEST:
                return ((DisconnectMessage) protocolMessage).getNodeId();
            case FLOW_REQUEST:
                return ((FlowRequestMessage) protocolMessage).getNodeId();
            case RECONNECTION_REQUEST:
                return ((ReconnectionRequestMessage) protocolMessage).getNodeId();
            default:
                return null;
        }
    }

    /**
     * Determines the identities presented by the peer's certificate.
     *
     * @param socket the socket the request arrived on
     * @return an empty set when the socket is not an {@link SSLSocket}; otherwise the identities
     *     derived from the peer certificate's Subject Alternative Names
     * @throws IOException if the peer is unverified or the certificate cannot be processed
     */
    private Set<String> getCertificateIdentities(Socket socket) throws IOException {
        if (!(socket instanceof SSLSocket)) {
            return Collections.emptySet();
        }
        try {
            return getCertificateIdentities(((SSLSocket) socket).getSession());
        } catch (CertificateException e) {
            throw new IOException("Could not extract Subject Alternative Names from client's certificate", e);
        }
    }

    /**
     * Reads the first peer certificate of the session, checks that it is currently valid and maps
     * each of its Subject Alternative Names through {@code CertificateUtils.extractUsername}.
     *
     * @param sSLSession the TLS session of the connection
     * @return the set of extracted identities
     * @throws CertificateException if the certificate cannot be converted, is expired or is not yet valid
     * @throws SSLPeerUnverifiedException if the peer presented no certificates
     */
    private Set<String> getCertificateIdentities(SSLSession sSLSession) throws CertificateException, SSLPeerUnverifiedException {
        final Certificate[] peerCertificates = sSLSession.getPeerCertificates();
        if (peerCertificates == null || peerCertificates.length == 0) {
            throw new SSLPeerUnverifiedException("No certificates found");
        }
        // Only the first certificate in the array is inspected.
        final X509Certificate peerCertificate = CertificateUtils.convertAbstractX509Certificate(peerCertificates[0]);
        peerCertificate.checkValidity();
        return CertificateUtils.getSubjectAlternativeNames(peerCertificate).stream()
                .map(CertificateUtils::extractUsername)
                .collect(Collectors.toSet());
    }
}
