package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.event.transport.EventException;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.configuration.BufferAllocator;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.EventBatcher;
import org.apache.nifi.processor.util.listen.FlowFileEventBatch;
import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;

/**
 * Processor that accepts TCP connections, splits the incoming byte stream on a configurable
 * message delimiter and emits the resulting messages as FlowFiles on {@link #REL_SUCCESS}.
 *
 * <p>Lifecycle as implemented in this class:
 * <ul>
 *   <li>{@link #onScheduled(ProcessContext)} reads the configuration, creates the message queues
 *       and asks a {@link ByteArrayMessageNettyEventServerFactory} for an {@link EventServer};</li>
 *   <li>{@link #onTrigger(ProcessContext, ProcessSession)} drains queued messages into batches
 *       (one batch per sender) and transfers each non-empty batch to success;</li>
 *   <li>{@link #stopped()} shuts the server down and discards the batcher.</li>
 * </ul>
 */
@CapabilityDescription("Listens for incoming TCP connections and reads data from each connection using a line separator as the message demarcator. The default behavior is for each message to produce a single FlowFile, however this can be controlled by increasing the Batch Size to a larger value for higher throughput. The Receive Buffer Size must be set as large as the largest messages expected to be received, meaning if every 100kb there is a line separator, then the Receive Buffer Size must be greater than 100kb.")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@SupportsBatching
@Tags({"listen", "tcp", "tls", "ssl"})
@WritesAttributes({
        @WritesAttribute(attribute = "tcp.sender", description = "The sending host of the messages."),
        @WritesAttribute(attribute = "tcp.port", description = "The sending port the messages were received.")
})
public class ListenTCP extends AbstractProcessor {

    /** Optional controller service supplying the SSL context; when set, the listener uses TLS. */
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
            .name("SSL Context Service")
            .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be received over a secure connection.")
            .required(false)
            .identifiesControllerService(RestrictedSSLContextService.class)
            .build();

    /** Client authentication policy; only read when {@link #SSL_CONTEXT_SERVICE} is configured. */
    public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
            .name("Client Auth")
            .description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
            .required(false)
            .allowableValues(ClientAuth.values())
            .defaultValue(ClientAuth.REQUIRED.name())
            .build();

    /** Still exposed as a supported property, but never read by this class. */
    public static final PropertyDescriptor MAX_RECV_THREAD_POOL_SIZE = new PropertyDescriptor.Builder()
            .name("max-receiving-threads")
            .displayName("Max Number of Receiving Message Handler Threads")
            .description("This property is deprecated and no longer used.")
            .addValidator(StandardValidators.createLongValidator(1, 65535, true))
            .required(false)
            .build();

    /** Selects between {@link BufferAllocator#POOLED} and {@link BufferAllocator#UNPOOLED}. */
    protected static final PropertyDescriptor POOL_RECV_BUFFERS = new PropertyDescriptor.Builder()
            .name("pool-receive-buffers")
            .displayName("Pool Receive Buffers")
            .description("Enable or disable pooling of buffers that the processor uses for handling bytes received on socket connections. The framework allocates buffers as needed during processing.")
            .required(true)
            .defaultValue("True")
            .allowableValues(new String[]{"True", "False"})
            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
            .build();

    /** The only relationship of this processor; every non-empty batch is routed here. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Messages received successfully will be sent out this relationship.")
            .build();

    /** Unmodifiable list of supported properties, built once in {@link #init}. */
    protected List<PropertyDescriptor> descriptors;

    /** Unmodifiable set of relationships, built once in {@link #init}. */
    protected Set<Relationship> relationships;

    /** Listening port, read from {@link ListenerProperties#PORT} each time the processor is scheduled. */
    protected volatile int port;

    /** Bounded queue of received messages; passed to the server factory and drained by the batcher. */
    protected volatile BlockingQueue<ByteArrayMessage> events;

    /** Unbounded secondary queue handed to the batcher alongside {@link #events}. */
    protected volatile BlockingQueue<ByteArrayMessage> errorEvents;

    /** Server created in {@link #onScheduled}; stays unassigned if creation fails with an EventException. */
    protected volatile EventServer eventServer;

    /** Message delimiter encoded with the configured character set. */
    protected volatile byte[] messageDemarcatorBytes;

    /** Lazily created batcher; reset to {@code null} in {@link #stopped()}. */
    protected volatile EventBatcher<ByteArrayMessage> eventBatcher;

