package org.eclipse.tahu.host.seq;

import java.util.Calendar;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.tahu.SparkplugParsingException;
import org.eclipse.tahu.host.CommandPublisher;
import org.eclipse.tahu.host.TahuHostCallback;
import org.eclipse.tahu.host.TahuPayloadHandler;
import org.eclipse.tahu.host.api.HostApplicationEventHandler;
import org.eclipse.tahu.host.manager.EdgeNodeManager;
import org.eclipse.tahu.host.manager.SparkplugEdgeNode;
import org.eclipse.tahu.host.model.HostApplicationMetricMap;
import org.eclipse.tahu.message.PayloadDecoder;
import org.eclipse.tahu.message.SparkplugBPayloadDecoder;
import org.eclipse.tahu.message.model.EdgeNodeDescriptor;
import org.eclipse.tahu.message.model.MessageType;
import org.eclipse.tahu.message.model.SparkplugBPayload;
import org.eclipse.tahu.message.model.Topic;
import org.eclipse.tahu.mqtt.MqttClientId;
import org.eclipse.tahu.mqtt.MqttServerName;
import org.eclipse.tahu.util.TopicUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/tahu/host/seq/SequenceReorderManager.class */
public class SequenceReorderManager {
    private static Logger logger = LoggerFactory.getLogger(SequenceReorderManager.class.getName());
    private static SequenceReorderManager instance;
    private static final long SEQUENCE_MONITOR_TIMER = 1000;
    private Timer timer;
    private HostApplicationEventHandler eventHandler;
    private CommandPublisher commandPublisher;
    private PayloadDecoder<SparkplugBPayload> payloadDecoder;
    private Long timeout;
    private final Object edgeNodeMapLock = new Object();
    private final Map<EdgeNodeDescriptor, SequenceReorderMap> edgeNodeMap = new ConcurrentHashMap();

    private SequenceReorderManager() {
    }

    public static SequenceReorderManager getInstance() {
        if (instance == null) {
            instance = new SequenceReorderManager();
        }
        return instance;
    }

    public void init(HostApplicationEventHandler hostApplicationEventHandler, CommandPublisher commandPublisher, PayloadDecoder<SparkplugBPayload> payloadDecoder, Long l) {
        if (hostApplicationEventHandler == null || l == null) {
            logger.error("Not re-initializing the SequenceReorderManager timer");
            return;
        }
        instance.eventHandler = hostApplicationEventHandler;
        instance.commandPublisher = commandPublisher;
        instance.payloadDecoder = payloadDecoder;
        instance.timeout = l;
    }

