package org.apache.pulsar.broker.admin;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Api(value = "/namespaces", description = "Namespaces admin apis", tags = {"namespaces"})
@Path("/namespaces")
@Consumes({"application/json"})
@Produces({"application/json"})
/* loaded from: input_file:org/apache/pulsar/broker/admin/Namespaces.class */
public class Namespaces extends AdminResource {
    /**
     * Cluster name that the handlers in this class compare the {@code cluster} path parameter against:
     * only namespaces under this cluster may read or set replication clusters.
     */
    public static final String GLOBAL_CLUSTER = "global";
    /** Upper bound accepted for a bundle count: 4294967296 = 2^32, which getBundles divides into ranges. */
    private static final long MAX_BUNDLES = 4294967296L;
    /** Logger shared by every handler in this resource. */
    private static final Logger log = LoggerFactory.getLogger(Namespaces.class);

    /**
     * Lists the namespaces that belong to a property.
     *
     * @param str the property name taken from the request path
     * @return whatever {@code getListOfNamespaces} yields for the property
     * @throws RestException with 404 when the property's node is missing, or wrapping any other failure
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property doesn't exist")})
    @Path("/{property}")
    @ApiOperation(value = "Get the list of all the namespaces for a certain property.", response = String.class, responseContainer = "Set")
    public List<String> getPropertyNamespaces(@PathParam("property") String str) {
        validateAdminAccessOnProperty(str);
        try {
            return getListOfNamespaces(str);
        } catch (KeeperException.NoNodeException e) {
            log.warn("[{}] Failed to get namespace list for property: {} - Does not exist", clientAppId(), str);
            throw new RestException(Response.Status.NOT_FOUND, "Property does not exist");
        } catch (Exception e) {
            // The exception is passed as the trailing argument with no placeholder of its own,
            // so SLF4J logs it as a throwable (with stack trace) instead of formatting it into the message.
            log.error("[{}] Failed to get namespaces list for property {}", clientAppId(), str, e);
            throw new RestException(e);
        }
    }

    /**
     * Lists the namespaces of a property that live on one cluster, sorted in natural order.
     *
     * @param str  the property name taken from the request path
     * @param str2 the cluster name taken from the request path
     * @return fully qualified {@code property/cluster/namespace} names; empty when the
     *         {@code policies/property/cluster} node does not exist
     * @throws RestException with 404 when the cluster is unknown, or wrapping any other failure
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster doesn't exist")})
    @Path("/{property}/{cluster}")
    @ApiOperation(value = "Get the list of all the namespaces for a certain property on single cluster.", response = String.class, responseContainer = "Set")
    public List<String> getNamespacesForCluster(@PathParam("property") String str, @PathParam("cluster") String str2) {
        validateAdminAccessOnProperty(str);
        if (!clusters().contains(str2)) {
            log.warn("[{}] Failed to get namespace list for property: {}/{} - Cluster does not exist", clientAppId(), str, str2);
            throw new RestException(Response.Status.NOT_FOUND, "Cluster does not exist");
        }
        List<String> namespaces = Lists.newArrayList();
        try {
            for (String namespace : globalZk().getChildren(path("policies", str, str2), false)) {
                namespaces.add(String.format("%s/%s/%s", str, str2, namespace));
            }
        } catch (KeeperException.NoNodeException ignored) {
            // Deliberate: a missing node simply means there are no namespaces yet, so an empty list is returned.
        } catch (Exception e) {
            // Trailing exception argument has no placeholder so SLF4J records the stack trace.
            log.error("[{}] Failed to get namespaces list for {}/{}", clientAppId(), str, str2, e);
            throw new RestException(e);
        }
        namespaces.sort(null);
        return namespaces;
    }

    /**
     * Returns the destinations registered under a namespace.
     *
     * @param str  the property name taken from the request path
     * @param str2 the cluster name taken from the request path
     * @param str3 the namespace name taken from the request path
     * @return the list produced by the namespace service
     * @throws RestException wrapping any failure raised while listing
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist")})
    @Path("/{property}/{cluster}/{namespace}/destinations")
    @ApiOperation(value = "Get the list of all the destinations under a certain namespace.", response = String.class, responseContainer = "Set")
    public List<String> getDestinations(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3) {
        validateAdminAccessOnProperty(str);
        // Result intentionally discarded; presumably this call rejects the request when the namespace is missing.
        getNamespacePolicies(str, str2, str3);
        final List<String> destinations;
        try {
            destinations = pulsar().getNamespaceService().getListOfDestinations(str, str2, str3);
        } catch (Exception failure) {
            log.error("Failed to get topics list for namespace {}/{}/{}", str, str2, str3, failure);
            throw new RestException(failure);
        }
        return destinations;
    }

    /**
     * Returns the full policies object stored for a namespace.
     *
     * @param str  the property name taken from the request path
     * @param str2 the cluster name taken from the request path
     * @param str3 the namespace name taken from the request path
     * @return the policies obtained from {@code getNamespacePolicies}
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist")})
    @Path("/{property}/{cluster}/{namespace}")
    @ApiOperation(value = "Get the dump all the policies specified for a namespace.", response = Policies.class)
    public Policies getPolicies(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3) {
        validateAdminAccessOnProperty(str);
        Policies namespacePolicies = getNamespacePolicies(str, str2, str3);
        return namespacePolicies;
    }

    /**
     * Creates a namespace by writing a fresh {@link Policies} object to the
     * {@code policies/property/cluster/namespace} node.
     *
     * <p>When the request carries bundle data with a positive bundle count, the bundles are either
     * generated from that count or validated from the explicit boundaries supplied.
     *
     * @param str         the property name taken from the request path
     * @param str2        the cluster name taken from the request path
     * @param str3        the namespace name taken from the request path
     * @param bundlesData optional initial bundle configuration; may be {@code null}
     * @throws RestException 404 when the cluster or property is unknown, 412 when the namespace name is
     *                       rejected, 409 when the namespace already exists, or wrapping any other failure
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 409, message = "Namespace already exists"), @ApiResponse(code = 412, message = "Namespace name is not valid")})
    @Path("/{property}/{cluster}/{namespace}")
    @ApiOperation("Creates a new empty namespace with no policies attached.")
    @PUT
    public void createNamespace(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, BundlesData bundlesData) {
        validateAdminAccessOnProperty(str);
        validatePoliciesReadOnlyAccess();
        // Only non-global namespaces are checked against the property's clusters here.
        if (!str2.equals(GLOBAL_CLUSTER)) {
            validateClusterForProperty(str, str2);
        }
        if (!clusters().contains(str2)) {
            log.warn("[{}] Failed to create namespace. Cluster {} does not exist", clientAppId(), str2);
            throw new RestException(Response.Status.NOT_FOUND, "Cluster does not exist");
        }

        // Step 1: the property must exist. The most specific exceptions are caught first so that
        // the 404 mapping and the RestException pass-through are actually reachable.
        try {
            Preconditions.checkNotNull(propertiesCache().get(path("policies", str)));
        } catch (KeeperException.NoNodeException e) {
            log.warn("[{}] Failed to create namespace. Property {} does not exist", clientAppId(), str);
            throw new RestException(Response.Status.NOT_FOUND, "Property does not exist");
        } catch (RestException e) {
            throw e;
        } catch (Exception e) {
            throw new RestException(e);
        }

        // Step 2: validate the name and create the policies node.
        try {
            NamedEntity.checkName(str3);
            String policiesPath = path("policies", str, str2, str3);
            policiesCache().invalidate(policiesPath);
            Policies policies = new Policies();
            if (bundlesData != null && bundlesData.getNumBundles() > 0) {
                boolean noExplicitBoundaries = bundlesData.getBoundaries() == null || bundlesData.getBoundaries().size() == 0;
                policies.bundles = noExplicitBoundaries ? getBundles(bundlesData.getNumBundles()) : validateBundlesData(bundlesData);
            }
            zkCreateOptimistic(policiesPath, jsonMapper().writeValueAsBytes(policies));
            log.info("[{}] Created namespace {}/{}/{}", clientAppId(), str, str2, str3);
        } catch (IllegalArgumentException e) {
            // Log the rejected namespace name (str3), not the property.
            log.warn("[{}] Failed to create namespace with invalid name {}", clientAppId(), str3, e);
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Namespace name is not valid");
        } catch (KeeperException.NodeExistsException e) {
            log.warn("[{}] Failed to create namespace {}/{}/{} - already exists", clientAppId(), str, str2, str3);
            throw new RestException(Response.Status.CONFLICT, "Namespace already exists");
        } catch (Exception e) {
            log.error("[{}] Failed to create namespace {}/{}/{}", clientAppId(), str, str2, str3, e);
            throw new RestException(e);
        }
    }

    /**
     * Deletes an empty namespace.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Load the policies node; for a global namespace refuse while more than one replication cluster
     *       is configured, and redirect the call when the single remaining cluster is not this one.</li>
     *   <li>Refuse with 409 when the namespace still has destinations.</li>
     *   <li>Mark the policies as deleted using a versioned write.</li>
     *   <li>Ask the admin client to delete every bundle that currently has an owner, then remove the
     *       global and local policies nodes. Failures here other than {@link PulsarAdminException}
     *       are logged and not propagated.</li>
     * </ol>
     *
     * @param str  the property name taken from the request path
     * @param str2 the cluster name taken from the request path
     * @param str3 the namespace name taken from the request path
     * @param z    the {@code authoritative} query flag; accepted but not read by this method
     * @throws RestException           404, 409, 412 as described above, or wrapping other failures
     * @throws WebApplicationException carrying a temporary redirect to the owning cluster
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 409, message = "Namespace is not empty")})
    @Path("/{property}/{cluster}/{namespace}")
    @DELETE
    @ApiOperation("Delete a namespace and all the destinations under it.")
    public void deleteNamespace(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        NamespaceName namespaceName = new NamespaceName(str, str2, str3);
        validateAdminAccessOnProperty(str);
        validatePoliciesReadOnlyAccess();
        validateClusterOwnership(str2);

        Map.Entry<Policies, Stat> policiesNode;
        Policies policies;
        try {
            policiesNode = (Map.Entry<Policies, Stat>) policiesCache().getWithStat(path("policies", str, str2, str3)).orElseThrow(
                    () -> new RestException(Response.Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist."));
            policies = policiesNode.getKey();
            if (str2.equals(GLOBAL_CLUSTER)) {
                if (policies.replication_clusters.size() > 1) {
                    throw new RestException(Response.Status.PRECONDITION_FAILED, "Cannot delete the global namespace " + namespaceName + ". There are still more than one replication clusters configured.");
                }
                if (policies.replication_clusters.size() == 1 && !policies.replication_clusters.contains(config().getClusterName())) {
                    // The only remaining replication cluster is a different one: hand the request over to it.
                    String peerCluster = (String) policies.replication_clusters.get(0);
                    ClusterData clusterData = (ClusterData) clustersCache().get(AdminResource.path("clusters", peerCluster)).orElseThrow(
                            () -> new RestException(Response.Status.NOT_FOUND, "Cluster " + peerCluster + " does not exist"));
                    URL url;
                    if (!config().isTlsEnabled()) {
                        url = new URL(clusterData.getServiceUrl());
                    } else {
                        if (clusterData.getServiceUrlTls().isEmpty()) {
                            throw new RestException(Response.Status.PRECONDITION_FAILED, "The replication cluster does not provide TLS encrypted service");
                        }
                        url = new URL(clusterData.getServiceUrlTls());
                    }
                    URI redirect = UriBuilder.fromUri(this.uri.getRequestUri()).host(url.getHost()).port(url.getPort()).replaceQueryParam("authoritative", false).build();
                    log.debug("[{}] Redirecting the rest call to {}: cluster={}", clientAppId(), redirect, str2);
                    throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
                }
            }
        } catch (WebApplicationException e) {
            // Must precede the generic handler, otherwise the redirect and the REST errors are unreachable.
            throw e;
        } catch (Exception e) {
            throw new RestException(e);
        }

        List<String> destinations = getDestinations(str, str2, str3);
        if (!destinations.isEmpty()) {
            log.info("Found destinations: {}", destinations);
            throw new RestException(Response.Status.CONFLICT, "Cannot delete non empty namespace");
        }

        // Flag the policies as deleted with a versioned write before touching ownership.
        try {
            policies.deleted = true;
            globalZk().setData(path("policies", str, str2, str3), jsonMapper().writeValueAsBytes(policies), policiesNode.getValue().getVersion());
            policiesCache().invalidate(path("policies", str, str2, str3));
        } catch (Exception e) {
            log.error("[{}] Failed to delete namespace on global ZK {}/{}/{}", clientAppId(), str, str2, str3, e);
            throw new RestException(e);
        }

        // Release bundle ownership, then drop the policies nodes.
        try {
            for (NamespaceBundle namespaceBundle : pulsar().getNamespaceService().getNamespaceBundleFactory().getBundles(namespaceName).getBundles()) {
                // Bundles without an owner need no explicit deletion.
                if (pulsar().getNamespaceService().getOwner(namespaceBundle).isPresent()) {
                    pulsar().getAdminClient().namespaces().deleteNamespaceBundle(namespaceName.toString(), namespaceBundle.getBundleRange());
                }
            }
            String globalPath = path("policies", str, str2, str3);
            String localPath = joinPath(LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT, str, str2, str3);
            globalZk().delete(globalPath, -1);
            localZk().delete(localPath, -1);
            policiesCache().invalidate(globalPath);
            localCacheService().policiesCache().invalidate(localPath);
        } catch (PulsarAdminException e) {
            throw new RestException(e);
        } catch (Exception e) {
            // Best effort: the namespace is already marked deleted, so this failure is only logged.
            log.error(String.format("[%s] Failed to remove owned namespace %s/%s/%s", clientAppId(), str, str2, str3), e);
        }
    }

    /**
     * Deletes one bundle of a namespace, provided no destination of the namespace maps to it.
     *
     * <p>For a global namespace the call is refused while more than one replication cluster is configured,
     * and redirected when the single remaining cluster is not this one. After bundle ownership has been
     * validated, every destination is mapped to its bundle; a match aborts with 409, otherwise the owned
     * service unit is removed.
     *
     * @param str  the property name taken from the request path
     * @param str2 the cluster name taken from the request path
     * @param str3 the namespace name taken from the request path
     * @param str4 the bundle range taken from the request path
     * @param z    the {@code authoritative} query flag, forwarded to the ownership validation
     * @throws RestException           404, 409, 412 as described above, or wrapping other failures
     * @throws WebApplicationException carrying a temporary redirect to the owning cluster
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 409, message = "Namespace bundle is not empty")})
    @Path("/{property}/{cluster}/{namespace}/{bundle}")
    @DELETE
    @ApiOperation("Delete a namespace bundle and all the destinations under it.")
    public void deleteNamespaceBundle(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("bundle") String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        NamespaceName namespaceName = new NamespaceName(str, str2, str3);
        validateAdminAccessOnProperty(str);
        validatePoliciesReadOnlyAccess();
        validateClusterOwnership(str2);
        Policies namespacePolicies = getNamespacePolicies(str, str2, str3);

        NamespaceBundle bundle;
        try {
            if (str2.equals(GLOBAL_CLUSTER)) {
                if (namespacePolicies.replication_clusters.size() > 1) {
                    throw new RestException(Response.Status.PRECONDITION_FAILED, "Cannot delete the global namespace " + namespaceName + ". There are still more than one replication clusters configured.");
                }
                if (namespacePolicies.replication_clusters.size() == 1 && !namespacePolicies.replication_clusters.contains(config().getClusterName())) {
                    // The only remaining replication cluster is a different one: hand the request over to it.
                    String peerCluster = (String) namespacePolicies.replication_clusters.get(0);
                    ClusterData clusterData = (ClusterData) clustersCache().get(AdminResource.path("clusters", peerCluster)).orElseThrow(
                            () -> new RestException(Response.Status.NOT_FOUND, "Cluster " + peerCluster + " does not exist"));
                    URL url;
                    if (!config().isTlsEnabled()) {
                        url = new URL(clusterData.getServiceUrl());
                    } else {
                        if (clusterData.getServiceUrlTls().isEmpty()) {
                            throw new RestException(Response.Status.PRECONDITION_FAILED, "The replication cluster does not provide TLS encrypted service");
                        }
                        url = new URL(clusterData.getServiceUrlTls());
                    }
                    URI redirect = UriBuilder.fromUri(this.uri.getRequestUri()).host(url.getHost()).port(url.getPort()).replaceQueryParam("authoritative", false).build();
                    log.debug("[{}] Redirecting the rest call to {}: cluster={}", clientAppId(), redirect, str2);
                    throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
                }
            }
            bundle = validateNamespaceBundleOwnership(namespaceName, namespacePolicies.bundles, str4, z, true);
        } catch (WebApplicationException e) {
            throw e;
        } catch (Exception e) {
            throw new RestException(e);
        }

        try {
            for (String destination : getDestinations(str, str2, str3)) {
                if (bundle.equals(pulsar().getNamespaceService().getBundle(DestinationName.get(destination)))) {
                    throw new RestException(Response.Status.CONFLICT, "Cannot delete non empty bundle");
                }
            }
            pulsar().getNamespaceService().removeOwnedServiceUnit(bundle);
        } catch (WebApplicationException e) {
            // Must precede the generic handler so the 409 above reaches the client unlogged and unwrapped.
            throw e;
        } catch (Exception e) {
            log.error("[{}] Failed to remove namespace bundle {}/{}", clientAppId(), namespaceName.toString(), str4, e);
            throw new RestException(e);
        }
    }

    /**
     * Returns the role-to-actions map stored in a namespace's auth policies.
     *
     * @param str  the property name taken from the request path
     * @param str2 the cluster name taken from the request path
     * @param str3 the namespace name taken from the request path
     * @return the {@code namespace_auth} map of the namespace policies
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 409, message = "Namespace is not empty")})
    @Path("/{property}/{cluster}/{namespace}/permissions")
    @ApiOperation("Retrieve the permissions for a namespace.")
    public Map<String, Set<AuthAction>> getPermissions(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3) {
        validateAdminAccessOnProperty(str);
        Policies namespacePolicies = getNamespacePolicies(str, str2, str3);
        return namespacePolicies.auth_policies.namespace_auth;
    }

    /**
     * Grants a set of actions to a role on a namespace.
     *
     * <p>The policies node is read together with its version, the role entry is replaced, and the node is
     * written back conditioned on that version, so a concurrent update surfaces as 409.
     *
     * @param str  the property name taken from the request path
     * @param str2 the cluster name taken from the request path
     * @param str3 the namespace name taken from the request path
     * @param str4 the role receiving the permissions
     * @param set  the actions granted to the role
     * @throws RestException 409 on a version conflict, 404 when the node is missing, or wrapping other failures
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{property}/{cluster}/{namespace}/permissions/{role}")
    @ApiOperation("Grant a new permission to a role on a namespace.")
    @POST
    public void grantPermissionOnNamespace(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("role") String str4, Set<AuthAction> set) {
        validateAdminAccessOnProperty(str);
        validatePoliciesReadOnlyAccess();
        try {
            String policiesPath = path("policies", str, str2, str3);
            Stat stat = new Stat();
            byte[] content = globalZk().getData(policiesPath, (Watcher) null, stat);
            Policies policies = jsonMapper().readValue(content, Policies.class);
            policies.auth_policies.namespace_auth.put(str4, set);
            // Versioned write: fails with BadVersionException if someone else updated the node meanwhile.
            globalZk().setData(policiesPath, jsonMapper().writeValueAsBytes(policies), stat.getVersion());
            policiesCache().invalidate(policiesPath);
            log.info("[{}] Successfully granted access for role {}: {} - namespace {}/{}/{}", clientAppId(), str4, set, str, str2, str3);
        } catch (KeeperException.BadVersionException e) {
            log.warn("[{}] Failed to set permissions for namespace {}/{}/{}: concurrent modification", clientAppId(), str, str2, str3);
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        } catch (KeeperException.NoNodeException e) {
            log.warn("[{}] Failed to set permissions for namespace {}/{}/{}: does not exist", clientAppId(), str, str2, str3);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (Exception e) {
            // This handler is about setting permissions; the message used to say "get".
            log.error("[{}] Failed to set permissions for namespace {}/{}/{}", clientAppId(), str, str2, str3, e);
            throw new RestException(e);
        }
    }

    /**
     * Removes every permission a role holds on a namespace.
     *
     * <p>The policies node is read with its version, the role entry is removed, and the node is written
     * back conditioned on that version.
     *
     * @param str  the property name taken from the request path
     * @param str2 the cluster name taken from the request path
     * @param str3 the namespace name taken from the request path
     * @param str4 the role whose permissions are revoked
     * @throws RestException 409 on a version conflict, 404 when the node is missing, or wrapping other failures
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist")})
    @Path("/{property}/{cluster}/{namespace}/permissions/{role}")
    @DELETE
    @ApiOperation("Revoke all permissions to a role on a namespace.")
    public void revokePermissionsOnNamespace(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("role") String str4) {
        validateAdminAccessOnProperty(str);
        validatePoliciesReadOnlyAccess();
        try {
            Stat nodeStat = new Stat();
            byte[] rawPolicies = globalZk().getData(path("policies", str, str2, str3), (Watcher) null, nodeStat);
            Policies current = (Policies) jsonMapper().readValue(rawPolicies, Policies.class);
            current.auth_policies.namespace_auth.remove(str4);
            byte[] updated = jsonMapper().writeValueAsBytes(current);
            globalZk().setData(path("policies", str, str2, str3), updated, nodeStat.getVersion());
            policiesCache().invalidate(path("policies", str, str2, str3));
            log.info("[{}] Successfully revoked access for role {} - namespace {}/{}/{}", clientAppId(), str4, str, str2, str3);
        } catch (KeeperException.BadVersionException conflict) {
            log.warn("[{}] Failed to revoke permissions on namespace {}/{}/{}: concurrent modification", clientAppId(), str, str2, str3);
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        } catch (KeeperException.NoNodeException missing) {
            log.warn("[{}] Failed to revoke permissions for namespace {}/{}/{}: does not exist", clientAppId(), str, str2, str3);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (Exception failure) {
            log.error("[{}] Failed to revoke permissions on namespace {}/{}/{}", clientAppId(), str, str2, str3, failure);
            throw new RestException(failure);
        }
    }

    /**
     * Returns the replication clusters configured for a global namespace.
     *
     * @param str  the property name taken from the request path
     * @param str2 the cluster name taken from the request path; must equal {@link #GLOBAL_CLUSTER}
     * @param str3 the namespace name taken from the request path
     * @return the {@code replication_clusters} list of the namespace policies
     * @throws RestException 412 when the namespace is not under the global cluster
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 412, message = "Namespace is not global")})
    @Path("/{property}/{cluster}/{namespace}/replication")
    @ApiOperation(value = "Get the replication clusters for a namespace.", response = String.class, responseContainer = "List")
    public List<String> getNamespaceReplicationClusters(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3) {
        validateAdminAccessOnProperty(str);
        // Guard clause: replication is only defined for namespaces under the global cluster.
        if (!str2.equals(GLOBAL_CLUSTER)) {
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Cannot get the replication clusters for a non-global namespace");
        }
        Policies namespacePolicies = getNamespacePolicies(str, str2, str3);
        return namespacePolicies.replication_clusters;
    }

    /**
     * Replaces the replication clusters of a global namespace.
     *
     * <p>Validation happens in this order: the namespace must be global, the list must not contain the
     * global cluster itself, every id must be a known cluster, and every id must pass
     * {@code validateClusterForProperty}. The policies node is then rewritten with a versioned write.
     *
     * @param str  the property name taken from the request path
     * @param str2 the cluster name taken from the request path; must equal {@link #GLOBAL_CLUSTER}
     * @param str3 the namespace name taken from the request path
     * @param list the new replication cluster ids
     * @throws RestException 412 or 403 on validation failure, 404 when the namespace is missing,
     *                       409 on a version conflict, or wrapping other failures
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 412, message = "Namespace is not global or invalid cluster ids")})
    @Path("/{property}/{cluster}/{namespace}/replication")
    @ApiOperation("Set the replication clusters for a namespace.")
    @POST
    public void setNamespaceReplicationClusters(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, List<String> list) {
        validateAdminAccessOnProperty(str);
        validatePoliciesReadOnlyAccess();
        if (!str2.equals(GLOBAL_CLUSTER)) {
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Cannot set replication on a non-global namespace");
        }
        if (list.contains(GLOBAL_CLUSTER)) {
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Cannot specify global in the list of replication clusters");
        }

        // First pass: every id must name a known cluster.
        Set<String> knownClusters = clusters();
        for (String clusterId : list) {
            if (knownClusters.contains(clusterId)) {
                continue;
            }
            throw new RestException(Response.Status.FORBIDDEN, "Invalid cluster id: " + clusterId);
        }
        // Second pass, only after the first succeeded for all ids: check each against the property.
        for (String clusterId : list) {
            validateClusterForProperty(str, clusterId);
        }

        NamespaceName namespaceName = new NamespaceName(str, str2, str3);
        // Declared outside the try because the version-conflict handler reads it.
        Map.Entry<Policies, Stat> policiesNode = null;
        try {
            policiesNode = (Map.Entry<Policies, Stat>) policiesCache().getWithStat(path("policies", str, str2, str3)).orElseThrow(
                    () -> new RestException(Response.Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
            policiesNode.getKey().replication_clusters = list;
            byte[] serialized = jsonMapper().writeValueAsBytes(policiesNode.getKey());
            globalZk().setData(path("policies", str, str2, str3), serialized, policiesNode.getValue().getVersion());
            policiesCache().invalidate(path("policies", str, str2, str3));
            log.info("[{}] Successfully updated the replication clusters on namespace {}/{}/{}", clientAppId(), str, str2, str3);
        } catch (KeeperException.BadVersionException conflict) {
            log.warn("[{}] Failed to update the replication clusters on namespace {}/{}/{} expected policy node version={} : concurrent modification", clientAppId(), str, str2, str3, policiesNode.getValue().getVersion());
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        } catch (KeeperException.NoNodeException missing) {
            log.warn("[{}] Failed to update the replication clusters for namespace {}/{}/{}: does not exist", clientAppId(), str, str2, str3);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (Exception failure) {
            log.error("[{}] Failed to update the replication clusters on namespace {}/{}/{}", clientAppId(), str, str2, str3, failure);
            throw new RestException(failure);
        }
    }

    /**
     * Returns the message TTL, in seconds, stored in a namespace's policies.
     *
     * @param str  the property name taken from the request path
     * @param str2 the cluster name taken from the request path
     * @param str3 the namespace name taken from the request path
     * @return the {@code message_ttl_in_seconds} value of the namespace policies
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist")})
    @Path("/{property}/{cluster}/{namespace}/messageTTL")
    @ApiOperation("Get the message TTL for the namespace")
    public int getNamespaceMessageTTL(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3) {
        validateAdminAccessOnProperty(str);
        Policies namespacePolicies = getNamespacePolicies(str, str2, str3);
        return namespacePolicies.message_ttl_in_seconds;
    }

    /**
     * Stores a new message TTL, in seconds, in a namespace's policies using a versioned write.
     *
     * @param str  the property name taken from the request path
     * @param str2 the cluster name taken from the request path
     * @param str3 the namespace name taken from the request path
     * @param i    the TTL in seconds; negative values are rejected
     * @throws RestException 412 for a negative TTL, 404 when the namespace is missing,
     *                       409 on a version conflict, or wrapping other failures
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 412, message = "Invalid TTL")})
    @Path("/{property}/{cluster}/{namespace}/messageTTL")
    @ApiOperation("Set message TTL in seconds for namespace")
    @POST
    public void setNamespaceMessageTTL(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, int i) {
        validateAdminAccessOnProperty(str);
        validatePoliciesReadOnlyAccess();
        if (i < 0) {
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Invalid value for message TTL");
        }

        NamespaceName namespaceName = new NamespaceName(str, str2, str3);
        // Declared outside the try because the version-conflict handler reads it.
        Map.Entry<Policies, Stat> policiesNode = null;
        try {
            policiesNode = (Map.Entry<Policies, Stat>) policiesCache().getWithStat(path("policies", str, str2, str3)).orElseThrow(
                    () -> new RestException(Response.Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
            policiesNode.getKey().message_ttl_in_seconds = i;
            byte[] serialized = jsonMapper().writeValueAsBytes(policiesNode.getKey());
            globalZk().setData(path("policies", str, str2, str3), serialized, policiesNode.getValue().getVersion());
            policiesCache().invalidate(path("policies", str, str2, str3));
            log.info("[{}] Successfully updated the message TTL on namespace {}/{}/{}", clientAppId(), str, str2, str3);
        } catch (KeeperException.BadVersionException conflict) {
            log.warn("[{}] Failed to update the message TTL on namespace {}/{}/{} expected policy node version={} : concurrent modification", clientAppId(), str, str2, str3, policiesNode.getValue().getVersion());
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        } catch (KeeperException.NoNodeException missing) {
            log.warn("[{}] Failed to update the message TTL for namespace {}/{}/{}: does not exist", clientAppId(), str, str2, str3);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (Exception failure) {
            log.error("[{}] Failed to update the message TTL on namespace {}/{}/{}", clientAppId(), str, str2, str3, failure);
            throw new RestException(failure);
        }
    }

    /**
     * Returns the bundle split data stored in a namespace's policies.
     *
     * @param str  the property name taken from the request path
     * @param str2 the cluster name taken from the request path
     * @param str3 the namespace name taken from the request path
     * @return the {@code bundles} value of the namespace policies
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 412, message = "Namespace is not setup to split in bundles")})
    @Path("/{property}/{cluster}/{namespace}/bundles")
    @ApiOperation("Get the bundles split data.")
    public BundlesData getBundlesData(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3) {
        validateAdminAccessOnProperty(str);
        // NOTE(review): this read-only endpoint also runs the policies read-only check - confirm that is intended.
        validatePoliciesReadOnlyAccess();
        Policies namespacePolicies = getNamespacePolicies(str, str2, str3);
        return namespacePolicies.bundles;
    }

    /**
     * Normalizes and validates explicit bundle boundaries supplied by a client.
     *
     * <p>Each boundary is decoded as a number and re-rendered as {@code 0x%08x}; duplicates collapse and
     * the result is sorted. The set must then pass {@code NamespaceBundleFactory.validateFullRange}.
     *
     * @param bundlesData the client-supplied bundle data; its boundaries must be non-empty
     * @return a new {@link BundlesData} holding the normalized, sorted boundaries
     * @throws RestException 400 when a boundary is not a decodable number or the boundaries do not
     *                       cover the full hash range
     */
    private BundlesData validateBundlesData(BundlesData bundlesData) {
        TreeSet<String> partitions = new TreeSet<>();
        for (String boundary : bundlesData.getBoundaries()) {
            long value;
            try {
                value = Long.decode(boundary);
            } catch (NumberFormatException e) {
                // Report malformed input as a bad request instead of leaking an IllegalArgumentException
                // subtype that callers would mistake for an invalid namespace name.
                throw new RestException(Response.Status.BAD_REQUEST, "Invalid bundle boundary: " + boundary);
            }
            partitions.add(String.format("0x%08x", value));
        }
        if (partitions.size() != bundlesData.getBoundaries().size()) {
            log.debug("Input bundles included repeated partition points. Ignored.");
        }
        try {
            NamespaceBundleFactory.validateFullRange(partitions);
        } catch (IllegalArgumentException e) {
            throw new RestException(Response.Status.BAD_REQUEST, "Input bundles do not cover the whole hash range. first:" + partitions.first() + ", last:" + partitions.last());
        }
        List<String> boundaries = Lists.newArrayList();
        boundaries.addAll(partitions);
        return new BundlesData(boundaries);
    }

