/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.extension.siddhi.io.jms.sink;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import org.apache.log4j.Logger;
import org.wso2.carbon.transport.jms.sender.JMSClientConnector;
import org.wso2.extension.siddhi.io.jms.sink.JMSPublisher;
import org.wso2.extension.siddhi.io.jms.util.JMSOptionsMapper;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.stream.output.sink.Sink;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.DynamicOptions;
import org.wso2.siddhi.core.util.transport.Option;
import org.wso2.siddhi.core.util.transport.OptionHolder;
import org.wso2.siddhi.query.api.definition.StreamDefinition;

@Extension(name="jms", namespace="sink", description="JMS Sink allows users to subscribe to a JMS broker and publish JMS messages.", parameters={@Parameter(name="destination", description="Queue/Topic name which JMS Source should subscribe to", type={DataType.STRING}, dynamic=true), @Parameter(name="connection.factory.jndi.name", description="JMS Connection Factory JNDI name. This value will be used for the JNDI lookup to find the JMS Connection Factory.", type={DataType.STRING}, optional=true, defaultValue="QueueConnectionFactory"), @Parameter(name="factory.initial", description="Naming factory initial value", type={DataType.STRING}), @Parameter(name="provider.url", description="Java naming provider URL. Property for specifying configuration information for the service provider to use. The value of the property should contain a URL string (e.g. \"ldap://somehost:389\")", type={DataType.STRING}), @Parameter(name="connection.factory.type", description="Type of the connection connection factory. 
This can be either queue or topic.", type={DataType.STRING}, optional=true, defaultValue="queue"), @Parameter(name="connection.username", description="username for the broker.", type={DataType.STRING}, optional=true, defaultValue="None"), @Parameter(name="connection.password", description="Password for the broker", type={DataType.STRING}, optional=true, defaultValue="None"), @Parameter(name="connection.factory.nature", description="Connection factory nature for the broker(cached/pooled).", type={DataType.STRING}, optional=true, defaultValue="default")}, examples={@Example(description="Following example illustrates how to publish to an ActiveMQ topic.", syntax="@sink(type='jms', @map(type='xml'), factory.initial='org.apache.activemq.jndi.ActiveMQInitialContextFactory', provider.url='vm://localhost',destination='DAS_JMS_OUTPUT_TEST', connection.factory.type='topic',connection.factory.jndi.name='TopicConnectionFactory')define stream inputStream (name string, age int, country string);"), @Example(description="Following example illustrates how to publish to an ActiveMQ queue. Note that we are not providing properties like connection factory type", syntax="@sink(type='jms', @map(type='xml'), factory.initial='org.apache.activemq.jndi.ActiveMQInitialContextFactory', provider.url='vm://localhost',destination='DAS_JMS_OUTPUT_TEST')define stream inputStream (name string, age int, country string);")})
/*
 * Siddhi sink that publishes events to a JMS destination.
 *
 * The static transport options are translated once, during init(), into the property names
 * supplied by JMSOptionsMapper.getCarbonPropertyMapping(). Every payload handed to publish() is
 * wrapped in a JMSPublisher task and submitted to the executor service obtained from the
 * SiddhiAppContext.
 */
public class JMSSink
extends Sink {
    private static final Logger log = Logger.getLogger(JMSSink.class);
    private OptionHolder optionHolder;
    private JMSClientConnector clientConnector;
    private Option destination;
    private Map<String, String> jmsStaticProperties;
    private ExecutorService executorService;

    /**
     * Captures the sink configuration and prepares the static JMS transport properties.
     *
     * @param outputStreamDefinition definition of the stream this sink is attached to (unused here)
     * @param optionHolder           holder of the options declared on the sink annotation
     * @param sinkConfigReader       configuration reader for this sink (unused here)
     * @param executionPlanContext   context supplying the executor service used for publishing
     */
    protected void init(StreamDefinition outputStreamDefinition, OptionHolder optionHolder, ConfigReader sinkConfigReader, SiddhiAppContext executionPlanContext) {
        // optionHolder must be assigned before initJMSProperties() runs, since that method reads it.
        this.optionHolder = optionHolder;
        this.destination = optionHolder.getOrCreateOption("destination", null);
        this.jmsStaticProperties = this.initJMSProperties();
        this.executorService = executionPlanContext.getExecutorService();
    }

    /**
     * Instantiates the JMS client connector used by subsequent publish calls.
     *
     * @throws ConnectionUnavailableException declared by the sink contract; not thrown by this body
     */
    public void connect() throws ConnectionUnavailableException {
        // NOTE(review): only the connector object is created here; where the broker connection
        // is actually opened is not visible from this class - confirm.
        this.clientConnector = new JMSClientConnector();
    }

    /**
     * Returns the state to snapshot for this sink.
     *
     * @return always {@code null}; this class records no state
     */
    public Map<String, Object> currentState() {
        return null;
    }

    /**
     * Restores a previously captured state. Intentionally a no-op because
     * {@link #currentState()} never captures anything.
     *
     * @param state the state to restore (ignored)
     */
    public void restoreState(Map<String, Object> state) {
    }

    /**
     * Submits the payload for publishing to the destination resolved from the transport options.
     * Failures to create or submit the publishing task are logged and the event is dropped.
     *
     * @param payload          the mapped event to publish
     * @param transportOptions dynamic options used to resolve the destination for this event
     */
    public void publish(Object payload, DynamicOptions transportOptions) {
        String topicQueueName = this.destination.getValue(transportOptions);
        try {
            this.executorService.submit(new JMSPublisher(topicQueueName, this.jmsStaticProperties, this.clientConnector, payload));
        } catch (RejectedExecutionException e) {
            // Concatenate the payload directly rather than calling toString() on it, so a null
            // payload cannot raise a NullPointerException from inside the error handler.
            log.error("Error occured when submitting following payload to be published via JMS. Payload : " + payload, e);
        } catch (UnsupportedEncodingException e) {
            log.error("Received payload does not support UTF-8 encoding. Hence dropping the event.", e);
        }
    }

    /**
     * Lists the payload types this sink accepts.
     *
     * @return {@code String}, {@code Map} and {@code ByteBuffer}
     */
    public Class[] getSupportedInputEventClasses() {
        return new Class[]{String.class, Map.class, ByteBuffer.class};
    }

    /**
     * Lists the options whose value may vary per event.
     *
     * @return an array containing only {@code "destination"}
     */
    public String[] getSupportedDynamicOptions() {
        return new String[]{"destination"};
    }

    /**
     * Called when the sink is disconnected. Currently does nothing.
     */
    public void disconnect() {
        // NOTE(review): the connector created in connect() is not released here - confirm
        // whether JMSClientConnector requires explicit cleanup.
    }

    /**
     * Called when the sink is destroyed. Currently does nothing.
     */
    public void destroy() {
    }

    /**
     * Builds the static transport properties by translating the sink options into the property
     * names given by the carbon property mapping. Required options are processed first, followed
     * by every other static option except {@code "type"}.
     *
     * @return a new mutable map of transport property name to option value
     */
    private Map<String, String> initJMSProperties() {
        List<String> requiredOptions = JMSOptionsMapper.getRequiredOptions();
        Map<String, String> customPropertyMapping = JMSOptionsMapper.getCarbonPropertyMapping();
        Map<String, String> transportProperties = new HashMap<String, String>();
        for (String requiredOption : requiredOptions) {
            this.putMappedProperty(transportProperties, customPropertyMapping, requiredOption);
        }
        for (String option : this.optionHolder.getStaticOptionsKeys()) {
            if (requiredOptions.contains(option) || option.equals("type")) {
                continue;
            }
            this.putMappedProperty(transportProperties, customPropertyMapping, option);
        }
        return transportProperties;
    }

    /**
     * Stores the static value of an option under its mapped transport property name. An option
     * without a mapping is skipped with a warning, because storing it would place the value under
     * a {@code null} key where several unmapped options would silently overwrite each other.
     *
     * @param transportProperties   the map being populated
     * @param customPropertyMapping option name to transport property name mapping
     * @param option                name of the sink option to translate
     */
    private void putMappedProperty(Map<String, String> transportProperties, Map<String, String> customPropertyMapping, String option) {
        // Always resolve the value first so the option is still validated even when it is unmapped.
        String value = this.optionHolder.validateAndGetStaticValue(option);
        String mappedKey = customPropertyMapping.get(option);
        if (mappedKey == null) {
            // The value is deliberately left out of the message; it may be a credential.
            log.warn("No JMS transport property is mapped to the option '" + option + "'. Hence ignoring it.");
            return;
        }
        transportProperties.put(mappedKey, value);
    }
}

