package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.event.transport.EventException;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.listen.EventBatcher;
import org.apache.nifi.processor.util.listen.FlowFileEventBatch;
import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.processors.standard.relp.event.RELPMessage;
import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
import org.apache.nifi.processors.standard.relp.handler.RELPMessageServerFactory;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;

/**
 * Processor that starts a RELP event server on a configured port and turns the
 * received {@link RELPMessage}s into FlowFiles routed to {@link #REL_SUCCESS}.
 *
 * <p>Lifecycle as implemented here: {@link #onScheduled(ProcessContext)} creates the
 * message queues, the batcher and the event server; {@link #onTrigger(ProcessContext, ProcessSession)}
 * drains queued messages into batches and transfers them; {@link #stopped()} shuts the
 * server down.</p>
 */
@CapabilityDescription("Listens for RELP messages being sent to a given port over TCP. Each message will be acknowledged after successfully writing the message to a FlowFile. Each FlowFile will contain data portion of one or more RELP frames. In the case where the RELP frames contain syslog messages, the output of this processor can be sent to a ParseSyslog processor for further processing.")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "relp", "tcp", "logs"})
@WritesAttributes({@WritesAttribute(attribute = RELPMetadata.COMMAND_KEY, description = "The command of the RELP frames."), @WritesAttribute(attribute = "relp.sender", description = "The sending host of the messages."), @WritesAttribute(attribute = "relp.port", description = "The sending port the messages were received over."), @WritesAttribute(attribute = RELPMetadata.TXNR_KEY, description = "The transaction number of the message. Only included if <Batch Size> is 1."), @WritesAttribute(attribute = "mime.type", description = "The mime.type of the content which is text/plain")})
@SeeAlso({ParseSyslog.class})
public class ListenRELP extends AbstractProcessor {
    /** Optional controller service supplying the SSL context; when unset the server is configured without SSL. */
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder().name("SSL Context Service").displayName("SSL Context Service").description("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be received over a secure connection.").required(false).identifiesControllerService(RestrictedSSLContextService.class).build();
    /** Client authentication policy applied to the server when an SSL context is in use. */
    public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder().name("Client Auth").displayName("Client Auth").description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.").required(false).allowableValues(ClientAuth.values()).defaultValue(ClientAuth.REQUIRED.name()).build();
    /** The only relationship of this processor; every non-empty batch is transferred here. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Messages received successfully will be sent out this relationship.").build();
    protected List<PropertyDescriptor> descriptors;
    protected Set<Relationship> relationships;
    protected volatile int port;
    protected volatile BlockingQueue<RELPMessage> events;
    protected volatile BlockingQueue<RELPMessage> errorEvents;
    protected volatile EventServer eventServer;
    protected volatile byte[] messageDemarcatorBytes;
    protected volatile EventBatcher eventBatcher;

    /** FlowFile attribute keys written by this processor. */
    public enum RELPAttributes implements FlowFileAttributeKey {
        TXNR(RELPMetadata.TXNR_KEY),
        COMMAND(RELPMetadata.COMMAND_KEY),
        SENDER("relp.sender"),
        PORT("relp.port");

        private final String key;

        RELPAttributes(String str) {
            this.key = str;
        }

        /** @return the attribute name used on the FlowFile */
        @Override
        public String key() {
            return this.key;
        }
    }

    /**
     * Reads the listener configuration, allocates the message queues and batcher, and
     * starts the event server.
     *
     * <p>A failure to obtain the event server is logged (with its cause) rather than
     * rethrown, so {@link #eventServer} may remain {@code null} afterwards.</p>
     *
     * @param processContext source of the configured property values
     * @throws IOException declared by the original signature; may originate from the interface lookup
     */
    @OnScheduled
    public void onScheduled(ProcessContext processContext) throws IOException {
        final int workerThreads = processContext.getProperty(ListenerProperties.WORKER_THREADS).asInteger().intValue();
        final int receiveBufferSize = processContext.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        final String interfaceName = processContext.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
        final InetAddress interfaceAddress = NetworkUtils.getInterfaceAddress(interfaceName);
        final Charset charset = Charset.forName(processContext.getProperty(ListenerProperties.CHARSET).getValue());
        final int listenPort = processContext.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger().intValue();
        final int maxQueueSize = processContext.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger().intValue();

        this.port = listenPort;
        this.events = new LinkedBlockingQueue<>(maxQueueSize);
        this.errorEvents = new LinkedBlockingQueue<>();
        // The batcher captures the two queues above, so it must be created after them.
        this.eventBatcher = getEventBatcher();
        this.messageDemarcatorBytes = getMessageDemarcator(processContext).getBytes(charset);

        final NettyEventServerFactory serverFactory = getNettyEventServerFactory(interfaceAddress, listenPort, charset, this.events);
        serverFactory.setSocketReceiveBuffer(Integer.valueOf(receiveBufferSize));
        serverFactory.setWorkerThreads(workerThreads);
        configureFactoryForSsl(processContext, serverFactory);
        try {
            this.eventServer = serverFactory.getEventServer();
        } catch (EventException e) {
            // NOTE(review): the address lookup presumably yields null when no interface name
            // is configured - guard so that reporting the failure cannot itself throw.
            final String host = interfaceAddress == null ? "unspecified" : interfaceAddress.getHostAddress();
            // The exception is passed as the trailing argument so the cause is logged as well.
            getLogger().error("Failed to bind to [{}:{}].", new Object[]{host, Integer.valueOf(listenPort), e});
        }
    }

