/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.admin;

import com.github.zafarkhaja.semver.Version;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
import org.apache.bookkeeper.mledger.impl.MetaStore;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.DestinationDomain;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AuthPolicies;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path(value="/persistent")
@Produces(value={"application/json"})
@Api(value="/persistent", description="Persistent topic admin apis", tags={"persistent topic"})
public class PersistentTopics
extends AdminResource {
    /** Class-wide SLF4J logger used by every admin operation in this resource. */
    private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
    // Wait time in milliseconds. NOTE(review): presumably the pause applied after partitioned-topic
    // metadata is created or deleted (those operations pause for the same 1000 ms) - confirm.
    protected static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
    // NOTE(review): judging by the name, a time-to-live in minutes for offline topic stats; its
    // users are not visible in this part of the file - confirm.
    private static final int OFFLINE_TOPIC_STAT_TTL_MINS = 10;
    // NOTE(review): presumably the prefix identifying the C++ client when checking the client
    // version (see validateClientVersion()) - confirm.
    private static final String DEPRECATED_CLIENT_VERSION_PREFIX = "Pulsar-CPP-v";
    // Semantic version 1.21. NOTE(review): despite the "_PREFIX" suffix this is a Version object,
    // presumably the minimum accepted client version - confirm.
    private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX = Version.forIntegers((int)1, (int)21);

    /**
     * Lists the persistent destinations found under a namespace.
     *
     * <p>After the admin-access check, the namespace policies are looked up so that a missing
     * namespace is reported as 404. Destination names are then read from the managed-ledger list
     * cache, decoded, expanded into fully qualified names and returned in natural order.
     *
     * @param property  property path segment
     * @param cluster   cluster path segment
     * @param namespace namespace path segment
     * @return sorted fully qualified destination names; empty when the managed-ledger path has no
     *         node
     * @throws RestException 404 when the policies lookup raises {@code NoNodeException}; any
     *                       other failure is wrapped in a {@code RestException}
     */
    @GET
    @Path(value="/{property}/{cluster}/{namespace}")
    @ApiOperation(value="Get the list of destinations under a namespace.", response=String.class, responseContainer="List")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace doesn't exist")})
    public List<String> getList(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace) {
        this.validateAdminAccessOnProperty(property);
        // The lookup result is discarded: it is only performed to detect a missing namespace.
        try {
            this.policiesCache().get(PersistentTopics.path("policies", property, cluster, namespace));
        } catch (KeeperException.NoNodeException e) {
            log.warn("[{}] Failed to get topic list {}/{}/{}: Namespace does not exist", this.clientAppId(), property, cluster, namespace);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (Exception e) {
            log.error("[{}] Failed to get topic list {}/{}/{}", this.clientAppId(), property, cluster, namespace, e);
            throw new RestException(e);
        }
        List<String> destinations = Lists.newArrayList();
        try {
            String path = String.format("/managed-ledgers/%s/%s/%s/%s", property, cluster, namespace, this.domain());
            for (String destination : this.managedLedgerListCache().get(path)) {
                if (!this.domain().equals(DestinationDomain.persistent.toString())) {
                    continue;
                }
                destinations.add(DestinationName.get(this.domain(), property, cluster, namespace, Codec.decode(destination)).toString());
            }
        } catch (KeeperException.NoNodeException ignored) {
            // Deliberately ignored: no managed-ledger node means an empty list is returned.
        } catch (Exception e) {
            log.error("[{}] Failed to get destination list for namespace {}/{}/{}", this.clientAppId(), property, cluster, namespace, e);
            throw new RestException(e);
        }
        destinations.sort(null);
        return destinations;
    }

    /**
     * Lists the partitioned topics registered under a namespace.
     *
     * <p>After the admin-access check, the namespace policies are looked up so that a missing
     * namespace is reported as 404. The children of the namespace's "partitioned-topics" node in
     * global ZooKeeper are then decoded and formatted as {@code persistent://...} names.
     *
     * @param property  property path segment
     * @param cluster   cluster path segment
     * @param namespace namespace path segment
     * @return sorted partitioned topic names; empty when the partitioned-topics node is absent
     * @throws RestException 404 when the policies lookup raises {@code NoNodeException}; any
     *                       other failure is wrapped in a {@code RestException}
     */
    @GET
    @Path(value="/{property}/{cluster}/{namespace}/partitioned")
    @ApiOperation(value="Get the list of partitioned topics under a namespace.", response=String.class, responseContainer="List")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace doesn't exist")})
    public List<String> getPartitionedTopicList(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace) {
        this.validateAdminAccessOnProperty(property);
        // The lookup result is discarded: it is only performed to detect a missing namespace.
        try {
            this.policiesCache().get(PersistentTopics.path("policies", property, cluster, namespace));
        } catch (KeeperException.NoNodeException e) {
            log.warn("[{}] Failed to get partitioned topic list {}/{}/{}: Namespace does not exist", this.clientAppId(), property, cluster, namespace);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (Exception e) {
            log.error("[{}] Failed to get partitioned topic list for namespace {}/{}/{}", this.clientAppId(), property, cluster, namespace, e);
            throw new RestException(e);
        }
        // Typed as List<String> so it matches both the stream result and the declared return type.
        List<String> partitionedTopics = Lists.newArrayList();
        try {
            String partitionedTopicPath = PersistentTopics.path("partitioned-topics", property, cluster, namespace, this.domain());
            List<String> destinations = this.globalZk().getChildren(partitionedTopicPath, false);
            partitionedTopics = destinations.stream()
                    .map(s -> String.format("persistent://%s/%s/%s/%s", property, cluster, namespace, Codec.decode(s)))
                    .collect(Collectors.toList());
        } catch (KeeperException.NoNodeException ignored) {
            // Deliberately ignored: no partitioned-topics node means an empty list is returned.
        } catch (Exception e) {
            log.error("[{}] Failed to get partitioned topic list for namespace {}/{}/{}", this.clientAppId(), property, cluster, namespace, e);
            throw new RestException(e);
        }
        partitionedTopics.sort(null);
        return partitionedTopics;
    }

    /**
     * Returns the effective permissions for a destination: the namespace-level grants merged
     * (set union per role) with any grants stored for this specific destination.
     *
     * @param property    property path segment
     * @param cluster     cluster path segment
     * @param namespace   namespace path segment
     * @param destination URL-encoded destination name; decoded before use
     * @return role to granted-actions map, ordered by role name
     * @throws RestException 404 when the namespace policies are absent; any other failure is
     *                       wrapped in a {@code RestException}
     */
    @GET
    @Path(value="/{property}/{cluster}/{namespace}/{destination}/permissions")
    @ApiOperation(value="Get permissions on a destination.", notes="Retrieve the effective permissions for a destination. These permissions are defined by the permissions set at thenamespace level combined (union) with any eventual specific permission set on the destination.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace doesn't exist")})
    public Map<String, Set<AuthAction>> getPermissionsOnDestination(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination) {
        destination = Codec.decode(destination);
        this.validateAdminAccessOnProperty(property);
        String destinationUri = DestinationName.get(this.domain(), property, cluster, namespace, destination).toString();
        try {
            Policies policies = (Policies)this.policiesCache().get(PersistentTopics.path("policies", property, cluster, namespace)).orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
            Map<String, Set<AuthAction>> permissions = Maps.newTreeMap();
            AuthPolicies auth = policies.auth_policies;
            // Start from the namespace-level grants.
            for (String namespaceRole : auth.namespace_auth.keySet()) {
                permissions.put(namespaceRole, auth.namespace_auth.get(namespaceRole));
            }
            // Then layer the destination-level grants on top, unioning per role.
            if (auth.destination_auth.containsKey(destinationUri)) {
                for (Map.Entry<String, Set<AuthAction>> entry : auth.destination_auth.get(destinationUri).entrySet()) {
                    String role = entry.getKey();
                    Set<AuthAction> destinationPermissions = entry.getValue();
                    if (!permissions.containsKey(role)) {
                        permissions.put(role, destinationPermissions);
                        continue;
                    }
                    Set<AuthAction> union = Sets.union(permissions.get(role), destinationPermissions);
                    permissions.put(role, union);
                }
            }
            return permissions;
        } catch (RestException re) {
            // Keep the deliberate 404 intact instead of logging it as an error and re-wrapping it.
            throw re;
        } catch (Exception e) {
            log.error("[{}] Failed to get permissions for destination {}", this.clientAppId(), destinationUri, e);
            throw new RestException(e);
        }
    }

    /**
     * Accepts the request when the caller passes the admin-access check on the destination's
     * property; when that check throws, falls back to {@code checkAuthorization} for the caller's
     * role on the destination.
     *
     * @param destination destination being accessed
     * @throws RestException rethrown as-is from the fallback check, or wrapping any other
     *                       exception the fallback check raises
     */
    protected void validateAdminAndClientPermission(DestinationName destination) {
        boolean hasAdminAccess;
        try {
            this.validateAdminAccessOnProperty(destination.getProperty());
            hasAdminAccess = true;
        } catch (Exception adminCheckFailure) {
            // Not an admin: the client-level authorization below decides instead.
            hasAdminAccess = false;
        }
        if (hasAdminAccess) {
            return;
        }
        try {
            PersistentTopics.checkAuthorization(this.pulsar(), destination, this.clientAppId());
        } catch (RestException authorizationFailure) {
            throw authorizationFailure;
        } catch (Exception unexpected) {
            log.warn("Unexpected error while authorizing request. destination={}, role={}. Error: {}", destination, this.clientAppId(), unexpected.getMessage(), unexpected);
            throw new RestException(unexpected);
        }
    }

    /**
     * Runs the two checks required before an admin operation on a destination: admin access on
     * the destination's property, then destination ownership.
     *
     * @param fqdn          fully qualified destination name
     * @param authoritative passed through to the ownership check
     */
    protected void validateAdminOperationOnDestination(DestinationName fqdn, boolean authoritative) {
        String owningProperty = fqdn.getProperty();
        this.validateAdminAccessOnProperty(owningProperty);
        this.validateDestinationOwnership(fqdn, authoritative);
    }

    /**
     * Grants a set of actions to a role on a single destination.
     *
     * <p>The namespace policies node is read from global ZooKeeper, the role's entry under the
     * destination is set to {@code actions}, and the node is written back using the version that
     * was read. The policies cache entry is then invalidated.
     *
     * @param property    property path segment
     * @param cluster     cluster path segment
     * @param namespace   namespace path segment
     * @param destination URL-encoded destination name; decoded before use
     * @param role        role receiving the permissions
     * @param actions     actions to store for the role (replaces any previous entry)
     * @throws RestException 404 when the policies node does not exist, 409 when the node was
     *                       modified between the read and the write; any other failure is
     *                       wrapped in a {@code RestException}
     */
    @POST
    @Path(value="/{property}/{cluster}/{namespace}/{destination}/permissions/{role}")
    @ApiOperation(value="Grant a new permission to a role on a single destination.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace doesn't exist"), @ApiResponse(code=409, message="Concurrent modification")})
    public void grantPermissionsOnDestination(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination, @PathParam(value="role") String role, Set<AuthAction> actions) {
        destination = Codec.decode(destination);
        this.validateAdminAccessOnProperty(property);
        this.validatePoliciesReadOnlyAccess();
        String destinationUri = DestinationName.get(this.domain(), property, cluster, namespace, destination).toString();
        String policiesPath = PersistentTopics.path("policies", property, cluster, namespace);
        try {
            Stat nodeStat = new Stat();
            byte[] content = this.globalZk().getData(policiesPath, null, nodeStat);
            Policies policies = (Policies)PersistentTopics.jsonMapper().readValue(content, Policies.class);
            Map<String, Map<String, Set<AuthAction>>> destinationAuth = policies.auth_policies.destination_auth;
            if (!destinationAuth.containsKey(destinationUri)) {
                destinationAuth.put(destinationUri, new TreeMap<String, Set<AuthAction>>());
            }
            destinationAuth.get(destinationUri).put(role, actions);
            // Versioned write: fails with BadVersionException if someone else updated the node.
            this.globalZk().setData(policiesPath, PersistentTopics.jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
            this.policiesCache().invalidate(policiesPath);
            log.info("[{}] Successfully granted access for role {}: {} - destination {}", this.clientAppId(), role, actions, destinationUri);
        } catch (KeeperException.NoNodeException e) {
            log.warn("[{}] Failed to grant permissions on destination {}: Namespace does not exist", this.clientAppId(), destinationUri);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (KeeperException.BadVersionException e) {
            // Surface the documented 409 instead of letting it fall into the generic handler.
            log.warn("[{}] Failed to grant permissions on destination {}: concurrent modification", this.clientAppId(), destinationUri);
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        } catch (Exception e) {
            log.error("[{}] Failed to grant permissions for destination {}", this.clientAppId(), destinationUri, e);
            throw new RestException(e);
        }
    }

    /**
     * Removes the destination-level permissions of a role.
     *
     * <p>The namespace policies node is read from global ZooKeeper; if the role has no entry
     * under this destination the call fails with 412. Otherwise the entry is removed, the node is
     * written back using the version that was read, and the policies and global ZooKeeper cache
     * entries for the node are invalidated.
     *
     * @param property    property path segment
     * @param cluster     cluster path segment
     * @param namespace   namespace path segment
     * @param destination URL-encoded destination name; decoded before use
     * @param role        role whose destination-level permissions are removed
     * @throws RestException 404 when the policies node does not exist, 412 when the role has no
     *                       destination-level entry; any other failure is wrapped in a
     *                       {@code RestException}
     */
    @DELETE
    @Path(value="/{property}/{cluster}/{namespace}/{destination}/permissions/{role}")
    @ApiOperation(value="Revoke permissions on a destination.", notes="Revoke permissions to a role on a single destination. If the permission was not set at the destinationlevel, but rather at the namespace level, this operation will return an error (HTTP status code 412).")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace doesn't exist"), @ApiResponse(code=412, message="Permissions are not set at the destination level")})
    public void revokePermissionsOnDestination(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination, @PathParam(value="role") String role) {
        destination = Codec.decode(destination);
        this.validateAdminAccessOnProperty(property);
        this.validatePoliciesReadOnlyAccess();
        String destinationUri = DestinationName.get(this.domain(), property, cluster, namespace, destination).toString();
        Stat policiesStat = new Stat();
        Policies currentPolicies;
        try {
            byte[] serialized = this.globalZk().getData(PersistentTopics.path("policies", property, cluster, namespace), null, policiesStat);
            currentPolicies = (Policies)PersistentTopics.jsonMapper().readValue(serialized, Policies.class);
        } catch (KeeperException.NoNodeException missingNamespace) {
            log.warn("[{}] Failed to revoke permissions on destination {}: Namespace does not exist", this.clientAppId(), destinationUri);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (Exception readFailure) {
            log.error("[{}] Failed to revoke permissions for destination {}", this.clientAppId(), destinationUri, readFailure);
            throw new RestException(readFailure);
        }

        // Only grants stored at the destination level can be revoked here.
        Map<String, Map<String, Set<AuthAction>>> destinationAuth = currentPolicies.auth_policies.destination_auth;
        boolean grantedAtDestinationLevel = destinationAuth.containsKey(destinationUri)
                && destinationAuth.get(destinationUri).containsKey(role);
        if (!grantedAtDestinationLevel) {
            log.warn("[{}] Failed to revoke permission from role {} on destination: Not set at destination level", this.clientAppId(), role, destinationUri);
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Permissions are not set at the destination level");
        }
        destinationAuth.get(destinationUri).remove(role);

        try {
            String policiesPath = PersistentTopics.path("policies", property, cluster, namespace);
            byte[] updated = PersistentTopics.jsonMapper().writeValueAsBytes(currentPolicies);
            this.globalZk().setData(policiesPath, updated, policiesStat.getVersion());
            this.policiesCache().invalidate(policiesPath);
            this.globalZkCache().invalidate(policiesPath);
            log.info("[{}] Successfully revoke access for role {} - destination {}", this.clientAppId(), role, destinationUri);
        } catch (Exception writeFailure) {
            log.error("[{}] Failed to revoke permissions for destination {}", this.clientAppId(), destinationUri, writeFailure);
            throw new RestException(writeFailure);
        }
    }

    /**
     * Creates the metadata node of a partitioned topic.
     *
     * <p>After the admin-access check, serialized {@link PartitionedTopicMetadata} is written
     * under the "partitioned-topics" path and the call pauses for
     * {@code PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS} before returning.
     *
     * @param property      property path segment
     * @param cluster       cluster path segment
     * @param namespace     namespace path segment
     * @param destination   URL-encoded destination name; decoded before use
     * @param numPartitions number of partitions; must be greater than 1
     * @param authoritative not used by this method
     * @throws RestException 406 when {@code numPartitions <= 1}, 409 when the node already
     *                       exists; any other failure is wrapped in a {@code RestException}
     */
    @PUT
    @Path(value="/{property}/{cluster}/{namespace}/{destination}/partitions")
    @ApiOperation(value="Create a partitioned topic.", notes="It needs to be called before creating a producer on a partitioned topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=409, message="Partitioned topic already exist")})
    public void createPartitionedTopic(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination, int numPartitions, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        destination = Codec.decode(destination);
        DestinationName dn = DestinationName.get(this.domain(), property, cluster, namespace, destination);
        this.validateAdminAccessOnProperty(dn.getProperty());
        if (numPartitions <= 1) {
            throw new RestException(Response.Status.NOT_ACCEPTABLE, "Number of partitions should be more than 1");
        }
        try {
            String path = PersistentTopics.path("partitioned-topics", property, cluster, namespace, this.domain(), dn.getEncodedLocalName());
            byte[] data = PersistentTopics.jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
            this.zkCreateOptimistic(path, data);
            // NOTE(review): presumably gives the new metadata time to propagate - confirm.
            Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
            log.info("[{}] Successfully created partitioned topic {}", this.clientAppId(), dn);
        } catch (KeeperException.NodeExistsException e) {
            log.warn("[{}] Failed to create already existing partitioned topic {}", this.clientAppId(), dn);
            throw new RestException(Response.Status.CONFLICT, "Partitioned topic already exist");
        } catch (InterruptedException e) {
            // Same response as any other failure, but keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
            log.error("[{}] Failed to create partitioned topic {}", this.clientAppId(), dn, e);
            throw new RestException(e);
        } catch (Exception e) {
            log.error("[{}] Failed to create partitioned topic {}", this.clientAppId(), dn, e);
            throw new RestException(e);
        }
    }

    /**
     * Changes the partition count of an existing partitioned topic in a non-global namespace by
     * delegating to {@code updatePartitionedTopic(DestinationName, int)} and waiting for it.
     *
     * @param property      property path segment
     * @param cluster       cluster path segment
     * @param namespace     namespace path segment
     * @param destination   URL-encoded destination name; decoded before use
     * @param numPartitions requested number of partitions; must be greater than 1
     * @throws RestException 403 for a global destination, 406 when {@code numPartitions <= 1};
     *                       a {@code RestException} raised by the update is rethrown as-is and
     *                       any other failure is wrapped in a {@code RestException}
     */
    @POST
    @Path(value="/{property}/{cluster}/{namespace}/{destination}/partitions")
    @ApiOperation(value="Increment partitons of an existing partitioned topic.", notes="It only increments partitions of existing non-global partitioned-topic")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=409, message="Partitioned topic does not exist")})
    public void updatePartitionedTopic(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination, int numPartitions) {
        destination = Codec.decode(destination);
        DestinationName dn = DestinationName.get(this.domain(), property, cluster, namespace, destination);
        this.validateAdminAccessOnProperty(dn.getProperty());
        if (dn.isGlobal()) {
            log.error("[{}] Update partitioned-topic is forbidden on global namespace {}", this.clientAppId(), dn);
            throw new RestException(Response.Status.FORBIDDEN, "Update forbidden on global namespace");
        }
        if (numPartitions <= 1) {
            throw new RestException(Response.Status.NOT_ACCEPTABLE, "Number of partitions should be more than 1");
        }
        try {
            this.updatePartitionedTopic(dn, numPartitions).get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            // Not every exception carries a cause (e.g. InterruptedException, or one thrown
            // before the future is returned), so fall back to the exception itself.
            Throwable failure = e.getCause() != null ? e.getCause() : e;
            if (failure instanceof RestException) {
                throw (RestException)failure;
            }
            log.error("[{}] Failed to update partitioned topic {}", this.clientAppId(), dn, failure);
            throw new RestException(failure);
        }
    }

    /**
     * Returns the partitioned-topic metadata of a destination. When the metadata reports more
     * than one partition, {@code validateClientVersion()} is invoked before returning.
     *
     * @param property      property path segment
     * @param cluster       cluster path segment
     * @param namespace     namespace path segment
     * @param destination   URL-encoded destination name; decoded before use
     * @param authoritative passed through to the metadata lookup
     * @return the metadata obtained from {@code getPartitionedTopicMetadata}
     */
    @GET
    @Path(value="/{property}/{cluster}/{namespace}/{destination}/partitions")
    @ApiOperation(value="Get partitioned topic metadata.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission")})
    public PartitionedTopicMetadata getPartitionedMetadata(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        String decodedDestination = Codec.decode(destination);
        PartitionedTopicMetadata topicMetadata = this.getPartitionedTopicMetadata(property, cluster, namespace, decodedDestination, authoritative);
        boolean hasMultiplePartitions = topicMetadata.partitions > 1;
        if (hasMultiplePartitions) {
            this.validateClientVersion();
        }
        return topicMetadata;
    }

    /**
     * Deletes a partitioned topic: first every partition (through the admin client), then the
     * partitioned-topic metadata node in global ZooKeeper.
     *
     * <p>Partition deletions are issued asynchronously and awaited together. A partition that is
     * reported as not found counts as deleted; the first other failure aborts the wait.
     *
     * @param property      property path segment
     * @param cluster       cluster path segment
     * @param namespace     namespace path segment
     * @param destination   URL-encoded destination name; decoded before use
     * @param authoritative passed through to the metadata lookup
     * @throws RestException 412 when a partition deletion fails with
     *                       {@code PreconditionFailedException}, 404 when the metadata node does
     *                       not exist; any other failure is wrapped in a {@code RestException}
     */
    @DELETE
    @Path(value="/{property}/{cluster}/{namespace}/{destination}/partitions")
    @ApiOperation(value="Delete a partitioned topic.", notes="It will also delete all the partitions of the topic if it exists.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Partitioned topic does not exist")})
    public void deletePartitionedTopic(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        destination = Codec.decode(destination);
        DestinationName dn = DestinationName.get(this.domain(), property, cluster, namespace, destination);
        this.validateAdminAccessOnProperty(dn.getProperty());
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative);
        int numPartitions = partitionMetadata.partitions;
        if (numPartitions > 0) {
            CompletableFuture<Void> future = new CompletableFuture<>();
            // Counts the partitions still pending; the future completes when it reaches zero.
            AtomicInteger count = new AtomicInteger(numPartitions);
            try {
                for (int i = 0; i < numPartitions; i++) {
                    DestinationName dn_partition = dn.getPartition(i);
                    this.pulsar().getAdminClient().persistentTopics().deleteAsync(dn_partition.toString()).whenComplete((r, ex) -> {
                        if (ex != null) {
                            if (!(ex instanceof PulsarAdminException.NotFoundException)) {
                                future.completeExceptionally(ex);
                                log.error("[{}] Failed to delete partition {}", this.clientAppId(), dn_partition, ex);
                                return;
                            }
                            // A missing partition is treated the same as a deleted one.
                            if (log.isDebugEnabled()) {
                                log.debug("[{}] Partition not found: {}", this.clientAppId(), dn_partition);
                            }
                        } else {
                            log.info("[{}] Deleted partition {}", this.clientAppId(), dn_partition);
                        }
                        if (count.decrementAndGet() == 0) {
                            future.complete(null);
                        }
                    });
                }
                future.get();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                // Not every exception carries a cause (e.g. InterruptedException), so fall back
                // to the exception itself rather than wrapping null.
                Throwable t = e.getCause() != null ? e.getCause() : e;
                if (t instanceof PulsarAdminException.PreconditionFailedException) {
                    throw new RestException(Response.Status.PRECONDITION_FAILED, "Topic has active producers/subscriptions");
                }
                throw new RestException(t);
            }
        }
        String path = PersistentTopics.path("partitioned-topics", property, cluster, namespace, this.domain(), dn.getEncodedLocalName());
        try {
            this.globalZk().delete(path, -1);
            this.globalZkCache().invalidate(path);
            // NOTE(review): presumably gives the metadata removal time to propagate - confirm.
            Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
            log.info("[{}] Deleted partitioned topic {}", this.clientAppId(), dn);
        } catch (KeeperException.NoNodeException e) {
            throw new RestException(Response.Status.NOT_FOUND, "Partitioned topic does not exist");
        } catch (InterruptedException e) {
            // Same response as any other failure, but keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
            log.error("[{}] Failed to delete partitioned topic {}", this.clientAppId(), dn, e);
            throw new RestException(e);
        } catch (Exception e) {
            log.error("[{}] Failed to delete partitioned topic {}", this.clientAppId(), dn, e);
            throw new RestException(e);
        }
    }

    /**
     * Unloads a topic. The request is logged, global-namespace ownership is validated when the
     * cluster segment is "global", and the work is delegated to
     * {@code unloadTopic(DestinationName, boolean)}.
     *
     * @param property      property path segment
     * @param cluster       cluster path segment
     * @param namespace     namespace path segment
     * @param destination   URL-encoded destination name; logged as received, then decoded
     * @param authoritative passed through to the delegate
     */
    @PUT
    @Path(value="/{property}/{cluster}/{namespace}/{destination}/unload")
    @ApiOperation(value="Unload a topic")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist")})
    public void unloadTopic(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        log.info("[{}] Unloading topic {}/{}/{}/{}", this.clientAppId(), property, cluster, namespace, destination);
        String decodedDestination = Codec.decode(destination);
        DestinationName topicName = DestinationName.get(this.domain(), property, cluster, namespace, decodedDestination);
        boolean inGlobalCluster = cluster.equals("global");
        if (inGlobalCluster) {
            NamespaceName globalNamespace = new NamespaceName(property, cluster, namespace);
            this.validateGlobalNamespaceOwnership(globalNamespace);
        }
        this.unloadTopic(topicName, authoritative);
    }

    /**
     * Deletes a topic in a non-global namespace and waits for the deletion to finish.
     *
     * <p>The admin/ownership validation and the topic lookup run before the global-namespace
     * check, so their failures take precedence over the 403.
     *
     * @param property      property path segment
     * @param cluster       cluster path segment
     * @param namespace     namespace path segment
     * @param destination   URL-encoded destination name; decoded before use
     * @param authoritative passed through to the ownership check
     * @throws RestException 403 for a global destination, 412 when the deletion fails with
     *                       {@code TopicBusyException}; any other failure is wrapped in a
     *                       {@code RestException}
     */
    @DELETE
    @Path(value="/{property}/{cluster}/{namespace}/{destination}")
    @ApiOperation(value="Delete a topic.", notes="The topic cannot be deleted if there's any active subscription or producer connected to the it.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=412, message="Topic has active producers/subscriptions")})
    public void deleteTopic(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        destination = Codec.decode(destination);
        DestinationName dn = DestinationName.get(this.domain(), property, cluster, namespace, destination);
        this.validateAdminOperationOnDestination(dn, authoritative);
        Topic topic = this.getTopicReference(dn);
        if (dn.isGlobal()) {
            log.error("[{}] Delete topic is forbidden on global namespace {}", this.clientAppId(), dn);
            throw new RestException(Response.Status.FORBIDDEN, "Delete forbidden on global namespace");
        }
        try {
            topic.delete().get();
            log.info("[{}] Successfully removed topic {}", this.clientAppId(), dn);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            // Not every exception carries a cause (e.g. InterruptedException), so fall back to
            // the exception itself rather than logging and wrapping null.
            Throwable t = e.getCause() != null ? e.getCause() : e;
            log.error("[{}] Failed to get delete topic {}", this.clientAppId(), dn, t);
            if (t instanceof BrokerServiceException.TopicBusyException) {
                throw new RestException(Response.Status.PRECONDITION_FAILED, "Topic has active producers/subscriptions");
            }
            throw new RestException(t);
        }
    }

    /**
     * Lists the subscription names of a topic.
     *
     * <p>For a partitioned topic the names are fetched through the admin client from partition 0
     * only (NOTE(review): this presumes every partition carries the same subscriptions -
     * confirm). For a non-partitioned topic the admin/ownership validation runs and the names are
     * read from the locally held topic reference.
     *
     * @param property      property path segment
     * @param cluster       cluster path segment
     * @param namespace     namespace path segment
     * @param destination   URL-encoded destination name; decoded before use
     * @param authoritative passed through to the metadata lookup and the ownership check
     * @return subscription names of the topic
     * @throws RestException wrapping any failure while collecting the names
     */
    @GET
    @Path(value="/{property}/{cluster}/{namespace}/{destination}/subscriptions")
    @ApiOperation(value="Get the list of persistent subscriptions for a given topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist")})
    public List<String> getSubscriptions(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        destination = Codec.decode(destination);
        DestinationName dn = DestinationName.get(this.domain(), property, cluster, namespace, destination);
        if (cluster.equals("global")) {
            this.validateGlobalNamespaceOwnership(new NamespaceName(property, cluster, namespace));
        }
        List<String> subscriptions = Lists.newArrayList();
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative);
        if (partitionMetadata.partitions > 0) {
            try {
                subscriptions.addAll(this.pulsar().getAdminClient().persistentTopics().getSubscriptions(dn.getPartition(0).toString()));
            } catch (Exception e) {
                throw new RestException(e);
            }
        } else {
            // Only a non-partitioned topic is looked up by its own name; running this for a
            // partitioned topic would apply the ownership check and topic lookup to the base
            // name after the partition result was already collected.
            this.validateAdminOperationOnDestination(dn, authoritative);
            Topic topic = this.getTopicReference(dn);
            try {
                topic.getSubscriptions().forEach((subName, sub) -> {
                    subscriptions.add(subName);
                });
            } catch (Exception e) {
                log.error("[{}] Failed to get list of subscriptions for {}", this.clientAppId(), dn, e);
                throw new RestException(e);
            }
        }
        return subscriptions;
    }

    /**
     * Returns the stats reported by the locally resolved topic.
     *
     * <p>Checks run in this order before the topic is looked up: admin/client permission,
     * global namespace ownership (only when {@code cluster} is {@code "global"}), and
     * destination ownership.
     *
     * @param property      property path segment
     * @param cluster       cluster path segment
     * @param namespace     namespace path segment
     * @param destination   encoded destination name, decoded with {@code Codec.decode}
     * @param authoritative forwarded to the destination ownership validation
     * @return the value of {@code getStats()} on the resolved topic
     */
    @GET
    @Path(value="{property}/{cluster}/{namespace}/{destination}/stats")
    @ApiOperation(value="Get the stats for the topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist")})
    public PersistentTopicStats getStats(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        String decodedDestination = Codec.decode(destination);
        DestinationName topicName = DestinationName.get(this.domain(), property, cluster, namespace, decodedDestination);

        this.validateAdminAndClientPermission(topicName);
        if (cluster.equals("global")) {
            NamespaceName globalNamespace = new NamespaceName(property, cluster, namespace);
            this.validateGlobalNamespaceOwnership(globalNamespace);
        }
        this.validateDestinationOwnership(topicName, authoritative);

        return this.getTopicReference(topicName).getStats();
    }

    /**
     * Returns the internal stats reported by the locally resolved topic.
     *
     * <p>Checks run in this order before the topic is looked up: admin/client permission,
     * global namespace ownership (only when {@code cluster} is {@code "global"}), and
     * destination ownership.
     *
     * @param property      property path segment
     * @param cluster       cluster path segment
     * @param namespace     namespace path segment
     * @param destination   encoded destination name, decoded with {@code Codec.decode}
     * @param authoritative forwarded to the destination ownership validation
     * @return the value of {@code getInternalStats()} on the resolved topic
     */
    @GET
    @Path(value="{property}/{cluster}/{namespace}/{destination}/internalStats")
    @ApiOperation(value="Get the internal stats for the topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist")})
    public PersistentTopicInternalStats getInternalStats(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        String decodedDestination = Codec.decode(destination);
        DestinationName topicName = DestinationName.get(this.domain(), property, cluster, namespace, decodedDestination);

        this.validateAdminAndClientPermission(topicName);
        if (cluster.equals("global")) {
            NamespaceName globalNamespace = new NamespaceName(property, cluster, namespace);
            this.validateGlobalNamespaceOwnership(globalNamespace);
        }
        this.validateDestinationOwnership(topicName, authoritative);

        return this.getTopicReference(topicName).getInternalStats();
    }

    /**
     * Asynchronously fetches the managed-ledger info for a topic and streams it as JSON.
     *
     * <p>The managed ledger is addressed by the destination's persistence naming encoding. On
     * success the response is resumed with a {@link StreamingOutput} that serialises the info with
     * the JSON mapper; on failure it is resumed with the managed-ledger exception.
     *
     * @param property      property path segment; admin access on it is validated
     * @param cluster       cluster path segment; {@code "global"} triggers the global namespace ownership check
     * @param namespace     namespace path segment
     * @param destination   encoded destination name, decoded with {@code Codec.decode}
     * @param asyncResponse suspended response that is resumed from the callback
     */
    @GET
    @Path(value="{property}/{cluster}/{namespace}/{destination}/internal-info")
    @ApiOperation(value="Get the internal stats for the topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist")})
    public void getManagedLedgerInfo(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination, final @Suspended AsyncResponse asyncResponse) {
        destination = Codec.decode(destination);
        DestinationName dn = DestinationName.get(this.domain(), property, cluster, namespace, destination);
        this.validateAdminAccessOnProperty(dn.getProperty());
        if (cluster.equals("global")) {
            this.validateGlobalNamespaceOwnership(new NamespaceName(property, cluster, namespace));
        }
        String managedLedger = dn.getPersistenceNamingEncoding();
        this.pulsar().getManagedLedgerFactory().asyncGetManagedLedgerInfo(managedLedger, new AsyncCallbacks.ManagedLedgerInfoCallback(){

            @Override
            public void getInfoComplete(ManagedLedgerInfo info, Object ctx) {
                // resume(Object) gives a lambda no target type, so the functional interface must be
                // named explicitly for this to compile.
                asyncResponse.resume((StreamingOutput) output -> PersistentTopics.jsonMapper().writer().writeValue(output, info));
            }

            @Override
            public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
                asyncResponse.resume((Throwable)exception);
            }
        }, null);
    }

    /**
     * Aggregates the stats of every partition of a partitioned topic.
     *
     * <p>Each partition's stats are fetched through the admin client, folded into the aggregate
     * with {@code add}, and also stored in the aggregate's {@code partitions} map under the
     * partition's name.
     *
     * @param property      property path segment
     * @param cluster       cluster path segment; {@code "global"} triggers the global namespace ownership check
     * @param namespace     namespace path segment
     * @param destination   encoded destination name, decoded with {@code Codec.decode}
     * @param authoritative forwarded to the partitioned-topic metadata lookup
     * @return the aggregated stats
     * @throws RestException with NOT_FOUND when the metadata reports zero partitions, or wrapping
     *                       any exception raised while collecting partition stats
     */
    @GET
    @Path(value="{property}/{cluster}/{namespace}/{destination}/partitioned-stats")
    @ApiOperation(value="Get the stats for the partitioned topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist")})
    public PartitionedTopicStats getPartitionedStats(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        destination = Codec.decode(destination);
        DestinationName topicName = DestinationName.get(this.domain(), property, cluster, namespace, destination);
        PartitionedTopicMetadata metadata = this.getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative);
        if (metadata.partitions == 0) {
            throw new RestException(Response.Status.NOT_FOUND, "Partitioned Topic not found");
        }
        if (cluster.equals("global")) {
            this.validateGlobalNamespaceOwnership(new NamespaceName(property, cluster, namespace));
        }

        PartitionedTopicStats aggregate = new PartitionedTopicStats(metadata);
        try {
            for (int partition = 0; partition < metadata.partitions; partition++) {
                String partitionName = topicName.getPartition(partition).toString();
                PersistentTopicStats partitionStats = this.pulsar().getAdminClient().persistentTopics().getStats(partitionName);
                aggregate.add(partitionStats);
                aggregate.partitions.put(partitionName, partitionStats);
            }
        }
        catch (Exception e) {
            throw new RestException(e);
        }
        return aggregate;
    }

    /**
     * Deletes a subscription from a topic.
     *
     * <p>For a partitioned topic the deletion is issued through the admin client once per
     * partition. For a non-partitioned topic the subscription is resolved on the local topic
     * reference and deleted there, waiting for the deletion to complete.
     *
     * @param property      property path segment
     * @param cluster       cluster path segment; {@code "global"} triggers the global namespace ownership check
     * @param namespace     namespace path segment
     * @param destination   encoded destination name, decoded with {@code Codec.decode}
     * @param subName       name of the subscription to delete
     * @param authoritative forwarded to the metadata lookup and the admin-operation validation
     * @throws RestException NOT_FOUND when the subscription does not exist, PRECONDITION_FAILED
     *                       when it still has connected consumers, otherwise wrapping the failure
     */
    @DELETE
    @Path(value="/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}")
    @ApiOperation(value="Delete a subscription.", notes="There should not be any active consumers on the subscription.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=412, message="Subscription has active consumers")})
    public void deleteSubscription(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination, @PathParam(value="subName") String subName, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        destination = Codec.decode(destination);
        DestinationName dn = DestinationName.get(this.domain(), property, cluster, namespace, destination);
        if (cluster.equals("global")) {
            this.validateGlobalNamespaceOwnership(new NamespaceName(property, cluster, namespace));
        }
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative);
        if (partitionMetadata.partitions > 0) {
            try {
                for (int i = 0; i < partitionMetadata.partitions; i++) {
                    this.pulsar().getAdminClient().persistentTopics().deleteSubscription(dn.getPartition(i).toString(), subName);
                }
            }
            catch (Exception e) {
                if (e instanceof PulsarAdminException.NotFoundException) {
                    throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
                }
                if (e instanceof PulsarAdminException.PreconditionFailedException) {
                    throw new RestException(Response.Status.PRECONDITION_FAILED, "Subscription has active connected consumers");
                }
                log.error("[{}] Failed to delete subscription {} {}", new Object[]{this.clientAppId(), dn, subName, e});
                throw new RestException(e);
            }
        } else {
            // Only a non-partitioned topic is handled locally: after the per-partition deletes the
            // request must not fall through to a lookup of the base topic name.
            this.validateAdminOperationOnDestination(dn, authoritative);
            Topic topic = this.getTopicReference(dn);
            try {
                Subscription sub = topic.getSubscription(subName);
                // A missing subscription surfaces as NullPointerException and is mapped to 404 below.
                Preconditions.checkNotNull(sub);
                sub.delete().get();
                log.info("[{}][{}] Deleted subscription {}", new Object[]{this.clientAppId(), dn, subName});
            }
            catch (Exception e) {
                Throwable t = e.getCause();
                if (e instanceof NullPointerException) {
                    throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
                }
                if (t instanceof BrokerServiceException.SubscriptionBusyException) {
                    throw new RestException(Response.Status.PRECONDITION_FAILED, "Subscription has active connected consumers");
                }
                log.error("[{}] Failed to delete subscription {} {}", new Object[]{this.clientAppId(), dn, subName, e});
                // Not every failure carries a cause; fall back to the exception itself rather than
                // handing a null Throwable to RestException.
                throw new RestException(t != null ? t : e);
            }
        }
    }

    /**
     * Clears the whole backlog of a subscription (or of a replicator cursor).
     *
     * <p>For a partitioned topic the request is forwarded through the admin client once per
     * partition. For a non-partitioned topic the backlog is cleared locally: when {@code subName}
     * starts with the topic's replicator prefix the matching replicator is cleared, otherwise the
     * subscription of that name is cleared.
     *
     * @param property      property path segment
     * @param cluster       cluster path segment; {@code "global"} triggers the global namespace ownership check
     * @param namespace     namespace path segment
     * @param destination   encoded destination name, decoded with {@code Codec.decode}
     * @param subName       subscription name, or a replicator cursor name
     * @param authoritative forwarded to the metadata lookup and the admin-operation validation
     * @throws RestException NOT_FOUND when the subscription/replicator is missing, otherwise
     *                       wrapping the failure
     */
    @POST
    @Path(value="/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/skip_all")
    @ApiOperation(value="Skip all messages on a topic subscription.", notes="Completely clears the backlog on the subscription.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=405, message="Operation not allowed on non-persistent topic"), @ApiResponse(code=404, message="Topic or subscription does not exist")})
    public void skipAllMessages(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination, @PathParam(value="subName") String subName, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        destination = Codec.decode(destination);
        DestinationName dn = DestinationName.get(this.domain(), property, cluster, namespace, destination);
        if (cluster.equals("global")) {
            this.validateGlobalNamespaceOwnership(new NamespaceName(property, cluster, namespace));
        }
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative);
        if (partitionMetadata.partitions > 0) {
            try {
                for (int i = 0; i < partitionMetadata.partitions; i++) {
                    this.pulsar().getAdminClient().persistentTopics().skipAllMessages(dn.getPartition(i).toString(), subName);
                }
            }
            catch (Exception e) {
                throw new RestException(e);
            }
        } else {
            // Only a non-partitioned topic is handled locally: after the per-partition calls the
            // request must not fall through to a lookup of the base topic name.
            this.validateAdminOperationOnDestination(dn, authoritative);
            PersistentTopic topic = (PersistentTopic)this.getTopicReference(dn);
            try {
                if (subName.startsWith(topic.replicatorPrefix)) {
                    String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
                    PersistentReplicator repl = (PersistentReplicator)topic.getPersistentReplicator(remoteCluster);
                    Preconditions.checkNotNull(repl);
                    repl.clearBacklog().get();
                } else {
                    PersistentSubscription sub = topic.getSubscription(subName);
                    Preconditions.checkNotNull(sub);
                    sub.clearBacklog().get();
                }
                log.info("[{}] Cleared backlog on {} {}", new Object[]{this.clientAppId(), dn, subName});
            }
            catch (NullPointerException npe) {
                // Raised by checkNotNull when the replicator/subscription does not exist.
                throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
            }
            catch (Exception exception) {
                log.error("[{}] Failed to skip all messages {} {}", new Object[]{this.clientAppId(), dn, subName, exception});
                throw new RestException(exception);
            }
        }
    }

    /**
     * Skips a number of messages on a subscription (or on a replicator cursor).
     *
     * <p>Partitioned topics are rejected with METHOD_NOT_ALLOWED. When {@code subName} starts with
     * the topic's replicator prefix the skip is applied to the matching replicator, otherwise to
     * the subscription of that name. The call waits for the skip to complete.
     *
     * @param property      property path segment
     * @param cluster       cluster path segment; {@code "global"} triggers the global namespace ownership check
     * @param namespace     namespace path segment
     * @param destination   encoded destination name, decoded with {@code Codec.decode}
     * @param subName       subscription name, or a replicator cursor name
     * @param numMessages   number of messages to skip
     * @param authoritative forwarded to the metadata lookup and the admin-operation validation
     * @throws RestException NOT_FOUND when a NullPointerException is raised (e.g. missing
     *                       subscription), otherwise wrapping the failure
     */
    @POST
    @Path(value="/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/skip/{numMessages}")
    @ApiOperation(value="Skip messages on a topic subscription.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic or subscription does not exist")})
    public void skipMessages(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination, @PathParam(value="subName") String subName, @PathParam(value="numMessages") int numMessages, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        destination = Codec.decode(destination);
        DestinationName topicName = DestinationName.get(this.domain(), property, cluster, namespace, destination);
        if (cluster.equals("global")) {
            this.validateGlobalNamespaceOwnership(new NamespaceName(property, cluster, namespace));
        }

        PartitionedTopicMetadata metadata = this.getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative);
        if (metadata.partitions > 0) {
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Skip messages on a partitioned topic is not allowed");
        }

        this.validateAdminOperationOnDestination(topicName, authoritative);
        PersistentTopic persistentTopic = (PersistentTopic)this.getTopicReference(topicName);
        try {
            if (!subName.startsWith(persistentTopic.replicatorPrefix)) {
                // Regular subscription.
                PersistentSubscription subscription = persistentTopic.getSubscription(subName);
                Preconditions.checkNotNull(subscription);
                subscription.skipMessages(numMessages).get();
            } else {
                // Replicator cursor: resolve the replicator from the remote cluster name.
                String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
                PersistentReplicator replicator = (PersistentReplicator)persistentTopic.getPersistentReplicator(remoteCluster);
                Preconditions.checkNotNull(replicator);
                replicator.skipMessages(numMessages).get();
            }
            log.info("[{}] Skipped {} messages on {} {}", new Object[]{this.clientAppId(), numMessages, topicName, subName});
        }
        catch (NullPointerException notFound) {
            throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
        }
        catch (Exception failure) {
            log.error("[{}] Failed to skip {} messages {} {}", new Object[]{this.clientAppId(), numMessages, topicName, subName, failure});
            throw new RestException(failure);
        }
    }

    /**
     * Expires messages on a single subscription by decoding the destination and delegating to
     * {@code expireMessages}.
     *
     * @param property            property path segment
     * @param cluster             cluster path segment
     * @param namespace           namespace path segment
     * @param destination         encoded destination name, decoded with {@code Codec.decode}
     * @param subName             subscription name
     * @param expireTimeInSeconds expiry threshold in seconds, passed through unchanged
     * @param authoritative       passed through unchanged
     */
    @POST
    @Path(value="/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/expireMessages/{expireTimeInSeconds}")
    @ApiOperation(value="Expire messages on a topic subscription.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic or subscription does not exist")})
    public void expireTopicMessages(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination, @PathParam(value="subName") String subName, @PathParam(value="expireTimeInSeconds") int expireTimeInSeconds, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        String decodedDestination = Codec.decode(destination);
        this.expireMessages(property, cluster, namespace, decodedDestination, subName, expireTimeInSeconds, authoritative);
    }

    /**
     * Expires messages on every replicator and subscription of a topic.
     *
     * <p>For a partitioned topic the request is forwarded through the admin client once per
     * partition. For a non-partitioned topic {@code expireMessages} is invoked for each replicator
     * and then for each subscription of the locally resolved topic.
     *
     * @param property            property path segment
     * @param cluster             cluster path segment; {@code "global"} triggers the global namespace ownership check
     * @param namespace           namespace path segment
     * @param destinationName     encoded destination name, decoded with {@code Codec.decode}
     * @param expireTimeInSeconds expiry threshold in seconds
     * @param authoritative       forwarded to the metadata lookup, validation and per-subscription calls
     * @throws RestException wrapping any failure of the per-partition admin client calls
     */
    @POST
    @Path(value="/{property}/{cluster}/{namespace}/{destination}/all_subscription/expireMessages/{expireTimeInSeconds}")
    @ApiOperation(value="Expire messages on all subscriptions of topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic or subscription does not exist")})
    public void expireMessagesForAllSubscriptions(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destinationName, @PathParam(value="expireTimeInSeconds") int expireTimeInSeconds, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        final String destination = Codec.decode(destinationName);
        DestinationName topicName = DestinationName.get(this.domain(), property, cluster, namespace, destination);
        if (cluster.equals("global")) {
            this.validateGlobalNamespaceOwnership(new NamespaceName(property, cluster, namespace));
        }

        PartitionedTopicMetadata metadata = this.getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative);
        if (metadata.partitions <= 0) {
            // Non-partitioned topic: expire on each replicator, then on each subscription.
            this.validateAdminOperationOnDestination(topicName, authoritative);
            PersistentTopic persistentTopic = (PersistentTopic)this.getTopicReference(topicName);
            persistentTopic.getReplicators().forEach((name, replicator) -> this.expireMessages(property, cluster, namespace, destination, (String)name, expireTimeInSeconds, authoritative));
            persistentTopic.getSubscriptions().forEach((name, subscription) -> this.expireMessages(property, cluster, namespace, destination, (String)name, expireTimeInSeconds, authoritative));
            return;
        }

        // Partitioned topic: forward the request to every partition.
        try {
            for (int partition = 0; partition < metadata.partitions; partition++) {
                this.pulsar().getAdminClient().persistentTopics().expireMessagesForAllSubscriptions(topicName.getPartition(partition).toString(), (long)expireTimeInSeconds);
            }
        }
        catch (Exception failure) {
            log.error("[{}] Failed to expire messages up to {} on {} {}", new Object[]{this.clientAppId(), expireTimeInSeconds, topicName, failure});
            throw new RestException(failure);
        }
    }

    /**
     * Resets a subscription's cursor to the position closest to an absolute timestamp.
     *
     * <p>For a partitioned topic the reset is forwarded through the admin client to every
     * partition. A {@code PreconditionFailedException} on one partition is counted and the
     * remaining partitions are still processed: if every partition failed that way the last such
     * exception's message is returned as PRECONDITION_FAILED, and if only some did a warning is
     * logged. Any other exception aborts immediately. For a non-partitioned topic the reset is
     * applied to the local subscription and waited on.
     *
     * @param property      property path segment
     * @param cluster       cluster path segment; {@code "global"} triggers the global namespace ownership check
     * @param namespace     namespace path segment
     * @param destination   encoded destination name, decoded with {@code Codec.decode}
     * @param subName       subscription name
     * @param timestamp     target timestamp (milliseconds, per the API description)
     * @param authoritative forwarded to the metadata lookup and the admin-operation validation
     * @throws RestException NOT_FOUND for a missing topic/subscription, METHOD_NOT_ALLOWED or
     *                       PRECONDITION_FAILED for the mapped broker errors, otherwise wrapping
     *                       the failure
     */
    @POST
    @Path(value="/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/resetcursor/{timestamp}")
    @ApiOperation(value="Reset subscription to message position closest to absolute timestamp (in ms).", notes="It fence cursor and disconnects all active consumers before reseting cursor.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic/Subscription does not exist")})
    public void resetCursor(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination, @PathParam(value="subName") String subName, @PathParam(value="timestamp") long timestamp, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        destination = Codec.decode(destination);
        DestinationName dn = DestinationName.get(this.domain(), property, cluster, namespace, destination);
        if (cluster.equals("global")) {
            this.validateGlobalNamespaceOwnership(new NamespaceName(property, cluster, namespace));
        }
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative);
        if (partitionMetadata.partitions > 0) {
            int numParts = partitionMetadata.partitions;
            int numPartException = 0;
            PulsarAdminException.PreconditionFailedException partitionException = null;
            for (int i = 0; i < numParts; i++) {
                // The try sits inside the loop so that a precondition failure on one partition
                // does not prevent the remaining partitions from being reset.
                try {
                    this.pulsar().getAdminClient().persistentTopics().resetCursor(dn.getPartition(i).toString(), subName, timestamp);
                }
                catch (PulsarAdminException.PreconditionFailedException pfe) {
                    // Remember the last failure; it is reported only if every partition fails.
                    ++numPartException;
                    partitionException = pfe;
                }
                catch (Exception e) {
                    log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", new Object[]{this.clientAppId(), dn, subName, timestamp, e});
                    throw new RestException(e);
                }
            }
            if (numPartException == numParts) {
                log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", new Object[]{this.clientAppId(), dn, subName, timestamp, partitionException});
                throw new RestException(Response.Status.PRECONDITION_FAILED, partitionException.getMessage());
            }
            if (numPartException > 0) {
                log.warn("[{}][{}] partial errors for reset cursor on subscription {} to time {} - ", new Object[]{this.clientAppId(), destination, subName, timestamp, partitionException});
            }
        } else {
            this.validateAdminOperationOnDestination(dn, authoritative);
            log.info("[{}][{}] received reset cursor on subscription {} to time {}", new Object[]{this.clientAppId(), destination, subName, timestamp});
            PersistentTopic topic = (PersistentTopic)this.getTopicReference(dn);
            if (topic == null) {
                throw new RestException(Response.Status.NOT_FOUND, "Topic not found");
            }
            try {
                PersistentSubscription sub = topic.getSubscription(subName);
                // A missing subscription surfaces as NullPointerException and is mapped to 404 below.
                Preconditions.checkNotNull(sub);
                sub.resetCursor(timestamp).get();
                log.info("[{}][{}] reset cursor on subscription {} to time {}", new Object[]{this.clientAppId(), dn, subName, timestamp});
            }
            catch (Exception e) {
                Throwable t = e.getCause();
                log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", new Object[]{this.clientAppId(), dn, subName, timestamp, e});
                if (e instanceof NullPointerException) {
                    throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
                }
                if (e instanceof BrokerServiceException.NotAllowedException) {
                    throw new RestException(Response.Status.METHOD_NOT_ALLOWED, e.getMessage());
                }
                if (t instanceof BrokerServiceException.SubscriptionInvalidCursorPosition) {
                    throw new RestException(Response.Status.PRECONDITION_FAILED, "Unable to find position for timestamp specified -" + t.getMessage());
                }
                throw new RestException(e);
            }
        }
    }

    /**
     * Resets a subscription's cursor to the position built from the ledger id and entry id of the
     * supplied message id.
     *
     * <p>Partitioned topics are rejected with METHOD_NOT_ALLOWED. The call waits for the reset to
     * complete.
     *
     * @param property      property path segment
     * @param cluster       cluster path segment; {@code "global"} triggers the global namespace ownership check
     * @param namespace     namespace path segment
     * @param destination   encoded destination name, decoded with {@code Codec.decode}
     * @param subName       subscription name
     * @param authoritative forwarded to the metadata lookup and the admin-operation validation
     * @param messageId     request entity carrying the target ledger id and entry id
     * @throws RestException NOT_FOUND when the topic is null or a NullPointerException is raised
     *                       (e.g. missing subscription), PRECONDITION_FAILED when the cause is an
     *                       invalid cursor position, otherwise wrapping the failure
     */
    @POST
    @Path(value="/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/resetcursor")
    @ApiOperation(value="Reset subscription to message position closest to given position.", notes="It fence cursor and disconnects all active consumers before reseting cursor.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic/Subscription does not exist"), @ApiResponse(code=405, message="Not supported for partitioned topics")})
    public void resetCursorOnPosition(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination, @PathParam(value="subName") String subName, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, MessageIdImpl messageId) {
        destination = Codec.decode(destination);
        DestinationName topicName = DestinationName.get(this.domain(), property, cluster, namespace, destination);
        if (cluster.equals("global")) {
            this.validateGlobalNamespaceOwnership(new NamespaceName(property, cluster, namespace));
        }
        log.info("[{}][{}] received reset cursor on subscription {} to position {}", new Object[]{this.clientAppId(), destination, subName, messageId});

        PartitionedTopicMetadata metadata = this.getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative);
        if (metadata.partitions > 0) {
            log.warn("[{}] Not supported operation on partitioned-topic {} {}", new Object[]{this.clientAppId(), topicName, subName});
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Reset-cursor at position is not allowed for partitioned-topic");
        }

        this.validateAdminOperationOnDestination(topicName, authoritative);
        PersistentTopic persistentTopic = (PersistentTopic)this.getTopicReference(topicName);
        if (persistentTopic == null) {
            throw new RestException(Response.Status.NOT_FOUND, "Topic not found");
        }

        try {
            PersistentSubscription subscription = persistentTopic.getSubscription(subName);
            Preconditions.checkNotNull(subscription);
            Position target = (Position)PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
            subscription.resetCursor(target).get();
            log.info("[{}][{}] successfully reset cursor on subscription {} to position {}", new Object[]{this.clientAppId(), topicName, subName, messageId});
        }
        catch (Exception failure) {
            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to position {}", new Object[]{this.clientAppId(), topicName, subName, messageId, failure});
            Throwable cause = failure.getCause();
            if (failure instanceof NullPointerException) {
                throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
            } else if (cause instanceof BrokerServiceException.SubscriptionInvalidCursorPosition) {
                throw new RestException(Response.Status.PRECONDITION_FAILED, "Unable to find position for position specified: " + cause.getMessage());
            } else {
                throw new RestException(failure);
            }
        }
    }

    /**
     * Peeks the n-th message of a subscription (or replicator cursor) and returns its payload.
     *
     * <p>Partitioned and non-persistent topics are rejected with METHOD_NOT_ALLOWED. The response
     * carries the message position, the message properties and, when present in the metadata,
     * the publish time, event time and batch size as {@code X-Pulsar-*} headers. The body is the
     * payload after it has been decoded with the codec selected by the metadata's compression type.
     *
     * @param property        property path segment
     * @param cluster         cluster path segment; {@code "global"} triggers the global namespace ownership check
     * @param namespace       namespace path segment
     * @param destination     encoded destination name, decoded with {@code Codec.decode}
     * @param subName         subscription name, or a replicator cursor name
     * @param messagePosition position of the message to peek, passed to {@code peekNthMessage}
     * @param authoritative   forwarded to the metadata lookup and the admin-operation validation
     * @return an OK response streaming the uncompressed payload
     * @throws RestException NOT_FOUND when a NullPointerException is raised (e.g. no entry at that
     *                       position), otherwise wrapping the failure
     */
    @GET
    @Path(value="/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/position/{messagePosition}")
    @ApiOperation(value="Peek nth message on a topic subscription.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic, subscription or the message position does not exist")})
    public Response peekNthMessage(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination, @PathParam(value="subName") String subName, @PathParam(value="messagePosition") int messagePosition, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        destination = Codec.decode(destination);
        DestinationName dn = DestinationName.get(this.domain(), property, cluster, namespace, destination);
        if (cluster.equals("global")) {
            this.validateGlobalNamespaceOwnership(new NamespaceName(property, cluster, namespace));
        }
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative);
        if (partitionMetadata.partitions > 0) {
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
        }
        this.validateAdminOperationOnDestination(dn, authoritative);
        // Resolve the topic once so the type check and the cast apply to the same object.
        Topic topicRef = this.getTopicReference(dn);
        if (!(topicRef instanceof PersistentTopic)) {
            log.error("[{}] Not supported operation of non-persistent topic {} {}", new Object[]{this.clientAppId(), dn, subName});
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Peek messages on a non-persistent topic is not allowed");
        }
        PersistentTopic topic = (PersistentTopic)topicRef;
        boolean isReplicator = subName.startsWith(topic.replicatorPrefix);
        PersistentReplicator repl = null;
        PersistentSubscription sub = null;
        if (isReplicator) {
            repl = this.getReplicatorReference(subName, topic);
        } else {
            sub = (PersistentSubscription)this.getSubscriptionReference(subName, topic);
        }
        Entry entry = null;
        try {
            entry = isReplicator ? repl.peekNthMessage(messagePosition).get() : sub.peekNthMessage(messagePosition).get();
            // A missing entry surfaces as NullPointerException and is mapped to 404 below.
            Preconditions.checkNotNull(entry);
            PositionImpl pos = (PositionImpl)entry.getPosition();
            ByteBuf metadataAndPayload = entry.getDataBuffer();
            PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
            Response.ResponseBuilder responseBuilder = Response.ok();
            responseBuilder.header("X-Pulsar-Message-ID", pos.toString());
            for (PulsarApi.KeyValue keyValue : metadata.getPropertiesList()) {
                responseBuilder.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), keyValue.getValue());
            }
            if (metadata.hasPublishTime()) {
                responseBuilder.header("X-Pulsar-publish-time", DateFormatter.format(metadata.getPublishTime()));
            }
            if (metadata.hasEventTime()) {
                responseBuilder.header("X-Pulsar-event-time", DateFormatter.format(metadata.getEventTime()));
            }
            if (metadata.hasNumMessagesInBatch()) {
                responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch());
            }
            CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
            ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize());
            // Copy into a heap buffer so the payload can outlive the entry, which is released in
            // the finally block below before the response body is written.
            final ByteBuf data;
            try {
                int payloadSize = uncompressedPayload.readableBytes();
                data = PooledByteBufAllocator.DEFAULT.heapBuffer(payloadSize, payloadSize);
                data.writeBytes(uncompressedPayload);
            }
            finally {
                uncompressedPayload.release();
            }
            StreamingOutput stream = new StreamingOutput(){

                @Override
                public void write(OutputStream output) throws IOException, WebApplicationException {
                    try {
                        output.write(data.array(), data.arrayOffset(), data.readableBytes());
                    }
                    finally {
                        // Return the pooled buffer even if the client connection fails mid-write.
                        data.release();
                    }
                }
            };
            return responseBuilder.entity(stream).build();
        }
        catch (NullPointerException npe) {
            throw new RestException(Response.Status.NOT_FOUND, "Message not found");
        }
        catch (Exception exception) {
            log.error("[{}] Failed to get message at position {} from {} {}", new Object[]{this.clientAppId(), messagePosition, dn, subName, exception});
            throw new RestException(exception);
        }
        finally {
            if (entry != null) {
                entry.release();
            }
        }
    }

    /**
     * Returns the estimated backlog of a topic without going through
     * {@code validateAdminOperationOnDestination}.
     *
     * <p>The caller must pass {@code validateAdminAccessOnProperty}; for the
     * {@code "global"} cluster the global namespace ownership is validated as well.
     * The namespace policies node is then read purely as an existence check. If the
     * broker service holds offline stats for the topic that were generated less than
     * 10 minutes ago, they are returned as-is. Otherwise a new estimate is computed
     * with {@link ManagedLedgerOfflineBacklog}, handed to
     * {@code cacheOfflineTopicStats} and returned.
     *
     * @param property      property the topic belongs to
     * @param cluster       cluster the topic belongs to
     * @param namespace     namespace the topic belongs to
     * @param destination   encoded local topic name; decoded before use
     * @param authoritative accepted as a query parameter but not read by this method
     * @return the cached or freshly estimated offline topic stats
     * @throws RestException 404 when the namespace policies node does not exist; any
     *                       other failure is wrapped in a {@code RestException}
     */
    @GET
    @Path(value="{property}/{cluster}/{namespace}/{destination}/backlog")
    @ApiOperation(value="Get estimated backlog for offline topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace does not exist")})
    public PersistentOfflineTopicStats getBacklog(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        destination = Codec.decode(destination);
        this.validateAdminAccessOnProperty(property);
        if (cluster.equals("global")) {
            this.validateGlobalNamespaceOwnership(new NamespaceName(property, cluster, namespace));
        }

        // The lookup result is discarded: only the "node missing" signal matters here.
        try {
            this.policiesCache().get(PersistentTopics.path("policies", property, cluster, namespace));
        } catch (KeeperException.NoNodeException missingNamespace) {
            log.warn("[{}] Failed to get topic backlog {}/{}/{}: Namespace does not exist", new Object[]{this.clientAppId(), property, cluster, namespace});
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (Exception lookupFailure) {
            log.error("[{}] Failed to get topic backlog {}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace, lookupFailure});
            throw new RestException(lookupFailure);
        }

        DestinationName topicName = DestinationName.get(this.domain(), property, cluster, namespace, destination);
        try {
            PersistentOfflineTopicStats cachedStats = this.pulsar().getBrokerService().getOfflineTopicStat(topicName);
            if (cachedStats != null) {
                // Reuse the cached estimate while it is younger than 10 minutes.
                long ageMs = System.currentTimeMillis() - cachedStats.statGeneratedAt.getTime();
                if (TimeUnit.MILLISECONDS.toMinutes(ageMs) < 10L) {
                    return cachedStats;
                }
            }

            ManagedLedgerConfig ledgerConfig = this.pulsar().getBrokerService().getManagedLedgerConfig(topicName).get();
            ManagedLedgerOfflineBacklog estimator = new ManagedLedgerOfflineBacklog(ledgerConfig.getDigestType(), ledgerConfig.getPassword(), this.pulsar().getAdvertisedAddress(), false);
            PersistentOfflineTopicStats freshStats = estimator.estimateUnloadedTopicBacklog((ManagedLedgerFactoryImpl)this.pulsar().getManagedLedgerFactory(), topicName);
            this.pulsar().getBrokerService().cacheOfflineTopicStats(topicName, freshStats);
            return freshStats;
        } catch (Exception estimationFailure) {
            throw new RestException(estimationFailure);
        }
    }

    /**
     * Terminates a non-partitioned persistent topic.
     *
     * <p>For the {@code "global"} cluster the global namespace ownership is validated
     * first. Partitioned topics are rejected with 405. The admin operation is then
     * validated on the destination, the topic reference is resolved, and
     * {@code PersistentTopic.terminate()} is awaited.
     *
     * @param property      property the topic belongs to
     * @param cluster       cluster the topic belongs to
     * @param namespace     namespace the topic belongs to
     * @param destination   encoded local topic name; decoded before use
     * @param authoritative forwarded to the partition-metadata lookup and to
     *                      {@code validateAdminOperationOnDestination}
     * @return the {@code MessageId} produced by {@code PersistentTopic.terminate()}
     * @throws RestException 405 for partitioned or non-persistent topics, 404 when the
     *                       topic reference cannot be resolved; any failure while
     *                       terminating is wrapped in a {@code RestException}
     */
    @POST
    @Path(value="/{property}/{cluster}/{namespace}/{destination}/terminate")
    @ApiOperation(value="Terminate a topic. A topic that is terminated will not accept any more messages to be published and will let consumer to drain existing messages in backlog")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=405, message="Operation not allowed on non-persistent topic"), @ApiResponse(code=404, message="Topic does not exist")})
    public MessageId terminate(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="destination") @Encoded String destination, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        destination = Codec.decode((String)destination);
        DestinationName dn = DestinationName.get((String)this.domain(), (String)property, (String)cluster, (String)namespace, (String)destination);
        if (cluster.equals("global")) {
            this.validateGlobalNamespaceOwnership(new NamespaceName(property, cluster, namespace));
        }
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative);
        if (partitionMetadata.partitions > 0) {
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Termination of a partitioned topic is not allowed");
        }
        this.validateAdminOperationOnDestination(dn, authoritative);
        Topic topic = this.getTopicReference(dn);
        // Reject non-persistent topics with the documented 405 instead of letting the
        // cast below fail with a ClassCastException that surfaces as a generic error.
        if (!(topic instanceof PersistentTopic)) {
            log.error("[{}] Not supported operation of non-persistent topic {}", (Object)this.clientAppId(), (Object)dn);
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Termination of a non-persistent topic is not allowed");
        }
        try {
            return ((PersistentTopic)topic).terminate().get();
        }
        catch (Exception exception) {
            log.error("[{}] Failed to terminated topic {}", new Object[]{this.clientAppId(), dn, exception});
            throw new RestException(exception);
        }
    }

    /**
     * Expires messages older than {@code expireTimeInSeconds} on a subscription or
     * replicator cursor of a topic.
     *
     * <p>For a partitioned topic the request is issued once per partition through the
     * broker's admin client and the method then returns; the partitioned parent is
     * not looked up as a topic of its own. For a non-partitioned topic the admin
     * operation is validated on the destination and the expiry is triggered on the
     * local {@code PersistentTopic}: names starting with the topic's
     * {@code replicatorPrefix} are resolved as replicators, anything else as a
     * subscription.
     *
     * @param property            property the topic belongs to
     * @param cluster             cluster the topic belongs to
     * @param namespace           namespace the topic belongs to
     * @param destination         local topic name
     * @param subName             subscription name, or a replicator name carrying the
     *                            replicator prefix
     * @param expireTimeInSeconds expiry threshold handed to {@code expireMessages}
     * @param authoritative       forwarded to the partition-metadata lookup and to
     *                            {@code validateAdminOperationOnDestination}
     * @throws RestException 405 for a non-persistent topic, 404 when the topic,
     *                       subscription or replicator is not found; any other failure
     *                       is wrapped in a {@code RestException}
     */
    public void expireMessages(String property, String cluster, String namespace, String destination, String subName, int expireTimeInSeconds, boolean authoritative) {
        DestinationName dn = DestinationName.get((String)this.domain(), (String)property, (String)cluster, (String)namespace, (String)destination);
        if (cluster.equals("global")) {
            this.validateGlobalNamespaceOwnership(new NamespaceName(property, cluster, namespace));
        }
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative);
        if (partitionMetadata.partitions > 0) {
            try {
                for (int i = 0; i < partitionMetadata.partitions; i++) {
                    this.pulsar().getAdminClient().persistentTopics().expireMessages(dn.getPartition(i).toString(), subName, (long)expireTimeInSeconds);
                }
            }
            catch (Exception e) {
                throw new RestException(e);
            }
            // All partitions handled. Falling through would try to resolve the
            // partitioned parent as a plain topic and fail with "Topic not found".
            return;
        }
        this.validateAdminOperationOnDestination(dn, authoritative);
        // Resolve the reference once so the type check and the cast see the same object.
        Topic topicReference = this.getTopicReference(dn);
        if (!(topicReference instanceof PersistentTopic)) {
            log.error("[{}] Not supported operation of non-persistent topic {} {}", new Object[]{this.clientAppId(), dn, subName});
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Expire messages on a non-persistent topic is not allowed");
        }
        PersistentTopic topic = (PersistentTopic)topicReference;
        try {
            if (subName.startsWith(topic.replicatorPrefix)) {
                String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
                PersistentReplicator repl = (PersistentReplicator)topic.getPersistentReplicator(remoteCluster);
                Preconditions.checkNotNull((Object)repl);
                repl.expireMessages(expireTimeInSeconds);
            } else {
                PersistentSubscription sub = topic.getSubscription(subName);
                Preconditions.checkNotNull((Object)sub);
                sub.expireMessages(expireTimeInSeconds);
            }
            log.info("[{}] Message expire started up to {} on {} {}", new Object[]{this.clientAppId(), expireTimeInSeconds, dn, subName});
        }
        catch (NullPointerException nullPointerException) {
            // Raised by checkNotNull above when the subscription/replicator is missing.
            throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
        }
        catch (Exception exception) {
            log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}", new Object[]{this.clientAppId(), expireTimeInSeconds, dn, subName, exception});
            throw new RestException(exception);
        }
    }

    /**
     * Asynchronously resolves the partitioned-topic metadata for {@code dn}.
     *
     * <p>Authorization is checked first with {@code checkAuthorization}; if that
     * raises a {@code RestException}, admin access on the property is tried as a
     * fallback. When the fallback is rejected too, the returned future fails with a
     * {@code PulsarClientException}. After authorization the metadata is fetched once
     * {@code checkLocalOrGetPeerReplicationCluster} completes.
     *
     * <p>Exceptions raised synchronously in this method are not thrown to the caller;
     * they complete the returned future exceptionally.
     *
     * @param pulsar      broker service handle used for authorization and lookups
     * @param clientAppId identity of the calling client, used for authorization and logs
     * @param dn          destination whose partition metadata is requested
     * @return a future completed with the metadata, or completed exceptionally with
     *         the underlying failure
     */
    public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(PulsarService pulsar, String clientAppId, DestinationName dn) {
        CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<PartitionedTopicMetadata>();
        try {
            try {
                PersistentTopics.checkAuthorization(pulsar, dn, clientAppId);
            }
            catch (RestException restException) {
                try {
                    PersistentTopics.validateAdminAccessOnProperty(pulsar, clientAppId, dn.getProperty());
                }
                catch (RestException authException) {
                    log.warn("Failed to authorize {} on cluster {}", (Object)clientAppId, (Object)dn.toString());
                    throw new PulsarClientException(String.format("Authorization failed %s on cluster %s with error %s", clientAppId, dn.toString(), authException.getMessage()));
                }
            }
            catch (Exception ex2) {
                log.warn("Failed to authorize {} on cluster {} with unexpected exception {}", new Object[]{clientAppId, dn.toString(), ex2.getMessage(), ex2});
                throw ex2;
            }
            String path = PersistentTopics.path("partitioned-topics", dn.getProperty(), dn.getCluster(), dn.getNamespacePortion(), "persistent", dn.getEncodedLocalName());
            PersistentTopics.checkLocalOrGetPeerReplicationCluster(pulsar, dn.getNamespaceObject())
                .thenCompose(res -> PersistentTopics.fetchPartitionedTopicMetadataAsync(pulsar, path))
                .thenAccept(metadata -> {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Total number of partitions for topic {} is {}", new Object[]{clientAppId, dn, metadata.partitions});
                    }
                    metadataFuture.complete(metadata);
                })
                .exceptionally(ex -> {
                    // Unwrap the stage wrapper when there is one. completeExceptionally(null)
                    // would throw and leave metadataFuture incomplete forever, so fall back
                    // to the throwable itself when it carries no cause.
                    Throwable cause = ex.getCause();
                    metadataFuture.completeExceptionally(cause != null ? cause : ex);
                    return null;
                });
        }
        catch (Exception ex3) {
            metadataFuture.completeExceptionally(ex3);
        }
        return metadataFuture;
    }

    /**
     * Looks up the topic registered in the broker service under {@code dn}.
     *
     * @param dn destination to resolve
     * @return the topic reference, never {@code null}
     * @throws RestException 404 "Topic not found" when the lookup yields {@code null}
     *                       or raises any exception
     */
    private Topic getTopicReference(DestinationName dn) {
        Topic resolved;
        try {
            resolved = this.pulsar().getBrokerService().getTopicReference(dn.toString());
        } catch (Exception lookupFailure) {
            throw new RestException(Response.Status.NOT_FOUND, "Topic not found");
        }
        if (resolved == null) {
            throw new RestException(Response.Status.NOT_FOUND, "Topic not found");
        }
        return resolved;
    }

    /**
     * Resolves the subscription named {@code subName} on {@code topic}.
     *
     * @param subName subscription name to look up
     * @param topic   topic that is asked for the subscription
     * @return the subscription, never {@code null}
     * @throws RestException 404 "Subscription not found" when the lookup yields
     *                       {@code null} or raises any exception
     */
    private Subscription getSubscriptionReference(String subName, PersistentTopic topic) {
        PersistentSubscription found;
        try {
            found = topic.getSubscription(subName);
        } catch (Exception lookupFailure) {
            throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
        }
        if (found == null) {
            throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
        }
        return found;
    }

    /**
     * Resolves the persistent replicator addressed by {@code replName} on {@code topic}.
     *
     * <p>The remote cluster is derived from the replicator name with
     * {@code PersistentReplicator.getRemoteCluster} and then used as the lookup key.
     *
     * @param replName replicator name from which the remote cluster is derived
     * @param topic    topic that is asked for the replicator
     * @return the replicator, never {@code null}
     * @throws RestException 404 "Replicator not found" when the lookup yields
     *                       {@code null} or raises any exception
     */
    private PersistentReplicator getReplicatorReference(String replName, PersistentTopic topic) {
        PersistentReplicator found;
        try {
            String targetCluster = PersistentReplicator.getRemoteCluster(replName);
            found = (PersistentReplicator)topic.getPersistentReplicator(targetCluster);
        } catch (Exception lookupFailure) {
            throw new RestException(Response.Status.NOT_FOUND, "Replicator not found");
        }
        if (found == null) {
            throw new RestException(Response.Status.NOT_FOUND, "Replicator not found");
        }
        return found;
    }

    /**
     * Raises the partition count recorded for a partitioned topic.
     *
     * <p>{@code createSubscriptions} runs first. Only once it completes successfully
     * is a {@code PartitionedTopicMetadata} carrying {@code numPartitions} serialized
     * to JSON and written to the topic's "partitioned-topics" path through
     * {@code globalZk().setData} (version {@code -1}).
     *
     * @param dn            partitioned topic to update
     * @param numPartitions new partition count to store
     * @return a future completed with {@code null} once the write is acknowledged with
     *         an OK result code, or completed exceptionally when subscription
     *         creation, serialization or the write fails
     */
    private CompletableFuture<Void> updatePartitionedTopic(DestinationName dn, int numPartitions) {
        final String metadataPath = PersistentTopics.path("partitioned-topics", dn.getProperty(), dn.getCluster(), dn.getNamespacePortion(), this.domain(), dn.getEncodedLocalName());
        final CompletableFuture<Void> updateResult = new CompletableFuture<>();

        this.createSubscriptions(dn, numPartitions).thenAccept(ignored -> {
            try {
                byte[] payload = PersistentTopics.jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
                this.globalZk().setData(metadataPath, payload, -1, (rc, zkPath, ctx, stat) -> {
                    if (rc != KeeperException.Code.OK.intValue()) {
                        updateResult.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), "failed to create update partitions"));
                        return;
                    }
                    updateResult.complete(null);
                }, null);
            } catch (Exception writeFailure) {
                updateResult.completeExceptionally(writeFailure);
            }
        }).exceptionally(subscriptionFailure -> {
            updateResult.completeExceptionally(subscriptionFailure);
            return null;
        });

        return updateResult;
    }

    /**
     * Creates, on every partition that is about to be added, the subscriptions that
     * already exist on the topic.
     *
     * <p>The current partition metadata is fetched first. The request is rejected with
     * a CONFLICT {@code RestException} when the stored partition count is {@code <= 1}
     * or is not smaller than {@code numPartitions}. The cursor names of partition 1
     * are then listed from the managed-ledger meta store; for each cursor and each new
     * partition index in {@code [current, numPartitions)} the partition topic is
     * obtained from the broker service and the subscription is created on it. Once all
     * creations have finished, every topic that was successfully obtained is closed,
     * and only then is the returned future completed.
     *
     * @param dn            partitioned topic being expanded
     * @param numPartitions target partition count
     * @return a future completed with {@code null} when every subscription was
     *         created, or completed exceptionally with the first reported failure
     */
    private CompletableFuture<Void> createSubscriptions(final DestinationName dn, final int numPartitions) {
        String path = PersistentTopics.path("partitioned-topics", dn.getProperty(), dn.getCluster(), dn.getNamespacePortion(), this.domain(), dn.getEncodedLocalName());
        final CompletableFuture<Void> result = new CompletableFuture<Void>();
        PersistentTopics.fetchPartitionedTopicMetadataAsync(this.pulsar(), path).thenAccept(partitionMetadata -> {
            if (partitionMetadata.partitions <= 1) {
                result.completeExceptionally(new RestException(Response.Status.CONFLICT, "Topic is not partitioned topic"));
                return;
            }
            if (partitionMetadata.partitions >= numPartitions) {
                result.completeExceptionally(new RestException(Response.Status.CONFLICT, "number of partitions must be more than existing " + partitionMetadata.partitions));
                return;
            }
            // The cursors of partition 1 serve as the template for the new partitions.
            final String ledgerName = dn.getPartition(1).getPersistenceNamingEncoding();
            // Topics opened here are remembered so they can be closed again afterwards.
            final Set<Topic> topics = Sets.newConcurrentHashSet();
            ((ManagedLedgerFactoryImpl)this.pulsar().getManagedLedgerFactory()).getMetaStore().getCursors(ledgerName, new MetaStore.MetaStoreCallback<List<String>>(){

                @Override
                public void operationComplete(List<String> cursors, MetaStore.Stat stat) {
                    List<CompletableFuture<Void>> subscriptionCreationFuture = Lists.newArrayList();
                    cursors.forEach(cursor -> {
                        String subName = Codec.decode(cursor);
                        for (int i = partitionMetadata.partitions; i < numPartitions; i++) {
                            final String topicName = dn.getPartition(i).toString();
                            final CompletableFuture<Void> future = new CompletableFuture<Void>();
                            PersistentTopics.this.pulsar().getBrokerService().getTopic(topicName).handle((topic, ex) -> {
                                if (ex != null) {
                                    log.warn("[{}] Failed to create topic {}", (Object)PersistentTopics.this.clientAppId(), (Object)topicName);
                                    future.completeExceptionally(ex);
                                    return null;
                                }
                                // Record the topic only after a successful load: on failure
                                // it is null, and adding null to the concurrent set throws,
                                // which would leave `future` (and so `result`) incomplete.
                                topics.add(topic);
                                topic.createSubscription(subName).handle((sub, e) -> {
                                    if (e != null) {
                                        log.warn("[{}] Failed to create subsciption {} {}", new Object[]{PersistentTopics.this.clientAppId(), topicName, subName});
                                        future.completeExceptionally(e);
                                        return null;
                                    }
                                    log.info("[{}] Successfully created subsciption {} {}", new Object[]{PersistentTopics.this.clientAppId(), topicName, subName});
                                    future.complete(null);
                                    return null;
                                });
                                return null;
                            });
                            subscriptionCreationFuture.add(future);
                        }
                    });
                    FutureUtil.waitForAll(subscriptionCreationFuture).handle((res, subscriptionException) -> {
                        // Close the opened topics whether or not subscription creation succeeded.
                        List<CompletableFuture<Void>> closeFutures = topics.stream().map(opened -> opened.close()).collect(Collectors.toList());
                        FutureUtil.waitForAll(closeFutures).handle((closed, topicCloseException) -> {
                            if (topicCloseException != null) {
                                log.warn("Failed to close newly created partitioned topics for {} ", (Object)dn, topicCloseException);
                            }
                            if (subscriptionException != null) {
                                result.completeExceptionally(subscriptionException);
                            } else {
                                log.info("[{}] Successfully created new partitions {}", (Object)PersistentTopics.this.clientAppId(), (Object)dn.toString());
                                result.complete(null);
                            }
                            return null;
                        });
                        return null;
                    });
                }

                @Override
                public void operationFailed(ManagedLedgerException.MetaStoreException ex) {
                    log.warn("[{}] Failed to get list of cursors of {}", (Object)PersistentTopics.this.clientAppId(), (Object)ledgerName);
                    result.completeExceptionally(ex);
                }
            });
        }).exceptionally(ex -> {
            log.warn("[{}] Failed to get partition metadata for {}", (Object)this.clientAppId(), (Object)dn.toString());
            result.completeExceptionally(ex);
            return null;
        });
        return result;
    }

    /**
     * Unloads a topic from this broker by closing it and waiting for the close to
     * finish.
     *
     * <p>Super-user access and destination ownership are validated before the topic
     * is resolved.
     *
     * @param destination   topic to unload
     * @param authoritative forwarded to {@code validateDestinationOwnership}
     * @throws RestException 404 when the topic cannot be resolved; any failure while
     *                       closing is wrapped in a {@code RestException} built from
     *                       the failure's cause (or the failure itself when it has no
     *                       cause)
     */
    protected void unloadTopic(DestinationName destination, boolean authoritative) {
        this.validateSuperUserAccess();
        this.validateDestinationOwnership(destination, authoritative);
        try {
            Topic topic = this.getTopicReference(destination);
            topic.close().get();
            log.info("[{}] Successfully unloaded topic {}", (Object)this.clientAppId(), (Object)destination);
        }
        catch (RestException restException) {
            // getTopicReference reports a missing topic as a RestException. Pass it
            // through unchanged rather than letting the generic handler dereference
            // a cause it does not have.
            throw restException;
        }
        catch (NullPointerException nullPointerException) {
            log.error("[{}] topic {} not found", (Object)this.clientAppId(), (Object)destination);
            throw new RestException(Response.Status.NOT_FOUND, "Topic does not exist");
        }
        catch (Exception e) {
            // Prefer the wrapped cause, but do not assume one is present.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            log.error("[{}] Failed to unload topic {}, {}", new Object[]{this.clientAppId(), destination, cause.getMessage(), e});
            throw new RestException(cause);
        }
    }

    /**
     * Rejects requests from client libraries older than the least supported version.
     *
     * <p>The check is skipped when the broker configuration disables it. A blank
     * {@code User-Agent} header is always rejected. When the header contains
     * {@code DEPRECATED_CLIENT_VERSION_PREFIX}, the text after the prefix (up to the
     * first {@code '-'}) is split on dots and its major/minor numbers are compared
     * with {@code LEAST_SUPPORTED_CLIENT_VERSION_PREFIX}. A header whose version
     * cannot be parsed is logged and let through.
     *
     * @throws RestException 405 when the user agent is blank or reports a version
     *                       below the least supported major.minor
     */
    private void validateClientVersion() {
        if (!this.pulsar().getConfiguration().isClientLibraryVersionCheckEnabled()) {
            return;
        }
        String userAgent = this.httpRequest.getHeader("User-Agent");
        if (StringUtils.isBlank(userAgent)) {
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Client lib is not compatible to access partitioned metadata: version in user-agent is not present");
        }
        if (!userAgent.contains(DEPRECATED_CLIENT_VERSION_PREFIX)) {
            return;
        }
        try {
            String[] tokens = userAgent.split(DEPRECATED_CLIENT_VERSION_PREFIX);
            String[] splits = tokens.length > 1 ? tokens[1].split("-")[0].trim().split("\\.") : null;
            if (splits != null && splits.length > 1) {
                long leastMajor = LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMajorVersion();
                long leastMinor = LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMinorVersion();
                long clientMajor = Integer.parseInt(splits[0]);
                // Compare major first; the minor number only matters when the majors are
                // equal. Comparing them independently would reject e.g. 2.0 against a
                // 1.x minimum just because its minor number is smaller.
                boolean tooOld = clientMajor < leastMajor
                        || (clientMajor == leastMajor && Integer.parseInt(splits[1]) < leastMinor);
                if (tooOld) {
                    throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Client lib is not compatible to access partitioned metadata: version " + userAgent + " is not supported");
                }
            }
        }
        catch (RestException re) {
            throw re;
        }
        catch (Exception exception) {
            // Best effort: an unparsable version string must not block the request.
            log.warn("[{}] Failed to parse version {} ", (Object)this.clientAppId(), (Object)userAgent);
        }
    }
}

