/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.namespace;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.OwnedBundle;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.namespace.ServiceUnitZkUtils;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.NamespaceIsolationPolicy;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Broker-side service for namespace bundle ownership.
 *
 * <p>It maps destinations and namespaces to {@link NamespaceBundle}s through the
 * {@link NamespaceBundleFactory}, looks up (or elects) the broker that should serve a bundle via the
 * {@link OwnershipCache} and the configured {@link LoadManager}, and exposes helpers to register, unload,
 * split and release bundles owned by this broker.
 */
public class NamespaceService {
    private static final Logger LOG = LoggerFactory.getLogger(NamespaceService.class);

    public static final String SLA_NAMESPACE_PROPERTY = "sla-monitor";
    /** Matches {@code pulsar/<cluster>/<host>:<port>}; group 1 captures {@code <host>:<port>}. */
    public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)");
    /** Matches {@code sla-monitor/<cluster>/<host>:<port>}; group 1 captures {@code <host>:<port>}. */
    public static final Pattern SLA_NAMESPACE_PATTERN = Pattern.compile("sla-monitor/[^/]+/([^:]+:\\d+)");
    public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s:%s";
    public static final String SLA_NAMESPACE_FMT = "sla-monitor/%s/%s:%s";

    private final ServiceConfiguration config;
    private final AtomicReference<LoadManager> loadManager;
    private final PulsarService pulsar;
    private final OwnershipCache ownershipCache;
    private final NamespaceBundleFactory bundleFactory;
    /**
     * Number of owned bundles (heartbeat / SLA monitor) that are subtracted from
     * {@link #getTotalServiceUnitsLoaded()}. Plain int, incremented without synchronization.
     * NOTE(review): confirm that registration only ever happens from a single thread.
     */
    private int uncountedNamespaces;
    private final String host;

    /**
     * Creates the service, initializes {@link ServiceUnitZkUtils} with the local ZooKeeper session and
     * builds the bundle factory (CRC32 hashing) and the ownership cache.
     *
     * @param pulsar the broker service this namespace service belongs to
     */
    public NamespaceService(PulsarService pulsar) {
        this.pulsar = pulsar;
        this.host = pulsar.getAdvertisedAddress();
        this.config = pulsar.getConfiguration();
        this.loadManager = pulsar.getLoadManager();
        ServiceUnitZkUtils.initZK(pulsar.getLocalZkCache().getZooKeeper(), pulsar.getBrokerServiceUrl());
        this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
        this.ownershipCache = new OwnershipCache(pulsar, this.bundleFactory);
    }

    /**
     * Asynchronously resolves the broker serving the bundle that contains {@code topic}.
     * The lookup is not read-only: if the bundle has no owner, a candidate broker is searched for.
     *
     * @param topic         destination to look up
     * @param authoritative forwarded to the candidate-broker search
     * @return future holding the lookup result, or an empty optional when no broker is available
     */
    public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(DestinationName topic, boolean authoritative) {
        return getBundleAsync(topic).thenCompose(bundle -> findBrokerServiceUrl(bundle, authoritative, false));
    }

    /** Asynchronously finds the bundle of {@code topic}'s namespace that contains {@code topic}. */
    public CompletableFuture<NamespaceBundle> getBundleAsync(DestinationName topic) {
        return bundleFactory.getBundlesAsync(topic.getNamespaceObject()).thenApply(bundles -> bundles.findBundle(topic));
    }

    /** Synchronous variant of {@link #getBundleAsync(DestinationName)}. */
    public NamespaceBundle getBundle(DestinationName destination) throws Exception {
        return bundleFactory.getBundles(destination.getNamespaceObject()).findBundle(destination);
    }

    /** Returns the number of bundles the bundle factory reports for {@code namespace}. */
    public int getBundleCount(NamespaceName namespace) throws Exception {
        return bundleFactory.getBundles(namespace).size();
    }

    private NamespaceBundle getFullBundle(NamespaceName fqnn) throws Exception {
        return bundleFactory.getFullBundle(fqnn);
    }

    /**
     * Resolves the HTTP(S) URL of the broker serving the given service unit, blocking until the lookup
     * completes.
     *
     * @param suName         a {@link DestinationName}, {@link NamespaceName} or {@link NamespaceBundle}
     * @param authoritative  forwarded to the candidate-broker search
     * @param isRequestHttps selects the TLS URL instead of the plain HTTP URL
     * @param readOnly       when {@code true}, an unowned bundle yields an empty result instead of
     *                       triggering a candidate-broker search
     * @return the web service URL, or empty if no owner was found or the URL could not be built
     * @throws IllegalArgumentException if {@code suName} is of any other type
     */
    public Optional<URL> getWebServiceUrl(ServiceUnitId suName, boolean authoritative, boolean isRequestHttps, boolean readOnly) throws Exception {
        if (suName instanceof DestinationName) {
            DestinationName name = (DestinationName) suName;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Getting web service URL of destination: {} - auth: {}", name, authoritative);
            }
            return internalGetWebServiceUrl(getBundle(name), authoritative, isRequestHttps, readOnly).get();
        }
        if (suName instanceof NamespaceName) {
            return internalGetWebServiceUrl(getFullBundle((NamespaceName) suName), authoritative, isRequestHttps, readOnly).get();
        }
        if (suName instanceof NamespaceBundle) {
            return internalGetWebServiceUrl((NamespaceBundle) suName, authoritative, isRequestHttps, readOnly).get();
        }
        throw new IllegalArgumentException("Unrecognized class of NamespaceBundle: " + suName.getClass().getName());
    }

    /**
     * Looks up the owner of {@code bundle} and converts its lookup data into a {@link URL}.
     * A failure while building the URL is logged and mapped to an empty optional.
     */
    private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(NamespaceBundle bundle, boolean authoritative, boolean isRequestHttps, boolean readOnly) {
        return findBrokerServiceUrl(bundle, authoritative, readOnly).thenApply(lookupResult -> {
            if (lookupResult.isPresent()) {
                try {
                    LookupData lookupData = lookupResult.get().getLookupData();
                    String redirectUrl = isRequestHttps ? lookupData.getHttpUrlTls() : lookupData.getHttpUrl();
                    return Optional.of(new URL(redirectUrl));
                } catch (Exception e) {
                    LOG.warn("internalGetWebServiceUrl [{}]", e.getMessage(), e);
                }
            }
            return Optional.empty();
        });
    }

    /**
     * Registers this broker's heartbeat namespace (ownership is mandatory) followed by every configured
     * bootstrap namespace (ownership is best-effort).
     *
     * @throws PulsarServerException if acquiring ownership fails with an exception
     * @throws IllegalStateException if the heartbeat namespace is owned by another broker
     */
    public void registerBootstrapNamespaces() throws PulsarServerException {
        String heartbeatNamespace = getHeartbeatNamespace(host, config);
        if (registerNamespace(heartbeatNamespace, true)) {
            // The heartbeat namespace must not be reported as a loaded service unit.
            ++uncountedNamespaces;
            LOG.info("added heartbeat namespace name in local cache: ns={}", heartbeatNamespace);
        }
        for (String namespace : config.getBootstrapNamespaces()) {
            if (registerNamespace(namespace, false)) {
                LOG.info("added bootstrap namespace name in local cache: ns={}", namespace);
            }
        }
    }

    /**
     * Tries to acquire ownership of the full bundle of {@code namespace} for this broker.
     *
     * @param namespace   fully qualified namespace name
     * @param ensureOwned when {@code true}, ownership by another broker is treated as an error
     * @return {@code true} if this broker owns the namespace, {@code false} if another broker owns it and
     *         {@code ensureOwned} is {@code false}
     * @throws PulsarServerException if the ownership attempt itself fails
     * @throws IllegalStateException if another broker owns the namespace and {@code ensureOwned} is set
     */
    private boolean registerNamespace(String namespace, boolean ensureOwned) throws PulsarServerException {
        String myUrl = pulsar.getBrokerServiceUrl();
        String otherUrl;
        try {
            NamespaceBundle nsFullBundle = bundleFactory.getFullBundle(new NamespaceName(namespace));
            otherUrl = ownershipCache.tryAcquiringOwnership(nsFullBundle).get().getNativeUrl();
            if (myUrl.equals(otherUrl)) {
                if (nsFullBundle != null) {
                    pulsar.loadNamespaceDestinations(nsFullBundle);
                }
                return true;
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            throw new PulsarServerException(e);
        }

        String msg = String.format("namespace already owned by other broker : ns=%s expected=%s actual=%s", namespace, myUrl, otherUrl);
        if (!ensureOwned) {
            LOG.info(msg);
            return false;
        }
        throw new IllegalStateException(msg);
    }

    /**
     * Resolves the owner of {@code bundle}.
     *
     * <ul>
     * <li>No owner and {@code readOnly}: completes with an empty optional.</li>
     * <li>No owner and not {@code readOnly}: schedules {@link #searchForCandidateBroker} on the broker
     * executor.</li>
     * <li>Owner marked disabled: fails with {@link IllegalStateException}.</li>
     * <li>Otherwise: completes with the owner's lookup data.</li>
     * </ul>
     */
    private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(NamespaceBundle bundle, boolean authoritative, boolean readOnly) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("findBrokerServiceUrl: {} - read-only: {}", bundle, readOnly);
        }
        CompletableFuture<Optional<LookupResult>> future = new CompletableFuture<>();
        ownershipCache.getOwnerAsync(bundle).thenAccept(nsData -> {
            if (!nsData.isPresent()) {
                if (readOnly) {
                    future.complete(Optional.empty());
                } else {
                    pulsar.getExecutor().execute(() -> searchForCandidateBroker(bundle, future, authoritative));
                }
            } else if (nsData.get().isDisabled()) {
                future.completeExceptionally(new IllegalStateException(String.format("Namespace bundle %s is being unloaded", bundle)));
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Namespace bundle {} already owned by {} ", bundle, nsData);
                }
                future.complete(Optional.of(new LookupResult(nsData.get())));
            }
        }).exceptionally(exception -> {
            LOG.warn("Failed to check owner for bundle {}: {}", bundle, exception.getMessage(), exception);
            future.completeExceptionally(exception);
            return null;
        });
        return future;
    }

    /**
     * Picks a candidate broker for an unowned bundle and completes {@code lookupFuture} accordingly.
     *
     * <p>Candidate selection order: heartbeat namespace broker, active SLA monitor broker, then either the
     * least loaded broker (when the load manager is not centralized or this broker is the leader), this
     * broker itself (authoritative request) or the current leader. If the candidate is this broker,
     * ownership is acquired locally; otherwise the caller is redirected to the candidate.
     */
    private void searchForCandidateBroker(NamespaceBundle bundle, CompletableFuture<Optional<LookupResult>> lookupFuture, boolean authoritative) {
        String candidateBroker = null;
        try {
            candidateBroker = checkHeartbeatNamespace(bundle);
            if (candidateBroker == null) {
                String slaBroker = getSLAMonitorBrokerName(bundle);
                if (slaBroker != null && isBrokerActive(slaBroker)) {
                    candidateBroker = slaBroker;
                }
            }
            if (candidateBroker == null) {
                if (!loadManager.get().isCentralized() || pulsar.getLeaderElectionService().isLeader()) {
                    Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
                    if (!availableBroker.isPresent()) {
                        lookupFuture.complete(Optional.empty());
                        return;
                    }
                    candidateBroker = availableBroker.get();
                } else if (authoritative) {
                    candidateBroker = pulsar.getWebServiceAddress();
                } else {
                    candidateBroker = pulsar.getLeaderElectionService().getCurrentLeader().getServiceUrl();
                }
            }
        } catch (Exception e) {
            LOG.warn("Error when searching for candidate broker to acquire {}: {}", bundle, e.getMessage(), e);
            lookupFuture.completeExceptionally(e);
            return;
        }

        try {
            Preconditions.checkNotNull(candidateBroker);
            if (pulsar.getWebServiceAddress().equals(candidateBroker)) {
                // This broker is the candidate: acquire ownership locally.
                ownershipCache.tryAcquiringOwnership(bundle).thenAccept(ownerInfo -> {
                    if (ownerInfo.isDisabled()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Namespace bundle {} is currently being unloaded", bundle);
                        }
                        lookupFuture.completeExceptionally(new IllegalStateException(String.format("Namespace bundle %s is currently being unloaded", bundle)));
                    } else {
                        pulsar.loadNamespaceDestinations(bundle);
                        lookupFuture.complete(Optional.of(new LookupResult(ownerInfo)));
                    }
                }).exceptionally(exception -> {
                    // Two placeholders so that the exception message is actually rendered.
                    LOG.warn("Failed to acquire ownership for namespace bundle {}: {}", bundle, exception.getMessage(), exception);
                    lookupFuture.completeExceptionally(new PulsarServerException("Failed to acquire ownership for namespace bundle " + bundle, exception));
                    return null;
                });
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Redirecting to broker {} to acquire ownership of bundle {}", candidateBroker, bundle);
                }
                createLookupResult(candidateBroker)
                        .thenAccept(lookupResult -> lookupFuture.complete(Optional.of(lookupResult)))
                        .exceptionally(ex -> {
                            lookupFuture.completeExceptionally(ex);
                            return null;
                        });
            }
        } catch (Exception e) {
            LOG.warn("Error in trying to acquire namespace bundle ownership for {}: {}", bundle, e.getMessage(), e);
            lookupFuture.completeExceptionally(e);
        }
    }

    /**
     * Builds a {@link LookupResult} for {@code candidateBroker} from the data stored under
     * {@code /loadbalance/brokers/<host>:<port>} in the local ZooKeeper cache.
     *
     * @param candidateBroker broker URI; must not be blank
     * @return future completed with the lookup result; completed exceptionally when the argument is
     *         invalid, the read fails, or the node has no data ({@link KeeperException.NoNodeException})
     */
    protected CompletableFuture<LookupResult> createLookupResult(String candidateBroker) throws Exception {
        CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
        try {
            Preconditions.checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null " + candidateBroker);
            URI uri = new URI(candidateBroker);
            String path = String.format("%s/%s:%s", "/loadbalance/brokers", uri.getHost(), uri.getPort());
            pulsar.getLocalZkCache().getDataAsync(path, pulsar.getLoadManager().get().getLoadReportDeserializer()).thenAccept(reportData -> {
                if (reportData.isPresent()) {
                    ServiceLookupData lookupData = (ServiceLookupData) reportData.get();
                    lookupFuture.complete(new LookupResult(lookupData.getWebServiceUrl(), lookupData.getWebServiceUrlTls(), lookupData.getPulsarServiceUrl(), lookupData.getPulsarServiceUrlTls()));
                } else {
                    lookupFuture.completeExceptionally(new KeeperException.NoNodeException(path));
                }
            }).exceptionally(ex -> {
                lookupFuture.completeExceptionally(ex);
                return null;
            });
        } catch (Exception e) {
            lookupFuture.completeExceptionally(e);
        }
        return lookupFuture;
    }

    /**
     * Checks whether {@code candidateBroker} (expected form {@code http://<host>:<port>}) matches one of
     * the children registered under {@code /loadbalance/brokers}.
     */
    private boolean isBrokerActive(String candidateBroker) throws KeeperException, InterruptedException {
        Set<String> activeNativeBrokers = pulsar.getLocalZkCache().getChildren("/loadbalance/brokers");
        for (String brokerHostPort : activeNativeBrokers) {
            if (candidateBroker.equals("http://" + brokerHostPort)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Broker {} found for SLA Monitoring Namespace", brokerHostPort);
                }
                return true;
            }
        }
        if (LOG.isDebugEnabled()) {
            // The candidate already carries its port; do not append the local web service port to it.
            LOG.debug("Broker not found for SLA Monitoring Namespace {}", candidateBroker);
        }
        return false;
    }

    /**
     * Asks the load manager for the least loaded broker for {@code serviceUnit}.
     *
     * @return the broker's resource id, or empty (with a warning logged) when none is available
     */
    private Optional<String> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception {
        ResourceUnit leastLoadedBroker = loadManager.get().getLeastLoaded(serviceUnit);
        if (leastLoadedBroker == null) {
            LOG.warn("No broker is available for {}", serviceUnit);
            return Optional.empty();
        }
        String lookupAddress = leastLoadedBroker.getResourceId();
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} : redirecting to the least loaded broker, lookup address={}", pulsar.getWebServiceAddress(), lookupAddress);
        }
        return Optional.of(lookupAddress);
    }

    /** Unloads the full bundle of namespace {@code ns}. */
    public void unloadNamespace(NamespaceName ns) throws Exception {
        unloadNamespaceBundle(getFullBundle(ns));
    }

    /**
     * Forwards an unload request to the owned bundle.
     *
     * @throws NullPointerException if this broker's ownership cache has no owned bundle for {@code bundle}
     */
    public void unloadNamespaceBundle(NamespaceBundle bundle) throws Exception {
        OwnedBundle ownedBundle = Preconditions.checkNotNull(ownershipCache.getOwnedBundle(bundle),
                "Namespace bundle %s is not owned by this broker", bundle);
        ownedBundle.handleUnloadRequest(pulsar);
    }

    /**
     * Builds the ownership status of every bundle owned by this broker, keyed by the bundle's string
     * form, using the local cluster's namespace isolation policies.
     */
    public Map<String, NamespaceOwnershipStatus> getOwnedNameSpacesStatus() throws Exception {
        NamespaceIsolationPolicies nsIsolationPolicies = getLocalNamespaceIsolationPolicies();
        Map<String, NamespaceOwnershipStatus> ownedNsStatus = new HashMap<>();
        for (OwnedBundle nsObj : ownershipCache.getOwnedBundles().values()) {
            NamespaceBundle nsBundle = nsObj.getNamespaceBundle();
            NamespaceIsolationPolicy policy = nsIsolationPolicies.getPolicyByNamespace(nsBundle.getNamespaceObject());
            ownedNsStatus.put(nsBundle.toString(), getNamespaceOwnershipStatus(nsObj, policy));
        }
        return ownedNsStatus;
    }

    /**
     * Derives the ownership status of one owned bundle. Without an isolation policy the bundle is
     * reported as shared and uncontrolled; with one it is controlled and marked primary or secondary
     * when the policy lists this broker's advertised address as such (otherwise it stays shared).
     */
    private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedBundle nsObj, NamespaceIsolationPolicy nsIsolationPolicy) {
        NamespaceOwnershipStatus nsOwnedStatus = new NamespaceOwnershipStatus(BrokerAssignment.shared, false, nsObj.isActive());
        if (nsIsolationPolicy == null) {
            return nsOwnedStatus;
        }
        nsOwnedStatus.is_controlled = true;
        if (nsIsolationPolicy.isPrimaryBroker(pulsar.getAdvertisedAddress())) {
            nsOwnedStatus.broker_assignment = BrokerAssignment.primary;
        } else if (nsIsolationPolicy.isSecondaryBroker(pulsar.getAdvertisedAddress())) {
            nsOwnedStatus.broker_assignment = BrokerAssignment.secondary;
        }
        return nsOwnedStatus;
    }

    /** Reads the isolation policies of the local cluster, falling back to an empty policy set. */
    private NamespaceIsolationPolicies getLocalNamespaceIsolationPolicies() throws Exception {
        String localCluster = pulsar.getConfiguration().getClusterName();
        return pulsar.getConfigurationCache().namespaceIsolationPoliciesCache()
                .get(AdminResource.path("clusters", localCluster, "namespaceIsolationPolicies"))
                .orElseGet(() -> new NamespaceIsolationPolicies());
    }

    /**
     * Non-blocking check of whether the owner entry of {@code bundle} is marked disabled.
     *
     * @return {@code true} only if the owner lookup has already completed with a disabled entry;
     *         {@code false} if it is still pending, has no owner, or failed (failure is logged)
     */
    public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exception {
        try {
            CompletableFuture<Optional<NamespaceEphemeralData>> nsDataFuture = ownershipCache.getOwnerAsync(bundle);
            if (nsDataFuture == null) {
                return false;
            }
            // getNow(null): do not wait for a lookup that has not completed yet.
            Optional<NamespaceEphemeralData> nsData = nsDataFuture.getNow(null);
            return nsData != null && nsData.isPresent() && nsData.get().isDisabled();
        } catch (Exception e) {
            LOG.warn("Exception in getting ownership info for service unit {}: {}", bundle, e.getMessage(), e);
            return false;
        }
    }

    /**
     * Splits {@code bundle} into two bundles, requests ownership of both, and persists the new bundle
     * layout in the namespace's local policies. Once the ZooKeeper write succeeds, ownership of the
     * original bundle is disabled, the bundle cache is invalidated, topic stats maps are refreshed and a
     * load report update is forced.
     *
     * @param bundle the bundle to split
     * @return future completed with {@code null} on success, or exceptionally with
     *         {@link BrokerServiceException.ServiceUnitNotReadyException} on failure
     * @throws Exception if the bundle factory fails to compute the split
     */
    public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle) throws Exception {
        CompletableFuture<Void> future = new CompletableFuture<>();
        Pair<NamespaceBundles, List<NamespaceBundle>> splittedBundles = bundleFactory.splitBundles(bundle, 2);
        if (splittedBundles == null) {
            String msg = String.format("bundle %s not found under namespace", bundle.toString());
            future.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg));
            return future;
        }

        Preconditions.checkNotNull(splittedBundles.getLeft());
        Preconditions.checkNotNull(splittedBundles.getRight());
        Preconditions.checkArgument(splittedBundles.getRight().size() == 2, "bundle has to be split in two bundles");
        NamespaceName nsname = bundle.getNamespaceObject();
        try {
            for (NamespaceBundle sBundle : splittedBundles.getRight()) {
                // NOTE(review): only the returned future is null-checked; its completion is not awaited.
                Preconditions.checkNotNull(ownershipCache.tryAcquiringOwnership(sBundle));
            }
            updateNamespaceBundles(nsname, splittedBundles.getLeft(), (rc, path, zkCtx, stat) -> pulsar.getOrderedExecutor().submit(SafeRun.safeRun(() -> {
                if (rc == KeeperException.Code.OK.intValue()) {
                    try {
                        ownershipCache.disableOwnership(bundle);
                        bundleFactory.invalidateBundleCache(nsname);
                        pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
                        loadManager.get().setLoadReportForceUpdateFlag();
                        future.complete(null);
                    } catch (Exception e) {
                        // Argument order follows the template: bundle first, then namespace.
                        String msg1 = String.format("failed to disable bundle %s under namespace [%s] with error %s", bundle.toString(), nsname.toString(), e.getMessage());
                        LOG.warn(msg1, e);
                        future.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg1));
                    }
                } else {
                    String msg2 = String.format("failed to update namespace [%s] policies due to %s", nsname.toString(), KeeperException.create(KeeperException.Code.get(rc)).getMessage());
                    LOG.warn(msg2);
                    future.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg2));
                }
            })));
        } catch (Exception e) {
            String msg = String.format("failed to aquire ownership of split bundle for namespace [%s], %s", nsname.toString(), e.getMessage());
            LOG.warn(msg, e);
            future.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg));
        }
        return future;
    }

    /**
     * Writes the bundle layout {@code nsBundles} into the local policies node of {@code nsname},
     * creating the policies first (waiting up to 30 seconds) when they do not exist yet.
     *
     * @param callback invoked by ZooKeeper with the result of the asynchronous {@code setData} call
     * @throws IllegalStateException if the policies are still absent after creation
     */
    private void updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles, AsyncCallback.StatCallback callback) throws Exception {
        Preconditions.checkNotNull(nsname);
        Preconditions.checkNotNull(nsBundles);
        String path = PulsarWebResource.joinPath("/admin/local-policies", nsname.toString());
        Optional<LocalPolicies> policies = pulsar.getLocalZkCacheService().policiesCache().get(path);
        if (!policies.isPresent()) {
            pulsar.getLocalZkCacheService().createPolicies(path, false).get(30L, TimeUnit.SECONDS);
            policies = pulsar.getLocalZkCacheService().policiesCache().get(path);
        }
        LocalPolicies localPolicies = policies.orElseThrow(
                () -> new IllegalStateException("Local policies not found after creation: " + path));
        localPolicies.bundles = NamespaceBundleFactory.getBundlesData(nsBundles);
        // Version -1: overwrite regardless of the node's current version.
        pulsar.getLocalZkCache().getZooKeeper().setData(path, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(localPolicies), -1, callback, null);
    }

    public OwnershipCache getOwnershipCache() {
        return ownershipCache;
    }

    /** Returns the number of owned bundles, excluding the uncounted (heartbeat / SLA) namespaces. */
    public int getTotalServiceUnitsLoaded() {
        return ownershipCache.getOwnedBundles().size() - uncountedNamespaces;
    }

    /** Returns the set of bundles currently held in the ownership cache. */
    public Set<NamespaceBundle> getOwnedServiceUnits() {
        return ownershipCache.getOwnedBundles().values().stream().map(OwnedBundle::getNamespaceBundle).collect(Collectors.toSet());
    }

    /**
     * Tells whether this broker owns the given service unit.
     *
     * @param suName a {@link DestinationName}, {@link NamespaceName} or {@link NamespaceBundle}
     * @throws IllegalArgumentException if {@code suName} is of any other type
     */
    public boolean isServiceUnitOwned(ServiceUnitId suName) throws Exception {
        if (suName instanceof DestinationName) {
            return isDestinationOwned((DestinationName) suName);
        }
        if (suName instanceof NamespaceName) {
            return isNamespaceOwned((NamespaceName) suName);
        }
        if (suName instanceof NamespaceBundle) {
            return ownershipCache.isNamespaceBundleOwned((NamespaceBundle) suName);
        }
        throw new IllegalArgumentException("Invalid class of NamespaceBundle: " + suName.getClass().getName());
    }

    /**
     * Tells whether the owned bundle containing {@code fqdn} is active.
     *
     * @return {@code false} when the bundle is not owned by this broker or the lookup fails
     */
    public boolean isServiceUnitActive(DestinationName fqdn) {
        try {
            OwnedBundle ownedBundle = ownershipCache.getOwnedBundle(getBundle(fqdn));
            if (ownedBundle == null) {
                // Not owned here: report inactive explicitly instead of relying on a swallowed NPE.
                LOG.warn("Unable to find OwnedBundle for fqdn - [{}]", fqdn);
                return false;
            }
            return ownedBundle.isActive();
        } catch (Exception e) {
            LOG.warn("Unable to find OwnedBundle for fqdn - [{}]", fqdn, e);
            return false;
        }
    }

    private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception {
        return ownershipCache.getOwnedBundle(getFullBundle(fqnn)) != null;
    }

    private CompletableFuture<Boolean> isDestinationOwnedAsync(DestinationName topic) {
        return getBundleAsync(topic).thenApply(bundle -> ownershipCache.isNamespaceBundleOwned(bundle));
    }

    private boolean isDestinationOwned(DestinationName fqdn) throws Exception {
        return ownershipCache.getOwnedBundle(getBundle(fqdn)) != null;
    }

    /** Releases ownership of the namespace's full bundle (30 second timeout) and drops its bundle cache. */
    public void removeOwnedServiceUnit(NamespaceName nsName) throws Exception {
        ownershipCache.removeOwnership(getFullBundle(nsName)).get(30L, TimeUnit.SECONDS);
        bundleFactory.invalidateBundleCache(nsName);
    }

    /** Releases ownership of {@code nsBundle} (30 second timeout) and drops its namespace's bundle cache. */
    public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception {
        ownershipCache.removeOwnership(nsBundle).get(30L, TimeUnit.SECONDS);
        bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject());
    }

    /**
     * Releases ownership of all bundles described by {@code bundleData} (30 second timeout) and drops
     * the namespace's bundle cache.
     */
    public void removeOwnedServiceUnits(NamespaceName nsName, BundlesData bundleData) throws Exception {
        ownershipCache.removeOwnership(bundleFactory.getBundles(nsName, bundleData)).get(30L, TimeUnit.SECONDS);
        bundleFactory.invalidateBundleCache(nsName);
    }

    public NamespaceBundleFactory getNamespaceBundleFactory() {
        return bundleFactory;
    }

    /** Returns the bundle containing {@code destinationName} as a generic service unit id. */
    public ServiceUnitId getServiceUnitId(DestinationName destinationName) throws Exception {
        return getBundle(destinationName);
    }

    /**
     * Lists the persistent destinations of a namespace from the managed-ledger list cache.
     *
     * @return sorted list of {@code persistent://property/cluster/namespace/destination} names; empty
     *         when the managed-ledger path does not exist
     */
    public List<String> getListOfDestinations(String property, String cluster, String namespace) throws Exception {
        List<String> destinations = Lists.newArrayList();
        try {
            String path = String.format("/managed-ledgers/%s/%s/%s/persistent", property, cluster, namespace);
            LOG.debug("Getting children from managed-ledgers now: {}", path);
            for (String destination : pulsar.getLocalZkCacheService().managedLedgerListCache().get(path)) {
                destinations.add(String.format("persistent://%s/%s/%s/%s", property, cluster, namespace, Codec.decode(destination)));
            }
        } catch (KeeperException.NoNodeException ignored) {
            // A missing node is not an error here: the result is simply an empty list.
        }
        // A null comparator sorts by natural (String) ordering.
        destinations.sort(null);
        return destinations;
    }

    /** Blocking variant of {@link #getOwnerAsync(NamespaceBundle)} with a 30 second timeout. */
    public Optional<NamespaceEphemeralData> getOwner(NamespaceBundle bundle) throws Exception {
        return getOwnerAsync(bundle).get(30L, TimeUnit.SECONDS);
    }

    /** Asynchronously reads the owner entry of {@code bundle} from the ownership cache. */
    public CompletableFuture<Optional<NamespaceEphemeralData>> getOwnerAsync(NamespaceBundle bundle) {
        return ownershipCache.getOwnerAsync(bundle);
    }

    /**
     * Unloads this broker's SLA monitor namespace through the admin client. Does nothing when the
     * namespace's full bundle currently has no owner.
     */
    public void unloadSLANamespace() throws Exception {
        String namespaceName = getSLAMonitorNamespace(host, config);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Trying to unload SLA namespace {}", namespaceName);
        }
        NamespaceBundle nsFullBundle = getFullBundle(new NamespaceName(namespaceName));
        if (!getOwner(nsFullBundle).isPresent()) {
            return;
        }
        PulsarAdmin adminClient = pulsar.getAdminClient();
        adminClient.namespaces().unload(namespaceName);
        LOG.debug("Namespace {} unloaded successfully", namespaceName);
    }

    /** Formats the heartbeat namespace name: {@code pulsar/<cluster>/<host>:<webServicePort>}. */
    public static String getHeartbeatNamespace(String host, ServiceConfiguration config) {
        return String.format(HEARTBEAT_NAMESPACE_FMT, config.getClusterName(), host, config.getWebServicePort());
    }

    /** Formats the SLA monitor namespace name: {@code sla-monitor/<cluster>/<host>:<webServicePort>}. */
    public static String getSLAMonitorNamespace(String host, ServiceConfiguration config) {
        return String.format(SLA_NAMESPACE_FMT, config.getClusterName(), host, config.getWebServicePort());
    }

    /**
     * Extracts the broker address from a heartbeat namespace.
     *
     * @return {@code http://<host>:<port>} if the namespace of {@code ns} matches
     *         {@link #HEARTBEAT_NAMESPACE_PATTERN}, otherwise {@code null}
     */
    public static String checkHeartbeatNamespace(ServiceUnitId ns) {
        Matcher m = HEARTBEAT_NAMESPACE_PATTERN.matcher(ns.getNamespaceObject().toString());
        if (m.matches()) {
            LOG.debug("Heartbeat namespace matched the lookup namespace {}", ns.getNamespaceObject().toString());
            return String.format("http://%s", m.group(1));
        }
        return null;
    }

    /**
     * Extracts the broker address from an SLA monitor namespace.
     *
     * @return {@code http://<host>:<port>} if the namespace of {@code ns} matches
     *         {@link #SLA_NAMESPACE_PATTERN}, otherwise {@code null}
     */
    public static String getSLAMonitorBrokerName(ServiceUnitId ns) {
        Matcher m = SLA_NAMESPACE_PATTERN.matcher(ns.getNamespaceObject().toString());
        if (m.matches()) {
            return String.format("http://%s", m.group(1));
        }
        return null;
    }

    /**
     * Tries to register this broker's SLA monitor namespace; ownership by another broker is tolerated.
     *
     * @return {@code true} if this broker now owns the SLA monitor namespace
     * @throws PulsarServerException if the ownership attempt fails with an exception
     */
    public boolean registerSLANamespace() throws PulsarServerException {
        String slaNamespace = getSLAMonitorNamespace(host, config);
        boolean isNameSpaceRegistered = registerNamespace(slaNamespace, false);
        if (isNameSpaceRegistered) {
            // Like the heartbeat namespace, the SLA namespace is excluded from the loaded-unit count.
            ++uncountedNamespaces;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Added SLA Monitoring namespace name in local cache: ns={}", slaNamespace);
            }
        } else if (LOG.isDebugEnabled()) {
            LOG.debug("SLA Monitoring not owned by the broker: ns={}", slaNamespace);
        }
        return isNameSpaceRegistered;
    }

    /** Kind of address a lookup can refer to. */
    public static enum AddressType {
        BROKER_URL,
        LOOKUP_URL;

    }
}

