package org.apache.rocketmq.broker.processor;

import com.alibaba.fastjson.JSON;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.longpolling.PopRequest;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
import org.apache.rocketmq.broker.util.MsgUtil;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PopMessageResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;

/* loaded from: input_file:org/apache/rocketmq/broker/processor/PopMessageProcessor.class */
public class PopMessageProcessor implements NettyRequestProcessor {
    // Logger used for all pop-related messages in this class.
    // NOTE(review): not assigned in the visible part of the file — presumably set in a static initializer; confirm.
    private static final InternalLogger POP_LOGGER;
    // Broker facade used to reach the broker config, topic/subscription managers, message store and pull executor.
    private final BrokerController brokerController;
    // Revive topic name: "rmq_sys_REVIVE_LOG_" + broker cluster name (assigned in the constructor).
    String reviveTopic;
    // Ext-field key under which the arrival time of a request is stored on the incoming command.
    private static final String BORN_TIME = "bornTime";
    // Values compared against the result of polling(...): POLLING_SUC makes processRequest return null
    // (no immediate response), POLLING_FULL maps to response code 209, any other value maps to 210.
    // NOTE(review): the decompiler also substituted these constants for plain literals 0/1/2/3 in
    // unrelated places (array indexes, loop counters, response codes); treat such uses as numbers.
    private static final int POLLING_SUC = 0;
    private static final int POLLING_FULL = 1;
    private static final int POLLING_TIMEOUT = 2;
    private static final int NOT_POLLING = 3;
    // topic -> (consumer group -> marker byte); used to find the groups to notify when a message arrives.
    private ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap;
    // Polling key (built by KeyBuilder.buildPollingKey(topic, group, queueId)) -> parked pop requests.
    private ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> pollingMap;
    // Created in the constructor with the broker controller and this processor.
    private PopBufferMergeService popBufferMergeService;
    // Compiler-generated flag backing `assert`; true when assertions are disabled.
    // NOTE(review): not assigned in the visible part of the file — confirm it is set in the static initializer.
    static final /* synthetic */ boolean $assertionsDisabled;
    // Time-seeded random source used to pick the starting queue and the revive queue for a pop.
    private Random random = new Random(System.currentTimeMillis());
    // Running count of parked pop requests; decremented on wake-up and periodically reset by PopLongPollingService.
    // NOTE(review): the increment is not visible here — presumably done in polling(...); confirm.
    private AtomicLong totalPollingNum = new AtomicLong(0);
    // Background thread that expires parked requests and cleans stale map entries.
    private PopLongPollingService popLongPollingService = new PopLongPollingService();
    // Per "topic@group@queueId" try-lock registry guarding concurrent pops on one queue.
    private QueueLockManager queueLockManager = new QueueLockManager();

    /* loaded from: input_file:org/apache/rocketmq/broker/processor/PopMessageProcessor$PopLongPollingService.class */
    /**
     * Background service that maintains the long-polling state of the enclosing processor.
     *
     * <p>Every 20 ms (or earlier when woken) it walks {@code pollingMap}, wakes up parked pop
     * requests that have timed out, periodically logs and re-synchronises the polling counter,
     * and every five minutes drops entries that refer to topics or subscription groups that no
     * longer exist. When the service is stopped, every request still parked is woken up once.
     */
    public class PopLongPollingService extends ServiceThread {
        /** Wall-clock millis of the last {@link #cleanUnusedResource()} pass; 0 means it never ran. */
        private long lastCleanTime = 0;

        public PopLongPollingService() {
        }

        public String getServiceName() {
            return "PopLongPollingService";
        }

        /**
         * Removes entries of {@code topicCidMap} and {@code pollingMap} whose topic has no topic
         * config or whose consumer group is missing from the subscription group table.
         * Failures are logged and never propagated; {@code lastCleanTime} is updated either way.
         */
        private void cleanUnusedResource() {
            try {
                Iterator<Map.Entry<String, ConcurrentHashMap<String, Byte>>> topicIt =
                    PopMessageProcessor.this.topicCidMap.entrySet().iterator();
                while (topicIt.hasNext()) {
                    Map.Entry<String, ConcurrentHashMap<String, Byte>> topicEntry = topicIt.next();
                    String topic = topicEntry.getKey();
                    if (PopMessageProcessor.this.brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
                        PopMessageProcessor.POP_LOGGER.info("remove not exit topic {} in topicCidMap!", topic);
                        topicIt.remove();
                        continue;
                    }
                    Iterator<Map.Entry<String, Byte>> groupIt = topicEntry.getValue().entrySet().iterator();
                    while (groupIt.hasNext()) {
                        String group = groupIt.next().getKey();
                        if (!PopMessageProcessor.this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(group)) {
                            PopMessageProcessor.POP_LOGGER.info("remove not exit sub {} of topic {} in topicCidMap!", group, topic);
                            groupIt.remove();
                        }
                    }
                }

                Iterator<Map.Entry<String, ConcurrentSkipListSet<PopRequest>>> pollingIt =
                    PopMessageProcessor.this.pollingMap.entrySet().iterator();
                while (pollingIt.hasNext()) {
                    String pollingKey = pollingIt.next().getKey();
                    if (pollingKey == null) {
                        continue;
                    }
                    // A polling key has exactly three "@"-separated parts: topic, group and a third
                    // component; anything else is left untouched.
                    String[] keyParts = pollingKey.split("@");
                    if (keyParts.length != 3) {
                        continue;
                    }
                    String topic = keyParts[0];
                    String group = keyParts[1];
                    if (PopMessageProcessor.this.brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
                        PopMessageProcessor.POP_LOGGER.info("remove not exit topic {} in pollingMap!", topic);
                        pollingIt.remove();
                    } else if (!PopMessageProcessor.this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(group)) {
                        PopMessageProcessor.POP_LOGGER.info("remove not exit sub {} of topic {} in pollingMap!", group, topic);
                        pollingIt.remove();
                    }
                }
            } catch (Throwable th) {
                PopMessageProcessor.POP_LOGGER.error("cleanUnusedResource", th);
            }
            this.lastCleanTime = System.currentTimeMillis();
        }

