/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.jms.sink;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.jms.sink.JMSPublisher;
import io.siddhi.extension.io.jms.util.JMSOptionsMapper;
import io.siddhi.query.api.definition.StreamDefinition;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.log4j.Logger;
import org.wso2.transport.jms.contract.JMSClientConnector;
import org.wso2.transport.jms.exception.JMSConnectorException;
import org.wso2.transport.jms.impl.JMSConnectorFactoryImpl;

@Extension(name="jms", namespace="sink", description="JMS Sink allows users to subscribe to a JMS broker and publish JMS messages.", parameters={@Parameter(name="destination", description="Queue/Topic name which JMS Source should subscribe to", type={DataType.STRING}, dynamic=true), @Parameter(name="connection.factory.jndi.name", description="JMS Connection Factory JNDI name. This value will be used for the JNDI lookup to find the JMS Connection Factory.", type={DataType.STRING}, optional=true, defaultValue="QueueConnectionFactory"), @Parameter(name="factory.initial", description="Naming factory initial value", type={DataType.STRING}), @Parameter(name="provider.url", description="Java naming provider URL. Property for specifying configuration information for the service provider to use. The value of the property should contain a URL string (e.g. \"ldap://somehost:389\")", type={DataType.STRING}), @Parameter(name="connection.factory.type", description="Type of the connection connection factory. 
This can be either queue or topic.", type={DataType.STRING}, optional=true, defaultValue="queue"), @Parameter(name="connection.username", description="username for the broker.", type={DataType.STRING}, optional=true, defaultValue="None"), @Parameter(name="connection.password", description="Password for the broker", type={DataType.STRING}, optional=true, defaultValue="None"), @Parameter(name="connection.factory.nature", description="Connection factory nature for the broker(cached/pooled).", type={DataType.STRING}, optional=true, defaultValue="default")}, examples={@Example(description="This example shows how to publish to an ActiveMQ topic.", syntax="@sink(type='jms', @map(type='xml'), factory.initial='org.apache.activemq.jndi.ActiveMQInitialContextFactory', provider.url='vm://localhost',destination='DAS_JMS_OUTPUT_TEST', connection.factory.type='topic',connection.factory.jndi.name='TopicConnectionFactory')\ndefine stream inputStream (name string, age int, country string);"), @Example(description="This example shows how to publish to an ActiveMQ queue. Note that we are not providing properties like connection factory type", syntax="@sink(type='jms', @map(type='xml'), factory.initial='org.apache.activemq.jndi.ActiveMQInitialContextFactory', provider.url='vm://localhost',destination='DAS_JMS_OUTPUT_TEST')\ndefine stream inputStream (name string, age int, country string);")})
/*
 * Siddhi sink extension registered through the @Extension annotation above.
 *
 * init() captures the sink options and builds the static JMS transport properties,
 * connect() creates the JMSClientConnector from those properties, and publish() wraps
 * every payload in a JMSPublisher task that is submitted to the executor service taken
 * from the SiddhiAppContext.
 */
public class JMSSink extends Sink {
    private static final Logger log = Logger.getLogger(JMSSink.class);

    /** Sink option holding the target queue/topic name; the only option reported as dynamic. */
    private static final String DESTINATION_OPTION = "destination";

    /** Static option key that is never forwarded to the JMS transport properties. */
    private static final String TYPE_OPTION = "type";

    /** Options supplied to {@link #init}; read again when the transport properties are built. */
    private OptionHolder optionHolder;

    /** Connector created in {@link #connect()} and handed to every publisher task. */
    private JMSClientConnector clientConnector;

    /** Destination option; resolved per event in {@link #publish}. */
    private Option destination;

    /** Transport properties derived from the static sink options in {@link #init}. */
    private Map<String, String> jmsStaticProperties;

    /** Executor obtained from the SiddhiAppContext; runs the publisher tasks. */
    private ExecutorService executorService;

    /**
     * Stores the sink options, derives the static JMS transport properties and keeps a
     * reference to the executor service of the app context.
     *
     * @param outputStreamDefinition definition of the stream being published (not used here)
     * @param optionHolder           options declared on the sink
     * @param sinkConfigReader       configuration reader (not used here)
     * @param executionPlanContext   app context that supplies the executor service
     * @return always {@code null}; no StateFactory is supplied by this sink
     */
    protected StateFactory init(StreamDefinition outputStreamDefinition, OptionHolder optionHolder, ConfigReader sinkConfigReader, SiddhiAppContext executionPlanContext) {
        // optionHolder must be assigned first: initJMSProperties() reads it through the field.
        this.optionHolder = optionHolder;
        this.destination = optionHolder.getOrCreateOption(DESTINATION_OPTION, null);
        this.jmsStaticProperties = this.initJMSProperties();
        this.executorService = executionPlanContext.getExecutorService();
        return null;
    }

    /**
     * Creates the JMS client connector from the static transport properties.
     *
     * @throws ConnectionUnavailableException if the connector factory reports a
     *                                        JMSConnectorException; the original exception is
     *                                        attached as the cause
     */
    public void connect() throws ConnectionUnavailableException {
        try {
            this.clientConnector = new JMSConnectorFactoryImpl().createClientConnector(this.jmsStaticProperties);
        } catch (JMSConnectorException e) {
            String message = "Error while connecting to JMS provider at destination: " + this.destination;
            // Pass the cause to the logger as well so the log entry is not stripped of the stack trace.
            log.error(message, e);
            throw new ConnectionUnavailableException(message, e);
        }
    }

    /**
     * Resolves the destination for this event and submits a JMSPublisher task for the
     * payload to the executor service; the send itself is performed by that task.
     *
     * @param payload          event payload to publish
     * @param transportOptions dynamic options used to resolve the destination value
     * @param state            sink state (not used here)
     */
    public void publish(Object payload, DynamicOptions transportOptions, State state) {
        String topicQueueName = this.destination.getValue(transportOptions);
        JMSPublisher publisherTask = new JMSPublisher(topicQueueName, this.jmsStaticProperties, this.clientConnector, payload, transportOptions, this);
        this.executorService.execute(publisherTask);
    }

    /**
     * @return the payload classes this sink declares as supported: String, Map and ByteBuffer
     */
    public Class[] getSupportedInputEventClasses() {
        return new Class[]{String.class, Map.class, ByteBuffer.class};
    }

    /**
     * @return always {@code null}; this sink exposes no service deployment information
     */
    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    /**
     * @return the names of the options that may vary per event; only the destination
     */
    public String[] getSupportedDynamicOptions() {
        return new String[]{DESTINATION_OPTION};
    }

    /**
     * Shuts down the executor service if one was obtained in {@link #init}.
     */
    public void disconnect() {
        // NOTE(review): this executor comes from SiddhiAppContext.getExecutorService() and is not
        // created by this class; if it is shared with other components, shutting it down here
        // affects them too, and a later publish() would be rejected - confirm this is intended.
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
    }

    /**
     * Nothing to release here.
     */
    public void destroy() {
    }

    /**
     * Builds the transport property map from the static sink options.
     *
     * Required options (as listed by JMSOptionsMapper) are processed first, followed by every
     * other static option except {@code type}. Each option name is translated through the
     * mapping returned by JMSOptionsMapper.getCarbonPropertyMapping().
     *
     * @return a new mutable map keyed by the translated property names
     */
    private Map<String, String> initJMSProperties() {
        List<String> requiredOptions = JMSOptionsMapper.getRequiredOptions();
        Map<String, String> customPropertyMapping = JMSOptionsMapper.getCarbonPropertyMapping();
        Map<String, String> transportProperties = new HashMap<String, String>();

        for (String requiredOption : requiredOptions) {
            this.putMappedOption(transportProperties, customPropertyMapping, requiredOption);
        }

        for (String option : this.optionHolder.getStaticOptionsKeys()) {
            if (requiredOptions.contains(option) || TYPE_OPTION.equals(option)) {
                continue;
            }
            this.putMappedOption(transportProperties, customPropertyMapping, option);
        }
        return transportProperties;
    }

    /**
     * Copies one static option into {@code target} under its translated property name.
     *
     * Options without a translation are skipped: storing them under the {@code null} key, as
     * a direct {@code target.put(mapping.get(option), value)} would, makes all such options
     * overwrite one another and leaves a key-less entry in the property map.
     */
    private void putMappedOption(Map<String, String> target, Map<String, String> mapping, String option) {
        // Resolve the value unconditionally so the option holder call (and whatever validation
        // it performs) still happens for every option, mapped or not.
        String value = this.optionHolder.validateAndGetStaticValue(option);
        String transportKey = mapping.get(option);
        if (transportKey == null) {
            if (log.isDebugEnabled()) {
                log.debug("Ignoring sink option '" + option + "' because it has no JMS transport property mapping");
            }
            return;
        }
        target.put(transportKey, value);
    }
}

