/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.workflow.consumption.listener;

import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.ConsumptionStatus;
import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
import org.apache.inlong.manager.common.pojo.workflow.form.NewConsumptionProcessForm;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
import org.apache.inlong.manager.service.CommonOperateService;
import org.apache.inlong.manager.service.core.InlongGroupService;
import org.apache.inlong.manager.service.thirdparty.mq.PulsarOptService;
import org.apache.inlong.manager.service.thirdparty.mq.TubeMqOptService;
import org.apache.inlong.manager.service.thirdparty.mq.util.PulsarUtils;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ConsumptionCompleteProcessListener
implements ProcessEventListener {
    private static final Logger log = LoggerFactory.getLogger(ConsumptionCompleteProcessListener.class);
    @Autowired
    private PulsarOptService pulsarMqOptService;
    @Autowired
    private ClusterBean clusterBean;
    @Autowired
    private CommonOperateService commonOperateService;
    @Autowired
    private InlongGroupService groupService;
    @Autowired
    private ConsumptionEntityMapper consumptionMapper;
    @Autowired
    private TubeMqOptService tubeMqOptService;

    public ProcessEvent event() {
        return ProcessEvent.COMPLETE;
    }

    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
        NewConsumptionProcessForm consumptionForm = (NewConsumptionProcessForm)context.getProcessForm();
        Integer consumptionId = consumptionForm.getConsumptionInfo().getId();
        ConsumptionEntity entity = this.consumptionMapper.selectByPrimaryKey(consumptionId);
        if (entity == null) {
            throw new WorkflowListenerException("consumption not exits for id=" + consumptionId);
        }
        MQType mqType = MQType.forType((String)entity.getMiddlewareType());
        if (mqType == MQType.TUBE) {
            this.createTubeConsumerGroup(entity);
            return ListenerResult.success((String)"Create Tube consumer group successful");
        }
        if (mqType != MQType.PULSAR && mqType != MQType.TDMQ_PULSAR) {
            throw new WorkflowListenerException("middleware type [" + mqType + "] not supported");
        }
        this.createPulsarTopicMessage(entity);
        this.updateConsumerInfo(consumptionId, entity.getConsumerGroupId());
        return ListenerResult.success((String)"create Tube /Pulsar consumer group successful");
    }

    private void updateConsumerInfo(Integer consumptionId, String consumerGroupId) {
        ConsumptionEntity update = new ConsumptionEntity();
        update.setId(consumptionId);
        update.setStatus(Integer.valueOf(ConsumptionStatus.APPROVED.getStatus()));
        update.setConsumerGroupId(consumerGroupId);
        update.setModifyTime(new Date());
        this.consumptionMapper.updateByPrimaryKeySelective(update);
    }

    private void createPulsarTopicMessage(ConsumptionEntity entity) {
        String groupId = entity.getInlongGroupId();
        InlongGroupInfo groupInfo = this.groupService.get(groupId);
        Preconditions.checkNotNull((Object)groupInfo, (String)("inlong group not found for groupId=" + groupId));
        String mqResourceObj = groupInfo.getMqResourceObj();
        Preconditions.checkNotNull((Object)mqResourceObj, (String)("mq resource cannot empty for groupId=" + groupId));
        PulsarClusterInfo globalCluster = this.commonOperateService.getPulsarClusterInfo(entity.getMiddlewareType());
        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster);){
            PulsarTopicBean topicMessage = new PulsarTopicBean();
            String tenant = this.clusterBean.getDefaultTenant();
            topicMessage.setTenant(tenant);
            topicMessage.setNamespace(mqResourceObj);
            String consumerGroup = entity.getConsumerGroupId();
            List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
            List<String> topics = Arrays.asList(entity.getTopic().split(","));
            this.createPulsarSubscription(pulsarAdmin, consumerGroup, topicMessage, clusters, topics, globalCluster);
        }
        catch (Exception e) {
            log.error("create pulsar topic failed", (Throwable)e);
            throw new WorkflowListenerException("failed to create pulsar topic for groupId=" + groupId + ", reason: " + e.getMessage());
        }
    }

    private void createPulsarSubscription(PulsarAdmin globalPulsarAdmin, String subscription, PulsarTopicBean topicBean, List<String> clusters, List<String> topics, PulsarClusterInfo globalCluster) {
        try {
            for (String cluster : clusters) {
                String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
                PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder().token(globalCluster.getToken()).adminUrl(serviceUrl).build();
                PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo);
                Throwable throwable = null;
                try {
                    this.pulsarMqOptService.createSubscriptions(pulsarAdmin, subscription, topicBean, topics);
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (pulsarAdmin == null) continue;
                    if (throwable != null) {
                        try {
                            pulsarAdmin.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                        continue;
                    }
                    pulsarAdmin.close();
                }
            }
        }
        catch (Exception e) {
            log.error("create pulsar consumer group failed", (Throwable)e);
            throw new WorkflowListenerException("failed to create pulsar consumer group");
        }
    }

    private void createTubeConsumerGroup(ConsumptionEntity consumption) {
        AddTubeConsumeGroupRequest addTubeConsumeGroupRequest = new AddTubeConsumeGroupRequest();
        addTubeConsumeGroupRequest.setClusterId(1);
        addTubeConsumeGroupRequest.setCreateUser(consumption.getCreator());
        AddTubeConsumeGroupRequest.GroupNameJsonSetBean bean = new AddTubeConsumeGroupRequest.GroupNameJsonSetBean();
        bean.setTopicName(consumption.getTopic());
        bean.setGroupName(consumption.getConsumerGroupId());
        addTubeConsumeGroupRequest.setGroupNameJsonSet(Collections.singletonList(bean));
        try {
            this.tubeMqOptService.createNewConsumerGroup(addTubeConsumeGroupRequest);
        }
        catch (Exception e) {
            throw new WorkflowListenerException("failed to create tube consumer group: " + addTubeConsumeGroupRequest);
        }
    }

    public boolean async() {
        return false;
    }
}

