package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.StandardEvent;
import org.apache.nifi.processor.util.listen.event.StandardEventFactory;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;

@CapabilityDescription("Listens for Datagram Packets on a given port and reads the content of each datagram using the configured Record Reader. Each record will then be written to a flow file using the configured Record Writer. This processor can be restricted to listening for datagrams from  a specific remote host and port by specifying the Sending Host and Sending Host Port properties, otherwise it will listen for datagrams from all hosts and ports.")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@SupportsBatching
@Tags({"ingest", "udp", "listen", "source", "record"})
@WritesAttributes({@WritesAttribute(attribute = "udp.sender", description = "The sending host of the messages."), @WritesAttribute(attribute = "udp.port", description = "The sending port the messages were received."), @WritesAttribute(attribute = ListenUDPRecord.RECORD_COUNT_ATTR, description = "The number of records written to the flow file."), @WritesAttribute(attribute = "mime.type", description = "The mime-type of the writer used to write the records to the flow file.")})
/* loaded from: input_file:org/apache/nifi/processors/standard/ListenUDPRecord.class */
public class ListenUDPRecord extends AbstractListenEventProcessor<StandardEvent> {
    public static final PropertyDescriptor SENDING_HOST = new PropertyDescriptor.Builder().name("sending-host").displayName("Sending Host").description("IP, or name, of a remote host. Only Datagrams from the specified Sending Host Port and this host will be accepted. Improves Performance. May be a system property or an environment variable.").addValidator(new HostValidator()).expressionLanguageSupported(true).build();
    public static final PropertyDescriptor SENDING_HOST_PORT = new PropertyDescriptor.Builder().name("sending-host-port").displayName("Sending Host Port").description("Port being used by remote host to send Datagrams. Only Datagrams from the specified Sending Host and this port will be accepted. Improves Performance. May be a system property or an environment variable.").addValidator(StandardValidators.PORT_VALIDATOR).expressionLanguageSupported(true).build();
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("The Record Reader to use for reading the content of incoming datagrams.").identifiesControllerService(RecordReaderFactory.class).expressionLanguageSupported(false).required(true).build();
    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("The Record Writer to use in order to serialize the data before writing to a flow file.").identifiesControllerService(RecordSetWriterFactory.class).expressionLanguageSupported(false).required(true).build();
    public static final PropertyDescriptor POLL_TIMEOUT = new PropertyDescriptor.Builder().name("poll-timeout").displayName("Poll Timeout").description("The amount of time to wait when polling the internal queue for more datagrams. If no datagrams are found after waiting for the configured timeout, then the processor will emit whatever records have been obtained up to that point.").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("50 ms").required(true).build();
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("batch-size").displayName("Batch Size").description("The maximum number of datagrams to write as records to a single FlowFile. The Batch Size will only be reached when data is coming in more frequently than the Poll Timeout.").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(false).defaultValue("1000").required(true).build();
    public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder().name("parse.failure").description("If a datagram cannot be parsed using the configured Record Reader, the contents of the message will be routed to this Relationship as its own individual FlowFile.").build();
    public static final String UDP_PORT_ATTR = "udp.port";
    public static final String UDP_SENDER_ATTR = "udp.sender";
    public static final String RECORD_COUNT_ATTR = "record.count";
    private volatile long pollTimeout;

    /* loaded from: input_file:org/apache/nifi/processors/standard/ListenUDPRecord$FlowFileRecordWriter.class */
    private static class FlowFileRecordWriter {
        private final FlowFile flowFile;
        private final RecordSetWriter recordWriter;

        public FlowFileRecordWriter(FlowFile flowFile, RecordSetWriter recordSetWriter) {
            this.flowFile = flowFile;
            this.recordWriter = recordSetWriter;
        }

        public FlowFile getFlowFile() {
            return this.flowFile;
        }

        public RecordSetWriter getRecordWriter() {
            return this.recordWriter;
        }
    }

    /* loaded from: input_file:org/apache/nifi/processors/standard/ListenUDPRecord$HostValidator.class */
    private static class HostValidator implements Validator {
        private HostValidator() {
        }

