package org.eclipse.tahu.edge;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.tahu.SparkplugInvalidTypeException;
import org.eclipse.tahu.SparkplugParsingException;
import org.eclipse.tahu.edge.api.MetricHandler;
import org.eclipse.tahu.edge.sim.DataSimulator;
import org.eclipse.tahu.edge.sim.RandomDataSimulator;
import org.eclipse.tahu.message.DefaultBdSeqManager;
import org.eclipse.tahu.message.SparkplugBPayloadDecoder;
import org.eclipse.tahu.message.SparkplugBPayloadEncoder;
import org.eclipse.tahu.message.model.DeviceDescriptor;
import org.eclipse.tahu.message.model.EdgeNodeDescriptor;
import org.eclipse.tahu.message.model.MessageType;
import org.eclipse.tahu.message.model.Metric;
import org.eclipse.tahu.message.model.MetricDataType;
import org.eclipse.tahu.message.model.SparkplugBPayload;
import org.eclipse.tahu.message.model.SparkplugBPayloadMap;
import org.eclipse.tahu.message.model.SparkplugDescriptor;
import org.eclipse.tahu.message.model.SparkplugMeta;
import org.eclipse.tahu.message.model.StatePayload;
import org.eclipse.tahu.message.model.Topic;
import org.eclipse.tahu.model.MetricDataTypeMap;
import org.eclipse.tahu.model.MqttServerDefinition;
import org.eclipse.tahu.mqtt.ClientCallback;
import org.eclipse.tahu.mqtt.MqttClientId;
import org.eclipse.tahu.mqtt.MqttServerName;
import org.eclipse.tahu.mqtt.MqttServerUrl;
import org.eclipse.tahu.util.SparkplugUtil;
import org.eclipse.tahu.util.TopicUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/tahu/edge/SparkplugEdgeNode.class */
public class SparkplugEdgeNode implements Runnable, MetricHandler, ClientCallback, CommandCallback {
    private static final String COMMAND_LISTENER_DIRECTORY = "/tmp/commands";
    private static final long COMMAND_LISTENER_POLL_RATE = 50;
    private static final String PRIMARY_HOST_ID = "IamHost";
    private static final boolean USE_ALIASES = false;
    private static final String MQTT_CLIENT_ID_1 = "Sparkplug-Tahu-Compatible-Impl-One";
    private static final String USERNAME_1 = "admin";
    private static final String PASSWORD_1 = "changeme";
    private static final String MQTT_CLIENT_ID_2 = "Sparkplug-Tahu-Compatible-Impl-Two";
    private static final String USERNAME_2 = "admin";
    private static final String PASSWORD_2 = "changeme";
    private static final int KEEP_ALIVE_TIMEOUT = 30;
    private CommandListener commandListener;
    private long birthBdSeq;
    private long deathBdSeq;
    private final DataSimulator dataSimulator = new RandomDataSimulator(10, new HashMap<SparkplugDescriptor, Integer>() { // from class: org.eclipse.tahu.edge.SparkplugEdgeNode.1
        private static final long serialVersionUID = 1;

        {
            Iterator it = SparkplugEdgeNode.DEVICE_DESCRIPTORS.iterator();
            while (it.hasNext()) {
                put((DeviceDescriptor) it.next(), 50);
            }
        }
    });
    private Object clientLock = new Object();
    private EdgeClient edgeClient;
    private Thread edgeClientThread;
    private PeriodicPublisher periodicPublisher;
    private DefaultBdSeqManager defaultBdSeqManager;
    private Thread periodicPublisherThread;
    private static Logger logger = LoggerFactory.getLogger(SparkplugEdgeNode.class.getName());
    private static final String GROUP_ID = "G2";
    private static final String EDGE_NODE_ID = "E2";
    private static final EdgeNodeDescriptor EDGE_NODE_DESCRIPTOR = new EdgeNodeDescriptor(GROUP_ID, EDGE_NODE_ID);
    private static final List<String> DEVICE_IDS = Arrays.asList("D2");
    private static final List<DeviceDescriptor> DEVICE_DESCRIPTORS = Arrays.asList(new DeviceDescriptor(EDGE_NODE_DESCRIPTOR, "D2"));
    private static final Long REBIRTH_DEBOUNCE_DELAY = 5000L;
    private static final MqttServerName MQTT_SERVER_NAME_1 = new MqttServerName("Mqtt Server One");
    private static final MqttServerUrl MQTT_SERVER_URL_1 = MqttServerUrl.getMqttServerUrlSafe("tcp://localhost:1883");
    private static final MqttServerName MQTT_SERVER_NAME_2 = new MqttServerName("Mqtt Server Two");
    private static final MqttServerUrl MQTT_SERVER_URL_2 = MqttServerUrl.getMqttServerUrlSafe("tcp://localhost:1884");
    private static final Topic NDEATH_TOPIC = new Topic(SparkplugMeta.SPARKPLUG_B_TOPIC_PREFIX, GROUP_ID, EDGE_NODE_ID, MessageType.NDEATH);
    private static final List<MqttServerDefinition> mqttServerDefinitions = new ArrayList();

