/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.splunk;

import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
import org.apache.nifi.processor.util.put.sender.ChannelSender;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;

@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"splunk", "logs", "tcp", "udp"})
@TriggerWhenEmpty
@CapabilityDescription(value="Sends logs to Splunk Enterprise over TCP, TCP + TLS/SSL, or UDP. If a Message Delimiter is provided, then this processor will read messages from the incoming FlowFile based on the delimiter, and send each message to Splunk. If a Message Delimiter is not provided then the content of the FlowFile will be sent directly to Splunk as if it were a single message.")
public class PutSplunk
extends AbstractPutEventProcessor {
    public static final char NEW_LINE_CHAR = '\n';

    protected List<PropertyDescriptor> getAdditionalProperties() {
        return Arrays.asList(TIMEOUT, CHARSET, PROTOCOL, MESSAGE_DELIMITER, SSL_CONTEXT_SERVICE);
    }

    protected Collection<ValidationResult> customValidate(ValidationContext context) {
        ArrayList<ValidationResult> results = new ArrayList<ValidationResult>();
        String protocol = context.getProperty(PROTOCOL).getValue();
        SSLContextService sslContextService = (SSLContextService)context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        if (UDP_VALUE.getValue().equals(protocol) && sslContextService != null) {
            results.add(new ValidationResult.Builder().explanation("SSL can not be used with UDP").valid(false).subject("SSL Context").build());
        }
        return results;
    }

    @OnStopped
    public void cleanup() {
        AbstractPutEventProcessor.FlowFileMessageBatch batch;
        for (AbstractPutEventProcessor.FlowFileMessageBatch batch2 : this.activeBatches) {
            batch2.cancelOrComplete();
        }
        while ((batch = (AbstractPutEventProcessor.FlowFileMessageBatch)this.completeBatches.poll()) != null) {
            batch.completeSession();
        }
    }

    protected String createTransitUri(ProcessContext context) {
        String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
        String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
        String protocol = context.getProperty(PROTOCOL).getValue().toLowerCase();
        return protocol + "://" + host + ":" + port;
    }

    protected ChannelSender createSender(ProcessContext context) throws IOException {
        int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
        String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
        String protocol = context.getProperty(PROTOCOL).getValue();
        int timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
        int maxSendBuffer = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        SSLContextService sslContextService = (SSLContextService)context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        SSLContext sslContext = null;
        if (sslContextService != null) {
            sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
        }
        return this.createSender(protocol, host, port, timeout, maxSendBuffer, sslContext);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        AbstractPutEventProcessor.FlowFileMessageBatch batch;
        while ((batch = (AbstractPutEventProcessor.FlowFileMessageBatch)this.completeBatches.poll()) != null) {
            batch.completeSession();
        }
        ProcessSession session = sessionFactory.createSession();
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            AbstractPutEventProcessor.PruneResult result = this.pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS));
            if (result.getNumClosed() > 0 || result.getNumClosed() == 0 && result.getNumConsidered() == 0) {
                context.yield();
            }
            return;
        }
        ChannelSender sender = this.acquireSender(context, session, flowFile);
        if (sender == null) {
            return;
        }
        try {
            String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
            if (delimiter != null) {
                delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
            }
            if (delimiter == null) {
                this.processSingleMessage(context, session, flowFile, sender);
            } else {
                this.processDelimitedMessages(context, session, flowFile, sender, delimiter);
            }
        }
        finally {
            this.relinquishSender(sender);
        }
    }

    private void processSingleMessage(ProcessContext context, ProcessSession session, FlowFile flowFile, ChannelSender sender) {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream((int)flowFile.getSize() + 1);
        session.read(flowFile, new InputStreamCallback(){

            public void process(InputStream in) throws IOException {
                StreamUtils.copy((InputStream)in, (OutputStream)baos);
            }
        });
        String protocol = context.getProperty(PROTOCOL).getValue();
        byte[] buf = baos.toByteArray();
        if (protocol.equals(TCP_VALUE.getValue()) && buf[buf.length - 1] != 10) {
            byte[] updatedBuf = new byte[buf.length + 1];
            System.arraycopy(buf, 0, updatedBuf, 0, buf.length);
            updatedBuf[updatedBuf.length - 1] = 10;
            buf = updatedBuf;
        }
        AbstractPutEventProcessor.FlowFileMessageBatch messageBatch = new AbstractPutEventProcessor.FlowFileMessageBatch((AbstractPutEventProcessor)this, session, flowFile);
        messageBatch.setNumMessages(1L);
        this.activeBatches.add(messageBatch);
        try {
            sender.send(buf);
            messageBatch.addSuccessfulRange(0L, flowFile.getSize());
        }
        catch (IOException e) {
            messageBatch.addFailedRange(0L, flowFile.getSize(), (Exception)e);
            context.yield();
        }
    }

    private void processDelimitedMessages(ProcessContext context, ProcessSession session, FlowFile flowFile, final ChannelSender sender, String delimiter) {
        final String protocol = context.getProperty(PROTOCOL).getValue();
        final byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8);
        final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes);
        final AtomicLong messagesSent = new AtomicLong(0L);
        final AbstractPutEventProcessor.FlowFileMessageBatch messageBatch = new AbstractPutEventProcessor.FlowFileMessageBatch((AbstractPutEventProcessor)this, session, flowFile);
        this.activeBatches.add(messageBatch);
        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();){
            session.read(flowFile, new InputStreamCallback(){

                public void process(InputStream rawIn) throws IOException {
                    byte[] data = null;
                    boolean streamFinished = false;
                    try (BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
                         ByteCountingInputStream in = new ByteCountingInputStream((InputStream)bufferedIn);){
                        long messageStartOffset = in.getBytesConsumed();
                        while (!streamFinished) {
                            int nextByte = in.read();
                            if (nextByte > -1) {
                                baos.write(nextByte);
                            }
                            if (nextByte == -1) {
                                data = PutSplunk.this.getMessage(baos, baos.size(), protocol);
                                streamFinished = true;
                            } else if (buffer.addAndCompare((byte)nextByte)) {
                                data = PutSplunk.this.getMessage(baos, baos.size() - delimiterBytes.length, protocol);
                            }
                            if (data == null) continue;
                            long messageEndOffset = in.getBytesConsumed();
                            if (data.length != 0) {
                                long rangeStart = messageStartOffset;
                                try {
                                    sender.send(data);
                                    messageBatch.addSuccessfulRange(rangeStart, messageEndOffset);
                                    messagesSent.incrementAndGet();
                                }
                                catch (IOException e) {
                                    messageBatch.addFailedRange(rangeStart, messageEndOffset, (Exception)e);
                                }
                            }
                            baos.reset();
                            data = null;
                            messageStartOffset = in.getBytesConsumed();
                        }
                    }
                }
            });
            messageBatch.setNumMessages(messagesSent.get());
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }

    private byte[] getMessage(ByteArrayOutputStream baos, int length, String protocol) {
        if (baos.size() == 0) {
            return null;
        }
        byte[] buf = baos.toByteArray();
        if (protocol.equals(TCP_VALUE.getValue()) && buf[length - 1] != 10) {
            byte[] message = new byte[length + 1];
            for (int i = 0; i < length; ++i) {
                message[i] = buf[i];
            }
            message[message.length - 1] = 10;
            return message;
        }
        return Arrays.copyOfRange(baos.toByteArray(), 0, length);
    }
}

