/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.extension.siddhi.io.rabbitmq.sink;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import org.apache.log4j.Logger;
import org.wso2.extension.siddhi.io.rabbitmq.util.RabbitMQConstants;
import org.wso2.extension.siddhi.io.rabbitmq.util.RabbitMQSinkUtil;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.exception.SiddhiAppCreationException;
import org.wso2.siddhi.core.stream.output.sink.Sink;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.DynamicOptions;
import org.wso2.siddhi.core.util.transport.Option;
import org.wso2.siddhi.core.util.transport.OptionHolder;
import org.wso2.siddhi.query.api.definition.StreamDefinition;

@Extension(name="rabbitmq", namespace="sink", description="The rabbitmq sink pushes the events into a rabbitmq broker using the AMQP protocol", parameters={@Parameter(name="uri", description="The URI that used to connect to an AMQP server. If no URI is specified, an error is logged in the CLI.e.g.,\n`amqp://guest:guest`, `amqp://guest:guest@localhost:5672` ", type={DataType.STRING}), @Parameter(name="heartbeat", description="The period of time (in seconds) after which the peer TCP connection should be considered unreachable (down) by RabbitMQ and client libraries.", type={DataType.INT}, optional=true, defaultValue="60"), @Parameter(name="exchange.name", description="The name of the exchange that decides what to do with a message it sends.If the `exchange.name` already exists in the RabbitMQ server, then the system uses that `exchange.name` instead of redeclaring.", type={DataType.STRING}, dynamic=true), @Parameter(name="exchange.type", description="The type of the exchange.name. The exchange types available are `direct`, `fanout`, `topic` and `headers`. For a detailed description of each type, see [RabbitMQ - AMQP Concepts](https://www.rabbitmq.com/tutorials/amqp-concepts.html) ", type={DataType.STRING}, dynamic=true, optional=true, defaultValue="direct"), @Parameter(name="exchange.durable.enabled", description="If this is set to `true`, the exchange remains declared even if the broker restarts.", type={DataType.BOOL}, dynamic=true, optional=true, defaultValue="false"), @Parameter(name="exchange.autodelete.enabled", description="If this is set to `true`, the exchange is automatically deleted when it is not used anymore. ", type={DataType.BOOL}, dynamic=true, optional=true, defaultValue="false"), @Parameter(name="delivery.mode", description="This determines whether the connection should be persistent or not. The value must be either `1` or `2`.If the delivery.mode = 1, then the connection is not persistent. If the delivery.mode = 2, then the connection is persistent.", type={DataType.INT}, optional=true, defaultValue="1"), @Parameter(name="content.type", description="The message content type. This should be the `MIME` content type.", type={DataType.STRING}, optional=true, defaultValue="null"), @Parameter(name="content.encoding", description="The message content encoding. The value should be `MIME` content encoding.", type={DataType.STRING}, optional=true, defaultValue="null"), @Parameter(name="priority", description="Specify a value within the range 0 to 9 in this parameter to indicate the message priority.", type={DataType.INT}, dynamic=true, optional=true, defaultValue="0"), @Parameter(name="correlation.id", description="The message correlated to the current message. e.g., The request to which this message is a reply. When a request arrives, a message describing the task is pushed to the queue by the front end server. After that the frontend server blocks to wait for a response message with the same correlation ID. A pool of worker machines listen on queue, and one of them picks up the task, performs it, and returns the result as message. Once a message with right correlation ID arrives, thefront end server continues to return the response to the caller. ", type={DataType.STRING}, dynamic=true, optional=true, defaultValue="null"), @Parameter(name="reply.to", description="This is an anonymous exclusive callback queue. When the RabbitMQ receives a message with the `reply.to` property, it sends the response to the mentioned queue. This is commonly used to name a reply queue (or any other identifier that helps a consumer application to direct its response).", type={DataType.STRING}, optional=true, defaultValue="null"), @Parameter(name="expiration", description="The expiration time after which the message is deleted. The value of the expiration field describes the TTL (Time To Live) period in milliseconds.", type={DataType.STRING}, optional=true, defaultValue="null"), @Parameter(name="message.id", description="The message identifier. If applications need to identify messages, it is recommended that they use this attribute instead of putting it into the message payload.", type={DataType.STRING}, dynamic=true, optional=true, defaultValue="null"), @Parameter(name="timestamp", description="Timestamp of the moment when the message was sent. If you do not specify a value for this parameter, the system automatically generates the current date and time as the timestamp value. The format of the timestamp value is `dd/mm/yyyy`.", type={DataType.STRING}, optional=true, defaultValue="current timestamp"), @Parameter(name="type", description="The type of the message. e.g., The type of the event or the command represented by the message.", type={DataType.STRING}, optional=true, defaultValue="null"), @Parameter(name="user.id", description="The user ID specified here is verified by RabbitMQ against theuser name of the actual connection. This is an optional parameter.", type={DataType.STRING}, optional=true, defaultValue="null"), @Parameter(name="app.id", description="The identifier of the application that produced the message.", type={DataType.STRING}, optional=true, defaultValue="null"), @Parameter(name="routing.key", description="The key based on which the excahnge determines how to route the message to the queue. The routing key is similar to an address for the message.", type={DataType.STRING}, dynamic=true, optional=true, defaultValue="empty"), @Parameter(name="headers", description="The headers of the message. The attributes used for routing are taken from the this paremeter. A message is considered matching if the value of the header equals the value specified upon binding. ", type={DataType.STRING}, dynamic=true, optional=true, defaultValue="null"), @Parameter(name="tls.enabled", description="This parameter specifies whether an encrypted communication channel should be established or not. When this parameter is set to `true`, the `tls.truststore.path` and `tls.truststore.password` parameters are initialized.", type={DataType.BOOL}, optional=true, defaultValue="false"), @Parameter(name="tls.truststore.path", description="The file path to the location of the truststore of the client that sends the RabbitMQ events via the `AMQP` protocol. A custom client-truststore can be specified if required. If a custom truststore is not specified, then the system uses the default client-trustore in the `${carbon.home}/resources/security` directory.", type={DataType.STRING}, optional=true, defaultValue="${carbon.home}/resources/security/client-truststore.jks"), @Parameter(name="tls.truststore.password", description="The password for the client-truststore. A custom password can be specified if required. If no custom password is specified, then the system uses `wso2carbon` as the default password.", type={DataType.STRING}, optional=true, defaultValue="wso2carbon"), @Parameter(name="tls.truststore.type", description="The type of the truststore.", type={DataType.STRING}, optional=true, defaultValue="JKS"), @Parameter(name="tls.version", description="The version of the tls/ssl.", type={DataType.STRING}, optional=true, defaultValue="SSL")}, examples={@Example(syntax="@App:name('TestExecutionPlan') \ndefine stream FooStream (symbol string, price float, volume long); \n@info(name = 'query1')\n@sink(type ='rabbitmq',\nuri = 'amqp://guest:guest@localhost:5672',\nexchange.name = 'direct',\nrouting.key= 'direct',\n@map(type='xml'))\nDefine stream BarStream (symbol string, price float, volume long);\nfrom FooStream select symbol, price, volume insert into BarStream;\n", description="This query publishes events to the `direct` exchange with the `direct` exchange type and the `directTest` routing key.")})
public class RabbitMQSink
extends Sink {
    private static final Logger log = Logger.getLogger(RabbitMQSink.class);
    private String publisherURI;
    private Connection connection = null;
    private Channel channel = null;
    private int heartbeat;
    private Option exchangeNameOption;
    private Option exchangeTypeOption;
    private Option exchangeDurableAsStringOption;
    private Option routingKeyOption;
    private Option headerOption;
    private int deliveryMode;
    private Option exchangeAutoDeleteAsStringOption;
    private String contentType;
    private String contentEncoding;
    private Option messageIdOption;
    private String timestampString;
    private String replyTo;
    private String expiration;
    private Option priorityOption;
    private Option correlationIdOption;
    private String userId;
    private String appId;
    private String type;
    private boolean tlsEnabled;
    private String tlsTruststoreLocation;
    private String tlsTruststoreType;
    private String tlsVersion;
    private String tlsTruststorePassword;
    private StreamDefinition streamDefinition;
    private FileInputStream fileInputStream = null;

    protected void init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.streamDefinition = streamDefinition;
        this.publisherURI = optionHolder.validateAndGetStaticValue("uri");
        this.heartbeat = Integer.parseInt(optionHolder.validateAndGetStaticValue("heartbeat", "60"));
        this.exchangeNameOption = optionHolder.validateAndGetOption("exchange.name");
        this.exchangeTypeOption = optionHolder.getOrCreateOption("exchange.type", "direct");
        this.tlsTruststoreLocation = optionHolder.validateAndGetStaticValue("tls.truststore.path", RabbitMQSinkUtil.getTrustStorePath(configReader));
        this.tlsTruststorePassword = optionHolder.validateAndGetStaticValue("tls.truststore.password", RabbitMQSinkUtil.getTrustStorePassword(configReader));
        this.tlsTruststoreType = optionHolder.validateAndGetStaticValue("tls.truststore.Type", "JKS");
        this.tlsVersion = optionHolder.validateAndGetStaticValue("tls.version", "SSL");
        this.exchangeDurableAsStringOption = optionHolder.getOrCreateOption("exchange.durable.enabled", "false");
        this.exchangeAutoDeleteAsStringOption = optionHolder.getOrCreateOption("exchange.autodelete.enabled", "false");
        this.deliveryMode = Integer.parseInt(optionHolder.validateAndGetStaticValue("delivery.mode", "1"));
        this.contentType = optionHolder.validateAndGetStaticValue("content.type", RabbitMQConstants.NULL);
        this.contentEncoding = optionHolder.validateAndGetStaticValue("content.encoding", RabbitMQConstants.NULL);
        this.priorityOption = optionHolder.getOrCreateOption("priority", "0");
        this.correlationIdOption = optionHolder.getOrCreateOption("correlation.id", RabbitMQConstants.NULL);
        this.messageIdOption = optionHolder.getOrCreateOption("message.id", RabbitMQConstants.NULL);
        this.appId = optionHolder.validateAndGetStaticValue("app.id", RabbitMQConstants.NULL);
        this.timestampString = optionHolder.validateAndGetStaticValue("timestamp", RabbitMQConstants.NULL);
        this.replyTo = optionHolder.validateAndGetStaticValue("reply.to", RabbitMQConstants.NULL);
        this.expiration = optionHolder.validateAndGetStaticValue("expiration", RabbitMQConstants.NULL);
        this.userId = optionHolder.validateAndGetStaticValue("user.id", RabbitMQConstants.NULL);
        this.type = optionHolder.validateAndGetStaticValue("type", RabbitMQConstants.NULL);
        this.tlsEnabled = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue("tls.enabled", "false"));
        this.routingKeyOption = optionHolder.getOrCreateOption("routing.key", "");
        this.headerOption = optionHolder.getOrCreateOption("headers", RabbitMQConstants.NULL);
        if (!("fanout".equals(this.exchangeTypeOption.getValue()) || "direct".equals(this.exchangeTypeOption.getValue()) || "topic".equals(this.exchangeTypeOption.getValue()) || "headers".equals(this.exchangeTypeOption.getValue()))) {
            throw new SiddhiAppCreationException("Check the exchange type in " + this.streamDefinition + ". " + "There is no exchange type named as " + this.exchangeTypeOption.getValue() + " in RabbitMQ");
        }
    }

    public void connect() throws ConnectionUnavailableException {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            URI uri = URI.create(this.publisherURI);
            factory.setUri(uri);
            factory.setRequestedHeartbeat(this.heartbeat);
            if (this.tlsEnabled) {
                if (this.tlsTruststoreLocation.isEmpty()) {
                    factory.useSslProtocol();
                } else {
                    try {
                        char[] trustStorePassword = this.tlsTruststorePassword.toCharArray();
                        KeyStore keyStore = KeyStore.getInstance(this.tlsTruststoreType);
                        this.fileInputStream = new FileInputStream(this.tlsTruststoreLocation);
                        keyStore.load(this.fileInputStream, trustStorePassword);
                        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
                        trustManagerFactory.init(keyStore);
                        SSLContext context = SSLContext.getInstance(this.tlsVersion);
                        context.init(null, trustManagerFactory.getTrustManagers(), null);
                        factory.useSslProtocol(context);
                    }
                    catch (FileNotFoundException e) {
                        throw new SiddhiAppCreationException("The trustStore File path tls.truststore.path = " + this.tlsTruststoreLocation + " defined in " + this.streamDefinition + " is incorrect." + " Specify TrustStore location correctly.", (Throwable)e);
                    }
                    catch (CertificateException e) {
                        throw new SiddhiAppCreationException("TrustStore is not specified in " + this.streamDefinition, (Throwable)e);
                    }
                    catch (NoSuchAlgorithmException e) {
                        throw new SiddhiAppCreationException("Algorithm tls.version = " + this.tlsVersion + " " + "defined in " + this.streamDefinition + "is not available in " + "TrustManagerFactory class.", (Throwable)e);
                    }
                    catch (KeyStoreException e) {
                        throw new SiddhiAppCreationException("The trustStore type tls.truststore.Type= " + this.tlsTruststoreType + " defined in " + this.streamDefinition + " is incorrect." + " Specify TrustStore type correctly.", (Throwable)e);
                    }
                    catch (IOException e) {
                        throw new SiddhiAppCreationException("The trustStore type tls.truststore.password = " + this.tlsTruststorePassword + " defined in " + this.streamDefinition + " is incorrect." + " Specify TrustStore password correctly.", (Throwable)e);
                    }
                    finally {
                        if (this.fileInputStream != null) {
                            this.fileInputStream.close();
                        }
                    }
                }
            }
            this.connection = factory.newConnection();
            this.channel = this.connection.createChannel();
        }
        catch (IOException e) {
            throw new ConnectionUnavailableException("Failed to connect with the Rabbitmq server. Check the uri = " + this.publisherURI + " defined in " + "" + this.streamDefinition, (Throwable)e);
        }
        catch (NoSuchAlgorithmException e) {
            throw new SiddhiAppCreationException("No such algorithm in the uri = " + this.publisherURI + " defined in " + this.streamDefinition, (Throwable)e);
        }
        catch (URISyntaxException e) {
            throw new SiddhiAppCreationException("There is an invalid syntax in the uri = " + this.publisherURI + " defined in " + this.streamDefinition, (Throwable)e);
        }
        catch (TimeoutException e) {
            throw new SiddhiAppCreationException("Timeout while connectiong with the RabbitMQ server", (Throwable)e);
        }
        catch (KeyManagementException e) {
            throw new SiddhiAppCreationException("There is an error in key management in the uri = " + this.publisherURI + " defined in " + this.streamDefinition, (Throwable)e);
        }
    }

    public Map<String, Object> currentState() {
        return null;
    }

    public void restoreState(Map<String, Object> map) {
    }

    public void publish(Object payload, DynamicOptions dynamicOptions) throws ConnectionUnavailableException {
        try {
            Date timestamp;
            byte[] byteArray = payload instanceof byte[] ? (byte[])payload : payload.toString().getBytes("UTF-8");
            String exchangeName = this.exchangeNameOption.getValue(dynamicOptions);
            String exchangeType = this.exchangeTypeOption.getValue(dynamicOptions);
            String headers = this.headerOption.getValue(dynamicOptions);
            String messageId = this.messageIdOption.getValue(dynamicOptions);
            int priority = Integer.parseInt(this.priorityOption.getValue(dynamicOptions));
            String correlationId = this.correlationIdOption.getValue(dynamicOptions);
            if (this.timestampString == null) {
                timestamp = new Date();
            } else {
                SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy");
                timestamp = formatter.parse(this.timestampString);
            }
            AMQP.BasicProperties props = new AMQP.BasicProperties();
            Map<String, Object> map = null;
            if (headers != null) {
                map = RabbitMQSinkUtil.getHeaders(headers);
            }
            String routingKey = this.routingKeyOption.getValue(dynamicOptions);
            props = props.builder().deliveryMode(this.deliveryMode).contentType(this.contentType).contentEncoding(this.contentEncoding).messageId(messageId).replyTo(this.replyTo).expiration(this.expiration).priority(priority).correlationId(correlationId).userId(this.userId).appId(this.appId).type(this.type).timestamp(timestamp).headers(map).build();
            boolean exchangeAutoDelete = Boolean.parseBoolean(this.exchangeDurableAsStringOption.getValue(dynamicOptions));
            boolean exchangeDurable = Boolean.parseBoolean(this.exchangeAutoDeleteAsStringOption.getValue(dynamicOptions));
            try {
                this.channel.exchangeDeclarePassive(exchangeName);
            }
            catch (Exception e) {
                this.channel = this.connection.createChannel();
                RabbitMQSinkUtil.declareExchange(this.connection, this.channel, exchangeName, exchangeType, exchangeDurable, exchangeAutoDelete);
            }
            this.channel.basicPublish(exchangeName, routingKey, props, byteArray);
        }
        catch (ParseException e) {
            throw new SiddhiAppCreationException("Invalid timestamp format defined in " + this.timestampString + " . " + "Please include as dd/MM/yyyy in " + this.streamDefinition, (Throwable)e);
        }
        catch (UnsupportedEncodingException e) {
            throw new SiddhiAppCreationException("Received payload does not support UTF-8 encoding. Hence dropping the event", (Throwable)e);
        }
        catch (IOException e) {
            log.error((Object)("Error in sending the message to the exchange.name = " + this.exchangeNameOption.getValue() + " in RabbitMQ broker at " + this.streamDefinition), (Throwable)e);
        }
        catch (TimeoutException e) {
            throw new SiddhiAppCreationException("Timeout while publishing the events to " + this.exchangeNameOption.getValue() + " in " + "RabbitMQ server", (Throwable)e);
        }
    }

    public Class[] getSupportedInputEventClasses() {
        return new Class[]{String.class, byte[].class};
    }

    public String[] getSupportedDynamicOptions() {
        return new String[]{"exchange.name", "exchange.type", "routing.key", "exchange.durable.enabled", "exchange.autodelete.enabled", "message.id", "correlation.id", "priority"};
    }

    public void disconnect() {
        if (this.connection != null) {
            try {
                this.channel.close();
                this.connection.close();
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Server connector for uri = " + this.publisherURI + " is disconnected in " + "" + this.streamDefinition + "."));
                }
            }
            catch (TimeoutException e) {
                log.error((Object)("Timeout while disconnecting the uri = " + this.publisherURI + " in " + "" + this.streamDefinition + "."));
            }
            catch (IOException e) {
                log.error((Object)("Error in disconnecting the uri = " + this.publisherURI + " in " + "" + this.streamDefinition + "."));
            }
        }
    }

    public void destroy() {
    }
}