    public void start() {
        TimerTask timerTask = new TimerTask() { // from class: org.eclipse.tahu.host.seq.SequenceReorderManager.1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                synchronized (SequenceReorderManager.this.edgeNodeMapLock) {
                    SequenceReorderManager.this.edgeNodeMap.values().forEach(sequenceReorderMap -> {
                        try {
                            if (!sequenceReorderMap.isEmpty()) {
                                Calendar calendar = Calendar.getInstance();
                                calendar.add(14, (int) (SequenceReorderManager.this.timeout.longValue() * (-1)));
                                if (sequenceReorderMap.getLastUpdateTime().before(calendar.getTime())) {
                                    SequenceReorderManager.logger.info("Timeout while reording sequence numbers on {} with {} in queue", sequenceReorderMap.getEdgeNodeDescriptor(), Integer.valueOf(sequenceReorderMap.size()));
                                    SequenceReorderContext expiredSequenceReorderContext = sequenceReorderMap.getExpiredSequenceReorderContext(SequenceReorderManager.this.timeout.longValue());
                                    if (expiredSequenceReorderContext != null) {
                                        TahuPayloadHandler tahuPayloadHandler = new TahuPayloadHandler(SequenceReorderManager.this.eventHandler, SequenceReorderManager.this.commandPublisher, SequenceReorderManager.this.payloadDecoder);
                                        SparkplugEdgeNode sparkplugEdgeNode = EdgeNodeManager.getInstance().getSparkplugEdgeNode(sequenceReorderMap.getEdgeNodeDescriptor());
                                        sequenceReorderMap.reset();
                                        if (sparkplugEdgeNode != null) {
                                            SequenceReorderManager.logger.info("Requesting a rebirth from known edge node {}", sequenceReorderMap.getEdgeNodeDescriptor());
                                            sparkplugEdgeNode.setHostAppMqttClientId(expiredSequenceReorderContext.getHostAppMqttClientId());
                                            sparkplugEdgeNode.setMqttServerName(expiredSequenceReorderContext.getMqttServerName());
                                            tahuPayloadHandler.requestRebirth(expiredSequenceReorderContext.getMqttServerName(), expiredSequenceReorderContext.getHostAppMqttClientId(), sequenceReorderMap.getEdgeNodeDescriptor(), sparkplugEdgeNode);
                                        } else {
                                            SequenceReorderManager.logger.info("Requesting a rebirth from unknown edge node {}", sequenceReorderMap.getEdgeNodeDescriptor());
                                            tahuPayloadHandler.requestRebirth(expiredSequenceReorderContext.getMqttServerName(), expiredSequenceReorderContext.getHostAppMqttClientId(), sequenceReorderMap.getEdgeNodeDescriptor());
                                        }
                                    }
                                }
                            }
                        } catch (Exception e) {
                            SequenceReorderManager.logger.error("Failed to handle reorder entry in monitor", e);
                        }
                    });
                }
            }
        };
        this.timer = new Timer("SequenceMonitorTimer");
        this.timer.scheduleAtFixedRate(timerTask, SEQUENCE_MONITOR_TIMER, SEQUENCE_MONITOR_TIMER);
    }

    public void stop() {
        if (this.timer != null) {
            this.timer.cancel();
            this.timer = null;
        }
    }

    public void handlePayload(TahuHostCallback tahuHostCallback, ThreadPoolExecutor threadPoolExecutor, String str, String[] strArr, MqttMessage mqttMessage, MqttServerName mqttServerName, MqttClientId mqttClientId, long j) throws Exception {
        try {
            Topic parseTopic = TopicUtil.parseTopic(strArr);
            MessageType type = parseTopic.getType();
            if (type == MessageType.NCMD || type == MessageType.DCMD) {
                return;
            }
            SparkplugBPayload buildFromByteArray = new SparkplugBPayloadDecoder().buildFromByteArray(mqttMessage.getPayload(), HostApplicationMetricMap.getInstance().getMetricDataTypeMap(parseTopic.getEdgeNodeDescriptor(), parseTopic.getSparkplugDescriptor()));
            logger.trace("Incoming payload: {}", buildFromByteArray);
            synchronized (this.edgeNodeMapLock) {
                EdgeNodeDescriptor edgeNodeDescriptor = new EdgeNodeDescriptor(parseTopic.getGroupId(), parseTopic.getEdgeNodeId());
                SequenceReorderMap computeIfAbsent = this.edgeNodeMap.computeIfAbsent(edgeNodeDescriptor, edgeNodeDescriptor2 -> {
                    return new SequenceReorderMap(edgeNodeDescriptor);
                });
                if (parseTopic.isType(MessageType.NBIRTH)) {
                    logger.debug("Resetting sequenceReorderMap on NBIRTH for {}", edgeNodeDescriptor);
                    computeIfAbsent.resetSeqNum();
                } else if (parseTopic.isType(MessageType.NDEATH)) {
                    handleMessage(tahuHostCallback, threadPoolExecutor, new SequenceReorderContext(str, parseTopic, mqttMessage, buildFromByteArray, type, mqttServerName, mqttClientId, j));
                    return;
                } else if (parseTopic.isType(MessageType.NCMD) || parseTopic.isType(MessageType.DCMD)) {
                    return;
                }
                boolean z = false;
                if (buildFromByteArray == null || buildFromByteArray.getSeq() == null) {
                    logger.warn("Invalid payload arrived on topic={} with {}", parseTopic, buildFromByteArray == null ? "'payload is null'" : buildFromByteArray.getSeq() == null ? "'payload sequence number is null'" : "sequence number is present - shouldn't have gotten here");
                } else {
                    z = computeIfAbsent.liveSeqNumCheck(buildFromByteArray.getSeq().longValue());
                }
                if (z) {
                    if (parseTopic.isType(MessageType.NBIRTH)) {
                        computeIfAbsent.prune(buildFromByteArray.getTimestamp());
                    }
                    logger.debug("Handling real time message on {} with seqNum={}", str, buildFromByteArray.getSeq());
                    handleMessage(tahuHostCallback, threadPoolExecutor, new SequenceReorderContext(str, parseTopic, mqttMessage, buildFromByteArray, type, mqttServerName, mqttClientId, j));
                    if (!computeIfAbsent.isEmpty()) {
                        boolean z2 = false;
                        long nextSeqNum = getNextSeqNum(buildFromByteArray.getSeq().longValue());
                        while (!z2 && !computeIfAbsent.isEmpty()) {
                            SequenceReorderContext storedSeqNumCheck = computeIfAbsent.storedSeqNumCheck(nextSeqNum);
                            if (storedSeqNumCheck != null) {
                                logger.debug("Handling stored message on {} with seqNum={}", str, Long.valueOf(nextSeqNum));
                                handleMessage(tahuHostCallback, threadPoolExecutor, new SequenceReorderContext(storedSeqNumCheck.getTopicString(), storedSeqNumCheck.getTopic(), storedSeqNumCheck.getMessage(), storedSeqNumCheck.getPayload(), storedSeqNumCheck.getMessageType(), storedSeqNumCheck.getMqttServerName(), storedSeqNumCheck.getHostAppMqttClientId(), storedSeqNumCheck.getArrivedTime()));
                                nextSeqNum = getNextSeqNum(nextSeqNum);
                            } else {
                                logger.debug("Failed to find SequenceReorderContext for {} - moving on", Long.valueOf(nextSeqNum));
                                z2 = true;
                            }
                        }
                    }
                } else {
                    logger.debug("Storing message on {} due to out of sequence message with seqNum={} - was expecting {}", new Object[]{str, buildFromByteArray.getSeq(), Long.valueOf(computeIfAbsent.getNextExpectedSeqNum())});
                    computeIfAbsent.put(buildFromByteArray.getSeq().longValue(), new SequenceReorderContext(str, parseTopic, mqttMessage, buildFromByteArray, type, mqttServerName, mqttClientId, j));
                }
            }
        } catch (SparkplugParsingException e) {
            logger.error("Error parsing topic", e);
        }
    }

    public void removeEdgeNode(EdgeNodeDescriptor edgeNodeDescriptor) {
        synchronized (this.edgeNodeMapLock) {
            this.edgeNodeMap.remove(edgeNodeDescriptor);
        }
    }

    private long getNextSeqNum(long j) {
        long j2 = j + 1;
        if (j2 == 256) {
            j2 = 0;
        }
        return j2;
    }

    private void handleMessage(TahuHostCallback tahuHostCallback, ThreadPoolExecutor threadPoolExecutor, SequenceReorderContext sequenceReorderContext) {
        threadPoolExecutor.execute(() -> {
            try {
                try {
                    new TahuPayloadHandler(this.eventHandler, this.commandPublisher, this.payloadDecoder).handlePayload(sequenceReorderContext.getTopicString(), sequenceReorderContext.getSplitTopic(), sequenceReorderContext.getMessage(), sequenceReorderContext.getMqttServerName(), sequenceReorderContext.getHostAppMqttClientId());
                    long nanoTime = System.nanoTime() - sequenceReorderContext.getArrivedTime();
                    if (logger.isTraceEnabled()) {
                        logger.trace("Updating message processing latency {}", Long.valueOf(nanoTime));
                    }
                } catch (Throwable th) {
                    logger.error("Failed to handle Sparkplug B message on topic {} - requesting rebirth", sequenceReorderContext.getTopic(), th);
                    new TahuPayloadHandler(this.eventHandler, this.commandPublisher, this.payloadDecoder).requestRebirth(sequenceReorderContext.getMqttServerName(), sequenceReorderContext.getHostAppMqttClientId(), sequenceReorderContext.getTopic().getEdgeNodeDescriptor());
                    long nanoTime2 = System.nanoTime() - sequenceReorderContext.getArrivedTime();
                    if (logger.isTraceEnabled()) {
                        logger.trace("Updating message processing latency {}", Long.valueOf(nanoTime2));
                    }
                }
            } catch (Throwable th2) {
                long nanoTime3 = System.nanoTime() - sequenceReorderContext.getArrivedTime();
                if (logger.isTraceEnabled()) {
                    logger.trace("Updating message processing latency {}", Long.valueOf(nanoTime3));
                }
                throw th2;
            }
        });
    }
}
