package org.apache.rocketmq.broker.processor;

import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.BitSet;
import java.util.Iterator;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.body.BatchAck;
import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody;
import org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.BatchAckMsg;

/* loaded from: input_file:org/apache/rocketmq/broker/processor/AckMessageProcessor.class */
public class AckMessageProcessor implements NettyRequestProcessor {
    private static final Logger POP_LOGGER = LoggerFactory.getLogger("RocketmqPop");
    private final BrokerController brokerController;
    private final String reviveTopic;
    private final PopReviveService[] popReviveServices;

    public AckMessageProcessor(BrokerController brokerController) {
        this.brokerController = brokerController;
        this.reviveTopic = PopAckConstants.buildClusterReviveTopic(this.brokerController.getBrokerConfig().getBrokerClusterName());
        this.popReviveServices = new PopReviveService[this.brokerController.getBrokerConfig().getReviveQueueNum()];
        for (int i = 0; i < this.brokerController.getBrokerConfig().getReviveQueueNum(); i++) {
            this.popReviveServices[i] = new PopReviveService(brokerController, this.reviveTopic, i);
            this.popReviveServices[i].setShouldRunPopRevive(brokerController.getBrokerConfig().getBrokerId() == 0);
        }
    }

    public PopReviveService[] getPopReviveServices() {
        return this.popReviveServices;
    }

    public void startPopReviveService() {
        for (PopReviveService popReviveService : this.popReviveServices) {
            popReviveService.start();
        }
    }

    public void shutdownPopReviveService() {
        for (PopReviveService popReviveService : this.popReviveServices) {
            popReviveService.shutdown();
        }
    }

    public void setPopReviveServiceStatus(boolean z) {
        for (PopReviveService popReviveService : this.popReviveServices) {
            popReviveService.setShouldRunPopRevive(z);
        }
    }

    public boolean isPopReviveServiceRunning() {
        for (PopReviveService popReviveService : this.popReviveServices) {
            if (popReviveService.isShouldRunPopRevive()) {
                return true;
            }
        }
        return false;
    }

