package org.apache.rocketmq.broker.coldctr;

import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.coldctr.AccAndTimeStamp;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.config.MessageStoreConfig;

/**
 * Service thread that accumulates cold-read volume per consumer group and globally, and
 * periodically resets those accumulators.
 *
 * <p>Two tables are maintained:
 * <ul>
 *   <li>the runtime table, keyed by consumer group, holding the accumulated cold-read volume
 *       and the time of the last cold read;</li>
 *   <li>the config table, holding thresholds keyed either by the plain consumer group name or
 *       by the group name followed by the {@code "||adaptive"} suffix. Adaptive entries are
 *       removed by this service; plain entries are only changed through
 *       {@link #addOrUpdateGroupConfig(String, Long)} and {@link #removeGroupConfig(String)}.</li>
 * </ul>
 *
 * <p>The global accumulator is a static counter, so it is shared by every instance of this class
 * loaded in the same class loader.
 */
public class ColdDataCgCtrService extends ServiceThread {
    /** Suffix appended to a consumer group name to build the key of its adaptive threshold. */
    private static final String ADAPTIVE = "||adaptive";
    private static final Logger log = LoggerFactory.getLogger("RocketmqColdCtr");
    /** Cold-read volume accumulated across all consumer groups since the last reset. */
    private static final AtomicLong GLOBAL_ACC = new AtomicLong(0);

    private final BrokerConfig brokerConfig;
    private final MessageStoreConfig messageStoreConfig;
    private final ColdCtrStrategy coldCtrStrategy;
    private final SystemClock systemClock = new SystemClock();
    /** A runtime entry is evicted once its last cold read is at least this many milliseconds old. */
    private final long cgColdAccResideTimeoutMills = 60 * 1000L;
    private final ConcurrentHashMap<String, AccAndTimeStamp> cgColdThresholdMapRuntime = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Long> cgColdThresholdMapConfig = new ConcurrentHashMap<>();

    /**
     * Creates the service and selects the control strategy from the broker configuration.
     *
     * @param brokerController source of the broker and message store configuration
     */
    public ColdDataCgCtrService(BrokerController brokerController) {
        this.brokerConfig = brokerController.getBrokerConfig();
        this.messageStoreConfig = brokerController.getMessageStoreConfig();
        if (this.brokerConfig.isUsePIDColdCtrStrategy()) {
            // The PID strategy is seeded with 80% of the global cold read threshold.
            long seed = (long) (this.brokerConfig.getGlobalColdReadThreshold() * 0.8d);
            this.coldCtrStrategy = new PIDAdaptiveColdCtrStrategy(this, Long.valueOf(seed));
        } else {
            this.coldCtrStrategy = new SimpleColdCtrStrategy(this);
        }
    }

    /**
     * @return the simple class name of this service
     */
    public String getServiceName() {
        return ColdDataCgCtrService.class.getSimpleName();
    }

    /**
     * Main loop: waits 5 seconds when cold data flow control is enabled (180 seconds otherwise),
     * then resets the accumulators, and drops all adaptive thresholds when the cold control
     * strategy is disabled. Any throwable raised by one round is logged and the loop continues.
     */
    public void run() {
        log.info("{} service started", getServiceName());
        while (!isStopped()) {
            try {
                long interval = this.messageStoreConfig.isColdDataFlowControlEnable() ? 5000L : 180000L;
                waitForRunning(interval);

                long begin = this.systemClock.now();
                clearDataAcc();
                if (!this.brokerConfig.isColdCtrStrategyEnable()) {
                    clearAdaptiveConfig();
                }
                long cost = this.systemClock.now() - begin;
                log.info("[{}] clearTheDataAcc-cost {} ms.", cost > 3000 ? "NOTIFYME" : "OK", cost);
            } catch (Throwable th) {
                log.warn(getServiceName() + " service has exception", th);
            }
        }
        log.info("{} service end", getServiceName());
    }

    /**
     * Builds a JSON snapshot of the current state.
     *
     * @return a JSON string with the keys {@code runtimeTable}, {@code configTable},
     *         {@code cgColdReadThreshold}, {@code globalColdReadThreshold} and {@code globalAcc}
     */
    public String getColdDataFlowCtrInfo() {
        JSONObject info = new JSONObject();
        info.put("runtimeTable", this.cgColdThresholdMapRuntime);
        info.put("configTable", this.cgColdThresholdMapConfig);
        info.put("cgColdReadThreshold", this.brokerConfig.getCgColdReadThreshold());
        info.put("globalColdReadThreshold", this.brokerConfig.getGlobalColdReadThreshold());
        info.put("globalAcc", GLOBAL_ACC.get());
        return info.toJSONString();
    }

