/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.resource.queue.pulsar;

import com.google.common.collect.Sets;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.conversion.ConversionHandle;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class PulsarOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(InlongClusterServiceImpl.class);
    private static final int MAX_PARTITION = 100;
    private static final int RETRY_TIMES = 3;
    private static final int DELAY_SECONDS = 5;
    @Autowired
    private ConversionHandle conversionHandle;

    public void createTenant(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException {
        LOGGER.info("begin to create pulsar tenant={}", (Object)tenant);
        Preconditions.checkNotEmpty((String)tenant, (String)"Tenant cannot be empty");
        try {
            List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
            boolean exists = this.tenantIsExists(pulsarAdmin, tenant);
            if (exists) {
                LOGGER.warn("pulsar tenant={} already exists, skip to create", (Object)tenant);
                return;
            }
            TenantInfoImpl tenantInfo = new TenantInfoImpl();
            tenantInfo.setAllowedClusters((Set)Sets.newHashSet(clusters));
            tenantInfo.setAdminRoles((Set)Sets.newHashSet());
            pulsarAdmin.tenants().createTenant(tenant, (TenantInfo)tenantInfo);
            LOGGER.info("success to create pulsar tenant={}", (Object)tenant);
        }
        catch (PulsarAdminException e) {
            LOGGER.error("failed to create pulsar tenant=" + tenant, (Throwable)e);
            throw e;
        }
    }

    public void createNamespace(PulsarAdmin pulsarAdmin, InlongPulsarInfo pulsarInfo, String tenant, String namespace) throws PulsarAdminException {
        Preconditions.checkNotNull((Object)tenant, (String)"pulsar tenant cannot be empty during create namespace");
        Preconditions.checkNotNull((Object)namespace, (String)"pulsar namespace cannot be empty during create namespace");
        String namespaceName = tenant + "/" + namespace;
        LOGGER.info("begin to create namespace={}", (Object)namespaceName);
        try {
            Integer retentionSize;
            Integer retentionTime;
            boolean isExists = this.namespacesIsExists(pulsarAdmin, tenant, namespace);
            if (isExists) {
                LOGGER.warn("namespace={} already exists, skip to create", (Object)namespaceName);
                return;
            }
            List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
            Namespaces namespaces = pulsarAdmin.namespaces();
            namespaces.createNamespace(namespaceName, (Set)Sets.newHashSet(clusters));
            Integer ttl = pulsarInfo.getTtl();
            if (ttl > 0) {
                namespaces.setNamespaceMessageTTL(namespaceName, this.conversionHandle.handleConversion(ttl, pulsarInfo.getTtlUnit().toLowerCase() + "_seconds").intValue());
            }
            if ((retentionTime = pulsarInfo.getRetentionTime()) > 0) {
                retentionTime = this.conversionHandle.handleConversion(retentionTime, pulsarInfo.getRetentionTimeUnit().toLowerCase() + "_minutes");
            }
            if ((retentionSize = pulsarInfo.getRetentionSize()) > 0) {
                retentionSize = this.conversionHandle.handleConversion(retentionSize, pulsarInfo.getRetentionSizeUnit().toLowerCase() + "_mb");
            }
            RetentionPolicies retentionPolicies = new RetentionPolicies(retentionTime.intValue(), retentionSize.intValue());
            namespaces.setRetention(namespaceName, retentionPolicies);
            PersistencePolicies persistencePolicies = new PersistencePolicies(pulsarInfo.getEnsemble().intValue(), pulsarInfo.getWriteQuorum().intValue(), pulsarInfo.getAckQuorum().intValue(), 0.0);
            namespaces.setPersistence(namespaceName, persistencePolicies);
            LOGGER.info("success to create namespace={}", (Object)namespaceName);
        }
        catch (PulsarAdminException e) {
            LOGGER.error("failed to create namespace=" + namespaceName, (Throwable)e);
            throw e;
        }
    }

    public void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean) throws PulsarAdminException {
        Preconditions.checkNotNull((Object)topicBean, (String)"pulsar topic info cannot be empty");
        String tenant = topicBean.getTenant();
        String namespace = topicBean.getNamespace();
        String topic = topicBean.getTopicName();
        String topicFullName = tenant + "/" + namespace + "/" + topic;
        if (this.topicIsExists(pulsarAdmin, tenant, namespace, topic, "PARALLEL".equals(topicBean.getQueueModule()))) {
            LOGGER.warn("pulsar topic={} already exists in {}", (Object)topicFullName, (Object)pulsarAdmin.getServiceUrl());
            return;
        }
        try {
            if ("SERIAL".equals(topicBean.getQueueModule())) {
                pulsarAdmin.topics().createNonPartitionedTopic(topicFullName);
                String res = pulsarAdmin.lookups().lookupTopic(topicFullName);
                LOGGER.info("success to create topic={}, lookup result is {}", (Object)topicFullName, (Object)res);
            } else {
                List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
                Integer numPartitions = topicBean.getNumPartitions();
                if (numPartitions < 0 || numPartitions <= 100) {
                    List brokers = pulsarAdmin.brokers().getActiveBrokers(clusters.get(0));
                    numPartitions = brokers.size();
                }
                pulsarAdmin.topics().createPartitionedTopic(topicFullName, numPartitions.intValue());
                Map res = pulsarAdmin.lookups().lookupPartitionedTopic(topicFullName);
                if (res.keySet().size() != numPartitions.intValue()) {
                    for (int i = 0; i < 3 && res.keySet().size() != numPartitions.intValue(); ++i) {
                        res = pulsarAdmin.lookups().lookupPartitionedTopic(topicFullName);
                        try {
                            Thread.sleep(500L);
                            continue;
                        }
                        catch (InterruptedException e) {
                            LOGGER.error("Thread has been interrupted");
                        }
                    }
                }
                if (numPartitions.intValue() != res.keySet().size()) {
                    throw new PulsarAdminException("The number of partitions not equal to lookupPartitionedTopic");
                }
                LOGGER.info("success to create topic={}", (Object)topicFullName);
            }
        }
        catch (PulsarAdminException e) {
            LOGGER.error("failed to create topic=" + topicFullName, (Throwable)e);
            throw e;
        }
    }

    public void forceDeleteTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean) throws PulsarAdminException {
        Preconditions.checkNotNull((Object)topicBean, (String)"pulsar topic info cannot be empty");
        String tenant = topicBean.getTenant();
        String namespace = topicBean.getNamespace();
        String topic = topicBean.getTopicName();
        String topicFullName = tenant + "/" + namespace + "/" + topic;
        if (this.topicIsExists(pulsarAdmin, tenant, namespace, topic, "PARALLEL".equals(topicBean.getQueueModule()))) {
            LOGGER.warn("pulsar topic={} already delete", (Object)topicFullName);
            return;
        }
        try {
            pulsarAdmin.topics().delete(topicFullName, true);
            LOGGER.info("success to delete topic={}", (Object)topicFullName);
        }
        catch (PulsarAdminException e) {
            LOGGER.error("failed to delete topic=" + topicFullName, (Throwable)e);
            throw e;
        }
    }

    public void createSubscription(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean, String subscription) throws PulsarAdminException {
        Preconditions.checkNotNull((Object)topicBean, (String)"can not find tenant information to create subscription");
        Preconditions.checkNotNull((Object)subscription, (String)"subscription cannot be empty during creating subscription");
        String topicName = topicBean.getTenant() + "/" + topicBean.getNamespace() + "/" + topicBean.getTopicName();
        LOGGER.info("begin to create pulsar subscription={} for topic={}", (Object)subscription, (Object)topicName);
        try {
            boolean isExists = this.subscriptionIsExists(pulsarAdmin, topicName, subscription, "PARALLEL".equals(topicBean.getQueueModule()));
            if (isExists) {
                LOGGER.warn("pulsar subscription={} already exists, skip to create", (Object)subscription);
                return;
            }
            pulsarAdmin.topics().createSubscription(topicName, subscription, MessageId.latest);
            LOGGER.info("success to create subscription={}", (Object)subscription);
        }
        catch (PulsarAdminException e) {
            LOGGER.error("failed to create pulsar subscription=" + subscription, (Throwable)e);
            throw e;
        }
    }

    public void createSubscriptions(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicBean topicBean, List<String> topicList) throws PulsarAdminException {
        for (String topic : topicList) {
            topicBean.setTopicName(topic);
            this.createSubscription(pulsarAdmin, topicBean, subscription);
        }
        LOGGER.info("success to create subscription={} for multiple topics={}", (Object)subscription, topicList);
    }

    private boolean tenantIsExists(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException {
        List tenantList = pulsarAdmin.tenants().getTenants();
        return tenantList.contains(tenant);
    }

    private boolean namespacesIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace) throws PulsarAdminException {
        List namespaceList = pulsarAdmin.namespaces().getNamespaces(tenant);
        return namespaceList.contains(tenant + "/" + namespace);
    }

    public boolean topicIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topic, boolean isPartitioned) {
        if (StringUtils.isBlank((CharSequence)topic)) {
            return true;
        }
        boolean topicExists = false;
        try {
            List topicList = isPartitioned ? pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace) : pulsarAdmin.topics().getList(tenant + "/" + namespace);
            for (String t : topicList) {
                if (!topic.equals(t = t.substring(t.lastIndexOf("/") + 1))) continue;
                topicExists = true;
                break;
            }
        }
        catch (PulsarAdminException pe) {
            LOGGER.error("check if the pulsar topic={} exists error, begin retry", (Object)topic, (Object)pe);
            int count = 0;
            try {
                block5: while (!topicExists && ++count <= 3) {
                    LOGGER.info("check whether the pulsar topic={} exists error, try count={}", (Object)topic, (Object)count);
                    Thread.sleep(5L);
                    List topicList = pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace);
                    for (String t : topicList) {
                        if (!topic.equals(t = t.substring(t.lastIndexOf("/") + 1))) continue;
                        topicExists = true;
                        continue block5;
                    }
                }
            }
            catch (Exception e) {
                LOGGER.error("after retry, check if the pulsar topic={} exists still error", (Object)topic, (Object)pe);
            }
        }
        return topicExists;
    }

    private boolean subscriptionIsExists(PulsarAdmin pulsarAdmin, String topic, String subscription, boolean isPartitioned) {
        int count = 0;
        while (++count <= 3) {
            try {
                LOGGER.info("check whether the subscription exists for topic={}, try count={}", (Object)topic, (Object)count);
                Thread.sleep(5L);
                if (isPartitioned) {
                    Map topicMap = pulsarAdmin.lookups().lookupPartitionedTopic(topic);
                    if (topicMap.isEmpty()) {
                        LOGGER.error("result of lookups topic={} is empty, continue retry", (Object)topic);
                        continue;
                    }
                } else {
                    String lookupTopic = pulsarAdmin.lookups().lookupTopic(topic);
                    if (StringUtils.isBlank((CharSequence)lookupTopic)) {
                        LOGGER.error("result of lookups topic={} is empty, continue retry", (Object)topic);
                        continue;
                    }
                }
                List subscriptionList = pulsarAdmin.topics().getSubscriptions(topic);
                return subscriptionList.contains(subscription);
            }
            catch (InterruptedException | PulsarAdminException e) {
                LOGGER.error("check if the subscription exists for topic={} error, continue retry", (Object)topic, (Object)e);
                if (count != 3) continue;
                LOGGER.error("after {} times retry, still check subscription exception for topic {}", (Object)count, (Object)topic);
                throw new BusinessException("check if the subscription exists error");
            }
        }
        return false;
    }
}

