/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
import org.apache.nifi.processor.util.put.sender.ChannelSender;
import org.apache.nifi.processors.standard.ListenTCP;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;

/**
 * Processor that transmits the content of each incoming FlowFile over a TCP
 * connection, optionally secured with an {@link SSLContext} and optionally
 * followed by a configurable outgoing message delimiter.
 *
 * <p>Senders are obtained from and returned to the base class via
 * {@code acquireSender} / {@code relinquishSender}; when "Connection Per
 * FlowFile" is enabled the sender is closed after every FlowFile instead.</p>
 */
@CapabilityDescription(value="The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. By default, the FlowFiles are transmitted over the same TCP connection (or pool of TCP connections if multiple input threads are configured). To assist the TCP server with determining message boundaries, an optional \"Outgoing Message Delimiter\" string can be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. An optional \"Connection Per FlowFile\" parameter can be specified to change the behaviour so that each FlowFiles content is transmitted over a single TCP connection which is opened when the FlowFile is received and closed after the FlowFile has been sent. This option should only be used for low message volume scenarios, otherwise the platform may run out of TCP sockets.")
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@SeeAlso(value={ListenTCP.class})
@Tags(value={"remote", "egress", "put", "tcp"})
@TriggerWhenEmpty
public class PutTCP
extends AbstractPutEventProcessor {
    /**
     * Creates a TCP sender from the processor configuration.
     *
     * <p>Reads the hostname, port, timeout (converted to milliseconds) and
     * maximum socket send buffer size (converted to bytes) from the context.
     * If an SSL context service is configured, an {@link SSLContext} requiring
     * client authentication is created and handed to the sender.</p>
     *
     * @param context the process context holding the configured properties
     * @return the sender built by the six-argument {@code createSender} overload
     * @throws IOException if the sender cannot be created
     */
    protected ChannelSender createSender(ProcessContext context) throws IOException {
        String protocol = TCP_VALUE.getValue();
        String hostname = context.getProperty(HOSTNAME).getValue();
        int port = context.getProperty(PORT).asInteger();
        int timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
        int bufferSize = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        SSLContextService sslContextService = (SSLContextService)context.getProperty(SSL_CONTEXT_SERVICE).asControllerService();
        // SSL is optional: a null context is passed through when no service is configured.
        SSLContext sslContext = null;
        if (sslContextService != null) {
            sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
        }
        return this.createSender(protocol, hostname, port, timeout, bufferSize, sslContext);
    }

    /**
     * Builds the transit URI in the form {@code <protocol>://<host>:<port>}.
     *
     * @param context the process context holding the hostname and port properties
     * @return the transit URI string
     */
    protected String createTransitUri(ProcessContext context) {
        String protocol = TCP_VALUE.getValue();
        String host = context.getProperty(HOSTNAME).getValue();
        String port = context.getProperty(PORT).getValue();
        return protocol + "://" + host + ":" + port;
    }

    /**
     * Returns the property descriptors this processor contributes in addition
     * to those of the base class.
     *
     * @return a fixed-size list of the additional property descriptors
     */
    protected List<PropertyDescriptor> getAdditionalProperties() {
        return Arrays.asList(CONNECTION_PER_FLOWFILE, OUTGOING_MESSAGE_DELIMITER, TIMEOUT, SSL_CONTEXT_SERVICE, CHARSET);
    }

    /**
     * Sends one FlowFile over TCP.
     *
     * <p>When no FlowFile is available, idle senders are pruned and the
     * processor yields. Otherwise a sender is acquired, the FlowFile content
     * (plus the optional delimiter) is sent, a provenance SEND event is
     * reported, and the FlowFile is transferred to success and the session
     * committed. Any {@link Exception} raised while sending is logged and the
     * FlowFile is routed through {@link #onFailure}. The sender is always
     * either returned to the base class or closed, depending on the
     * "Connection Per FlowFile" setting.</p>
     *
     * <p>NOTE(review): this class was decompiled; the decompiler reported
     * "Removed try catching itself - possible behaviour change" for this
     * method, so the control flow may differ slightly from the original
     * source.</p>
     *
     * @param context the process context
     * @param sessionFactory factory used to create the session for this invocation
     * @throws ProcessException declared by the signature; not thrown directly here
     */
    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession();
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            // Nothing to send: use the idle time to drop senders that have expired.
            this.pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS));
            context.yield();
            return;
        }
        ChannelSender sender = this.acquireSender(context, session, flowFile);
        if (sender == null) {
            // No sender available; nothing further is done with the FlowFile in this method.
            return;
        }
        try {
            String outgoingMessageDelimiter = this.getOutgoingMessageDelimiter(context, flowFile);
            ByteArrayOutputStream content = this.readContent(session, flowFile);
            if (outgoingMessageDelimiter != null) {
                Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
                content = this.appendDelimiter(content, outgoingMessageDelimiter, charset);
            }
            StopWatch stopWatch = new StopWatch(true);
            sender.send(content.toByteArray());
            session.getProvenanceReporter().send(flowFile, this.transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
            session.transfer(flowFile, REL_SUCCESS);
            session.commit();
        }
        catch (Exception e) {
            // Log before routing to failure: onFailure commits the session and may itself
            // throw, which would otherwise hide the original cause.
            this.getLogger().error("Exception while handling a process session, transferring {} to failure.", new Object[]{flowFile}, (Throwable)e);
            this.onFailure(context, session, flowFile);
        }
        finally {
            if (this.isConnectionPerFlowFile(context)) {
                sender.close();
            } else {
                this.relinquishSender(sender);
            }
        }
    }

    /**
     * Penalizes the FlowFile, transfers it to the failure relationship,
     * commits the session and yields the processor.
     *
     * @param context the process context to yield
     * @param session the session owning the FlowFile
     * @param flowFile the FlowFile that could not be sent
     */
    protected void onFailure(ProcessContext context, ProcessSession session, FlowFile flowFile) {
        session.transfer(session.penalize(flowFile), REL_FAILURE);
        session.commit();
        context.yield();
    }

    /**
     * Reads the entire content of the FlowFile into memory.
     *
     * @param session the session used to read the FlowFile
     * @param flowFile the FlowFile whose content is read
     * @return a stream holding the full FlowFile content
     */
    protected ByteArrayOutputStream readContent(ProcessSession session, FlowFile flowFile) {
        // Pre-sized to the FlowFile size plus one byte. The long size is narrowed to int,
        // so sizes beyond Integer.MAX_VALUE are not represented correctly here.
        final ByteArrayOutputStream baos = new ByteArrayOutputStream((int)flowFile.getSize() + 1);
        session.read(flowFile, new InputStreamCallback(){

            public void process(InputStream in) throws IOException {
                StreamUtils.copy((InputStream)in, (OutputStream)baos);
            }
        });
        return baos;
    }

    /**
     * Appends the delimiter, encoded with the given charset, to the content.
     *
     * @param content the stream to append to; modified in place
     * @param delimiter the delimiter text to append
     * @param charSet the charset used to encode the delimiter
     * @return the same {@code content} instance, for chaining
     */
    protected ByteArrayOutputStream appendDelimiter(ByteArrayOutputStream content, String delimiter, Charset charSet) {
        // Use the encoded byte count, not String.length(): the two differ for any
        // charset where a character does not encode to exactly one byte.
        byte[] delimiterBytes = delimiter.getBytes(charSet);
        content.write(delimiterBytes, 0, delimiterBytes.length);
        return content;
    }

    /**
     * Tells whether the "Connection Per FlowFile" property is set to
     * {@code true} (case-insensitive).
     *
     * @param context the process context holding the property
     * @return {@code true} if the property value equals "true" ignoring case;
     *         {@code false} otherwise, including when the value is {@code null}
     */
    protected boolean isConnectionPerFlowFile(ProcessContext context) {
        // Literal-first comparison so a null property value yields false instead of an NPE;
        // this method runs inside onTrigger's finally block, where an NPE would leak the sender.
        return "true".equalsIgnoreCase(context.getProperty(CONNECTION_PER_FLOWFILE).getValue());
    }
}