    /**
     * Performs one reset round: feeds the global accumulator to the strategy, evicts stale
     * runtime entries (together with their adaptive thresholds), promotes groups that reached
     * their threshold, zeroes every per-group accumulator, decelerates the largest adaptive
     * thresholds and finally zeroes the global accumulator.
     */
    private void clearDataAcc() {
        log.info("clearDataAcc cgColdThresholdMapRuntime key size: {}", this.cgColdThresholdMapRuntime.size());
        if (this.brokerConfig.isColdCtrStrategyEnable()) {
            this.coldCtrStrategy.collect(Long.valueOf(GLOBAL_ACC.get()));
        }

        Iterator<Map.Entry<String, AccAndTimeStamp>> it = this.cgColdThresholdMapRuntime.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, AccAndTimeStamp> next = it.next();
            String group = next.getKey();
            AccAndTimeStamp stamp = next.getValue();

            long expireAt = this.cgColdAccResideTimeoutMills + stamp.getLastColdReadTimeMills().longValue();
            if (System.currentTimeMillis() >= expireAt) {
                // No cold read for a full timeout window: forget the group and its adaptive threshold.
                if (this.brokerConfig.isColdCtrStrategyEnable()) {
                    this.cgColdThresholdMapConfig.remove(buildAdaptiveKey(group));
                }
                it.remove();
            } else {
                // Resolve the threshold once so the comparison, the log line and promote() agree.
                Long threshold = getThresholdByConsumerGroup(group);
                long acc = stamp.getColdAcc().get();
                if (acc >= threshold.longValue()) {
                    log.info("Coldctr consumerGroup: {}, acc: {}, threshold: {}", group, acc, threshold);
                    if (this.brokerConfig.isColdCtrStrategyEnable() && !isGlobalColdCtr() && !isAdminConfig(group)) {
                        this.coldCtrStrategy.promote(buildAdaptiveKey(group), threshold);
                    }
                }
            }
            stamp.getColdAcc().set(0L);
        }

