package org.apache.rocketmq.broker.processor;

import com.alibaba.fastjson.JSON;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.opentelemetry.api.common.Attributes;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.longpolling.PollingHeader;
import org.apache.rocketmq.broker.longpolling.PollingResult;
import org.apache.rocketmq.broker.longpolling.PopLongPollingService;
import org.apache.rocketmq.broker.longpolling.PopRequest;
import org.apache.rocketmq.broker.metrics.BrokerMetricsConstant;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.BatchAckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;

/* loaded from: input_file:org/apache/rocketmq/broker/processor/PopMessageProcessor.class */
public class PopMessageProcessor implements NettyRequestProcessor {
    private static final Logger POP_LOGGER = LoggerFactory.getLogger("RocketmqPop");
    private final BrokerController brokerController;
    String reviveTopic;
    private static final String BORN_TIME = "bornTime";
    private final PopLongPollingService popLongPollingService;
    private final PopBufferMergeService popBufferMergeService;
    private final Random random = new Random(System.currentTimeMillis());
    private final QueueLockManager queueLockManager = new QueueLockManager();
    private final AtomicLong ckMessageNumber = new AtomicLong();

    /* loaded from: input_file:org/apache/rocketmq/broker/processor/PopMessageProcessor$QueueLockManager.class */
    public class QueueLockManager extends ServiceThread {
        private final ConcurrentHashMap<String, TimedLock> expiredLocalCache = new ConcurrentHashMap<>(100000);

        public QueueLockManager() {
        }

        public String buildLockKey(String str, String str2, int i) {
            return str + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + str2 + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + i;
        }

        public boolean tryLock(String str, String str2, int i) {
            return tryLock(buildLockKey(str, str2, i));
        }

        public boolean tryLock(String str) {
            TimedLock timedLock = this.expiredLocalCache.get(str);
            if (timedLock == null) {
                if (this.expiredLocalCache.putIfAbsent(str, new TimedLock()) != null) {
                    return false;
                }
                timedLock = this.expiredLocalCache.get(str);
            }
            if (timedLock == null) {
                return false;
            }
            return timedLock.tryLock();
        }

        public int cleanUnusedLock(long j) {
            Iterator<Map.Entry<String, TimedLock>> it = this.expiredLocalCache.entrySet().iterator();
            int i = 0;
            while (it.hasNext()) {
                Map.Entry<String, TimedLock> next = it.next();
                if (System.currentTimeMillis() - next.getValue().getLockTime() > j) {
                    it.remove();
                    PopMessageProcessor.POP_LOGGER.info("Remove unused queue lock: {}, {}, {}", new Object[]{next.getKey(), Long.valueOf(next.getValue().getLockTime()), Boolean.valueOf(next.getValue().isLock())});
                }
                i++;
            }
            return i;
        }

        public void unLock(String str, String str2, int i) {
            unLock(buildLockKey(str, str2, i));
        }

        public void unLock(String str) {
            TimedLock timedLock = this.expiredLocalCache.get(str);
            if (timedLock != null) {
                timedLock.unLock();
            }
        }

        public String getServiceName() {
            return PopMessageProcessor.this.brokerController.getBrokerConfig().isInBrokerContainer() ? PopMessageProcessor.this.brokerController.getBrokerIdentity().getIdentifier() + QueueLockManager.class.getSimpleName() : QueueLockManager.class.getSimpleName();
        }

