/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.event.transport.EventException;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.SslSessionStatus;
import org.apache.nifi.event.transport.configuration.BufferAllocator;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.EventBatcher;
import org.apache.nifi.processor.util.listen.FlowFileEventBatch;
import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;

/**
 * Processor that listens on a TCP port and emits the received, delimiter-separated messages as FlowFiles.
 *
 * <p>{@link #onScheduled(ProcessContext)} builds an {@link EventServer} that is handed the {@link #events}
 * queue. {@link #onTrigger(ProcessContext, ProcessSession)} asks an {@link EventBatcher} (keyed by message
 * sender) for batches built from that queue and transfers each non-empty batch to {@link #REL_SUCCESS}.
 * {@link #stopped()} shuts the server down again.</p>
 */
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "tcp", "tls", "ssl"})
@CapabilityDescription("Listens for incoming TCP connections and reads data from each connection using a line separator as the message demarcator. The default behavior is for each message to produce a single FlowFile, however this can be controlled by increasing the Batch Size to a larger value for higher throughput. The Receive Buffer Size must be set as large as the largest messages expected to be received, meaning if every 100kb there is a line separator, then the Receive Buffer Size must be greater than 100kb. The processor can be configured to use an SSL Context Service to only allow secure connections. When connected clients present certificates for mutual TLS authentication, the Distinguished Names of the client certificate's issuer and subject are added to the outgoing FlowFiles as attributes. The processor does not perform authorization based on Distinguished Name values, but since these values are attached to the outgoing FlowFiles, authorization can be implemented based on these attributes.")
@WritesAttributes({
    @WritesAttribute(attribute = "tcp.sender", description = "The sending host of the messages."),
    @WritesAttribute(attribute = "tcp.port", description = "The sending port the messages were received."),
    @WritesAttribute(attribute = "client.certificate.issuer.dn", description = "For connections using mutual TLS, the Distinguished Name of the Certificate Authority that issued the client's certificate is attached to the FlowFile."),
    @WritesAttribute(attribute = "client.certificate.subject.dn", description = "For connections using mutual TLS, the Distinguished Name of the client certificate's owner (subject) is attached to the FlowFile.")
})
public class ListenTCP extends AbstractProcessor {
    /** FlowFile attribute that receives the subject DN taken from the event's SSL session status. */
    private static final String CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE = "client.certificate.subject.dn";
    /** FlowFile attribute that receives the issuer DN taken from the event's SSL session status. */
    private static final String CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE = "client.certificate.issuer.dn";

    /** Optional controller service; when set, the event server is configured with its SSL context. */
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
            .name("SSL Context Service")
            .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be received over a secure connection.")
            .required(false)
            .identifiesControllerService(RestrictedSSLContextService.class)
            .build();

    /** Client authentication policy; only read when an SSL Context Service is configured. */
    public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
            .name("Client Auth")
            .description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
            .required(false)
            .allowableValues(ClientAuth.values())
            .defaultValue(ClientAuth.REQUIRED.name())
            .build();

    /** Still registered as a supported property, but never read by this class. */
    public static final PropertyDescriptor MAX_RECV_THREAD_POOL_SIZE = new PropertyDescriptor.Builder()
            .name("max-receiving-threads")
            .displayName("Max Number of Receiving Message Handler Threads")
            .description("This property is deprecated and no longer used.")
            .addValidator(StandardValidators.createLongValidator(1L, 65535L, true))
            .required(false)
            .build();

    /** Selects between {@link BufferAllocator#POOLED} and {@link BufferAllocator#UNPOOLED} for the event server. */
    protected static final PropertyDescriptor POOL_RECV_BUFFERS = new PropertyDescriptor.Builder()
            .name("pool-receive-buffers")
            .displayName("Pool Receive Buffers")
            .description("Enable or disable pooling of buffers that the processor uses for handling bytes received on socket connections. The framework allocates buffers as needed during processing.")
            .required(true)
            .defaultValue("True")
            .allowableValues("True", "False")
            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
            .build();

    /** The only relationship: every non-empty batch is transferred here. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Messages received successfully will be sent out this relationship.")
            .build();

    /** Unmodifiable list of supported properties, populated in {@link #init(ProcessorInitializationContext)}. */
    protected List<PropertyDescriptor> descriptors;
    /** Unmodifiable set of relationships, populated in {@link #init(ProcessorInitializationContext)}. */
    protected Set<Relationship> relationships;
    /** Configured listening port, captured in {@link #onScheduled(ProcessContext)}. */
    protected volatile int port;
    /** Bounded queue shared between the event server and the event batcher. */
    protected volatile BlockingQueue<ByteArrayMessage> events;
    /** Unbounded queue handed to the event batcher as its error-event queue. */
    protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
    /** Server created in {@link #onScheduled(ProcessContext)}; {@code null} when not running or when binding failed. */
    protected volatile EventServer eventServer;
    /** Message delimiter encoded with the configured character set. */
    protected volatile byte[] messageDemarcatorBytes;
    /** Lazily created batcher; reset in {@link #stopped()} so that it is rebuilt against fresh queues. */
    protected volatile EventBatcher<ByteArrayMessage> eventBatcher;

