/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.admin.v2;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path(value="/persistent")
@Produces(value={"application/json"})
@Api(value="/persistent", description="Persistent topic admin apis", tags={"persistent topic"})
public class PersistentTopics
extends PersistentTopicsBase {
    private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);

    /**
     * Lists the topics under a namespace.
     *
     * <p>The namespace name is validated synchronously. The topic list is then fetched
     * asynchronously and written to {@code asyncResponse}; if that fails, the response is
     * resumed exceptionally.
     *
     * @param asyncResponse suspended response that receives the topic list or the failure
     * @param tenant the tenant
     * @param namespace the namespace
     * @param bundle the bundle name; may be {@code null}, in which case an empty
     *               {@link Optional} is passed on
     */
    @GET
    @Path("/{tenant}/{namespace}")
    @ApiOperation(value = "Get the list of topics under a namespace.", response = String.class,
            responseContainer = "List")
    @ApiResponses({
        @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
        @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"),
        @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
        @ApiResponse(code = 412, message = "Namespace name is not valid"),
        @ApiResponse(code = 500, message = "Internal server error")
    })
    public void getList(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify the bundle name", required = false)
            @QueryParam("bundle") String bundle) {
        validateNamespaceName(tenant, namespace);
        internalGetListAsync(Optional.ofNullable(bundle))
                .thenAccept(asyncResponse::resume)
                .exceptionally(ex -> {
                    // Log only when the helper says this is not a 307/404 style failure.
                    if (isNot307And404Exception(ex)) {
                        log.error("[{}] Failed to get topic list {}", clientAppId(), namespaceName, ex);
                    }
                    resumeAsyncResponseExceptionally(asyncResponse, ex);
                    return null;
                });
    }

    /**
     * Lists the partitioned topics under a namespace.
     *
     * <p>Validates the namespace name and then returns whatever
     * {@code internalGetPartitionedTopicList} produces.
     *
     * @param tenant the tenant
     * @param namespace the namespace
     * @return the partitioned topic names
     */
    @GET
    @Path("/{tenant}/{namespace}/partitioned")
    @ApiOperation(value = "Get the list of partitioned topics under a namespace.", response = String.class,
            responseContainer = "List")
    @ApiResponses({
        @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
        @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"),
        @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
        @ApiResponse(code = 412, message = "Namespace name is not valid"),
        @ApiResponse(code = 500, message = "Internal server error")
    })
    public List<String> getPartitionedTopicList(
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace) {
        validateNamespaceName(tenant, namespace);
        final List<String> partitionedTopics = internalGetPartitionedTopicList();
        return partitionedTopics;
    }

    /**
     * Returns the permissions on a topic.
     *
     * <p>Validates the topic name and then returns the result of
     * {@code internalGetPermissionsOnTopic}.
     *
     * @param tenant the tenant
     * @param namespace the namespace
     * @param encodedTopic the topic name (bound with {@code @Encoded})
     * @return a map from role to the set of actions granted to that role
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/permissions")
    @ApiOperation(value = "Get permissions on a topic.",
            notes = "Retrieve the effective permissions for a topic. These permissions are defined by the permissions set at the namespace level combined (union) with any eventual specific permission set on the topic.")
    @ApiResponses({
        @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
        @ApiResponse(code = 412, message = "Topic name is not valid"),
        @ApiResponse(code = 500, message = "Internal server error")
    })
    public Map<String, Set<AuthAction>> getPermissionsOnTopic(
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic) {
        validateTopicName(tenant, namespace, encodedTopic);
        return internalGetPermissionsOnTopic();
    }

    /**
     * Grants a set of actions to a role on a single topic.
     *
     * <p>Validates the topic name and delegates to {@code internalGrantPermissionsOnTopic}.
     *
     * @param tenant the tenant
     * @param namespace the namespace
     * @param encodedTopic the topic name (bound with {@code @Encoded})
     * @param role the client role receiving the permissions
     * @param actions the actions to grant, read from the request body
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/permissions/{role}")
    @ApiOperation("Grant a new permission to a role on a single topic.")
    @ApiResponses({
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
        @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
        @ApiResponse(code = 409, message = "Concurrent modification"),
        @ApiResponse(code = 412, message = "Topic name is not valid"),
        @ApiResponse(code = 500, message = "Internal server error")
    })
    public void grantPermissionsOnTopic(
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam(value = "Client role to which grant permissions", required = true)
            @PathParam("role") String role,
            @ApiParam(value = "Actions to be granted (produce,functions,consume)",
                    allowableValues = "produce,functions,consume")
            Set<AuthAction> actions) {
        validateTopicName(tenant, namespace, encodedTopic);
        internalGrantPermissionsOnTopic(role, actions);
    }

    /**
     * Revokes the permissions of a role on a single topic.
     *
     * <p>Validates the topic name and delegates to {@code internalRevokePermissionsOnTopic},
     * which is handed the suspended response. Exceptions thrown synchronously by either step are
     * reported through {@code asyncResponse}: a {@link WebApplicationException} is passed on
     * as-is, any other exception is wrapped in a {@code RestException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant the tenant
     * @param namespace the namespace
     * @param encodedTopic the topic name (bound with {@code @Encoded})
     * @param role the client role whose permissions are revoked
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/permissions/{role}")
    @ApiOperation(value = "Revoke permissions on a topic.",
            notes = "Revoke permissions to a role on a single topic. If the permission was not set at the topic level, but rather at the namespace level, this operation will return an error (HTTP status code 412).")
    @ApiResponses({
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
        @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
        @ApiResponse(code = 412, message = "Permissions are not set at the topic level"),
        @ApiResponse(code = 500, message = "Internal server error")
    })
    public void revokePermissionsOnTopic(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam(value = "Client role to which grant permissions", required = true)
            @PathParam("role") String role) {
        try {
            validateTopicName(tenant, namespace, encodedTopic);
            internalRevokePermissionsOnTopic(asyncResponse, role);
        } catch (WebApplicationException wae) {
            // Already carries an HTTP status; forward it unchanged.
            asyncResponse.resume(wae);
        } catch (Exception e) {
            asyncResponse.resume(new RestException(e));
        }
    }

    /**
     * Creates a partitioned topic from a bare partition count in the request body.
     *
     * <p>Runs the namespace, global-ownership, partitioned-topic-name, policy-operation
     * ({@code PARTITION}/{@code WRITE}) and create-topic validations in that order, then calls
     * {@code internalCreatePartitionedTopic}. Any exception thrown along the way is logged and
     * used to resume {@code asyncResponse} exceptionally.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant the tenant
     * @param namespace the namespace
     * @param encodedTopic the topic name (bound with {@code @Encoded})
     * @param numPartitions the number of partitions for the topic
     * @param createLocalTopicOnly forwarded to {@code internalCreatePartitionedTopic}; defaults to
     *                             {@code false}
     */
    @PUT
    @Path("/{tenant}/{namespace}/{topic}/partitions")
    @ApiOperation(value = "Create a partitioned topic.",
            notes = "It needs to be called before creating a producer on a partitioned topic.")
    @ApiResponses({
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
        @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant does not exist"),
        @ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
        @ApiResponse(code = 409, message = "Partitioned topic already exist"),
        @ApiResponse(code = 412, message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"),
        @ApiResponse(code = 500, message = "Internal server error"),
        @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
    })
    public void createPartitionedTopic(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam(value = "The number of partitions for the topic", required = true, type = "int",
                    defaultValue = "0")
            int numPartitions,
            @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
        try {
            validateNamespaceName(tenant, namespace);
            validateGlobalNamespaceOwnership();
            validatePartitionedTopicName(tenant, namespace, encodedTopic);
            validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
            validateCreateTopic(topicName);
            internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
        } catch (Exception failure) {
            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, failure);
            resumeAsyncResponseExceptionally(asyncResponse, failure);
        }
    }

    /**
     * Creates a partitioned topic from a {@code PartitionedTopicMetadata} request body
     * (content type {@code application/vnd.partitioned-topic-metadata+json}).
     *
     * <p>Runs the same validation sequence as the partition-count overload, then calls
     * {@code internalCreatePartitionedTopic} with the partition count and properties taken from
     * {@code metadata}. Any exception thrown along the way is logged and used to resume
     * {@code asyncResponse} exceptionally.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant the tenant
     * @param namespace the namespace
     * @param encodedTopic the topic name (bound with {@code @Encoded})
     * @param metadata the metadata for the topic; its {@code partitions} and {@code properties}
     *                 fields are read
     * @param createLocalTopicOnly forwarded to {@code internalCreatePartitionedTopic}; defaults to
     *                             {@code false}
     */
    @PUT
    @Consumes("application/vnd.partitioned-topic-metadata+json")
    @Path("/{tenant}/{namespace}/{topic}/partitions")
    @ApiOperation(value = "Create a partitioned topic.",
            notes = "It needs to be called before creating a producer on a partitioned topic.")
    @ApiResponses({
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
        @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant does not exist"),
        @ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
        @ApiResponse(code = 409, message = "Partitioned topic already exist"),
        @ApiResponse(code = 412, message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"),
        @ApiResponse(code = 500, message = "Internal server error"),
        @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
    })
    public void createPartitionedTopic(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam(value = "The metadata for the topic", required = true, type = "PartitionedTopicMetadata")
            PartitionedTopicMetadata metadata,
            @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
        try {
            validateNamespaceName(tenant, namespace);
            validateGlobalNamespaceOwnership();
            validatePartitionedTopicName(tenant, namespace, encodedTopic);
            validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
            validateCreateTopic(topicName);
            internalCreatePartitionedTopic(
                    asyncResponse, metadata.partitions, createLocalTopicOnly, metadata.properties);
        } catch (Exception failure) {
            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, failure);
            resumeAsyncResponseExceptionally(asyncResponse, failure);
        }
    }

    /**
     * Creates a non-partitioned topic.
     *
     * <p>Runs the namespace, global-ownership, topic-name and create-topic validations in that
     * order and then calls {@code internalCreateNonPartitionedTopic}. Nothing is caught here, so
     * an exception from any step propagates to the caller.
     *
     * @param tenant the tenant
     * @param namespace the namespace
     * @param encodedTopic the topic name (bound with {@code @Encoded})
     * @param authoritative whether the leader broker redirected this call to this broker
     *                      (internal use)
     * @param properties key/value properties for the topic metadata, read from the request body
     */
    @PUT
    @Path("/{tenant}/{namespace}/{topic}")
    @ApiOperation(value = "Create a non-partitioned topic.",
            notes = "This is the only REST endpoint from which non-partitioned topics could be created.")
    @ApiResponses({
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
        @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 409, message = "Partitioned topic already exist"),
        @ApiResponse(code = 412, message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"),
        @ApiResponse(code = 500, message = "Internal server error"),
        @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
    })
    public void createNonPartitionedTopic(
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @ApiParam(value = "Key value pair properties for the topic metadata")
            Map<String, String> properties) {
        validateNamespaceName(tenant, namespace);
        validateGlobalNamespaceOwnership();
        validateTopicName(tenant, namespace, encodedTopic);
        validateCreateTopic(topicName);
        internalCreateNonPartitionedTopic(authoritative, properties);
    }

    /**
     * Reads the offload policies of a topic.
     *
     * <p>The topic name is validated synchronously. The rest runs as an asynchronous chain: the
     * {@code OFFLOAD}/{@code READ} policy operation is validated, {@code preValidation} is
     * applied, and the value produced by {@code internalGetOffloadPolicies} is written to
     * {@code asyncResponse}. A failure anywhere in the chain goes to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the policies or the failure
     * @param tenant the tenant path segment
     * @param namespace the namespace path segment
     * @param encodedTopic the topic path segment (bound with {@code @Encoded})
     * @param applied forwarded to {@code internalGetOffloadPolicies}; defaults to {@code false}
     * @param isGlobal forwarded to {@code internalGetOffloadPolicies}; defaults to {@code false}
     * @param authoritative whether the leader broker redirected this call to this broker
     *                      (internal use)
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/offloadPolicies")
    @ApiOperation("Get offload policies on a topic.")
    @ApiResponses({
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
        @ApiResponse(code = 500, message = "Internal server error")
    })
    public void getOffloadPolicies(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("applied") @DefaultValue("false") boolean applied,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, PolicyOperation.READ)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalGetOffloadPolicies(applied, isGlobal))
                .thenApply(asyncResponse::resume)
                .exceptionally(failure -> {
                    handleTopicPolicyException("getOffloadPolicies", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the offload policies of a topic.
     *
     * <p>The topic name is validated synchronously. The rest runs as an asynchronous chain: the
     * {@code OFFLOAD}/{@code WRITE} policy operation is validated, {@code preValidation} is
     * applied, {@code internalSetOffloadPolicies} stores the policies, and the response is
     * resumed with "no content". A failure anywhere in the chain goes to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant the tenant path segment
     * @param namespace the namespace path segment
     * @param encodedTopic the topic path segment (bound with {@code @Encoded})
     * @param authoritative whether the leader broker redirected this call to this broker
     *                      (internal use)
     * @param isGlobal forwarded to {@code internalSetOffloadPolicies}; defaults to {@code false}
     * @param offloadPolicies the offload policies for the topic, read from the request body
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/offloadPolicies")
    @ApiOperation("Set offload policies on a topic.")
    @ApiResponses({
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")
    })
    public void setOffloadPolicies(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Offload policies for the specified topic")
            OffloadPoliciesImpl offloadPolicies) {
        validateTopicName(tenant, namespace, encodedTopic);
        // The policy operation is validated exactly once, inside the chain. A preceding blocking
        // validateTopicPolicyOperation(...) call would repeat the same check on the request
        // thread and surface its failure outside handleTopicPolicyException, unlike
        // removeOffloadPolicies and the other topic-policy setters in this class.
        validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalSetOffloadPolicies(offloadPolicies, isGlobal))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("setOffloadPolicies", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the offload policies of a topic.
     *
     * <p>The topic name is validated synchronously. The rest runs as an asynchronous chain: the
     * {@code OFFLOAD}/{@code WRITE} policy operation is validated, {@code preValidation} is
     * applied, {@code internalSetOffloadPolicies} is called with {@code null} policies, and the
     * response is resumed with "no content". A failure anywhere in the chain goes to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant the tenant path segment
     * @param namespace the namespace path segment
     * @param encodedTopic the topic path segment (bound with {@code @Encoded})
     * @param isGlobal forwarded to {@code internalSetOffloadPolicies}; defaults to {@code false}
     * @param authoritative whether the leader broker redirected this call to this broker
     *                      (internal use)
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/offloadPolicies")
    @ApiOperation("Delete offload policies on a topic.")
    @ApiResponses({
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")
    })
    public void removeOffloadPolicies(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalSetOffloadPolicies(null, isGlobal))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeOffloadPolicies", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Reads the max-unacked-messages-per-consumer setting of a topic.
     *
     * <p>The topic name is validated synchronously. The rest runs as an asynchronous chain: the
     * {@code MAX_UNACKED}/{@code READ} policy operation is validated, {@code preValidation} is
     * applied, and the value produced by {@code internalGetMaxUnackedMessagesOnConsumer} is
     * written to {@code asyncResponse}. A failure anywhere in the chain goes to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the value or the failure
     * @param tenant the tenant path segment
     * @param namespace the namespace path segment
     * @param encodedTopic the topic path segment (bound with {@code @Encoded})
     * @param applied forwarded to {@code internalGetMaxUnackedMessagesOnConsumer}; defaults to
     *                {@code false}
     * @param isGlobal forwarded to {@code internalGetMaxUnackedMessagesOnConsumer}; defaults to
     *                 {@code false}
     * @param authoritative whether the leader broker redirected this call to this broker
     *                      (internal use)
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
    @ApiOperation("Get max unacked messages per consumer config on a topic.")
    @ApiResponses({
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
        @ApiResponse(code = 500, message = "Internal server error")
    })
    public void getMaxUnackedMessagesOnConsumer(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("applied") @DefaultValue("false") boolean applied,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, PolicyOperation.READ)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalGetMaxUnackedMessagesOnConsumer(applied, isGlobal))
                .thenApply(asyncResponse::resume)
                .exceptionally(failure -> {
                    handleTopicPolicyException("getMaxUnackedMessagesOnConsumer", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the max-unacked-messages-per-consumer setting of a topic.
     *
     * <p>The topic name is validated synchronously. The rest runs as an asynchronous chain: the
     * {@code MAX_UNACKED}/{@code WRITE} policy operation is validated, {@code preValidation} is
     * applied, {@code internalSetMaxUnackedMessagesOnConsumer} stores the value, and the response
     * is resumed with "no content". A failure anywhere in the chain goes to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant the tenant path segment
     * @param namespace the namespace path segment
     * @param encodedTopic the topic path segment (bound with {@code @Encoded})
     * @param isGlobal forwarded to {@code internalSetMaxUnackedMessagesOnConsumer}; defaults to
     *                 {@code false}
     * @param authoritative whether the leader broker redirected this call to this broker
     *                      (internal use)
     * @param maxUnackedNum the new limit, read from the request body
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
    @ApiOperation("Set max unacked messages per consumer config on a topic.")
    @ApiResponses({
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")
    })
    public void setMaxUnackedMessagesOnConsumer(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @ApiParam(value = "Max unacked messages on consumer policies for the specified topic")
            Integer maxUnackedNum) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum, isGlobal))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("setMaxUnackedMessagesOnConsumer", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the max-unacked-messages-per-consumer setting of a topic.
     *
     * <p>The topic name is validated synchronously. The rest runs as an asynchronous chain: the
     * {@code MAX_UNACKED}/{@code WRITE} policy operation is validated, {@code preValidation} is
     * applied, {@code internalSetMaxUnackedMessagesOnConsumer} is called with a {@code null}
     * value, and the response is resumed with "no content". A failure anywhere in the chain goes
     * to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant the tenant path segment
     * @param namespace the namespace path segment
     * @param encodedTopic the topic path segment (bound with {@code @Encoded})
     * @param isGlobal forwarded to {@code internalSetMaxUnackedMessagesOnConsumer}; defaults to
     *                 {@code false}
     * @param authoritative whether the leader broker redirected this call to this broker
     *                      (internal use)
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
    @ApiOperation("Delete max unacked messages per consumer config on a topic.")
    @ApiResponses({
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")
    })
    public void deleteMaxUnackedMessagesOnConsumer(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalSetMaxUnackedMessagesOnConsumer(null, isGlobal))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("deleteMaxUnackedMessagesOnConsumer", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Reads the deduplication snapshot interval of a topic.
     *
     * <p>The topic name is validated synchronously. The rest runs as an asynchronous chain: the
     * {@code DEDUPLICATION_SNAPSHOT}/{@code READ} policy operation is validated,
     * {@code preValidation} is applied, and the topic policies are loaded through
     * {@code getTopicPoliciesAsyncWithRetry}. When no policies are present, a fresh
     * {@code TopicPolicies} instance is used in their place. The response is resumed with that
     * object's {@code getDeduplicationSnapshotIntervalSeconds()} value. A failure anywhere in the
     * chain goes to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the interval or the failure
     * @param tenant the tenant path segment
     * @param namespace the namespace path segment
     * @param encodedTopic the topic path segment (bound with {@code @Encoded})
     * @param isGlobal forwarded to {@code getTopicPoliciesAsyncWithRetry}; defaults to
     *                 {@code false}
     * @param authoritative whether the leader broker redirected this call to this broker
     *                      (internal use)
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/deduplicationSnapshotInterval")
    @ApiOperation("Get deduplicationSnapshotInterval config on a topic.")
    @ApiResponses({
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
        @ApiResponse(code = 500, message = "Internal server error")
    })
    public void getDeduplicationSnapshotInterval(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.READ)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal))
                .thenAccept(maybePolicies -> {
                    // Fall back to an empty policy object when the topic has none stored.
                    TopicPolicies effective = maybePolicies.orElseGet(TopicPolicies::new);
                    asyncResponse.resume(effective.getDeduplicationSnapshotIntervalSeconds());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("getDeduplicationSnapshotInterval", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the deduplication snapshot interval of a topic.
     *
     * <p>The topic name is validated synchronously. The rest runs as an asynchronous chain: the
     * {@code DEDUPLICATION_SNAPSHOT}/{@code WRITE} policy operation is validated,
     * {@code preValidation} is applied, {@code internalSetDeduplicationSnapshotInterval} stores
     * the value, and the response is resumed with "no content". A failure anywhere in the chain
     * goes to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant the tenant path segment
     * @param namespace the namespace path segment
     * @param encodedTopic the topic path segment (bound with {@code @Encoded})
     * @param interval the interval at which to take deduplication snapshots, read from the
     *                 request body
     * @param isGlobal forwarded to {@code internalSetDeduplicationSnapshotInterval}; defaults to
     *                 {@code false}
     * @param authoritative whether the leader broker redirected this call to this broker
     *                      (internal use)
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/deduplicationSnapshotInterval")
    @ApiOperation("Set deduplicationSnapshotInterval config on a topic.")
    @ApiResponses({
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")
    })
    public void setDeduplicationSnapshotInterval(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam(value = "Interval to take deduplication snapshot for the specified topic")
            Integer interval,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalSetDeduplicationSnapshotInterval(interval, isGlobal))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("setDeduplicationSnapshotInterval", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the deduplication snapshot interval of a topic.
     *
     * <p>The topic name is validated synchronously. The rest runs as an asynchronous chain: the
     * {@code DEDUPLICATION_SNAPSHOT}/{@code WRITE} policy operation is validated,
     * {@code preValidation} is applied, {@code internalSetDeduplicationSnapshotInterval} is
     * called with a {@code null} interval, and the response is resumed with "no content". A
     * failure anywhere in the chain goes to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant the tenant path segment
     * @param namespace the namespace path segment
     * @param encodedTopic the topic path segment (bound with {@code @Encoded})
     * @param isGlobal forwarded to {@code internalSetDeduplicationSnapshotInterval}; defaults to
     *                 {@code false}
     * @param authoritative whether the leader broker redirected this call to this broker
     *                      (internal use)
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/deduplicationSnapshotInterval")
    @ApiOperation("Delete deduplicationSnapshotInterval config on a topic.")
    @ApiResponses({
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")
    })
    public void deleteDeduplicationSnapshotInterval(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalSetDeduplicationSnapshotInterval(null, isGlobal))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("deleteDeduplicationSnapshotInterval", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Reads the inactive-topic policies of a topic.
     *
     * <p>The topic name is validated synchronously. The rest runs as an asynchronous chain: the
     * {@code INACTIVE_TOPIC}/{@code READ} policy operation is validated, {@code preValidation}
     * is applied, and the value produced by {@code internalGetInactiveTopicPolicies} is written
     * to {@code asyncResponse}. A failure anywhere in the chain goes to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the policies or the failure
     * @param tenant the tenant path segment
     * @param namespace the namespace path segment
     * @param encodedTopic the topic path segment (bound with {@code @Encoded})
     * @param applied forwarded to {@code internalGetInactiveTopicPolicies}; defaults to
     *                {@code false}
     * @param isGlobal forwarded to {@code internalGetInactiveTopicPolicies}; defaults to
     *                 {@code false}
     * @param authoritative whether the leader broker redirected this call to this broker
     *                      (internal use)
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/inactiveTopicPolicies")
    @ApiOperation("Get inactive topic policies on a topic.")
    @ApiResponses({
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
        @ApiResponse(code = 500, message = "Internal server error")
    })
    public void getInactiveTopicPolicies(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("applied") @DefaultValue("false") boolean applied,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.INACTIVE_TOPIC, PolicyOperation.READ)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalGetInactiveTopicPolicies(applied, isGlobal))
                .thenApply(asyncResponse::resume)
                .exceptionally(failure -> {
                    handleTopicPolicyException("getInactiveTopicPolicies", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * REST endpoint that stores inactive-topic policies for a topic.
     *
     * <p>After the synchronous topic-name validation, the WRITE policy-operation check,
     * {@code preValidation} and {@code internalSetInactiveTopicPolicies} run as one asynchronous
     * chain. On success the response is resumed with an empty "No Content" response; on failure
     * the error is handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic topic path segment, still URL-encoded
     * @param authoritative forwarded unchanged to {@code preValidation}
     * @param isGlobal forwarded unchanged to {@code internalSetInactiveTopicPolicies}
     * @param inactiveTopicPolicies request body forwarded to {@code internalSetInactiveTopicPolicies}
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/inactiveTopicPolicies")
    @ApiOperation("Set inactive topic policies on a topic.")
    @ApiResponses({
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")
    })
    public void setInactiveTopicPolicies(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam("inactive topic policies for the specified topic")
            InactiveTopicPolicies inactiveTopicPolicies) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalSetInactiveTopicPolicies(inactiveTopicPolicies, isGlobal))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("setInactiveTopicPolicies", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * REST endpoint that removes the inactive-topic policies of a topic.
     *
     * <p>Runs the same asynchronous chain as the set endpoint (WRITE policy-operation check,
     * {@code preValidation}, {@code internalSetInactiveTopicPolicies}) but passes {@code null} as
     * the policies value. On success the response is resumed with an empty "No Content" response;
     * on failure the error is handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic topic path segment, still URL-encoded
     * @param isGlobal forwarded unchanged to {@code internalSetInactiveTopicPolicies}
     * @param authoritative forwarded unchanged to {@code preValidation}
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/inactiveTopicPolicies")
    @ApiOperation("Delete inactive topic policies on a topic.")
    @ApiResponses({
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")
    })
    public void deleteInactiveTopicPolicies(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                // null policies value: this is the delete variant of the set call
                .thenCompose(ignored -> internalSetInactiveTopicPolicies(null, isGlobal))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("deleteInactiveTopicPolicies", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * REST endpoint that reads the max-unacked-messages-per-subscription setting of a topic.
     *
     * <p>The topic name is validated synchronously. The READ policy-operation check,
     * {@code preValidation} and {@code internalGetMaxUnackedMessagesOnSubscription} then run as
     * one asynchronous chain whose result resumes {@code asyncResponse}. Any failure in the chain
     * is handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the result or the error
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic topic path segment, still URL-encoded
     * @param applied forwarded unchanged to {@code internalGetMaxUnackedMessagesOnSubscription}
     * @param isGlobal forwarded unchanged to {@code internalGetMaxUnackedMessagesOnSubscription}
     * @param authoritative forwarded unchanged to {@code preValidation}
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnSubscription")
    @ApiOperation("Get max unacked messages per subscription config on a topic.")
    @ApiResponses({
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
        @ApiResponse(code = 500, message = "Internal server error")
    })
    public void getMaxUnackedMessagesOnSubscription(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("applied") @DefaultValue("false") boolean applied,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, PolicyOperation.READ)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalGetMaxUnackedMessagesOnSubscription(applied, isGlobal))
                .thenApply(asyncResponse::resume)
                .exceptionally(failure -> {
                    handleTopicPolicyException("getMaxUnackedMessagesOnSubscription", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * REST endpoint that stores the max-unacked-messages-per-subscription setting for a topic.
     *
     * <p>The topic name is validated synchronously. The WRITE policy-operation check is performed
     * through {@code validateTopicPolicyOperationAsync} as the first stage of the asynchronous
     * chain, followed by {@code preValidation} and
     * {@code internalSetMaxUnackedMessagesOnSubscription}. This mirrors the other topic-policy
     * endpoints in this class, so a failed check is reported through
     * {@code handleTopicPolicyException} like every other failure in the chain. On success the
     * response is resumed with an empty "No Content" response.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic topic path segment, still URL-encoded
     * @param isGlobal forwarded unchanged to {@code internalSetMaxUnackedMessagesOnSubscription}
     * @param authoritative forwarded unchanged to {@code preValidation}
     * @param maxUnackedNum request body forwarded to
     *     {@code internalSetMaxUnackedMessagesOnSubscription}
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnSubscription")
    @ApiOperation("Set max unacked messages per subscription config on a topic.")
    @ApiResponses({
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")
    })
    public void setMaxUnackedMessagesOnSubscription(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @ApiParam("Max unacked messages on subscription policies for the specified topic")
            Integer maxUnackedNum) {
        validateTopicName(tenant, namespace, encodedTopic);
        // Keep the authorization check inside the chain instead of waiting for it on the request thread.
        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalSetMaxUnackedMessagesOnSubscription(maxUnackedNum, isGlobal))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("setMaxUnackedMessagesOnSubscription", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * REST endpoint that removes the max-unacked-messages-per-subscription setting of a topic.
     *
     * <p>The topic name is validated synchronously. The WRITE policy-operation check is performed
     * through {@code validateTopicPolicyOperationAsync} as the first stage of the asynchronous
     * chain, followed by {@code preValidation} and
     * {@code internalSetMaxUnackedMessagesOnSubscription} with a {@code null} value. This mirrors
     * the other topic-policy endpoints in this class, so a failed check is reported through
     * {@code handleTopicPolicyException} like every other failure in the chain. On success the
     * response is resumed with an empty "No Content" response.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic topic path segment, still URL-encoded
     * @param isGlobal forwarded unchanged to {@code internalSetMaxUnackedMessagesOnSubscription}
     * @param authoritative forwarded unchanged to {@code preValidation}
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnSubscription")
    @ApiOperation("Delete max unacked messages per subscription config on a topic.")
    @ApiResponses({
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")
    })
    public void deleteMaxUnackedMessagesOnSubscription(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        // Keep the authorization check inside the chain instead of waiting for it on the request thread.
        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                // null value: this is the delete variant of the set call
                .thenCompose(ignored -> internalSetMaxUnackedMessagesOnSubscription(null, isGlobal))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("deleteMaxUnackedMessagesOnSubscription", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * REST endpoint that reads the delayed-delivery policies of a topic.
     *
     * <p>The topic name is validated synchronously. The READ policy-operation check,
     * {@code preValidation} and {@code internalGetDelayedDeliveryPolicies} then run as one
     * asynchronous chain whose result resumes {@code asyncResponse}. Any failure in the chain is
     * handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the result or the error
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic topic path segment, still URL-encoded
     * @param isGlobal forwarded unchanged to {@code internalGetDelayedDeliveryPolicies}
     * @param applied forwarded unchanged to {@code internalGetDelayedDeliveryPolicies}
     * @param authoritative forwarded unchanged to {@code preValidation}
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/delayedDelivery")
    @ApiOperation("Get delayed delivery messages config on a topic.")
    @ApiResponses({
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
        @ApiResponse(code = 500, message = "Internal server error")
    })
    public void getDelayedDeliveryPolicies(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @QueryParam("applied") @DefaultValue("false") boolean applied,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.DELAYED_DELIVERY, PolicyOperation.READ)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalGetDelayedDeliveryPolicies(applied, isGlobal))
                .thenApply(asyncResponse::resume)
                .exceptionally(failure -> {
                    handleTopicPolicyException("getDelayedDeliveryPolicies", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * REST endpoint that stores delayed-delivery policies for a topic.
     *
     * <p>Three checks run synchronously, in order: topic-name validation,
     * {@code validatePoliciesReadOnlyAccess} and the WRITE policy-operation check. Afterwards
     * {@code preValidation} and {@code internalSetDelayedDeliveryPolicies} run asynchronously. On
     * success the response is resumed with an empty "No Content" response; a failure in the
     * asynchronous part is handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic topic path segment, still URL-encoded
     * @param isGlobal forwarded unchanged to {@code internalSetDelayedDeliveryPolicies}
     * @param authoritative forwarded unchanged to {@code preValidation}
     * @param deliveryPolicies request body forwarded to {@code internalSetDelayedDeliveryPolicies}
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/delayedDelivery")
    @ApiOperation("Set delayed delivery messages config on a topic.")
    @ApiResponses({
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")
    })
    public void setDelayedDeliveryPolicies(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @ApiParam("Delayed delivery policies for the specified topic")
            DelayedDeliveryPolicies deliveryPolicies) {
        // Synchronous pre-checks; each may throw before any asynchronous work starts.
        validateTopicName(tenant, namespace, encodedTopic);
        validatePoliciesReadOnlyAccess();
        validateTopicPolicyOperation(topicName, PolicyName.DELAYED_DELIVERY, PolicyOperation.WRITE);

        preValidation(authoritative)
                .thenCompose(ignored -> internalSetDelayedDeliveryPolicies(deliveryPolicies, isGlobal))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("setDelayedDeliveryPolicies", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * REST endpoint that removes the delayed-delivery policies of a topic.
     *
     * <p>Three checks run synchronously, in order: topic-name validation,
     * {@code validatePoliciesReadOnlyAccess} and the WRITE policy-operation check. Afterwards
     * {@code preValidation} and {@code internalSetDelayedDeliveryPolicies} (called with a
     * {@code null} policies value) run asynchronously. On success the response is resumed with an
     * empty "No Content" response; a failure in the asynchronous part is handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic topic path segment, still URL-encoded
     * @param isGlobal forwarded unchanged to {@code internalSetDelayedDeliveryPolicies}
     * @param authoritative forwarded unchanged to {@code preValidation}
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/delayedDelivery")
    @ApiOperation("Delete delayed delivery messages config on a topic.")
    @ApiResponses({
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist")
    })
    public void deleteDelayedDeliveryPolicies(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        // Synchronous pre-checks; each may throw before any asynchronous work starts.
        validateTopicName(tenant, namespace, encodedTopic);
        validatePoliciesReadOnlyAccess();
        validateTopicPolicyOperation(topicName, PolicyName.DELAYED_DELIVERY, PolicyOperation.WRITE);

        preValidation(authoritative)
                // null policies value: this is the delete variant of the set call
                .thenCompose(ignored -> internalSetDelayedDeliveryPolicies(null, isGlobal))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    handleTopicPolicyException("deleteDelayedDeliveryPolicies", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * REST endpoint that changes the partition count of a partitioned topic.
     *
     * <p>Runs synchronously: validates the partitioned topic name, calls
     * {@code validatePartitionedTopicMetadata}, then delegates to
     * {@code internalUpdatePartitionedTopic}. Exceptions from any step propagate to the caller.
     *
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic topic path segment, still URL-encoded
     * @param updateLocalTopicOnly forwarded unchanged to {@code internalUpdatePartitionedTopic}
     * @param authoritative forwarded unchanged to {@code internalUpdatePartitionedTopic}
     * @param force forwarded unchanged to {@code internalUpdatePartitionedTopic}
     * @param numPartitions request body; the requested number of partitions
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/partitions")
    @ApiOperation(
        value = "Increment partitions of an existing partitioned topic.",
        notes = "It only increments partitions of existing non-global partitioned-topic")
    @ApiResponses({
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
        @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant does not exist"),
        @ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
        @ApiResponse(code = 409, message = "Partitioned topic does not exist"),
        @ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
        @ApiResponse(code = 500, message = "Internal server error")
    })
    public void updatePartitionedTopic(
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean updateLocalTopicOnly,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @QueryParam("force") @DefaultValue("false") boolean force,
            @ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0")
            int numPartitions) {
        validatePartitionedTopicName(tenant, namespace, encodedTopic);
        validatePartitionedTopicMetadata();
        internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative, force);
    }

    /**
     * REST endpoint that creates the missing partitions of a partitioned topic.
     *
     * <p>Validates the partitioned topic name and delegates to
     * {@code internalCreateMissedPartitions}. Any exception thrown synchronously by either step is
     * reported through {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended response handed to the internal implementation
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic topic path segment, still URL-encoded
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/createMissedPartitions")
    @ApiOperation("Create missed partitions of an existing partitioned topic.")
    @ApiResponses({
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
        @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant does not exist"),
        @ApiResponse(code = 409, message = "Partitioned topic does not exist"),
        @ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
        @ApiResponse(code = 500, message = "Internal server error")
    })
    public void createMissedPartitions(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic) {
        try {
            validatePartitionedTopicName(tenant, namespace, encodedTopic);
            internalCreateMissedPartitions(asyncResponse);
        } catch (Exception failure) {
            PersistentTopics.resumeAsyncResponseExceptionally(asyncResponse, failure);
        }
    }

    /**
     * REST endpoint that returns the partitioned-topic metadata of a topic.
     *
     * <p>Validates the topic name synchronously, then resumes {@code asyncResponse} with the
     * result of {@code internalGetPartitionedMetadataAsync}. On failure the error is logged unless
     * {@code isRedirectException} reports it as a redirect, and is then reported through
     * {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended response that receives the metadata or the error
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic topic path segment, still URL-encoded
     * @param authoritative forwarded unchanged to {@code internalGetPartitionedMetadataAsync}
     * @param checkAllowAutoCreation forwarded unchanged to
     *     {@code internalGetPartitionedMetadataAsync}
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/partitions")
    @ApiOperation("Get partitioned topic metadata.")
    @ApiResponses({
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
        @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant does not exist"),
        @ApiResponse(code = 409, message = "Concurrent modification"),
        @ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
        @ApiResponse(code = 500, message = "Internal server error")
    })
    public void getPartitionedMetadata(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @ApiParam("Is check configuration required to automatically create topic")
            @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
        validateTopicName(tenant, namespace, encodedTopic);
        internalGetPartitionedMetadataAsync(authoritative, checkAllowAutoCreation)
                .thenAccept(asyncResponse::resume)
                .exceptionally(failure -> {
                    // Redirects are not logged as errors.
                    if (!PersistentTopics.isRedirectException(failure)) {
                        log.error("[{}] Failed to get partitioned metadata topic {}",
                                clientAppId(), topicName, failure);
                    }
                    PersistentTopics.resumeAsyncResponseExceptionally(asyncResponse, failure);
                    return null;
                });
    }

    /**
     * REST endpoint that returns the properties of a topic.
     *
     * <p>Validates the persistent topic name synchronously, then resumes {@code asyncResponse}
     * with the result of {@code internalGetPropertiesAsync}. On failure the error is logged only
     * when {@code isNot307And404Exception} returns true for it, and is then reported through
     * {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended response that receives the properties or the error
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic topic path segment, still URL-encoded
     * @param authoritative forwarded unchanged to {@code internalGetPropertiesAsync}
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/properties")
    @ApiOperation("Get topic properties.")
    @ApiResponses({
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
        @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Topic does not exist"),
        @ApiResponse(code = 409, message = "Concurrent modification"),
        @ApiResponse(code = 412, message = "Topic name is invalid"),
        @ApiResponse(code = 500, message = "Internal server error")
    })
    public void getProperties(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validatePersistentTopicName(tenant, namespace, encodedTopic);
        internalGetPropertiesAsync(authoritative)
                .thenAccept(asyncResponse::resume)
                .exceptionally(failure -> {
                    if (PersistentTopics.isNot307And404Exception(failure)) {
                        log.error("[{}] Failed to get topic {} properties",
                                clientAppId(), topicName, failure);
                    }
                    PersistentTopics.resumeAsyncResponseExceptionally(asyncResponse, failure);
                    return null;
                });
    }

    /**
     * REST endpoint that deletes a partitioned topic.
     *
     * <p>Validates the topic name and rejects it with {@code PRECONDITION_FAILED} when
     * {@code topicName.isPartitioned()} is true, i.e. when the caller addressed a single partition
     * instead of the partitioned topic. Otherwise it delegates to
     * {@code internalDeletePartitionedTopic}. A {@link WebApplicationException} thrown
     * synchronously resumes the response as-is; any other exception is wrapped in a
     * {@code RestException} first.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic topic path segment, still URL-encoded
     * @param force forwarded unchanged to {@code internalDeletePartitionedTopic}
     * @param authoritative forwarded unchanged to {@code internalDeletePartitionedTopic}
     * @param deleteSchema forwarded unchanged to {@code internalDeletePartitionedTopic}
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/partitions")
    @ApiOperation(
        value = "Delete a partitioned topic.",
        notes = "It will also delete all the partitions of the topic if it exists.")
    @ApiResponses({
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
        @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Partitioned topic does not exist"),
        @ApiResponse(code = 409, message = "Concurrent modification"),
        @ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
        @ApiResponse(code = 500, message = "Internal server error")
    })
    public void deletePartitionedTopic(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam(value = "Stop all producer/consumer/replicator and delete topic forcefully", defaultValue = "false", type = "boolean")
            @QueryParam("force") @DefaultValue("false") boolean force,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @ApiParam("Delete the topic's schema storage")
            @QueryParam("deleteSchema") @DefaultValue("false") boolean deleteSchema) {
        try {
            validateTopicName(tenant, namespace, encodedTopic);
            if (topicName.isPartitioned()) {
                throw new RestException(Response.Status.PRECONDITION_FAILED,
                        "Partitioned Topic Name should not contain '-partition-'");
            }
            internalDeletePartitionedTopic(asyncResponse, authoritative, force, deleteSchema);
        } catch (WebApplicationException webError) {
            asyncResponse.resume(webError);
        } catch (Exception unexpected) {
            asyncResponse.resume(new RestException(unexpected));
        }
    }

    /**
     * REST endpoint that unloads a topic.
     *
     * <p>Validates the topic name and delegates to {@code internalUnloadTopic}. A
     * {@link WebApplicationException} thrown synchronously resumes the response as-is; any other
     * exception is wrapped in a {@code RestException} first.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic topic path segment, still URL-encoded
     * @param authoritative forwarded unchanged to {@code internalUnloadTopic}
     */
    @PUT
    @Path("/{tenant}/{namespace}/{topic}/unload")
    @ApiOperation("Unload a topic")
    @ApiResponses({
        @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Topic does not exist"),
        @ApiResponse(code = 409, message = "Concurrent modification"),
        @ApiResponse(code = 412, message = "Topic name is not valid or can't find owner for topic"),
        @ApiResponse(code = 500, message = "Internal server error"),
        @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
    })
    public void unloadTopic(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        try {
            validateTopicName(tenant, namespace, encodedTopic);
            internalUnloadTopic(asyncResponse, authoritative);
        } catch (WebApplicationException webError) {
            asyncResponse.resume(webError);
        } catch (Exception unexpected) {
            asyncResponse.resume(new RestException(unexpected));
        }
    }

    /**
     * REST endpoint that deletes a topic.
     *
     * <p>Runs synchronously: validates the topic name, then delegates to
     * {@code internalDeleteTopic}. Exceptions from either step propagate to the caller.
     *
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic topic path segment, still URL-encoded
     * @param force forwarded unchanged to {@code internalDeleteTopic}
     * @param authoritative forwarded unchanged to {@code internalDeleteTopic}
     * @param deleteSchema forwarded unchanged to {@code internalDeleteTopic}
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}")
    @ApiOperation(
        value = "Delete a topic.",
        notes = "The topic cannot be deleted if delete is not forcefully and there's any active subscription or producer connected to the it. Force delete ignores connected clients and deletes topic by explicitly closing them.")
    @ApiResponses({
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
        @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Topic does not exist"),
        @ApiResponse(code = 412, message = "Topic has active producers/subscriptions"),
        @ApiResponse(code = 500, message = "Internal server error")
    })
    public void deleteTopic(
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam(value = "Stop all producer/consumer/replicator and delete topic forcefully", defaultValue = "false", type = "boolean")
            @QueryParam("force") @DefaultValue("false") boolean force,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @ApiParam("Delete the topic's schema storage")
            @QueryParam("deleteSchema") @DefaultValue("false") boolean deleteSchema) {
        validateTopicName(tenant, namespace, encodedTopic);
        internalDeleteTopic(authoritative, force, deleteSchema);
    }

    /**
     * REST endpoint that lists the subscriptions of a topic.
     *
     * <p>Validates the topic name and delegates to {@code internalGetSubscriptions}. A
     * {@link WebApplicationException} thrown synchronously resumes the response as-is; any other
     * exception is wrapped in a {@code RestException} first.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic topic path segment, still URL-encoded
     * @param authoritative forwarded unchanged to {@code internalGetSubscriptions}
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/subscriptions")
    @ApiOperation("Get the list of persistent subscriptions for a given topic.")
    @ApiResponses({
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
        @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Topic does not exist"),
        @ApiResponse(code = 412, message = "Topic name is not valid"),
        @ApiResponse(code = 500, message = "Internal server error"),
        @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
    })
    public void getSubscriptions(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        try {
            validateTopicName(tenant, namespace, encodedTopic);
            internalGetSubscriptions(asyncResponse, authoritative);
        } catch (WebApplicationException webError) {
            asyncResponse.resume(webError);
        } catch (Exception unexpected) {
            asyncResponse.resume(new RestException(unexpected));
        }
    }

    /**
     * REST endpoint that returns the stats of a topic.
     *
     * <p>Runs synchronously: validates the topic name, then returns the result of
     * {@code internalGetStats}. Exceptions from either step propagate to the caller.
     *
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic topic path segment, still URL-encoded
     * @param authoritative forwarded unchanged to {@code internalGetStats}
     * @param getPreciseBacklog forwarded unchanged to {@code internalGetStats}
     * @param subscriptionBacklogSize forwarded unchanged to {@code internalGetStats}
     * @param getEarliestTimeInBacklog forwarded unchanged to {@code internalGetStats}
     * @return the value produced by {@code internalGetStats}
     */
    @GET
    @Path("{tenant}/{namespace}/{topic}/stats")
    @ApiOperation("Get the stats for the topic.")
    @ApiResponses({
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
        @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Topic does not exist"),
        @ApiResponse(code = 412, message = "Topic name is not valid"),
        @ApiResponse(code = 500, message = "Internal server error"),
        @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
    })
    public TopicStats getStats(
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @ApiParam("If return precise backlog or imprecise backlog")
            @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
            @ApiParam("If return backlog size for each subscription, require locking on ledger so be careful not to use when there's heavy traffic.")
            @QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize,
            @ApiParam("If return time of the earliest message in backlog")
            @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog) {
        validateTopicName(tenant, namespace, encodedTopic);
        TopicStats stats = internalGetStats(
                authoritative, getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog);
        return stats;
    }

    /**
     * REST endpoint that returns the internal stats of a topic.
     *
     * <p>Runs synchronously: validates the topic name, then returns the result of
     * {@code internalGetInternalStats}. Exceptions from either step propagate to the caller.
     *
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic topic path segment, still URL-encoded
     * @param authoritative forwarded unchanged to {@code internalGetInternalStats}
     * @param metadata forwarded unchanged to {@code internalGetInternalStats}
     * @return the value produced by {@code internalGetInternalStats}
     */
    @GET
    @Path("{tenant}/{namespace}/{topic}/internalStats")
    @ApiOperation("Get the internal stats for the topic.")
    @ApiResponses({
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
        @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Topic does not exist"),
        @ApiResponse(code = 412, message = "Topic name is not valid"),
        @ApiResponse(code = 500, message = "Internal server error"),
        @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
    })
    public PersistentTopicInternalStats getInternalStats(
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @QueryParam("metadata") @DefaultValue("false") boolean metadata) {
        validateTopicName(tenant, namespace, encodedTopic);
        PersistentTopicInternalStats internalStats = internalGetInternalStats(authoritative, metadata);
        return internalStats;
    }

    /**
     * REST endpoint that returns the stored metadata of a topic.
     *
     * <p>Validates the topic name, then delegates to {@code internalGetManagedLedgerInfo}, which
     * receives the suspended response. Exceptions thrown synchronously by either step propagate to
     * the caller.
     *
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param authoritative forwarded unchanged to {@code internalGetManagedLedgerInfo}
     * @param encodedTopic topic path segment, still URL-encoded
     * @param asyncResponse suspended response handed to the internal implementation
     */
    @GET
    @Path("{tenant}/{namespace}/{topic}/internal-info")
    @ApiOperation("Get the stored topic metadata.")
    @ApiResponses({
        @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Topic does not exist"),
        @ApiResponse(code = 412, message = "Topic name is not valid"),
        @ApiResponse(code = 500, message = "Internal server error"),
        @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
    })
    public void getManagedLedgerInfo(
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @Suspended AsyncResponse asyncResponse) {
        validateTopicName(tenant, namespace, encodedTopic);
        internalGetManagedLedgerInfo(asyncResponse, authoritative);
    }

    /**
     * REST endpoint that returns the stats of a partitioned topic.
     *
     * <p>Validates the partitioned topic name and delegates to
     * {@code internalGetPartitionedStats}. A {@link WebApplicationException} thrown synchronously
     * resumes the response as-is; any other exception is wrapped in a {@code RestException}
     * first.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic topic path segment, still URL-encoded
     * @param perPartition forwarded unchanged to {@code internalGetPartitionedStats}
     * @param authoritative forwarded unchanged to {@code internalGetPartitionedStats}
     * @param getPreciseBacklog forwarded unchanged to {@code internalGetPartitionedStats}
     * @param subscriptionBacklogSize forwarded unchanged to {@code internalGetPartitionedStats}
     * @param getEarliestTimeInBacklog forwarded unchanged to {@code internalGetPartitionedStats}
     */
    @GET
    @Path("{tenant}/{namespace}/{topic}/partitioned-stats")
    @ApiOperation("Get the stats for the partitioned topic.")
    @ApiResponses({
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
        @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Topic does not exist"),
        @ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
        @ApiResponse(code = 500, message = "Internal server error"),
        @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
    })
    public void getPartitionedStats(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam("Get per partition stats")
            @QueryParam("perPartition") @DefaultValue("true") boolean perPartition,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @ApiParam("If return precise backlog or imprecise backlog")
            @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
            @ApiParam("If return backlog size for each subscription, require locking on ledger so be careful not to use when there's heavy traffic.")
            @QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize,
            @ApiParam("If return the earliest time in backlog")
            @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog) {
        try {
            validatePartitionedTopicName(tenant, namespace, encodedTopic);
            internalGetPartitionedStats(asyncResponse, authoritative, perPartition,
                    getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog);
        } catch (WebApplicationException webError) {
            asyncResponse.resume(webError);
        } catch (Exception unexpected) {
            asyncResponse.resume(new RestException(unexpected));
        }
    }

    /**
     * REST endpoint (hidden from the generated API docs) that reports the internal stats of a
     * partitioned topic through {@code asyncResponse}.
     *
     * <p>Validates the topic name and then delegates to
     * {@code internalGetPartitionedStatsInternal}. A {@link WebApplicationException} thrown
     * synchronously is forwarded unchanged; any other exception is wrapped in a
     * {@link RestException}.
     *
     * @param asyncResponse suspended response that receives the result or the failure
     * @param tenant tenant that owns the topic
     * @param namespace namespace that owns the topic
     * @param encodedTopic encoded topic name taken from the request path
     * @param authoritative whether the leader broker redirected this call to this broker
     */
    @GET
    @Path(value="{tenant}/{namespace}/{topic}/partitioned-internalStats")
    @ApiOperation(hidden=true, value="Get the stats-internal for the partitioned topic.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="Don't have permission to administrate resources on this tenant"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=412, message="Topic name is not valid"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void getPartitionedStatsInternal(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        try {
            validateTopicName(tenant, namespace, encodedTopic);
            internalGetPartitionedStatsInternal(asyncResponse, authoritative);
        } catch (WebApplicationException webFailure) {
            // Already carries an HTTP status; pass it through as-is.
            asyncResponse.resume(webFailure);
        } catch (Exception unexpected) {
            // Typed as Throwable so the resume(Throwable) overload is selected.
            Throwable wrapped = new RestException(unexpected);
            asyncResponse.resume(wrapped);
        }
    }

    /**
     * REST endpoint that deletes a subscription of a topic, reporting the outcome through
     * {@code asyncResponse}.
     *
     * <p>Steps, in order: validate the topic name, validate topic ownership, decode the
     * subscription name with {@code Codec.decode}, and delegate to
     * {@code internalDeleteSubscription}. A {@link WebApplicationException} thrown synchronously
     * is forwarded unchanged; any other exception is wrapped in a {@link RestException}.
     *
     * @param asyncResponse suspended response that receives the result or the failure
     * @param tenant tenant that owns the topic
     * @param namespace namespace that owns the topic
     * @param encodedTopic encoded topic name taken from the request path
     * @param encodedSubName encoded name of the subscription to delete
     * @param force whether to disconnect consumers and delete the subscription forcefully
     * @param authoritative whether the leader broker redirected this call to this broker
     */
    @DELETE
    @Path(value="/{tenant}/{namespace}/{topic}/subscription/{subName}")
    @ApiOperation(value="Delete a subscription.", notes="The subscription cannot be deleted if delete is not forcefully and there are any active consumers attached to it. Force delete ignores connected consumers and deletes subscription by explicitly closing them.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="Don't have permission to administrate resources on this tenant"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=412, message="Subscription has active consumers"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void deleteSubscription(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(value="Subscription to be deleted") @PathParam(value="subName") String encodedSubName, @ApiParam(value="Disconnect and close all consumers and delete subscription forcefully", defaultValue="false", type="boolean") @QueryParam(value="force") @DefaultValue(value="false") boolean force, @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        try {
            validateTopicName(tenant, namespace, encodedTopic);
            validateTopicOwnership(topicName, authoritative);
            String subName = Codec.decode(encodedSubName);
            internalDeleteSubscription(asyncResponse, subName, authoritative, force);
        } catch (WebApplicationException webFailure) {
            // Already carries an HTTP status; pass it through as-is.
            asyncResponse.resume(webFailure);
        } catch (Exception unexpected) {
            // Typed as Throwable so the resume(Throwable) overload is selected.
            Throwable wrapped = new RestException(unexpected);
            asyncResponse.resume(wrapped);
        }
    }

    /**
     * REST endpoint that skips all messages on a topic subscription, reporting the outcome
     * through {@code asyncResponse}.
     *
     * <p>Validates the topic name, decodes the subscription name with {@code Codec.decode}, and
     * delegates to {@code internalSkipAllMessages}. A {@link WebApplicationException} thrown
     * synchronously is forwarded unchanged; any other exception is wrapped in a
     * {@link RestException}.
     *
     * @param asyncResponse suspended response that receives the result or the failure
     * @param tenant tenant that owns the topic
     * @param namespace namespace that owns the topic
     * @param encodedTopic encoded topic name taken from the request path
     * @param encodedSubName encoded name of the subscription
     * @param authoritative whether the leader broker redirected this call to this broker
     */
    @POST
    @Path(value="/{tenant}/{namespace}/{topic}/subscription/{subName}/skip_all")
    @ApiOperation(value="Skip all messages on a topic subscription.", notes="Completely clears the backlog on the subscription.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic or subscription does not exist"), @ApiResponse(code=405, message="Operation not allowed on non-persistent topic"), @ApiResponse(code=412, message="Can't find owner for topic"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void skipAllMessages(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(value="Name of subscription") @PathParam(value="subName") String encodedSubName, @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        try {
            validateTopicName(tenant, namespace, encodedTopic);
            String subName = Codec.decode(encodedSubName);
            internalSkipAllMessages(asyncResponse, subName, authoritative);
        } catch (WebApplicationException webFailure) {
            // Already carries an HTTP status; pass it through as-is.
            asyncResponse.resume(webFailure);
        } catch (Exception unexpected) {
            // Typed as Throwable so the resume(Throwable) overload is selected.
            Throwable wrapped = new RestException(unexpected);
            asyncResponse.resume(wrapped);
        }
    }

    /**
     * REST endpoint that skips a number of messages on a topic subscription, reporting the
     * outcome through {@code asyncResponse}.
     *
     * <p>Validates the topic name, decodes the subscription name with {@code Codec.decode}, and
     * delegates to {@code internalSkipMessages}. A {@link WebApplicationException} thrown
     * synchronously is forwarded unchanged; any other exception is wrapped in a
     * {@link RestException}.
     *
     * @param asyncResponse suspended response that receives the result or the failure
     * @param tenant tenant that owns the topic
     * @param namespace namespace that owns the topic
     * @param encodedTopic encoded topic name taken from the request path
     * @param encodedSubName encoded name of the subscription
     * @param numMessages number of messages to skip
     * @param authoritative whether the leader broker redirected this call to this broker
     */
    @POST
    @Path(value="/{tenant}/{namespace}/{topic}/subscription/{subName}/skip/{numMessages}")
    @ApiOperation(value="Skipping messages on a topic subscription.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="Don't have permission to administrate resources on this tenant"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic or subscription does not exist"), @ApiResponse(code=405, message="Skipping messages on a partitioned topic is not allowed"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void skipMessages(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(value="Name of subscription") @PathParam(value="subName") String encodedSubName, @ApiParam(value="The number of messages to skip", defaultValue="0") @PathParam(value="numMessages") int numMessages, @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        try {
            validateTopicName(tenant, namespace, encodedTopic);
            String subName = Codec.decode(encodedSubName);
            internalSkipMessages(asyncResponse, subName, numMessages, authoritative);
        } catch (WebApplicationException webFailure) {
            // Already carries an HTTP status; pass it through as-is.
            asyncResponse.resume(webFailure);
        } catch (Exception unexpected) {
            // Typed as Throwable so the resume(Throwable) overload is selected.
            Throwable wrapped = new RestException(unexpected);
            asyncResponse.resume(wrapped);
        }
    }

    /**
     * REST endpoint that expires messages on a topic subscription based on a time value in
     * seconds, reporting the outcome through {@code asyncResponse}.
     *
     * <p>Validates the topic name, decodes the subscription name with {@code Codec.decode}, and
     * delegates to {@code internalExpireMessagesByTimestamp}. A {@link WebApplicationException}
     * thrown synchronously is forwarded unchanged; any other exception is wrapped in a
     * {@link RestException}.
     *
     * @param asyncResponse suspended response that receives the result or the failure
     * @param tenant tenant that owns the topic
     * @param namespace namespace that owns the topic
     * @param encodedTopic encoded topic name taken from the request path
     * @param encodedSubName encoded name of the subscription to expire messages on
     * @param expireTimeInSeconds expiry threshold in seconds, passed through unchanged
     * @param authoritative whether the leader broker redirected this call to this broker
     */
    @POST
    @Path(value="/{tenant}/{namespace}/{topic}/subscription/{subName}/expireMessages/{expireTimeInSeconds}")
    @ApiOperation(value="Expiry messages on a topic subscription.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic or subscription does not exist"), @ApiResponse(code=405, message="Expiry messages on a non-persistent topic is not allowed"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void expireTopicMessages(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(value="Subscription to be Expiry messages on") @PathParam(value="subName") String encodedSubName, @ApiParam(value="Expires beyond the specified number of seconds", defaultValue="0") @PathParam(value="expireTimeInSeconds") int expireTimeInSeconds, @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        try {
            validateTopicName(tenant, namespace, encodedTopic);
            String subName = Codec.decode(encodedSubName);
            internalExpireMessagesByTimestamp(asyncResponse, subName, expireTimeInSeconds, authoritative);
        } catch (WebApplicationException webFailure) {
            // Already carries an HTTP status; pass it through as-is.
            asyncResponse.resume(webFailure);
        } catch (Exception unexpected) {
            // Typed as Throwable so the resume(Throwable) overload is selected.
            Throwable wrapped = new RestException(unexpected);
            asyncResponse.resume(wrapped);
        }
    }

    /**
     * REST endpoint that expires messages on a topic subscription up to a given message
     * position, reporting the outcome through {@code asyncResponse}.
     *
     * <p>Validates the topic name, builds a {@link MessageIdImpl} from the request body, decodes
     * the subscription name with {@code Codec.decode}, and delegates to
     * {@code internalExpireMessagesByPosition}. A request without a body is rejected with
     * {@code 400 Bad Request} instead of surfacing a {@link NullPointerException} wrapped as a
     * server error. A {@link WebApplicationException} thrown synchronously is forwarded
     * unchanged; any other exception is wrapped in a {@link RestException}.
     *
     * @param asyncResponse suspended response that receives the result or the failure
     * @param tenant tenant that owns the topic
     * @param namespace namespace that owns the topic
     * @param encodedTopic encoded topic name taken from the request path
     * @param encodedSubName encoded name of the subscription to expire messages on
     * @param authoritative whether the leader broker redirected this call to this broker
     * @param resetCursorData request body holding the target position; must not be {@code null}
     */
    @POST
    @Path(value="/{tenant}/{namespace}/{topic}/subscription/{subName}/expireMessages")
    @ApiOperation(value="Expiry messages on a topic subscription.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=400, message="Message id is missing from the request body"), @ApiResponse(code=401, message="Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic or subscription does not exist"), @ApiResponse(code=405, message="Expiry messages on a non-persistent topic is not allowed"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void expireTopicMessages(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(value="Subscription to be Expiry messages on") @PathParam(value="subName") String encodedSubName, @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @ApiParam(name="messageId", value="messageId to reset back to (ledgerId:entryId)") ResetCursorData resetCursorData) {
        try {
            this.validateTopicName(tenant, namespace, encodedTopic);
            if (resetCursorData == null) {
                // No request body was supplied: report a client error rather than an NPE-driven 500.
                throw new RestException(Response.Status.BAD_REQUEST, "The request body must specify the message id to expire up to");
            }
            MessageIdImpl messageId = new MessageIdImpl(resetCursorData.getLedgerId(), resetCursorData.getEntryId(), resetCursorData.getPartitionIndex());
            this.internalExpireMessagesByPosition(asyncResponse, Codec.decode(encodedSubName), authoritative, messageId, resetCursorData.isExcluded(), resetCursorData.getBatchIndex());
        } catch (WebApplicationException wae) {
            // Already carries an HTTP status (including the 400 above); pass it through as-is.
            asyncResponse.resume(wae);
        } catch (Exception e) {
            // Typed as Throwable so the resume(Throwable) overload is selected.
            Throwable wrapped = new RestException(e);
            asyncResponse.resume(wrapped);
        }
    }

    /**
     * REST endpoint that expires messages on all subscriptions of a topic, reporting the outcome
     * through {@code asyncResponse}.
     *
     * <p>Validates the topic name and delegates to
     * {@code internalExpireMessagesForAllSubscriptions}. A {@link WebApplicationException} thrown
     * synchronously is forwarded unchanged; any other exception is wrapped in a
     * {@link RestException}.
     *
     * @param asyncResponse suspended response that receives the result or the failure
     * @param tenant tenant that owns the topic
     * @param namespace namespace that owns the topic
     * @param encodedTopic encoded topic name taken from the request path
     * @param expireTimeInSeconds expiry threshold in seconds, passed through unchanged
     * @param authoritative whether the leader broker redirected this call to this broker
     */
    @POST
    @Path(value="/{tenant}/{namespace}/{topic}/all_subscription/expireMessages/{expireTimeInSeconds}")
    @ApiOperation(value="Expiry messages on all subscriptions of topic.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic or subscription does not exist"), @ApiResponse(code=405, message="Expiry messages on a non-persistent topic is not allowed"), @ApiResponse(code=412, message="Can't find owner for topic"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void expireMessagesForAllSubscriptions(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(value="Expires beyond the specified number of seconds", defaultValue="0") @PathParam(value="expireTimeInSeconds") int expireTimeInSeconds, @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        try {
            validateTopicName(tenant, namespace, encodedTopic);
            internalExpireMessagesForAllSubscriptions(asyncResponse, expireTimeInSeconds, authoritative);
        } catch (WebApplicationException webFailure) {
            // Already carries an HTTP status; pass it through as-is.
            asyncResponse.resume(webFailure);
        } catch (Exception unexpected) {
            // Typed as Throwable so the resume(Throwable) overload is selected.
            Throwable wrapped = new RestException(unexpected);
            asyncResponse.resume(wrapped);
        }
    }

    /**
     * REST endpoint that creates a subscription on a topic, reporting the outcome through
     * {@code asyncResponse}.
     *
     * <p>Validates the topic name and rejects non-persistent topics with
     * {@code 400 Bad Request}. When a request body is present, its properties and message
     * position are extracted; when it is absent, both are passed on as {@code null}. The
     * subscription name is decoded with {@code Codec.decode} before delegating to
     * {@code internalCreateSubscription}. A {@link WebApplicationException} thrown synchronously
     * is forwarded unchanged; any other exception is wrapped in a {@link RestException}.
     *
     * @param asyncResponse suspended response that receives the result or the failure
     * @param tenant tenant that owns the topic
     * @param namespace namespace that owns the topic
     * @param topic encoded topic name taken from the request path
     * @param encodedSubName encoded name of the subscription to create
     * @param authoritative whether the leader broker redirected this call to this broker
     * @param resetCursorData optional request body with the start position and properties
     * @param replicated value of the {@code replicated} query parameter, passed through unchanged
     */
    @PUT
    @Path(value="/{tenant}/{namespace}/{topic}/subscription/{subscriptionName}")
    @ApiOperation(value="Create a subscription on the topic.", notes="Creates a subscription on the topic at the specified message id")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=400, message="Create subscription on non persistent topic is not supported"), @ApiResponse(code=401, message="Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic/Subscription does not exist"), @ApiResponse(code=405, message="Not supported for partitioned topics"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void createSubscription(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String topic, @ApiParam(value="Subscription to create position on", required=true) @PathParam(value="subscriptionName") String encodedSubName, @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @ApiParam(name="messageId", value="messageId where to create the subscription. It can be 'latest', 'earliest' or (ledgerId:entryId)", defaultValue="latest", allowableValues="latest, earliest, ledgerId:entryId") ResetCursorData resetCursorData, @ApiParam(value="Is replicated required to perform this operation") @QueryParam(value="replicated") boolean replicated) {
        try {
            this.validateTopicName(tenant, namespace, topic);
            if (!this.topicName.isPersistent()) {
                throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic can only be done through client");
            }
            // The request body is optional; without it both values stay null.
            Map<String, String> subscriptionProperties = null;
            MessageIdImpl messageId = null;
            if (resetCursorData != null) {
                subscriptionProperties = resetCursorData.getProperties();
                messageId = new MessageIdImpl(resetCursorData.getLedgerId(), resetCursorData.getEntryId(), resetCursorData.getPartitionIndex());
            }
            this.internalCreateSubscription(asyncResponse, Codec.decode(encodedSubName), messageId, authoritative, replicated, subscriptionProperties);
        } catch (WebApplicationException wae) {
            // Already carries an HTTP status (including the 400 above); pass it through as-is.
            asyncResponse.resume(wae);
        } catch (Exception e) {
            // Typed as Throwable so the resume(Throwable) overload is selected.
            Throwable wrapped = new RestException(e);
            asyncResponse.resume(wrapped);
        }
    }

    /**
     * REST endpoint that resets a subscription to the message position closest to an absolute
     * timestamp, reporting the outcome through {@code asyncResponse}.
     *
     * <p>The topic name is validated synchronously, so a validation failure propagates out of
     * this method. The reset itself runs via {@code internalResetCursorAsync}: on success the
     * response is resumed with {@code 204 No Content}; on failure the cause is unwrapped with
     * {@code FutureUtil.unwrapCompletionException}, logged unless it is a redirect, and
     * {@code SubscriptionInvalidCursorPosition} / {@code SubscriptionBusyException} are
     * translated to {@code 412 Precondition Failed} before the response is resumed.
     *
     * @param asyncResponse suspended response that receives the result or the failure
     * @param tenant tenant that owns the topic
     * @param namespace namespace that owns the topic
     * @param encodedTopic encoded topic name taken from the request path
     * @param encodedSubName encoded name of the subscription to reset
     * @param timestamp timestamp to reset back to
     * @param authoritative whether the leader broker redirected this call to this broker
     */
    @POST
    @Path(value="/{tenant}/{namespace}/{topic}/subscription/{subName}/resetcursor/{timestamp}")
    @ApiOperation(value="Reset subscription to message position closest to absolute timestamp (in ms).", notes="It fence cursor and disconnects all active consumers before reseting cursor.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic/Subscription does not exist"), @ApiResponse(code=405, message="Method Not Allowed"), @ApiResponse(code=412, message="Failed to reset cursor on subscription or Unable to find position for timestamp specified"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void resetCursor(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(value="Subscription to reset position on", required=true) @PathParam(value="subName") String encodedSubName, @ApiParam(value="the timestamp to reset back") @PathParam(value="timestamp") long timestamp, @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        this.internalResetCursorAsync(Codec.decode(encodedSubName), timestamp, authoritative)
                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(ex -> {
                    // Typed as Throwable (not Object) so getMessage() and the helpers below resolve.
                    Throwable t = FutureUtil.unwrapCompletionException(ex);
                    if (!PersistentTopics.isRedirectException(t)) {
                        log.error("[{}][{}] Failed to reset cursor on subscription {} to time {}", this.clientAppId(), this.topicName, encodedSubName, timestamp, t);
                    }
                    // Translate the two known broker failures into 412 responses.
                    if (t instanceof BrokerServiceException.SubscriptionInvalidCursorPosition) {
                        t = new RestException(Response.Status.PRECONDITION_FAILED, "Unable to find position for timestamp specified: " + t.getMessage());
                    } else if (t instanceof BrokerServiceException.SubscriptionBusyException) {
                        t = new RestException(Response.Status.PRECONDITION_FAILED, "Failed for Subscription Busy: " + t.getMessage());
                    }
                    PersistentTopics.resumeAsyncResponseExceptionally(asyncResponse, t);
                    return null;
                });
    }

    /**
     * REST endpoint that replaces all properties of a subscription, reporting the outcome through
     * {@code asyncResponse}.
     *
     * <p>Validates the topic name, decodes the subscription name with {@code Codec.decode}, and
     * delegates to {@code internalUpdateSubscriptionProperties}. A
     * {@link WebApplicationException} thrown synchronously is forwarded unchanged; any other
     * exception is wrapped in a {@link RestException}.
     *
     * @param asyncResponse suspended response that receives the result or the failure
     * @param tenant tenant that owns the topic
     * @param namespace namespace that owns the topic
     * @param encodedTopic encoded topic name taken from the request path
     * @param encodedSubName encoded name of the subscription to update
     * @param subscriptionProperties the new properties, passed through unchanged
     * @param authoritative whether the leader broker redirected this call to this broker
     */
    @PUT
    @Path(value="/{tenant}/{namespace}/{topic}/subscription/{subName}/properties")
    @ApiOperation(value="Replaces all the properties on the given subscription")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic/Subscription does not exist"), @ApiResponse(code=405, message="Method Not Allowed"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void updateSubscriptionProperties(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(value="Subscription to update", required=true) @PathParam(value="subName") String encodedSubName, @ApiParam(value="The new properties") Map<String, String> subscriptionProperties, @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        try {
            validateTopicName(tenant, namespace, encodedTopic);
            String subName = Codec.decode(encodedSubName);
            internalUpdateSubscriptionProperties(asyncResponse, subName, subscriptionProperties, authoritative);
        } catch (WebApplicationException webFailure) {
            // Already carries an HTTP status; pass it through as-is.
            asyncResponse.resume(webFailure);
        } catch (Exception unexpected) {
            // Typed as Throwable so the resume(Throwable) overload is selected.
            Throwable wrapped = new RestException(unexpected);
            asyncResponse.resume(wrapped);
        }
    }

    /**
     * REST endpoint that resets a subscription to the message position closest to a given
     * position, reporting the outcome through {@code asyncResponse}.
     *
     * <p>Validates the topic name, builds a {@link MessageIdImpl} from the request body, decodes
     * the subscription name with {@code Codec.decode}, and delegates to
     * {@code internalResetCursorOnPosition}. A request without a body is rejected with
     * {@code 400 Bad Request} instead of failing on a {@link NullPointerException}. Every
     * synchronous failure is handed to {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended response that receives the result or the failure
     * @param tenant tenant that owns the topic
     * @param namespace namespace that owns the topic
     * @param encodedTopic encoded topic name taken from the request path
     * @param encodedSubName encoded name of the subscription to reset
     * @param authoritative whether the leader broker redirected this call to this broker
     * @param resetCursorData request body holding the target position; must not be {@code null}
     */
    @POST
    @Path(value="/{tenant}/{namespace}/{topic}/subscription/{subName}/resetcursor")
    @ApiOperation(value="Reset subscription to message position closest to given position.", notes="It fence cursor and disconnects all active consumers before reseting cursor.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=400, message="Message id is missing from the request body"), @ApiResponse(code=401, message="Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic/Subscription does not exist"), @ApiResponse(code=405, message="Not supported for partitioned topics"), @ApiResponse(code=412, message="Unable to find position for position specified"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void resetCursorOnPosition(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(name="subName", value="Subscription to reset position on", required=true) @PathParam(value="subName") String encodedSubName, @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @ApiParam(name="messageId", value="messageId to reset back to (ledgerId:entryId)") ResetCursorData resetCursorData) {
        try {
            this.validateTopicName(tenant, namespace, encodedTopic);
            if (resetCursorData == null) {
                // No request body was supplied: report a client error rather than an NPE.
                throw new RestException(Response.Status.BAD_REQUEST, "The request body must specify the message id to reset the cursor to");
            }
            MessageIdImpl messageId = new MessageIdImpl(resetCursorData.getLedgerId(), resetCursorData.getEntryId(), resetCursorData.getPartitionIndex());
            this.internalResetCursorOnPosition(asyncResponse, Codec.decode(encodedSubName), authoritative, messageId, resetCursorData.isExcluded(), resetCursorData.getBatchIndex());
        } catch (Exception e) {
            PersistentTopics.resumeAsyncResponseExceptionally(asyncResponse, e);
        }
    }

    /**
     * REST endpoint that peeks the nth message on a topic subscription.
     *
     * <p>Synchronous handler: validates the topic name, decodes the subscription name with
     * {@code Codec.decode}, and returns whatever {@code internalPeekNthMessage} produces.
     * Exceptions from either call propagate to the caller.
     *
     * @param tenant tenant that owns the topic
     * @param namespace namespace that owns the topic
     * @param encodedTopic encoded topic name taken from the request path
     * @param encodedSubName encoded name of the subscription
     * @param messagePosition position of the message to peek, passed through unchanged
     * @param authoritative whether the leader broker redirected this call to this broker
     * @return the response built by {@code internalPeekNthMessage}
     */
    @GET
    @Path(value="/{tenant}/{namespace}/{topic}/subscription/{subName}/position/{messagePosition}")
    @ApiOperation(value="Peek nth message on a topic subscription.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic, subscription or the message position does not exist"), @ApiResponse(code=405, message="Skipping messages on a non-persistent topic is not allowed"), @ApiResponse(code=412, message="Topic name is not valid"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public Response peekNthMessage(@ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(name="subName", value="Subscribed message expired", required=true) @PathParam(value="subName") String encodedSubName, @ApiParam(value="The number of messages (default 1)", defaultValue="1") @PathParam(value="messagePosition") int messagePosition, @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        String subName = Codec.decode(encodedSubName);
        Response peeked = internalPeekNthMessage(subName, messagePosition, authoritative);
        return peeked;
    }

    /**
     * REST endpoint that examines a specific message on a topic by its position relative to the
     * earliest or the latest message.
     *
     * <p>Synchronous handler: validates the topic name, checks the caller against
     * {@code TopicOperation.PEEK_MESSAGES}, and returns whatever {@code internalExamineMessage}
     * produces. Exceptions from any of these calls propagate to the caller.
     *
     * @param tenant tenant that owns the topic
     * @param namespace namespace that owns the topic
     * @param encodedTopic encoded topic name taken from the request path
     * @param initialPosition relative start position, passed through unchanged
     * @param messagePosition position of the message, passed through unchanged
     * @param authoritative whether the leader broker redirected this call to this broker
     * @return the response built by {@code internalExamineMessage}
     */
    @GET
    @Path(value="/{tenant}/{namespace}/{topic}/examinemessage")
    @ApiOperation(value="Examine a specific message on a topic by position relative to the earliest or the latest message.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic, the message position does not exist"), @ApiResponse(code=405, message="If given partitioned topic"), @ApiResponse(code=412, message="Topic name is not valid"), @ApiResponse(code=500, message="Internal server error")})
    public Response examineMessage(@ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(name="initialPosition", value="Relative start position to examine message.It can be 'latest' or 'earliest'", defaultValue="latest", allowableValues="latest, earliest") @QueryParam(value="initialPosition") String initialPosition, @ApiParam(value="The position of messages (default 1)", defaultValue="1") @QueryParam(value="messagePosition") long messagePosition, @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
        Response examined = internalExamineMessage(initialPosition, messagePosition, authoritative);
        return examined;
    }

    /**
     * REST endpoint that fetches a message by its ledger id and entry id, reporting the outcome
     * through {@code asyncResponse}.
     *
     * <p>The topic name is validated synchronously, so a validation failure propagates out of
     * this method. The lookup runs via {@code internalGetMessageById}: on success the response is
     * resumed with the value it produces; on failure the error is logged when
     * {@code isNot307And404Exception} returns {@code true} for it, and the response is then
     * resumed through {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended response that receives the result or the failure
     * @param tenant tenant that owns the topic
     * @param namespace namespace that owns the topic
     * @param encodedTopic encoded topic name taken from the request path
     * @param ledgerId ledger id of the message
     * @param entryId entry id of the message
     * @param authoritative whether the leader broker redirected this call to this broker
     */
    @GET
    @Path(value="/{tenant}/{namespace}/{topic}/ledger/{ledgerId}/entry/{entryId}")
    @ApiOperation(value="Get message by its messageId.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic, subscription or the message position does not exist"), @ApiResponse(code=405, message="Skipping messages on a non-persistent topic is not allowed"), @ApiResponse(code=412, message="Topic name is not valid"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void getMessageById(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(value="The ledger id", required=true) @PathParam(value="ledgerId") long ledgerId, @ApiParam(value="The entry id", required=true) @PathParam(value="entryId") long entryId, @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        // No raw CompletableFuture cast: the chain stays typed, so `ex` is a Throwable.
        this.internalGetMessageById(ledgerId, entryId, authoritative)
                .thenAccept(response -> asyncResponse.resume(response))
                .exceptionally(ex -> {
                    if (PersistentTopics.isNot307And404Exception(ex)) {
                        log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}", this.clientAppId(), ledgerId, entryId, this.topicName, ex);
                    }
                    PersistentTopics.resumeAsyncResponseExceptionally(asyncResponse, ex);
                    return null;
                });
    }

    /**
     * Resolves the ID of the message published at or just after the given timestamp and
     * answers through the suspended response.
     *
     * <p>The lookup is delegated to {@code internalGetMessageIdByTimestamp}. A {@code null}
     * result is reported as a 404 {@link RestException}. Any failure of the lookup always
     * resumes the response; {@code isNot307And404Exception} only decides whether the failure
     * is also logged.
     *
     * @param asyncResponse suspended JAX-RS response that receives the message ID or the failure
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param timestamp     absolute timestamp in milliseconds (per the operation description)
     * @param authoritative forwarded unchanged to the internal lookup
     */
    @GET
    @Path(value="/{tenant}/{namespace}/{topic}/messageid/{timestamp}")
    @ApiOperation(value="Get message ID published at or just after this absolute timestamp (in ms).")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=405, message="Topic is not non-partitioned and persistent"), @ApiResponse(code=412, message="Topic name is not valid"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void getMessageIdByTimestamp(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant,
            @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace,
            @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic,
            @ApiParam(value="Specify the timestamp", required=true) @PathParam(value="timestamp") long timestamp,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        this.internalGetMessageIdByTimestamp(timestamp, authoritative)
                .thenAccept(messageId -> {
                    if (messageId == null) {
                        // Explicit Throwable cast pins the resume(Throwable) overload.
                        asyncResponse.resume((Throwable) new RestException(Response.Status.NOT_FOUND, "Message ID not found"));
                        return;
                    }
                    asyncResponse.resume(messageId);
                })
                .exceptionally(ex -> {
                    if (PersistentTopics.isNot307And404Exception(ex)) {
                        log.error("[{}] Failed to get message ID by timestamp {} from {}", this.clientAppId(), timestamp, this.topicName, ex);
                    }
                    // Must run for every failure: previously this call sat inside the guard
                    // above, so 307/404 failures left the suspended response unanswered.
                    PersistentTopics.resumeAsyncResponseExceptionally(asyncResponse, ex);
                    return null;
                });
    }

    /**
     * Returns the estimated backlog for an offline topic.
     *
     * <p>Validates the topic name, checks {@code TopicOperation.GET_BACKLOG_SIZE} through
     * {@code validateTopicOperation}, then returns whatever {@code internalGetBacklog} yields.
     * This endpoint is synchronous, unlike most of its neighbours.
     *
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param authoritative forwarded unchanged to {@code internalGetBacklog}
     * @return the offline topic stats produced by {@code internalGetBacklog}
     */
    @GET
    @Path(value="{tenant}/{namespace}/{topic}/backlog")
    @ApiOperation(value="Get estimated backlog for offline topic.")
    @ApiResponses(value={@ApiResponse(code=404, message="Namespace does not exist"), @ApiResponse(code=412, message="Topic name is not valid"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public PersistentOfflineTopicStats getBacklog(
            @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant,
            @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace,
            @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        this.validateTopicOperation(this.topicName, TopicOperation.GET_BACKLOG_SIZE);
        final PersistentOfflineTopicStats offlineStats = this.internalGetBacklog(authoritative);
        return offlineStats;
    }

    /**
     * Calculates the backlog size (in bytes, per the operation description) starting from the
     * message ID supplied in the request body.
     *
     * <p>After validating the topic name, the whole request, including answering the
     * suspended response, is handed to {@code internalGetBacklogSizeByMessageId}.
     *
     * @param asyncResponse suspended JAX-RS response, passed on to the internal implementation
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param authoritative forwarded unchanged to the internal implementation
     * @param messageId     message ID taken from the request entity
     */
    @PUT
    @Path(value="/{tenant}/{namespace}/{topic}/backlogSize")
    @ApiOperation(value="Calculate backlog size by a message ID (in bytes).")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=412, message="Topic name is not valid"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void getBacklogSizeByMessageId(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant,
            @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace,
            @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative,
            MessageIdImpl messageId) {
        // Name validation comes first; the internal call owns the response from here on.
        this.validateTopicName(tenant, namespace, encodedTopic);
        this.internalGetBacklogSizeByMessageId(asyncResponse, messageId, authoritative);
    }

    /**
     * Returns the backlog quota map configured on a topic.
     *
     * <p>Pipeline: READ check for {@code PolicyName.BACKLOG}, then {@code preValidation},
     * then {@code internalGetBacklogQuota}; the result resumes the response. Any failure is
     * handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response that receives the quota map
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param applied       forwarded to {@code internalGetBacklogQuota}
     * @param authoritative forwarded to {@code preValidation}
     * @param isGlobal      forwarded to {@code internalGetBacklogQuota}
     */
    @GET
    @Path(value="/{tenant}/{namespace}/{topic}/backlogQuotaMap")
    @ApiOperation(value="Get backlog quota map on a topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic policy does not exist"), @ApiResponse(code=405, message="Topic level policy is disabled, to enable the topic level policy and retry")})
    public void getBacklogQuotaMap(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @QueryParam(value="applied") @DefaultValue(value="false") boolean applied,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative,
            @QueryParam(value="isGlobal") @DefaultValue(value="false") boolean isGlobal) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        this.validateTopicPolicyOperationAsync(this.topicName, PolicyName.BACKLOG, PolicyOperation.READ)
                .thenCompose(ignored -> this.preValidation(authoritative))
                .thenCompose(ignored -> this.internalGetBacklogQuota(applied, isGlobal))
                .thenAccept(asyncResponse::resume)
                .exceptionally(failure -> {
                    this.handleTopicPolicyException("getBacklogQuotaMap", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets a backlog quota on a topic.
     *
     * <p>Runs {@code preValidation} and then {@code internalSetBacklogQuota}; on success the
     * response is resumed with 204 No Content. Any failure is handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse    suspended JAX-RS response
     * @param tenant           tenant path segment
     * @param namespace        namespace path segment
     * @param encodedTopic     topic path segment, still URL-encoded
     * @param authoritative    forwarded to {@code preValidation}
     * @param isGlobal         forwarded to {@code internalSetBacklogQuota}
     * @param backlogQuotaType quota type from the query string, forwarded as-is
     * @param backlogQuota     quota definition taken from the request entity
     */
    @POST
    @Path(value="/{tenant}/{namespace}/{topic}/backlogQuota")
    @ApiOperation(value="Set a backlog quota for a topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=409, message="Concurrent modification"), @ApiResponse(code=405, message="Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code=412, message="Specified backlog quota exceeds retention quota. Increase retention quota and retry request")})
    public void setBacklogQuota(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative,
            @QueryParam(value="isGlobal") @DefaultValue(value="false") boolean isGlobal,
            @QueryParam(value="backlogQuotaType") BacklogQuota.BacklogQuotaType backlogQuotaType,
            BacklogQuotaImpl backlogQuota) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        final Runnable replyNoContent = () -> asyncResponse.resume(Response.noContent().build());
        this.preValidation(authoritative)
                .thenCompose(ignored -> this.internalSetBacklogQuota(backlogQuotaType, backlogQuota, isGlobal))
                .thenRun(replyNoContent)
                .exceptionally(failure -> {
                    this.handleTopicPolicyException("setBacklogQuota", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes a backlog quota policy from a topic.
     *
     * <p>Removal is expressed as calling {@code internalSetBacklogQuota} with a {@code null}
     * quota for the given type. On success the response is resumed with 204 No Content;
     * any failure is handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse    suspended JAX-RS response
     * @param tenant           tenant path segment
     * @param namespace        namespace path segment
     * @param encodedTopic     topic path segment, still URL-encoded
     * @param backlogQuotaType quota type from the query string, forwarded as-is
     * @param authoritative    forwarded to {@code preValidation}
     * @param isGlobal         forwarded to {@code internalSetBacklogQuota}
     */
    @DELETE
    @Path(value="/{tenant}/{namespace}/{topic}/backlogQuota")
    @ApiOperation(value="Remove a backlog quota policy from a topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=405, message="Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code=409, message="Concurrent modification")})
    public void removeBacklogQuota(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @QueryParam(value="backlogQuotaType") BacklogQuota.BacklogQuotaType backlogQuotaType,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative,
            @QueryParam(value="isGlobal") @DefaultValue(value="false") boolean isGlobal) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        this.preValidation(authoritative)
                // A null quota is how the internal setter is asked to clear the policy.
                .thenCompose(ignored -> this.internalSetBacklogQuota(backlogQuotaType, null, isGlobal))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    this.handleTopicPolicyException("removeBacklogQuota", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Returns the replication clusters configured for a topic.
     *
     * <p>Pipeline: READ check for {@code PolicyName.REPLICATION}, then {@code preValidation},
     * then the topic policies are fetched. If topic policies exist, their replication cluster
     * set is returned. Otherwise the namespace-level {@code replication_clusters} are returned
     * when {@code applied} is set, and {@code null} when it is not. Any failure is handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response that receives the cluster set (may be null)
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param applied       whether to fall back to the namespace policies when no topic policies exist
     * @param authoritative forwarded to {@code preValidation}
     */
    @GET
    @Path(value="/{tenant}/{namespace}/{topic}/replication")
    @ApiOperation(value="Get the replication clusters for a topic")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=405, message="Topic level policy is disabled, enable the topic level policy and retry")})
    public void getReplicationClusters(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @QueryParam(value="applied") @DefaultValue(value="false") boolean applied,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        this.validateTopicPolicyOperationAsync(this.topicName, PolicyName.REPLICATION, PolicyOperation.READ)
                .thenCompose(ignored -> this.preValidation(authoritative))
                .thenCompose(ignored -> this.getTopicPoliciesAsyncWithRetry(this.topicName))
                .thenAccept(topicPolicies -> {
                    final Object clusters = topicPolicies
                            .map(TopicPolicies::getReplicationClustersSet)
                            .orElseGet(() -> applied
                                    ? this.getNamespacePolicies(this.namespaceName).replication_clusters
                                    : null);
                    asyncResponse.resume(clusters);
                })
                .exceptionally(failure -> {
                    this.handleTopicPolicyException("getReplicationClusters", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the replication clusters for a topic.
     *
     * <p>Runs {@code preValidation} and then {@code internalSetReplicationClusters}; on
     * success the response is resumed with 204 No Content. Any failure is handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param authoritative forwarded to {@code preValidation}
     * @param clusterIds    replication cluster IDs taken from the request entity
     */
    @POST
    @Path(value="/{tenant}/{namespace}/{topic}/replication")
    @ApiOperation(value="Set the replication clusters for a topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=409, message="Concurrent modification"), @ApiResponse(code=405, message="Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code=412, message="Topic is not global or invalid cluster ids")})
    public void setReplicationClusters(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative,
            @ApiParam(value="List of replication clusters", required=true) List<String> clusterIds) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        final Runnable replyNoContent = () -> asyncResponse.resume(Response.noContent().build());
        this.preValidation(authoritative)
                .thenCompose(ignored -> this.internalSetReplicationClusters(clusterIds))
                .thenRun(replyNoContent)
                .exceptionally(failure -> {
                    this.handleTopicPolicyException("setReplicationClusters", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the replication clusters from a topic.
     *
     * <p>Runs {@code preValidation} and then {@code internalRemoveReplicationClusters}; on
     * success the response is resumed with 204 No Content. Any failure is handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse    suspended JAX-RS response
     * @param tenant           tenant path segment
     * @param namespace        namespace path segment
     * @param encodedTopic     topic path segment, still URL-encoded
     * @param backlogQuotaType accepted from the query string but not read by this method
     * @param authoritative    forwarded to {@code preValidation}
     */
    @DELETE
    @Path(value="/{tenant}/{namespace}/{topic}/replication")
    @ApiOperation(value="Remove the replication clusters from a topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=405, message="Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code=409, message="Concurrent modification")})
    public void removeReplicationClusters(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @QueryParam(value="backlogQuotaType") BacklogQuota.BacklogQuotaType backlogQuotaType,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        // NOTE(review): backlogQuotaType is unused here; kept because it is part of the
        // published endpoint signature.
        this.preValidation(authoritative)
                .thenCompose(ignored -> this.internalRemoveReplicationClusters())
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    this.handleTopicPolicyException("removeReplicationClusters", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Returns the message TTL (in seconds) for a topic.
     *
     * <p>Pipeline: READ check for {@code PolicyName.TTL}, then {@code preValidation}, then
     * the topic policies are fetched. If topic policies exist, their TTL value is returned.
     * Otherwise, when {@code applied} is set, the namespace-level {@code message_ttl_in_seconds}
     * is used, or the broker's configured default TTL if the namespace has none; when
     * {@code applied} is not set the answer is {@code null}. Any failure is handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response that receives the TTL (may be null)
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param applied       whether to fall back to namespace/broker values when no topic policies exist
     * @param isGlobal      forwarded to {@code getTopicPoliciesAsyncWithRetry}
     * @param authoritative forwarded to {@code preValidation}
     */
    @GET
    @Path(value="/{tenant}/{namespace}/{topic}/messageTTL")
    @ApiOperation(value="Get message TTL in seconds for a topic")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=405, message="Topic level policy is disabled, enable the topic level policy and retry")})
    public void getMessageTTL(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @QueryParam(value="applied") @DefaultValue(value="false") boolean applied,
            @QueryParam(value="isGlobal") @DefaultValue(value="false") boolean isGlobal,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        this.validateTopicPolicyOperationAsync(this.topicName, PolicyName.TTL, PolicyOperation.READ)
                .thenCompose(ignored -> this.preValidation(authoritative))
                .thenCompose(ignored -> this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal))
                .thenAccept(topicPolicies -> {
                    final Object ttl = topicPolicies
                            .map(TopicPolicies::getMessageTTLInSeconds)
                            .orElseGet(() -> {
                                if (!applied) {
                                    return null;
                                }
                                Integer namespaceTtl = this.getNamespacePolicies(this.namespaceName).message_ttl_in_seconds;
                                return namespaceTtl == null ? this.pulsar().getConfiguration().getTtlDurationDefaultInSeconds() : namespaceTtl.intValue();
                            });
                    asyncResponse.resume(ttl);
                })
                .exceptionally(failure -> {
                    this.handleTopicPolicyException("getMessageTTL", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the message TTL (in seconds) for a topic.
     *
     * <p>Pipeline: WRITE check for {@code PolicyName.TTL}, then {@code preValidation}, then
     * {@code internalSetMessageTTL}. On success the response is resumed with 204 No Content;
     * any failure is handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param messageTTL    TTL value from the query string, forwarded as-is
     * @param isGlobal      forwarded to {@code internalSetMessageTTL}
     * @param authoritative forwarded to {@code preValidation}
     */
    @POST
    @Path(value="/{tenant}/{namespace}/{topic}/messageTTL")
    @ApiOperation(value="Set message TTL in seconds for a topic")
    @ApiResponses(value={@ApiResponse(code=403, message="Not authenticate to perform the request or policy is read only"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=405, message="Topic level policy is disabled, enable the topic level policy and retry"), @ApiResponse(code=412, message="Invalid message TTL value")})
    public void setMessageTTL(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @ApiParam(value="TTL in seconds for the specified namespace", required=true) @QueryParam(value="messageTTL") Integer messageTTL,
            @QueryParam(value="isGlobal") @DefaultValue(value="false") boolean isGlobal,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        final Runnable replyNoContent = () -> asyncResponse.resume(Response.noContent().build());
        this.validateTopicPolicyOperationAsync(this.topicName, PolicyName.TTL, PolicyOperation.WRITE)
                .thenCompose(ignored -> this.preValidation(authoritative))
                .thenCompose(ignored -> this.internalSetMessageTTL(messageTTL, isGlobal))
                .thenRun(replyNoContent)
                .exceptionally(failure -> {
                    this.handleTopicPolicyException("setMessageTTL", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the message TTL setting from a topic.
     *
     * <p>Pipeline: WRITE check for {@code PolicyName.TTL}, then {@code preValidation}, then
     * {@code internalSetMessageTTL} with a {@code null} TTL. On success the response is
     * resumed with 204 No Content; any failure is handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param isGlobal      forwarded to {@code internalSetMessageTTL}
     * @param authoritative forwarded to {@code preValidation}
     */
    @DELETE
    @Path(value="/{tenant}/{namespace}/{topic}/messageTTL")
    @ApiOperation(value="Remove message TTL in seconds for a topic")
    @ApiResponses(value={@ApiResponse(code=403, message="Not authenticate to perform the request or policy is read only"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=405, message="Topic level policy is disabled, enable the topic level policy and retry"), @ApiResponse(code=412, message="Invalid message TTL value")})
    public void removeMessageTTL(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @QueryParam(value="isGlobal") @DefaultValue(value="false") boolean isGlobal,
            // The "leader broker redirected" description belongs to authoritative; it was
            // previously attached to isGlobal, mislabelling that parameter in the API docs.
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        this.validateTopicPolicyOperationAsync(this.topicName, PolicyName.TTL, PolicyOperation.WRITE)
                .thenCompose(ignored -> this.preValidation(authoritative))
                // A null TTL is how the internal setter is asked to clear the policy.
                .thenCompose(ignored -> this.internalSetMessageTTL(null, isGlobal))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    this.handleTopicPolicyException("removeMessageTTL", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Returns the deduplication configuration of a topic.
     *
     * <p>Pipeline: READ check for {@code PolicyName.DEDUPLICATION}, then
     * {@code preValidation}, then {@code internalGetDeduplication}; the result resumes the
     * response. Any failure is handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response that receives the configuration
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param applied       forwarded to {@code internalGetDeduplication}
     * @param isGlobal      forwarded to {@code internalGetDeduplication}
     * @param authoritative forwarded to {@code preValidation}
     */
    @GET
    @Path(value="/{tenant}/{namespace}/{topic}/deduplicationEnabled")
    @ApiOperation(value="Get deduplication configuration of a topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Tenant or cluster or namespace or topic doesn't exist"), @ApiResponse(code=405, message="Topic level policy is disabled, to enable the topic level policy and retry")})
    public void getDeduplication(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @QueryParam(value="applied") @DefaultValue(value="false") boolean applied,
            @QueryParam(value="isGlobal") @DefaultValue(value="false") boolean isGlobal,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        this.validateTopicPolicyOperationAsync(this.topicName, PolicyName.DEDUPLICATION, PolicyOperation.READ)
                .thenCompose(ignored -> this.preValidation(authoritative))
                .thenCompose(ignored -> this.internalGetDeduplication(applied, isGlobal))
                .thenApply(asyncResponse::resume)
                .exceptionally(failure -> {
                    this.handleTopicPolicyException("getDeduplication", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Enables or disables deduplication on a topic.
     *
     * <p>Pipeline: WRITE check for {@code PolicyName.DEDUPLICATION}, then
     * {@code preValidation}, then {@code internalSetDeduplication}. On success the response
     * is resumed with 204 No Content; any failure is handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param isGlobal      forwarded to {@code internalSetDeduplication}
     * @param authoritative forwarded to {@code preValidation}
     * @param enabled       deduplication flag taken from the request entity
     */
    @POST
    @Path(value="/{tenant}/{namespace}/{topic}/deduplicationEnabled")
    @ApiOperation(value="Set deduplication enabled on a topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Tenant or cluster or namespace or topic doesn't exist"), @ApiResponse(code=405, message="Topic level policy is disabled, to enable the topic level policy and retry")})
    public void setDeduplication(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @QueryParam(value="isGlobal") @DefaultValue(value="false") boolean isGlobal,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative,
            @ApiParam(value="DeduplicationEnabled policies for the specified topic") Boolean enabled) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        final Runnable replyNoContent = () -> asyncResponse.resume(Response.noContent().build());
        this.validateTopicPolicyOperationAsync(this.topicName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE)
                .thenCompose(ignored -> this.preValidation(authoritative))
                .thenCompose(ignored -> this.internalSetDeduplication(enabled, isGlobal))
                .thenRun(replyNoContent)
                .exceptionally(failure -> {
                    this.handleTopicPolicyException("setDeduplication", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the deduplication configuration from a topic.
     *
     * <p>Pipeline: WRITE check for {@code PolicyName.DEDUPLICATION}, then
     * {@code preValidation}, then {@code internalSetDeduplication} with a {@code null} flag.
     * On success the response is resumed with 204 No Content; any failure is handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param isGlobal      forwarded to {@code internalSetDeduplication}
     * @param authoritative forwarded to {@code preValidation}
     */
    @DELETE
    @Path(value="/{tenant}/{namespace}/{topic}/deduplicationEnabled")
    @ApiOperation(value="Remove deduplication configuration for specified topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Tenant or cluster or namespace or topic doesn't exist"), @ApiResponse(code=405, message="Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code=409, message="Concurrent modification")})
    public void removeDeduplication(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @QueryParam(value="isGlobal") @DefaultValue(value="false") boolean isGlobal,
            // The "leader broker redirected" description belongs to authoritative; it was
            // previously attached to isGlobal, mislabelling that parameter in the API docs.
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        this.validateTopicPolicyOperationAsync(this.topicName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE)
                .thenCompose(ignored -> this.preValidation(authoritative))
                // A null flag is how the internal setter is asked to clear the policy.
                .thenCompose(ignored -> this.internalSetDeduplication(null, isGlobal))
                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(failure -> {
                    this.handleTopicPolicyException("removeDeduplication", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Returns the retention configuration of a topic.
     *
     * <p>Pipeline: READ check for {@code PolicyName.RETENTION}, then {@code preValidation},
     * then {@code internalGetRetention}; the result resumes the response. Any failure is
     * handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response that receives the retention policies
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param isGlobal      forwarded to {@code internalGetRetention}
     * @param applied       forwarded to {@code internalGetRetention}
     * @param authoritative forwarded to {@code preValidation}
     */
    @GET
    @Path(value="/{tenant}/{namespace}/{topic}/retention")
    @ApiOperation(value="Get retention configuration for specified topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=405, message="Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code=409, message="Concurrent modification")})
    public void getRetention(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @QueryParam(value="isGlobal") @DefaultValue(value="false") boolean isGlobal,
            @QueryParam(value="applied") @DefaultValue(value="false") boolean applied,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        this.validateTopicPolicyOperationAsync(this.topicName, PolicyName.RETENTION, PolicyOperation.READ)
                .thenCompose(ignored -> this.preValidation(authoritative))
                .thenCompose(ignored -> this.internalGetRetention(applied, isGlobal))
                .thenAccept(asyncResponse::resume)
                .exceptionally(failure -> {
                    this.handleTopicPolicyException("getRetention", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the retention configuration of a topic.
     *
     * <p>Pipeline: WRITE check for {@code PolicyName.RETENTION}, then {@code preValidation},
     * then {@code internalSetRetention}. On success an info line describing the new policies
     * is logged and the response is resumed with 204 No Content. Any failure is handed to
     * {@code handleTopicPolicyException}.
     *
     * <p>If the policies cannot be rendered as JSON for the log line, their
     * {@code String.valueOf} form is logged instead, so the success record is never silently
     * dropped.
     *
     * @param asyncResponse suspended JAX-RS response
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param authoritative forwarded to {@code preValidation}
     * @param isGlobal      forwarded to {@code internalSetRetention}
     * @param retention     retention policies taken from the request entity
     */
    @POST
    @Path(value="/{tenant}/{namespace}/{topic}/retention")
    @ApiOperation(value="Set retention configuration for specified topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=405, message="Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code=409, message="Concurrent modification"), @ApiResponse(code=412, message="Retention Quota must exceed backlog quota")})
    public void setRetention(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative,
            @QueryParam(value="isGlobal") @DefaultValue(value="false") boolean isGlobal,
            @ApiParam(value="Retention policies for the specified namespace") RetentionPolicies retention) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        this.validateTopicPolicyOperationAsync(this.topicName, PolicyName.RETENTION, PolicyOperation.WRITE)
                .thenCompose(ignored -> this.preValidation(authoritative))
                .thenCompose(ignored -> this.internalSetRetention(retention, isGlobal))
                .thenRun(() -> {
                    String retentionText;
                    try {
                        retentionText = PersistentTopics.jsonMapper().writeValueAsString(retention);
                    } catch (JsonProcessingException e) {
                        // Serialization is only for the log line; fall back rather than lose it.
                        retentionText = String.valueOf(retention);
                    }
                    log.info("[{}] Successfully updated retention: namespace={}, topic={}, retention={}", this.clientAppId(), this.namespaceName, this.topicName.getLocalName(), retentionText);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    this.handleTopicPolicyException("setRetention", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the retention configuration from a topic.
     *
     * <p>Pipeline: WRITE check for {@code PolicyName.RETENTION}, then {@code preValidation},
     * then {@code internalRemoveRetention}. On success an info line is logged and the
     * response is resumed with 204 No Content. Any failure is handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param isGlobal      forwarded to {@code internalRemoveRetention}
     * @param authoritative forwarded to {@code preValidation}
     */
    @DELETE
    @Path(value="/{tenant}/{namespace}/{topic}/retention")
    @ApiOperation(value="Remove retention configuration for specified topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=405, message="Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code=409, message="Concurrent modification"), @ApiResponse(code=412, message="Retention Quota must exceed backlog quota")})
    public void removeRetention(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @QueryParam(value="isGlobal") @DefaultValue(value="false") boolean isGlobal,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        final Runnable logAndReply = () -> {
            log.info("[{}] Successfully remove retention: namespace={}, topic={}", this.clientAppId(), this.namespaceName, this.topicName.getLocalName());
            asyncResponse.resume(Response.noContent().build());
        };
        this.validateTopicPolicyOperationAsync(this.topicName, PolicyName.RETENTION, PolicyOperation.WRITE)
                .thenCompose(ignored -> this.preValidation(authoritative))
                .thenCompose(ignored -> this.internalRemoveRetention(isGlobal))
                .thenRun(logAndReply)
                .exceptionally(failure -> {
                    this.handleTopicPolicyException("removeRetention", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Returns the persistence policies configured for a topic.
     *
     * <p>Pipeline: READ check for {@code PolicyName.PERSISTENCE}, then {@code preValidation},
     * then {@code internalGetPersistence}; the result resumes the response. Any failure is
     * handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response that receives the persistence policies
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param applied       forwarded to {@code internalGetPersistence}
     * @param isGlobal      forwarded to {@code internalGetPersistence}
     * @param authoritative forwarded to {@code preValidation}
     */
    @GET
    @Path(value="/{tenant}/{namespace}/{topic}/persistence")
    @ApiOperation(value="Get configuration of persistence policies for specified topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=405, message="Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code=409, message="Concurrent modification")})
    public void getPersistence(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @QueryParam(value="applied") @DefaultValue(value="false") boolean applied,
            @QueryParam(value="isGlobal") @DefaultValue(value="false") boolean isGlobal,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        this.validateTopicPolicyOperationAsync(this.topicName, PolicyName.PERSISTENCE, PolicyOperation.READ)
                .thenCompose(ignored -> this.preValidation(authoritative))
                .thenCompose(ignored -> this.internalGetPersistence(applied, isGlobal))
                .thenApply(asyncResponse::resume)
                .exceptionally(failure -> {
                    this.handleTopicPolicyException("getPersistence", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the persistence policies of a topic.
     *
     * <p>Pipeline: WRITE check for {@code PolicyName.PERSISTENCE}, then
     * {@code preValidation}, then {@code internalSetPersistence}. On success an info line
     * describing the new policies is logged and the response is resumed with 204 No Content.
     * Any failure is handed to {@code handleTopicPolicyException}.
     *
     * <p>If the policies cannot be rendered as JSON for the log line, their
     * {@code String.valueOf} form is logged instead, so the success record is never silently
     * dropped.
     *
     * @param asyncResponse       suspended JAX-RS response
     * @param tenant              tenant path segment
     * @param namespace           namespace path segment
     * @param encodedTopic        topic path segment, still URL-encoded
     * @param authoritative       forwarded to {@code preValidation}
     * @param isGlobal            forwarded to {@code internalSetPersistence}
     * @param persistencePolicies persistence policies taken from the request entity
     */
    @POST
    @Path(value="/{tenant}/{namespace}/{topic}/persistence")
    @ApiOperation(value="Set configuration of persistence policies for specified topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=405, message="Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code=409, message="Concurrent modification"), @ApiResponse(code=400, message="Invalid persistence policies")})
    public void setPersistence(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative,
            @QueryParam(value="isGlobal") @DefaultValue(value="false") boolean isGlobal,
            @ApiParam(value="Bookkeeper persistence policies for specified topic") PersistencePolicies persistencePolicies) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        this.validateTopicPolicyOperationAsync(this.topicName, PolicyName.PERSISTENCE, PolicyOperation.WRITE)
                .thenCompose(ignored -> this.preValidation(authoritative))
                .thenCompose(ignored -> this.internalSetPersistence(persistencePolicies, isGlobal))
                .thenRun(() -> {
                    String policiesText;
                    try {
                        policiesText = PersistentTopics.jsonMapper().writeValueAsString(persistencePolicies);
                    } catch (JsonProcessingException e) {
                        // Serialization is only for the log line; fall back rather than lose it.
                        policiesText = String.valueOf(persistencePolicies);
                    }
                    log.info("[{}] Successfully updated persistence policies: namespace={}, topic={}, persistencePolicies={}", this.clientAppId(), this.namespaceName, this.topicName.getLocalName(), policiesText);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    this.handleTopicPolicyException("setPersistence", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the topic-level persistence policies.
     *
     * <p>The topic name is validated synchronously; the asynchronous chain then runs
     * {@code validateTopicPolicyOperationAsync} (PERSISTENCE / WRITE), {@code preValidation}
     * and {@code internalRemovePersistence}. On success the removal is logged and the request
     * is resumed with 204 No Content; any failure is handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the chain completes
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic URL-encoded topic path segment
     * @param isGlobal forwarded unchanged to {@code internalRemovePersistence}
     * @param authoritative whether the leader broker redirected this call to this broker
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/persistence")
    @ApiOperation("Remove configuration of persistence policies for specified topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")})
    public void removePersistence(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.PERSISTENCE, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalRemovePersistence(isGlobal))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove persistence policies: namespace={}, topic={}",
                            clientAppId(), namespaceName, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removePersistence", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Reads the maxSubscriptionsPerTopic setting of a topic.
     *
     * <p>The topic name is validated synchronously; the asynchronous chain then runs
     * {@code validateTopicPolicyOperationAsync} (MAX_SUBSCRIPTIONS / READ),
     * {@code preValidation} and {@code internalGetMaxSubscriptionsPerTopic}. The request is
     * resumed with the value when one is present and with 204 No Content otherwise; any
     * failure is handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the chain completes
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic URL-encoded topic path segment
     * @param isGlobal forwarded unchanged to {@code internalGetMaxSubscriptionsPerTopic}
     * @param authoritative whether the leader broker redirected this call to this broker
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/maxSubscriptionsPerTopic")
    @ApiOperation("Get maxSubscriptionsPerTopic config for specified topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")})
    public void getMaxSubscriptionsPerTopic(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.READ)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalGetMaxSubscriptionsPerTopic(isGlobal))
                .thenAccept(maxSubscriptions -> {
                    // No stored value is reported as 204 No Content.
                    final Object entity = maxSubscriptions.isPresent()
                            ? maxSubscriptions.get()
                            : Response.noContent().build();
                    asyncResponse.resume(entity);
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("getMaxSubscriptions", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the maxSubscriptionsPerTopic setting of a topic.
     *
     * <p>The topic name is validated synchronously; the asynchronous chain then runs
     * {@code validateTopicPolicyOperationAsync} (MAX_SUBSCRIPTIONS / WRITE),
     * {@code preValidation} and {@code internalSetMaxSubscriptionsPerTopic}. On success the
     * update is logged and the request is resumed with 204 No Content; any failure is handed
     * to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the chain completes
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic URL-encoded topic path segment
     * @param isGlobal forwarded unchanged to {@code internalSetMaxSubscriptionsPerTopic}
     * @param authoritative whether the leader broker redirected this call to this broker
     * @param maxSubscriptionsPerTopic request body holding the new limit
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/maxSubscriptionsPerTopic")
    @ApiOperation("Set maxSubscriptionsPerTopic config for specified topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification"),
            @ApiResponse(code = 412, message = "Invalid value of maxSubscriptionsPerTopic")})
    public void setMaxSubscriptionsPerTopic(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @ApiParam("The max subscriptions of the topic") int maxSubscriptionsPerTopic) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic, isGlobal))
                .thenRun(() -> {
                    log.info("[{}] Successfully updated maxSubscriptionsPerTopic: namespace={}, topic={}, maxSubscriptions={}, isGlobal={}",
                            clientAppId(), namespaceName, topicName.getLocalName(), maxSubscriptionsPerTopic, isGlobal);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("setMaxSubscriptions", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the maxSubscriptionsPerTopic setting of a topic.
     *
     * <p>The topic name is validated synchronously; the asynchronous chain then runs
     * {@code validateTopicPolicyOperationAsync} (MAX_SUBSCRIPTIONS / WRITE),
     * {@code preValidation} and {@code internalSetMaxSubscriptionsPerTopic} with a
     * {@code null} value. On success the removal is logged and the request is resumed with
     * 204 No Content; any failure is handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the chain completes
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic URL-encoded topic path segment
     * @param isGlobal forwarded unchanged to {@code internalSetMaxSubscriptionsPerTopic}
     * @param authoritative whether the leader broker redirected this call to this broker
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/maxSubscriptionsPerTopic")
    @ApiOperation("Remove maxSubscriptionsPerTopic config for specified topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")})
    public void removeMaxSubscriptionsPerTopic(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                // Removal is expressed as a set with a null value.
                .thenCompose(ignored -> internalSetMaxSubscriptionsPerTopic(null, isGlobal))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove maxSubscriptionsPerTopic: namespace={}, topic={}",
                            clientAppId(), namespaceName, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeMaxSubscriptions", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Reads the replicatorDispatchRate setting of a topic.
     *
     * <p>The topic name is validated synchronously; the asynchronous chain then runs
     * {@code validateTopicPolicyOperationAsync} (REPLICATION_RATE / READ),
     * {@code preValidation} and {@code internalGetReplicatorDispatchRate}. The value produced
     * by the last stage is used to resume the suspended request; any failure is handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the chain completes
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic URL-encoded topic path segment
     * @param isGlobal forwarded unchanged to {@code internalGetReplicatorDispatchRate}
     * @param applied forwarded unchanged to {@code internalGetReplicatorDispatchRate}
     * @param authoritative whether the leader broker redirected this call to this broker
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/replicatorDispatchRate")
    @ApiOperation("Get replicatorDispatchRate config for specified topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")})
    public void getReplicatorDispatchRate(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @QueryParam("applied") @DefaultValue("false") boolean applied,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.READ)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalGetReplicatorDispatchRate(applied, isGlobal))
                .thenApply(asyncResponse::resume)
                .exceptionally(failure -> {
                    handleTopicPolicyException("getReplicatorDispatchRate", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the replicatorDispatchRate setting of a topic.
     *
     * <p>The topic name is validated synchronously; the asynchronous chain then runs
     * {@code validateTopicPolicyOperationAsync} (REPLICATION_RATE / WRITE),
     * {@code preValidation} and {@code internalSetReplicatorDispatchRate}. On success the
     * update is logged and the request is resumed with 204 No Content; any failure is handed
     * to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the chain completes
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic URL-encoded topic path segment
     * @param isGlobal forwarded unchanged to {@code internalSetReplicatorDispatchRate}
     * @param authoritative whether the leader broker redirected this call to this broker
     * @param dispatchRate request body holding the new dispatch rate
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/replicatorDispatchRate")
    @ApiOperation("Set replicatorDispatchRate config for specified topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification"),
            @ApiResponse(code = 412, message = "Invalid value of replicatorDispatchRate")})
    public void setReplicatorDispatchRate(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @ApiParam("Replicator dispatch rate of the topic") DispatchRateImpl dispatchRate) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalSetReplicatorDispatchRate(dispatchRate, isGlobal))
                .thenRun(() -> {
                    log.info("[{}] Successfully updated replicatorDispatchRate: namespace={}, topic={}, replicatorDispatchRate={}, isGlobal={}",
                            clientAppId(), namespaceName, topicName.getLocalName(), dispatchRate, isGlobal);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("setReplicatorDispatchRate", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the replicatorDispatchRate setting of a topic.
     *
     * <p>The topic name is validated synchronously; the asynchronous chain then runs
     * {@code validateTopicPolicyOperationAsync} (REPLICATION_RATE / WRITE),
     * {@code preValidation} and {@code internalSetReplicatorDispatchRate} with a {@code null}
     * rate. On success the removal is logged and the request is resumed with 204 No Content;
     * any failure is handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the chain completes
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic URL-encoded topic path segment
     * @param isGlobal forwarded unchanged to {@code internalSetReplicatorDispatchRate}
     * @param authoritative whether the leader broker redirected this call to this broker
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/replicatorDispatchRate")
    @ApiOperation("Remove replicatorDispatchRate config for specified topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")})
    public void removeReplicatorDispatchRate(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                // Removal is expressed as a set with a null rate.
                .thenCompose(ignored -> internalSetReplicatorDispatchRate(null, isGlobal))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove replicatorDispatchRate limit: namespace={}, topic={}",
                            clientAppId(), namespaceName, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeReplicatorDispatchRate", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Reads the maxProducers setting of a topic.
     *
     * <p>The topic name is validated synchronously; the asynchronous chain then runs
     * {@code validateTopicPolicyOperationAsync} (MAX_PRODUCERS / READ), {@code preValidation}
     * and {@code internalGetMaxProducers}. The value produced by the last stage is used to
     * resume the suspended request; any failure is handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the chain completes
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic URL-encoded topic path segment
     * @param applied forwarded unchanged to {@code internalGetMaxProducers}
     * @param isGlobal forwarded unchanged to {@code internalGetMaxProducers}
     * @param authoritative whether the leader broker redirected this call to this broker
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/maxProducers")
    @ApiOperation("Get maxProducers config for specified topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")})
    public void getMaxProducers(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("applied") @DefaultValue("false") boolean applied,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS, PolicyOperation.READ)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalGetMaxProducers(applied, isGlobal))
                .thenApply(asyncResponse::resume)
                .exceptionally(failure -> {
                    handleTopicPolicyException("getMaxProducers", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the maxProducers setting of a topic.
     *
     * <p>The topic name is validated synchronously; the asynchronous chain then runs
     * {@code validateTopicPolicyOperationAsync} (MAX_PRODUCERS / WRITE), {@code preValidation}
     * and {@code internalSetMaxProducers}. On success the update is logged and the request is
     * resumed with 204 No Content; any failure is handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the chain completes
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic URL-encoded topic path segment
     * @param authoritative whether the leader broker redirected this call to this broker
     * @param isGlobal forwarded unchanged to {@code internalSetMaxProducers}
     * @param maxProducers request body holding the new limit
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/maxProducers")
    @ApiOperation("Set maxProducers config for specified topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification"),
            @ApiResponse(code = 412, message = "Invalid value of maxProducers")})
    public void setMaxProducers(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam("The max producers of the topic") int maxProducers) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalSetMaxProducers(maxProducers, isGlobal))
                .thenRun(() -> {
                    log.info("[{}] Successfully updated max producers: namespace={}, topic={}, maxProducers={}",
                            clientAppId(), namespaceName, topicName.getLocalName(), maxProducers);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("setMaxProducers", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the maxProducers setting of a topic.
     *
     * <p>The topic name is validated synchronously; the asynchronous chain then runs
     * {@code validateTopicPolicyOperationAsync} (MAX_PRODUCERS / WRITE), {@code preValidation}
     * and {@code internalRemoveMaxProducers}. On success the removal is logged and the request
     * is resumed with 204 No Content; any failure is handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the chain completes
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic URL-encoded topic path segment
     * @param isGlobal forwarded unchanged to {@code internalRemoveMaxProducers}
     * @param authoritative whether the leader broker redirected this call to this broker
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/maxProducers")
    @ApiOperation("Remove maxProducers config for specified topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")})
    public void removeMaxProducers(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalRemoveMaxProducers(isGlobal))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove max producers: namespace={}, topic={}",
                            clientAppId(), namespaceName, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeMaxProducers", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Reads the maxConsumers setting of a topic.
     *
     * <p>The topic name is validated synchronously; the asynchronous chain then runs
     * {@code validateTopicPolicyOperationAsync} (MAX_CONSUMERS / READ), {@code preValidation}
     * and {@code internalGetMaxConsumers}. The value produced by the last stage is used to
     * resume the suspended request; any failure is handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the chain completes
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic URL-encoded topic path segment
     * @param isGlobal forwarded unchanged to {@code internalGetMaxConsumers}
     * @param applied forwarded unchanged to {@code internalGetMaxConsumers}
     * @param authoritative whether the leader broker redirected this call to this broker
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/maxConsumers")
    @ApiOperation("Get maxConsumers config for specified topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")})
    public void getMaxConsumers(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @QueryParam("applied") @DefaultValue("false") boolean applied,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalGetMaxConsumers(applied, isGlobal))
                .thenApply(asyncResponse::resume)
                .exceptionally(failure -> {
                    handleTopicPolicyException("getMaxConsumers", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the maxConsumers setting of a topic.
     *
     * <p>The topic name is validated synchronously; the asynchronous chain then runs
     * {@code validateTopicPolicyOperationAsync} (MAX_CONSUMERS / WRITE), {@code preValidation}
     * and {@code internalSetMaxConsumers}. On success the update is logged and the request is
     * resumed with 204 No Content; any failure is handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the chain completes
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic URL-encoded topic path segment
     * @param isGlobal forwarded unchanged to {@code internalSetMaxConsumers}
     * @param authoritative whether the leader broker redirected this call to this broker
     * @param maxConsumers request body holding the new limit
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/maxConsumers")
    @ApiOperation("Set maxConsumers config for specified topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification"),
            @ApiResponse(code = 412, message = "Invalid value of maxConsumers")})
    public void setMaxConsumers(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @ApiParam("The max consumers of the topic") int maxConsumers) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalSetMaxConsumers(maxConsumers, isGlobal))
                .thenRun(() -> {
                    log.info("[{}] Successfully updated max consumers: namespace={}, topic={}, maxConsumers={}",
                            clientAppId(), namespaceName, topicName.getLocalName(), maxConsumers);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("setMaxConsumers", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the maxConsumers setting of a topic.
     *
     * <p>The topic name is validated synchronously; the asynchronous chain then runs
     * {@code validateTopicPolicyOperationAsync} (MAX_CONSUMERS / WRITE), {@code preValidation}
     * and {@code internalRemoveMaxConsumers}. On success the removal is logged and the request
     * is resumed with 204 No Content; any failure is handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the chain completes
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic URL-encoded topic path segment
     * @param isGlobal forwarded unchanged to {@code internalRemoveMaxConsumers}
     * @param authoritative whether the leader broker redirected this call to this broker
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/maxConsumers")
    @ApiOperation("Remove maxConsumers config for specified topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")})
    public void removeMaxConsumers(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalRemoveMaxConsumers(isGlobal))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove max consumers: namespace={}, topic={}",
                            clientAppId(), namespaceName, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeMaxConsumers", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Reads the maxMessageSize setting of a topic.
     *
     * <p>The topic name is validated synchronously; the asynchronous chain then runs
     * {@code validateAdminAccessForTenantAsync} for the topic's tenant, {@code preValidation}
     * and {@code internalGetMaxMessageSize}. The request is resumed with the value when one is
     * present and with 204 No Content otherwise; any failure is handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the chain completes
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic URL-encoded topic path segment
     * @param isGlobal forwarded unchanged to {@code internalGetMaxMessageSize}
     * @param authoritative whether the leader broker redirected this call to this broker
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/maxMessageSize")
    @ApiOperation("Get maxMessageSize config for specified topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")})
    public void getMaxMessageSize(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateAdminAccessForTenantAsync(topicName.getTenant())
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalGetMaxMessageSize(isGlobal))
                .thenAccept(maxMessageSize -> {
                    // No stored value is reported as 204 No Content.
                    final Object entity = maxMessageSize.isPresent()
                            ? maxMessageSize.get()
                            : Response.noContent().build();
                    asyncResponse.resume(entity);
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("getMaxMessageSize", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Sets the maxMessageSize setting of a topic.
     *
     * <p>The topic name is validated synchronously; the asynchronous chain then runs
     * {@code validateAdminAccessForTenantAsync} for the topic's tenant, {@code preValidation}
     * and {@code internalSetMaxMessageSize}. On success the update is logged and the request is
     * resumed with 204 No Content; any failure is handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the chain completes
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic URL-encoded topic path segment
     * @param isGlobal forwarded unchanged to {@code internalSetMaxMessageSize}
     * @param authoritative whether the leader broker redirected this call to this broker
     * @param maxMessageSize request body holding the new limit
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/maxMessageSize")
    @ApiOperation("Set maxMessageSize config for specified topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification"),
            // Previously documented as "Invalid value of maxConsumers" (copy-paste slip).
            @ApiResponse(code = 412, message = "Invalid value of maxMessageSize")})
    public void setMaxMessageSize(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @ApiParam("The max message size of the topic") int maxMessageSize) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateAdminAccessForTenantAsync(topicName.getTenant())
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalSetMaxMessageSize(maxMessageSize, isGlobal))
                .thenRun(() -> {
                    log.info("[{}] Successfully set max message size: namespace={}, topic={}, maxMessageSize={}, isGlobal={}",
                            clientAppId(), namespaceName, topicName.getLocalName(), maxMessageSize, isGlobal);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("setMaxMessageSize", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the maxMessageSize setting of a topic.
     *
     * <p>The topic name is validated synchronously; the asynchronous chain then runs
     * {@code validateAdminAccessForTenantAsync} for the topic's tenant, {@code preValidation}
     * and {@code internalSetMaxMessageSize} with a {@code null} value. On success the removal
     * is logged and the request is resumed with 204 No Content; any failure is handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response resumed when the chain completes
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic URL-encoded topic path segment
     * @param isGlobal forwarded unchanged to {@code internalSetMaxMessageSize}
     * @param authoritative whether the leader broker redirected this call to this broker
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/maxMessageSize")
    @ApiOperation("Remove maxMessageSize config for specified topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")})
    public void removeMaxMessageSize(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateAdminAccessForTenantAsync(topicName.getTenant())
                .thenCompose(ignored -> preValidation(authoritative))
                // Removal is expressed as a set with a null value.
                .thenCompose(ignored -> internalSetMaxMessageSize(null, isGlobal))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove max message size: namespace={}, topic={}",
                            clientAppId(), namespaceName, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeMaxMessageSize", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Terminates a topic and returns the {@link MessageId} produced by
     * {@code internalTerminate}.
     *
     * <p>This endpoint is synchronous: the name is checked with
     * {@code validatePersistentTopicName} and the work is delegated to
     * {@code internalTerminate}.
     *
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic URL-encoded topic path segment
     * @param authoritative whether the leader broker redirected this call to this broker
     * @return the value returned by {@code internalTerminate}
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/terminate")
    @ApiOperation("Terminate a topic. A topic that is terminated will not accept any more messages to be published and will let consumer to drain existing messages in backlog")
    @ApiResponses({
            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"),
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Termination of a partitioned topic is not allowed"),
            @ApiResponse(code = 406, message = "Need to provide a persistent topic name"),
            @ApiResponse(code = 412, message = "Topic name is not valid"),
            @ApiResponse(code = 500, message = "Internal server error"),
            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    public MessageId terminate(
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validatePersistentTopicName(tenant, namespace, encodedTopic);
        final MessageId result = internalTerminate(authoritative);
        return result;
    }

    /**
     * Terminates the partitions of a partitioned topic.
     *
     * <p>The topic name is validated synchronously, then the suspended response is handed to
     * {@code internalTerminatePartitionedTopic}; this method does not resume the response
     * itself.
     *
     * @param asyncResponse suspended response passed to {@code internalTerminatePartitionedTopic}
     * @param tenant tenant path segment
     * @param namespace namespace path segment
     * @param encodedTopic URL-encoded topic path segment
     * @param authoritative whether the leader broker redirected this call to this broker
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/terminate/partitions")
    @ApiOperation("Terminate all partitioned topic. A topic that is terminated will not accept any more messages to be published and will let consumer to drain existing messages in backlog")
    @ApiResponses({
            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"),
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Termination of a non-partitioned topic is not allowed"),
            @ApiResponse(code = 412, message = "Topic name is not valid"),
            @ApiResponse(code = 500, message = "Internal server error"),
            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
    public void terminatePartitionedTopic(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        internalTerminatePartitionedTopic(asyncResponse, authoritative);
    }

    /**
     * REST entry point that triggers compaction on a topic.
     *
     * <p>Validates the topic name and delegates to {@code internalTriggerCompaction}. Any
     * exception thrown synchronously is delivered through {@code asyncResponse}: a
     * {@link WebApplicationException} is forwarded unchanged, anything else is wrapped in a
     * {@link RestException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     */
    @PUT
    @Path("/{tenant}/{namespace}/{topic}/compaction")
    @ApiOperation(value = "Trigger a compaction operation on a topic.")
    @ApiResponses(value = {
            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"),
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Operation is not allowed on the persistent topic"),
            @ApiResponse(code = 409, message = "Compaction already running"),
            @ApiResponse(code = 412, message = "Topic name is not valid"),
            @ApiResponse(code = 500, message = "Internal server error"),
            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
    })
    public void compact(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        try {
            validateTopicName(tenant, namespace, encodedTopic);
            internalTriggerCompaction(asyncResponse, authoritative);
        } catch (WebApplicationException webFailure) {
            // Forward web exceptions as-is instead of re-wrapping them.
            asyncResponse.resume(webFailure);
        } catch (Exception unexpected) {
            asyncResponse.resume(new RestException(unexpected));
        }
    }

    /**
     * REST entry point that reports the status of a compaction operation on a topic.
     *
     * <p>Validates the topic name and returns whatever {@code internalCompactionStatus} yields.
     *
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     * @return the status object produced by {@code internalCompactionStatus}
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/compaction")
    @ApiOperation(value = "Get the status of a compaction operation for a topic.")
    @ApiResponses(value = {
            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"),
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist, or compaction hasn't run"),
            @ApiResponse(code = 405, message = "Operation is not allowed on the persistent topic"),
            @ApiResponse(code = 412, message = "Topic name is not valid"),
            @ApiResponse(code = 500, message = "Internal server error"),
            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
    })
    public LongRunningProcessStatus compactionStatus(
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        final LongRunningProcessStatus status = internalCompactionStatus(authoritative);
        return status;
    }

    /**
     * REST entry point that triggers offloading of a topic prefix to long term storage.
     *
     * <p>Rejects a missing request body with a 400 {@link RestException}, validates the topic
     * name, and delegates to {@code internalTriggerOffload}. Any exception thrown synchronously
     * is delivered through {@code asyncResponse}: a {@link WebApplicationException} is forwarded
     * unchanged, anything else is wrapped in a {@link RestException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     * @param messageId     request body; must not be {@code null}
     */
    @PUT
    @Path("/{tenant}/{namespace}/{topic}/offload")
    @ApiOperation(value = "Offload a prefix of a topic to long term storage")
    @ApiResponses(value = {
            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
            @ApiResponse(code = 400, message = "Message ID is null"),
            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"),
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Operation is not allowed on the persistent topic"),
            @ApiResponse(code = 409, message = "Offload already running"),
            @ApiResponse(code = 412, message = "Topic name is not valid"),
            @ApiResponse(code = 500, message = "Internal server error"),
            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
    })
    public void triggerOffload(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            MessageIdImpl messageId) {
        try {
            // The body check comes first, before the topic name is even validated.
            if (null == messageId) {
                throw new RestException(Response.Status.BAD_REQUEST, "messageId is null");
            }
            validateTopicName(tenant, namespace, encodedTopic);
            internalTriggerOffload(asyncResponse, authoritative, messageId);
        } catch (WebApplicationException webFailure) {
            // Forward web exceptions (including the 400 above) as-is.
            asyncResponse.resume(webFailure);
        } catch (Exception unexpected) {
            asyncResponse.resume(new RestException(unexpected));
        }
    }

    /**
     * REST entry point that reports the status of an offload operation on a topic.
     *
     * <p>Validates the topic name and delegates to {@code internalOffloadStatus}. Any exception
     * thrown synchronously is delivered through {@code asyncResponse}: a
     * {@link WebApplicationException} is forwarded unchanged, anything else is wrapped in a
     * {@link RestException}.
     *
     * <p>The operation summary used to be a copy of the PUT trigger endpoint's text; it now
     * describes the status query this GET endpoint performs.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/offload")
    @ApiOperation(value = "Get the status of an offload operation for a topic.")
    @ApiResponses(value = {
            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"),
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Operation is not allowed on the persistent topic"),
            @ApiResponse(code = 412, message = "Topic name is not valid"),
            @ApiResponse(code = 500, message = "Internal server error"),
            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
    })
    public void offloadStatus(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        try {
            validateTopicName(tenant, namespace, encodedTopic);
            internalOffloadStatus(asyncResponse, authoritative);
        } catch (WebApplicationException wae) {
            // Forward web exceptions as-is instead of re-wrapping them.
            asyncResponse.resume(wae);
        } catch (Exception e) {
            asyncResponse.resume(new RestException(e));
        }
    }

    /**
     * REST entry point that returns the last committed message id of a topic.
     *
     * <p>Validates the topic name and delegates to {@code internalGetLastMessageId}. Any
     * exception thrown synchronously is delivered through {@code asyncResponse}. A
     * {@link WebApplicationException} is forwarded unchanged, matching the sibling endpoints
     * ({@code compact}, {@code triggerOffload}, {@code offloadStatus}); anything else is wrapped
     * in a {@link RestException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/lastMessageId")
    @ApiOperation(value = "Return the last commit message id of topic")
    @ApiResponses(value = {
            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant orsubscriber is not authorized to access this operation"),
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Operation is not allowed on the persistent topic"),
            @ApiResponse(code = 412, message = "Topic name is not valid"),
            @ApiResponse(code = 500, message = "Internal server error"),
            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
    })
    public void getLastMessageId(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        try {
            validateTopicName(tenant, namespace, encodedTopic);
            internalGetLastMessageId(asyncResponse, authoritative);
        } catch (WebApplicationException wae) {
            // Pass the original web exception through so its response is not rebuilt
            // by a second RestException wrapper.
            asyncResponse.resume(wae);
        } catch (Exception e) {
            asyncResponse.resume(new RestException(e));
        }
    }

    /**
     * REST entry point that reads the topic-level dispatch rate.
     *
     * <p>After validating the topic name, runs an asynchronous pipeline: policy-operation check
     * ({@code RATE}/{@code READ}), {@code preValidation}, then {@code internalGetDispatchRate}.
     * The result resumes {@code asyncResponse}; any failure in the pipeline goes to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param applied       value of the {@code applied} query parameter (default {@code false})
     * @param isGlobal      value of the {@code isGlobal} query parameter (default {@code false})
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/dispatchRate")
    @ApiOperation(value = "Get dispatch rate configuration for specified topic.")
    @ApiResponses(value = {
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    public void getDispatchRate(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("applied") @DefaultValue("false") boolean applied,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalGetDispatchRate(applied, isGlobal))
                .thenApply(asyncResponse::resume)
                .exceptionally(failure -> {
                    handleTopicPolicyException("getDispatchRate", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * REST entry point that sets the topic-level dispatch rate.
     *
     * <p>After validating the topic name, runs an asynchronous pipeline: policy-operation check
     * ({@code RATE}/{@code WRITE}), {@code preValidation}, then {@code internalSetDispatchRate}.
     * On success it logs the new value and resumes {@code asyncResponse} with 204 No Content;
     * any failure in the pipeline goes to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     * @param isGlobal      value of the {@code isGlobal} query parameter (default {@code false})
     * @param dispatchRate  request body handed to {@code internalSetDispatchRate}
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/dispatchRate")
    @ApiOperation(value = "Set message dispatch rate configuration for specified topic.")
    @ApiResponses(value = {
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    public void setDispatchRate(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Dispatch rate for the specified topic") DispatchRateImpl dispatchRate) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalSetDispatchRate(dispatchRate, isGlobal))
                .thenRun(() -> {
                    try {
                        log.info("[{}] Successfully set topic dispatch rate: tenant={}, namespace={}, topic={}, dispatchRate={}",
                                clientAppId(), tenant, namespace, topicName.getLocalName(),
                                jsonMapper().writeValueAsString(dispatchRate));
                    } catch (JsonProcessingException ignoredLogFailure) {
                        // Logging is best-effort: a serialization failure must not fail the request.
                    }
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("setDispatchRate", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * REST entry point that removes the topic-level dispatch rate.
     *
     * <p>After validating the topic name, runs an asynchronous pipeline: policy-operation check
     * ({@code RATE}/{@code WRITE}), {@code preValidation}, then
     * {@code internalRemoveDispatchRate}. On success it logs the removal and resumes
     * {@code asyncResponse} with 204 No Content; any failure in the pipeline goes to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param isGlobal      value of the {@code isGlobal} query parameter (default {@code false})
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/dispatchRate")
    @ApiOperation(value = "Remove message dispatch rate configuration for specified topic.")
    @ApiResponses(value = {
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    public void removeDispatchRate(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalRemoveDispatchRate(isGlobal))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove topic dispatch rate: tenant={}, namespace={}, topic={}",
                            clientAppId(), tenant, namespace, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeDispatchRate", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * REST entry point that reads the topic-level subscription dispatch rate.
     *
     * <p>After validating the topic name, runs an asynchronous pipeline: policy-operation check
     * ({@code RATE}/{@code READ}), {@code preValidation}, then
     * {@code internalGetSubscriptionDispatchRate}. The result resumes {@code asyncResponse};
     * any failure in the pipeline goes to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param applied       value of the {@code applied} query parameter (default {@code false})
     * @param isGlobal      value of the {@code isGlobal} query parameter (default {@code false})
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate")
    @ApiOperation(value = "Get subscription message dispatch rate configuration for specified topic.")
    @ApiResponses(value = {
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    public void getSubscriptionDispatchRate(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("applied") @DefaultValue("false") boolean applied,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalGetSubscriptionDispatchRate(applied, isGlobal))
                .thenApply(asyncResponse::resume)
                .exceptionally(failure -> {
                    handleTopicPolicyException("getSubscriptionDispatchRate", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * REST entry point that sets the topic-level subscription dispatch rate.
     *
     * <p>After validating the topic name, runs an asynchronous pipeline: policy-operation check
     * ({@code RATE}/{@code WRITE}), {@code preValidation}, then
     * {@code internalSetSubscriptionDispatchRate}. On success it logs the new value and resumes
     * {@code asyncResponse} with 204 No Content; any failure in the pipeline goes to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     * @param isGlobal      value of the {@code isGlobal} query parameter (default {@code false})
     * @param dispatchRate  request body handed to {@code internalSetSubscriptionDispatchRate}
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate")
    @ApiOperation(value = "Set subscription message dispatch rate configuration for specified topic.")
    @ApiResponses(value = {
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    public void setSubscriptionDispatchRate(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Subscription message dispatch rate for the specified topic") DispatchRateImpl dispatchRate) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalSetSubscriptionDispatchRate(dispatchRate, isGlobal))
                .thenRun(() -> {
                    try {
                        log.info("[{}] Successfully set topic subscription dispatch rate: tenant={}, namespace={}, topic={}, dispatchRate={}",
                                clientAppId(), tenant, namespace, topicName.getLocalName(),
                                jsonMapper().writeValueAsString(dispatchRate));
                    } catch (JsonProcessingException ignoredLogFailure) {
                        // Logging is best-effort: a serialization failure must not fail the request.
                    }
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("setSubscriptionDispatchRate", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * REST entry point that removes the topic-level subscription dispatch rate.
     *
     * <p>After validating the topic name, runs an asynchronous pipeline: policy-operation check
     * ({@code RATE}/{@code WRITE}), {@code preValidation}, then
     * {@code internalRemoveSubscriptionDispatchRate}. On success it logs the removal and resumes
     * {@code asyncResponse} with 204 No Content; any failure in the pipeline goes to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param isGlobal      value of the {@code isGlobal} query parameter (default {@code false})
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate")
    @ApiOperation(value = "Remove subscription message dispatch rate configuration for specified topic.")
    @ApiResponses(value = {
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    public void removeSubscriptionDispatchRate(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalRemoveSubscriptionDispatchRate(isGlobal))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove topic subscription dispatch rate: tenant={}, namespace={}, topic={}",
                            clientAppId(), tenant, namespace, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeSubscriptionDispatchRate", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * REST entry point that reads the topic-level compaction threshold.
     *
     * <p>After validating the topic name, runs an asynchronous pipeline: policy-operation check
     * ({@code COMPACTION}/{@code READ}), {@code preValidation}, then
     * {@code internalGetCompactionThreshold}. The result resumes {@code asyncResponse}; any
     * failure in the pipeline goes to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param applied       value of the {@code applied} query parameter (default {@code false})
     * @param isGlobal      value of the {@code isGlobal} query parameter (default {@code false})
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
    @ApiOperation(value = "Get compaction threshold configuration for specified topic.")
    @ApiResponses(value = {
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    public void getCompactionThreshold(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("applied") @DefaultValue("false") boolean applied,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.COMPACTION, PolicyOperation.READ)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalGetCompactionThreshold(applied, isGlobal))
                .thenApply(asyncResponse::resume)
                .exceptionally(failure -> {
                    handleTopicPolicyException("getCompactionThreshold", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * REST entry point that sets the topic-level compaction threshold.
     *
     * <p>After validating the topic name, runs an asynchronous pipeline: policy-operation check
     * ({@code COMPACTION}/{@code WRITE}), {@code preValidation}, then
     * {@code internalSetCompactionThreshold}. On success it logs the new value and resumes
     * {@code asyncResponse} with 204 No Content; any failure in the pipeline goes to
     * {@code handleTopicPolicyException}.
     *
     * <p>The body parameter's API description previously read "Dispatch rate for the specified
     * topic" (a copy-paste slip) and now names the compaction threshold. The threshold is a
     * primitive {@code long}, so it is logged directly instead of being JSON-serialized first;
     * the printed text is the same and no serialization exception has to be swallowed.
     *
     * @param asyncResponse       suspended response that receives the outcome
     * @param tenant              tenant path segment
     * @param namespace           namespace path segment
     * @param encodedTopic        topic path segment, still URL-encoded
     * @param authoritative       value of the {@code authoritative} query parameter (default {@code false})
     * @param isGlobal            value of the {@code isGlobal} query parameter (default {@code false})
     * @param compactionThreshold request body handed to {@code internalSetCompactionThreshold}
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
    @ApiOperation(value = "Set compaction threshold configuration for specified topic.")
    @ApiResponses(value = {
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    public void setCompactionThreshold(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Compaction threshold for the specified topic") long compactionThreshold) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.COMPACTION, PolicyOperation.WRITE)
                .thenCompose(__ -> preValidation(authoritative))
                .thenCompose(__ -> internalSetCompactionThreshold(compactionThreshold, isGlobal))
                .thenRun(() -> {
                    log.info("[{}] Successfully set topic compaction threshold: tenant={}, namespace={}, topic={}, compactionThreshold={}",
                            clientAppId(), tenant, namespace, topicName.getLocalName(), compactionThreshold);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(ex -> {
                    handleTopicPolicyException("setCompactionThreshold", ex, asyncResponse);
                    return null;
                });
    }

    /**
     * REST entry point that removes the topic-level compaction threshold.
     *
     * <p>After validating the topic name, runs an asynchronous pipeline: policy-operation check
     * ({@code COMPACTION}/{@code WRITE}), {@code preValidation}, then
     * {@code internalRemoveCompactionThreshold}. On success it logs the removal and resumes
     * {@code asyncResponse} with 204 No Content; any failure in the pipeline goes to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param isGlobal      value of the {@code isGlobal} query parameter (default {@code false})
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
    @ApiOperation(value = "Remove compaction threshold configuration for specified topic.")
    @ApiResponses(value = {
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    public void removeCompactionThreshold(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.COMPACTION, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalRemoveCompactionThreshold(isGlobal))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove topic compaction threshold: tenant={}, namespace={}, topic={}",
                            clientAppId(), tenant, namespace, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeCompactionThreshold", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * REST entry point that reads the topic-level max-consumers-per-subscription setting.
     *
     * <p>After validating the topic name, runs an asynchronous pipeline: policy-operation check
     * ({@code MAX_CONSUMERS}/{@code READ}), {@code preValidation}, then
     * {@code internalGetMaxConsumersPerSubscription}. If the resulting optional holds a value,
     * that value resumes {@code asyncResponse}; otherwise a 204 No Content response does. Any
     * failure in the pipeline goes to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param isGlobal      value of the {@code isGlobal} query parameter (default {@code false})
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/maxConsumersPerSubscription")
    @ApiOperation(value = "Get max consumers per subscription configuration for specified topic.")
    @ApiResponses(value = {
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    public void getMaxConsumersPerSubscription(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalGetMaxConsumersPerSubscription(isGlobal))
                .thenAccept(maybeLimit -> {
                    final Object payload;
                    if (maybeLimit.isPresent()) {
                        payload = maybeLimit.get();
                    } else {
                        payload = Response.noContent().build();
                    }
                    asyncResponse.resume(payload);
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("getMaxConsumersPerSubscription", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * REST entry point that sets the topic-level max-consumers-per-subscription limit.
     *
     * <p>After validating the topic name, runs an asynchronous pipeline: policy-operation check
     * ({@code MAX_CONSUMERS}/{@code WRITE}), {@code preValidation}, then
     * {@code internalSetMaxConsumersPerSubscription}. On success it logs the new value and
     * resumes {@code asyncResponse} with 204 No Content; any failure in the pipeline goes to
     * {@code handleTopicPolicyException}.
     *
     * <p>The body parameter's API description previously read "Dispatch rate for the specified
     * topic" (a copy-paste slip) and now names the actual setting. The limit is a primitive
     * {@code int}, so it is logged directly instead of being JSON-serialized first; the printed
     * text is the same and no serialization exception has to be swallowed.
     *
     * @param asyncResponse               suspended response that receives the outcome
     * @param tenant                      tenant path segment
     * @param namespace                   namespace path segment
     * @param encodedTopic                topic path segment, still URL-encoded
     * @param isGlobal                    value of the {@code isGlobal} query parameter (default {@code false})
     * @param authoritative               value of the {@code authoritative} query parameter (default {@code false})
     * @param maxConsumersPerSubscription request body handed to
     *                                    {@code internalSetMaxConsumersPerSubscription}
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/maxConsumersPerSubscription")
    @ApiOperation(value = "Set max consumers per subscription configuration for specified topic.")
    @ApiResponses(value = {
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    public void setMaxConsumersPerSubscription(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @ApiParam(value = "Max consumers per subscription for the specified topic") int maxConsumersPerSubscription) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE)
                .thenCompose(__ -> preValidation(authoritative))
                .thenCompose(__ -> internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription, isGlobal))
                .thenRun(() -> {
                    log.info("[{}] Successfully set topic max consumers per subscription: tenant={}, namespace={}, topic={}, maxConsumersPerSubscription={}",
                            clientAppId(), tenant, namespace, topicName.getLocalName(), maxConsumersPerSubscription);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(ex -> {
                    handleTopicPolicyException("setMaxConsumersPerSubscription", ex, asyncResponse);
                    return null;
                });
    }

    /**
     * REST entry point that removes the topic-level max-consumers-per-subscription limit.
     *
     * <p>After validating the topic name, runs an asynchronous pipeline: policy-operation check
     * ({@code MAX_CONSUMERS}/{@code WRITE}), {@code preValidation}, then
     * {@code internalRemoveMaxConsumersPerSubscription}. On success it logs the removal and
     * resumes {@code asyncResponse} with 204 No Content; any failure in the pipeline goes to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param isGlobal      value of the {@code isGlobal} query parameter (default {@code false})
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/maxConsumersPerSubscription")
    @ApiOperation(value = "Remove max consumers per subscription configuration for specified topic.")
    @ApiResponses(value = {
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    public void removeMaxConsumersPerSubscription(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalRemoveMaxConsumersPerSubscription(isGlobal))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove topic max consumers per subscription: tenant={}, namespace={}, topic={}",
                            clientAppId(), tenant, namespace, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeMaxConsumersPerSubscription", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * REST entry point that reads the topic-level publish rate.
     *
     * <p>After validating the topic name, runs an asynchronous pipeline: policy-operation check
     * ({@code RATE}/{@code READ}), {@code preValidation}, then {@code internalGetPublishRate}.
     * If the resulting optional holds a value, that value resumes {@code asyncResponse};
     * otherwise a 204 No Content response does. Any failure in the pipeline goes to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that receives the outcome
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param isGlobal      value of the {@code isGlobal} query parameter (default {@code false})
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/publishRate")
    @ApiOperation(value = "Get publish rate configuration for specified topic.")
    @ApiResponses(value = {
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Topic does not exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    public void getPublishRate(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalGetPublishRate(isGlobal))
                .thenAccept(maybeRate -> {
                    final Object payload;
                    if (maybeRate.isPresent()) {
                        payload = maybeRate.get();
                    } else {
                        payload = Response.noContent().build();
                    }
                    asyncResponse.resume(payload);
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("getPublishRate", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Stores a publish-rate policy for a topic.
     *
     * <p>After {@code validateTopicName}, the asynchronous chain calls
     * {@code validateTopicPolicyOperationAsync} (RATE / WRITE), {@code preValidation} and
     * {@code internalSetPublishRate}. On success a log line is written and the request is resumed
     * with 204 No Content. Failures in the chain are handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed with the outcome
     * @param tenant        value of the {@code tenant} path parameter
     * @param namespace     value of the {@code namespace} path parameter
     * @param encodedTopic  value of the {@code topic} path parameter, still encoded
     * @param isGlobal      {@code isGlobal} query parameter, forwarded to {@code internalSetPublishRate}
     * @param authoritative {@code authoritative} query parameter, forwarded to {@code preValidation}
     * @param publishRate   request body, forwarded to {@code internalSetPublishRate}
     */
    @POST
    @Path(value="/{tenant}/{namespace}/{topic}/publishRate")
    @ApiOperation(value="Set message publish rate configuration for specified topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=405, message="Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code=409, message="Concurrent modification")})
    public void setPublishRate(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @QueryParam(value="isGlobal") @DefaultValue(value="false") boolean isGlobal,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative,
            @ApiParam(value="Publish rate for the specified topic") PublishRate publishRate) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalSetPublishRate(publishRate, isGlobal))
                .thenRun(() -> {
                    String publishRateText;
                    try {
                        publishRateText = PersistentTopics.jsonMapper().writeValueAsString(publishRate);
                    } catch (JsonProcessingException e) {
                        // Serialization is only needed for the log line; fall back to toString()
                        // so the success record is still written instead of being dropped.
                        publishRateText = String.valueOf(publishRate);
                    }
                    log.info("[{}] Successfully set topic publish rate: tenant={}, namespace={}, topic={}, isGlobal={}, publishRate={}", clientAppId(), tenant, namespace, topicName.getLocalName(), isGlobal, publishRateText);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("setPublishRate", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the publish-rate policy of a topic.
     *
     * <p>After {@code validateTopicName}, the asynchronous chain calls
     * {@code validateTopicPolicyOperationAsync} (RATE / WRITE), {@code preValidation} and
     * {@code internalRemovePublishRate}. On success a log line is written and the request is
     * resumed with 204 No Content. Failures are handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed with the outcome
     * @param tenant        value of the {@code tenant} path parameter
     * @param namespace     value of the {@code namespace} path parameter
     * @param encodedTopic  value of the {@code topic} path parameter, still encoded
     * @param isGlobal      {@code isGlobal} query parameter, forwarded to {@code internalRemovePublishRate}
     * @param authoritative {@code authoritative} query parameter, forwarded to {@code preValidation}
     */
    @DELETE
    @Path(value="/{tenant}/{namespace}/{topic}/publishRate")
    @ApiOperation(value="Remove message publish rate configuration for specified topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=405, message="Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code=409, message="Concurrent modification")})
    public void removePublishRate(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @QueryParam(value="isGlobal") @DefaultValue(value="false") boolean isGlobal,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalRemovePublishRate(isGlobal))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove topic publish rate: tenant={}, namespace={}, topic={}, isGlobal={}", clientAppId(), tenant, namespace, topicName.getLocalName(), isGlobal);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removePublishRate", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Returns the enabled subscription types stored for a topic.
     *
     * <p>After {@code validateTopicName}, the asynchronous chain calls
     * {@code validateTopicPolicyOperationAsync} (SUBSCRIPTION_AUTH_MODE / READ),
     * {@code preValidation} and {@code internalGetSubscriptionTypesEnabled}. If the resulting
     * optional holds a value it is sent as the response entity; otherwise the request is resumed
     * with 204 No Content. Failures are handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed with the outcome
     * @param tenant        value of the {@code tenant} path parameter
     * @param namespace     value of the {@code namespace} path parameter
     * @param encodedTopic  value of the {@code topic} path parameter, still encoded
     * @param isGlobal      {@code isGlobal} query parameter, forwarded to
     *                      {@code internalGetSubscriptionTypesEnabled}
     * @param authoritative {@code authoritative} query parameter, forwarded to {@code preValidation}
     */
    @GET
    @Path(value="/{tenant}/{namespace}/{topic}/subscriptionTypesEnabled")
    @ApiOperation(value="Get is enable sub type fors specified topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=405, message="Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code=409, message="Concurrent modification")})
    public void getSubscriptionTypesEnabled(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @QueryParam(value="isGlobal") @DefaultValue(value="false") boolean isGlobal,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.READ)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalGetSubscriptionTypesEnabled(isGlobal))
                .thenAccept(maybeTypes -> {
                    if (maybeTypes.isPresent()) {
                        asyncResponse.resume(maybeTypes.get());
                    } else {
                        asyncResponse.resume(Response.noContent().build());
                    }
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("getSubscriptionTypesEnabled", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Stores the set of enabled subscription types for a topic.
     *
     * <p>After {@code validateTopicName}, the asynchronous chain calls
     * {@code validateTopicPolicyOperationAsync} (SUBSCRIPTION_AUTH_MODE / WRITE),
     * {@code preValidation} and {@code internalSetSubscriptionTypesEnabled}. On success a log line
     * is written and the request is resumed with 204 No Content. Failures are handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse            suspended response that is resumed with the outcome
     * @param tenant                   value of the {@code tenant} path parameter
     * @param namespace                value of the {@code namespace} path parameter
     * @param encodedTopic             value of the {@code topic} path parameter, still encoded
     * @param isGlobal                 {@code isGlobal} query parameter, forwarded to
     *                                 {@code internalSetSubscriptionTypesEnabled}
     * @param authoritative            {@code authoritative} query parameter, forwarded to
     *                                 {@code preValidation}
     * @param subscriptionTypesEnabled request body, forwarded to
     *                                 {@code internalSetSubscriptionTypesEnabled}
     */
    @POST
    @Path(value="/{tenant}/{namespace}/{topic}/subscriptionTypesEnabled")
    @ApiOperation(value="Set is enable sub types for specified topic")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=405, message="Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code=409, message="Concurrent modification")})
    public void setSubscriptionTypesEnabled(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @QueryParam(value="isGlobal") @DefaultValue(value="false") boolean isGlobal,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative,
            @ApiParam(value="Enable sub types for the specified topic") Set<SubscriptionType> subscriptionTypesEnabled) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalSetSubscriptionTypesEnabled(subscriptionTypesEnabled, isGlobal))
                .thenRun(() -> {
                    String enabledTypesText;
                    try {
                        enabledTypesText = PersistentTopics.jsonMapper().writeValueAsString(subscriptionTypesEnabled);
                    } catch (JsonProcessingException e) {
                        // Serialization is only needed for the log line; fall back to toString()
                        // so the success record is still written instead of being dropped.
                        enabledTypesText = String.valueOf(subscriptionTypesEnabled);
                    }
                    log.info("[{}] Successfully set topic is enabled sub types : tenant={}, namespace={}, topic={}, subscriptionTypesEnabled={}", clientAppId(), tenant, namespace, topicName.getLocalName(), enabledTypesText);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("setSubscriptionTypesEnabled", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the enabled-subscription-types policy of a topic.
     *
     * <p>After {@code validateTopicName}, the asynchronous chain calls
     * {@code validateTopicPolicyOperationAsync} (SUBSCRIPTION_AUTH_MODE / WRITE),
     * {@code preValidation} and {@code internalRemoveSubscriptionTypesEnabled}. On success a log
     * line is written and the request is resumed with 204 No Content. Failures are handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed with the outcome
     * @param tenant        value of the {@code tenant} path parameter
     * @param namespace     value of the {@code namespace} path parameter
     * @param encodedTopic  value of the {@code topic} path parameter, still encoded
     * @param isGlobal      {@code isGlobal} query parameter, forwarded to
     *                      {@code internalRemoveSubscriptionTypesEnabled}
     * @param authoritative {@code authoritative} query parameter, forwarded to {@code preValidation}
     */
    @DELETE
    @Path(value="/{tenant}/{namespace}/{topic}/subscriptionTypesEnabled")
    @ApiOperation(value="Remove subscription types enabled for specified topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=405, message="Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code=409, message="Concurrent modification")})
    public void removeSubscriptionTypesEnabled(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @QueryParam(value="isGlobal") @DefaultValue(value="false") boolean isGlobal,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalRemoveSubscriptionTypesEnabled(isGlobal))
                .thenRun(() -> {
                    // This log line reports the namespaceName field rather than the raw path parameters.
                    log.info("[{}] Successfully remove subscription types enabled: namespace={}, topic={}", clientAppId(), namespaceName, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeSubscriptionTypesEnabled", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Returns the subscribe-rate configuration for a topic.
     *
     * <p>After {@code validateTopicName}, the asynchronous chain calls
     * {@code validateTopicPolicyOperationAsync} (RATE / READ), {@code preValidation} and
     * {@code internalGetSubscribeRate}; whatever that last step yields is passed straight to
     * {@code asyncResponse.resume}. Failures are handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed with the outcome
     * @param tenant        value of the {@code tenant} path parameter
     * @param namespace     value of the {@code namespace} path parameter
     * @param encodedTopic  value of the {@code topic} path parameter, still encoded
     * @param applied       {@code applied} query parameter, forwarded to {@code internalGetSubscribeRate}
     * @param isGlobal      {@code isGlobal} query parameter, forwarded to {@code internalGetSubscribeRate}
     * @param authoritative {@code authoritative} query parameter, forwarded to {@code preValidation}
     */
    @GET
    @Path(value="/{tenant}/{namespace}/{topic}/subscribeRate")
    @ApiOperation(value="Get subscribe rate configuration for specified topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=405, message="Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code=409, message="Concurrent modification")})
    public void getSubscribeRate(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @QueryParam(value="applied") @DefaultValue(value="false") boolean applied,
            @QueryParam(value="isGlobal") @DefaultValue(value="false") boolean isGlobal,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalGetSubscribeRate(applied, isGlobal))
                .thenApply(rate -> asyncResponse.resume(rate))
                .exceptionally(failure -> {
                    handleTopicPolicyException("getSubscribeRate", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Stores a subscribe-rate policy for a topic.
     *
     * <p>After {@code validateTopicName}, the asynchronous chain calls
     * {@code validateTopicPolicyOperationAsync} (RATE / WRITE), {@code preValidation} and
     * {@code internalSetSubscribeRate}. On success a log line is written and the request is
     * resumed with 204 No Content. Failures are handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed with the outcome
     * @param tenant        value of the {@code tenant} path parameter
     * @param namespace     value of the {@code namespace} path parameter
     * @param encodedTopic  value of the {@code topic} path parameter, still encoded
     * @param isGlobal      {@code isGlobal} query parameter, forwarded to {@code internalSetSubscribeRate}
     * @param authoritative {@code authoritative} query parameter, forwarded to {@code preValidation}
     * @param subscribeRate request body, forwarded to {@code internalSetSubscribeRate}
     */
    @POST
    @Path(value="/{tenant}/{namespace}/{topic}/subscribeRate")
    @ApiOperation(value="Set subscribe rate configuration for specified topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=405, message="Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code=409, message="Concurrent modification")})
    public void setSubscribeRate(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @QueryParam(value="isGlobal") @DefaultValue(value="false") boolean isGlobal,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative,
            @ApiParam(value="Subscribe rate for the specified topic") SubscribeRate subscribeRate) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalSetSubscribeRate(subscribeRate, isGlobal))
                .thenRun(() -> {
                    String subscribeRateText;
                    try {
                        subscribeRateText = PersistentTopics.jsonMapper().writeValueAsString(subscribeRate);
                    } catch (JsonProcessingException e) {
                        // Serialization is only needed for the log line; fall back to toString()
                        // so the success record is still written instead of being dropped.
                        subscribeRateText = String.valueOf(subscribeRate);
                    }
                    log.info("[{}] Successfully set topic subscribe rate: tenant={}, namespace={}, topic={}, isGlobal={} subscribeRate={}", clientAppId(), tenant, namespace, topicName.getLocalName(), isGlobal, subscribeRateText);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("setSubscribeRate", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the subscribe-rate policy of a topic.
     *
     * <p>After {@code validateTopicName}, the asynchronous chain calls
     * {@code validateTopicPolicyOperationAsync} (RATE / WRITE), {@code preValidation} and
     * {@code internalRemoveSubscribeRate}. On success a log line is written and the request is
     * resumed with 204 No Content. Failures are handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed with the outcome
     * @param tenant        value of the {@code tenant} path parameter
     * @param namespace     value of the {@code namespace} path parameter
     * @param encodedTopic  value of the {@code topic} path parameter, still encoded
     * @param isGlobal      {@code isGlobal} query parameter, forwarded to {@code internalRemoveSubscribeRate}
     * @param authoritative {@code authoritative} query parameter, forwarded to {@code preValidation}
     * @param subscribeRate request body; declared in the signature but not read by this method
     */
    @DELETE
    @Path(value="/{tenant}/{namespace}/{topic}/subscribeRate")
    @ApiOperation(value="Remove subscribe rate configuration for specified topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=405, message="Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code=409, message="Concurrent modification")})
    public void removeSubscribeRate(
            @Suspended AsyncResponse asyncResponse,
            @PathParam(value="tenant") String tenant,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="topic") @Encoded String encodedTopic,
            @QueryParam(value="isGlobal") @DefaultValue(value="false") boolean isGlobal,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative,
            @ApiParam(value="Subscribe rate for the specified topic") SubscribeRate subscribeRate) {
        validateTopicName(tenant, namespace, encodedTopic);
        validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
                .thenCompose(ignored -> preValidation(authoritative))
                .thenCompose(ignored -> internalRemoveSubscribeRate(isGlobal))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove topic subscribe rate: tenant={}, namespace={}, topic={}, isGlobal={}", clientAppId(), tenant, namespace, topicName.getLocalName(), isGlobal);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeSubscribeRate", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Truncates a topic.
     *
     * <p>Calls {@code validateTopicName} and then hands the suspended response to
     * {@code internalTruncateTopic}, which is responsible for resuming it.
     *
     * @param asyncResponse suspended response passed on to {@code internalTruncateTopic}
     * @param tenant        value of the {@code tenant} path parameter
     * @param namespace     value of the {@code namespace} path parameter
     * @param encodedTopic  value of the {@code topic} path parameter, still encoded
     * @param authoritative {@code authoritative} query parameter, passed on to
     *                      {@code internalTruncateTopic}
     */
    @DELETE
    @Path(value="/{tenant}/{namespace}/{topic}/truncate")
    @ApiOperation(value="Truncate a topic.", notes="The truncate operation will move all cursors to the end of the topic and delete all inactive ledgers.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="Don't have permission to administrate resources on this tenant"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=500, message="Internal server error")})
    public void truncateTopic(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant,
            @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace,
            @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        internalTruncateTopic(asyncResponse, authoritative);
    }

    /**
     * Enables or disables the replicated state of a subscription.
     *
     * <p>Calls {@code validateTopicName}, decodes the subscription name with {@code Codec.decode}
     * and delegates to {@code internalSetReplicatedSubscriptionStatus}. A
     * {@code WebApplicationException} thrown synchronously by any of these steps is used to resume
     * the response unchanged; any other exception is wrapped in a {@code RestException} first.
     *
     * @param asyncResponse  suspended response that is resumed with the outcome
     * @param tenant         value of the {@code tenant} path parameter
     * @param namespace      value of the {@code namespace} path parameter
     * @param encodedTopic   value of the {@code topic} path parameter, still encoded
     * @param encodedSubName value of the {@code subName} path parameter, decoded before delegation
     * @param authoritative  {@code authoritative} query parameter, passed on to the internal call
     * @param enabled        request body flag, passed on to the internal call
     */
    @POST
    @Path(value="/{tenant}/{namespace}/{topic}/subscription/{subName}/replicatedSubscriptionStatus")
    @ApiOperation(value="Enable or disable a replicated subscription on a topic.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="Don't have permission to administrate resources on this tenant or subscriber is not authorized to access this operation"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic or subscription does not exist"), @ApiResponse(code=405, message="Operation not allowed on this topic"), @ApiResponse(code=412, message="Can't find owner for topic"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void setReplicatedSubscriptionStatus(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant,
            @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace,
            @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic,
            @ApiParam(value="Name of subscription", required=true) @PathParam(value="subName") String encodedSubName,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative,
            @ApiParam(value="Whether to enable replicated subscription", required=true) boolean enabled) {
        try {
            validateTopicName(tenant, namespace, encodedTopic);
            String subscriptionName = Codec.decode(encodedSubName);
            internalSetReplicatedSubscriptionStatus(asyncResponse, subscriptionName, authoritative, enabled);
        } catch (WebApplicationException webFailure) {
            // Already carries an HTTP status; pass it through untouched.
            asyncResponse.resume(webFailure);
        } catch (Exception unexpected) {
            asyncResponse.resume(new RestException(unexpected));
        }
    }

    /**
     * Reports the replicated status of a subscription.
     *
     * <p>Calls {@code validateTopicName}, decodes the subscription name with {@code Codec.decode}
     * and hands the suspended response to {@code internalGetReplicatedSubscriptionStatus}.
     *
     * @param asyncResponse  suspended response passed on to the internal call
     * @param tenant         value of the {@code tenant} path parameter
     * @param namespace      value of the {@code namespace} path parameter
     * @param encodedTopic   value of the {@code topic} path parameter, still encoded
     * @param encodedSubName value of the {@code subName} path parameter, decoded before delegation
     * @param authoritative  {@code authoritative} query parameter, passed on to the internal call
     */
    @GET
    @Path(value="/{tenant}/{namespace}/{topic}/subscription/{subName}/replicatedSubscriptionStatus")
    @ApiOperation(value="Get replicated subscription status on a topic.")
    @ApiResponses(value={@ApiResponse(code=401, message="Don't have permission to administrate resources"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=412, message="Can't find owner for topic"), @ApiResponse(code=500, message="Internal server error")})
    public void getReplicatedSubscriptionStatus(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant,
            @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace,
            @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic,
            @ApiParam(value="Name of subscription", required=true) @PathParam(value="subName") String encodedSubName,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        String subscriptionName = Codec.decode(encodedSubName);
        internalGetReplicatedSubscriptionStatus(asyncResponse, subscriptionName, authoritative);
    }

    /**
     * Returns the schema compatibility strategy of a topic.
     *
     * <p>After {@code validateTopicName}, the asynchronous chain calls {@code preValidation} and
     * {@code internalGetSchemaCompatibilityStrategy}; the value that step yields is passed to
     * {@code asyncResponse.resume}. Failures are handed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed with the outcome
     * @param tenant        value of the {@code tenant} path parameter
     * @param namespace     value of the {@code namespace} path parameter
     * @param encodedTopic  value of the {@code topic} path parameter, still encoded
     * @param applied       {@code applied} query parameter, forwarded to
     *                      {@code internalGetSchemaCompatibilityStrategy}
     * @param authoritative {@code authoritative} query parameter, forwarded to {@code preValidation}
     */
    @GET
    @Path(value="/{tenant}/{namespace}/{topic}/schemaCompatibilityStrategy")
    @ApiOperation(value="Get schema compatibility strategy on a topic")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=405, message="Operation not allowed on persistent topic"), @ApiResponse(code=404, message="Topic does not exist")})
    public void getSchemaCompatibilityStrategy(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant,
            @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace,
            @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic,
            @QueryParam(value="applied") @DefaultValue(value="false") boolean applied,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        validateTopicName(tenant, namespace, encodedTopic);
        preValidation(authoritative)
                .thenCompose(ignored -> internalGetSchemaCompatibilityStrategy(applied))
                .thenAccept(strategy -> asyncResponse.resume(strategy))
                .exceptionally(failure -> {
                    handleTopicPolicyException("getSchemaCompatibilityStrategy", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Stores a schema compatibility strategy for a topic.
     *
     * <p>After {@code validateTopicName}, the asynchronous chain calls {@code preValidation} and
     * {@code internalSetSchemaCompatibilityStrategy}. On success a log line is written and the
     * request is resumed with 204 No Content. Failures are handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed with the outcome
     * @param tenant        value of the {@code tenant} path parameter
     * @param namespace     value of the {@code namespace} path parameter
     * @param encodedTopic  value of the {@code topic} path parameter, still encoded
     * @param authoritative {@code authoritative} query parameter, forwarded to {@code preValidation}
     * @param strategy      request body, forwarded to {@code internalSetSchemaCompatibilityStrategy}
     */
    @PUT
    @Path(value="/{tenant}/{namespace}/{topic}/schemaCompatibilityStrategy")
    @ApiOperation(value="Set schema compatibility strategy on a topic")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=405, message="Operation not allowed on persistent topic"), @ApiResponse(code=404, message="Topic does not exist")})
    public void setSchemaCompatibilityStrategy(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant,
            @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace,
            @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative,
            @ApiParam(value="Strategy used to check the compatibility of new schema") SchemaCompatibilityStrategy strategy) {
        validateTopicName(tenant, namespace, encodedTopic);
        preValidation(authoritative)
                .thenCompose(ignored -> internalSetSchemaCompatibilityStrategy(strategy))
                .thenRun(() -> {
                    log.info("[{}] Successfully set topic schema compatibility strategy: tenant={}, namespace={}, topic={}, schemaCompatibilityStrategy={}", clientAppId(), tenant, namespace, topicName.getLocalName(), strategy);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("setSchemaCompatibilityStrategy", failure, asyncResponse);
                    return null;
                });
    }

    /**
     * Removes the schema compatibility strategy of a topic.
     *
     * <p>After {@code validateTopicName}, the asynchronous chain calls {@code preValidation} and
     * then {@code internalSetSchemaCompatibilityStrategy(null)}. On success a log line is written
     * and the request is resumed with 204 No Content. Failures are handed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended response that is resumed with the outcome
     * @param tenant        value of the {@code tenant} path parameter
     * @param namespace     value of the {@code namespace} path parameter
     * @param encodedTopic  value of the {@code topic} path parameter, still encoded
     * @param authoritative {@code authoritative} query parameter, forwarded to {@code preValidation}
     * @param strategy      request body; declared in the signature but not read by this method
     */
    @DELETE
    @Path(value="/{tenant}/{namespace}/{topic}/schemaCompatibilityStrategy")
    @ApiOperation(value="Remove schema compatibility strategy on a topic")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=405, message="Operation not allowed on persistent topic"), @ApiResponse(code=404, message="Topic does not exist")})
    public void removeSchemaCompatibilityStrategy(
            @Suspended AsyncResponse asyncResponse,
            @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant,
            @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace,
            @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic,
            @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative,
            @ApiParam(value="Strategy used to check the compatibility of new schema") SchemaCompatibilityStrategy strategy) {
        validateTopicName(tenant, namespace, encodedTopic);
        preValidation(authoritative)
                .thenCompose(ignored -> internalSetSchemaCompatibilityStrategy(null))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove topic schema compatibility strategy: tenant={}, namespace={}, topic={}", clientAppId(), tenant, namespace, topicName.getLocalName());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    handleTopicPolicyException("removeSchemaCompatibilityStrategy", failure, asyncResponse);
                    return null;
                });
    }
}

