package org.wso2.extension.siddhi.io.rabbitmq.source;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import org.apache.log4j.Logger;
import org.wso2.extension.siddhi.io.rabbitmq.util.RabbitMQConstants;
import org.wso2.extension.siddhi.io.rabbitmq.util.RabbitMQSinkUtil;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.exception.SiddhiAppCreationException;
import org.wso2.siddhi.core.stream.input.source.Source;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.OptionHolder;

@Extension(name = "rabbitmq", namespace = "source", description = "The rabbitmq source receives the events from the rabbitmq broker via the AMQP protocol. ", parameters = {@Parameter(name = RabbitMQConstants.RABBITMQ_SERVER_URI, description = "The URI that is used to connect to an AMQP server. If no URI is specified,an error is logged in the CLI.e.g.,\n`amqp://guest:guest`,\n`amqp://guest:guest@localhost:5672`", type = {DataType.STRING}), @Parameter(name = RabbitMQConstants.RABBITMQ_HEARTBEAT, description = "The period of time (in seconds) after which the peer TCP connection should be considered unreachable (down) by RabbitMQ and client libraries.", type = {DataType.INT}, optional = true, defaultValue = RabbitMQConstants.DEFAULT_HEARTBEAT), @Parameter(name = RabbitMQConstants.RABBITMQ_EXCHANGE_NAME, description = "The name of the exchange that decides what to do with a message it receives.If the `exchange.name` already exists in the RabbitMQ server, then the system uses that `exchange.name` instead of redeclaring.", type = {DataType.STRING}), @Parameter(name = RabbitMQConstants.RABBITMQ_EXCHANGE_TYPE, description = "The type of the exchange name. The exchange types available are `direct`, `fanout`, `topic` and `headers`. For a detailed description of each type, see [RabbitMQ - AMQP Concepts](https://www.rabbitmq.com/tutorials/amqp-concepts.html). ", type = {DataType.STRING}, optional = true, defaultValue = "direct"), @Parameter(name = RabbitMQConstants.RABBITMQ_EXCHANGE_DURABLE, description = "If this is set to `true`, the exchange remains declared even if the broker restarts.", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = RabbitMQConstants.RABBITMQ_EXCHANGE_AUTO_DELETE, description = "If this is set to `true`, the exchange is automatically deleted when it is not used anymore. ", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = RabbitMQConstants.RABBITMQ_ROUTINGKEY, description = "The key based on which the exchange determines how to route the message to queues. The routing key is like an address for the message. The routing.key must be initialized when the value for the `exchange.type` parameter is `direct` or `topic`.", type = {DataType.STRING}, optional = true, defaultValue = "empty"), @Parameter(name = "headers", description = "The headers of the message. The attributes used for routing are taken from the this paremeter. A message is considered matching if the value of the header equals the value specified upon binding. ", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = RabbitMQConstants.RABBITMQ_QUEUENAME, description = "A queue is a buffer that stores messages. If the queue name already exists in the RabbitMQ server, then the system usees that queue name instead of redeclaring it. If no value is specified for this parameter, the system uses the unique queue name that is automatically generated by the RabbitMQ server. ", type = {DataType.STRING}, optional = true, defaultValue = "system generated queue name"), @Parameter(name = RabbitMQConstants.RABBITMQ_QUEUE_DURABLE, description = "If this parameter is set to `true`, the queue remains declared even if the broker restarts", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = RabbitMQConstants.RABBITMQ_QUEUE_EXCLUSIVE, description = "If this parameter is set to `true`, the queue is exclusive for the current connection. If it is set to `false`, it is also consumable by other connections. ", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = RabbitMQConstants.RABBITMQ_QUEUE_AUTO_DELETE, description = "If this parameter is set to `true`, the queue is automatically deleted when it is not used anymore.", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = RabbitMQConstants.RABBITMQ_CONNECTION_TLS_ENABLED, description = "This parameter specifies whether an encrypted communication channel should be established or not. When this parameter is set to `true`, the `tls.truststore.path` and `tls.truststore.password` parameters are initialized.", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = RabbitMQConstants.RABBITMQ_CONNECTION_TLS_TRUSTSTORE_LOCATION, description = "The file path to the location of the truststore of the client that receives the RabbitMQ events via the `AMQP` protocol. A custom client-truststore can be specified if required. If a custom truststore is not specified, then the system uses the default client-trustore in the `${carbon.home}/resources/security` directory.", type = {DataType.STRING}, optional = true, defaultValue = "${carbon.home}/resources/security/client-truststore.jks"), @Parameter(name = RabbitMQConstants.RABBITMQ_CONNECTION_TLS_TRUSTSTORE_PASSWORD, description = "The password for the client-truststore. A custom password can be specified if required. If no custom password is specified, then the system uses `wso2carbon` as the default password.", type = {DataType.STRING}, optional = true, defaultValue = RabbitMQConstants.TRUSTSTORE_PASSWORD_VALUE), @Parameter(name = "tls.truststore.type", description = "The type of the truststore.", type = {DataType.STRING}, optional = true, defaultValue = RabbitMQConstants.DEFAULT_TLS_TRUSTSTORE_TYPE), @Parameter(name = RabbitMQConstants.RABBITMQ_CONNECTION_TLS_VERSION, description = "The version of the tls/ssl.", type = {DataType.STRING}, optional = true, defaultValue = RabbitMQConstants.DEFAULT_TLS_VERSION)}, examples = {@Example(syntax = "@App:name('TestExecutionPlan') \ndefine stream FooStream (symbol string, price float, volume long); \n@info(name = 'query1') \n@source(type ='rabbitmq',\nuri = 'amqp://guest:guest@localhost:5672',\nexchange.name = 'direct',\nrouting.key= 'direct',\n@map(type='xml'))\nDefine stream BarStream (symbol string, price float, volume long);\nfrom FooStream select symbol, price, volume insert into BarStream;\n", description = "This query receives events from the `direct` exchange with the `direct`exchange type, and the `directTest` routing key.")})
/* loaded from: input_file:org/wso2/extension/siddhi/io/rabbitmq/source/RabbitMQSource.class */
public class RabbitMQSource extends Source {
    private static final Logger log = Logger.getLogger(RabbitMQSource.class);
    private SourceEventListener sourceEventListener;
    private int heartbeat;
    private String queueName;
    private boolean queueExclusive;
    private boolean queueDurable;
    private boolean queueAutodelete;
    private String listenerUri;
    private String tlsTruststoreLocation;
    private String tlsTruststorePassword;
    private String tlsTruststoreType;
    private String tlsVersion;
    private boolean tlsEnabled;
    private String exchangeName;
    private String exchangeType;
    private boolean exchangeDurable;
    private boolean exchangeAutoDelete;
    private String routingKey;
    private Connection connection = null;
    private Map<String, Object> map = null;
    private FileInputStream fileInputStream = null;

