package com.aliyun.openservices.ons.api.impl.rocketmq;

import com.alibaba.ons.open.trace.core.common.OnsTraceConstants;
import com.alibaba.ons.open.trace.core.dispatch.AsyncDispatcher;
import com.alibaba.ons.open.trace.core.dispatch.impl.AsyncTraceAppender;
import com.alibaba.ons.open.trace.core.dispatch.impl.AsyncTraceDispatcher;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.log.ClientLogger;
import com.alibaba.rocketmq.common.UtilAll;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.impl.tracehook.OnsConsumeMessageHookImpl;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;

/* loaded from: input_file:com/aliyun/openservices/ons/api/impl/rocketmq/ConsumerImpl.class */
public class ConsumerImpl extends ONSClientAbstract implements Consumer {
    private static final Logger log = ClientLogger.getLog();
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    private final ConcurrentHashMap<String, MessageListener> subscribeTable;
    private final AtomicBoolean started;
    private AsyncDispatcher traceDispatcher;

    /* renamed from: com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl$1, reason: invalid class name */
    /* loaded from: input_file:com/aliyun/openservices/ons/api/impl/rocketmq/ConsumerImpl$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$aliyun$openservices$ons$api$Action = new int[Action.values().length];

        static {
            try {
                $SwitchMap$com$aliyun$openservices$ons$api$Action[Action.CommitMessage.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$aliyun$openservices$ons$api$Action[Action.ReconsumeLater.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* loaded from: input_file:com/aliyun/openservices/ons/api/impl/rocketmq/ConsumerImpl$MessageListenerImpl.class */
    class MessageListenerImpl implements MessageListenerConcurrently {
        MessageListenerImpl() {
        }

        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            MessageExt messageExt = list.get(0);
            Message msgConvert = ONSUtil.msgConvert((com.alibaba.rocketmq.common.message.Message) messageExt);
            Map properties = messageExt.getProperties();
            msgConvert.setMsgID(messageExt.getMsgId());
            if (properties != null && properties.get("__transactionId__") != null) {
                msgConvert.setMsgID((String) properties.get("__transactionId__"));
            }
            MessageListener messageListener = (MessageListener) ConsumerImpl.this.subscribeTable.get(msgConvert.getTopic());
            if (null == messageListener) {
                throw new ONSClientException("MessageListener is null");
            }
            Action consume = messageListener.consume(msgConvert, new ConsumeContext());
            if (consume == null) {
                return null;
            }
            switch (AnonymousClass1.$SwitchMap$com$aliyun$openservices$ons$api$Action[consume.ordinal()]) {
                case 1:
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                case 2:
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                default:
                    return null;
            }
        }
    }

    public ConsumerImpl(Properties properties) {
        super(properties);
        this.subscribeTable = new ConcurrentHashMap<>();
        this.started = new AtomicBoolean(false);
        this.traceDispatcher = null;
        this.defaultMQPushConsumer = new DefaultMQPushConsumer(new ClientRPCHook(this.sessionCredentials));
        String property = properties.getProperty("ConsumerId");
        if (null == property) {
            throw new ONSClientException("'ConsumerId' property is null");
        }
        String property2 = properties.getProperty("maxReconsumeTimes");
        if (!UtilAll.isBlank(property2)) {
            try {
                this.defaultMQPushConsumer.setMaxReconsumeTimes(Integer.parseInt(property2));
            } catch (NumberFormatException e) {
            }
        }
        if (!UtilAll.isBlank(properties.getProperty("consumeTimeout"))) {
            try {
                this.defaultMQPushConsumer.setConsumeTimeout(Integer.parseInt(r0));
            } catch (NumberFormatException e2) {
            }
        }
        this.defaultMQPushConsumer.setVipChannelEnabled(Boolean.parseBoolean(properties.getProperty("isVipChannelEnabled", "false")));
        this.defaultMQPushConsumer.setMessageModel(MessageModel.valueOf(properties.getProperty("MessageModel", "CLUSTERING")));
        this.defaultMQPushConsumer.setConsumerGroup(property);
        this.defaultMQPushConsumer.setInstanceName(buildIntanceName());
        this.defaultMQPushConsumer.setNamesrvAddr(getNameServerAddr());
        if (properties.containsKey("ConsumeThreadNums")) {
            this.defaultMQPushConsumer.setConsumeThreadMin(Integer.valueOf(properties.get("ConsumeThreadNums").toString()).intValue());
            this.defaultMQPushConsumer.setConsumeThreadMax(Integer.valueOf(properties.get("ConsumeThreadNums").toString()).intValue());
        }
        try {
            Properties properties2 = new Properties();
            properties2.put("AccessKey", this.sessionCredentials.getAccessKey());
            properties2.put("SecretKey", this.sessionCredentials.getSecretKey());
            properties2.put("MaxMsgSize", "128000");
            properties2.put("AsyncBufferSize", "2048");
            properties2.put("MaxBatchNum", "1");
            properties2.put("WakeUpNum", "1");
            properties2.put(OnsTraceConstants.NAMESRV_ADDR, getNameServerAddr());
            properties2.put("InstanceName", buildIntanceName());
            AsyncTraceAppender asyncTraceAppender = new AsyncTraceAppender(properties2);
            this.traceDispatcher = new AsyncTraceDispatcher(properties2);
            this.traceDispatcher.start(asyncTraceAppender, this.defaultMQPushConsumer.getInstanceName());
            this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new OnsConsumeMessageHookImpl(this.traceDispatcher));
        } catch (Throwable th) {
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
        }
    }

    public void start() {
        this.defaultMQPushConsumer.registerMessageListener(new MessageListenerImpl());
        try {
            if (this.started.compareAndSet(false, true)) {
                this.defaultMQPushConsumer.start();
            }
        } catch (Exception e) {
            throw new ONSClientException(e.getMessage());
        }
    }

    public void shutdown() {
        if (this.started.compareAndSet(true, false)) {
            this.defaultMQPushConsumer.shutdown();
            if (this.traceDispatcher != null) {
                try {
                    this.traceDispatcher.flush();
                } catch (IOException e) {
                    log.error("system mqtrace hook shutdown failed ,maybe loss some trace data");
                }
            }
        }
    }

    public void subscribe(String str, String str2, MessageListener messageListener) {
        if (null == str) {
            throw new ONSClientException("topic is null");
        }
        if (null == messageListener) {
            throw new ONSClientException("listener is null");
        }
        try {
            this.subscribeTable.put(str, messageListener);
            this.defaultMQPushConsumer.subscribe(str, str2);
        } catch (MQClientException e) {
            throw new ONSClientException("defaultMQPushConsumer subscribe exception", e);
        }
    }

    public void unsubscribe(String str) {
        if (null != str) {
            this.defaultMQPushConsumer.unsubscribe(str);
        }
    }

    public boolean isStarted() {
        return this.started.get();
    }

    public boolean isClosed() {
        return !isStarted();
    }
}
