package com.alibaba.tmq.client.system.consumer.implement;

import com.alibaba.dts.shade.com.taobao.common.fulllinkstresstesting.SplitEnvUtil;
import com.alibaba.tmq.client.TMQFactory;
import com.alibaba.tmq.client.context.ClientContext;
import com.alibaba.tmq.client.system.consumer.Consumer;
import com.alibaba.tmq.client.system.consumer.config.ConsumerConfig;
import com.alibaba.tmq.client.system.consumer.executer.ConsumerExecuter;
import com.alibaba.tmq.client.system.consumer.listener.MessageListener;
import com.alibaba.tmq.client.util.StringUtil;
import com.alibaba.tmq.common.constants.Constants;
import com.alibaba.tmq.common.domain.ConsumerKey;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/alibaba/tmq/client/system/consumer/implement/DefaultConsumer.class */
public class DefaultConsumer extends ClientContext implements Consumer, Constants {
    private static final Log logger = LogFactory.getLog(DefaultConsumer.class);
    private final ConsumerExecuter consumerExecuter;
    private final ConsumerConfig consumerConfig;

    public DefaultConsumer(ConsumerExecuter consumerExecuter, ConsumerConfig consumerConfig) {
        this.consumerExecuter = consumerExecuter;
        this.consumerConfig = consumerConfig;
    }

    @Override // com.alibaba.tmq.client.system.Role
    public void start() {
        if (!SplitEnvUtil.needStartDTS()) {
            logger.error("[DefaultConsumer]: start error, because of isolation environment");
            return;
        }
        ConcurrentHashMap<ConsumerKey, MessageListener> listenerTable = this.consumerExecuter.getListenerTable();
        if (listenerTable.isEmpty()) {
            throw new RuntimeException("[DefaultConsumer]: start error, there is no any messageListener");
        }
        try {
            initClient();
            Iterator<Map.Entry<ConsumerKey, MessageListener>> it = listenerTable.entrySet().iterator();
            while (it.hasNext()) {
                ConsumerKey key = it.next().getKey();
                try {
                    clientRemoting.initConnection(this.consumerConfig.getInstanceName(), 1, this.consumerConfig.getConsumerId(), key.getTopic(), key.getTag());
                    logger.warn("[DefaultConsumer]: start initConnection, clientConfig:" + clientConfig + ", consumerConfig:" + this.consumerConfig + ", consumerKey:" + key);
                } catch (Throwable th) {
                    logger.error("[DefaultConsumer]: start initConnection error, clientConfig:" + clientConfig + ", consumerConfig:" + this.consumerConfig + ", consumerKey:" + key, th);
                    throw new RuntimeException("[DefaultConsumer]: start initConnection error, clientConfig:" + clientConfig + ", consumerConfig:" + this.consumerConfig, th);
                }
            }
            this.consumerExecuter.setStart(true);
        } catch (Throwable th2) {
            throw new RuntimeException("[DefaultConsumer]: start initClient error, clientConfig:" + clientConfig + ", consumerConfig:" + this.consumerConfig, th2);
        }
    }

    @Override // com.alibaba.tmq.client.system.consumer.Consumer
    public void subscribe(String str, String str2, MessageListener messageListener) {
        if (StringUtil.isBlank(str)) {
            throw new RuntimeException("[DefaultConsumer]: subscribe error, topic is empty");
        }
        if (null == messageListener) {
            throw new RuntimeException("[DefaultConsumer]: subscribe error, messageListener is null");
        }
        ConcurrentHashMap<ConsumerKey, MessageListener> listenerTable = this.consumerExecuter.getListenerTable();
        String concurrentHashMap = listenerTable.toString();
        listenerTable.put(new ConsumerKey(this.consumerConfig.getConsumerId(), str, str2), messageListener);
        logger.warn("[DefaultConsumer]: subscribe, consumerId:" + this.consumerConfig.getConsumerId() + ", instanceName:" + this.consumerConfig.getInstanceName() + ", topic:" + str + ", tag:" + str2 + ", beforeListenerTable:" + concurrentHashMap + ", afterListenerTable" + listenerTable);
    }

    @Override // com.alibaba.tmq.client.system.Role
    public void shutdown() {
        this.consumerExecuter.setStart(false);
        TMQFactory.removeConsumer(this.consumerConfig.getConsumerId(), this.consumerConfig.getInstanceName());
        ConcurrentHashMap<ConsumerKey, MessageListener> listenerTable = this.consumerExecuter.getListenerTable();
        if (listenerTable.isEmpty()) {
            return;
        }
        Iterator<Map.Entry<ConsumerKey, MessageListener>> it = listenerTable.entrySet().iterator();
        while (it.hasNext()) {
            ConsumerKey key = it.next().getKey();
            try {
                clientRemoting.removeConnection(this.consumerConfig.getInstanceName(), 1, this.consumerConfig.getConsumerId(), key.getTopic(), key.getTag());
                logger.warn("[DefaultConsumer]: shutdown removeConnection, clientConfig:" + clientConfig + ", consumerConfig:" + this.consumerConfig + ", consumerKey:" + key);
            } catch (Throwable th) {
                logger.error("[DefaultConsumer]: shutdown removeConnection error, clientConfig:" + clientConfig + ", consumerConfig:" + this.consumerConfig + ", consumerKey:" + key, th);
            }
        }
    }
}