    /**
     * Builds evenly spaced bundle boundaries for the requested number of bundles.
     *
     * <p>The result starts at {@code 0x00000000}, has one boundary per multiple of
     * {@code MAX_BUNDLES / i} for the inner splits, and ends at {@code MAX_BUNDLES - 1}.
     * It therefore contains {@code i + 1} boundaries.
     *
     * @param i number of bundles to create; must be positive and not exceed {@code MAX_BUNDLES}
     * @return bundle data holding the generated boundaries, each formatted as {@code 0x%08x}
     * @throws RestException with {@code BAD_REQUEST} when {@code i} is out of range
     */
    private BundlesData getBundles(int i) {
        if (i <= 0 || i > MAX_BUNDLES) {
            throw new RestException(Response.Status.BAD_REQUEST, "Invalid number of bundles. Number of bundles has to be in the range of (0, 2^32].");
        }
        // Primitive arithmetic: no need to box a Long on every iteration.
        final long rangeSize = MAX_BUNDLES;
        final long segmentSize = rangeSize / i;
        List<String> boundaries = new ArrayList<>();
        boundaries.add(String.format("0x%08x", 0L));
        for (int split = 1; split < i; split++) {
            boundaries.add(String.format("0x%08x", segmentSize * split));
        }
        // The last boundary is pinned to the end of the range so integer-division remainders are absorbed.
        boundaries.add(String.format("0x%08x", rangeSize - 1));
        return new BundlesData(boundaries);
    }

