/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processor.util.listen.response.ChannelResponse;
import org.apache.nifi.processors.standard.ParseSyslog;
import org.apache.nifi.processors.standard.relp.event.RELPEvent;
import org.apache.nifi.processors.standard.relp.event.RELPEventFactory;
import org.apache.nifi.processors.standard.relp.frame.RELPEncoder;
import org.apache.nifi.processors.standard.relp.handler.RELPSocketChannelHandlerFactory;
import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
import org.apache.nifi.processors.standard.relp.response.RELPResponse;
import org.apache.nifi.ssl.SSLContextService;

@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags(value={"listen", "relp", "tcp", "logs"})
@CapabilityDescription(value="Listens for RELP messages being sent to a given port over TCP. Each message will be acknowledged after successfully writing the message to a FlowFile. Each FlowFile will contain data portion of one or more RELP frames. In the case where the RELP frames contain syslog messages, the output of this processor can be sent to a ParseSyslog processor for further processing.")
@WritesAttributes(value={@WritesAttribute(attribute="relp.command", description="The command of the RELP frames."), @WritesAttribute(attribute="relp.sender", description="The sending host of the messages."), @WritesAttribute(attribute="relp.port", description="The sending port the messages were received over."), @WritesAttribute(attribute="relp.txnr", description="The transaction number of the message. Only included if <Batch Size> is 1."), @WritesAttribute(attribute="mime.type", description="The mime.type of the content which is text/plain")})
@SeeAlso(value={ParseSyslog.class})
public class ListenRELP
extends AbstractListenEventProcessor<RELPEvent> {
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder().name("SSL Context Service").description("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be received over a secure connection.").required(false).identifiesControllerService(SSLContextService.class).build();
    private volatile RELPEncoder relpEncoder;
    private volatile byte[] messageDemarcatorBytes;

    protected List<PropertyDescriptor> getAdditionalProperties() {
        return Arrays.asList(MAX_CONNECTIONS, MAX_BATCH_SIZE, MESSAGE_DELIMITER, SSL_CONTEXT_SERVICE);
    }

    @OnScheduled
    public void onScheduled(ProcessContext context) throws IOException {
        super.onScheduled(context);
        this.relpEncoder = new RELPEncoder(this.charset);
        String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
        this.messageDemarcatorBytes = msgDemarcator.getBytes(this.charset);
    }

    protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<RELPEvent> events) throws IOException {
        RELPEventFactory eventFactory = new RELPEventFactory();
        RELPSocketChannelHandlerFactory handlerFactory = new RELPSocketChannelHandlerFactory();
        int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger();
        int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
        LinkedBlockingQueue<ByteBuffer> bufferPool = new LinkedBlockingQueue<ByteBuffer>(maxConnections);
        for (int i = 0; i < maxConnections; ++i) {
            bufferPool.offer(ByteBuffer.allocate(bufferSize));
        }
        SSLContext sslContext = null;
        SSLContextService sslContextService = (SSLContextService)context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        if (sslContextService != null) {
            sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
        }
        return new SocketChannelDispatcher((EventFactory)eventFactory, handlerFactory, bufferPool, events, this.getLogger(), maxConnections, sslContext, charSet);
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
        Map batches = this.getBatches(session, maxBatchSize, this.messageDemarcatorBytes);
        if (batches.size() == 0) {
            context.yield();
            return;
        }
        for (Map.Entry entry : batches.entrySet()) {
            FlowFile flowFile = ((AbstractListenEventProcessor.FlowFileEventBatch)entry.getValue()).getFlowFile();
            List events = ((AbstractListenEventProcessor.FlowFileEventBatch)entry.getValue()).getEvents();
            if (flowFile.getSize() == 0L || events.size() == 0) {
                session.remove(flowFile);
                this.getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[]{entry.getKey()});
                continue;
            }
            String sender = ((RELPEvent)((Object)events.get(0))).getSender();
            String command = ((RELPEvent)((Object)events.get(0))).getCommand();
            int numAttributes = events.size() == 1 ? 5 : 4;
            HashMap<String, String> attributes = new HashMap<String, String>(numAttributes);
            attributes.put(RELPAttributes.COMMAND.key(), command);
            attributes.put(RELPAttributes.SENDER.key(), sender);
            attributes.put(RELPAttributes.PORT.key(), String.valueOf(this.port));
            attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
            if (events.size() == 1) {
                attributes.put(RELPAttributes.TXNR.key(), String.valueOf(((RELPEvent)((Object)events.get(0))).getTxnr()));
            }
            flowFile = session.putAllAttributes(flowFile, attributes);
            this.getLogger().debug("Transferring {} to success", new Object[]{flowFile});
            session.transfer(flowFile, REL_SUCCESS);
            String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
            String transitUri = "relp" + "://" + senderHost + ":" + this.port;
            session.getProvenanceReporter().receive(flowFile, transitUri);
            session.commit();
            for (RELPEvent event : events) {
                this.respond(event, RELPResponse.ok(event.getTxnr()));
            }
        }
    }

    protected String getBatchKey(RELPEvent event) {
        return event.getSender() + "_" + event.getCommand();
    }

    protected void respond(RELPEvent event, RELPResponse relpResponse) {
        RELPChannelResponse response = new RELPChannelResponse(this.relpEncoder, relpResponse);
        ChannelResponder responder = event.getResponder();
        responder.addResponse((ChannelResponse)response);
        try {
            responder.respond();
        }
        catch (IOException e) {
            this.getLogger().error("Error sending response for transaction {} due to {}", new Object[]{event.getTxnr(), e.getMessage()}, (Throwable)e);
        }
    }

    public static enum RELPAttributes implements FlowFileAttributeKey
    {
        TXNR("relp.txnr"),
        COMMAND("relp.command"),
        SENDER("relp.sender"),
        PORT("relp.port");

        private final String key;

        private RELPAttributes(String key) {
            this.key = key;
        }

        public String key() {
            return this.key;
        }
    }
}