    /**
     * @return the port reported by the running event server, or {@code 0} when no server is running
     */
    public int getListeningPort() {
        // Single read of the volatile field so the null check and the call see the same instance.
        final EventServer server = this.eventServer;
        return server == null ? 0 : server.getListeningPort();
    }

    /** Shuts down the event server, if one was started, and clears the reference. */
    @OnStopped
    public void stopped() {
        final EventServer server = this.eventServer;
        if (server != null) {
            server.shutdown();
            this.eventServer = null;
        }
    }

    /**
     * Builds the immutable property-descriptor list and relationship set exposed by this processor.
     *
     * @param processorInitializationContext unused
     */
    @Override
    protected void init(ProcessorInitializationContext processorInitializationContext) {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(ListenerProperties.NETWORK_INTF_NAME);
        properties.add(ListenerProperties.PORT);
        properties.add(ListenerProperties.RECV_BUFFER_SIZE);
        properties.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
        properties.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
        properties.add(ListenerProperties.CHARSET);
        properties.add(ListenerProperties.WORKER_THREADS);
        properties.add(ListenerProperties.MAX_BATCH_SIZE);
        properties.add(ListenerProperties.MESSAGE_DELIMITER);
        properties.add(SSL_CONTEXT_SERVICE);
        properties.add(CLIENT_AUTH);
        this.descriptors = Collections.unmodifiableList(properties);

        final Set<Relationship> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        this.relationships = Collections.unmodifiableSet(rels);
    }

