package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.io.nio.BufferPool;
import org.apache.nifi.io.nio.ChannelListener;
import org.apache.nifi.io.nio.consumer.StreamConsumer;
import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.UDPStreamConsumer;
import org.apache.nifi.util.Tuple;

@CapabilityDescription("Listens for Datagram Packets on a given port and concatenates the contents of those packets together generating flow files")
@TriggerWhenEmpty
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"ingest", "udp", "listen", "source"})
/* loaded from: input_file:org/apache/nifi/processors/standard/ListenUDP.class */
public class ListenUDP extends AbstractSessionFactoryProcessor {
    // Relationship set and supported property list returned by the accessors below;
    // both are built once in the static initializer.
    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> properties;
    // The only relationship this processor routes to.
    public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder().name("success").description("Connection which contains concatenated Datagram Packets").build();
    // Property descriptors; each one is assigned in the static initializer.
    public static final PropertyDescriptor PORT;
    public static final PropertyDescriptor RECV_TIMEOUT;
    public static final PropertyDescriptor FLOW_FILE_PER_DATAGRAM;
    public static final PropertyDescriptor MAX_BUFFER_SIZE;
    public static final PropertyDescriptor FLOW_FILE_SIZE_TRIGGER;
    public static final PropertyDescriptor MAX_UDP_BUFFER;
    public static final PropertyDescriptor RECV_BUFFER_COUNT;
    public static final PropertyDescriptor CHANNEL_READER_PERIOD;
    public static final PropertyDescriptor FLOW_FILES_PER_SESSION;
    public static final PropertyDescriptor SENDING_HOST;
    public static final PropertyDescriptor SENDING_HOST_PORT;
    // Names of the local network interfaces discovered in the static initializer; consulted by the
    // NETWORK_INTF_NAME validator.
    private static final Set<String> interfaceSet;
    public static final PropertyDescriptor NETWORK_INTF_NAME;
    // NOTE(review): not referenced by name in this file; the literal 2 passed to the ChannelListener
    // constructor is presumably this constant inlined by the compiler - confirm.
    public static final int DEFAULT_LISTENING_THREADS = 2;
    // Taken with tryLock() by disconnect() and getChannelListener() around creation/shutdown of channelListener.
    private final Lock lock = new ReentrantLock();
    // Current listener; created in getChannelListener() and set back to null by disconnect().
    private volatile ChannelListener channelListener = null;
    // Batches (session plus the flow files created in it) handed from the consumer task to transferFlowFiles().
    private final BlockingQueue<Tuple<ProcessSession, List<FlowFile>>> flowFilesPerSessionQueue = new LinkedBlockingQueue();
    // List handed to every UDPStreamConsumer; the consumer task snapshots and clears it when it rotates sessions.
    private final List<FlowFile> newFlowFiles = new ArrayList();
    // Consumer most recently created by the StreamConsumerFactory; cleared once that consumer reports it is finished.
    private final AtomicReference<UDPStreamConsumer> consumerRef = new AtomicReference<>();
    // Set to true by stopping() and back to false when the processor is scheduled.
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    // Session factory captured on the first onTrigger() call; cleared in stopped().
    private final AtomicReference<ProcessSessionFactory> sessionFactoryRef = new AtomicReference<>();
    // Single thread that runs the consumer task submitted when the processor is scheduled.
    private final ExecutorService consumerExecutorService = Executors.newSingleThreadExecutor();
    // Future of the running consumer task; stopping() waits on it to collect the last batch.
    private final AtomicReference<Future<Tuple<ProcessSession, List<FlowFile>>>> consumerFutureRef = new AtomicReference<>();
    // Raised by the consumer task when its consumer finished while the processor was not stopping;
    // onTrigger() reacts by creating a new channel listener.
    private final AtomicBoolean resetChannelListener = new AtomicBoolean(false);
    // Value of the SENDING_HOST property read in getChannelListener(); used as the provenance source in transferFlowFiles().
    private volatile String sendingHost;

