package org.apache.camel.component.tahu.handlers;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.camel.RuntimeCamelException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.tahu.SparkplugParsingException;
import org.eclipse.tahu.message.SparkplugBPayloadDecoder;
import org.eclipse.tahu.message.model.DeviceDescriptor;
import org.eclipse.tahu.message.model.EdgeNodeDescriptor;
import org.eclipse.tahu.message.model.MessageType;
import org.eclipse.tahu.message.model.Metric;
import org.eclipse.tahu.message.model.SparkplugBPayload;
import org.eclipse.tahu.message.model.SparkplugDescriptor;
import org.eclipse.tahu.message.model.StatePayload;
import org.eclipse.tahu.message.model.Topic;
import org.eclipse.tahu.model.MetricDataTypeMap;
import org.eclipse.tahu.mqtt.ClientCallback;
import org.eclipse.tahu.mqtt.MqttClientId;
import org.eclipse.tahu.mqtt.MqttServerName;
import org.eclipse.tahu.mqtt.MqttServerUrl;
import org.eclipse.tahu.util.SparkplugUtil;
import org.eclipse.tahu.util.TopicUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/camel/component/tahu/handlers/TahuEdgeClientCallback.class */
public class TahuEdgeClientCallback implements ClientCallback {
    private static final Logger LOG = LoggerFactory.getLogger(TahuEdgeClientCallback.class);
    private TahuEdgeClient client;
    private final EdgeNodeDescriptor edgeNodeDescriptor;
    private final TahuEdgeMetricHandler tahuEdgeNodeMetricHandler;
    private final Marker loggingMarker;