    public static void main(String[] strArr) {
        try {
            mqttServerDefinitions.add(new MqttServerDefinition(MQTT_SERVER_NAME_1, new MqttClientId(MQTT_CLIENT_ID_1, false), MQTT_SERVER_URL_1, "admin", "changeme", 30, NDEATH_TOPIC));
            System.out.println("Starting the Sparkplug Edge Node");
            System.out.println("\tGroup ID: G2");
            System.out.println("\tEdge Node ID: E2");
            System.out.println("\tDevice IDs: " + DEVICE_IDS);
            System.out.println("\tPrimary Host ID: IamHost");
            System.out.println("\tUsing Aliases: false");
            System.out.println("\tRebirth Debounce Delay: " + REBIRTH_DEBOUNCE_DELAY);
            for (MqttServerDefinition mqttServerDefinition : mqttServerDefinitions) {
                System.out.println("\tMQTT Server Name: " + mqttServerDefinition.getMqttServerName());
                System.out.println("\tMQTT Client ID: " + mqttServerDefinition.getMqttClientId());
                System.out.println("\tMQTT Server URL: " + mqttServerDefinition.getMqttServerUrl());
                System.out.println("\tUsername: " + mqttServerDefinition.getUsername());
                System.out.println("\tPassword: ********");
                System.out.println("\tKeep Alive Timeout: " + mqttServerDefinition.getKeepAliveTimeout());
            }
            SparkplugEdgeNode sparkplugEdgeNode = new SparkplugEdgeNode();
            new Thread(sparkplugEdgeNode).start();
            Thread.sleep(360000L);
            sparkplugEdgeNode.shutdown();
        } catch (Exception e) {
            logger.error("Failed to run the Edge Node", (Throwable) e);
        }
    }

    public SparkplugEdgeNode() {
        try {
            this.defaultBdSeqManager = new DefaultBdSeqManager("SparkplugEdgeNode");
            this.deathBdSeq = this.defaultBdSeqManager.getNextDeathBdSeqNum();
            this.birthBdSeq = this.deathBdSeq;
            this.edgeClient = new EdgeClient(this, EDGE_NODE_DESCRIPTOR, DEVICE_IDS, PRIMARY_HOST_ID, false, REBIRTH_DEBOUNCE_DELAY, mqttServerDefinitions, this, null);
        } catch (Exception e) {
            logger.error("Failed to create the Sparkplug Edge Client", (Throwable) e);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            this.commandListener = new CommandListener(this, COMMAND_LISTENER_DIRECTORY, COMMAND_LISTENER_POLL_RATE);
            this.commandListener.start();
            this.edgeClientThread = new Thread(this.edgeClient);
            this.edgeClientThread.start();
        } catch (Exception e) {
            logger.error("Failed to start", (Throwable) e);
        }
    }

    @Override // org.eclipse.tahu.edge.api.MetricHandler
    public Topic getDeathTopic() {
        return NDEATH_TOPIC;
    }

    @Override // org.eclipse.tahu.edge.api.MetricHandler
    public byte[] getDeathPayloadBytes() throws Exception {
        SparkplugBPayload createPayload = new SparkplugBPayload.SparkplugBPayloadBuilder().setTimestamp(new Date()).createPayload();
        addDeathSeqNum(createPayload);
        return new SparkplugBPayloadEncoder().getBytes(createPayload, true);
    }

