package org.apache.iotdb.db.engine.trigger.sink.forward.mqtt;

import java.util.Iterator;
import java.util.List;
import org.apache.iotdb.db.engine.trigger.sink.api.Handler;
import org.apache.iotdb.db.engine.trigger.sink.exception.SinkException;
import org.apache.iotdb.db.engine.trigger.utils.MQTTConnectionFactory;
import org.apache.iotdb.db.engine.trigger.utils.MQTTConnectionPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/engine/trigger/sink/forward/mqtt/MQTTForwardHandler.class */
public class MQTTForwardHandler implements Handler<MQTTForwardConfiguration, MQTTForwardEvent> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MQTTForwardHandler.class);
    private MQTTConnectionPool connectionPool;
    private MQTTForwardConfiguration config;

    @Override // org.apache.iotdb.db.engine.trigger.sink.api.Handler
    public void open(MQTTForwardConfiguration mQTTForwardConfiguration) throws Exception {
        this.config = mQTTForwardConfiguration;
        this.connectionPool = MQTTConnectionPool.getInstance(mQTTForwardConfiguration.getHost(), mQTTForwardConfiguration.getPort(), mQTTForwardConfiguration.getUsername(), new MQTTConnectionFactory(mQTTForwardConfiguration.getHost(), mQTTForwardConfiguration.getPort(), mQTTForwardConfiguration.getUsername(), mQTTForwardConfiguration.getPassword(), mQTTForwardConfiguration.getConnectAttemptsMax(), mQTTForwardConfiguration.getReconnectDelay()), mQTTForwardConfiguration.getPoolSize());
    }

    @Override // org.apache.iotdb.db.engine.trigger.sink.api.Handler
    public void close() throws Exception {
        this.connectionPool.clearAndClose();
    }

    @Override // org.apache.iotdb.db.engine.trigger.sink.api.Handler
    public void onEvent(MQTTForwardEvent mQTTForwardEvent) throws SinkException {
        try {
            this.connectionPool.publish(this.config.getTopic(), ("[" + mQTTForwardEvent.toJsonString() + "]").getBytes(), this.config.getQos(), this.config.isRetain());
        } catch (Exception e) {
            if (this.config.isStopIfException()) {
                throw new SinkException("MQTT Forward Exception", e);
            }
            LOGGER.error("MQTT Forward Exception", e);
        }
    }

    @Override // org.apache.iotdb.db.engine.trigger.sink.api.Handler
    public void onEvent(List<MQTTForwardEvent> list) throws SinkException {
        StringBuilder append = new StringBuilder().append("[");
        Iterator<MQTTForwardEvent> it = list.iterator();
        while (it.hasNext()) {
            append.append(it.next().toJsonString()).append(", ");
        }
        append.replace(append.lastIndexOf(", "), append.length(), "").append("]");
        try {
            this.connectionPool.publish(this.config.getTopic(), append.toString().getBytes(), this.config.getQos(), this.config.isRetain());
        } catch (Exception e) {
            if (this.config.isStopIfException()) {
                throw new SinkException("MQTT Forward Exception", e);
            }
            LOGGER.error("MQTT Forward Exception", e);
        }
    }
}
