/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.admin.impl;

import com.github.zafarkhaja.semver.Version;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.ZkAdminPaths;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AuthPolicies;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentTopicsBase
extends AdminResource {
    private static final Logger log = LoggerFactory.getLogger(PersistentTopicsBase.class);
    protected static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
    private static final int OFFLINE_TOPIC_STAT_TTL_MINS = 10;
    private static final String DEPRECATED_CLIENT_VERSION_PREFIX = "Pulsar-CPP-v";
    private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX = Version.forIntegers((int)1, (int)21);

    protected List<String> internalGetList() {
        this.validateAdminAccessForTenant(this.namespaceName.getTenant());
        try {
            this.policiesCache().get(PersistentTopicsBase.path("policies", this.namespaceName.toString()));
        }
        catch (KeeperException.NoNodeException e) {
            log.warn("[{}] Failed to get topic list {}: Namespace does not exist", (Object)this.clientAppId(), (Object)this.namespaceName);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        }
        catch (Exception e) {
            log.error("[{}] Failed to get topic list {}", new Object[]{this.clientAppId(), this.namespaceName, e});
            throw new RestException(e);
        }
        ArrayList topics = Lists.newArrayList();
        try {
            String path = String.format("/managed-ledgers/%s/%s", this.namespaceName.toString(), this.domain());
            for (String topic : this.managedLedgerListCache().get(path)) {
                if (!this.domain().equals(TopicDomain.persistent.toString())) continue;
                topics.add(TopicName.get((String)this.domain(), (NamespaceName)this.namespaceName, (String)Codec.decode((String)topic)).toString());
            }
        }
        catch (KeeperException.NoNodeException path) {
        }
        catch (Exception e) {
            log.error("[{}] Failed to get topics list for namespace {}", new Object[]{this.clientAppId(), this.namespaceName, e});
            throw new RestException(e);
        }
        topics.sort(null);
        return topics;
    }

    protected List<String> internalGetPartitionedTopicList() {
        this.validateAdminAccessForTenant(this.namespaceName.getTenant());
        try {
            this.policiesCache().get(PersistentTopicsBase.path("policies", this.namespaceName.toString()));
        }
        catch (KeeperException.NoNodeException e) {
            log.warn("[{}] Failed to get partitioned topic list {}: Namespace does not exist", (Object)this.clientAppId(), (Object)this.namespaceName);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        }
        catch (Exception e) {
            log.error("[{}] Failed to get partitioned topic list for namespace {}", new Object[]{this.clientAppId(), this.namespaceName, e});
            throw new RestException(e);
        }
        return this.getPartitionedTopicList(TopicDomain.getEnum((String)this.domain()));
    }

    protected Map<String, Set<AuthAction>> internalGetPermissionsOnTopic() {
        this.validateAdminAccessForTenant(this.namespaceName.getTenant());
        String topicUri = this.topicName.toString();
        try {
            Policies policies = (Policies)this.policiesCache().get(PersistentTopicsBase.path("policies", this.namespaceName.toString())).orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
            TreeMap permissions = Maps.newTreeMap();
            AuthPolicies auth = policies.auth_policies;
            for (String string : auth.namespace_auth.keySet()) {
                permissions.put(string, auth.namespace_auth.get(string));
            }
            if (auth.destination_auth.containsKey(topicUri)) {
                for (Map.Entry entry : ((Map)auth.destination_auth.get(topicUri)).entrySet()) {
                    String role = (String)entry.getKey();
                    Set topicPermissions = (Set)entry.getValue();
                    if (!permissions.containsKey(role)) {
                        permissions.put(role, topicPermissions);
                        continue;
                    }
                    Sets.SetView union = Sets.union((Set)((Set)permissions.get(role)), (Set)topicPermissions);
                    permissions.put(role, union);
                }
            }
            return permissions;
        }
        catch (Exception e) {
            log.error("[{}] Failed to get permissions for topic {}", new Object[]{this.clientAppId(), topicUri, e});
            throw new RestException(e);
        }
    }

    protected void validateAdminAndClientPermission() {
        try {
            this.validateAdminAccessForTenant(this.topicName.getTenant());
        }
        catch (Exception ve) {
            try {
                PersistentTopicsBase.checkAuthorization(this.pulsar(), this.topicName, this.clientAppId(), (AuthenticationDataSource)this.clientAuthData());
            }
            catch (RestException re) {
                throw re;
            }
            catch (Exception e) {
                log.warn("Unexpected error while authorizing request. topic={}, role={}. Error: {}", new Object[]{this.topicName, this.clientAppId(), e.getMessage(), e});
                throw new RestException(e);
            }
        }
    }

    public void validateAdminOperationOnTopic(boolean authoritative) {
        this.validateAdminAccessForTenant(this.topicName.getTenant());
        this.validateTopicOwnership(this.topicName, authoritative);
    }

    protected void validateAdminAccessForSubscriber(String subscriptionName, boolean authoritative) {
        this.validateTopicOwnership(this.topicName, authoritative);
        try {
            this.validateAdminAccessForTenant(this.topicName.getTenant());
        }
        catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] failed to validate admin access for {}", (Object)this.topicName, (Object)this.clientAppId());
            }
            this.validateAdminAccessForSubscriber(subscriptionName);
        }
    }

    private void validateAdminAccessForSubscriber(String subscriptionName) {
        try {
            if (!this.pulsar().getBrokerService().getAuthorizationService().canConsume(this.topicName, this.clientAppId(), (AuthenticationDataSource)this.clientAuthData(), subscriptionName)) {
                log.warn("[{}} Subscriber {} is not authorized to access api", (Object)this.topicName, (Object)this.clientAppId());
                throw new RestException(Response.Status.UNAUTHORIZED, String.format("Subscriber %s is not authorized to access this operation", this.clientAppId()));
            }
        }
        catch (RestException re) {
            throw re;
        }
        catch (Exception e) {
            log.warn("Unexpected error while authorizing request. topic={}, role={}. Error: {}", new Object[]{this.topicName, this.clientAppId(), e.getMessage(), e});
            throw new RestException(e);
        }
    }

    protected void internalGrantPermissionsOnTopic(String role, Set<AuthAction> actions) {
        this.validateAdminAccessForTenant(this.namespaceName.getTenant());
        this.validatePoliciesReadOnlyAccess();
        String topicUri = this.topicName.toString();
        try {
            Stat nodeStat = new Stat();
            byte[] content = this.globalZk().getData(PersistentTopicsBase.path("policies", this.namespaceName.toString()), null, nodeStat);
            Policies policies = (Policies)PersistentTopicsBase.jsonMapper().readValue(content, Policies.class);
            if (!policies.auth_policies.destination_auth.containsKey(topicUri)) {
                policies.auth_policies.destination_auth.put(topicUri, new TreeMap());
            }
            ((Map)policies.auth_policies.destination_auth.get(topicUri)).put(role, actions);
            this.globalZk().setData(PersistentTopicsBase.path("policies", this.namespaceName.toString()), PersistentTopicsBase.jsonMapper().writeValueAsBytes((Object)policies), nodeStat.getVersion());
            this.policiesCache().invalidate(PersistentTopicsBase.path("policies", this.namespaceName.toString()));
            log.info("[{}] Successfully granted access for role {}: {} - topic {}", new Object[]{this.clientAppId(), role, actions, topicUri});
        }
        catch (KeeperException.NoNodeException e) {
            log.warn("[{}] Failed to grant permissions on topic {}: Namespace does not exist", (Object)this.clientAppId(), (Object)topicUri);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        }
        catch (KeeperException.BadVersionException e) {
            log.warn("[{}] Failed to grant permissions on topic {}: concurrent modification", (Object)this.clientAppId(), (Object)topicUri);
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        }
        catch (Exception e) {
            log.error("[{}] Failed to grant permissions for topic {}", new Object[]{this.clientAppId(), topicUri, e});
            throw new RestException(e);
        }
    }

    protected void internalDeleteTopicForcefully(boolean authoritative) {
        this.validateAdminOperationOnTopic(authoritative);
        Topic topic = this.getTopicReference(this.topicName);
        try {
            topic.deleteForcefully().get();
        }
        catch (Exception e) {
            log.error("[{}] Failed to delete topic forcefully {}", new Object[]{this.clientAppId(), this.topicName, e});
            throw new RestException(e);
        }
    }

    protected void internalRevokePermissionsOnTopic(String role) {
        Policies policies;
        this.validateAdminAccessForTenant(this.namespaceName.getTenant());
        this.validatePoliciesReadOnlyAccess();
        String topicUri = this.topicName.toString();
        Stat nodeStat = new Stat();
        try {
            byte[] content = this.globalZk().getData(PersistentTopicsBase.path("policies", this.namespaceName.toString()), null, nodeStat);
            policies = (Policies)PersistentTopicsBase.jsonMapper().readValue(content, Policies.class);
        }
        catch (KeeperException.NoNodeException e) {
            log.warn("[{}] Failed to revoke permissions on topic {}: Namespace does not exist", (Object)this.clientAppId(), (Object)topicUri);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        }
        catch (KeeperException.BadVersionException e) {
            log.warn("[{}] Failed to revoke permissions on topic {}: concurrent modification", (Object)this.clientAppId(), (Object)topicUri);
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        }
        catch (Exception e) {
            log.error("[{}] Failed to revoke permissions for topic {}", new Object[]{this.clientAppId(), topicUri, e});
            throw new RestException(e);
        }
        if (!policies.auth_policies.destination_auth.containsKey(topicUri) || !((Map)policies.auth_policies.destination_auth.get(topicUri)).containsKey(role)) {
            log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level", new Object[]{this.clientAppId(), role, topicUri});
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Permissions are not set at the topic level");
        }
        ((Map)policies.auth_policies.destination_auth.get(topicUri)).remove(role);
        try {
            String namespacePath = PersistentTopicsBase.path("policies", this.namespaceName.toString());
            this.globalZk().setData(namespacePath, PersistentTopicsBase.jsonMapper().writeValueAsBytes((Object)policies), nodeStat.getVersion());
            this.policiesCache().invalidate(namespacePath);
            this.globalZkCache().invalidate(namespacePath);
            log.info("[{}] Successfully revoke access for role {} - topic {}", new Object[]{this.clientAppId(), role, topicUri});
        }
        catch (Exception e) {
            log.error("[{}] Failed to revoke permissions for topic {}", new Object[]{this.clientAppId(), topicUri, e});
            throw new RestException(e);
        }
    }

    protected void internalCreatePartitionedTopic(int numPartitions) {
        this.validateAdminAccessForTenant(this.topicName.getTenant());
        if (numPartitions <= 0) {
            throw new RestException(Response.Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
        }
        try {
            boolean topicExist = this.pulsar().getNamespaceService().getListOfTopics(this.topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL).join().contains(this.topicName.toString());
            if (topicExist) {
                log.warn("[{}] Failed to create already existing topic {}", (Object)this.clientAppId(), (Object)this.topicName);
                throw new RestException(Response.Status.CONFLICT, "This topic already exists");
            }
        }
        catch (Exception e) {
            log.error("[{}] Failed to create partitioned topic {}", new Object[]{this.clientAppId(), this.topicName, e});
            throw new RestException(e);
        }
        try {
            String path = ZkAdminPaths.partitionedTopicPath(this.topicName);
            byte[] data = PersistentTopicsBase.jsonMapper().writeValueAsBytes((Object)new PartitionedTopicMetadata(numPartitions));
            this.zkCreateOptimistic(path, data);
            Thread.sleep(1000L);
            log.info("[{}] Successfully created partitioned topic {}", (Object)this.clientAppId(), (Object)this.topicName);
        }
        catch (KeeperException.NodeExistsException e) {
            log.warn("[{}] Failed to create already existing partitioned topic {}", (Object)this.clientAppId(), (Object)this.topicName);
            throw new RestException(Response.Status.CONFLICT, "Partitioned topic already exists");
        }
        catch (KeeperException.BadVersionException e) {
            log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", (Object)this.clientAppId(), (Object)this.topicName);
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        }
        catch (Exception e) {
            log.error("[{}] Failed to create partitioned topic {}", new Object[]{this.clientAppId(), this.topicName, e});
            throw new RestException(e);
        }
    }

    protected void internalCreateNonPartitionedTopic(boolean authoritative) {
        this.validateAdminAccessForTenant(this.topicName.getTenant());
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        this.validateTopicOwnership(this.topicName, authoritative);
        try {
            Topic createdTopic = this.getOrCreateTopic(this.topicName);
            log.info("[{}] Successfully created non-partitioned topic {}", (Object)this.clientAppId(), (Object)createdTopic);
        }
        catch (Exception e) {
            log.error("[{}] Failed to create non-partitioned topic {}", new Object[]{this.clientAppId(), this.topicName, e});
            throw new RestException(e);
        }
    }

    protected void internalUpdatePartitionedTopic(int numPartitions) {
        this.validateAdminAccessForTenant(this.topicName.getTenant());
        if (this.topicName.isGlobal() && this.isNamespaceReplicated(this.topicName.getNamespaceObject())) {
            log.error("[{}] Update partitioned-topic is forbidden on global namespace {}", (Object)this.clientAppId(), (Object)this.topicName);
            throw new RestException(Response.Status.FORBIDDEN, "Update forbidden on global namespace");
        }
        if (numPartitions <= 0) {
            throw new RestException(Response.Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
        }
        try {
            this.updatePartitionedTopic(this.topicName, numPartitions).get();
        }
        catch (Exception e) {
            if (e.getCause() instanceof RestException) {
                throw (RestException)((Object)e.getCause());
            }
            log.error("[{}] Failed to update partitioned topic {}", new Object[]{this.clientAppId(), this.topicName, e});
            throw new RestException(e);
        }
    }

    protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean authoritative, boolean checkAllowAutoCreation) {
        PartitionedTopicMetadata metadata = this.getPartitionedTopicMetadata(this.topicName, authoritative, checkAllowAutoCreation);
        if (metadata.partitions > 1) {
            this.validateClientVersion();
        }
        return metadata;
    }

    protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, boolean force) {
        this.validateAdminAccessForTenant(this.topicName.getTenant());
        CompletableFuture future = new CompletableFuture();
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(this.topicName, authoritative, false);
        int numPartitions = partitionMetadata.partitions;
        if (numPartitions > 0) {
            AtomicInteger count = new AtomicInteger(numPartitions);
            for (int i = 0; i < numPartitions; ++i) {
                TopicName topicNamePartition = this.topicName.getPartition(i);
                try {
                    this.pulsar().getAdminClient().topics().deleteAsync(topicNamePartition.toString(), force).whenComplete((r, ex) -> {
                        if (ex != null) {
                            if (!(ex instanceof PulsarAdminException.NotFoundException)) {
                                log.error("[{}] Failed to delete partition {}", new Object[]{this.clientAppId(), topicNamePartition, ex});
                                future.completeExceptionally((Throwable)ex);
                                return;
                            }
                            if (log.isDebugEnabled()) {
                                log.debug("[{}] Partition not found: {}", (Object)this.clientAppId(), (Object)topicNamePartition);
                            }
                        } else {
                            log.info("[{}] Deleted partition {}", (Object)this.clientAppId(), (Object)topicNamePartition);
                        }
                        if (count.decrementAndGet() == 0) {
                            future.complete(null);
                        }
                    });
                    continue;
                }
                catch (Exception e) {
                    log.error("[{}] Failed to delete partition {}", new Object[]{this.clientAppId(), topicNamePartition, e});
                    future.completeExceptionally(e);
                }
            }
        } else {
            future.complete(null);
        }
        future.whenComplete((r, ex) -> {
            if (ex != null) {
                if (ex instanceof PulsarAdminException.PreconditionFailedException) {
                    asyncResponse.resume((Throwable)((Object)new RestException(Response.Status.PRECONDITION_FAILED, "Topic has active producers/subscriptions")));
                    return;
                }
                if (ex instanceof PulsarAdminException) {
                    asyncResponse.resume((Throwable)((Object)new RestException((PulsarAdminException)((Object)ex))));
                    return;
                }
                asyncResponse.resume((Throwable)((Object)new RestException((Throwable)ex)));
                return;
            }
            String path = PersistentTopicsBase.path("partitioned-topics", this.namespaceName.toString(), this.domain(), this.topicName.getEncodedLocalName());
            try {
                this.globalZk().delete(path, -1);
                this.globalZkCache().invalidate(path);
                Thread.sleep(1000L);
                log.info("[{}] Deleted partitioned topic {}", (Object)this.clientAppId(), (Object)this.topicName);
                asyncResponse.resume((Object)Response.noContent().build());
                return;
            }
            catch (KeeperException.NoNodeException nne) {
                asyncResponse.resume((Throwable)((Object)new RestException(Response.Status.NOT_FOUND, "Partitioned topic does not exist")));
                return;
            }
            catch (KeeperException.BadVersionException e) {
                log.warn("[{}] Failed to delete partitioned topic {}: concurrent modification", (Object)this.clientAppId(), (Object)this.topicName);
                asyncResponse.resume((Throwable)((Object)new RestException(Response.Status.CONFLICT, "Concurrent modification")));
                return;
            }
            catch (Exception e) {
                log.error("[{}] Failed to delete partitioned topic {}", new Object[]{this.clientAppId(), this.topicName, e});
                asyncResponse.resume((Throwable)((Object)new RestException(e)));
                return;
            }
        });
    }

    protected void internalUnloadTopic(boolean authoritative) {
        log.info("[{}] Unloading topic {}", (Object)this.clientAppId(), (Object)this.topicName);
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        this.unloadTopic(this.topicName, authoritative);
    }

    protected void internalDeleteTopic(boolean authoritative, boolean force) {
        if (force) {
            this.internalDeleteTopicForcefully(authoritative);
        } else {
            this.internalDeleteTopic(authoritative);
        }
    }

    protected void internalDeleteTopic(boolean authoritative) {
        this.validateAdminOperationOnTopic(authoritative);
        Topic topic = this.getTopicReference(this.topicName);
        if (topic.isReplicated()) {
            List clusters = topic.getReplicators().keys();
            log.error("[{}] Delete forbidden topic {} is replicated on clusters {}", new Object[]{this.clientAppId(), this.topicName, clusters});
            throw new RestException(Response.Status.FORBIDDEN, "Delete forbidden topic is replicated on clusters " + clusters);
        }
        try {
            topic.delete().get();
            log.info("[{}] Successfully removed topic {}", (Object)this.clientAppId(), (Object)this.topicName);
        }
        catch (Exception e) {
            Throwable t = e.getCause();
            log.error("[{}] Failed to get delete topic {}", new Object[]{this.clientAppId(), this.topicName, t});
            if (t instanceof BrokerServiceException.TopicBusyException) {
                throw new RestException(Response.Status.PRECONDITION_FAILED, "Topic has active producers/subscriptions");
            }
            throw new RestException(t);
        }
    }

    protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean authoritative) {
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        ArrayList subscriptions = Lists.newArrayList();
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(this.topicName, authoritative, false);
        if (partitionMetadata.partitions > 0) {
            try {
                this.pulsar().getAdminClient().topics().getSubscriptionsAsync(this.topicName.getPartition(0).toString()).whenComplete((r, ex) -> {
                    if (ex != null) {
                        log.warn("[{}] Failed to get list of subscriptions for {}: {}", new Object[]{this.clientAppId(), this.topicName, ex.getMessage()});
                        if (ex instanceof PulsarAdminException) {
                            PulsarAdminException pae = (PulsarAdminException)((Object)ex);
                            if (pae.getStatusCode() == Response.Status.NOT_FOUND.getStatusCode()) {
                                asyncResponse.resume((Throwable)((Object)new RestException(Response.Status.NOT_FOUND, "Internal topics have not been generated yet")));
                                return;
                            }
                            asyncResponse.resume((Throwable)((Object)new RestException(pae)));
                            return;
                        }
                        asyncResponse.resume((Throwable)((Object)new RestException((Throwable)ex)));
                        return;
                    }
                    subscriptions.addAll(r);
                    asyncResponse.resume((Object)subscriptions);
                });
            }
            catch (Exception e) {
                log.error("[{}] Failed to get list of subscriptions for {}", new Object[]{this.clientAppId(), this.topicName, e});
                asyncResponse.resume((Throwable)e);
                return;
            }
        } else {
            this.validateAdminOperationOnTopic(authoritative);
            Topic topic = this.getTopicReference(this.topicName);
            try {
                topic.getSubscriptions().forEach((subName, sub) -> subscriptions.add(subName));
                asyncResponse.resume((Object)subscriptions);
                return;
            }
            catch (Exception e) {
                log.error("[{}] Failed to get list of subscriptions for {}", new Object[]{this.clientAppId(), this.topicName, e});
                asyncResponse.resume((Throwable)((Object)new RestException(e)));
                return;
            }
        }
    }

    protected TopicStats internalGetStats(boolean authoritative) {
        this.validateAdminAndClientPermission();
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        this.validateTopicOwnership(this.topicName, authoritative);
        Topic topic = this.getTopicReference(this.topicName);
        return topic.getStats();
    }

    protected PersistentTopicInternalStats internalGetInternalStats(boolean authoritative) {
        this.validateAdminAndClientPermission();
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        this.validateTopicOwnership(this.topicName, authoritative);
        Topic topic = this.getTopicReference(this.topicName);
        return topic.getInternalStats();
    }

    protected void internalGetManagedLedgerInfo(final AsyncResponse asyncResponse) {
        this.validateAdminAccessForTenant(this.topicName.getTenant());
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        String managedLedger = this.topicName.getPersistenceNamingEncoding();
        this.pulsar().getManagedLedgerFactory().asyncGetManagedLedgerInfo(managedLedger, new AsyncCallbacks.ManagedLedgerInfoCallback(){

            public void getInfoComplete(ManagedLedgerInfo info, Object ctx) {
                asyncResponse.resume(output -> AdminResource.jsonMapper().writer().writeValue(output, (Object)info));
            }

            public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
                asyncResponse.resume((Throwable)exception);
            }
        }, null);
    }

    protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative, boolean perPartition) {
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(this.topicName, authoritative, false);
        if (partitionMetadata.partitions == 0) {
            throw new RestException(Response.Status.NOT_FOUND, "Partitioned Topic not found");
        }
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        PartitionedTopicStats stats = new PartitionedTopicStats(partitionMetadata);
        ArrayList topicStatsFutureList = Lists.newArrayList();
        for (int i = 0; i < partitionMetadata.partitions; ++i) {
            try {
                topicStatsFutureList.add(this.pulsar().getAdminClient().topics().getStatsAsync(this.topicName.getPartition(i).toString()));
                continue;
            }
            catch (PulsarServerException e) {
                asyncResponse.resume((Throwable)((Object)new RestException(e)));
                return;
            }
        }
        FutureUtil.waitForAll((List)topicStatsFutureList).handle((result, exception) -> {
            CompletableFuture statFuture = null;
            for (int i = 0; i < topicStatsFutureList.size(); ++i) {
                statFuture = (CompletableFuture)topicStatsFutureList.get(i);
                if (!statFuture.isDone() || statFuture.isCompletedExceptionally()) continue;
                try {
                    stats.add((TopicStats)statFuture.get());
                    if (!perPartition) continue;
                    stats.partitions.put(this.topicName.getPartition(i).toString(), statFuture.get());
                    continue;
                }
                catch (Exception e) {
                    asyncResponse.resume((Throwable)((Object)new RestException(e)));
                    return null;
                }
            }
            if (perPartition && stats.partitions.isEmpty()) {
                String path = ZkAdminPaths.partitionedTopicPath(this.topicName);
                try {
                    boolean zkPathExists = this.zkPathExists(path);
                    if (!zkPathExists) {
                        asyncResponse.resume((Throwable)((Object)new RestException(Response.Status.NOT_FOUND, "Internal topics have not been generated yet")));
                        return null;
                    }
                    stats.partitions.put(this.topicName.toString(), new TopicStats());
                }
                catch (InterruptedException | KeeperException e) {
                    asyncResponse.resume((Throwable)((Object)new RestException(e)));
                    return null;
                }
            }
            asyncResponse.resume((Object)stats);
            return null;
        });
    }

    protected void internalGetPartitionedStatsInternal(AsyncResponse asyncResponse, boolean authoritative) {
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(this.topicName, authoritative, false);
        if (partitionMetadata.partitions == 0) {
            throw new RestException(Response.Status.NOT_FOUND, "Partitioned Topic not found");
        }
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        PartitionedTopicInternalStats stats = new PartitionedTopicInternalStats(partitionMetadata);
        ArrayList topicStatsFutureList = Lists.newArrayList();
        for (int i = 0; i < partitionMetadata.partitions; ++i) {
            try {
                topicStatsFutureList.add(this.pulsar().getAdminClient().topics().getInternalStatsAsync(this.topicName.getPartition(i).toString()));
                continue;
            }
            catch (PulsarServerException e) {
                asyncResponse.resume((Throwable)((Object)new RestException(e)));
                return;
            }
        }
        FutureUtil.waitForAll((List)topicStatsFutureList).handle((result, exception) -> {
            CompletableFuture statFuture = null;
            for (int i = 0; i < topicStatsFutureList.size(); ++i) {
                statFuture = (CompletableFuture)topicStatsFutureList.get(i);
                if (!statFuture.isDone() || statFuture.isCompletedExceptionally()) continue;
                try {
                    stats.partitions.put(this.topicName.getPartition(i).toString(), statFuture.get());
                    continue;
                }
                catch (Exception e) {
                    asyncResponse.resume((Throwable)((Object)new RestException(e)));
                    return null;
                }
            }
            asyncResponse.resume(!stats.partitions.isEmpty() ? stats : new RestException(Response.Status.NOT_FOUND, "Internal topics have not been generated yet"));
            return null;
        });
    }

    protected void internalDeleteSubscription(AsyncResponse asyncResponse, String subName, boolean authoritative) {
        ArrayList futures;
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(this.topicName, authoritative, false);
        if (partitionMetadata.partitions > 0) {
            futures = Lists.newArrayList();
            for (int i = 0; i < partitionMetadata.partitions; ++i) {
                TopicName topicNamePartition = this.topicName.getPartition(i);
                try {
                    futures.add(this.pulsar().getAdminClient().topics().deleteSubscriptionAsync(topicNamePartition.toString(), subName));
                    continue;
                }
                catch (Exception e) {
                    log.error("[{}] Failed to delete subscription {} {}", new Object[]{this.clientAppId(), topicNamePartition, subName, e});
                    asyncResponse.resume((Throwable)((Object)new RestException(e)));
                    return;
                }
            }
        } else {
            this.validateAdminAccessForSubscriber(subName, authoritative);
            Topic topic = this.getTopicReference(this.topicName);
            try {
                Subscription sub = topic.getSubscription(subName);
                Preconditions.checkNotNull((Object)sub);
                sub.delete().get();
                log.info("[{}][{}] Deleted subscription {}", new Object[]{this.clientAppId(), this.topicName, subName});
                asyncResponse.resume((Object)Response.noContent().build());
                return;
            }
            catch (Exception e) {
                Throwable t = e.getCause();
                if (e instanceof NullPointerException) {
                    asyncResponse.resume((Throwable)((Object)new RestException(Response.Status.NOT_FOUND, "Subscription not found")));
                    return;
                }
                if (t instanceof BrokerServiceException.SubscriptionBusyException) {
                    asyncResponse.resume((Throwable)((Object)new RestException(Response.Status.PRECONDITION_FAILED, "Subscription has active connected consumers")));
                    return;
                }
                log.error("[{}] Failed to delete subscription {} {}", new Object[]{this.clientAppId(), this.topicName, subName, e});
                asyncResponse.resume((Throwable)((Object)new RestException(t)));
                return;
            }
        }
        FutureUtil.waitForAll((List)futures).handle((result, exception) -> {
            if (exception != null) {
                Throwable t = exception.getCause();
                if (t instanceof PulsarAdminException.NotFoundException) {
                    asyncResponse.resume((Throwable)((Object)new RestException(Response.Status.NOT_FOUND, "Subscription not found")));
                    return null;
                }
                if (t instanceof PulsarAdminException.PreconditionFailedException) {
                    asyncResponse.resume((Throwable)((Object)new RestException(Response.Status.PRECONDITION_FAILED, "Subscription has active connected consumers")));
                    return null;
                }
                log.error("[{}] Failed to delete subscription {} {}", new Object[]{this.clientAppId(), this.topicName, subName, t});
                asyncResponse.resume((Throwable)((Object)new RestException(t)));
                return null;
            }
            asyncResponse.resume((Object)Response.noContent().build());
            return null;
        });
    }

    protected void internalSkipAllMessages(AsyncResponse asyncResponse, String subName, boolean authoritative) {
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(this.topicName, authoritative, false);
        if (partitionMetadata.partitions > 0) {
            ArrayList futures = Lists.newArrayList();
            for (int i = 0; i < partitionMetadata.partitions; ++i) {
                TopicName topicNamePartition = this.topicName.getPartition(i);
                try {
                    futures.add(this.pulsar().getAdminClient().topics().skipAllMessagesAsync(topicNamePartition.toString(), subName));
                    continue;
                }
                catch (Exception e) {
                    log.error("[{}] Failed to skip all messages {} {}", new Object[]{this.clientAppId(), topicNamePartition, subName, e});
                    asyncResponse.resume((Throwable)((Object)new RestException(e)));
                    return;
                }
            }
            FutureUtil.waitForAll((List)futures).handle((result, exception) -> {
                if (exception != null) {
                    Throwable t = exception.getCause();
                    if (t instanceof PulsarAdminException.NotFoundException) {
                        asyncResponse.resume((Throwable)((Object)new RestException(Response.Status.NOT_FOUND, "Subscription not found")));
                        return null;
                    }
                    log.error("[{}] Failed to skip all messages {} {}", new Object[]{this.clientAppId(), this.topicName, subName, t});
                    asyncResponse.resume((Throwable)((Object)new RestException(t)));
                    return null;
                }
                asyncResponse.resume((Object)Response.noContent().build());
                return null;
            });
        } else {
            this.validateAdminAccessForSubscriber(subName, authoritative);
            PersistentTopic topic = (PersistentTopic)this.getTopicReference(this.topicName);
            BiConsumer<Void, Throwable> biConsumer = (v, ex) -> {
                if (ex != null) {
                    asyncResponse.resume((Throwable)((Object)new RestException((Throwable)ex)));
                    log.error("[{}] Failed to skip all messages {} {}", new Object[]{this.clientAppId(), this.topicName, subName, ex});
                } else {
                    asyncResponse.resume((Object)Response.noContent().build());
                    log.info("[{}] Cleared backlog on {} {}", new Object[]{this.clientAppId(), this.topicName, subName});
                }
            };
            try {
                if (subName.startsWith(topic.getReplicatorPrefix())) {
                    String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
                    PersistentReplicator repl = (PersistentReplicator)topic.getPersistentReplicator(remoteCluster);
                    Preconditions.checkNotNull((Object)repl);
                    repl.clearBacklog().whenComplete((BiConsumer)biConsumer);
                } else {
                    PersistentSubscription sub = topic.getSubscription(subName);
                    Preconditions.checkNotNull((Object)sub);
                    sub.clearBacklog().whenComplete((BiConsumer)biConsumer);
                }
            }
            catch (Exception e) {
                if (e instanceof NullPointerException) {
                    asyncResponse.resume((Throwable)((Object)new RestException(Response.Status.NOT_FOUND, "Subscription not found")));
                }
                asyncResponse.resume((Throwable)((Object)new RestException(e)));
            }
        }
    }

    protected void internalSkipMessages(String subName, int numMessages, boolean authoritative) {
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(this.topicName, authoritative, false);
        if (partitionMetadata.partitions > 0) {
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Skip messages on a partitioned topic is not allowed");
        }
        this.validateAdminAccessForSubscriber(subName, authoritative);
        PersistentTopic topic = (PersistentTopic)this.getTopicReference(this.topicName);
        try {
            if (subName.startsWith(topic.getReplicatorPrefix())) {
                String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
                PersistentReplicator repl = (PersistentReplicator)topic.getPersistentReplicator(remoteCluster);
                Preconditions.checkNotNull((Object)repl);
                repl.skipMessages(numMessages).get();
            } else {
                PersistentSubscription sub = topic.getSubscription(subName);
                Preconditions.checkNotNull((Object)sub);
                sub.skipMessages(numMessages).get();
            }
            log.info("[{}] Skipped {} messages on {} {}", new Object[]{this.clientAppId(), numMessages, this.topicName, subName});
        }
        catch (NullPointerException npe) {
            throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
        }
        catch (Exception exception) {
            log.error("[{}] Failed to skip {} messages {} {}", new Object[]{this.clientAppId(), numMessages, this.topicName, subName, exception});
            throw new RestException(exception);
        }
    }

    protected void internalExpireMessagesForAllSubscriptions(AsyncResponse asyncResponse, int expireTimeInSeconds, boolean authoritative) {
        ArrayList futures;
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(this.topicName, authoritative, false);
        if (partitionMetadata.partitions > 0) {
            futures = Lists.newArrayList();
            for (int i = 0; i < partitionMetadata.partitions; ++i) {
                TopicName topicNamePartition = this.topicName.getPartition(i);
                try {
                    futures.add(this.pulsar().getAdminClient().topics().expireMessagesForAllSubscriptionsAsync(topicNamePartition.toString(), (long)expireTimeInSeconds));
                    continue;
                }
                catch (Exception e) {
                    log.error("[{}] Failed to expire messages up to {} on {}", new Object[]{this.clientAppId(), expireTimeInSeconds, topicNamePartition, e});
                    asyncResponse.resume((Throwable)((Object)new RestException(e)));
                    return;
                }
            }
        } else {
            this.validateAdminOperationOnTopic(authoritative);
            PersistentTopic topic = (PersistentTopic)this.getTopicReference(this.topicName);
            AtomicReference exception2 = new AtomicReference();
            topic.getReplicators().forEach((subName, replicator) -> {
                try {
                    this.internalExpireMessagesForSinglePartition((String)subName, expireTimeInSeconds, authoritative);
                }
                catch (Throwable t) {
                    exception2.set(t);
                }
            });
            topic.getSubscriptions().forEach((subName, subscriber) -> {
                try {
                    this.internalExpireMessagesForSinglePartition((String)subName, expireTimeInSeconds, authoritative);
                }
                catch (Throwable t) {
                    exception2.set(t);
                }
            });
            if (exception2.get() != null) {
                if (exception2.get() instanceof WebApplicationException) {
                    WebApplicationException wae = (WebApplicationException)((Object)exception2.get());
                    asyncResponse.resume((Throwable)wae);
                    return;
                }
                asyncResponse.resume((Throwable)((Object)new RestException((Throwable)exception2.get())));
                return;
            }
            asyncResponse.resume((Object)Response.noContent().build());
            return;
        }
        FutureUtil.waitForAll((List)futures).handle((result, exception) -> {
            if (exception != null) {
                Throwable t = exception.getCause();
                log.error("[{}] Failed to expire messages up to {} on {}", new Object[]{this.clientAppId(), expireTimeInSeconds, this.topicName, t});
                asyncResponse.resume((Throwable)((Object)new RestException(t)));
                return null;
            }
            asyncResponse.resume((Object)Response.noContent().build());
            return null;
        });
    }

    protected void internalResetCursor(AsyncResponse asyncResponse, String subName, long timestamp, boolean authoritative) {
        AtomicReference partitionException;
        AtomicInteger failureCount;
        CompletableFuture future;
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(this.topicName, authoritative, false);
        int numPartitions = partitionMetadata.partitions;
        if (numPartitions > 0) {
            future = new CompletableFuture();
            AtomicInteger count = new AtomicInteger(numPartitions);
            failureCount = new AtomicInteger(0);
            partitionException = new AtomicReference();
            for (int i = 0; i < numPartitions; ++i) {
                TopicName topicNamePartition = this.topicName.getPartition(i);
                try {
                    this.pulsar().getAdminClient().topics().resetCursorAsync(topicNamePartition.toString(), subName, timestamp).handle((r, ex) -> {
                        if (ex != null) {
                            if (ex instanceof PulsarAdminException.PreconditionFailedException) {
                                failureCount.incrementAndGet();
                                partitionException.set(ex);
                            } else {
                                log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", new Object[]{this.clientAppId(), topicNamePartition, subName, timestamp, ex});
                                future.completeExceptionally((Throwable)ex);
                                return null;
                            }
                        }
                        if (count.decrementAndGet() == 0) {
                            future.complete(null);
                        }
                        return null;
                    });
                    continue;
                }
                catch (Exception e) {
                    log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", new Object[]{this.clientAppId(), topicNamePartition, subName, timestamp, e});
                    future.completeExceptionally(e);
                }
            }
        } else {
            this.validateAdminAccessForSubscriber(subName, authoritative);
            log.info("[{}] [{}] Received reset cursor on subscription {} to time {}", new Object[]{this.clientAppId(), this.topicName, subName, timestamp});
            PersistentTopic topic = (PersistentTopic)this.getTopicReference(this.topicName);
            if (topic == null) {
                asyncResponse.resume((Throwable)((Object)new RestException(Response.Status.NOT_FOUND, "Topic not found")));
                return;
            }
            try {
                PersistentSubscription sub = topic.getSubscription(subName);
                Preconditions.checkNotNull((Object)sub);
                sub.resetCursor(timestamp).get();
                log.info("[{}] [{}] Reset cursor on subscription {} to time {}", new Object[]{this.clientAppId(), this.topicName, subName, timestamp});
                asyncResponse.resume((Object)Response.noContent().build());
                return;
            }
            catch (Exception e) {
                Throwable t = e.getCause();
                log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", new Object[]{this.clientAppId(), this.topicName, subName, timestamp, e});
                if (e instanceof NullPointerException) {
                    asyncResponse.resume((Throwable)((Object)new RestException(Response.Status.NOT_FOUND, "Subscription not found")));
                    return;
                }
                if (e instanceof BrokerServiceException.NotAllowedException) {
                    asyncResponse.resume((Throwable)((Object)new RestException(Response.Status.METHOD_NOT_ALLOWED, e.getMessage())));
                    return;
                }
                if (t instanceof BrokerServiceException.SubscriptionInvalidCursorPosition) {
                    asyncResponse.resume((Throwable)((Object)new RestException(Response.Status.PRECONDITION_FAILED, "Unable to find position for timestamp specified -" + t.getMessage())));
                    return;
                }
                asyncResponse.resume((Throwable)((Object)new RestException(e)));
                return;
            }
        }
        future.whenComplete((r, ex) -> {
            if (ex != null) {
                if (ex instanceof PulsarAdminException) {
                    asyncResponse.resume((Throwable)((Object)new RestException((PulsarAdminException)((Object)ex))));
                    return;
                }
                asyncResponse.resume((Throwable)((Object)new RestException((Throwable)ex)));
                return;
            }
            if (failureCount.get() == numPartitions) {
                log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", new Object[]{this.clientAppId(), this.topicName, subName, timestamp, partitionException.get()});
                asyncResponse.resume((Throwable)((Object)new RestException(Response.Status.PRECONDITION_FAILED, ((Throwable)partitionException.get()).getMessage())));
                return;
            }
            if (failureCount.get() > 0) {
                log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}", new Object[]{this.clientAppId(), this.topicName, subName, timestamp, partitionException.get()});
            }
            asyncResponse.resume((Object)Response.noContent().build());
        });
    }

    protected void internalCreateSubscription(AsyncResponse asyncResponse, String subscriptionName, MessageIdImpl messageId, boolean authoritative, boolean replicated) {
        AtomicReference partitionException;
        CompletableFuture future;
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        MessageIdImpl targetMessageId = messageId == null ? (MessageIdImpl)MessageId.earliest : messageId;
        log.info("[{}][{}] Creating subscription {} at message id {}", new Object[]{this.clientAppId(), this.topicName, subscriptionName, targetMessageId});
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(this.topicName, authoritative, false);
        int numPartitions = partitionMetadata.partitions;
        if (numPartitions > 0) {
            future = new CompletableFuture();
            AtomicInteger count = new AtomicInteger(numPartitions);
            AtomicInteger failureCount = new AtomicInteger(0);
            partitionException = new AtomicReference();
            for (int i = 0; i < numPartitions; ++i) {
                TopicName topicNamePartition = this.topicName.getPartition(i);
                try {
                    this.pulsar().getAdminClient().topics().createSubscriptionAsync(topicNamePartition.toString(), subscriptionName, (MessageId)targetMessageId).handle((r, ex) -> {
                        if (!(ex == null || failureCount.incrementAndGet() != numPartitions && ex instanceof PulsarAdminException.ConflictException)) {
                            partitionException.set(ex);
                        }
                        if (count.decrementAndGet() == 0) {
                            future.complete(null);
                        }
                        return null;
                    });
                    continue;
                }
                catch (Exception e) {
                    log.warn("[{}] [{}] Failed to create subscription {} at message id {}", new Object[]{this.clientAppId(), topicNamePartition, subscriptionName, targetMessageId, e});
                    future.completeExceptionally(e);
                }
            }
        } else {
            this.validateAdminAccessForSubscriber(subscriptionName, authoritative);
            PersistentTopic topic = (PersistentTopic)this.getOrCreateTopic(this.topicName);
            if (topic.getSubscriptions().containsKey((Object)subscriptionName)) {
                asyncResponse.resume((Throwable)((Object)new RestException(Response.Status.CONFLICT, "Subscription already exists for topic")));
                return;
            }
            try {
                PersistentSubscription subscription = (PersistentSubscription)topic.createSubscription(subscriptionName, PulsarApi.CommandSubscribe.InitialPosition.Latest, replicated).get();
                subscription.deactivateCursor();
                subscription.resetCursor((Position)PositionImpl.get((long)targetMessageId.getLedgerId(), (long)targetMessageId.getEntryId())).get();
            }
            catch (Throwable e) {
                Throwable t = e.getCause();
                log.warn("[{}] [{}] Failed to create subscription {} at message id {}", new Object[]{this.clientAppId(), this.topicName, subscriptionName, targetMessageId, e});
                if (t instanceof BrokerServiceException.SubscriptionInvalidCursorPosition) {
                    asyncResponse.resume((Throwable)((Object)new RestException(Response.Status.PRECONDITION_FAILED, "Unable to find position for position specified: " + t.getMessage())));
                    return;
                }
                asyncResponse.resume((Throwable)((Object)new RestException(e)));
                return;
            }
            log.info("[{}][{}] Successfully created subscription {} at message id {}", new Object[]{this.clientAppId(), this.topicName, subscriptionName, targetMessageId});
            asyncResponse.resume((Object)Response.noContent().build());
            return;
        }
        future.whenComplete((r, ex) -> {
            if (ex != null) {
                if (ex instanceof PulsarAdminException) {
                    asyncResponse.resume((Throwable)((Object)new RestException((PulsarAdminException)((Object)ex))));
                    return;
                }
                asyncResponse.resume((Throwable)((Object)new RestException((Throwable)ex)));
                return;
            }
            if (partitionException.get() != null) {
                log.warn("[{}] [{}] Failed to create subscription {} at message id {}", new Object[]{this.clientAppId(), this.topicName, subscriptionName, targetMessageId, partitionException.get()});
                if (partitionException.get() instanceof PulsarAdminException) {
                    asyncResponse.resume((Throwable)((Object)new RestException((PulsarAdminException)((Object)((Object)partitionException.get())))));
                    return;
                }
                asyncResponse.resume((Throwable)((Object)new RestException((Throwable)partitionException.get())));
                return;
            }
            asyncResponse.resume((Object)Response.noContent().build());
        });
    }

    protected void internalResetCursorOnPosition(String subName, boolean authoritative, MessageIdImpl messageId) {
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        log.info("[{}][{}] received reset cursor on subscription {} to position {}", new Object[]{this.clientAppId(), this.topicName, subName, messageId});
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(this.topicName, authoritative, false);
        if (partitionMetadata.partitions > 0) {
            log.warn("[{}] Not supported operation on partitioned-topic {} {}", new Object[]{this.clientAppId(), this.topicName, subName});
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Reset-cursor at position is not allowed for partitioned-topic");
        }
        this.validateAdminAccessForSubscriber(subName, authoritative);
        PersistentTopic topic = (PersistentTopic)this.getTopicReference(this.topicName);
        if (topic == null) {
            throw new RestException(Response.Status.NOT_FOUND, "Topic not found");
        }
        try {
            PersistentSubscription sub = topic.getSubscription(subName);
            Preconditions.checkNotNull((Object)sub);
            sub.resetCursor((Position)PositionImpl.get((long)messageId.getLedgerId(), (long)messageId.getEntryId())).get();
            log.info("[{}][{}] successfully reset cursor on subscription {} to position {}", new Object[]{this.clientAppId(), this.topicName, subName, messageId});
        }
        catch (Exception e) {
            Throwable t = e.getCause();
            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to position {}", new Object[]{this.clientAppId(), this.topicName, subName, messageId, e});
            if (e instanceof NullPointerException) {
                throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
            }
            if (t instanceof BrokerServiceException.SubscriptionInvalidCursorPosition) {
                throw new RestException(Response.Status.PRECONDITION_FAILED, "Unable to find position for position specified: " + t.getMessage());
            }
            throw new RestException(e);
        }
    }

    protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) {
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(this.topicName, authoritative, false);
        if (partitionMetadata.partitions > 0) {
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
        }
        this.validateAdminAccessForSubscriber(subName, authoritative);
        if (!(this.getTopicReference(this.topicName) instanceof PersistentTopic)) {
            log.error("[{}] Not supported operation of non-persistent topic {} {}", new Object[]{this.clientAppId(), this.topicName, subName});
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Skip messages on a non-persistent topic is not allowed");
        }
        PersistentTopic topic = (PersistentTopic)this.getTopicReference(this.topicName);
        PersistentReplicator repl = null;
        PersistentSubscription sub = null;
        Entry entry = null;
        if (subName.startsWith(topic.getReplicatorPrefix())) {
            repl = this.getReplicatorReference(subName, topic);
        } else {
            sub = (PersistentSubscription)this.getSubscriptionReference(subName, topic);
        }
        try {
            entry = subName.startsWith(topic.getReplicatorPrefix()) ? repl.peekNthMessage(messagePosition).get() : sub.peekNthMessage(messagePosition).get();
            Preconditions.checkNotNull((Object)entry);
            PositionImpl pos = (PositionImpl)entry.getPosition();
            ByteBuf metadataAndPayload = entry.getDataBuffer();
            PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata((ByteBuf)metadataAndPayload);
            Response.ResponseBuilder responseBuilder = Response.ok();
            responseBuilder.header("X-Pulsar-Message-ID", (Object)pos.toString());
            for (PulsarApi.KeyValue keyValue : metadata.getPropertiesList()) {
                responseBuilder.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), (Object)keyValue.getValue());
            }
            if (metadata.hasPublishTime()) {
                responseBuilder.header("X-Pulsar-publish-time", (Object)DateFormatter.format((long)metadata.getPublishTime()));
            }
            if (metadata.hasEventTime()) {
                responseBuilder.header("X-Pulsar-event-time", (Object)DateFormatter.format((long)metadata.getEventTime()));
            }
            if (metadata.hasNumMessagesInBatch()) {
                responseBuilder.header("X-Pulsar-num-batch-message", (Object)metadata.getNumMessagesInBatch());
            }
            CompressionCodec codec = CompressionCodecProvider.getCompressionCodec((PulsarApi.CompressionType)metadata.getCompression());
            ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize());
            final ByteBuf data = PulsarByteBufAllocator.DEFAULT.heapBuffer(uncompressedPayload.readableBytes(), uncompressedPayload.readableBytes());
            data.writeBytes(uncompressedPayload);
            uncompressedPayload.release();
            StreamingOutput stream = new StreamingOutput(){

                public void write(OutputStream output) throws IOException, WebApplicationException {
                    output.write(data.array(), data.arrayOffset(), data.readableBytes());
                    data.release();
                }
            };
            Response response = responseBuilder.entity((Object)stream).build();
            return response;
        }
        catch (NullPointerException npe) {
            throw new RestException(Response.Status.NOT_FOUND, "Message not found");
        }
        catch (Exception exception) {
            log.error("[{}] Failed to get message at position {} from {} {}", new Object[]{this.clientAppId(), messagePosition, this.topicName, subName, exception});
            throw new RestException(exception);
        }
        finally {
            if (entry != null) {
                entry.release();
            }
        }
    }

    protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative) {
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        try {
            this.policiesCache().get(PersistentTopicsBase.path("policies", this.namespaceName.toString()));
        }
        catch (KeeperException.NoNodeException e) {
            log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", (Object)this.clientAppId(), (Object)this.namespaceName);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        }
        catch (Exception e) {
            log.error("[{}] Failed to get topic backlog {}", new Object[]{this.clientAppId(), this.namespaceName, e});
            throw new RestException(e);
        }
        PersistentOfflineTopicStats offlineTopicStats = null;
        try {
            long elapsedMs;
            offlineTopicStats = this.pulsar().getBrokerService().getOfflineTopicStat(this.topicName);
            if (offlineTopicStats != null && TimeUnit.MINUTES.convert(elapsedMs = System.currentTimeMillis() - offlineTopicStats.statGeneratedAt.getTime(), TimeUnit.MILLISECONDS) < 10L) {
                return offlineTopicStats;
            }
            ManagedLedgerConfig config = this.pulsar().getBrokerService().getManagedLedgerConfig(this.topicName).get();
            ManagedLedgerOfflineBacklog offlineTopicBacklog = new ManagedLedgerOfflineBacklog(config.getDigestType(), config.getPassword(), this.pulsar().getAdvertisedAddress(), false);
            offlineTopicStats = offlineTopicBacklog.estimateUnloadedTopicBacklog((ManagedLedgerFactoryImpl)this.pulsar().getManagedLedgerFactory(), this.topicName);
            this.pulsar().getBrokerService().cacheOfflineTopicStats(this.topicName, offlineTopicStats);
        }
        catch (Exception exception) {
            throw new RestException(exception);
        }
        return offlineTopicStats;
    }

    protected MessageId internalTerminate(boolean authoritative) {
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(this.topicName, authoritative, false);
        if (partitionMetadata.partitions > 0) {
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Termination of a partitioned topic is not allowed");
        }
        this.validateAdminOperationOnTopic(authoritative);
        Topic topic = this.getTopicReference(this.topicName);
        try {
            return ((PersistentTopic)topic).terminate().get();
        }
        catch (Exception exception) {
            log.error("[{}] Failed to terminated topic {}", new Object[]{this.clientAppId(), this.topicName, exception});
            throw new RestException(exception);
        }
    }

    protected void internalExpireMessages(AsyncResponse asyncResponse, String subName, int expireTimeInSeconds, boolean authoritative) {
        ArrayList futures;
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(this.topicName, authoritative, false);
        if (partitionMetadata.partitions > 0) {
            futures = Lists.newArrayList();
            for (int i = 0; i < partitionMetadata.partitions; ++i) {
                TopicName topicNamePartition = this.topicName.getPartition(i);
                try {
                    futures.add(this.pulsar().getAdminClient().topics().expireMessagesAsync(topicNamePartition.toString(), subName, (long)expireTimeInSeconds));
                    continue;
                }
                catch (Exception e) {
                    log.error("[{}] Failed to expire messages up to {} on {}", new Object[]{this.clientAppId(), expireTimeInSeconds, topicNamePartition, e});
                    asyncResponse.resume((Throwable)((Object)new RestException(e)));
                    return;
                }
            }
        } else {
            try {
                this.internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative);
            }
            catch (WebApplicationException wae) {
                asyncResponse.resume((Throwable)wae);
                return;
            }
            catch (Exception e) {
                asyncResponse.resume((Throwable)((Object)new RestException(e)));
                return;
            }
            asyncResponse.resume((Object)Response.noContent().build());
            return;
        }
        FutureUtil.waitForAll((List)futures).handle((result, exception) -> {
            if (exception != null) {
                Throwable t = exception.getCause();
                if (t instanceof PulsarAdminException.NotFoundException) {
                    asyncResponse.resume((Throwable)((Object)new RestException(Response.Status.NOT_FOUND, "Subscription not found")));
                    return null;
                }
                log.error("[{}] Failed to expire messages up to {} on {}", new Object[]{this.clientAppId(), expireTimeInSeconds, this.topicName, t});
                asyncResponse.resume((Throwable)((Object)new RestException(t)));
                return null;
            }
            asyncResponse.resume((Object)Response.noContent().build());
            return null;
        });
    }

    private void internalExpireMessagesForSinglePartition(String subName, int expireTimeInSeconds, boolean authoritative) {
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(this.topicName, authoritative, false);
        if (partitionMetadata.partitions > 0) {
            String msg = "This method should not be called for partitioned topic";
            log.error("[{}] {} {} {}", new Object[]{this.clientAppId(), msg, this.topicName, subName});
            throw new IllegalStateException(msg);
        }
        this.validateAdminAccessForSubscriber(subName, authoritative);
        if (!(this.getTopicReference(this.topicName) instanceof PersistentTopic)) {
            log.error("[{}] Not supported operation of non-persistent topic {} {}", new Object[]{this.clientAppId(), this.topicName, subName});
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Expire messages on a non-persistent topic is not allowed");
        }
        PersistentTopic topic = (PersistentTopic)this.getTopicReference(this.topicName);
        try {
            if (subName.startsWith(topic.getReplicatorPrefix())) {
                String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
                PersistentReplicator repl = (PersistentReplicator)topic.getPersistentReplicator(remoteCluster);
                Preconditions.checkNotNull((Object)repl);
                repl.expireMessages(expireTimeInSeconds);
            } else {
                PersistentSubscription sub = topic.getSubscription(subName);
                Preconditions.checkNotNull((Object)sub);
                sub.expireMessages(expireTimeInSeconds);
            }
            log.info("[{}] Message expire started up to {} on {} {}", new Object[]{this.clientAppId(), expireTimeInSeconds, this.topicName, subName});
        }
        catch (NullPointerException npe) {
            throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
        }
        catch (Exception exception) {
            log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}", new Object[]{this.clientAppId(), expireTimeInSeconds, this.topicName, subName, exception});
            throw new RestException(exception);
        }
    }

    protected void internalTriggerCompaction(boolean authoritative) {
        this.validateAdminOperationOnTopic(authoritative);
        PersistentTopic topic = (PersistentTopic)this.getTopicReference(this.topicName);
        try {
            topic.triggerCompaction();
        }
        catch (BrokerServiceException.AlreadyRunningException e) {
            throw new RestException(Response.Status.CONFLICT, e.getMessage());
        }
        catch (Exception e) {
            throw new RestException(e);
        }
    }

    protected LongRunningProcessStatus internalCompactionStatus(boolean authoritative) {
        this.validateAdminOperationOnTopic(authoritative);
        PersistentTopic topic = (PersistentTopic)this.getTopicReference(this.topicName);
        return topic.compactionStatus();
    }

    protected void internalTriggerOffload(boolean authoritative, MessageIdImpl messageId) {
        this.validateAdminOperationOnTopic(authoritative);
        PersistentTopic topic = (PersistentTopic)this.getTopicReference(this.topicName);
        try {
            topic.triggerOffload(messageId);
        }
        catch (BrokerServiceException.AlreadyRunningException e) {
            throw new RestException(Response.Status.CONFLICT, e.getMessage());
        }
        catch (Exception e) {
            log.warn("Unexpected error triggering offload", (Throwable)e);
            throw new RestException(e);
        }
    }

    protected OffloadProcessStatus internalOffloadStatus(boolean authoritative) {
        this.validateAdminOperationOnTopic(authoritative);
        PersistentTopic topic = (PersistentTopic)this.getTopicReference(this.topicName);
        return topic.offloadStatus();
    }

    public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(PulsarService pulsar, String clientAppId, String originalPrincipal, AuthenticationDataSource authenticationData, TopicName topicName) {
        CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<PartitionedTopicMetadata>();
        try {
            try {
                PersistentTopicsBase.checkAuthorization(pulsar, topicName, clientAppId, authenticationData);
            }
            catch (RestException e) {
                try {
                    PersistentTopicsBase.validateAdminAccessForTenant(pulsar, clientAppId, originalPrincipal, topicName.getTenant());
                }
                catch (RestException authException) {
                    log.warn("Failed to authorize {} on cluster {}", (Object)clientAppId, (Object)topicName.toString());
                    throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s", clientAppId, topicName.toString(), authException.getMessage()));
                }
            }
            catch (Exception ex2) {
                log.warn("Failed to authorize {} on cluster {} with unexpected exception {}", new Object[]{clientAppId, topicName.toString(), ex2.getMessage(), ex2});
                throw ex2;
            }
            String path = PersistentTopicsBase.path("partitioned-topics", topicName.getNamespace(), topicName.getDomain().toString(), topicName.getEncodedLocalName());
            ((CompletableFuture)((CompletableFuture)PersistentTopicsBase.checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()).thenCompose(res -> PersistentTopicsBase.fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(pulsar, path, topicName))).thenAccept(metadata -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Total number of partitions for topic {} is {}", new Object[]{clientAppId, topicName, metadata.partitions});
                }
                metadataFuture.complete((PartitionedTopicMetadata)metadata);
            })).exceptionally(ex -> {
                metadataFuture.completeExceptionally(ex.getCause());
                return null;
            });
        }
        catch (Exception ex3) {
            metadataFuture.completeExceptionally(ex3);
        }
        return metadataFuture;
    }

    private Topic getTopicReference(TopicName topicName) {
        try {
            return this.pulsar().getBrokerService().getTopicIfExists(topicName.toString()).get(this.pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS).orElseThrow(() -> this.topicNotFoundReason(topicName));
        }
        catch (RestException e) {
            throw e;
        }
        catch (Exception e) {
            throw new RestException(e);
        }
    }

    private RestException topicNotFoundReason(TopicName topicName) {
        if (!topicName.isPartitioned()) {
            return new RestException(Response.Status.NOT_FOUND, "Topic not found");
        }
        PartitionedTopicMetadata partitionedTopicMetadata = this.getPartitionedTopicMetadata(TopicName.get((String)topicName.getPartitionedTopicName()), false, false);
        if (partitionedTopicMetadata == null || partitionedTopicMetadata.partitions == 0) {
            String topicErrorType = partitionedTopicMetadata == null ? "has no metadata" : "has zero partitions";
            return new RestException(Response.Status.NOT_FOUND, String.format("Partitioned Topic not found: %s %s", topicName.toString(), topicErrorType));
        }
        if (!this.internalGetList().contains(topicName.toString())) {
            return new RestException(Response.Status.NOT_FOUND, "Topic partitions were not yet created");
        }
        return new RestException(Response.Status.NOT_FOUND, "Partitioned Topic not found");
    }

    private Topic getOrCreateTopic(TopicName topicName) {
        return (Topic)((CompletableFuture)this.pulsar().getBrokerService().getTopic(topicName.toString(), true).thenApply(Optional::get)).join();
    }

    private Subscription getSubscriptionReference(String subName, PersistentTopic topic) {
        try {
            PersistentSubscription sub = topic.getSubscription(subName);
            return (Subscription)Preconditions.checkNotNull((Object)sub);
        }
        catch (Exception e) {
            throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
        }
    }

    private PersistentReplicator getReplicatorReference(String replName, PersistentTopic topic) {
        try {
            String remoteCluster = PersistentReplicator.getRemoteCluster(replName);
            PersistentReplicator repl = (PersistentReplicator)topic.getPersistentReplicator(remoteCluster);
            return (PersistentReplicator)Preconditions.checkNotNull((Object)repl);
        }
        catch (Exception e) {
            throw new RestException(Response.Status.NOT_FOUND, "Replicator not found");
        }
    }

    private CompletableFuture<Void> updatePartitionedTopic(TopicName topicName, int numPartitions) {
        String path = ZkAdminPaths.partitionedTopicPath(topicName);
        CompletableFuture<Void> updatePartition = new CompletableFuture<Void>();
        ((CompletableFuture)this.createSubscriptions(topicName, numPartitions).thenAccept(res -> {
            try {
                byte[] data = PersistentTopicsBase.jsonMapper().writeValueAsBytes((Object)new PartitionedTopicMetadata(numPartitions));
                this.globalZk().setData(path, data, -1, (rc, path1, ctx, stat) -> {
                    if (rc == KeeperException.Code.OK.intValue()) {
                        updatePartition.complete(null);
                    } else {
                        updatePartition.completeExceptionally(KeeperException.create((KeeperException.Code)KeeperException.Code.get((int)rc), (String)"failed to create update partitions"));
                    }
                }, null);
            }
            catch (Exception e) {
                updatePartition.completeExceptionally(e);
            }
        })).exceptionally(ex -> {
            updatePartition.completeExceptionally((Throwable)ex);
            return null;
        });
        return updatePartition;
    }

    private CompletableFuture<Void> createSubscriptions(TopicName topicName, int numPartitions) {
        String path = PersistentTopicsBase.path("partitioned-topics", topicName.getPersistenceNamingEncoding());
        CompletableFuture<Void> result = new CompletableFuture<Void>();
        ((CompletableFuture)PersistentTopicsBase.fetchPartitionedTopicMetadataAsync(this.pulsar(), path).thenAccept(partitionMetadata -> {
            PulsarAdmin admin;
            if (partitionMetadata.partitions <= 1) {
                result.completeExceptionally((Throwable)((Object)new RestException(Response.Status.CONFLICT, "Topic is not partitioned topic")));
                return;
            }
            if (partitionMetadata.partitions >= numPartitions) {
                result.completeExceptionally((Throwable)((Object)new RestException(Response.Status.CONFLICT, "number of partitions must be more than existing " + partitionMetadata.partitions)));
                return;
            }
            try {
                admin = this.pulsar().getAdminClient();
            }
            catch (PulsarServerException e1) {
                result.completeExceptionally(e1);
                return;
            }
            ((CompletableFuture)admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats -> stats.subscriptions.keySet().forEach(subscription -> {
                ArrayList<CompletableFuture> subscriptionFutures = new ArrayList<CompletableFuture>();
                for (int i = partitionMetadata.partitions; i < numPartitions; ++i) {
                    String topicNamePartition = topicName.getPartition(i).toString();
                    subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition, subscription, MessageId.latest));
                }
                ((CompletableFuture)FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> {
                    log.info("[{}] Successfully created new partitions {}", (Object)this.clientAppId(), (Object)topicName);
                    result.complete(null);
                })).exceptionally(ex -> {
                    log.warn("[{}] Failed to create subscriptions on new partitions for {}", new Object[]{this.clientAppId(), topicName, ex});
                    result.completeExceptionally((Throwable)ex);
                    return null;
                });
            }))).exceptionally(ex -> {
                if (ex.getCause() instanceof PulsarAdminException.NotFoundException) {
                    result.complete(null);
                } else {
                    log.warn("[{}] Failed to get list of subscriptions of {}", new Object[]{this.clientAppId(), topicName.getPartition(0), ex});
                    result.completeExceptionally((Throwable)ex);
                }
                return null;
            });
        })).exceptionally(ex -> {
            log.warn("[{}] Failed to get partition metadata for {}", (Object)this.clientAppId(), (Object)topicName.toString());
            result.completeExceptionally((Throwable)ex);
            return null;
        });
        return result;
    }

    protected void unloadTopic(TopicName topicName, boolean authoritative) {
        this.validateSuperUserAccess();
        this.validateTopicOwnership(topicName, authoritative);
        try {
            Topic topic = this.getTopicReference(topicName);
            topic.close().get();
            log.info("[{}] Successfully unloaded topic {}", (Object)this.clientAppId(), (Object)topicName);
        }
        catch (NullPointerException e) {
            log.error("[{}] topic {} not found", (Object)this.clientAppId(), (Object)topicName);
            throw new RestException(Response.Status.NOT_FOUND, "Topic does not exist");
        }
        catch (Exception e) {
            log.error("[{}] Failed to unload topic {}, {}", new Object[]{this.clientAppId(), topicName, e.getMessage(), e});
            throw new RestException(e);
        }
    }

    private void validateClientVersion() {
        if (!this.pulsar().getConfiguration().isClientLibraryVersionCheckEnabled()) {
            return;
        }
        String userAgent = this.httpRequest.getHeader("User-Agent");
        if (StringUtils.isBlank((CharSequence)userAgent)) {
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Client lib is not compatible to access partitioned metadata: version in user-agent is not present");
        }
        if (userAgent.contains(DEPRECATED_CLIENT_VERSION_PREFIX)) {
            try {
                String[] splits;
                String[] tokens = userAgent.split(DEPRECATED_CLIENT_VERSION_PREFIX);
                String[] stringArray = splits = tokens.length > 1 ? tokens[1].split("-")[0].trim().split("\\.") : null;
                if (splits != null && splits.length > 1 && (LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMajorVersion() > Integer.parseInt(splits[0]) || LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMinorVersion() > Integer.parseInt(splits[1]))) {
                    throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Client lib is not compatible to access partitioned metadata: version " + userAgent + " is not supported");
                }
            }
            catch (RestException re) {
                throw re;
            }
            catch (Exception e) {
                log.warn("[{}] Failed to parse version {} ", (Object)this.clientAppId(), (Object)userAgent);
            }
        }
    }

    protected MessageId internalGetLastMessageId(boolean authoritative) {
        this.validateAdminOperationOnTopic(authoritative);
        if (!(this.getTopicReference(this.topicName) instanceof PersistentTopic)) {
            log.error("[{}] Not supported operation of non-persistent topic {}", (Object)this.clientAppId(), (Object)this.topicName);
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "GetLastMessageId on a non-persistent topic is not allowed");
        }
        PersistentTopic topic = (PersistentTopic)this.getTopicReference(this.topicName);
        Position position = topic.getLastMessageId();
        int partitionIndex = TopicName.getPartitionIndex((String)topic.getName());
        MessageIdImpl messageId = new MessageIdImpl(((PositionImpl)position).getLedgerId(), ((PositionImpl)position).getEntryId(), partitionIndex);
        return messageId;
    }
}

