/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.event.transport.EventException;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.listen.EventBatcher;
import org.apache.nifi.processor.util.listen.FlowFileEventBatch;
import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.processors.standard.ParseSyslog;
import org.apache.nifi.processors.standard.relp.event.RELPMessage;
import org.apache.nifi.processors.standard.relp.handler.RELPMessageServerFactory;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;

/**
 * NiFi processor that starts a Netty-based RELP listener when scheduled and
 * turns the received {@link RELPMessage}s into FlowFiles routed to
 * {@link #REL_SUCCESS}.
 *
 * <p>Messages are grouped per sender and RELP command (see the batch key built
 * in this class); each group becomes one FlowFile whose attributes are listed
 * in {@link RELPAttributes}.</p>
 */
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags(value={"listen", "relp", "tcp", "logs"})
@CapabilityDescription(value="Listens for RELP messages being sent to a given port over TCP. Each message will be acknowledged after successfully writing the message to a FlowFile. Each FlowFile will contain data portion of one or more RELP frames. In the case where the RELP frames contain syslog messages, the output of this processor can be sent to a ParseSyslog processor for further processing.")
@WritesAttributes(value={@WritesAttribute(attribute="relp.command", description="The command of the RELP frames."), @WritesAttribute(attribute="relp.sender", description="The sending host of the messages."), @WritesAttribute(attribute="relp.port", description="The sending port the messages were received over."), @WritesAttribute(attribute="relp.txnr", description="The transaction number of the message. Only included if <Batch Size> is 1."), @WritesAttribute(attribute="mime.type", description="The mime.type of the content which is text/plain")})
@SeeAlso(value={ParseSyslog.class})
public class ListenRELP
extends AbstractProcessor {
    /** Optional controller service; when set, the listener is configured with its SSL context. */
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
            .name("SSL Context Service")
            .displayName("SSL Context Service")
            .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be received over a secure connection.")
            .required(false)
            .identifiesControllerService(RestrictedSSLContextService.class)
            .build();

    /** Client authentication policy applied together with {@link #SSL_CONTEXT_SERVICE}. */
    public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
            .name("Client Auth")
            .displayName("Client Auth")
            .description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
            .required(false)
            .allowableValues((Enum[])ClientAuth.values())
            .defaultValue(ClientAuth.REQUIRED.name())
            .build();

    /** The only relationship of this processor; every produced FlowFile is transferred here. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Messages received successfully will be sent out this relationship.")
            .build();

    protected List<PropertyDescriptor> descriptors;
    protected Set<Relationship> relationships;
    protected volatile int port;
    protected volatile BlockingQueue<RELPMessage> events;
    protected volatile BlockingQueue<RELPMessage> errorEvents;
    protected volatile EventServer eventServer;
    protected volatile byte[] messageDemarcatorBytes;
    // Assigned in onScheduled; not read anywhere else in this class.
    protected volatile EventBatcher eventBatcher;

    /**
     * Reads the listener configuration, creates fresh event queues and starts
     * the event server.
     *
     * <p>A failure to obtain the event server is logged (including the
     * underlying exception) and otherwise tolerated, leaving
     * {@link #eventServer} unset.</p>
     *
     * @param context supplies the configured property values
     * @throws IOException declared for the network interface lookup
     */
    @OnScheduled
    public void onScheduled(ProcessContext context) throws IOException {
        final int workerThreads = context.getProperty(ListenerProperties.WORKER_THREADS).asInteger();
        final int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
        // NOTE(review): presumably null when no network interface is configured - confirm against NetworkUtils.
        final InetAddress hostname = NetworkUtils.getInterfaceAddress(networkInterface);
        final Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());

        this.port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
        this.events = new LinkedBlockingQueue<RELPMessage>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
        this.errorEvents = new LinkedBlockingQueue<RELPMessage>();
        this.eventBatcher = this.getEventBatcher();
        this.messageDemarcatorBytes = this.getMessageDemarcator(context).getBytes(charset);

        final NettyEventServerFactory eventFactory = this.getNettyEventServerFactory(hostname, this.port, charset, this.events);
        eventFactory.setSocketReceiveBuffer(bufferSize);
        eventFactory.setWorkerThreads(workerThreads);
        this.configureFactoryForSsl(context, eventFactory);

        try {
            this.eventServer = eventFactory.getEventServer();
        }
        catch (EventException e) {
            // Guard against a missing address so the handler itself cannot throw a
            // NullPointerException and mask the real bind failure.
            final String bindAddress = hostname == null ? "<unspecified>" : hostname.getHostAddress();
            // The exception is appended as the last argument so the cause is not lost.
            this.getLogger().error("Failed to bind to [{}:{}].", new Object[]{bindAddress, this.port, e});
        }
    }

    /**
     * @return the port reported by the running event server, or {@code 0} when
     *         no event server is currently held
     */
    public int getListeningPort() {
        return this.eventServer == null ? 0 : this.eventServer.getListeningPort();
    }

    /** Shuts the event server down, if one is held, and drops the reference to it. */
    @OnStopped
    public void stopped() {
        if (this.eventServer != null) {
            this.eventServer.shutdown();
            this.eventServer = null;
        }
    }

    /**
     * Builds the unmodifiable property descriptor list and relationship set
     * exposed by this processor.
     *
     * @param context initialization context (not used here)
     */
    protected void init(ProcessorInitializationContext context) {
        final List<PropertyDescriptor> supported = new ArrayList<PropertyDescriptor>();
        supported.add(ListenerProperties.NETWORK_INTF_NAME);
        supported.add(ListenerProperties.PORT);
        supported.add(ListenerProperties.RECV_BUFFER_SIZE);
        supported.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
        supported.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
        supported.add(ListenerProperties.CHARSET);
        supported.add(ListenerProperties.WORKER_THREADS);
        supported.add(ListenerProperties.MAX_BATCH_SIZE);
        supported.add(ListenerProperties.MESSAGE_DELIMITER);
        supported.add(SSL_CONTEXT_SERVICE);
        supported.add(CLIENT_AUTH);
        this.descriptors = Collections.unmodifiableList(supported);

        final Set<Relationship> rels = new HashSet<Relationship>();
        rels.add(REL_SUCCESS);
        this.relationships = Collections.unmodifiableSet(rels);
    }

    /**
     * Reports an invalid configuration when an SSL context service is set but
     * the Client Auth property is blank.
     *
     * @param validationContext supplies the property values to validate
     * @return the validation failures found; empty when there are none
     */
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<ValidationResult>();
        final SSLContextService sslContextService = (SSLContextService)validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
        if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
            results.add(new ValidationResult.Builder()
                    .explanation("Client Auth must be provided when using TLS/SSL")
                    .valid(false)
                    .subject("Client Auth")
                    .build());
        }
        return results;
    }

    /** @return the relationship set built in {@code init} */
    public final Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /** @return the property descriptor list built in {@code init} */
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.descriptors;
    }

    /**
     * Applies the SSL settings to the server factory: the SSL context and
     * client-auth policy when a context service is configured and yields a
     * context, or an explicit {@code null} SSL context when no service is set.
     */
    private void configureFactoryForSsl(ProcessContext context, NettyEventServerFactory eventFactory) {
        final SSLContextService sslContextService = (SSLContextService)context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        if (sslContextService == null) {
            eventFactory.setSslContext(null);
            return;
        }
        final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
        final SSLContext sslContext = sslContextService.createContext();
        if (sslContext != null) {
            eventFactory.setSslContext(sslContext);
            eventFactory.setClientAuth(ClientAuth.valueOf(clientAuthValue));
        }
    }

    /**
     * Builds the FlowFile attributes for a batch from its first event.
     *
     * <p>The transaction number attribute is only added when the batch holds
     * exactly one event. The batch must contain at least one event.</p>
     *
     * @param batch the batch whose events describe the FlowFile
     * @return a new mutable attribute map
     */
    protected Map<String, String> getAttributes(FlowFileEventBatch batch) {
        final List events = batch.getEvents();
        final RELPMessage first = (RELPMessage)events.get(0);
        final String sender = first.getSender();
        final String command = first.getCommand();
        final boolean singleEvent = events.size() == 1;

        final Map<String, String> attributes = new HashMap<String, String>(singleEvent ? 5 : 4);
        attributes.put(RELPAttributes.COMMAND.key(), command);
        attributes.put(RELPAttributes.SENDER.key(), sender);
        attributes.put(RELPAttributes.PORT.key(), String.valueOf(this.port));
        attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
        if (singleEvent) {
            attributes.put(RELPAttributes.TXNR.key(), String.valueOf(first.getTxnr()));
        }
        return attributes;
    }

    /**
     * Builds the provenance transit URI {@code relp://<sender>:<port>} from the
     * first event of the batch, dropping a single leading {@code /} from the
     * sender when more characters follow it. The batch must contain at least
     * one event.
     *
     * @param batch the batch whose first event names the sender
     * @return the transit URI string
     */
    protected String getTransitUri(FlowFileEventBatch batch) {
        final String sender = ((RELPMessage)batch.getEvents().get(0)).getSender();
        final boolean hasLeadingSlash = sender.startsWith("/") && sender.length() > 1;
        final String senderHost = hasLeadingSlash ? sender.substring(1) : sender;
        return "relp://" + senderHost + ":" + this.port;
    }

    /**
     * Collects up to the configured batch size of queued events into
     * per-key batches and hands them to {@code processEvents}.
     *
     * @param context supplies the maximum batch size
     * @param session session used to create, transfer and commit FlowFiles
     * @throws ProcessException declared by the processor contract
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        // A batcher is created per invocation, bound to the current event queues.
        final EventBatcher batcher = this.getEventBatcher();
        final int batchSize = context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
        final Map batches = batcher.getBatches(session, batchSize, this.messageDemarcatorBytes);
        this.processEvents(session, batches);
    }

    /**
     * Removes FlowFiles of empty batches, and for every other batch sets the
     * attributes, transfers the FlowFile to {@link #REL_SUCCESS}, bumps the
     * counter and records a provenance receive event. Commits the session
     * asynchronously once all batches are handled.
     */
    private void processEvents(ProcessSession session, Map<String, FlowFileEventBatch> batches) {
        for (Map.Entry<String, FlowFileEventBatch> entry : batches.entrySet()) {
            final FlowFileEventBatch batch = entry.getValue();
            FlowFile flowFile = batch.getFlowFile();
            final List events = batch.getEvents();
            if (flowFile.getSize() == 0L || events.isEmpty()) {
                session.remove(flowFile);
                this.getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[]{entry.getKey()});
                continue;
            }
            flowFile = session.putAllAttributes(flowFile, this.getAttributes(batch));
            this.getLogger().debug("Transferring {} to success", new Object[]{flowFile});
            session.transfer(flowFile, REL_SUCCESS);
            session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
            session.getProvenanceReporter().receive(flowFile, this.getTransitUri(batch));
        }
        session.commitAsync();
    }

    /** @return the batch key {@code <sender>_<command>} for the given event */
    private String getRELPBatchKey(RELPMessage event) {
        return event.getSender() + "_" + event.getCommand();
    }

    /**
     * Creates a new batcher over the current {@link #events} and
     * {@link #errorEvents} queues that groups messages by the RELP batch key.
     */
    private EventBatcher getEventBatcher() {
        return new EventBatcher<RELPMessage>(this.getLogger(), this.events, this.errorEvents){

            protected String getBatchKey(RELPMessage event) {
                return ListenRELP.this.getRELPBatchKey(event);
            }
        };
    }

    /** Creates the RELP server factory for the given address, port, charset and event queue. */
    private NettyEventServerFactory getNettyEventServerFactory(InetAddress hostname, int port, Charset charset, BlockingQueue<RELPMessage> events) {
        return new RELPMessageServerFactory(this.getLogger(), hostname, port, charset, events);
    }

    /**
     * Reads the message delimiter property and converts the two-character
     * escape sequences {@code \n}, {@code \r} and {@code \t} typed by the user
     * into the corresponding control characters.
     */
    private String getMessageDemarcator(ProcessContext context) {
        return context.getProperty(ListenerProperties.MESSAGE_DELIMITER).getValue()
                .replace("\\n", "\n")
                .replace("\\r", "\r")
                .replace("\\t", "\t");
    }

    /** Keys of the RELP-specific FlowFile attributes written by this processor. */
    public static enum RELPAttributes implements FlowFileAttributeKey
    {
        TXNR("relp.txnr"),
        COMMAND("relp.command"),
        SENDER("relp.sender"),
        PORT("relp.port");

        private final String key;

        private RELPAttributes(String key) {
            this.key = key;
        }

        /** @return the attribute name as written to the FlowFile */
        public String key() {
            return this.key;
        }
    }
}

