/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import javax.servlet.Servlet;
import javax.ws.rs.Path;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.servlets.ContentAcknowledgmentServlet;
import org.apache.nifi.processors.standard.servlets.ListenHTTPServlet;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
import org.apache.nifi.stream.io.StreamThrottler;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HandlerContainer;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool;

@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags(value={"ingest", "http", "https", "rest", "listen"})
@CapabilityDescription(value="Starts an HTTP Server that is used to receive FlowFiles from remote sources. The default URI of the Service will be http://{hostname}:{port}/contentListener")
public class ListenHTTP
extends AbstractSessionFactoryProcessor {
    private Set<Relationship> relationships;
    private List<PropertyDescriptor> properties;
    public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder().name("success").description("Relationship for successfully received FlowFiles").build();
    public static final PropertyDescriptor BASE_PATH = new PropertyDescriptor.Builder().name("Base Path").description("Base path for incoming connections").required(true).defaultValue("contentListener").addValidator(StandardValidators.URI_VALIDATOR).addValidator(StandardValidators.createRegexMatchingValidator((Pattern)Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))).build();
    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder().name("Listening Port").description("The Port to listen on for incoming connections").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    public static final PropertyDescriptor AUTHORIZED_DN_PATTERN = new PropertyDescriptor.Builder().name("Authorized DN Pattern").description("A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.").required(true).defaultValue(".*").addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
    public static final PropertyDescriptor MAX_UNCONFIRMED_TIME = new PropertyDescriptor.Builder().name("Max Unconfirmed Flowfile Time").description("The maximum amount of time to wait for a FlowFile to be confirmed before it is removed from the cache").required(true).defaultValue("60 secs").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
    public static final PropertyDescriptor MAX_DATA_RATE = new PropertyDescriptor.Builder().name("Max Data to Receive per Second").description("The maximum amount of data to receive per second; this allows the bandwidth to be throttled to a specified data rate; if not specified, the data rate is not throttled").required(false).addValidator(StandardValidators.DATA_SIZE_VALIDATOR).build();
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder().name("SSL Context Service").description("The Controller Service to use in order to obtain an SSL Context").required(false).identifiesControllerService(SSLContextService.class).build();
    public static final PropertyDescriptor HEADERS_AS_ATTRIBUTES_REGEX = new PropertyDescriptor.Builder().name("HTTP Headers to receive as Attributes (Regex)").description("Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes").addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).required(false).build();
    public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
    public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
    public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = "sessionFactoryHolder";
    public static final String CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER = "processContextHolder";
    public static final String CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN = "authorityPattern";
    public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN = "headerPattern";
    public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap";
    public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler";
    public static final String CONTEXT_ATTRIBUTE_BASE_PATH = "basePath";
    private volatile Server server = null;
    private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<String, FlowFileEntryTimeWrapper>();
    private final AtomicReference<ProcessSessionFactory> sessionFactoryReference = new AtomicReference();
    private final AtomicReference<StreamThrottler> throttlerRef = new AtomicReference();

    protected void init(ProcessorInitializationContext context) {
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(RELATIONSHIP_SUCCESS);
        this.relationships = Collections.unmodifiableSet(relationships);
        ArrayList<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(BASE_PATH);
        descriptors.add(PORT);
        descriptors.add(MAX_DATA_RATE);
        descriptors.add(SSL_CONTEXT_SERVICE);
        descriptors.add(AUTHORIZED_DN_PATTERN);
        descriptors.add(MAX_UNCONFIRMED_TIME);
        descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX);
        this.properties = Collections.unmodifiableList(descriptors);
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.properties;
    }

    @OnStopped
    public void shutdownHttpServer() {
        Server toShutdown;
        StreamThrottler throttler = this.throttlerRef.getAndSet(null);
        if (throttler != null) {
            try {
                throttler.close();
            }
            catch (IOException e) {
                this.getLogger().error("Failed to close StreamThrottler", (Throwable)e);
            }
        }
        if ((toShutdown = this.server) == null) {
            return;
        }
        try {
            toShutdown.stop();
            toShutdown.destroy();
        }
        catch (Exception ex) {
            this.getLogger().warn("unable to cleanly shutdown embedded server due to {}", new Object[]{ex});
            this.server = null;
        }
    }

    private void createHttpServerFromService(ProcessContext context) throws Exception {
        ServerConnector connector;
        String keystorePath;
        String basePath = context.getProperty(BASE_PATH).getValue();
        SSLContextService sslContextService = (SSLContextService)context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
        LeakyBucketStreamThrottler streamThrottler = maxBytesPerSecond == null ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue());
        this.throttlerRef.set((StreamThrottler)streamThrottler);
        boolean needClientAuth = sslContextService == null ? false : sslContextService.getTrustStoreFile() != null;
        SslContextFactory contextFactory = new SslContextFactory();
        contextFactory.setNeedClientAuth(needClientAuth);
        if (needClientAuth) {
            contextFactory.setTrustStorePath(sslContextService.getTrustStoreFile());
            contextFactory.setTrustStoreType(sslContextService.getTrustStoreType());
            contextFactory.setTrustStorePassword(sslContextService.getTrustStorePassword());
        }
        String string = keystorePath = sslContextService == null ? null : sslContextService.getKeyStoreFile();
        if (keystorePath != null) {
            String keystorePassword = sslContextService.getKeyStorePassword();
            String keyStoreType = sslContextService.getKeyStoreType();
            contextFactory.setKeyStorePath(keystorePath);
            contextFactory.setKeyManagerPassword(keystorePassword);
            contextFactory.setKeyStorePassword(keystorePassword);
            contextFactory.setKeyStoreType(keyStoreType);
        }
        QueuedThreadPool threadPool = new QueuedThreadPool();
        threadPool.setName(String.format("%s (%s) Web Server", ((Object)((Object)this)).getClass().getSimpleName(), this.getIdentifier()));
        Server server = new Server((ThreadPool)threadPool);
        int port = context.getProperty(PORT).asInteger();
        HttpConfiguration httpConfiguration = new HttpConfiguration();
        if (keystorePath == null) {
            connector = new ServerConnector(server, new ConnectionFactory[]{new HttpConnectionFactory(httpConfiguration)});
        } else {
            httpConfiguration.setSecureScheme("https");
            httpConfiguration.setSecurePort(port);
            httpConfiguration.addCustomizer((HttpConfiguration.Customizer)new SecureRequestCustomizer());
            connector = new ServerConnector(server, new ConnectionFactory[]{new SslConnectionFactory(contextFactory, "http/1.1"), new HttpConnectionFactory(httpConfiguration)});
        }
        connector.setPort(port);
        server.setConnectors(new Connector[]{connector});
        ServletContextHandler contextHandler = new ServletContextHandler((HandlerContainer)server, "/", true, keystorePath != null);
        for (Class<? extends Servlet> cls : this.getServerClasses()) {
            Path path = cls.getAnnotation(Path.class);
            if (basePath.isEmpty() && !path.value().isEmpty()) {
                contextHandler.addServlet(cls, path.value());
                continue;
            }
            contextHandler.addServlet(cls, "/" + basePath + path.value());
        }
        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESSOR, (Object)this);
        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_LOGGER, (Object)this.getLogger());
        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER, this.sessionFactoryReference);
        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER, (Object)context);
        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_FLOWFILE_MAP, this.flowFileMap);
        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, (Object)Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue()));
        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, (Object)streamThrottler);
        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_BASE_PATH, (Object)basePath);
        if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) {
            contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, (Object)Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue()));
        }
        server.start();
        this.server = server;
    }

    @OnScheduled
    public void createHttpServer(ProcessContext context) throws Exception {
        this.createHttpServerFromService(context);
    }

    protected Set<Class<? extends Servlet>> getServerClasses() {
        HashSet<Class<? extends Servlet>> s = new HashSet<Class<? extends Servlet>>();
        s.add(ListenHTTPServlet.class);
        s.add(ContentAcknowledgmentServlet.class);
        return s;
    }

    private Set<String> findOldFlowFileIds(ProcessContext ctx) {
        HashSet<String> old = new HashSet<String>();
        long expiryMillis = ctx.getProperty(MAX_UNCONFIRMED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
        long cutoffTime = System.currentTimeMillis() - expiryMillis;
        for (Map.Entry entry : this.flowFileMap.entrySet()) {
            FlowFileEntryTimeWrapper wrapper = (FlowFileEntryTimeWrapper)entry.getValue();
            if (wrapper == null || wrapper.getEntryTime() >= cutoffTime) continue;
            old.add((String)entry.getKey());
        }
        return old;
    }

    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) {
        this.sessionFactoryReference.compareAndSet(null, sessionFactory);
        for (String id : this.findOldFlowFileIds(context)) {
            FlowFileEntryTimeWrapper wrapper = (FlowFileEntryTimeWrapper)this.flowFileMap.remove(id);
            if (wrapper == null) continue;
            this.getLogger().warn("failed to received acknowledgment for HOLD with ID {}; rolling back session", new Object[]{id});
            wrapper.session.rollback();
        }
        context.yield();
    }

    public static class FlowFileEntryTimeWrapper {
        private final Set<FlowFile> flowFiles;
        private final long entryTime;
        private final ProcessSession session;

        public FlowFileEntryTimeWrapper(ProcessSession session, Set<FlowFile> flowFiles, long entryTime) {
            this.flowFiles = flowFiles;
            this.entryTime = entryTime;
            this.session = session;
        }

        public Set<FlowFile> getFlowFiles() {
            return this.flowFiles;
        }

        public long getEntryTime() {
            return this.entryTime;
        }

        public ProcessSession getSession() {
            return this.session;
        }
    }
}

