/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.BrokerOperabilityMetrics;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarStats
implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(PulsarStats.class);
    private volatile ByteBuf topicStatsBuf;
    private volatile ByteBuf tempTopicStatsBuf;
    private NamespaceStats nsStats;
    private final ClusterReplicationMetrics clusterReplicationMetrics;
    private Map<String, NamespaceBundleStats> bundleStats;
    private List<Metrics> tempMetricsCollection;
    private List<Metrics> metricsCollection;
    private final BrokerOperabilityMetrics brokerOperabilityMetrics;
    private final ReentrantReadWriteLock bufferLock = new ReentrantReadWriteLock();

    public PulsarStats(PulsarService pulsar) {
        this.topicStatsBuf = Unpooled.buffer((int)16384);
        this.tempTopicStatsBuf = Unpooled.buffer((int)16384);
        this.nsStats = new NamespaceStats();
        this.clusterReplicationMetrics = new ClusterReplicationMetrics(pulsar.getConfiguration().getClusterName(), pulsar.getConfiguration().isReplicationMetricsEnabled());
        this.bundleStats = Maps.newHashMap();
        this.tempMetricsCollection = Lists.newArrayList();
        this.metricsCollection = Lists.newArrayList();
        this.brokerOperabilityMetrics = new BrokerOperabilityMetrics(pulsar.getConfiguration().getClusterName(), pulsar.getAdvertisedAddress());
    }

    @Override
    public void close() {
        this.bufferLock.writeLock().lock();
        try {
            ReferenceCountUtil.safeRelease((Object)this.topicStatsBuf);
            ReferenceCountUtil.safeRelease((Object)this.tempTopicStatsBuf);
        }
        finally {
            this.bufferLock.writeLock().unlock();
        }
    }

    public ClusterReplicationMetrics getClusterReplicationMetrics() {
        return this.clusterReplicationMetrics;
    }

    public synchronized void updateStats(ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>>> topicsMap) {
        StatsOutputStream topicStatsStream = new StatsOutputStream(this.tempTopicStatsBuf);
        try {
            this.tempMetricsCollection.clear();
            this.bundleStats.clear();
            this.brokerOperabilityMetrics.reset();
            topicStatsStream.startObject();
            topicsMap.forEach((namespaceName, bundles) -> {
                if (bundles.isEmpty()) {
                    return;
                }
                try {
                    topicStatsStream.startObject((String)namespaceName);
                    this.nsStats.reset();
                    bundles.forEach((bundle, topics) -> {
                        NamespaceBundleStats currentBundleStats = this.bundleStats.computeIfAbsent((String)bundle, k -> new NamespaceBundleStats());
                        currentBundleStats.reset();
                        currentBundleStats.topics = topics.size();
                        topicStatsStream.startObject(NamespaceBundle.getBundleRange(bundle));
                        topicStatsStream.startObject("persistent");
                        topics.forEach((name, topic) -> {
                            try {
                                topic.updateRates(this.nsStats, currentBundleStats, topicStatsStream, this.clusterReplicationMetrics, (String)namespaceName);
                            }
                            catch (Exception e) {
                                log.error("Failed to generate topic stats for topic {}: {}", new Object[]{name, e.getMessage(), e});
                            }
                            if (topic instanceof PersistentTopic) {
                                ((PersistentTopic)topic).getManagedLedger().checkBackloggedCursors();
                            }
                        });
                        topicStatsStream.endObject();
                        topicStatsStream.endObject();
                    });
                    topicStatsStream.endObject();
                    this.tempMetricsCollection.add(this.nsStats.add((String)namespaceName));
                }
                catch (Exception e) {
                    log.error("Failed to generate namespace stats for namespace {}: {}", new Object[]{namespaceName, e.getMessage(), e});
                }
            });
            if (this.clusterReplicationMetrics.isMetricsEnabled()) {
                this.clusterReplicationMetrics.get().forEach(clusterMetric -> {
                    boolean bl = this.tempMetricsCollection.add((Metrics)clusterMetric);
                });
                this.clusterReplicationMetrics.reset();
            }
            this.brokerOperabilityMetrics.getMetrics().forEach(brokerOperabilityMetric -> {
                boolean bl = this.tempMetricsCollection.add((Metrics)brokerOperabilityMetric);
            });
            topicStatsStream.endObject();
        }
        catch (Exception e) {
            log.error("Unable to update destination stats", (Throwable)e);
        }
        List<Metrics> tempRefMetrics = this.metricsCollection;
        this.metricsCollection = this.tempMetricsCollection;
        this.tempMetricsCollection = tempRefMetrics;
        this.bufferLock.writeLock().lock();
        try {
            ByteBuf tmp = this.topicStatsBuf;
            this.topicStatsBuf = this.tempTopicStatsBuf;
            this.tempTopicStatsBuf = tmp;
            this.tempTopicStatsBuf.clear();
        }
        finally {
            this.bufferLock.writeLock().unlock();
        }
    }

    public NamespaceBundleStats invalidBundleStats(String bundleName) {
        return this.bundleStats.remove(bundleName);
    }

    public void getDimensionMetrics(Consumer<ByteBuf> consumer) {
        this.bufferLock.readLock().lock();
        try {
            consumer.accept(this.topicStatsBuf);
        }
        finally {
            this.bufferLock.readLock().unlock();
        }
    }

    public List<Metrics> getDestinationMetrics() {
        return this.metricsCollection;
    }

    public Map<String, NamespaceBundleStats> getBundleStats() {
        return this.bundleStats;
    }

    public void recordTopicLoadTimeValue(String topic, long topicLoadLatencyMs) {
        try {
            this.brokerOperabilityMetrics.recordTopicLoadTimeValue(topicLoadLatencyMs);
        }
        catch (Exception ex) {
            log.warn("Exception while recording topic load time for topic {}, {}", (Object)topic, (Object)ex.getMessage());
        }
    }

    public void recordZkLatencyTimeValue(ClientCnxnAspect.EventType eventType, long latencyMs) {
        try {
            if (ClientCnxnAspect.EventType.write.equals((Object)eventType)) {
                this.brokerOperabilityMetrics.recordZkWriteLatencyTimeValue(latencyMs);
            } else if (ClientCnxnAspect.EventType.read.equals((Object)eventType)) {
                this.brokerOperabilityMetrics.recordZkReadLatencyTimeValue(latencyMs);
            }
        }
        catch (Exception ex) {
            log.warn("Exception while recording zk-latency {}, {}", (Object)eventType, (Object)ex.getMessage());
        }
    }
}

