package org.apache.inlong.manager.service.thirdpart.mq;

import com.google.common.collect.Sets;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.conversion.ConversionHandle;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.BusinessPulsarEntity;
import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:org/apache/inlong/manager/service/thirdpart/mq/PulsarOptServiceImpl.class */
public class PulsarOptServiceImpl implements PulsarOptService {
    private static final Logger log = LoggerFactory.getLogger(PulsarOptServiceImpl.class);

    @Autowired
    private ConversionHandle conversionHandle;

    @Override // org.apache.inlong.manager.service.thirdpart.mq.PulsarOptService
    public void createTenant(PulsarAdmin pulsarAdmin, String str) throws PulsarAdminException {
        log.info("begin to create tenant={}", str);
        Preconditions.checkNotEmpty(str, "Tenant cannot be empty");
        try {
            List<String> pulsarClusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
            if (tenantIsExists(pulsarAdmin, str)) {
                log.warn("pulsar tenant={} already exists, skip to create", str);
                return;
            }
            TenantInfoImpl tenantInfoImpl = new TenantInfoImpl();
            tenantInfoImpl.setAllowedClusters(Sets.newHashSet(pulsarClusters));
            tenantInfoImpl.setAdminRoles(Sets.newHashSet());
            pulsarAdmin.tenants().createTenant(str, tenantInfoImpl);
            log.info("success to create pulsar tenant={}", str);
        } catch (PulsarAdminException e) {
            log.error("create pulsar tenant={} failed", str, e);
            throw e;
        }
    }

    @Override // org.apache.inlong.manager.service.thirdpart.mq.PulsarOptService
    public void createNamespace(PulsarAdmin pulsarAdmin, BusinessPulsarEntity businessPulsarEntity, String str, String str2) throws PulsarAdminException {
        Preconditions.checkNotNull(str, "pulsar tenant cannot be empty during create namespace");
        Preconditions.checkNotNull(str2, "pulsar namespace cannot be empty during create namespace");
        String str3 = str + "/" + str2;
        log.info("begin to create namespace={}", str3);
        try {
            if (namespacesIsExists(pulsarAdmin, str, str2)) {
                log.warn("namespace={} already exists, skip to create", str3);
                return;
            }
            List<String> pulsarClusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
            Namespaces namespaces = pulsarAdmin.namespaces();
            namespaces.createNamespace(str3, Sets.newHashSet(pulsarClusters));
            Integer ttl = businessPulsarEntity.getTtl();
            if (ttl.intValue() > 0) {
                namespaces.setNamespaceMessageTTL(str3, this.conversionHandle.handleConversion(ttl, businessPulsarEntity.getTtlUnit().toLowerCase() + "_seconds").intValue());
            }
            Integer retentionTime = businessPulsarEntity.getRetentionTime();
            if (retentionTime.intValue() > 0) {
                retentionTime = this.conversionHandle.handleConversion(retentionTime, businessPulsarEntity.getRetentionTimeUnit().toLowerCase() + "_minutes");
            }
            Integer retentionSize = businessPulsarEntity.getRetentionSize();
            if (retentionSize.intValue() > 0) {
                retentionSize = this.conversionHandle.handleConversion(retentionSize, businessPulsarEntity.getRetentionSizeUnit().toLowerCase() + "_mb");
            }
            namespaces.setRetention(str3, new RetentionPolicies(retentionTime.intValue(), retentionSize.intValue()));
            namespaces.setPersistence(str3, new PersistencePolicies(businessPulsarEntity.getEnsemble().intValue(), businessPulsarEntity.getWriteQuorum().intValue(), businessPulsarEntity.getAckQuorum().intValue(), 0.0d));
            log.info("success to create namespace={}", str);
        } catch (PulsarAdminException e) {
            log.error("create namespace={} error", str, e);
            throw e;
        }
    }

