/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.extension.siddhi.io.mqtt.sink;

import java.io.UnsupportedEncodingException;
import java.util.Map;
import org.apache.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.wso2.extension.siddhi.io.mqtt.sink.exception.MqttSinkRuntimeException;
import org.wso2.extension.siddhi.io.mqtt.util.MqttConstants;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.stream.output.sink.Sink;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.DynamicOptions;
import org.wso2.siddhi.core.util.transport.Option;
import org.wso2.siddhi.core.util.transport.OptionHolder;
import org.wso2.siddhi.query.api.definition.StreamDefinition;

@Extension(name="mqtt", namespace="sink", description="The MQTT sink publishes the events to an MQTT broker ", parameters={@Parameter(name="url", description="The URL of the MQTT broker. It is used to connect to the MQTT broker It is required to specify a valid URL here.", type={DataType.STRING}), @Parameter(name="username", description="The username to be provided when the MQTT client is authenticated by the broker.", type={DataType.STRING}, optional=true, defaultValue="null"), @Parameter(name="password", description="The password to be provided when the MQTT client is authenticated by the broker.", type={DataType.STRING}, optional=true, defaultValue="empty"), @Parameter(name="client.id", description="A unique ID for the MQTT client. The server uses this to identify the client when it reconnects. If you do not specify a client ID, the system automatically generates it. ", type={DataType.STRING}, optional=true, defaultValue="generated by the system"), @Parameter(name="topic", description="The topic to which the events processed by WSO2 SP are published via MQTT. This is a mandatory parameter.", type={DataType.STRING}, dynamic=true), @Parameter(name="quality.of.service", description="The quality of service provided by the MQTT client. The possible values are as follows.`0`: The MQTT client sends each event only once. It does not receive an acknowledgement when an event is delivered, and the events are not stored. Events may get lost if the MQTT client is disconnected or if the server fails. This is the fastest method in which events are received via MQTT.`1`: The MQTT client sends each event at least once. If the MQTT client does not receive an acknowledgement to indicate that the event is delivered, it sends the event again.`2`: The MQTT client sends each event only once. The events are stored until the WSO2 SP receives them. This is the safest, but the slowest method of receiving events via MQTT.", type={DataType.STRING}, dynamic=true, optional=true, defaultValue="1"), @Parameter(name="clean.session", description="This is an optional paramater. If this parameter is set to `true`, the subscriptions made by the MQTT client during a session expire when the session ends,and they need to be recreated for the next session.\nIf this parameter is set to `false`, all the information relating to the MQTT client's connection to the broker (e.g., the specific topics to which the client has subscribed) are saved after a session. Thus, when a session ends and restarts, the connection is re-established with the same information.\nThe default value is `true`.", type={DataType.BOOL}, optional=true, defaultValue="true"), @Parameter(name="message.retain", description="If this parameter is set to true, the last message sent from the topic to which WSO2 SP publishes events is retained until the next message is sent.", type={DataType.STRING}, dynamic=true, optional=true, defaultValue="false"), @Parameter(name="keep.alive", description="The maximum number of seconds the connection between the MQTT client and the broker should be maintained without any events being transferred. Once this time interval elapses without any event transfers, the connection is dropped. The default value is 60.", type={DataType.INT}, optional=true, defaultValue="60"), @Parameter(name="connection.timeout", description="The maximum number of seconds that the MQTT client should spend attempting to connect to the MQTT broker. Once this time interval elapses, a timeout takes place.", type={DataType.INT}, optional=true, defaultValue="30")}, examples={@Example(syntax="@sink(type='mqtt', url= 'tcp://localhost:1883', topic='mqtt_topic', clean.session='true', message.retain='false', quality.of.service= '1', keep.alive= '60',connection.timeout='30'@map(type='xml'))Define stream BarStream (symbol string, price float, volume long);", description="This query publishes events to a stream named `BarStream` via the MQTT transport. The events are published to a topic named mqtt_topic located at tcp://localhost:1883.")})
public class MqttSink
extends Sink {
    private static final Logger log = Logger.getLogger(MqttSink.class);
    private String brokerURL;
    private Option topicOption;
    private String clientId;
    private String userName;
    private String userPassword;
    private Option qosOption;
    private boolean cleanSession;
    private int keepAlive;
    private int connectionTimeout;
    private MqttClient client;
    private Option messageRetainOption;
    private StreamDefinition streamDefinition;

    protected void init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.streamDefinition = streamDefinition;
        this.brokerURL = optionHolder.validateAndGetStaticValue("url");
        this.clientId = optionHolder.validateAndGetStaticValue("client.id", "");
        this.topicOption = optionHolder.validateAndGetOption("topic");
        this.userName = optionHolder.validateAndGetStaticValue("username", MqttConstants.DEFAULT_USERNAME);
        this.userPassword = optionHolder.validateAndGetStaticValue("password", "");
        this.qosOption = optionHolder.getOrCreateOption("quality.of.service", "1");
        this.keepAlive = Integer.parseInt(optionHolder.validateAndGetStaticValue("keep.alive", "60"));
        this.connectionTimeout = Integer.parseInt(optionHolder.validateAndGetStaticValue("connection.timeout", "30"));
        this.messageRetainOption = optionHolder.getOrCreateOption("message.retain", "false");
        this.cleanSession = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue("clean.session", "true"));
    }

    public Class[] getSupportedInputEventClasses() {
        return new Class[]{Byte[].class, String.class};
    }

    public String[] getSupportedDynamicOptions() {
        return new String[]{"topic", "quality.of.service", "message.retain"};
    }

    public void connect() throws ConnectionUnavailableException {
        try {
            MqttDefaultFilePersistence persistence = new MqttDefaultFilePersistence();
            if (this.clientId.isEmpty()) {
                this.clientId = MqttClient.generateClientId();
            }
            this.client = new MqttClient(this.brokerURL, this.clientId, persistence);
            MqttConnectOptions connectionOptions = new MqttConnectOptions();
            connectionOptions.setUserName(this.userName);
            connectionOptions.setPassword(this.userPassword.toCharArray());
            connectionOptions.setCleanSession(this.cleanSession);
            connectionOptions.setKeepAliveInterval(this.keepAlive);
            connectionOptions.setConnectionTimeout(this.connectionTimeout);
            this.client.connect(connectionOptions);
        }
        catch (MqttException e) {
            throw new ConnectionUnavailableException("Error while connecting with the Mqtt server, Check the broker url = " + this.brokerURL + " defined in " + this.streamDefinition.getId(), (Throwable)e);
        }
    }

    public void disconnect() {
        try {
            this.client.disconnect();
            log.debug((Object)("Disconnected from MQTT broker: " + this.brokerURL));
        }
        catch (MqttException e) {
            log.error((Object)("Could not disconnect from MQTT broker: " + this.brokerURL), (Throwable)e);
        }
        finally {
            try {
                this.client.close();
            }
            catch (MqttException e) {
                log.error((Object)("Could not close connection with MQTT broker: " + this.brokerURL), (Throwable)e);
            }
        }
    }

    public void destroy() {
    }

    public void publish(Object payload, DynamicOptions dynamicOptions) throws ConnectionUnavailableException {
        try {
            int qos;
            MqttMessage message = new MqttMessage();
            byte[] byteArray = payload instanceof byte[] ? (byte[])payload : payload.toString().getBytes("UTF-8");
            message.setPayload(byteArray);
            String qosStr = this.qosOption.getValue(dynamicOptions);
            try {
                qos = Integer.parseInt(qosStr);
            }
            catch (NumberFormatException e) {
                throw new MqttSinkRuntimeException("Invalid QOS value received for MQTT Sink associated to stream '" + this.streamDefinition.getId() + "' . Expected 0, 1 or 2 but received " + qosStr, e);
            }
            if (qos < 0 || qos > 2) {
                throw new MqttSinkRuntimeException("Invalid QOS value received for MQTT Sink associated to stream '" + this.streamDefinition.getId() + "' . Expected 0, 1 or 2 but received " + qos);
            }
            message.setQos(qos);
            boolean messageRetain = Boolean.parseBoolean(this.messageRetainOption.getValue(dynamicOptions));
            message.setRetained(messageRetain);
            String topic = this.topicOption.getValue(dynamicOptions);
            this.client.publish(topic, message);
        }
        catch (MqttException e) {
            log.error((Object)("Error occurred when publishing message to the MQTT broker: " + this.brokerURL + " in " + this.streamDefinition.getId()), (Throwable)e);
        }
        catch (UnsupportedEncodingException e) {
            log.error((Object)("Event could not be encoded in UTF-8, hence it could not be published to MQTT broker: " + this.brokerURL + " in " + this.streamDefinition.getId()), (Throwable)e);
        }
    }

    public Map<String, Object> currentState() {
        return null;
    }

    public void restoreState(Map<String, Object> map) {
    }
}