    /**
     * Unloads every bundle of a namespace through the admin client.
     *
     * <p>After the super-user check (and, for non-global clusters, the cluster ownership and
     * cluster-for-property checks), walks adjacent pairs of the namespace's bundle boundaries
     * and requests an unload of each bundle, named {@code <lower>_<upper>}.
     *
     * @param str  property (path parameter {@code property})
     * @param str2 cluster (path parameter {@code cluster})
     * @param str3 namespace (path parameter {@code namespace})
     * @throws RestException wrapping the first {@code PulsarServerException} or
     *         {@code PulsarAdminException} raised while unloading a bundle
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 412, message = "Namespace is already unloaded or Namespace has bundles activated")})
    @Path("/{property}/{cluster}/{namespace}/unload")
    @ApiOperation(value = "Unload namespace", notes = "Unload an active namespace from the current broker serving it. Performing this operation will let the brokerremoves all producers, consumers, and connections using this namespace, and close all destinations (includingtheir persistent store). During that operation, the namespace is marked as tentatively unavailable until thebroker completes the unloading action. This operation requires strictly super user privileges, since it wouldresult in non-persistent message loss and unexpected connection closure to the clients.")
    @PUT
    public void unloadNamespace(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3) {
        log.info("[{}] Unloading namespace {}/{}/{}", clientAppId(), str, str2, str3);
        validateSuperUserAccess();
        boolean globalNamespace = str2.equals(GLOBAL_CLUSTER);
        if (!globalNamespace) {
            validateClusterOwnership(str2);
            validateClusterForProperty(str, str2);
        }
        Policies namespacePolicies = getNamespacePolicies(str, str2, str3);
        NamespaceName nsName = new NamespaceName(str, str2, str3);
        List<?> boundaries = namespacePolicies.bundles.getBoundaries();
        int bundleCount = boundaries.size() - 1;
        int idx = 0;
        while (idx < bundleCount) {
            // A bundle is identified by its lower and upper boundary joined with '_'.
            String bundleRange = String.format("%s_%s", boundaries.get(idx), boundaries.get(idx + 1));
            try {
                pulsar().getAdminClient().namespaces().unloadNamespaceBundle(nsName.toString(), bundleRange);
            } catch (PulsarServerException | PulsarAdminException e) {
                log.error(String.format("[%s] Failed to unload namespace %s/%s/%s", clientAppId(), str, str2, str3), e);
                throw new RestException((Throwable) e);
            }
            idx++;
        }
        log.info("[{}] Successfully unloaded all the bundles in namespace {}/{}/{}", clientAppId(), str, str2, str3);
    }

    /**
     * Unloads a single namespace bundle via the namespace service.
     *
     * <p>Runs the super-user check, loads the namespace policies, applies the cluster checks
     * for non-global clusters, runs the policies read-only check and the bundle ownership
     * validation, and then unloads the validated bundle.
     *
     * @param str  property (path parameter {@code property})
     * @param str2 cluster (path parameter {@code cluster})
     * @param str3 namespace (path parameter {@code namespace})
     * @param str4 bundle range (path parameter {@code bundle})
     * @param z    {@code authoritative} query flag, forwarded to the ownership validation
     * @throws RestException wrapping any exception raised by the unload call
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission")})
    @Path("/{property}/{cluster}/{namespace}/{bundle}/unload")
    @ApiOperation("Unload a namespace bundle")
    @PUT
    public void unloadNamespaceBundle(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("bundle") String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        log.info("[{}] Unloading namespace bundle {}/{}/{}/{}", clientAppId(), str, str2, str3, str4);
        validateSuperUserAccess();
        Policies namespacePolicies = getNamespacePolicies(str, str2, str3);
        boolean globalNamespace = str2.equals(GLOBAL_CLUSTER);
        if (!globalNamespace) {
            validateClusterOwnership(str2);
            validateClusterForProperty(str, str2);
        }
        NamespaceName nsName = new NamespaceName(str, str2, str3);
        validatePoliciesReadOnlyAccess();
        NamespaceBundle ownedBundle = validateNamespaceBundleOwnership(nsName, namespacePolicies.bundles, str4, z, true);
        try {
            pulsar().getNamespaceService().unloadNamespaceBundle(ownedBundle);
            log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), ownedBundle.toString());
        } catch (Exception e) {
            log.error("[{}] Failed to unload namespace bundle {}/{}", clientAppId(), nsName.toString(), str4, e);
            throw new RestException(e);
        }
    }

    /**
     * Splits a single namespace bundle and waits for the split to finish.
     *
     * <p>Runs the super-user check, loads the namespace policies, applies the cluster checks
     * for non-global clusters, runs the policies read-only check and the bundle ownership
     * validation, and then blocks on the future returned by {@code splitAndOwnBundle}.
     *
     * @param str  property (path parameter {@code property})
     * @param str2 cluster (path parameter {@code cluster})
     * @param str3 namespace (path parameter {@code namespace})
     * @param str4 bundle range (path parameter {@code bundle})
     * @param z    {@code authoritative} query flag, forwarded to the ownership validation
     * @throws RestException wrapping any exception raised by the split
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission")})
    @Path("/{property}/{cluster}/{namespace}/{bundle}/split")
    @ApiOperation("Split a namespace bundle")
    @PUT
    public void splitNamespaceBundle(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("bundle") String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        log.info("[{}] Split namespace bundle {}/{}/{}/{}", clientAppId(), str, str2, str3, str4);
        validateSuperUserAccess();
        Policies namespacePolicies = getNamespacePolicies(str, str2, str3);
        boolean globalNamespace = str2.equals(GLOBAL_CLUSTER);
        if (!globalNamespace) {
            validateClusterOwnership(str2);
            validateClusterForProperty(str, str2);
        }
        NamespaceName nsName = new NamespaceName(str, str2, str3);
        validatePoliciesReadOnlyAccess();
        NamespaceBundle ownedBundle = validateNamespaceBundleOwnership(nsName, namespacePolicies.bundles, str4, z, true);
        try {
            pulsar().getNamespaceService().splitAndOwnBundle(ownedBundle).get();
            log.info("[{}] Successfully split namespace bundle {}", clientAppId(), ownedBundle.toString());
        } catch (Exception e) {
            log.error("[{}] Failed to split namespace bundle {}/{}", clientAppId(), nsName.toString(), str4, e);
            throw new RestException(e);
        }
    }

    /**
     * Returns the backlog quota map stored in the policies of a namespace.
     *
     * @param str  property (path parameter {@code property}); checked for admin access
     * @param str2 cluster (path parameter {@code cluster})
     * @param str3 namespace (path parameter {@code namespace})
     * @return the {@code backlog_quota_map} field of the namespace policies
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist")})
    @Path("/{property}/{cluster}/{namespace}/backlogQuotaMap")
    @ApiOperation("Get backlog quota map on a namespace.")
    public Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3) {
        validateAdminAccessOnProperty(str);
        Policies namespacePolicies = getNamespacePolicies(str, str2, str3);
        return namespacePolicies.backlog_quota_map;
    }

    /**
     * Stores a backlog quota of the given type in the namespace policies.
     *
     * <p>The policies node is read from global ZooKeeper together with its version, updated
     * in memory and written back conditionally on that version; the policies cache entry is
     * then invalidated. When the namespace has retention policies, the new quota is first
     * checked against them with {@code checkQuotas}.
     *
     * @param str              property (path parameter {@code property})
     * @param str2             cluster (path parameter {@code cluster})
     * @param str3             namespace (path parameter {@code namespace})
     * @param backlogQuotaType quota type; {@code destination_storage} is used when {@code null}
     * @param backlogQuota     quota to store
     * @throws RestException {@code PRECONDITION_FAILED} when the quota conflicts with retention,
     *         {@code CONFLICT} on a version mismatch, {@code NOT_FOUND} when the policies node
     *         is missing, or wrapping any other failure
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist"), @ApiResponse(code = 409, message = "Concurrent modification"), @ApiResponse(code = 412, message = "Specified backlog quota exceeds retention quota. Increase retention quota and retry request")})
    @Path("/{property}/{cluster}/{namespace}/backlogQuota")
    @ApiOperation(" Set a backlog quota for all the destinations on a namespace.")
    @POST
    public void setBacklogQuota(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @QueryParam("backlogQuotaType") BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
        validateAdminAccessOnProperty(str);
        validatePoliciesReadOnlyAccess();
        BacklogQuota.BacklogQuotaType quotaType = backlogQuotaType != null ? backlogQuotaType : BacklogQuota.BacklogQuotaType.destination_storage;
        try {
            Stat nodeStat = new Stat();
            String policiesPath = path("policies", str, str2, str3);
            byte[] content = globalZk().getData(policiesPath, (Watcher) null, nodeStat);
            Policies current = (Policies) jsonMapper().readValue(content, Policies.class);
            RetentionPolicies retention = current.retention_policies;
            if (retention != null) {
                // Check the new quota in isolation against the existing retention settings.
                Policies candidate = new Policies();
                candidate.backlog_quota_map.put(quotaType, backlogQuota);
                boolean fitsRetention = checkQuotas(candidate, retention);
                if (!fitsRetention) {
                    log.warn("[{}] Failed to update backlog configuration for namespace {}/{}/{}: conflicts with retention quota", clientAppId(), str, str2, str3);
                    throw new RestException(Response.Status.PRECONDITION_FAILED, "Backlog Quota exceeds configured retention quota for namespace. Please increase retention quota and retry");
                }
            }
            current.backlog_quota_map.put(quotaType, backlogQuota);
            byte[] updated = jsonMapper().writeValueAsBytes(current);
            // Conditional write: fails with BadVersionException if the node changed since the read.
            globalZk().setData(policiesPath, updated, nodeStat.getVersion());
            policiesCache().invalidate(policiesPath);
            log.info("[{}] Successfully updated backlog quota map: namespace={}/{}/{}, map={}", clientAppId(), str, str2, str3, jsonMapper().writeValueAsString(current.backlog_quota_map));
        } catch (RestException e) {
            throw e;
        } catch (KeeperException.BadVersionException unused) {
            log.warn("[{}] Failed to update backlog quota map for namespace {}/{}/{}: concurrent modification", clientAppId(), str, str2, str3);
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        } catch (KeeperException.NoNodeException unused2) {
            log.warn("[{}] Failed to update backlog quota map for namespace {}/{}/{}: does not exist", clientAppId(), str, str2, str3);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (Exception e2) {
            log.error("[{}] Failed to update backlog quota map for namespace {}/{}/{}", clientAppId(), str, str2, str3, e2);
            throw new RestException(e2);
        }
    }

    /**
     * Removes the backlog quota of the given type from the namespace policies.
     *
     * <p>The policies node is read from global ZooKeeper together with its version, the entry
     * is removed, and the node is written back conditionally on that version; the policies
     * cache entry is then invalidated.
     *
     * @param str              property (path parameter {@code property})
     * @param str2             cluster (path parameter {@code cluster})
     * @param str3             namespace (path parameter {@code namespace})
     * @param backlogQuotaType quota type; {@code destination_storage} is used when {@code null}
     * @throws RestException {@code CONFLICT} on a version mismatch, {@code NOT_FOUND} when the
     *         policies node is missing, or wrapping any other failure
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{property}/{cluster}/{namespace}/backlogQuota")
    @DELETE
    @ApiOperation("Remove a backlog quota policy from a namespace.")
    public void removeBacklogQuota(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @QueryParam("backlogQuotaType") BacklogQuota.BacklogQuotaType backlogQuotaType) {
        validateAdminAccessOnProperty(str);
        validatePoliciesReadOnlyAccess();
        BacklogQuota.BacklogQuotaType quotaType = backlogQuotaType != null ? backlogQuotaType : BacklogQuota.BacklogQuotaType.destination_storage;
        try {
            Stat nodeStat = new Stat();
            String policiesPath = path("policies", str, str2, str3);
            byte[] content = globalZk().getData(policiesPath, (Watcher) null, nodeStat);
            Policies current = (Policies) jsonMapper().readValue(content, Policies.class);
            current.backlog_quota_map.remove(quotaType);
            byte[] updated = jsonMapper().writeValueAsBytes(current);
            // Conditional write: fails with BadVersionException if the node changed since the read.
            globalZk().setData(policiesPath, updated, nodeStat.getVersion());
            policiesCache().invalidate(policiesPath);
            log.info("[{}] Successfully removed backlog namespace={}/{}/{}, quota={}", clientAppId(), str, str2, str3, quotaType);
        } catch (KeeperException.BadVersionException unused) {
            log.warn("[{}] Failed to update backlog quota map for namespace {}/{}/{}: concurrent modification", clientAppId(), str, str2, str3);
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        } catch (KeeperException.NoNodeException unused2) {
            log.warn("[{}] Failed to update backlog quota map for namespace {}/{}/{}: does not exist", clientAppId(), str, str2, str3);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (Exception e) {
            log.error("[{}] Failed to update backlog quota map for namespace {}/{}/{}", clientAppId(), str, str2, str3, e);
            throw new RestException(e);
        }
    }

    /**
     * Returns the retention configuration of a namespace.
     *
     * @param str  property (path parameter {@code property}); checked for admin access
     * @param str2 cluster (path parameter {@code cluster})
     * @param str3 namespace (path parameter {@code namespace})
     * @return the namespace's {@code retention_policies}; when that is {@code null}, a new
     *         {@code RetentionPolicies} built from the configured default retention time
     *         (minutes) and size (MB)
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist")})
    @Path("/{property}/{cluster}/{namespace}/retention")
    @ApiOperation("Get retention config on a namespace.")
    public RetentionPolicies getRetention(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3) {
        validateAdminAccessOnProperty(str);
        Policies namespacePolicies = getNamespacePolicies(str, str2, str3);
        if (namespacePolicies.retention_policies != null) {
            return namespacePolicies.retention_policies;
        }
        // Nothing configured on the namespace: fall back to the configured defaults.
        return new RetentionPolicies(config().getDefaultRetentionTimeInMinutes(), config().getDefaultRetentionSizeInMB());
    }

    /**
     * Stores the retention configuration in the namespace policies.
     *
     * <p>The policies node is read from global ZooKeeper together with its version, the new
     * retention settings are checked against the existing backlog quotas with
     * {@code checkQuotas}, and the node is written back conditionally on that version; the
     * policies cache entry is then invalidated.
     *
     * @param str               property (path parameter {@code property}); checked for admin access
     * @param str2              cluster (path parameter {@code cluster})
     * @param str3              namespace (path parameter {@code namespace})
     * @param retentionPolicies retention settings to store
     * @throws RestException {@code PRECONDITION_FAILED} when the retention conflicts with the
     *         backlog quota, {@code CONFLICT} on a version mismatch, {@code NOT_FOUND} when
     *         the policies node is missing, or wrapping any other failure
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist"), @ApiResponse(code = 409, message = "Concurrent modification"), @ApiResponse(code = 412, message = "Retention Quota must exceed backlog quota")})
    @Path("/{property}/{cluster}/{namespace}/retention")
    @ApiOperation(" Set retention configuration on a namespace.")
    @POST
    public void setRetention(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, RetentionPolicies retentionPolicies) {
        // This endpoint documents a 403 for missing admin permission, so enforce the check
        // before touching the policies.
        validateAdminAccessOnProperty(str);
        validatePoliciesReadOnlyAccess();
        try {
            Stat stat = new Stat();
            String path = path("policies", str, str2, str3);
            Policies policies = (Policies) jsonMapper().readValue(globalZk().getData(path, (Watcher) null, stat), Policies.class);
            if (!checkQuotas(policies, retentionPolicies)) {
                log.warn("[{}] Failed to update retention configuration for namespace {}/{}/{}: conflicts with backlog quota", clientAppId(), str, str2, str3);
                throw new RestException(Response.Status.PRECONDITION_FAILED, "Retention Quota must exceed configured backlog quota for namespace.");
            }
            policies.retention_policies = retentionPolicies;
            // Conditional write: fails with BadVersionException if the node changed since the read.
            globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), stat.getVersion());
            policiesCache().invalidate(path("policies", str, str2, str3));
            log.info("[{}] Successfully updated retention configuration: namespace={}/{}/{}, map={}", clientAppId(), str, str2, str3, jsonMapper().writeValueAsString(policies.retention_policies));
        } catch (RestException e) {
            // Must precede the generic handler so the 412 above reaches the client unchanged.
            throw e;
        } catch (KeeperException.BadVersionException unused) {
            log.warn("[{}] Failed to update retention configuration for namespace {}/{}/{}: concurrent modification", clientAppId(), str, str2, str3);
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        } catch (KeeperException.NoNodeException unused2) {
            log.warn("[{}] Failed to update retention configuration for namespace {}/{}/{}: does not exist", clientAppId(), str, str2, str3);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (Exception e2) {
            log.error("[{}] Failed to update retention configuration for namespace {}/{}/{}", clientAppId(), str, str2, str3, e2);
            throw new RestException(e2);
        }
    }

    /**
     * Checks that a retention size is strictly larger than the applicable backlog quota limit.
     *
     * <p>The check passes trivially when the policies carry no backlog quotas or when the
     * retention size is {@code 0}. Otherwise the {@code destination_storage} quota is used,
     * falling back to the broker's default quota when the map has no such entry.
     *
     * @param policies          policies whose {@code backlog_quota_map} is consulted
     * @param retentionPolicies retention settings providing the size in MB
     * @return {@code true} when the quota limit is below the retention size in bytes, or when
     *         the check is skipped; {@code false} otherwise
     */
    private boolean checkQuotas(Policies policies, RetentionPolicies retentionPolicies) {
        Map<BacklogQuota.BacklogQuotaType, BacklogQuota> quotaMap = policies.backlog_quota_map;
        if (quotaMap.isEmpty() || retentionPolicies.getRetentionSizeInMB() == 0) {
            return true;
        }
        BacklogQuota backlogQuota = quotaMap.get(BacklogQuota.BacklogQuotaType.destination_storage);
        if (backlogQuota == null) {
            backlogQuota = pulsar().getBrokerService().getBacklogQuotaManager().getDefaultQuota();
        }
        // Widen to long before multiplying: in int arithmetic, sizes of 2048 MB or more
        // overflow and make the comparison wrong.
        long retentionSizeInBytes = (long) retentionPolicies.getRetentionSizeInMB() * 1024L * 1024L;
        return backlogQuota.getLimit() < retentionSizeInBytes;
    }

