/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.AbstractSyslogProcessor;
import org.apache.nifi.processors.standard.ParseSyslog;
import org.apache.nifi.processors.standard.PutSyslog;
import org.apache.nifi.processors.standard.util.SyslogEvent;
import org.apache.nifi.processors.standard.util.SyslogParser;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.stream.io.ByteArrayOutputStream;

@SupportsBatching
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags(value={"syslog", "listen", "udp", "tcp", "logs"})
@CapabilityDescription(value="Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular expressions for RFC5424 and RFC3164 formatted messages. The format of each message is: (<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) where version is optional. The timestamp can be an RFC5424 timestamp with a format of \"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", or it can be an RFC3164 timestamp with a format of \"MMM d HH:mm:ss\". If an incoming messages matches one of these patterns, the message will be parsed and the individual pieces will be placed in FlowFile attributes, with the original message in the content of the FlowFile. If an incoming message does not match one of these patterns it will not be parsed and the syslog.valid attribute will be set to false with the original message in the content of the FlowFile. Valid messages will be transferred on the success relationship, and invalid messages will be transferred on the invalid relationship.")
@WritesAttributes(value={@WritesAttribute(attribute="syslog.priority", description="The priority of the Syslog message."), @WritesAttribute(attribute="syslog.severity", description="The severity of the Syslog message derived from the priority."), @WritesAttribute(attribute="syslog.facility", description="The facility of the Syslog message derived from the priority."), @WritesAttribute(attribute="syslog.version", description="The optional version from the Syslog message."), @WritesAttribute(attribute="syslog.timestamp", description="The timestamp of the Syslog message."), @WritesAttribute(attribute="syslog.hostname", description="The hostname of the Syslog message."), @WritesAttribute(attribute="syslog.sender", description="The hostname of the Syslog server that sent the message."), @WritesAttribute(attribute="syslog.body", description="The body of the Syslog message, everything after the hostname."), @WritesAttribute(attribute="syslog.valid", description="An indicator of whether this message matched the expected formats. If this value is false, the other attributes will be empty and only the original message will be available in the content."), @WritesAttribute(attribute="syslog.protocol", description="The protocol over which the Syslog message was received."), @WritesAttribute(attribute="syslog.port", description="The port over which the Syslog message was received."), @WritesAttribute(attribute="mime.type", description="The mime.type of the FlowFile which will be text/plain for Syslog messages.")})
@SeeAlso(value={PutSyslog.class, ParseSyslog.class})
public class ListenSyslog
extends AbstractSyslogProcessor {
    // ---- Configurable properties and relationships declared by this processor ----

    /** Size of each buffer used to receive messages; a data size, defaulting to "65507 B". */
    public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder().name("Receive Buffer Size").description("The size of each buffer used to receive Syslog messages. Adjust this value appropriately based on the expected size of the incoming Syslog messages. When UDP is selected each buffer will hold one Syslog message. When TCP is selected messages are read from an incoming connection until the buffer is full, or the connection is closed. ").addValidator(StandardValidators.DATA_SIZE_VALIDATOR).defaultValue("65507 B").required(true).build();
    /** Requested socket receive-buffer size; a data size, defaulting to "1 MB". */
    public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder().name("Max Size of Socket Buffer").description("The maximum size of the socket buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before the data can be read, and incoming data will be dropped.").addValidator(StandardValidators.DATA_SIZE_VALIDATOR).defaultValue("1 MB").required(true).build();
    /** Upper bound on concurrent TCP connections; validated as a long in [1, 65535], defaulting to "2". */
    public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder().name("Max Number of TCP Connections").description("The maximum number of concurrent connections to accept Syslog messages in TCP mode.").addValidator(StandardValidators.createLongValidator((long)1L, (long)65535L, (boolean)true)).defaultValue("2").required(true).build();
    /** Maximum number of events bundled into one FlowFile; a positive integer, defaulting to "1". */
    public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder().name("Max Batch Size").description("The maximum number of Syslog events to add to a single FlowFile. If multiple events are available, they will be concatenated along with the <Message Delimiter> up to this configured maximum number of messages").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(false).defaultValue("1").required(true).build();
    /** Delimiter placed between bundled messages; the default value is the two-character escape sequence backslash-n. */
    public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder().name("Message Delimiter").description("Specifies the delimiter to place between Syslog messages when multiple messages are bundled together (see <Max Batch Size> property).").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("\\n").required(true).build();
    /** Whether incoming messages are parsed into attributes; "true" or "false", defaulting to "true". */
    public static final PropertyDescriptor PARSE_MESSAGES = new PropertyDescriptor.Builder().name("Parse Messages").description("Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only contain the sender, protocol, and port, and no additional attributes.").allowableValues(new String[]{"true", "false"}).defaultValue("true").required(true).build();
    /** Optional SSLContextService; when set, messages are received over a secure connection. */
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder().name("SSL Context Service").description("The Controller Service to use in order to obtain an SSL Context. If this property is set, syslog messages will be received over a secure connection.").required(false).identifiesControllerService(SSLContextService.class).build();
    /** Destination for messages that match one of the expected formats. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Syslog messages that match one of the expected formats will be sent out this relationship as a FlowFile per message.").build();
    /** Destination for messages that do not match one of the expected formats. */
    public static final Relationship REL_INVALID = new Relationship.Builder().name("invalid").description("Syslog messages that do not match one of the expected formats will be sent out this relationship as a FlowFile per message.").build();
    // Unmodifiable set of relationships; assigned in init().
    private Set<Relationship> relationships;
    // Unmodifiable list of supported properties; assigned in init().
    private List<PropertyDescriptor> descriptors;
    // Reader created and started in onScheduled(); stopped and closed in onUnscheduled().
    private volatile ChannelReader channelReader;
    // Parser created in onScheduled() with the configured character set.
    private volatile SyslogParser parser;
    // Pool of receive buffers; onScheduled() fills it with one buffer per allowed connection.
    private volatile BlockingQueue<ByteBuffer> bufferPool;
    // Raw events handed over by the channel reader; bounded to 10 entries.
    private volatile BlockingQueue<RawSyslogEvent> syslogEvents = new LinkedBlockingQueue<RawSyslogEvent>(10);
    // Events whose FlowFile write failed in onTrigger(); unbounded, and polled first by getMessage() when requested.
    private volatile BlockingQueue<RawSyslogEvent> errorEvents = new LinkedBlockingQueue<RawSyslogEvent>();
    // Configured message delimiter encoded with the configured character set; assigned in onScheduled().
    private volatile byte[] messageDemarcatorBytes;

    /**
     * Builds the unmodifiable relationship set and property list exposed by this processor.
     *
     * @param context initialization context; not consulted by this implementation
     */
    protected void init(ProcessorInitializationContext context) {
        // Relationships a FlowFile can be routed to.
        Set<Relationship> supportedRelationships = new HashSet<Relationship>();
        supportedRelationships.add(REL_SUCCESS);
        supportedRelationships.add(REL_INVALID);
        this.relationships = Collections.unmodifiableSet(supportedRelationships);

        // Properties, kept in their declared presentation order.
        List<PropertyDescriptor> supportedProperties = new ArrayList<PropertyDescriptor>();
        supportedProperties.add(PROTOCOL);
        supportedProperties.add(PORT);
        supportedProperties.add(SSL_CONTEXT_SERVICE);
        supportedProperties.add(RECV_BUFFER_SIZE);
        supportedProperties.add(MAX_SOCKET_BUFFER_SIZE);
        supportedProperties.add(MAX_CONNECTIONS);
        supportedProperties.add(MAX_BATCH_SIZE);
        supportedProperties.add(MESSAGE_DELIMITER);
        supportedProperties.add(PARSE_MESSAGES);
        supportedProperties.add(CHARSET);
        this.descriptors = Collections.unmodifiableList(supportedProperties);
    }

    /**
     * @return the unmodifiable set of relationships assembled in {@code init}
     */
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * @return the unmodifiable list of property descriptors assembled in {@code init}
     */
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    /**
     * Discards any queued events when the protocol property changes.
     *
     * @param descriptor the property that was modified
     * @param oldValue   previous value (unused)
     * @param newValue   new value (unused)
     */
    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        // Only a protocol change invalidates what is already queued.
        if (!PROTOCOL.equals((Object)descriptor)) {
            return;
        }
        if (this.syslogEvents != null) {
            this.syslogEvents.clear();
        }
        if (this.errorEvents != null) {
            this.errorEvents.clear();
        }
    }

    /**
     * Cross-property validation: parsing cannot be combined with batching, and SSL cannot be
     * combined with UDP.
     *
     * @param validationContext context giving access to the configured property values
     * @return the validation failures found; empty when the configuration is acceptable
     */
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        List<ValidationResult> problems = new ArrayList<ValidationResult>();

        // Parsing is only consulted when more than one event may be batched.
        boolean batching = validationContext.getProperty(MAX_BATCH_SIZE).asInteger() > 1;
        if (batching && validationContext.getProperty(PARSE_MESSAGES).asBoolean().booleanValue()) {
            ValidationResult parseWithBatching = new ValidationResult.Builder().subject("Parse Messages").input("true").valid(false).explanation("Cannot set Parse Messages to 'true' if Batch Size is greater than 1").build();
            problems.add(parseWithBatching);
        }

        String protocol = validationContext.getProperty(PROTOCOL).getValue();
        SSLContextService sslContextService = (SSLContextService)validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        boolean udpSelected = UDP_VALUE.getValue().equals(protocol);
        if (udpSelected && sslContextService != null) {
            ValidationResult sslOverUdp = new ValidationResult.Builder().explanation("SSL can not be used with UDP").valid(false).subject("SSL Context").build();
            problems.add(sslOverUdp);
        }
        return problems;
    }

    /**
     * Prepares the processor for receiving: encodes the message delimiter, fills the receive-buffer
     * pool, creates the parser and channel reader, opens the reader on the configured port and
     * starts it on a daemon thread.
     *
     * @param context gives access to the configured property values
     * @throws IOException if the channel reader cannot be created or opened
     */
    @OnScheduled
    public void onScheduled(ProcessContext context) throws IOException {
        final int port = context.getProperty(PORT).asInteger();
        final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        final String protocol = context.getProperty(PROTOCOL).getValue();

        // Resolve the configured character set once; it is shared by the delimiter and the parser.
        final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());

        // The delimiter is configured with textual escapes; turn them into the real control characters.
        final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
        this.messageDemarcatorBytes = msgDemarcator.getBytes(charset);

        // UDP uses a single buffer; TCP uses one buffer per allowed connection.
        final int maxConnections = protocol.equals(UDP_VALUE.getValue()) ? 1 : context.getProperty(MAX_CONNECTIONS).asLong().intValue();
        this.bufferPool = new LinkedBlockingQueue<ByteBuffer>(maxConnections);
        for (int i = 0; i < maxConnections; ++i) {
            this.bufferPool.offer(ByteBuffer.allocate(bufferSize));
        }

        this.parser = new SyslogParser(charset);

        final SSLContextService sslContextService = (SSLContextService)context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        this.channelReader = this.createChannelReader(protocol, this.bufferPool, this.syslogEvents, maxConnections, sslContextService);
        this.channelReader.open(port, maxChannelBufferSize);

        // The reader runs on its own daemon thread, named after this processor's identifier.
        final Thread readerThread = new Thread(this.channelReader);
        readerThread.setName("ListenSyslog [" + this.getIdentifier() + "]");
        readerThread.setDaemon(true);
        readerThread.start();
    }

    /**
     * @return the parser currently held by this processor; {@code null} until {@code onScheduled} has assigned one
     */
    protected SyslogParser getParser() {
        return parser;
    }

    /**
     * Creates the reader matching the selected protocol: a datagram reader for UDP, otherwise a
     * socket-channel dispatcher (with an SSL context when an SSL context service is supplied).
     *
     * @param protocol          the configured protocol value
     * @param bufferPool        pool of receive buffers handed to the reader
     * @param syslogEvents      queue the reader publishes raw events to
     * @param maxConnections    connection limit passed to the socket-channel dispatcher
     * @param sslContextService optional SSL context service; may be {@code null}
     * @return the reader to open and run
     * @throws IOException declared for implementations that fail while creating the reader
     */
    protected ChannelReader createChannelReader(String protocol, BlockingQueue<ByteBuffer> bufferPool, BlockingQueue<RawSyslogEvent> syslogEvents, int maxConnections, SSLContextService sslContextService) throws IOException {
        boolean udpSelected = protocol.equals(UDP_VALUE.getValue());
        if (udpSelected) {
            return new DatagramChannelReader(bufferPool, syslogEvents, this.getLogger());
        }

        // Without an SSL context service the dispatcher receives a null context.
        SSLContext sslContext = sslContextService == null
                ? null
                : sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
        return new SocketChannelDispatcher(bufferPool, syslogEvents, this.getLogger(), maxConnections, sslContext);
    }

    /**
     * @return the port reported by the channel reader, or 0 when no reader has been created
     */
    protected int getPort() {
        if (this.channelReader == null) {
            return 0;
        }
        return this.channelReader.getPort();
    }

    /**
     * Stops and then closes the channel reader, if one was created.
     */
    @OnUnscheduled
    public void onUnscheduled() {
        if (this.channelReader == null) {
            return;
        }
        this.channelReader.stop();
        this.channelReader.close();
    }

    /**
     * Fetches the next raw event, optionally giving priority to events that were re-queued after a
     * failed write.
     *
     * @param longPoll       when {@code true}, wait up to 100 ms for an event; otherwise return immediately
     * @param pollErrorQueue when {@code true}, check the error queue before the main queue
     * @return the next event, or {@code null} if none is available or the wait was interrupted
     */
    protected RawSyslogEvent getMessage(boolean longPoll, boolean pollErrorQueue) {
        RawSyslogEvent requeued = pollErrorQueue ? (RawSyslogEvent)this.errorEvents.poll() : null;
        if (requeued != null) {
            return requeued;
        }
        try {
            if (longPoll) {
                return this.syslogEvents.poll(100L, TimeUnit.MILLISECONDS);
            }
            return (RawSyslogEvent)this.syslogEvents.poll();
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for the caller's thread and report "nothing available".
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * @return the number of events currently waiting in the error queue
     */
    protected int getErrorQueueSize() {
        return errorEvents.size();
    }

    /**
     * Drains up to {@code Max Batch Size} queued events into FlowFiles, keeping one FlowFile per
     * sender, and transfers the results.
     *
     * <p>The first event is fetched with a short wait (and from the error queue first); if nothing
     * is available the processor yields. Later events in the batch are fetched without waiting.
     * When parsing is enabled, an event that fails to parse or is reported invalid is written to
     * its own FlowFile, routed to {@code invalid}, and ends the batch. If writing an event fails,
     * the event is placed on the error queue and the batch ends.</p>
     *
     * <p>The message delimiter is written only between messages of the same FlowFile, so the first
     * message of every sender starts at the beginning of that sender's content.</p>
     *
     * @param context gives access to the configured property values
     * @param session session used to create, write and transfer FlowFiles
     * @throws ProcessException declared by the processor contract
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        RawSyslogEvent rawSyslogEvent = this.getMessage(true, true);
        if (rawSyslogEvent == null) {
            context.yield();
            return;
        }

        final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
        final String port = context.getProperty(PORT).getValue();
        final String protocol = context.getProperty(PROTOCOL).getValue();

        // Attributes every outgoing FlowFile receives, parsed or not.
        final Map<String, String> defaultAttributes = new HashMap<String, String>(4);
        defaultAttributes.put(AbstractSyslogProcessor.SyslogAttributes.PROTOCOL.key(), protocol);
        defaultAttributes.put(AbstractSyslogProcessor.SyslogAttributes.PORT.key(), port);
        defaultAttributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");

        final int numAttributes = AbstractSyslogProcessor.SyslogAttributes.values().length + 2;
        final boolean shouldParse = context.getProperty(PARSE_MESSAGES).asBoolean();
        final Map<String, FlowFile> flowFilePerSender = new HashMap<String, FlowFile>();
        // Senders whose FlowFile already holds at least one message; decides whether a delimiter is needed.
        final Set<String> sendersWithContent = new HashSet<String>();
        final SyslogParser parser = this.getParser();

        for (int i = 0; i < maxBatchSize; ++i) {
            // The first event was already fetched above; later ones are polled without waiting.
            if (i > 0) {
                rawSyslogEvent = this.getMessage(false, false);
                if (rawSyslogEvent == null) {
                    break;
                }
            }

            final String sender = rawSyslogEvent.getSender();
            FlowFile flowFile = flowFilePerSender.get(sender);
            if (flowFile == null) {
                flowFile = session.create();
                flowFilePerSender.put(sender, flowFile);
            }

            SyslogEvent event = null;
            if (shouldParse) {
                boolean valid = true;
                try {
                    event = parser.parseEvent(rawSyslogEvent.getRawMessage(), sender);
                }
                catch (ProcessException pe) {
                    this.getLogger().warn("Failed to parse Syslog event; routing to invalid");
                    valid = false;
                }

                if (!valid || !event.isValid()) {
                    // Use a dedicated FlowFile: the per-sender FlowFile may already hold earlier messages.
                    FlowFile invalidFlowFile = session.create();
                    invalidFlowFile = session.putAllAttributes(invalidFlowFile, defaultAttributes);
                    if (sender != null) {
                        invalidFlowFile = session.putAttribute(invalidFlowFile, AbstractSyslogProcessor.SyslogAttributes.SENDER.key(), sender);
                    }
                    try {
                        final byte[] rawBytes = rawSyslogEvent.getRawMessage();
                        invalidFlowFile = session.write(invalidFlowFile, new OutputStreamCallback(){

                            public void process(OutputStream out) throws IOException {
                                out.write(rawBytes);
                            }
                        });
                    }
                    catch (Exception e) {
                        this.getLogger().error("Failed to write contents of Syslog message to FlowFile due to {}; will re-queue message and try again", (Throwable)e);
                        this.errorEvents.offer(rawSyslogEvent);
                        session.remove(invalidFlowFile);
                        break;
                    }
                    session.transfer(invalidFlowFile, REL_INVALID);
                    break;
                }

                this.getLogger().trace(event.getFullMessage());
                final Map<String, String> attributes = new HashMap<String, String>(numAttributes);
                attributes.put(AbstractSyslogProcessor.SyslogAttributes.PRIORITY.key(), event.getPriority());
                attributes.put(AbstractSyslogProcessor.SyslogAttributes.SEVERITY.key(), event.getSeverity());
                attributes.put(AbstractSyslogProcessor.SyslogAttributes.FACILITY.key(), event.getFacility());
                attributes.put(AbstractSyslogProcessor.SyslogAttributes.VERSION.key(), event.getVersion());
                attributes.put(AbstractSyslogProcessor.SyslogAttributes.TIMESTAMP.key(), event.getTimeStamp());
                attributes.put(AbstractSyslogProcessor.SyslogAttributes.HOSTNAME.key(), event.getHostName());
                attributes.put(AbstractSyslogProcessor.SyslogAttributes.BODY.key(), event.getMsgBody());
                attributes.put(AbstractSyslogProcessor.SyslogAttributes.VALID.key(), String.valueOf(event.isValid()));
                flowFile = session.putAllAttributes(flowFile, attributes);
            }

            // A delimiter separates messages within one FlowFile, so it depends on whether THIS
            // sender already has content, not on the position of the event within the batch.
            final boolean writeDemarcator = sendersWithContent.contains(sender);
            try {
                final byte[] rawMessage = event == null ? rawSyslogEvent.getRawMessage() : event.getRawMessage();
                flowFile = session.append(flowFile, new OutputStreamCallback(){

                    public void process(OutputStream out) throws IOException {
                        if (writeDemarcator) {
                            out.write(ListenSyslog.this.messageDemarcatorBytes);
                        }
                        out.write(rawMessage);
                    }
                });
            }
            catch (Exception e) {
                this.getLogger().error("Failed to write contents of Syslog message to FlowFile due to {}; will re-queue message and try again", (Throwable)e);
                this.errorEvents.offer(rawSyslogEvent);
                break;
            }
            sendersWithContent.add(sender);
            session.adjustCounter("Messages Received", 1L, false);
            // Store the updated FlowFile reference returned by the session.
            flowFilePerSender.put(sender, flowFile);
        }

        for (Map.Entry<String, FlowFile> entry : flowFilePerSender.entrySet()) {
            final String sender = entry.getKey();
            FlowFile flowFile = entry.getValue();

            // A FlowFile stays empty when its only event was invalid or could not be written.
            if (flowFile.getSize() == 0L) {
                session.remove(flowFile);
                this.getLogger().debug("No data written to FlowFile from Sender {}; removing FlowFile", new Object[]{sender});
                continue;
            }

            final Map<String, String> newAttributes = new HashMap<String, String>(defaultAttributes.size() + 1);
            newAttributes.putAll(defaultAttributes);
            newAttributes.put(AbstractSyslogProcessor.SyslogAttributes.SENDER.key(), sender);
            flowFile = session.putAllAttributes(flowFile, newAttributes);
            this.getLogger().debug("Transferring {} to success", new Object[]{flowFile});
            session.transfer(flowFile, REL_SUCCESS);

            // Strip a leading "/" from the sender before building the provenance transit URI.
            final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
            final String transitUri = protocol.toLowerCase() + "://" + senderHost + ":" + port;
            session.getProvenanceReporter().receive(flowFile, transitUri);
        }
    }

    /**
     * Logs a warning that the requested socket receive-buffer size could not be applied in full.
     *
     * @param logger               logger the warning is written to
     * @param maxBufferSize        the buffer size that was requested, in bytes
     * @param actualReceiveBufSize the buffer size reported after the request, in bytes
     */
    static void logMaxBufferWarning(ProcessorLog logger, int maxBufferSize, int actualReceiveBufSize) {
        // A space precedes "bytes" so the number and its unit do not run together.
        logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to " + actualReceiveBufSize + " bytes. You may want to consider changing the Operating System's " + "maximum receive buffer");
    }

    /**
     * Immutable pair of a receive buffer and an SSL engine, attached to a selection key.
     */
    private static class SocketChannelAttachment {
        private final ByteBuffer byteBuffer;
        private final SSLEngine sslEngine;

        /**
         * @param byteBuffer receive buffer associated with the connection
         * @param sslEngine  SSL engine associated with the connection
         */
        public SocketChannelAttachment(ByteBuffer byteBuffer, SSLEngine sslEngine) {
            this.sslEngine = sslEngine;
            this.byteBuffer = byteBuffer;
        }

        /** @return the SSL engine supplied at construction */
        public SSLEngine getSslEngine() {
            return sslEngine;
        }

        /** @return the receive buffer supplied at construction */
        public ByteBuffer getByteBuffer() {
            return byteBuffer;
        }
    }

    /**
     * An unparsed message as received from the network, together with the sender it came from.
     */
    static class RawSyslogEvent {
        final byte[] rawMessage;
        final String sender;

        /**
         * @param rawMessage the message bytes; stored by reference, not copied
         * @param sender     textual identification of the sender
         */
        public RawSyslogEvent(byte[] rawMessage, String sender) {
            this.sender = sender;
            this.rawMessage = rawMessage;
        }

        /** @return the sender supplied at construction */
        public String getSender() {
            return sender;
        }

        /** @return the same byte array that was supplied at construction */
        public byte[] getRawMessage() {
            return rawMessage;
        }
    }

    private static class SSLSocketChannelHandler
    implements Runnable {
        private final SelectionKey key;
        private final SocketChannelDispatcher dispatcher;
        private final BlockingQueue<RawSyslogEvent> syslogEvents;
        private final ProcessorLog logger;
        private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096);

        public SSLSocketChannelHandler(SelectionKey key, SocketChannelDispatcher dispatcher, BlockingQueue<RawSyslogEvent> syslogEvents, ProcessorLog logger) {
            this.key = key;
            this.dispatcher = dispatcher;
            this.syslogEvents = syslogEvents;
            this.logger = logger;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Loose catch block
         */
        @Override
        public void run() {
            block13: {
                block12: {
                    boolean eof = false;
                    SSLSocketChannel sslSocketChannel = null;
                    try {
                        int bytesRead;
                        SocketChannel socketChannel = (SocketChannel)this.key.channel();
                        SocketChannelAttachment attachment = (SocketChannelAttachment)this.key.attachment();
                        sslSocketChannel = new SSLSocketChannel(attachment.getSslEngine(), socketChannel, false);
                        ByteBuffer socketBuffer = attachment.getByteBuffer();
                        byte[] socketBufferArray = new byte[socketBuffer.limit()];
                        while ((bytesRead = sslSocketChannel.read(socketBufferArray)) > 0) {
                            for (int i = 0; i < bytesRead; ++i) {
                                byte currByte = socketBufferArray[i];
                                this.currBytes.write((int)currByte);
                                if (currByte != 10) continue;
                                String sender = socketChannel.socket().getInetAddress().toString();
                                this.syslogEvents.put(new RawSyslogEvent(this.currBytes.toByteArray(), sender));
                                this.currBytes.reset();
                            }
                            this.logger.debug("done handling SocketChannel");
                        }
                        if (bytesRead < 0) {
                            eof = true;
                        }
                        if (!eof) break block12;
                    }
                    catch (InterruptedException | ClosedByInterruptException e) {
                        this.logger.debug("read loop interrupted, closing connection");
                        eof = true;
                        break block13;
                    }
                    catch (IOException e2) {
                        block14: {
                            this.logger.error("Error reading from channel due to {}", new Object[]{e2.getMessage()}, (Throwable)e2);
                            eof = true;
                            if (!eof) break block14;
                            {
                                catch (Throwable throwable) {
                                    throw throwable;
                                }
                            }
                            IOUtils.closeQuietly((Closeable)sslSocketChannel);
                            this.dispatcher.completeConnection(this.key);
                            break block13;
                        }
                        this.dispatcher.addBackForSelection(this.key);
                        break block13;
                    }
                    finally {
                        if (eof) {
                            IOUtils.closeQuietly(sslSocketChannel);
                            this.dispatcher.completeConnection(this.key);
                        } else {
                            this.dispatcher.addBackForSelection(this.key);
                        }
                    }
                    IOUtils.closeQuietly((Closeable)sslSocketChannel);
                    this.dispatcher.completeConnection(this.key);
                    break block13;
                }
                this.dispatcher.addBackForSelection(this.key);
            }
        }
    }

    /**
     * Reads newline-terminated messages from one plain socket connection and publishes each as a
     * {@link RawSyslogEvent}.
     */
    private static class SocketChannelHandler
    implements Runnable {
        private final SelectionKey key;
        private final SocketChannelDispatcher dispatcher;
        private final BlockingQueue<RawSyslogEvent> syslogEvents;
        private final ProcessorLog logger;
        // Scratch space for the message currently being assembled.
        private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096);

        /**
         * @param key          selection key whose channel is read and whose attachment supplies the buffer
         * @param dispatcher   dispatcher notified when the connection completes or should be selected again
         * @param syslogEvents queue that receives each complete message
         * @param logger       logger for diagnostics
         */
        public SocketChannelHandler(SelectionKey key, SocketChannelDispatcher dispatcher, BlockingQueue<RawSyslogEvent> syslogEvents, ProcessorLog logger) {
            this.key = key;
            this.dispatcher = dispatcher;
            this.syslogEvents = syslogEvents;
            this.logger = logger;
        }

        /**
         * Reads until no more data is available. Bytes after the last newline stay in the socket
         * buffer for the next read. On end-of-stream, interruption or I/O error the channel is
         * closed and the connection completed; otherwise the key is handed back for selection.
         */
        @Override
        public void run() {
            boolean closeConnection = false;
            SocketChannel channel = null;
            try {
                channel = (SocketChannel)this.key.channel();
                final SocketChannelAttachment attachment = (SocketChannelAttachment)this.key.attachment();
                final ByteBuffer buffer = attachment.getByteBuffer();

                int bytesRead = channel.read(buffer);
                while (bytesRead > 0) {
                    // Switch to reading; the mark tracks the start of the message in progress.
                    buffer.flip();
                    buffer.mark();
                    this.currBytes.reset();

                    while (buffer.hasRemaining()) {
                        final byte current = buffer.get();
                        this.currBytes.write((int)current);
                        if (current == '\n') {
                            final String sender = channel.socket().getInetAddress().toString();
                            // put() blocks while the event queue is full.
                            this.syslogEvents.put(new RawSyslogEvent(this.currBytes.toByteArray(), sender));
                            this.currBytes.reset();
                            buffer.mark();
                        }
                    }

                    // Rewind to the start of the unfinished message and keep it at the front of the buffer.
                    buffer.reset();
                    buffer.compact();
                    this.logger.debug("done handling SocketChannel");

                    bytesRead = channel.read(buffer);
                }

                // A negative read count signals that the peer closed the connection.
                closeConnection = bytesRead < 0;
            }
            catch (InterruptedException | ClosedByInterruptException e) {
                this.logger.debug("read loop interrupted, closing connection");
                closeConnection = true;
            }
            catch (IOException e) {
                this.logger.error("Error reading from channel due to {}", new Object[]{e.getMessage()}, (Throwable)e);
                closeConnection = true;
            }
            finally {
                if (closeConnection) {
                    IOUtils.closeQuietly((Closeable)channel);
                    this.dispatcher.completeConnection(this.key);
                } else {
                    this.dispatcher.addBackForSelection(this.key);
                }
            }
        }
    }

    /**
     * Accepts TCP connections on a non-blocking {@link ServerSocketChannel} and hands
     * readable connections to handler tasks running on a fixed-size thread pool.
     *
     * <p>Each accepted connection borrows one {@link ByteBuffer} from the shared pool; the
     * buffer is given back in {@link #completeConnection(SelectionKey)}. While a handler is
     * working on a key its interest set is cleared, and the handler asks for the key to be
     * selectable again through {@link #addBackForSelection(SelectionKey)}.
     */
    private static class SocketChannelDispatcher
    implements ChannelReader {
        private final BlockingQueue<ByteBuffer> bufferPool;
        private final BlockingQueue<RawSyslogEvent> syslogEvents;
        private final ProcessorLog logger;
        private final ExecutorService executor;
        private volatile boolean stopped = false;
        private Selector selector;
        private final BlockingQueue<SelectionKey> keyQueue;
        private final int maxConnections;
        private final AtomicInteger currentConnections = new AtomicInteger(0);
        private final SSLContext sslContext;

        /**
         * @param bufferPool     pool the per-connection read buffers are borrowed from
         * @param syslogEvents   queue passed on to the connection handlers
         * @param logger         logger used by the dispatcher and its handlers
         * @param maxConnections maximum number of simultaneously accepted connections; also
         *                       sizes the handler thread pool and the re-selection queue
         * @param sslContext     when non-null, an {@link SSLEngine} is created per connection and
         *                       the SSL handler is used; when null, the plain handler is used
         */
        public SocketChannelDispatcher(BlockingQueue<ByteBuffer> bufferPool, BlockingQueue<RawSyslogEvent> syslogEvents, ProcessorLog logger, int maxConnections, SSLContext sslContext) {
            this.bufferPool = bufferPool;
            this.syslogEvents = syslogEvents;
            this.logger = logger;
            this.maxConnections = maxConnections;
            this.keyQueue = new LinkedBlockingQueue<SelectionKey>(maxConnections);
            this.sslContext = sslContext;
            this.executor = Executors.newFixedThreadPool(maxConnections);
        }

        /**
         * Opens a non-blocking server socket bound to {@code port} and registers it with a new
         * selector for accept events.
         *
         * @param port          local port to bind to
         * @param maxBufferSize requested SO_RCVBUF size; ignored when not positive. A warning is
         *                      logged when the resulting size is smaller than requested.
         * @throws IOException if the channel or selector cannot be opened, configured or bound;
         *                     the server channel is closed before the exception propagates
         */
        @Override
        public void open(int port, int maxBufferSize) throws IOException {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            boolean opened = false;
            try {
                serverSocketChannel.configureBlocking(false);
                if (maxBufferSize > 0) {
                    serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
                    int actualReceiveBufSize = serverSocketChannel.getOption(StandardSocketOptions.SO_RCVBUF);
                    if (actualReceiveBufSize < maxBufferSize) {
                        ListenSyslog.logMaxBufferWarning(this.logger, maxBufferSize, actualReceiveBufSize);
                    }
                }
                serverSocketChannel.socket().bind(new InetSocketAddress(port));
                this.selector = Selector.open();
                serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
                opened = true;
            } finally {
                if (!opened) {
                    // The channel is only reachable through the selector's key set, so a
                    // partially opened channel must be released here or it would leak.
                    IOUtils.closeQuietly(serverSocketChannel);
                }
            }
        }

        /**
         * Selection loop: accepts new connections, dispatches readable keys to handler tasks and
         * re-enables read interest for keys queued by {@link #addBackForSelection(SelectionKey)}.
         * Runs until {@link #stop()} is called.
         */
        @Override
        public void run() {
            while (!this.stopped) {
                try {
                    int selected = this.selector.select();
                    if (selected > 0) {
                        Iterator<SelectionKey> selectorKeys = this.selector.selectedKeys().iterator();
                        while (selectorKeys.hasNext()) {
                            SelectionKey key = selectorKeys.next();
                            selectorKeys.remove();
                            if (!key.isValid()) {
                                continue;
                            }
                            if (key.isAcceptable()) {
                                this.acceptConnection((ServerSocketChannel)key.channel());
                            } else if (key.isReadable()) {
                                // Clear the interest set so the key is not selected again while
                                // a handler is still processing it.
                                key.interestOps(0);
                                Runnable handler = this.sslContext != null ? new SSLSocketChannelHandler(key, this, this.syslogEvents, this.logger) : new SocketChannelHandler(key, this, this.syslogEvents, this.logger);
                                this.executor.execute(handler);
                            }
                        }
                    }
                    // Re-arm keys whose handlers finished and asked to be selected again.
                    SelectionKey requeued;
                    while ((requeued = this.keyQueue.poll()) != null) {
                        requeued.interestOps(SelectionKey.OP_READ);
                    }
                }
                catch (IOException e) {
                    this.logger.error("Error accepting connection from SocketChannel", (Throwable)e);
                }
            }
        }

        /**
         * Accepts one pending connection, enforcing the connection limit, and registers it for
         * reads with a pooled buffer attached. If anything fails after the connection slot has
         * been claimed, the socket is closed and the slot and buffer are released.
         */
        private void acceptConnection(ServerSocketChannel serverChannel) throws IOException {
            SocketChannel socketChannel = serverChannel.accept();
            if (socketChannel == null) {
                // A non-blocking accept returns null when no connection is pending any more.
                return;
            }
            if (this.currentConnections.incrementAndGet() > this.maxConnections) {
                this.currentConnections.decrementAndGet();
                try {
                    this.logger.warn("Rejecting connection from {} because max connections has been met", new Object[]{socketChannel.getRemoteAddress().toString()});
                } finally {
                    IOUtils.closeQuietly(socketChannel);
                }
                return;
            }
            ByteBuffer buffer = null;
            boolean registered = false;
            try {
                this.logger.debug("Accepted incoming connection from {}", new Object[]{socketChannel.getRemoteAddress().toString()});
                socketChannel.configureBlocking(false);
                buffer = this.bufferPool.poll();
                if (buffer == null) {
                    this.logger.warn("Rejecting connection from {} because no buffer is available", new Object[]{socketChannel.getRemoteAddress().toString()});
                    return;
                }
                buffer.clear();
                buffer.mark();
                SSLEngine sslEngine = this.sslContext == null ? null : this.sslContext.createSSLEngine();
                SocketChannelAttachment attachment = new SocketChannelAttachment(buffer, sslEngine);
                socketChannel.register(this.selector, SelectionKey.OP_READ, attachment);
                registered = true;
            } finally {
                if (!registered) {
                    if (buffer != null) {
                        this.bufferPool.offer(buffer);
                    }
                    IOUtils.closeQuietly(socketChannel);
                    this.currentConnections.decrementAndGet();
                }
            }
        }

        /**
         * @return the local port of the registered server socket, or 0 when the dispatcher has
         *         not been opened, has been closed, or no valid server key is registered
         */
        @Override
        public int getPort() {
            Selector currentSelector = this.selector;
            if (currentSelector == null || !currentSelector.isOpen()) {
                return 0;
            }
            for (SelectionKey key : currentSelector.keys()) {
                if (!key.isValid()) {
                    continue;
                }
                SelectableChannel channel = key.channel();
                if (channel instanceof ServerSocketChannel) {
                    return ((ServerSocketChannel)channel).socket().getLocalPort();
                }
            }
            return 0;
        }

        /**
         * Asks the selection loop to exit. The flag is set before the wakeup so that the loop
         * observes it when {@code select()} returns.
         */
        @Override
        public void stop() {
            this.stopped = true;
            Selector currentSelector = this.selector;
            if (currentSelector != null) {
                currentSelector.wakeup();
            }
        }

        /**
         * Shuts down the handler pool (waiting up to one second before forcing it), then closes
         * every channel registered with the selector and the selector itself. Safe to call when
         * {@link #open(int, int)} never completed.
         */
        @Override
        public void close() {
            this.executor.shutdown();
            try {
                if (!this.executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
                    this.executor.shutdownNow();
                }
            }
            catch (InterruptedException ie) {
                this.executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
            Selector currentSelector = this.selector;
            if (currentSelector == null) {
                return;
            }
            if (currentSelector.isOpen()) {
                for (SelectionKey key : currentSelector.keys()) {
                    IOUtils.closeQuietly(key.channel());
                }
            }
            IOUtils.closeQuietly(currentSelector);
        }

        /**
         * Called by a handler when its connection has ended: returns the connection's buffer to
         * the pool and frees its connection slot.
         *
         * @param key selection key of the finished connection
         */
        public void completeConnection(SelectionKey key) {
            SocketChannelAttachment attachment = (SocketChannelAttachment)key.attachment();
            // offer() instead of put(): it cannot throw InterruptedException, so the buffer is
            // still returned when the calling handler thread has been interrupted.
            if (!this.bufferPool.offer(attachment.getByteBuffer())) {
                this.logger.warn("Unable to return buffer to pool for connection {}", new Object[]{key.channel()});
            }
            this.currentConnections.decrementAndGet();
        }

        /**
         * Called by a handler that has finished reading for now: queues the key so the selection
         * loop re-enables read interest on it, and wakes the selector so that happens promptly.
         *
         * @param key selection key to make selectable again
         */
        public void addBackForSelection(SelectionKey key) {
            this.keyQueue.offer(key);
            this.selector.wakeup();
        }
    }

    /**
     * Reads UDP datagrams from a non-blocking {@link DatagramChannel} and places each one on
     * the event queue as a {@code RawSyslogEvent} together with the sender's address.
     *
     * <p>A single {@link ByteBuffer} is borrowed from the pool for the lifetime of
     * {@link #run()} and handed back when the loop exits.
     */
    private static class DatagramChannelReader
    implements ChannelReader {
        private final BlockingQueue<ByteBuffer> bufferPool;
        private final BlockingQueue<RawSyslogEvent> syslogEvents;
        private final ProcessorLog logger;
        private DatagramChannel datagramChannel;
        private volatile boolean stopped = false;
        private Selector selector;

        /**
         * @param bufferPool   pool the receive buffer is borrowed from
         * @param syslogEvents queue that received datagrams are put on
         * @param logger       logger for read errors and buffer-size warnings
         */
        public DatagramChannelReader(BlockingQueue<ByteBuffer> bufferPool, BlockingQueue<RawSyslogEvent> syslogEvents, ProcessorLog logger) {
            this.bufferPool = bufferPool;
            this.syslogEvents = syslogEvents;
            this.logger = logger;
        }

        /**
         * Opens a non-blocking datagram channel bound to {@code port} and registers it with a
         * new selector for read events.
         *
         * @param port          local port to bind to
         * @param maxBufferSize requested SO_RCVBUF size; ignored when not positive. A warning is
         *                      logged when the resulting size is smaller than requested.
         * @throws IOException if the channel or selector cannot be opened, configured or bound
         */
        @Override
        public void open(int port, int maxBufferSize) throws IOException {
            this.datagramChannel = DatagramChannel.open();
            this.datagramChannel.configureBlocking(false);
            if (maxBufferSize > 0) {
                this.datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
                int actualReceiveBufSize = this.datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF);
                if (actualReceiveBufSize < maxBufferSize) {
                    ListenSyslog.logMaxBufferWarning(this.logger, maxBufferSize, actualReceiveBufSize);
                }
            }
            this.datagramChannel.socket().bind(new InetSocketAddress(port));
            this.selector = Selector.open();
            this.datagramChannel.register(this.selector, SelectionKey.OP_READ);
        }

        /**
         * Selection loop: drains every pending datagram from the channel each time it becomes
         * readable. Runs until {@link #stop()} is called or the thread is interrupted; the
         * interrupt status is restored before returning.
         */
        @Override
        public void run() {
            ByteBuffer buffer = this.bufferPool.poll();
            if (buffer == null) {
                this.logger.error("Unable to read from DatagramChannel on port {} because no buffer is available", new Object[]{this.getPort()});
                return;
            }
            boolean interrupted = false;
            try {
                while (!this.stopped) {
                    try {
                        if (this.selector.select() <= 0) {
                            continue;
                        }
                        Iterator<SelectionKey> selectorKeys = this.selector.selectedKeys().iterator();
                        while (selectorKeys.hasNext()) {
                            SelectionKey key = selectorKeys.next();
                            selectorKeys.remove();
                            if (!key.isValid()) {
                                continue;
                            }
                            DatagramChannel channel = (DatagramChannel)key.channel();
                            buffer.clear();
                            SocketAddress socketAddress;
                            while (!this.stopped && (socketAddress = channel.receive(buffer)) != null) {
                                String sender = "";
                                if (socketAddress instanceof InetSocketAddress) {
                                    sender = ((InetSocketAddress)socketAddress).getAddress().toString();
                                }
                                buffer.flip();
                                byte[] bytes = new byte[buffer.limit()];
                                buffer.get(bytes);
                                this.syslogEvents.put(new RawSyslogEvent(bytes, sender));
                                buffer.clear();
                            }
                        }
                    }
                    catch (InterruptedException | ClosedByInterruptException e) {
                        // An interrupt ends the loop. ClosedByInterruptException has also closed
                        // the channel, so retrying would only spin.
                        interrupted = true;
                        this.stopped = true;
                    }
                    catch (IOException e) {
                        this.logger.error("Error reading from DatagramChannel", (Throwable)e);
                    }
                }
            } finally {
                // offer() cannot throw InterruptedException, so the buffer goes back to the pool
                // even when this thread has been interrupted.
                if (!this.bufferPool.offer(buffer)) {
                    this.logger.warn("Unable to return buffer to pool for DatagramChannel on port {}", new Object[]{this.getPort()});
                }
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * @return the local port of the datagram socket, or 0 when the channel has not been opened
         */
        @Override
        public int getPort() {
            return this.datagramChannel == null ? 0 : this.datagramChannel.socket().getLocalPort();
        }

        /**
         * Asks the selection loop to exit. The flag is set before the wakeup; in the opposite
         * order the loop could wake, still see {@code stopped == false} and block in
         * {@code select()} again.
         */
        @Override
        public void stop() {
            this.stopped = true;
            Selector currentSelector = this.selector;
            if (currentSelector != null) {
                currentSelector.wakeup();
            }
        }

        /**
         * Closes the selector and the datagram channel, ignoring errors and unopened resources.
         */
        @Override
        public void close() {
            IOUtils.closeQuietly(this.selector);
            IOUtils.closeQuietly(this.datagramChannel);
        }
    }

    /**
     * A network listener that is opened on a port, run as a task to read incoming data,
     * and then stopped and closed.
     */
    private static interface ChannelReader
    extends Runnable {
        /**
         * Binds the reader to the given port and prepares it for {@link #run()}.
         *
         * @param port          local port to listen on
         * @param maxBufferSize requested socket receive buffer size
         * @throws IOException if the underlying channel cannot be opened or bound
         */
        public void open(int port, int maxBufferSize) throws IOException;

        /**
         * @return the local port the reader is listening on
         */
        public int getPort();

        /**
         * Signals the {@link #run()} loop to finish.
         */
        public void stop();

        /**
         * Releases the resources held by the reader.
         */
        public void close();
    }
}

