/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.resource.queue.pulsar;

import com.google.common.collect.Sets;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.conversion.ConversionHandle;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Operations against a Pulsar cluster through {@link PulsarAdmin}: creating tenants,
 * namespaces, topics and subscriptions, deleting topics, and checking whether a topic
 * or subscription already exists.
 *
 * <p>The create methods skip the creation (with a warning log) when the resource is
 * already present, and rethrow any {@link PulsarAdminException} after logging it.
 */
@Service
public class PulsarOperator {

    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarOperator.class);

    /** Exclusive upper bound of an accepted partition number; see {@link #resolvePartitionCount}. */
    private static final int MAX_PARTITION = 1000;
    /** Maximum number of retries / attempts used by the lookup and existence checks. */
    private static final int RETRY_TIMES = 3;
    /** Delay, in seconds, before each retry of an existence check. */
    private static final int DELAY_SECONDS = 5;
    /** Delay, in milliseconds, before re-looking-up a freshly created partitioned topic. */
    private static final long LOOKUP_RETRY_INTERVAL_MS = 500L;

    /** Queue module value for which topics are handled as partitioned topics. */
    private static final String QUEUE_TYPE_PARALLEL = "PARALLEL";
    /** Queue module value for which topics are created as non-partitioned topics. */
    private static final String QUEUE_TYPE_SERIAL = "SERIAL";
    /** Marker that separates a topic name from its partition index in a listed topic name. */
    private static final String PARTITION_SUFFIX = "-partition-";

    @Autowired
    private ConversionHandle conversionHandle;

    /**
     * Creates the given tenant, allowed on all clusters reported by
     * {@code PulsarUtils.getPulsarClusters}, unless it already exists.
     *
     * @param pulsarAdmin admin client of the target Pulsar cluster
     * @param tenant tenant name, must not be blank
     * @throws PulsarAdminException if any admin call fails
     */
    public void createTenant(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException {
        LOGGER.info("begin to create pulsar tenant={}", tenant);
        Preconditions.expectNotBlank(tenant, ErrorCodeEnum.INVALID_PARAMETER, "Tenant cannot be empty");
        try {
            if (this.tenantIsExists(pulsarAdmin, tenant)) {
                LOGGER.warn("pulsar tenant={} already exists, skip to create", tenant);
                return;
            }
            // The cluster list is only needed when the tenant really has to be created.
            List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
            TenantInfoImpl tenantInfo = new TenantInfoImpl();
            tenantInfo.setAllowedClusters(Sets.newHashSet(clusters));
            tenantInfo.setAdminRoles(Sets.newHashSet());
            pulsarAdmin.tenants().createTenant(tenant, tenantInfo);
            LOGGER.info("success to create pulsar tenant={}", tenant);
        } catch (PulsarAdminException e) {
            LOGGER.error("failed to create pulsar tenant=" + tenant, e);
            throw e;
        }
    }

    /**
     * Creates the namespace {@code tenant/namespace} unless it already exists, then applies
     * the message TTL (only when positive), retention and persistence policies taken from
     * {@code pulsarInfo}.
     *
     * @param pulsarAdmin admin client of the target Pulsar cluster
     * @param pulsarInfo source of the TTL, retention and persistence settings; must not be
     *        null when the namespace has to be created
     * @param tenant tenant name, must not be blank
     * @param namespace namespace name, must not be blank
     * @throws PulsarAdminException if any admin call fails
     */
    public void createNamespace(PulsarAdmin pulsarAdmin, InlongPulsarInfo pulsarInfo, String tenant,
            String namespace) throws PulsarAdminException {
        Preconditions.expectNotBlank(tenant, ErrorCodeEnum.INVALID_PARAMETER,
                "pulsar tenant cannot be empty during create namespace");
        Preconditions.expectNotBlank(namespace, ErrorCodeEnum.INVALID_PARAMETER,
                "pulsar namespace cannot be empty during create namespace");
        String namespaceName = tenant + "/" + namespace;
        LOGGER.info("begin to create namespace={}", namespaceName);
        try {
            if (this.namespaceExists(pulsarAdmin, tenant, namespace)) {
                LOGGER.warn("namespace={} already exists, skip to create", namespaceName);
                return;
            }
            // Validate before creating anything, so a missing config cannot leave a
            // created-but-unconfigured namespace behind.
            Preconditions.expectNotNull(pulsarInfo, "pulsar info cannot be empty during create namespace");

            List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
            Namespaces namespaces = pulsarAdmin.namespaces();
            namespaces.createNamespace(namespaceName, Sets.newHashSet(clusters));

            // Message TTL is only configured when a positive value was requested.
            Integer ttl = pulsarInfo.getTtl();
            if (ttl > 0) {
                Integer ttlSeconds = this.conversionHandle.handleConversion(ttl,
                        pulsarInfo.getTtlUnit().toLowerCase() + "_seconds");
                namespaces.setNamespaceMessageTTL(namespaceName, ttlSeconds);
            }

            // Non-positive retention values are passed through unconverted.
            Integer retentionTime = pulsarInfo.getRetentionTime();
            if (retentionTime > 0) {
                retentionTime = this.conversionHandle.handleConversion(retentionTime,
                        pulsarInfo.getRetentionTimeUnit().toLowerCase() + "_minutes");
            }
            Integer retentionSize = pulsarInfo.getRetentionSize();
            if (retentionSize > 0) {
                retentionSize = this.conversionHandle.handleConversion(retentionSize,
                        pulsarInfo.getRetentionSizeUnit().toLowerCase() + "_mb");
            }
            namespaces.setRetention(namespaceName, new RetentionPolicies(retentionTime, retentionSize));

            PersistencePolicies persistencePolicies = new PersistencePolicies(pulsarInfo.getEnsemble(),
                    pulsarInfo.getWriteQuorum(), pulsarInfo.getAckQuorum(), 0.0);
            namespaces.setPersistence(namespaceName, persistencePolicies);
            LOGGER.info("success to create namespace={}", namespaceName);
        } catch (PulsarAdminException e) {
            LOGGER.error("failed to create namespace=" + namespaceName, e);
            throw e;
        }
    }

    /**
     * Creates the topic described by {@code topicInfo} unless it already exists.
     *
     * <p>A {@code SERIAL} queue module creates a non-partitioned topic; any other value
     * creates a partitioned topic and then verifies, with a few retries, that the lookup
     * reports as many partitions as were requested.
     *
     * @param pulsarAdmin admin client of the target Pulsar cluster
     * @param topicInfo tenant, namespace, topic name, queue module and partition number
     * @throws PulsarAdminException if an admin call fails, or if the partition lookup never
     *         matches the requested number of partitions
     */
    public void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicInfo topicInfo) throws PulsarAdminException {
        Preconditions.expectNotNull(topicInfo, "pulsar topic info cannot be empty");
        String tenant = topicInfo.getTenant();
        String namespace = topicInfo.getNamespace();
        String topicName = topicInfo.getTopicName();
        String fullTopicName = tenant + "/" + namespace + "/" + topicName;
        String queueModule = topicInfo.getQueueModule();

        if (this.topicExists(pulsarAdmin, tenant, namespace, topicName, QUEUE_TYPE_PARALLEL.equals(queueModule))) {
            LOGGER.warn("pulsar topic={} already exists in {}", fullTopicName, pulsarAdmin.getServiceUrl());
            return;
        }

        try {
            if (QUEUE_TYPE_SERIAL.equals(queueModule)) {
                pulsarAdmin.topics().createNonPartitionedTopic(fullTopicName);
                String res = pulsarAdmin.lookups().lookupTopic(fullTopicName);
                LOGGER.info("success to create topic={}, lookup result is {}", fullTopicName, res);
                return;
            }

            int numPartitions = this.resolvePartitionCount(pulsarAdmin, topicInfo.getNumPartitions());
            pulsarAdmin.topics().createPartitionedTopic(fullTopicName, numPartitions);

            Map<String, String> res = pulsarAdmin.lookups().lookupPartitionedTopic(fullTopicName);
            for (int i = 0; i < RETRY_TIMES && res.size() != numPartitions; i++) {
                try {
                    // Wait first, then look up again; an immediate re-lookup would
                    // almost certainly return the same incomplete result.
                    Thread.sleep(LOOKUP_RETRY_INTERVAL_MS);
                } catch (InterruptedException e) {
                    LOGGER.error("Thread has been interrupted");
                    // Keep the interrupt visible to the caller and stop waiting.
                    Thread.currentThread().interrupt();
                    break;
                }
                res = pulsarAdmin.lookups().lookupPartitionedTopic(fullTopicName);
            }
            if (res.size() != numPartitions) {
                throw new PulsarAdminException("The number of partitions not equal to lookupPartitionedTopic");
            }
            LOGGER.info("success to create topic={}", fullTopicName);
        } catch (PulsarAdminException e) {
            LOGGER.error("failed to create topic=" + fullTopicName, e);
            throw e;
        }
    }

    /**
     * Force-deletes the topic described by {@code topicInfo}. Nothing is deleted when the
     * topic name is blank or the topic does not exist.
     *
     * @param pulsarAdmin admin client of the target Pulsar cluster
     * @param topicInfo tenant, namespace, topic name and queue module of the topic
     * @throws PulsarAdminException if the delete call fails
     */
    public void forceDeleteTopic(PulsarAdmin pulsarAdmin, PulsarTopicInfo topicInfo) throws PulsarAdminException {
        Preconditions.expectNotNull(topicInfo, "pulsar topic info cannot be empty");
        String tenant = topicInfo.getTenant();
        String namespace = topicInfo.getNamespace();
        String topic = topicInfo.getTopicName();
        String fullTopicName = tenant + "/" + namespace + "/" + topic;
        boolean isPartitioned = QUEUE_TYPE_PARALLEL.equals(topicInfo.getQueueModule());

        // topicExists() reports blank names as existing, so they must be filtered out
        // here before that answer is used as "go ahead and delete".
        if (StringUtils.isBlank(topic)) {
            LOGGER.warn("pulsar topic name is blank for {}, skip to delete", fullTopicName);
            return;
        }
        // Only an existing topic can be deleted; a missing one is treated as already gone.
        if (!this.topicExists(pulsarAdmin, tenant, namespace, topic, isPartitioned)) {
            LOGGER.warn("pulsar topic={} already delete", fullTopicName);
            return;
        }

        try {
            if (isPartitioned) {
                // Partitioned topics have their own delete call in the admin API.
                pulsarAdmin.topics().deletePartitionedTopic(fullTopicName, true);
            } else {
                pulsarAdmin.topics().delete(fullTopicName, true);
            }
            LOGGER.info("success to delete topic={}", fullTopicName);
        } catch (PulsarAdminException e) {
            LOGGER.error("failed to delete topic=" + fullTopicName, e);
            throw e;
        }
    }

    /**
     * Creates a subscription positioned at {@link MessageId#latest} on the given topic,
     * unless a subscription with that name already exists.
     *
     * @param pulsarAdmin admin client of the target Pulsar cluster
     * @param fullTopicName topic the subscription belongs to
     * @param queueModule queue module of the topic; {@code PARALLEL} means partitioned
     * @param subscription subscription name
     * @throws PulsarAdminException if creating the subscription fails
     */
    public void createSubscription(PulsarAdmin pulsarAdmin, String fullTopicName, String queueModule,
            String subscription) throws PulsarAdminException {
        LOGGER.info("begin to create pulsar subscription={} for topic={}", subscription, fullTopicName);
        try {
            boolean isPartitioned = QUEUE_TYPE_PARALLEL.equals(queueModule);
            if (this.subscriptionExists(pulsarAdmin, fullTopicName, subscription, isPartitioned)) {
                LOGGER.warn("pulsar subscription={} already exists, skip to create", subscription);
                return;
            }
            pulsarAdmin.topics().createSubscription(fullTopicName, subscription, MessageId.latest);
            LOGGER.info("success to create subscription={}", subscription);
        } catch (PulsarAdminException e) {
            LOGGER.error("failed to create pulsar subscription=" + subscription, e);
            throw e;
        }
    }

    /**
     * Creates the same subscription on every topic in {@code topicList}, using the tenant,
     * namespace and queue module of {@code topicInfo}.
     *
     * <p>Note that {@code topicInfo}'s topic name is overwritten with each processed topic.
     *
     * @param pulsarAdmin admin client of the target Pulsar cluster
     * @param subscription subscription name
     * @param topicInfo tenant, namespace and queue module shared by all topics
     * @param topicList short names of the topics to subscribe to
     * @throws PulsarAdminException if creating any subscription fails
     */
    public void createSubscriptions(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicInfo topicInfo,
            List<String> topicList) throws PulsarAdminException {
        for (String topic : topicList) {
            topicInfo.setTopicName(topic);
            String fullTopicName = topicInfo.getTenant() + "/" + topicInfo.getNamespace() + "/" + topic;
            this.createSubscription(pulsarAdmin, fullTopicName, topicInfo.getQueueModule(), subscription);
        }
        LOGGER.info("success to create subscription={} for multiple topics={}", subscription, topicList);
    }

    /** Returns whether the tenant list of the cluster contains {@code tenant}. */
    private boolean tenantIsExists(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException {
        List<String> tenantList = pulsarAdmin.tenants().getTenants();
        return tenantList.contains(tenant);
    }

    /** Returns whether the namespaces of {@code tenant} contain {@code tenant/namespace}. */
    private boolean namespaceExists(PulsarAdmin pulsarAdmin, String tenant, String namespace)
            throws PulsarAdminException {
        List<String> namespaceList = pulsarAdmin.namespaces().getNamespaces(tenant);
        return namespaceList.contains(tenant + "/" + namespace);
    }

    /**
     * Checks whether a topic exists under {@code tenant/namespace}.
     *
     * <p>A blank topic name is reported as existing. When listing the topics fails, the
     * check is retried up to {@code RETRY_TIMES} times with a delay of
     * {@code DELAY_SECONDS} seconds before each retry; if the topic is still not found,
     * or the thread is interrupted while waiting, {@code false} is returned.
     *
     * @param pulsarAdmin admin client of the target Pulsar cluster
     * @param tenant tenant name
     * @param namespace namespace name
     * @param topicName short topic name (without tenant and namespace)
     * @param isPartitioned whether to search the partitioned-topic list
     * @return true if the topic name is blank or the topic was found
     */
    public boolean topicExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topicName,
            boolean isPartitioned) {
        if (StringUtils.isBlank(topicName)) {
            return true;
        }
        try {
            return this.topicListContains(pulsarAdmin, tenant, namespace, topicName, isPartitioned);
        } catch (PulsarAdminException pe) {
            LOGGER.error("check if the pulsar topic={} exists error, begin retry", topicName, pe);
        }

        PulsarAdminException lastError = null;
        for (int count = 1; count <= RETRY_TIMES; count++) {
            LOGGER.info("check whether the pulsar topic={} exists error, try count={}", topicName, count);
            try {
                Thread.sleep(DELAY_SECONDS * 1000L);
                if (this.topicListContains(pulsarAdmin, tenant, namespace, topicName, isPartitioned)) {
                    return true;
                }
                lastError = null;
            } catch (InterruptedException e) {
                // Restore the flag for the caller and give up waiting.
                Thread.currentThread().interrupt();
                LOGGER.error("after retry, check if the pulsar topic={} exists still error", topicName, e);
                return false;
            } catch (PulsarAdminException e) {
                lastError = e;
            }
        }
        if (lastError != null) {
            LOGGER.error("after retry, check if the pulsar topic={} exists still error", topicName, lastError);
        }
        return false;
    }

    /**
     * Chooses the number of partitions for a new partitioned topic: the requested value
     * when it lies in {@code [0, MAX_PARTITION)}, otherwise the number of active brokers
     * of the first cluster returned by {@code PulsarUtils.getPulsarClusters}.
     */
    private int resolvePartitionCount(PulsarAdmin pulsarAdmin, Integer requested) throws PulsarAdminException {
        if (requested != null && requested >= 0 && requested < MAX_PARTITION) {
            return requested;
        }
        List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
        if (clusters == null || clusters.isEmpty()) {
            throw new PulsarAdminException("no pulsar cluster found to determine the number of partitions");
        }
        List<String> brokers = pulsarAdmin.brokers().getActiveBrokers(clusters.get(0));
        return brokers.size();
    }

    /**
     * Lists the topics of {@code tenant/namespace} once and reports whether one of them has
     * the short name {@code topicName}. For the non-partitioned listing, a trailing
     * {@code -partition-N} part is ignored when comparing names.
     */
    private boolean topicListContains(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topicName,
            boolean isPartitioned) throws PulsarAdminException {
        String namespacePath = tenant + "/" + namespace;
        List<String> topicList = isPartitioned
                ? pulsarAdmin.topics().getPartitionedTopicList(namespacePath)
                : pulsarAdmin.topics().getList(namespacePath);
        for (String listedName : topicList) {
            // Listed names carry a path prefix; keep only the part after the last '/'.
            String shortName = listedName.substring(listedName.lastIndexOf('/') + 1);
            if (!isPartitioned) {
                int suffixIndex = shortName.lastIndexOf(PARTITION_SUFFIX);
                if (suffixIndex > 0) {
                    shortName = shortName.substring(0, suffixIndex);
                }
            }
            if (topicName.equals(shortName)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Checks whether {@code subscription} exists on {@code topic}, making up to
     * {@code RETRY_TIMES} attempts with a delay of {@code DELAY_SECONDS} seconds before
     * each retry.
     *
     * <p>An attempt is repeated when the topic lookup returns an empty result or an admin
     * call fails. If every lookup stayed empty, {@code false} is returned.
     *
     * @return true if the subscription list of the topic contains {@code subscription}
     * @throws BusinessException if the last attempt fails with a {@link PulsarAdminException},
     *         or if the thread is interrupted while waiting
     */
    private boolean subscriptionExists(PulsarAdmin pulsarAdmin, String topic, String subscription,
            boolean isPartitioned) {
        for (int count = 1; count <= RETRY_TIMES; count++) {
            try {
                LOGGER.info("check whether the subscription exists for topic={}, try count={}", topic, count);
                if (count > 1) {
                    // Back off only between attempts; the first attempt runs immediately.
                    Thread.sleep(DELAY_SECONDS * 1000L);
                }
                boolean lookupEmpty = isPartitioned
                        ? pulsarAdmin.lookups().lookupPartitionedTopic(topic).isEmpty()
                        : StringUtils.isBlank(pulsarAdmin.lookups().lookupTopic(topic));
                if (lookupEmpty) {
                    LOGGER.error("result of lookups topic={} is empty, continue retry", topic);
                    continue;
                }
                List<String> subscriptionList = pulsarAdmin.topics().getSubscriptions(topic);
                return subscriptionList.contains(subscription);
            } catch (InterruptedException e) {
                // Restore the flag and stop retrying instead of sleeping again.
                Thread.currentThread().interrupt();
                LOGGER.error("check if the subscription exists for topic={} error, continue retry", topic, e);
                throw new BusinessException("check if the subscription exists error: " + e.getMessage());
            } catch (PulsarAdminException e) {
                LOGGER.error("check if the subscription exists for topic={} error, continue retry", topic, e);
                if (count == RETRY_TIMES) {
                    LOGGER.error("after {} times retry, still check subscription exception for topic {}",
                            count, topic);
                    throw new BusinessException("check if the subscription exists error: " + e.getMessage());
                }
            }
        }
        return false;
    }
}