        if (isGlobalColdCtr()) {
            log.info("Coldctr global acc: {}, threshold: {}", GLOBAL_ACC.get(), this.brokerConfig.getGlobalColdReadThreshold());
        }
        if (this.brokerConfig.isColdCtrStrategyEnable()) {
            sortAndDecelerate();
        }
        GLOBAL_ACC.set(0L);
    }

    /**
     * Decelerates up to three config entries with the highest thresholds, skipping every entry
     * whose consumer group has a plain (non-adaptive) entry in the config table.
     */
    private void sortAndDecelerate() {
        ArrayList<Map.Entry<String, Long>> entries = new ArrayList<>(this.cgColdThresholdMapConfig.entrySet());
        // Long.compare avoids the truncation/overflow of casting a long difference to int.
        Comparator<Map.Entry<String, Long>> byThresholdDesc =
            (left, right) -> Long.compare(right.getValue().longValue(), left.getValue().longValue());
        entries.sort(byThresholdDesc);

        int remaining = 3;
        for (Map.Entry<String, Long> entry : entries) {
            if (remaining <= 0) {
                break;
            }
            String key = entry.getKey();
            if (!isAdminConfig(key)) {
                this.coldCtrStrategy.decelerate(key, getThresholdByConsumerGroup(key));
                remaining--;
            }
        }
    }

    /**
     * Records a cold read for a consumer group. Non-positive increments are ignored.
     *
     * @param str consumer group name
     * @param j   cold-read increment to add to the global and per-group accumulators
     */
    public void coldAcc(String str, long j) {
        if (j <= 0) {
            return;
        }
        GLOBAL_ACC.addAndGet(j);
        AccAndTimeStamp existing = this.cgColdThresholdMapRuntime.get(str);
        if (null == existing) {
            // A freshly inserted entry already carries j; putIfAbsent returns the winner if we lost a race.
            existing = this.cgColdThresholdMapRuntime.putIfAbsent(str, new AccAndTimeStamp(new AtomicLong(j)));
        }
        if (null != existing) {
            existing.getColdAcc().addAndGet(j);
            existing.setLastColdReadTimeMills(Long.valueOf(System.currentTimeMillis()));
        }
    }

    /**
     * Adds or replaces a threshold entry in the config table.
     *
     * @param str config key
     * @param l   threshold value; must not be {@code null} because the backing map rejects nulls
     */
    public void addOrUpdateGroupConfig(String str, Long l) {
        this.cgColdThresholdMapConfig.put(str, l);
    }

    /**
     * Removes a threshold entry from the config table.
     *
     * @param str config key
     */
    public void removeGroupConfig(String str) {
        this.cgColdThresholdMapConfig.remove(str);
    }

    /**
     * Tells whether a consumer group has reached its own threshold or the global threshold.
     *
     * @param str consumer group name
     * @return {@code false} when cold data flow control is disabled, the group is exempt per
     *         {@link MixAll#isSysConsumerGroupForNoColdReadLimit(String)}, or the group has no
     *         runtime entry; otherwise whether either threshold has been reached
     */
    public boolean isCgNeedColdDataFlowCtr(String str) {
        if (!this.messageStoreConfig.isColdDataFlowControlEnable()) {
            return false;
        }
        if (MixAll.isSysConsumerGroupForNoColdReadLimit(str)) {
            return false;
        }
        AccAndTimeStamp stamp = this.cgColdThresholdMapRuntime.get(str);
        if (null == stamp) {
            return false;
        }
        return stamp.getColdAcc().get() >= getThresholdByConsumerGroup(str).longValue()
            || GLOBAL_ACC.get() >= this.brokerConfig.getGlobalColdReadThreshold();
    }

    /**
     * @return {@code true} when the global accumulator is strictly above the global threshold
     */
    public boolean isGlobalColdCtr() {
        return GLOBAL_ACC.get() > this.brokerConfig.getGlobalColdReadThreshold();
    }

    /**
     * @return the broker configuration this service reads its thresholds from
     */
    public BrokerConfig getBrokerConfig() {
        return this.brokerConfig;
    }

    /**
     * Resolves the effective threshold for a consumer group, given either its plain name or its
     * adaptive key. Lookup order: plain config entry, then (only when the cold control strategy
     * is enabled) the adaptive entry, then the broker-wide per-group default.
     *
     * @param str consumer group name, with or without the adaptive suffix
     * @return the effective threshold, never {@code null}
     */
    private Long getThresholdByConsumerGroup(String str) {
        String group = stripAdaptiveSuffix(str);
        // Single lookup instead of containsKey + get, so a concurrent removal cannot yield null.
        Long adminThreshold = this.cgColdThresholdMapConfig.get(group);
        if (null != adminThreshold) {
            return adminThreshold;
        }
        Long adaptiveThreshold = null;
        if (this.brokerConfig.isColdCtrStrategyEnable()) {
            adaptiveThreshold = this.cgColdThresholdMapConfig.get(buildAdaptiveKey(group));
        }
        return null != adaptiveThreshold ? adaptiveThreshold : Long.valueOf(this.brokerConfig.getCgColdReadThreshold());
    }

    /**
     * @param str consumer group name
     * @return the config-table key of the group's adaptive threshold
     */
    private String buildAdaptiveKey(String str) {
        return str + ADAPTIVE;
    }

    /**
     * @param str consumer group name, with or without the adaptive suffix
     * @return whether the config table holds a plain (non-adaptive) entry for the group
     */
    private boolean isAdminConfig(String str) {
        return this.cgColdThresholdMapConfig.containsKey(stripAdaptiveSuffix(str));
    }

    /**
     * Removes one trailing adaptive suffix, if present.
     *
     * <p>The suffix is stripped literally: {@code String.split} would interpret
     * {@code "||adaptive"} as a regular expression whose empty alternative matches at every
     * position, splitting the key into single characters.
     *
     * @param key consumer group name or adaptive key
     * @return the key without its trailing adaptive suffix
     */
    private static String stripAdaptiveSuffix(String key) {
        return key.endsWith(ADAPTIVE) ? key.substring(0, key.length() - ADAPTIVE.length()) : key;
    }

    /** Removes every adaptive threshold entry from the config table. */
    private void clearAdaptiveConfig() {
        this.cgColdThresholdMapConfig.keySet().removeIf(key -> key.endsWith(ADAPTIVE));
    }
}