    /**
     * Builds the immutable property list and relationship set returned by
     * {@link #getSupportedPropertyDescriptors()} and {@link #getRelationships()}.
     *
     * @param processorInitializationContext initialization context (not used here)
     */
    protected void init(ProcessorInitializationContext processorInitializationContext) {
        final List<PropertyDescriptor> supportedProperties = new ArrayList<>();
        supportedProperties.add(ListenerProperties.NETWORK_INTF_NAME);
        supportedProperties.add(ListenerProperties.PORT);
        supportedProperties.add(ListenerProperties.RECV_BUFFER_SIZE);
        supportedProperties.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
        supportedProperties.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
        supportedProperties.add(ListenerProperties.CHARSET);
        supportedProperties.add(ListenerProperties.WORKER_THREADS);
        supportedProperties.add(ListenerProperties.MAX_BATCH_SIZE);
        supportedProperties.add(ListenerProperties.MESSAGE_DELIMITER);
        supportedProperties.add(MAX_RECV_THREAD_POOL_SIZE);
        supportedProperties.add(POOL_RECV_BUFFERS);
        supportedProperties.add(SSL_CONTEXT_SERVICE);
        supportedProperties.add(CLIENT_AUTH);
        this.descriptors = Collections.unmodifiableList(supportedProperties);

        final Set<Relationship> supportedRelationships = new HashSet<>();
        supportedRelationships.add(REL_SUCCESS);
        this.relationships = Collections.unmodifiableSet(supportedRelationships);
    }

    /**
     * Reads the listener configuration, (re)creates the message queues and starts the event server.
     *
     * @param processContext source of the configured property values
     * @throws IOException declared for the network-interface lookup performed here
     */
    @OnScheduled
    public void onScheduled(ProcessContext processContext) throws IOException {
        final int workerThreads = processContext.getProperty(ListenerProperties.WORKER_THREADS).asInteger();
        final int receiveBufferSize = processContext.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        final int socketBufferSize = processContext.getProperty(ListenerProperties.MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        final String interfaceName = processContext.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
        final InetAddress address = NetworkUtils.getInterfaceAddress(interfaceName);
        final Charset charset = Charset.forName(processContext.getProperty(ListenerProperties.CHARSET).getValue());

        this.port = processContext.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
        final int maxQueueSize = processContext.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger();
        this.events = new LinkedBlockingQueue<>(maxQueueSize);
        this.errorEvents = new LinkedBlockingQueue<>();
        this.messageDemarcatorBytes = getMessageDemarcator(processContext).getBytes(charset);

        final ByteArrayMessageNettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(
                getLogger(), address, this.port, TransportProtocol.TCP, this.messageDemarcatorBytes, receiveBufferSize, this.events);

        // TLS is enabled only when an SSL context service has been configured.
        final SSLContextService sslContextService = processContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        if (sslContextService != null) {
            final ClientAuth clientAuth = ClientAuth.valueOf(processContext.getProperty(CLIENT_AUTH).getValue());
            serverFactory.setSslContext(sslContextService.createContext());
            serverFactory.setClientAuth(clientAuth);
        }

        final boolean poolBuffers = processContext.getProperty(POOL_RECV_BUFFERS).asBoolean();
        serverFactory.setBufferAllocator(poolBuffers ? BufferAllocator.POOLED : BufferAllocator.UNPOOLED);
        serverFactory.setSocketReceiveBuffer(socketBufferSize);
        serverFactory.setWorkerThreads(workerThreads);
        serverFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));