    @Override // org.apache.inlong.manager.service.thirdpart.mq.PulsarOptService
    public void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean pulsarTopicBean) throws PulsarAdminException {
        Preconditions.checkNotNull(pulsarTopicBean, "pulsar topic info cannot be empty");
        String tenant = pulsarTopicBean.getTenant();
        String namespace = pulsarTopicBean.getNamespace();
        String topicName = pulsarTopicBean.getTopicName();
        String str = tenant + "/" + namespace + "/" + topicName;
        if (topicIsExists(pulsarAdmin, tenant, namespace, topicName)) {
            log.warn("pulsar topic={} already exists in {}", str, pulsarAdmin.getServiceUrl());
            return;
        }
        try {
            if ("SERIAL".equalsIgnoreCase(pulsarTopicBean.getQueueModule())) {
                pulsarAdmin.topics().createNonPartitionedTopic(str);
            } else {
                Integer valueOf = Integer.valueOf(pulsarAdmin.brokers().getActiveBrokers(PulsarUtils.getPulsarClusters(pulsarAdmin).get(0)).size());
                if (pulsarTopicBean.getNumPartitions().intValue() > 0) {
                    valueOf = pulsarTopicBean.getNumPartitions();
                }
                pulsarAdmin.topics().createPartitionedTopic(str, valueOf.intValue());
            }
            log.info("success to create topic={}", str);
        } catch (Exception e) {
            log.error("create topic={} failed", str, e);
            throw e;
        }
    }

    @Override // org.apache.inlong.manager.service.thirdpart.mq.PulsarOptService
    public void createSubscription(PulsarAdmin pulsarAdmin, PulsarTopicBean pulsarTopicBean, String str) throws PulsarAdminException {
        Preconditions.checkNotNull(pulsarTopicBean, "can not find tenant information to create subscription");
        Preconditions.checkNotNull(str, "subscription cannot be empty during creating subscription");
        String str2 = pulsarTopicBean.getTenant() + "/" + pulsarTopicBean.getNamespace() + "/" + pulsarTopicBean.getTopicName();
        log.info("begin to create pulsar subscription={} for topic={}", str, str2);
        try {
            if (subscriptionIsExists(pulsarAdmin, str2, str)) {
                log.warn("pulsar subscription={} already exists, skip to create", str);
            } else {
                pulsarAdmin.topics().createSubscription(str2, str, MessageId.latest);
                log.info("success to create subscription={}", str);
            }
        } catch (Exception e) {
            log.error("create pulsar subscription={} failed", str, e);
            throw e;
        }
    }

    @Override // org.apache.inlong.manager.service.thirdpart.mq.PulsarOptService
    public void createSubscriptions(PulsarAdmin pulsarAdmin, String str, PulsarTopicBean pulsarTopicBean, List<String> list) throws PulsarAdminException {
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            pulsarTopicBean.setTopicName(it.next());
            createSubscription(pulsarAdmin, pulsarTopicBean, str);
        }
        log.info("success to create subscription={} for multiple topics={}", str, list);
    }

    private boolean tenantIsExists(PulsarAdmin pulsarAdmin, String str) throws PulsarAdminException {
        return pulsarAdmin.tenants().getTenants().contains(str);
    }

    private boolean namespacesIsExists(PulsarAdmin pulsarAdmin, String str, String str2) throws PulsarAdminException {
        return pulsarAdmin.namespaces().getNamespaces(str).contains(str + "/" + str2);
    }

    @Override // org.apache.inlong.manager.service.thirdpart.mq.PulsarOptService
    public boolean topicIsExists(PulsarAdmin pulsarAdmin, String str, String str2, String str3) throws PulsarAdminException {
        if (StringUtils.isBlank(str3)) {
            return true;
        }
        for (String str4 : pulsarAdmin.topics().getPartitionedTopicList(str + "/" + str2)) {
            if (str3.equals(str4.substring(str4.lastIndexOf("/") + 1))) {
                return true;
            }
        }
        return false;
    }

    public boolean subscriptionIsExists(PulsarAdmin pulsarAdmin, String str, String str2) {
        try {
            return pulsarAdmin.topics().getSubscriptions(str).contains(str2);
        } catch (PulsarAdminException e) {
            log.error("check if the topic={} is exists error,", str, e);
            return false;
        }
    }
}
