/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.mq;

import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.mq.util.PulsarOperator;
import org.apache.inlong.manager.service.mq.util.PulsarUtils;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Workflow task listener that creates the Pulsar topic backing an InLong stream.
 *
 * <p>It is registered as a Spring component, reacts to {@link TaskEvent#COMPLETE}
 * and runs synchronously (see {@link #async()}).
 */
@Component
public class CreatePulsarTopicTaskListener
implements QueueOperateListener {
    private static final Logger log = LoggerFactory.getLogger(CreatePulsarTopicTaskListener.class);

    /** Tenant used when the Pulsar cluster info carries an empty tenant. */
    private static final String DEFAULT_TENANT = "public";

    @Autowired
    private InlongClusterService clusterService;
    @Autowired
    private PulsarOperator pulsarOperator;

    /**
     * Returns the task event this listener is bound to.
     *
     * @return always {@link TaskEvent#COMPLETE}
     */
    public TaskEvent event() {
        return TaskEvent.COMPLETE;
    }

    /**
     * Creates the Pulsar topic for the stream carried by the process form of {@code context}.
     *
     * <p>The topic name is the stream's MQ resource and the namespace is the group's MQ
     * resource. The tenant comes from the Pulsar cluster looked up by the group's cluster
     * tag, falling back to {@value #DEFAULT_TENANT} when empty.
     *
     * @param context workflow context; its process form is cast to
     *        {@link StreamResourceProcessForm}
     * @return {@link ListenerResult#success()} when the topic creation call returns normally
     * @throws WorkflowListenerException if anything fails while resolving the cluster or
     *         creating the topic; the message includes the underlying failure message
     */
    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
        StreamResourceProcessForm form = (StreamResourceProcessForm) context.getProcessForm();
        InlongGroupInfo groupInfo = form.getGroupInfo();
        InlongStreamInfo streamInfo = form.getStreamInfo();
        String groupId = streamInfo.getInlongGroupId();
        String streamId = streamInfo.getInlongStreamId();
        log.info("begin to create pulsar topic for groupId={}, streamId={}", groupId, streamId);
        try {
            // The casts stay inside the try so that a non-Pulsar group or cluster is
            // reported as a WorkflowListenerException like every other failure here.
            InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
            String pulsarTopic = streamInfo.getMqResource();
            String clusterTag = pulsarInfo.getInlongClusterTag();
            InlongClusterInfo clusterInfo = this.clusterService.getOne(clusterTag, null, "PULSAR");
            PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
            String tenant = pulsarCluster.getTenant();
            if (StringUtils.isEmpty(tenant)) {
                tenant = DEFAULT_TENANT;
            }
            // The admin client is closed automatically once the topic call finishes.
            try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
                PulsarTopicBean topicBean = PulsarTopicBean.builder()
                        .tenant(tenant)
                        .namespace(pulsarInfo.getMqResource())
                        .topicName(pulsarTopic)
                        .queueModule(pulsarInfo.getQueueModule())
                        .numPartitions(Integer.valueOf(pulsarInfo.getPartitionNum()))
                        .build();
                this.pulsarOperator.createTopic(pulsarAdmin, topicBean);
            }
        } catch (Exception e) {
            String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s", groupId, streamId);
            // Full stack trace goes to the log.
            log.error(msg, e);
            // Carry the underlying reason in the thrown message so it is not lost to
            // whoever handles the workflow exception.
            throw new WorkflowListenerException(msg + ": " + e.getMessage());
        }
        log.info("success to create pulsar topic for groupId={}, streamId={}", groupId, streamId);
        return ListenerResult.success();
    }

    /**
     * Tells the workflow engine whether this listener runs asynchronously.
     *
     * @return always {@code false}
     */
    public boolean async() {
        return false;
    }
}

