/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.mq.util;

import java.util.Collection;
import java.util.List;
import javax.annotation.Nonnull;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
import org.apache.inlong.manager.common.pojo.tubemq.ConsumerGroupResponse;
import org.apache.inlong.manager.common.pojo.tubemq.TopicResponse;
import org.apache.inlong.manager.common.pojo.tubemq.TubeBrokerInfo;
import org.apache.inlong.manager.common.pojo.tubemq.TubeHttpResponse;
import org.apache.inlong.manager.common.util.HttpUtils;
import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Service;

/**
 * Operator that talks to the web API of a TubeMQ master to create topics and
 * consumer groups, and to check whether they already exist.
 *
 * <p>All requests are issued as HTTP GET through the injected {@link HttpUtils}.
 * Failures are reported as {@link BusinessException}.
 */
@Service
public class TubeMQOperator {

    private static final Logger LOGGER = LoggerFactory.getLogger(TubeMQOperator.class);

    /** Error code that the master responses are compared against to detect success. */
    private static final int SUCCESS_CODE = 0;

    // Query-string fragments appended to the web API paths below.
    private static final String TOPIC_NAME = "&topicName=";
    private static final String CONSUME_GROUP = "&consumeGroup=";
    private static final String GROUP_NAME = "&groupName=";
    private static final String BROKER_ID = "&brokerId=";
    private static final String CREATE_USER = "&createUser=";
    private static final String CONF_MOD_AUTH_TOKEN = "&confModAuthToken=";

    // Web API paths, relative to the master URL.
    private static final String QUERY_TOPIC_PATH = "/webapi.htm?method=admin_query_cluster_topic_view";
    private static final String QUERY_BROKER_PATH = "/webapi.htm?method=admin_query_broker_run_status";
    private static final String ADD_TOPIC_PATH = "/webapi.htm?method=admin_add_new_topic_record";
    private static final String QUERY_CONSUMER_PATH = "/webapi.htm?method=admin_query_allowed_consumer_group_info";
    private static final String ADD_CONSUMER_PATH = "/webapi.htm?method=admin_add_authorized_consumergroup_info";

    @Autowired
    private HttpUtils httpUtils;

    /**
     * Creates the given topic on the master of the given cluster, unless it already exists.
     *
     * @param tubeCluster cluster info supplying the master URL and the auth token
     * @param topicName name of the topic to create
     * @param operator user name sent to the master as the creator
     * @throws BusinessException if the master URL or topic name is empty, or any request fails
     */
    public void createTopic(@Nonnull TubeClusterInfo tubeCluster, String topicName, String operator) {
        String masterUrl = tubeCluster.getUrl();
        LOGGER.info("begin to create tube topic {} in master {}", topicName, masterUrl);
        if (StringUtils.isEmpty(masterUrl) || StringUtils.isEmpty(topicName)) {
            throw new BusinessException("tube master url or tube topic cannot be null");
        }
        if (this.isTopicExist(masterUrl, topicName)) {
            LOGGER.warn("tube topic {} already exists in {}, skip to create", topicName, masterUrl);
            return;
        }
        this.createTopicOpt(masterUrl, topicName, tubeCluster.getToken(), operator);
        LOGGER.info("success to create tube topic {} in {}", topicName, masterUrl);
    }

    /**
     * Creates the given consumer group for the given topic on the master of the given cluster.
     *
     * <p>Nothing is created (only a warning is logged) when the topic does not exist or
     * the consumer group already exists.
     *
     * @param tubeCluster cluster info supplying the master URL and the auth token
     * @param topic name of the topic the consumer group is authorized for
     * @param consumerGroup name of the consumer group to create
     * @param operator user name sent to the master as the creator
     * @throws BusinessException if the master URL, consumer group or topic is empty, or any request fails
     */
    public void createConsumerGroup(@Nonnull TubeClusterInfo tubeCluster, String topic, String consumerGroup,
            String operator) {
        String masterUrl = tubeCluster.getUrl();
        LOGGER.info("begin to create consumer group {} for topic {} in master {}", consumerGroup, topic, masterUrl);
        if (StringUtils.isEmpty(masterUrl) || StringUtils.isEmpty(consumerGroup) || StringUtils.isEmpty(topic)) {
            throw new BusinessException("tube master url, consumer group, or tube topic cannot be null");
        }
        if (!this.isTopicExist(masterUrl, topic)) {
            LOGGER.warn("cannot create tube consumer group {}, as the topic {} not exists in master {}",
                    consumerGroup, topic, masterUrl);
            return;
        }
        if (this.isConsumerGroupExist(masterUrl, topic, consumerGroup)) {
            LOGGER.warn("tube consumer group {} already exists for topic {} in master {}, skip to create",
                    consumerGroup, topic, masterUrl);
            return;
        }
        this.createConsumerGroupOpt(masterUrl, topic, consumerGroup, tubeCluster.getToken(), operator);
        LOGGER.info("success to create tube consumer group {} for topic {} in {}", consumerGroup, topic, masterUrl);
    }