        public ValidationResult validate(String str, String str2, ValidationContext validationContext) {
            try {
                InetAddress.getByName(str2);
                return new ValidationResult.Builder().subject(str).valid(true).input(str2).build();
            } catch (UnknownHostException e) {
                return new ValidationResult.Builder().subject(str).valid(false).input(str2).explanation("Unknown host: " + e).build();
            }
        }
    }

    protected List<PropertyDescriptor> getAdditionalProperties() {
        return Arrays.asList(POLL_TIMEOUT, BATCH_SIZE, RECORD_READER, RECORD_WRITER, SENDING_HOST, SENDING_HOST_PORT);
    }

    protected List<Relationship> getAdditionalRelationships() {
        return Arrays.asList(REL_PARSE_FAILURE);
    }

    @OnScheduled
    public void onScheduled(ProcessContext processContext) throws IOException {
        super.onScheduled(processContext);
        this.pollTimeout = processContext.getProperty(POLL_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
    }

    protected long getLongPollTimeout() {
        return this.pollTimeout;
    }

    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        ArrayList arrayList = new ArrayList();
        String value = validationContext.getProperty(SENDING_HOST).getValue();
        String value2 = validationContext.getProperty(SENDING_HOST_PORT).getValue();
        if (StringUtils.isBlank(value) && StringUtils.isNotBlank(value2)) {
            arrayList.add(new ValidationResult.Builder().subject(SENDING_HOST.getName()).valid(false).explanation("Must specify Sending Host when specifying Sending Host Port").build());
        } else if (StringUtils.isBlank(value2) && StringUtils.isNotBlank(value)) {
            arrayList.add(new ValidationResult.Builder().subject(SENDING_HOST_PORT.getName()).valid(false).explanation("Must specify Sending Host Port when specifying Sending Host").build());
        }
        return arrayList;
    }

    protected ChannelDispatcher createDispatcher(ProcessContext processContext, BlockingQueue<StandardEvent> blockingQueue) throws IOException {
        String value = processContext.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
        Integer asInteger = processContext.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
        return new DatagramChannelDispatcher(new StandardEventFactory(), createBufferPool(processContext.getMaxConcurrentTasks(), Integer.valueOf(processContext.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue()).intValue()), blockingQueue, getLogger(), value, asInteger);
    }

    /* JADX WARN: Removed duplicated region for block: B:44:0x01e3  */
    /* JADX WARN: Removed duplicated region for block: B:47:0x01ed  */
    /* JADX WARN: Removed duplicated region for block: B:52:0x0296  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void onTrigger(org.apache.nifi.processor.ProcessContext r6, org.apache.nifi.processor.ProcessSession r7) throws org.apache.nifi.processor.exception.ProcessException {
        /*
            Method dump skipped, instructions count: 938
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.nifi.processors.standard.ListenUDPRecord.onTrigger(org.apache.nifi.processor.ProcessContext, org.apache.nifi.processor.ProcessSession):void");
    }

    private void handleParseFailure(StandardEvent standardEvent, ProcessSession processSession, Exception exc) {
        handleParseFailure(standardEvent, processSession, exc, "Failed to parse datagram using the configured Record Reader. Will route message as its own FlowFile to the 'parse.failure' relationship");
    }

    private void handleParseFailure(StandardEvent standardEvent, ProcessSession processSession, Exception exc, String str) {
        FlowFile putAllAttributes = processSession.putAllAttributes(processSession.write(processSession.create(), outputStream -> {
            outputStream.write(standardEvent.getData());
        }), getAttributes(standardEvent.getSender()));
        processSession.getProvenanceReporter().receive(putAllAttributes, getTransitUri(standardEvent.getSender()));
        processSession.transfer(putAllAttributes, REL_PARSE_FAILURE);
        if (exc == null) {
            getLogger().error(str);
        } else {
            getLogger().error(str, exc);
        }
        processSession.adjustCounter("Parse Failures", 1L, false);
    }

    private Map<String, String> getAttributes(String str) {
        HashMap hashMap = new HashMap(3);
        hashMap.put("udp.sender", str);
        hashMap.put("udp.port", String.valueOf(this.port));
        return hashMap;
    }

    private String getTransitUri(String str) {
        return "udp://" + ((!str.startsWith("/") || str.length() <= 1) ? str : str.substring(1)) + ":" + this.port;
    }
}
