/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.broker.processor;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.loadbalance.AssignmentManager;
import org.apache.rocketmq.broker.loadbalance.MessageRequestModeManager;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageQueueAssignment;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.protocol.body.QueryAssignmentRequestBody;
import org.apache.rocketmq.common.protocol.body.QueryAssignmentResponseBody;
import org.apache.rocketmq.common.protocol.body.SetMessageRequestModeRequestBody;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

/**
 * Netty request processor that answers queue-assignment queries and records the
 * message request mode configured for a topic / consumer-group pair.
 *
 * <p>Two request codes are dispatched by {@link #processRequest}: {@code 400}
 * computes the message queues a client should consume from, and {@code 401}
 * stores a {@link SetMessageRequestModeRequestBody} through the
 * {@link MessageRequestModeManager} and persists it.
 */
public class QueryAssignmentProcessor implements NettyRequestProcessor {
    private static final InternalLogger log = InternalLoggerFactory.getLogger("RocketmqBroker");

    // NOTE(review): these numeric codes were inlined by the decompiler; they presumably
    // mirror the broker's RequestCode / ResponseCode protocol constants - confirm.
    private static final int REQUEST_QUERY_ASSIGNMENT = 400;
    private static final int REQUEST_SET_MESSAGE_REQUEST_MODE = 401;
    private static final int RESPONSE_SUCCESS = 0;
    private static final int RESPONSE_SET_MODE_REJECTED = 16;

    /** Topic-name prefix that this class treats as marking a retry topic. */
    private static final String RETRY_TOPIC_PREFIX = "%RETRY%";

    private final BrokerController brokerController;

    /** Allocation strategies supported by this processor, keyed by {@code getName()}. */
    private final ConcurrentHashMap<String, AllocateMessageQueueStrategy> name2LoadStrategy =
        new ConcurrentHashMap<String, AllocateMessageQueueStrategy>();

    private final MessageRequestModeManager messageRequestModeManager;

    /**
     * Registers the two built-in allocation strategies and creates the request-mode
     * manager, invoking its {@code load()} once.
     *
     * @param brokerController broker facade used to reach configuration, the
     *                         assignment manager and the consumer manager
     */
    public QueryAssignmentProcessor(BrokerController brokerController) {
        this.brokerController = brokerController;

        AllocateMessageQueueAveragely averagely = new AllocateMessageQueueAveragely(log);
        this.name2LoadStrategy.put(averagely.getName(), averagely);

        AllocateMessageQueueAveragelyByCircle averagelyByCircle = new AllocateMessageQueueAveragelyByCircle(log);
        this.name2LoadStrategy.put(averagelyByCircle.getName(), averagelyByCircle);

        this.messageRequestModeManager = new MessageRequestModeManager(brokerController);
        this.messageRequestModeManager.load();
    }