    public RemotingCommand processRequest(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws RemotingCommandException {
        return processRequest(channelHandlerContext.channel(), remotingCommand, true);
    }

    public boolean rejectRequest() {
        return false;
    }

    private RemotingCommand processRequest(Channel channel, RemotingCommand remotingCommand, boolean z) throws RemotingCommandException {
        BatchAckMessageRequestBody batchAckMessageRequestBody = null;
        RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand(0, (String) null);
        createResponseCommand.setOpaque(remotingCommand.getOpaque());
        if (remotingCommand.getCode() == 200051) {
            AckMessageRequestHeader ackMessageRequestHeader = (AckMessageRequestHeader) remotingCommand.decodeCommandCustomHeader(AckMessageRequestHeader.class);
            TopicConfig selectTopicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(ackMessageRequestHeader.getTopic());
            if (null == selectTopicConfig) {
                POP_LOGGER.error("The topic {} not exist, consumer: {} ", ackMessageRequestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
                createResponseCommand.setCode(17);
                createResponseCommand.setRemark(String.format("topic[%s] not exist, apply first please! %s", ackMessageRequestHeader.getTopic(), FAQUrl.suggestTodo("https://rocketmq.apache.org/docs/bestPractice/06FAQ")));
                return createResponseCommand;
            }
            if (ackMessageRequestHeader.getQueueId().intValue() >= selectTopicConfig.getReadQueueNums() || ackMessageRequestHeader.getQueueId().intValue() < 0) {
                String format = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", ackMessageRequestHeader.getQueueId(), ackMessageRequestHeader.getTopic(), Integer.valueOf(selectTopicConfig.getReadQueueNums()), channel.remoteAddress());
                POP_LOGGER.warn(format);
                createResponseCommand.setCode(13);
                createResponseCommand.setRemark(format);
                return createResponseCommand;
            }
            long minOffsetInQueue = this.brokerController.getMessageStore().getMinOffsetInQueue(ackMessageRequestHeader.getTopic(), ackMessageRequestHeader.getQueueId().intValue());
            long maxOffsetInQueue = this.brokerController.getMessageStore().getMaxOffsetInQueue(ackMessageRequestHeader.getTopic(), ackMessageRequestHeader.getQueueId().intValue());
            if (ackMessageRequestHeader.getOffset().longValue() < minOffsetInQueue || ackMessageRequestHeader.getOffset().longValue() > maxOffsetInQueue) {
                String format2 = String.format("offset is illegal, key:%s@%d, commit:%d, store:%d~%d", ackMessageRequestHeader.getTopic(), ackMessageRequestHeader.getQueueId(), ackMessageRequestHeader.getOffset(), Long.valueOf(minOffsetInQueue), Long.valueOf(maxOffsetInQueue));
                POP_LOGGER.warn(format2);
                createResponseCommand.setCode(208);
                createResponseCommand.setRemark(format2);
                return createResponseCommand;
            }
            appendAck(ackMessageRequestHeader, null, createResponseCommand, channel, null);
        } else {
            if (remotingCommand.getCode() != 200151) {
                POP_LOGGER.error("AckMessageProcessor failed to process RequestCode: {}, consumer: {} ", Integer.valueOf(remotingCommand.getCode()), RemotingHelper.parseChannelRemoteAddr(channel));
                createResponseCommand.setCode(13);
                createResponseCommand.setRemark(String.format("AckMessageProcessor failed to process RequestCode: %d", Integer.valueOf(remotingCommand.getCode())));
                return createResponseCommand;
            }
            if (remotingCommand.getBody() != null) {
                batchAckMessageRequestBody = (BatchAckMessageRequestBody) BatchAckMessageRequestBody.decode(remotingCommand.getBody(), BatchAckMessageRequestBody.class);
            }
            if (batchAckMessageRequestBody == null || batchAckMessageRequestBody.getAcks() == null || batchAckMessageRequestBody.getAcks().isEmpty()) {
                createResponseCommand.setCode(208);
                return createResponseCommand;
            }
            Iterator it = batchAckMessageRequestBody.getAcks().iterator();
            while (it.hasNext()) {
                appendAck(null, (BatchAck) it.next(), createResponseCommand, channel, batchAckMessageRequestBody.getBrokerName());
            }
        }
        return createResponseCommand;
    }

    private void appendAck(AckMessageRequestHeader ackMessageRequestHeader, BatchAck batchAck, RemotingCommand remotingCommand, Channel channel, String str) {
        String consumerGroup;
        String realTopic;
        int queueId;
        int reviveQueueId;
        long startOffset;
        long j;
        long popTime;
        long invisibleTime;
        AckMsg ackMsg;
        int size;
        if (batchAck == null) {
            String[] split = ExtraInfoUtil.split(ackMessageRequestHeader.getExtraInfo());
            str = ExtraInfoUtil.getBrokerName(split);
            consumerGroup = ackMessageRequestHeader.getConsumerGroup();
            realTopic = ackMessageRequestHeader.getTopic();
            queueId = ackMessageRequestHeader.getQueueId().intValue();
            reviveQueueId = ExtraInfoUtil.getReviveQid(split);
            startOffset = ExtraInfoUtil.getCkQueueOffset(split).longValue();
            j = ackMessageRequestHeader.getOffset().longValue();
            popTime = ExtraInfoUtil.getPopTime(split).longValue();
            invisibleTime = ExtraInfoUtil.getInvisibleTime(split).longValue();
            if (reviveQueueId == 999) {
                ackOrderly(realTopic, consumerGroup, queueId, j, popTime, invisibleTime, channel, remotingCommand);
                return;
            } else {
                ackMsg = new AckMsg();
                size = 1;
            }
        } else {
            consumerGroup = batchAck.getConsumerGroup();
            realTopic = ExtraInfoUtil.getRealTopic(batchAck.getTopic(), batchAck.getConsumerGroup(), batchAck.getRetry());
            queueId = batchAck.getQueueId();
            reviveQueueId = batchAck.getReviveQueueId();
            startOffset = batchAck.getStartOffset();
            j = -1;
            popTime = batchAck.getPopTime();
            invisibleTime = batchAck.getInvisibleTime();
            long minOffsetInQueue = this.brokerController.getMessageStore().getMinOffsetInQueue(realTopic, queueId);
            long maxOffsetInQueue = this.brokerController.getMessageStore().getMaxOffsetInQueue(realTopic, queueId);
            if (minOffsetInQueue == -1 || maxOffsetInQueue == -1) {
                POP_LOGGER.error("Illegal topic or queue found when batch ack {}", batchAck);
                return;
            }
            AckMsg batchAckMsg = new BatchAckMsg();
            BitSet bitSet = batchAck.getBitSet();
            int nextSetBit = bitSet.nextSetBit(0);
            while (true) {
                int i = nextSetBit;
                if (i < 0 || i == Integer.MAX_VALUE) {
                    break;
                }
                long j2 = startOffset + i;
                if (j2 >= minOffsetInQueue && j2 <= maxOffsetInQueue) {
                    if (reviveQueueId == 999) {
                        ackOrderly(realTopic, consumerGroup, queueId, j2, popTime, invisibleTime, channel, remotingCommand);
                    } else {
                        batchAckMsg.getAckOffsetList().add(Long.valueOf(j2));
                    }
                }
                nextSetBit = bitSet.nextSetBit(i + 1);
            }
            if (reviveQueueId == 999 || batchAckMsg.getAckOffsetList().isEmpty()) {
                return;
            }
            ackMsg = batchAckMsg;
            size = batchAckMsg.getAckOffsetList().size();
        }
        this.brokerController.getBrokerStatsManager().incBrokerAckNums(size);
        this.brokerController.getBrokerStatsManager().incGroupAckNums(consumerGroup, realTopic, size);
        ackMsg.setConsumerGroup(consumerGroup);
        ackMsg.setTopic(realTopic);
        ackMsg.setQueueId(queueId);
        ackMsg.setStartOffset(startOffset);
        ackMsg.setAckOffset(j);
        ackMsg.setPopTime(popTime);
        ackMsg.setBrokerName(str);
        if (this.brokerController.getPopMessageProcessor().getPopBufferMergeService().addAk(reviveQueueId, ackMsg)) {
            this.brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(realTopic, consumerGroup, popTime, queueId, size);
            return;
        }
        MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner();
        messageExtBrokerInner.setTopic(this.reviveTopic);
        messageExtBrokerInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.CHARSET_UTF8));
        messageExtBrokerInner.setQueueId(reviveQueueId);
        if (ackMsg instanceof BatchAckMsg) {
            messageExtBrokerInner.setTags("bAck");
            messageExtBrokerInner.getProperties().put("UNIQ_KEY", PopMessageProcessor.genBatchAckUniqueId((BatchAckMsg) ackMsg));
        } else {
            messageExtBrokerInner.setTags("ack");
            messageExtBrokerInner.getProperties().put("UNIQ_KEY", PopMessageProcessor.genAckUniqueId(ackMsg));
        }
        messageExtBrokerInner.setBornTimestamp(System.currentTimeMillis());
        messageExtBrokerInner.setBornHost(this.brokerController.getStoreHost());
        messageExtBrokerInner.setStoreHost(this.brokerController.getStoreHost());
        messageExtBrokerInner.setDeliverTimeMs(popTime + invisibleTime);
        messageExtBrokerInner.getProperties().put("UNIQ_KEY", PopMessageProcessor.genAckUniqueId(ackMsg));
        messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(messageExtBrokerInner.getProperties()));
        PutMessageResult putMessageToSpecificQueue = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(messageExtBrokerInner);
        if (putMessageToSpecificQueue.getPutMessageStatus() != PutMessageStatus.PUT_OK && putMessageToSpecificQueue.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT && putMessageToSpecificQueue.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT && putMessageToSpecificQueue.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) {
            POP_LOGGER.error("put ack msg error:" + putMessageToSpecificQueue);
        }
        PopMetricsManager.incPopReviveAckPutCount(ackMsg, putMessageToSpecificQueue.getPutMessageStatus());
        this.brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(realTopic, consumerGroup, popTime, queueId, size);
    }

    /* JADX WARN: Code restructure failed: missing block: B:29:0x0154, code lost:
    
        r27 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:31:0x0157, code lost:
    
        r9.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:32:0x0167, code lost:
    
        throw r27;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    protected void ackOrderly(java.lang.String r10, java.lang.String r11, int r12, long r13, long r15, long r17, io.netty.channel.Channel r19, org.apache.rocketmq.remoting.protocol.RemotingCommand r20) {
        /*
            Method dump skipped, instructions count: 377
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.rocketmq.broker.processor.AckMessageProcessor.ackOrderly(java.lang.String, java.lang.String, int, long, long, long, io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand):void");
    }
}