    /**
     * Builds the immutable property and relationship collections exposed by this processor.
     *
     * @param context initialization context (not used)
     */
    protected void init(ProcessorInitializationContext context) {
        final List<PropertyDescriptor> supportedProperties = new ArrayList<>();
        supportedProperties.add(ListenerProperties.NETWORK_INTF_NAME);
        supportedProperties.add(ListenerProperties.PORT);
        supportedProperties.add(ListenerProperties.RECV_BUFFER_SIZE);
        supportedProperties.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
        supportedProperties.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
        supportedProperties.add(ListenerProperties.CHARSET);
        supportedProperties.add(ListenerProperties.WORKER_THREADS);
        supportedProperties.add(ListenerProperties.MAX_BATCH_SIZE);
        supportedProperties.add(ListenerProperties.MESSAGE_DELIMITER);
        supportedProperties.add(MAX_RECV_THREAD_POOL_SIZE);
        supportedProperties.add(POOL_RECV_BUFFERS);
        supportedProperties.add(SSL_CONTEXT_SERVICE);
        supportedProperties.add(CLIENT_AUTH);
        this.descriptors = Collections.unmodifiableList(supportedProperties);

        final Set<Relationship> supportedRelationships = new HashSet<>();
        supportedRelationships.add(REL_SUCCESS);
        this.relationships = Collections.unmodifiableSet(supportedRelationships);
    }

    /**
     * Reads the listener configuration, creates fresh event queues and starts the TCP event server.
     *
     * <p>If the server cannot be created because of an {@link EventException}, the failure is logged and
     * {@link #eventServer} is left unchanged; the method does not rethrow.</p>
     *
     * @param context process context supplying the property values
     * @throws IOException declared for the network interface address lookup
     */
    @OnScheduled
    public void onScheduled(ProcessContext context) throws IOException {
        final int workerThreads = context.getProperty(ListenerProperties.WORKER_THREADS).asInteger();
        final int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        final int socketBufferSize = context.getProperty(ListenerProperties.MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
        final InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface);
        final Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());

        this.port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
        final int maxQueueSize = context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger();
        this.events = new LinkedBlockingQueue<>(maxQueueSize);
        this.errorEvents = new LinkedBlockingQueue<>();
        this.messageDemarcatorBytes = getMessageDemarcator(context).getBytes(charset);

        final ByteArrayMessageNettyEventServerFactory eventFactory = new ByteArrayMessageNettyEventServerFactory(
                getLogger(), address, this.port, TransportProtocol.TCP, this.messageDemarcatorBytes, bufferSize, this.events);

        // TLS is enabled only when an SSL Context Service has been configured.
        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        if (sslContextService != null) {
            final ClientAuth clientAuth = ClientAuth.valueOf(context.getProperty(CLIENT_AUTH).getValue());
            final SSLContext sslContext = sslContextService.createContext();
            eventFactory.setSslContext(sslContext);
            eventFactory.setClientAuth(clientAuth);
        }

