/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.io.nio;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.io.nio.AbstractChannelReader;
import org.apache.nifi.io.nio.BufferPool;
import org.apache.nifi.io.nio.ChannelDispatcher;
import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ChannelListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChannelListener.class);
    private final ScheduledExecutorService executor;
    private final Selector serverSocketSelector;
    private final Selector socketChannelSelector;
    private final ChannelDispatcher channelDispatcher;
    private final BufferPool bufferPool;
    private final int initialBufferPoolSize;
    private volatile long channelReaderFrequencyMSecs = 50L;

    public ChannelListener(int threadPoolSize, StreamConsumerFactory consumerFactory, BufferPool bufferPool, int timeout, TimeUnit unit, boolean readSingleDatagram) throws IOException {
        this.executor = Executors.newScheduledThreadPool(threadPoolSize + 1);
        this.serverSocketSelector = Selector.open();
        this.socketChannelSelector = Selector.open();
        this.bufferPool = bufferPool;
        this.initialBufferPoolSize = bufferPool.size();
        this.channelDispatcher = new ChannelDispatcher(this.serverSocketSelector, this.socketChannelSelector, this.executor, consumerFactory, bufferPool, timeout, unit, readSingleDatagram);
        this.executor.schedule(this.channelDispatcher, 50L, TimeUnit.MILLISECONDS);
    }

    public void setChannelReaderSchedulingPeriod(long period, TimeUnit unit) {
        this.channelReaderFrequencyMSecs = TimeUnit.MILLISECONDS.convert(period, unit);
        this.channelDispatcher.setChannelReaderFrequency(period, unit);
    }

    public void addServerSocket(InetAddress nicIPAddress, int port, int receiveBufferSize) throws IOException {
        ServerSocketChannel ssChannel = ServerSocketChannel.open();
        ssChannel.configureBlocking(false);
        if (receiveBufferSize > 0) {
            ssChannel.setOption((SocketOption)StandardSocketOptions.SO_RCVBUF, (Object)receiveBufferSize);
            int actualReceiveBufSize = ssChannel.getOption(StandardSocketOptions.SO_RCVBUF);
            if (actualReceiveBufSize < receiveBufferSize) {
                LOGGER.warn(this + " attempted to set TCP Receive Buffer Size to " + receiveBufferSize + " bytes but could only set to " + actualReceiveBufSize + "bytes. You may want to consider changing the Operating System's " + "maximum receive buffer");
            }
        }
        ssChannel.setOption((SocketOption)StandardSocketOptions.SO_REUSEADDR, (Object)true);
        ssChannel.bind(new InetSocketAddress(nicIPAddress, port));
        ssChannel.register(this.serverSocketSelector, 16);
    }

    public void addDatagramChannel(InetAddress nicIPAddress, int port, int receiveBufferSize) throws IOException {
        DatagramChannel dChannel = this.createAndBindDatagramChannel(nicIPAddress, port, receiveBufferSize);
        dChannel.register(this.socketChannelSelector, 1);
    }

    public void addDatagramChannel(InetAddress nicIPAddress, int port, int receiveBufferSize, String sendingHost, Integer sendingPort) throws IOException {
        if (sendingHost == null || sendingPort == null) {
            this.addDatagramChannel(nicIPAddress, port, receiveBufferSize);
            return;
        }
        DatagramChannel dChannel = this.createAndBindDatagramChannel(nicIPAddress, port, receiveBufferSize);
        dChannel.connect(new InetSocketAddress(sendingHost, (int)sendingPort));
        dChannel.register(this.socketChannelSelector, 1);
    }

    private DatagramChannel createAndBindDatagramChannel(InetAddress nicIPAddress, int port, int receiveBufferSize) throws IOException {
        DatagramChannel dChannel = DatagramChannel.open();
        dChannel.configureBlocking(false);
        if (receiveBufferSize > 0) {
            dChannel.setOption((SocketOption)StandardSocketOptions.SO_RCVBUF, (Object)receiveBufferSize);
            int actualReceiveBufSize = dChannel.getOption(StandardSocketOptions.SO_RCVBUF);
            if (actualReceiveBufSize < receiveBufferSize) {
                LOGGER.warn(this + " attempted to set UDP Receive Buffer Size to " + receiveBufferSize + " bytes but could only set to " + actualReceiveBufSize + "bytes. You may want to consider changing the Operating System's " + "maximum receive buffer");
            }
        }
        dChannel.setOption((SocketOption)StandardSocketOptions.SO_REUSEADDR, (Object)true);
        dChannel.bind(new InetSocketAddress(nicIPAddress, port));
        return dChannel;
    }

    public void shutdown(long period, TimeUnit timeUnit) {
        this.channelDispatcher.stop();
        for (SelectionKey selectionKey : this.socketChannelSelector.keys()) {
            AbstractChannelReader reader = (AbstractChannelReader)selectionKey.attachment();
            selectionKey.cancel();
            if (reader != null) {
                while (!reader.isClosed()) {
                    try {
                        Thread.sleep(this.channelReaderFrequencyMSecs);
                    }
                    catch (InterruptedException interruptedException) {}
                }
                ScheduledFuture<?> readerFuture = reader.getScheduledFuture();
                readerFuture.cancel(false);
            }
            IOUtils.closeQuietly((Closeable)selectionKey.channel());
        }
        IOUtils.closeQuietly((Selector)this.socketChannelSelector);
        for (SelectionKey selectionKey : this.serverSocketSelector.keys()) {
            selectionKey.cancel();
            IOUtils.closeQuietly((Closeable)selectionKey.channel());
        }
        IOUtils.closeQuietly((Selector)this.serverSocketSelector);
        this.executor.shutdown();
        try {
            this.executor.awaitTermination(period, timeUnit);
        }
        catch (InterruptedException ex) {
            LOGGER.warn("Interrupted while trying to shutdown executor");
        }
        int currentBufferPoolSize = this.bufferPool.size();
        String warning = currentBufferPoolSize != this.initialBufferPoolSize ? "Initial buffer count=" + this.initialBufferPoolSize + " Current buffer count=" + currentBufferPoolSize + " Could indicate a buffer leak.  Ensure all consumers are executed until they complete." : "";
        LOGGER.info("Channel listener shutdown. " + warning);
    }
}