        public void run() {
            while (!isStopped()) {
                try {
                    waitForRunning(60000L);
                    PopMessageProcessor.POP_LOGGER.info("QueueLockSize={}", Integer.valueOf(cleanUnusedLock(60000L)));
                } catch (Exception e) {
                    PopMessageProcessor.POP_LOGGER.error("QueueLockManager run error", e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/broker/processor/PopMessageProcessor$TimedLock.class */
    public static class TimedLock {
        private final AtomicBoolean lock = new AtomicBoolean(true);
        private volatile long lockTime = System.currentTimeMillis();

        public boolean tryLock() {
            if (!this.lock.compareAndSet(true, false)) {
                return false;
            }
            this.lockTime = System.currentTimeMillis();
            return true;
        }

        public void unLock() {
            this.lock.set(true);
        }

        public boolean isLock() {
            return !this.lock.get();
        }

        public long getLockTime() {
            return this.lockTime;
        }
    }

    public PopMessageProcessor(BrokerController brokerController) {
        this.brokerController = brokerController;
        this.reviveTopic = PopAckConstants.buildClusterReviveTopic(this.brokerController.getBrokerConfig().getBrokerClusterName());
        this.popLongPollingService = new PopLongPollingService(brokerController, this, false);
        this.popBufferMergeService = new PopBufferMergeService(this.brokerController, this);
    }

    public PopLongPollingService getPopLongPollingService() {
        return this.popLongPollingService;
    }

    public PopBufferMergeService getPopBufferMergeService() {
        return this.popBufferMergeService;
    }

    public QueueLockManager getQueueLockManager() {
        return this.queueLockManager;
    }

    public static String genAckUniqueId(AckMsg ackMsg) {
        return ackMsg.getTopic() + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + ackMsg.getQueueId() + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + ackMsg.getAckOffset() + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + ackMsg.getConsumerGroup() + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + ackMsg.getPopTime() + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + ackMsg.getBrokerName() + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + "ack";
    }

    public static String genBatchAckUniqueId(BatchAckMsg batchAckMsg) {
        return batchAckMsg.getTopic() + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + batchAckMsg.getQueueId() + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + batchAckMsg.getAckOffsetList().toString() + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + batchAckMsg.getConsumerGroup() + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + batchAckMsg.getPopTime() + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + "bAck";
    }

    public static String genCkUniqueId(PopCheckPoint popCheckPoint) {
        return popCheckPoint.getTopic() + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + popCheckPoint.getQueueId() + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + popCheckPoint.getStartOffset() + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + popCheckPoint.getCId() + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + popCheckPoint.getPopTime() + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + popCheckPoint.getBrokerName() + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + "ck";
    }

    public boolean rejectRequest() {
        return false;
    }

    public ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> getPollingMap() {
        return this.popLongPollingService.getPollingMap();
    }

    public void notifyLongPollingRequestIfNeed(String str, String str2, int i) {
        notifyLongPollingRequestIfNeed(str, str2, i, null, 0L, null, null);
    }

    public void notifyLongPollingRequestIfNeed(String str, String str2, int i, Long l, long j, byte[] bArr, Map<String, String> map) {
        if (this.brokerController.getMessageStore().getMaxOffsetInQueue(str, i) > Math.max(this.brokerController.getPopMessageProcessor().getPopBufferMergeService().getLatestOffset(str, str2, i), this.brokerController.getConsumerOffsetManager().queryOffset(str2, str, i))) {
            boolean notifyMessageArriving = this.popLongPollingService.notifyMessageArriving(str, -1, str2, l, j, bArr, map);
            if (!notifyMessageArriving) {
                notifyMessageArriving = this.popLongPollingService.notifyMessageArriving(str, i, str2, l, j, bArr, map);
            }
            this.brokerController.getNotificationProcessor().notifyMessageArriving(str, i);
            if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
                POP_LOGGER.info("notify long polling request. topic:{}, group:{}, queueId:{}, success:{}", new Object[]{str, str2, Integer.valueOf(i), Boolean.valueOf(notifyMessageArriving)});
            }
        }
    }

    public void notifyMessageArriving(String str, int i, Long l, long j, byte[] bArr, Map<String, String> map) {
        this.popLongPollingService.notifyMessageArrivingWithRetryTopic(str, i, l, j, bArr, map);
    }

    public void notifyMessageArriving(String str, int i, String str2) {
        this.popLongPollingService.notifyMessageArriving(str, i, str2, null, 0L, null, null);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v124, types: [java.util.concurrent.CompletableFuture] */
    /* JADX WARN: Type inference failed for: r1v39, types: [org.apache.rocketmq.remoting.protocol.RemotingCommand, java.util.function.Function] */
    public RemotingCommand processRequest(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws RemotingCommandException {
        CompletableFuture thenCompose;
        long now = this.brokerController.getMessageStore().now();
        remotingCommand.addExtFieldIfNotExist(BORN_TIME, String.valueOf(System.currentTimeMillis()));
        if (Objects.equals(remotingCommand.getExtFields().get(BORN_TIME), "0")) {
            remotingCommand.addExtField(BORN_TIME, String.valueOf(System.currentTimeMillis()));
        }
        Channel channel = channelHandlerContext.channel();
        RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
        PopMessageResponseHeader readCustomHeader = createResponseCommand.readCustomHeader();
        PopMessageRequestHeader popMessageRequestHeader = (PopMessageRequestHeader) remotingCommand.decodeCommandCustomHeader(PopMessageRequestHeader.class, true);
        StringBuilder sb = new StringBuilder(64);
        StringBuilder sb2 = new StringBuilder(64);
        StringBuilder sb3 = null;
        if (popMessageRequestHeader.isOrder()) {
            sb3 = new StringBuilder(64);
        }
        this.brokerController.getConsumerManager().compensateBasicConsumerInfo(popMessageRequestHeader.getConsumerGroup(), ConsumeType.CONSUME_POP, MessageModel.CLUSTERING);
        createResponseCommand.setOpaque(remotingCommand.getOpaque());
        if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
            POP_LOGGER.info("receive PopMessage request command, {}", remotingCommand);
        }
        if (popMessageRequestHeader.isTimeoutTooMuch()) {
            createResponseCommand.setCode(210);
            createResponseCommand.setRemark(String.format("the broker[%s] pop message is timeout too much", this.brokerController.getBrokerConfig().getBrokerIP1()));
            return createResponseCommand;
        }
        if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
            createResponseCommand.setCode(16);
            createResponseCommand.setRemark(String.format("the broker[%s] pop message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
            return createResponseCommand;
        }
        if (popMessageRequestHeader.getMaxMsgNums() > 32) {
            createResponseCommand.setCode(1);
            createResponseCommand.setRemark(String.format("the broker[%s] pop message's num is greater than 32", this.brokerController.getBrokerConfig().getBrokerIP1()));
            return createResponseCommand;
        }
        if (!this.brokerController.getMessageStore().getMessageStoreConfig().isTimerWheelEnable()) {
            createResponseCommand.setCode(1);
            createResponseCommand.setRemark(String.format("the broker[%s] pop message is forbidden because timerWheelEnable is false", this.brokerController.getBrokerConfig().getBrokerIP1()));
            return createResponseCommand;
        }
        TopicConfig selectTopicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(popMessageRequestHeader.getTopic());
        if (null == selectTopicConfig) {
            POP_LOGGER.error("The topic {} not exist, consumer: {} ", popMessageRequestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
            createResponseCommand.setCode(17);
            createResponseCommand.setRemark(String.format("topic[%s] not exist, apply first please! %s", popMessageRequestHeader.getTopic(), FAQUrl.suggestTodo("https://rocketmq.apache.org/docs/bestPractice/06FAQ")));
            return createResponseCommand;
        }
        if (!PermName.isReadable(selectTopicConfig.getPerm())) {
            createResponseCommand.setCode(16);
            createResponseCommand.setRemark("the topic[" + popMessageRequestHeader.getTopic() + "] peeking message is forbidden");
            return createResponseCommand;
        }
        if (popMessageRequestHeader.getQueueId().intValue() >= selectTopicConfig.getReadQueueNums()) {
            String format = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", popMessageRequestHeader.getQueueId(), popMessageRequestHeader.getTopic(), Integer.valueOf(selectTopicConfig.getReadQueueNums()), channel.remoteAddress());
            POP_LOGGER.warn(format);
            createResponseCommand.setCode(1);
            createResponseCommand.setRemark(format);
            return createResponseCommand;
        }
        SubscriptionGroupConfig findSubscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(popMessageRequestHeader.getConsumerGroup());
        if (null == findSubscriptionGroupConfig) {
            createResponseCommand.setCode(26);
            createResponseCommand.setRemark(String.format("subscription group [%s] does not exist, %s", popMessageRequestHeader.getConsumerGroup(), FAQUrl.suggestTodo("https://rocketmq.apache.org/docs/bestPractice/06FAQ")));
            return createResponseCommand;
        }
        if (!findSubscriptionGroupConfig.isConsumeEnable()) {
            createResponseCommand.setCode(16);
            createResponseCommand.setRemark("subscription group no permission, " + popMessageRequestHeader.getConsumerGroup());
            return createResponseCommand;
        }
        BrokerConfig brokerConfig = this.brokerController.getBrokerConfig();
        SubscriptionData subscriptionData = null;
        ExpressionMessageFilter expressionMessageFilter = null;
        if (popMessageRequestHeader.getExp() == null || popMessageRequestHeader.getExp().isEmpty()) {
            try {
                subscriptionData = FilterAPI.build(popMessageRequestHeader.getTopic(), "*", "TAG");
                this.brokerController.getConsumerManager().compensateSubscribeData(popMessageRequestHeader.getConsumerGroup(), popMessageRequestHeader.getTopic(), subscriptionData);
                String buildPopRetryTopic = KeyBuilder.buildPopRetryTopic(popMessageRequestHeader.getTopic(), popMessageRequestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
                this.brokerController.getConsumerManager().compensateSubscribeData(popMessageRequestHeader.getConsumerGroup(), buildPopRetryTopic, FilterAPI.build(buildPopRetryTopic, "*", "TAG"));
            } catch (Exception e) {
                POP_LOGGER.warn("Build default subscription error, group: {}", popMessageRequestHeader.getConsumerGroup());
            }
        } else {
            try {
                subscriptionData = FilterAPI.build(popMessageRequestHeader.getTopic(), popMessageRequestHeader.getExp(), popMessageRequestHeader.getExpType());
                this.brokerController.getConsumerManager().compensateSubscribeData(popMessageRequestHeader.getConsumerGroup(), popMessageRequestHeader.getTopic(), subscriptionData);
                String buildPopRetryTopic2 = KeyBuilder.buildPopRetryTopic(popMessageRequestHeader.getTopic(), popMessageRequestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
                this.brokerController.getConsumerManager().compensateSubscribeData(popMessageRequestHeader.getConsumerGroup(), buildPopRetryTopic2, FilterAPI.build(buildPopRetryTopic2, "*", popMessageRequestHeader.getExpType()));
                ConsumerFilterData consumerFilterData = null;
                if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                    consumerFilterData = ConsumerFilterManager.build(popMessageRequestHeader.getTopic(), popMessageRequestHeader.getConsumerGroup(), popMessageRequestHeader.getExp(), popMessageRequestHeader.getExpType(), System.currentTimeMillis());
                    if (consumerFilterData == null) {
                        POP_LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}", popMessageRequestHeader.getExp(), popMessageRequestHeader.getConsumerGroup());
                        createResponseCommand.setCode(23);
                        createResponseCommand.setRemark("parse the consumer's subscription failed");
                        return createResponseCommand;
                    }
                }
                expressionMessageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager());
            } catch (Exception e2) {
                POP_LOGGER.warn("Parse the consumer's subscription[{}] error, group: {}", popMessageRequestHeader.getExp(), popMessageRequestHeader.getConsumerGroup());
                createResponseCommand.setCode(23);
                createResponseCommand.setRemark("parse the consumer's subscription failed");
                return createResponseCommand;
            }
        }
        int nextInt = this.random.nextInt(100);
        int abs = popMessageRequestHeader.isOrder() ? 999 : (int) Math.abs(this.ckMessageNumber.getAndIncrement() % this.brokerController.getBrokerConfig().getReviveQueueNum());
        GetMessageResult getMessageResult = new GetMessageResult(popMessageRequestHeader.getMaxMsgNums());
        ExpressionMessageFilter expressionMessageFilter2 = expressionMessageFilter;
        StringBuilder sb4 = sb3;
        boolean z = nextInt % 5 == 0;
        boolean z2 = false;
        if (brokerConfig.isEnableRetryTopicV2() && brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
            z2 = nextInt % 2 == 0;
        }
        long currentTimeMillis = System.currentTimeMillis();
        CompletableFuture<Long> completedFuture = CompletableFuture.completedFuture(0L);
        if (z && !popMessageRequestHeader.isOrder()) {
            completedFuture = z2 ? popMsgFromTopic(KeyBuilder.buildPopRetryTopicV1(popMessageRequestHeader.getTopic(), popMessageRequestHeader.getConsumerGroup()), true, getMessageResult, popMessageRequestHeader, abs, channel, currentTimeMillis, expressionMessageFilter2, sb, sb2, sb3, nextInt, completedFuture) : popMsgFromTopic(KeyBuilder.buildPopRetryTopic(popMessageRequestHeader.getTopic(), popMessageRequestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()), true, getMessageResult, popMessageRequestHeader, abs, channel, currentTimeMillis, expressionMessageFilter2, sb, sb2, sb3, nextInt, completedFuture);
        }
        if (popMessageRequestHeader.getQueueId().intValue() < 0) {
            thenCompose = popMsgFromTopic(selectTopicConfig, false, getMessageResult, popMessageRequestHeader, abs, channel, currentTimeMillis, expressionMessageFilter2, sb, sb2, sb3, nextInt, completedFuture);
        } else {
            int intValue = popMessageRequestHeader.getQueueId().intValue();
            int i = abs;
            thenCompose = completedFuture.thenCompose((Function<? super Long, ? extends CompletionStage<U>>) l -> {
                return popMsgFromQueue(selectTopicConfig.getTopicName(), popMessageRequestHeader.getAttemptId(), false, getMessageResult, popMessageRequestHeader, intValue, l.longValue(), i, channel, currentTimeMillis, expressionMessageFilter2, sb, sb2, sb4);
            });
        }
        if (!z && getMessageResult.getMessageMapedList().size() < popMessageRequestHeader.getMaxMsgNums() && !popMessageRequestHeader.isOrder()) {
            thenCompose = z2 ? popMsgFromTopic(KeyBuilder.buildPopRetryTopicV1(popMessageRequestHeader.getTopic(), popMessageRequestHeader.getConsumerGroup()), true, getMessageResult, popMessageRequestHeader, abs, channel, currentTimeMillis, expressionMessageFilter2, sb, sb2, sb3, nextInt, (CompletableFuture<Long>) thenCompose) : popMsgFromTopic(KeyBuilder.buildPopRetryTopic(popMessageRequestHeader.getTopic(), popMessageRequestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()), true, getMessageResult, popMessageRequestHeader, abs, channel, currentTimeMillis, expressionMessageFilter2, sb, sb2, sb3, nextInt, (CompletableFuture<Long>) thenCompose);
        }
        SubscriptionData subscriptionData2 = subscriptionData;
        int i2 = abs;
        thenCompose.thenApply((Function) l2 -> {
            if (getMessageResult.getMessageBufferList().isEmpty()) {
                PollingResult polling = this.popLongPollingService.polling(channelHandlerContext, remotingCommand, new PollingHeader(popMessageRequestHeader), subscriptionData2, expressionMessageFilter2);
                if (PollingResult.POLLING_SUC == polling) {
                    return null;
                }
                if (PollingResult.POLLING_FULL == polling) {
                    createResponseCommand.setCode(209);
                } else {
                    createResponseCommand.setCode(210);
                }
                getMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
            } else {
                createResponseCommand.setCode(0);
                getMessageResult.setStatus(GetMessageStatus.FOUND);
                if (l2.longValue() > 0) {
                    this.popLongPollingService.notifyMessageArriving(popMessageRequestHeader.getTopic(), popMessageRequestHeader.getQueueId().intValue(), popMessageRequestHeader.getConsumerGroup(), null, 0L, null, null);
                }
            }
            readCustomHeader.setInvisibleTime(popMessageRequestHeader.getInvisibleTime());
            readCustomHeader.setPopTime(currentTimeMillis);
            readCustomHeader.setReviveQid(i2);
            readCustomHeader.setRestNum(l2.longValue());
            readCustomHeader.setStartOffsetInfo(sb.toString());
            readCustomHeader.setMsgOffsetInfo(sb2.toString());
            if (popMessageRequestHeader.isOrder() && sb4 != null) {
                readCustomHeader.setOrderCountInfo(sb4.toString());
            }
            createResponseCommand.setRemark(getMessageResult.getStatus().name());
            switch (createResponseCommand.getCode()) {
                case 0:
                    if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                        byte[] readGetMessageResult = readGetMessageResult(getMessageResult, popMessageRequestHeader.getConsumerGroup(), popMessageRequestHeader.getTopic(), popMessageRequestHeader.getQueueId().intValue());
                        this.brokerController.getBrokerStatsManager().incGroupGetLatency(popMessageRequestHeader.getConsumerGroup(), popMessageRequestHeader.getTopic(), popMessageRequestHeader.getQueueId().intValue(), (int) (this.brokerController.getMessageStore().now() - now));
                        createResponseCommand.setBody(readGetMessageResult);
                        return createResponseCommand;
                    }
                    try {
                        channel.writeAndFlush(new ManyMessageTransfer(createResponseCommand.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult)).addListener(channelFuture -> {
                            getMessageResult.release();
                            RemotingMetricsManager.rpcLatency.record(remotingCommand.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), RemotingMetricsManager.newAttributesBuilder().put("request_code", RemotingHelper.getRequestCodeDesc(remotingCommand.getCode())).put("response_code", RemotingHelper.getResponseCodeDesc(createResponseCommand.getCode())).put("result", RemotingMetricsManager.getWriteAndFlushResult(channelFuture)).build());
                            if (channelFuture.isSuccess()) {
                                return;
                            }
                            POP_LOGGER.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), channelFuture.cause());
                        });
                        return null;
                    } catch (Throwable th) {
                        POP_LOGGER.error("Error occurred when transferring messages from page cache", th);
                        getMessageResult.release();
                        return null;
                    }
                default:
                    return createResponseCommand;
            }
        }).thenAccept(remotingCommand2 -> {
            NettyRemotingAbstract.writeResponse(channel, remotingCommand, remotingCommand2);
        });
        return null;
    }

    private CompletableFuture<Long> popMsgFromTopic(TopicConfig topicConfig, boolean z, GetMessageResult getMessageResult, PopMessageRequestHeader popMessageRequestHeader, int i, Channel channel, long j, ExpressionMessageFilter expressionMessageFilter, StringBuilder sb, StringBuilder sb2, StringBuilder sb3, int i2, CompletableFuture<Long> completableFuture) {
        if (topicConfig != null) {
            for (int i3 = 0; i3 < topicConfig.getReadQueueNums(); i3++) {
                int readQueueNums = (i2 + i3) % topicConfig.getReadQueueNums();
                completableFuture = completableFuture.thenCompose(l -> {
                    return popMsgFromQueue(topicConfig.getTopicName(), popMessageRequestHeader.getAttemptId(), z, getMessageResult, popMessageRequestHeader, readQueueNums, l.longValue(), i, channel, j, expressionMessageFilter, sb, sb2, sb3);
                });
            }
        }
        return completableFuture;
    }

    private CompletableFuture<Long> popMsgFromTopic(String str, boolean z, GetMessageResult getMessageResult, PopMessageRequestHeader popMessageRequestHeader, int i, Channel channel, long j, ExpressionMessageFilter expressionMessageFilter, StringBuilder sb, StringBuilder sb2, StringBuilder sb3, int i2, CompletableFuture<Long> completableFuture) {
        return popMsgFromTopic(this.brokerController.getTopicConfigManager().selectTopicConfig(str), z, getMessageResult, popMessageRequestHeader, i, channel, j, expressionMessageFilter, sb, sb2, sb3, i2, completableFuture);
    }

    private CompletableFuture<Long> popMsgFromQueue(String str, String str2, boolean z, GetMessageResult getMessageResult, PopMessageRequestHeader popMessageRequestHeader, int i, long j, int i2, Channel channel, long j2, ExpressionMessageFilter expressionMessageFilter, StringBuilder sb, StringBuilder sb2, StringBuilder sb3) {
        String str3 = str + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + popMessageRequestHeader.getConsumerGroup() + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + i;
        boolean isOrder = popMessageRequestHeader.isOrder();
        long popOffset = getPopOffset(str, popMessageRequestHeader.getConsumerGroup(), i, popMessageRequestHeader.getInitMode(), false, str3, false);
        CompletableFuture<Long> completableFuture = new CompletableFuture<>();
        if (!this.queueLockManager.tryLock(str3)) {
            completableFuture.complete(Long.valueOf((this.brokerController.getMessageStore().getMaxOffsetInQueue(str, i) - popOffset) + j));
            return completableFuture;
        }
        if (isPopShouldStop(str, popMessageRequestHeader.getConsumerGroup(), i)) {
            POP_LOGGER.warn("Too much msgs unacked, then stop poping. topic={}, group={}, queueId={}", new Object[]{str, popMessageRequestHeader.getConsumerGroup(), Integer.valueOf(i)});
            completableFuture.complete(Long.valueOf((this.brokerController.getMessageStore().getMaxOffsetInQueue(str, i) - popOffset) + j));
            return completableFuture;
        }
        try {
            completableFuture.whenComplete((l, th) -> {
                this.queueLockManager.unLock(str3);
            });
            long popOffset2 = getPopOffset(str, popMessageRequestHeader.getConsumerGroup(), i, popMessageRequestHeader.getInitMode(), true, str3, true);
            if (isOrder && this.brokerController.getConsumerOrderInfoManager().checkBlock(str2, str, popMessageRequestHeader.getConsumerGroup(), i, popMessageRequestHeader.getInvisibleTime())) {
                completableFuture.complete(Long.valueOf((this.brokerController.getMessageStore().getMaxOffsetInQueue(str, i) - popOffset2) + j));
                return completableFuture;
            }
            if (isOrder) {
                this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNum(str, popMessageRequestHeader.getConsumerGroup(), i);
            }
            if (getMessageResult.getMessageMapedList().size() >= popMessageRequestHeader.getMaxMsgNums()) {
                completableFuture.complete(Long.valueOf((this.brokerController.getMessageStore().getMaxOffsetInQueue(str, i) - popOffset2) + j));
                return completableFuture;
            }
            AtomicLong atomicLong = new AtomicLong(j);
            AtomicLong atomicLong2 = new AtomicLong(popOffset2);
            return this.brokerController.getMessageStore().getMessageAsync(popMessageRequestHeader.getConsumerGroup(), str, i, popOffset2, popMessageRequestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), expressionMessageFilter).thenCompose(getMessageResult2 -> {
                if (getMessageResult2 == null) {
                    return CompletableFuture.completedFuture(null);
                }
                if (!GetMessageStatus.OFFSET_TOO_SMALL.equals(getMessageResult2.getStatus()) && !GetMessageStatus.OFFSET_OVERFLOW_BADLY.equals(getMessageResult2.getStatus()) && !GetMessageStatus.OFFSET_FOUND_NULL.equals(getMessageResult2.getStatus())) {
                    return CompletableFuture.completedFuture(getMessageResult2);
                }
                POP_LOGGER.warn("Pop initial offset, because store is no correct, {}, {}->{}", new Object[]{str3, Long.valueOf(atomicLong2.get()), Long.valueOf(getMessageResult2.getNextBeginOffset())});
                this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), popMessageRequestHeader.getConsumerGroup(), str, i, getMessageResult2.getNextBeginOffset());
                atomicLong2.set(getMessageResult2.getNextBeginOffset());
                return this.brokerController.getMessageStore().getMessageAsync(popMessageRequestHeader.getConsumerGroup(), str, i, atomicLong2.get(), popMessageRequestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), expressionMessageFilter);
            }).thenApply(getMessageResult3 -> {
                if (getMessageResult3 == null) {
                    atomicLong.set((this.brokerController.getMessageStore().getMaxOffsetInQueue(str, i) - atomicLong2.get()) + atomicLong.get());
                    return Long.valueOf(atomicLong.get());
                }
                if (!getMessageResult3.getMessageMapedList().isEmpty()) {
                    this.brokerController.getBrokerStatsManager().incBrokerGetNums(popMessageRequestHeader.getTopic(), getMessageResult3.getMessageCount());
                    this.brokerController.getBrokerStatsManager().incGroupGetNums(popMessageRequestHeader.getConsumerGroup(), str, getMessageResult3.getMessageCount());
                    this.brokerController.getBrokerStatsManager().incGroupGetSize(popMessageRequestHeader.getConsumerGroup(), str, getMessageResult3.getBufferTotalSize());
                    Attributes build = BrokerMetricsManager.newAttributesBuilder().put(BrokerMetricsConstant.LABEL_TOPIC, popMessageRequestHeader.getTopic()).put(BrokerMetricsConstant.LABEL_CONSUMER_GROUP, popMessageRequestHeader.getConsumerGroup()).put(BrokerMetricsConstant.LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(popMessageRequestHeader.getTopic()) || MixAll.isSysConsumerGroup(popMessageRequestHeader.getConsumerGroup())).put(BrokerMetricsConstant.LABEL_IS_RETRY, z).build();
                    BrokerMetricsManager.messagesOutTotal.add(getMessageResult3.getMessageCount(), build);
                    BrokerMetricsManager.throughputOutTotal.add(getMessageResult3.getBufferTotalSize(), build);
                    if (isOrder) {
                        this.brokerController.getConsumerOrderInfoManager().update(popMessageRequestHeader.getAttemptId(), z, str, popMessageRequestHeader.getConsumerGroup(), i, j2, popMessageRequestHeader.getInvisibleTime(), getMessageResult3.getMessageQueueOffset(), sb3);
                        this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), popMessageRequestHeader.getConsumerGroup(), str, i, popOffset2);
                    } else if (!appendCheckPoint(popMessageRequestHeader, str, i2, i, popOffset2, getMessageResult3, j2, this.brokerController.getBrokerConfig().getBrokerName())) {
                        return Long.valueOf(atomicLong.get() + getMessageResult3.getMessageCount());
                    }
                    ExtraInfoUtil.buildStartOffsetInfo(sb, str, i, popOffset2);
                    ExtraInfoUtil.buildMsgOffsetInfo(sb2, str, i, getMessageResult3.getMessageQueueOffset());
                } else if ((GetMessageStatus.NO_MATCHED_MESSAGE.equals(getMessageResult3.getStatus()) || GetMessageStatus.OFFSET_FOUND_NULL.equals(getMessageResult3.getStatus()) || GetMessageStatus.MESSAGE_WAS_REMOVING.equals(getMessageResult3.getStatus()) || GetMessageStatus.NO_MATCHED_LOGIC_QUEUE.equals(getMessageResult3.getStatus())) && getMessageResult3.getNextBeginOffset() > -1) {
                    if (isOrder) {
                        this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), popMessageRequestHeader.getConsumerGroup(), str, i, getMessageResult3.getNextBeginOffset());
                    } else {
                        this.popBufferMergeService.addCkMock(popMessageRequestHeader.getConsumerGroup(), str, i, popOffset2, popMessageRequestHeader.getInvisibleTime(), j2, i2, getMessageResult3.getNextBeginOffset(), this.brokerController.getBrokerConfig().getBrokerName());
                    }
                }
                atomicLong.set((getMessageResult3.getMaxOffset() - getMessageResult3.getNextBeginOffset()) + atomicLong.get());
                String brokerName = this.brokerController.getBrokerConfig().getBrokerName();
                for (SelectMappedBufferResult selectMappedBufferResult : getMessageResult3.getMessageMapedList()) {
                    if (this.brokerController.getBrokerConfig().isPopResponseReturnActualRetryTopic() || !z) {
                        getMessageResult.addMessage(selectMappedBufferResult);
                    } else {
                        List<MessageExt> decodesBatch = MessageDecoder.decodesBatch(selectMappedBufferResult.getByteBuffer(), true, false, true);
                        selectMappedBufferResult.release();
                        for (MessageExt messageExt : decodesBatch) {
                            try {
                                messageExt.getProperties().putIfAbsent("POP_CK", ExtraInfoUtil.buildExtraInfo(popOffset2, j2, popMessageRequestHeader.getInvisibleTime(), i2, messageExt.getTopic(), brokerName, messageExt.getQueueId(), messageExt.getQueueOffset()));
                                messageExt.setTopic(popMessageRequestHeader.getTopic());
                                messageExt.setStoreSize(0);
                                byte[] encode = MessageDecoder.encode(messageExt, false);
                                getMessageResult.addMessage(new SelectMappedBufferResult(selectMappedBufferResult.getStartOffset(), ByteBuffer.wrap(encode), encode.length, (MappedFile) null));
                            } catch (Exception e) {
                                POP_LOGGER.error("Exception in recode retry message buffer, topic={}", str, e);
                            }
                        }
                    }
                }
                this.brokerController.getPopInflightMessageCounter().incrementInFlightMessageNum(str, popMessageRequestHeader.getConsumerGroup(), i, getMessageResult3.getMessageCount());
                return Long.valueOf(atomicLong.get());
            }).whenComplete((l2, th2) -> {
                if (th2 != null) {
                    POP_LOGGER.error("Pop message error, {}", str3, th2);
                }
                this.queueLockManager.unLock(str3);
            });
        } catch (Exception e) {
            POP_LOGGER.error("Exception in popMsgFromQueue", e);
            completableFuture.complete(Long.valueOf(j));
            return completableFuture;
        }
    }

    private boolean isPopShouldStop(String str, String str2, int i) {
        return this.brokerController.getBrokerConfig().isEnablePopMessageThreshold() && this.brokerController.getPopInflightMessageCounter().getGroupPopInFlightMessageNum(str, str2, i) > this.brokerController.getBrokerConfig().getPopInflightMessageThreshold();
    }

    private long getPopOffset(String str, String str2, int i, int i2, boolean z, String str3, boolean z2) {
        Long resetPopOffset;
        long queryOffset = this.brokerController.getConsumerOffsetManager().queryOffset(str2, str, i);
        if (queryOffset < 0) {
            queryOffset = getInitOffset(str, str2, i, i2, z);
        }
        if (z2 && (resetPopOffset = resetPopOffset(str, str2, i)) != null) {
            return resetPopOffset.longValue();
        }
        long latestOffset = this.popBufferMergeService.getLatestOffset(str3);
        return latestOffset < 0 ? queryOffset : Math.max(latestOffset, queryOffset);
    }

    private long getInitOffset(String str, String str2, int i, int i2, boolean z) {
        long maxOffsetInQueue;
        if (0 == i2) {
            return this.brokerController.getMessageStore().getMinOffsetInQueue(str, i);
        }
        if (this.brokerController.getBrokerConfig().isInitPopOffsetByCheckMsgInMem() && this.brokerController.getMessageStore().getMinOffsetInQueue(str, i) <= 0 && this.brokerController.getMessageStore().checkInMemByConsumeOffset(str, i, 0L, 1)) {
            maxOffsetInQueue = 0;
        } else {
            maxOffsetInQueue = this.brokerController.getMessageStore().getMaxOffsetInQueue(str, i) - 1;
            if (maxOffsetInQueue < 0) {
                maxOffsetInQueue = 0;
            }
        }
        if (z) {
            this.brokerController.getConsumerOffsetManager().commitOffset("getPopOffset", str2, str, i, maxOffsetInQueue);
        }
        return maxOffsetInQueue;
    }

    public final MessageExtBrokerInner buildCkMsg(PopCheckPoint popCheckPoint, int i) {
        MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner();
        messageExtBrokerInner.setTopic(this.reviveTopic);
        messageExtBrokerInner.setBody(JSON.toJSONString(popCheckPoint).getBytes(DataConverter.CHARSET_UTF8));
        messageExtBrokerInner.setQueueId(i);
        messageExtBrokerInner.setTags("ck");
        messageExtBrokerInner.setBornTimestamp(System.currentTimeMillis());
        messageExtBrokerInner.setBornHost(this.brokerController.getStoreHost());
        messageExtBrokerInner.setStoreHost(this.brokerController.getStoreHost());
        messageExtBrokerInner.setDeliverTimeMs(popCheckPoint.getReviveTime() - PopAckConstants.ackTimeInterval);
        messageExtBrokerInner.getProperties().put("UNIQ_KEY", genCkUniqueId(popCheckPoint));
        messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(messageExtBrokerInner.getProperties()));
        return messageExtBrokerInner;
    }

    private boolean appendCheckPoint(PopMessageRequestHeader popMessageRequestHeader, String str, int i, int i2, long j, GetMessageResult getMessageResult, long j2, String str2) {
        PopCheckPoint popCheckPoint = new PopCheckPoint();
        popCheckPoint.setBitMap(0);
        popCheckPoint.setNum((byte) getMessageResult.getMessageMapedList().size());
        popCheckPoint.setPopTime(j2);
        popCheckPoint.setInvisibleTime(popMessageRequestHeader.getInvisibleTime());
        popCheckPoint.setStartOffset(j);
        popCheckPoint.setCId(popMessageRequestHeader.getConsumerGroup());
        popCheckPoint.setTopic(str);
        popCheckPoint.setQueueId(i2);
        popCheckPoint.setBrokerName(str2);
        Iterator it = getMessageResult.getMessageQueueOffset().iterator();
        while (it.hasNext()) {
            popCheckPoint.addDiff((int) (((Long) it.next()).longValue() - j));
        }
        if (this.popBufferMergeService.addCk(popCheckPoint, i, -1L, getMessageResult.getNextBeginOffset())) {
            return true;
        }
        return this.popBufferMergeService.addCkJustOffset(popCheckPoint, i, -1L, getMessageResult.getNextBeginOffset());
    }

    private Long resetPopOffset(String str, String str2, int i) {
        String str3 = str + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + str2 + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + i;
        Long queryThenEraseResetOffset = this.brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(str, str2, Integer.valueOf(i));
        if (queryThenEraseResetOffset != null) {
            this.brokerController.getConsumerOrderInfoManager().clearBlock(str, str2, i);
            getPopBufferMergeService().clearOffsetQueue(str3);
            this.brokerController.getConsumerOffsetManager().commitOffset("ResetPopOffset", str2, str, i, queryThenEraseResetOffset.longValue());
        }
        return queryThenEraseResetOffset;
    }

    private byte[] readGetMessageResult(GetMessageResult getMessageResult, String str, String str2, int i) {
        ByteBuffer allocate = ByteBuffer.allocate(getMessageResult.getBufferTotalSize());
        long j = 0;
        try {
            for (ByteBuffer byteBuffer : getMessageResult.getMessageBufferList()) {
                allocate.put(byteBuffer);
                j = byteBuffer.getLong(56);
            }
            this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(str, str2, i, this.brokerController.getMessageStore().now() - j);
            return allocate.array();
        } finally {
            getMessageResult.release();
        }
    }
}