    /* JADX INFO: Access modifiers changed from: package-private */
    public TahuEdgeClientCallback(EdgeNodeDescriptor edgeNodeDescriptor, TahuEdgeMetricHandler tahuEdgeMetricHandler) {
        this.edgeNodeDescriptor = edgeNodeDescriptor;
        this.tahuEdgeNodeMetricHandler = tahuEdgeMetricHandler;
        this.loggingMarker = MarkerFactory.getMarker(edgeNodeDescriptor.getDescriptorString());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setClient(TahuEdgeClient tahuEdgeClient) {
        this.client = tahuEdgeClient;
    }

    public void messageArrived(MqttServerName mqttServerName, MqttServerUrl mqttServerUrl, MqttClientId mqttClientId, String str, MqttMessage mqttMessage) {
        try {
            Topic parseTopic = TopicUtil.parseTopic(str);
            if (!"spBv1.0".equals(parseTopic.getNamespace())) {
                LOG.warn(this.loggingMarker, "Received message on non-Sparkplug topic: {}", parseTopic);
                return;
            }
            if (parseTopic.isType(MessageType.STATE)) {
                handleSTATEMessage(parseTopic, mqttMessage);
                return;
            }
            if (parseTopic.isType(MessageType.NDEATH) && parseTopic.getEdgeNodeDescriptor().equals(this.edgeNodeDescriptor)) {
                handleNDEATHMessage(parseTopic, mqttMessage);
            } else if (parseTopic.isType(MessageType.NCMD) || parseTopic.isType(MessageType.DCMD)) {
                handleCMDMessage(parseTopic, mqttMessage);
            } else {
                LOG.debug(this.loggingMarker, "Received unexpected Sparkplug message of type {} - ignoring", parseTopic.getType());
            }
        } catch (SparkplugParsingException e) {
            throw new RuntimeCamelException("Exception caught parsing Sparkplug topic " + str, e);
        }
    }

    void handleSTATEMessage(Topic topic, MqttMessage mqttMessage) {
        LOG.debug(this.loggingMarker, "Received STATE message: {} :: {}", topic, new String(mqttMessage.getPayload()));
        try {
            this.client.handleStateMessage(topic.getHostApplicationId(), (StatePayload) new ObjectMapper().readValue(mqttMessage.getPayload(), StatePayload.class));
        } catch (Exception e) {
            throw new RuntimeCamelException("Exception caught handling STATE message with topic " + String.valueOf(topic) + " and payload " + new String(mqttMessage.getPayload()), e);
        }
    }

    void handleNDEATHMessage(Topic topic, MqttMessage mqttMessage) {
        if (this.client.isDisconnectedOrDisconnecting()) {
            LOG.debug(this.loggingMarker, "Received expected LWT for {} - no action required", topic.getEdgeNodeDescriptor());
            return;
        }
        if (!this.client.isConnectedToPrimaryHost()) {
            LOG.debug(this.loggingMarker, "Received unexpected LWT for {} but not connected to primary host - ignoring", this.edgeNodeDescriptor);
            return;
        }
        try {
            long longValue = SparkplugUtil.getBdSequenceNumber(new SparkplugBPayloadDecoder().buildFromByteArray(mqttMessage.getPayload(), (MetricDataTypeMap) null)).longValue();
            long currentBirthBdSeq = this.tahuEdgeNodeMetricHandler.getCurrentBirthBdSeq();
            if (currentBirthBdSeq == longValue) {
                handleRebirthRequest();
            } else {
                LOG.warn(this.loggingMarker, "Received unexpected LWT for {} with different bdSeq - expected {} received {} - ignoring", new Object[]{this.edgeNodeDescriptor, Long.valueOf(currentBirthBdSeq), Long.valueOf(longValue)});
            }
        } catch (Exception e) {
            throw new RuntimeCamelException("Exception caught handling DEATH message while connected to primary host on topic " + String.valueOf(topic), e);
        }
    }

    void handleRebirthRequest() {
        LOG.warn(this.loggingMarker, "Received unexpected LWT for {} - publishing BIRTH sequence", this.edgeNodeDescriptor);
        try {
            this.client.handleRebirthRequest(true);
        } catch (Exception e) {
            LOG.warn(this.loggingMarker, "Received unexpected LWT but failed to publish new BIRTH sequence for {} - continuing", this.edgeNodeDescriptor, e);
        }
    }

    void handleCMDMessage(Topic topic, MqttMessage mqttMessage) {
        try {
            SparkplugBPayload sparkplugBPayload = (SparkplugBPayload) new SparkplugBPayloadDecoder().buildFromByteArray(mqttMessage.getPayload(), (MetricDataTypeMap) null);
            if (topic.isType(MessageType.NCMD)) {
                handleNCMDMessage(sparkplugBPayload);
            } else if (topic.isType(MessageType.DCMD)) {
                handleDCMDMessage(sparkplugBPayload, topic.getDeviceId());
            }
        } catch (Exception e) {
            throw new RuntimeCamelException("Exception caught decoding Sparkplug message with topic " + String.valueOf(topic) + " and payload " + new String(mqttMessage.getPayload()), e);
        }
    }

    void handleNCMDMessage(SparkplugBPayload sparkplugBPayload) {
        List<Metric> processCMDMetrics = this.tahuEdgeNodeMetricHandler.processCMDMetrics(sparkplugBPayload, this.edgeNodeDescriptor);
        if (processCMDMetrics.isEmpty()) {
            LOG.warn(this.loggingMarker, "Received NCMD with no valid metrics to write for {} - ignoring", this.edgeNodeDescriptor);
            return;
        }
        SparkplugBPayload createPayload = new SparkplugBPayload.SparkplugBPayloadBuilder().addMetrics(processCMDMetrics).createPayload();
        LOG.debug(this.loggingMarker, "Publishing NDATA based on NCMD message for {}", this.edgeNodeDescriptor);
        this.client.publishNodeData(createPayload);
    }

    void handleDCMDMessage(SparkplugBPayload sparkplugBPayload, String str) {
        SparkplugDescriptor deviceDescriptor = new DeviceDescriptor(this.edgeNodeDescriptor, str);
        List<Metric> processCMDMetrics = this.tahuEdgeNodeMetricHandler.processCMDMetrics(sparkplugBPayload, deviceDescriptor);
        if (processCMDMetrics.isEmpty()) {
            LOG.warn(this.loggingMarker, "Received DCMD with no valid metrics to write for {} - ignoring", deviceDescriptor);
            return;
        }
        SparkplugBPayload createPayload = new SparkplugBPayload.SparkplugBPayloadBuilder().addMetrics(processCMDMetrics).createPayload();
        LOG.debug(this.loggingMarker, "Publishing DDATA based on DCMD message for {}", deviceDescriptor);
        this.client.publishDeviceData(str, createPayload);
    }

    public void shutdown() {
    }

    public void connectionLost(MqttServerName mqttServerName, MqttServerUrl mqttServerUrl, MqttClientId mqttClientId, Throwable th) {
    }

    public void connectComplete(boolean z, MqttServerName mqttServerName, MqttServerUrl mqttServerUrl, MqttClientId mqttClientId) {
    }
}