    @Override // org.eclipse.tahu.edge.api.MetricHandler
    public void publishBirthSequence() {
        try {
            this.edgeClient.publishNodeBirth(addBirthSeqNum(this.dataSimulator.getNodeBirthPayload(EDGE_NODE_DESCRIPTOR)));
            for (String str : DEVICE_IDS) {
                this.edgeClient.publishDeviceBirth(str, this.dataSimulator.getDeviceBirthPayload(new DeviceDescriptor(EDGE_NODE_DESCRIPTOR, str)));
            }
            this.periodicPublisher = new PeriodicPublisher(5000L, this.dataSimulator, this.edgeClient, EDGE_NODE_DESCRIPTOR, DEVICE_DESCRIPTORS);
            this.periodicPublisherThread = new Thread(this.periodicPublisher);
            this.periodicPublisherThread.start();
        } catch (Exception e) {
            logger.error("Failed to publish the BIRTH sequence", (Throwable) e);
        }
    }

    @Override // org.eclipse.tahu.edge.api.MetricHandler
    public boolean hasMetric(SparkplugDescriptor sparkplugDescriptor, String str) {
        return this.dataSimulator.hasMetric(sparkplugDescriptor, str);
    }

    @Override // org.eclipse.tahu.mqtt.ClientCallback
    public void shutdown() {
        logger.info("ClientCallback shutdown");
        if (this.commandListener != null) {
            this.commandListener.shutdown();
            this.commandListener = null;
        }
        if (this.periodicPublisher != null) {
            this.periodicPublisher.shutdown();
            this.periodicPublisher = null;
        }
        if (this.periodicPublisherThread != null) {
            this.periodicPublisherThread.interrupt();
            this.periodicPublisherThread = null;
        }
        if (this.edgeClient != null) {
            this.edgeClient.shutdown();
            this.edgeClient = null;
            this.edgeClientThread = null;
        }
    }

