/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.record.listen;

import java.io.Closeable;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.record.listen.IOUtils;
import org.apache.nifi.record.listen.SSLSocketChannelRecordReader;
import org.apache.nifi.record.listen.SocketChannelRecordReader;
import org.apache.nifi.record.listen.StandardSocketChannelRecordReader;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.serialization.RecordReaderFactory;

public class SocketChannelRecordReaderDispatcher
implements Runnable,
Closeable {
    private final ServerSocketChannel serverSocketChannel;
    private final SSLContext sslContext;
    private final SslContextFactory.ClientAuth clientAuth;
    private final int socketReadTimeout;
    private final int receiveBufferSize;
    private final int maxConnections;
    private final RecordReaderFactory readerFactory;
    private final BlockingQueue<SocketChannelRecordReader> recordReaders;
    private final ComponentLog logger;
    private final AtomicInteger currentConnections = new AtomicInteger(0);
    private volatile boolean stopped = false;

    public SocketChannelRecordReaderDispatcher(ServerSocketChannel serverSocketChannel, SSLContext sslContext, SslContextFactory.ClientAuth clientAuth, int socketReadTimeout, int receiveBufferSize, int maxConnections, RecordReaderFactory readerFactory, BlockingQueue<SocketChannelRecordReader> recordReaders, ComponentLog logger) {
        this.serverSocketChannel = serverSocketChannel;
        this.sslContext = sslContext;
        this.clientAuth = clientAuth;
        this.socketReadTimeout = socketReadTimeout;
        this.receiveBufferSize = receiveBufferSize;
        this.maxConnections = maxConnections;
        this.readerFactory = readerFactory;
        this.recordReaders = recordReaders;
        this.logger = logger;
    }

    @Override
    public void run() {
        while (!this.stopped) {
            try {
                SocketChannelRecordReader socketChannelRecordReader;
                String remoteAddress;
                SocketChannel socketChannel = this.serverSocketChannel.accept();
                if (socketChannel == null) {
                    Thread.sleep(20L);
                    continue;
                }
                socketChannel.setOption((SocketOption)StandardSocketOptions.SO_KEEPALIVE, (Object)true);
                SocketAddress remoteSocketAddress = socketChannel.getRemoteAddress();
                socketChannel.socket().setSoTimeout(this.socketReadTimeout);
                socketChannel.socket().setReceiveBufferSize(this.receiveBufferSize);
                if (this.currentConnections.incrementAndGet() > this.maxConnections) {
                    this.currentConnections.decrementAndGet();
                    remoteAddress = remoteSocketAddress == null ? "null" : remoteSocketAddress.toString();
                    this.logger.warn("Rejecting connection from {} because max connections has been met", new Object[]{remoteAddress});
                    IOUtils.closeQuietly(socketChannel);
                    continue;
                }
                if (this.logger.isDebugEnabled()) {
                    remoteAddress = remoteSocketAddress == null ? "null" : remoteSocketAddress.toString();
                    this.logger.debug("Accepted connection from {}", new Object[]{remoteAddress});
                }
                if (this.sslContext == null) {
                    socketChannelRecordReader = new StandardSocketChannelRecordReader(socketChannel, this.readerFactory, this);
                } else {
                    SSLEngine sslEngine = this.sslContext.createSSLEngine();
                    sslEngine.setUseClientMode(false);
                    switch (this.clientAuth) {
                        case REQUIRED: {
                            sslEngine.setNeedClientAuth(true);
                            break;
                        }
                        case WANT: {
                            sslEngine.setWantClientAuth(true);
                            break;
                        }
                        case NONE: {
                            sslEngine.setNeedClientAuth(false);
                            sslEngine.setWantClientAuth(false);
                        }
                    }
                    SSLSocketChannel sslSocketChannel = new SSLSocketChannel(sslEngine, socketChannel);
                    socketChannelRecordReader = new SSLSocketChannelRecordReader(socketChannel, sslSocketChannel, this.readerFactory, this);
                }
                this.recordReaders.offer(socketChannelRecordReader);
            }
            catch (Exception e) {
                this.logger.error("Error dispatching connection: " + e.getMessage(), (Throwable)e);
            }
        }
    }

    public int getPort() {
        return this.serverSocketChannel == null ? 0 : this.serverSocketChannel.socket().getLocalPort();
    }

    @Override
    public void close() {
        this.stopped = true;
        IOUtils.closeQuietly(this.serverSocketChannel);
    }

    public void connectionCompleted() {
        this.currentConnections.decrementAndGet();
    }
}

