/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.event.StandardEvent;
import org.apache.nifi.processor.util.listen.event.StandardEventFactory;

/**
 * Processor that receives UDP datagrams and hands them to the batching logic of
 * {@link AbstractListenEventBatchingProcessor}.
 *
 * <p>Besides the properties supplied by the superclass, this class contributes two optional
 * properties, {@link #SENDING_HOST} and {@link #SENDING_HOST_PORT}, which must be configured
 * together (see {@link #customValidate(ValidationContext)}) and are passed to the
 * {@link DatagramChannelDispatcher} created in
 * {@link #createDispatcher(ProcessContext, BlockingQueue)}.
 */
@SupportsBatching
@Tags(value={"ingest", "udp", "listen", "source"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@CapabilityDescription(value="Listens for Datagram Packets on a given port. The default behavior produces a FlowFile per datagram, however for higher throughput the Max Batch Size property may be increased to specify the number of datagrams to batch together in a single FlowFile. This processor can be restricted to listening for datagrams from  a specific remote host and port by specifying the Sending Host and Sending Host Port properties, otherwise it will listen for datagrams from all hosts and ports.")
@WritesAttributes(value={@WritesAttribute(attribute="udp.sender", description="The sending host of the messages."), @WritesAttribute(attribute="udp.port", description="The sending port the messages were received.")})
public class ListenUDP extends AbstractListenEventBatchingProcessor<StandardEvent> {

    /** Optional remote host filter; validated by {@link HostValidator} and may use Expression Language. */
    public static final PropertyDescriptor SENDING_HOST = new PropertyDescriptor.Builder().name("Sending Host").description("IP, or name, of a remote host. Only Datagrams from the specified Sending Host Port and this host will be accepted. Improves Performance. May be a system property or an environment variable.").addValidator((Validator)new HostValidator()).expressionLanguageSupported(true).build();

    /** Optional remote port filter; may use Expression Language. */
    public static final PropertyDescriptor SENDING_HOST_PORT = new PropertyDescriptor.Builder().name("Sending Host Port").description("Port being used by remote host to send Datagrams. Only Datagrams from the specified Sending Host and this port will be accepted. Improves Performance. May be a system property or an environment variable.").addValidator(StandardValidators.PORT_VALIDATOR).expressionLanguageSupported(true).build();

    /** Name of the FlowFile attribute that receives the value of the {@code port} field. */
    public static final String UDP_PORT_ATTR = "udp.port";

    /** Name of the FlowFile attribute that receives the sender of the first event in a batch. */
    public static final String UDP_SENDER_ATTR = "udp.sender";

    /**
     * Returns the properties this processor adds on top of those provided by the superclass.
     *
     * @return a fixed-size list holding {@link #SENDING_HOST} and {@link #SENDING_HOST_PORT}
     */
    @Override
    protected List<PropertyDescriptor> getAdditionalProperties() {
        return Arrays.asList(SENDING_HOST, SENDING_HOST_PORT);
    }

    /**
     * Enforces that Sending Host and Sending Host Port are either both set or both left blank.
     * The comparison is made on the raw (unevaluated) property values.
     *
     * @param validationContext context used to read the two property values
     * @return an empty collection when the configuration is consistent, otherwise a single
     *         invalid result whose subject is the property that is missing
     */
    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        List<ValidationResult> results = new ArrayList<ValidationResult>();
        String sendingHost = validationContext.getProperty(SENDING_HOST).getValue();
        String sendingPort = validationContext.getProperty(SENDING_HOST_PORT).getValue();

        boolean hostBlank = StringUtils.isBlank(sendingHost);
        boolean portBlank = StringUtils.isBlank(sendingPort);

        if (hostBlank && !portBlank) {
            results.add(new ValidationResult.Builder().subject(SENDING_HOST.getName()).valid(false).explanation("Must specify Sending Host when specifying Sending Host Port").build());
        } else if (portBlank && !hostBlank) {
            results.add(new ValidationResult.Builder().subject(SENDING_HOST_PORT.getName()).valid(false).explanation("Must specify Sending Host Port when specifying Sending Host").build());
        }
        return results;
    }

    /**
     * Builds the datagram dispatcher from the configured sending host/port filter and a buffer
     * pool sized from the maximum number of concurrent tasks and the {@code RECV_BUFFER_SIZE}
     * property (declared outside this class).
     *
     * @param context processor configuration to read properties from
     * @param events  queue handed to the dispatcher for received events
     * @return a new {@link DatagramChannelDispatcher}
     * @throws IOException declared by the overridden signature
     */
    @Override
    protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<StandardEvent> events) throws IOException {
        String sendingHost = context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
        // Stays boxed on purpose: the property is optional, so the evaluated value may be null.
        Integer sendingHostPort = context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
        int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();

        // Raw types are kept here because the generic signatures of createBufferPool and of the
        // dispatcher constructor are declared outside this file.
        BlockingQueue bufferPool = this.createBufferPool(context.getMaxConcurrentTasks(), bufferSize);
        StandardEventFactory eventFactory = new StandardEventFactory();
        return new DatagramChannelDispatcher((EventFactory)eventFactory, bufferPool, events, this.getLogger(), sendingHost, sendingHostPort);
    }

    /**
     * Produces the attributes written to the FlowFile created for a batch.
     *
     * @param batch batch of received events; its first event supplies the sender
     * @return a mutable map with {@link #UDP_SENDER_ATTR} set to the first event's sender and
     *         {@link #UDP_PORT_ATTR} set to the string form of the {@code port} field
     */
    @Override
    protected Map<String, String> getAttributes(AbstractListenEventBatchingProcessor.FlowFileEventBatch batch) {
        Map<String, String> attributes = new HashMap<String, String>(3);
        attributes.put(UDP_SENDER_ATTR, firstSender(batch));
        attributes.put(UDP_PORT_ATTR, String.valueOf(this.port));
        return attributes;
    }

    /**
     * Builds the transit URI for a batch in the form {@code udp://<sender>:<port>}.
     * A single leading {@code '/'} is removed from the sender when more characters follow it.
     *
     * @param batch batch of received events; its first event supplies the sender
     * @return the transit URI string
     */
    @Override
    protected String getTransitUri(AbstractListenEventBatchingProcessor.FlowFileEventBatch batch) {
        String sender = firstSender(batch);
        String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
        return "udp://" + senderHost + ":" + this.port;
    }

    /** Returns the sender reported by the first event of the given batch. */
    private static String firstSender(AbstractListenEventBatchingProcessor.FlowFileEventBatch batch) {
        return ((StandardEvent)batch.getEvents().get(0)).getSender();
    }

    /**
     * Validator that accepts a value when {@link InetAddress#getByName(String)} can resolve it.
     * Note that resolving a host name may involve a name-service lookup.
     */
    public static class HostValidator implements Validator {

        /**
         * Validates a host value.
         *
         * <p>When the property supports Expression Language and the value contains an
         * expression, the value is accepted without a lookup: the unevaluated expression text
         * is not a host name, so resolving it would wrongly reject a legitimate configuration.
         *
         * @param subject name of the property being validated
         * @param input   configured value
         * @param context validation context used to detect Expression Language
         * @return a valid result if the value contains Expression Language or resolves,
         *         otherwise an invalid result explaining that the host is unknown
         */
        @Override
        public ValidationResult validate(String subject, String input, ValidationContext context) {
            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
                return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
            }
            try {
                InetAddress.getByName(input);
                return new ValidationResult.Builder().subject(subject).valid(true).input(input).build();
            } catch (UnknownHostException e) {
                return new ValidationResult.Builder().subject(subject).valid(false).input(input).explanation("Unknown host: " + e).build();
            }
        }
    }
}

