/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
import org.apache.nifi.processors.standard.ListenUDP;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;

@CapabilityDescription(value="The PutUDP processor receives a FlowFile and packages the FlowFile content into a single UDP datagram packet which is then transmitted to the configured UDP server. The user must ensure that the FlowFile content being fed to this processor is not larger than the maximum size for the underlying UDP transport. The maximum transport size will vary based on the platform setup but is generally just under 64KB. FlowFiles will be marked as failed if their content is larger than the maximum transport size.")
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@SeeAlso(value={ListenUDP.class})
@Tags(value={"remote", "egress", "put", "udp"})
@TriggerWhenEmpty
public class PutUDP
extends AbstractPutEventProcessor {
    protected String createTransitUri(ProcessContext context) {
        String protocol = UDP_VALUE.getValue();
        String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
        String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
        return protocol + "://" + host + ":" + port;
    }

    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession();
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        try {
            byte[] content = this.readContent(session, flowFile);
            StopWatch stopWatch = new StopWatch(true);
            if (content != null) {
                this.eventSender.sendEvent((Object)content);
            }
            session.getProvenanceReporter().send(flowFile, this.transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
            session.transfer(flowFile, REL_SUCCESS);
            session.commitAsync();
        }
        catch (Exception e) {
            this.getLogger().error("Exception while handling a process session, transferring {} to failure.", new Object[]{flowFile}, (Throwable)e);
            this.onFailure(context, session, flowFile);
        }
    }

    protected void onFailure(ProcessContext context, ProcessSession session, FlowFile flowFile) {
        session.transfer(session.penalize(flowFile), REL_FAILURE);
        session.commitAsync();
        context.yield();
    }

    protected byte[] readContent(ProcessSession session, FlowFile flowFile) {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream((int)flowFile.getSize() + 1);
        session.read(flowFile, new InputStreamCallback(){

            public void process(InputStream in) throws IOException {
                StreamUtils.copy((InputStream)in, (OutputStream)baos);
            }
        });
        return baos.toByteArray();
    }

    protected String getProtocol(ProcessContext context) {
        return UDP_VALUE.getValue();
    }
}

