/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.broker.processor;

import com.google.common.collect.ImmutableList;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.GenericFutureListener;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filter.ExpressionForRetryMessageFilter;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.longpolling.PullRequest;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.stats.BrokerStatsManager;

public class PullMessageProcessor
extends AsyncNettyRequestProcessor
implements NettyRequestProcessor {
    private static final InternalLogger log = InternalLoggerFactory.getLogger((String)"RocketmqBroker");
    private final BrokerController brokerController;
    private List<ConsumeMessageHook> consumeMessageHookList;

    public PullMessageProcessor(BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        return this.processRequest(ctx.channel(), request, true);
    }

    public boolean rejectRequest() {
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * WARNING - void declaration
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) throws RemotingCommandException {
        int queueId;
        String topic;
        boolean hasCommitOffsetFlag;
        PullMessageRequestHeader requestHeader;
        RemotingCommand response;
        block76: {
            block83: {
                void var18_22;
                GetMessageResult getMessageResult;
                LogicalQueueRouteData queueRouteData;
                LogicalQueuesInfoInBroker logicalQueuesInfo;
                int maxMsgNums;
                long offset;
                SubscriptionData subscriptionData;
                long suspendTimeoutMillisLong;
                boolean hasSuspendFlag;
                SubscriptionGroupConfig subscriptionGroupConfig;
                PullMessageResponseHeader responseHeader;
                block77: {
                    int responseErrorCode;
                    block80: {
                        block78: {
                            block82: {
                                block81: {
                                    block79: {
                                        ConsumerFilterData consumerFilterData;
                                        block74: {
                                            response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
                                            responseHeader = (PullMessageResponseHeader)response.readCustomHeader();
                                            requestHeader = (PullMessageRequestHeader)request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
                                            response.setOpaque(request.getOpaque());
                                            log.debug("receive PullMessage request command, {}", (Object)request);
                                            if (!PermName.isReadable((int)this.brokerController.getBrokerConfig().getBrokerPermission())) {
                                                response.setCode(16);
                                                response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
                                                return response;
                                            }
                                            subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
                                            if (null == subscriptionGroupConfig) {
                                                response.setCode(26);
                                                response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo((String)"http://rocketmq.apache.org/docs/faq/")));
                                                return response;
                                            }
                                            if (!subscriptionGroupConfig.isConsumeEnable()) {
                                                response.setCode(16);
                                                response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
                                                return response;
                                            }
                                            hasSuspendFlag = PullSysFlag.hasSuspendFlag((int)requestHeader.getSysFlag());
                                            hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag((int)requestHeader.getSysFlag());
                                            boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag((int)requestHeader.getSysFlag());
                                            suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0L;
                                            topic = requestHeader.getTopic();
                                            TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
                                            if (null == topicConfig) {
                                                log.error("the topic {} not exist, consumer: {}", (Object)topic, (Object)RemotingHelper.parseChannelRemoteAddr((Channel)channel));
                                                response.setCode(17);
                                                response.setRemark(String.format("topic[%s] not exist, apply first please! %s", topic, FAQUrl.suggestTodo((String)"http://rocketmq.apache.org/docs/faq/")));
                                                return response;
                                            }
                                            if (!PermName.isReadable((int)topicConfig.getPerm())) {
                                                response.setCode(16);
                                                response.setRemark("the topic[" + topic + "] pulling message is forbidden");
                                                return response;
                                            }
                                            queueId = requestHeader.getQueueId();
                                            if (queueId < 0 || queueId >= topicConfig.getReadQueueNums()) {
                                                String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", queueId, topic, topicConfig.getReadQueueNums(), channel.remoteAddress());
                                                log.warn(errorInfo);
                                                response.setCode(1);
                                                response.setRemark(errorInfo);
                                                return response;
                                            }
                                            subscriptionData = null;
                                            consumerFilterData = null;
                                            if (hasSubscriptionFlag) {
                                                try {
                                                    subscriptionData = FilterAPI.build((String)topic, (String)requestHeader.getSubscription(), (String)requestHeader.getExpressionType());
                                                    if (!ExpressionType.isTagType((String)subscriptionData.getExpressionType())) {
                                                        consumerFilterData = ConsumerFilterManager.build(topic, requestHeader.getConsumerGroup(), requestHeader.getSubscription(), requestHeader.getExpressionType(), requestHeader.getSubVersion());
                                                        assert (consumerFilterData != null);
                                                    }
                                                    break block74;
                                                }
                                                catch (Exception exception) {
                                                    log.warn("Parse the consumer's subscription[{}] failed, group: {}", (Object)requestHeader.getSubscription(), (Object)requestHeader.getConsumerGroup());
                                                    response.setCode(23);
                                                    response.setRemark("parse the consumer's subscription failed");
                                                    return response;
                                                }
                                            }
                                            ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
                                            if (null == consumerGroupInfo) {
                                                log.warn("the consumer's group info not exist, group: {}", (Object)requestHeader.getConsumerGroup());
                                                response.setCode(24);
                                                response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo((String)"http://rocketmq.apache.org/docs/faq/"));
                                                return response;
                                            }
                                            if (!subscriptionGroupConfig.isConsumeBroadcastEnable() && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
                                                response.setCode(16);
                                                response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
                                                return response;
                                            }
                                            subscriptionData = consumerGroupInfo.findSubscriptionData(topic);
                                            if (null == subscriptionData) {
                                                log.warn("the consumer's subscription not exist, group: {}, topic:{}", (Object)requestHeader.getConsumerGroup(), (Object)topic);
                                                response.setCode(24);
                                                response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo((String)"http://rocketmq.apache.org/docs/faq/"));
                                                return response;
                                            }
                                            if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
                                                log.warn("The broker's subscription is not latest, group: {} {}", (Object)requestHeader.getConsumerGroup(), (Object)subscriptionData.getSubString());
                                                response.setCode(25);
                                                response.setRemark("the consumer's subscription not latest");
                                                return response;
                                            }
                                            if (!ExpressionType.isTagType((String)subscriptionData.getExpressionType())) {
                                                consumerFilterData = this.brokerController.getConsumerFilterManager().get(topic, requestHeader.getConsumerGroup());
                                                if (consumerFilterData == null) {
                                                    response.setCode(27);
                                                    response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
                                                    return response;
                                                }
                                                if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
                                                    log.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}", new Object[]{requestHeader.getConsumerGroup(), topic, consumerFilterData.getClientVersion(), requestHeader.getSubVersion()});
                                                    response.setCode(28);
                                                    response.setRemark("the consumer's consumer filter data not latest");
                                                    return response;
                                                }
                                            }
                                        }
                                        if (!ExpressionType.isTagType((String)subscriptionData.getExpressionType()) && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
                                            response.setCode(1);
                                            response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
                                            return response;
                                        }
                                        if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
                                            ExpressionForRetryMessageFilter expressionForRetryMessageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager());
                                        } else {
                                            ExpressionMessageFilter expressionMessageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager());
                                        }
                                        offset = requestHeader.getQueueOffset();
                                        maxMsgNums = requestHeader.getMaxMsgNums();
                                        logicalQueuesInfo = this.brokerController.getTopicConfigManager().selectLogicalQueuesInfo(topic);
                                        queueRouteData = null;
                                        if (logicalQueuesInfo == null) break block77;
                                        responseErrorCode = 0;
                                        queueRouteData = logicalQueuesInfo.queryQueueRouteDataByQueueId(queueId, offset);
                                        if (queueRouteData == null) break block78;
                                        if (!queueRouteData.isWriteOnly()) break block79;
                                        responseErrorCode = 19;
                                        response.setRemark("logical queue write only");
                                        break block80;
                                    }
                                    if (!queueRouteData.isExpired()) break block81;
                                    responseErrorCode = 20;
                                    response.setRemark("logical queue expired");
                                    this.prepareRedirectResponse(response, logicalQueuesInfo, queueRouteData);
                                    break block80;
                                }
                                if (!MessageQueueRouteState.ReadOnly.equals((Object)queueRouteData.getState()) || queueRouteData.getOffsetMax() < 0L) break block80;
                                if (offset < queueRouteData.getOffsetMax()) break block82;
                                responseErrorCode = 20;
                                response.setRemark("queue offset exceed offsetMax");
                                this.prepareRedirectResponse(response, logicalQueuesInfo, queueRouteData);
                                break block80;
                            }
                            if (offset + (long)maxMsgNums <= queueRouteData.getOffsetMax()) break block80;
                            if (queueRouteData.getOffsetMax() - 1L <= this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) && this.brokerController.getMessageStore().getCommitLogOffsetInQueue(topic, queueId, queueRouteData.getOffsetMax() - 1L) < this.brokerController.getMessageStore().getMinPhyOffset()) {
                                responseErrorCode = 20;
                                response.setRemark("queue offset removed");
                                this.prepareRedirectResponse(response, logicalQueuesInfo, queueRouteData);
                                break block80;
                            } else {
                                maxMsgNums = (int)(queueRouteData.getOffsetMax() - offset);
                                if (maxMsgNums <= 0) {
                                    responseErrorCode = 20;
                                    response.setRemark("queue offset out of range");
                                    this.prepareRedirectResponse(response, logicalQueuesInfo, queueRouteData);
                                }
                            }
                            break block80;
                        }
                        responseErrorCode = 20;
                        response.setRemark("no suitable queue");
                        response.addExtField("REDIRECT", "1");
                        response.setBody(null);
                        queueRouteData = logicalQueuesInfo.queryQueueRouteDataByQueueId(queueId, 0L);
                    }
                    if (responseErrorCode != 0) {
                        response.setCode(responseErrorCode);
                        responseHeader.setMinOffset(Long.valueOf(offset));
                        responseHeader.setMaxOffset(Long.valueOf(queueRouteData != null ? queueRouteData.getOffsetMax() : offset));
                        responseHeader.setNextBeginOffset(Long.valueOf(queueRouteData != null ? queueRouteData.getOffsetMax() : offset));
                        responseHeader.setSuggestWhichBrokerId(Long.valueOf(0L));
                        return response;
                    }
                }
                if ((getMessageResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), topic, queueId, offset, maxMsgNums, (MessageFilter)var18_22)) == null) break block83;
                response.setRemark(getMessageResult.getStatus().name());
                long nextBeginOffset = getMessageResult.getNextBeginOffset();
                if (queueRouteData != null && queueRouteData.getOffsetMax() >= 0L && nextBeginOffset > queueRouteData.getOffsetMax()) {
                    nextBeginOffset = queueRouteData.getOffsetMax();
                }
                responseHeader.setNextBeginOffset(Long.valueOf(nextBeginOffset));
                responseHeader.setMinOffset(Long.valueOf(getMessageResult.getMinOffset()));
                responseHeader.setMaxOffset(Long.valueOf(getMessageResult.getMaxOffset()));
                if (getMessageResult.isSuggestPullingFromSlave()) {
                    responseHeader.setSuggestWhichBrokerId(Long.valueOf(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()));
                } else {
                    responseHeader.setSuggestWhichBrokerId(Long.valueOf(0L));
                }
                switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
                    case ASYNC_MASTER: 
                    case SYNC_MASTER: {
                        break;
                    }
                    case SLAVE: {
                        if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) break;
                        response.setCode(20);
                        responseHeader.setSuggestWhichBrokerId(Long.valueOf(0L));
                        break;
                    }
                }
                if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                    if (getMessageResult.isSuggestPullingFromSlave()) {
                        responseHeader.setSuggestWhichBrokerId(Long.valueOf(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()));
                    } else {
                        responseHeader.setSuggestWhichBrokerId(Long.valueOf(subscriptionGroupConfig.getBrokerId()));
                    }
                } else {
                    responseHeader.setSuggestWhichBrokerId(Long.valueOf(0L));
                }
                switch (getMessageResult.getStatus()) {
                    case FOUND: {
                        response.setCode(0);
                        break;
                    }
                    case MESSAGE_WAS_REMOVING: {
                        response.setCode(20);
                        break;
                    }
                    case NO_MATCHED_LOGIC_QUEUE: 
                    case NO_MESSAGE_IN_QUEUE: {
                        if (0L != requestHeader.getQueueOffset()) {
                            response.setCode(21);
                            log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", new Object[]{requestHeader.getQueueOffset(), nextBeginOffset, topic, queueId, requestHeader.getConsumerGroup()});
                            break;
                        }
                        response.setCode(19);
                        break;
                    }
                    case NO_MATCHED_MESSAGE: {
                        response.setCode(20);
                        break;
                    }
                    case OFFSET_FOUND_NULL: {
                        response.setCode(19);
                        break;
                    }
                    case OFFSET_OVERFLOW_BADLY: {
                        response.setCode(21);
                        log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}", new Object[]{requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress()});
                        break;
                    }
                    case OFFSET_OVERFLOW_ONE: {
                        response.setCode(19);
                        break;
                    }
                    case OFFSET_TOO_SMALL: {
                        response.setCode(21);
                        log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}", new Object[]{requestHeader.getConsumerGroup(), topic, requestHeader.getQueueOffset(), getMessageResult.getMinOffset(), channel.remoteAddress()});
                        break;
                    }
                    default: {
                        assert (false);
                        break;
                    }
                }
                if (this.hasConsumeMessageHook()) {
                    ConsumeMessageContext context = new ConsumeMessageContext();
                    context.setConsumerGroup(requestHeader.getConsumerGroup());
                    context.setTopic(topic);
                    context.setQueueId(queueId);
                    String owner = (String)request.getExtFields().get("Owner");
                    switch (response.getCode()) {
                        case 0: {
                            int commercialBaseCount = this.brokerController.getBrokerConfig().getCommercialBaseCount();
                            int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;
                            context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
                            context.setCommercialRcvTimes(incValue);
                            context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
                            context.setCommercialOwner(owner);
                            break;
                        }
                        case 19: {
                            if (brokerAllowSuspend) break;
                            context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                            context.setCommercialRcvTimes(1);
                            context.setCommercialOwner(owner);
                            break;
                        }
                        case 20: 
                        case 21: {
                            context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                            context.setCommercialRcvTimes(1);
                            context.setCommercialOwner(owner);
                            break;
                        }
                        default: {
                            assert (false);
                            break;
                        }
                    }
                    this.executeConsumeMessageHookBefore(context);
                }
                switch (response.getCode()) {
                    case 0: {
                        this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(), getMessageResult.getMessageCount());
                        this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(), getMessageResult.getBufferTotalSize());
                        this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
                        if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                            long beginTimeMills = this.brokerController.getMessageStore().now();
                            byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
                            this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId().intValue(), (int)(this.brokerController.getMessageStore().now() - beginTimeMills));
                            response.setBody(r);
                            break;
                        }
                        try {
                            ManyMessageTransfer fileRegion = new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
                            channel.writeAndFlush((Object)fileRegion).addListener((GenericFutureListener)new ChannelFutureListener(){

                                public void operationComplete(ChannelFuture future) throws Exception {
                                    getMessageResult.release();
                                    if (!future.isSuccess()) {
                                        log.error("transfer many message by pagecache failed, {}", (Object)channel.remoteAddress(), (Object)future.cause());
                                    }
                                }
                            });
                        }
                        catch (Throwable e) {
                            log.error("transfer many message by pagecache exception", e);
                            getMessageResult.release();
                        }
                        response = null;
                        break;
                    }
                    case 19: {
                        if (brokerAllowSuspend && hasSuspendFlag) {
                            long pollingTimeMills = suspendTimeoutMillisLong;
                            if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                                pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                            }
                            PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), offset, subscriptionData, (MessageFilter)var18_22);
                            this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                            response = null;
                            break;
                        }
                        if (queueRouteData != null) {
                            logicalQueuesInfo.readLock().lock();
                            try {
                                List queueRouteDataList = (List)logicalQueuesInfo.get(queueRouteData.getLogicalQueueIndex());
                                MessageQueue latestMessageQueue = ((LogicalQueueRouteData)queueRouteDataList.get(queueRouteDataList.size() - 1)).getMessageQueue();
                                if (!latestMessageQueue.getBrokerName().equals(this.brokerController.getBrokerConfig().getBrokerName()) || latestMessageQueue.getQueueId() != queueId) {
                                    this.prepareRedirectResponse(response, logicalQueuesInfo, queueRouteData);
                                }
                                break block76;
                            }
                            finally {
                                logicalQueuesInfo.readLock().unlock();
                            }
                        }
                    }
                    case 20: {
                        break;
                    }
                    case 21: {
                        if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
                            MessageQueue mq = new MessageQueue();
                            mq.setTopic(requestHeader.getTopic());
                            mq.setQueueId(requestHeader.getQueueId().intValue());
                            mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
                            OffsetMovedEvent event = new OffsetMovedEvent();
                            event.setConsumerGroup(requestHeader.getConsumerGroup());
                            event.setMessageQueue(mq);
                            event.setOffsetRequest(requestHeader.getQueueOffset().longValue());
                            event.setOffsetNew(nextBeginOffset);
                            this.generateOffsetMovedEvent(event);
                            log.warn("PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}", new Object[]{requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(), responseHeader.getSuggestWhichBrokerId()});
                            break;
                        }
                        responseHeader.setSuggestWhichBrokerId(Long.valueOf(subscriptionGroupConfig.getBrokerId()));
                        response.setCode(20);
                        log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}", new Object[]{requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(), responseHeader.getSuggestWhichBrokerId()});
                        break;
                    }
                    default: {
                        assert (false);
                        {
                            break;
                        }
                    }
                }
                break block76;
            }
            response.setCode(1);
            response.setRemark("store getMessage return null");
        }
        boolean storeOffsetEnable = brokerAllowSuspend;
        storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
        boolean bl = storeOffsetEnable = storeOffsetEnable && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
        if (storeOffsetEnable) {
            this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr((Channel)channel), requestHeader.getConsumerGroup(), topic, queueId, requestHeader.getCommitOffset());
        }
        return response;
    }

    private void prepareRedirectResponse(RemotingCommand response, LogicalQueuesInfoInBroker logicalQueuesInfo, LogicalQueueRouteData queueRouteData) {
        LogicalQueueRouteData nextReadableLogicalQueueRouteData = logicalQueuesInfo.nextAvailableLogicalRouteData(queueRouteData, LogicalQueueRouteData::isReadable);
        if (nextReadableLogicalQueueRouteData != null) {
            response.addExtField("REDIRECT", "1");
            response.setBody(RemotingSerializable.encode((Object)ImmutableList.of((Object)queueRouteData, (Object)nextReadableLogicalQueueRouteData)));
        }
    }

    public boolean hasConsumeMessageHook() {
        return this.consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
    }

    public void executeConsumeMessageHookBefore(ConsumeMessageContext context) {
        if (this.hasConsumeMessageHook()) {
            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
                try {
                    hook.consumeMessageBefore(context);
                }
                catch (Throwable throwable) {}
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private byte[] readGetMessageResult(GetMessageResult getMessageResult, String group, String topic, int queueId) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(getMessageResult.getBufferTotalSize());
        long storeTimestamp = 0L;
        try {
            List messageBufferList = getMessageResult.getMessageBufferList();
            for (ByteBuffer bb : messageBufferList) {
                byteBuffer.put(bb);
                int sysFlag = bb.getInt(36);
                int bornhostLength = (sysFlag & 0x10) == 0 ? 8 : 20;
                int msgStoreTimePos = 48 + bornhostLength;
                storeTimestamp = bb.getLong(msgStoreTimePos);
            }
        }
        finally {
            getMessageResult.release();
        }
        this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId, this.brokerController.getMessageStore().now() - storeTimestamp);
        return byteBuffer.array();
    }

    private void generateOffsetMovedEvent(OffsetMovedEvent event) {
        try {
            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
            msgInner.setTopic("OFFSET_MOVED_EVENT");
            msgInner.setTags(event.getConsumerGroup());
            msgInner.setDelayTimeLevel(0);
            msgInner.setKeys(event.getConsumerGroup());
            msgInner.setBody(event.encode());
            msgInner.setFlag(0);
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String((Map)msgInner.getProperties()));
            msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode((TopicFilterType)TopicFilterType.SINGLE_TAG, (String)msgInner.getTags()));
            msgInner.setQueueId(0);
            msgInner.setSysFlag(0);
            msgInner.setBornTimestamp(System.currentTimeMillis());
            msgInner.setBornHost(RemotingUtil.string2SocketAddress((String)this.brokerController.getBrokerAddr()));
            msgInner.setStoreHost(msgInner.getBornHost());
            msgInner.setReconsumeTimes(0);
            PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        }
        catch (Exception e) {
            log.warn(String.format("generateOffsetMovedEvent Exception, %s", event.toString()), (Throwable)e);
        }
    }

    public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException {
        Runnable run = new Runnable(){

            @Override
            public void run() {
                try {
                    final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
                    if (response != null) {
                        response.setOpaque(request.getOpaque());
                        response.markResponseType();
                        try {
                            channel.writeAndFlush((Object)response).addListener((GenericFutureListener)new ChannelFutureListener(){

                                public void operationComplete(ChannelFuture future) throws Exception {
                                    if (!future.isSuccess()) {
                                        log.error("processRequestWrapper response to {} failed", (Object)future.channel().remoteAddress(), (Object)future.cause());
                                        log.error(request.toString());
                                        log.error(response.toString());
                                    }
                                }
                            });
                        }
                        catch (Throwable e) {
                            log.error("processRequestWrapper process request over, but response failed", e);
                            log.error(request.toString());
                            log.error(response.toString());
                        }
                    }
                }
                catch (RemotingCommandException e1) {
                    log.error("excuteRequestWhenWakeup run", (Throwable)e1);
                }
            }
        };
        this.brokerController.getPullMessageExecutor().submit((Runnable)new RequestTask(run, channel, request));
    }

    public void registerConsumeMessageHook(List<ConsumeMessageHook> consumeMessageHookList) {
        this.consumeMessageHookList = consumeMessageHookList;
    }
}

