/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.amqp.processors;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultSaslConfig;
import com.rabbitmq.client.SaslConfig;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.amqp.processors.AMQPResource;
import org.apache.nifi.amqp.processors.AMQPWorker;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.ssl.SSLContextService;

abstract class AbstractAMQPProcessor<T extends AMQPWorker>
extends AbstractProcessor {
    public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder().name("Host Name").description("Network address of AMQP broker (e.g., localhost)").required(true).defaultValue("localhost").expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).build();
    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder().name("Port").description("Numeric value identifying Port of AMQP broker (e.g., 5671)").required(true).defaultValue("5672").expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.PORT_VALIDATOR).build();
    public static final PropertyDescriptor V_HOST = new PropertyDescriptor.Builder().name("Virtual Host").description("Virtual Host name which segregates AMQP system for enhanced security.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).build();
    public static final PropertyDescriptor USER = new PropertyDescriptor.Builder().name("User Name").description("User Name used for authentication and authorization.").required(true).defaultValue("guest").expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).build();
    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder().name("Password").description("Password used for authentication and authorization.").required(true).defaultValue("guest").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).sensitive(true).build();
    public static final PropertyDescriptor AMQP_VERSION = new PropertyDescriptor.Builder().name("AMQP Version").description("AMQP Version. Currently only supports AMQP v0.9.1.").required(true).allowableValues(new String[]{"0.9.1"}).defaultValue("0.9.1").build();
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder().name("ssl-context-service").displayName("SSL Context Service").description("The SSL Context Service used to provide client certificate information for TLS/SSL connections.").required(false).identifiesControllerService(SSLContextService.class).build();
    public static final PropertyDescriptor USE_CERT_AUTHENTICATION = new PropertyDescriptor.Builder().name("cert-authentication").displayName("Use Certificate Authentication").description("Authenticate using the SSL certificate common name rather than user name/password.").required(false).defaultValue("false").allowableValues(new String[]{"true", "false"}).addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
    public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder().name("ssl-client-auth").displayName("Client Auth").description("Client authentication policy when connecting to secure (TLS/SSL) AMQP broker. Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context has been defined and enabled.").required(false).allowableValues((Enum[])SSLContextService.ClientAuth.values()).defaultValue("REQUIRED").build();
    private static final List<PropertyDescriptor> propertyDescriptors;
    private final BlockingQueue<AMQPResource<T>> resourceQueue = new LinkedBlockingQueue<AMQPResource<T>>();

    AbstractAMQPProcessor() {
    }

    protected static List<PropertyDescriptor> getCommonPropertyDescriptors() {
        return propertyDescriptors;
    }

    public final void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        AMQPResource<T> resource = (AMQPResource<T>)this.resourceQueue.poll();
        if (resource == null) {
            resource = this.createResource(context);
        }
        try {
            this.processResource(resource.getConnection(), resource.getWorker(), context, session);
            this.resourceQueue.offer(resource);
        }
        catch (Exception e) {
            try {
                resource.close();
            }
            catch (Exception e2) {
                e.addSuppressed(e2);
            }
            throw e;
        }
    }

    @OnStopped
    public void close() {
        AMQPResource resource;
        while ((resource = (AMQPResource)this.resourceQueue.poll()) != null) {
            try {
                resource.close();
            }
            catch (Exception e) {
                this.getLogger().warn("Failed to close AMQP Connection", (Throwable)e);
            }
        }
    }

    protected abstract void processResource(Connection var1, T var2, ProcessContext var3, ProcessSession var4) throws ProcessException;

    protected abstract T createAMQPWorker(ProcessContext var1, Connection var2);

    private AMQPResource<T> createResource(ProcessContext context) {
        Connection connection = this.createConnection(context);
        T worker = this.createAMQPWorker(context, connection);
        return new AMQPResource<T>(connection, worker);
    }

    protected Connection createConnection(ProcessContext context) {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost(context.getProperty(HOST).evaluateAttributeExpressions().getValue());
        cf.setPort(Integer.parseInt(context.getProperty(PORT).evaluateAttributeExpressions().getValue()));
        cf.setUsername(context.getProperty(USER).evaluateAttributeExpressions().getValue());
        cf.setPassword(context.getProperty(PASSWORD).getValue());
        String vHost = context.getProperty(V_HOST).evaluateAttributeExpressions().getValue();
        if (vHost != null) {
            cf.setVirtualHost(vHost);
        }
        Boolean useCertAuthentication = context.getProperty(USE_CERT_AUTHENTICATION).asBoolean();
        SSLContextService sslService = (SSLContextService)context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        if (useCertAuthentication.booleanValue() && sslService == null) {
            throw new IllegalStateException("This processor is configured to use cert authentication, but the SSL Context Service hasn't been configured. You need to configure the SSL Context Service.");
        }
        String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue();
        if (sslService != null) {
            SSLContextService.ClientAuth clientAuth;
            if (StringUtils.isBlank((CharSequence)rawClientAuth)) {
                clientAuth = SSLContextService.ClientAuth.REQUIRED;
            } else {
                try {
                    clientAuth = SSLContextService.ClientAuth.valueOf((String)rawClientAuth);
                }
                catch (IllegalArgumentException iae) {
                    throw new IllegalStateException(String.format("Unrecognized client auth '%s'. Possible values are [%s]", rawClientAuth, StringUtils.join((Object[])SslContextFactory.ClientAuth.values(), (String)", ")));
                }
            }
            SSLContext sslContext = sslService.createSSLContext(clientAuth);
            cf.useSslProtocol(sslContext);
            if (useCertAuthentication.booleanValue()) {
                cf.setSaslConfig((SaslConfig)DefaultSaslConfig.EXTERNAL);
            }
        }
        try {
            Connection connection = cf.newConnection();
            return connection;
        }
        catch (Exception e) {
            throw new IllegalStateException("Failed to establish connection with AMQP Broker: " + cf.toString(), e);
        }
    }

    static {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(HOST);
        properties.add(PORT);
        properties.add(V_HOST);
        properties.add(USER);
        properties.add(PASSWORD);
        properties.add(AMQP_VERSION);
        properties.add(SSL_CONTEXT_SERVICE);
        properties.add(USE_CERT_AUTHENTICATION);
        properties.add(CLIENT_AUTH);
        propertyDescriptors = Collections.unmodifiableList(properties);
    }
}

