/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.queue.clustered.server;

import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocket;
import org.apache.nifi.controller.queue.clustered.server.LoadBalanceProtocol;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Listens for socket connections from other nodes and hands each accepted socket to the
 * configured {@link LoadBalanceProtocol} so that FlowFiles can be received over it.
 *
 * <p>One dedicated thread accepts connections and places them on an internal queue. A fixed
 * number of worker tasks (one per configured thread) pull sockets from that queue, let the
 * protocol receive FlowFiles, and put the socket back on the queue for reuse when it is still
 * connected afterwards.</p>
 */
public class ConnectionLoadBalanceServer {
    private static final Logger logger = LoggerFactory.getLogger(ConnectionLoadBalanceServer.class);

    /** Backlog (maximum pending-connection queue length) requested for the server socket. */
    private static final int ACCEPT_BACKLOG = 50;

    /** How long a single {@code accept()} call blocks before the stop flag is re-checked. */
    private static final int ACCEPT_TIMEOUT_MILLIS = 1000;

    private final String hostname;
    private final int port;
    private final SSLContext sslContext;
    private final ExecutorService threadPool;
    private final LoadBalanceProtocol loadBalanceProtocol;
    private final int connectionTimeoutMillis;
    private final int numThreads;
    private final EventReporter eventReporter;
    private volatile Set<CommunicateAction> communicationActions = Collections.emptySet();
    private final BlockingQueue<Socket> connectionQueue = new LinkedBlockingQueue<>();
    private volatile AcceptConnection acceptConnection;
    private volatile ServerSocket serverSocket;
    private volatile boolean stopped = true;

    /**
     * Creates the server. No socket is opened until {@link #start()} is called.
     *
     * @param hostname the host name or address to bind to, or {@code null} to bind to all local addresses
     * @param port the port to listen on
     * @param sslContext the SSL context used to create a secure server socket, or {@code null} for a plain socket
     * @param numThreads the number of worker tasks that service accepted connections
     * @param loadBalanceProtocol the protocol that receives FlowFiles from an accepted socket
     * @param eventReporter where communication failures are reported
     * @param connectionTimeoutMillis the read timeout applied to every accepted socket, in milliseconds
     */
    public ConnectionLoadBalanceServer(String hostname, int port, SSLContext sslContext, int numThreads, LoadBalanceProtocol loadBalanceProtocol, EventReporter eventReporter, int connectionTimeoutMillis) {
        this.hostname = hostname;
        this.port = port;
        this.sslContext = sslContext;
        this.loadBalanceProtocol = loadBalanceProtocol;
        this.connectionTimeoutMillis = connectionTimeoutMillis;
        this.numThreads = numThreads;
        this.eventReporter = eventReporter;
        this.threadPool = new FlowEngine(numThreads, "Load Balance Server");
    }

    /**
     * Opens the server socket, submits the worker tasks and starts the accept thread.
     * Does nothing if the server is already started or a server socket already exists.
     *
     * @throws IOException if the server socket cannot be created
     */
    public void start() throws IOException {
        if (!this.stopped) {
            return;
        }
        this.stopped = false;
        if (this.serverSocket != null) {
            return;
        }

        try {
            this.serverSocket = this.createServerSocket();
        } catch (Exception e) {
            // Nothing was started, so restore the flag; otherwise a later start() would
            // return at the first guard and the server could never be brought up.
            this.stopped = true;
            throw new IOException("Could not begin listening for incoming connections in order to load balance data across the cluster. Please verify the values of the 'nifi.cluster.load.balance.port' and 'nifi.cluster.load.balance.host' properties as well as the 'nifi.security.*' properties", e);
        }

        final Set<CommunicateAction> actions = new HashSet<>(this.numThreads);
        for (int i = 0; i < this.numThreads; ++i) {
            final CommunicateAction action = new CommunicateAction(this.loadBalanceProtocol);
            actions.add(action);
            this.threadPool.submit(action);
        }
        this.communicationActions = actions;

        this.acceptConnection = new AcceptConnection(this.serverSocket);
        final Thread receiveConnectionThread = new Thread(this.acceptConnection);
        receiveConnectionThread.setName("Receive Queue Load-Balancing Connections");
        receiveConnectionThread.start();
    }

    /**
     * @return the local port of the server socket once one has been created; before that,
     *         the port this server was configured with
     */
    public int getPort() {
        // Read the volatile field once so the null check and the call see the same value.
        final ServerSocket currentSocket = this.serverSocket;
        return currentSocket == null ? this.port : currentSocket.getLocalPort();
    }

    /**
     * Marks the server as stopped, shuts down the worker pool, signals the accept loop and
     * all worker tasks to finish, and closes every socket still waiting in the queue.
     */
    public void stop() {
        this.stopped = true;
        this.threadPool.shutdown();

        final AcceptConnection acceptor = this.acceptConnection;
        if (acceptor != null) {
            acceptor.stop();
        }
        this.communicationActions.forEach(CommunicateAction::stop);

        Socket socket;
        while ((socket = this.connectionQueue.poll()) != null) {
            try {
                socket.close();
                logger.info("{} Closed connection to {} on Server stop", this, socket.getRemoteSocketAddress());
            } catch (IOException ioe) {
                logger.warn("Failed to properly close socket to {}", socket.getRemoteSocketAddress(), ioe);
            }
        }
    }

