/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance.extensions.reporter;

import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.LoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.impl.GenericBrokerHostUsageImpl;
import org.apache.pulsar.broker.loadbalance.impl.LinuxBrokerHostUsageImpl;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.service.PulsarStats;
import org.apache.pulsar.broker.stats.BrokerStats;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Builds this broker's {@link BrokerLoadData} and publishes it to the broker load data store.
 *
 * <p>The reporter keeps two snapshots: {@code localData}, which is refreshed on every
 * {@link #generateLoadData()} call, and {@code lastData}, which mirrors what was last pushed
 * successfully. A report is pushed when the caller forces it, when the configured maximum
 * interval has elapsed, or when the change between the two snapshots exceeds the configured
 * threshold percentage.
 *
 * <p>As a {@link StateChangeListener}, the reporter also clears its local data and removes
 * ("tombstones") its entry from the store when a service unit is being released or split by
 * this broker, or becomes owned by this broker.
 */
public class BrokerLoadDataReporter implements LoadDataReporter<BrokerLoadData>, StateChangeListener {
    private static final Logger log = LoggerFactory.getLogger(BrokerLoadDataReporter.class);

    /** Default minimum delay between two tombstone operations, in milliseconds. */
    private static final long TOMBSTONE_DELAY_IN_MILLIS = 10000L;

    private final PulsarService pulsar;
    private final ServiceConfiguration conf;
    private final LoadDataStore<BrokerLoadData> brokerLoadDataStore;
    private final BrokerHostUsage brokerHostUsage;
    private final String brokerId;

    /** Most recently generated load data; the instance itself is what gets pushed to the store. */
    private final BrokerLoadData localData;

    /** Copy of {@link #localData} taken after the last successful push. */
    private final BrokerLoadData lastData;

    /** Wall-clock time (ms) at which the last tombstone was issued; 0 until the first one. */
    private volatile long lastTombstonedAt;

    /** Minimum delay (ms) between two tombstone operations. Deliberately left non-final. */
    private long tombstoneDelayInMillis;

    /**
     * Creates a reporter for the given broker.
     *
     * @param pulsar              the owning Pulsar service; its configuration and broker stats are read
     * @param brokerId            the key under which this broker's load data is stored
     * @param brokerLoadDataStore the store the load data is pushed to and removed from
     */
    public BrokerLoadDataReporter(PulsarService pulsar, String brokerId,
                                  LoadDataStore<BrokerLoadData> brokerLoadDataStore) {
        this.brokerLoadDataStore = brokerLoadDataStore;
        this.brokerId = brokerId;
        this.pulsar = pulsar;
        this.conf = this.pulsar.getConfiguration();
        // Pick the host usage probe that matches the operating system.
        this.brokerHostUsage = SystemUtils.IS_OS_LINUX
                ? new LinuxBrokerHostUsageImpl(pulsar)
                : new GenericBrokerHostUsageImpl(pulsar);
        this.localData = new BrokerLoadData();
        this.lastData = new BrokerLoadData();
        this.tombstoneDelayInMillis = TOMBSTONE_DELAY_IN_MILLIS;
    }

    /**
     * Refreshes the local load data from the current system resource usage and broker stats.
     *
     * @return the reporter's own {@code localData} instance (not a copy), updated in place
     */
    @Override
    public BrokerLoadData generateLoadData() {
        SystemResourceUsage systemResourceUsage = LoadManagerShared.getSystemResourceUsage(this.brokerHostUsage);
        PulsarStats pulsarStats = this.pulsar.getBrokerService().getPulsarStats();
        // Read all broker stats fields while holding the PulsarStats monitor.
        synchronized (pulsarStats) {
            BrokerStats brokerStats = pulsarStats.getBrokerStats();
            this.localData.update(
                    systemResourceUsage,
                    brokerStats.msgThroughputIn,
                    brokerStats.msgThroughputOut,
                    brokerStats.msgRateIn,
                    brokerStats.msgRateOut,
                    brokerStats.bundleCount,
                    brokerStats.topics,
                    this.pulsar.getConfiguration());
        }
        return this.localData;
    }

    /**
     * Regenerates the load data and pushes it to the store if required.
     *
     * @param force when {@code true}, push regardless of how much the data changed
     * @return the future returned by the store's push, or an already-completed future when the
     *         report is skipped
     */
    @Override
    public CompletableFuture<Void> reportAsync(boolean force) {
        BrokerLoadData newLoadData = this.generateLoadData();
        boolean debug = ExtensibleLoadManagerImpl.debug(this.conf, log);
        if (force || this.needBrokerDataUpdate()) {
            if (debug) {
                log.info("publishing load report:{}", this.localData.toString(this.conf));
            }
            CompletableFuture<Void> future = this.brokerLoadDataStore.pushAsync(this.brokerId, newLoadData);
            // The callback is attached as a side effect; callers receive the store's own future.
            future.whenComplete((__, ex) -> {
                if (ex == null) {
                    // Only a successful push advances the "last reported" state.
                    this.localData.setReportedAt(System.currentTimeMillis());
                    this.lastData.update(this.localData);
                } else {
                    log.error("Failed to report the broker load data.", ex);
                }
            });
            return future;
        }
        if (debug) {
            log.info("skipping load report:{}", this.localData.toString(this.conf));
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Decides whether the local data differs enough from the last reported data to be pushed.
     *
     * @return {@code true} if the time since the last report exceeds the configured maximum
     *         interval, or if the largest change across max resource usage, message rate,
     *         message throughput and bundle count exceeds the configured threshold percentage
     */
    private boolean needBrokerDataUpdate() {
        int loadBalancerReportUpdateMaxIntervalMinutes = this.conf.getLoadBalancerReportUpdateMaxIntervalMinutes();
        int loadBalancerReportUpdateThresholdPercentage = this.conf.getLoadBalancerReportUpdateThresholdPercentage();
        long updateMaxIntervalMillis = TimeUnit.MINUTES.toMillis(loadBalancerReportUpdateMaxIntervalMinutes);
        long timeSinceLastReportWrittenToStore = System.currentTimeMillis() - this.localData.getReportedAt();
        boolean debug = ExtensibleLoadManagerImpl.debug(this.conf, log);
        if (timeSinceLastReportWrittenToStore > updateMaxIntervalMillis) {
            if (debug) {
                log.info("Writing local data to metadata store because time since last"
                        + " update exceeded threshold of {} minutes", loadBalancerReportUpdateMaxIntervalMinutes);
            }
            return true;
        }

        // Max resource usage is compared as an absolute difference scaled by 100;
        // the other metrics are compared as a relative change via percentChange().
        double resourceUsageChange = 100.0
                * Math.abs(this.lastData.getMaxResourceUsage() - this.localData.getMaxResourceUsage());
        double msgRateChange = this.percentChange(
                this.lastData.getMsgRateIn() + this.lastData.getMsgRateOut(),
                this.localData.getMsgRateIn() + this.localData.getMsgRateOut());
        double msgThroughputChange = this.percentChange(
                this.lastData.getMsgThroughputIn() + this.lastData.getMsgThroughputOut(),
                this.localData.getMsgThroughputIn() + this.localData.getMsgThroughputOut());
        double bundleCountChange = this.percentChange(
                this.lastData.getBundleCount(), this.localData.getBundleCount());
        double maxChange = Math.max(resourceUsageChange,
                Math.max(msgRateChange, Math.max(msgThroughputChange, bundleCountChange)));

        if (maxChange > (double) loadBalancerReportUpdateThresholdPercentage) {
            if (debug) {
                // String.format is kept (inside the debug guard) for the two-decimal formatting.
                log.info(String.format("Writing local data to metadata store because maximum change %.2f%%"
                                + " exceeded threshold %d%%. Time since last report written is %.2f seconds",
                        maxChange, loadBalancerReportUpdateThresholdPercentage,
                        (double) timeSinceLastReportWrittenToStore / 1000.0));
            }
            return true;
        }
        return false;
    }

    /**
     * Computes the absolute relative change from {@code oldValue} to {@code newValue}, in percent.
     *
     * @param oldValue the baseline value
     * @param newValue the current value
     * @return {@code 0} when both values are zero, {@link Double#POSITIVE_INFINITY} when only the
     *         baseline is zero, otherwise {@code 100 * |(oldValue - newValue) / oldValue|}
     */
    protected double percentChange(double oldValue, double newValue) {
        if (oldValue == 0.0) {
            if (newValue == 0.0) {
                return 0.0;
            }
            return Double.POSITIVE_INFINITY;
        }
        return 100.0 * Math.abs((oldValue - newValue) / oldValue);
    }

    /**
     * Removes this broker's entry from the load data store, skipping the call when the previous
     * tombstone was issued less than {@code tombstoneDelayInMillis} ago.
     *
     * <p>The timestamp is advanced before the asynchronous removal is issued, so calls made while
     * the removal is in flight are skipped. If the removal fails, the timestamp is restored to
     * the value it had before this call.
     */
    @VisibleForTesting
    protected void tombstone() {
        long now = System.currentTimeMillis();
        if (now - this.lastTombstonedAt < this.tombstoneDelayInMillis) {
            return;
        }
        long lastSuccessfulTombstonedAt = this.lastTombstonedAt;
        this.lastTombstonedAt = now;
        this.brokerLoadDataStore.removeAsync(this.brokerId).whenComplete((__, e) -> {
            if (e != null) {
                log.error("Failed to clean broker load data.", e);
                // Roll back so that a later call is not suppressed by this failed attempt.
                this.lastTombstonedAt = lastSuccessfulTombstonedAt;
            } else if (ExtensibleLoadManagerImpl.debug(this.conf, log)) {
                log.info("Cleaned broker load data.");
            }
        });
    }

    /**
     * Reacts to a service unit state change by clearing and tombstoning this broker's load data
     * when the change involves this broker.
     *
     * @param serviceUnit the service unit the event refers to (not used by this listener)
     * @param data        the state data; its state is derived via {@code ServiceUnitStateData.state(data)}
     * @param t           the error attached to the event; when non-null the event is ignored
     */
    @Override
    public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
        if (t != null) {
            return;
        }
        ServiceUnitState state = ServiceUnitStateData.state(data);
        switch (state) {
            case Releasing:
            case Splitting:
                // This broker is the source of the release/split.
                if (StringUtils.equals(data.sourceBroker(), this.brokerId)) {
                    this.clearAndTombstone();
                }
                break;
            case Owned:
                // This broker is the destination of the ownership.
                if (StringUtils.equals(data.dstBroker(), this.brokerId)) {
                    this.clearAndTombstone();
                }
                break;
            default:
                // Other states do not affect the reported load data.
                break;
        }
    }

    /** Clears the local load data and then requests a tombstone of the stored entry. */
    private void clearAndTombstone() {
        this.localData.clear();
        this.tombstone();
    }

    /**
     * Returns the reporter's current local load data.
     *
     * @return the live {@code localData} instance (not a copy)
     */
    public BrokerLoadData getLocalData() {
        return this.localData;
    }
}

