/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.admin.internal;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.admin.internal.BaseResource;
import org.apache.pulsar.client.admin.internal.WebTargets;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.com.google.gson.Gson;
import org.apache.pulsar.shade.com.google.gson.JsonObject;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
import org.apache.pulsar.shade.javax.ws.rs.client.Entity;
import org.apache.pulsar.shade.javax.ws.rs.client.InvocationCallback;
import org.apache.pulsar.shade.javax.ws.rs.client.WebTarget;
import org.apache.pulsar.shade.javax.ws.rs.core.GenericType;
import org.apache.pulsar.shade.javax.ws.rs.core.MultivaluedMap;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicsImpl
extends BaseResource
implements Topics {
    private final WebTarget adminTopics;
    private final WebTarget adminV2Topics;
    private final String BATCH_HEADER = "X-Pulsar-num-batch-message";
    private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);

    public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
        super(auth, readTimeoutMs);
        this.adminTopics = web.path("/admin");
        this.adminV2Topics = web.path("/admin/v2");
    }

    @Override
    public List<String> getList(String namespace) throws PulsarAdminException {
        try {
            NamespaceName ns = NamespaceName.get(namespace);
            WebTarget persistentPath = this.namespacePath("persistent", ns, new String[0]);
            WebTarget nonPersistentPath = this.namespacePath("non-persistent", ns, new String[0]);
            List<String> persistentTopics = this.request(persistentPath).get(new GenericType<List<String>>(){});
            List<String> nonPersistentTopics = this.request(nonPersistentPath).get(new GenericType<List<String>>(){});
            return new ArrayList<String>(Stream.concat(persistentTopics.stream(), nonPersistentTopics.stream()).collect(Collectors.toSet()));
        }
        catch (Exception e) {
            throw this.getApiException(e);
        }
    }

    @Override
    public List<String> getPartitionedTopicList(String namespace) throws PulsarAdminException {
        try {
            NamespaceName ns = NamespaceName.get(namespace);
            WebTarget persistentPath = this.namespacePath("persistent", ns, "partitioned");
            WebTarget nonPersistentPath = this.namespacePath("non-persistent", ns, "partitioned");
            List<String> persistentTopics = this.request(persistentPath).get(new GenericType<List<String>>(){});
            List<String> nonPersistentTopics = this.request(nonPersistentPath).get(new GenericType<List<String>>(){});
            return new ArrayList<String>(Stream.concat(persistentTopics.stream(), nonPersistentTopics.stream()).collect(Collectors.toSet()));
        }
        catch (Exception e) {
            throw this.getApiException(e);
        }
    }

    @Override
    public List<String> getListInBundle(String namespace, String bundleRange) throws PulsarAdminException {
        try {
            return this.getListInBundleAsync(namespace, bundleRange).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw (PulsarAdminException)e.getCause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        }
        catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    @Override
    public CompletableFuture<List<String>> getListInBundleAsync(String namespace, String bundleRange) {
        NamespaceName ns = NamespaceName.get(namespace);
        final CompletableFuture<List<String>> future = new CompletableFuture<List<String>>();
        WebTarget path = this.namespacePath("non-persistent", ns, bundleRange);
        this.asyncGetRequest(path, new InvocationCallback<List<String>>(){

            @Override
            public void completed(List<String> response) {
                future.complete(response);
            }

            @Override
            public void failed(Throwable throwable) {
                future.completeExceptionally(TopicsImpl.this.getApiException(throwable.getCause()));
            }
        });
        return future;
    }

    @Override
    public Map<String, Set<AuthAction>> getPermissions(String topic) throws PulsarAdminException {
        try {
            TopicName tn = TopicName.get(topic);
            WebTarget path = this.topicPath(tn, "permissions");
            return this.request(path).get(new GenericType<Map<String, Set<AuthAction>>>(){});
        }
        catch (Exception e) {
            throw this.getApiException(e);
        }
    }

    @Override
    public void grantPermission(String topic, String role, Set<AuthAction> actions) throws PulsarAdminException {
        try {
            TopicName tn = TopicName.get(topic);
            WebTarget path = this.topicPath(tn, "permissions", role);
            this.request(path).post(Entity.entity(actions, "application/json"), ErrorData.class);
        }
        catch (Exception e) {
            throw this.getApiException(e);
        }
    }

    @Override
    public void revokePermissions(String topic, String role) throws PulsarAdminException {
        try {
            TopicName tn = TopicName.get(topic);
            WebTarget path = this.topicPath(tn, "permissions", role);
            this.request(path).delete(ErrorData.class);
        }
        catch (Exception e) {
            throw this.getApiException(e);
        }
    }

    @Override
    public void createPartitionedTopic(String topic, int numPartitions) throws PulsarAdminException {
        try {
            this.createPartitionedTopicAsync(topic, numPartitions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw (PulsarAdminException)e.getCause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        }
        catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    @Override
    public void createNonPartitionedTopic(String topic) throws PulsarAdminException {
        try {
            this.createNonPartitionedTopicAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw (PulsarAdminException)e.getCause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        }
        catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    @Override
    public CompletableFuture<Void> createNonPartitionedTopicAsync(String topic) {
        TopicName tn = this.validateTopic(topic);
        WebTarget path = this.topicPath(tn, new String[0]);
        return this.asyncPutRequest(path, Entity.entity("", "application/json"));
    }

    @Override
    public CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions) {
        Preconditions.checkArgument(numPartitions > 0, "Number of partitions should be more than 0");
        TopicName tn = this.validateTopic(topic);
        WebTarget path = this.topicPath(tn, "partitions");
        return this.asyncPutRequest(path, Entity.entity(Integer.valueOf(numPartitions), "application/json"));
    }

    @Override
    public void updatePartitionedTopic(String topic, int numPartitions) throws PulsarAdminException {
        try {
            this.updatePartitionedTopicAsync(topic, numPartitions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw (PulsarAdminException)e.getCause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        }
        catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    @Override
    public CompletableFuture<Void> updatePartitionedTopicAsync(String topic, int numPartitions) {
        Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be more than 0");
        TopicName tn = this.validateTopic(topic);
        WebTarget path = this.topicPath(tn, "partitions");
        return this.asyncPostRequest(path, Entity.entity(Integer.valueOf(numPartitions), "application/json"));
    }

    @Override
    public PartitionedTopicMetadata getPartitionedTopicMetadata(String topic) throws PulsarAdminException {
        try {
            return this.getPartitionedTopicMetadataAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw (PulsarAdminException)e.getCause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        }
        catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    @Override
    public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(String topic) {
        TopicName tn = this.validateTopic(topic);
        WebTarget path = this.topicPath(tn, "partitions");
        final CompletableFuture<PartitionedTopicMetadata> future = new CompletableFuture<PartitionedTopicMetadata>();
        this.asyncGetRequest(path, new InvocationCallback<PartitionedTopicMetadata>(){

            @Override
            public void completed(PartitionedTopicMetadata response) {
                future.complete(response);
            }

            @Override
            public void failed(Throwable throwable) {
                future.completeExceptionally(TopicsImpl.this.getApiException(throwable.getCause()));
            }
        });
        return future;
    }

    @Override
    public void deletePartitionedTopic(String topic) throws PulsarAdminException {
        this.deletePartitionedTopic(topic, false);
    }

    @Override
    public void deletePartitionedTopic(String topic, boolean force) throws PulsarAdminException {
        try {
            this.deletePartitionedTopicAsync(topic, force).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw (PulsarAdminException)e.getCause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        }
        catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    @Override
    public CompletableFuture<Void> deletePartitionedTopicAsync(String topic, boolean force) {
        TopicName tn = this.validateTopic(topic);
        WebTarget path = this.topicPath(tn, "partitions");
        path = path.queryParam("force", force);
        return this.asyncDeleteRequest(path);
    }

    @Override
    public void delete(String topic) throws PulsarAdminException {
        this.delete(topic, false);
    }

    @Override
    public void delete(String topic, boolean force) throws PulsarAdminException {
        try {
            this.deleteAsync(topic, force).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw (PulsarAdminException)e.getCause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        }
        catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    @Override
    public CompletableFuture<Void> deleteAsync(String topic, boolean force) {
        TopicName tn = this.validateTopic(topic);
        WebTarget path = this.topicPath(tn, new String[0]);
        path = path.queryParam("force", Boolean.toString(force));
        return this.asyncDeleteRequest(path);
    }

    @Override
    public void unload(String topic) throws PulsarAdminException {
        try {
            this.unloadAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw (PulsarAdminException)e.getCause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        }
        catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    @Override
    public CompletableFuture<Void> unloadAsync(String topic) {
        TopicName tn = this.validateTopic(topic);
        WebTarget path = this.topicPath(tn, "unload");
        return this.asyncPutRequest(path, Entity.entity("", "application/json"));
    }

    @Override
    public List<String> getSubscriptions(String topic) throws PulsarAdminException {
        try {
            return this.getSubscriptionsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw (PulsarAdminException)e.getCause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        }
        catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    @Override
    public CompletableFuture<List<String>> getSubscriptionsAsync(String topic) {
        TopicName tn = this.validateTopic(topic);
        WebTarget path = this.topicPath(tn, "subscriptions");
        final CompletableFuture<List<String>> future = new CompletableFuture<List<String>>();
        this.asyncGetRequest(path, new InvocationCallback<List<String>>(){

            @Override
            public void completed(List<String> response) {
                future.complete(response);
            }

            @Override
            public void failed(Throwable throwable) {
                future.completeExceptionally(TopicsImpl.this.getApiException(throwable.getCause()));
            }
        });
        return future;
    }

    @Override
    public TopicStats getStats(String topic) throws PulsarAdminException {
        try {
            return this.getStatsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw (PulsarAdminException)e.getCause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        }
        catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    @Override
    public CompletableFuture<TopicStats> getStatsAsync(String topic) {
        TopicName tn = this.validateTopic(topic);
        WebTarget path = this.topicPath(tn, "stats");
        final CompletableFuture<TopicStats> future = new CompletableFuture<TopicStats>();
        this.asyncGetRequest(path, new InvocationCallback<TopicStats>(){

            @Override
            public void completed(TopicStats response) {
                future.complete(response);
            }

            @Override
            public void failed(Throwable throwable) {
                future.completeExceptionally(TopicsImpl.this.getApiException(throwable.getCause()));
            }
        });
        return future;
    }

    @Override
    public PersistentTopicInternalStats getInternalStats(String topic) throws PulsarAdminException {
        try {
            return this.getInternalStatsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw (PulsarAdminException)e.getCause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        }
        catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    @Override
    public CompletableFuture<PersistentTopicInternalStats> getInternalStatsAsync(String topic) {
        TopicName tn = this.validateTopic(topic);
        WebTarget path = this.topicPath(tn, "internalStats");
        final CompletableFuture<PersistentTopicInternalStats> future = new CompletableFuture<PersistentTopicInternalStats>();
        this.asyncGetRequest(path, new InvocationCallback<PersistentTopicInternalStats>(){

            @Override
            public void completed(PersistentTopicInternalStats response) {
                future.complete(response);
            }

            @Override
            public void failed(Throwable throwable) {
                future.completeExceptionally(TopicsImpl.this.getApiException(throwable.getCause()));
            }
        });
        return future;
    }

    @Override
    public JsonObject getInternalInfo(String topic) throws PulsarAdminException {
        try {
            return this.getInternalInfoAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw (PulsarAdminException)e.getCause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        }
        catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    @Override
    public CompletableFuture<JsonObject> getInternalInfoAsync(String topic) {
        TopicName tn = this.validateTopic(topic);
        WebTarget path = this.topicPath(tn, "internal-info");
        final CompletableFuture<JsonObject> future = new CompletableFuture<JsonObject>();
        this.asyncGetRequest(path, new InvocationCallback<String>(){

            @Override
            public void completed(String response) {
                JsonObject json = new Gson().fromJson(response, JsonObject.class);
                future.complete(json);
            }

            @Override
            public void failed(Throwable throwable) {
                future.completeExceptionally(TopicsImpl.this.getApiException(throwable.getCause()));
            }
        });
        return future;
    }

    @Override
    public PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition) throws PulsarAdminException {
        try {
            return this.getPartitionedStatsAsync(topic, perPartition).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw (PulsarAdminException)e.getCause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        }
        catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    @Override
    public CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(String topic, final boolean perPartition) {
        TopicName tn = this.validateTopic(topic);
        WebTarget path = this.topicPath(tn, "partitioned-stats");
        path = path.queryParam("perPartition", perPartition);
        final CompletableFuture<PartitionedTopicStats> future = new CompletableFuture<PartitionedTopicStats>();
        this.asyncGetRequest(path, new InvocationCallback<PartitionedTopicStats>(){

            @Override
            public void completed(PartitionedTopicStats response) {
                if (!perPartition) {
                    response.partitions.clear();
                }
                future.complete(response);
            }

            @Override
            public void failed(Throwable throwable) {
                future.completeExceptionally(TopicsImpl.this.getApiException(throwable.getCause()));
            }
        });
        return future;
    }

    @Override
    public PartitionedTopicInternalStats getPartitionedInternalStats(String topic) throws PulsarAdminException {
        try {
            return this.getPartitionedInternalStatsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw (PulsarAdminException)e.getCause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        }
        catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    @Override
    public CompletableFuture<PartitionedTopicInternalStats> getPartitionedInternalStatsAsync(String topic) {
        TopicName tn = this.validateTopic(topic);
        WebTarget path = this.topicPath(tn, "partitioned-internalStats");
        final CompletableFuture<PartitionedTopicInternalStats> future = new CompletableFuture<PartitionedTopicInternalStats>();
        this.asyncGetRequest(path, new InvocationCallback<PartitionedTopicInternalStats>(){

            @Override
            public void completed(PartitionedTopicInternalStats response) {
                future.complete(response);
            }

            @Override
            public void failed(Throwable throwable) {
                future.completeExceptionally(TopicsImpl.this.getApiException(throwable.getCause()));
            }
        });
        return future;
    }

    @Override
    public void deleteSubscription(String topic, String subName) throws PulsarAdminException {
        try {
            this.deleteSubscriptionAsync(topic, subName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw (PulsarAdminException)e.getCause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        }
        catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    @Override
    public CompletableFuture<Void> deleteSubscriptionAsync(String topic, String subName) {
        TopicName tn = this.validateTopic(topic);
        String encodedSubName = Codec.encode(subName);
        WebTarget path = this.topicPath(tn, "subscription", encodedSubName);
        return this.asyncDeleteRequest(path);
    }

    @Override
    public void skipAllMessages(String topic, String subName) throws PulsarAdminException {
        try {
            this.skipAllMessagesAsync(topic, subName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw (PulsarAdminException)e.getCause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        }
        catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    @Override
    public CompletableFuture<Void> skipAllMessagesAsync(String topic, String subName) {
        TopicName tn = this.validateTopic(topic);
        String encodedSubName = Codec.encode(subName);
        WebTarget path = this.topicPath(tn, "subscription", encodedSubName, "skip_all");
        return this.asyncPostRequest(path, Entity.entity("", "application/json"));
    }

    @Override
    public void skipMessages(String topic, String subName, long numMessages) throws PulsarAdminException {
        try {
            this.skipMessagesAsync(topic, subName, numMessages).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw (PulsarAdminException)e.getCause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        }
        catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    @Override
    public CompletableFuture<Void> skipMessagesAsync(String topic, String subName, long numMessages) {
        TopicName tn = this.validateTopic(topic);
        String encodedSubName = Codec.encode(subName);
        WebTarget path = this.topicPath(tn, "subscription", encodedSubName, "skip", String.valueOf(numMessages));
        return this.asyncPostRequest(path, Entity.entity("", "application/json"));
    }

    @Override
    public void expireMessages(String topic, String subName, long expireTimeInSeconds) throws PulsarAdminException {
        try {
            this.expireMessagesAsync(topic, subName, expireTimeInSeconds).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw (PulsarAdminException)e.getCause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        }
        catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    @Override
    public CompletableFuture<Void> expireMessagesAsync(String topic, String subName, long expireTimeInSeconds) {
        TopicName tn = this.validateTopic(topic);
        String encodedSubName = Codec.encode(subName);
        WebTarget path = this.topicPath(tn, "subscription", encodedSubName, "expireMessages", String.valueOf(expireTimeInSeconds));
        return this.asyncPostRequest(path, Entity.entity("", "application/json"));
    }

    @Override
    public void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) throws PulsarAdminException {
        try {
            this.expireMessagesForAllSubscriptionsAsync(topic, expireTimeInSeconds).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw (PulsarAdminException)e.getCause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        }
        catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    @Override
    public CompletableFuture<Void> expireMessagesForAllSubscriptionsAsync(String topic, long expireTimeInSeconds) {
        TopicName tn = this.validateTopic(topic);
        WebTarget path = this.topicPath(tn, "all_subscription", "expireMessages", String.valueOf(expireTimeInSeconds));
        return this.asyncPostRequest(path, Entity.entity("", "application/json"));
    }

    private CompletableFuture<List<Message<byte[]>>> peekNthMessage(String topic, String subName, int messagePosition) {
        final TopicName tn = this.validateTopic(topic);
        String encodedSubName = Codec.encode(subName);
        WebTarget path = this.topicPath(tn, "subscription", encodedSubName, "position", String.valueOf(messagePosition));
        final CompletableFuture<List<Message<byte[]>>> future = new CompletableFuture<List<Message<byte[]>>>();
        this.asyncGetRequest(path, new InvocationCallback<Response>(){

            @Override
            public void completed(Response response) {
                try {
                    future.complete(TopicsImpl.this.getMessageFromHttpResponse(tn.toString(), response));
                }
                catch (Exception e) {
                    future.completeExceptionally(TopicsImpl.this.getApiException(e));
                }
            }

            @Override
            public void failed(Throwable throwable) {
                future.completeExceptionally(TopicsImpl.this.getApiException(throwable.getCause()));
            }
        });
        return future;
    }

    @Override
    public List<Message<byte[]>> peekMessages(String topic, String subName, int numMessages) throws PulsarAdminException {
        try {
            return this.peekMessagesAsync(topic, subName, numMessages).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw (PulsarAdminException)e.getCause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        }
        catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    @Override
    public CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(String topic, String subName, int numMessages) {
        Preconditions.checkArgument(numMessages > 0);
        CompletableFuture<List<Message<byte[]>>> future = new CompletableFuture<List<Message<byte[]>>>();
        this.peekMessagesAsync(topic, subName, numMessages, Lists.newArrayList(), future, 1);
        return future;
    }

    private void peekMessagesAsync(String topic, String subName, int numMessages, List<Message<byte[]>> messages, CompletableFuture<List<Message<byte[]>>> future, int nthMessage) {
        if (numMessages <= 0) {
            future.complete(messages);
            return;
        }
        this.peekNthMessage(topic, subName, nthMessage).handle((r, ex) -> {
            if (ex != null) {
                if (ex instanceof PulsarAdminException.NotFoundException) {
                    log.warn("Exception '{}' occured while trying to peek Messages.", (Object)ex.getMessage());
                    future.complete(messages);
                } else {
                    future.completeExceptionally((Throwable)ex);
                }
                return null;
            }
            for (int i = 0; i < Math.min(r.size(), numMessages); ++i) {
                messages.add((Message<byte[]>)r.get(i));
            }
            this.peekMessagesAsync(topic, subName, numMessages - r.size(), messages, future, nthMessage + 1);
            return null;
        });
    }

    @Override
    public void createSubscription(String topic, String subscriptionName, MessageId messageId) throws PulsarAdminException {
        try {
            TopicName tn = this.validateTopic(topic);
            String encodedSubName = Codec.encode(subscriptionName);
            WebTarget path = this.topicPath(tn, "subscription", encodedSubName);
            this.request(path).put(Entity.entity(messageId, "application/json"), ErrorData.class);
        }
        catch (Exception e) {
            throw this.getApiException(e);
        }
    }

    @Override
    public CompletableFuture<Void> createSubscriptionAsync(String topic, String subscriptionName, MessageId messageId) {
        TopicName tn = this.validateTopic(topic);
        String encodedSubName = Codec.encode(subscriptionName);
        WebTarget path = this.topicPath(tn, "subscription", encodedSubName);
        return this.asyncPutRequest(path, Entity.entity(messageId, "application/json"));
    }

    @Override
    public void resetCursor(String topic, String subName, long timestamp) throws PulsarAdminException {
        try {
            TopicName tn = this.validateTopic(topic);
            String encodedSubName = Codec.encode(subName);
            WebTarget path = this.topicPath(tn, "subscription", encodedSubName, "resetcursor", String.valueOf(timestamp));
            this.request(path).post(Entity.entity("", "application/json"), ErrorData.class);
        }
        catch (Exception e) {
            throw this.getApiException(e);
        }
    }

    @Override
    public CompletableFuture<Void> resetCursorAsync(String topic, String subName, long timestamp) {
        TopicName tn = this.validateTopic(topic);
        String encodedSubName = Codec.encode(subName);
        WebTarget path = this.topicPath(tn, "subscription", encodedSubName, "resetcursor", String.valueOf(timestamp));
        return this.asyncPostRequest(path, Entity.entity("", "application/json"));
    }

    @Override
    public void resetCursor(String topic, String subName, MessageId messageId) throws PulsarAdminException {
        try {
            TopicName tn = this.validateTopic(topic);
            String encodedSubName = Codec.encode(subName);
            WebTarget path = this.topicPath(tn, "subscription", encodedSubName, "resetcursor");
            this.request(path).post(Entity.entity(messageId, "application/json"), ErrorData.class);
        }
        catch (Exception e) {
            throw this.getApiException(e);
        }
    }

    @Override
    public CompletableFuture<Void> resetCursorAsync(String topic, String subName, MessageId messageId) {
        TopicName tn = this.validateTopic(topic);
        String encodedSubName = Codec.encode(subName);
        WebTarget path = this.topicPath(tn, "subscription", encodedSubName, "resetcursor");
        return this.asyncPostRequest(path, Entity.entity(messageId, "application/json"));
    }

    @Override
    public CompletableFuture<MessageId> terminateTopicAsync(String topic) {
        TopicName tn = this.validateTopic(topic);
        final CompletableFuture<MessageId> future = new CompletableFuture<MessageId>();
        try {
            final WebTarget path = this.topicPath(tn, "terminate");
            this.request(path).async().post(Entity.entity("", "application/json"), new InvocationCallback<MessageIdImpl>(){

                @Override
                public void completed(MessageIdImpl messageId) {
                    future.complete(messageId);
                }

                @Override
                public void failed(Throwable throwable) {
                    log.warn("[{}] Failed to perform http post request: {}", (Object)path.getUri(), (Object)throwable.getMessage());
                    future.completeExceptionally(TopicsImpl.this.getApiException(throwable.getCause()));
                }
            });
        }
        catch (PulsarAdminException cae) {
            future.completeExceptionally(cae);
        }
        return future;
    }

    @Override
    public void triggerCompaction(String topic) throws PulsarAdminException {
        try {
            TopicName tn = this.validateTopic(topic);
            this.request(this.topicPath(tn, "compaction")).put(Entity.entity("", "application/json"), ErrorData.class);
        }
        catch (Exception e) {
            throw this.getApiException(e);
        }
    }

    @Override
    public LongRunningProcessStatus compactionStatus(String topic) throws PulsarAdminException {
        try {
            TopicName tn = this.validateTopic(topic);
            return this.request(this.topicPath(tn, "compaction")).get(LongRunningProcessStatus.class);
        }
        catch (Exception e) {
            throw this.getApiException(e);
        }
    }

    @Override
    public void triggerOffload(String topic, MessageId messageId) throws PulsarAdminException {
        try {
            TopicName tn = this.validateTopic(topic);
            WebTarget path = this.topicPath(tn, "offload");
            this.request(path).put(Entity.entity(messageId, "application/json"), MessageIdImpl.class);
        }
        catch (Exception e) {
            throw this.getApiException(e);
        }
    }

    @Override
    public OffloadProcessStatus offloadStatus(String topic) throws PulsarAdminException {
        try {
            TopicName tn = this.validateTopic(topic);
            return this.request(this.topicPath(tn, "offload")).get(OffloadProcessStatus.class);
        }
        catch (Exception e) {
            throw this.getApiException(e);
        }
    }

    private WebTarget namespacePath(String domain, NamespaceName namespace, String ... parts) {
        WebTarget base = namespace.isV2() ? this.adminV2Topics : this.adminTopics;
        WebTarget namespacePath = base.path(domain).path(namespace.toString());
        namespacePath = WebTargets.addParts(namespacePath, parts);
        return namespacePath;
    }

    private WebTarget topicPath(TopicName topic, String ... parts) {
        WebTarget base = topic.isV2() ? this.adminV2Topics : this.adminTopics;
        WebTarget topicPath = base.path(topic.getRestPath());
        topicPath = WebTargets.addParts(topicPath, parts);
        return topicPath;
    }

    private TopicName validateTopic(String topic) {
        return TopicName.get(topic);
    }

    private List<Message<byte[]>> getMessageFromHttpResponse(String topic, Response response) throws Exception {
        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
            throw this.getApiException(response);
        }
        String msgId = response.getHeaderString("X-Pulsar-Message-ID");
        try (InputStream stream = (InputStream)response.getEntity();){
            byte[] data = new byte[stream.available()];
            stream.read(data);
            TreeMap<String, String> properties = Maps.newTreeMap();
            MultivaluedMap<String, Object> headers = response.getHeaders();
            Object tmp = headers.getFirst("X-Pulsar-publish-time");
            if (tmp != null) {
                properties.put("publish-time", (String)tmp);
            }
            tmp = headers.getFirst("X-Pulsar-num-batch-message");
            if (response.getHeaderString("X-Pulsar-num-batch-message") != null) {
                properties.put("X-Pulsar-num-batch-message", (String)tmp);
                List<Message<byte[]>> list = this.getIndividualMsgsFromBatch(topic, msgId, data, properties);
                return list;
            }
            for (Map.Entry entry : headers.entrySet()) {
                String header = (String)entry.getKey();
                if (!header.contains("X-Pulsar-PROPERTY-")) continue;
                String keyName = header.substring("X-Pulsar-PROPERTY-".length(), header.length());
                properties.put(keyName, (String)((List)entry.getValue()).get(0));
            }
            List list = Collections.singletonList(new MessageImpl(topic, msgId, properties, Unpooled.wrappedBuffer(data), Schema.BYTES));
            return list;
        }
    }

    private List<Message<byte[]>> getIndividualMsgsFromBatch(String topic, String msgId, byte[] data, Map<String, String> properties) {
        ArrayList<Message<byte[]>> ret = new ArrayList<Message<byte[]>>();
        int batchSize = Integer.parseInt(properties.get("X-Pulsar-num-batch-message"));
        for (int i = 0; i < batchSize; ++i) {
            String batchMsgId = msgId + ":" + i;
            PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata.newBuilder();
            ByteBuf buf = Unpooled.wrappedBuffer(data);
            try {
                ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(buf, singleMessageMetadataBuilder, i, batchSize);
                PulsarApi.SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.build();
                if (singleMessageMetadata.getPropertiesCount() > 0) {
                    for (PulsarApi.KeyValue entry : singleMessageMetadata.getPropertiesList()) {
                        properties.put(entry.getKey(), entry.getValue());
                    }
                }
                ret.add(new MessageImpl(topic, batchMsgId, properties, singleMessagePayload, Schema.BYTES));
            }
            catch (Exception ex) {
                log.error("Exception occured while trying to get BatchMsgId: {}", (Object)batchMsgId, (Object)ex);
            }
            buf.release();
            singleMessageMetadataBuilder.recycle();
        }
        return ret;
    }

    @Override
    public MessageId getLastMessageId(String topic) throws PulsarAdminException {
        try {
            return this.getLastMessageIdAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw (PulsarAdminException)e.getCause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        }
        catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    public CompletableFuture<MessageId> getLastMessageIdAsync(String topic) {
        TopicName tn = this.validateTopic(topic);
        WebTarget path = this.topicPath(tn, "lastMessageId");
        final CompletableFuture<MessageId> future = new CompletableFuture<MessageId>();
        this.asyncGetRequest(path, new InvocationCallback<MessageIdImpl>(){

            @Override
            public void completed(MessageIdImpl response) {
                future.complete(response);
            }

            @Override
            public void failed(Throwable throwable) {
                future.completeExceptionally(TopicsImpl.this.getApiException(throwable.getCause()));
            }
        });
        return future;
    }
}

