package acromusashi.stream.component.mqtt.spout;

import acromusashi.stream.constants.FieldName;
import acromusashi.stream.entity.StreamMessage;
import acromusashi.stream.entity.StreamMessageHeader;
import acromusashi.stream.spout.AmConfigurationSpout;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import com.google.common.collect.Lists;
import java.nio.charset.Charset;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:acromusashi/stream/component/mqtt/spout/MqttSpout.class */
public class MqttSpout extends AmConfigurationSpout {
    private static final long serialVersionUID = 670832311695197611L;
    private static final Logger logger = LoggerFactory.getLogger(MqttSpout.class);
    private static final QoS DEFAULT_QOS = QoS.AT_LEAST_ONCE;
    private static final long DEFAULT_RECEIVE_WAIT = 10;
    private List<String> brokerUrls;
    private List<List<String>> subscribeTopics;
    private transient BlockingConnection connection;
    private transient Map<String, Message> ackWaitMap;
    private QoS qos = DEFAULT_QOS;
    private boolean immidiateAck = false;
    private long receiveWait = DEFAULT_RECEIVE_WAIT;

    public MqttSpout(List<String> list, List<List<String>> list2) {
        this.brokerUrls = list;
        this.subscribeTopics = list2;
    }

    @Override // acromusashi.stream.spout.AmConfigurationSpout
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        super.open(map, topologyContext, spoutOutputCollector);
        this.ackWaitMap = new HashMap();
        int thisTaskIndex = topologyContext.getThisTaskIndex();
        String str = this.brokerUrls.get(thisTaskIndex);
        List<String> list = this.subscribeTopics.get(thisTaskIndex);
        MQTT mqtt = new MQTT();
        try {
            mqtt.setHost(str);
            this.connection = mqtt.blockingConnection();
            this.connection.connect();
            logger.info(MessageFormat.format("MQTT Broker connected. : Url={0}", str));
            ArrayList newArrayList = Lists.newArrayList();
            Iterator<String> it = list.iterator();
            while (it.hasNext()) {
                newArrayList.add(new Topic(it.next(), this.qos));
            }
            Topic[] topicArr = (Topic[]) newArrayList.toArray(new Topic[0]);
            this.connection.subscribe(topicArr);
            logger.info(MessageFormat.format("MQTT Broker subscribed. : Topic={0}", Arrays.toString(topicArr)));
        } catch (Exception e) {
            logger.error("MQTT Broker connect failed. Skip initialize Spout.", e);
        }
    }

    public void nextTuple() {
        Message message = null;
        try {
            message = this.connection.receive(this.receiveWait, TimeUnit.SECONDS);
        } catch (Exception e) {
            logger.warn("Receive failed. Retry receive.", e);
            this.connection.resume();
        }
        if (message == null) {
            return;
        }
        String topic = message.getTopic();
        StreamMessage createMessage = createMessage(topic, new String(message.getPayload(), Charset.forName("UTF-8")));
        if (this.immidiateAck || this.qos == QoS.AT_MOST_ONCE) {
            message.ack();
            getCollector().emit(new Values(new Object[]{topic, createMessage}));
        } else {
            String uuid = UUID.randomUUID().toString();
            this.ackWaitMap.put(uuid, message);
            getCollector().emit(new Values(new Object[]{topic, createMessage}), uuid);
        }
    }

    public void ack(Object obj) {
        Message remove = this.ackWaitMap.remove(obj);
        if (remove != null) {
            remove.ack();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(new String[]{FieldName.MESSAGE_KEY, FieldName.MESSAGE_VALUE}));
    }

    protected StreamMessage createMessage(String str, String str2) {
        StreamMessageHeader streamMessageHeader = new StreamMessageHeader();
        streamMessageHeader.setTimestamp(getCurrentTime());
        streamMessageHeader.setType("MQTT");
        streamMessageHeader.setMessageKey(str);
        return new StreamMessage(streamMessageHeader, str2);
    }

    protected long getCurrentTime() {
        return System.currentTimeMillis();
    }

    public void setReceiveWait(long j) {
        this.receiveWait = j;
    }

    public void setQos(QoS qoS) {
        this.qos = qoS;
    }

    public void setImmidiateAck(boolean z) {
        this.immidiateAck = z;
    }
}