        /**
         * Main loop: expire timed-out requests, emit statistics every 100 iterations, run the
         * periodic cleanup, and finally drain every queue once the service has been stopped.
         */
        public void run() {
            int loopCount = 0;
            while (!this.stopped) {
                try {
                    waitForRunning(20L);
                    loopCount++;
                    if (PopMessageProcessor.this.pollingMap.isEmpty()) {
                        continue;
                    }
                    long queuedTotal = 0;
                    for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry : PopMessageProcessor.this.pollingMap.entrySet()) {
                        String pollingKey = entry.getKey();
                        ConcurrentSkipListSet<PopRequest> queue = entry.getValue();
                        if (queue == null) {
                            continue;
                        }
                        PopRequest head;
                        while ((head = queue.pollFirst()) != null) {
                            if (!head.isTimeout()) {
                                // First request is still live: put it back and stop scanning this queue.
                                if (queue.add(head)) {
                                    break;
                                }
                                // Re-insertion failed, so the request would be lost; wake it up instead.
                                PopMessageProcessor.POP_LOGGER.info("polling, add fail again: {}", head);
                            }
                            if (PopMessageProcessor.this.brokerController.getBrokerConfig().isEnablePopLog()) {
                                PopMessageProcessor.POP_LOGGER.info("timeout , wakeUp polling : {}", head);
                            }
                            PopMessageProcessor.this.totalPollingNum.decrementAndGet();
                            PopMessageProcessor.this.wakeUp(head);
                        }
                        if (loopCount >= 100) {
                            long size = queue.size();
                            queuedTotal += size;
                            if (size > 100) {
                                PopMessageProcessor.POP_LOGGER.info("polling queue {} , size={} ", pollingKey, Long.valueOf(size));
                            }
                        }
                    }
                    if (loopCount >= 100) {
                        PopMessageProcessor.POP_LOGGER.info("pollingMapSize={},tmpTotalSize={},atomicTotalSize={},diffSize={}", new Object[]{Integer.valueOf(PopMessageProcessor.this.pollingMap.size()), Long.valueOf(queuedTotal), Long.valueOf(PopMessageProcessor.this.totalPollingNum.get()), Long.valueOf(Math.abs(PopMessageProcessor.this.totalPollingNum.get() - queuedTotal))});
                        // Re-synchronise the counter with what is really queued.
                        PopMessageProcessor.this.totalPollingNum.set(queuedTotal);
                        loopCount = 0;
                    }
                    if (this.lastCleanTime == 0 || System.currentTimeMillis() - this.lastCleanTime > 300000) {
                        cleanUnusedResource();
                    }
                } catch (Throwable th) {
                    PopMessageProcessor.POP_LOGGER.error("checkPolling error", th);
                }
            }

