/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.protocol.mqtt;

import io.moquette.interception.AbstractInterceptHandler;
import io.moquette.interception.messages.InterceptConnectMessage;
import io.moquette.interception.messages.InterceptDisconnectMessage;
import io.moquette.interception.messages.InterceptPublishMessage;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.time.ZoneId;
import java.util.List;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.protocol.mqtt.Message;
import org.apache.iotdb.db.protocol.mqtt.PayloadFormatManager;
import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PublishHandler
extends AbstractInterceptHandler {
    private final SessionManager SESSION_MANAGER = SessionManager.getInstance();
    private long sessionId;
    private static final Logger LOG = LoggerFactory.getLogger(PublishHandler.class);
    private final PayloadFormatter payloadFormat;

    public PublishHandler(IoTDBConfig config) {
        this.payloadFormat = PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter());
    }

    protected PublishHandler(PayloadFormatter payloadFormat) {
        this.payloadFormat = payloadFormat;
    }

    public String getID() {
        return "iotdb-mqtt-broker-listener-" + this.sessionId;
    }

    public void onConnect(InterceptConnectMessage msg) {
        try {
            BasicOpenSessionResp basicOpenSessionResp = this.SESSION_MANAGER.openSession(msg.getUsername(), new String(msg.getPassword()), ZoneId.systemDefault().toString(), TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
            this.sessionId = basicOpenSessionResp.getSessionId();
        }
        catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    public void onDisconnect(InterceptDisconnectMessage msg) {
        this.SESSION_MANAGER.closeSession(this.sessionId);
    }

    public void onPublish(InterceptPublishMessage msg) {
        String clientId = msg.getClientID();
        ByteBuf payload = msg.getPayload();
        String topic = msg.getTopicName();
        String username = msg.getUsername();
        MqttQoS qos = msg.getQos();
        LOG.debug("Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}", new Object[]{clientId, username, qos, topic, payload});
        List<Message> events = this.payloadFormat.format(payload);
        if (events == null) {
            return;
        }
        for (Message event : events) {
            if (event == null) continue;
            boolean status = false;
            try {
                PartialPath path = new PartialPath(event.getDevice());
                InsertRowPlan plan = new InsertRowPlan(path, event.getTimestamp(), event.getMeasurements().toArray(new String[0]), event.getValues().toArray(new String[0]));
                TSStatus tsStatus = this.SESSION_MANAGER.checkAuthority(plan, this.sessionId);
                if (tsStatus != null) {
                    LOG.warn(tsStatus.message);
                } else {
                    status = IoTDB.serviceProvider.executeNonQuery(plan);
                }
            }
            catch (Exception e) {
                LOG.warn("meet error when inserting device {}, measurements {}, at time {}, because ", new Object[]{event.getDevice(), event.getMeasurements(), event.getTimestamp(), e});
            }
            LOG.debug("event process result: {}", (Object)status);
        }
    }
}

