/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.net.ssl.SSLContext;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.AbstractSyslogProcessor;
import org.apache.nifi.processors.standard.ListenSyslog;
import org.apache.nifi.processors.standard.ParseSyslog;
import org.apache.nifi.processors.standard.syslog.SyslogParser;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StopWatch;

@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@TriggerWhenEmpty
@Tags(value={"syslog", "put", "udp", "tcp", "logs"})
@CapabilityDescription(value="Sends Syslog messages to a given host and port over TCP or UDP. Messages are constructed from the \"Message ___\" properties of the processor which can use expression language to generate messages from incoming FlowFiles. The properties are used to construct messages of the form: (<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) where version is optional.  The constructed messages are checked against regular expressions for RFC5424 and RFC3164 formatted messages. The timestamp can be an RFC5424 timestamp with a format of \"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", or it can be an RFC3164 timestamp with a format of \"MMM d HH:mm:ss\". If a message is constructed that does not form a valid Syslog message according to the above description, then it is routed to the invalid relationship. Valid messages are sent to the Syslog server and successes are routed to the success relationship, failures routed to the failure relationship.")
@SeeAlso(value={ListenSyslog.class, ParseSyslog.class})
public class PutSyslog
extends AbstractSyslogProcessor {
    /** Address (IP or host name) of the Syslog server that messages are sent to. */
    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
            .name("Hostname")
            .description("The ip address or hostname of the Syslog server.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("localhost")
            .required(true)
            .build();

    /** Size of each pooled buffer used when sending a message. */
    public static final PropertyDescriptor SEND_BUFFER_SIZE = new PropertyDescriptor.Builder()
            .name("Send Buffer Size")
            .description("The size of each buffer used to send a Syslog message. Adjust this value appropriately based on the expected size of the Syslog messages being produced. Messages larger than this buffer size will still be sent, but will not make use of the buffer pool.")
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .defaultValue("2048 B")
            .required(true)
            .build();

    /** Maximum number of FlowFiles pulled from the session per trigger. */
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
            .name("Batch Size")
            .description("The number of incoming FlowFiles to process in a single execution of this processor.")
            .required(true)
            .defaultValue("25")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .build();

    /** How long an unused pooled connection may stay open before it is closed. */
    public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor.Builder()
            .name("Idle Connection Expiration")
            .description("The amount of time a connection should be held open without being used before closing the connection.")
            .required(true)
            .defaultValue("5 seconds")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .build();

    /** Priority value placed between the angle brackets at the start of each message. */
    public static final PropertyDescriptor MSG_PRIORITY = new PropertyDescriptor.Builder()
            .name("Message Priority")
            .description("The priority for the Syslog messages, excluding < >.")
            .required(true)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(true)
            .build();

    /** Optional version written directly after the priority. */
    public static final PropertyDescriptor MSG_VERSION = new PropertyDescriptor.Builder()
            .name("Message Version")
            .description("The version for the Syslog messages.")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(true)
            .build();
    /**
     * Timestamp portion of each message. Supports expression language and defaults to the
     * current time rendered as {@code MMM d HH:mm:ss}.
     */
    public static final PropertyDescriptor MSG_TIMESTAMP = new PropertyDescriptor.Builder()
            .name("Message Timestamp")
            // The description previously contained a stray quote before "or it can be";
            // the wording now matches the processor's capability description.
            .description("The timestamp for the Syslog messages. The timestamp can be an RFC5424 timestamp with a format of \"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", or it can be an RFC3164 timestamp with a format of \"MMM d HH:mm:ss\".")
            .required(true)
            .defaultValue("${now():format('MMM d HH:mm:ss')}")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(true)
            .build();
    /** Host name portion of each message; supports expression language. */
    public static final PropertyDescriptor MSG_HOSTNAME = new PropertyDescriptor.Builder()
            .name("Message Hostname")
            .description("The hostname for the Syslog messages.")
            .required(true)
            .defaultValue("${hostname(true)}")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(true)
            .build();

    /** Body portion of each message; supports expression language. */
    public static final PropertyDescriptor MSG_BODY = new PropertyDescriptor.Builder()
            .name("Message Body")
            .description("The body for the Syslog messages.")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(true)
            .build();

    /** Optional controller service supplying the SSL context for secure connections. */
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
            .name("SSL Context Service")
            .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, syslog messages will be sent over a secure connection.")
            .required(false)
            .identifiesControllerService(SSLContextService.class)
            .build();

    /** Destination for FlowFiles whose message was sent. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles that are sent successfully to Syslog are sent out this relationship.")
            .build();

    /** Destination for FlowFiles whose message could not be sent. */
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("FlowFiles that failed to send to Syslog are sent out this relationship.")
            .build();

    /** Destination for FlowFiles whose constructed message did not match a known Syslog pattern. */
    public static final Relationship REL_INVALID = new Relationship.Builder()
            .name("invalid")
            .description("FlowFiles that do not form a valid Syslog message are sent out this relationship.")
            .build();
    // Unmodifiable set of relationships, populated once in init().
    private Set<Relationship> relationships;
    // Unmodifiable list of supported properties, populated once in init().
    private List<PropertyDescriptor> descriptors;
    // Pool of reusable send buffers; (re)created in onScheduled() with one buffer per concurrent task.
    private volatile BlockingQueue<ByteBuffer> bufferPool;
    // Pool of open senders that can be reused between triggers; (re)created in onScheduled().
    private volatile BlockingQueue<ChannelSender> senderPool;

    /**
     * Builds the immutable relationship set and property list exposed by this processor.
     *
     * @param context the initialization context (not used)
     */
    protected void init(ProcessorInitializationContext context) {
        final Set<Relationship> rels = new HashSet<Relationship>();
        rels.add(REL_SUCCESS);
        rels.add(REL_FAILURE);
        rels.add(REL_INVALID);
        this.relationships = Collections.unmodifiableSet(rels);

        // Connection settings first, then the pieces that make up each message.
        final List<PropertyDescriptor> props = new ArrayList<PropertyDescriptor>();
        props.add(HOSTNAME);
        props.add(PROTOCOL);
        props.add(PORT);
        props.add(SSL_CONTEXT_SERVICE);
        props.add(IDLE_EXPIRATION);
        props.add(SEND_BUFFER_SIZE);
        props.add(BATCH_SIZE);
        props.add(CHARSET);
        props.add(MSG_PRIORITY);
        props.add(MSG_VERSION);
        props.add(MSG_TIMESTAMP);
        props.add(MSG_HOSTNAME);
        props.add(MSG_BODY);
        this.descriptors = Collections.unmodifiableList(props);
    }

    /**
     * @return the unmodifiable set of relationships assembled in {@code init}
     */
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * @return the unmodifiable list of properties assembled in {@code init}
     */
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    /**
     * Rejects configurations that combine the UDP protocol with an SSL context service.
     *
     * @param context the validation context holding the configured property values
     * @return a collection with one invalid result when SSL is configured with UDP, otherwise empty
     */
    protected Collection<ValidationResult> customValidate(ValidationContext context) {
        final List<ValidationResult> problems = new ArrayList<ValidationResult>();
        final String configuredProtocol = context.getProperty(PROTOCOL).getValue();
        final SSLContextService sslService = (SSLContextService)context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        final boolean usesUdp = UDP_VALUE.getValue().equals(configuredProtocol);
        if (!usesUdp || sslService == null) {
            return problems;
        }
        final ValidationResult sslOverUdp = new ValidationResult.Builder()
                .explanation("SSL can not be used with UDP")
                .valid(false)
                .subject("SSL Context")
                .build();
        problems.add(sslOverUdp);
        return problems;
    }

    /**
     * Creates the buffer pool (pre-filled with one buffer per concurrent task) and an empty
     * sender pool of the same capacity.
     *
     * @param context the process context providing the buffer size and the concurrent task count
     * @throws IOException declared for callers; nothing in this method body throws it directly
     */
    @OnScheduled
    public void onScheduled(ProcessContext context) throws IOException {
        final int bytesPerBuffer = context.getProperty(SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        final int maxTasks = context.getMaxConcurrentTasks();

        final BlockingQueue<ByteBuffer> buffers = new LinkedBlockingQueue<ByteBuffer>(maxTasks);
        this.bufferPool = buffers;
        int remaining = maxTasks;
        while (remaining-- > 0) {
            buffers.offer(ByteBuffer.allocate(bytesPerBuffer));
        }

        this.senderPool = new LinkedBlockingQueue<ChannelSender>(maxTasks);
    }

    /**
     * Reads the connection settings from the context and creates a matching sender.
     *
     * @param context the process context holding port, host, protocol, charset and SSL service
     * @param bufferPool the pool of buffers the sender should draw from
     * @return a newly created sender for the configured protocol
     * @throws IOException if the sender cannot be created
     */
    protected ChannelSender createSender(ProcessContext context, BlockingQueue<ByteBuffer> bufferPool) throws IOException {
        final int targetPort = context.getProperty(PORT).asInteger();
        final String targetHost = context.getProperty(HOSTNAME).getValue();
        final String transport = context.getProperty(PROTOCOL).getValue();
        final String charsetName = context.getProperty(CHARSET).getValue();
        final SSLContextService sslService = (SSLContextService)context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        final Charset messageCharset = Charset.forName(charsetName);
        return this.createSender(sslService, transport, targetHost, targetPort, messageCharset, bufferPool);
    }

    /**
     * Creates the sender implementation for the given settings: a datagram sender for UDP,
     * otherwise an SSL sender when an SSL context service is supplied, otherwise a plain
     * socket sender.
     *
     * @param sslContextService the SSL context service, or {@code null} for an unsecured connection
     * @param protocol the configured protocol value
     * @param host the destination host
     * @param port the destination port
     * @param charset the charset used to encode messages
     * @param bufferPool the pool of buffers the sender should draw from
     * @return a newly created sender
     * @throws IOException if the sender cannot be created
     */
    protected ChannelSender createSender(SSLContextService sslContextService, String protocol, String host, int port, Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException {
        final boolean overUdp = protocol.equals(UDP_VALUE.getValue());
        if (overUdp) {
            return new DatagramChannelSender(host, port, bufferPool, charset);
        }
        if (sslContextService == null) {
            return new SocketChannelSender(host, port, bufferPool, charset);
        }
        final SSLContext secureContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
        return new SSLSocketChannelSender(secureContext, host, port, bufferPool, charset);
    }

    /**
     * Drains the sender pool, closing every pooled sender. Does nothing when the pool was
     * never created.
     */
    @OnStopped
    public void onStopped() {
        if (this.senderPool == null) {
            return;
        }
        for (ChannelSender pooled = this.senderPool.poll(); pooled != null; pooled = this.senderPool.poll()) {
            pooled.close();
        }
    }

    /**
     * Empties the sender pool, closes every sender whose last use is older than the threshold,
     * and puts the remaining senders back.
     *
     * @param idleThreshold the maximum idle time in milliseconds before a sender is closed
     */
    private void pruneIdleSenders(long idleThreshold) {
        final long now = System.currentTimeMillis();
        final List<ChannelSender> survivors = new ArrayList<ChannelSender>();

        for (ChannelSender candidate = this.senderPool.poll(); candidate != null; candidate = this.senderPool.poll()) {
            final boolean withinThreshold = now <= candidate.lastUsed + idleThreshold;
            if (withinThreshold) {
                survivors.add(candidate);
            } else {
                this.getLogger().debug("Closing idle connection...");
                candidate.close();
            }
        }

        for (ChannelSender survivor : survivors) {
            // The pool is bounded; a sender that no longer fits is closed instead of being dropped open.
            if (!this.senderPool.offer(survivor)) {
                survivor.close();
            }
        }
    }

    /**
     * Pulls up to "Batch Size" FlowFiles, builds one Syslog message per FlowFile from the
     * "Message ..." properties and sends each valid message through a pooled or newly created
     * sender.
     *
     * <p>Routing: FlowFiles whose message matches none of the known Syslog patterns go to
     * {@link #REL_INVALID}; FlowFiles whose message was sent go to {@link #REL_SUCCESS};
     * FlowFiles whose send raised an {@link IOException}, or for which no sender could be
     * created, go to {@link #REL_FAILURE}. When no FlowFiles are available, idle pooled
     * senders are pruned and the processor yields.
     *
     * @param context the process context providing the property values
     * @param session the session used to obtain and transfer FlowFiles
     * @throws ProcessException declared for callers; not thrown directly by this method body
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final String protocol = context.getProperty(PROTOCOL).getValue();
        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
        final List<FlowFile> flowFiles = session.get(batchSize);
        if (flowFiles == null || flowFiles.isEmpty()) {
            // Nothing to send: use this cycle to close connections that have been idle too long.
            this.pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS));
            context.yield();
            return;
        }

        ChannelSender sender = this.senderPool.poll();
        if (sender == null) {
            try {
                this.getLogger().debug("No available connections, creating a new one...");
                sender = this.createSender(context, this.bufferPool);
            } catch (IOException e) {
                // Without a connection nothing in this batch can be delivered.
                for (FlowFile flowFile : flowFiles) {
                    this.getLogger().error("No available connections, and unable to create a new one, transferring {} to failure", new Object[]{flowFile}, (Throwable)e);
                    session.transfer(flowFile, REL_FAILURE);
                }
                context.yield();
                return;
            }
        }

        final String port = context.getProperty(PORT).getValue();
        final String host = context.getProperty(HOSTNAME).getValue();
        final String transitUri = protocol + "://" + host + ":" + port;

        // Set once any send fails, so the sender is closed rather than returned to the pool.
        boolean sendFailed = false;
        try {
            for (FlowFile flowFile : flowFiles) {
                final StopWatch timer = new StopWatch(true);
                final String priority = context.getProperty(MSG_PRIORITY).evaluateAttributeExpressions(flowFile).getValue();
                final String version = context.getProperty(MSG_VERSION).evaluateAttributeExpressions(flowFile).getValue();
                final String timestamp = context.getProperty(MSG_TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
                final String hostname = context.getProperty(MSG_HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
                final String body = context.getProperty(MSG_BODY).evaluateAttributeExpressions(flowFile).getValue();

                // <PRIORITY>[VERSION ]TIMESTAMP HOSTNAME BODY
                final StringBuilder messageBuilder = new StringBuilder();
                messageBuilder.append("<").append(priority).append(">");
                if (version != null) {
                    messageBuilder.append(version).append(" ");
                }
                messageBuilder.append(timestamp).append(" ").append(hostname).append(" ").append(body);

                final String fullMessage = messageBuilder.toString();
                this.getLogger().debug(fullMessage);

                if (!this.isValid(fullMessage)) {
                    this.getLogger().info("Transferring {} to invalid", new Object[]{flowFile});
                    session.transfer(flowFile, REL_INVALID);
                    continue;
                }

                try {
                    // Over TCP each message is terminated with a newline; validation ran on the unterminated text.
                    if (protocol.equals(TCP_VALUE.getValue())) {
                        messageBuilder.append('\n');
                    }
                    sender.send(messageBuilder.toString());
                    timer.stop();
                    final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
                    session.getProvenanceReporter().send(flowFile, transitUri, duration, true);
                    this.getLogger().info("Transferring {} to success", new Object[]{flowFile});
                    session.transfer(flowFile, REL_SUCCESS);
                } catch (IOException e) {
                    this.getLogger().error("Transferring {} to failure", new Object[]{flowFile}, (Throwable)e);
                    session.transfer(flowFile, REL_FAILURE);
                    sendFailed = true;
                }
            }
        } finally {
            // Only a healthy, still-connected sender goes back to the pool; everything else is closed.
            if (sender.isConnected() && !sendFailed) {
                if (!this.senderPool.offer(sender)) {
                    sender.close();
                }
            } else {
                sender.close();
            }
        }
    }

    /**
     * Checks the message against each pattern in {@code SyslogParser.MESSAGE_PATTERNS}.
     *
     * @param message the fully constructed Syslog message
     * @return {@code true} if the whole message matches at least one pattern
     */
    private boolean isValid(String message) {
        for (final Pattern candidate : SyslogParser.MESSAGE_PATTERNS) {
            if (candidate.matcher(message).matches()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Sender that writes messages over an {@link SSLSocketChannel}. Messages are written as
     * byte arrays directly to the channel, so the shared buffer pool is not used.
     */
    private static class SSLSocketChannelSender extends ChannelSender {
        final SSLSocketChannel channel;

        /**
         * Creates the SSL channel in client mode and connects it.
         *
         * @throws IOException if the channel cannot be created or connected; the channel is
         *         closed before the exception propagates
         */
        SSLSocketChannelSender(SSLContext sslContext, String host, int port, BlockingQueue<ByteBuffer> bufferPool, Charset charset) throws IOException {
            super(host, port, bufferPool, charset);
            this.channel = new SSLSocketChannel(sslContext, host, port, true);
            boolean connected = false;
            try {
                this.channel.connect();
                connected = true;
            } finally {
                // A failed constructor hands no object to the caller, so nobody else could close the channel.
                if (!connected) {
                    IOUtils.closeQuietly((Closeable)this.channel);
                }
            }
        }

        /**
         * Encodes the message with the configured charset, writes it to the SSL channel and
         * records the time of use.
         */
        @Override
        public void send(String message) throws IOException {
            byte[] bytes = message.getBytes(this.charset);
            this.channel.write(bytes);
            this.lastUsed = System.currentTimeMillis();
        }

        /**
         * Writes the remaining bytes of the buffer to the SSL channel. {@link #send(String)}
         * does not go through this method; it is implemented so that a direct call does not
         * silently discard data.
         */
        @Override
        public void write(ByteBuffer buffer) throws IOException {
            final byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            this.channel.write(bytes);
        }

        @Override
        boolean isConnected() {
            return this.channel != null && !this.channel.isClosed();
        }

        @Override
        public void close() {
            IOUtils.closeQuietly((Closeable)this.channel);
        }
    }

    /**
     * Sender that writes messages over a plain TCP {@link SocketChannel}.
     */
    private static class SocketChannelSender extends ChannelSender {
        final SocketChannel channel = SocketChannel.open();

        /**
         * Connects the channel to the given host and port.
         *
         * @throws IOException if the host cannot be resolved or the connection fails; the
         *         already-opened channel is closed before the exception propagates
         */
        SocketChannelSender(String host, int port, BlockingQueue<ByteBuffer> bufferPool, Charset charset) throws IOException {
            super(host, port, bufferPool, charset);
            boolean connected = false;
            try {
                this.channel.connect(new InetSocketAddress(InetAddress.getByName(host), port));
                connected = true;
            } finally {
                // The field initializer has already opened the channel; release it when construction fails.
                if (!connected) {
                    IOUtils.closeQuietly((Closeable)this.channel);
                }
            }
        }

        /** Writes until the buffer is fully drained, since a single write may be partial. */
        @Override
        public void write(ByteBuffer buffer) throws IOException {
            while (buffer.hasRemaining()) {
                this.channel.write(buffer);
            }
        }

        @Override
        boolean isConnected() {
            return this.channel != null && this.channel.isConnected();
        }

        @Override
        public void close() {
            IOUtils.closeQuietly((Closeable)this.channel);
        }
    }

    /**
     * Sender that writes messages over a connected UDP {@link DatagramChannel}.
     */
    private static class DatagramChannelSender extends ChannelSender {
        final DatagramChannel channel = DatagramChannel.open();

        /**
         * Connects the datagram channel to the given host and port.
         *
         * @throws IOException if the host cannot be resolved or the connect fails; the
         *         already-opened channel is closed before the exception propagates
         */
        DatagramChannelSender(String host, int port, BlockingQueue<ByteBuffer> bufferPool, Charset charset) throws IOException {
            super(host, port, bufferPool, charset);
            boolean connected = false;
            try {
                this.channel.connect(new InetSocketAddress(InetAddress.getByName(host), port));
                connected = true;
            } finally {
                // The field initializer has already opened the channel; release it when construction fails.
                if (!connected) {
                    IOUtils.closeQuietly((Closeable)this.channel);
                }
            }
        }

        /** Writes to the channel until the buffer has no bytes remaining. */
        @Override
        public void write(ByteBuffer buffer) throws IOException {
            while (buffer.hasRemaining()) {
                this.channel.write(buffer);
            }
        }

        @Override
        boolean isConnected() {
            return this.channel != null && this.channel.isConnected();
        }

        @Override
        public void close() {
            IOUtils.closeQuietly((Closeable)this.channel);
        }
    }

    /**
     * Base class for objects that send encoded Syslog messages over some channel, borrowing
     * a buffer from a shared pool for each message when one of sufficient size is available.
     */
    protected static abstract class ChannelSender {
        final int port;
        final String host;
        final BlockingQueue<ByteBuffer> bufferPool;
        final Charset charset;
        /** Time in milliseconds of the last completed send; read when pruning idle senders. */
        volatile long lastUsed;

        /**
         * @param host the destination host
         * @param port the destination port
         * @param bufferPool the pool that send buffers are borrowed from and returned to
         * @param charset the charset used to encode messages
         * @throws IOException declared so that subclasses may open channels during construction
         */
        ChannelSender(String host, int port, BlockingQueue<ByteBuffer> bufferPool, Charset charset) throws IOException {
            this.port = port;
            this.host = host;
            this.bufferPool = bufferPool;
            this.charset = charset;
        }

        /**
         * Encodes the message and writes it through {@link #write(ByteBuffer)}.
         *
         * <p>A pooled buffer is used when one is available and large enough; it is returned to
         * the pool afterwards, even if the write fails. Otherwise a temporary buffer of exactly
         * the message size is allocated and discarded after use.
         *
         * @param message the message to send
         * @throws IOException if the underlying write fails
         */
        public void send(String message) throws IOException {
            final byte[] bytes = message.getBytes(this.charset);
            boolean shouldReturn = true;
            ByteBuffer buffer = this.bufferPool.poll();
            if (buffer == null) {
                // Pool exhausted: fall back to a one-off buffer.
                buffer = ByteBuffer.allocate(bytes.length);
                shouldReturn = false;
            } else if (buffer.capacity() < bytes.length) {
                // Compare against capacity, not limit: a pooled buffer comes back flipped, so its
                // limit is the length of the previous message rather than the space available.
                this.bufferPool.offer(buffer);
                buffer = ByteBuffer.allocate(bytes.length);
                shouldReturn = false;
            }
            try {
                buffer.clear();
                buffer.put(bytes);
                buffer.flip();
                this.write(buffer);
                this.lastUsed = System.currentTimeMillis();
            } finally {
                if (shouldReturn) {
                    this.bufferPool.offer(buffer);
                }
            }
        }

        /** Writes the readable content of the buffer to the underlying channel. */
        abstract void write(ByteBuffer var1) throws IOException;

        /** @return whether the underlying channel is still usable for sending */
        abstract boolean isConnected();

        /** Closes the underlying channel. */
        abstract void close();
    }
}