        try {
            this.eventServer = serverFactory.getEventServer();
        } catch (EventException e) {
            // NOTE(review): the failure is only logged; scheduling continues without a newly created
            // server and eventServer keeps its previous value. Confirm this is the intended policy.
            getLogger().error("Failed to bind to [{}:{}]", new Object[]{address, this.port, e});
        }
    }

    /**
     * Pulls up to "Max Batch Size" queued messages per sender into FlowFiles and routes them.
     *
     * @param processContext source of the configured batch size
     * @param processSession session used to create, update and transfer FlowFiles
     * @throws ProcessException declared by the method signature; not thrown directly here
     */
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final int maxBatchSize = processContext.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
        final Map<String, FlowFileEventBatch<ByteArrayMessage>> batches =
                getEventBatcher().getBatches(processSession, maxBatchSize, this.messageDemarcatorBytes);
        processEvents(processSession, batches);
    }

    /**
     * Transfers every batch that actually contains data to {@link #REL_SUCCESS} and removes the
     * FlowFile of every batch that does not.
     *
     * @param processSession session owning the batch FlowFiles
     * @param map batches keyed by the batch key (the sender, see {@link #getEventBatcher()})
     */
    private void processEvents(ProcessSession processSession, Map<String, FlowFileEventBatch<ByteArrayMessage>> map) {
        for (Map.Entry<String, FlowFileEventBatch<ByteArrayMessage>> entry : map.entrySet()) {
            final FlowFileEventBatch<ByteArrayMessage> batch = entry.getValue();
            final FlowFile flowFile = batch.getFlowFile();
            final List<ByteArrayMessage> batchEvents = batch.getEvents();

            if (flowFile.getSize() == 0 || batchEvents.isEmpty()) {
                processSession.remove(flowFile);
                getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[]{entry.getKey()});
                continue;
            }

            final FlowFile updated = processSession.putAllAttributes(flowFile, getAttributes(batch));
            getLogger().debug("Transferring {} to success", new Object[]{updated});
            processSession.transfer(updated, REL_SUCCESS);
            processSession.adjustCounter("FlowFiles Transferred to Success", 1L, false);
            processSession.getProvenanceReporter().receive(updated, getTransitUri(batch));
        }
    }

    /** Shuts down the event server, if one was created, and drops the batcher so it is rebuilt on restart. */
    @OnStopped
    public void stopped() {
        if (this.eventServer != null) {
            this.eventServer.shutdown();
        }
        this.eventBatcher = null;
    }

    /**
     * Rejects a configuration that has an SSL context service but a blank "Client Auth" value.
     *
     * @param validationContext source of the property values being validated
     * @return the validation failures found; empty when the configuration is acceptable
     */
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>();
        final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
        final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
            results.add(new ValidationResult.Builder()
                    .explanation("Client Auth must be provided when using TLS/SSL")
                    .valid(false)
                    .subject("Client Auth")
                    .build());
        }
        return results;
    }

    /**
     * Builds the attributes written to a batch's FlowFile.
     *
     * @param flowFileEventBatch batch with at least one event; the first event supplies the sender
     * @return map with {@code tcp.sender} and {@code tcp.port}
     */
    protected Map<String, String> getAttributes(FlowFileEventBatch<ByteArrayMessage> flowFileEventBatch) {
        final List<ByteArrayMessage> batchEvents = flowFileEventBatch.getEvents();
        final String sender = batchEvents.get(0).getSender();
        final Map<String, String> attributes = new HashMap<>(3);
        attributes.put("tcp.sender", sender);
        // The value is this processor's configured listening port, not the remote peer's port.
        attributes.put("tcp.port", String.valueOf(this.port));
        return attributes;
    }

    /**
     * Builds the provenance transit URI for a batch.
     *
     * @param flowFileEventBatch batch with at least one event; the first event supplies the sender
     * @return {@code tcp://<sender>:<port>}, with a single leading '/' stripped from the sender
     */
    protected String getTransitUri(FlowFileEventBatch<ByteArrayMessage> flowFileEventBatch) {
        final List<ByteArrayMessage> batchEvents = flowFileEventBatch.getEvents();
        final String sender = batchEvents.get(0).getSender();
        // A bare "/" is kept as-is; only a '/' followed by more characters is stripped.
        final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
        return String.format("tcp://%s:%d", senderHost, this.port);
    }

    /** @return the relationships built in {@link #init} */
    public final Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /** @return the property descriptors built in {@link #init} */
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.descriptors;
    }

    /**
     * Reads the configured message delimiter and converts the two-character escape sequences
     * {@code \n}, {@code \r} and {@code \t} into the corresponding control characters.
     *
     * @param processContext source of the delimiter property
     * @return the delimiter with escapes resolved
     */
    private String getMessageDemarcator(ProcessContext processContext) {
        return processContext.getProperty(ListenerProperties.MESSAGE_DELIMITER)
                .getValue()
                .replace("\\n", "\n")
                .replace("\\r", "\r")
                .replace("\\t", "\t");
    }

    /**
     * Returns the batcher, creating it on first use after (re)scheduling. Batches are keyed by the
     * message sender.
     *
     * @return a non-null batcher bound to the current {@link #events} and {@link #errorEvents} queues
     */
    private EventBatcher<ByteArrayMessage> getEventBatcher() {
        // Work on a local copy so the returned reference can never be the null written by stopped().
        EventBatcher<ByteArrayMessage> batcher = this.eventBatcher;
        if (batcher == null) {
            batcher = new EventBatcher<ByteArrayMessage>(getLogger(), this.events, this.errorEvents) {
                protected String getBatchKey(ByteArrayMessage byteArrayMessage) {
                    return byteArrayMessage.getSender();
                }
            };
            this.eventBatcher = batcher;
        }
        return batcher;
    }
}
