/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.io.nio.BufferPool;
import org.apache.nifi.io.nio.ChannelListener;
import org.apache.nifi.io.nio.consumer.StreamConsumer;
import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.UDPStreamConsumer;
import org.apache.nifi.util.Tuple;

@TriggerWhenEmpty
@Tags(value={"ingest", "udp", "listen", "source"})
@CapabilityDescription(value="Listens for Datagram Packets on a given port and concatenates the contents of those packets together generating flow files")
public class ListenUDP
extends AbstractSessionFactoryProcessor {
    // Relationship set and property list returned by getRelationships() and
    // getSupportedPropertyDescriptors(); both are populated in the static initializer.
    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> properties;
    // The only relationship of this processor; flow files are transferred to it in transferFlowFiles().
    public static final Relationship RELATIONSHIP_SUCCESS;
    // Processor properties. Names, descriptions, defaults and validators are assigned in the static initializer.
    public static final PropertyDescriptor PORT;
    public static final PropertyDescriptor RECV_TIMEOUT;
    public static final PropertyDescriptor FLOW_FILE_PER_DATAGRAM;
    public static final PropertyDescriptor MAX_BUFFER_SIZE;
    public static final PropertyDescriptor FLOW_FILE_SIZE_TRIGGER;
    public static final PropertyDescriptor MAX_UDP_BUFFER;
    public static final PropertyDescriptor RECV_BUFFER_COUNT;
    public static final PropertyDescriptor CHANNEL_READER_PERIOD;
    public static final PropertyDescriptor FLOW_FILES_PER_SESSION;
    public static final PropertyDescriptor SENDING_HOST;
    public static final PropertyDescriptor SENDING_HOST_PORT;
    // Names of the network interfaces enumerated when this class is loaded; consulted by the NETWORK_INTF_NAME validator.
    private static final Set<String> interfaceSet;
    public static final PropertyDescriptor NETWORK_INTF_NAME;
    // Presumably the thread count handed to ChannelListener - TODO confirm against its constructor.
    public static final int DEFAULT_LISTENING_THREADS = 2;
    // Taken with tryLock() by disconnect() and getChannelListener() around every change to channelListener.
    private final Lock lock = new ReentrantLock();
    // Current listener; created in getChannelListener() and reset to null in disconnect().
    private volatile ChannelListener channelListener = null;
    // Hand-off queue: the consumer task adds (session, flow files) batches, transferFlowFiles() polls them.
    private final BlockingQueue<Tuple<ProcessSession, List<FlowFile>>> flowFilesPerSessionQueue = new LinkedBlockingQueue<Tuple<ProcessSession, List<FlowFile>>>();
    // List passed to every UDPStreamConsumer; the consumer task snapshots and clears it when it queues a batch.
    // Presumably the consumer appends the flow files it creates here - verify against UDPStreamConsumer.
    private final List<FlowFile> newFlowFiles = new ArrayList<FlowFile>();
    // Most recently created consumer (set by the StreamConsumerFactory); cleared once it reports finished.
    private final AtomicReference<UDPStreamConsumer> consumerRef = new AtomicReference();
    // False after scheduling, true once stopping() has run; ends the consumer task's main loop.
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    // Session factory captured on the first onTrigger() call; cleared in stopped().
    private final AtomicReference<ProcessSessionFactory> sessionFactoryRef = new AtomicReference();
    // Single thread that runs the consumer task submitted when the processor is scheduled.
    private final ExecutorService consumerExecutorService = Executors.newSingleThreadExecutor();
    // Future of the running consumer task; stopping() takes it to collect the final batch.
    private final AtomicReference<Future<Tuple<ProcessSession, List<FlowFile>>>> consumerFutureRef = new AtomicReference();
    // Set by the consumer task when a consumer finished while not stopping; onTrigger() then rebuilds the listener.
    private final AtomicBoolean resetChannelListener = new AtomicBoolean(false);
    // Evaluated SENDING_HOST value; used as the provenance source in transferFlowFiles().
    private volatile String sendingHost;

    /**
     * Returns the relationships this processor can route flow files to.
     *
     * @return the unmodifiable relationship set built in the static initializer
     */
    public Set<Relationship> getRelationships() {
        return ListenUDP.relationships;
    }

    /**
     * Returns the properties this processor supports.
     *
     * @return the unmodifiable property list built in the static initializer
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return ListenUDP.properties;
    }

    @OnScheduled
    public void initializeChannelListenerAndConsumerProcessing(final ProcessContext context) throws IOException {
        this.getChannelListener(context);
        this.stopping.set(false);
        Future<Tuple<ProcessSession, List<FlowFile>>> consumerFuture = this.consumerExecutorService.submit(new Callable<Tuple<ProcessSession, List<FlowFile>>>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             * Loose catch block
             */
            @Override
            public Tuple<ProcessSession, List<FlowFile>> call() {
                Tuple flowFilesPerSession;
                UDPStreamConsumer consumer;
                int maxFlowFilesPerSession = context.getProperty(FLOW_FILES_PER_SESSION).asInteger();
                long channelReaderIntervalMSecs = context.getProperty(CHANNEL_READER_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
                int maxWaits = (int)(channelReaderIntervalMSecs <= 1000L ? 5000L / channelReaderIntervalMSecs : 1L);
                ProcessorLog logger = ListenUDP.this.getLogger();
                int flowFileCount = maxFlowFilesPerSession;
                ProcessSession session = null;
                int numWaits = 0;
                while (!ListenUDP.this.stopping.get()) {
                    consumer = (UDPStreamConsumer)ListenUDP.this.consumerRef.get();
                    if (consumer == null || ListenUDP.this.sessionFactoryRef.get() == null) {
                        try {
                            Thread.sleep(100L);
                        }
                        catch (InterruptedException interruptedException) {}
                        continue;
                    }
                    try {
                        if (flowFileCount == maxFlowFilesPerSession || numWaits == maxWaits) {
                            logger.debug("Have waited {} times", new Object[]{numWaits});
                            numWaits = 0;
                            if (session != null) {
                                flowFilesPerSession = new Tuple((Object)session, new ArrayList(ListenUDP.this.newFlowFiles));
                                ListenUDP.this.newFlowFiles.clear();
                                ListenUDP.this.flowFilesPerSessionQueue.add(flowFilesPerSession);
                            }
                            session = ((ProcessSessionFactory)ListenUDP.this.sessionFactoryRef.get()).createSession();
                            consumer.setSession(session);
                            flowFileCount = 0;
                        }
                        if (context.getAvailableRelationships().size() > 0) {
                            consumer.process();
                            if (flowFileCount == ListenUDP.this.newFlowFiles.size()) {
                                Thread.sleep(channelReaderIntervalMSecs);
                                if (flowFileCount > 0) {
                                    ++numWaits;
                                }
                            } else {
                                flowFileCount = ListenUDP.this.newFlowFiles.size();
                            }
                        } else {
                            logger.debug("Creating back pressure...no available destinations");
                            Thread.sleep(1000L);
                        }
                        if (!consumer.isConsumerFinished()) continue;
                    }
                    catch (IOException ioe222222) {
                        logger.error("Unable to fully process consumer {}", new Object[]{consumer}, (Throwable)ioe222222);
                        if (!consumer.isConsumerFinished()) continue;
                        logger.info("Consumer {} was closed and is finished", new Object[]{consumer});
                        ListenUDP.this.consumerRef.set(null);
                        ListenUDP.this.disconnect();
                        if (ListenUDP.this.stopping.get()) continue;
                        ListenUDP.this.resetChannelListener.set(true);
                        continue;
                    }
                    catch (InterruptedException ioe222222) {
                        if (!consumer.isConsumerFinished()) continue;
                        logger.info("Consumer {} was closed and is finished", new Object[]{consumer});
                        ListenUDP.this.consumerRef.set(null);
                        ListenUDP.this.disconnect();
                        if (ListenUDP.this.stopping.get()) continue;
                        ListenUDP.this.resetChannelListener.set(true);
                        continue;
                        {
                            catch (Throwable throwable) {
                                if (consumer.isConsumerFinished()) {
                                    logger.info("Consumer {} was closed and is finished", new Object[]{consumer});
                                    ListenUDP.this.consumerRef.set(null);
                                    ListenUDP.this.disconnect();
                                    if (!ListenUDP.this.stopping.get()) {
                                        ListenUDP.this.resetChannelListener.set(true);
                                    }
                                }
                                throw throwable;
                            }
                        }
                    }
                    logger.info("Consumer {} was closed and is finished", new Object[]{consumer});
                    ListenUDP.this.consumerRef.set(null);
                    ListenUDP.this.disconnect();
                    if (ListenUDP.this.stopping.get()) continue;
                    ListenUDP.this.resetChannelListener.set(true);
                }
                while ((consumer = (UDPStreamConsumer)ListenUDP.this.consumerRef.get()) != null && !consumer.isConsumerFinished()) {
                    try {
                        consumer.process();
                    }
                    catch (IOException ioe222222) {}
                }
                flowFilesPerSession = new Tuple(session, new ArrayList(ListenUDP.this.newFlowFiles));
                return flowFilesPerSession;
            }
        });
        this.consumerFutureRef.set(consumerFuture);
    }

    /**
     * Shuts down the current channel listener, if there is one, and clears the reference.
     * Returns without doing anything when the lock cannot be acquired immediately.
     */
    private void disconnect() {
        final boolean acquired = this.lock.tryLock();
        if (!acquired) {
            return;
        }
        try {
            final ChannelListener listener = this.channelListener;
            if (listener != null) {
                this.getLogger().debug("Shutting down channel listener {}", new Object[]{listener});
                listener.shutdown(500L, TimeUnit.MILLISECONDS);
                this.channelListener = null;
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Builds a new {@link ChannelListener} from the processor configuration, stores it in
     * {@code channelListener} and registers a datagram channel on the configured port.
     * Returns without doing anything when the lock cannot be acquired immediately.
     *
     * @param context supplies the configured property values
     * @throws IOException if the configured network interface does not exist or has no address,
     *                     or if setting up the listener or its datagram channel fails
     */
    private void getChannelListener(ProcessContext context) throws IOException {
        if (!this.lock.tryLock()) {
            return;
        }
        try {
            ProcessorLog logger = this.getLogger();
            logger.debug("Instantiating a new channel listener");
            int port = context.getProperty(PORT).asInteger();
            int bufferCount = context.getProperty(RECV_BUFFER_COUNT).asInteger();
            Double bufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B);
            Double rcvBufferSize = context.getProperty(MAX_UDP_BUFFER).asDataSize(DataUnit.B);
            this.sendingHost = context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
            Integer sendingHostPort = context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
            String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
            final Double flowFileSizeTrigger = context.getProperty(FLOW_FILE_SIZE_TRIGGER).asDataSize(DataUnit.B);
            int recvTimeoutMS = context.getProperty(RECV_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
            final boolean flowFilePerDatagram = context.getProperty(FLOW_FILE_PER_DATAGRAM).asBoolean();
            // Every consumer the listener creates is published through consumerRef so that the
            // background consumer task can drive it.
            StreamConsumerFactory consumerFactory = new StreamConsumerFactory(){

                public StreamConsumer newInstance(String streamId) {
                    UDPStreamConsumer consumer = new UDPStreamConsumer(streamId, ListenUDP.this.newFlowFiles, flowFileSizeTrigger.intValue(), ListenUDP.this.getLogger(), flowFilePerDatagram);
                    ListenUDP.this.consumerRef.set(consumer);
                    return consumer;
                }
            };
            int readerMilliseconds = context.getProperty(CHANNEL_READER_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
            BufferPool bufferPool = new BufferPool(bufferCount, bufferSize.intValue(), false, 2.147483647E9);
            this.channelListener = new ChannelListener(DEFAULT_LISTENING_THREADS, consumerFactory, bufferPool, recvTimeoutMS, TimeUnit.MILLISECONDS, flowFilePerDatagram);
            this.channelListener.setChannelReaderSchedulingPeriod((long)readerMilliseconds, TimeUnit.MILLISECONDS);
            // A null address is passed through when no local interface is configured.
            InetAddress nicIPAddress = null;
            if (null != nicIPAddressStr) {
                nicIPAddress = resolveInterfaceAddress(nicIPAddressStr);
            }
            this.channelListener.addDatagramChannel(nicIPAddress, port, rcvBufferSize.intValue(), this.sendingHost, sendingHostPort);
            logger.info("Registered service and initialized UDP socket listener. Now listening on port " + port + "...");
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Returns the first address of the named network interface. A missing interface or an
     * interface without addresses is reported as an {@link IOException}, so that callers'
     * existing I/O error handling applies instead of a NullPointerException or
     * NoSuchElementException.
     *
     * @param interfaceName name of the local network interface
     * @return the first address enumerated for that interface
     * @throws IOException if the interface does not exist, has no address, or cannot be queried
     */
    private static InetAddress resolveInterfaceAddress(String interfaceName) throws IOException {
        final NetworkInterface netIF = NetworkInterface.getByName(interfaceName);
        if (netIF == null) {
            throw new IOException("Network interface '" + interfaceName + "' was not found");
        }
        final Enumeration<InetAddress> addresses = netIF.getInetAddresses();
        if (!addresses.hasMoreElements()) {
            throw new IOException("Network interface '" + interfaceName + "' has no address assigned");
        }
        return addresses.nextElement();
    }

    /**
     * Cross-checks the Sending Host and Sending Host Port properties: either both are set or
     * neither is.
     *
     * @param validationContext supplies the configured property values
     * @return a list holding one invalid result when exactly one of the two properties is blank,
     *         otherwise an empty list
     */
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> problems = new ArrayList<ValidationResult>();
        final boolean hostMissing = StringUtils.isBlank(validationContext.getProperty(SENDING_HOST).getValue());
        final boolean portMissing = StringUtils.isBlank(validationContext.getProperty(SENDING_HOST_PORT).getValue());
        if (hostMissing && !portMissing) {
            final ValidationResult missingHost = new ValidationResult.Builder()
                    .subject(SENDING_HOST.getName())
                    .valid(false)
                    .explanation("Must specify Sending Host when specifying Sending Host Port")
                    .build();
            problems.add(missingHost);
        } else if (portMissing && !hostMissing) {
            final ValidationResult missingPort = new ValidationResult.Builder()
                    .subject(SENDING_HOST_PORT.getName())
                    .valid(false)
                    .explanation("Must specify Sending Host Port when specifying Sending Host")
                    .build();
            problems.add(missingPort);
        }
        return problems;
    }

    /**
     * Records the session factory on the first call, rebuilds the channel listener when the
     * consumer task requested a reset, and then transfers at most one queued batch of flow files.
     *
     * @param context        supplies the configured property values
     * @param sessionFactory factory the consumer task uses to create sessions
     * @throws ProcessException declared by the processor contract
     */
    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        // Only the first factory seen is kept; stopped() clears it again.
        this.sessionFactoryRef.compareAndSet(null, sessionFactory);
        final boolean resetRequested = this.resetChannelListener.getAndSet(false);
        if (resetRequested && !this.stopping.get()) {
            try {
                this.getChannelListener(context);
            } catch (IOException e) {
                this.getLogger().error("Tried to reset Channel Listener and failed due to:", e);
                // Re-arm the flag so that the next trigger retries.
                this.resetChannelListener.set(true);
            }
        }
        this.transferFlowFiles();
    }

    /**
     * Takes at most one (session, flow files) batch from {@code flowFilesPerSessionQueue},
     * waiting up to 100 ms for one, reports a provenance receive event for each flow file,
     * transfers them to {@link #RELATIONSHIP_SUCCESS} and commits the session.
     *
     * <p>Up to ten flow files found on the session's input are handled as well: non-empty ones
     * are transferred to success, empty ones are removed, and a warning is logged for each.
     *
     * <p>On any failure the session is rolled back, the error is logged and the failure is rethrown.
     *
     * @return {@code true} if a batch was taken and committed, {@code false} if the queue
     *         stayed empty or the wait was interrupted
     */
    private boolean transferFlowFiles() {
        final ProcessorLog logger = this.getLogger();
        Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = null;
        try {
            flowFilesPerSession = this.flowFilesPerSessionQueue.poll(100L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ignored) {
            // Treated like a timeout: nothing was taken from the queue.
        }
        if (flowFilesPerSession == null) {
            return false;
        }
        final ProcessSession session = flowFilesPerSession.getKey();
        final List<FlowFile> flowFiles = flowFilesPerSession.getValue();
        final String sourceSystem = this.sendingHost == null ? "Unknown" : this.sendingHost;
        try {
            for (final FlowFile flowFile : flowFiles) {
                session.getProvenanceReporter().receive(flowFile, sourceSystem);
                session.transfer(flowFile, RELATIONSHIP_SUCCESS);
            }
            logger.info("Transferred flow files {} to success", new Object[]{flowFiles});
            final List<FlowFile> existingFlowFiles = session.get(10);
            for (final FlowFile existingFlowFile : existingFlowFiles) {
                if (existingFlowFile == null) {
                    continue;
                }
                if (existingFlowFile.getSize() > 0L) {
                    session.transfer(existingFlowFile, RELATIONSHIP_SUCCESS);
                    logger.warn("Found flow file in input queue (shouldn't have). Transferred flow file {} to success", new Object[]{existingFlowFile});
                } else {
                    session.remove(existingFlowFile);
                    logger.warn("Found empty flow file in input queue (shouldn't have). Removed flow file {}", new Object[]{existingFlowFile});
                }
            }
            session.commit();
            return true;
        } catch (Throwable t) {
            session.rollback();
            logger.error("Failed to transfer flow files or commit session...rolled back", t);
            throw t;
        }
    }

    /**
     * Stops the processor: shuts down the channel listener, signals the consumer task to finish,
     * waits for its final (session, flow files) result and drains every queued batch.
     *
     * <p>The task's session is {@code null} when no session was ever created, which happens when
     * the processor is stopped before a consumer and a session factory were both available. In
     * that case there is nothing to commit, so the session is only committed when it exists.
     */
    @OnUnscheduled
    public void stopping() {
        this.getLogger().debug("Stopping Processor");
        this.disconnect();
        this.stopping.set(true);
        final Future<Tuple<ProcessSession, List<FlowFile>>> future = this.consumerFutureRef.getAndSet(null);
        if (future == null) {
            return;
        }
        try {
            final Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = future.get();
            final ProcessSession session = flowFilesPerSession.getKey();
            if (!flowFilesPerSession.getValue().isEmpty()) {
                this.getLogger().debug("Draining remaining flow Files when stopping");
                this.flowFilesPerSessionQueue.add(flowFilesPerSession);
            } else if (session != null) {
                // Close out the session that ended up without flow files.
                session.commit();
            }
        } catch (InterruptedException | ExecutionException e) {
            this.getLogger().error("Failure in cleaning up!", e);
        }
        // Drain until a poll comes back empty. A failed batch has already been removed from the
        // queue, so retrying after an error moves on to the next batch.
        boolean moreFiles = true;
        while (moreFiles) {
            try {
                moreFiles = this.transferFlowFiles();
            } catch (Throwable t) {
                this.getLogger().error("Problem transferring cached flowfiles", t);
            }
        }
    }

    /**
     * Forgets the session factory captured by onTrigger so that the next run records a new one.
     */
    @OnStopped
    public void stopped() {
        sessionFactoryRef.set(null);
    }

    // Initializes the relationship, every property descriptor, the set of local network
    // interface names, and the ordered property list returned by getSupportedPropertyDescriptors().
    static {
        RELATIONSHIP_SUCCESS = new Relationship.Builder()
                .name("success")
                .description("Connection which contains concatenated Datagram Packets")
                .build();
        Set<Relationship> rels = new HashSet<Relationship>();
        rels.add(RELATIONSHIP_SUCCESS);
        relationships = Collections.unmodifiableSet(rels);

        PORT = new PropertyDescriptor.Builder()
                .name("Port")
                .description("Port to listen on. Must be known by senders of Datagrams.")
                .addValidator(StandardValidators.PORT_VALIDATOR)
                .required(true)
                .build();
        RECV_TIMEOUT = new PropertyDescriptor.Builder()
                .name("Receive Timeout")
                .description("The time out period when waiting to receive data from the socket. Specify units.")
                .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
                .defaultValue("5 secs")
                .required(true)
                .build();
        FLOW_FILE_PER_DATAGRAM = new PropertyDescriptor.Builder()
                .name("FlowFile Per Datagram")
                .description("Determines if this processor emits each datagram as a FlowFile, or if multiple datagrams can be placed in a single FlowFile.")
                .allowableValues(new String[]{"true", "false"})
                .defaultValue("false")
                .required(true)
                .build();
        MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
                .name("Max Buffer Size")
                .description("Determines the size each receive buffer may be")
                .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
                .defaultValue("1 MB")
                .required(true)
                .build();
        FLOW_FILE_SIZE_TRIGGER = new PropertyDescriptor.Builder()
                .name("FlowFile Size Trigger")
                .description("Determines the (almost) upper bound size at which a flow file would be generated.")
                .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
                .defaultValue("1 MB")
                .required(true)
                .build();
        MAX_UDP_BUFFER = new PropertyDescriptor.Builder()
                .name("Max size of UDP Buffer")
                .description("The maximum UDP buffer size that should be used. This is a suggestion to the Operating System to indicate how big the udp socket buffer should be.")
                .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
                .defaultValue("1 MB")
                .required(true)
                .build();
        RECV_BUFFER_COUNT = new PropertyDescriptor.Builder()
                .name("Receive Buffer Count")
                .description("Number of receiving buffers to be used to accept data from the socket. Higher numbers means more ram is allocated but can allow better throughput.")
                .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
                .defaultValue("4")
                .required(true)
                .build();
        CHANNEL_READER_PERIOD = new PropertyDescriptor.Builder()
                .name("Channel Reader Interval")
                .description("Scheduling interval for each read channel.")
                .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
                .defaultValue("50 ms")
                .required(true)
                .build();
        FLOW_FILES_PER_SESSION = new PropertyDescriptor.Builder()
                .name("FlowFiles Per Session")
                .description("The number of flow files per session.")
                .required(true)
                .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
                .defaultValue("10")
                .build();
        SENDING_HOST = new PropertyDescriptor.Builder()
                .name("Sending Host")
                .description("IP, or name, of a remote host. Only Datagrams from the specified Sending Host Port and this host will be accepted. Improves Performance. May be a system property or an environment variable.")
                .addValidator((Validator)new HostValidator())
                .expressionLanguageSupported(true)
                .build();
        SENDING_HOST_PORT = new PropertyDescriptor.Builder()
                .name("Sending Host Port")
                .description("Port being used by remote host to send Datagrams. Only Datagrams from the specified Sending Host and this port will be accepted. Improves Performance. May be a system property or an environment variable.")
                .addValidator(StandardValidators.PORT_VALIDATOR)
                .expressionLanguageSupported(true)
                .build();

        // Collect the interface names once, at class-load time.
        interfaceSet = new HashSet<String>();
        try {
            Enumeration<NetworkInterface> interfaceEnum = NetworkInterface.getNetworkInterfaces();
            // Guard against a null enumeration so that class initialization cannot fail here.
            while (interfaceEnum != null && interfaceEnum.hasMoreElements()) {
                NetworkInterface ifc = interfaceEnum.nextElement();
                interfaceSet.add(ifc.getName());
            }
        }
        catch (SocketException ignored) {
            // Best effort: the set stays empty or partial, and the validator below then
            // rejects names it does not contain.
        }
        NETWORK_INTF_NAME = new PropertyDescriptor.Builder()
                .name("Local Network Interface")
                .description("The name of a local network interface to be used to restrict listening for UDP Datagrams to a specific LAN.May be a system property or an environment variable.")
                .addValidator(new Validator(){

                    // Accepts the input when it names a known interface, either literally or after
                    // being evaluated as an expression; otherwise explains why it was rejected.
                    public ValidationResult validate(String subject, String input, ValidationContext context) {
                        if (this.isKnownInterface(input)) {
                            return new ValidationResult.Builder().subject("Local Network Interface").valid(true).input(input).build();
                        }
                        String message;
                        try {
                            AttributeExpression ae = context.newExpressionLanguageCompiler().compile(input);
                            String realValue = ae.evaluate();
                            if (this.isKnownInterface(realValue)) {
                                return new ValidationResult.Builder().subject("Local Network Interface").valid(true).input(input).build();
                            }
                            message = realValue + " is not a valid network name. Valid names are " + interfaceSet.toString();
                        }
                        catch (IllegalArgumentException e) {
                            message = "Not a valid AttributeExpression: " + e.getMessage();
                        }
                        return new ValidationResult.Builder().subject("Local Network Interface").valid(false).input(input).explanation(message).build();
                    }

                    // Names are stored exactly as the operating system reports them, so the exact
                    // name is tried first. The lower-cased lookup keeps accepting everything the
                    // previous lower-case-only comparison accepted.
                    private boolean isKnownInterface(String name) {
                        if (name == null) {
                            return false;
                        }
                        return interfaceSet.contains(name) || interfaceSet.contains(name.toLowerCase());
                    }
                })
                .expressionLanguageSupported(true)
                .build();

        // The order below is the order of the list returned by getSupportedPropertyDescriptors().
        List<PropertyDescriptor> props = new ArrayList<PropertyDescriptor>();
        props.add(SENDING_HOST);
        props.add(SENDING_HOST_PORT);
        props.add(NETWORK_INTF_NAME);
        props.add(CHANNEL_READER_PERIOD);
        props.add(FLOW_FILE_SIZE_TRIGGER);
        props.add(MAX_BUFFER_SIZE);
        props.add(MAX_UDP_BUFFER);
        props.add(PORT);
        props.add(RECV_BUFFER_COUNT);
        props.add(FLOW_FILES_PER_SESSION);
        props.add(RECV_TIMEOUT);
        props.add(FLOW_FILE_PER_DATAGRAM);
        properties = Collections.unmodifiableList(props);
    }

    /**
     * Validates that a property value names a resolvable host.
     *
     * <p>The raw input is resolved first. If that fails and a validation context is available,
     * the input is evaluated as an expression and the non-blank result is resolved instead, so
     * that a property declared with expression language support is not rejected merely for
     * containing an expression.
     */
    public static class HostValidator
    implements Validator {
        /**
         * @param subject name of the property being validated
         * @param input   configured value: a host name, an IP address, or an expression yielding one
         * @param context used to compile and evaluate {@code input} as an expression; may be null,
         *                in which case only the raw input is checked
         * @return a valid result when the input or its evaluated value resolves, otherwise an
         *         invalid result whose explanation carries the resolution failure
         */
        public ValidationResult validate(String subject, String input, ValidationContext context) {
            UnknownHostException failure;
            try {
                InetAddress.getByName(input);
                return new ValidationResult.Builder().subject(subject).valid(true).input(input).build();
            }
            catch (UnknownHostException e) {
                failure = e;
            }
            if (context != null) {
                try {
                    AttributeExpression expression = context.newExpressionLanguageCompiler().compile(input);
                    String evaluated = expression.evaluate();
                    // Skip blank results and results equal to the input that already failed.
                    if (StringUtils.isNotBlank(evaluated) && !evaluated.equals(input)) {
                        InetAddress.getByName(evaluated);
                        return new ValidationResult.Builder().subject(subject).valid(true).input(input).build();
                    }
                }
                catch (UnknownHostException e) {
                    failure = e;
                }
                catch (RuntimeException ignored) {
                    // The input could not be compiled or evaluated as an expression; keep the
                    // original verdict rather than letting the validator itself fail.
                }
            }
            return new ValidationResult.Builder().subject(subject).valid(false).input(input).explanation("Unknown host: " + failure).build();
        }
    }
}

