package org.apache.rocketmq.broker.processor;

import io.netty.channel.ChannelHandlerContext;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RemotingResponseCallback;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;

/* loaded from: input_file:org/apache/rocketmq/broker/processor/SendMessageProcessor.class */
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
    private List<ConsumeMessageHook> consumeMessageHookList;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.rocketmq.broker.processor.SendMessageProcessor$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/rocketmq/broker/processor/SendMessageProcessor$1.class */
    /**
     * Lookup table that translates a {@link PutMessageStatus} ordinal into a small, stable
     * {@code case} label (1..11) used by the {@code switch} statements in the enclosing class.
     *
     * <p>The array is sized by {@code PutMessageStatus.values().length} and indexed by
     * {@code ordinal()}. Entries that are never assigned keep the default value {@code 0},
     * which matches no explicit {@code case} and therefore lands in the {@code default} branch
     * of any switch that consults this table.
     *
     * <p>NOTE(review): the {@code synthetic} markers and the decompiler header above suggest
     * this class was generated by the compiler for an enum switch rather than hand-written.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$rocketmq$store$PutMessageStatus = new int[PutMessageStatus.values().length];

        // Each constant is registered in its own try block so that a NoSuchFieldError raised
        // for one constant is ignored and does not prevent the remaining constants from being
        // mapped.
        static {
            try {
                $SwitchMap$org$apache$rocketmq$store$PutMessageStatus[PutMessageStatus.PUT_OK.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$PutMessageStatus[PutMessageStatus.FLUSH_DISK_TIMEOUT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$PutMessageStatus[PutMessageStatus.FLUSH_SLAVE_TIMEOUT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$PutMessageStatus[PutMessageStatus.SLAVE_NOT_AVAILABLE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$PutMessageStatus[PutMessageStatus.CREATE_MAPEDFILE_FAILED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$PutMessageStatus[PutMessageStatus.MESSAGE_ILLEGAL.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$PutMessageStatus[PutMessageStatus.PROPERTIES_SIZE_EXCEEDED.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$PutMessageStatus[PutMessageStatus.SERVICE_NOT_AVAILABLE.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$PutMessageStatus[PutMessageStatus.OS_PAGECACHE_BUSY.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$PutMessageStatus[PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$PutMessageStatus[PutMessageStatus.UNKNOWN_ERROR.ordinal()] = 11;
            } catch (NoSuchFieldError e11) {
            }
        }
    }

    /**
     * Creates the processor and forwards the broker controller to the superclass constructor.
     *
     * @param brokerController controller handed unchanged to {@code super(...)}
     */
    public SendMessageProcessor(BrokerController brokerController) {
        super(brokerController);
    }

    /**
     * Synchronous entry point: starts the asynchronous processing of the request and blocks
     * until the resulting future completes.
     *
     * @param channelHandlerContext channel the request arrived on
     * @param remotingCommand the request to process
     * @return the response produced by the asynchronous path, or {@code null} when the future
     *         completes with {@code null} or when waiting for it fails (the failure is logged)
     * @throws RemotingCommandException if the asynchronous entry point rejects the request
     *         while it is being set up
     */
    public RemotingCommand processRequest(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws RemotingCommandException {
        RemotingCommand response = null;
        try {
            response = asyncProcessRequest(channelHandlerContext, remotingCommand).get();
        } catch (InterruptedException e) {
            log.error("process SendMessage error, request : " + remotingCommand.toString(), e);
            // Catching InterruptedException clears the thread's interrupt status. Restore it
            // (after logging) so the owning executor can still observe the interruption.
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            log.error("process SendMessage error, request : " + remotingCommand.toString(), e);
        }
        return response;
    }

    /**
     * Callback-style entry point: processes the request asynchronously and, once a response is
     * available, passes it to the callback on the broker's put-message future executor.
     *
     * @param channelHandlerContext channel the request arrived on
     * @param remotingCommand the request to process
     * @param remotingResponseCallback receiver of the response; a {@code null} callback fails
     *        with a {@link NullPointerException} before the continuation is registered
     * @throws Exception if the asynchronous entry point fails while being set up
     */
    public void asyncProcessRequest(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand, RemotingResponseCallback remotingResponseCallback) throws Exception {
        // The bound method reference null-checks its receiver at the moment it is evaluated,
        // i.e. after the future has been created and before thenAcceptAsync is invoked.
        asyncProcessRequest(channelHandlerContext, remotingCommand)
                .thenAcceptAsync(remotingResponseCallback::callback, (Executor) this.brokerController.getPutMessageFutureExecutor());
    }

    /**
     * Routes a request to the matching asynchronous handler.
     *
     * <p>Request code {@code 36} goes to the consumer send-back path. Every other code is
     * treated as a send request: its header is parsed, the send hooks are run, and the request
     * is dispatched to the batch or single-message handler depending on the header.
     *
     * @param channelHandlerContext channel the request arrived on
     * @param remotingCommand the request to process
     * @return a future yielding the response; a future already completed with {@code null}
     *         when the send header cannot be parsed
     * @throws RemotingCommandException propagated from header decoding
     */
    public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws RemotingCommandException {
        if (remotingCommand.getCode() == 36) {
            return asyncConsumerSendMsgBack(channelHandlerContext, remotingCommand);
        }

        SendMessageRequestHeader requestHeader = parseRequestHeader(remotingCommand);
        if (requestHeader == null) {
            return CompletableFuture.completedFuture(null);
        }

        SendMessageContext traceContext = buildMsgContext(channelHandlerContext, requestHeader);
        executeSendMessageHookBefore(channelHandlerContext, remotingCommand, traceContext);

        if (requestHeader.isBatch()) {
            return asyncSendBatchMessage(channelHandlerContext, remotingCommand, traceContext, requestHeader);
        }
        return asyncSendMessage(channelHandlerContext, remotingCommand, traceContext, requestHeader);
    }

    /**
     * Tells the caller whether new requests should currently be turned away.
     *
     * @return {@code true} when the message store reports a busy OS page cache or a deficient
     *         transient store pool
     */
    @Override // org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor
    public boolean rejectRequest() {
        if (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
            return true;
        }
        return this.brokerController.getMessageStore().isTransientStorePoolDeficient();
    }

    /**
     * Handles a consumer "send message back" request: looks up the original message by the
     * offset given in the request header and re-stores a copy of it under the consumer group's
     * retry topic, or under the group's DLQ topic once the retry budget is used up.
     *
     * <p>Every validation failure short-circuits with an already-completed future carrying the
     * response code and remark set at that point. Only the final store operation is
     * asynchronous.
     *
     * @param channelHandlerContext channel the request arrived on (not read by this method)
     * @param remotingCommand the send-back request
     * @return a future yielding the response command
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws RemotingCommandException {
        Integer maxReconsumeTimes;
        RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand((Class) null);
        ConsumerSendMsgBackRequestHeader decodeCommandCustomHeader = remotingCommand.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
        String namespaceFromResource = NamespaceUtil.getNamespaceFromResource(decodeCommandCustomHeader.getGroup());
        // Consume hooks only run when at least one is registered and the request names the
        // original message id.
        if (hasConsumeMessageHook() && !UtilAll.isBlank(decodeCommandCustomHeader.getOriginMsgId())) {
            executeConsumeMessageHookAfter(buildConsumeMessageContext(namespaceFromResource, decodeCommandCustomHeader, remotingCommand));
        }
        // The consumer group must be known to the broker.
        SubscriptionGroupConfig findSubscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(decodeCommandCustomHeader.getGroup());
        if (null == findSubscriptionGroupConfig) {
            createResponseCommand.setCode(26);
            createResponseCommand.setRemark("subscription group not exist, " + decodeCommandCustomHeader.getGroup() + " " + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"));
            return CompletableFuture.completedFuture(createResponseCommand);
        }
        // The broker itself must be writeable.
        if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
            createResponseCommand.setCode(16);
            createResponseCommand.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
            return CompletableFuture.completedFuture(createResponseCommand);
        }
        // A group without retry queues gets response code 0 without anything being stored.
        if (findSubscriptionGroupConfig.getRetryQueueNums() <= 0) {
            createResponseCommand.setCode(0);
            createResponseCommand.setRemark((String) null);
            return CompletableFuture.completedFuture(createResponseCommand);
        }
        // Default destination: the group's retry topic, on a randomly chosen retry queue.
        String retryTopic = MixAll.getRetryTopic(decodeCommandCustomHeader.getGroup());
        int nextInt = ThreadLocalRandom.current().nextInt(99999999) % findSubscriptionGroupConfig.getRetryQueueNums();
        int i = 0;
        if (decodeCommandCustomHeader.isUnitMode()) {
            i = TopicSysFlag.buildSysFlag(false, true);
        }
        // NOTE(review): the literal 6 is passed as the topic permission; presumably it means
        // read + write - confirm against PermName.
        TopicConfig createTopicInSendMessageBackMethod = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(retryTopic, findSubscriptionGroupConfig.getRetryQueueNums(), 6, i);
        if (null == createTopicInSendMessageBackMethod) {
            createResponseCommand.setCode(1);
            createResponseCommand.setRemark("topic[" + retryTopic + "] not exist");
            return CompletableFuture.completedFuture(createResponseCommand);
        }
        if (!PermName.isWriteable(createTopicInSendMessageBackMethod.getPerm())) {
            createResponseCommand.setCode(16);
            createResponseCommand.setRemark(String.format("the topic[%s] sending message is forbidden", retryTopic));
            return CompletableFuture.completedFuture(createResponseCommand);
        }
        // Fetch the message being sent back, using the offset from the request header.
        MessageExt lookMessageByOffset = this.brokerController.getMessageStore().lookMessageByOffset(decodeCommandCustomHeader.getOffset().longValue());
        if (null == lookMessageByOffset) {
            createResponseCommand.setCode(1);
            createResponseCommand.setRemark("look message by offset failed, " + decodeCommandCustomHeader.getOffset());
            return CompletableFuture.completedFuture(createResponseCommand);
        }
        // Record the message's current topic under RETRY_TOPIC unless a value is already there.
        if (null == lookMessageByOffset.getProperty("RETRY_TOPIC")) {
            MessageAccessor.putProperty(lookMessageByOffset, "RETRY_TOPIC", lookMessageByOffset.getTopic());
        }
        lookMessageByOffset.setWaitStoreMsgOK(false);
        int intValue = decodeCommandCustomHeader.getDelayLevel().intValue();
        // The group's retry limit applies unless the request's version is at least V3_4_9 and
        // the header carries its own maxReconsumeTimes.
        int retryMaxTimes = findSubscriptionGroupConfig.getRetryMaxTimes();
        if (remotingCommand.getVersion() >= MQVersion.Version.V3_4_9.ordinal() && (maxReconsumeTimes = decodeCommandCustomHeader.getMaxReconsumeTimes()) != null) {
            retryMaxTimes = maxReconsumeTimes.intValue();
        }
        if (lookMessageByOffset.getReconsumeTimes() >= retryMaxTimes || intValue < 0) {
            // Retry budget exhausted, or a negative delay level was requested: redirect to the
            // group's DLQ topic. "% 1" always evaluates to 0, so the DLQ queue id is 0.
            retryTopic = MixAll.getDLQTopic(decodeCommandCustomHeader.getGroup());
            nextInt = ThreadLocalRandom.current().nextInt(99999999) % 1;
            if (null == this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(retryTopic, 1, 6, 0)) {
                createResponseCommand.setCode(1);
                createResponseCommand.setRemark("topic[" + retryTopic + "] not exist");
                return CompletableFuture.completedFuture(createResponseCommand);
            }
            lookMessageByOffset.setDelayTimeLevel(0);
        } else {
            // A requested delay level of 0 is replaced by 3 + the number of previous retries.
            if (0 == intValue) {
                intValue = 3 + lookMessageByOffset.getReconsumeTimes();
            }
            lookMessageByOffset.setDelayTimeLevel(intValue);
        }
        // Build the message to store, copying body, flags, properties and origin metadata from
        // the looked-up message and incrementing its reconsume counter.
        MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner();
        messageExtBrokerInner.setTopic(retryTopic);
        messageExtBrokerInner.setBody(lookMessageByOffset.getBody());
        messageExtBrokerInner.setFlag(lookMessageByOffset.getFlag());
        MessageAccessor.setProperties(messageExtBrokerInner, lookMessageByOffset.getProperties());
        messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(lookMessageByOffset.getProperties()));
        messageExtBrokerInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode((TopicFilterType) null, lookMessageByOffset.getTags()));
        messageExtBrokerInner.setQueueId(nextInt);
        messageExtBrokerInner.setSysFlag(lookMessageByOffset.getSysFlag());
        messageExtBrokerInner.setBornTimestamp(lookMessageByOffset.getBornTimestamp());
        messageExtBrokerInner.setBornHost(lookMessageByOffset.getBornHost());
        messageExtBrokerInner.setStoreHost(lookMessageByOffset.getStoreHost());
        messageExtBrokerInner.setReconsumeTimes(lookMessageByOffset.getReconsumeTimes() + 1);
        // Keep an existing origin message id; otherwise use the looked-up message's own id.
        String originMessageId = MessageAccessor.getOriginMessageId(lookMessageByOffset);
        MessageAccessor.setOriginMessageId(messageExtBrokerInner, UtilAll.isBlank(originMessageId) ? lookMessageByOffset.getMsgId() : originMessageId);
        // NOTE(review): the properties string is rebuilt from the looked-up message's
        // properties, not the new message's. It reflects the origin message id set above only
        // if both messages share the same properties map - confirm against
        // MessageAccessor.setProperties.
        messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(lookMessageByOffset.getProperties()));
        return this.brokerController.getMessageStore().asyncPutMessage(messageExtBrokerInner).thenApply(putMessageResult -> {
            if (putMessageResult == null) {
                createResponseCommand.setCode(1);
                createResponseCommand.setRemark("putMessageResult is null");
                return createResponseCommand;
            }
            // Label 1 is the value AnonymousClass1 assigns to PutMessageStatus.PUT_OK; every
            // other status is reported with response code 1 and the status name as the remark.
            switch (AnonymousClass1.$SwitchMap$org$apache$rocketmq$store$PutMessageStatus[putMessageResult.getPutMessageStatus().ordinal()]) {
                case 1:
                    // Send-back statistics are attributed to the RETRY_TOPIC property when it
                    // is set, otherwise to the looked-up message's topic.
                    String topic = lookMessageByOffset.getTopic();
                    String property = lookMessageByOffset.getProperty("RETRY_TOPIC");
                    if (property != null) {
                        topic = property;
                    }
                    // Put statistics are recorded here only when the stored message's topic
                    // reads "SCHEDULE_TOPIC_XXXX" at this point.
                    if ("SCHEDULE_TOPIC_XXXX".equals(messageExtBrokerInner.getTopic())) {
                        this.brokerController.getBrokerStatsManager().incTopicPutNums(messageExtBrokerInner.getTopic());
                        this.brokerController.getBrokerStatsManager().incTopicPutSize(messageExtBrokerInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
                        this.brokerController.getBrokerStatsManager().incQueuePutNums(messageExtBrokerInner.getTopic(), Integer.valueOf(messageExtBrokerInner.getQueueId()));
                        this.brokerController.getBrokerStatsManager().incQueuePutSize(messageExtBrokerInner.getTopic(), Integer.valueOf(messageExtBrokerInner.getQueueId()), putMessageResult.getAppendMessageResult().getWroteBytes());
                    }
                    this.brokerController.getBrokerStatsManager().incSendBackNums(decodeCommandCustomHeader.getGroup(), topic);
                    createResponseCommand.setCode(0);
                    createResponseCommand.setRemark((String) null);
                    return createResponseCommand;
                default:
                    createResponseCommand.setCode(1);
                    createResponseCommand.setRemark(putMessageResult.getPutMessageStatus().name());
                    return createResponseCommand;
            }
        });
    }

    /**
     * Stores a single (non-batch) message asynchronously.
     *
     * <p>The response prepared by {@code preSend} is returned immediately when its code is no
     * longer {@code -1}, when retry/DLQ handling rejects the message, or when a transactional
     * message arrives while the broker is configured to reject them. Otherwise the message is
     * handed to the message store, or to the transactional message service when the
     * {@code TRAN_MSG} property parses as {@code true}.
     *
     * @param channelHandlerContext channel the request arrived on
     * @param remotingCommand the send request
     * @param sendMessageContext hook context passed on to result handling
     * @param sendMessageRequestHeader decoded send header
     * @return a future yielding the value produced by {@code handlePutMessageResult}
     */
    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand, SendMessageContext sendMessageContext, SendMessageRequestHeader sendMessageRequestHeader) {
        RemotingCommand response = preSend(channelHandlerContext, remotingCommand, sendMessageRequestHeader);
        SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
        if (response.getCode() != -1) {
            return CompletableFuture.completedFuture(response);
        }

        byte[] payload = remotingCommand.getBody();
        int queueId = sendMessageRequestHeader.getQueueId().intValue();
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(sendMessageRequestHeader.getTopic());
        // A negative queue id asks the broker to pick one of the topic's write queues.
        if (queueId < 0) {
            queueId = randomQueueId(topicConfig.getWriteQueueNums());
        }

        MessageExtBrokerInner message = new MessageExtBrokerInner();
        message.setTopic(sendMessageRequestHeader.getTopic());
        message.setQueueId(queueId);
        if (!handleRetryAndDLQ(sendMessageRequestHeader, response, remotingCommand, message, topicConfig)) {
            return CompletableFuture.completedFuture(response);
        }

        message.setBody(payload);
        message.setFlag(sendMessageRequestHeader.getFlag().intValue());
        Map<String, String> requestProperties = MessageDecoder.string2messageProperties(sendMessageRequestHeader.getProperties());
        MessageAccessor.setProperties(message, requestProperties);
        message.setBornTimestamp(sendMessageRequestHeader.getBornTimestamp().longValue());
        message.setBornHost(channelHandlerContext.channel().remoteAddress());
        message.setStoreHost(getStoreHost());
        Integer reportedReconsumeTimes = sendMessageRequestHeader.getReconsumeTimes();
        message.setReconsumeTimes(reportedReconsumeTimes == null ? 0 : reportedReconsumeTimes.intValue());
        MessageAccessor.putProperty(message, "CLUSTER", this.brokerController.getBrokerConfig().getBrokerClusterName());

        // The WAIT entry is taken out of the parsed map while the properties string is built
        // and put back afterwards.
        boolean hasWaitFlag = requestProperties.containsKey("WAIT");
        String waitFlag = hasWaitFlag ? requestProperties.remove("WAIT") : null;
        message.setPropertiesString(MessageDecoder.messageProperties2String(message.getProperties()));
        if (hasWaitFlag) {
            requestProperties.put("WAIT", waitFlag);
        }

        String transactionFlag = requestProperties.get("TRAN_MSG");
        boolean transactional = transactionFlag != null && Boolean.parseBoolean(transactionFlag);
        CompletableFuture<PutMessageResult> putFuture;
        if (transactional) {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(16);
                response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
                return CompletableFuture.completedFuture(response);
            }
            putFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(message);
        } else {
            putFuture = this.brokerController.getMessageStore().asyncPutMessage(message);
        }
        return handlePutMessageResultFuture(putFuture, response, remotingCommand, message, responseHeader, sendMessageContext, channelHandlerContext, queueId);
    }

    /**
     * Chains {@code handlePutMessageResult} onto a pending store operation.
     *
     * @param completableFuture pending store result
     * @param remotingCommand response command to populate
     * @param remotingCommand2 the original request
     * @param messageExt the message that was submitted to the store
     * @param sendMessageResponseHeader response header to populate on success
     * @param sendMessageContext hook context to populate
     * @param channelHandlerContext channel the request arrived on
     * @param i queue id to report in the response header
     * @return a future yielding whatever {@code handlePutMessageResult} returns
     */
    private CompletableFuture<RemotingCommand> handlePutMessageResultFuture(CompletableFuture<PutMessageResult> completableFuture, RemotingCommand remotingCommand, RemotingCommand remotingCommand2, MessageExt messageExt, SendMessageResponseHeader sendMessageResponseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext channelHandlerContext, int i) {
        return completableFuture.thenApply(storeResult ->
                handlePutMessageResult(storeResult, remotingCommand, remotingCommand2, messageExt, sendMessageResponseHeader, sendMessageContext, channelHandlerContext, i));
    }

    /**
     * Applies retry-topic rules to an outgoing message and sets its system flag.
     *
     * <p>When the request targets a topic starting with {@code %RETRY%}, the remainder of the
     * topic name is looked up as a subscription group. If the message has already been
     * reconsumed at least as often as allowed, it is redirected to the group's DLQ topic
     * (queue id 0, delay level 0). Finally the request's system flag is copied to the message,
     * with bit {@code 2} added when the effective topic uses {@code MULTI_TAG} filtering.
     *
     * @param sendMessageRequestHeader decoded send header
     * @param remotingCommand response command; receives code and remark on failure
     * @param remotingCommand2 the original request (only its version is read)
     * @param messageExt message being prepared; may be re-targeted to the DLQ topic
     * @param topicConfig configuration of the requested topic
     * @return {@code false} when the subscription group or the DLQ topic is unavailable (the
     *         response then already carries the error), {@code true} otherwise
     */
    private boolean handleRetryAndDLQ(SendMessageRequestHeader sendMessageRequestHeader, RemotingCommand remotingCommand, RemotingCommand remotingCommand2, MessageExt messageExt, TopicConfig topicConfig) {
        final String retryPrefix = "%RETRY%";
        // Number of queues the DLQ topic is created with; "% 1" always selects queue 0.
        final int dlqQueueCount = 1;

        TopicConfig effectiveTopicConfig = topicConfig;
        String requestedTopic = sendMessageRequestHeader.getTopic();
        boolean targetsRetryTopic = requestedTopic != null && requestedTopic.startsWith(retryPrefix);
        if (targetsRetryTopic) {
            String groupName = requestedTopic.substring(retryPrefix.length());
            SubscriptionGroupConfig groupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
            if (groupConfig == null) {
                remotingCommand.setCode(26);
                remotingCommand.setRemark("subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"));
                return false;
            }

            // The group's limit applies unless the request's version is at least V3_4_9 and
            // the header supplies its own value.
            int allowedReconsumeTimes = groupConfig.getRetryMaxTimes();
            if (remotingCommand2.getVersion() >= MQVersion.Version.V3_4_9.ordinal() && sendMessageRequestHeader.getMaxReconsumeTimes() != null) {
                allowedReconsumeTimes = sendMessageRequestHeader.getMaxReconsumeTimes().intValue();
            }

            Integer reportedReconsumeTimes = sendMessageRequestHeader.getReconsumeTimes();
            int reconsumeTimes = reportedReconsumeTimes == null ? 0 : reportedReconsumeTimes.intValue();
            if (reconsumeTimes >= allowedReconsumeTimes) {
                String dlqTopic = MixAll.getDLQTopic(groupName);
                int dlqQueueId = ThreadLocalRandom.current().nextInt(99999999) % dlqQueueCount;
                effectiveTopicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(dlqTopic, dlqQueueCount, 6, 0);
                // The message is re-targeted before the DLQ topic lookup result is checked.
                messageExt.setTopic(dlqTopic);
                messageExt.setQueueId(dlqQueueId);
                messageExt.setDelayTimeLevel(0);
                if (effectiveTopicConfig == null) {
                    remotingCommand.setCode(1);
                    remotingCommand.setRemark("topic[" + dlqTopic + "] not exist");
                    return false;
                }
            }
        }

        int sysFlag = sendMessageRequestHeader.getSysFlag().intValue();
        if (TopicFilterType.MULTI_TAG == effectiveTopicConfig.getTopicFilterType()) {
            sysFlag |= 2;
        }
        messageExt.setSysFlag(sysFlag);
        return true;
    }

    /**
     * Stores a single message synchronously and builds the response.
     *
     * <p>The request is rejected with code {@code 1} while the store's clock is still before
     * the broker's configured start-accepting timestamp. After {@code msgCheck} and retry/DLQ
     * handling have passed, the message is written through the message store, or through the
     * transactional message service when the {@code TRAN_MSG} property parses as {@code true}
     * and the message is not a redelivery carrying a delay level (reconsume times and delay
     * level both greater than zero).
     *
     * @param channelHandlerContext channel the request arrived on
     * @param remotingCommand the send request
     * @param sendMessageContext hook context passed on to result handling
     * @param sendMessageRequestHeader decoded send header
     * @return the response for early rejections, otherwise whatever
     *         {@code handlePutMessageResult} returns
     * @throws RemotingCommandException declared for callers; not thrown directly by this body
     */
    private RemotingCommand sendMessage(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand, SendMessageContext sendMessageContext, SendMessageRequestHeader sendMessageRequestHeader) throws RemotingCommandException {
        PutMessageResult putMessage;
        RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
        SendMessageResponseHeader sendMessageResponseHeader = (SendMessageResponseHeader) createResponseCommand.readCustomHeader();
        createResponseCommand.setOpaque(remotingCommand.getOpaque());
        createResponseCommand.addExtField("MSG_REGION", this.brokerController.getBrokerConfig().getRegionId());
        createResponseCommand.addExtField("TRACE_ON", String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
        log.debug("receive SendMessage request command, {}", remotingCommand);
        long startAcceptSendRequestTimeStamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
        if (this.brokerController.getMessageStore().now() < startAcceptSendRequestTimeStamp) {
            createResponseCommand.setCode(1);
            createResponseCommand.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startAcceptSendRequestTimeStamp)));
            return createResponseCommand;
        }
        // -1 acts as a "not decided yet" marker: msgCheck overwrites it when it rejects.
        createResponseCommand.setCode(-1);
        super.msgCheck(channelHandlerContext, sendMessageRequestHeader, createResponseCommand);
        if (createResponseCommand.getCode() != -1) {
            return createResponseCommand;
        }
        byte[] body = remotingCommand.getBody();
        int intValue = sendMessageRequestHeader.getQueueId().intValue();
        TopicConfig selectTopicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(sendMessageRequestHeader.getTopic());
        // A negative queue id asks the broker to pick one of the topic's write queues.
        if (intValue < 0) {
            intValue = ThreadLocalRandom.current().nextInt(99999999) % selectTopicConfig.getWriteQueueNums();
        }
        MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner();
        messageExtBrokerInner.setTopic(sendMessageRequestHeader.getTopic());
        messageExtBrokerInner.setQueueId(intValue);
        if (!handleRetryAndDLQ(sendMessageRequestHeader, createResponseCommand, remotingCommand, messageExtBrokerInner, selectTopicConfig)) {
            return createResponseCommand;
        }
        messageExtBrokerInner.setBody(body);
        messageExtBrokerInner.setFlag(sendMessageRequestHeader.getFlag().intValue());
        // Decode the properties string once and reuse the map for the TRAN_MSG lookup below,
        // instead of parsing the same string a second time.
        Map<String, String> requestProperties = MessageDecoder.string2messageProperties(sendMessageRequestHeader.getProperties());
        MessageAccessor.setProperties(messageExtBrokerInner, requestProperties);
        messageExtBrokerInner.setBornTimestamp(sendMessageRequestHeader.getBornTimestamp().longValue());
        messageExtBrokerInner.setBornHost(channelHandlerContext.channel().remoteAddress());
        messageExtBrokerInner.setStoreHost(getStoreHost());
        messageExtBrokerInner.setReconsumeTimes(sendMessageRequestHeader.getReconsumeTimes() == null ? 0 : sendMessageRequestHeader.getReconsumeTimes().intValue());
        MessageAccessor.putProperty(messageExtBrokerInner, "CLUSTER", this.brokerController.getBrokerConfig().getBrokerClusterName());
        messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(messageExtBrokerInner.getProperties()));
        String str = requestProperties.get("TRAN_MSG");
        // Redeliveries that carry a delay level go through the normal store path even when
        // flagged as transactional.
        if (str == null || !Boolean.parseBoolean(str) || (messageExtBrokerInner.getReconsumeTimes() > 0 && messageExtBrokerInner.getDelayTimeLevel() > 0)) {
            putMessage = this.brokerController.getMessageStore().putMessage(messageExtBrokerInner);
        } else {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                createResponseCommand.setCode(16);
                createResponseCommand.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
                return createResponseCommand;
            }
            putMessage = this.brokerController.getTransactionalMessageService().prepareMessage(messageExtBrokerInner);
        }
        return handlePutMessageResult(putMessage, createResponseCommand, remotingCommand, messageExtBrokerInner, sendMessageResponseHeader, sendMessageContext, channelHandlerContext, intValue);
    }

    /**
     * Translates a store result into a response and updates statistics and hook context.
     *
     * <p>Switch labels are the values assigned in {@code AnonymousClass1}: labels 1-4 (put OK,
     * flush-disk timeout, flush-slave timeout, slave not available) count as "stored"; all
     * other labels set an error code and remark. For a stored message the response is written
     * to the channel through {@code doResponse} and {@code null} is returned; for every other
     * outcome the populated response command is returned to the caller.
     *
     * @param putMessageResult store result, may be {@code null}
     * @param remotingCommand response command to populate
     * @param remotingCommand2 the original request
     * @param messageExt the message that was submitted to the store
     * @param sendMessageResponseHeader response header to populate on success
     * @param sendMessageContext hook context; only touched when send hooks are registered
     * @param channelHandlerContext channel used to write the successful response
     * @param i queue id to report in the response header
     * @return the error response, or {@code null} when the response was already written
     */
    private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand remotingCommand, RemotingCommand remotingCommand2, MessageExt messageExt, SendMessageResponseHeader sendMessageResponseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext channelHandlerContext, int i) {
        if (putMessageResult == null) {
            remotingCommand.setCode(1);
            remotingCommand.setRemark("store putMessage return null");
            return remotingCommand;
        }

        boolean stored = false;
        switch (AnonymousClass1.$SwitchMap$org$apache$rocketmq$store$PutMessageStatus[putMessageResult.getPutMessageStatus().ordinal()]) {
            case 1:
                remotingCommand.setCode(0);
                stored = true;
                break;
            case 2:
                remotingCommand.setCode(10);
                stored = true;
                break;
            case 3:
                remotingCommand.setCode(12);
                stored = true;
                break;
            case 4:
                remotingCommand.setCode(11);
                stored = true;
                break;
            case 5:
                remotingCommand.setCode(1);
                remotingCommand.setRemark("create mapped file failed, server is busy or broken.");
                break;
            case 6:
            case 7:
                remotingCommand.setCode(13);
                remotingCommand.setRemark("the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
                break;
            case 8:
                remotingCommand.setCode(14);
                remotingCommand.setRemark("service not available now. It may be caused by one of the following reasons: the broker's disk is full [" + diskUtil() + "], messages are put to the slave, message store has been shut down, etc.");
                break;
            case 9:
                remotingCommand.setCode(1);
                remotingCommand.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
                break;
            case 10:
                remotingCommand.setCode(1);
                remotingCommand.setRemark("[LMQ_CONSUME_QUEUE_NUM_EXCEEDED]broker config enableLmq and enableMultiDispatch, lmq consumeQueue num exceed maxLmqConsumeQueueNum config num, default limit 2w.");
                break;
            case 11:
                remotingCommand.setCode(1);
                remotingCommand.setRemark("UNKNOWN_ERROR");
                break;
            default:
                remotingCommand.setCode(1);
                remotingCommand.setRemark("UNKNOWN_ERROR DEFAULT");
                break;
        }

        String owner = (String) remotingCommand2.getExtFields().get("Owner");

        if (!stored) {
            // Failure: record failure figures based on the request body size, then hand the
            // error response back to the caller.
            if (hasSendMessageHook()) {
                int requestBodySize = remotingCommand2.getBody().length;
                int failedSendUnits = (int) Math.ceil(requestBodySize / 65536.0d);
                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
                sendMessageContext.setCommercialSendTimes(failedSendUnits);
                sendMessageContext.setCommercialSendSize(requestBodySize);
                sendMessageContext.setCommercialOwner(owner);
            }
            return remotingCommand;
        }

        BrokerStatsManager statsManager = this.brokerController.getBrokerStatsManager();
        // Queue-level figures are recorded only when the message's topic reads
        // "SCHEDULE_TOPIC_XXXX"; topic- and broker-level figures are always recorded.
        if ("SCHEDULE_TOPIC_XXXX".equals(messageExt.getTopic())) {
            statsManager.incQueuePutNums(messageExt.getTopic(), Integer.valueOf(messageExt.getQueueId()), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
            statsManager.incQueuePutSize(messageExt.getTopic(), Integer.valueOf(messageExt.getQueueId()), putMessageResult.getAppendMessageResult().getWroteBytes());
        }
        statsManager.incTopicPutNums(messageExt.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
        statsManager.incTopicPutSize(messageExt.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
        statsManager.incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());

        remotingCommand.setRemark((String) null);
        sendMessageResponseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
        sendMessageResponseHeader.setQueueId(Integer.valueOf(i));
        sendMessageResponseHeader.setQueueOffset(Long.valueOf(putMessageResult.getAppendMessageResult().getLogicsOffset()));
        // The response is written to the channel here, which is why null is returned below.
        doResponse(channelHandlerContext, remotingCommand2, remotingCommand);

        if (hasSendMessageHook()) {
            sendMessageContext.setMsgId(sendMessageResponseHeader.getMsgId());
            sendMessageContext.setQueueId(sendMessageResponseHeader.getQueueId());
            sendMessageContext.setQueueOffset(sendMessageResponseHeader.getQueueOffset());
            int commercialBaseCount = this.brokerController.getBrokerConfig().getCommercialBaseCount();
            int storedBytes = putMessageResult.getAppendMessageResult().getWroteBytes();
            int successfulSendUnits = ((int) Math.ceil(storedBytes / 65536.0d)) * commercialBaseCount;
            sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
            sendMessageContext.setCommercialSendTimes(successfulSendUnits);
            sendMessageContext.setCommercialSendSize(storedBytes);
            sendMessageContext.setCommercialOwner(owner);
        }
        return null;
    }

    /**
     * Handles a batch send request asynchronously.
     *
     * <p>Runs {@link #preSend} first; if that already settled on a response code (anything other
     * than the {@code -1} sentinel), the response is returned immediately as a completed future.
     * Otherwise a {@link MessageExtBatch} is populated from the request header and body, handed to
     * the message store via {@code asyncPutMessages}, and the resulting future is adapted by
     * {@code handlePutMessageResultFuture}.
     *
     * @param channelHandlerContext    channel the request arrived on; its remote address becomes the born host
     * @param remotingCommand          the incoming request; its body is used as the batch body
     * @param sendMessageContext       trace context forwarded to {@code handlePutMessageResultFuture}
     * @param sendMessageRequestHeader decoded request header (topic, queue id, flags, properties, ...)
     * @return a future yielding the response command, as produced by {@code handlePutMessageResultFuture}
     *         or an already-completed future when the request is rejected up front
     */
    private CompletableFuture<RemotingCommand> asyncSendBatchMessage(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand, SendMessageContext sendMessageContext, SendMessageRequestHeader sendMessageRequestHeader) {
        final RemotingCommand response = preSend(channelHandlerContext, remotingCommand, sendMessageRequestHeader);
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();

        // preSend leaves the code at -1 only when no check has decided the outcome yet.
        if (response.getCode() != -1) {
            return CompletableFuture.completedFuture(response);
        }

        int queueId = sendMessageRequestHeader.getQueueId().intValue();
        final String topic = sendMessageRequestHeader.getTopic();
        final TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);

        // A negative queue id means the caller did not pick one: choose a random write queue.
        if (queueId < 0) {
            queueId = randomQueueId(topicConfig.getWriteQueueNums());
        }

        final int topicLength = topic.length();
        if (topicLength > Byte.MAX_VALUE) {
            // NOTE(review): 13 is presumably ResponseCode.MESSAGE_ILLEGAL - confirm against ResponseCode.
            response.setCode(13);
            response.setRemark("message topic length too long " + topicLength);
            return CompletableFuture.completedFuture(response);
        }

        final MessageExtBatch batch = new MessageExtBatch();
        batch.setTopic(topic);
        batch.setQueueId(queueId);

        final int requestedSysFlag = sendMessageRequestHeader.getSysFlag().intValue();
        // NOTE(review): bit 2 is presumably the multi-tags sys flag - confirm against MessageSysFlag.
        final int sysFlag = TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()
            ? (requestedSysFlag | 2)
            : requestedSysFlag;
        batch.setSysFlag(sysFlag);
        batch.setFlag(sendMessageRequestHeader.getFlag().intValue());

        MessageAccessor.setProperties(batch, MessageDecoder.string2messageProperties(sendMessageRequestHeader.getProperties()));
        batch.setBody(remotingCommand.getBody());
        batch.setBornTimestamp(sendMessageRequestHeader.getBornTimestamp().longValue());
        batch.setBornHost(channelHandlerContext.channel().remoteAddress());
        batch.setStoreHost(getStoreHost());

        // A missing reconsume counter is treated as zero.
        final Integer reconsumeTimes = sendMessageRequestHeader.getReconsumeTimes();
        batch.setReconsumeTimes(reconsumeTimes == null ? 0 : reconsumeTimes.intValue());

        // Must come after setProperties so the cluster name is added to the decoded property map.
        MessageAccessor.putProperty(batch, "CLUSTER", this.brokerController.getBrokerConfig().getBrokerClusterName());

        return handlePutMessageResultFuture(
            this.brokerController.getMessageStore().asyncPutMessages(batch),
            response,
            remotingCommand,
            batch,
            responseHeader,
            sendMessageContext,
            channelHandlerContext,
            queueId);
    }

    /**
     * Tells whether at least one consume-message hook is registered.
     *
     * @return {@code true} when the hook list is non-null and non-empty, {@code false} otherwise
     */
    public boolean hasConsumeMessageHook() {
        return this.consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
    }

    /**
     * Invokes {@code consumeMessageAfter} on every registered consume-message hook.
     *
     * <p>Hooks are run on a best-effort basis: a failure in one hook never propagates to the
     * caller and never prevents the remaining hooks from running. Failures are logged instead of
     * being silently discarded so that a misbehaving hook can be diagnosed.
     *
     * @param consumeMessageContext context passed unchanged to each hook
     */
    public void executeConsumeMessageHookAfter(ConsumeMessageContext consumeMessageContext) {
        if (!hasConsumeMessageHook()) {
            return;
        }
        for (ConsumeMessageHook hook : this.consumeMessageHookList) {
            try {
                hook.consumeMessageAfter(consumeMessageContext);
            } catch (Throwable th) {
                // Deliberately not rethrown: one broken hook must not affect the others or the caller.
                log.warn("consumeMessageAfter hook " + hook.getClass().getName() + " failed; continuing with remaining hooks", th);
            }
        }
    }

    /**
     * Returns the store host address held in this processor's {@code storeHost} field.
     *
     * @return the current value of {@code storeHost}
     */
    @Override // org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor
    public SocketAddress getStoreHost() {
        return this.storeHost;
    }

    /**
     * Builds a one-line summary of disk usage for the commit log (CL), consume queue (CQ) and
     * index (INDEX) storage locations.
     *
     * <p>The commit-log location may contain several paths separated by
     * {@code MessageStoreConfig.MULTI_PATH_SPLITTER}; the reported CL figure is the smallest value
     * returned by {@code UtilAll.getDiskPartitionSpaceUsedPercent} across those paths, capped at 100.
     *
     * @return a string of the form {@code "CL: x CQ: y INDEX: z"}, each value formatted with {@code %5.2f}
     */
    private String diskUtil() {
        DefaultMessageStore store = this.brokerController.getMessageStore();

        // Prefer the path reported by the store itself; otherwise use the configured commit-log path.
        final String commitLogPaths;
        if (store instanceof DefaultMessageStore) {
            commitLogPaths = store.getStorePathPhysic();
        } else {
            commitLogPaths = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
        }

        double commitLogUsage = 100.0d;
        for (String path : commitLogPaths.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER)) {
            commitLogUsage = Math.min(commitLogUsage, UtilAll.getDiskPartitionSpaceUsedPercent(path));
        }

        final double consumeQueueUsage = UtilAll.getDiskPartitionSpaceUsedPercent(
            StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir()));
        final double indexUsage = UtilAll.getDiskPartitionSpaceUsedPercent(
            StorePathConfigHelper.getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir()));

        return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", commitLogUsage, consumeQueueUsage, indexUsage);
    }

    /**
     * Replaces this processor's consume-message hook list with the given list.
     *
     * <p>The list is stored by reference, not copied. Passing {@code null} or an empty list
     * makes {@link #hasConsumeMessageHook()} report {@code false}.
     *
     * @param list the hooks to use from now on; may be {@code null}
     */
    public void registerConsumeMessageHook(List<ConsumeMessageHook> list) {
        this.consumeMessageHookList = list;
    }

    /**
     * Creates a {@link ConsumeMessageContext} describing a consumer send-back request.
     *
     * <p>The context carries the namespace, the consumer group and origin topic taken from the
     * request header, the {@code SEND_BACK} commercial stats type with a receive count of 1, and
     * the value of the request's {@code "Owner"} ext field as commercial owner.
     *
     * @param str                              namespace to record on the context
     * @param consumerSendMsgBackRequestHeader header supplying the consumer group and origin topic
     * @param remotingCommand                  request whose ext fields supply the {@code "Owner"} value
     * @return the newly populated context
     */
    private static ConsumeMessageContext buildConsumeMessageContext(String str, ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader, RemotingCommand remotingCommand) {
        final ConsumeMessageContext context = new ConsumeMessageContext();

        // Identity of the request.
        context.setNamespace(str);
        context.setConsumerGroup(consumerSendMsgBackRequestHeader.getGroup());
        context.setTopic(consumerSendMsgBackRequestHeader.getOriginTopic());

        // Commercial accounting: one send-back, attributed to the request's owner.
        context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
        context.setCommercialRcvTimes(1);
        final String owner = (String) remotingCommand.getExtFields().get("Owner");
        context.setCommercialOwner(owner);

        return context;
    }

    /**
     * Picks a pseudo-random queue id by reducing a random draw modulo the queue count.
     *
     * @param i number of queues to choose from; for a positive value the result lies in {@code [0, i)}
     * @return the remainder of a random non-negative integer below 99999999 divided by {@code i}
     * @throws ArithmeticException if {@code i} is zero
     */
    private int randomQueueId(int i) {
        final int draw = ThreadLocalRandom.current().nextInt(99999999);
        return draw % i;
    }

    /**
     * Creates the response command for a send request and runs the checks that precede storing.
     *
     * <p>The response copies the request's opaque id and carries the broker's region id and
     * trace switch as ext fields. If the broker is not yet accepting send requests (the store's
     * current time is before the configured start timestamp) the response is returned with code 1
     * and an explanatory remark. Otherwise the code is set to the sentinel {@code -1} and
     * {@code msgCheck} is invoked; callers treat any code other than {@code -1} on the returned
     * command as a final answer.
     *
     * @param channelHandlerContext    channel the request arrived on, forwarded to {@code msgCheck}
     * @param remotingCommand          the incoming request
     * @param sendMessageRequestHeader decoded request header, forwarded to {@code msgCheck}
     * @return the response command; its code is {@code -1} when no check has decided the outcome
     */
    private RemotingCommand preSend(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand, SendMessageRequestHeader sendMessageRequestHeader) {
        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
        response.setOpaque(remotingCommand.getOpaque());
        response.addExtField("MSG_REGION", this.brokerController.getBrokerConfig().getRegionId());
        response.addExtField("TRACE_ON", String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
        log.debug("Receive SendMessage request command {}", remotingCommand);

        final long startAcceptTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
        if (this.brokerController.getMessageStore().now() < startAcceptTimestamp) {
            // NOTE(review): code 1 is presumably ResponseCode.SYSTEM_ERROR - confirm against ResponseCode.
            response.setCode(1);
            response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startAcceptTimestamp)));
            return response;
        }

        // -1 marks "undecided"; presumably msgCheck overwrites it when it rejects the request.
        response.setCode(-1);
        super.msgCheck(channelHandlerContext, sendMessageRequestHeader, response);
        // The same command is returned whether or not msgCheck changed the code.
        return response;
    }
}