    /**
     * Creates the listening socket: a plain {@link ServerSocket} when no SSL context is
     * configured, otherwise an SSL server socket that requires client authentication.
     *
     * @return the bound server socket
     * @throws IOException if the hostname cannot be resolved or the socket cannot be bound
     */
    private ServerSocket createServerSocket() throws IOException {
        // A null bind address means "all local addresses". InetAddress.getByName(null) must not
        // be used for that case because it resolves to the loopback address.
        final InetAddress bindAddress = this.hostname == null ? null : InetAddress.getByName(this.hostname);

        if (this.sslContext == null) {
            return new ServerSocket(this.port, ACCEPT_BACKLOG, bindAddress);
        }

        final ServerSocket secureSocket = this.sslContext.getServerSocketFactory().createServerSocket(this.port, ACCEPT_BACKLOG, bindAddress);
        ((SSLServerSocket) secureSocket).setNeedClientAuth(true);
        return secureSocket;
    }

    @Override
    public String toString() {
        return "ConnectionLoadBalanceServer[hostname=" + this.hostname + ", port=" + this.port + ", secure=" + (this.sslContext != null) + "]";
    }

    /**
     * Accept loop: takes incoming connections from the server socket, applies the configured
     * read timeout and queues them for the worker tasks. Closes the server socket on exit.
     */
    private class AcceptConnection implements Runnable {
        private final ServerSocket serverSocket;
        private volatile boolean stopped = false;

        public AcceptConnection(ServerSocket serverSocket) {
            this.serverSocket = serverSocket;
        }

        /** Asks the loop to exit; it is noticed the next time the loop condition is evaluated. */
        public void stop() {
            this.stopped = true;
        }

        @Override
        public void run() {
            try {
                // A finite accept timeout lets the loop re-check the stop flag periodically.
                this.serverSocket.setSoTimeout(ACCEPT_TIMEOUT_MILLIS);
            } catch (Exception e) {
                logger.error("Failed to set soTimeout on Server Socket for Load Balancing data across cluster", e);
            }

            while (!this.stopped) {
                try {
                    final Socket socket;
                    try {
                        socket = this.serverSocket.accept();
                    } catch (SocketTimeoutException ste) {
                        // No connection arrived within the timeout; loop to re-check the stop flag.
                        continue;
                    }

                    socket.setSoTimeout(ConnectionLoadBalanceServer.this.connectionTimeoutMillis);
                    ConnectionLoadBalanceServer.this.connectionQueue.offer(socket);
                } catch (Exception e) {
                    logger.error("{} Failed to accept connection from other node in cluster", ConnectionLoadBalanceServer.this, e);
                }
            }

            try {
                this.serverSocket.close();
            } catch (Exception e) {
                logger.warn("Failed to properly shutdown Server Socket for Load Balancing", e);
            }
        }
    }

    /**
     * Worker task: repeatedly takes a socket from the connection queue and lets the
     * protocol receive FlowFiles from it until asked to stop.
     */
    private class CommunicateAction implements Runnable {
        private final LoadBalanceProtocol loadBalanceProtocol;
        private volatile boolean stopped = false;

        public CommunicateAction(LoadBalanceProtocol loadBalanceProtocol) {
            this.loadBalanceProtocol = loadBalanceProtocol;
        }

        /** Asks the task to exit; it is noticed the next time the loop condition is evaluated. */
        public void stop() {
            this.stopped = true;
        }

        @Override
        public void run() {
            String peerDescription = "<Unknown Client>";

            while (!this.stopped) {
                Socket socket = null;
                try {
                    // Bounded wait so the stop flag is re-checked at least once per second.
                    socket = ConnectionLoadBalanceServer.this.connectionQueue.poll(1L, TimeUnit.SECONDS);
                    if (socket == null) {
                        continue;
                    }

                    peerDescription = socket.getRemoteSocketAddress().toString();
                    if (socket.isClosed()) {
                        logger.debug("Connection to Peer {} is closed. Will not attempt to communicate over this Socket.", peerDescription);
                        continue;
                    }

                    logger.debug("Receiving FlowFiles from Peer {}", peerDescription);
                    this.loadBalanceProtocol.receiveFlowFiles(socket);

                    if (socket.isConnected()) {
                        // Put the socket back on the queue so it can be serviced again.
                        logger.debug("Finished receiving FlowFiles from Peer {}. Will recycle connection.", peerDescription);
                        ConnectionLoadBalanceServer.this.connectionQueue.offer(socket);
                    } else {
                        logger.debug("Finished receiving FlowFiles from Peer {}. Socket is no longer connected so will not recycle connection.", peerDescription);
                    }
                } catch (Exception e) {
                    if (socket != null) {
                        try {
                            socket.close();
                        } catch (IOException ioe) {
                            // Keep the close failure attached to the primary failure instead of losing it.
                            e.addSuppressed(ioe);
                        }
                    }

                    logger.error("Failed to communicate with Peer {}", peerDescription, e);
                    ConnectionLoadBalanceServer.this.eventReporter.reportEvent(Severity.ERROR, "Load Balanced Connection", "Failed to receive FlowFiles for Load Balancing due to " + e);
                }
            }

            logger.info("Connection Load Balance Server shutdown. Will no longer handle incoming requests.");
        }
    }
}