        final boolean poolReceiveBuffers = context.getProperty(POOL_RECV_BUFFERS).asBoolean();
        eventFactory.setBufferAllocator(poolReceiveBuffers ? BufferAllocator.POOLED : BufferAllocator.UNPOOLED);
        eventFactory.setSocketReceiveBuffer(socketBufferSize);
        eventFactory.setWorkerThreads(workerThreads);
        eventFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));

        try {
            this.eventServer = eventFactory.getEventServer();
        }
        catch (EventException e) {
            // Kept as log-and-continue to preserve existing behavior: the processor stays scheduled without a server.
            getLogger().error("Failed to bind to [{}:{}]", new Object[]{address, this.port, e});
        }
    }

    /**
     * Obtains the current batches from the event batcher and routes them.
     *
     * @param context process context supplying the maximum batch size
     * @param session session used to create, remove and transfer FlowFiles
     * @throws ProcessException declared by the processor contract; not thrown explicitly here
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final int batchSize = context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
        final Map<String, FlowFileEventBatch<ByteArrayMessage>> batches =
                getEventBatcher().getBatches(session, batchSize, this.messageDemarcatorBytes);
        processEvents(session, batches);
    }

    /**
     * Removes FlowFiles of empty batches and transfers every other batch to {@link #REL_SUCCESS},
     * adding sender/port and, when available, client certificate attributes.
     *
     * @param session session owning the batch FlowFiles
     * @param batches batches keyed by batch key (the message sender, see {@link #getEventBatcher()})
     */
    private void processEvents(ProcessSession session, Map<String, FlowFileEventBatch<ByteArrayMessage>> batches) {
        for (Map.Entry<String, FlowFileEventBatch<ByteArrayMessage>> entry : batches.entrySet()) {
            final FlowFileEventBatch<ByteArrayMessage> batch = entry.getValue();
            FlowFile flowFile = batch.getFlowFile();
            final List<ByteArrayMessage> batchEvents = batch.getEvents();

            if (flowFile.getSize() == 0L || batchEvents.isEmpty()) {
                session.remove(flowFile);
                getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[]{entry.getKey()});
                continue;
            }

            final Map<String, String> attributes = getAttributes(batch);
            // The first event of the batch supplies the certificate details for the whole FlowFile.
            addClientCertificateAttributes(attributes, batchEvents.get(0));
            flowFile = session.putAllAttributes(flowFile, attributes);

            getLogger().debug("Transferring {} to success", new Object[]{flowFile});
            session.transfer(flowFile, REL_SUCCESS);
            session.adjustCounter("FlowFiles Transferred to Success", 1L, false);

            final String transitUri = getTransitUri(batch);
            session.getProvenanceReporter().receive(flowFile, transitUri);
        }
    }

    /**
     * Shuts down the event server, if one was started, and drops the references to the server and the
     * batcher so that neither a stopped server nor a batcher bound to old queues is reused later.
     */
    @OnStopped
    public void stopped() {
        final EventServer server = this.eventServer;
        try {
            if (server != null) {
                server.shutdown();
            }
        }
        finally {
            // Reset even if shutdown() throws, so the next schedule starts from a clean state.
            this.eventServer = null;
            this.eventBatcher = null;
        }
    }

    /**
     * Rejects configurations that set an SSL Context Service while the Client Auth value is blank.
     *
     * @param validationContext context supplying the property values under validation
     * @return the validation failures found; empty when the configuration is acceptable
     */
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>();
        final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
        final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);

        if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
            results.add(new ValidationResult.Builder()
                    .explanation("Client Auth must be provided when using TLS/SSL")
                    .valid(false)
                    .subject("Client Auth")
                    .build());
        }
        return results;
    }

    /**
     * Builds the base attributes for a batch: the sender of its first event and the configured port.
     *
     * @param batch batch to describe; must contain at least one event
     * @return a new mutable map holding {@code tcp.sender} and {@code tcp.port}
     */
    protected Map<String, String> getAttributes(FlowFileEventBatch<ByteArrayMessage> batch) {
        final List<ByteArrayMessage> batchEvents = batch.getEvents();
        final String sender = batchEvents.get(0).getSender();

        final Map<String, String> attributes = new HashMap<>(3);
        attributes.put("tcp.sender", sender);
        // Note: this is the port configured on this processor, not a port reported by the event.
        attributes.put("tcp.port", String.valueOf(this.port));
        return attributes;
    }

    /**
     * Builds the provenance transit URI {@code tcp://<sender>:<port>} for a batch.
     *
     * @param batch batch to describe; must contain at least one event
     * @return the transit URI, with a single leading {@code /} stripped from the sender when more follows it
     */
    protected String getTransitUri(FlowFileEventBatch<ByteArrayMessage> batch) {
        final List<ByteArrayMessage> batchEvents = batch.getEvents();
        final String sender = batchEvents.get(0).getSender();
        final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
        return String.format("tcp://%s:%d", senderHost, this.port);
    }

    /**
     * @return the unmodifiable relationship set built in {@link #init(ProcessorInitializationContext)}
     */
    public final Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /**
     * @return the unmodifiable property list built in {@link #init(ProcessorInitializationContext)}
     */
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.descriptors;
    }

    /**
     * Reads the configured message delimiter and converts the two-character escape sequences
     * {@code \n}, {@code \r} and {@code \t} into the corresponding control characters.
     *
     * @param context process context supplying the delimiter property
     * @return the delimiter with escapes expanded
     */
    private String getMessageDemarcator(ProcessContext context) {
        final String configured = context.getProperty(ListenerProperties.MESSAGE_DELIMITER).getValue();
        return configured
                .replace("\\n", "\n")
                .replace("\\r", "\r")
                .replace("\\t", "\t");
    }

    /**
     * Returns the event batcher, creating it on first use against the current queues.
     * Events are grouped by their sender.
     *
     * @return the batcher; never {@code null}
     */
    private EventBatcher<ByteArrayMessage> getEventBatcher() {
        // Read the volatile field once so the returned value is the one that was null-checked.
        EventBatcher<ByteArrayMessage> batcher = this.eventBatcher;
        if (batcher == null) {
            batcher = new EventBatcher<ByteArrayMessage>(getLogger(), this.events, this.errorEvents) {

                protected String getBatchKey(ByteArrayMessage event) {
                    return event.getSender();
                }
            };
            this.eventBatcher = batcher;
        }
        return batcher;
    }

    /**
     * Adds the subject and issuer Distinguished Names to {@code attributes} when the event carries an
     * SSL session status; otherwise leaves the map untouched.
     *
     * @param attributes attribute map to extend
     * @param event event whose SSL session status is inspected
     */
    private void addClientCertificateAttributes(Map<String, String> attributes, ByteArrayMessage event) {
        final SslSessionStatus sslSessionStatus = event.getSslSessionStatus();
        if (sslSessionStatus == null) {
            return;
        }
        attributes.put(CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE, sslSessionStatus.getSubject().getName());
        attributes.put(CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE, sslSessionStatus.getIssuer().getName());
    }
}

