/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.broker.processor;

import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.net.SocketAddress;
import java.util.Map;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.processor.PopMessageProcessor;
import org.apache.rocketmq.broker.processor.PopReviveService;
import org.apache.rocketmq.broker.util.MsgUtil;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.protocol.header.AckMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.pop.AckMsg;

/**
 * Request processor that handles ack requests carrying an {@link AckMessageRequestHeader}.
 *
 * <p>For each request the processor validates the topic, queue id and offset, then either
 * <ul>
 *   <li>advances the consumer offset under a per-queue lock when the revive queue id decoded
 *       from the request's extra info is {@code 999}, or</li>
 *   <li>hands an {@link AckMsg} to the pop buffer merge service and, when that service does not
 *       accept it, stores the ack as a message on the revive topic.</li>
 * </ul>
 *
 * <p>The processor also owns one {@link PopReviveService} per configured revive queue and
 * exposes methods to start and shut them down.
 *
 * <p>NOTE(review): this file was produced by a decompiler, so the numeric codes below are
 * inlined constants. They presumably mirror {@code ResponseCode} values (0 success, 13 message
 * illegal, 17 topic not exist, 208 no message) - confirm against the protocol definitions.
 */
public class AckMessageProcessor implements NettyRequestProcessor {
    private static final InternalLogger POP_LOGGER = InternalLoggerFactory.getLogger("RocketmqPop");
    private final BrokerController brokerController;
    /** Revive topic name: {@code "rmq_sys_REVIVE_LOG_"} followed by the broker cluster name. */
    private final String reviveTopic;
    /** One revive service per revive queue, indexed by revive queue id. */
    private final PopReviveService[] popReviveServices;

    /**
     * Creates the processor and one {@link PopReviveService} per configured revive queue.
     * The services are only constructed here; they are started by
     * {@link #startPopReviveService()}.
     *
     * @param brokerController broker facade used for configuration, stores and managers
     */
    public AckMessageProcessor(BrokerController brokerController) {
        this.brokerController = brokerController;
        this.reviveTopic = "rmq_sys_REVIVE_LOG_" + this.brokerController.getBrokerConfig().getBrokerClusterName();
        // Read the queue count once so the array size and the loop bound can never disagree.
        int reviveQueueNum = this.brokerController.getBrokerConfig().getReviveQueueNum();
        this.popReviveServices = new PopReviveService[reviveQueueNum];
        for (int i = 0; i < this.popReviveServices.length; ++i) {
            this.popReviveServices[i] = new PopReviveService(i, brokerController, this.reviveTopic);
        }
    }

    /**
     * Processes an ack request received on the given channel context.
     *
     * @param ctx     channel context the request arrived on
     * @param request the ack request
     * @return the response command to send back
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        return this.processRequest(ctx.channel(), request, true);
    }

    /**
     * @return always {@code false}
     */
    public boolean rejectRequest() {
        return false;
    }

    /** Starts every revive service created by the constructor. */
    public void startPopReviveService() {
        for (PopReviveService popReviveService : this.popReviveServices) {
            popReviveService.start();
        }
    }

    /** Shuts down every revive service created by the constructor. */
    public void shutdownPopReviveService() {
        for (PopReviveService popReviveService : this.popReviveServices) {
            popReviveService.shutdown();
        }
    }

