/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.mq;

import java.util.List;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.core.ConsumptionService;
import org.apache.inlong.manager.service.mq.util.PulsarOperator;
import org.apache.inlong.manager.service.mq.util.PulsarUtils;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class CreatePulsarSubscriptionTaskListener
implements QueueOperateListener {
    private static final Logger log = LoggerFactory.getLogger(CreatePulsarSubscriptionTaskListener.class);
    @Autowired
    private InlongClusterService clusterService;
    @Autowired
    private PulsarOperator pulsarOperator;
    @Autowired
    private StreamSinkService sinkService;
    @Autowired
    private ConsumptionService consumptionService;

    public TaskEvent event() {
        return TaskEvent.COMPLETE;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
        StreamResourceProcessForm form = (StreamResourceProcessForm)context.getProcessForm();
        InlongGroupInfo groupInfo = form.getGroupInfo();
        InlongStreamInfo streamInfo = form.getStreamInfo();
        String groupId = streamInfo.getInlongGroupId();
        String streamId = streamInfo.getInlongStreamId();
        String clusterTag = groupInfo.getInlongClusterTag();
        InlongClusterInfo clusterInfo = this.clusterService.getOne(clusterTag, null, "PULSAR");
        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo)clusterInfo;
        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster);){
            List<String> sinkTypeList = this.sinkService.getSinkTypeList(groupId, streamId);
            if (sinkTypeList == null || sinkTypeList.size() == 0) {
                log.warn("sink info is empty for groupId={}, streamId={}, skip to create pulsar group", (Object)groupId, (Object)streamId);
                ListenerResult listenerResult = ListenerResult.success();
                return listenerResult;
            }
            String tenant = pulsarCluster.getTenant();
            String namespace = groupInfo.getMqResource();
            String topic = streamInfo.getMqResource();
            PulsarTopicBean topicBean = new PulsarTopicBean();
            topicBean.setTenant(tenant);
            topicBean.setNamespace(namespace);
            topicBean.setTopicName(topic);
            boolean exist = this.pulsarOperator.topicIsExists(pulsarAdmin, tenant, namespace, topic);
            if (!exist) {
                String fullTopic = tenant + "/" + namespace + "/" + topic;
                String msg = String.format("topic=%s not exists in %s", fullTopic, pulsarCluster.getAdminUrl());
                log.error(msg);
                throw new BusinessException(msg);
            }
            String subscription = clusterTag + "_" + topic + "_consumer_group";
            this.pulsarOperator.createSubscription(pulsarAdmin, topicBean, subscription);
            this.consumptionService.saveSortConsumption(groupInfo, topic, subscription);
        }
        catch (Exception e) {
            log.error("failed to create pulsar subscription for groupId=" + groupId + " streamId=" + streamId, (Throwable)e);
            throw new WorkflowListenerException("failed to create pulsar subscription: " + e.getMessage());
        }
        log.info("success to create pulsar subscription for groupId={}, streamId={}", (Object)groupId, (Object)streamId);
        return ListenerResult.success();
    }

    public boolean async() {
        return false;
    }
}

