package org.apache.inlong.manager.service.listener.consume.apply;

import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ConsumeStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.ProcessEvent;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.mapper.InlongConsumeEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
import org.apache.inlong.manager.service.resource.queue.tubemq.TubeMQOperator;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.class */
public class ApproveConsumeProcessListener implements ProcessEventListener {
    private static final Logger log = LoggerFactory.getLogger(ApproveConsumeProcessListener.class);

    @Autowired
    private InlongGroupEntityMapper groupMapper;

    @Autowired
    private InlongConsumeEntityMapper consumeMapper;

    @Autowired
    private InlongClusterService clusterService;

    @Autowired
    private PulsarOperator pulsarOperator;

    @Autowired
    private TubeMQOperator tubeMQOperator;

    /* renamed from: event, reason: merged with bridge method [inline-methods] */
    public ProcessEvent m43event() {
        return ProcessEvent.COMPLETE;
    }

    public ListenerResult listen(WorkflowContext workflowContext) throws WorkflowListenerException {
        Integer id = workflowContext.getProcessForm().getConsumeInfo().getId();
        InlongConsumeEntity selectById = this.consumeMapper.selectById(id);
        if (selectById == null) {
            throw new WorkflowListenerException("inlong consume not exits for id=" + id);
        }
        String mqType = selectById.getMqType();
        if ("TUBEMQ".equals(mqType)) {
            createTubeConsumerGroup(selectById, workflowContext.getOperator());
        } else if ("PULSAR".equals(mqType) || "TDMQ_PULSAR".equals(mqType)) {
            createPulsarSubscription(selectById);
        } else if (!"KAFKA".equals(mqType)) {
            throw new WorkflowListenerException("Unsupported MQ type " + mqType);
        }
        updateConsumerInfo(id, selectById.getConsumerGroup());
        return ListenerResult.success("Create MQ consumer group successful");
    }

    private void updateConsumerInfo(Integer num, String str) {
        InlongConsumeEntity selectById = this.consumeMapper.selectById(num);
        selectById.setStatus(ConsumeStatus.APPROVE_PASSED.getCode());
        selectById.setConsumerGroup(str);
        if (this.consumeMapper.updateByIdSelective(selectById) != InlongConstants.AFFECTED_ONE_ROW.intValue()) {
            log.error("inlong consume has already updated, id={}, curVersion={}", selectById.getId(), selectById.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }
    }

    private void createPulsarSubscription(InlongConsumeEntity inlongConsumeEntity) {
        String inlongGroupId = inlongConsumeEntity.getInlongGroupId();
        InlongGroupEntity selectByGroupId = this.groupMapper.selectByGroupId(inlongGroupId);
        Preconditions.expectNotNull(selectByGroupId, "inlong group not found for groupId=" + inlongGroupId);
        String mqResource = selectByGroupId.getMqResource();
        Preconditions.expectNotBlank(mqResource, ErrorCodeEnum.INVALID_PARAMETER, "mq resource cannot empty for groupId=" + inlongGroupId);
        PulsarClusterInfo one = this.clusterService.getOne(selectByGroupId.getInlongClusterTag(), null, "PULSAR");
        try {
            PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(one);
            try {
                String tenant = InlongPulsarDTO.getFromJson(selectByGroupId.getExtParams()).getTenant();
                if (StringUtils.isBlank(tenant)) {
                    tenant = one.getTenant();
                }
                PulsarTopicInfo pulsarTopicInfo = new PulsarTopicInfo();
                pulsarTopicInfo.setTenant(tenant);
                pulsarTopicInfo.setNamespace(mqResource);
                createPulsarSubscription(pulsarAdmin, inlongConsumeEntity.getConsumerGroup(), pulsarTopicInfo, Arrays.asList(inlongConsumeEntity.getTopic().split(",")));
                if (pulsarAdmin != null) {
                    pulsarAdmin.close();
                }
            } finally {
            }
        } catch (Exception e) {
            log.error("create pulsar topic failed", e);
            throw new WorkflowListenerException("failed to create pulsar topic for groupId=" + inlongGroupId + ", reason: " + e.getMessage());
        }
    }

    private void createPulsarSubscription(PulsarAdmin pulsarAdmin, String str, PulsarTopicInfo pulsarTopicInfo, List<String> list) {
        try {
            this.pulsarOperator.createSubscriptions(pulsarAdmin, str, pulsarTopicInfo, list);
        } catch (Exception e) {
            log.error("create pulsar consumer group failed", e);
            throw new WorkflowListenerException("failed to create pulsar consumer group");
        }
    }

    private void createTubeConsumerGroup(InlongConsumeEntity inlongConsumeEntity, String str) {
        String inlongGroupId = inlongConsumeEntity.getInlongGroupId();
        InlongGroupEntity selectByGroupId = this.groupMapper.selectByGroupId(inlongGroupId);
        Preconditions.expectNotNull(selectByGroupId, "inlong group not found for groupId=" + inlongGroupId);
        Preconditions.expectNotBlank(selectByGroupId.getMqResource(), ErrorCodeEnum.INVALID_PARAMETER, "mq resource cannot empty for groupId=" + inlongGroupId);
        try {
            this.tubeMQOperator.createConsumerGroup(this.clusterService.getOne(selectByGroupId.getInlongClusterTag(), null, "TUBEMQ"), inlongConsumeEntity.getTopic(), inlongConsumeEntity.getConsumerGroup(), str);
        } catch (Exception e) {
            log.error("failed to create tubemq consumer group: ", e);
            throw new WorkflowListenerException("failed to create tubemq consumer group: " + e.getMessage());
        }
    }
}