    /* loaded from: input_file:org/apache/nifi/processors/standard/ListenUDP$HostValidator.class */
    /**
     * Validator that accepts a value only if it can be resolved to an address
     * through {@link InetAddress#getByName(String)}.
     */
    public static class HostValidator implements Validator {
        /**
         * Attempts to resolve the supplied value as a host.
         *
         * @param str the subject reported in the validation result
         * @param str2 the value to resolve
         * @param validationContext not used by this validator
         * @return a valid result when resolution succeeds, otherwise an invalid result whose
         *         explanation carries the {@link UnknownHostException}
         */
        public ValidationResult validate(String str, String str2, ValidationContext validationContext) {
            final ValidationResult.Builder outcome = new ValidationResult.Builder().subject(str).input(str2);
            try {
                InetAddress.getByName(str2);
            } catch (UnknownHostException unresolved) {
                return outcome.valid(false).explanation("Unknown host: " + unresolved).build();
            }
            return outcome.valid(true).build();
        }
    }

    /**
     * Returns the relationships of this processor.
     *
     * @return the unmodifiable set built in the static initializer; it contains only
     *         {@link #RELATIONSHIP_SUCCESS}
     */
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Returns the property descriptors supported by this processor.
     *
     * @return the unmodifiable list built in the static initializer
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    /**
     * Creates the channel listener and starts the background task that drives the UDP stream consumer.
     *
     * <p>The task runs on {@code consumerExecutorService} until {@code stopping} is set. While running it
     * repeatedly calls {@code process()} on the current consumer, and hands the flow files collected so far
     * to {@code flowFilesPerSessionQueue} (together with the session they were created in) whenever the
     * configured number of flow files per session is reached or the consumer has been idle for roughly five
     * seconds. Once stopping is requested it keeps processing until the consumer reports it is finished and
     * returns the last, not yet queued, batch as the task result.</p>
     *
     * @param processContext the context the processor properties are read from
     * @throws IOException if the channel listener cannot be created
     */
    @OnScheduled
    public void initializeChannelListenerAndConsumerProcessing(final ProcessContext processContext) throws IOException {
        getChannelListener(processContext);
        this.stopping.set(false);
        // A listener has just been requested above, so a reset request left over from a previous run is
        // stale; honoring it in onTrigger() would create a second listener for the same port.
        this.resetChannelListener.set(false);
        this.consumerFutureRef.set(this.consumerExecutorService.submit(new Callable<Tuple<ProcessSession, List<FlowFile>>>() {
            @Override
            public Tuple<ProcessSession, List<FlowFile>> call() {
                final int maxFlowFilesPerSession = processContext.getProperty(ListenUDP.FLOW_FILES_PER_SESSION).asInteger().intValue();
                final long readerIntervalMillis = processContext.getProperty(ListenUDP.CHANNEL_READER_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
                // Number of idle waits that add up to about five seconds, or 1 for intervals above one second.
                // Math.max keeps an interval of 0 ms from causing a division by zero.
                final int maxWaits = (int) (readerIntervalMillis <= 1000 ? 5000 / Math.max(1L, readerIntervalMillis) : 1L);
                final ProcessorLog logger = ListenUDP.this.getLogger();
                // Starts at the maximum so that the first pass creates a session and hands it to the consumer.
                int flowFileCount = maxFlowFilesPerSession;
                ProcessSession session = null;
                int numWaits = 0;
                while (!ListenUDP.this.stopping.get()) {
                    final UDPStreamConsumer consumer = ListenUDP.this.consumerRef.get();
                    final ProcessSessionFactory sessionFactory = ListenUDP.this.sessionFactoryRef.get();
                    if (consumer == null || sessionFactory == null) {
                        // Nothing to drive yet: no consumer has been created or onTrigger() has not run.
                        try {
                            Thread.sleep(100L);
                        } catch (InterruptedException ignored) {
                            // Deliberately ignored: the loop condition is the only exit signal.
                        }
                        continue;
                    }
                    try {
                        if (flowFileCount == maxFlowFilesPerSession || numWaits == maxWaits) {
                            logger.debug("Have waited {} times", new Object[]{numWaits});
                            numWaits = 0;
                            if (session != null) {
                                // Publish what the finished session produced and start collecting afresh.
                                final Tuple<ProcessSession, List<FlowFile>> batch =
                                        new Tuple<ProcessSession, List<FlowFile>>(session, new ArrayList<FlowFile>(ListenUDP.this.newFlowFiles));
                                ListenUDP.this.newFlowFiles.clear();
                                ListenUDP.this.flowFilesPerSessionQueue.add(batch);
                            }
                            session = sessionFactory.createSession();
                            consumer.setSession(session);
                            flowFileCount = 0;
                        }
                        if (processContext.getAvailableRelationships().size() > 0) {
                            consumer.process();
                            if (flowFileCount == ListenUDP.this.newFlowFiles.size()) {
                                // No new flow file was produced: back off for one reader interval and count
                                // the wait only if this session already holds flow files to push out.
                                Thread.sleep(readerIntervalMillis);
                                if (flowFileCount > 0) {
                                    numWaits++;
                                }
                            } else {
                                flowFileCount = ListenUDP.this.newFlowFiles.size();
                            }
                        } else {
                            logger.debug("Creating back pressure...no available destinations");
                            Thread.sleep(1000L);
                        }
                    } catch (IOException e) {
                        logger.error("Unable to fully process consumer {}", new Object[]{consumer}, e);
                    } catch (InterruptedException ignored) {
                        // Deliberately not re-asserted: this loop keeps running until 'stopping' is set and a
                        // pending interrupt would make every following sleep return immediately.
                    } finally {
                        // Runs exactly once per iteration, whatever the outcome above.
                        if (consumer.isConsumerFinished()) {
                            logger.info("Consumer {} was closed and is finished", new Object[]{consumer});
                            ListenUDP.this.consumerRef.set(null);
                            ListenUDP.this.disconnect();
                            if (!ListenUDP.this.stopping.get()) {
                                ListenUDP.this.resetChannelListener.set(true);
                            }
                        }
                    }
                }
                // Stopping: keep processing until the consumer reports that it is finished.
                UDPStreamConsumer remaining;
                while ((remaining = ListenUDP.this.consumerRef.get()) != null && !remaining.isConsumerFinished()) {
                    try {
                        remaining.process();
                    } catch (IOException ignored) {
                        // NOTE(review): presumably a failed process() leaves the consumer finished, which
                        // ends this loop - confirm against UDPStreamConsumer.
                    }
                }
                return new Tuple<ProcessSession, List<FlowFile>>(session, new ArrayList<FlowFile>(ListenUDP.this.newFlowFiles));
            }
        }));
    }

    /**
     * Shuts down the current channel listener, if there is one.
     *
     * <p>The call is a no-op when the lock cannot be acquired immediately or when no listener is set.
     * The listener is given 500 milliseconds to shut down and the field is then reset to {@code null}.</p>
     */
    public void disconnect() {
        if (!this.lock.tryLock()) {
            return;
        }
        try {
            final ChannelListener listener = this.channelListener;
            if (listener == null) {
                return;
            }
            getLogger().debug("Shutting down channel listener {}", new Object[]{listener});
            listener.shutdown(500L, TimeUnit.MILLISECONDS);
            this.channelListener = null;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Creates a new channel listener from the processor properties and registers the datagram channel
     * on the configured port.
     *
     * <p>Does nothing when the lock cannot be acquired immediately. The listener is published in
     * {@code channelListener} only after the datagram channel was added successfully; if setup fails the
     * half-initialized listener is shut down again so that it is not leaked.</p>
     *
     * @param processContext the context the processor properties are read from
     * @throws IOException if the configured network interface does not exist or has no address, or if the
     *         listener or its datagram channel cannot be set up
     */
    private void getChannelListener(ProcessContext processContext) throws IOException {
        if (!this.lock.tryLock()) {
            return;
        }
        try {
            final ProcessorLog logger = getLogger();
            logger.debug("Instantiating a new channel listener");
            final int port = processContext.getProperty(PORT).asInteger().intValue();
            final int bufferCount = processContext.getProperty(RECV_BUFFER_COUNT).asInteger().intValue();
            final Double bufferSize = processContext.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B);
            final Double udpBufferSize = processContext.getProperty(MAX_UDP_BUFFER).asDataSize(DataUnit.B);
            this.sendingHost = processContext.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
            final Integer sendingHostPort = processContext.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
            final String nicName = processContext.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
            final Double flowFileSizeTrigger = processContext.getProperty(FLOW_FILE_SIZE_TRIGGER).asDataSize(DataUnit.B);
            final int receiveTimeoutMillis = processContext.getProperty(RECV_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
            final boolean flowFilePerDatagram = processContext.getProperty(FLOW_FILE_PER_DATAGRAM).asBoolean().booleanValue();
            final int readerIntervalMillis = processContext.getProperty(CHANNEL_READER_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS).intValue();

            // Resolve the bind address before any listener resources are allocated.
            InetAddress nicAddress = null;
            if (nicName != null) {
                final NetworkInterface nic = NetworkInterface.getByName(nicName);
                if (nic == null) {
                    throw new IOException("Network interface '" + nicName + "' does not exist");
                }
                final Enumeration<InetAddress> addresses = nic.getInetAddresses();
                if (!addresses.hasMoreElements()) {
                    throw new IOException("Network interface '" + nicName + "' has no address bound to it");
                }
                nicAddress = addresses.nextElement();
            }

            // Every consumer created for the channel shares newFlowFiles and is published through consumerRef.
            final StreamConsumerFactory consumerFactory = new StreamConsumerFactory() {
                public StreamConsumer newInstance(String str) {
                    UDPStreamConsumer uDPStreamConsumer = new UDPStreamConsumer(str, ListenUDP.this.newFlowFiles, flowFileSizeTrigger.intValue(), ListenUDP.this.getLogger(), flowFilePerDatagram);
                    ListenUDP.this.consumerRef.set(uDPStreamConsumer);
                    return uDPStreamConsumer;
                }
            };

            // 2.147483647E9d equals Integer.MAX_VALUE.
            final ChannelListener listener = new ChannelListener(DEFAULT_LISTENING_THREADS, consumerFactory,
                    new BufferPool(bufferCount, bufferSize.intValue(), false, 2.147483647E9d), receiveTimeoutMillis, TimeUnit.MILLISECONDS, flowFilePerDatagram);
            boolean registered = false;
            try {
                listener.setChannelReaderSchedulingPeriod(readerIntervalMillis, TimeUnit.MILLISECONDS);
                listener.addDatagramChannel(nicAddress, port, udpBufferSize.intValue(), this.sendingHost, sendingHostPort);
                registered = true;
            } finally {
                if (!registered) {
                    // Setup failed: release the listener instead of leaving it behind unreferenced.
                    listener.shutdown(500L, TimeUnit.MILLISECONDS);
                }
            }
            this.channelListener = listener;
            logger.info("Registered service and initialized UDP socket listener. Now listening on port " + port + "...");
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Checks that Sending Host and Sending Host Port are either both set or both left blank.
     *
     * @param validationContext the context the two property values are read from
     * @return a collection holding one invalid result when exactly one of the two properties is set,
     *         otherwise an empty collection
     */
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> problems = new ArrayList<ValidationResult>();
        final boolean hostMissing = StringUtils.isBlank(validationContext.getProperty(SENDING_HOST).getValue());
        final boolean portMissing = StringUtils.isBlank(validationContext.getProperty(SENDING_HOST_PORT).getValue());
        if (hostMissing && !portMissing) {
            problems.add(new ValidationResult.Builder()
                    .subject(SENDING_HOST.getName())
                    .valid(false)
                    .explanation("Must specify Sending Host when specifying Sending Host Port")
                    .build());
        } else if (portMissing && !hostMissing) {
            problems.add(new ValidationResult.Builder()
                    .subject(SENDING_HOST_PORT.getName())
                    .valid(false)
                    .explanation("Must specify Sending Host Port when specifying Sending Host")
                    .build());
        }
        return problems;
    }

    /**
     * Remembers the session factory for the consumer task, recreates the channel listener when a reset
     * was requested, and transfers at most one queued batch of flow files.
     *
     * @param processContext the context handed to {@code getChannelListener} when a reset is performed
     * @param processSessionFactory stored in {@code sessionFactoryRef} if none is stored yet
     * @throws ProcessException declared by the processor contract
     */
    public void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory) throws ProcessException {
        final ProcessorLog log = getLogger();
        this.sessionFactoryRef.compareAndSet(null, processSessionFactory);

        // The request is consumed here even when the processor is stopping.
        final boolean resetRequested = this.resetChannelListener.getAndSet(false);
        if (resetRequested && !this.stopping.get()) {
            try {
                getChannelListener(processContext);
            } catch (IOException failure) {
                log.error("Tried to reset Channel Listener and failed due to:", failure);
                // Raise the request again so that the next trigger retries.
                this.resetChannelListener.set(true);
            }
        }

        transferFlowFiles();
    }

    /**
     * Takes one batch from {@code flowFilesPerSessionQueue}, waiting up to 100 milliseconds for it, and
     * transfers its flow files to {@link #RELATIONSHIP_SUCCESS} within the session stored in the batch.
     *
     * <p>For every flow file a provenance receive event is reported, using the sending host (or
     * {@code "Unknown"} when none is configured) as the source. The session is committed on success and
     * rolled back on any failure, after which the failure is rethrown.</p>
     *
     * @return {@code true} if a batch was transferred and committed; {@code false} if no batch became
     *         available or the wait was interrupted
     */
    private boolean transferFlowFiles() {
        final ProcessorLog logger = getLogger();
        final Tuple<ProcessSession, List<FlowFile>> batch;
        try {
            batch = this.flowFilesPerSessionQueue.poll(100L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently discarding it.
            Thread.currentThread().interrupt();
            return false;
        }
        if (batch == null) {
            return false;
        }

        final ProcessSession processSession = batch.getKey();
        final List<FlowFile> flowFiles = batch.getValue();
        final String source = this.sendingHost == null ? "Unknown" : this.sendingHost;
        try {
            for (FlowFile flowFile : flowFiles) {
                processSession.getProvenanceReporter().receive(flowFile, source);
                processSession.transfer(flowFile, RELATIONSHIP_SUCCESS);
            }
            logger.info("Transferred flow files {} to success", new Object[]{flowFiles});
            // Flow files are not expected on the input side; pass on non-empty ones and drop empty ones.
            for (FlowFile unexpected : processSession.get(10)) {
                if (unexpected == null) {
                    continue;
                }
                if (unexpected.getSize() > 0) {
                    processSession.transfer(unexpected, RELATIONSHIP_SUCCESS);
                    logger.warn("Found flow file in input queue (shouldn't have). Transferred flow file {} to success", new Object[]{unexpected});
                } else {
                    processSession.remove(unexpected);
                    logger.warn("Found empty flow file in input queue (shouldn't have). Removed flow file {}", new Object[]{unexpected});
                }
            }
            processSession.commit();
            return true;
        } catch (Throwable th) {
            processSession.rollback();
            logger.error("Failed to transfer flow files or commit session...rolled back", th);
            throw th;
        }
    }

    /**
     * Stops the consumer task and transfers whatever it had collected.
     *
     * <p>The {@code stopping} flag is raised before the channel listener is shut down, so the consumer
     * task cannot mistake the shutdown for a failure and request a listener reset. The method then waits
     * for the task to finish, queues its last batch (or commits its session when the batch is empty) and
     * drains {@code flowFilesPerSessionQueue} until no batch is left.</p>
     */
    @OnUnscheduled
    public void stopping() {
        getLogger().debug("Stopping Processor");
        this.stopping.set(true);
        disconnect();
        final Future<Tuple<ProcessSession, List<FlowFile>>> consumerFuture = this.consumerFutureRef.getAndSet(null);
        if (consumerFuture == null) {
            return;
        }

        boolean interrupted = false;
        try {
            final Tuple<ProcessSession, List<FlowFile>> lastBatch = consumerFuture.get();
            final ProcessSession lastSession = lastBatch.getKey();
            if (!lastBatch.getValue().isEmpty()) {
                getLogger().debug("Draining remaining flow Files when stopping");
                this.flowFilesPerSessionQueue.add(lastBatch);
            } else if (lastSession != null) {
                // The session is null when the task never got far enough to create one.
                lastSession.commit();
            }
        } catch (InterruptedException e) {
            // Re-asserted only after the drain loop below; doing it now would make the queue poll fail at once.
            interrupted = true;
            getLogger().error("Failure in cleaning up!", e);
        } catch (ExecutionException e) {
            getLogger().error("Failure in cleaning up!", e);
        }

        boolean transferred = true;
        while (transferred) {
            try {
                transferred = transferFlowFiles();
            } catch (Throwable th) {
                // The failed batch has already been taken off the queue, so the loop moves on to the next one.
                getLogger().error("Problem transferring cached flowfiles", th);
            }
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Forgets the session factory captured by {@code onTrigger}, so that the next call to
     * {@code onTrigger} stores the factory it is given.
     */
    @OnStopped
    public void stopped() {
        this.sessionFactoryRef.set(null);
    }

    // Builds the relationship set, every property descriptor, the set of local network interface names
    // and finally the list of supported properties. Statement order matters: each descriptor must be
    // assigned before it is added to the list.
    static {
        final Set<Relationship> relationshipSet = new HashSet<Relationship>();
        relationshipSet.add(RELATIONSHIP_SUCCESS);
        relationships = Collections.unmodifiableSet(relationshipSet);
        PORT = new PropertyDescriptor.Builder().name("Port").description("Port to listen on. Must be known by senders of Datagrams.").addValidator(StandardValidators.PORT_VALIDATOR).required(true).build();
        RECV_TIMEOUT = new PropertyDescriptor.Builder().name("Receive Timeout").description("The time out period when waiting to receive data from the socket. Specify units.").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("5 secs").required(true).build();
        FLOW_FILE_PER_DATAGRAM = new PropertyDescriptor.Builder().name("FlowFile Per Datagram").description("Determines if this processor emits each datagram as a FlowFile, or if multiple datagrams can be placed in a single FlowFile.").allowableValues(new String[]{"true", "false"}).defaultValue("false").required(true).build();
        MAX_BUFFER_SIZE = new PropertyDescriptor.Builder().name("Max Buffer Size").description("Determines the size each receive buffer may be").addValidator(StandardValidators.DATA_SIZE_VALIDATOR).defaultValue("1 MB").required(true).build();
        FLOW_FILE_SIZE_TRIGGER = new PropertyDescriptor.Builder().name("FlowFile Size Trigger").description("Determines the (almost) upper bound size at which a flow file would be generated.").addValidator(StandardValidators.DATA_SIZE_VALIDATOR).defaultValue("1 MB").required(true).build();
        MAX_UDP_BUFFER = new PropertyDescriptor.Builder().name("Max size of UDP Buffer").description("The maximum UDP buffer size that should be used. This is a suggestion to the Operating System to indicate how big the udp socket buffer should be.").addValidator(StandardValidators.DATA_SIZE_VALIDATOR).defaultValue("1 MB").required(true).build();
        RECV_BUFFER_COUNT = new PropertyDescriptor.Builder().name("Receive Buffer Count").description("Number of receiving buffers to be used to accept data from the socket. Higher numbers means more ram is allocated but can allow better throughput.").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("4").required(true).build();
        CHANNEL_READER_PERIOD = new PropertyDescriptor.Builder().name("Channel Reader Interval").description("Scheduling interval for each read channel.").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("50 ms").required(true).build();
        FLOW_FILES_PER_SESSION = new PropertyDescriptor.Builder().name("FlowFiles Per Session").description("The number of flow files per session.").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("10").build();
        SENDING_HOST = new PropertyDescriptor.Builder().name("Sending Host").description("IP, or name, of a remote host. Only Datagrams from the specified Sending Host Port and this host will be accepted. Improves Performance. May be a system property or an environment variable.").addValidator(new HostValidator()).expressionLanguageSupported(true).build();
        SENDING_HOST_PORT = new PropertyDescriptor.Builder().name("Sending Host Port").description("Port being used by remote host to send Datagrams. Only Datagrams from the specified Sending Host and this port will be accepted. Improves Performance. May be a system property or an environment variable.").addValidator(StandardValidators.PORT_VALIDATOR).expressionLanguageSupported(true).build();
        interfaceSet = new HashSet<String>();
        try {
            final Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
            // Older JREs return null instead of an empty enumeration when no interface is found.
            while (networkInterfaces != null && networkInterfaces.hasMoreElements()) {
                interfaceSet.add(networkInterfaces.nextElement().getName());
            }
        } catch (SocketException ignored) {
            // Best effort: the interface names stay empty (or partial) and the validator below then
            // rejects any value that is not in the set.
        }
        NETWORK_INTF_NAME = new PropertyDescriptor.Builder().name("Local Network Interface").description("The name of a local network interface to be used to restrict listening for UDP Datagrams to a specific LAN.May be a system property or an environment variable.").addValidator(new Validator() {
            // Accepts the value if it, or the result of evaluating it as an expression, names a known
            // local network interface.
            // NOTE(review): the input is lower-cased before the lookup but interface names are stored as
            // reported by the JRE, so a name containing upper-case characters would never match - confirm
            // whether that is intended.
            public ValidationResult validate(String str, String str2, ValidationContext validationContext) {
                final ValidationResult valid = new ValidationResult.Builder().subject("Local Network Interface").valid(true).input(str2).build();
                if (ListenUDP.interfaceSet.contains(str2.toLowerCase())) {
                    return valid;
                }
                String explanation;
                try {
                    final String evaluated = validationContext.newExpressionLanguageCompiler().compile(str2).evaluate();
                    if (ListenUDP.interfaceSet.contains(evaluated.toLowerCase())) {
                        return valid;
                    }
                    explanation = evaluated + " is not a valid network name. Valid names are " + ListenUDP.interfaceSet.toString();
                } catch (IllegalArgumentException e) {
                    explanation = "Not a valid AttributeExpression: " + e.getMessage();
                }
                return new ValidationResult.Builder().subject("Local Network Interface").valid(false).input(str2).explanation(explanation).build();
            }
        }).expressionLanguageSupported(true).build();
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(SENDING_HOST);
        descriptors.add(SENDING_HOST_PORT);
        descriptors.add(NETWORK_INTF_NAME);
        descriptors.add(CHANNEL_READER_PERIOD);
        descriptors.add(FLOW_FILE_SIZE_TRIGGER);
        descriptors.add(MAX_BUFFER_SIZE);
        descriptors.add(MAX_UDP_BUFFER);
        descriptors.add(PORT);
        descriptors.add(RECV_BUFFER_COUNT);
        descriptors.add(FLOW_FILES_PER_SESSION);
        descriptors.add(RECV_TIMEOUT);
        descriptors.add(FLOW_FILE_PER_DATAGRAM);
        properties = Collections.unmodifiableList(descriptors);
    }
}
