/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.admin.v2;

import com.google.common.collect.Lists;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.v2.PersistentTopics;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.stats.NonPersistentPartitionedTopicStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path(value="/non-persistent")
@Produces(value={"application/json"})
@Api(value="/non-persistent", description="Non-Persistent topic admin apis", tags={"non-persistent topic"})
public class NonPersistentTopics
extends PersistentTopics {
    private static final Logger log = LoggerFactory.getLogger(NonPersistentTopics.class);

    @Override
    @GET
    @Path(value="/{tenant}/{namespace}/{topic}/partitions")
    @ApiOperation(value="Get partitioned topic metadata.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="Don't have permission to manage resources on this tenant"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="The tenant/namespace/topic does not exist"), @ApiResponse(code=412, message="Topic name is not valid"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate cluster configuration")})
    public void getPartitionedMetadata(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @ApiParam(value="Is check configuration required to automatically create topic") @QueryParam(value="checkAllowAutoCreation") @DefaultValue(value="false") boolean checkAllowAutoCreation) {
        super.getPartitionedMetadata(asyncResponse, tenant, namespace, encodedTopic, authoritative, checkAllowAutoCreation);
    }

    @Override
    @GET
    @Path(value="{tenant}/{namespace}/{topic}/internalStats")
    @ApiOperation(value="Get the internal stats for the topic.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="Don't have permission to manage resources on this tenant"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="The tenant/namespace/topic does not exist"), @ApiResponse(code=412, message="Topic name is not valid"), @ApiResponse(code=500, message="Internal server error")})
    public void getInternalStats(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @QueryParam(value="metadata") @DefaultValue(value="false") boolean metadata) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        ((CompletableFuture)((CompletableFuture)((CompletableFuture)this.validateTopicOwnershipAsync(this.topicName, authoritative).thenCompose(__ -> this.validateTopicOperationAsync(this.topicName, TopicOperation.GET_STATS))).thenCompose(__ -> {
            Topic topic = this.getTopicReference(this.topicName);
            boolean includeMetadata = metadata && this.hasSuperUserAccess();
            return topic.getInternalStats(includeMetadata);
        })).thenAccept(arg_0 -> ((AsyncResponse)asyncResponse).resume(arg_0))).exceptionally(ex -> {
            if (!NonPersistentTopics.isNot307And404Exception(ex)) {
                log.error("[{}] Failed to get internal stats for topic {}", new Object[]{this.clientAppId(), this.topicName, ex});
            }
            NonPersistentTopics.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    @Override
    @PUT
    @Path(value="/{tenant}/{namespace}/{topic}/partitions")
    @ApiOperation(value="Create a partitioned topic.", notes="It needs to be called before creating a producer on a partitioned topic.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="Don't have permission to manage resources on this tenant"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="The tenant/namespace does not exist"), @ApiResponse(code=406, message="The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"), @ApiResponse(code=409, message="Partitioned topic already exists"), @ApiResponse(code=412, message="Failed Reason : Name is invalid or Namespace does not have any clusters configured"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void createPartitionedTopic(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(value="The number of partitions for the topic", required=true, type="int", defaultValue="0") int numPartitions, @QueryParam(value="createLocalTopicOnly") @DefaultValue(value="false") boolean createLocalTopicOnly) {
        try {
            this.validateNamespaceName(tenant, namespace);
            this.validateGlobalNamespaceOwnership();
            this.validateTopicName(tenant, namespace, encodedTopic);
            this.internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
        }
        catch (Exception e) {
            log.error("[{}] Failed to create partitioned topic {}", new Object[]{this.clientAppId(), this.topicName, e});
            NonPersistentTopics.resumeAsyncResponseExceptionally(asyncResponse, e);
        }
    }

    @Override
    @GET
    @Path(value="{tenant}/{namespace}/{topic}/partitioned-stats")
    @ApiOperation(value="Get the stats for the partitioned topic.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="Don't have permission to administrate resources on this tenant"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist"), @ApiResponse(code=412, message="Partitioned topic name is invalid"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void getPartitionedStats(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(value="Get per partition stats") @QueryParam(value="perPartition") @DefaultValue(value="true") boolean perPartition, @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @ApiParam(value="If return precise backlog or imprecise backlog") @QueryParam(value="getPreciseBacklog") @DefaultValue(value="false") boolean getPreciseBacklog, @ApiParam(value="If return backlog size for each subscription, require locking on ledger so be careful not to use when there's heavy traffic.") @QueryParam(value="subscriptionBacklogSize") @DefaultValue(value="false") boolean subscriptionBacklogSize, @ApiParam(value="If return the earliest time in backlog") @QueryParam(value="getEarliestTimeInBacklog") @DefaultValue(value="false") boolean getEarliestTimeInBacklog) {
        try {
            this.validatePartitionedTopicName(tenant, namespace, encodedTopic);
            if (this.topicName.isGlobal()) {
                try {
                    this.validateGlobalNamespaceOwnership(this.namespaceName);
                }
                catch (Exception e) {
                    log.error("[{}] Failed to get partitioned stats for {}", new Object[]{this.clientAppId(), this.topicName, e});
                    NonPersistentTopics.resumeAsyncResponseExceptionally(asyncResponse, e);
                    return;
                }
            }
            ((CompletableFuture)this.getPartitionedTopicMetadataAsync(this.topicName, authoritative, false).thenAccept(partitionMetadata -> {
                if (partitionMetadata.partitions == 0) {
                    asyncResponse.resume((Throwable)((Object)new RestException(Response.Status.NOT_FOUND, String.format("Partitioned topic not found %s", this.topicName.toString()))));
                    return;
                }
                NonPersistentPartitionedTopicStatsImpl stats = new NonPersistentPartitionedTopicStatsImpl(partitionMetadata);
                ArrayList topicStatsFutureList = Lists.newArrayList();
                for (int i = 0; i < partitionMetadata.partitions; ++i) {
                    try {
                        topicStatsFutureList.add(this.pulsar().getAdminClient().topics().getStatsAsync(this.topicName.getPartition(i).toString(), getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog));
                        continue;
                    }
                    catch (PulsarServerException e) {
                        asyncResponse.resume((Throwable)((Object)new RestException(e)));
                        return;
                    }
                }
                FutureUtil.waitForAll((Collection)topicStatsFutureList).handle((result, exception) -> {
                    CompletableFuture statFuture = null;
                    for (int i = 0; i < topicStatsFutureList.size(); ++i) {
                        statFuture = (CompletableFuture)topicStatsFutureList.get(i);
                        if (!statFuture.isDone() || statFuture.isCompletedExceptionally()) continue;
                        try {
                            stats.add((NonPersistentTopicStats)((NonPersistentTopicStatsImpl)statFuture.get()));
                            if (!perPartition) continue;
                            stats.getPartitions().put(this.topicName.getPartition(i).toString(), (NonPersistentTopicStatsImpl)statFuture.get());
                            continue;
                        }
                        catch (Exception e) {
                            asyncResponse.resume((Throwable)((Object)new RestException(e)));
                            return null;
                        }
                    }
                    if (perPartition && stats.partitions.isEmpty()) {
                        try {
                            boolean topicExists = this.namespaceResources().getPartitionedTopicResources().partitionedTopicExists(this.topicName);
                            if (!topicExists) {
                                asyncResponse.resume((Throwable)((Object)new RestException(Response.Status.NOT_FOUND, "Internal topics have not been generated yet")));
                                return null;
                            }
                            stats.getPartitions().put(this.topicName.toString(), new NonPersistentTopicStatsImpl());
                        }
                        catch (Exception e) {
                            asyncResponse.resume((Throwable)((Object)new RestException(e)));
                            return null;
                        }
                    }
                    asyncResponse.resume((Object)stats);
                    return null;
                });
            })).exceptionally(ex -> {
                log.error("[{}] Failed to get partitioned stats for {}", new Object[]{this.clientAppId(), this.topicName, ex});
                NonPersistentTopics.resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            });
        }
        catch (WebApplicationException wae) {
            asyncResponse.resume((Throwable)wae);
        }
        catch (Exception e) {
            asyncResponse.resume((Throwable)((Object)new RestException(e)));
        }
    }

    @Override
    @PUT
    @Path(value="/{tenant}/{namespace}/{topic}/unload")
    @ApiOperation(value="Unload a topic")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="This operation requires super-user access"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="The tenant/namespace/topic does not exist"), @ApiResponse(code=412, message="Topic name is not valid"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void unloadTopic(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        try {
            this.validateTopicName(tenant, namespace, encodedTopic);
            this.internalUnloadTopic(asyncResponse, authoritative);
        }
        catch (WebApplicationException wae) {
            asyncResponse.resume((Throwable)wae);
        }
        catch (Exception e) {
            asyncResponse.resume((Throwable)((Object)new RestException(e)));
        }
    }

    @Override
    @GET
    @Path(value="/{tenant}/{namespace}")
    @ApiOperation(value="Get the list of non-persistent topics under a namespace.", response=String.class, responseContainer="List")
    @ApiResponses(value={@ApiResponse(code=401, message="Don't have permission to manage resources on this tenant"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="The tenant/namespace does not exist"), @ApiResponse(code=412, message="Namespace name is not valid"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void getList(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify the bundle name", required=false) @QueryParam(value="bundle") String nsBundle, @ApiParam(value="Include system topic") @QueryParam(value="includeSystemTopic") boolean includeSystemTopic) {
        Policies policies = null;
        try {
            this.validateNamespaceName(tenant, namespace);
            if (log.isDebugEnabled()) {
                log.debug("[{}] list of topics on namespace {}", (Object)this.clientAppId(), (Object)this.namespaceName);
            }
            this.validateNamespaceOperation(this.namespaceName, NamespaceOperation.GET_TOPICS);
            policies = this.getNamespacePolicies(this.namespaceName);
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        catch (WebApplicationException wae) {
            asyncResponse.resume((Throwable)wae);
            return;
        }
        catch (Exception e) {
            asyncResponse.resume((Throwable)((Object)new RestException(e)));
            return;
        }
        ArrayList futures = Lists.newArrayList();
        List boundaries = policies.bundles.getBoundaries();
        for (int i = 0; i < boundaries.size() - 1; ++i) {
            String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1));
            if (StringUtils.isNotBlank((CharSequence)nsBundle) && !nsBundle.equals(bundle)) continue;
            try {
                futures.add(this.pulsar().getAdminClient().topics().getListInBundleAsync(this.namespaceName.toString(), bundle));
                continue;
            }
            catch (PulsarServerException e) {
                log.error("[{}] Failed to get list of topics under namespace {}/{}", new Object[]{this.clientAppId(), this.namespaceName, bundle, e});
                asyncResponse.resume((Throwable)((Object)new RestException(e)));
                return;
            }
        }
        FutureUtil.waitForAll((Collection)futures).whenComplete((result, ex) -> {
            if (ex != null) {
                NonPersistentTopics.resumeAsyncResponseExceptionally(asyncResponse, ex);
            } else {
                ArrayList topics = Lists.newArrayList();
                for (int i = 0; i < futures.size(); ++i) {
                    List topicList = (List)((CompletableFuture)futures.get(i)).join();
                    if (topicList == null) continue;
                    topics.addAll(topicList);
                }
                List<String> nonPersistentTopics = topics.stream().filter(name -> !TopicName.get((String)name).isPersistent()).collect(Collectors.toList());
                asyncResponse.resume(this.filterSystemTopic(nonPersistentTopics, includeSystemTopic));
            }
        });
    }

    @GET
    @Path(value="/{tenant}/{namespace}/{bundle}")
    @ApiOperation(value="Get the list of non-persistent topics under a namespace bundle.", response=String.class, responseContainer="List")
    @ApiResponses(value={@ApiResponse(code=401, message="Don't have permission to manage resources on this tenant"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace doesn't exist"), @ApiResponse(code=412, message="Namespace name is not valid"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void getListFromBundle(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Bundle range of a topic", required=true) @PathParam(value="bundle") String bundleRange) {
        this.validateNamespaceName(tenant, namespace);
        if (log.isDebugEnabled()) {
            log.debug("[{}] list of topics on namespace bundle {}/{}", new Object[]{this.clientAppId(), this.namespaceName, bundleRange});
        }
        this.validateNamespaceOperation(this.namespaceName, NamespaceOperation.GET_BUNDLE);
        Policies policies = this.getNamespacePolicies(this.namespaceName);
        this.validateGlobalNamespaceOwnership(this.namespaceName);
        ((CompletableFuture)this.isBundleOwnedByAnyBroker(this.namespaceName, policies.bundles, bundleRange).thenAccept(flag -> {
            if (!flag.booleanValue()) {
                log.info("[{}] Namespace bundle is not owned by any broker {}/{}", new Object[]{this.clientAppId(), this.namespaceName, bundleRange});
                asyncResponse.resume((Object)Response.noContent().build());
            } else {
                ((CompletableFuture)this.validateNamespaceBundleOwnershipAsync(this.namespaceName, policies.bundles, bundleRange, true, true).thenAccept(nsBundle -> {
                    ConcurrentOpenHashMap bundleTopics = (ConcurrentOpenHashMap)this.pulsar().getBrokerService().getMultiLayerTopicsMap().get((Object)this.namespaceName.toString());
                    if (bundleTopics == null || bundleTopics.isEmpty()) {
                        asyncResponse.resume(Collections.emptyList());
                        return;
                    }
                    ArrayList topicList = new ArrayList();
                    String bundleKey = this.namespaceName.toString() + "/" + nsBundle.getBundleRange();
                    ConcurrentOpenHashMap topicMap = (ConcurrentOpenHashMap)bundleTopics.get((Object)bundleKey);
                    if (topicMap != null) {
                        topicList.addAll(topicMap.keys().stream().filter(name -> !TopicName.get((String)name).isPersistent()).collect(Collectors.toList()));
                    }
                    asyncResponse.resume(topicList);
                })).exceptionally(ex -> {
                    if (!NonPersistentTopics.isNot307And404Exception(ex)) {
                        log.error("[{}] Failed to list topics on namespace bundle {}/{}", new Object[]{this.clientAppId(), this.namespaceName, bundleRange, ex});
                    }
                    NonPersistentTopics.resumeAsyncResponseExceptionally(asyncResponse, ex);
                    return null;
                });
            }
        })).exceptionally(ex -> {
            if (!NonPersistentTopics.isNot307And404Exception(ex)) {
                log.error("[{}] Failed to list topics on namespace bundle {}/{}", new Object[]{this.clientAppId(), this.namespaceName, bundleRange, ex});
            }
            NonPersistentTopics.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    @Override
    @DELETE
    @Path(value="/{tenant}/{namespace}/{topic}/truncate")
    @ApiOperation(value="Truncate a topic.", notes="NonPersistentTopic does not support truncate.")
    @ApiResponses(value={@ApiResponse(code=412, message="NonPersistentTopic does not support truncate.")})
    public void truncateTopic(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(value="Whether leader broker redirected this call to this broker. For internal use.") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        asyncResponse.resume((Throwable)((Object)new RestException(Response.Status.PRECONDITION_FAILED.getStatusCode(), "unsupport truncate")));
    }

    protected void validateAdminOperationOnTopic(TopicName topicName, boolean authoritative) {
        this.validateAdminAccessForTenant(topicName.getTenant());
        this.validateTopicOwnership(topicName, authoritative);
    }

    private Topic getTopicReference(TopicName topicName) {
        try {
            return this.pulsar().getBrokerService().getTopicIfExists(topicName.toString()).get(this.config().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS).orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Topic not found"));
        }
        catch (ExecutionException e) {
            throw new RestException(e.getCause());
        }
        catch (InterruptedException | TimeoutException e) {
            throw new RestException(e);
        }
    }
}

