/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.thirdpart.mq;

import com.google.common.collect.Sets;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.conversion.ConversionHandle;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.BusinessPulsarEntity;
import org.apache.inlong.manager.service.thirdpart.mq.PulsarOptService;
import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class PulsarOptServiceImpl
implements PulsarOptService {
    private static final Logger log = LoggerFactory.getLogger(PulsarOptServiceImpl.class);
    @Autowired
    private ConversionHandle conversionHandle;

    @Override
    public void createTenant(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException {
        log.info("begin to create tenant={}", (Object)tenant);
        Preconditions.checkNotEmpty((String)tenant, (String)"Tenant cannot be empty");
        try {
            List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
            boolean exists = this.tenantIsExists(pulsarAdmin, tenant);
            if (exists) {
                log.warn("pulsar tenant={} already exists, skip to create", (Object)tenant);
                return;
            }
            TenantInfoImpl tenantInfo = new TenantInfoImpl();
            tenantInfo.setAllowedClusters((Set)Sets.newHashSet(clusters));
            tenantInfo.setAdminRoles((Set)Sets.newHashSet());
            pulsarAdmin.tenants().createTenant(tenant, (TenantInfo)tenantInfo);
            log.info("success to create pulsar tenant={}", (Object)tenant);
        }
        catch (PulsarAdminException e) {
            log.error("create pulsar tenant={} failed", (Object)tenant, (Object)e);
            throw e;
        }
    }

    @Override
    public void createNamespace(PulsarAdmin pulsarAdmin, BusinessPulsarEntity pulsarEntity, String tenant, String namespace) throws PulsarAdminException {
        Preconditions.checkNotNull((Object)tenant, (String)"pulsar tenant cannot be empty during create namespace");
        Preconditions.checkNotNull((Object)namespace, (String)"pulsar namespace cannot be empty during create namespace");
        String namespaceName = tenant + "/" + namespace;
        log.info("begin to create namespace={}", (Object)namespaceName);
        try {
            Integer retentionSize;
            Integer retentionTime;
            boolean isExists = this.namespacesIsExists(pulsarAdmin, tenant, namespace);
            if (isExists) {
                log.warn("namespace={} already exists, skip to create", (Object)namespaceName);
                return;
            }
            List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
            Namespaces namespaces = pulsarAdmin.namespaces();
            namespaces.createNamespace(namespaceName, (Set)Sets.newHashSet(clusters));
            Integer ttl = pulsarEntity.getTtl();
            if (ttl > 0) {
                namespaces.setNamespaceMessageTTL(namespaceName, this.conversionHandle.handleConversion(ttl, pulsarEntity.getTtlUnit().toLowerCase() + "_seconds").intValue());
            }
            if ((retentionTime = pulsarEntity.getRetentionTime()) > 0) {
                retentionTime = this.conversionHandle.handleConversion(retentionTime, pulsarEntity.getRetentionTimeUnit().toLowerCase() + "_minutes");
            }
            if ((retentionSize = pulsarEntity.getRetentionSize()) > 0) {
                retentionSize = this.conversionHandle.handleConversion(retentionSize, pulsarEntity.getRetentionSizeUnit().toLowerCase() + "_mb");
            }
            RetentionPolicies retentionPolicies = new RetentionPolicies(retentionTime.intValue(), retentionSize.intValue());
            namespaces.setRetention(namespaceName, retentionPolicies);
            PersistencePolicies persistencePolicies = new PersistencePolicies(pulsarEntity.getEnsemble().intValue(), pulsarEntity.getWriteQuorum().intValue(), pulsarEntity.getAckQuorum().intValue(), 0.0);
            namespaces.setPersistence(namespaceName, persistencePolicies);
            log.info("success to create namespace={}", (Object)tenant);
        }
        catch (PulsarAdminException e) {
            log.error("create namespace={} error", (Object)tenant, (Object)e);
            throw e;
        }
    }

    @Override
    public void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean) throws PulsarAdminException {
        Preconditions.checkNotNull((Object)topicBean, (String)"pulsar topic info cannot be empty");
        String tenant = topicBean.getTenant();
        String namespace = topicBean.getNamespace();
        String topic = topicBean.getTopicName();
        String topicFullName = tenant + "/" + namespace + "/" + topic;
        if (this.topicIsExists(pulsarAdmin, tenant, namespace, topic)) {
            log.warn("pulsar topic={} already exists in {}", (Object)topicFullName, (Object)pulsarAdmin.getServiceUrl());
            return;
        }
        try {
            String queueModule = topicBean.getQueueModule();
            if ("SERIAL".equalsIgnoreCase(queueModule)) {
                pulsarAdmin.topics().createPartitionedTopic(topicFullName, 1);
            } else {
                List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
                List brokers = pulsarAdmin.brokers().getActiveBrokers(clusters.get(0));
                Integer numPartitions = brokers.size();
                if (topicBean.getNumPartitions() > 0) {
                    numPartitions = topicBean.getNumPartitions();
                }
                pulsarAdmin.topics().createPartitionedTopic(topicFullName, numPartitions.intValue());
            }
            log.info("success to create topic={}", (Object)topicFullName);
        }
        catch (Exception e) {
            log.error("create topic={} failed", (Object)topicFullName, (Object)e);
            throw e;
        }
    }

    @Override
    public void createSubscription(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean, String subscription) throws PulsarAdminException {
        Preconditions.checkNotNull((Object)topicBean, (String)"can not find tenant information to create subscription");
        Preconditions.checkNotNull((Object)subscription, (String)"subscription cannot be empty during creating subscription");
        String topicName = topicBean.getTenant() + "/" + topicBean.getNamespace() + "/" + topicBean.getTopicName();
        log.info("begin to create pulsar subscription={} for topic={}", (Object)subscription, (Object)topicName);
        try {
            boolean isExists = this.subscriptionIsExists(pulsarAdmin, topicName, subscription);
            if (!isExists) {
                pulsarAdmin.topics().createSubscription(topicName, subscription, MessageId.latest);
                log.info("success to create subscription={}", (Object)subscription);
            } else {
                log.warn("pulsar subscription={} already exists, skip to create", (Object)subscription);
            }
        }
        catch (Exception e) {
            log.error("create pulsar subscription={} failed", (Object)subscription, (Object)e);
            throw e;
        }
    }

    @Override
    public void createSubscriptions(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicBean topicBean, List<String> topicList) throws PulsarAdminException {
        for (String topic : topicList) {
            topicBean.setTopicName(topic);
            this.createSubscription(pulsarAdmin, topicBean, subscription);
        }
        log.info("success to create subscription={} for multiple topics={}", (Object)subscription, topicList);
    }

    private boolean tenantIsExists(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException {
        List tenantList = pulsarAdmin.tenants().getTenants();
        return tenantList.contains(tenant);
    }

    private boolean namespacesIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace) throws PulsarAdminException {
        List namespaceList = pulsarAdmin.namespaces().getNamespaces(tenant);
        return namespaceList.contains(tenant + "/" + namespace);
    }

    @Override
    public boolean topicIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topic) throws PulsarAdminException {
        if (StringUtils.isBlank((CharSequence)topic)) {
            return true;
        }
        List topicList = pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace);
        for (String t : topicList) {
            if (!topic.equals(t = t.substring(t.lastIndexOf("/") + 1))) continue;
            return true;
        }
        return false;
    }

    public boolean subscriptionIsExists(PulsarAdmin pulsarAdmin, String topic, String subscription) {
        try {
            List subscriptionList = pulsarAdmin.topics().getSubscriptions(topic);
            return subscriptionList.contains(subscription);
        }
        catch (PulsarAdminException e) {
            log.error("check if the topic={} is exists error,", (Object)topic, (Object)e);
            return false;
        }
    }
}

