/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.mq;

import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.mq.util.PulsarOperator;
import org.apache.inlong.manager.service.mq.util.PulsarUtils;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class CreatePulsarResourceTaskListener
implements QueueOperateListener {
    private static final Logger log = LoggerFactory.getLogger(CreatePulsarResourceTaskListener.class);
    @Autowired
    private InlongGroupService groupService;
    @Autowired
    private InlongStreamService streamService;
    @Autowired
    private InlongClusterService clusterService;
    @Autowired
    private PulsarOperator pulsarOperator;

    public TaskEvent event() {
        return TaskEvent.COMPLETE;
    }

    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
        GroupResourceProcessForm form = (GroupResourceProcessForm)context.getProcessForm();
        String groupId = form.getInlongGroupId();
        log.info("begin to create pulsar resource for groupId={}", (Object)groupId);
        InlongGroupInfo groupInfo = this.groupService.get(groupId);
        if (groupInfo == null) {
            throw new WorkflowListenerException("inlong group or pulsar cluster not found for groupId=" + groupId);
        }
        try {
            InlongPulsarInfo pulsarInfo = (InlongPulsarInfo)groupInfo;
            InlongClusterInfo clusterInfo = this.clusterService.getOne(groupInfo.getInlongClusterTag(), null, "PULSAR");
            this.createPulsarProcess(pulsarInfo, (PulsarClusterInfo)clusterInfo);
        }
        catch (Exception e) {
            log.error("create pulsar resource error for groupId={}", (Object)groupId, (Object)e);
            throw new WorkflowListenerException("create pulsar resource error for groupId=" + groupId);
        }
        log.info("success to create pulsar resource for groupId={}", (Object)groupId);
        return ListenerResult.success();
    }

    private void createPulsarProcess(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster) throws Exception {
        String groupId = pulsarInfo.getInlongGroupId();
        log.info("begin to create pulsar resource for groupId={} in cluster={}", (Object)groupId, (Object)pulsarCluster);
        String namespace = pulsarInfo.getMqResource();
        Preconditions.checkNotNull((Object)namespace, (String)("pulsar namespace cannot be empty for groupId=" + groupId));
        String queueModule = pulsarInfo.getQueueModule();
        Preconditions.checkNotNull((Object)queueModule, (String)("queue module cannot be empty for groupId=" + groupId));
        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster);){
            String tenant = pulsarCluster.getTenant();
            if (StringUtils.isEmpty((CharSequence)tenant)) {
                tenant = "public";
            }
            this.pulsarOperator.createTenant(pulsarAdmin, tenant);
            this.pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace);
            Integer partitionNum = pulsarInfo.getPartitionNum();
            List<InlongStreamBriefInfo> streamTopicList = this.streamService.getTopicList(groupId);
            PulsarTopicBean topicBean = PulsarTopicBean.builder().tenant(tenant).namespace(namespace).numPartitions(partitionNum).queueModule(queueModule).build();
            for (InlongStreamBriefInfo streamInfo : streamTopicList) {
                topicBean.setTopicName(streamInfo.getMqResource());
                this.pulsarOperator.createTopic(pulsarAdmin, topicBean);
            }
        }
        log.info("finish to create pulsar resource for groupId={}, cluster={}", (Object)groupId, (Object)pulsarCluster);
    }

    public boolean async() {
        return false;
    }
}

