/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.admin;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.PersistentTopics;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * REST resource exposing the admin operations for non-persistent topics under the
 * {@code /non-persistent} path.
 *
 * <p>The class extends {@link PersistentTopics} and re-declares the endpoints whose handling
 * differs for non-persistent topics; everything else is inherited.
 */
@Path(value="/non-persistent")
@Produces(value={"application/json"})
@Api(value="/non-persistent", description="Non-Persistent topic admin apis", tags={"non-persistent topic"})
public class NonPersistentTopics
extends PersistentTopics {
    private static final Logger log = LoggerFactory.getLogger(NonPersistentTopics.class);

    /**
     * Returns the partitioned-topic metadata of the given destination.
     *
     * @param destination URL-encoded local topic name; it is decoded before use
     * @param authoritative forwarded unchanged to the metadata lookup
     * @return the metadata as returned by {@code getPartitionedTopicMetadata}
     */
    @Override
    @GET
    @Path(value="/{property}/{cluster}/{namespace}/{destination}/partitions")
    @ApiOperation(value="Get partitioned topic metadata.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission")})
    public PartitionedTopicMetadata getPartitionedMetadata(
            @PathParam(value="property") String property,
            @PathParam(value="cluster") String cluster,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="destination") @Encoded String destination,
            @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        destination = Codec.decode(destination);
        return getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative);
    }

    /**
     * Returns the stats of a non-persistent topic that is loaded on this broker.
     *
     * @param destination URL-encoded local topic name; it is decoded before use
     * @return the topic's stats
     * @throws RestException with status 404 when no topic reference can be obtained
     */
    @GET
    @Path(value="{property}/{cluster}/{namespace}/{destination}/stats")
    @ApiOperation(value="Get the stats for the topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist")})
    public NonPersistentTopicStats getStats(
            @PathParam(value="property") String property,
            @PathParam(value="cluster") String cluster,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="destination") @Encoded String destination,
            @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        destination = Codec.decode(destination);
        DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
        validateAdminOperationOnDestination(dn, authoritative);
        Topic topic = getTopicReference(dn);
        // The reference is looked up by the fully qualified name built with this resource's
        // domain, so it is expected to be a NonPersistentTopic.
        return ((NonPersistentTopic) topic).getStats();
    }

    /**
     * Returns the internal stats of a topic that is loaded on this broker.
     *
     * @param destination URL-encoded local topic name; it is decoded before use
     * @return the value of {@code Topic.getInternalStats()}
     * @throws RestException with status 404 when no topic reference can be obtained
     */
    @Override
    @GET
    @Path(value="{property}/{cluster}/{namespace}/{destination}/internalStats")
    @ApiOperation(value="Get the internal stats for the topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist")})
    public PersistentTopicInternalStats getInternalStats(
            @PathParam(value="property") String property,
            @PathParam(value="cluster") String cluster,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="destination") @Encoded String destination,
            @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        destination = Codec.decode(destination);
        DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
        validateAdminOperationOnDestination(dn, authoritative);
        return getTopicReference(dn).getInternalStats();
    }

    /**
     * Creates the metadata node of a partitioned non-persistent topic.
     *
     * @param destination URL-encoded local topic name; it is decoded before use
     * @param numPartitions number of partitions, must be greater than 1
     * @throws RestException with status 406 when {@code numPartitions <= 1}, with status 409
     *         when the metadata node already exists, or wrapping any other failure
     */
    @Override
    @PUT
    @Path(value="/{property}/{cluster}/{namespace}/{destination}/partitions")
    @ApiOperation(value="Create a partitioned topic.", notes="It needs to be called before creating a producer on a partitioned topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=409, message="Partitioned topic already exist")})
    public void createPartitionedTopic(
            @PathParam(value="property") String property,
            @PathParam(value="cluster") String cluster,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="destination") @Encoded String destination,
            int numPartitions,
            @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        destination = Codec.decode(destination);
        DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
        validateAdminAccessOnProperty(dn.getProperty());
        if (numPartitions <= 1) {
            throw new RestException(Response.Status.NOT_ACCEPTABLE, "Number of partitions should be more than 1");
        }
        try {
            String path = path("partitioned-topics", property, cluster, namespace, domain(), dn.getEncodedLocalName());
            byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
            zkCreateOptimistic(path, data);
            // NOTE(review): fixed one-second pause after the write; presumably it gives the
            // metadata store time to propagate the new node - confirm before changing.
            Thread.sleep(1000L);
            log.info("[{}] Successfully created partitioned topic {}", clientAppId(), dn);
        } catch (KeeperException.NodeExistsException e) {
            log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), dn);
            throw new RestException(Response.Status.CONFLICT, "Partitioned topic already exist");
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the container thread that runs this request.
                Thread.currentThread().interrupt();
            }
            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), dn, e);
            throw new RestException(e);
        }
    }

    /**
     * Unloads a topic.
     *
     * <p>For the {@code global} cluster the global namespace ownership is validated first.
     *
     * @param destination URL-encoded local topic name; it is logged as received and decoded
     *        before use
     */
    @Override
    @PUT
    @Path(value="/{property}/{cluster}/{namespace}/{destination}/unload")
    @ApiOperation(value="Unload a topic")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist")})
    public void unloadTopic(
            @PathParam(value="property") String property,
            @PathParam(value="cluster") String cluster,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="destination") @Encoded String destination,
            @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        log.info("[{}] Unloading topic {}/{}/{}/{}", clientAppId(), property, cluster, namespace, destination);
        destination = Codec.decode(destination);
        DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
        if (cluster.equals("global")) {
            validateGlobalNamespaceOwnership(NamespaceName.get(property, cluster, namespace));
        }
        unloadTopic(dn, authoritative);
    }

    /**
     * Lists the non-persistent topics of a namespace.
     *
     * <p>One asynchronous {@code getListInBundleAsync} request is issued through the admin
     * client for every pair of adjacent bundle boundaries in the namespace policies; the
     * non-null results are concatenated in bundle order.
     *
     * @return the collected topic names, possibly empty
     * @throws RestException when a request cannot be issued or any of them fails
     */
    @Override
    @GET
    @Path(value="/{property}/{cluster}/{namespace}")
    @ApiOperation(value="Get the list of non-persistent topics under a namespace.", response=String.class, responseContainer="List")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace doesn't exist")})
    public List<String> getList(
            @PathParam(value="property") String property,
            @PathParam(value="cluster") String cluster,
            @PathParam(value="namespace") String namespace) {
        log.info("[{}] list of topics on namespace {}/{}/{}", clientAppId(), property, cluster, namespace);
        validateAdminAccessOnProperty(property);
        Policies policies = getNamespacePolicies(property, cluster, namespace);
        NamespaceName nsName = NamespaceName.get(property, cluster, namespace);
        if (!cluster.equals("global")) {
            validateClusterOwnership(cluster);
            validateClusterForProperty(property, cluster);
        } else {
            validateGlobalNamespaceOwnership(nsName);
        }

        List<CompletableFuture<List<String>>> futures = new ArrayList<>();
        List<String> boundaries = policies.bundles.getBoundaries();
        for (int i = 0; i < boundaries.size() - 1; i++) {
            String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1));
            try {
                futures.add(pulsar().getAdminClient().nonPersistentTopics().getListInBundleAsync(nsName.toString(), bundle));
            } catch (PulsarServerException e) {
                log.error(String.format("[%s] Failed to get list of topics under namespace %s/%s/%s/%s", clientAppId(), property, cluster, namespace, bundle), e);
                throw new RestException(e);
            }
        }

        List<String> topics = new ArrayList<>();
        try {
            FutureUtil.waitForAll(futures).get();
            for (CompletableFuture<List<String>> future : futures) {
                // A bundle that is not owned by any broker yields a null list; skip it.
                List<String> bundleTopics = future.get();
                if (bundleTopics != null) {
                    topics.addAll(bundleTopics);
                }
            }
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            log.error(String.format("[%s] Failed to get list of topics under namespace %s/%s/%s", clientAppId(), property, cluster, namespace), e);
            throw new RestException(e instanceof ExecutionException ? e.getCause() : e);
        }
        return topics;
    }

    /**
     * Lists the non-persistent topics currently known to this broker's {@code BrokerService}
     * that fall into the given namespace bundle.
     *
     * @param bundleRange the bundle range identifying the bundle
     * @return the matching topic names, or {@code null} when the bundle is not owned by any
     *         broker ({@link #getList} relies on this and skips {@code null} results)
     * @throws RestException when collecting the topics fails
     */
    @GET
    @Path(value="/{property}/{cluster}/{namespace}/{bundle}")
    @ApiOperation(value="Get the list of non-persistent topics under a namespace bundle.", response=String.class, responseContainer="List")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace doesn't exist")})
    public List<String> getListFromBundle(
            @PathParam(value="property") String property,
            @PathParam(value="cluster") String cluster,
            @PathParam(value="namespace") String namespace,
            @PathParam(value="bundle") String bundleRange) {
        log.info("[{}] list of topics on namespace bundle {}/{}/{}/{}", clientAppId(), property, cluster, namespace, bundleRange);
        validateAdminAccessOnProperty(property);
        Policies policies = getNamespacePolicies(property, cluster, namespace);
        NamespaceName fqnn = NamespaceName.get(property, cluster, namespace);
        if (!cluster.equals("global")) {
            validateClusterOwnership(cluster);
            validateClusterForProperty(property, cluster);
        } else {
            validateGlobalNamespaceOwnership(fqnn);
        }
        if (!isBundleOwnedByAnyBroker(fqnn, policies.bundles, bundleRange)) {
            log.info("[{}] Namespace bundle is not owned by any broker {}/{}/{}/{}", clientAppId(), property, cluster, namespace, bundleRange);
            // Deliberately null rather than an empty list: existing callers test for it.
            return null;
        }
        NamespaceBundle nsBundle = validateNamespaceBundleOwnership(fqnn, policies.bundles, bundleRange, true, true);
        try {
            List<String> topicList = new ArrayList<>();
            pulsar().getBrokerService().getTopics().forEach((name, topicFuture) -> {
                if (nsBundle.includes(DestinationName.get(name))) {
                    topicList.add(name);
                }
            });
            return topicList;
        } catch (Exception e) {
            log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(), fqnn.toString(), bundleRange, e);
            throw new RestException(e);
        }
    }

    /**
     * Validates admin access on the destination's property and then the ownership of the
     * destination.
     */
    @Override
    protected void validateAdminOperationOnDestination(DestinationName fqdn, boolean authoritative) {
        validateAdminAccessOnProperty(fqdn.getProperty());
        validateDestinationOwnership(fqdn, authoritative);
    }

    /**
     * Looks up the topic in this broker's {@code BrokerService}.
     *
     * @return the topic reference, never {@code null}
     * @throws RestException with status 404 when the lookup fails or yields no topic
     */
    private Topic getTopicReference(DestinationName dn) {
        Topic topic;
        try {
            topic = pulsar().getBrokerService().getTopicReference(dn.toString());
        } catch (Exception e) {
            // Callers still see a 404, but the underlying cause is no longer lost.
            log.warn("[{}] Failed to look up topic {}", clientAppId(), dn, e);
            throw new RestException(Response.Status.NOT_FOUND, "Topic not found");
        }
        if (topic == null) {
            throw new RestException(Response.Status.NOT_FOUND, "Topic not found");
        }
        return topic;
    }
}

