package org.apache.inlong.manager.service.resource.queue.pulsar;

import com.google.common.collect.Sets;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.conversion.ConversionHandle;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Creates and deletes Pulsar tenants, namespaces, topics and subscriptions through a
 * caller-supplied {@link PulsarAdmin} client.
 */
@Service
/* loaded from: input_file:org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.class */
public class PulsarOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarOperator.class);

    /** Largest partition count accepted from a topic bean; larger requests fall back to the broker count. */
    private static final int MAX_PARTITION = 100;

    /** Maximum number of retries for existence checks and partition lookups. */
    private static final int RETRY_TIMES = 3;

    // NOTE(review): not referenced in this class; the retry sleeps below are expressed in milliseconds.
    // Confirm whether a delay in seconds was intended before wiring this constant in.
    private static final int DELAY_SECONDS = 5;

    /** Pause before each retry of a topic/subscription existence check. */
    private static final long CHECK_RETRY_DELAY_MILLIS = 5L;

    /** Pause before each retry of the partitioned-topic lookup after creation. */
    private static final long LOOKUP_RETRY_DELAY_MILLIS = 500L;

    /** Queue module value for which topics are checked and deleted as partitioned topics. */
    private static final String QUEUE_MODULE_PARALLEL = "PARALLEL";

    /** Queue module value for which a non-partitioned topic is created. */
    private static final String QUEUE_MODULE_SERIAL = "SERIAL";

    @Autowired
    private ConversionHandle conversionHandle;

    /**
     * Creates a tenant that is allowed on every cluster reported by the admin client.
     * Does nothing when the tenant already exists.
     *
     * @param pulsarAdmin admin client used for all calls
     * @param str tenant name, must not be empty
     * @throws PulsarAdminException if any admin call fails
     */
    public void createTenant(PulsarAdmin pulsarAdmin, String str) throws PulsarAdminException {
        LOGGER.info("begin to create pulsar tenant={}", str);
        Preconditions.checkNotEmpty(str, "Tenant cannot be empty");
        try {
            // Check existence first so the cluster list is only fetched when it is needed.
            if (tenantIsExists(pulsarAdmin, str)) {
                LOGGER.warn("pulsar tenant={} already exists, skip to create", str);
                return;
            }
            List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
            TenantInfoImpl tenantInfo = new TenantInfoImpl();
            tenantInfo.setAllowedClusters(Sets.newHashSet(clusters));
            tenantInfo.setAdminRoles(Sets.newHashSet());
            pulsarAdmin.tenants().createTenant(str, tenantInfo);
            LOGGER.info("success to create pulsar tenant={}", str);
        } catch (PulsarAdminException e) {
            LOGGER.error("failed to create pulsar tenant={}", str, e);
            throw e;
        }
    }

    /**
     * Creates the namespace {@code str/str2} and applies TTL, retention and persistence
     * policies taken from {@code inlongPulsarInfo}. Does nothing when the namespace already exists.
     *
     * @param pulsarAdmin admin client used for all calls
     * @param inlongPulsarInfo source of the TTL, retention and persistence settings
     * @param str tenant name
     * @param str2 namespace name (without the tenant prefix)
     * @throws PulsarAdminException if any admin call fails
     */
    public void createNamespace(PulsarAdmin pulsarAdmin, InlongPulsarInfo inlongPulsarInfo, String str, String str2)
            throws PulsarAdminException {
        Preconditions.checkNotNull(str, "pulsar tenant cannot be empty during create namespace");
        Preconditions.checkNotNull(str2, "pulsar namespace cannot be empty during create namespace");
        String namespacePath = str + "/" + str2;
        LOGGER.info("begin to create namespace={}", namespacePath);
        try {
            if (namespacesIsExists(pulsarAdmin, str, str2)) {
                LOGGER.warn("namespace={} already exists, skip to create", namespacePath);
                return;
            }
            List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
            Namespaces namespaces = pulsarAdmin.namespaces();
            namespaces.createNamespace(namespacePath, Sets.newHashSet(clusters));

            // Unit names are lower-cased with Locale.ROOT so the conversion key does not
            // depend on the JVM default locale (e.g. "MINUTES" under a Turkish locale).
            Integer ttl = inlongPulsarInfo.getTtl();
            if (ttl > 0) {
                String ttlConversion = inlongPulsarInfo.getTtlUnit().toLowerCase(Locale.ROOT) + "_seconds";
                namespaces.setNamespaceMessageTTL(namespacePath,
                        this.conversionHandle.handleConversion(ttl, ttlConversion).intValue());
            }

            // Non-positive retention values are passed to Pulsar unconverted.
            Integer retentionTime = inlongPulsarInfo.getRetentionTime();
            if (retentionTime > 0) {
                String timeConversion = inlongPulsarInfo.getRetentionTimeUnit().toLowerCase(Locale.ROOT) + "_minutes";
                retentionTime = this.conversionHandle.handleConversion(retentionTime, timeConversion);
            }
            Integer retentionSize = inlongPulsarInfo.getRetentionSize();
            if (retentionSize > 0) {
                String sizeConversion = inlongPulsarInfo.getRetentionSizeUnit().toLowerCase(Locale.ROOT) + "_mb";
                retentionSize = this.conversionHandle.handleConversion(retentionSize, sizeConversion);
            }
            namespaces.setRetention(namespacePath, new RetentionPolicies(retentionTime, retentionSize));

            namespaces.setPersistence(namespacePath, new PersistencePolicies(
                    inlongPulsarInfo.getEnsemble(),
                    inlongPulsarInfo.getWriteQuorum(),
                    inlongPulsarInfo.getAckQuorum(),
                    0.0d));
            LOGGER.info("success to create namespace={}", namespacePath);
        } catch (PulsarAdminException e) {
            LOGGER.error("failed to create namespace={}", namespacePath, e);
            throw e;
        }
    }

    /**
     * Creates the topic described by {@code pulsarTopicBean} unless it already exists.
     * A {@code SERIAL} queue module yields a non-partitioned topic; any other value yields a
     * partitioned topic whose partition lookup is verified after creation.
     *
     * @param pulsarAdmin admin client used for all calls
     * @param pulsarTopicBean tenant, namespace, topic name, queue module and partition count
     * @throws PulsarAdminException if an admin call fails or the partition lookup never matches
     *         the requested partition count
     */
    public void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean pulsarTopicBean) throws PulsarAdminException {
        Preconditions.checkNotNull(pulsarTopicBean, "pulsar topic info cannot be empty");
        String tenant = pulsarTopicBean.getTenant();
        String namespace = pulsarTopicBean.getNamespace();
        String topicName = pulsarTopicBean.getTopicName();
        String fullTopicName = tenant + "/" + namespace + "/" + topicName;
        boolean partitioned = QUEUE_MODULE_PARALLEL.equals(pulsarTopicBean.getQueueModule());
        if (topicIsExists(pulsarAdmin, tenant, namespace, topicName, partitioned)) {
            LOGGER.warn("pulsar topic={} already exists in {}", fullTopicName, pulsarAdmin.getServiceUrl());
            return;
        }
        try {
            if (QUEUE_MODULE_SERIAL.equals(pulsarTopicBean.getQueueModule())) {
                pulsarAdmin.topics().createNonPartitionedTopic(fullTopicName);
                LOGGER.info("success to create topic={}, lookup result is {}", fullTopicName,
                        pulsarAdmin.lookups().lookupTopic(fullTopicName));
                return;
            }

            int partitions = resolvePartitionCount(pulsarAdmin, pulsarTopicBean.getNumPartitions());
            pulsarAdmin.topics().createPartitionedTopic(fullTopicName, partitions);

            // Re-run the lookup until it reports every partition or the retries are used up.
            Map<String, String> lookup = pulsarAdmin.lookups().lookupPartitionedTopic(fullTopicName);
            for (int retry = 0; retry < RETRY_TIMES && lookup.size() != partitions; retry++) {
                try {
                    Thread.sleep(LOOKUP_RETRY_DELAY_MILLIS);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller and stop waiting.
                    Thread.currentThread().interrupt();
                    LOGGER.error("Thread has been interrupted");
                    break;
                }
                lookup = pulsarAdmin.lookups().lookupPartitionedTopic(fullTopicName);
            }
            if (lookup.size() != partitions) {
                throw new PulsarAdminException("The number of partitions not equal to lookupPartitionedTopic");
            }
            LOGGER.info("success to create topic={}", fullTopicName);
        } catch (PulsarAdminException e) {
            LOGGER.error("failed to create topic={}", fullTopicName, e);
            throw e;
        }
    }

    /**
     * Force-deletes the topic described by {@code pulsarTopicBean}. Does nothing when the topic
     * name is blank or the topic is not found.
     *
     * @param pulsarAdmin admin client used for all calls
     * @param pulsarTopicBean tenant, namespace, topic name and queue module of the topic
     * @throws PulsarAdminException if the delete call fails
     */
    public void forceDeleteTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean pulsarTopicBean)
            throws PulsarAdminException {
        Preconditions.checkNotNull(pulsarTopicBean, "pulsar topic info cannot be empty");
        String tenant = pulsarTopicBean.getTenant();
        String namespace = pulsarTopicBean.getNamespace();
        String topicName = pulsarTopicBean.getTopicName();
        String fullTopicName = tenant + "/" + namespace + "/" + topicName;

        // topicIsExists reports a blank name as existing, so it has to be rejected here explicitly.
        if (StringUtils.isBlank(topicName)) {
            LOGGER.warn("pulsar topic name is blank for {}, skip to delete", fullTopicName);
            return;
        }
        boolean partitioned = QUEUE_MODULE_PARALLEL.equals(pulsarTopicBean.getQueueModule());
        if (!topicIsExists(pulsarAdmin, tenant, namespace, topicName, partitioned)) {
            LOGGER.warn("pulsar topic={} already delete", fullTopicName);
            return;
        }
        try {
            if (partitioned) {
                pulsarAdmin.topics().deletePartitionedTopic(fullTopicName, true);
            } else {
                pulsarAdmin.topics().delete(fullTopicName, true);
            }
            LOGGER.info("success to delete topic={}", fullTopicName);
        } catch (PulsarAdminException e) {
            LOGGER.error("failed to delete topic={}", fullTopicName, e);
            throw e;
        }
    }

    /**
     * Creates a subscription at {@link MessageId#latest} on the topic described by
     * {@code pulsarTopicBean}, unless the subscription already exists.
     *
     * @param pulsarAdmin admin client used for all calls
     * @param pulsarTopicBean tenant, namespace, topic name and queue module of the topic
     * @param str subscription name
     * @throws PulsarAdminException if creating the subscription fails
     * @throws BusinessException if the existence check keeps failing
     */
    public void createSubscription(PulsarAdmin pulsarAdmin, PulsarTopicBean pulsarTopicBean, String str)
            throws PulsarAdminException {
        Preconditions.checkNotNull(pulsarTopicBean, "can not find tenant information to create subscription");
        Preconditions.checkNotNull(str, "subscription cannot be empty during creating subscription");
        String fullTopicName = pulsarTopicBean.getTenant() + "/" + pulsarTopicBean.getNamespace() + "/"
                + pulsarTopicBean.getTopicName();
        LOGGER.info("begin to create pulsar subscription={} for topic={}", str, fullTopicName);
        try {
            boolean partitioned = QUEUE_MODULE_PARALLEL.equals(pulsarTopicBean.getQueueModule());
            if (subscriptionIsExists(pulsarAdmin, fullTopicName, str, partitioned)) {
                LOGGER.warn("pulsar subscription={} already exists, skip to create", str);
                return;
            }
            pulsarAdmin.topics().createSubscription(fullTopicName, str, MessageId.latest);
            LOGGER.info("success to create subscription={}", str);
        } catch (PulsarAdminException e) {
            LOGGER.error("failed to create pulsar subscription={}", str, e);
            throw e;
        }
    }

    /**
     * Creates the same subscription on several topics of one tenant/namespace.
     * The topic name of {@code pulsarTopicBean} is overwritten with each entry of {@code list}
     * in turn, so the bean is left pointing at the last topic processed.
     *
     * @param pulsarAdmin admin client used for all calls
     * @param str subscription name
     * @param pulsarTopicBean tenant, namespace and queue module shared by all topics
     * @param list topic names to subscribe to
     * @throws PulsarAdminException if creating any subscription fails
     */
    public void createSubscriptions(PulsarAdmin pulsarAdmin, String str, PulsarTopicBean pulsarTopicBean,
            List<String> list) throws PulsarAdminException {
        for (String topicName : list) {
            pulsarTopicBean.setTopicName(topicName);
            createSubscription(pulsarAdmin, pulsarTopicBean, str);
        }
        LOGGER.info("success to create subscription={} for multiple topics={}", str, list);
    }

    /** Returns whether the tenant list reported by the admin client contains {@code tenant}. */
    private boolean tenantIsExists(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException {
        return pulsarAdmin.tenants().getTenants().contains(tenant);
    }

    /** Returns whether the tenant's namespace list contains {@code tenant/namespace}. */
    private boolean namespacesIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace)
            throws PulsarAdminException {
        return pulsarAdmin.namespaces().getNamespaces(tenant).contains(tenant + "/" + namespace);
    }

    /**
     * Checks whether a topic is listed under {@code str/str2}.
     * A blank topic name is reported as existing. If the first listing fails, the listing is
     * retried up to {@code RETRY_TIMES} times and the first successful listing decides the
     * result; if every attempt fails, {@code false} is returned.
     *
     * @param pulsarAdmin admin client used for all calls
     * @param str tenant name
     * @param str2 namespace name
     * @param str3 topic name without tenant/namespace prefix
     * @param z {@code true} to search the partitioned-topic list, {@code false} for the plain topic list
     * @return {@code true} if the topic name is blank or the topic was found
     */
    public boolean topicIsExists(PulsarAdmin pulsarAdmin, String str, String str2, String str3, boolean z) {
        if (StringUtils.isBlank(str3)) {
            return true;
        }
        String namespacePath = str + "/" + str2;
        try {
            return containsTopic(listTopics(pulsarAdmin, namespacePath, z), str3);
        } catch (PulsarAdminException e) {
            LOGGER.error("check if the pulsar topic={} exists error, begin retry", str3, e);
        }

        for (int attempt = 1; attempt <= RETRY_TIMES; attempt++) {
            LOGGER.info("check whether the pulsar topic={} exists error, try count={}", str3, attempt);
            try {
                Thread.sleep(CHECK_RETRY_DELAY_MILLIS);
                return containsTopic(listTopics(pulsarAdmin, namespacePath, z), str3);
            } catch (InterruptedException interrupted) {
                // Restore the flag for the caller and give up on further retries.
                Thread.currentThread().interrupt();
                LOGGER.error("interrupted while checking if the pulsar topic={} exists", str3, interrupted);
                return false;
            } catch (Exception retryError) {
                // Best effort: a failing retry is logged and the next attempt is made.
                LOGGER.error("after retry, check if the pulsar topic={} exists still error", str3, retryError);
            }
        }
        return false;
    }

    /** Lists the partitioned or the plain topics of a namespace, depending on {@code partitioned}. */
    private static List<String> listTopics(PulsarAdmin pulsarAdmin, String namespacePath, boolean partitioned)
            throws PulsarAdminException {
        return partitioned
                ? pulsarAdmin.topics().getPartitionedTopicList(namespacePath)
                : pulsarAdmin.topics().getList(namespacePath);
    }

    /** Returns whether any full topic name ends, after its last {@code '/'}, with exactly {@code topicName}. */
    private static boolean containsTopic(List<String> fullTopicNames, String topicName) {
        for (String fullName : fullTopicNames) {
            if (topicName.equals(fullName.substring(fullName.lastIndexOf('/') + 1))) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the requested partition count when it lies in {@code 1..MAX_PARTITION}; otherwise
     * falls back to the number of active brokers of the first cluster reported by the admin client.
     */
    private static int resolvePartitionCount(PulsarAdmin pulsarAdmin, Integer requested)
            throws PulsarAdminException {
        if (requested != null && requested > 0 && requested <= MAX_PARTITION) {
            return requested;
        }
        List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
        return pulsarAdmin.brokers().getActiveBrokers(clusters.get(0)).size();
    }

    /**
     * Checks whether {@code subscription} exists on {@code topic}, making up to
     * {@code RETRY_TIMES} attempts. Each attempt first performs a lookup of the topic and only
     * queries the subscription list when that lookup returns a non-empty result.
     *
     * @return {@code true} if the subscription is listed; {@code false} if it is not listed or
     *         every lookup came back empty
     * @throws BusinessException if the last attempt fails with an admin error or the thread is interrupted
     */
    private boolean subscriptionIsExists(PulsarAdmin pulsarAdmin, String topic, String subscription,
            boolean partitioned) {
        for (int attempt = 1; attempt <= RETRY_TIMES; attempt++) {
            try {
                LOGGER.info("check whether the subscription exists for topic={}, try count={}", topic, attempt);
                Thread.sleep(CHECK_RETRY_DELAY_MILLIS);

                boolean lookupEmpty = partitioned
                        ? pulsarAdmin.lookups().lookupPartitionedTopic(topic).isEmpty()
                        : StringUtils.isBlank(pulsarAdmin.lookups().lookupTopic(topic));
                if (lookupEmpty) {
                    LOGGER.error("result of lookups topic={} is empty, continue retry", topic);
                    continue;
                }
                return pulsarAdmin.topics().getSubscriptions(topic).contains(subscription);
            } catch (InterruptedException interrupted) {
                // Restore the flag for the caller; retrying would only be interrupted again.
                Thread.currentThread().interrupt();
                LOGGER.error("interrupted while checking if the subscription exists for topic={}", topic,
                        interrupted);
                throw new BusinessException("check if the subscription exists error");
            } catch (PulsarAdminException e) {
                LOGGER.error("check if the subscription exists for topic={} error, continue retry", topic, e);
                if (attempt == RETRY_TIMES) {
                    LOGGER.error("after {} times retry, still check subscription exception for topic {}",
                            attempt, topic);
                    throw new BusinessException("check if the subscription exists error");
                }
            }
        }
        return false;
    }
}
