package org.apache.inlong.manager.service.resource.queue.pulsar;

import com.google.common.base.Objects;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.core.ConsumptionService;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.class */
public class PulsarResourceOperator implements QueueResourceOperator {
    private static final Logger log = LoggerFactory.getLogger(PulsarResourceOperator.class);

    @Autowired
    private InlongStreamService streamService;

    @Autowired
    private InlongClusterService clusterService;

    @Autowired
    private ConsumptionService consumptionService;

    @Autowired
    private PulsarOperator pulsarOperator;

    @Override // org.apache.inlong.manager.service.resource.queue.QueueResourceOperator
    public boolean accept(String str) {
        return "PULSAR".equals(str) || "TDMQ_PULSAR".equals(str);
    }

    /* JADX WARN: Finally extract failed */
    @Override // org.apache.inlong.manager.service.resource.queue.QueueResourceOperator
    public void createQueueForGroup(InlongGroupInfo inlongGroupInfo, String str) {
        Preconditions.checkNotNull(inlongGroupInfo, "inlong group info cannot be null");
        Preconditions.checkNotNull(str, "operator cannot be null");
        String inlongGroupId = inlongGroupInfo.getInlongGroupId();
        log.info("begin to create pulsar resource for groupId={}", inlongGroupId);
        PulsarClusterInfo pulsarClusterInfo = (PulsarClusterInfo) this.clusterService.getOne(inlongGroupInfo.getInlongClusterTag(), null, "PULSAR");
        try {
            PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo);
            Throwable th = null;
            try {
                String tenant = pulsarClusterInfo.getTenant();
                if (StringUtils.isEmpty(tenant)) {
                    tenant = "public";
                }
                String mqResource = inlongGroupInfo.getMqResource();
                InlongPulsarInfo inlongPulsarInfo = (InlongPulsarInfo) inlongGroupInfo;
                if (!Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), inlongGroupInfo.getStatus())) {
                    this.pulsarOperator.createTenant(pulsarAdmin, tenant);
                    log.info("success to create pulsar tenant for groupId={}, tenant={}", inlongGroupId, tenant);
                    this.pulsarOperator.createNamespace(pulsarAdmin, inlongPulsarInfo, tenant, mqResource);
                    log.info("success to create pulsar namespace for groupId={}, namespace={}", inlongGroupId, mqResource);
                }
                List<InlongStreamBriefInfo> topicList = this.streamService.getTopicList(inlongGroupId);
                if (topicList == null || topicList.isEmpty()) {
                    log.warn("skip to create pulsar topic and subscription as no streams for groupId={}", inlongGroupId);
                    if (pulsarAdmin != null) {
                        if (0 == 0) {
                            pulsarAdmin.close();
                            return;
                        }
                        try {
                            pulsarAdmin.close();
                            return;
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                            return;
                        }
                    }
                    return;
                }
                Iterator<InlongStreamBriefInfo> it = topicList.iterator();
                while (it.hasNext()) {
                    createPulsarTopic(inlongGroupInfo, pulsarClusterInfo, it.next().getMqResource());
                }
                if (pulsarAdmin != null) {
                    if (0 != 0) {
                        try {
                            pulsarAdmin.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        pulsarAdmin.close();
                    }
                }
                log.info("success to create pulsar resource for groupId={}, cluster={}", inlongGroupId, pulsarClusterInfo);
                return;
            } catch (Throwable th4) {
                if (pulsarAdmin != null) {
                    if (0 != 0) {
                        try {
                            pulsarAdmin.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        pulsarAdmin.close();
                    }
                }
                throw th4;
            }
        } catch (Exception e) {
            String format = String.format("failed to create pulsar resource for groupId=%s", inlongGroupId);
            log.error(format, e);
            throw new WorkflowListenerException(format + ": " + e.getMessage());
        }
        String format2 = String.format("failed to create pulsar resource for groupId=%s", inlongGroupId);
        log.error(format2, e);
        throw new WorkflowListenerException(format2 + ": " + e.getMessage());
    }

    @Override // org.apache.inlong.manager.service.resource.queue.QueueResourceOperator
    public void deleteQueueForGroup(InlongGroupInfo inlongGroupInfo, String str) {
        Preconditions.checkNotNull(inlongGroupInfo, "inlong group info cannot be null");
        String inlongGroupId = inlongGroupInfo.getInlongGroupId();
        log.info("begin to delete pulsar resource for groupId={}", inlongGroupId);
        ClusterInfo one = this.clusterService.getOne(inlongGroupInfo.getInlongClusterTag(), null, "PULSAR");
        try {
            List<InlongStreamBriefInfo> topicList = this.streamService.getTopicList(inlongGroupId);
            if (topicList == null || topicList.isEmpty()) {
                log.warn("skip to create pulsar topic and subscription as no streams for groupId={}", inlongGroupId);
                return;
            }
            Iterator<InlongStreamBriefInfo> it = topicList.iterator();
            while (it.hasNext()) {
                deletePulsarTopic(inlongGroupInfo, (PulsarClusterInfo) one, it.next().getMqResource());
            }
            log.info("success to delete pulsar resource for groupId={}, cluster={}", inlongGroupId, one);
        } catch (Exception e) {
            log.error("failed to delete pulsar resource for groupId=" + inlongGroupId, e);
            throw new WorkflowListenerException("failed to delete pulsar resource: " + e.getMessage());
        }
    }

    @Override // org.apache.inlong.manager.service.resource.queue.QueueResourceOperator
    public void createQueueForStream(InlongGroupInfo inlongGroupInfo, InlongStreamInfo inlongStreamInfo, String str) {
        Preconditions.checkNotNull(inlongGroupInfo, "inlong group info cannot be null");
        Preconditions.checkNotNull(inlongStreamInfo, "inlong stream info cannot be null");
        Preconditions.checkNotNull(str, "operator cannot be null");
        String inlongGroupId = inlongStreamInfo.getInlongGroupId();
        String inlongStreamId = inlongStreamInfo.getInlongStreamId();
        log.info("begin to create pulsar resource for groupId={}, streamId={}", inlongGroupId, inlongStreamId);
        try {
            createPulsarTopic(inlongGroupInfo, (PulsarClusterInfo) this.clusterService.getOne(inlongGroupInfo.getInlongClusterTag(), null, "PULSAR"), inlongStreamInfo.getMqResource());
            log.info("success to create pulsar resource for groupId={}, streamId={}", inlongGroupId, inlongStreamId);
        } catch (Exception e) {
            String format = String.format("failed to create pulsar topic for groupId=%s, streamId=%s", inlongGroupId, inlongStreamId);
            log.error(format, e);
            throw new WorkflowListenerException(format + ": " + e.getMessage());
        }
    }

    @Override // org.apache.inlong.manager.service.resource.queue.QueueResourceOperator
    public void deleteQueueForStream(InlongGroupInfo inlongGroupInfo, InlongStreamInfo inlongStreamInfo, String str) {
        Preconditions.checkNotNull(inlongGroupInfo, "inlong group info cannot be null");
        Preconditions.checkNotNull(inlongStreamInfo, "inlong stream info cannot be null");
        String inlongGroupId = inlongStreamInfo.getInlongGroupId();
        String inlongStreamId = inlongStreamInfo.getInlongStreamId();
        log.info("begin to delete pulsar resource for groupId={} streamId={}", inlongGroupId, inlongStreamId);
        try {
            deletePulsarTopic(inlongGroupInfo, (PulsarClusterInfo) this.clusterService.getOne(inlongGroupInfo.getInlongClusterTag(), null, "PULSAR"), inlongStreamInfo.getMqResource());
            log.info("success to delete pulsar topic for groupId={}, streamId={}", inlongGroupId, inlongStreamId);
            log.info("success to delete pulsar resource for groupId={}, streamId={}", inlongGroupId, inlongStreamId);
        } catch (Exception e) {
            String format = String.format("failed to delete pulsar topic for groupId=%s, streamId=%s", inlongGroupId, inlongStreamId);
            log.error(format, e);
            throw new WorkflowListenerException(format);
        }
    }

    private void createPulsarTopic(InlongGroupInfo inlongGroupInfo, PulsarClusterInfo pulsarClusterInfo, String str) throws Exception {
        PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo);
        Throwable th = null;
        try {
            InlongPulsarInfo inlongPulsarInfo = (InlongPulsarInfo) inlongGroupInfo;
            String tenant = pulsarClusterInfo.getTenant();
            if (StringUtils.isEmpty(tenant)) {
                tenant = "public";
            }
            String mqResource = inlongPulsarInfo.getMqResource();
            PulsarTopicBean build = PulsarTopicBean.builder().tenant(tenant).namespace(mqResource).topicName(str).queueModule(inlongPulsarInfo.getQueueModule()).numPartitions(Integer.valueOf(inlongPulsarInfo.getPartitionNum())).build();
            this.pulsarOperator.createTopic(pulsarAdmin, build);
            if (!this.pulsarOperator.topicIsExists(pulsarAdmin, tenant, mqResource, str, "PARALLEL".equals(build.getQueueModule()))) {
                String str2 = tenant + "/" + mqResource + "/" + str;
                String adminUrl = pulsarClusterInfo.getAdminUrl();
                log.error("topic={} not exists in {}", str2, adminUrl);
                throw new WorkflowListenerException("topic=" + str2 + " not exists in " + adminUrl);
            }
            String str3 = inlongGroupInfo.getInlongClusterTag() + "_" + str + "_consumer_group";
            this.pulsarOperator.createSubscription(pulsarAdmin, build, str3);
            String inlongGroupId = inlongGroupInfo.getInlongGroupId();
            log.info("success to create pulsar subscription for groupId={}, topic={}, subs={}", new Object[]{inlongGroupId, str, str3});
            this.consumptionService.saveSortConsumption(inlongGroupInfo, str, str3);
            log.info("success to save consume for groupId={}, topic={}, subs={}", new Object[]{inlongGroupId, str, str3});
            if (pulsarAdmin != null) {
                if (0 == 0) {
                    pulsarAdmin.close();
                    return;
                }
                try {
                    pulsarAdmin.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (pulsarAdmin != null) {
                if (0 != 0) {
                    try {
                        pulsarAdmin.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    pulsarAdmin.close();
                }
            }
            throw th3;
        }
    }

    private void deletePulsarTopic(InlongGroupInfo inlongGroupInfo, PulsarClusterInfo pulsarClusterInfo, String str) throws Exception {
        PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo);
        Throwable th = null;
        try {
            try {
                String tenant = pulsarClusterInfo.getTenant();
                if (StringUtils.isEmpty(tenant)) {
                    tenant = "public";
                }
                this.pulsarOperator.forceDeleteTopic(pulsarAdmin, PulsarTopicBean.builder().tenant(tenant).namespace(inlongGroupInfo.getMqResource()).topicName(str).build());
                if (pulsarAdmin != null) {
                    if (0 == 0) {
                        pulsarAdmin.close();
                        return;
                    }
                    try {
                        pulsarAdmin.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (pulsarAdmin != null) {
                if (th != null) {
                    try {
                        pulsarAdmin.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    pulsarAdmin.close();
                }
            }
            throw th4;
        }
    }
}
