/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.listener.consume.apply;

import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ConsumeStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.ProcessEvent;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.mapper.InlongConsumeEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumeProcessForm;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
import org.apache.inlong.manager.service.resource.queue.tubemq.TubeMQOperator;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ApproveConsumeProcessListener
implements ProcessEventListener {
    private static final Logger log = LoggerFactory.getLogger(ApproveConsumeProcessListener.class);
    @Autowired
    private InlongGroupEntityMapper groupMapper;
    @Autowired
    private InlongConsumeEntityMapper consumeMapper;
    @Autowired
    private InlongClusterService clusterService;
    @Autowired
    private PulsarOperator pulsarOperator;
    @Autowired
    private TubeMQOperator tubeMQOperator;

    public ProcessEvent event() {
        return ProcessEvent.COMPLETE;
    }

    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
        ApplyConsumeProcessForm consumeForm = (ApplyConsumeProcessForm)context.getProcessForm();
        Integer consumeId = consumeForm.getConsumeInfo().getId();
        InlongConsumeEntity entity = this.consumeMapper.selectById(consumeId);
        if (entity == null) {
            throw new WorkflowListenerException("inlong consume not exits for id=" + consumeId);
        }
        String mqType = entity.getMqType();
        if ("TUBEMQ".equals(mqType)) {
            this.createTubeConsumerGroup(entity, context.getOperator());
            return ListenerResult.success((String)"Create TubeMQ consumer group successful");
        }
        if ("PULSAR".equals(mqType) || "TDMQ_PULSAR".equals(mqType)) {
            this.createPulsarSubscription(entity);
        } else if (!"KAFKA".equals(mqType)) {
            throw new WorkflowListenerException("Unsupported MQ type " + mqType);
        }
        this.updateConsumerInfo(consumeId, entity.getConsumerGroup());
        return ListenerResult.success((String)"Create MQ consumer group successful");
    }

    private void updateConsumerInfo(Integer consumeId, String consumerGroup) {
        InlongConsumeEntity existEntity = this.consumeMapper.selectById(consumeId);
        existEntity.setStatus(ConsumeStatus.APPROVE_PASSED.getCode());
        existEntity.setConsumerGroup(consumerGroup);
        int rowCount = this.consumeMapper.updateByIdSelective(existEntity);
        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
            log.error("inlong consume has already updated, id={}, curVersion={}", (Object)existEntity.getId(), (Object)existEntity.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }
    }

    private void createPulsarSubscription(InlongConsumeEntity entity) {
        String groupId = entity.getInlongGroupId();
        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(groupId);
        Preconditions.checkNotNull((Object)groupEntity, (String)("inlong group not found for groupId=" + groupId));
        String mqResource = groupEntity.getMqResource();
        Preconditions.checkNotNull((Object)mqResource, (String)("mq resource cannot empty for groupId=" + groupId));
        String clusterTag = groupEntity.getInlongClusterTag();
        ClusterInfo clusterInfo = this.clusterService.getOne(clusterTag, null, "PULSAR");
        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo)clusterInfo;
        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster);){
            PulsarTopicInfo topicMessage = new PulsarTopicInfo();
            String tenant = pulsarCluster.getTenant();
            if (StringUtils.isEmpty((CharSequence)tenant)) {
                tenant = "public";
            }
            topicMessage.setTenant(tenant);
            topicMessage.setNamespace(mqResource);
            String consumerGroup = entity.getConsumerGroup();
            List<String> topics = Arrays.asList(entity.getTopic().split(","));
            this.createPulsarSubscription(pulsarAdmin, consumerGroup, topicMessage, topics);
        }
        catch (Exception e) {
            log.error("create pulsar topic failed", (Throwable)e);
            throw new WorkflowListenerException("failed to create pulsar topic for groupId=" + groupId + ", reason: " + e.getMessage());
        }
    }

    private void createPulsarSubscription(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicInfo topicInfo, List<String> topics) {
        try {
            this.pulsarOperator.createSubscriptions(pulsarAdmin, subscription, topicInfo, topics);
        }
        catch (Exception e) {
            log.error("create pulsar consumer group failed", (Throwable)e);
            throw new WorkflowListenerException("failed to create pulsar consumer group");
        }
    }

    private void createTubeConsumerGroup(InlongConsumeEntity entity, String operator) {
        String groupId = entity.getInlongGroupId();
        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(groupId);
        Preconditions.checkNotNull((Object)groupEntity, (String)("inlong group not found for groupId=" + groupId));
        String mqResource = groupEntity.getMqResource();
        Preconditions.checkNotNull((Object)mqResource, (String)("mq resource cannot empty for groupId=" + groupId));
        String clusterTag = groupEntity.getInlongClusterTag();
        TubeClusterInfo clusterInfo = (TubeClusterInfo)this.clusterService.getOne(clusterTag, null, "TUBEMQ");
        try {
            this.tubeMQOperator.createConsumerGroup(clusterInfo, entity.getTopic(), entity.getConsumerGroup(), operator);
        }
        catch (Exception e) {
            log.error("failed to create tubemq consumer group: ", (Throwable)e);
            throw new WorkflowListenerException("failed to create tubemq consumer group: " + e.getMessage());
        }
    }
}