    /**
     * Dispatches a request by its code.
     *
     * @param ctx     channel context of the requesting client
     * @param request the incoming command
     * @return the response command, or {@code null} when the request code is not
     *         one of the two codes handled here
     * @throws RemotingCommandException declared by the handlers this method delegates to
     */
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        switch (request.getCode()) {
            case REQUEST_QUERY_ASSIGNMENT:
                return this.queryAssignment(ctx, request);
            case REQUEST_SET_MESSAGE_REQUEST_MODE:
                return this.setMessageRequestMode(ctx, request);
            default:
                return null;
        }
    }

    /**
     * @return always {@code false}; this processor never rejects requests
     */
    public boolean rejectRequest() {
        return false;
    }

    /**
     * Builds the queue assignment for the client described in the request body.
     *
     * <p>The response always carries code {@code 0}. Its body holds one
     * {@link MessageQueueAssignment} per allocated queue, each tagged with the
     * effective request mode, or a {@code null} assignment set when
     * {@link #doLoadBalance} yields {@code null}.
     */
    private RemotingCommand queryAssignment(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        QueryAssignmentRequestBody requestBody = (QueryAssignmentRequestBody) QueryAssignmentRequestBody.decode(request.getBody(), QueryAssignmentRequestBody.class);
        String topic = requestBody.getTopic();
        String consumerGroup = requestBody.getConsumerGroup();
        String clientId = requestBody.getClientId();
        MessageModel messageModel = requestBody.getMessageModel();
        String strategyName = requestBody.getStrategyName();

        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        QueryAssignmentResponseBody responseBody = new QueryAssignmentResponseBody();

        SetMessageRequestModeRequestBody requestMode = this.resolveMessageRequestMode(topic, consumerGroup);
        Set<MessageQueue> messageQueues = this.doLoadBalance(topic, consumerGroup, clientId, messageModel, strategyName, requestMode, ctx);

        Set<MessageQueueAssignment> assignments = null;
        if (messageQueues != null) {
            assignments = new HashSet<MessageQueueAssignment>();
            for (MessageQueue messageQueue : messageQueues) {
                MessageQueueAssignment assignment = new MessageQueueAssignment();
                assignment.setMessageQueue(messageQueue);
                // requestMode is never null here: resolveMessageRequestMode always returns a body.
                assignment.setMode(requestMode.getMode());
                assignments.add(assignment);
            }
        }

        responseBody.setMessageQueueAssignments(assignments);
        response.setBody(responseBody.encode());
        response.setCode(RESPONSE_SUCCESS);
        response.setRemark(null);
        return response;
    }

    /**
     * Returns the request mode stored for the topic / group pair, or builds a default.
     *
     * <p>The default uses {@link MessageRequestMode#PULL} for topics starting with
     * {@code %RETRY%} and the broker-configured default mode otherwise; when the
     * resulting mode is {@link MessageRequestMode#POP} the broker-configured default
     * pop share queue number is applied as well. The default is not stored.
     *
     * @return a non-null request-mode body
     */
    private SetMessageRequestModeRequestBody resolveMessageRequestMode(String topic, String consumerGroup) {
        SetMessageRequestModeRequestBody stored = this.messageRequestModeManager.getMessageRequestMode(topic, consumerGroup);
        if (stored != null) {
            return stored;
        }

        SetMessageRequestModeRequestBody fallback = new SetMessageRequestModeRequestBody();
        fallback.setTopic(topic);
        fallback.setConsumerGroup(consumerGroup);
        if (topic.startsWith(RETRY_TOPIC_PREFIX)) {
            fallback.setMode(MessageRequestMode.PULL);
        } else {
            fallback.setMode(this.brokerController.getBrokerConfig().getDefaultMessageRequestMode());
        }
        if (fallback.getMode() == MessageRequestMode.POP) {
            fallback.setPopShareQueueNum(this.brokerController.getBrokerConfig().getDefaultPopShareQueueNum());
        }
        return fallback;
    }

    /**
     * Computes the queues assigned to {@code clientId} according to the message model.
     *
     * <p>{@code BROADCASTING} returns every queue the assignment manager knows for the
     * topic; {@code CLUSTERING} delegates to {@link #allocateForClustering}.
     *
     * @return the assigned queues, or {@code null} when no assignment could be made
     * @throws NullPointerException if {@code messageModel} is {@code null} (switching
     *                              on a null enum)
     */
    private Set<MessageQueue> doLoadBalance(String topic, String consumerGroup, String clientId, MessageModel messageModel, String strategyName, SetMessageRequestModeRequestBody setMessageRequestModeRequestBody, ChannelHandlerContext ctx) {
        Set<MessageQueue> assignedQueueSet = null;
        AssignmentManager assignmentManager = this.brokerController.getAssignmentManager();
        switch (messageModel) {
            case BROADCASTING: {
                assignedQueueSet = assignmentManager.getTopicSubscribeInfo(topic);
                if (assignedQueueSet == null) {
                    log.warn("QueryLoad: no assignment for group[{}], the topic[{}] does not exist.", consumerGroup, topic);
                }
                break;
            }
            case CLUSTERING: {
                assignedQueueSet = this.allocateForClustering(assignmentManager, topic, consumerGroup, clientId, strategyName, setMessageRequestModeRequestBody, ctx);
                break;
            }
            default:
                break;
        }
        return assignedQueueSet;
    }

    /**
     * Clustering-mode allocation.
     *
     * <p>Returns all queues of the topic when server-side load balancing is disabled.
     * Otherwise both the queue list and the client-id list are sorted and handed to the
     * strategy named by {@code strategyName} (or to {@link #allocate4Pop} when the
     * request mode is {@link MessageRequestMode#POP}).
     *
     * @return the allocated queues (possibly empty), or {@code null} when the topic is
     *         unknown, the client-id list is unavailable, the strategy is unsupported,
     *         or allocation throws
     */
    private Set<MessageQueue> allocateForClustering(AssignmentManager assignmentManager, String topic, String consumerGroup, String clientId, String strategyName, SetMessageRequestModeRequestBody requestMode, ChannelHandlerContext ctx) {
        Set<MessageQueue> mqSet = assignmentManager.getTopicSubscribeInfo(topic);
        if (mqSet == null) {
            // A missing retry topic is not worth a warning; any other missing topic is.
            if (!topic.startsWith(RETRY_TOPIC_PREFIX)) {
                log.warn("QueryLoad: no assignment for group[{}], the topic[{}] does not exist.", consumerGroup, topic);
            }
            return null;
        }
        if (!this.brokerController.getBrokerConfig().isServerLoadBalancerEnabled()) {
            return mqSet;
        }

        ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(consumerGroup);
        List<String> cidAll = consumerGroupInfo == null ? null : consumerGroupInfo.getAllClientId();
        if (cidAll == null) {
            log.warn("QueryLoad: no assignment for group[{}] topic[{}], get consumer id list failed", consumerGroup, topic);
            return null;
        }

        // Both lists are sorted before allocation. Note that cidAll is sorted in place.
        List<MessageQueue> mqAll = new ArrayList<MessageQueue>(mqSet);
        Collections.sort(mqAll);
        Collections.sort(cidAll);

        List<MessageQueue> allocateResult;
        try {
            AllocateMessageQueueStrategy strategy = this.name2LoadStrategy.get(strategyName);
            if (strategy == null) {
                Channel channel = ctx.channel();
                // Log the strategy name that was asked for (previously the consumer group was printed here).
                log.warn("QueryLoad: unsupported strategy [{}],  {}", strategyName, RemotingHelper.parseChannelRemoteAddr(channel));
                return null;
            }
            if (requestMode != null && requestMode.getMode() == MessageRequestMode.POP) {
                allocateResult = this.allocate4Pop(strategy, consumerGroup, clientId, mqAll, cidAll, requestMode.getPopShareQueueNum());
            } else {
                allocateResult = strategy.allocate(consumerGroup, clientId, mqAll, cidAll);
            }
        }
        catch (Throwable e) {
            log.error("QueryLoad: no assignment for group[{}] topic[{}], allocate message queue exception. strategy name: {}, ex: {}", new Object[]{consumerGroup, topic, strategyName, e});
            return null;
        }

        Set<MessageQueue> assignedQueueSet = new HashSet<MessageQueue>();
        if (allocateResult != null) {
            assignedQueueSet.addAll(allocateResult);
        }
        return assignedQueueSet;
    }

    /**
     * Allocates queues for a client consuming in POP mode.
     *
     * <ul>
     *   <li>If {@code popShareQueueNum <= 0} or {@code popShareQueueNum >= cidAll.size() - 1},
     *       one new {@link MessageQueue} with queue id {@code -1} is returned per entry
     *       of {@code mqAll} (same topic and broker name).</li>
     *   <li>Otherwise, if there are no more clients than queues, the client receives its
     *       own strategy allocation plus the allocations of the next
     *       {@code popShareQueueNum} clients in {@code cidAll} (wrapping around).</li>
     *   <li>Otherwise each client receives exactly one queue, chosen by its index in
     *       {@code cidAll} modulo the number of queues.</li>
     * </ul>
     *
     * @param allocateMessageQueueStrategy strategy used for the per-client allocation
     * @param consumerGroup                consumer group of the client
     * @param clientId                     id of the client being served
     * @param mqAll                        all queues of the topic
     * @param cidAll                       all client ids of the group
     * @param popShareQueueNum             number of following clients whose queues are shared
     * @return the queues for the client; in the second case this is the list returned by
     *         the strategy, extended in place, and may contain duplicates
     */
    public List<MessageQueue> allocate4Pop(AllocateMessageQueueStrategy allocateMessageQueueStrategy, String consumerGroup, String clientId, List<MessageQueue> mqAll, List<String> cidAll, int popShareQueueNum) {
        if (popShareQueueNum <= 0 || popShareQueueNum >= cidAll.size() - 1) {
            // Fresh MessageQueue instances are created instead of reusing the elements of mqAll.
            // NOTE(review): queue id -1 presumably means "not bound to a specific queue" - confirm.
            List<MessageQueue> unboundQueues = new ArrayList<MessageQueue>(mqAll.size());
            for (MessageQueue mq : mqAll) {
                unboundQueues.add(new MessageQueue(mq.getTopic(), mq.getBrokerName(), -1));
            }
            return unboundQueues;
        }

        if (cidAll.size() > mqAll.size()) {
            return this.allocate(consumerGroup, clientId, mqAll, cidAll);
        }

        List<MessageQueue> allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll);
        int index = cidAll.indexOf(clientId);
        if (index >= 0) {
            for (int i = 1; i <= popShareQueueNum; ++i) {
                index = (index + 1) % cidAll.size();
                List<MessageQueue> neighbourQueues = allocateMessageQueueStrategy.allocate(consumerGroup, cidAll.get(index), mqAll, cidAll);
                allocateResult.addAll(neighbourQueues);
            }
        }
        return allocateResult;
    }

    /**
     * Assigns a single queue to {@code currentCID}: the queue at the client's index in
     * {@code cidAll} modulo {@code mqAll.size()}.
     *
     * @return a one-element list, or an empty list when the client id is not in {@code cidAll}
     * @throws IllegalArgumentException if {@code currentCID} is null or empty, or if
     *                                  {@code mqAll} or {@code cidAll} is null or empty
     */
    private List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
        if (currentCID == null || currentCID.isEmpty()) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        List<MessageQueue> result = new ArrayList<MessageQueue>();
        // One lookup serves both the membership test and the position.
        int index = cidAll.indexOf(currentCID);
        if (index < 0) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", new Object[]{consumerGroup, currentCID, cidAll});
            return result;
        }
        result.add(mqAll.get(index % mqAll.size()));
        return result;
    }

    /**
     * Stores and persists the request mode carried in the request body.
     *
     * <p>Requests for topics starting with {@code %RETRY%} are refused with code
     * {@code 16} and nothing is stored; all other requests are answered with code {@code 0}.
     */
    private RemotingCommand setMessageRequestMode(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        SetMessageRequestModeRequestBody requestBody = (SetMessageRequestModeRequestBody) SetMessageRequestModeRequestBody.decode(request.getBody(), SetMessageRequestModeRequestBody.class);

        String topic = requestBody.getTopic();
        if (topic.startsWith(RETRY_TOPIC_PREFIX)) {
            response.setCode(RESPONSE_SET_MODE_REJECTED);
            response.setRemark("retry topic is not allowed to set mode");
            return response;
        }

        String consumerGroup = requestBody.getConsumerGroup();
        this.messageRequestModeManager.setMessageRequestMode(topic, consumerGroup, requestBody);
        this.messageRequestModeManager.persist();

        response.setCode(RESPONSE_SUCCESS);
        response.setRemark(null);
        return response;
    }
}