            // Service stopped: wake up everything still parked so no client waits for nothing.
            try {
                for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry : PopMessageProcessor.this.pollingMap.entrySet()) {
                    ConcurrentSkipListSet<PopRequest> queue = entry.getValue();
                    if (queue == null) {
                        continue;
                    }
                    // Stop as soon as the queue is empty; looping without this exit never terminates.
                    PopRequest pending;
                    while ((pending = queue.pollFirst()) != null) {
                        PopMessageProcessor.this.wakeUp(pending);
                    }
                }
            } catch (Throwable th) {
                // Best effort only, but leave a trace instead of swallowing the failure.
                PopMessageProcessor.POP_LOGGER.error("wake up pending pop requests on shutdown error", th);
            }
        }
    }

    /* loaded from: input_file:org/apache/rocketmq/broker/processor/PopMessageProcessor$QueueLockManager.class */
    /**
     * Registry of per-key {@link TimedLock}s plus a background thread that, once a minute,
     * evicts locks that were last acquired more than a minute ago.
     */
    public static class QueueLockManager extends ServiceThread {
        /** Lock key -> lock; entries are created on first use and evicted by {@link #cleanUnusedLock(long)}. */
        private ConcurrentHashMap<String, TimedLock> expiredLocalCache = new ConcurrentHashMap<>(100000);

        /**
         * Tries to acquire the lock registered under {@code str} without blocking.
         *
         * @param str lock key
         * @return {@code true} when the lock was acquired; {@code false} when it is held, when
         *         another thread registered the key concurrently, or when the entry vanished
         */
        public boolean tryLock(String str) {
            TimedLock candidate = this.expiredLocalCache.get(str);
            if (candidate == null) {
                TimedLock raced = this.expiredLocalCache.putIfAbsent(str, new TimedLock());
                if (raced != null) {
                    // Somebody else registered the key first; report failure rather than compete.
                    return false;
                }
                candidate = this.expiredLocalCache.get(str);
                if (candidate == null) {
                    // Entry was evicted between the insert and the re-read.
                    return false;
                }
            }
            return candidate.tryLock();
        }

        /**
         * Evicts every lock whose last acquisition time is older than {@code j} milliseconds.
         *
         * @param j maximum age in milliseconds
         * @return number of entries visited, including the evicted ones
         */
        public int cleanUnusedLock(long j) {
            int visited = 0;
            Iterator<Map.Entry<String, TimedLock>> cursor = this.expiredLocalCache.entrySet().iterator();
            while (cursor.hasNext()) {
                Map.Entry<String, TimedLock> current = cursor.next();
                long idleMillis = System.currentTimeMillis() - current.getValue().getLockTime();
                if (idleMillis > j) {
                    cursor.remove();
                    PopMessageProcessor.POP_LOGGER.info("Remove unused queue lock: {}, {}, {}", new Object[]{current.getKey(), Long.valueOf(current.getValue().getLockTime()), Boolean.valueOf(current.getValue().isLock())});
                }
                visited++;
            }
            return visited;
        }

        /**
         * Releases the lock registered under {@code str}; does nothing when the key is unknown.
         *
         * @param str lock key
         */
        public void unLock(String str) {
            TimedLock held = this.expiredLocalCache.get(str);
            if (held == null) {
                return;
            }
            held.unLock();
        }

        public String getServiceName() {
            return "QueueLockManager";
        }

        /** Runs the eviction pass every 60 seconds until the service is stopped. */
        public void run() {
            while (!isStopped()) {
                try {
                    waitForRunning(60000L);
                    int lockCount = cleanUnusedLock(60000L);
                    PopMessageProcessor.POP_LOGGER.info("QueueLockSize={}", Integer.valueOf(lockCount));
                } catch (Exception e) {
                    PopMessageProcessor.POP_LOGGER.error("QueueLockManager run error", e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/broker/processor/PopMessageProcessor$TimedLock.class */
    /**
     * Non-blocking, non-reentrant lock that remembers when it was last acquired.
     *
     * <p>The flag is stored inverted: {@code true} means the lock is free, {@code false}
     * means it is held.
     */
    public static class TimedLock {
        /** {@code true} while the lock is available. */
        private final AtomicBoolean lock = new AtomicBoolean(true);
        /** Millis of the last successful acquisition; initialised to the creation time. */
        private volatile long lockTime = System.currentTimeMillis();

        /**
         * Attempts to take the lock and, on success, records the acquisition time.
         *
         * @return {@code true} when the caller now holds the lock
         */
        public boolean tryLock() {
            boolean acquired = this.lock.compareAndSet(true, false);
            if (acquired) {
                this.lockTime = System.currentTimeMillis();
            }
            return acquired;
        }

        /** Marks the lock as free, regardless of who holds it. */
        public void unLock() {
            this.lock.set(true);
        }

        /** @return {@code true} when the lock is currently held */
        public boolean isLock() {
            boolean available = this.lock.get();
            return !available;
        }

        /** @return millis of the last successful acquisition (or of creation if never acquired) */
        public long getLockTime() {
            return this.lockTime;
        }
    }

    public PopMessageProcessor(BrokerController brokerController) {
        this.brokerController = brokerController;
        this.reviveTopic = "rmq_sys_REVIVE_LOG_" + this.brokerController.getBrokerConfig().getBrokerClusterName();
        this.topicCidMap = new ConcurrentHashMap<>(this.brokerController.getBrokerConfig().getPopPollingMapSize());
        this.pollingMap = new ConcurrentLinkedHashMap.Builder().maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
        this.popBufferMergeService = new PopBufferMergeService(this.brokerController, this);
    }

    /** @return the long-polling maintenance service owned by this processor */
    public PopLongPollingService getPopLongPollingService() {
        return popLongPollingService;
    }

    /** @return the buffer merge service created together with this processor */
    public PopBufferMergeService getPopBufferMergeService() {
        return popBufferMergeService;
    }

    /** @return the per-queue lock registry owned by this processor */
    public QueueLockManager getQueueLockManager() {
        return queueLockManager;
    }

    /**
     * Builds the unique id of an ack message.
     *
     * @param ackMsg ack to identify
     * @return {@code topic@queueId@ackOffset@consumerGroup@popTime@ack}
     */
    public static String genAckUniqueId(AckMsg ackMsg) {
        StringBuilder id = new StringBuilder();
        id.append(ackMsg.getTopic()).append("@");
        id.append(ackMsg.getQueueId()).append("@");
        id.append(ackMsg.getAckOffset()).append("@");
        id.append(ackMsg.getConsumerGroup()).append("@");
        id.append(ackMsg.getPopTime()).append("@ack");
        return id.toString();
    }

    /**
     * Builds the unique id of a pop check point.
     *
     * @param popCheckPoint check point to identify
     * @return {@code topic@queueId@startOffset@cid@popTime@ck}
     */
    public static String genCkUniqueId(PopCheckPoint popCheckPoint) {
        StringBuilder id = new StringBuilder();
        id.append(popCheckPoint.getTopic()).append("@");
        // The queue id is rendered as an int, exactly as in the concatenated form.
        id.append((int) popCheckPoint.getQueueId()).append("@");
        id.append(popCheckPoint.getStartOffset()).append("@");
        id.append(popCheckPoint.getCId()).append("@");
        id.append(popCheckPoint.getPopTime()).append("@ck");
        return id.toString();
    }

    /**
     * Entry point for a pop request arriving from the network: stamps the command with its
     * arrival time and delegates to the channel-based overload.
     *
     * @param channelHandlerContext context whose channel the request arrived on
     * @param remotingCommand the pop request
     * @return the response to send, or {@code null} when the overload produced none
     * @throws RemotingCommandException propagated from the channel-based overload
     */
    public RemotingCommand processRequest(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws RemotingCommandException {
        final String bornTime = String.valueOf(System.currentTimeMillis());
        remotingCommand.addExtField(BORN_TIME, bornTime);
        final Channel requestChannel = channelHandlerContext.channel();
        return processRequest(requestChannel, remotingCommand);
    }

    /**
     * Tells the caller whether incoming requests should be rejected.
     *
     * @return always {@code false}; this processor never rejects requests here
     */
    public boolean rejectRequest() {
        return false;
    }

    /** @return the live map of polling key to parked pop requests (not a copy) */
    public ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> getPollingMap() {
        return pollingMap;
    }

    /**
     * Notifies every consumer group registered for a topic that a message arrived.
     *
     * <p>For a concrete queue ({@code i >= 0}) the requests parked under queue id -1 are
     * notified first, then the ones parked on the queue itself.
     *
     * @param str topic the message arrived on
     * @param i queue id the message arrived on
     */
    public void notifyMessageArriving(String str, int i) {
        ConcurrentHashMap<String, Byte> groupsOfTopic = this.topicCidMap.get(str);
        if (groupsOfTopic != null) {
            final boolean concreteQueue = i >= 0;
            for (Map.Entry<String, Byte> groupEntry : groupsOfTopic.entrySet()) {
                String consumerGroup = groupEntry.getKey();
                if (concreteQueue) {
                    notifyMessageArriving(str, consumerGroup, -1);
                }
                notifyMessageArriving(str, consumerGroup, i);
            }
        }
    }

    /**
     * Wakes up at most one parked pop request for the given topic, group and queue.
     *
     * <p>Requests whose channel is no longer active are taken off the queue and dropped
     * until an active one is found or the queue is exhausted.
     *
     * @param str topic
     * @param str2 consumer group
     * @param i queue id
     */
    public void notifyMessageArriving(String str, String str2, int i) {
        final String pollingKey = KeyBuilder.buildPollingKey(str, str2, i);
        ConcurrentSkipListSet<PopRequest> parked = this.pollingMap.get(pollingKey);
        if (parked == null || parked.isEmpty()) {
            return;
        }

        PopRequest candidate = parked.pollFirst();
        while (candidate != null && !candidate.getChannel().isActive()) {
            candidate = parked.pollFirst();
        }

        if (candidate != null) {
            this.totalPollingNum.decrementAndGet();
            if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
                POP_LOGGER.info("lock release , new msg arrive , wakeUp : {}", candidate);
            }
            wakeUp(candidate);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Re-runs a parked pop request on the pull-message executor and writes the response back.
     *
     * <p>Nothing happens when the request is {@code null} or when {@code complete()} returns
     * {@code false}. A {@code null} response from the re-run means there is nothing to write.
     *
     * @param popRequest parked request to resume
     */
    public void wakeUp(final PopRequest popRequest) {
        if (popRequest == null) {
            return;
        }
        if (!popRequest.complete()) {
            return;
        }

        Runnable replay = new Runnable() {
            @Override
            public void run() {
                final RemotingCommand reply;
                try {
                    reply = PopMessageProcessor.this.processRequest(popRequest.getChannel(), popRequest.getRemotingCommand());
                } catch (RemotingCommandException e) {
                    PopMessageProcessor.POP_LOGGER.error("ExecuteRequestWhenWakeup run", e);
                    return;
                }
                if (reply == null) {
                    return;
                }

                reply.setOpaque(popRequest.getRemotingCommand().getOpaque());
                reply.markResponseType();

                ChannelFutureListener reportFailure = new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if (!channelFuture.isSuccess()) {
                            PopMessageProcessor.POP_LOGGER.error("ProcessRequestWrapper response to {} failed", channelFuture.channel().remoteAddress(), channelFuture.cause());
                            PopMessageProcessor.POP_LOGGER.error(popRequest.toString());
                            PopMessageProcessor.POP_LOGGER.error(reply.toString());
                        }
                    }
                };
                try {
                    popRequest.getChannel().writeAndFlush(reply).addListener(reportFailure);
                } catch (Throwable th) {
                    PopMessageProcessor.POP_LOGGER.error("ProcessRequestWrapper process request over, but response failed", th);
                    PopMessageProcessor.POP_LOGGER.error(popRequest.toString());
                    PopMessageProcessor.POP_LOGGER.error(reply.toString());
                }
            }
        };

        this.brokerController.getPullMessageExecutor().submit((Runnable) new RequestTask(replay, popRequest.getChannel(), popRequest.getRemotingCommand()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Handles one pop request: validates it, pops messages from the retry topic and the
     * requested topic, and either answers immediately or parks the request for long polling.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Reject early on timeout, missing read permission, more than 32 requested messages,
     *       unknown topic, illegal queue id, unknown or disabled subscription group, or an
     *       unparsable filter expression.</li>
     *   <li>Pop from the retry topic first for roughly one request in five (never for orderly
     *       pops); otherwise pop from the retry topic last, and only if there is room left.</li>
     *   <li>Pop from the requested queue, or from every read queue when the queue id is
     *       negative, starting at a random queue.</li>
     *   <li>With no messages found, try to park the request via {@code polling(...)}.</li>
     *   <li>With messages found, send them from the heap as the response body, or write them
     *       straight to the channel.</li>
     * </ol>
     *
     * <p>NOTE(review): the numeric response codes are kept as literals; they presumably map to
     * RocketMQ {@code ResponseCode} constants (0 success, 1 system error, 2 system busy,
     * 16 no permission, ...) — confirm against that class.
     *
     * @param channel channel the request arrived on; also used to write the response directly
     * @param remotingCommand the pop request
     * @return the response to send, or {@code null} when the request was parked for long
     *         polling or the response was already written to the channel
     * @throws RemotingCommandException when the request header cannot be decoded
     */
    public RemotingCommand processRequest(final Channel channel, RemotingCommand remotingCommand) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
        PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader();
        PopMessageRequestHeader requestHeader = (PopMessageRequestHeader) remotingCommand.decodeCommandCustomHeader(PopMessageRequestHeader.class);
        StringBuilder startOffsetInfo = new StringBuilder(64);
        StringBuilder msgOffsetInfo = new StringBuilder(64);
        // Order-count info is only collected for orderly pops.
        StringBuilder orderCountInfo = null;
        if (requestHeader.isOrder()) {
            orderCountInfo = new StringBuilder(64);
        }
        response.setOpaque(remotingCommand.getOpaque());
        if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
            POP_LOGGER.info("receive PopMessage request command, {}", remotingCommand);
        }

        // ---- request validation -------------------------------------------------------------
        if (requestHeader.isTimeoutTooMuch()) {
            response.setCode(2);
            response.setRemark(String.format("the broker[%s] poping message is timeout too much", this.brokerController.getBrokerConfig().getBrokerIP1()));
            return response;
        }
        if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
            response.setCode(16);
            response.setRemark(String.format("the broker[%s] poping message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
            return response;
        }
        if (requestHeader.getMaxMsgNums() > 32) {
            response.setCode(1);
            response.setRemark(String.format("the broker[%s] poping message's num is greater than 32", this.brokerController.getBrokerConfig().getBrokerIP1()));
            return response;
        }
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
        if (topicConfig == null) {
            POP_LOGGER.error("The topic {} not exist, consumer: {} ", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
            response.setCode(17);
            response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/")));
            return response;
        }
        if (!PermName.isReadable(topicConfig.getPerm())) {
            response.setCode(16);
            response.setRemark("the topic[" + requestHeader.getTopic() + "] peeking message is forbidden");
            return response;
        }
        if (requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
            String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", Integer.valueOf(requestHeader.getQueueId()), requestHeader.getTopic(), Integer.valueOf(topicConfig.getReadQueueNums()), channel.remoteAddress());
            POP_LOGGER.warn(errorInfo);
            response.setCode(1);
            response.setRemark(errorInfo);
            return response;
        }
        SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
        if (subscriptionGroupConfig == null) {
            response.setCode(26);
            response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/")));
            return response;
        }
        if (this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup()) == null) {
            POP_LOGGER.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
            response.setCode(24);
            response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"));
            return response;
        }
        if (!subscriptionGroupConfig.isConsumeEnable()) {
            response.setCode(16);
            response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
            return response;
        }

        // ---- optional message filter --------------------------------------------------------
        ExpressionMessageFilter messageFilter = null;
        if (requestHeader.getExp() != null && requestHeader.getExp().length() > 0) {
            try {
                SubscriptionData subscriptionData = FilterAPI.build(requestHeader.getTopic(), requestHeader.getExp(), requestHeader.getExpType());
                // Filter data is only built for non-tag expressions.
                ConsumerFilterData consumerFilterData = null;
                if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                    consumerFilterData = ConsumerFilterManager.build(requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getExp(), requestHeader.getExpType(), System.currentTimeMillis());
                    if (consumerFilterData == null) {
                        POP_LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getExp(), requestHeader.getConsumerGroup());
                        response.setCode(23);
                        response.setRemark("parse the consumer's subscription failed");
                        return response;
                    }
                }
                messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager());
            } catch (Exception e) {
                POP_LOGGER.warn("Parse the consumer's subscription[{}] error, group: {}", requestHeader.getExp(), requestHeader.getConsumerGroup());
                response.setCode(23);
                response.setRemark("parse the consumer's subscription failed");
                return response;
            }
        }

        // ---- pop messages -------------------------------------------------------------------
        int randomQ = this.random.nextInt(100);
        // Orderly pops always use revive queue 999; others spread over the configured queues.
        int reviveQid = requestHeader.isOrder() ? 999 : randomQ % this.brokerController.getBrokerConfig().getReviveQueueNum();
        final GetMessageResult getMessageResult = new GetMessageResult();
        long restNum = 0;
        // Roughly one request in five serves the retry topic before the normal topic.
        boolean retryFirst = randomQ % 5 == 0;
        long popTime = System.currentTimeMillis();

        if (retryFirst && !requestHeader.isOrder()) {
            TopicConfig retryTopicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()));
            if (retryTopicConfig != null) {
                for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
                    int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums();
                    restNum = popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, messageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo);
                }
            }
        }

        if (requestHeader.getQueueId() < 0) {
            // No queue requested: visit every read queue, starting at a random one.
            for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
                int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
                restNum = popMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, messageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo);
            }
        } else {
            restNum = popMsgFromQueue(false, getMessageResult, requestHeader, requestHeader.getQueueId(), restNum, reviveQid, channel, popTime, messageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo);
        }

        if (!retryFirst && getMessageResult.getMessageMapedList().size() < requestHeader.getMaxMsgNums() && !requestHeader.isOrder()) {
            TopicConfig retryTopicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()));
            if (retryTopicConfig != null) {
                for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
                    int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums();
                    restNum = popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, messageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo);
                }
            }
        }

        // ---- decide between long polling and an immediate answer ----------------------------
        if (getMessageResult.getMessageBufferList().isEmpty()) {
            int pollingResult = polling(channel, remotingCommand, requestHeader);
            if (POLLING_SUC == pollingResult) {
                // Request was parked; it is answered later through wakeUp(...).
                return null;
            }
            if (POLLING_FULL == pollingResult) {
                response.setCode(209);
            } else {
                response.setCode(210);
            }
            getMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
        } else {
            response.setCode(0);
            getMessageResult.setStatus(GetMessageStatus.FOUND);
            if (restNum > 0) {
                // More messages remain: let another parked request of this group pick them up.
                notifyMessageArriving(requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueId());
            }
        }

        responseHeader.setInvisibleTime(requestHeader.getInvisibleTime());
        responseHeader.setPopTime(popTime);
        responseHeader.setReviveQid(reviveQid);
        responseHeader.setRestNum(restNum);
        responseHeader.setStartOffsetInfo(startOffsetInfo.toString());
        responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString());
        if (requestHeader.isOrder() && orderCountInfo != null) {
            responseHeader.setOrderCountInfo(orderCountInfo.toString());
        }
        response.setRemark(getMessageResult.getStatus().name());

        // ---- deliver ------------------------------------------------------------------------
        switch (response.getCode()) {
            case 0:
                if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                    long beginTimeMills = this.brokerController.getMessageStore().now();
                    byte[] body = readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
                    this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
                    response.setBody(body);
                    break;
                } else {
                    try {
                        channel.writeAndFlush(new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult)).addListener(new ChannelFutureListener() {
                            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                                getMessageResult.release();
                                if (channelFuture.isSuccess()) {
                                    return;
                                }
                                PopMessageProcessor.POP_LOGGER.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), channelFuture.cause());
                            }
                        });
                    } catch (Throwable th) {
                        POP_LOGGER.error("Error occurred when transferring messages from page cache", th);
                        getMessageResult.release();
                    }
                    // Already written to the channel; the caller must not send anything.
                    response = null;
                    break;
                }
            default:
                // Decompiled form of `assert false`: only throws when assertions are enabled.
                if (!$assertionsDisabled) {
                    throw new AssertionError();
                }
                break;
        }
        return response;
    }

    /**
     * Pops messages from a single queue into {@code getMessageResult} while holding the
     * per-queue lock, and returns the updated "rest" counter.
     *
     * <p>The queue lock is acquired with {@code tryLock}; when it cannot be obtained nothing is
     * popped and only the rest counter is updated. Once the lock is held it is released exactly
     * once, in a single {@code finally}, on every exit path (including exceptions).
     *
     * @param z                       when {@code true} the pop retry topic built from the request's
     *                                topic and consumer group is used instead of the request topic
     * @param getMessageResult        accumulator that receives the popped message buffers
     * @param popMessageRequestHeader the pop request being served
     * @param i                       queue id to pop from
     * @param j                       rest counter accumulated so far; the value returned is this
     *                                plus the backlog observed for this queue
     * @param i2                      forwarded to the checkpoint calls (presumably the revive queue
     *                                id; confirm against the caller)
     * @param channel                 client channel; its remote address is recorded when committing offsets
     * @param j2                      pop time stored in the checkpoint
     * @param expressionMessageFilter filter handed to the message store
     * @param sb                      receives the start offset info for the response
     * @param sb2                     receives the message offset info for the response
     * @param sb3                     receives the order count info (only written for ordered pops)
     * @return {@code j} plus the number of messages still left in the queue after this pop
     */
    private long popMsgFromQueue(boolean z, GetMessageResult getMessageResult, PopMessageRequestHeader popMessageRequestHeader, int i, long j, int i2, Channel channel, long j2, ExpressionMessageFilter expressionMessageFilter, StringBuilder sb, StringBuilder sb2, StringBuilder sb3) {
        final String group = popMessageRequestHeader.getConsumerGroup();
        final String topic = z ? KeyBuilder.buildPopRetryTopic(popMessageRequestHeader.getTopic(), group) : popMessageRequestHeader.getTopic();
        final String lockKey = topic + "@" + group + "@" + i;
        final boolean isOrder = popMessageRequestHeader.isOrder();

        long offset = getPopOffset(topic, popMessageRequestHeader, i, false, lockKey);
        if (!this.queueLockManager.tryLock(lockKey)) {
            // Somebody else is popping this queue: report the backlog only.
            return (this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i) - offset) + j;
        }

        final GetMessageResult popped;
        final long restNum;
        try {
            // Re-read the offset under the lock; this call may also commit the initial offset.
            offset = getPopOffset(topic, popMessageRequestHeader, i, true, lockKey);

            if (isOrder && this.brokerController.getConsumerOrderInfoManager().checkBlock(topic, group, i, popMessageRequestHeader.getInvisibleTime())) {
                return (this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i) - offset) + j;
            }
            if (getMessageResult.getMessageMapedList().size() >= popMessageRequestHeader.getMaxMsgNums()) {
                // The accumulator already holds as many messages as the client asked for.
                return (this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i) - offset) + j;
            }

            GetMessageResult fetched = this.brokerController.getMessageStore().getMessage(group, topic, i, offset, popMessageRequestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), expressionMessageFilter);
            if (fetched == null || GetMessageStatus.OFFSET_TOO_SMALL.equals(fetched.getStatus()) || GetMessageStatus.OFFSET_OVERFLOW_BADLY.equals(fetched.getStatus()) || GetMessageStatus.OFFSET_FOUND_NULL.equals(fetched.getStatus())) {
                // The offset we asked for is not usable: adopt the store's suggestion
                // (or 0 when there is none), commit it, and retry once.
                POP_LOGGER.warn("Pop initial offset, because store is no correct, {}, {}->{}", new Object[] {
                    lockKey,
                    Long.valueOf(offset),
                    fetched != null ? Long.valueOf(fetched.getNextBeginOffset()) : "null"
                });
                offset = fetched != null ? fetched.getNextBeginOffset() : 0L;
                this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), group, topic, i, offset);
                fetched = this.brokerController.getMessageStore().getMessage(group, topic, i, offset, popMessageRequestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), expressionMessageFilter);
            }
            if (fetched == null) {
                // Still nothing from the store: avoid dereferencing null, report the backlog only.
                return (this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i) - offset) + j;
            }

            popped = fetched;
            restNum = (popped.getMaxOffset() - popped.getNextBeginOffset()) + j;

            if (!popped.getMessageMapedList().isEmpty()) {
                this.brokerController.getBrokerStatsManager().incBrokerGetNums(popped.getMessageCount());
                this.brokerController.getBrokerStatsManager().incGroupGetNums(group, topic, popped.getMessageCount());
                this.brokerController.getBrokerStatsManager().incGroupGetSize(group, topic, popped.getBufferTotalSize());
                if (isOrder) {
                    int orderCount = this.brokerController.getConsumerOrderInfoManager().update(topic, group, i, popped.getMessageQueueOffset());
                    this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), group, topic, i, offset);
                    ExtraInfoUtil.buildOrderCountInfo(sb3, z, i, orderCount);
                } else {
                    appendCheckPoint(popMessageRequestHeader, topic, i2, i, offset, popped, j2);
                }
                ExtraInfoUtil.buildStartOffsetInfo(sb, z, i, offset);
                ExtraInfoUtil.buildMsgOffsetInfo(sb2, z, i, popped.getMessageQueueOffset());
            } else if ((GetMessageStatus.NO_MATCHED_MESSAGE.equals(popped.getStatus()) || GetMessageStatus.OFFSET_FOUND_NULL.equals(popped.getStatus()) || GetMessageStatus.MESSAGE_WAS_REMOVING.equals(popped.getStatus()) || GetMessageStatus.NO_MATCHED_LOGIC_QUEUE.equals(popped.getStatus())) && popped.getNextBeginOffset() > -1) {
                // Nothing deliverable, but the store moved the read position forward.
                this.popBufferMergeService.addCkMock(group, topic, i, offset, popMessageRequestHeader.getInvisibleTime(), j2, i2, popped.getNextBeginOffset());
            }
        } finally {
            this.queueLockManager.unLock(lockKey);
        }

        // Hand the fetched buffers over to the caller's accumulator (done outside the lock).
        for (SelectMappedBufferResult mapped : popped.getMessageMapedList()) {
            getMessageResult.addMessage(mapped);
        }
        return restNum;
    }

    /**
     * Resolves the offset a pop on the given queue should start from.
     *
     * <p>The consumer offset manager is asked first. When it has no offset (negative result)
     * the start is derived from the message store: the queue's min offset when the request's
     * init mode equals {@code POLLING_SUC}, otherwise the last message in the queue
     * (max offset - 1, never below 0), which is also committed when {@code z} is set.
     * Finally, if the pop buffer reports a non-negative latest offset that is ahead of that
     * value, the buffer's offset wins.
     *
     * @param str                     topic to query
     * @param popMessageRequestHeader the pop request (consumer group and init mode are read)
     * @param i                       queue id
     * @param z                       whether a derived "last message" offset should be committed
     * @param str2                    key used to look up the pop buffer's latest offset
     * @return the offset to start popping from
     */
    private long getPopOffset(String str, PopMessageRequestHeader popMessageRequestHeader, int i, boolean z, String str2) {
        final String group = popMessageRequestHeader.getConsumerGroup();
        long startOffset = this.brokerController.getConsumerOffsetManager().queryOffset(group, str, i);

        if (startOffset < 0) {
            // NOTE(review): the init mode is compared against the POLLING_SUC constant; this looks
            // like a decompilation artifact standing in for an init-mode constant - confirm.
            if (POLLING_SUC != popMessageRequestHeader.getInitMode()) {
                // Start at the last message of the queue, clamped to 0 for an empty queue.
                startOffset = Math.max(0L, this.brokerController.getMessageStore().getMaxOffsetInQueue(str, i) - 1);
                if (z) {
                    this.brokerController.getConsumerOffsetManager().commitOffset("getPopOffset", group, str, i, startOffset);
                }
            } else {
                startOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(str, i);
            }
        }

        final long bufferedOffset = this.popBufferMergeService.getLatestOffset(str2);
        return (bufferedOffset >= 0 && bufferedOffset > startOffset) ? bufferedOffset : startOffset;
    }

    /**
     * Tries to park a pop request so it can be answered later by long polling.
     *
     * @param channel                 channel the request arrived on
     * @param remotingCommand         the original request command
     * @param popMessageRequestHeader decoded pop request header
     * @return {@code NOT_POLLING} when the request has no poll time or the long polling service
     *         is stopped; {@code POLLING_FULL} when the global or per-key limit is hit or the
     *         request could not be added; {@code POLLING_TIMEOUT} when the request has already
     *         timed out; {@code POLLING_SUC} when the request was parked
     */
    private int polling(Channel channel, RemotingCommand remotingCommand, PopMessageRequestHeader popMessageRequestHeader) {
        if (popMessageRequestHeader.getPollTime() <= 0 || this.popLongPollingService.isStopped()) {
            return NOT_POLLING;
        }

        // Record that this consumer group is interested in the topic.
        ConcurrentHashMap<String, Byte> groupsOfTopic = this.topicCidMap.get(popMessageRequestHeader.getTopic());
        if (groupsOfTopic == null) {
            ConcurrentHashMap<String, Byte> created = new ConcurrentHashMap<>();
            ConcurrentHashMap<String, Byte> raced = this.topicCidMap.putIfAbsent(popMessageRequestHeader.getTopic(), created);
            groupsOfTopic = raced != null ? raced : created;
        }
        groupsOfTopic.putIfAbsent(popMessageRequestHeader.getConsumerGroup(), Byte.MIN_VALUE);

        // The request is built before the capacity checks, as in the original flow.
        long expireAt = popMessageRequestHeader.getBornTime() + popMessageRequestHeader.getPollTime();
        PopRequest pending = new PopRequest(remotingCommand, channel, expireAt);

        if (this.totalPollingNum.get() >= this.brokerController.getBrokerConfig().getMaxPopPollingSize()) {
            POP_LOGGER.info("polling {}, result POLLING_FULL, total:{}", remotingCommand, Long.valueOf(this.totalPollingNum.get()));
            return POLLING_FULL;
        }

        if (pending.isTimeout()) {
            if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
                POP_LOGGER.info("polling {}, result POLLING_TIMEOUT", remotingCommand);
            }
            return POLLING_TIMEOUT;
        }

        String pollingKey = KeyBuilder.buildPollingKey(popMessageRequestHeader.getTopic(), popMessageRequestHeader.getConsumerGroup(), popMessageRequestHeader.getQueueId());
        ConcurrentSkipListSet waiting = (ConcurrentSkipListSet) this.pollingMap.get(pollingKey);
        if (waiting != null) {
            // Only an already existing set is checked against the per-key limit.
            int queued = waiting.size();
            if (queued > this.brokerController.getBrokerConfig().getPopPollingSize()) {
                POP_LOGGER.info("polling {}, result POLLING_FULL, singleSize:{}", remotingCommand, Integer.valueOf(queued));
                return POLLING_FULL;
            }
        } else {
            ConcurrentSkipListSet fresh = new ConcurrentSkipListSet(PopRequest.COMPARATOR);
            ConcurrentSkipListSet existing = (ConcurrentSkipListSet) this.pollingMap.putIfAbsent(pollingKey, fresh);
            waiting = existing == null ? fresh : existing;
        }

        if (!waiting.add(pending)) {
            POP_LOGGER.info("polling {}, result POLLING_FULL, add fail, {}", pending, waiting);
            return POLLING_FULL;
        }

        this.totalPollingNum.incrementAndGet();
        if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
            POP_LOGGER.info("polling {}, result POLLING_SUC", remotingCommand);
        }
        return POLLING_SUC;
    }

    /**
     * Wraps a pop checkpoint into a broker-internal message addressed to the revive topic.
     *
     * <p>The checkpoint is serialized as JSON for the body, the message is tagged {@code "ck"},
     * its deliver time is set to the checkpoint's revive time minus
     * {@code PopAckConstants.ackTimeInterval}, and a {@code UNIQ_KEY} property generated from
     * the checkpoint is attached.
     *
     * @param popCheckPoint checkpoint to serialize
     * @param i             queue id of the revive topic the message is written to
     * @return the fully populated message
     */
    public final MessageExtBrokerInner buildCkMsg(PopCheckPoint popCheckPoint, int i) {
        final byte[] payload = JSON.toJSONString(popCheckPoint).getBytes(DataConverter.charset);
        final long deliverAt = popCheckPoint.getReviveTime() - PopAckConstants.ackTimeInterval;

        final MessageExtBrokerInner ckMsg = new MessageExtBrokerInner();
        ckMsg.setTopic(this.reviveTopic);
        ckMsg.setQueueId(i);
        ckMsg.setBody(payload);
        ckMsg.setTags("ck");
        ckMsg.setBornTimestamp(System.currentTimeMillis());
        ckMsg.setBornHost(this.brokerController.getStoreHost());
        ckMsg.setStoreHost(this.brokerController.getStoreHost());
        MsgUtil.setMessageDeliverTime(this.brokerController, ckMsg, deliverAt);

        // The properties string is rebuilt last so it reflects every property set above.
        final Map<String, String> props = ckMsg.getProperties();
        props.put("UNIQ_KEY", genCkUniqueId(popCheckPoint));
        ckMsg.setPropertiesString(MessageDecoder.messageProperties2String(props));
        return ckMsg;
    }

    /**
     * Builds a pop checkpoint for a batch of messages just popped from a queue and hands it
     * to the pop buffer merge service.
     *
     * <p>The checkpoint records the start offset {@code j} and, for every popped message, the
     * difference between its queue offset and that start offset. If the buffer does not accept
     * the checkpoint via {@code addCk}, it is passed to {@code addCkJustOffset} instead.
     *
     * @param popMessageRequestHeader the pop request (invisible time and consumer group are read)
     * @param str                     topic the messages were popped from
     * @param i                       forwarded to the pop buffer calls (presumably the revive
     *                                queue id; confirm against the caller)
     * @param i2                      queue id the messages were popped from
     * @param j                       offset the pop started at
     * @param getMessageResult        result of the pop, providing message count, queue offsets
     *                                and the next begin offset
     * @param j2                      pop time
     */
    private void appendCheckPoint(PopMessageRequestHeader popMessageRequestHeader, String str, int i, int i2, long j, GetMessageResult getMessageResult, long j2) {
        final PopCheckPoint checkPoint = new PopCheckPoint();
        checkPoint.setBitMap(0);
        checkPoint.setNum((byte) getMessageResult.getMessageMapedList().size());
        checkPoint.setPopTime(j2);
        checkPoint.setInvisibleTime(popMessageRequestHeader.getInvisibleTime());
        // Store the start offset: the diffs added below are relative to it.
        checkPoint.setStartOffset(j);
        checkPoint.setCId(popMessageRequestHeader.getConsumerGroup());
        checkPoint.setTopic(str);
        checkPoint.setQueueId((byte) i2);
        for (Long queueOffset : getMessageResult.getMessageQueueOffset()) {
            checkPoint.addDiff((int) (queueOffset.longValue() - j));
        }

        final long nextBeginOffset = getMessageResult.getNextBeginOffset();
        if (!this.popBufferMergeService.addCk(checkPoint, i, -1L, nextBeginOffset)) {
            this.popBufferMergeService.addCkJustOffset(checkPoint, i, -1L, getMessageResult.getNextBeginOffset());
        }
    }

    /**
     * Copies every message buffer of a store result into one heap array and releases the result.
     *
     * <p>While copying, a long is read at absolute position 56 of each buffer; the value from
     * the last buffer (0 when there are none) is subtracted from the store's current time and
     * recorded via {@code recordDiskFallBehindTime}. The result is released in all cases once
     * copying has started, including when an exception is thrown.
     *
     * @param getMessageResult store result whose buffers are copied and then released
     * @param str              first key passed to the fall-behind statistic (the caller passes
     *                         the consumer group)
     * @param str2             second key passed to the fall-behind statistic (the caller passes
     *                         the topic)
     * @param i                queue id passed to the fall-behind statistic
     * @return a heap array of {@code getBufferTotalSize()} bytes holding the copied messages
     */
    private byte[] readGetMessageResult(GetMessageResult getMessageResult, String str, String str2, int i) {
        final ByteBuffer heapCopy = ByteBuffer.allocate(getMessageResult.getBufferTotalSize());
        try {
            long lastTimestamp = 0L;
            final Iterator<ByteBuffer> buffers = getMessageResult.getMessageBufferList().iterator();
            while (buffers.hasNext()) {
                final ByteBuffer source = buffers.next();
                heapCopy.put(source);
                // NOTE(review): 56 is presumably the position of the store timestamp inside a
                // stored message - confirm against the message layout constants.
                lastTimestamp = source.getLong(56);
            }
            final long fallBehind = this.brokerController.getMessageStore().now() - lastTimestamp;
            this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(str, str2, i, fallBehind);
            return heapCopy.array();
        } finally {
            getMessageResult.release();
        }
    }

    static {
        // Dedicated logger for pop consumption, looked up by its fixed name.
        POP_LOGGER = InternalLoggerFactory.getLogger("RocketmqPop");
        // Caches whether assertions are disabled for this class (true when -ea is not in effect).
        $assertionsDisabled = !PopMessageProcessor.class.desiredAssertionStatus();
    }
}