    @Override // org.eclipse.tahu.mqtt.ClientCallback
    public void messageArrived(MqttServerName mqttServerName, MqttServerUrl mqttServerUrl, MqttClientId mqttClientId, String str, MqttMessage mqttMessage) {
        logger.info("{}: ClientCallback messageArrived on topic={}", mqttClientId, str);
        try {
            Topic parseTopic = TopicUtil.parseTopic(str);
            if (str.startsWith("spBv1.0/STATE/")) {
                try {
                    logger.info("Got STATE message: {} :: {}", str, new String(mqttMessage.getPayload()));
                    this.edgeClient.handleStateMessage(parseTopic.getHostApplicationId(), (StatePayload) new ObjectMapper().readValue(mqttMessage.getPayload(), StatePayload.class));
                    return;
                } catch (Exception e) {
                    logger.error("Failed to handle STATE message with topic={} and payload={}", str, new String(mqttMessage.getPayload()));
                    return;
                }
            }
            if (!SparkplugMeta.SPARKPLUG_B_TOPIC_PREFIX.equals(TopicUtil.getSplitTopic(str)[0])) {
                logger.warn("Message received on erroneous topic: {}", str);
                return;
            }
            try {
                if (MessageType.NDEATH.equals(parseTopic.getType()) && parseTopic.getGroupId().equals(GROUP_ID) && parseTopic.getEdgeNodeId().equals(EDGE_NODE_ID)) {
                    if (this.edgeClient.isDisconnectedOrDisconnecting()) {
                        logger.debug("Got expected LWT for {}", EDGE_NODE_DESCRIPTOR);
                        return;
                    }
                    if (!this.edgeClient.isConnectedToPrimaryHost()) {
                        logger.debug("Got unexpected LWT but not connected to primary host - ignoring");
                        return;
                    }
                    try {
                        if (this.birthBdSeq == SparkplugUtil.getBdSequenceNumber(new SparkplugBPayloadDecoder().buildFromByteArray(mqttMessage.getPayload(), (MetricDataTypeMap) null)).longValue()) {
                            logger.info("Got unexpected LWT for {} - publishing BIRTH sequence", EDGE_NODE_DESCRIPTOR);
                            this.edgeClient.handleRebirthRequest(true);
                        }
                    } catch (Exception e2) {
                        logger.warn("Got unexpected LWT but failed to publish a new BIRTH sequence for {}", EDGE_NODE_DESCRIPTOR);
                    }
                    return;
                }
                if (!MessageType.NCMD.equals(parseTopic.getType()) && !MessageType.DCMD.equals(parseTopic.getType())) {
                    logger.debug("Ignoring unexpected incoming Sparkplug message of type {}", parseTopic.getType());
                    return;
                }
                try {
                    logger.debug("Decoding Sparkplug Payload");
                    SparkplugBPayload buildFromByteArray = new SparkplugBPayloadDecoder().buildFromByteArray(mqttMessage.getPayload(), (MetricDataTypeMap) null);
                    logger.debug("Message Timestamp: {}", buildFromByteArray.getTimestamp());
                    if (!MessageType.NCMD.equals(parseTopic.getType())) {
                        if (MessageType.DCMD.equals(parseTopic.getType())) {
                            try {
                                List<Metric> metrics = buildFromByteArray.getMetrics();
                                ArrayList arrayList = new ArrayList();
                                if (metrics != null && !metrics.isEmpty()) {
                                    Date date = new Date();
                                    SparkplugBPayloadMap.SparkplugBPayloadMapBuilder sparkplugBPayloadMapBuilder = new SparkplugBPayloadMap.SparkplugBPayloadMapBuilder();
                                    sparkplugBPayloadMapBuilder.setTimestamp(date);
                                    for (Metric metric : metrics) {
                                        String name = metric.getName();
                                        logger.debug("Device Metric Name: {}", name);
                                        logger.debug("Metric: {} :: {} :: {}", name, metric.getValue(), metric.getDataType());
                                        Metric handleMetricWrite = this.dataSimulator.handleMetricWrite(new DeviceDescriptor(EDGE_NODE_DESCRIPTOR, parseTopic.getDeviceId()), metric);
                                        if (handleMetricWrite != null) {
                                            arrayList.add(handleMetricWrite);
                                        }
                                    }
                                    if (arrayList.isEmpty()) {
                                        logger.warn("Received DCMD with no valid metrics to write for {}/{}", EDGE_NODE_DESCRIPTOR, parseTopic.getDeviceId());
                                    } else {
                                        logger.debug("Publishing DDATA based on DCMD message for {}/{}", EDGE_NODE_DESCRIPTOR, parseTopic.getDeviceId());
                                        sparkplugBPayloadMapBuilder.addMetrics(arrayList);
                                        this.edgeClient.publishDeviceData(parseTopic.getDeviceId(), sparkplugBPayloadMapBuilder.createPayload());
                                    }
                                }
                                return;
                            } catch (Throwable th) {
                                logger.error("Error parsing DCMD", th);
                                return;
                            }
                        }
                        return;
                    }
                    try {
                        List<Metric> metrics2 = buildFromByteArray.getMetrics();
                        ArrayList arrayList2 = new ArrayList();
                        if (metrics2 != null && !metrics2.isEmpty()) {
                            Date date2 = new Date();
                            SparkplugBPayloadMap.SparkplugBPayloadMapBuilder sparkplugBPayloadMapBuilder2 = new SparkplugBPayloadMap.SparkplugBPayloadMapBuilder();
                            sparkplugBPayloadMapBuilder2.setTimestamp(date2);
                            for (Metric metric2 : metrics2) {
                                String name2 = metric2.getName();
                                logger.debug("Node Metric Name: {}", name2);
                                Object value = metric2.getValue();
                                logger.debug("Metric: {} :: {} :: {}", name2, value, metric2.getDataType());
                                if (SparkplugMeta.METRIC_NODE_REBIRTH.equals(name2) && value.equals(true)) {
                                    this.edgeClient.handleRebirthRequest(true);
                                } else {
                                    Metric handleMetricWrite2 = this.dataSimulator.handleMetricWrite(EDGE_NODE_DESCRIPTOR, metric2);
                                    if (handleMetricWrite2 != null) {
                                        arrayList2.add(handleMetricWrite2);
                                    }
                                }
                            }
                            if (arrayList2.isEmpty()) {
                                logger.warn("Received NCMD with no valid metrics to write for {}", EDGE_NODE_DESCRIPTOR);
                            } else {
                                logger.debug("Publishing NDATA based on NCMD message for {}", EDGE_NODE_DESCRIPTOR);
                                sparkplugBPayloadMapBuilder2.addMetrics(arrayList2);
                                this.edgeClient.publishNodeData(sparkplugBPayloadMapBuilder2.createPayload());
                            }
                        }
                    } catch (Exception e3) {
                        logger.error("Error parsing NCMD", (Throwable) e3);
                    }
                } catch (Exception e4) {
                    logger.error("Failed to parse message - not acting on it", (Throwable) e4);
                }
            } catch (Exception e5) {
                logger.error("Failed to handle NDEATH when connected on {}", parseTopic, e5);
            }
        } catch (SparkplugParsingException e6) {
            logger.error("Error parsing Sparkplug topic {}", str, e6);
        }
    }