    /**
     * Stores the persistence configuration in the namespace policies.
     *
     * <p>The policies node is read from global ZooKeeper together with its version, the
     * {@code persistence} field is replaced, and the node is written back conditionally on
     * that version; the policies cache entry is then invalidated.
     *
     * @param str                 property (path parameter {@code property}); checked for admin access
     * @param str2                cluster (path parameter {@code cluster})
     * @param str3                namespace (path parameter {@code namespace})
     * @param persistencePolicies persistence settings to store
     * @throws RestException {@code CONFLICT} on a version mismatch, {@code NOT_FOUND} when the
     *         policies node is missing, or wrapping any other failure
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{property}/{cluster}/{namespace}/persistence")
    @ApiOperation("Set the persistence configuration for all the destinations on a namespace.")
    @POST
    public void setPersistence(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, PersistencePolicies persistencePolicies) {
        // This endpoint documents a 403 for missing admin permission, so enforce the check
        // before touching the policies.
        validateAdminAccessOnProperty(str);
        validatePoliciesReadOnlyAccess();
        try {
            Stat stat = new Stat();
            String path = path("policies", str, str2, str3);
            Policies policies = (Policies) jsonMapper().readValue(globalZk().getData(path, (Watcher) null, stat), Policies.class);
            policies.persistence = persistencePolicies;
            // Conditional write: fails with BadVersionException if the node changed since the read.
            globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), stat.getVersion());
            policiesCache().invalidate(path("policies", str, str2, str3));
            log.info("[{}] Successfully updated persistence configuration: namespace={}/{}/{}, map={}", clientAppId(), str, str2, str3, jsonMapper().writeValueAsString(policies.persistence));
        } catch (KeeperException.BadVersionException unused) {
            log.warn("[{}] Failed to update persistence configuration for namespace {}/{}/{}: concurrent modification", clientAppId(), str, str2, str3);
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        } catch (KeeperException.NoNodeException unused2) {
            log.warn("[{}] Failed to update persistence configuration for namespace {}/{}/{}: does not exist", clientAppId(), str, str2, str3);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (Exception e) {
            log.error("[{}] Failed to update persistence configuration for namespace {}/{}/{}", clientAppId(), str, str2, str3, e);
            throw new RestException(e);
        }
    }

    /**
     * Returns the persistence configuration of a namespace.
     *
     * @param str  property (path parameter {@code property}); checked for admin access
     * @param str2 cluster (path parameter {@code cluster})
     * @param str3 namespace (path parameter {@code namespace})
     * @return the namespace's {@code persistence} policies; when that is {@code null}, a new
     *         {@code PersistencePolicies} built from the configured managed-ledger default
     *         ensemble size, write quorum and ack quorum, with {@code 0.0} as the last argument
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{property}/{cluster}/{namespace}/persistence")
    @ApiOperation("Get the persistence configuration for a namespace.")
    public PersistencePolicies getPersistence(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3) {
        validateAdminAccessOnProperty(str);
        Policies namespacePolicies = getNamespacePolicies(str, str2, str3);
        if (namespacePolicies.persistence != null) {
            return namespacePolicies.persistence;
        }
        // Nothing configured on the namespace: fall back to the configured defaults.
        return new PersistencePolicies(config().getManagedLedgerDefaultEnsembleSize(), config().getManagedLedgerDefaultWriteQuorum(), config().getManagedLedgerDefaultAckQuorum(), 0.0d);
    }

    /**
     * Clears the backlog on every owned bundle of a namespace.
     *
     * <p>For each bundle of the namespace that has an owner, the admin client is asked to
     * clear that bundle's backlog. All bundles are attempted even when some fail; the first
     * failure is reported to the caller once the loop has finished.
     *
     * @param str  property (path parameter {@code property}); checked for admin access
     * @param str2 cluster (path parameter {@code cluster})
     * @param str3 namespace (path parameter {@code namespace})
     * @param z    {@code authoritative} query flag (not read by this method)
     * @throws RestException built from the first per-bundle failure, or wrapping any other failure
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist")})
    @Path("/{property}/{cluster}/{namespace}/clearBacklog")
    @ApiOperation("Clear backlog for all destinations on a namespace.")
    @POST
    public void clearNamespaceBacklog(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        validateAdminAccessOnProperty(str);
        NamespaceName namespaceName = new NamespaceName(str, str2, str3);
        try {
            Exception firstFailure = null;
            for (NamespaceBundle namespaceBundle : pulsar().getNamespaceService().getNamespaceBundleFactory().getBundles(namespaceName).getBundles()) {
                try {
                    // Bundles without an owner are skipped.
                    if (pulsar().getNamespaceService().getOwner(namespaceBundle).isPresent()) {
                        pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklog(namespaceName.toString(), namespaceBundle.getBundleRange());
                    }
                } catch (Exception e) {
                    // Keep going so the remaining bundles are still attempted; log every
                    // failure because only the first one is reported to the caller.
                    log.warn("[{}] Failed to clear backlog on bundle {} of namespace {}", clientAppId(), namespaceBundle.getBundleRange(), namespaceName.toString(), e);
                    if (firstFailure == null) {
                        firstFailure = e;
                    }
                }
            }
            if (firstFailure != null) {
                if (firstFailure instanceof PulsarAdminException) {
                    throw new RestException((PulsarAdminException) firstFailure);
                }
                // getCause() may be null; fall back to the failure itself so it is not lost.
                Throwable cause = firstFailure.getCause();
                throw new RestException(cause != null ? cause : firstFailure);
            }
            log.info("[{}] Successfully cleared backlog on all the bundles for namespace {}", clientAppId(), namespaceName.toString());
        } catch (WebApplicationException e2) {
            // Must precede the generic handler so REST errors raised above pass through unchanged.
            throw e2;
        } catch (Exception e3) {
            throw new RestException(e3);
        }
    }

    /**
     * Clears the backlog of every subscription on one namespace bundle.
     *
     * <p>Runs the admin-access check, loads the namespace policies, applies the cluster
     * checks for non-global clusters and the bundle ownership validation, then delegates to
     * {@code clearBacklog} with a {@code null} subscription.
     *
     * @param str  property (path parameter {@code property})
     * @param str2 cluster (path parameter {@code cluster})
     * @param str3 namespace (path parameter {@code namespace})
     * @param str4 bundle range (path parameter {@code bundle})
     * @param z    {@code authoritative} query flag, forwarded to the ownership validation
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist")})
    @Path("/{property}/{cluster}/{namespace}/{bundle}/clearBacklog")
    @ApiOperation("Clear backlog for all destinations on a namespace bundle.")
    @POST
    public void clearNamespaceBundleBacklog(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("bundle") String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        validateAdminAccessOnProperty(str);
        Policies namespacePolicies = getNamespacePolicies(str, str2, str3);
        boolean globalNamespace = str2.equals(GLOBAL_CLUSTER);
        if (!globalNamespace) {
            validateClusterOwnership(str2);
            validateClusterForProperty(str, str2);
        }
        NamespaceName nsName = new NamespaceName(str, str2, str3);
        validateNamespaceBundleOwnership(nsName, namespacePolicies.bundles, str4, z, true);
        clearBacklog(nsName, str4, null);
        log.info("[{}] Successfully cleared backlog on namespace bundle {}/{}", clientAppId(), nsName.toString(), str4);
    }

    /**
     * Clears the backlog of one subscription on every owned bundle of a namespace.
     *
     * <p>For each bundle of the namespace that has an owner, the admin client is asked to
     * clear the subscription's backlog on that bundle. All bundles are attempted even when
     * some fail; the first failure is reported to the caller once the loop has finished.
     *
     * @param str  property (path parameter {@code property}); checked for admin access
     * @param str2 cluster (path parameter {@code cluster})
     * @param str3 namespace (path parameter {@code namespace})
     * @param str4 subscription name (path parameter {@code subscription})
     * @param z    {@code authoritative} query flag (not read by this method)
     * @throws RestException built from the first per-bundle failure, or wrapping any other failure
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist")})
    @Path("/{property}/{cluster}/{namespace}/clearBacklog/{subscription}")
    @ApiOperation("Clear backlog for a given subscription on all destinations on a namespace.")
    @POST
    public void clearNamespaceBacklogForSubscription(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("subscription") String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        validateAdminAccessOnProperty(str);
        NamespaceName namespaceName = new NamespaceName(str, str2, str3);
        try {
            Exception firstFailure = null;
            for (NamespaceBundle namespaceBundle : pulsar().getNamespaceService().getNamespaceBundleFactory().getBundles(namespaceName).getBundles()) {
                try {
                    // Bundles without an owner are skipped.
                    if (pulsar().getNamespaceService().getOwner(namespaceBundle).isPresent()) {
                        pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklogForSubscription(namespaceName.toString(), namespaceBundle.getBundleRange(), str4);
                    }
                } catch (Exception e) {
                    // Keep going so the remaining bundles are still attempted; log every
                    // failure because only the first one is reported to the caller.
                    log.warn("[{}] Failed to clear backlog for subscription {} on bundle {} of namespace {}", clientAppId(), str4, namespaceBundle.getBundleRange(), namespaceName.toString(), e);
                    if (firstFailure == null) {
                        firstFailure = e;
                    }
                }
            }
            if (firstFailure != null) {
                if (firstFailure instanceof PulsarAdminException) {
                    throw new RestException((PulsarAdminException) firstFailure);
                }
                // getCause() may be null; fall back to the failure itself so it is not lost.
                Throwable cause = firstFailure.getCause();
                throw new RestException(cause != null ? cause : firstFailure);
            }
            log.info("[{}] Successfully cleared backlog for subscription {} on all the bundles for namespace {}", clientAppId(), str4, namespaceName.toString());
        } catch (WebApplicationException e2) {
            throw e2;
        } catch (Exception e3) {
            throw new RestException(e3);
        }
    }

    /**
     * Clears the backlog of one subscription on one namespace bundle.
     *
     * <p>Runs the admin-access check, loads the namespace policies, applies the cluster
     * checks for non-global clusters and the bundle ownership validation, then delegates to
     * {@code clearBacklog} with the given subscription.
     *
     * @param str  property (path parameter {@code property})
     * @param str2 cluster (path parameter {@code cluster})
     * @param str3 namespace (path parameter {@code namespace})
     * @param str4 subscription name (path parameter {@code subscription})
     * @param str5 bundle range (path parameter {@code bundle})
     * @param z    {@code authoritative} query flag, forwarded to the ownership validation
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist")})
    @Path("/{property}/{cluster}/{namespace}/{bundle}/clearBacklog/{subscription}")
    @ApiOperation("Clear backlog for a given subscription on all destinations on a namespace bundle.")
    @POST
    public void clearNamespaceBundleBacklogForSubscription(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("subscription") String str4, @PathParam("bundle") String str5, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        validateAdminAccessOnProperty(str);
        Policies namespacePolicies = getNamespacePolicies(str, str2, str3);
        boolean globalNamespace = str2.equals(GLOBAL_CLUSTER);
        if (!globalNamespace) {
            validateClusterOwnership(str2);
            validateClusterForProperty(str, str2);
        }
        NamespaceName nsName = new NamespaceName(str, str2, str3);
        validateNamespaceBundleOwnership(nsName, namespacePolicies.bundles, str5, z, true);
        clearBacklog(nsName, str5, str4);
        log.info("[{}] Successfully cleared backlog for subscription {} on namespace bundle {}/{}", clientAppId(), str4, nsName.toString(), str5);
    }

    /**
     * Unsubscribes one subscription on every owned bundle of a namespace.
     *
     * <p>For each bundle of the namespace that has an owner, the admin client is asked to
     * unsubscribe the subscription on that bundle. All bundles are attempted even when some
     * fail; the first failure is reported to the caller once the loop has finished.
     *
     * @param str  property (path parameter {@code property}); checked for admin access
     * @param str2 cluster (path parameter {@code cluster})
     * @param str3 namespace (path parameter {@code namespace})
     * @param str4 subscription name (path parameter {@code subscription})
     * @param z    {@code authoritative} query flag (not read by this method)
     * @throws RestException built from the first per-bundle failure, or wrapping any other failure
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist")})
    @Path("/{property}/{cluster}/{namespace}/unsubscribe/{subscription}")
    @ApiOperation("Unsubscribes the given subscription on all destinations on a namespace.")
    @POST
    public void unsubscribeNamespace(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("subscription") String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        validateAdminAccessOnProperty(str);
        NamespaceName namespaceName = new NamespaceName(str, str2, str3);
        try {
            Exception firstFailure = null;
            for (NamespaceBundle namespaceBundle : pulsar().getNamespaceService().getNamespaceBundleFactory().getBundles(namespaceName).getBundles()) {
                try {
                    // Bundles without an owner are skipped.
                    if (pulsar().getNamespaceService().getOwner(namespaceBundle).isPresent()) {
                        pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundle(namespaceName.toString(), namespaceBundle.getBundleRange(), str4);
                    }
                } catch (Exception e) {
                    // Keep going so the remaining bundles are still attempted; log every
                    // failure because only the first one is reported to the caller.
                    log.warn("[{}] Failed to unsubscribe {} on bundle {} of namespace {}", clientAppId(), str4, namespaceBundle.getBundleRange(), namespaceName.toString(), e);
                    if (firstFailure == null) {
                        firstFailure = e;
                    }
                }
            }
            if (firstFailure != null) {
                if (firstFailure instanceof PulsarAdminException) {
                    throw new RestException((PulsarAdminException) firstFailure);
                }
                // getCause() may be null; fall back to the failure itself so it is not lost.
                Throwable cause = firstFailure.getCause();
                throw new RestException(cause != null ? cause : firstFailure);
            }
            log.info("[{}] Successfully unsubscribed {} on all the bundles for namespace {}", clientAppId(), str4, namespaceName.toString());
        } catch (WebApplicationException e2) {
            throw e2;
        } catch (Exception e3) {
            throw new RestException(e3);
        }
    }

    /**
     * Unsubscribes one subscription on one namespace bundle.
     *
     * <p>Runs the admin-access check, loads the namespace policies, applies the cluster
     * checks for non-global clusters and the bundle ownership validation, then delegates to
     * {@code unsubscribe}.
     *
     * @param str  property (path parameter {@code property})
     * @param str2 cluster (path parameter {@code cluster})
     * @param str3 namespace (path parameter {@code namespace})
     * @param str4 subscription name (path parameter {@code subscription})
     * @param str5 bundle range (path parameter {@code bundle})
     * @param z    {@code authoritative} query flag, forwarded to the ownership validation
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist")})
    @Path("/{property}/{cluster}/{namespace}/{bundle}/unsubscribe/{subscription}")
    @ApiOperation("Unsubscribes the given subscription on all destinations on a namespace bundle.")
    @POST
    public void unsubscribeNamespaceBundle(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("subscription") String str4, @PathParam("bundle") String str5, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        validateAdminAccessOnProperty(str);
        Policies namespacePolicies = getNamespacePolicies(str, str2, str3);
        boolean globalNamespace = str2.equals(GLOBAL_CLUSTER);
        if (!globalNamespace) {
            validateClusterOwnership(str2);
            validateClusterForProperty(str, str2);
        }
        NamespaceName nsName = new NamespaceName(str, str2, str3);
        validateNamespaceBundleOwnership(nsName, namespacePolicies.bundles, str5, z, true);
        unsubscribe(nsName, str5, str4);
        log.info("[{}] Successfully unsubscribed {} on namespace bundle {}/{}", clientAppId(), str4, nsName.toString(), str5);
    }

    /**
     * Clears backlog on all persistent topics of a namespace bundle and waits for completion.
     *
     * <p>With a non-null subscription, only that subscription's backlog is cleared on each
     * topic; a name starting with the configured replicator prefix is first translated with
     * {@code PersistentReplicator.getRemoteCluster}. With a {@code null} subscription, the
     * no-argument {@code clearBacklog()} of each topic is used. Topics that are not
     * {@code PersistentTopic} instances are skipped.
     *
     * @param namespaceName namespace owning the bundle
     * @param str           bundle range
     * @param str2          subscription name, or {@code null} to clear without naming one
     * @throws RestException wrapping any failure
     */
    private void clearBacklog(NamespaceName namespaceName, String str, String str2) {
        // Kept outside the try block so the failure log reports the translated name.
        String subscription = str2;
        try {
            String namespace = namespaceName.toString();
            List<Topic> topics = pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(namespace, namespace + "/" + str);
            ArrayList futures = Lists.newArrayList();
            // Decide the mode before the name is translated below.
            final boolean forSubscription = subscription != null;
            if (forSubscription && subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
                subscription = PersistentReplicator.getRemoteCluster(subscription);
            }
            for (Topic topic : topics) {
                if (!(topic instanceof PersistentTopic)) {
                    continue;
                }
                PersistentTopic persistentTopic = (PersistentTopic) topic;
                if (forSubscription) {
                    futures.add(persistentTopic.clearBacklog(subscription));
                } else {
                    futures.add(persistentTopic.clearBacklog());
                }
            }
            FutureUtil.waitForAll(futures).get();
        } catch (Exception e) {
            log.error("[{}] Failed to clear backlog for namespace {}/{}, subscription: {}", clientAppId(), namespaceName.toString(), str, subscription, e);
            throw new RestException(e);
        }
    }

