/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.admin;

import com.google.common.collect.Maps;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Response;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * REST resource exposing broker-level admin operations under {@code /brokers}:
 * listing active brokers, listing the namespaces owned by this broker, and
 * reading/updating the dynamic service configuration stored in ZooKeeper.
 *
 * <p>Access checks ({@code validateSuperUserAccess}, {@code validateClusterOwnership},
 * {@code validateBrokerName}) are inherited from {@link AdminResource}; their exact
 * failure behaviour is defined there and is not visible in this file.
 */
@Path(value="/brokers")
@Api(value="/brokers", description="Brokers admin apis", tags={"brokers"})
@Produces(value={"application/json"})
public class Brokers
extends AdminResource {
    private static final Logger LOG = LoggerFactory.getLogger(Brokers.class);

    /** ZooKeeper path whose children are listed to obtain the active brokers. */
    private static final String ACTIVE_BROKERS_PATH = "/loadbalance/brokers";

    /** ZooKeeper path of the znode holding the dynamic service configuration map. */
    private static final String DYNAMIC_CONFIGURATION_PATH = "/admin/configuration";

    /**
     * Expected znode version passed to {@code setData} when updating the dynamic
     * configuration. Starts at {@code -1}, which ZooKeeper treats as "match any
     * version"; afterwards it holds the version returned by the last successful
     * {@code setData} performed through this instance.
     *
     * NOTE(review): if this resource is instantiated per request the value is
     * always -1 when used; if instances are shared, a concurrent update from
     * elsewhere would leave this value stale -- confirm the resource lifecycle.
     */
    private int serviceConfigZkVersion = -1;

    /**
     * Returns the children of the {@code /loadbalance/brokers} znode as read through
     * the local ZooKeeper cache.
     *
     * @param cluster cluster name from the request path; validated via
     *                {@code validateClusterOwnership} and used for logging
     * @return the set of child names found under the active-brokers path
     * @throws RestException wrapping any failure while reading from the cache
     */
    @GET
    @Path(value="/{cluster}")
    @ApiOperation(value="Get the list of active brokers (web service addresses) in the cluster.", response=String.class, responseContainer="Set")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Cluster doesn't exist")})
    public Set<String> getActiveBrokers(@PathParam(value="cluster") String cluster) throws Exception {
        validateSuperUserAccess();
        validateClusterOwnership(cluster);
        try {
            return pulsar().getLocalZkCache().getChildren(ACTIVE_BROKERS_PATH);
        }
        catch (Exception e) {
            // Trailing throwable is logged with its stack trace by SLF4J.
            LOG.error("[{}] Failed to get active broker list: cluster={}", clientAppId(), cluster, e);
            throw new RestException(e);
        }
    }

    /**
     * Returns the namespace ownership status reported by this broker's namespace
     * service.
     *
     * <p>The {@code broker} argument is only passed to {@code validateBrokerName}
     * and used for logging; the returned data always comes from the local
     * namespace service.
     *
     * @param cluster cluster name from the request path
     * @param broker  broker name from the request path
     * @return map of owned namespace to its ownership status
     * @throws RestException wrapping any failure while collecting the status
     */
    @GET
    @Path(value="/{cluster}/{broker}/ownedNamespaces")
    @ApiOperation(value="Get the list of namespaces served by the specific broker", response=NamespaceOwnershipStatus.class, responseContainer="Map")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Cluster doesn't exist")})
    public Map<String, NamespaceOwnershipStatus> getOwnedNamespaes(@PathParam(value="cluster") String cluster, @PathParam(value="broker") String broker) throws Exception {
        validateSuperUserAccess();
        validateClusterOwnership(cluster);
        validateBrokerName(broker);
        try {
            return pulsar().getNamespaceService().getOwnedNameSpacesStatus();
        }
        catch (Exception e) {
            // Pass the exception so the cause and stack trace are not lost.
            LOG.error("[{}] Failed to get the namespace ownership status. cluster={}, broker={}", clientAppId(), cluster, broker, e);
            throw new RestException(e);
        }
    }

    /**
     * Stores a new value for a dynamic configuration key in ZooKeeper.
     *
     * @param configName  name of the configuration key; must be one of the keys in
     *                    {@code BrokerService.getDynamicConfigurationMap()}
     * @param configValue value to store for that key
     * @throws RestException with {@code PRECONDITION_FAILED} when the key is not a
     *                       dynamic configuration, or wrapping any other failure
     */
    @POST
    @Path(value="/configuration/{configName}/{configValue}")
    @ApiOperation(value="Update dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
    @ApiResponses(value={@ApiResponse(code=204, message="Service configuration updated successfully"), @ApiResponse(code=403, message="You don't have admin permission to update service-configuration"), @ApiResponse(code=404, message="Configuration not found"), @ApiResponse(code=412, message="Configuration can't be updated dynamically")})
    public void updateDynamicConfiguration(@PathParam(value="configName") String configName, @PathParam(value="configValue") String configValue) throws Exception {
        validateSuperUserAccess();
        updateDynamicConfigurationOnZk(configName, configValue);
    }

    /**
     * Returns the dynamic configuration map currently stored at
     * {@code /admin/configuration}, read through the broker's data cache.
     *
     * <p>NOTE(review): unlike the other endpoints in this class, no
     * {@code validateSuperUserAccess()} check is performed here -- confirm this is
     * intentional.
     *
     * @return the stored configuration name/value pairs
     * @throws RestException with {@code NOT_FOUND} when the cache has no value for
     *                       the path, or wrapping any other read failure
     */
    @GET
    @Path(value="/configuration/values")
    @ApiOperation(value="Get value of all dynamic configurations' value overridden on local config")
    @ApiResponses(value={@ApiResponse(code=404, message="Configuration not found")})
    public Map<String, String> getAllDynamicConfigurations() throws Exception {
        ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar().getBrokerService().getDynamicConfigurationCache();
        try {
            return dynamicConfigurationCache.get(DYNAMIC_CONFIGURATION_PATH)
                    .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Couldn't find configuration in zk"));
        }
        catch (RestException e) {
            // Already carries the intended HTTP status; log and propagate unchanged.
            LOG.error("[{}] couldn't find any configuration in zk {}", clientAppId(), e.getMessage(), e);
            throw e;
        }
        catch (Exception e) {
            LOG.error("[{}] Failed to retrieve configuration from zk {}", clientAppId(), e.getMessage(), e);
            throw new RestException(e);
        }
    }

    /**
     * Returns the names of all configuration keys that may be updated dynamically,
     * i.e. the keys of {@code BrokerService.getDynamicConfigurationMap()}.
     *
     * @return list of dynamic configuration names
     */
    @GET
    @Path(value="/configuration")
    @ApiOperation(value="Get all updatable dynamic configurations's name")
    public List<String> getDynamicConfigurationName() {
        return BrokerService.getDynamicConfigurationMap().keys();
    }

    /**
     * Writes {@code configName=configValue} into the configuration map stored at
     * {@code /admin/configuration}.
     *
     * <p>If the cache already holds a map for the path, a copy of it is updated,
     * the cache entry is invalidated and the znode is overwritten with
     * {@code setData}. Otherwise a new single-entry map is created at the path
     * (including missing parents) as a persistent node.
     *
     * @param configName  configuration key to set
     * @param configValue value to store
     * @throws RestException with {@code PRECONDITION_FAILED} when {@code configName}
     *                       is not a dynamic configuration key; otherwise wrapping
     *                       the underlying failure
     */
    private synchronized void updateDynamicConfigurationOnZk(String configName, String configValue) {
        try {
            if (!BrokerService.getDynamicConfigurationMap().containsKey(configName)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("[{}] Can't update non-dynamic configuration {}/{}", clientAppId(), configName, configValue);
                }
                throw new RestException(Response.Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration");
            }

            ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar().getBrokerService().getDynamicConfigurationCache();
            Map<String, String> existing = dynamicConfigurationCache.get(DYNAMIC_CONFIGURATION_PATH).orElse(null);
            if (existing != null) {
                // Work on a copy so the instance owned by the cache is never
                // modified before the write to ZooKeeper has been attempted.
                Map<String, String> updated = Maps.newLinkedHashMap(existing);
                updated.put(configName, configValue);
                byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(updated);
                dynamicConfigurationCache.invalidate(DYNAMIC_CONFIGURATION_PATH);
                serviceConfigZkVersion = localZk().setData(DYNAMIC_CONFIGURATION_PATH, content, serviceConfigZkVersion).getVersion();
            } else {
                Map<String, String> created = Maps.newHashMap();
                created.put(configName, configValue);
                byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(created);
                ZooKeeper zk = localZk();
                ZkUtils.createFullPathOptimistic(zk, DYNAMIC_CONFIGURATION_PATH, content, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            LOG.info("[{}] Updated Service configuration {}/{}", clientAppId(), configName, configValue);
        }
        catch (RestException re) {
            // Preserve the HTTP status chosen above instead of re-wrapping it.
            throw re;
        }
        catch (Exception ie) {
            LOG.error("[{}] Failed to update configuration {}/{}, {}", clientAppId(), configName, configValue, ie.getMessage(), ie);
            throw new RestException(ie);
        }
    }
}

