package org.apache.inlong.manager.service.resource.queue.pulsar;

import com.google.common.base.Objects;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.consume.InlongConsumeService;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.class */
public class PulsarResourceOperator implements QueueResourceOperator {
    private static final Logger log = LoggerFactory.getLogger(PulsarResourceOperator.class);
    public static final String PULSAR_SUBSCRIPTION = "%s_%s_%s_consumer_group";
    public static final String PULSAR_SUBSCRIPTION_REALTIME_REVIEW = "%s_%s_consumer_group_realtime_review";

    @Autowired
    private InlongClusterService clusterService;

    @Autowired
    private InlongStreamService streamService;

    @Autowired
    private StreamSinkService sinkService;

    @Autowired
    private InlongConsumeService consumeService;

    @Autowired
    private PulsarOperator pulsarOperator;

    @Override // org.apache.inlong.manager.service.resource.queue.QueueResourceOperator
    public boolean accept(String str) {
        return "PULSAR".equals(str) || "TDMQ_PULSAR".equals(str);
    }

    @Override // org.apache.inlong.manager.service.resource.queue.QueueResourceOperator
    public void createQueueForGroup(InlongGroupInfo inlongGroupInfo, String str) {
        Preconditions.expectNotNull(inlongGroupInfo, "inlong group info cannot be null");
        Preconditions.expectNotBlank(str, ErrorCodeEnum.INVALID_PARAMETER, "operator cannot be null");
        String inlongGroupId = inlongGroupInfo.getInlongGroupId();
        String inlongClusterTag = inlongGroupInfo.getInlongClusterTag();
        log.info("begin to create pulsar resource for groupId={}, clusterTag={}", inlongGroupId, inlongClusterTag);
        if (!(inlongGroupInfo instanceof InlongPulsarInfo)) {
            throw new BusinessException("the mqType must be PULSAR for inlongGroupId=" + inlongGroupId);
        }
        InlongPulsarInfo inlongPulsarInfo = (InlongPulsarInfo) inlongGroupInfo;
        String pulsarTenant = inlongPulsarInfo.getPulsarTenant();
        Iterator<ClusterInfo> it = this.clusterService.listByTagAndType(inlongClusterTag, "PULSAR").iterator();
        while (it.hasNext()) {
            PulsarClusterInfo pulsarClusterInfo = (ClusterInfo) it.next();
            try {
                PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo);
                Throwable th = null;
                try {
                    try {
                        if (StringUtils.isBlank(pulsarTenant)) {
                            pulsarTenant = pulsarClusterInfo.getPulsarTenant();
                        }
                        if (!Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), inlongGroupInfo.getStatus())) {
                            this.pulsarOperator.createTenant(pulsarAdmin, pulsarTenant);
                            String mqResource = inlongGroupInfo.getMqResource();
                            this.pulsarOperator.createNamespace(pulsarAdmin, inlongPulsarInfo, pulsarTenant, mqResource);
                            log.info("success to create pulsar resource for groupId={}, tenant={}, namespace={}, cluster={}", new Object[]{inlongGroupId, pulsarTenant, mqResource, pulsarClusterInfo});
                        }
                        if (pulsarAdmin != null) {
                            if (0 != 0) {
                                try {
                                    pulsarAdmin.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                pulsarAdmin.close();
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            } catch (Exception e) {
                String str2 = "failed to create pulsar resource for groupId=" + inlongGroupId;
                log.error(str2 + ", cluster=" + pulsarClusterInfo, e);
                throw new WorkflowListenerException(str2 + ": " + e.getMessage());
            }
        }
        log.info("success to create pulsar resource for groupId={}, clusterTag={}", inlongGroupId, inlongClusterTag);
    }

    @Override // org.apache.inlong.manager.service.resource.queue.QueueResourceOperator
    public void deleteQueueForGroup(InlongGroupInfo inlongGroupInfo, String str) {
        Preconditions.expectNotNull(inlongGroupInfo, "inlong group info cannot be null");
        String inlongGroupId = inlongGroupInfo.getInlongGroupId();
        String inlongClusterTag = inlongGroupInfo.getInlongClusterTag();
        log.info("begin to delete pulsar resource for groupId={}, clusterTag={}", inlongGroupId, inlongClusterTag);
        List<InlongStreamBriefInfo> topicList = this.streamService.getTopicList(inlongGroupId);
        if (CollectionUtils.isEmpty(topicList)) {
            log.warn("skip to delete pulsar resource as no streams for groupId={}", inlongGroupId);
            return;
        }
        Iterator<ClusterInfo> it = this.clusterService.listByTagAndType(inlongClusterTag, "PULSAR").iterator();
        while (it.hasNext()) {
            PulsarClusterInfo pulsarClusterInfo = (PulsarClusterInfo) it.next();
            try {
                Iterator<InlongStreamBriefInfo> it2 = topicList.iterator();
                while (it2.hasNext()) {
                    deletePulsarTopic((InlongPulsarInfo) inlongGroupInfo, pulsarClusterInfo, it2.next().getMqResource());
                }
            } catch (Exception e) {
                String str2 = "failed to delete pulsar resource for groupId=" + inlongGroupId;
                log.error(str2 + ", cluster=" + pulsarClusterInfo, e);
                throw new WorkflowListenerException(str2 + ": " + e.getMessage());
            }
        }
        log.info("success to delete pulsar resource for groupId={}, clusterTag={}", inlongGroupId, inlongClusterTag);
    }

    @Override // org.apache.inlong.manager.service.resource.queue.QueueResourceOperator
    public void createQueueForStream(InlongGroupInfo inlongGroupInfo, InlongStreamInfo inlongStreamInfo, String str) {
        Preconditions.expectNotNull(inlongGroupInfo, "inlong group info cannot be null");
        Preconditions.expectNotNull(inlongStreamInfo, "inlong stream info cannot be null");
        Preconditions.expectNotBlank(str, ErrorCodeEnum.INVALID_PARAMETER, "operator cannot be null");
        String inlongGroupId = inlongStreamInfo.getInlongGroupId();
        String inlongStreamId = inlongStreamInfo.getInlongStreamId();
        String inlongClusterTag = inlongGroupInfo.getInlongClusterTag();
        log.info("begin to create pulsar resource for groupId={}, streamId={}, clusterTag={}", new Object[]{inlongGroupId, inlongStreamId, inlongClusterTag});
        Iterator<ClusterInfo> it = this.clusterService.listByTagAndType(inlongClusterTag, "PULSAR").iterator();
        while (it.hasNext()) {
            PulsarClusterInfo pulsarClusterInfo = (PulsarClusterInfo) it.next();
            try {
                String mqResource = inlongStreamInfo.getMqResource();
                createTopic((InlongPulsarInfo) inlongGroupInfo, pulsarClusterInfo, mqResource);
                createSubscription((InlongPulsarInfo) inlongGroupInfo, pulsarClusterInfo, mqResource, inlongStreamId);
                log.info("success to create pulsar resource for groupId={}, streamId={}, topic={}, cluster={}", new Object[]{inlongGroupId, inlongStreamId, mqResource, pulsarClusterInfo});
            } catch (Exception e) {
                String str2 = "failed to create pulsar resource for groupId=" + inlongGroupId + ", streamId=" + inlongStreamId;
                log.error(str2 + ", cluster=" + pulsarClusterInfo, e);
                throw new WorkflowListenerException(str2 + ": " + e.getMessage());
            }
        }
        log.info("success to create pulsar resource for groupId={}, streamId={}", inlongGroupId, inlongStreamId);
    }

    @Override // org.apache.inlong.manager.service.resource.queue.QueueResourceOperator
    public void deleteQueueForStream(InlongGroupInfo inlongGroupInfo, InlongStreamInfo inlongStreamInfo, String str) {
        Preconditions.expectNotNull(inlongGroupInfo, "inlong group info cannot be null");
        Preconditions.expectNotNull(inlongStreamInfo, "inlong stream info cannot be null");
        String inlongGroupId = inlongStreamInfo.getInlongGroupId();
        String inlongStreamId = inlongStreamInfo.getInlongStreamId();
        String inlongClusterTag = inlongGroupInfo.getInlongClusterTag();
        log.info("begin to delete pulsar resource for groupId={}, streamId={}, clusterTag={}", new Object[]{inlongGroupId, inlongStreamId, inlongClusterTag});
        Iterator<ClusterInfo> it = this.clusterService.listByTagAndType(inlongClusterTag, "PULSAR").iterator();
        while (it.hasNext()) {
            PulsarClusterInfo pulsarClusterInfo = (PulsarClusterInfo) it.next();
            try {
                deletePulsarTopic((InlongPulsarInfo) inlongGroupInfo, pulsarClusterInfo, inlongStreamInfo.getMqResource());
                log.info("success to delete pulsar topic for groupId={}, streamId={}, topic={}, cluster={}", new Object[]{inlongGroupId, inlongStreamId, inlongStreamInfo.getMqResource(), pulsarClusterInfo});
            } catch (Exception e) {
                String str2 = "failed to delete pulsar topic for groupId=" + inlongGroupId + ", streamId=" + inlongStreamId;
                log.error(str2 + ", cluster=" + pulsarClusterInfo, e);
                throw new WorkflowListenerException(str2 + ": " + e.getMessage());
            }
        }
        log.info("success to delete pulsar resource for groupId={}, streamId={}", inlongGroupId, inlongStreamId);
    }

    private void createTopic(InlongPulsarInfo inlongPulsarInfo, PulsarClusterInfo pulsarClusterInfo, String str) throws Exception {
        PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo);
        Throwable th = null;
        try {
            try {
                String pulsarTenant = inlongPulsarInfo.getPulsarTenant();
                if (StringUtils.isBlank(pulsarTenant)) {
                    pulsarTenant = pulsarClusterInfo.getPulsarTenant();
                }
                this.pulsarOperator.createTopic(pulsarAdmin, PulsarTopicInfo.builder().pulsarTenant(pulsarTenant).namespace(inlongPulsarInfo.getMqResource()).topicName(str).queueModule(inlongPulsarInfo.getQueueModule()).numPartitions(Integer.valueOf(inlongPulsarInfo.getPartitionNum())).build());
                if (pulsarAdmin != null) {
                    if (0 == 0) {
                        pulsarAdmin.close();
                        return;
                    }
                    try {
                        pulsarAdmin.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (pulsarAdmin != null) {
                if (th != null) {
                    try {
                        pulsarAdmin.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    pulsarAdmin.close();
                }
            }
            throw th4;
        }
    }

    private void createSubscription(InlongPulsarInfo inlongPulsarInfo, PulsarClusterInfo pulsarClusterInfo, String str, String str2) throws Exception {
        PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo);
        Throwable th = null;
        try {
            String pulsarTenant = inlongPulsarInfo.getPulsarTenant();
            if (StringUtils.isBlank(pulsarTenant)) {
                pulsarTenant = pulsarClusterInfo.getPulsarTenant();
            }
            String mqResource = inlongPulsarInfo.getMqResource();
            String str3 = pulsarTenant + "/" + mqResource + "/" + str;
            if (!this.pulsarOperator.topicExists(pulsarAdmin, pulsarTenant, mqResource, str, "PARALLEL".equals(inlongPulsarInfo.getQueueModule()))) {
                String adminUrl = pulsarClusterInfo.getAdminUrl();
                log.error("topic={} not exists in {}", str3, adminUrl);
                throw new WorkflowListenerException("topic=" + str3 + " not exists in " + adminUrl);
            }
            String inlongGroupId = inlongPulsarInfo.getInlongGroupId();
            List<StreamSink> listSink = this.sinkService.listSink(inlongGroupId, str2);
            if (CollectionUtils.isEmpty(listSink)) {
                log.warn("no need to create subs, as no sink exists for groupId={}, streamId={}", inlongGroupId, str2);
                if (pulsarAdmin != null) {
                    if (0 == 0) {
                        pulsarAdmin.close();
                        return;
                    }
                    try {
                        pulsarAdmin.close();
                        return;
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                        return;
                    }
                }
                return;
            }
            String inlongClusterTag = inlongPulsarInfo.getInlongClusterTag();
            Iterator<StreamSink> it = listSink.iterator();
            while (it.hasNext()) {
                String format = String.format(PULSAR_SUBSCRIPTION, inlongClusterTag, str, it.next().getId());
                this.pulsarOperator.createSubscription(pulsarAdmin, str3, inlongPulsarInfo.getQueueModule(), format);
                log.info("success to create subs={} for groupId={}, topic={}", new Object[]{format, inlongGroupId, str3});
                log.info("success to save inlong consume [{}] for subs={}, groupId={}, topic={}", new Object[]{this.consumeService.saveBySystem(inlongPulsarInfo, str, format), format, inlongGroupId, str});
            }
            if (pulsarAdmin != null) {
                if (0 == 0) {
                    pulsarAdmin.close();
                    return;
                }
                try {
                    pulsarAdmin.close();
                } catch (Throwable th3) {
                    th.addSuppressed(th3);
                }
            }
        } catch (Throwable th4) {
            if (pulsarAdmin != null) {
                if (0 != 0) {
                    try {
                        pulsarAdmin.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    pulsarAdmin.close();
                }
            }
            throw th4;
        }
    }

    private void deletePulsarTopic(InlongPulsarInfo inlongPulsarInfo, PulsarClusterInfo pulsarClusterInfo, String str) throws Exception {
        PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo);
        Throwable th = null;
        try {
            try {
                String pulsarTenant = inlongPulsarInfo.getPulsarTenant();
                if (StringUtils.isBlank(pulsarTenant)) {
                    pulsarTenant = pulsarClusterInfo.getPulsarTenant();
                }
                this.pulsarOperator.forceDeleteTopic(pulsarAdmin, PulsarTopicInfo.builder().pulsarTenant(pulsarTenant).namespace(inlongPulsarInfo.getMqResource()).topicName(str).build());
                if (pulsarAdmin != null) {
                    if (0 == 0) {
                        pulsarAdmin.close();
                        return;
                    }
                    try {
                        pulsarAdmin.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (pulsarAdmin != null) {
                if (th != null) {
                    try {
                        pulsarAdmin.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    pulsarAdmin.close();
                }
            }
            throw th4;
        }
    }

    @Override // org.apache.inlong.manager.service.resource.queue.QueueResourceOperator
    public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo inlongGroupInfo, InlongStreamInfo inlongStreamInfo, Integer num) throws PulsarClientException {
        String inlongGroupId = inlongStreamInfo.getInlongGroupId();
        InlongPulsarInfo inlongPulsarInfo = (InlongPulsarInfo) inlongGroupInfo;
        PulsarClusterInfo one = this.clusterService.getOne(inlongGroupInfo.getInlongClusterTag(), null, "PULSAR");
        new ArrayList();
        PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(one);
        Throwable th = null;
        try {
            try {
                String pulsarTenant = inlongPulsarInfo.getPulsarTenant();
                if (StringUtils.isBlank(pulsarTenant)) {
                    pulsarTenant = one.getPulsarTenant();
                }
                String mqResource = inlongGroupInfo.getMqResource();
                String mqResource2 = inlongStreamInfo.getMqResource();
                String str = pulsarTenant + "/" + mqResource + "/" + mqResource2;
                String format = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW, inlongPulsarInfo.getInlongClusterTag(), mqResource2);
                List<BriefMQMessage> queryLatestMessage = this.pulsarOperator.queryLatestMessage(pulsarAdmin, str, format, num, inlongStreamInfo);
                log.info("success to save inlong consume [{}] for subs={}, groupId={}, topic={}", new Object[]{this.consumeService.saveBySystem(inlongGroupInfo, mqResource2, format), format, inlongGroupId, mqResource2});
                if (pulsarAdmin != null) {
                    if (0 != 0) {
                        try {
                            pulsarAdmin.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        pulsarAdmin.close();
                    }
                }
                return queryLatestMessage;
            } finally {
            }
        } catch (Throwable th3) {
            if (pulsarAdmin != null) {
                if (th != null) {
                    try {
                        pulsarAdmin.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    pulsarAdmin.close();
                }
            }
            throw th3;
        }
    }
}