    /**
     * Deletes a subscription on every topic of a namespace bundle and waits for completion.
     *
     * <p>Topics on which the subscription does not exist are skipped.
     *
     * @param namespaceName namespace owning the bundle
     * @param str           bundle range
     * @param str2          subscription name; must not start with the configured replicator prefix
     * @throws RestException {@code PRECONDITION_FAILED} when the name denotes a replication
     *         cursor or when the failure cause is a {@code SubscriptionBusyException};
     *         otherwise wrapping the failure cause (or the failure itself when it has no cause)
     */
    private void unsubscribe(NamespaceName namespaceName, String str, String str2) {
        try {
            List<Topic> topics = pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(namespaceName.toString(), namespaceName.toString() + "/" + str);
            if (str2.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
                throw new RestException(Response.Status.PRECONDITION_FAILED, "Cannot unsubscribe a replication cursor");
            }
            ArrayList futures = Lists.newArrayList();
            for (Topic topic : topics) {
                Subscription subscription = topic.getSubscription(str2);
                if (subscription != null) {
                    futures.add(subscription.delete());
                }
            }
            FutureUtil.waitForAll(futures).get();
        } catch (RestException e) {
            // Must precede the generic handler so the 412 above reaches the caller unchanged.
            throw e;
        } catch (Exception e2) {
            log.error("[{}] Failed to unsubscribe {} for namespace {}/{}", clientAppId(), str2, namespaceName.toString(), str, e2);
            Throwable cause = e2.getCause();
            if (cause instanceof BrokerServiceException.SubscriptionBusyException) {
                throw new RestException(Response.Status.PRECONDITION_FAILED, "Subscription has active connected consumers");
            }
            // getCause() may be null (e.g. a direct runtime failure); keep the real exception.
            throw new RestException(cause != null ? cause : e2);
        }
    }
}