    /**
     * Rejects a configuration that sets an SSL context service but leaves Client Auth blank.
     *
     * @param validationContext source of the property values being validated
     * @return the validation failures found; empty when the configuration is acceptable
     */
    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>();
        final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
        if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
            results.add(new ValidationResult.Builder().explanation("Client Auth must be provided when using TLS/SSL").valid(false).subject("Client Auth").build());
        }
        return results;
    }

    /** @return the unmodifiable relationship set built in {@code init} */
    @Override
    public final Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /** @return the unmodifiable property-descriptor list built in {@code init} */
    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.descriptors;
    }

    /**
     * Applies the SSL settings to the server factory: clears the SSL context when no service is
     * configured, otherwise installs the service's context together with the Client Auth policy.
     */
    private void configureFactoryForSsl(ProcessContext processContext, NettyEventServerFactory nettyEventServerFactory) {
        final SSLContextService sslContextService = processContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        if (sslContextService == null) {
            nettyEventServerFactory.setSslContext((SSLContext) null);
            return;
        }
        final String clientAuth = processContext.getProperty(CLIENT_AUTH).getValue();
        final SSLContext sslContext = sslContextService.createContext();
        if (sslContext != null) {
            nettyEventServerFactory.setSslContext(sslContext);
            nettyEventServerFactory.setClientAuth(ClientAuth.valueOf(clientAuth));
        }
    }

    /**
     * Builds the FlowFile attributes for a batch from its first message.
     *
     * <p>The transaction number is added only when the batch holds exactly one message.
     * The batch must contain at least one message.</p>
     *
     * @param flowFileEventBatch batch whose events are {@link RELPMessage}s
     * @return a new mutable attribute map
     */
    protected Map<String, String> getAttributes(FlowFileEventBatch flowFileEventBatch) {
        final List<?> batchEvents = flowFileEventBatch.getEvents();
        final RELPMessage first = (RELPMessage) batchEvents.get(0);
        final boolean singleEvent = batchEvents.size() == 1;

        final Map<String, String> attributes = new HashMap<>(singleEvent ? 5 : 4);
        attributes.put(RELPAttributes.COMMAND.key(), first.getCommand());
        attributes.put(RELPAttributes.SENDER.key(), first.getSender());
        attributes.put(RELPAttributes.PORT.key(), String.valueOf(this.port));
        attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
        if (singleEvent) {
            attributes.put(RELPAttributes.TXNR.key(), String.valueOf(first.getTxnr()));
        }
        return attributes;
    }

    /**
     * Builds the provenance transit URI {@code relp://<sender>:<port>} from the first message of
     * the batch, dropping a single leading {@code /} from the sender when more text follows it.
     *
     * @param flowFileEventBatch batch whose events are {@link RELPMessage}s; must not be empty
     * @return the transit URI string
     */
    protected String getTransitUri(FlowFileEventBatch flowFileEventBatch) {
        final String sender = ((RELPMessage) flowFileEventBatch.getEvents().get(0)).getSender();
        final String senderHost = (sender.startsWith("/") && sender.length() > 1) ? sender.substring(1) : sender;
        return "relp://" + senderHost + ":" + this.port;
    }

    /**
     * Collects the currently queued messages into batches of at most the configured batch size
     * and hands them to {@code processEvents}.
     */
    @Override
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        // Reuse the batcher created in onScheduled instead of allocating a new one per trigger;
        // fall back to a fresh one only if scheduling has not populated the field.
        EventBatcher batcher = this.eventBatcher;
        if (batcher == null) {
            batcher = getEventBatcher();
        }
        final int maxBatchSize = processContext.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger().intValue();
        processEvents(processSession, batcher.getBatches(processSession, maxBatchSize, this.messageDemarcatorBytes));
    }

    /**
     * Transfers every non-empty batch to {@link #REL_SUCCESS} with its attributes and a provenance
     * receive event, removes FlowFiles of empty batches, then commits the session asynchronously.
     */
    private void processEvents(ProcessSession processSession, Map<String, FlowFileEventBatch> map) {
        for (Map.Entry<String, FlowFileEventBatch> entry : map.entrySet()) {
            final FlowFileEventBatch batch = entry.getValue();
            final FlowFile flowFile = batch.getFlowFile();
            final List<?> batchEvents = batch.getEvents();
            if (flowFile.getSize() == 0 || batchEvents.isEmpty()) {
                processSession.remove(flowFile);
                getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[]{entry.getKey()});
                continue;
            }
            final FlowFile updated = processSession.putAllAttributes(flowFile, getAttributes(batch));
            getLogger().debug("Transferring {} to success", new Object[]{updated});
            processSession.transfer(updated, REL_SUCCESS);
            processSession.adjustCounter("FlowFiles Transferred to Success", 1L, false);
            processSession.getProvenanceReporter().receive(updated, getTransitUri(batch));
        }
        processSession.commitAsync();
    }

    /**
     * @param rELPMessage message to derive the key from
     * @return the batch key {@code <sender>_<command>} used to group messages
     */
    public String getRELPBatchKey(RELPMessage rELPMessage) {
        return rELPMessage.getSender() + "_" + rELPMessage.getCommand();
    }

    /**
     * Creates a batcher bound to the current {@link #events} and {@link #errorEvents} queues that
     * groups messages by {@link #getRELPBatchKey(RELPMessage)}.
     */
    private EventBatcher getEventBatcher() {
        return new EventBatcher<RELPMessage>(getLogger(), this.events, this.errorEvents) {
            @Override
            public String getBatchKey(RELPMessage rELPMessage) {
                return ListenRELP.this.getRELPBatchKey(rELPMessage);
            }
        };
    }

    /** Creates the RELP server factory for the given address, port, charset and target queue. */
    private NettyEventServerFactory getNettyEventServerFactory(InetAddress inetAddress, int i, Charset charset, BlockingQueue blockingQueue) {
        return new RELPMessageServerFactory(getLogger(), inetAddress, i, charset, blockingQueue);
    }

    /**
     * Reads the configured message delimiter and converts the two-character escape sequences
     * {@code \n}, {@code \r} and {@code \t} into the corresponding control characters.
     */
    private String getMessageDemarcator(ProcessContext processContext) {
        final String configured = processContext.getProperty(ListenerProperties.MESSAGE_DELIMITER).getValue();
        return configured
                .replace("\\n", "\n")
                .replace("\\r", "\r")
                .replace("\\t", "\t");
    }
}
