package org.apache.rocketmq.broker.processor;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.opentelemetry.api.common.Attributes;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.BrokerMetricsConstant;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.header.PeekMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.SelectMappedBufferResult;

/* loaded from: input_file:org/apache/rocketmq/broker/processor/PeekMessageProcessor.class */
public class PeekMessageProcessor implements NettyRequestProcessor {
    private static final Logger LOG;
    private final BrokerController brokerController;
    private Random random = new Random(System.currentTimeMillis());
    static final /* synthetic */ boolean $assertionsDisabled;

    public PeekMessageProcessor(BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    public RemotingCommand processRequest(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws RemotingCommandException {
        return processRequest(channelHandlerContext.channel(), remotingCommand, true);
    }

    public boolean rejectRequest() {
        return false;
    }

    private RemotingCommand processRequest(Channel channel, RemotingCommand remotingCommand, boolean z) throws RemotingCommandException {
        TopicConfig selectTopicConfig;
        TopicConfig selectTopicConfig2;
        long now = this.brokerController.getMessageStore().now();
        RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
        PopMessageResponseHeader readCustomHeader = createResponseCommand.readCustomHeader();
        PeekMessageRequestHeader peekMessageRequestHeader = (PeekMessageRequestHeader) remotingCommand.decodeCommandCustomHeader(PeekMessageRequestHeader.class);
        createResponseCommand.setOpaque(remotingCommand.getOpaque());
        if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
            createResponseCommand.setCode(16);
            createResponseCommand.setRemark(String.format("the broker[%s] peeking message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
            return createResponseCommand;
        }
        TopicConfig selectTopicConfig3 = this.brokerController.getTopicConfigManager().selectTopicConfig(peekMessageRequestHeader.getTopic());
        if (null == selectTopicConfig3) {
            LOG.error("The topic {} not exist, consumer: {} ", peekMessageRequestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
            createResponseCommand.setCode(17);
            createResponseCommand.setRemark(String.format("topic[%s] not exist, apply first please! %s", peekMessageRequestHeader.getTopic(), FAQUrl.suggestTodo("https://rocketmq.apache.org/docs/bestPractice/06FAQ")));
            return createResponseCommand;
        }
        if (!PermName.isReadable(selectTopicConfig3.getPerm())) {
            createResponseCommand.setCode(16);
            createResponseCommand.setRemark("the topic[" + peekMessageRequestHeader.getTopic() + "] peeking message is forbidden");
            return createResponseCommand;
        }
        if (peekMessageRequestHeader.getQueueId().intValue() >= selectTopicConfig3.getReadQueueNums()) {
            String format = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", peekMessageRequestHeader.getQueueId(), peekMessageRequestHeader.getTopic(), Integer.valueOf(selectTopicConfig3.getReadQueueNums()), channel.remoteAddress());
            LOG.warn(format);
            createResponseCommand.setCode(1);
            createResponseCommand.setRemark(format);
            return createResponseCommand;
        }
        SubscriptionGroupConfig findSubscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(peekMessageRequestHeader.getConsumerGroup());
        if (null == findSubscriptionGroupConfig) {
            createResponseCommand.setCode(26);
            createResponseCommand.setRemark(String.format("subscription group [%s] does not exist, %s", peekMessageRequestHeader.getConsumerGroup(), FAQUrl.suggestTodo("https://rocketmq.apache.org/docs/bestPractice/06FAQ")));
            return createResponseCommand;
        }
        if (!findSubscriptionGroupConfig.isConsumeEnable()) {
            createResponseCommand.setCode(16);
            createResponseCommand.setRemark("subscription group no permission, " + peekMessageRequestHeader.getConsumerGroup());
            return createResponseCommand;
        }
        int nextInt = this.random.nextInt(100);
        int reviveQueueNum = nextInt % this.brokerController.getBrokerConfig().getReviveQueueNum();
        GetMessageResult getMessageResult = new GetMessageResult(peekMessageRequestHeader.getMaxMsgNums());
        boolean z2 = nextInt % 5 == 0;
        long currentTimeMillis = System.currentTimeMillis();
        long j = 0;
        BrokerConfig brokerConfig = this.brokerController.getBrokerConfig();
        if (z2 && (selectTopicConfig2 = this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(peekMessageRequestHeader.getTopic(), peekMessageRequestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()))) != null) {
            for (int i = 0; i < selectTopicConfig2.getReadQueueNums(); i++) {
                j = peekMsgFromQueue(true, getMessageResult, peekMessageRequestHeader, (nextInt + i) % selectTopicConfig2.getReadQueueNums(), j, reviveQueueNum, channel, currentTimeMillis);
            }
        }
        if (peekMessageRequestHeader.getQueueId().intValue() < 0) {
            for (int i2 = 0; i2 < selectTopicConfig3.getReadQueueNums(); i2++) {
                j = peekMsgFromQueue(false, getMessageResult, peekMessageRequestHeader, (nextInt + i2) % selectTopicConfig3.getReadQueueNums(), j, reviveQueueNum, channel, currentTimeMillis);
            }
        } else {
            j = peekMsgFromQueue(false, getMessageResult, peekMessageRequestHeader, peekMessageRequestHeader.getQueueId().intValue(), j, reviveQueueNum, channel, currentTimeMillis);
        }
        if (!z2 && getMessageResult.getMessageMapedList().size() < peekMessageRequestHeader.getMaxMsgNums() && (selectTopicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(peekMessageRequestHeader.getTopic(), peekMessageRequestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()))) != null) {
            for (int i3 = 0; i3 < selectTopicConfig.getReadQueueNums(); i3++) {
                j = peekMsgFromQueue(true, getMessageResult, peekMessageRequestHeader, (nextInt + i3) % selectTopicConfig.getReadQueueNums(), j, reviveQueueNum, channel, currentTimeMillis);
            }
        }
        if (getMessageResult.getMessageBufferList().isEmpty()) {
            createResponseCommand.setCode(19);
            getMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
        } else {
            createResponseCommand.setCode(0);
            getMessageResult.setStatus(GetMessageStatus.FOUND);
        }
        readCustomHeader.setRestNum(j);
        createResponseCommand.setRemark(getMessageResult.getStatus().name());
        switch (createResponseCommand.getCode()) {
            case 0:
                this.brokerController.getBrokerStatsManager().incGroupGetNums(peekMessageRequestHeader.getConsumerGroup(), peekMessageRequestHeader.getTopic(), getMessageResult.getMessageCount());
                this.brokerController.getBrokerStatsManager().incGroupGetSize(peekMessageRequestHeader.getConsumerGroup(), peekMessageRequestHeader.getTopic(), getMessageResult.getBufferTotalSize());
                this.brokerController.getBrokerStatsManager().incBrokerGetNums(peekMessageRequestHeader.getTopic(), getMessageResult.getMessageCount());
                if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                    byte[] readGetMessageResult = readGetMessageResult(getMessageResult, peekMessageRequestHeader.getConsumerGroup(), peekMessageRequestHeader.getTopic(), peekMessageRequestHeader.getQueueId().intValue());
                    this.brokerController.getBrokerStatsManager().incGroupGetLatency(peekMessageRequestHeader.getConsumerGroup(), peekMessageRequestHeader.getTopic(), peekMessageRequestHeader.getQueueId().intValue(), (int) (this.brokerController.getMessageStore().now() - now));
                    createResponseCommand.setBody(readGetMessageResult);
                    break;
                } else {
                    try {
                        channel.writeAndFlush(new ManyMessageTransfer(createResponseCommand.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult)).addListener(channelFuture -> {
                            getMessageResult.release();
                            RemotingMetricsManager.rpcLatency.record(remotingCommand.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), RemotingMetricsManager.newAttributesBuilder().put("request_code", RemotingHelper.getRequestCodeDesc(remotingCommand.getCode())).put("response_code", RemotingHelper.getResponseCodeDesc(createResponseCommand.getCode())).put("result", RemotingMetricsManager.getWriteAndFlushResult(channelFuture)).build());
                            if (channelFuture.isSuccess()) {
                                return;
                            }
                            LOG.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), channelFuture.cause());
                        });
                    } catch (Throwable th) {
                        LOG.error("Error occurred when transferring messages from page cache", th);
                        getMessageResult.release();
                    }
                    createResponseCommand = null;
                    break;
                }
            default:
                if (!$assertionsDisabled) {
                    throw new AssertionError();
                }
                break;
        }
        return createResponseCommand;
    }

    private long peekMsgFromQueue(boolean z, GetMessageResult getMessageResult, PeekMessageRequestHeader peekMessageRequestHeader, int i, long j, int i2, Channel channel, long j2) {
        String buildPopRetryTopic = z ? KeyBuilder.buildPopRetryTopic(peekMessageRequestHeader.getTopic(), peekMessageRequestHeader.getConsumerGroup(), this.brokerController.getBrokerConfig().isEnableRetryTopicV2()) : peekMessageRequestHeader.getTopic();
        long popOffset = getPopOffset(buildPopRetryTopic, peekMessageRequestHeader.getConsumerGroup(), i);
        long maxOffsetInQueue = (this.brokerController.getMessageStore().getMaxOffsetInQueue(buildPopRetryTopic, i) - popOffset) + j;
        if (getMessageResult.getMessageMapedList().size() >= peekMessageRequestHeader.getMaxMsgNums()) {
            return maxOffsetInQueue;
        }
        GetMessageResult message = this.brokerController.getMessageStore().getMessage(peekMessageRequestHeader.getConsumerGroup(), buildPopRetryTopic, i, popOffset, peekMessageRequestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), (MessageFilter) null);
        if (GetMessageStatus.OFFSET_TOO_SMALL.equals(message.getStatus()) || GetMessageStatus.OFFSET_OVERFLOW_BADLY.equals(message.getStatus())) {
            message = this.brokerController.getMessageStore().getMessage(peekMessageRequestHeader.getConsumerGroup(), buildPopRetryTopic, i, message.getNextBeginOffset(), peekMessageRequestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), (MessageFilter) null);
        }
        if (message != null) {
            if (!message.getMessageMapedList().isEmpty() && !z) {
                Attributes build = BrokerMetricsManager.newAttributesBuilder().put(BrokerMetricsConstant.LABEL_TOPIC, peekMessageRequestHeader.getTopic()).put(BrokerMetricsConstant.LABEL_CONSUMER_GROUP, peekMessageRequestHeader.getConsumerGroup()).put(BrokerMetricsConstant.LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(peekMessageRequestHeader.getTopic()) || MixAll.isSysConsumerGroup(peekMessageRequestHeader.getConsumerGroup())).build();
                BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), build);
                BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(), build);
            }
            Iterator it = message.getMessageMapedList().iterator();
            while (it.hasNext()) {
                getMessageResult.addMessage((SelectMappedBufferResult) it.next());
            }
        }
        return maxOffsetInQueue;
    }

    private long getPopOffset(String str, String str2, int i) {
        long queryOffset = this.brokerController.getConsumerOffsetManager().queryOffset(str2, str, i);
        if (queryOffset < 0) {
            queryOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(str, i);
        }
        long latestOffset = this.brokerController.getPopMessageProcessor().getPopBufferMergeService().getLatestOffset(str, str2, i);
        if (latestOffset >= 0 && latestOffset > queryOffset) {
            return latestOffset;
        }
        return queryOffset;
    }

    private byte[] readGetMessageResult(GetMessageResult getMessageResult, String str, String str2, int i) {
        ByteBuffer allocate = ByteBuffer.allocate(getMessageResult.getBufferTotalSize());
        long j = 0;
        try {
            for (ByteBuffer byteBuffer : getMessageResult.getMessageBufferList()) {
                allocate.put(byteBuffer);
                j = byteBuffer.getLong(56);
            }
            this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(str, str2, i, this.brokerController.getMessageStore().now() - j);
            return allocate.array();
        } finally {
            getMessageResult.release();
        }
    }

    static {
        $assertionsDisabled = !PeekMessageProcessor.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger("RocketmqBroker");
    }
}
