/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.queue.clustered.server;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocket;
import org.apache.nifi.controller.queue.clustered.server.LoadBalanceProtocol;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.security.util.CertificateUtils;
import org.apache.nifi.security.util.TlsConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Listens on a TCP port (optionally TLS with mandatory client authentication) for connections
 * from other nodes and hands every accepted socket to the configured {@link LoadBalanceProtocol}
 * on a dedicated thread.
 *
 * <p>Lifecycle: {@link #start()} binds the server socket and launches the accept thread;
 * {@link #stop()} asks the accept thread and all live connection handlers to finish. The server
 * socket reference is kept after {@code stop()}, so a stopped instance is not re-bound by a later
 * {@code start()}.
 */
public class ConnectionLoadBalanceServer {
    private static final Logger logger = LoggerFactory.getLogger(ConnectionLoadBalanceServer.class);
    private static final AtomicLong threadCounter = new AtomicLong(1L);

    /** Listen backlog passed to the server socket. */
    private static final int ACCEPT_BACKLOG = 50;
    /** How long {@code accept()} may block before the accept loop re-checks its stop flag. */
    private static final int ACCEPT_POLL_MILLIS = 1000;

    private final String hostname;
    private final int port;
    private final SSLContext sslContext;
    private final LoadBalanceProtocol loadBalanceProtocol;
    private final int connectionTimeoutMillis;
    private final EventReporter eventReporter;
    /** Handlers of connections that are still being served; guarded by the list's own monitor. */
    private final List<CommunicateAction> communicationActions = Collections.synchronizedList(new ArrayList<CommunicateAction>());
    private volatile AcceptConnection acceptConnection;
    private volatile ServerSocket serverSocket;
    private volatile boolean stopped = true;

    /**
     * @param hostname                host name or address to bind to; {@code null} binds to all local addresses
     * @param port                    port to listen on
     * @param sslContext              TLS context, or {@code null} for plain sockets
     * @param numThreads              accepted for interface compatibility; not used by this class
     * @param loadBalanceProtocol     protocol invoked for every accepted connection
     * @param eventReporter           receives an event when communication with a peer fails
     * @param connectionTimeoutMillis read timeout applied to every accepted socket
     */
    public ConnectionLoadBalanceServer(String hostname, int port, SSLContext sslContext, int numThreads, LoadBalanceProtocol loadBalanceProtocol, EventReporter eventReporter, int connectionTimeoutMillis) {
        this.hostname = hostname;
        this.port = port;
        this.sslContext = sslContext;
        this.loadBalanceProtocol = loadBalanceProtocol;
        this.connectionTimeoutMillis = connectionTimeoutMillis;
        this.eventReporter = eventReporter;
    }

    /**
     * Binds the server socket and starts the thread that accepts connections. Does nothing when
     * the server is already started.
     *
     * @throws IOException if the server socket cannot be created; the server stays stopped so
     *                     the call can be retried
     */
    public void start() throws IOException {
        if (!this.stopped) {
            return;
        }
        if (this.serverSocket != null) {
            // A socket from an earlier start() is still referenced; it is not re-bound.
            this.stopped = false;
            return;
        }
        final ServerSocket created;
        try {
            created = this.createServerSocket();
        }
        catch (Exception e) {
            // 'stopped' is deliberately still true here so that a later start() retries the bind.
            throw new IOException("Could not begin listening for incoming connections in order to load balance data across the cluster. Please verify the values of the 'nifi.cluster.load.balance.port' and 'nifi.cluster.load.balance.host' properties as well as the 'nifi.security.*' properties", e);
        }
        this.serverSocket = created;
        this.stopped = false;
        this.acceptConnection = new AcceptConnection(created);
        Thread receiveConnectionThread = new Thread(this.acceptConnection);
        receiveConnectionThread.setName("Receive Queue Load-Balancing Connections");
        receiveConnectionThread.start();
    }

    /**
     * @return the local port of the server socket, or the configured port when no server socket
     *         has been created yet
     */
    public int getPort() {
        final ServerSocket current = this.serverSocket;
        return current == null ? this.port : current.getLocalPort();
    }

    /**
     * Signals the accept thread and every live connection handler to finish. The handlers are
     * only flagged; each one ends after its current protocol call returns.
     */
    public void stop() {
        this.stopped = true;
        final AcceptConnection acceptor = this.acceptConnection;
        if (acceptor != null) {
            acceptor.stop();
        }
        // Iterating a Collections.synchronizedList requires holding its monitor; the accept
        // thread and finishing handlers modify the list concurrently.
        synchronized (this.communicationActions) {
            for (CommunicateAction action : this.communicationActions) {
                action.stop();
            }
            this.communicationActions.clear();
        }
    }

    /**
     * Creates the listening socket, plain or TLS depending on whether an SSLContext was supplied.
     * A {@code null} hostname binds to all local addresses for both kinds of socket.
     */
    private ServerSocket createServerSocket() throws IOException {
        // InetAddress.getByName(null) would resolve to loopback, so null is passed through as-is.
        final InetAddress bindAddress = this.hostname == null ? null : InetAddress.getByName(this.hostname);
        if (this.sslContext == null) {
            return new ServerSocket(this.port, ACCEPT_BACKLOG, bindAddress);
        }
        final SSLServerSocket sslServerSocket = (SSLServerSocket)this.sslContext.getServerSocketFactory().createServerSocket(this.port, ACCEPT_BACKLOG, bindAddress);
        try {
            sslServerSocket.setNeedClientAuth(true);
            sslServerSocket.setEnabledProtocols(TlsConfiguration.getCurrentSupportedTlsProtocolVersions());
        }
        catch (RuntimeException e) {
            // Release the bound port before reporting the configuration failure.
            closeAfterFailure(sslServerSocket, e);
            throw e;
        }
        return sslServerSocket;
    }

    /** Closes {@code closeable} (if any), attaching a close failure to {@code failure} as suppressed. */
    private static void closeAfterFailure(java.io.Closeable closeable, Exception failure) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        }
        catch (IOException ioe) {
            failure.addSuppressed(ioe);
        }
    }

    /** Wraps a handler so that it is dropped from {@link #communicationActions} once it finishes. */
    private Runnable untilFinished(final CommunicateAction action) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    action.run();
                }
                finally {
                    ConnectionLoadBalanceServer.this.communicationActions.remove(action);
                }
            }
        };
    }

    @Override
    public String toString() {
        return "ConnectionLoadBalanceServer[hostname=" + this.hostname + ", port=" + this.port + ", secure=" + (this.sslContext != null) + "]";
    }

    /** Accept loop: polls the server socket and starts one handler thread per accepted connection. */
    private class AcceptConnection
    implements Runnable {
        private final ServerSocket serverSocket;
        private volatile boolean stopped = false;

        public AcceptConnection(ServerSocket serverSocket) {
            this.serverSocket = serverSocket;
        }

        /** Requests loop termination; takes effect once the current accept() call returns or times out. */
        public void stop() {
            this.stopped = true;
        }

        @Override
        public void run() {
            try {
                // A bounded accept() lets the loop notice the stop flag.
                this.serverSocket.setSoTimeout(ACCEPT_POLL_MILLIS);
            }
            catch (Exception e) {
                logger.error("Failed to set soTimeout on Server Socket for Load Balancing data across cluster", e);
            }
            while (!this.stopped) {
                Socket socket = null;
                CommunicateAction communicateAction = null;
                try {
                    try {
                        socket = this.serverSocket.accept();
                    }
                    catch (SocketTimeoutException ste) {
                        continue;
                    }
                    socket.setSoTimeout(ConnectionLoadBalanceServer.this.connectionTimeoutMillis);
                    communicateAction = new CommunicateAction(ConnectionLoadBalanceServer.this.loadBalanceProtocol, socket, ConnectionLoadBalanceServer.this.eventReporter);
                    Thread commsThread = new Thread(ConnectionLoadBalanceServer.this.untilFinished(communicateAction));
                    commsThread.setName("Load-Balance Server Thread-" + threadCounter.getAndIncrement());
                    // Register before starting so a handler that finishes immediately is still removed.
                    ConnectionLoadBalanceServer.this.communicationActions.add(communicateAction);
                    commsThread.start();
                }
                catch (Exception e) {
                    // No handler thread owns the socket at this point, so it must be released here.
                    if (communicateAction != null) {
                        ConnectionLoadBalanceServer.this.communicationActions.remove(communicateAction);
                    }
                    closeAfterFailure(socket, e);
                    logger.error("{} Failed to accept connection from other node in cluster", ConnectionLoadBalanceServer.this, e);
                }
            }
            try {
                this.serverSocket.close();
            }
            catch (Exception e) {
                logger.warn("Failed to properly shutdown Server Socket for Load Balancing", e);
            }
        }
    }

    /**
     * Serves one accepted connection: repeatedly invokes the protocol on the socket until the
     * socket is closed, an error occurs, or {@link #stop()} is called.
     */
    protected static class CommunicateAction
    implements Runnable {
        /** Minimum interval between two TLS errors reported at error level. */
        private static final int EXCEPTION_THRESHOLD_MILLIS = 10000;
        /**
         * Time the last TLS error was reported at error level, or -1 if none yet. Shared by all
         * connections: each handler ends after its first error, so a per-instance value could
         * never throttle anything.
         */
        private static volatile long tlsErrorLastSeen = -1L;

        private final LoadBalanceProtocol loadBalanceProtocol;
        private final Socket socket;
        private final InputStream in;
        private final OutputStream out;
        private final EventReporter eventReporter;
        private volatile boolean stopped = false;

        /**
         * @throws IOException if the socket's input or output stream cannot be obtained
         */
        public CommunicateAction(LoadBalanceProtocol loadBalanceProtocol, Socket socket, EventReporter eventReporter) throws IOException {
            this.loadBalanceProtocol = loadBalanceProtocol;
            this.socket = socket;
            this.eventReporter = eventReporter;
            this.in = new BufferedInputStream(socket.getInputStream());
            this.out = new BufferedOutputStream(socket.getOutputStream());
        }

        /** Requests termination; checked between protocol invocations. */
        public void stop() {
            this.stopped = true;
        }

        @Override
        public void run() {
            String peerDescription = "<Unknown Client>";
            try {
                while (!this.stopped) {
                    try {
                        peerDescription = this.socket.getRemoteSocketAddress().toString();
                        logger.debug("Receiving FlowFiles from Peer {}", peerDescription);
                        this.loadBalanceProtocol.receiveFlowFiles(this.socket, this.in, this.out);
                        if (this.socket.isClosed()) {
                            logger.debug("Finished Receiving FlowFiles from Peer {}", peerDescription);
                            break;
                        }
                    }
                    catch (Exception e) {
                        try {
                            this.socket.close();
                        }
                        catch (IOException ioe) {
                            e.addSuppressed(ioe);
                        }
                        if (CertificateUtils.isTlsError(e)) {
                            this.handleTlsError(peerDescription, e);
                        } else {
                            logger.error("Failed to communicate with Peer {}", peerDescription, e);
                            this.eventReporter.reportEvent(Severity.ERROR, "Load Balanced Connection", "Failed to receive FlowFiles for Load Balancing due to " + e);
                        }
                        return;
                    }
                }
            }
            finally {
                // Covers the exit caused by stop(): nothing else closes the socket in that case.
                if (!this.socket.isClosed()) {
                    try {
                        this.socket.close();
                    }
                    catch (IOException ioe) {
                        logger.debug("Failed to close connection to Peer {}", peerDescription, ioe);
                    }
                }
            }
        }

        /**
         * Reports a TLS failure, demoting it to debug level when another TLS failure was
         * reported within the last {@link #EXCEPTION_THRESHOLD_MILLIS} milliseconds.
         *
         * @return {@code true} if the failure was reported at error level, {@code false} if it was throttled
         */
        private boolean handleTlsError(String peerDescription, Throwable e) {
            String populatedMessage = "Failed to communicate with Peer " + peerDescription + " due to " + e.getLocalizedMessage();
            if (this.tlsErrorRecentlySeen()) {
                logger.debug(populatedMessage);
                return false;
            }
            final long previous = tlsErrorLastSeen;
            logger.error(populatedMessage);
            if (previous >= 0L) {
                // Only meaningful when there actually was an earlier report.
                logger.info("\tPrinted above error because it has been {} ms since the last printing", Long.valueOf(System.currentTimeMillis() - previous));
            }
            this.eventReporter.reportEvent(Severity.ERROR, "Load Balanced Connection", populatedMessage);
            tlsErrorLastSeen = System.currentTimeMillis();
            return true;
        }

        /** @return whether a TLS error was reported at error level within the throttle interval */
        private boolean tlsErrorRecentlySeen() {
            final long previous = tlsErrorLastSeen;
            return previous >= 0L && System.currentTimeMillis() - previous < (long)EXCEPTION_THRESHOLD_MILLIS;
        }
    }
}