    /**
     * Validates the ack request and applies it.
     *
     * @param channel            channel the request arrived on; used for logging and as the
     *                           client host when committing offsets
     * @param request            the ack request
     * @param brokerAllowSuspend currently not read by this method
     * @return a response whose code is 0 unless validation fails (17 for an unknown topic,
     *         13 for an illegal queue id or an illegal ordered-ack offset, 208 when the acked
     *         offset lies outside the queue's current [min, max] range)
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    private RemotingCommand processRequest(Channel channel, RemotingCommand request, boolean brokerAllowSuspend) throws RemotingCommandException {
        AckMessageRequestHeader requestHeader = (AckMessageRequestHeader) request.decodeCommandCustomHeader(AckMessageRequestHeader.class);
        RemotingCommand response = RemotingCommand.createResponseCommand(0, null);
        response.setOpaque(request.getOpaque());

        String topic = requestHeader.getTopic();
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
        if (null == topicConfig) {
            POP_LOGGER.error("The topic {} not exist, consumer: {} ", topic, RemotingHelper.parseChannelRemoteAddr(channel));
            response.setCode(17);
            response.setRemark(String.format("topic[%s] not exist, apply first please! %s", topic, FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/")));
            return response;
        }

        int queueId = requestHeader.getQueueId().intValue();
        if (queueId >= topicConfig.getReadQueueNums() || queueId < 0) {
            String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", queueId, topic, topicConfig.getReadQueueNums(), channel.remoteAddress());
            POP_LOGGER.warn(errorInfo);
            response.setCode(13);
            response.setRemark(errorInfo);
            return response;
        }

        long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);
        long maxOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
        long ackOffset = requestHeader.getOffset().longValue();
        if (ackOffset < minOffset || ackOffset > maxOffset) {
            response.setCode(208);
            return response;
        }

        // The ack record is populated before branching so that malformed extra info is
        // rejected (by whatever ExtraInfoUtil throws) on both paths.
        String[] extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo());
        AckMsg ackMsg = new AckMsg();
        ackMsg.setAckOffset(ackOffset);
        ackMsg.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfo).longValue());
        ackMsg.setConsumerGroup(requestHeader.getConsumerGroup());
        ackMsg.setTopic(topic);
        ackMsg.setQueueId(queueId);
        ackMsg.setPopTime(ExtraInfoUtil.getPopTime(extraInfo).longValue());

        int rqId = ExtraInfoUtil.getReviveQid(extraInfo);
        // NOTE(review): 999 looks like the reserved revive queue id for ordered consumption
        // (this branch goes through the consumer order info manager) - confirm the constant.
        if (rqId == 999) {
            this.ackOrderly(channel, requestHeader, response);
            return response;
        }

        // Preferred path: let the pop buffer merge service take the ack.
        if (this.brokerController.getPopMessageProcessor().getPopBufferMergeService().addAk(rqId, ackMsg)) {
            return response;
        }

        // Otherwise store the ack as a message on the revive topic.
        MessageExtBrokerInner msgInner = this.buildAckStoreMessage(ackMsg, rqId, extraInfo);
        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        PutMessageStatus putStatus = putMessageResult.getPutMessageStatus();
        boolean stored = putStatus == PutMessageStatus.PUT_OK
            || putStatus == PutMessageStatus.FLUSH_DISK_TIMEOUT
            || putStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT
            || putStatus == PutMessageStatus.SLAVE_NOT_AVAILABLE;
        if (!stored) {
            // NOTE(review): the failure is only logged; the response code is left unchanged.
            POP_LOGGER.error("put ack msg error:" + putMessageResult);
        }
        return response;
    }

    /**
     * Applies an ack on the {@code rqId == 999} path by advancing the consumer offset while
     * holding the queue lock for {@code topic@group@queueId}.
     *
     * <p>Acks whose offset is below the currently stored consumer offset are ignored and leave
     * {@code response} untouched. When {@code commitAndNext} returns {@code -1} the response is
     * turned into a code-13 error carrying a descriptive remark. Results below {@code -1} fall
     * through without committing and without changing the response.
     *
     * @param channel       channel the request arrived on
     * @param requestHeader decoded ack header
     * @param response      response to mutate on failure
     */
    private void ackOrderly(Channel channel, AckMessageRequestHeader requestHeader, RemotingCommand response) {
        String topic = requestHeader.getTopic();
        String group = requestHeader.getConsumerGroup();
        int queueId = requestHeader.getQueueId().intValue();
        long ackOffset = requestHeader.getOffset().longValue();
        String lockKey = topic + "@" + group + "@" + queueId;

        // Cheap pre-check without the lock: stale acks need no further work.
        long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, queueId);
        if (ackOffset < oldOffset) {
            return;
        }

        // NOTE(review): this spins on tryLock with an empty body until the lock is obtained;
        // kept as-is to preserve behaviour, but it occupies the calling thread while waiting.
        while (!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey)) {
        }
        try {
            // Re-read under the lock; the offset may have moved while waiting.
            oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, queueId);
            if (ackOffset < oldOffset) {
                return;
            }
            long nextOffset = this.brokerController.getConsumerOrderInfoManager().commitAndNext(topic, group, queueId, ackOffset);
            if (nextOffset > -1L) {
                // String.valueOf instead of toString(): the remote address may be null for a
                // channel that is no longer connected, and failing here would leave the order
                // info committed without the matching consumer offset.
                String clientHost = String.valueOf(channel.remoteAddress());
                this.brokerController.getConsumerOffsetManager().commitOffset(clientHost, group, topic, queueId, nextOffset);
                this.brokerController.getPopMessageProcessor().notifyMessageArriving(topic, group, queueId);
            } else if (nextOffset == -1L) {
                String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s", lockKey, oldOffset, ackOffset, nextOffset, channel.remoteAddress());
                POP_LOGGER.warn(errorInfo);
                response.setCode(13);
                response.setRemark(errorInfo);
            }
        } finally {
            this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey);
        }
    }

    /**
     * Builds the revive-topic message that carries {@code ackMsg}.
     *
     * <p>The body is the JSON form of the ack, the tag is {@code "ack"}, the queue id is the
     * revive queue id, and the deliver time is set to pop time plus invisible time as decoded
     * from {@code extraInfo}.
     *
     * @param ackMsg    ack record to serialize
     * @param rqId      revive queue id the message is written to
     * @param extraInfo split extra info from the request header
     * @return the message ready to be put into the message store
     */
    private MessageExtBrokerInner buildAckStoreMessage(AckMsg ackMsg, int rqId, String[] extraInfo) {
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(this.reviveTopic);
        msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset));
        msgInner.setQueueId(rqId);
        msgInner.setTags("ack");
        msgInner.setBornTimestamp(System.currentTimeMillis());
        msgInner.setBornHost(this.brokerController.getStoreHost());
        msgInner.setStoreHost(this.brokerController.getStoreHost());
        MsgUtil.setMessageDeliverTime(this.brokerController, msgInner, ExtraInfoUtil.getPopTime(extraInfo) + ExtraInfoUtil.getInvisibleTime(extraInfo));
        msgInner.getProperties().put("UNIQ_KEY", PopMessageProcessor.genAckUniqueId(ackMsg));
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    }
}