    @Override // org.eclipse.tahu.mqtt.ClientCallback
    public void connectionLost(MqttServerName mqttServerName, MqttServerUrl mqttServerUrl, MqttClientId mqttClientId, Throwable th) {
        logger.info("{}: ClientCallback connectionLost", mqttClientId);
    }

    @Override // org.eclipse.tahu.mqtt.ClientCallback
    public void connectComplete(boolean z, MqttServerName mqttServerName, MqttServerUrl mqttServerUrl, MqttClientId mqttClientId) {
        logger.info("{}: ClientCallback connectComplete", mqttClientId);
    }

    @Override // org.eclipse.tahu.edge.CommandCallback
    public void setDeviceOffline(String str) {
        this.edgeClient.publishDeviceDeath(str);
    }

    @Override // org.eclipse.tahu.edge.CommandCallback
    public void setDeviceOnline(String str) {
        this.edgeClient.publishDeviceBirth(str, this.dataSimulator.getDeviceBirthPayload(new DeviceDescriptor(EDGE_NODE_DESCRIPTOR, str)));
    }

    private SparkplugBPayload addDeathSeqNum(SparkplugBPayload sparkplugBPayload) {
        SparkplugBPayload sparkplugBPayload2;
        synchronized (this.clientLock) {
            if (sparkplugBPayload == null) {
                sparkplugBPayload = new SparkplugBPayload.SparkplugBPayloadBuilder().createPayload();
            }
            if (this.deathBdSeq == 256) {
                this.deathBdSeq = 0L;
            }
            logger.trace("Death bdSeq(before) = {}", Long.valueOf(this.deathBdSeq));
            try {
                logger.trace("Set bdSeq number in NDEATH to {}", Long.valueOf(this.deathBdSeq));
                sparkplugBPayload.addMetric(new Metric.MetricBuilder(SparkplugMeta.SPARKPLUG_BD_SEQUENCE_NUMBER_KEY, MetricDataType.Int64, Long.valueOf(this.deathBdSeq)).createMetric());
                this.birthBdSeq = this.deathBdSeq;
                this.deathBdSeq++;
                this.defaultBdSeqManager.storeNextDeathBdSeqNum(this.deathBdSeq);
                logger.trace("Death bdSeq(after) = {}", Long.valueOf(this.deathBdSeq));
                sparkplugBPayload2 = sparkplugBPayload;
            } catch (SparkplugInvalidTypeException e) {
                logger.error("Failed to create death payload", (Throwable) e);
                return null;
            }
        }
        return sparkplugBPayload2;
    }

    private SparkplugBPayloadMap addBirthSeqNum(SparkplugBPayloadMap sparkplugBPayloadMap) {
        SparkplugBPayloadMap sparkplugBPayloadMap2;
        synchronized (this.clientLock) {
            if (sparkplugBPayloadMap == null) {
                sparkplugBPayloadMap = new SparkplugBPayloadMap.SparkplugBPayloadMapBuilder().createPayload();
            }
            logger.trace("Birth bdSeq(before) = {}", Long.valueOf(this.birthBdSeq));
            try {
                logger.trace("Set bdSeq number in NBIRTH to {}", Long.valueOf(this.birthBdSeq));
                sparkplugBPayloadMap.addMetric(new Metric.MetricBuilder(SparkplugMeta.SPARKPLUG_BD_SEQUENCE_NUMBER_KEY, MetricDataType.Int64, Long.valueOf(this.birthBdSeq)).createMetric());
                logger.trace("Birth bdSeq(after) = {}", Long.valueOf(this.birthBdSeq));
                sparkplugBPayloadMap2 = sparkplugBPayloadMap;
            } catch (SparkplugInvalidTypeException e) {
                logger.error("Failed to create birth payload", (Throwable) e);
                return null;
            }
        }
        return sparkplugBPayloadMap2;
    }
}