    public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.sourceEventListener = sourceEventListener;
        this.listenerUri = optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_SERVER_URI);
        this.heartbeat = Integer.parseInt(optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_HEARTBEAT, RabbitMQConstants.DEFAULT_HEARTBEAT));
        this.tlsTruststoreLocation = optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_CONNECTION_TLS_TRUSTSTORE_LOCATION, RabbitMQSinkUtil.getTrustStorePath(configReader));
        this.tlsTruststorePassword = optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_CONNECTION_TLS_TRUSTSTORE_PASSWORD, RabbitMQSinkUtil.getTrustStorePassword(configReader));
        this.tlsTruststoreType = optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_CONNECTION_TLS_TRUSTSTORE_TYPE, RabbitMQConstants.DEFAULT_TLS_TRUSTSTORE_TYPE);
        this.tlsVersion = optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_CONNECTION_TLS_VERSION, RabbitMQConstants.DEFAULT_TLS_VERSION);
        this.tlsEnabled = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_CONNECTION_TLS_ENABLED, "false"));
        this.queueName = optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_QUEUENAME, "");
        if (!this.queueName.isEmpty()) {
            this.queueExclusive = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_QUEUE_EXCLUSIVE, "false"));
            this.queueDurable = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_QUEUE_DURABLE, "false"));
            this.queueAutodelete = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_QUEUE_AUTO_DELETE, "false"));
        }
        this.exchangeName = optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_EXCHANGE_NAME);
        this.exchangeType = optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_EXCHANGE_TYPE, "direct");
        this.exchangeDurable = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_EXCHANGE_DURABLE, "false"));
        this.exchangeAutoDelete = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_EXCHANGE_AUTO_DELETE, "false"));
        this.routingKey = optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_ROUTINGKEY, "");
        String validateAndGetStaticValue = optionHolder.validateAndGetStaticValue("headers", RabbitMQConstants.NULL);
        if (validateAndGetStaticValue != null) {
            try {
                this.map = RabbitMQSinkUtil.getHeaders(validateAndGetStaticValue);
            } catch (IOException e) {
                throw new SiddhiAppCreationException("Invalid header format. Please include as 'key1:value1','key2:value2',..");
            }
        }
        this.sourceEventListener = sourceEventListener;
        if (!RabbitMQConstants.EXCHANGE_TYPE_FANOUT.equals(this.exchangeType) && !"direct".equals(this.exchangeType) && !RabbitMQConstants.EXCHANGE_TYPE_TOPIC.equals(this.exchangeType) && !"headers".equals(this.exchangeType)) {
            throw new SiddhiAppCreationException("Check the exchange type in " + this.sourceEventListener + ". There is no exchange type named as " + this.exchangeType + " in RabbitMQ");
        }
    }

    public Class[] getOutputEventClasses() {
        return new Class[]{String.class, byte[].class};
    }

    public void connect(Source.ConnectionCallback connectionCallback) throws ConnectionUnavailableException {
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUri(URI.create(this.listenerUri));
            connectionFactory.setRequestedHeartbeat(this.heartbeat);
            if (this.tlsEnabled) {
                try {
                    if (this.tlsTruststoreLocation.isEmpty()) {
                        connectionFactory.useSslProtocol();
                    } else {
                        try {
                            char[] charArray = this.tlsTruststorePassword.toCharArray();
                            KeyStore keyStore = KeyStore.getInstance(this.tlsTruststoreType);
                            this.fileInputStream = new FileInputStream(this.tlsTruststoreLocation);
                            keyStore.load(this.fileInputStream, charArray);
                            TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
                            trustManagerFactory.init(keyStore);
                            SSLContext sSLContext = SSLContext.getInstance(this.tlsVersion);
                            sSLContext.init(null, trustManagerFactory.getTrustManagers(), null);
                            connectionFactory.useSslProtocol(sSLContext);
                            if (this.fileInputStream != null) {
                                this.fileInputStream.close();
                            }
                        } catch (FileNotFoundException e) {
                            throw new SiddhiAppCreationException("The trustStore File path tls.truststore.path = " + this.tlsTruststoreLocation + " defined in " + this.sourceEventListener + " is incorrect. Specify TrustStore location correctly.", e);
                        } catch (IOException e2) {
                            throw new SiddhiAppCreationException("The trustStore type tls.truststore.password = " + this.tlsTruststorePassword + " defined in " + this.sourceEventListener + " is incorrect. Specify TrustStore password correctly.", e2);
                        } catch (KeyStoreException e3) {
                            throw new SiddhiAppCreationException("The trustStore type tls.truststore.type = " + this.tlsTruststoreType + " defined in " + this.sourceEventListener + " is incorrect. Specify TrustStore type correctly.", e3);
                        } catch (NoSuchAlgorithmException e4) {
                            throw new SiddhiAppCreationException("Algorithm tls.version = " + this.tlsVersion + " defined in " + this.sourceEventListener + "is not available in TrustManagerFactory class.", e4);
                        } catch (CertificateException e5) {
                            throw new SiddhiAppCreationException("TrustStore is not specified in " + this.sourceEventListener, e5);
                        }
                    }
                } catch (Throwable th) {
                    if (this.fileInputStream != null) {
                        this.fileInputStream.close();
                    }
                    throw th;
                }
            }
            this.connection = connectionFactory.newConnection();
            RabbitMQConsumer.consume(this.connection, this.exchangeName, this.exchangeType, this.exchangeDurable, this.exchangeAutoDelete, this.queueName, this.queueExclusive, this.queueDurable, this.queueAutodelete, this.routingKey, this.map, this.sourceEventListener);
        } catch (IOException e6) {
            throw new ConnectionUnavailableException("Failed to connect with the Rabbitmq server. Check the uri = " + this.listenerUri + " defined in " + this.sourceEventListener, e6);
        } catch (URISyntaxException e7) {
            throw new SiddhiAppCreationException("There is an invalid syntax in the uri = " + this.listenerUri + " defined in " + this.sourceEventListener, e7);
        } catch (KeyManagementException e8) {
            throw new SiddhiAppCreationException("There is an error in key management in the uri = " + this.listenerUri + " defined in " + this.sourceEventListener, e8);
        } catch (NoSuchAlgorithmException e9) {
            throw new SiddhiAppCreationException("No such algorithm in the uri = " + this.listenerUri + " defined in " + this.sourceEventListener, e9);
        } catch (TimeoutException e10) {
            throw new SiddhiAppCreationException("Timeout while connectiong with the RabbitMQ server", e10);
        } catch (Exception e11) {
            throw new SiddhiAppCreationException("Error in receiving the message from the RabbitMQ broker in " + this.sourceEventListener, e11);
        }
    }

    public void disconnect() {
        if (this.connection != null) {
            try {
                RabbitMQConsumer.closeChannel();
                this.connection.close();
                if (log.isDebugEnabled()) {
                    log.debug("Server connector for uri = " + this.listenerUri + " is disconnected in " + this.sourceEventListener + ".");
                }
            } catch (IOException e) {
                log.error("Error in disconnecting the uri = " + this.listenerUri + " in " + this.sourceEventListener + ".");
            } catch (TimeoutException e2) {
                log.error("Timeout while disconnecting the uri = " + this.listenerUri + " in " + this.sourceEventListener + ".");
            }
        }
    }

    public void destroy() {
    }

    public void pause() {
        RabbitMQConsumer.pause();
    }

    public void resume() {
        RabbitMQConsumer.resume();
    }

    public Map<String, Object> currentState() {
        return null;
    }

    public void restoreState(Map<String, Object> map) {
    }
}
