/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.jms.source;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.jms.source.JMSMessageProcessor;
import io.siddhi.extension.io.jms.source.exception.JMSInputAdaptorRuntimeException;
import io.siddhi.extension.io.jms.util.JMSOptionsMapper;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.wso2.transport.jms.contract.JMSServerConnector;
import org.wso2.transport.jms.exception.JMSConnectorException;
import org.wso2.transport.jms.receiver.JMSServerConnectorImpl;

@Extension(name="jms", namespace="source", description="JMS Source allows users to subscribe to a JMS broker and receive JMS messages. It has the ability to receive Map messages and Text messages.", parameters={@Parameter(name="destination", description="Queue/Topic name which JMS Source should subscribe to", type={DataType.STRING}), @Parameter(name="connection.factory.jndi.name", description="JMS Connection Factory JNDI name. This value will be used for the JNDI lookup to find the JMS Connection Factory.", type={DataType.STRING}, optional=true, defaultValue="QueueConnectionFactory"), @Parameter(name="factory.initial", description="Naming factory initial value", type={DataType.STRING}), @Parameter(name="provider.url", description="Java naming provider URL. Property for specifying configuration information for the service provider to use. The value of the property should contain a URL string (e.g. \"ldap://somehost:389\")", type={DataType.STRING}), @Parameter(name="connection.factory.type", description="Type of the connection connection factory. This can be either queue or topic.", type={DataType.STRING}, optional=true, defaultValue="queue"), @Parameter(name="worker.count", description="Number of worker threads listening on the given queue/topic.", type={DataType.INT}, optional=true, defaultValue="1"), @Parameter(name="connection.username", description="username for the broker.", type={DataType.STRING}, optional=true, defaultValue="None"), @Parameter(name="connection.password", description="Password for the broker", type={DataType.STRING}, optional=true, defaultValue="None"), @Parameter(name="retry.interval", description="Interval between each retry attempt in case of connection failure in milliseconds.", type={DataType.INT}, optional=true, defaultValue="10000"), @Parameter(name="retry.count", description="Number of maximum reties that will be attempted in case of connection failure with broker.", type={DataType.INT}, optional=true, defaultValue="5"), @Parameter(name="use.receiver", description="Implementation to be used when consuming JMS messages. By default transport will use MessageListener and tweaking this property will make make use of MessageReceiver", type={DataType.BOOL}, optional=true, defaultValue="false"), @Parameter(name="subscription.durable", description="Property to enable durable subscription.", type={DataType.BOOL}, optional=true, defaultValue="false"), @Parameter(name="connection.factory.nature", description="Connection factory nature for the broker.", type={DataType.STRING}, optional=true, defaultValue="default")}, examples={@Example(description="This example shows how to connect to an ActiveMQ topic and receive messages.", syntax="@source(type='jms', @map(type='json'), factory.initial='org.apache.activemq.jndi.ActiveMQInitialContextFactory', provider.url='tcp://localhost:61616',destination='DAS_JMS_TEST', connection.factory.type='topic',connection.factory.jndi.name='TopicConnectionFactory')\ndefine stream inputStream (name string, age int, country string);"), @Example(description="This example shows how to connect to an ActiveMQ queue and receive messages. Note that we are not providing properties like connection factory type", syntax="@source(type='jms', @map(type='json'), factory.initial='org.apache.activemq.jndi.ActiveMQInitialContextFactory', provider.url='tcp://localhost:61616',destination='DAS_JMS_TEST' )\ndefine stream inputStream (name string, age int, country string);")})
public class JMSSource
extends Source {
    private static final Logger log = Logger.getLogger(JMSSource.class);
    private SourceEventListener sourceEventListener;
    private OptionHolder optionHolder;
    private JMSServerConnector jmsServerConnector;
    private JMSMessageProcessor jmsMessageProcessor;

    public StateFactory init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] requestedTransportPropertyNames, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.sourceEventListener = sourceEventListener;
        this.optionHolder = optionHolder;
        Map<String, String> properties = this.initJMSProperties();
        this.jmsMessageProcessor = new JMSMessageProcessor(sourceEventListener, siddhiAppContext, requestedTransportPropertyNames);
        try {
            this.jmsServerConnector = new JMSServerConnectorImpl(null, properties, this.jmsMessageProcessor);
        }
        catch (JMSConnectorException e) {
            log.error((Object)("Error occurred in initializing the JMS receiver for stream: " + sourceEventListener.getStreamDefinition().getId()));
            throw new JMSInputAdaptorRuntimeException("Error occurred in initializing the JMS receiver for stream: " + sourceEventListener.getStreamDefinition().getId(), e);
        }
        return null;
    }

    public void connect(Source.ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
        try {
            this.jmsServerConnector.start();
        }
        catch (JMSConnectorException e) {
            throw new ConnectionUnavailableException("Exception in starting the JMS receiver for stream: " + this.sourceEventListener.getStreamDefinition().getId(), (Throwable)e);
        }
    }

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    public Class[] getOutputEventClasses() {
        return new Class[]{String.class, Map.class};
    }

    public void disconnect() {
        try {
            if (this.jmsServerConnector != null) {
                this.jmsServerConnector.stop();
            }
            if (this.jmsMessageProcessor != null) {
                this.jmsMessageProcessor.disconnect();
            }
        }
        catch (JMSConnectorException e) {
            log.error((Object)"Error disconnecting the JMS receiver", (Throwable)e);
        }
    }

    public void destroy() {
    }

    public void pause() {
        this.jmsMessageProcessor.pause();
    }

    public void resume() {
        this.jmsMessageProcessor.resume();
    }

    private Map<String, String> initJMSProperties() {
        Map<String, String> carbonPropertyMapping = JMSOptionsMapper.getCarbonPropertyMapping();
        List<String> requiredOptions = JMSOptionsMapper.getRequiredOptions();
        HashMap<String, String> transportProperties = new HashMap<String, String>();
        requiredOptions.forEach(requiredOption -> transportProperties.put((String)carbonPropertyMapping.get(requiredOption), this.optionHolder.validateAndGetStaticValue(requiredOption)));
        this.optionHolder.getStaticOptionsKeys().stream().filter(option -> !requiredOptions.contains(option) && !option.equals("type")).forEach(option -> transportProperties.put(carbonPropertyMapping.get(option) == null ? option : (String)carbonPropertyMapping.get(option), this.optionHolder.validateAndGetStaticValue(option)));
        return transportProperties;
    }
}

