/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.mqtt.source;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.mqtt.source.MqttConsumer;
import io.siddhi.extension.io.mqtt.util.MqttConstants;
import org.apache.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;

@Extension(name="mqtt", namespace="source", description="The MQTT source receives the events from an MQTT broker ", parameters={@Parameter(name="url", description="The URL of the MQTT broker. It is used to connect to the MQTT broker. It is required to specify a valid URL here.", type={DataType.STRING}), @Parameter(name="username", description="The username to be provided when the MQTT client is authenticated by the broker.", type={DataType.STRING}, optional=true, defaultValue="null"), @Parameter(name="password", description="The password to be provided when the MQTT client is authenticated by the broker.", type={DataType.STRING}, optional=true, defaultValue="empty"), @Parameter(name="client.id", description="A unique ID for the MQTT client. The server uses this to identify the client when it reconnects. If you do not specify a client ID, the system automatically generates it.", type={DataType.STRING}), @Parameter(name="topic", description="The topic from which WSO2 SP receives events via MQTT. Multiple topics can be specified as a list of comma separated values.This is a mandatory parameter.", type={DataType.STRING}), @Parameter(name="quality.of.service", description="The quality of service provided by the MQTT client. The possible values are as follows.`0`: The MQTT client sends each event to WSO2 SP only once. It does not receive an acknowledgement when an event is delivered, and the events are not stored.Events may get lost if the MQTT client is disconnected or if the server fails.This is the fastest method in which events are received via MQTT.`1`: The MQTT client sends each event to WSO2 SP at least once. If the MQTT client does not receive an acknowledgement to indicate that the event is delivered, it sends the event again.`2`: The MQTT client sends each event to WSO2 SP only once. The events are stored until the WSO2 SP receives them. This is the safest, but the slowest method of receiving events via MQTT.", type={DataType.STRING}, optional=true, defaultValue="1"), @Parameter(name="clean.session", description="This is an optional paramater. If this parameter is set to `true`, the subscriptions made by the MQTT client during a session expire when the session ends,and they need to be recreated for the next session.\nIf this parameter is set to `false`, all the information relating to the MQTT client's connection to the broker (e.g., the specific topics to which the client has subscribed) are saved after a session. Thus, when a session ends and restarts, the connection is re-established with the same information.\nThe default value is `true`.", type={DataType.BOOL}, optional=true, defaultValue="true"), @Parameter(name="keep.alive", description="The maximum number of seconds the connection between the MQTT client and the broker should be maintained without any events being transferred. Once this time interval elapses without any event transfers, the connection is dropped. The default value is 60.", type={DataType.INT}, optional=true, defaultValue="60"), @Parameter(name="connection.timeout", description="The maximum number of seconds that the MQTT client should spend attempting to connect to the MQTT broker. Once this time interval elapses, a timeout takes place.", type={DataType.INT}, optional=true, defaultValue="30")}, examples={@Example(syntax="@source(type='mqtt', url= 'tcp://localhost:1883', topic='mqtt_topic', clean.session='true',quality.of.service= '1', keep.alive= '60',connection.timeout='30'@map(type='xml'))Define stream BarStream (symbol string, price float, volume long);", description="This query receives events from the `mqtt_topic` topic via MQTT,and processes them to the BarStream stream.")})
public class MqttSource
extends Source {
    private static final Logger log = Logger.getLogger(MqttSource.class);
    private String brokerURL;
    private String topicOption;
    private String clientId;
    private String userName;
    private String userPassword;
    private String qosOption;
    private boolean cleanSession;
    private int keepAlive;
    private int connectionTimeout;
    private MqttClient client;
    private MqttConsumer mqttConsumer;
    private String siddhiAppName;

    public StateFactory init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strings, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.siddhiAppName = siddhiAppContext.getName();
        this.brokerURL = optionHolder.validateAndGetStaticValue("url");
        this.clientId = optionHolder.validateAndGetStaticValue("client.id", "");
        this.topicOption = optionHolder.validateAndGetStaticValue("topic");
        this.userName = optionHolder.validateAndGetStaticValue("username", MqttConstants.DEFAULT_USERNAME);
        this.userPassword = optionHolder.validateAndGetStaticValue("password", "");
        this.qosOption = optionHolder.validateAndGetStaticValue("quality.of.service", "1");
        this.keepAlive = Integer.parseInt(optionHolder.validateAndGetStaticValue("keep.alive", "60"));
        this.connectionTimeout = Integer.parseInt(optionHolder.validateAndGetStaticValue("connection.timeout", "30"));
        this.cleanSession = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue("clean.session", "true"));
        this.mqttConsumer = new MqttConsumer(sourceEventListener);
        return null;
    }

    public Class[] getOutputEventClasses() {
        return new Class[]{String.class};
    }

    public void connect(Source.ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
        try {
            MqttDefaultFilePersistence persistence = new MqttDefaultFilePersistence();
            if (this.clientId.equals("")) {
                this.clientId = MqttClient.generateClientId();
            }
            this.client = new MqttClient(this.brokerURL, this.clientId, persistence);
            MqttConnectOptions connectionOptions = new MqttConnectOptions();
            connectionOptions.setUserName(this.userName);
            connectionOptions.setPassword(this.userPassword.toCharArray());
            connectionOptions.setCleanSession(this.cleanSession);
            connectionOptions.setKeepAliveInterval(this.keepAlive);
            connectionOptions.setConnectionTimeout(this.connectionTimeout);
            this.client.connect(connectionOptions);
            int qos = Integer.parseInt(String.valueOf(this.qosOption));
            this.mqttConsumer.subscribe(this.topicOption, qos, this.client);
        }
        catch (MqttException e) {
            throw new ConnectionUnavailableException("Error while connecting with the Mqtt server. Check the url = " + this.brokerURL + " defined in Siddhi App: " + this.siddhiAppName, (Throwable)e);
        }
    }

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    public void disconnect() {
        try {
            this.client.disconnect();
            log.debug((Object)("Disconnected from MQTT broker: " + this.brokerURL + " defined in Siddhi App: " + this.siddhiAppName));
        }
        catch (MqttException e) {
            log.error((Object)("Could not disconnect from MQTT broker: " + this.brokerURL + " defined in Siddhi App: " + this.siddhiAppName), (Throwable)e);
        }
        finally {
            try {
                this.client.close();
            }
            catch (MqttException e) {
                log.error((Object)("Could not close connection with MQTT broker: " + this.brokerURL + " defined in Siddhi App: " + this.siddhiAppName), (Throwable)e);
            }
        }
    }

    public void destroy() {
    }

    public void pause() {
        this.mqttConsumer.pause();
    }

    public void resume() {
        this.mqttConsumer.resume();
    }
}