    /**
     * Checks whether the topic exists by querying the cluster topic view of the master.
     *
     * @param masterUrl URL of the master
     * @param topicName name of the topic to look up
     * @return {@code true} if the query returned a non-empty data list, {@code false} otherwise
     * @throws BusinessException if the request fails for any reason
     */
    public boolean isTopicExist(String masterUrl, String topicName) {
        LOGGER.info("begin to check if the tube topic {} exists", topicName);
        String url = masterUrl + QUERY_TOPIC_PATH + TOPIC_NAME + topicName;
        try {
            TopicResponse topicView = (TopicResponse) this.httpUtils.request(url, HttpMethod.GET, null,
                    new HttpHeaders(), TopicResponse.class);
            if (CollectionUtils.isEmpty((Collection<?>) topicView.getData())) {
                LOGGER.warn("tube topic {} not exists in {}", topicName, url);
                return false;
            }
            LOGGER.info("tube topic {} exists in {}", topicName, url);
            return true;
        } catch (Exception e) {
            String msg = String.format("failed to check if the topic %s exist in ", topicName);
            LOGGER.error(msg + url, e);
            throw new BusinessException(msg + masterUrl + ", error: " + e.getMessage());
        }
    }

    /**
     * Checks whether the consumer group exists for the topic by querying the allowed
     * consumer group info of the master.
     *
     * @param masterUrl URL of the master
     * @param topicName name of the topic
     * @param consumerGroup name of the consumer group to look up
     * @return {@code true} if the query returned a non-empty data list, {@code false} otherwise
     * @throws BusinessException if the request fails for any reason
     */
    public boolean isConsumerGroupExist(String masterUrl, String topicName, String consumerGroup) {
        LOGGER.info("begin to check if the consumer group {} exists on topic {}", consumerGroup, topicName);
        String url = masterUrl + QUERY_CONSUMER_PATH + TOPIC_NAME + topicName + CONSUME_GROUP + consumerGroup;
        try {
            ConsumerGroupResponse response = (ConsumerGroupResponse) this.httpUtils.request(url, HttpMethod.GET,
                    null, new HttpHeaders(), ConsumerGroupResponse.class);
            if (CollectionUtils.isEmpty((Collection<?>) response.getData())) {
                LOGGER.warn("tube consumer group {} not exists for topic {} in {}", consumerGroup, topicName, url);
                return false;
            }
            LOGGER.info("tube consumer group {} exists for topic {} in {}", consumerGroup, topicName, url);
            return true;
        } catch (Exception e) {
            String msg = String.format("failed to check if the consumer group %s for topic %s exist in ",
                    consumerGroup, topicName);
            LOGGER.error(msg + url, e);
            throw new BusinessException(msg + masterUrl + ", error: " + e.getMessage());
        }
    }

