/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.resource.queue.pulsar;

import com.google.common.base.Objects;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.core.ConsumptionService;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * {@link QueueResourceOperator} implementation that creates and deletes Pulsar resources
 * (tenant, namespace, topics and subscriptions) for inlong groups and inlong streams.
 *
 * <p>For every operation the Pulsar cluster is looked up through {@link InlongClusterService}
 * by the cluster tag of the inlong group. The namespace is the group's MQ resource and each
 * topic name is the MQ resource of the corresponding stream. When the cluster does not declare
 * a tenant, {@value #DEFAULT_TENANT} is used.
 *
 * <p>Failures raised while talking to Pulsar are logged and rethrown as
 * {@link WorkflowListenerException}; the message of the original exception is appended to the
 * message of the rethrown one.
 */
@Service
public class PulsarResourceOperator implements QueueResourceOperator {

    private static final Logger log = LoggerFactory.getLogger(PulsarResourceOperator.class);

    /** MQ types handled by this operator, see {@link #accept(String)}. */
    private static final String MQ_TYPE_PULSAR = "PULSAR";
    private static final String MQ_TYPE_TDMQ_PULSAR = "TDMQ_PULSAR";

    /** Cluster type used when looking up the cluster by its tag. */
    private static final String CLUSTER_TYPE_PULSAR = "PULSAR";

    /** Tenant used when the Pulsar cluster does not declare one. */
    private static final String DEFAULT_TENANT = "public";

    /** Queue module value compared against the topic's queue module in the existence check. */
    private static final String QUEUE_MODULE_PARALLEL = "PARALLEL";

    @Autowired
    private InlongStreamService streamService;
    @Autowired
    private InlongClusterService clusterService;
    @Autowired
    private ConsumptionService consumptionService;
    @Autowired
    private PulsarOperator pulsarOperator;

    /**
     * Tells whether this operator handles the given MQ type.
     *
     * @param mqType the MQ type to check, may be null
     * @return true if {@code mqType} is {@code "PULSAR"} or {@code "TDMQ_PULSAR"}
     */
    @Override
    public boolean accept(String mqType) {
        return MQ_TYPE_PULSAR.equals(mqType) || MQ_TYPE_TDMQ_PULSAR.equals(mqType);
    }

    /**
     * Creates the Pulsar resources of an inlong group.
     *
     * <p>Unless the group status already equals {@code GroupStatus.CONFIG_SUCCESSFUL}, the tenant
     * and the namespace are created first. Afterwards a topic and a subscription are created for
     * every stream returned by {@code streamService.getTopicList(groupId)}; if there are none,
     * the method returns after logging a warning. A single admin client is shared by all steps.
     *
     * @param groupInfo inlong group info, must not be null
     * @param operator name of the operator, must not be null
     * @throws WorkflowListenerException if any resource could not be created
     */
    @Override
    public void createQueueForGroup(InlongGroupInfo groupInfo, String operator) {
        Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
        Preconditions.checkNotNull(operator, "operator cannot be null");

        String groupId = groupInfo.getInlongGroupId();
        log.info("begin to create pulsar resource for groupId={}", groupId);

        String clusterTag = groupInfo.getInlongClusterTag();
        PulsarClusterInfo pulsarCluster =
                (PulsarClusterInfo) clusterService.getOne(clusterTag, null, CLUSTER_TYPE_PULSAR);
        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
            String tenant = resolveTenant(pulsarCluster);
            String namespace = groupInfo.getMqResource();
            InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;

            // tenant and namespace are only created while the group is not yet configured successfully
            if (!Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), groupInfo.getStatus())) {
                pulsarOperator.createTenant(pulsarAdmin, tenant);
                log.info("success to create pulsar tenant for groupId={}, tenant={}", groupId, tenant);
                pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace);
                log.info("success to create pulsar namespace for groupId={}, namespace={}", groupId, namespace);
            }

            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
            if (streamInfoList == null || streamInfoList.isEmpty()) {
                log.warn("skip to create pulsar topic and subscription as no streams for groupId={}", groupId);
                return;
            }
            // reuse the admin client opened above instead of opening a new one per stream
            for (InlongStreamBriefInfo streamInfo : streamInfoList) {
                createPulsarTopic(pulsarAdmin, groupInfo, pulsarCluster, streamInfo.getMqResource());
            }
        } catch (Exception e) {
            String msg = String.format("failed to create pulsar resource for groupId=%s", groupId);
            log.error(msg, e);
            throw new WorkflowListenerException(msg + ": " + e.getMessage());
        }
        log.info("success to create pulsar resource for groupId={}, cluster={}", groupId, pulsarCluster);
    }

    /**
     * Deletes the Pulsar topics of all streams of an inlong group.
     *
     * <p>If {@code streamService.getTopicList(groupId)} returns no streams, the method returns
     * after logging a warning. Otherwise every topic is force deleted using one shared admin
     * client. Tenant and namespace are not touched.
     *
     * @param groupInfo inlong group info, must not be null
     * @param operator name of the operator, not used by this method
     * @throws WorkflowListenerException if any topic could not be deleted
     */
    @Override
    public void deleteQueueForGroup(InlongGroupInfo groupInfo, String operator) {
        Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");

        String groupId = groupInfo.getInlongGroupId();
        log.info("begin to delete pulsar resource for groupId={}", groupId);

        ClusterInfo clusterInfo =
                clusterService.getOne(groupInfo.getInlongClusterTag(), null, CLUSTER_TYPE_PULSAR);
        try {
            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
            if (streamInfoList == null || streamInfoList.isEmpty()) {
                log.warn("skip to delete pulsar topic as no streams for groupId={}", groupId);
                return;
            }
            PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
            // one admin client for all topics of the group
            try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
                for (InlongStreamBriefInfo streamInfo : streamInfoList) {
                    deletePulsarTopic(pulsarAdmin, groupInfo, pulsarCluster, streamInfo.getMqResource());
                }
            }
        } catch (Exception e) {
            String msg = String.format("failed to delete pulsar resource for groupId=%s", groupId);
            log.error(msg, e);
            throw new WorkflowListenerException(msg + ": " + e.getMessage());
        }
        log.info("success to delete pulsar resource for groupId={}, cluster={}", groupId, clusterInfo);
    }

    /**
     * Creates the Pulsar topic and subscription of a single inlong stream.
     *
     * @param groupInfo inlong group info the stream belongs to, must not be null
     * @param streamInfo inlong stream info, must not be null
     * @param operator name of the operator, must not be null
     * @throws WorkflowListenerException if the topic or subscription could not be created
     */
    @Override
    public void createQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, String operator) {
        Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
        Preconditions.checkNotNull(streamInfo, "inlong stream info cannot be null");
        Preconditions.checkNotNull(operator, "operator cannot be null");

        String groupId = streamInfo.getInlongGroupId();
        String streamId = streamInfo.getInlongStreamId();
        log.info("begin to create pulsar resource for groupId={}, streamId={}", groupId, streamId);

        try {
            String clusterTag = groupInfo.getInlongClusterTag();
            ClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, CLUSTER_TYPE_PULSAR);
            createPulsarTopic(groupInfo, (PulsarClusterInfo) clusterInfo, streamInfo.getMqResource());
        } catch (Exception e) {
            String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s", groupId, streamId);
            log.error(msg, e);
            throw new WorkflowListenerException(msg + ": " + e.getMessage());
        }
        log.info("success to create pulsar resource for groupId={}, streamId={}", groupId, streamId);
    }

    /**
     * Force deletes the Pulsar topic of a single inlong stream.
     *
     * @param groupInfo inlong group info the stream belongs to, must not be null
     * @param streamInfo inlong stream info, must not be null
     * @param operator name of the operator, not used by this method
     * @throws WorkflowListenerException if the topic could not be deleted
     */
    @Override
    public void deleteQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, String operator) {
        Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
        Preconditions.checkNotNull(streamInfo, "inlong stream info cannot be null");

        String groupId = streamInfo.getInlongGroupId();
        String streamId = streamInfo.getInlongStreamId();
        log.info("begin to delete pulsar resource for groupId={} streamId={}", groupId, streamId);

        try {
            ClusterInfo clusterInfo =
                    clusterService.getOne(groupInfo.getInlongClusterTag(), null, CLUSTER_TYPE_PULSAR);
            deletePulsarTopic(groupInfo, (PulsarClusterInfo) clusterInfo, streamInfo.getMqResource());
        } catch (Exception e) {
            String msg = String.format("failed to delete pulsar topic for groupId=%s, streamId=%s", groupId, streamId);
            log.error(msg, e);
            // include the root cause message, consistent with the other operations of this class
            throw new WorkflowListenerException(msg + ": " + e.getMessage());
        }
        log.info("success to delete pulsar resource for groupId={}, streamId={}", groupId, streamId);
    }

    /**
     * Returns the tenant declared by the cluster, or {@value #DEFAULT_TENANT} if it is null or empty.
     */
    private static String resolveTenant(PulsarClusterInfo pulsarCluster) {
        String tenant = pulsarCluster.getTenant();
        return StringUtils.isEmpty(tenant) ? DEFAULT_TENANT : tenant;
    }

    /**
     * Opens an admin client for the cluster, creates the topic and its subscription, and closes
     * the client again.
     */
    private void createPulsarTopic(InlongGroupInfo groupInfo, PulsarClusterInfo pulsarCluster, String topicName)
            throws Exception {
        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
            createPulsarTopic(pulsarAdmin, groupInfo, pulsarCluster, topicName);
        }
    }

    /**
     * Creates the topic, verifies that it exists, creates the subscription named
     * {@code <clusterTag>_<topicName>_consumer_group} and saves the sort consumption for it.
     *
     * @param pulsarAdmin open admin client; it is not closed by this method
     * @throws WorkflowListenerException if the topic does not exist after creation
     */
    private void createPulsarTopic(PulsarAdmin pulsarAdmin, InlongGroupInfo groupInfo,
            PulsarClusterInfo pulsarCluster, String topicName) throws Exception {
        InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
        String tenant = resolveTenant(pulsarCluster);
        String namespace = pulsarInfo.getMqResource();
        PulsarTopicBean topicBean = PulsarTopicBean.builder()
                .tenant(tenant)
                .namespace(namespace)
                .topicName(topicName)
                .queueModule(pulsarInfo.getQueueModule())
                .numPartitions(Integer.valueOf(pulsarInfo.getPartitionNum()))
                .build();
        pulsarOperator.createTopic(pulsarAdmin, topicBean);

        // make sure the topic really exists before creating the subscription on it
        boolean exist = pulsarOperator.topicIsExists(pulsarAdmin, tenant, namespace, topicName,
                QUEUE_MODULE_PARALLEL.equals(topicBean.getQueueModule()));
        if (!exist) {
            String topicFullName = tenant + "/" + namespace + "/" + topicName;
            String serviceUrl = pulsarCluster.getAdminUrl();
            log.error("topic={} not exists in {}", topicFullName, serviceUrl);
            throw new WorkflowListenerException("topic=" + topicFullName + " not exists in " + serviceUrl);
        }

        String subscription = groupInfo.getInlongClusterTag() + "_" + topicName + "_consumer_group";
        pulsarOperator.createSubscription(pulsarAdmin, topicBean, subscription);
        String groupId = groupInfo.getInlongGroupId();
        log.info("success to create pulsar subscription for groupId={}, topic={}, subs={}",
                groupId, topicName, subscription);

        consumptionService.saveSortConsumption(groupInfo, topicName, subscription);
        log.info("success to save consume for groupId={}, topic={}, subs={}", groupId, topicName, subscription);
    }

    /**
     * Opens an admin client for the cluster, force deletes the topic, and closes the client again.
     */
    private void deletePulsarTopic(InlongGroupInfo groupInfo, PulsarClusterInfo pulsarCluster, String topicName)
            throws Exception {
        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
            deletePulsarTopic(pulsarAdmin, groupInfo, pulsarCluster, topicName);
        }
    }

    /**
     * Force deletes the topic in the group's namespace.
     *
     * @param pulsarAdmin open admin client; it is not closed by this method
     */
    private void deletePulsarTopic(PulsarAdmin pulsarAdmin, InlongGroupInfo groupInfo,
            PulsarClusterInfo pulsarCluster, String topicName) throws Exception {
        PulsarTopicBean topicBean = PulsarTopicBean.builder()
                .tenant(resolveTenant(pulsarCluster))
                .namespace(groupInfo.getMqResource())
                .topicName(topicName)
                .build();
        pulsarOperator.forceDeleteTopic(pulsarAdmin, topicBean);
    }
}

