/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.admin;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path(value="/namespaces")
@Produces(value={"application/json"})
@Consumes(value={"application/json"})
@Api(value="/namespaces", description="Namespaces admin apis", tags={"namespaces"})
public class Namespaces
extends AdminResource {
    public static final String GLOBAL_CLUSTER = "global";
    private static final long MAX_BUNDLES = 0x100000000L;
    private static final Logger log = LoggerFactory.getLogger(Namespaces.class);

    @GET
    @Path(value="/{property}")
    @ApiOperation(value="Get the list of all the namespaces for a certain property.", response=String.class, responseContainer="Set")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Property doesn't exist")})
    public List<String> getPropertyNamespaces(@PathParam(value="property") String property) {
        this.validateAdminAccessOnProperty(property);
        try {
            return this.getListOfNamespaces(property);
        }
        catch (KeeperException.NoNodeException noNodeException) {
            log.warn("[{}] Failed to get namespace list for propery: {} - Does not exist", (Object)this.clientAppId(), (Object)property);
            throw new RestException(Response.Status.NOT_FOUND, "Property does not exist");
        }
        catch (Exception e) {
            log.error("[{}] Failed to get namespaces list: {}", (Object)this.clientAppId(), (Object)e);
            throw new RestException(e);
        }
    }

    @GET
    @Path(value="/{property}/{cluster}")
    @ApiOperation(value="Get the list of all the namespaces for a certain property on single cluster.", response=String.class, responseContainer="Set")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Property or cluster doesn't exist")})
    public List<String> getNamespacesForCluster(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster) {
        this.validateAdminAccessOnProperty(property);
        ArrayList namespaces = Lists.newArrayList();
        if (!this.clusters().contains(cluster)) {
            log.warn("[{}] Failed to get namespace list for property: {}/{} - Cluster does not exist", new Object[]{this.clientAppId(), property, cluster});
            throw new RestException(Response.Status.NOT_FOUND, "Cluster does not exist");
        }
        try {
            for (String namespace : this.globalZk().getChildren(Namespaces.path("policies", property, cluster), false)) {
                namespaces.add(String.format("%s/%s/%s", property, cluster, namespace));
            }
        }
        catch (KeeperException.NoNodeException noNodeException) {
        }
        catch (Exception e) {
            log.error("[{}] Failed to get namespaces list: {}", (Object)this.clientAppId(), (Object)e);
            throw new RestException(e);
        }
        namespaces.sort(null);
        return namespaces;
    }

    @GET
    @Path(value="/{property}/{cluster}/{namespace}/destinations")
    @ApiOperation(value="Get the list of all the destinations under a certain namespace.", response=String.class, responseContainer="Set")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Property or cluster or namespace doesn't exist")})
    public List<String> getDestinations(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace) {
        this.validateAdminAccessOnProperty(property);
        this.getNamespacePolicies(property, cluster, namespace);
        try {
            return this.pulsar().getNamespaceService().getListOfDestinations(property, cluster, namespace);
        }
        catch (Exception e) {
            log.error("Failed to get topics list for namespace {}/{}/{}", new Object[]{property, cluster, namespace, e});
            throw new RestException(e);
        }
    }

    @GET
    @Path(value="/{property}/{cluster}/{namespace}")
    @ApiOperation(value="Get the dump all the policies specified for a namespace.", response=Policies.class)
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Property or cluster or namespace doesn't exist")})
    public Policies getPolicies(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace) {
        this.validateAdminAccessOnProperty(property);
        return this.getNamespacePolicies(property, cluster, namespace);
    }

    @PUT
    @Path(value="/{property}/{cluster}/{namespace}")
    @ApiOperation(value="Creates a new empty namespace with no policies attached.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Property or cluster or namespace doesn't exist"), @ApiResponse(code=409, message="Namespace already exists"), @ApiResponse(code=412, message="Namespace name is not valid")})
    public void createNamespace(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, BundlesData initialBundles) {
        this.validateAdminAccessOnProperty(property);
        this.validatePoliciesReadOnlyAccess();
        if (!cluster.equals(GLOBAL_CLUSTER)) {
            this.validateClusterForProperty(property, cluster);
        }
        if (!this.clusters().contains(cluster)) {
            log.warn("[{}] Failed to create namespace. Cluster {} does not exist", (Object)this.clientAppId(), (Object)cluster);
            throw new RestException(Response.Status.NOT_FOUND, "Cluster does not exist");
        }
        try {
            Preconditions.checkNotNull((Object)this.propertiesCache().get(Namespaces.path("policies", property)));
        }
        catch (KeeperException.NoNodeException noNodeException) {
            log.warn("[{}] Failed to create namespace. Property {} does not exist", (Object)this.clientAppId(), (Object)property);
            throw new RestException(Response.Status.NOT_FOUND, "Property does not exist");
        }
        catch (RestException e) {
            throw e;
        }
        catch (Exception e) {
            throw new RestException(e);
        }
        try {
            NamedEntity.checkName((String)namespace);
            this.policiesCache().invalidate(Namespaces.path("policies", property, cluster, namespace));
            Policies policies = new Policies();
            if (initialBundles != null && initialBundles.getNumBundles() > 0) {
                policies.bundles = initialBundles.getBoundaries() == null || initialBundles.getBoundaries().size() == 0 ? this.getBundles(initialBundles.getNumBundles()) : this.validateBundlesData(initialBundles);
            }
            this.zkCreateOptimistic(Namespaces.path("policies", property, cluster, namespace), Namespaces.jsonMapper().writeValueAsBytes((Object)policies));
            log.info("[{}] Created namespace {}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace});
        }
        catch (KeeperException.NodeExistsException nodeExistsException) {
            log.warn("[{}] Failed to create namespace {}/{}/{} - already exists", new Object[]{this.clientAppId(), property, cluster, namespace});
            throw new RestException(Response.Status.CONFLICT, "Namespace already exists");
        }
        catch (IllegalArgumentException e) {
            log.warn("[{}] Failed to create namespace with invalid name {}", new Object[]{this.clientAppId(), property, e});
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Namespace name is not valid");
        }
        catch (Exception e) {
            log.error("[{}] Failed to create namespace {}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace, e});
            throw new RestException(e);
        }
    }

    @DELETE
    @Path(value="/{property}/{cluster}/{namespace}")
    @ApiOperation(value="Delete a namespace and all the destinations under it.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Property or cluster or namespace doesn't exist"), @ApiResponse(code=409, message="Namespace is not empty")})
    public void deleteNamespace(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        NamespaceName nsName = new NamespaceName(property, cluster, namespace);
        this.validateAdminAccessOnProperty(property);
        this.validatePoliciesReadOnlyAccess();
        this.validateClusterOwnership(cluster);
        Map.Entry policiesNode = null;
        Policies policies = null;
        try {
            policiesNode = (Map.Entry)this.policiesCache().getWithStat(Namespaces.path("policies", property, cluster, namespace)).orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Namespace " + nsName + " does not exist."));
            policies = (Policies)policiesNode.getKey();
            if (cluster.equals(GLOBAL_CLUSTER)) {
                if (policies.replication_clusters.size() > 1) {
                    throw new RestException(Response.Status.PRECONDITION_FAILED, "Cannot delete the global namespace " + nsName + ". There are still more than one replication clusters configured.");
                }
                if (policies.replication_clusters.size() == 1 && !policies.replication_clusters.contains(this.config().getClusterName())) {
                    URL replClusterUrl;
                    String replCluster = (String)policies.replication_clusters.get(0);
                    ClusterData replClusterData = (ClusterData)this.clustersCache().get(AdminResource.path("clusters", replCluster)).orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Cluser " + replCluster + " does not exist"));
                    if (!this.config().isTlsEnabled()) {
                        replClusterUrl = new URL(replClusterData.getServiceUrl());
                    } else if (!replClusterData.getServiceUrlTls().isEmpty()) {
                        replClusterUrl = new URL(replClusterData.getServiceUrlTls());
                    } else {
                        throw new RestException(Response.Status.PRECONDITION_FAILED, "The replication cluster does not provide TLS encrypted service");
                    }
                    URI redirect = UriBuilder.fromUri((URI)this.uri.getRequestUri()).host(replClusterUrl.getHost()).port(replClusterUrl.getPort()).replaceQueryParam("authoritative", new Object[]{false}).build(new Object[0]);
                    log.debug("[{}] Redirecting the rest call to {}: cluster={}", new Object[]{this.clientAppId(), redirect, cluster});
                    throw new WebApplicationException(Response.temporaryRedirect((URI)redirect).build());
                }
            }
        }
        catch (WebApplicationException wae) {
            throw wae;
        }
        catch (Exception e) {
            throw new RestException(e);
        }
        List<String> destinations = this.getDestinations(property, cluster, namespace);
        if (!destinations.isEmpty()) {
            log.info("Found destinations: {}", destinations);
            throw new RestException(Response.Status.CONFLICT, "Cannot delete non empty namespace");
        }
        try {
            policies.deleted = true;
            this.globalZk().setData(Namespaces.path("policies", property, cluster, namespace), Namespaces.jsonMapper().writeValueAsBytes((Object)policies), ((Stat)policiesNode.getValue()).getVersion());
            this.policiesCache().invalidate(Namespaces.path("policies", property, cluster, namespace));
        }
        catch (Exception e) {
            log.error("[{}] Failed to delete namespace on global ZK {}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace, e});
            throw new RestException(e);
        }
        try {
            NamespaceBundles bundles = this.pulsar().getNamespaceService().getNamespaceBundleFactory().getBundles(nsName);
            for (NamespaceBundle bundle : bundles.getBundles()) {
                if (!this.pulsar().getNamespaceService().getOwner(bundle).isPresent()) continue;
                this.pulsar().getAdminClient().namespaces().deleteNamespaceBundle(nsName.toString(), bundle.getBundleRange());
            }
            String globalZkPolicyPath = Namespaces.path("policies", property, cluster, namespace);
            String lcaolZkPolicyPath = Namespaces.joinPath("/admin/local-policies", property, cluster, namespace);
            this.globalZk().delete(globalZkPolicyPath, -1);
            this.localZk().delete(lcaolZkPolicyPath, -1);
            this.policiesCache().invalidate(globalZkPolicyPath);
            this.localCacheService().policiesCache().invalidate(lcaolZkPolicyPath);
        }
        catch (PulsarAdminException cae) {
            throw new RestException(cae);
        }
        catch (Exception e) {
            log.error(String.format("[%s] Failed to remove owned namespace %s/%s/%s", this.clientAppId(), property, cluster, namespace), (Throwable)e);
        }
    }

    @DELETE
    @Path(value="/{property}/{cluster}/{namespace}/{bundle}")
    @ApiOperation(value="Delete a namespace bundle and all the destinations under it.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Property or cluster or namespace doesn't exist"), @ApiResponse(code=409, message="Namespace bundle is not empty")})
    public void deleteNamespaceBundle(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="bundle") String bundleRange, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        NamespaceName nsName = new NamespaceName(property, cluster, namespace);
        this.validateAdminAccessOnProperty(property);
        this.validatePoliciesReadOnlyAccess();
        this.validateClusterOwnership(cluster);
        Policies policies = this.getNamespacePolicies(property, cluster, namespace);
        try {
            if (cluster.equals(GLOBAL_CLUSTER)) {
                if (policies.replication_clusters.size() > 1) {
                    throw new RestException(Response.Status.PRECONDITION_FAILED, "Cannot delete the global namespace " + nsName + ". There are still more than one replication clusters configured.");
                }
                if (policies.replication_clusters.size() == 1 && !policies.replication_clusters.contains(this.config().getClusterName())) {
                    URL replClusterUrl;
                    String replCluster = (String)policies.replication_clusters.get(0);
                    ClusterData replClusterData = (ClusterData)this.clustersCache().get(AdminResource.path("clusters", replCluster)).orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Cluser " + replCluster + " does not exist"));
                    if (!this.config().isTlsEnabled()) {
                        replClusterUrl = new URL(replClusterData.getServiceUrl());
                    } else if (!replClusterData.getServiceUrlTls().isEmpty()) {
                        replClusterUrl = new URL(replClusterData.getServiceUrlTls());
                    } else {
                        throw new RestException(Response.Status.PRECONDITION_FAILED, "The replication cluster does not provide TLS encrypted service");
                    }
                    URI redirect = UriBuilder.fromUri((URI)this.uri.getRequestUri()).host(replClusterUrl.getHost()).port(replClusterUrl.getPort()).replaceQueryParam("authoritative", new Object[]{false}).build(new Object[0]);
                    log.debug("[{}] Redirecting the rest call to {}: cluster={}", new Object[]{this.clientAppId(), redirect, cluster});
                    throw new WebApplicationException(Response.temporaryRedirect((URI)redirect).build());
                }
            }
        }
        catch (WebApplicationException wae) {
            throw wae;
        }
        catch (Exception e) {
            throw new RestException(e);
        }
        NamespaceBundle bundle = this.validateNamespaceBundleOwnership(nsName, policies.bundles, bundleRange, authoritative, true);
        try {
            List<String> destinations = this.getDestinations(property, cluster, namespace);
            for (String destination : destinations) {
                NamespaceBundle destinationBundle = this.pulsar().getNamespaceService().getBundle(DestinationName.get((String)destination));
                if (!bundle.equals(destinationBundle)) continue;
                throw new RestException(Response.Status.CONFLICT, "Cannot delete non empty bundle");
            }
            this.pulsar().getNamespaceService().removeOwnedServiceUnit(bundle);
        }
        catch (WebApplicationException wae) {
            throw wae;
        }
        catch (Exception e) {
            log.error("[{}] Failed to remove namespace bundle {}/{}", new Object[]{this.clientAppId(), nsName.toString(), bundleRange, e});
            throw new RestException(e);
        }
    }

    @GET
    @Path(value="/{property}/{cluster}/{namespace}/permissions")
    @ApiOperation(value="Retrieve the permissions for a namespace.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Property or cluster or namespace doesn't exist"), @ApiResponse(code=409, message="Namespace is not empty")})
    public Map<String, Set<AuthAction>> getPermissions(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace) {
        this.validateAdminAccessOnProperty(property);
        Policies policies = this.getNamespacePolicies(property, cluster, namespace);
        return policies.auth_policies.namespace_auth;
    }

    @POST
    @Path(value="/{property}/{cluster}/{namespace}/permissions/{role}")
    @ApiOperation(value="Grant a new permission to a role on a namespace.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Property or cluster or namespace doesn't exist"), @ApiResponse(code=409, message="Concurrent modification")})
    public void grantPermissionOnNamespace(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="role") String role, Set<AuthAction> actions) {
        this.validateAdminAccessOnProperty(property);
        this.validatePoliciesReadOnlyAccess();
        try {
            Stat nodeStat = new Stat();
            byte[] content = this.globalZk().getData(Namespaces.path("policies", property, cluster, namespace), null, nodeStat);
            Policies policies = (Policies)Namespaces.jsonMapper().readValue(content, Policies.class);
            policies.auth_policies.namespace_auth.put(role, actions);
            this.globalZk().setData(Namespaces.path("policies", property, cluster, namespace), Namespaces.jsonMapper().writeValueAsBytes((Object)policies), nodeStat.getVersion());
            this.policiesCache().invalidate(Namespaces.path("policies", property, cluster, namespace));
            log.info("[{}] Successfully granted access for role {}: {} - namespace {}/{}/{}", new Object[]{this.clientAppId(), role, actions, property, cluster, namespace});
        }
        catch (KeeperException.NoNodeException noNodeException) {
            log.warn("[{}] Failed to set permissions for namespace {}/{}/{}: does not exist", new Object[]{this.clientAppId(), property, cluster, namespace});
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        }
        catch (KeeperException.BadVersionException badVersionException) {
            log.warn("[{}] Failed to set permissions for namespace {}/{}/{}: concurrent modification", new Object[]{this.clientAppId(), property, cluster, namespace});
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        }
        catch (Exception e) {
            log.error("[{}] Failed to get permissions for namespace {}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace, e});
            throw new RestException(e);
        }
    }

    @DELETE
    @Path(value="/{property}/{cluster}/{namespace}/permissions/{role}")
    @ApiOperation(value="Revoke all permissions to a role on a namespace.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Property or cluster or namespace doesn't exist")})
    public void revokePermissionsOnNamespace(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="role") String role) {
        this.validateAdminAccessOnProperty(property);
        this.validatePoliciesReadOnlyAccess();
        try {
            Stat nodeStat = new Stat();
            byte[] content = this.globalZk().getData(Namespaces.path("policies", property, cluster, namespace), null, nodeStat);
            Policies policies = (Policies)Namespaces.jsonMapper().readValue(content, Policies.class);
            policies.auth_policies.namespace_auth.remove(role);
            this.globalZk().setData(Namespaces.path("policies", property, cluster, namespace), Namespaces.jsonMapper().writeValueAsBytes((Object)policies), nodeStat.getVersion());
            this.policiesCache().invalidate(Namespaces.path("policies", property, cluster, namespace));
            log.info("[{}] Successfully revoked access for role {} - namespace {}/{}/{}", new Object[]{this.clientAppId(), role, property, cluster, namespace});
        }
        catch (KeeperException.NoNodeException noNodeException) {
            log.warn("[{}] Failed to revoke permissions for namespace {}/{}/{}: does not exist", new Object[]{this.clientAppId(), property, cluster, namespace});
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        }
        catch (KeeperException.BadVersionException badVersionException) {
            log.warn("[{}] Failed to revoke permissions on namespace {}/{}/{}: concurrent modification", new Object[]{this.clientAppId(), property, cluster, namespace});
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        }
        catch (Exception e) {
            log.error("[{}] Failed to revoke permissions on namespace {}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace, e});
            throw new RestException(e);
        }
    }

    @GET
    @Path(value="/{property}/{cluster}/{namespace}/replication")
    @ApiOperation(value="Get the replication clusters for a namespace.", response=String.class, responseContainer="List")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Property or cluster or namespace doesn't exist"), @ApiResponse(code=412, message="Namespace is not global")})
    public List<String> getNamespaceReplicationClusters(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace) {
        this.validateAdminAccessOnProperty(property);
        if (!cluster.equals(GLOBAL_CLUSTER)) {
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Cannot get the replication clusters for a non-global namespace");
        }
        Policies policies = this.getNamespacePolicies(property, cluster, namespace);
        return policies.replication_clusters;
    }

    @POST
    @Path(value="/{property}/{cluster}/{namespace}/replication")
    @ApiOperation(value="Set the replication clusters for a namespace.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Property or cluster or namespace doesn't exist"), @ApiResponse(code=412, message="Namespace is not global or invalid cluster ids")})
    public void setNamespaceReplicationClusters(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, List<String> clusterIds) {
        this.validateAdminAccessOnProperty(property);
        this.validatePoliciesReadOnlyAccess();
        if (!cluster.equals(GLOBAL_CLUSTER)) {
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Cannot set replication on a non-global namespace");
        }
        if (clusterIds.contains(GLOBAL_CLUSTER)) {
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Cannot specify global in the list of replication clusters");
        }
        Set<String> clusters = this.clusters();
        for (String clusterId : clusterIds) {
            if (clusters.contains(clusterId)) continue;
            throw new RestException(Response.Status.FORBIDDEN, "Invalid cluster id: " + clusterId);
        }
        for (String clusterId : clusterIds) {
            this.validateClusterForProperty(property, clusterId);
        }
        Map.Entry policiesNode = null;
        NamespaceName nsName = new NamespaceName(property, cluster, namespace);
        try {
            policiesNode = (Map.Entry)this.policiesCache().getWithStat(Namespaces.path("policies", property, cluster, namespace)).orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Namespace " + nsName + " does not exist"));
            ((Policies)policiesNode.getKey()).replication_clusters = clusterIds;
            this.globalZk().setData(Namespaces.path("policies", property, cluster, namespace), Namespaces.jsonMapper().writeValueAsBytes(policiesNode.getKey()), ((Stat)policiesNode.getValue()).getVersion());
            this.policiesCache().invalidate(Namespaces.path("policies", property, cluster, namespace));
            log.info("[{}] Successfully updated the replication clusters on namespace {}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace});
        }
        catch (KeeperException.NoNodeException noNodeException) {
            log.warn("[{}] Failed to update the replication clusters for namespace {}/{}/{}: does not exist", new Object[]{this.clientAppId(), property, cluster, namespace});
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        }
        catch (KeeperException.BadVersionException badVersionException) {
            log.warn("[{}] Failed to update the replication clusters on namespace {}/{}/{} expected policy node version={} : concurrent modification", new Object[]{this.clientAppId(), property, cluster, namespace, ((Stat)policiesNode.getValue()).getVersion()});
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        }
        catch (Exception e) {
            log.error("[{}] Failed to update the replication clusters on namespace {}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace, e});
            throw new RestException(e);
        }
    }

    @GET
    @Path(value="/{property}/{cluster}/{namespace}/messageTTL")
    @ApiOperation(value="Get the message TTL for the namespace")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Property or cluster or namespace doesn't exist")})
    public int getNamespaceMessageTTL(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace) {
        this.validateAdminAccessOnProperty(property);
        Policies policies = this.getNamespacePolicies(property, cluster, namespace);
        return policies.message_ttl_in_seconds;
    }

    @POST
    @Path(value="/{property}/{cluster}/{namespace}/messageTTL")
    @ApiOperation(value="Set message TTL in seconds for namespace")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Property or cluster or namespace doesn't exist"), @ApiResponse(code=412, message="Invalid TTL")})
    public void setNamespaceMessageTTL(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, int messageTTL) {
        this.validateAdminAccessOnProperty(property);
        this.validatePoliciesReadOnlyAccess();
        if (messageTTL < 0) {
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Invalid value for message TTL");
        }
        NamespaceName nsName = new NamespaceName(property, cluster, namespace);
        Map.Entry policiesNode = null;
        try {
            policiesNode = (Map.Entry)this.policiesCache().getWithStat(Namespaces.path("policies", property, cluster, namespace)).orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Namespace " + nsName + " does not exist"));
            ((Policies)policiesNode.getKey()).message_ttl_in_seconds = messageTTL;
            this.globalZk().setData(Namespaces.path("policies", property, cluster, namespace), Namespaces.jsonMapper().writeValueAsBytes(policiesNode.getKey()), ((Stat)policiesNode.getValue()).getVersion());
            this.policiesCache().invalidate(Namespaces.path("policies", property, cluster, namespace));
            log.info("[{}] Successfully updated the message TTL on namespace {}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace});
        }
        catch (KeeperException.NoNodeException noNodeException) {
            log.warn("[{}] Failed to update the message TTL for namespace {}/{}/{}: does not exist", new Object[]{this.clientAppId(), property, cluster, namespace});
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        }
        catch (KeeperException.BadVersionException badVersionException) {
            log.warn("[{}] Failed to update the message TTL on namespace {}/{}/{} expected policy node version={} : concurrent modification", new Object[]{this.clientAppId(), property, cluster, namespace, ((Stat)policiesNode.getValue()).getVersion()});
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        }
        catch (Exception e) {
            log.error("[{}] Failed to update the message TTL on namespace {}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace, e});
            throw new RestException(e);
        }
    }

    @GET
    @Path(value="/{property}/{cluster}/{namespace}/bundles")
    @ApiOperation(value="Get the bundles split data.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Property or cluster or namespace doesn't exist"), @ApiResponse(code=412, message="Namespace is not setup to split in bundles")})
    public BundlesData getBundlesData(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace) {
        this.validateAdminAccessOnProperty(property);
        this.validatePoliciesReadOnlyAccess();
        Policies policies = this.getNamespacePolicies(property, cluster, namespace);
        return policies.bundles;
    }

    private BundlesData validateBundlesData(BundlesData initialBundles) {
        TreeSet<String> partitions = new TreeSet<String>();
        for (String partition : initialBundles.getBoundaries()) {
            Long partBoundary = Long.decode(partition);
            partitions.add(String.format("0x%08x", partBoundary));
        }
        if (partitions.size() != initialBundles.getBoundaries().size()) {
            log.debug("Input bundles included repeated partition points. Ignored.");
        }
        try {
            NamespaceBundleFactory.validateFullRange(partitions);
        }
        catch (IllegalArgumentException illegalArgumentException) {
            throw new RestException(Response.Status.BAD_REQUEST, "Input bundles do not cover the whole hash range. first:" + (String)partitions.first() + ", last:" + (String)partitions.last());
        }
        ArrayList bundles = Lists.newArrayList();
        bundles.addAll(partitions);
        return new BundlesData((List)bundles);
    }

    private BundlesData getBundles(int numBundles) {
        if (numBundles <= 0 || (long)numBundles > 0x100000000L) {
            throw new RestException(Response.Status.BAD_REQUEST, "Invalid number of bundles. Number of numbles has to be in the range of (0, 2^32].");
        }
        Long maxVal = 0x100000000L;
        Long segSize = maxVal / (long)numBundles;
        ArrayList partitions = Lists.newArrayList();
        partitions.add(String.format("0x%08x", 0L));
        Long curPartition = segSize;
        int i = 0;
        while (i < numBundles) {
            if (i != numBundles - 1) {
                partitions.add(String.format("0x%08x", curPartition));
            } else {
                partitions.add(String.format("0x%08x", maxVal - 1L));
            }
            curPartition = curPartition + segSize;
            ++i;
        }
        return new BundlesData((List)partitions);
    }

    @PUT
    @Path(value="/{property}/{cluster}/{namespace}/unload")
    @ApiOperation(value="Unload namespace", notes="Unload an active namespace from the current broker serving it. Performing this operation will let the brokerremoves all producers, consumers, and connections using this namespace, and close all destinations (includingtheir persistent store). During that operation, the namespace is marked as tentatively unavailable until thebroker completes the unloading action. This operation requires strictly super user privileges, since it wouldresult in non-persistent message loss and unexpected connection closure to the clients.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Property or cluster or namespace doesn't exist"), @ApiResponse(code=412, message="Namespace is already unloaded or Namespace has bundles activated")})
    public void unloadNamespace(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace) {
        log.info("[{}] Unloading namespace {}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace});
        this.validateSuperUserAccess();
        if (!cluster.equals(GLOBAL_CLUSTER)) {
            this.validateClusterOwnership(cluster);
            this.validateClusterForProperty(property, cluster);
        }
        Policies policies = this.getNamespacePolicies(property, cluster, namespace);
        NamespaceName nsName = new NamespaceName(property, cluster, namespace);
        List boundaries = policies.bundles.getBoundaries();
        int i = 0;
        while (i < boundaries.size() - 1) {
            String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1));
            try {
                this.pulsar().getAdminClient().namespaces().unloadNamespaceBundle(nsName.toString(), bundle);
            }
            catch (PulsarServerException | PulsarAdminException e) {
                log.error(String.format("[%s] Failed to unload namespace %s/%s/%s", this.clientAppId(), property, cluster, namespace), e);
                throw new RestException(e);
            }
            ++i;
        }
        log.info("[{}] Successfully unloaded all the bundles in namespace {}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace});
    }

    @PUT
    @Path(value="/{property}/{cluster}/{namespace}/{bundle}/unload")
    @ApiOperation(value="Unload a namespace bundle")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission")})
    public void unloadNamespaceBundle(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="bundle") String bundleRange, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        log.info("[{}] Unloading namespace bundle {}/{}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace, bundleRange});
        this.validateSuperUserAccess();
        Policies policies = this.getNamespacePolicies(property, cluster, namespace);
        if (!cluster.equals(GLOBAL_CLUSTER)) {
            this.validateClusterOwnership(cluster);
            this.validateClusterForProperty(property, cluster);
        }
        NamespaceName fqnn = new NamespaceName(property, cluster, namespace);
        this.validatePoliciesReadOnlyAccess();
        NamespaceBundle nsBundle = this.validateNamespaceBundleOwnership(fqnn, policies.bundles, bundleRange, authoritative, true);
        try {
            this.pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle);
            log.info("[{}] Successfully unloaded namespace bundle {}", (Object)this.clientAppId(), (Object)nsBundle.toString());
        }
        catch (Exception e) {
            log.error("[{}] Failed to unload namespace bundle {}/{}", new Object[]{this.clientAppId(), fqnn.toString(), bundleRange, e});
            throw new RestException(e);
        }
    }

    @PUT
    @Path(value="/{property}/{cluster}/{namespace}/{bundle}/split")
    @ApiOperation(value="Split a namespace bundle")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission")})
    public void splitNamespaceBundle(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="bundle") String bundleRange, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        log.info("[{}] Split namespace bundle {}/{}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace, bundleRange});
        this.validateSuperUserAccess();
        Policies policies = this.getNamespacePolicies(property, cluster, namespace);
        if (!cluster.equals(GLOBAL_CLUSTER)) {
            this.validateClusterOwnership(cluster);
            this.validateClusterForProperty(property, cluster);
        }
        NamespaceName fqnn = new NamespaceName(property, cluster, namespace);
        this.validatePoliciesReadOnlyAccess();
        NamespaceBundle nsBundle = this.validateNamespaceBundleOwnership(fqnn, policies.bundles, bundleRange, authoritative, true);
        try {
            this.pulsar().getNamespaceService().splitAndOwnBundle(nsBundle).get();
            log.info("[{}] Successfully split namespace bundle {}", (Object)this.clientAppId(), (Object)nsBundle.toString());
        }
        catch (Exception e) {
            log.error("[{}] Failed to split namespace bundle {}/{}", new Object[]{this.clientAppId(), fqnn.toString(), bundleRange, e});
            throw new RestException(e);
        }
    }

    @GET
    @Path(value="/{property}/{cluster}/{namespace}/backlogQuotaMap")
    @ApiOperation(value="Get backlog quota map on a namespace.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace does not exist")})
    public Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace) {
        this.validateAdminAccessOnProperty(property);
        Policies policies = this.getNamespacePolicies(property, cluster, namespace);
        return policies.backlog_quota_map;
    }

    @POST
    @Path(value="/{property}/{cluster}/{namespace}/backlogQuota")
    @ApiOperation(value=" Set a backlog quota for all the destinations on a namespace.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace does not exist"), @ApiResponse(code=409, message="Concurrent modification"), @ApiResponse(code=412, message="Specified backlog quota exceeds retention quota. Increase retention quota and retry request")})
    public void setBacklogQuota(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @QueryParam(value="backlogQuotaType") BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
        this.validateAdminAccessOnProperty(property);
        this.validatePoliciesReadOnlyAccess();
        if (backlogQuotaType == null) {
            backlogQuotaType = BacklogQuota.BacklogQuotaType.destination_storage;
        }
        try {
            Stat nodeStat = new Stat();
            String path = Namespaces.path("policies", property, cluster, namespace);
            byte[] content = this.globalZk().getData(path, null, nodeStat);
            Policies policies = (Policies)Namespaces.jsonMapper().readValue(content, Policies.class);
            RetentionPolicies r = policies.retention_policies;
            if (r != null) {
                Policies p = new Policies();
                p.backlog_quota_map.put(backlogQuotaType, backlogQuota);
                if (!this.checkQuotas(p, r)) {
                    log.warn("[{}] Failed to update backlog configuration for namespace {}/{}/{}: conflicts with retention quota", new Object[]{this.clientAppId(), property, cluster, namespace});
                    throw new RestException(Response.Status.PRECONDITION_FAILED, "Backlog Quota exceeds configured retention quota for namespace. Please increase retention quota and retry");
                }
            }
            policies.backlog_quota_map.put(backlogQuotaType, backlogQuota);
            this.globalZk().setData(path, Namespaces.jsonMapper().writeValueAsBytes((Object)policies), nodeStat.getVersion());
            this.policiesCache().invalidate(Namespaces.path("policies", property, cluster, namespace));
            log.info("[{}] Successfully updated backlog quota map: namespace={}/{}/{}, map={}", new Object[]{this.clientAppId(), property, cluster, namespace, Namespaces.jsonMapper().writeValueAsString((Object)policies.backlog_quota_map)});
        }
        catch (KeeperException.NoNodeException noNodeException) {
            log.warn("[{}] Failed to update backlog quota map for namespace {}/{}/{}: does not exist", new Object[]{this.clientAppId(), property, cluster, namespace});
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        }
        catch (KeeperException.BadVersionException badVersionException) {
            log.warn("[{}] Failed to update backlog quota map for namespace {}/{}/{}: concurrent modification", new Object[]{this.clientAppId(), property, cluster, namespace});
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        }
        catch (RestException pfe) {
            throw pfe;
        }
        catch (Exception e) {
            log.error("[{}] Failed to update backlog quota map for namespace {}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace, e});
            throw new RestException(e);
        }
    }

    @DELETE
    @Path(value="/{property}/{cluster}/{namespace}/backlogQuota")
    @ApiOperation(value="Remove a backlog quota policy from a namespace.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace does not exist"), @ApiResponse(code=409, message="Concurrent modification")})
    public void removeBacklogQuota(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @QueryParam(value="backlogQuotaType") BacklogQuota.BacklogQuotaType backlogQuotaType) {
        this.validateAdminAccessOnProperty(property);
        this.validatePoliciesReadOnlyAccess();
        if (backlogQuotaType == null) {
            backlogQuotaType = BacklogQuota.BacklogQuotaType.destination_storage;
        }
        try {
            Stat nodeStat = new Stat();
            String path = Namespaces.path("policies", property, cluster, namespace);
            byte[] content = this.globalZk().getData(path, null, nodeStat);
            Policies policies = (Policies)Namespaces.jsonMapper().readValue(content, Policies.class);
            policies.backlog_quota_map.remove(backlogQuotaType);
            this.globalZk().setData(path, Namespaces.jsonMapper().writeValueAsBytes((Object)policies), nodeStat.getVersion());
            this.policiesCache().invalidate(Namespaces.path("policies", property, cluster, namespace));
            log.info("[{}] Successfully removed backlog namespace={}/{}/{}, quota={}", new Object[]{this.clientAppId(), property, cluster, namespace, backlogQuotaType});
        }
        catch (KeeperException.NoNodeException noNodeException) {
            log.warn("[{}] Failed to update backlog quota map for namespace {}/{}/{}: does not exist", new Object[]{this.clientAppId(), property, cluster, namespace});
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        }
        catch (KeeperException.BadVersionException badVersionException) {
            log.warn("[{}] Failed to update backlog quota map for namespace {}/{}/{}: concurrent modification", new Object[]{this.clientAppId(), property, cluster, namespace});
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        }
        catch (Exception e) {
            log.error("[{}] Failed to update backlog quota map for namespace {}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace, e});
            throw new RestException(e);
        }
    }

    @GET
    @Path(value="/{property}/{cluster}/{namespace}/retention")
    @ApiOperation(value="Get retention config on a namespace.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace does not exist")})
    public RetentionPolicies getRetention(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace) {
        this.validateAdminAccessOnProperty(property);
        Policies policies = this.getNamespacePolicies(property, cluster, namespace);
        if (policies.retention_policies == null) {
            return new RetentionPolicies(this.config().getDefaultRetentionTimeInMinutes(), this.config().getDefaultRetentionSizeInMB());
        }
        return policies.retention_policies;
    }

    @POST
    @Path(value="/{property}/{cluster}/{namespace}/retention")
    @ApiOperation(value=" Set retention configuration on a namespace.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace does not exist"), @ApiResponse(code=409, message="Concurrent modification"), @ApiResponse(code=412, message="Retention Quota must exceed backlog quota")})
    public void setRetention(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, RetentionPolicies retention) {
        this.validatePoliciesReadOnlyAccess();
        try {
            Stat nodeStat = new Stat();
            String path = Namespaces.path("policies", property, cluster, namespace);
            byte[] content = this.globalZk().getData(path, null, nodeStat);
            Policies policies = (Policies)Namespaces.jsonMapper().readValue(content, Policies.class);
            if (!this.checkQuotas(policies, retention)) {
                log.warn("[{}] Failed to update retention configuration for namespace {}/{}/{}: conflicts with backlog quota", new Object[]{this.clientAppId(), property, cluster, namespace});
                throw new RestException(Response.Status.PRECONDITION_FAILED, "Retention Quota must exceed configured backlog quota for namespace.");
            }
            policies.retention_policies = retention;
            this.globalZk().setData(path, Namespaces.jsonMapper().writeValueAsBytes((Object)policies), nodeStat.getVersion());
            this.policiesCache().invalidate(Namespaces.path("policies", property, cluster, namespace));
            log.info("[{}] Successfully updated retention configuration: namespace={}/{}/{}, map={}", new Object[]{this.clientAppId(), property, cluster, namespace, Namespaces.jsonMapper().writeValueAsString((Object)policies.retention_policies)});
        }
        catch (KeeperException.NoNodeException noNodeException) {
            log.warn("[{}] Failed to update retention configuration for namespace {}/{}/{}: does not exist", new Object[]{this.clientAppId(), property, cluster, namespace});
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        }
        catch (KeeperException.BadVersionException badVersionException) {
            log.warn("[{}] Failed to update retention configuration for namespace {}/{}/{}: concurrent modification", new Object[]{this.clientAppId(), property, cluster, namespace});
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        }
        catch (RestException pfe) {
            throw pfe;
        }
        catch (Exception e) {
            log.error("[{}] Failed to update retention configuration for namespace {}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace, e});
            throw new RestException(e);
        }
    }

    private boolean checkQuotas(Policies policies, RetentionPolicies retention) {
        Map backlog_quota_map = policies.backlog_quota_map;
        if (backlog_quota_map.isEmpty() || retention.getRetentionSizeInMB() == 0L) {
            return true;
        }
        BacklogQuota quota = (BacklogQuota)backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
        if (quota == null) {
            quota = this.pulsar().getBrokerService().getBacklogQuotaManager().getDefaultQuota();
        }
        return quota.getLimit() < retention.getRetentionSizeInMB() * 1024L * 1024L;
    }

    @POST
    @Path(value="/{property}/{cluster}/{namespace}/persistence")
    @ApiOperation(value="Set the persistence configuration for all the destinations on a namespace.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace does not exist"), @ApiResponse(code=409, message="Concurrent modification")})
    public void setPersistence(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, PersistencePolicies persistence) {
        this.validatePoliciesReadOnlyAccess();
        try {
            Stat nodeStat = new Stat();
            String path = Namespaces.path("policies", property, cluster, namespace);
            byte[] content = this.globalZk().getData(path, null, nodeStat);
            Policies policies = (Policies)Namespaces.jsonMapper().readValue(content, Policies.class);
            policies.persistence = persistence;
            this.globalZk().setData(path, Namespaces.jsonMapper().writeValueAsBytes((Object)policies), nodeStat.getVersion());
            this.policiesCache().invalidate(Namespaces.path("policies", property, cluster, namespace));
            log.info("[{}] Successfully updated persistence configuration: namespace={}/{}/{}, map={}", new Object[]{this.clientAppId(), property, cluster, namespace, Namespaces.jsonMapper().writeValueAsString((Object)policies.persistence)});
        }
        catch (KeeperException.NoNodeException noNodeException) {
            log.warn("[{}] Failed to update persistence configuration for namespace {}/{}/{}: does not exist", new Object[]{this.clientAppId(), property, cluster, namespace});
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        }
        catch (KeeperException.BadVersionException badVersionException) {
            log.warn("[{}] Failed to update persistence configuration for namespace {}/{}/{}: concurrent modification", new Object[]{this.clientAppId(), property, cluster, namespace});
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        }
        catch (Exception e) {
            log.error("[{}] Failed to update persistence configuration for namespace {}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace, e});
            throw new RestException(e);
        }
    }

    @GET
    @Path(value="/{property}/{cluster}/{namespace}/persistence")
    @ApiOperation(value="Get the persistence configuration for a namespace.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace does not exist"), @ApiResponse(code=409, message="Concurrent modification")})
    public PersistencePolicies getPersistence(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace) {
        this.validateAdminAccessOnProperty(property);
        Policies policies = this.getNamespacePolicies(property, cluster, namespace);
        if (policies.persistence == null) {
            return new PersistencePolicies(this.config().getManagedLedgerDefaultEnsembleSize(), this.config().getManagedLedgerDefaultWriteQuorum(), this.config().getManagedLedgerDefaultAckQuorum(), 0.0);
        }
        return policies.persistence;
    }

    @POST
    @Path(value="/{property}/{cluster}/{namespace}/clearBacklog")
    @ApiOperation(value="Clear backlog for all destinations on a namespace.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace does not exist")})
    public void clearNamespaceBacklog(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        this.validateAdminAccessOnProperty(property);
        NamespaceName nsName = new NamespaceName(property, cluster, namespace);
        try {
            NamespaceBundles bundles = this.pulsar().getNamespaceService().getNamespaceBundleFactory().getBundles(nsName);
            Throwable exception = null;
            for (NamespaceBundle nsBundle : bundles.getBundles()) {
                try {
                    if (!this.pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) continue;
                    this.pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklog(nsName.toString(), nsBundle.getBundleRange());
                }
                catch (Exception e) {
                    if (exception != null) continue;
                    exception = e;
                }
            }
            if (exception != null) {
                if (exception instanceof PulsarAdminException) {
                    throw new RestException((PulsarAdminException)exception);
                }
                throw new RestException(exception.getCause());
            }
        }
        catch (WebApplicationException wae) {
            throw wae;
        }
        catch (Exception e) {
            throw new RestException(e);
        }
        log.info("[{}] Successfully cleared backlog on all the bundles for namespace {}", (Object)this.clientAppId(), (Object)nsName.toString());
    }

    @POST
    @Path(value="/{property}/{cluster}/{namespace}/{bundle}/clearBacklog")
    @ApiOperation(value="Clear backlog for all destinations on a namespace bundle.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace does not exist")})
    public void clearNamespaceBundleBacklog(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="bundle") String bundleRange, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        this.validateAdminAccessOnProperty(property);
        Policies policies = this.getNamespacePolicies(property, cluster, namespace);
        if (!cluster.equals(GLOBAL_CLUSTER)) {
            this.validateClusterOwnership(cluster);
            this.validateClusterForProperty(property, cluster);
        }
        NamespaceName nsName = new NamespaceName(property, cluster, namespace);
        this.validateNamespaceBundleOwnership(nsName, policies.bundles, bundleRange, authoritative, true);
        this.clearBacklog(nsName, bundleRange, null);
        log.info("[{}] Successfully cleared backlog on namespace bundle {}/{}", new Object[]{this.clientAppId(), nsName.toString(), bundleRange});
    }

    @POST
    @Path(value="/{property}/{cluster}/{namespace}/clearBacklog/{subscription}")
    @ApiOperation(value="Clear backlog for a given subscription on all destinations on a namespace.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace does not exist")})
    public void clearNamespaceBacklogForSubscription(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="subscription") String subscription, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        this.validateAdminAccessOnProperty(property);
        NamespaceName nsName = new NamespaceName(property, cluster, namespace);
        try {
            NamespaceBundles bundles = this.pulsar().getNamespaceService().getNamespaceBundleFactory().getBundles(nsName);
            Throwable exception = null;
            for (NamespaceBundle nsBundle : bundles.getBundles()) {
                try {
                    if (!this.pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) continue;
                    this.pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklogForSubscription(nsName.toString(), nsBundle.getBundleRange(), subscription);
                }
                catch (Exception e) {
                    if (exception != null) continue;
                    exception = e;
                }
            }
            if (exception != null) {
                if (exception instanceof PulsarAdminException) {
                    throw new RestException((PulsarAdminException)exception);
                }
                throw new RestException(exception.getCause());
            }
        }
        catch (WebApplicationException wae) {
            throw wae;
        }
        catch (Exception e) {
            throw new RestException(e);
        }
        log.info("[{}] Successfully cleared backlog for subscription {} on all the bundles for namespace {}", new Object[]{this.clientAppId(), subscription, nsName.toString()});
    }

    @POST
    @Path(value="/{property}/{cluster}/{namespace}/{bundle}/clearBacklog/{subscription}")
    @ApiOperation(value="Clear backlog for a given subscription on all destinations on a namespace bundle.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace does not exist")})
    public void clearNamespaceBundleBacklogForSubscription(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="subscription") String subscription, @PathParam(value="bundle") String bundleRange, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        this.validateAdminAccessOnProperty(property);
        Policies policies = this.getNamespacePolicies(property, cluster, namespace);
        if (!cluster.equals(GLOBAL_CLUSTER)) {
            this.validateClusterOwnership(cluster);
            this.validateClusterForProperty(property, cluster);
        }
        NamespaceName nsName = new NamespaceName(property, cluster, namespace);
        this.validateNamespaceBundleOwnership(nsName, policies.bundles, bundleRange, authoritative, true);
        this.clearBacklog(nsName, bundleRange, subscription);
        log.info("[{}] Successfully cleared backlog for subscription {} on namespace bundle {}/{}", new Object[]{this.clientAppId(), subscription, nsName.toString(), bundleRange});
    }

    @POST
    @Path(value="/{property}/{cluster}/{namespace}/unsubscribe/{subscription}")
    @ApiOperation(value="Unsubscribes the given subscription on all destinations on a namespace.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace does not exist")})
    public void unsubscribeNamespace(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="subscription") String subscription, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        this.validateAdminAccessOnProperty(property);
        NamespaceName nsName = new NamespaceName(property, cluster, namespace);
        try {
            NamespaceBundles bundles = this.pulsar().getNamespaceService().getNamespaceBundleFactory().getBundles(nsName);
            Throwable exception = null;
            for (NamespaceBundle nsBundle : bundles.getBundles()) {
                try {
                    if (!this.pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) continue;
                    this.pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundle(nsName.toString(), nsBundle.getBundleRange(), subscription);
                }
                catch (Exception e) {
                    if (exception != null) continue;
                    exception = e;
                }
            }
            if (exception != null) {
                if (exception instanceof PulsarAdminException) {
                    throw new RestException((PulsarAdminException)exception);
                }
                throw new RestException(exception.getCause());
            }
        }
        catch (WebApplicationException wae) {
            throw wae;
        }
        catch (Exception e) {
            throw new RestException(e);
        }
        log.info("[{}] Successfully unsubscribed {} on all the bundles for namespace {}", new Object[]{this.clientAppId(), subscription, nsName.toString()});
    }

    @POST
    @Path(value="/{property}/{cluster}/{namespace}/{bundle}/unsubscribe/{subscription}")
    @ApiOperation(value="Unsubscribes the given subscription on all destinations on a namespace bundle.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace does not exist")})
    public void unsubscribeNamespaceBundle(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="subscription") String subscription, @PathParam(value="bundle") String bundleRange, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        this.validateAdminAccessOnProperty(property);
        Policies policies = this.getNamespacePolicies(property, cluster, namespace);
        if (!cluster.equals(GLOBAL_CLUSTER)) {
            this.validateClusterOwnership(cluster);
            this.validateClusterForProperty(property, cluster);
        }
        NamespaceName nsName = new NamespaceName(property, cluster, namespace);
        this.validateNamespaceBundleOwnership(nsName, policies.bundles, bundleRange, authoritative, true);
        this.unsubscribe(nsName, bundleRange, subscription);
        log.info("[{}] Successfully unsubscribed {} on namespace bundle {}/{}", new Object[]{this.clientAppId(), subscription, nsName.toString(), bundleRange});
    }

    private void clearBacklog(NamespaceName nsName, String bundleRange, String subscription) {
        try {
            List<Topic> topicList = this.pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(), String.valueOf(nsName.toString()) + "/" + bundleRange);
            ArrayList futures = Lists.newArrayList();
            if (subscription != null) {
                if (subscription.startsWith(this.pulsar().getConfiguration().getReplicatorPrefix())) {
                    subscription = PersistentReplicator.getRemoteCluster(subscription);
                }
                for (Topic topic : topicList) {
                    if (!(topic instanceof PersistentTopic)) continue;
                    futures.add(((PersistentTopic)topic).clearBacklog(subscription));
                }
            } else {
                for (Topic topic : topicList) {
                    if (!(topic instanceof PersistentTopic)) continue;
                    futures.add(((PersistentTopic)topic).clearBacklog());
                }
            }
            FutureUtil.waitForAll((List)futures).get();
        }
        catch (Exception e) {
            log.error("[{}] Failed to clear backlog for namespace {}/{}, subscription: {}", new Object[]{this.clientAppId(), nsName.toString(), bundleRange, subscription, e});
            throw new RestException(e);
        }
    }

    private void unsubscribe(NamespaceName nsName, String bundleRange, String subscription) {
        try {
            List<Topic> topicList = this.pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(), String.valueOf(nsName.toString()) + "/" + bundleRange);
            ArrayList futures = Lists.newArrayList();
            if (subscription.startsWith(this.pulsar().getConfiguration().getReplicatorPrefix())) {
                throw new RestException(Response.Status.PRECONDITION_FAILED, "Cannot unsubscribe a replication cursor");
            }
            for (Topic topic : topicList) {
                Subscription sub = topic.getSubscription(subscription);
                if (sub == null) continue;
                futures.add(sub.delete());
            }
            FutureUtil.waitForAll((List)futures).get();
        }
        catch (RestException re) {
            throw re;
        }
        catch (Exception e) {
            log.error("[{}] Failed to unsubscribe {} for namespace {}/{}", new Object[]{this.clientAppId(), subscription, nsName.toString(), bundleRange, e});
            if (e.getCause() instanceof BrokerServiceException.SubscriptionBusyException) {
                throw new RestException(Response.Status.PRECONDITION_FAILED, "Subscription has active connected consumers");
            }
            throw new RestException(e.getCause());
        }
    }
}