    /**
     * Queries the broker run status from the master and divides the broker list by status.
     *
     * @param masterUrl URL of the master
     * @return the broker info returned by the master, after {@code divideBrokerListByStatus()}
     * @throws BusinessException if the master reports a non-success error code or the request fails
     */
    private TubeBrokerInfo getBrokerInfo(String masterUrl) {
        String url = masterUrl + QUERY_BROKER_PATH;
        try {
            TubeBrokerInfo brokerInfo = (TubeBrokerInfo) this.httpUtils.request(url, HttpMethod.GET, null,
                    new HttpHeaders(), TubeBrokerInfo.class);
            if (brokerInfo.getErrCode() != SUCCESS_CODE) {
                String msg = "failed to query tube broker from %s, error: %s";
                LOGGER.error(String.format(msg, url, brokerInfo.getErrMsg()));
                throw new BusinessException(String.format(msg, masterUrl, brokerInfo.getErrMsg()));
            }
            brokerInfo.divideBrokerListByStatus();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("success to query tube broker from {}, result {}", url, brokerInfo.getData());
            }
            return brokerInfo;
        } catch (BusinessException e) {
            // Already logged above with a complete message; do not wrap it a second time.
            throw e;
        } catch (Exception e) {
            String msg = "failed to query tube broker from %s";
            LOGGER.error(String.format(msg, url), e);
            throw new BusinessException(String.format(msg, masterUrl) + ", error: " + e.getMessage());
        }
    }

    /**
     * Sends the add-topic request to the master, targeting every broker id reported by
     * {@link #getBrokerInfo(String)}.
     *
     * @param masterUrl URL of the master
     * @param topicName name of the topic to create
     * @param token auth token sent as {@code confModAuthToken}
     * @param operator user name sent as {@code createUser}
     * @throws BusinessException if no brokers are found, the master reports a non-success
     *         error code, or the request fails
     */
    private void createTopicOpt(String masterUrl, String topicName, String token, String operator) {
        LOGGER.info("begin to create tube topic {} in master {}", topicName, masterUrl);
        TubeBrokerInfo brokerView = this.getBrokerInfo(masterUrl);
        List<?> allBrokers = brokerView.getAllBrokerIdList();
        if (CollectionUtils.isEmpty(allBrokers)) {
            String msg = String.format("cannot create topic %s, as not any brokers found in %s", topicName, masterUrl);
            LOGGER.error(msg);
            throw new BusinessException(msg);
        }
        String url = masterUrl + ADD_TOPIC_PATH + TOPIC_NAME + topicName
                + BROKER_ID + StringUtils.join(allBrokers, ",")
                + CREATE_USER + operator
                + CONF_MOD_AUTH_TOKEN + token;
        try {
            TubeHttpResponse response = (TubeHttpResponse) this.httpUtils.request(url, HttpMethod.GET, null,
                    new HttpHeaders(), TubeHttpResponse.class);
            if (response.getErrCode() != SUCCESS_CODE) {
                String msg = String.format("failed to create tube topic %s, error: %s", topicName,
                        response.getErrMsg());
                LOGGER.error("{} in {} for brokers {}", msg, masterUrl, allBrokers);
                throw new BusinessException(msg);
            }
            // Log the master URL rather than the request URL: the latter carries the auth token.
            LOGGER.info("success to create tube topic {} in {}", topicName, masterUrl);
        } catch (BusinessException e) {
            // Already logged above with a complete message; do not wrap it a second time.
            throw e;
        } catch (Exception e) {
            String msg = String.format("failed to create tube topic %s in %s", topicName, masterUrl);
            LOGGER.error(msg, e);
            throw new BusinessException(msg + ", error: " + e.getMessage());
        }
    }

    /**
     * Sends the add-authorized-consumer-group request to the master.
     *
     * @param masterUrl URL of the master
     * @param topicName name of the topic the consumer group is authorized for
     * @param consumerGroup name of the consumer group to create
     * @param token auth token sent as {@code confModAuthToken}
     * @param operator user name sent as {@code createUser}
     * @throws BusinessException if the master reports a non-success error code or the request fails
     */
    private void createConsumerGroupOpt(String masterUrl, String topicName, String consumerGroup, String token,
            String operator) {
        LOGGER.info("begin to create consumer group {} for topic {} in master {}", consumerGroup, topicName,
                masterUrl);
        String url = masterUrl + ADD_CONSUMER_PATH + TOPIC_NAME + topicName
                + GROUP_NAME + consumerGroup
                + CREATE_USER + operator
                + CONF_MOD_AUTH_TOKEN + token;
        try {
            TubeHttpResponse response = (TubeHttpResponse) this.httpUtils.request(url, HttpMethod.GET, null,
                    new HttpHeaders(), TubeHttpResponse.class);
            if (response.getErrCode() != SUCCESS_CODE) {
                String msg = String.format("failed to create tube consumer group %s for topic %s, error: %s",
                        consumerGroup, topicName, response.getErrMsg());
                // Log the master URL rather than the request URL: the latter carries the auth token.
                LOGGER.error("{} in {}", msg, masterUrl);
                throw new BusinessException(msg);
            }
            LOGGER.info("success to create tube consumer group {} for topic {} in {}", consumerGroup, topicName,
                    masterUrl);
        } catch (BusinessException e) {
            // Already logged above with a complete message; do not wrap it a second time.
            throw e;
        } catch (Exception e) {
            String msg = String.format("failed to create tube consumer group %s for topic %s in %s",
                    consumerGroup, topicName, masterUrl);
            LOGGER.error(msg, e);
            throw new BusinessException(msg + ", error: " + e.getMessage());
        }
    }
}

