package org.apache.inlong.manager.service.workflow.consumption.listener;

import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.ConsumptionStatus;
import org.apache.inlong.manager.common.event.ListenerResult;
import org.apache.inlong.manager.common.event.process.ProcessEvent;
import org.apache.inlong.manager.common.event.process.ProcessEventListener;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.model.WorkflowContext;
import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
import org.apache.inlong.manager.service.core.BusinessService;
import org.apache.inlong.manager.service.thirdpart.mq.PulsarOptService;
import org.apache.inlong.manager.service.thirdpart.mq.TubeMqOptService;
import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
import org.apache.inlong.manager.service.workflow.consumption.NewConsumptionWorkflowForm;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Process listener that reports {@link ProcessEvent#COMPLETE} as its event and, when invoked,
 * provisions the consumer group / subscriptions for a consumption record on the middleware
 * named by the record (Tube or Pulsar).
 *
 * <p>NOTE(review): this file looks like decompiler output; see the note on {@link #m77event()}.
 */
@Component
public class ConsumptionCompleteProcessListener implements ProcessEventListener {

    private static final Logger log = LoggerFactory.getLogger(ConsumptionCompleteProcessListener.class);

    /** Middleware type value (compared case-insensitively) that selects the Tube branch. */
    private static final String MIDDLEWARE_TUBE = "TUBE";

    /** Middleware type value (compared case-insensitively) that selects the Pulsar branch. */
    private static final String MIDDLEWARE_PULSAR = "PULSAR";

    /** Separator used to split the consumption's topic string into individual topics. */
    private static final String TOPIC_SEPARATOR = ",";

    /**
     * Cluster id sent with every Tube consumer-group request.
     * NOTE(review): hard-coded; confirm whether this should come from configuration.
     */
    private static final int DEFAULT_TUBE_CLUSTER_ID = 1;

    @Autowired
    private PulsarOptService pulsarMqOptService;

    @Autowired
    private ClusterBean clusterBean;

    @Autowired
    private BusinessService businessService;

    @Autowired
    private ConsumptionEntityMapper consumptionMapper;

    @Autowired
    private TubeMqOptService tubeMqOptService;

    /**
     * Returns the process event this listener is bound to.
     *
     * <p>NOTE(review): the decompiler renamed this method from {@code event} ("merged with bridge
     * method"). The name is kept here so existing references stay valid; confirm against the
     * listener interface whether it must be restored to {@code event()}.
     *
     * @return always {@link ProcessEvent#COMPLETE}
     */
    public ProcessEvent m77event() {
        return ProcessEvent.COMPLETE;
    }

    /**
     * Loads the consumption referenced by the workflow form and creates the matching consumer
     * group on its middleware.
     *
     * <p>NOTE(review): only the Pulsar branch updates the consumption status afterwards; the Tube
     * branch returns right after creating the group. Confirm this asymmetry is intended.
     *
     * @param workflowContext context whose process form is a {@link NewConsumptionWorkflowForm}
     * @return a success result describing what was created
     * @throws WorkflowListenerException if the consumption does not exist, the middleware type is
     *         neither Tube nor Pulsar, or the middleware call fails
     */
    public ListenerResult listen(WorkflowContext workflowContext) throws WorkflowListenerException {
        NewConsumptionWorkflowForm form = (NewConsumptionWorkflowForm) workflowContext.getProcessForm();
        Integer consumptionId = form.getConsumptionInfo().getId();

        // Re-read the record from the database instead of relying on the form's copy.
        ConsumptionEntity entity = this.consumptionMapper.selectByPrimaryKey(consumptionId);
        if (entity == null) {
            throw new WorkflowListenerException("consumption not exits for id=" + consumptionId);
        }

        String middlewareType = entity.getMiddlewareType();
        if (MIDDLEWARE_TUBE.equalsIgnoreCase(middlewareType)) {
            createTubeConsumerGroup(entity);
            return ListenerResult.success("Create Tube consumer group successful");
        }
        if (!MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
            throw new WorkflowListenerException("middleware type [" + middlewareType + "] not supported");
        }

        createPulsarTopicMessage(entity);
        updateConsumerInfo(consumptionId, entity.getConsumerGroupId());
        return ListenerResult.success("create Tube /Pulsar consumer group successful");
    }

    /**
     * Marks the consumption as approved and stores its consumer group id.
     *
     * @param consumptionId primary key of the consumption to update
     * @param consumerGroupId consumer group id to persist
     */
    private void updateConsumerInfo(Integer consumptionId, String consumerGroupId) {
        ConsumptionEntity update = new ConsumptionEntity();
        update.setId(consumptionId);
        update.setStatus(Integer.valueOf(ConsumptionStatus.APPROVED.getStatus()));
        update.setConsumerGroupId(consumerGroupId);
        update.setModifyTime(new Date());
        // Selective update: only the fields set above are passed to the mapper.
        this.consumptionMapper.updateByPrimaryKeySelective(update);
    }

    /**
     * Creates the Pulsar subscriptions for every topic of the consumption, in every cluster
     * reported by the Pulsar admin endpoint.
     *
     * @param consumptionEntity consumption whose group id, consumer group id and topics are used
     * @throws WorkflowListenerException if any step of the Pulsar interaction fails
     */
    private void createPulsarTopicMessage(ConsumptionEntity consumptionEntity) {
        String groupId = consumptionEntity.getInlongGroupId();
        BusinessInfo businessInfo = this.businessService.get(groupId);
        Preconditions.checkNotNull(businessInfo, "business not found for groupId=" + groupId);
        String mqResource = businessInfo.getMqResourceObj();
        Preconditions.checkNotNull(mqResource, "mq resource cannot empty for groupId=" + groupId);

        // try-with-resources guarantees the admin client is closed on the failure path as well.
        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(businessInfo, this.clusterBean.getPulsarAdminUrl())) {
            PulsarTopicBean topicBean = new PulsarTopicBean();
            topicBean.setTenant(this.clusterBean.getDefaultTenant());
            // The business's MQ resource is used as the Pulsar namespace.
            topicBean.setNamespace(mqResource);

            String consumerGroupId = consumptionEntity.getConsumerGroupId();
            List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
            List<String> topics = Arrays.asList(consumptionEntity.getTopic().split(TOPIC_SEPARATOR));
            createPulsarSubscription(pulsarAdmin, consumerGroupId, topicBean, clusters, topics, businessInfo);
        } catch (Exception e) {
            log.error("create pulsar topic failed", e);
            throw new WorkflowListenerException("failed to create pulsar topic for groupId=" + groupId + ", reason: " + e.getMessage());
        }
    }

    /**
     * Creates the subscription in each cluster, using a dedicated admin client per cluster.
     *
     * @param globalAdmin admin client used to resolve each cluster's service URL
     * @param subscription subscription (consumer group) name to create
     * @param topicBean tenant/namespace descriptor passed through to the Pulsar service
     * @param clusters names of the clusters to create the subscription in
     * @param topics topics the subscription is created for
     * @param businessInfo business used to build the per-cluster admin clients
     * @throws WorkflowListenerException if creation fails in any cluster; remaining clusters are
     *         not attempted
     */
    private void createPulsarSubscription(PulsarAdmin globalAdmin, String subscription, PulsarTopicBean topicBean,
            List<String> clusters, List<String> topics, BusinessInfo businessInfo) {
        try {
            for (String cluster : clusters) {
                String serviceUrl = PulsarUtils.getServiceUrl(globalAdmin, cluster);
                // Each per-cluster client is closed before moving on, even when creation throws.
                try (PulsarAdmin clusterAdmin = PulsarUtils.getPulsarAdmin(businessInfo, serviceUrl)) {
                    this.pulsarMqOptService.createSubscriptions(clusterAdmin, subscription, topicBean, topics);
                }
            }
        } catch (Exception e) {
            log.error("create pulsar consumer group failed", e);
            throw new WorkflowListenerException("failed to create pulsar consumer group");
        }
    }

    /**
     * Requests a new Tube consumer group for the consumption's topic and consumer group id.
     *
     * @param consumptionEntity consumption providing the creator, topic and consumer group id
     * @throws WorkflowListenerException if the Tube service call fails
     */
    private void createTubeConsumerGroup(ConsumptionEntity consumptionEntity) {
        AddTubeConsumeGroupRequest.GroupNameJsonSetBean groupBean = new AddTubeConsumeGroupRequest.GroupNameJsonSetBean();
        groupBean.setTopicName(consumptionEntity.getTopic());
        groupBean.setGroupName(consumptionEntity.getConsumerGroupId());

        AddTubeConsumeGroupRequest request = new AddTubeConsumeGroupRequest();
        request.setClusterId(DEFAULT_TUBE_CLUSTER_ID);
        request.setCreateUser(consumptionEntity.getCreator());
        request.setGroupNameJsonSet(Collections.singletonList(groupBean));

        try {
            this.tubeMqOptService.createNewConsumerGroup(request);
        } catch (Exception e) {
            // Log the cause here; the rethrown exception carries only the request in its message.
            log.error("create tube consumer group failed", e);
            throw new WorkflowListenerException("failed to create tube consumer group: " + request);
        }
    }

    /**
     * Reports whether this listener should be run asynchronously.
     *
     * @return always {@code false}
     */
    public boolean async() {
        return false;
    }
}
