package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processors.standard.relp.event.RELPEvent;
import org.apache.nifi.processors.standard.relp.event.RELPEventFactory;
import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
import org.apache.nifi.processors.standard.relp.frame.RELPEncoder;
import org.apache.nifi.processors.standard.relp.handler.RELPSocketChannelHandlerFactory;
import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
import org.apache.nifi.processors.standard.relp.response.RELPResponse;
import org.apache.nifi.ssl.SSLContextService;

/**
 * Processor that receives RELP frames over TCP and writes their data to FlowFiles.
 *
 * <p>On each trigger the processor asks the superclass for the pending event batches, writes the
 * RELP attributes onto each batch's FlowFile, routes it to {@code REL_SUCCESS}, commits the session
 * and only then sends an "ok" response for every event that was part of a committed FlowFile.
 */
@CapabilityDescription("Listens for RELP messages being sent to a given port over TCP. Each message will be acknowledged after successfully writing the message to a FlowFile. Each FlowFile will contain data portion of one or more RELP frames. In the case where the RELP frames contain syslog messages, the output of this processor can be sent to a ParseSyslog processor for further processing.")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "relp", "tcp", "logs"})
@WritesAttributes({@WritesAttribute(attribute = RELPMetadata.COMMAND_KEY, description = "The command of the RELP frames."), @WritesAttribute(attribute = "relp.sender", description = "The sending host of the messages."), @WritesAttribute(attribute = "relp.port", description = "The sending port the messages were received over."), @WritesAttribute(attribute = RELPMetadata.TXNR_KEY, description = "The transaction number of the message. Only included if <Batch Size> is 1."), @WritesAttribute(attribute = "mime.type", description = "The mime.type of the content which is text/plain")})
@SeeAlso({ParseSyslog.class})
public class ListenRELP extends AbstractListenEventProcessor<RELPEvent> {

    // NOTE(review): several methods below (getAdditionalProperties, onScheduled, createDispatcher,
    // onTrigger, getBatchKey) look like overrides of superclass members that are not visible in this
    // file; add @Override to each once confirmed against AbstractListenEventProcessor.

    /** Optional controller service; when set, connections are accepted over SSL/TLS. */
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
            .name("SSL Context Service")
            .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be received over a secure connection.")
            .required(false)
            .identifiesControllerService(SSLContextService.class)
            .build();

    /** Encoder used to build RELP responses; re-created on every schedule with the processor charset. */
    private volatile RELPEncoder relpEncoder;

    /** Delimiter bytes handed to {@code getBatches}; derived from the MESSAGE_DELIMITER property. */
    private volatile byte[] messageDemarcatorBytes;

    /** FlowFile attribute keys written by this processor. */
    public enum RELPAttributes implements FlowFileAttributeKey {
        TXNR(RELPMetadata.TXNR_KEY),
        COMMAND(RELPMetadata.COMMAND_KEY),
        SENDER("relp.sender"),
        PORT("relp.port");

        private final String key;

        RELPAttributes(String key) {
            this.key = key;
        }

        /**
         * @return the attribute name to use on the FlowFile
         */
        public String key() {
            return this.key;
        }
    }

    /**
     * @return the property descriptors this processor contributes in addition to the superclass ones
     */
    protected List<PropertyDescriptor> getAdditionalProperties() {
        return Arrays.asList(MAX_CONNECTIONS, MAX_BATCH_SIZE, MESSAGE_DELIMITER, SSL_CONTEXT_SERVICE);
    }

    /**
     * Runs the superclass scheduling logic, then prepares the response encoder and the message
     * delimiter bytes.
     *
     * @param processContext the context providing the configured property values
     * @throws IOException if the superclass scheduling logic fails
     */
    @OnScheduled
    public void onScheduled(ProcessContext processContext) throws IOException {
        super.onScheduled(processContext);
        this.relpEncoder = new RELPEncoder(this.charset);

        // The property value carries the two-character escape sequences literally (e.g. backslash + n);
        // translate them into the actual control characters before encoding.
        final String delimiter = processContext.getProperty(MESSAGE_DELIMITER).getValue()
                .replace("\\n", "\n")
                .replace("\\r", "\r")
                .replace("\\t", "\t");
        this.messageDemarcatorBytes = delimiter.getBytes(this.charset);
    }

    /**
     * Builds the socket dispatcher that accepts RELP connections and places events on the queue.
     *
     * @param processContext the context providing the configured property values
     * @param blockingQueue the queue that received events are placed on
     * @return a dispatcher configured with one receive buffer per allowed connection
     * @throws IOException declared by the superclass contract
     */
    protected ChannelDispatcher createDispatcher(ProcessContext processContext, BlockingQueue<RELPEvent> blockingQueue) throws IOException {
        final RELPEventFactory eventFactory = new RELPEventFactory();
        final RELPSocketChannelHandlerFactory handlerFactory = new RELPSocketChannelHandlerFactory();

        final int maxConnections = processContext.getProperty(MAX_CONNECTIONS).asInteger().intValue();
        final int bufferSize = processContext.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        final Charset charset = Charset.forName(processContext.getProperty(CHARSET).getValue());

        // Pre-allocate one buffer per permitted connection.
        final BlockingQueue<ByteBuffer> bufferPool = new LinkedBlockingQueue<>(maxConnections);
        for (int i = 0; i < maxConnections; i++) {
            bufferPool.offer(ByteBuffer.allocate(bufferSize));
        }

        // When an SSL service is configured, client authentication is required.
        final SSLContextService sslContextService =
                processContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);

