package org.apache.rocketmq.broker.coldctr;

import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.longpolling.PullRequest;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/broker/coldctr/ColdDataPullRequestHoldService.class */
public class ColdDataPullRequestHoldService extends ServiceThread {
    private static final Logger log = LoggerFactory.getLogger("RocketmqColdCtr");
    public static final String NO_SUSPEND_KEY = "_noSuspend_";
    private final BrokerController brokerController;
    private final long coldHoldTimeoutMillis = 3000;
    private final SystemClock systemClock = new SystemClock();
    private final LinkedBlockingQueue<PullRequest> pullRequestColdHoldQueue = new LinkedBlockingQueue<>(10000);

    public void suspendColdDataReadRequest(PullRequest pullRequest) {
        if (this.brokerController.getMessageStoreConfig().isColdDataFlowControlEnable()) {
            this.pullRequestColdHoldQueue.offer(pullRequest);
        }
    }

    public ColdDataPullRequestHoldService(BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    public String getServiceName() {
        return ColdDataPullRequestHoldService.class.getSimpleName();
    }

    public void run() {
        log.info("{} service started", getServiceName());
        while (!isStopped()) {
            try {
                if (this.brokerController.getMessageStoreConfig().isColdDataFlowControlEnable()) {
                    waitForRunning(5000L);
                } else {
                    waitForRunning(20000L);
                }
                long now = this.systemClock.now();
                checkColdDataPullRequest();
                long now2 = this.systemClock.now() - now;
                log.info("[{}] checkColdDataPullRequest-cost {} ms.", now2 > 5000 ? "NOTIFYME" : "OK", Long.valueOf(now2));
            } catch (Throwable th) {
                log.warn(getServiceName() + " service has exception", th);
            }
        }
        log.info("{} service end", getServiceName());
    }

    private void checkColdDataPullRequest() {
        int i = 0;
        int i2 = 0;
        int size = this.pullRequestColdHoldQueue.size();
        Iterator<PullRequest> it = this.pullRequestColdHoldQueue.iterator();
        while (it.hasNext()) {
            PullRequest next = it.next();
            if (System.currentTimeMillis() >= next.getSuspendTimestamp() + 3000) {
                try {
                    next.getRequestCommand().addExtField(NO_SUSPEND_KEY, "1");
                    this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(next.getClientChannel(), next.getRequestCommand());
                    i++;
                } catch (Exception e) {
                    log.error("PullRequestColdHoldService checkColdDataPullRequest error", e);
                    i2++;
                }
                it.remove();
            }
        }
        log.info("checkColdPullRequest-info-finish, queueSize: {} successTotal: {} errorTotal: {}", new Object[]{Integer.valueOf(size), Integer.valueOf(i), Integer.valueOf(i2)});
    }
}
