package org.wso2.extension.siddhi.io.rabbitmq.sink;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.StringRpcServer;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import org.apache.log4j.Logger;
import org.wso2.extension.siddhi.io.rabbitmq.util.RabbitMQConstants;
import org.wso2.extension.siddhi.io.rabbitmq.util.RabbitMQSinkUtil;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.exception.SiddhiAppCreationException;
import org.wso2.siddhi.core.stream.output.sink.Sink;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.DynamicOptions;
import org.wso2.siddhi.core.util.transport.Option;
import org.wso2.siddhi.core.util.transport.OptionHolder;
import org.wso2.siddhi.query.api.definition.StreamDefinition;

@Extension(name = "rabbitmq", namespace = "sink", description = "The rabbitmq sink pushes the events into a rabbitmq broker using the AMQP protocol", parameters = {@Parameter(name = RabbitMQConstants.RABBITMQ_SERVER_URI, description = "The URI that used to connect to an AMQP server. If no URI is specified, an error is logged in the CLI.e.g.,\n`amqp://guest:guest`, `amqp://guest:guest@localhost:5672` ", type = {DataType.STRING}), @Parameter(name = RabbitMQConstants.RABBITMQ_HEARTBEAT, description = "The period of time (in seconds) after which the peer TCP connection should be considered unreachable (down) by RabbitMQ and client libraries.", type = {DataType.INT}, optional = true, defaultValue = RabbitMQConstants.DEFAULT_HEARTBEAT), @Parameter(name = RabbitMQConstants.RABBITMQ_EXCHANGE_NAME, description = "The name of the exchange that decides what to do with a message it sends.If the `exchange.name` already exists in the RabbitMQ server, then the system uses that `exchange.name` instead of redeclaring.", type = {DataType.STRING}, dynamic = true), @Parameter(name = RabbitMQConstants.RABBITMQ_EXCHANGE_TYPE, description = "The type of the exchange.name. The exchange types available are `direct`, `fanout`, `topic` and `headers`. For a detailed description of each type, see [RabbitMQ - AMQP Concepts](https://www.rabbitmq.com/tutorials/amqp-concepts.html) ", type = {DataType.STRING}, dynamic = true, optional = true, defaultValue = "direct"), @Parameter(name = RabbitMQConstants.RABBITMQ_EXCHANGE_DURABLE, description = "If this is set to `true`, the exchange remains declared even if the broker restarts.", type = {DataType.BOOL}, dynamic = true, optional = true, defaultValue = "false"), @Parameter(name = RabbitMQConstants.RABBITMQ_EXCHANGE_AUTO_DELETE, description = "If this is set to `true`, the exchange is automatically deleted when it is not used anymore. 
", type = {DataType.BOOL}, dynamic = true, optional = true, defaultValue = "false"), @Parameter(name = RabbitMQConstants.RABBITMQ_DELIVERY_MODE, description = "This determines whether the connection should be persistent or not. The value must be either `1` or `2`.If the delivery.mode = 1, then the connection is not persistent. If the delivery.mode = 2, then the connection is persistent.", type = {DataType.INT}, optional = true, defaultValue = RabbitMQConstants.DEFAULT_DELIVERY_MODE), @Parameter(name = RabbitMQConstants.RABBITMQ_CONTENT_TYPE, description = "The message content type. This should be the `MIME` content type.", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = RabbitMQConstants.RABBITMQ_CONTENT_ENCODING, description = "The message content encoding. The value should be `MIME` content encoding.", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = RabbitMQConstants.RABBITMQ_PRIORITY, description = "Specify a value within the range 0 to 9 in this parameter to indicate the message priority.", type = {DataType.INT}, dynamic = true, optional = true, defaultValue = RabbitMQConstants.DEFAULT_PRIORITY), @Parameter(name = RabbitMQConstants.RABBITMQ_CORRELATION_ID, description = "The message correlated to the current message. e.g., The request to which this message is a reply. When a request arrives, a message describing the task is pushed to the queue by the front end server. After that the frontend server blocks to wait for a response message with the same correlation ID. A pool of worker machines listen on queue, and one of them picks up the task, performs it, and returns the result as message. Once a message with right correlation ID arrives, thefront end server continues to return the response to the caller. 
", type = {DataType.STRING}, dynamic = true, optional = true, defaultValue = "null"), @Parameter(name = RabbitMQConstants.RABBITMQ_REPLY_TO, description = "This is an anonymous exclusive callback queue. When the RabbitMQ receives a message with the `reply.to` property, it sends the response to the mentioned queue. This is commonly used to name a reply queue (or any other identifier that helps a consumer application to direct its response).", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = RabbitMQConstants.RABBITMQ_EXPIRATION, description = "The expiration time after which the message is deleted. The value of the expiration field describes the TTL (Time To Live) period in milliseconds.", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = RabbitMQConstants.RABBITMQ_MESSAGE_ID, description = "The message identifier. If applications need to identify messages, it is recommended that they use this attribute instead of putting it into the message payload.", type = {DataType.STRING}, dynamic = true, optional = true, defaultValue = "null"), @Parameter(name = RabbitMQConstants.RABBITMQ_TIMESTAMP, description = "Timestamp of the moment when the message was sent. If you do not specify a value for this parameter, the system automatically generates the current date and time as the timestamp value. The format of the timestamp value is `dd/mm/yyyy`.", type = {DataType.STRING}, optional = true, defaultValue = "current timestamp"), @Parameter(name = RabbitMQConstants.RABBITMQ_TYPE, description = "The type of the message. e.g., The type of the event or the command represented by the message.", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = RabbitMQConstants.RABBITMQ_USER_ID, description = "The user ID specified here is verified by RabbitMQ against theuser name of the actual connection. 
This is an optional parameter.", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = RabbitMQConstants.RABBITMQ_APP_ID, description = "The identifier of the application that produced the message.", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = RabbitMQConstants.RABBITMQ_ROUTINGKEY, description = "The key based on which the excahnge determines how to route the message to the queue. The routing key is similar to an address for the message.", type = {DataType.STRING}, dynamic = true, optional = true, defaultValue = "empty"), @Parameter(name = "headers", description = "The headers of the message. The attributes used for routing are taken from the this paremeter. A message is considered matching if the value of the header equals the value specified upon binding. ", type = {DataType.STRING}, dynamic = true, optional = true, defaultValue = "null"), @Parameter(name = RabbitMQConstants.RABBITMQ_CONNECTION_TLS_ENABLED, description = "This parameter specifies whether an encrypted communication channel should be established or not. When this parameter is set to `true`, the `tls.truststore.path` and `tls.truststore.password` parameters are initialized.", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = RabbitMQConstants.RABBITMQ_CONNECTION_TLS_TRUSTSTORE_LOCATION, description = "The file path to the location of the truststore of the client that sends the RabbitMQ events via the `AMQP` protocol. A custom client-truststore can be specified if required. If a custom truststore is not specified, then the system uses the default client-trustore in the `${carbon.home}/resources/security` directory.", type = {DataType.STRING}, optional = true, defaultValue = "${carbon.home}/resources/security/client-truststore.jks"), @Parameter(name = RabbitMQConstants.RABBITMQ_CONNECTION_TLS_TRUSTSTORE_PASSWORD, description = "The password for the client-truststore. 
A custom password can be specified if required. If no custom password is specified, then the system uses `wso2carbon` as the default password.", type = {DataType.STRING}, optional = true, defaultValue = RabbitMQConstants.TRUSTSTORE_PASSWORD_VALUE), @Parameter(name = "tls.truststore.type", description = "The type of the truststore.", type = {DataType.STRING}, optional = true, defaultValue = RabbitMQConstants.DEFAULT_TLS_TRUSTSTORE_TYPE), @Parameter(name = RabbitMQConstants.RABBITMQ_CONNECTION_TLS_VERSION, description = "The version of the tls/ssl.", type = {DataType.STRING}, optional = true, defaultValue = RabbitMQConstants.DEFAULT_TLS_VERSION)}, examples = {@Example(syntax = "@App:name('TestExecutionPlan') \ndefine stream FooStream (symbol string, price float, volume long); \n@info(name = 'query1')\n@sink(type ='rabbitmq',\nuri = 'amqp://guest:guest@localhost:5672',\nexchange.name = 'direct',\nrouting.key= 'direct',\n@map(type='xml'))\nDefine stream BarStream (symbol string, price float, volume long);\nfrom FooStream select symbol, price, volume insert into BarStream;\n", description = "This query publishes events to the `direct` exchange with the `direct` exchange type and the `directTest` routing key.")})
/* loaded from: input_file:org/wso2/extension/siddhi/io/rabbitmq/sink/RabbitMQSink.class */
/*
 * Siddhi sink that publishes events to a RabbitMQ exchange over AMQP.
 *
 * Lifecycle as implemented below:
 *   init()       - reads the static and dynamic options and validates the exchange type;
 *   connect()    - builds a ConnectionFactory (optionally TLS-enabled), then opens one
 *                  connection and one channel;
 *   publish()    - resolves the per-event (dynamic) options, makes sure the exchange exists
 *                  and publishes the payload;
 *   disconnect() - closes the channel and the connection.
 *
 * The sink does not snapshot any state (currentState() returns null, restoreState() is a no-op).
 */
public class RabbitMQSink extends Sink {
    private static final Logger log = Logger.getLogger(RabbitMQSink.class);

    // Connection settings (static options).
    private String publisherURI;
    private int heartbeat;

    // Exchange / routing settings; the Option-typed ones are resolved per event in publish().
    private Option exchangeNameOption;
    private Option exchangeTypeOption;
    private Option exchangeDurableAsStringOption;
    private Option routingKeyOption;
    private Option headerOption;
    private int deliveryMode;
    private Option exchangeAutoDeleteAsStringOption;

    // AMQP basic properties attached to every published message.
    private String contentType;
    private String contentEncoding;
    private Option messageIdOption;
    private String timestampString;
    private String replyTo;
    private String expiration;
    private Option priorityOption;
    private Option correlationIdOption;
    private String userId;
    private String appId;
    private String type;

    // TLS settings, only used by connect() when tlsEnabled is true.
    private boolean tlsEnabled;
    private String tlsTruststoreLocation;
    private String tlsTruststoreType;
    private String tlsVersion;
    private String tlsTruststorePassword;

    private StreamDefinition streamDefinition;
    private Connection connection = null;
    private Channel channel = null;

    /**
     * Reads all sink options from {@code optionHolder} and validates the configured exchange type.
     *
     * @param streamDefinition the stream this sink is attached to; only used in error/log messages
     * @param optionHolder     source of the static and dynamic sink options
     * @param configReader     used to look up the default truststore path and password
     * @param siddhiAppContext not used by this implementation
     * @throws SiddhiAppCreationException if the exchange type is not one of the four types accepted below
     * @throws NumberFormatException      if the heartbeat or delivery mode option is not an integer
     */
    protected void init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.streamDefinition = streamDefinition;
        this.publisherURI = optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_SERVER_URI);
        this.heartbeat = Integer.parseInt(optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_HEARTBEAT, RabbitMQConstants.DEFAULT_HEARTBEAT));
        this.exchangeNameOption = optionHolder.validateAndGetOption(RabbitMQConstants.RABBITMQ_EXCHANGE_NAME);
        this.exchangeTypeOption = optionHolder.getOrCreateOption(RabbitMQConstants.RABBITMQ_EXCHANGE_TYPE, "direct");
        this.tlsTruststoreLocation = optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_CONNECTION_TLS_TRUSTSTORE_LOCATION, RabbitMQSinkUtil.getTrustStorePath(configReader));
        this.tlsTruststorePassword = optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_CONNECTION_TLS_TRUSTSTORE_PASSWORD, RabbitMQSinkUtil.getTrustStorePassword(configReader));
        this.tlsTruststoreType = optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_CONNECTION_TLS_TRUSTSTORE_TYPE, RabbitMQConstants.DEFAULT_TLS_TRUSTSTORE_TYPE);
        this.tlsVersion = optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_CONNECTION_TLS_VERSION, RabbitMQConstants.DEFAULT_TLS_VERSION);
        this.exchangeDurableAsStringOption = optionHolder.getOrCreateOption(RabbitMQConstants.RABBITMQ_EXCHANGE_DURABLE, "false");
        this.exchangeAutoDeleteAsStringOption = optionHolder.getOrCreateOption(RabbitMQConstants.RABBITMQ_EXCHANGE_AUTO_DELETE, "false");
        this.deliveryMode = Integer.parseInt(optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_DELIVERY_MODE, RabbitMQConstants.DEFAULT_DELIVERY_MODE));
        this.contentType = optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_CONTENT_TYPE, RabbitMQConstants.NULL);
        this.contentEncoding = optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_CONTENT_ENCODING, RabbitMQConstants.NULL);
        this.priorityOption = optionHolder.getOrCreateOption(RabbitMQConstants.RABBITMQ_PRIORITY, RabbitMQConstants.DEFAULT_PRIORITY);
        this.correlationIdOption = optionHolder.getOrCreateOption(RabbitMQConstants.RABBITMQ_CORRELATION_ID, RabbitMQConstants.NULL);
        this.messageIdOption = optionHolder.getOrCreateOption(RabbitMQConstants.RABBITMQ_MESSAGE_ID, RabbitMQConstants.NULL);
        this.appId = optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_APP_ID, RabbitMQConstants.NULL);
        this.timestampString = optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_TIMESTAMP, RabbitMQConstants.NULL);
        this.replyTo = optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_REPLY_TO, RabbitMQConstants.NULL);
        this.expiration = optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_EXPIRATION, RabbitMQConstants.NULL);
        this.userId = optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_USER_ID, RabbitMQConstants.NULL);
        this.type = optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_TYPE, RabbitMQConstants.NULL);
        this.tlsEnabled = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(RabbitMQConstants.RABBITMQ_CONNECTION_TLS_ENABLED, "false"));
        this.routingKeyOption = optionHolder.getOrCreateOption(RabbitMQConstants.RABBITMQ_ROUTINGKEY, "");
        this.headerOption = optionHolder.getOrCreateOption("headers", RabbitMQConstants.NULL);

        String configuredExchangeType = this.exchangeTypeOption.getValue();
        if (!isSupportedExchangeType(configuredExchangeType)) {
            throw new SiddhiAppCreationException("Check the exchange type in " + this.streamDefinition + ". There is no exchange type named as " + configuredExchangeType + " in RabbitMQ");
        }
    }

    /** Returns {@code true} if {@code exchangeType} is fanout, direct, topic or headers. */
    private static boolean isSupportedExchangeType(String exchangeType) {
        return RabbitMQConstants.EXCHANGE_TYPE_FANOUT.equals(exchangeType)
                || "direct".equals(exchangeType)
                || RabbitMQConstants.EXCHANGE_TYPE_TOPIC.equals(exchangeType)
                || "headers".equals(exchangeType);
    }

    /**
     * Opens the connection and the channel used by {@link #publish(Object, DynamicOptions)}.
     *
     * @throws ConnectionUnavailableException if the broker cannot be reached (I/O failure)
     * @throws SiddhiAppCreationException     if the URI or the TLS configuration is invalid, or the
     *                                        connection attempt times out
     */
    public void connect() throws ConnectionUnavailableException {
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUri(URI.create(this.publisherURI));
            connectionFactory.setRequestedHeartbeat(this.heartbeat);
            if (this.tlsEnabled) {
                configureTls(connectionFactory);
            }
            this.connection = connectionFactory.newConnection();
            this.channel = this.connection.createChannel();
        } catch (IOException e) {
            throw new ConnectionUnavailableException("Failed to connect with the Rabbitmq server. Check the uri = " + this.publisherURI + " defined in " + this.streamDefinition, e);
        } catch (URISyntaxException e) {
            throw new SiddhiAppCreationException("There is an invalid syntax in the uri = " + this.publisherURI + " defined in " + this.streamDefinition, e);
        } catch (KeyManagementException e) {
            throw new SiddhiAppCreationException("There is an error in key management in the uri = " + this.publisherURI + " defined in " + this.streamDefinition, e);
        } catch (NoSuchAlgorithmException e) {
            throw new SiddhiAppCreationException("No such algorithm in the uri = " + this.publisherURI + " defined in " + this.streamDefinition, e);
        } catch (TimeoutException e) {
            throw new SiddhiAppCreationException("Timeout while connectiong with the RabbitMQ server", e);
        }
    }

    /**
     * Enables TLS on {@code connectionFactory}. With an empty truststore path the client
     * library's default SSL setup is used; otherwise an {@link SSLContext} is built from the
     * configured truststore.
     *
     * @throws NoSuchAlgorithmException if the default SSL protocol is unavailable (empty path case)
     * @throws KeyManagementException   if the SSL context cannot be initialised
     * @throws SiddhiAppCreationException if the truststore cannot be found, read or used
     */
    private void configureTls(ConnectionFactory connectionFactory) throws NoSuchAlgorithmException, KeyManagementException {
        if (this.tlsTruststoreLocation.isEmpty()) {
            // NOTE(review): per the RabbitMQ client documentation the no-arg useSslProtocol()
            // installs a trust-everything TrustManager, i.e. the server certificate is not
            // verified in this branch - confirm this is intended.
            connectionFactory.useSslProtocol();
            return;
        }
        try {
            char[] truststorePassword = this.tlsTruststorePassword.toCharArray();
            KeyStore trustStore = KeyStore.getInstance(this.tlsTruststoreType);
            // The stream is only needed while loading; try-with-resources closes it on every path.
            try (FileInputStream truststoreStream = new FileInputStream(this.tlsTruststoreLocation)) {
                trustStore.load(truststoreStream, truststorePassword);
            }
            TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            trustManagerFactory.init(trustStore);
            SSLContext sslContext = SSLContext.getInstance(this.tlsVersion);
            sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
            connectionFactory.useSslProtocol(sslContext);
        } catch (FileNotFoundException e) {
            throw new SiddhiAppCreationException("The trustStore File path tls.truststore.path = " + this.tlsTruststoreLocation + " defined in " + this.streamDefinition + " is incorrect. Specify TrustStore location correctly.", e);
        } catch (IOException e) {
            // The password itself is deliberately NOT part of the message: exception text ends
            // up in logs and must not carry secrets.
            throw new SiddhiAppCreationException("The trustStore at tls.truststore.path = " + this.tlsTruststoreLocation + " defined in " + this.streamDefinition + " could not be loaded. Specify TrustStore password correctly.", e);
        } catch (KeyStoreException e) {
            throw new SiddhiAppCreationException("The trustStore type tls.truststore.Type= " + this.tlsTruststoreType + " defined in " + this.streamDefinition + " is incorrect. Specify TrustStore type correctly.", e);
        } catch (NoSuchAlgorithmException e) {
            throw new SiddhiAppCreationException("Algorithm tls.version = " + this.tlsVersion + " defined in " + this.streamDefinition + "is not available in TrustManagerFactory class.", e);
        } catch (CertificateException e) {
            throw new SiddhiAppCreationException("TrustStore is not specified in " + this.streamDefinition, e);
        }
    }

    /**
     * This sink has no state to snapshot.
     *
     * @return always {@code null}
     */
    public Map<String, Object> currentState() {
        return null;
    }

    /** No-op: there is no state to restore. */
    public void restoreState(Map<String, Object> map) {
    }

    /**
     * Publishes one event to the exchange resolved from the (possibly dynamic) options.
     *
     * <p>If the exchange cannot be passively declared, a new channel is opened and the exchange
     * is declared through {@code RabbitMQSinkUtil.declareExchange} before publishing.
     *
     * <p>An {@link IOException} while declaring or publishing is logged and the event is dropped;
     * it is not rethrown.
     *
     * @param obj            the payload; a {@code byte[]} is sent as is, anything else is sent as
     *                       the encoded bytes of its {@code toString()}
     * @param dynamicOptions per-event values for the dynamic options
     * @throws ConnectionUnavailableException declared by the sink contract; not thrown by this body
     * @throws SiddhiAppCreationException     on an unsupported encoding, an unparsable timestamp
     *                                        option or a timeout
     */
    public void publish(Object obj, DynamicOptions dynamicOptions) throws ConnectionUnavailableException {
        // Resolved up front so that error messages report the exchange actually used for this event.
        String exchangeName = this.exchangeNameOption.getValue(dynamicOptions);
        try {
            byte[] payload = obj instanceof byte[] ? (byte[]) obj : obj.toString().getBytes(StringRpcServer.STRING_ENCODING);
            String exchangeType = this.exchangeTypeOption.getValue(dynamicOptions);
            String rawHeaders = this.headerOption.getValue(dynamicOptions);
            String messageId = this.messageIdOption.getValue(dynamicOptions);
            int priority = Integer.parseInt(this.priorityOption.getValue(dynamicOptions));
            String correlationId = this.correlationIdOption.getValue(dynamicOptions);
            // Without a configured timestamp the message is stamped with the current time.
            Date timestamp = this.timestampString == null ? new Date() : new SimpleDateFormat("dd/MM/yyyy").parse(this.timestampString);
            Map<String, Object> headers = rawHeaders == null ? null : RabbitMQSinkUtil.getHeaders(rawHeaders);
            String routingKey = this.routingKeyOption.getValue(dynamicOptions);

            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(Integer.valueOf(this.deliveryMode))
                    .contentType(this.contentType)
                    .contentEncoding(this.contentEncoding)
                    .messageId(messageId)
                    .replyTo(this.replyTo)
                    .expiration(this.expiration)
                    .priority(Integer.valueOf(priority))
                    .correlationId(correlationId)
                    .userId(this.userId)
                    .appId(this.appId)
                    .type(this.type)
                    .timestamp(timestamp)
                    .headers(headers)
                    .build();

            boolean durable = Boolean.parseBoolean(this.exchangeDurableAsStringOption.getValue(dynamicOptions));
            boolean autoDelete = Boolean.parseBoolean(this.exchangeAutoDeleteAsStringOption.getValue(dynamicOptions));
            try {
                this.channel.exchangeDeclarePassive(exchangeName);
            } catch (Exception e) {
                // The passive declare failed (typically: the exchange does not exist yet). The
                // broker closes a channel on such an error, so a fresh channel is opened before
                // declaring the exchange for real.
                this.channel = this.connection.createChannel();
                // NOTE(review): the flags are passed as (autoDelete, durable) - confirm this
                // matches the parameter order of RabbitMQSinkUtil.declareExchange.
                RabbitMQSinkUtil.declareExchange(this.connection, this.channel, exchangeName, exchangeType, autoDelete, durable);
            }
            this.channel.basicPublish(exchangeName, routingKey, properties, payload);
        } catch (UnsupportedEncodingException e) {
            throw new SiddhiAppCreationException("Received payload does not support UTF-8 encoding. Hence dropping the event", e);
        } catch (IOException e) {
            log.error("Error in sending the message to the exchange.name = " + exchangeName + " in RabbitMQ broker at " + this.streamDefinition, e);
        } catch (ParseException e) {
            throw new SiddhiAppCreationException("Invalid timestamp format defined in " + this.timestampString + " . Please include as dd/MM/yyyy in " + this.streamDefinition, e);
        } catch (TimeoutException e) {
            throw new SiddhiAppCreationException("Timeout while publishing the events to " + exchangeName + " in RabbitMQ server", e);
        }
    }

    /** @return the payload types accepted by {@link #publish(Object, DynamicOptions)}: {@code String} and {@code byte[]} */
    public Class[] getSupportedInputEventClasses() {
        return new Class[]{String.class, byte[].class};
    }

    /** @return the names of the options that may be resolved per event */
    public String[] getSupportedDynamicOptions() {
        return new String[]{
                RabbitMQConstants.RABBITMQ_EXCHANGE_NAME,
                RabbitMQConstants.RABBITMQ_EXCHANGE_TYPE,
                RabbitMQConstants.RABBITMQ_ROUTINGKEY,
                RabbitMQConstants.RABBITMQ_EXCHANGE_DURABLE,
                RabbitMQConstants.RABBITMQ_EXCHANGE_AUTO_DELETE,
                RabbitMQConstants.RABBITMQ_MESSAGE_ID,
                RabbitMQConstants.RABBITMQ_CORRELATION_ID,
                RabbitMQConstants.RABBITMQ_PRIORITY
        };
    }

    /**
     * Closes the channel and the connection, if a connection was opened.
     *
     * <p>The connection is closed even when closing the channel fails, and a missing or already
     * closed channel/connection is skipped, so the method is safe to call after a partially
     * failed {@link #connect()} or more than once. Failures are logged, not thrown.
     */
    public void disconnect() {
        if (this.connection == null) {
            return;
        }
        try {
            try {
                // channel stays null when createChannel() failed after the connection was opened.
                if (this.channel != null && this.channel.isOpen()) {
                    this.channel.close();
                }
            } finally {
                // Runs even if the channel close failed, so the connection is never leaked.
                if (this.connection.isOpen()) {
                    this.connection.close();
                }
            }
            if (log.isDebugEnabled()) {
                log.debug("Server connector for uri = " + this.publisherURI + " is disconnected in " + this.streamDefinition + ".");
            }
        } catch (IOException e) {
            log.error("Error in disconnecting the uri = " + this.publisherURI + " in " + this.streamDefinition + ".", e);
        } catch (TimeoutException e) {
            log.error("Timeout while disconnecting the uri = " + this.publisherURI + " in " + this.streamDefinition + ".", e);
        }
    }

    /** No-op: nothing is held beyond what {@link #disconnect()} releases. */
    public void destroy() {
    }
}