        return new SocketChannelDispatcher(
                eventFactory,
                handlerFactory,
                bufferPool,
                blockingQueue,
                getLogger(),
                maxConnections,
                sslContextService != null ? sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED) : null,
                charset);
    }

    /**
     * Routes every non-empty batch to success, commits the session once, and then acknowledges the
     * events of the committed batches.
     *
     * <p>The commit is deliberately performed after <em>all</em> batches have been transferred or
     * removed: committing while FlowFiles of later batches are still pending in the session makes
     * the commit fail, so a trigger that produced more than one batch could never succeed.
     *
     * @param processContext the context providing the configured property values
     * @param processSession the session used to create, transfer and commit FlowFiles
     * @throws ProcessException if processing fails
     */
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final int maxBatchSize = processContext.getProperty(MAX_BATCH_SIZE).asInteger().intValue();
        final Map<String, AbstractListenEventProcessor<RELPEvent>.FlowFileEventBatch> batches =
                getBatches(processSession, maxBatchSize, this.messageDemarcatorBytes);

        if (batches.isEmpty()) {
            processContext.yield();
            return;
        }

        final List<RELPEvent> eventsToAcknowledge = new ArrayList<>();

        for (Map.Entry<String, AbstractListenEventProcessor<RELPEvent>.FlowFileEventBatch> entry : batches.entrySet()) {
            final FlowFile flowFile = entry.getValue().getFlowFile();
            final List<RELPEvent> events = entry.getValue().getEvents();

            if (flowFile.getSize() == 0 || events.isEmpty()) {
                processSession.remove(flowFile);
                getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[]{entry.getKey()});
                continue;
            }

            // Sender and command are taken from the first event of the batch.
            final RELPEvent firstEvent = events.get(0);
            final String sender = firstEvent.getSender();

            final Map<String, String> attributes = new HashMap<>();
            attributes.put(RELPAttributes.COMMAND.key(), firstEvent.getCommand());
            attributes.put(RELPAttributes.SENDER.key(), sender);
            attributes.put(RELPAttributes.PORT.key(), String.valueOf(this.port));
            attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
            // A transaction number is only meaningful when the FlowFile holds exactly one event.
            if (events.size() == 1) {
                attributes.put(RELPAttributes.TXNR.key(), String.valueOf(firstEvent.getTxnr()));
            }

            final FlowFile updated = processSession.putAllAttributes(flowFile, attributes);
            getLogger().debug("Transferring {} to success", new Object[]{updated});
            processSession.transfer(updated, REL_SUCCESS);

            final String transitUri = "relp://" + stripLeadingSlash(sender) + ":" + this.port;
            processSession.getProvenanceReporter().receive(updated, transitUri);

            eventsToAcknowledge.addAll(events);
        }

        if (eventsToAcknowledge.isEmpty()) {
            // Every batch was removed; nothing to commit explicitly or to acknowledge.
            return;
        }

        // Commit before responding so that a sender is only told "ok" for data that is persisted.
        processSession.commit();

        for (RELPEvent event : eventsToAcknowledge) {
            respond(event, RELPResponse.ok(event.getTxnr()));
        }
    }

    /**
     * Builds the batch key for an event from its sender and command, joined by an underscore.
     *
     * <p>NOTE(review): the decompiler reported that this method's access modifier was changed from
     * {@code protected}; the visibility is left as found so existing callers keep compiling.
     *
     * @param rELPEvent the event to compute a key for
     * @return {@code sender + "_" + command}
     */
    public String getBatchKey(RELPEvent rELPEvent) {
        return rELPEvent.getSender() + "_" + rELPEvent.getCommand();
    }

    /**
     * Sends a RELP response through the responder attached to the event. An {@link IOException}
     * while responding is logged and not propagated.
     *
     * @param rELPEvent the event being answered
     * @param rELPResponse the response to encode and send
     */
    protected void respond(RELPEvent rELPEvent, RELPResponse rELPResponse) {
        final ChannelResponder responder = rELPEvent.getResponder();
        responder.addResponse(new RELPChannelResponse(this.relpEncoder, rELPResponse));
        try {
            responder.respond();
        } catch (IOException e) {
            getLogger().error("Error sending response for transaction {} due to {}", new Object[]{Long.valueOf(rELPEvent.getTxnr()), e.getMessage()}, e);
        }
    }

    /** Drops a single leading '/' from the sender (if more follows it) for use in the transit URI. */
    private static String stripLeadingSlash(String sender) {
        return (sender.startsWith("/") && sender.length() > 1) ? sender.substring(1) : sender;
    }
}
