/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.stats.prometheus;

import io.netty.util.concurrent.FastThreadLocal;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.AggregatedConsumerStats;
import org.apache.pulsar.broker.stats.prometheus.AggregatedNamespaceStats;
import org.apache.pulsar.broker.stats.prometheus.AggregatedReplicationStats;
import org.apache.pulsar.broker.stats.prometheus.AggregatedSubscriptionStats;
import org.apache.pulsar.broker.stats.prometheus.TopicStats;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.util.SimpleTextOutputStream;

public class NamespaceStatsAggregator {
    private static FastThreadLocal<AggregatedNamespaceStats> localNamespaceStats = new FastThreadLocal<AggregatedNamespaceStats>(){

        protected AggregatedNamespaceStats initialValue() throws Exception {
            return new AggregatedNamespaceStats();
        }
    };
    private static FastThreadLocal<TopicStats> localTopicStats = new FastThreadLocal<TopicStats>(){

        protected TopicStats initialValue() throws Exception {
            return new TopicStats();
        }
    };

    public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, SimpleTextOutputStream stream) {
        String cluster = pulsar.getConfiguration().getClusterName();
        AggregatedNamespaceStats namespaceStats = (AggregatedNamespaceStats)localNamespaceStats.get();
        TopicStats topicStats = (TopicStats)localTopicStats.get();
        NamespaceStatsAggregator.printDefaultBrokerStats(stream, cluster);
        LongAdder topicsCount = new LongAdder();
        pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
            namespaceStats.reset();
            bundlesMap.forEach((bundle, topicsMap) -> topicsMap.forEach((name, topic) -> {
                NamespaceStatsAggregator.getTopicStats(topic, topicStats, includeConsumerMetrics);
                if (includeTopicMetrics) {
                    topicsCount.add(1L);
                    TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats);
                } else {
                    namespaceStats.updateStats(topicStats);
                }
            }));
            if (!includeTopicMetrics) {
                NamespaceStatsAggregator.printNamespaceStats(stream, cluster, namespace, namespaceStats);
            } else {
                NamespaceStatsAggregator.printTopicsCountStats(stream, cluster, namespace, topicsCount);
            }
        });
    }

    private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics) {
        stats.reset();
        if (topic instanceof PersistentTopic) {
            ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl)((PersistentTopic)topic).getManagedLedger().getStats();
            stats.storageSize = mlStats.getStoredMessagesSize();
            stats.storageWriteLatencyBuckets.addAll(mlStats.getInternalAddEntryLatencyBuckets());
            stats.storageWriteLatencyBuckets.refresh();
            stats.entrySizeBuckets.addAll(mlStats.getInternalEntrySizeBuckets());
            stats.entrySizeBuckets.refresh();
            stats.storageWriteRate = mlStats.getAddEntryMessagesRate();
            stats.storageReadRate = mlStats.getReadEntriesRate();
        }
        topic.getProducers().forEach(producer -> {
            if (producer.isRemote()) {
                AggregatedReplicationStats replStats = stats.replicationStats.computeIfAbsent(producer.getRemoteCluster(), k -> new AggregatedReplicationStats());
                replStats.msgRateIn += producer.getStats().msgRateIn;
                replStats.msgThroughputIn += producer.getStats().msgThroughputIn;
            } else {
                ++stats.producersCount;
                stats.rateIn += producer.getStats().msgRateIn;
                stats.throughputIn += producer.getStats().msgThroughputIn;
            }
        });
        topic.getSubscriptions().forEach((name, subscription) -> {
            ++stats.subscriptionsCount;
            stats.msgBacklog += subscription.getNumberOfEntriesInBacklog();
            AggregatedSubscriptionStats subsStats = stats.subscriptionStats.computeIfAbsent((String)name, k -> new AggregatedSubscriptionStats());
            subsStats.msgBacklog = subscription.getNumberOfEntriesInBacklog();
            subscription.getConsumers().forEach(consumer -> {
                if (includeConsumerMetrics) {
                    AggregatedConsumerStats consumerStats = subsStats.consumerStat.computeIfAbsent((Consumer)consumer, k -> new AggregatedConsumerStats());
                    consumerStats.unackedMessages = consumer.getStats().unackedMessages;
                    consumerStats.msgRateRedeliver = consumer.getStats().msgRateRedeliver;
                    consumerStats.msgRateOut = consumer.getStats().msgRateOut;
                    consumerStats.msgThroughputOut = consumer.getStats().msgThroughputOut;
                    consumerStats.availablePermits = consumer.getStats().availablePermits;
                    consumerStats.blockedSubscriptionOnUnackedMsgs = consumer.getStats().blockedConsumerOnUnackedMsgs;
                }
                subsStats.unackedMessages += (long)consumer.getStats().unackedMessages;
                subsStats.msgRateRedeliver += consumer.getStats().msgRateRedeliver;
                subsStats.msgRateOut += consumer.getStats().msgRateOut;
                subsStats.msgThroughputOut += consumer.getStats().msgThroughputOut;
                if (!subsStats.blockedSubscriptionOnUnackedMsgs && consumer.getStats().blockedConsumerOnUnackedMsgs) {
                    subsStats.blockedSubscriptionOnUnackedMsgs = true;
                }
                ++stats.consumersCount;
                stats.rateOut += consumer.getStats().msgRateOut;
                stats.throughputOut += consumer.getStats().msgThroughputOut;
            });
        });
        topic.getReplicators().forEach((cluster, replicator) -> {
            AggregatedReplicationStats aggReplStats = stats.replicationStats.computeIfAbsent((String)cluster, k -> new AggregatedReplicationStats());
            ReplicatorStats replStats = replicator.getStats();
            aggReplStats.msgRateOut += replStats.msgRateOut;
            aggReplStats.msgThroughputOut += replStats.msgThroughputOut;
            aggReplStats.replicationBacklog += replStats.replicationBacklog;
        });
    }

    private static void printDefaultBrokerStats(SimpleTextOutputStream stream, String cluster) {
        NamespaceStatsAggregator.metric(stream, cluster, "pulsar_topics_count", 0L);
        NamespaceStatsAggregator.metric(stream, cluster, "pulsar_subscriptions_count", 0L);
        NamespaceStatsAggregator.metric(stream, cluster, "pulsar_producers_count", 0L);
        NamespaceStatsAggregator.metric(stream, cluster, "pulsar_consumers_count", 0L);
        NamespaceStatsAggregator.metric(stream, cluster, "pulsar_rate_in", 0L);
        NamespaceStatsAggregator.metric(stream, cluster, "pulsar_rate_out", 0L);
        NamespaceStatsAggregator.metric(stream, cluster, "pulsar_throughput_in", 0L);
        NamespaceStatsAggregator.metric(stream, cluster, "pulsar_throughput_out", 0L);
        NamespaceStatsAggregator.metric(stream, cluster, "pulsar_storage_size", 0L);
        NamespaceStatsAggregator.metric(stream, cluster, "pulsar_storage_write_rate", 0L);
        NamespaceStatsAggregator.metric(stream, cluster, "pulsar_storage_read_rate", 0L);
        NamespaceStatsAggregator.metric(stream, cluster, "pulsar_msg_backlog", 0L);
    }

    private static void printTopicsCountStats(SimpleTextOutputStream stream, String cluster, String namespace, LongAdder topicsCount) {
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_topics_count", topicsCount.sum());
    }

    private static void printNamespaceStats(SimpleTextOutputStream stream, String cluster, String namespace, AggregatedNamespaceStats stats) {
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_topics_count", stats.topicsCount);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_subscriptions_count", stats.subscriptionsCount);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_producers_count", stats.producersCount);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_consumers_count", stats.consumersCount);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_rate_in", stats.rateIn);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_rate_out", stats.rateOut);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_throughput_in", stats.throughputIn);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_throughput_out", stats.throughputOut);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_storage_size", stats.storageSize);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_storage_write_rate", stats.storageWriteRate);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_storage_read_rate", stats.storageReadRate);
        NamespaceStatsAggregator.metricWithRemoteCluster(stream, cluster, namespace, "pulsar_msg_backlog", "local", stats.msgBacklog);
        stats.storageWriteLatencyBuckets.refresh();
        long[] latencyBuckets = stats.storageWriteLatencyBuckets.getBuckets();
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_1", latencyBuckets[1]);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_5", latencyBuckets[2]);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_10", latencyBuckets[3]);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_20", latencyBuckets[4]);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_50", latencyBuckets[5]);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_100", latencyBuckets[6]);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_200", latencyBuckets[7]);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_storage_write_latency_overflow", latencyBuckets[9]);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_storage_write_latency_count", stats.storageWriteLatencyBuckets.getCount());
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_storage_write_latency_sum", stats.storageWriteLatencyBuckets.getSum());
        stats.entrySizeBuckets.refresh();
        long[] entrySizeBuckets = stats.entrySizeBuckets.getBuckets();
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_entry_size_le_128", entrySizeBuckets[0]);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_entry_size_le_512", entrySizeBuckets[1]);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2]);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3]);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4]);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5]);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6]);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7]);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_entry_size_le_overflow", entrySizeBuckets[8]);
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_entry_size_count", stats.entrySizeBuckets.getCount());
        NamespaceStatsAggregator.metric(stream, cluster, namespace, "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum());
        if (!stats.replicationStats.isEmpty()) {
            stats.replicationStats.forEach((remoteCluster, replStats) -> {
                NamespaceStatsAggregator.metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_in", remoteCluster, replStats.msgRateIn);
                NamespaceStatsAggregator.metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_out", remoteCluster, replStats.msgRateOut);
                NamespaceStatsAggregator.metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_throughput_in", remoteCluster, replStats.msgThroughputIn);
                NamespaceStatsAggregator.metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_throughput_out", remoteCluster, replStats.msgThroughputOut);
                NamespaceStatsAggregator.metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_backlog", remoteCluster, replStats.replicationBacklog);
            });
        }
    }

    private static void metric(SimpleTextOutputStream stream, String cluster, String name, long value) {
        stream.write(name).write("{cluster=\"").write(cluster).write("\"} ").write(value).write(' ').write(System.currentTimeMillis()).write('\n');
    }

    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name, long value) {
        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} ");
        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
    }

    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name, double value) {
        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} ");
        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
    }

    private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace, String name, String remoteCluster, double value) {
        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
        stream.write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
    }
}

