package org.apache.pulsar.broker.admin.impl;

import com.fasterxml.jackson.core.JsonGenerationException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Example;
import io.swagger.annotations.ExampleProperty;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.ZkAdminPaths;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/admin/impl/ClustersBase.class */
public class ClustersBase extends AdminResource {
    /** SLF4J logger shared by the admin endpoints in this class. */
    private static final Logger log = LoggerFactory.getLogger(ClustersBase.class);

    /**
     * Lists the names of all clusters held in the clusters-list cache, leaving out the
     * {@code "global"} entry.
     *
     * @return the cluster names, without {@code "global"}
     * @throws RestException if reading the cluster list fails for any reason
     */
    @GET
    @ApiResponses({@ApiResponse(code = 200, message = "Return a list of clusters."), @ApiResponse(code = 500, message = "Internal server error.")})
    @ApiOperation(value = "Get the list of all the Pulsar clusters.", response = String.class, responseContainer = "Set")
    public Set<String> getClusters() throws Exception {
        try {
            // Everything except the "global" name is reported back to the caller.
            return (Set) clustersListCache().get().stream()
                    .filter(name -> !"global".equals(name))
                    .collect(Collectors.toSet());
        } catch (Exception failure) {
            log.error("[{}] Failed to get clusters list", clientAppId(), failure);
            throw new RestException(failure);
        }
    }

    /**
     * Returns the stored configuration of one cluster. Requires super-user access.
     *
     * @param str the cluster name
     * @return the cluster data found in the clusters cache
     * @throws RestException with {@code NOT_FOUND} when the cache has no entry for the cluster;
     *         any other failure is wrapped in a {@link RestException}
     */
    @GET
    @ApiResponses({@ApiResponse(code = 200, message = "Return the cluster data.", response = ClusterData.class), @ApiResponse(code = 403, message = "Don't have admin permission."), @ApiResponse(code = 404, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}")
    @ApiOperation(value = "Get the configuration for the specified cluster.", response = ClusterData.class, notes = "This operation requires Pulsar superuser privileges.")
    public ClusterData getCluster(@PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str) {
        validateSuperUserAccess();
        try {
            return (ClusterData) clustersCache().get(path("clusters", str))
                    .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Cluster does not exist"));
        } catch (RestException restFailure) {
            // A RestException (e.g. the NOT_FOUND above) is logged and rethrown unchanged.
            log.error("[{}] Failed to get cluster {}", clientAppId(), str, restFailure);
            throw restFailure;
        } catch (Exception failure) {
            log.error("[{}] Failed to get cluster {}", clientAppId(), str, failure);
            throw new RestException(failure);
        }
    }

    /**
     * Creates a new cluster entry. Requires super-user access and writable policies.
     *
     * <p>The specific handlers are listed before the generic {@code Exception} handler so that
     * an already-existing node maps to {@code CONFLICT} and an invalid name maps to
     * {@code PRECONDITION_FAILED} instead of both falling into the generic handler.
     *
     * @param str the cluster name; checked with {@link NamedEntity#checkName(String)}
     * @param clusterData the cluster data, serialized to JSON and stored under the cluster path
     * @throws RestException {@code CONFLICT} if the node already exists, {@code PRECONDITION_FAILED}
     *         if the name is rejected, or a wrapped exception for any other failure
     */
    @ApiResponses({@ApiResponse(code = 204, message = "Cluster has been created."), @ApiResponse(code = 403, message = "You don't have admin permission to create the cluster."), @ApiResponse(code = 409, message = "Cluster already exists."), @ApiResponse(code = 412, message = "Cluster name is not valid."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}")
    @ApiOperation(value = "Create a new cluster.", notes = "This operation requires Pulsar superuser privileges, and the name cannot contain the '/' characters.")
    @PUT
    public void createCluster(@PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str, @ApiParam(value = "The cluster data", required = true, examples = @Example({@ExampleProperty(mediaType = "application/json", value = "{\n   'serviceUrl': 'http://pulsar.example.com:8080',\n   'brokerServiceUrl': 'pulsar://pulsar.example.com:6651',\n}")})) ClusterData clusterData) {
        validateSuperUserAccess();
        validatePoliciesReadOnlyAccess();
        try {
            NamedEntity.checkName(str);
            zkCreate(path("clusters", str), jsonMapper().writeValueAsBytes(clusterData));
            log.info("[{}] Created cluster {}", clientAppId(), str);
        } catch (KeeperException.NodeExistsException e) {
            log.warn("[{}] Failed to create already existing cluster {}", clientAppId(), str);
            throw new RestException(Response.Status.CONFLICT, "Cluster already exists");
        } catch (IllegalArgumentException e) {
            log.warn("[{}] Failed to create cluster with invalid name {}", clientAppId(), str, e);
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Cluster name is not valid");
        } catch (Exception e) {
            // Generic fallback must come last; placed first it would shadow the handlers above.
            log.error("[{}] Failed to create cluster {}", clientAppId(), str, e);
            throw new RestException(e);
        }
    }

    /**
     * Updates the stored configuration of an existing cluster. Requires super-user access and
     * writable policies.
     *
     * <p>If the node already holds data, it is deserialized and merged with the request via
     * {@code ClusterData.update}; an empty node is overwritten with the request as-is. The write
     * uses the version read into the {@link Stat}, and the global ZooKeeper cache entry is
     * invalidated afterwards.
     *
     * @param str the cluster name
     * @param clusterData the new cluster data
     * @throws RestException {@code NOT_FOUND} if the cluster node does not exist, or a wrapped
     *         exception for any other failure
     */
    @ApiResponses({@ApiResponse(code = 204, message = "Cluster has been updated."), @ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."), @ApiResponse(code = 404, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}")
    @ApiOperation(value = "Update the configuration for a cluster.", notes = "This operation requires Pulsar superuser privileges.")
    @POST
    public void updateCluster(@PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str, @ApiParam(value = "The cluster data", required = true, examples = @Example({@ExampleProperty(mediaType = "application/json", value = "{\n   'serviceUrl': 'http://pulsar.example.com:8080',\n   'brokerServiceUrl': 'pulsar://pulsar.example.com:6651'\n}")})) ClusterData clusterData) {
        validateSuperUserAccess();
        validatePoliciesReadOnlyAccess();
        try {
            String clusterPath = path("clusters", str);
            Stat nodeStat = new Stat();
            byte[] content = globalZk().getData(clusterPath, (Watcher) null, nodeStat);
            ClusterData updated;
            if (content.length > 0) {
                updated = (ClusterData) jsonMapper().readValue(content, ClusterData.class);
                updated.update(clusterData);
            } else {
                updated = clusterData;
            }
            globalZk().setData(clusterPath, jsonMapper().writeValueAsBytes(updated), nodeStat.getVersion());
            globalZkCache().invalidate(clusterPath);
            log.info("[{}] Updated cluster {}", clientAppId(), str);
        } catch (KeeperException.NoNodeException e) {
            // Must precede the generic handler, otherwise the 404 mapping is never reached.
            log.warn("[{}] Failed to update cluster {}: Does not exist", clientAppId(), str);
            throw new RestException(Response.Status.NOT_FOUND, "Cluster does not exist");
        } catch (Exception e) {
            log.error("[{}] Failed to update cluster {}", clientAppId(), str, e);
            throw new RestException(e);
        }
    }

    /**
     * Replaces the peer-cluster list of a cluster. Requires super-user access and writable
     * policies.
     *
     * <p>Each requested peer is validated first: it must not equal the cluster itself
     * (case-insensitive) and must be present in the clusters cache. Only then is the cluster
     * node read, updated and written back with the version that was read.
     *
     * @param str the cluster name
     * @param linkedHashSet the peer cluster names; may be {@code null} or empty, in which case
     *        validation is skipped and the value is stored as given
     * @throws RestException {@code PRECONDITION_FAILED} for an invalid peer, {@code NOT_FOUND}
     *         if the cluster node does not exist, or a wrapped exception for any other failure
     */
    @ApiResponses({@ApiResponse(code = 204, message = "Cluster has been updated."), @ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."), @ApiResponse(code = 404, message = "Cluster doesn't exist."), @ApiResponse(code = 412, message = "Peer cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}/peers")
    @ApiOperation(value = "Update peer-cluster-list for a cluster.", notes = "This operation requires Pulsar superuser privileges.")
    @POST
    public void setPeerClusterNames(@PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str, @ApiParam(value = "The list of peer cluster names", required = true, examples = @Example({@ExampleProperty(mediaType = "application/json", value = "[\n   'cluster-a',\n   'cluster-b'\n]")})) LinkedHashSet<String> linkedHashSet) {
        validateSuperUserAccess();
        validatePoliciesReadOnlyAccess();

        if (linkedHashSet != null) {
            for (String peer : linkedHashSet) {
                try {
                    if (str.equalsIgnoreCase(peer)) {
                        throw new RestException(Response.Status.PRECONDITION_FAILED, str + " itself can't be part of peer-list");
                    }
                    clustersCache().get(path("clusters", peer)).orElseThrow(
                            () -> new RestException(Response.Status.PRECONDITION_FAILED, "Peer cluster " + peer + " does not exist"));
                } catch (RestException e) {
                    // Listed before the generic handler so validation failures are rethrown
                    // unchanged rather than being re-wrapped.
                    log.warn("[{}] Peer cluster doesn't exist from {}, {}", clientAppId(), linkedHashSet, e.getMessage());
                    throw e;
                } catch (Exception e) {
                    log.warn("[{}] Failed to validate peer-cluster list {}, {}", clientAppId(), linkedHashSet, e.getMessage());
                    throw new RestException(e);
                }
            }
        }

        try {
            String clusterPath = path("clusters", str);
            Stat nodeStat = new Stat();
            byte[] content = globalZk().getData(clusterPath, (Watcher) null, nodeStat);
            ClusterData stored = (ClusterData) jsonMapper().readValue(content, ClusterData.class);
            stored.setPeerClusterNames(linkedHashSet);
            globalZk().setData(clusterPath, jsonMapper().writeValueAsBytes(stored), nodeStat.getVersion());
            globalZkCache().invalidate(clusterPath);
            log.info("[{}] Successfully added peer-cluster {} for {}", clientAppId(), linkedHashSet, str);
        } catch (KeeperException.NoNodeException e) {
            log.warn("[{}] Failed to update cluster {}: Does not exist", clientAppId(), str);
            throw new RestException(Response.Status.NOT_FOUND, "Cluster does not exist");
        } catch (Exception e) {
            log.error("[{}] Failed to update cluster {}", clientAppId(), str, e);
            throw new RestException(e);
        }
    }

    /**
     * Reads the peer-cluster names stored on a cluster node. Requires super-user access.
     *
     * @param str the cluster name
     * @return the peer cluster names held by the stored cluster data
     * @throws RestException {@code NOT_FOUND} if the cluster node does not exist, or a wrapped
     *         exception for any other failure
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission."), @ApiResponse(code = 404, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}/peers")
    @ApiOperation(value = "Get the peer-cluster data for the specified cluster.", response = String.class, responseContainer = "Set", notes = "This operation requires Pulsar superuser privileges.")
    public Set<String> getPeerCluster(@PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str) {
        validateSuperUserAccess();
        try {
            // Read the node directly from ZooKeeper (no watcher, no stat) and decode it.
            byte[] content = globalZk().getData(path("clusters", str), (Watcher) null, (Stat) null);
            ClusterData stored = (ClusterData) jsonMapper().readValue(content, ClusterData.class);
            return stored.getPeerClusterNames();
        } catch (KeeperException.NoNodeException missing) {
            log.warn("[{}] Failed to get cluster {}: Does not exist", clientAppId(), str);
            throw new RestException(Response.Status.NOT_FOUND, "Cluster does not exist");
        } catch (Exception failure) {
            log.error("[{}] Failed to get cluster {}", clientAppId(), str, failure);
            throw new RestException(failure);
        }
    }

    /**
     * Deletes a cluster. Requires super-user access and writable policies.
     *
     * <p>The work is done in three separate phases so that each failure is reported under its
     * own log message and status:
     * <ol>
     *   <li>Usage check: the cluster counts as used if any child of the policies root has a
     *       non-empty node for this cluster, or if its namespace-isolation policies are
     *       non-empty. An existing but empty isolation-policies node is deleted here.</li>
     *   <li>If the cluster is used, the request is rejected with {@code PRECONDITION_FAILED}.</li>
     *   <li>Otherwise the failure domains and then the cluster node itself are deleted.</li>
     * </ol>
     *
     * @param str the cluster name
     * @throws RestException {@code PRECONDITION_FAILED} if the cluster is still used,
     *         {@code NOT_FOUND} if the cluster node does not exist, or a wrapped exception for
     *         any other failure
     */
    @ApiResponses({@ApiResponse(code = 204, message = "Cluster has been deleted."), @ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."), @ApiResponse(code = 404, message = "Cluster doesn't exist."), @ApiResponse(code = 412, message = "Cluster is not empty."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}")
    @DELETE
    @ApiOperation(value = "Delete an existing cluster.", notes = "This operation requires Pulsar superuser privileges.")
    public void deleteCluster(@PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str) {
        validateSuperUserAccess();
        validatePoliciesReadOnlyAccess();

        // Phase 1: find out whether anything still refers to this cluster.
        boolean clusterInUse = false;
        try {
            for (String policyChild : globalZk().getChildren(path(ZkAdminPaths.POLICIES), false)) {
                String childClusterPath = path(ZkAdminPaths.POLICIES, policyChild, str);
                if (globalZk().exists(childClusterPath, false) != null
                        && !globalZk().getChildren(childClusterPath, false).isEmpty()) {
                    clusterInUse = true;
                    break;
                }
            }

            String isolationPath = path("clusters", str, NamespaceService.NAMESPACE_ISOLATION_POLICIES);
            Optional<NamespaceIsolationPolicies> isolationPolicies = namespaceIsolationPoliciesCache().get(isolationPath);
            if (isolationPolicies.isPresent()) {
                if (isolationPolicies.get().getPolicies().isEmpty()) {
                    // An empty policies node is removed here so it does not block the deletion.
                    globalZk().delete(isolationPath, -1);
                    namespaceIsolationPoliciesCache().invalidate(isolationPath);
                } else {
                    clusterInUse = true;
                }
            }
        } catch (Exception e) {
            log.error("[{}] Failed to get cluster usage {}", clientAppId(), str, e);
            throw new RestException(e);
        }

        // Phase 2: kept outside the usage-check try so this deliberate rejection is not caught
        // and logged as a usage-lookup failure.
        if (clusterInUse) {
            log.warn("[{}] Failed to delete cluster {} - Cluster not empty", clientAppId(), str);
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Cluster not empty");
        }

        // Phase 3: remove failure domains, then the cluster node.
        try {
            String clusterPath = path("clusters", str);
            deleteFailureDomain(clusterPath);
            globalZk().delete(clusterPath, -1);
            globalZkCache().invalidate(clusterPath);
            log.info("[{}] Deleted cluster {}", clientAppId(), str);
        } catch (KeeperException.NoNodeException e) {
            // Must precede the generic handler, otherwise the 404 mapping is never reached.
            log.warn("[{}] Failed to delete cluster {} - Does not exist", clientAppId(), str);
            throw new RestException(Response.Status.NOT_FOUND, "Cluster does not exist");
        } catch (Exception e) {
            log.error("[{}] Failed to delete cluster {}", clientAppId(), str, e);
            throw new RestException(e);
        }
    }

    /**
     * Removes the {@code failureDomain} node below the given cluster path together with all of
     * its direct children, then clears both failure-domain caches. Does nothing when the
     * {@code failureDomain} node is absent.
     *
     * @param str the cluster path under which the failure-domain node lives
     * @throws RestException wrapping any failure raised while deleting
     */
    private void deleteFailureDomain(String str) {
        try {
            String domainRoot = joinPath(str, "failureDomain");
            if (globalZk().exists(domainRoot, false) != null) {
                // Children first, then the (now empty) parent node.
                for (String domainName : globalZk().getChildren(domainRoot, false)) {
                    globalZk().delete(joinPath(domainRoot, domainName), -1);
                }
                globalZk().delete(domainRoot, -1);
                failureDomainCache().clear();
                failureDomainListCache().clear();
            }
        } catch (Exception failure) {
            log.warn("Failed to delete failure-domain under cluster {}", str);
            throw new RestException(failure);
        }
    }

    /**
     * Returns every namespace-isolation policy stored for a cluster. Requires super-user access.
     *
     * @param str the cluster name
     * @return the policies of the cluster, keyed as stored
     * @throws RestException {@code NOT_FOUND} if the cluster is not in the clusters cache; a
     *         missing policies entry or any other failure is logged and wrapped in a
     *         {@link RestException}
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission."), @ApiResponse(code = 404, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}/namespaceIsolationPolicies")
    @ApiOperation(value = "Get the namespace isolation policies assigned to the cluster.", response = NamespaceIsolationData.class, responseContainer = "Map", notes = "This operation requires Pulsar superuser privileges.")
    public Map<String, NamespaceIsolationData> getNamespaceIsolationPolicies(@PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str) throws Exception {
        validateSuperUserAccess();

        boolean clusterKnown = clustersCache().get(path("clusters", str)).isPresent();
        if (!clusterKnown) {
            throw new RestException(Response.Status.NOT_FOUND, "Cluster " + str + " does not exist.");
        }

        try {
            String policiesPath = path("clusters", str, NamespaceService.NAMESPACE_ISOLATION_POLICIES);
            NamespaceIsolationPolicies stored = (NamespaceIsolationPolicies) namespaceIsolationPoliciesCache().get(policiesPath)
                    .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "NamespaceIsolationPolicies for cluster " + str + " does not exist"));
            return stored.getPolicies();
        } catch (Exception failure) {
            log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies", clientAppId(), str, failure);
            throw new RestException(failure);
        }
    }

    /**
     * Returns one named namespace-isolation policy of a cluster. Requires super-user access and
     * a cluster that passes {@code validateClusterExists}.
     *
     * @param str the cluster name
     * @param str2 the policy name
     * @return the policy data stored under {@code str2}
     * @throws RestException {@code NOT_FOUND} if the cluster has no policies entry or the named
     *         policy is not present; any other failure is wrapped in a {@link RestException}
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission."), @ApiResponse(code = 404, message = "Policy doesn't exist."), @ApiResponse(code = 412, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
    @ApiOperation(value = "Get the single namespace isolation policy assigned to the cluster.", response = NamespaceIsolationData.class, notes = "This operation requires Pulsar superuser privileges.")
    public NamespaceIsolationData getNamespaceIsolationPolicy(@PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str, @PathParam("policyName") @ApiParam(value = "The name of the namespace isolation policy", required = true) String str2) throws Exception {
        validateSuperUserAccess();
        validateClusterExists(str);
        try {
            NamespaceIsolationPolicies stored = (NamespaceIsolationPolicies) namespaceIsolationPoliciesCache().get(path("clusters", str, NamespaceService.NAMESPACE_ISOLATION_POLICIES))
                    .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "NamespaceIsolationPolicies for cluster " + str + " does not exist"));
            if (stored.getPolicies().containsKey(str2)) {
                return (NamespaceIsolationData) stored.getPolicies().get(str2);
            }
            log.info("[{}] Cannot find NamespaceIsolationPolicy {} for cluster {}", clientAppId(), str2, str);
            throw new RestException(Response.Status.NOT_FOUND, "Cannot find NamespaceIsolationPolicy " + str2 + " for cluster " + str);
        } catch (RestException e) {
            // Listed before the generic handler so the NOT_FOUND responses above pass through
            // untouched instead of being logged as errors and re-wrapped.
            throw e;
        } catch (Exception e) {
            // The policy name fills the third placeholder, which previously had no argument.
            log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies/{}", clientAppId(), str, str2, e);
            throw new RestException(e);
        }
    }

    /**
     * Lists every available broker together with the namespace patterns of the isolation
     * policies that name it as a primary or secondary broker. Requires super-user access and a
     * cluster that passes {@code validateClusterExists}.
     *
     * <p>The broker lookup and the policy lookup are guarded by separate try blocks so that a
     * policy failure is reported once, under the policy message, rather than being caught again
     * and logged as a broker-list failure.
     *
     * @param str the cluster name
     * @return one entry per available broker; {@code namespaceRegex} stays {@code null} for a
     *         broker that no policy matches
     * @throws RestException if the broker list cannot be read, if the cluster has no
     *         isolation-policies entry, or if reading the policies fails
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission."), @ApiResponse(code = 404, message = "Namespace-isolation policies not found."), @ApiResponse(code = 412, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}/namespaceIsolationPolicies/brokers")
    @ApiOperation(value = "Get list of brokers with namespace-isolation policies attached to them.", response = BrokerNamespaceIsolationData.class, responseContainer = "set", notes = "This operation requires Pulsar superuser privileges.")
    public List<BrokerNamespaceIsolationData> getBrokersWithNamespaceIsolationPolicy(@PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str) {
        validateSuperUserAccess();
        validateClusterExists(str);
        String policiesPath = AdminResource.path("clusters", str, NamespaceService.NAMESPACE_ISOLATION_POLICIES);

        final Set<String> availableBrokers;
        try {
            availableBrokers = pulsar().getLoadManager().get().getAvailableBrokers();
        } catch (Exception e) {
            log.error("[{}] Failed to get list of brokers in cluster {}", clientAppId(), str, e);
            throw new RestException(e);
        }

        try {
            Optional<NamespaceIsolationPolicies> stored = namespaceIsolationPoliciesCache().get(policiesPath);
            if (!stored.isPresent()) {
                throw new RestException(Response.Status.NOT_FOUND, "namespace-isolation policies not found for " + str);
            }
            Map<String, NamespaceIsolationData> policies = stored.get().getPolicies();
            return availableBrokers.stream().map(broker -> {
                BrokerNamespaceIsolationData brokerData = new BrokerNamespaceIsolationData();
                brokerData.brokerName = broker;
                if (policies != null) {
                    // The inner lambda uses its own parameter names so that "broker" keeps
                    // referring to the broker being mapped, not to the policy name.
                    policies.forEach((policyName, policyData) -> {
                        NamespaceIsolationPolicyImpl policy = new NamespaceIsolationPolicyImpl(policyData);
                        if (policy.isPrimaryBroker(broker) || policy.isSecondaryBroker(broker)) {
                            if (brokerData.namespaceRegex == null) {
                                brokerData.namespaceRegex = Lists.newArrayList();
                            }
                            brokerData.namespaceRegex.addAll(policyData.namespaces);
                        }
                    });
                }
                return brokerData;
            }).collect(Collectors.toList());
        } catch (Exception e) {
            log.error("[{}] Failed to get namespace isolation-policies {}", clientAppId(), str, e);
            throw new RestException(e);
        }
    }

    /**
     * Describes a single broker in terms of the namespace-isolation policies that name it as a
     * primary or secondary broker. Requires super-user access and a cluster that passes
     * {@code validateClusterExists}.
     *
     * <p>When several policies match, their namespace patterns are accumulated, while
     * {@code isPrimary} and {@code policyName} are overwritten by each match, so they reflect
     * whichever matching policy is visited last.
     *
     * @param str the cluster name
     * @param str2 the broker name
     * @return the broker's isolation data; {@code namespaceRegex} stays {@code null} when no
     *         policy matches
     * @throws RestException if the cluster has no isolation-policies entry or reading the
     *         policies fails
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission."), @ApiResponse(code = 404, message = "Namespace-isolation policies/ Broker not found."), @ApiResponse(code = 412, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}/namespaceIsolationPolicies/brokers/{broker}")
    @ApiOperation(value = "Get a broker with namespace-isolation policies attached to it.", response = BrokerNamespaceIsolationData.class, notes = "This operation requires Pulsar superuser privileges.")
    public BrokerNamespaceIsolationData getBrokerWithNamespaceIsolationPolicy(@PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str, @PathParam("broker") @ApiParam(value = "The broker name (<broker-hostname>:<web-service-port>)", required = true, example = "broker1:8080") String str2) {
        validateSuperUserAccess();
        validateClusterExists(str);
        try {
            // Parameterized types give the forEach lambda below properly typed arguments.
            Optional<NamespaceIsolationPolicies> stored = namespaceIsolationPoliciesCache().get(AdminResource.path("clusters", str, NamespaceService.NAMESPACE_ISOLATION_POLICIES));
            if (!stored.isPresent()) {
                throw new RestException(Response.Status.NOT_FOUND, "namespace-isolation policies not found for " + str);
            }
            Map<String, NamespaceIsolationData> policies = stored.get().getPolicies();
            BrokerNamespaceIsolationData brokerData = new BrokerNamespaceIsolationData();
            brokerData.brokerName = str2;
            if (policies != null) {
                policies.forEach((policyName, policyData) -> {
                    NamespaceIsolationPolicyImpl policy = new NamespaceIsolationPolicyImpl(policyData);
                    boolean primary = policy.isPrimaryBroker(str2);
                    if (primary || policy.isSecondaryBroker(str2)) {
                        if (brokerData.namespaceRegex == null) {
                            brokerData.namespaceRegex = Lists.newArrayList();
                        }
                        brokerData.namespaceRegex.addAll(policyData.namespaces);
                        brokerData.isPrimary = primary;
                        brokerData.policyName = policyName;
                    }
                });
            }
            return brokerData;
        } catch (Exception e) {
            log.error("[{}] Failed to get namespace isolation-policies {}", clientAppId(), str, e);
            throw new RestException(e);
        }
    }

    /**
     * Creates or replaces one namespace-isolation policy of a cluster. Requires super-user
     * access, a cluster that passes {@code validateClusterExists}, and writable policies.
     *
     * <p>The policy data is validated first. If the cluster has no cached policies entry, the
     * policies node is created with an empty map and a fresh policy set is used. The full
     * policy map is then written back unconditionally (version {@code -1}) and the cache entry
     * is invalidated.
     *
     * @param str the cluster name
     * @param str2 the policy name
     * @param namespaceIsolationData the policy data to store
     * @throws RestException {@code BAD_REQUEST} if the policy data is rejected,
     *         {@code NOT_FOUND} if the policies node does not exist when written, or a wrapped
     *         exception for any other failure
     */
    @ApiResponses({@ApiResponse(code = 400, message = "Namespace isolation policy data is invalid."), @ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."), @ApiResponse(code = 404, message = "Namespace isolation policy doesn't exist."), @ApiResponse(code = 412, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
    @ApiOperation(value = "Set namespace isolation policy.", notes = "This operation requires Pulsar superuser privileges.")
    @POST
    public void setNamespaceIsolationPolicy(@PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str, @PathParam("policyName") @ApiParam(value = "The namespace isolation policy name", required = true) String str2, @ApiParam(value = "The namespace isolation policy data", required = true) NamespaceIsolationData namespaceIsolationData) throws Exception {
        validateSuperUserAccess();
        validateClusterExists(str);
        validatePoliciesReadOnlyAccess();
        try {
            namespaceIsolationData.validate();
            String policiesPath = path("clusters", str, NamespaceService.NAMESPACE_ISOLATION_POLICIES);
            NamespaceIsolationPolicies policies = (NamespaceIsolationPolicies) namespaceIsolationPoliciesCache().get(policiesPath).orElseGet(() -> {
                try {
                    createZnodeIfNotExist(policiesPath, Optional.of(Collections.emptyMap()));
                    return new NamespaceIsolationPolicies();
                } catch (KeeperException | InterruptedException e) {
                    throw new RestException((Throwable) e);
                }
            });
            policies.setPolicy(str2, namespaceIsolationData);
            globalZk().setData(policiesPath, jsonMapper().writeValueAsBytes(policies.getPolicies()), -1);
            namespaceIsolationPoliciesCache().invalidate(policiesPath);
        } catch (IllegalArgumentException e) {
            log.info("[{}] Failed to update clusters/{}/namespaceIsolationPolicies/{}. Input data is invalid", clientAppId(), str, str2, e);
            throw new RestException(Response.Status.BAD_REQUEST, "Invalid format of input policy data. policy: " + str2 + "; data: " + ObjectMapperFactory.create().writeValueAsString(namespaceIsolationData));
        } catch (KeeperException.NoNodeException e) {
            // Must precede the generic handler, otherwise the 404 mapping is never reached.
            log.warn("[{}] Failed to update clusters/{}/namespaceIsolationPolicies: Does not exist", clientAppId(), str);
            throw new RestException(Response.Status.NOT_FOUND, "NamespaceIsolationPolicies for cluster " + str + " does not exist");
        } catch (Exception e) {
            log.error("[{}] Failed to update clusters/{}/namespaceIsolationPolicies/{}", clientAppId(), str, str2, e);
            throw new RestException(e);
        }
    }

    /**
     * Creates a persistent znode (and any missing parents) at the given path unless a node is
     * already there.
     *
     * <p>Creation is best-effort: losing the race to another creator, or failing to serialize
     * the value, yields {@code false} rather than an exception. Serialization and I/O failures
     * are logged so they can be told apart from the "already exists" case.
     *
     * @param str the znode path
     * @param optional the value to serialize as JSON into the new node; an empty optional
     *        creates the node with {@code null} data
     * @return {@code true} only if this call created the node; {@code false} if it already
     *         existed, was created concurrently, or the value could not be serialized
     * @throws KeeperException for ZooKeeper failures other than {@code NodeExistsException}
     * @throws InterruptedException if a ZooKeeper call is interrupted
     */
    private boolean createZnodeIfNotExist(String str, Optional<Object> optional) throws KeeperException, InterruptedException {
        if (globalZk().exists(str, false) != null) {
            return false;
        }
        try {
            byte[] content = optional.isPresent() ? jsonMapper().writeValueAsBytes(optional.get()) : null;
            ZkUtils.createFullPathOptimistic(globalZk(), str, content, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            return true;
        } catch (KeeperException.NodeExistsException e) {
            if (log.isDebugEnabled()) {
                log.debug("Other broker preempted the full path [{}] already. Continue...", str);
            }
            return false;
        } catch (JsonMappingException | JsonGenerationException e) {
            log.warn("Failed to serialize value for znode [{}]", str, e);
            return false;
        } catch (IOException e) {
            log.warn("Failed to serialize value for znode [{}]", str, e);
            return false;
        }
    }

    /**
     * Removes one namespace-isolation policy from a cluster. Requires super-user access, a
     * cluster that passes {@code validateClusterExists}, and writable policies.
     *
     * <p>If the cluster has no cached policies entry, the policies node is created with an
     * empty map and a fresh policy set is used. After the named policy is removed, the full
     * policy map is written back unconditionally (version {@code -1}) and the cache entry is
     * invalidated.
     *
     * @param str the cluster name
     * @param str2 the policy name
     * @throws RestException {@code NOT_FOUND} if the policies node does not exist when written,
     *         or a wrapped exception for any other failure
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission or policies are read only."), @ApiResponse(code = 404, message = "Namespace isolation policy doesn't exist."), @ApiResponse(code = 412, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
    @DELETE
    @ApiOperation(value = "Delete namespace isolation policy.", notes = "This operation requires Pulsar superuser privileges.")
    public void deleteNamespaceIsolationPolicy(@PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str, @PathParam("policyName") @ApiParam(value = "The namespace isolation policy name", required = true) String str2) throws Exception {
        validateSuperUserAccess();
        validateClusterExists(str);
        validatePoliciesReadOnlyAccess();
        try {
            String policiesPath = path("clusters", str, NamespaceService.NAMESPACE_ISOLATION_POLICIES);
            NamespaceIsolationPolicies policies = (NamespaceIsolationPolicies) namespaceIsolationPoliciesCache().get(policiesPath).orElseGet(() -> {
                // No cached entry: make sure the node exists and start from an empty set.
                try {
                    createZnodeIfNotExist(policiesPath, Optional.of(Collections.emptyMap()));
                } catch (KeeperException | InterruptedException zkFailure) {
                    throw new RestException((Throwable) zkFailure);
                }
                return new NamespaceIsolationPolicies();
            });
            policies.deletePolicy(str2);
            byte[] serialized = jsonMapper().writeValueAsBytes(policies.getPolicies());
            globalZk().setData(policiesPath, serialized, -1);
            namespaceIsolationPoliciesCache().invalidate(policiesPath);
        } catch (KeeperException.NoNodeException missing) {
            log.warn("[{}] Failed to update brokers/{}/namespaceIsolationPolicies: Does not exist", clientAppId(), str);
            throw new RestException(Response.Status.NOT_FOUND, "NamespaceIsolationPolicies for cluster " + str + " does not exist");
        } catch (Exception failure) {
            log.error("[{}] Failed to update brokers/{}/namespaceIsolationPolicies/{}", clientAppId(), str, str2, failure);
            throw new RestException(failure);
        }
    }

    /**
     * Creates or updates a failure domain of a cluster. Requires super-user access and a
     * cluster that passes {@code validateClusterExists}; the request is also checked with
     * {@code validateBrokerExistsInOtherDomain} before anything is written.
     *
     * <p>If the domain node is created by this call, the failure-domain list cache is cleared.
     * Otherwise the existing node is overwritten unconditionally (version {@code -1}) and its
     * cache entry is invalidated.
     *
     * @param str the cluster name
     * @param str2 the failure domain name
     * @param failureDomain the failure domain configuration to store
     * @throws RestException {@code NOT_FOUND} if the domain node does not exist when written,
     *         or a wrapped exception for any other failure
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission."), @ApiResponse(code = 404, message = "Failure domain doesn't exist."), @ApiResponse(code = 409, message = "Broker already exists in another domain."), @ApiResponse(code = 412, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}/failureDomains/{domainName}")
    @ApiOperation(value = "Set the failure domain of the cluster.", notes = "This operation requires Pulsar superuser privileges.")
    @POST
    public void setFailureDomain(@PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str, @PathParam("domainName") @ApiParam(value = "The failure domain name", required = true) String str2, @ApiParam(value = "The configuration data of a failure domain", required = true) FailureDomain failureDomain) throws Exception {
        validateSuperUserAccess();
        validateClusterExists(str);
        validateBrokerExistsInOtherDomain(str, str2, failureDomain);
        try {
            String domainPath = joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT, str2);
            if (createZnodeIfNotExist(domainPath, Optional.ofNullable(failureDomain))) {
                failureDomainListCache().clear();
            } else {
                globalZk().setData(domainPath, jsonMapper().writeValueAsBytes(failureDomain), -1);
                failureDomainCache().invalidate(domainPath);
            }
        } catch (KeeperException.NoNodeException e) {
            // Must precede the generic handler, otherwise the 404 mapping is never reached.
            // Arguments follow the message order: domain first, then cluster.
            log.warn("[{}] Failed to update domain {}. clusters {}  Does not exist", clientAppId(), str2, str);
            throw new RestException(Response.Status.NOT_FOUND, "Domain " + str2 + " for cluster " + str + " does not exist");
        } catch (Exception e) {
            log.error("[{}] Failed to update clusters/{}/domainName/{}", clientAppId(), str, str2, e);
            throw new RestException(e);
        }
    }

    /**
     * Returns the failure domains known to the failure-domain caches, keyed by domain name.
     *
     * <p>Every name returned by {@code failureDomainListCache()} is looked up in
     * {@code failureDomainCache()}; names whose lookup yields no value are omitted, and a
     * lookup that throws is logged and skipped so one bad entry does not fail the request.
     *
     * @param str the cluster name (used for logging only in this method)
     * @return a map from domain name to its {@link FailureDomain}; an empty map when the
     *         domain list lookup reports a missing node
     * @throws RestException wrapping any other failure while reading the domain list
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/{cluster}/failureDomains")
    @ApiOperation(value = "Get the cluster failure domains.", response = FailureDomain.class, responseContainer = "Map", notes = "This operation requires Pulsar superuser privileges.")
    public Map<String, FailureDomain> getFailureDomains(@PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str) throws Exception {
        validateSuperUserAccess();
        Map<String, FailureDomain> domains = new HashMap<>();
        try {
            String rootPath = pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
            for (String domainName : failureDomainListCache().get()) {
                try {
                    // Record each domain that is actually present; previously the result was
                    // discarded, so the returned map was always empty.
                    failureDomainCache().get(joinPath(rootPath, domainName))
                            .ifPresent(domain -> domains.put(domainName, domain));
                } catch (Exception e) {
                    log.warn("Failed to get domain {}", domainName, e);
                }
            }
            return domains;
        } catch (KeeperException.NoNodeException e) {
            log.warn("[{}] Failure-domain is not configured for cluster {}", clientAppId(), str, e);
            return Collections.emptyMap();
        } catch (Exception e) {
            log.error("[{}] Failed to get failure-domains for cluster {}", clientAppId(), str, e);
            throw new RestException(e);
        }
    }

    /**
     * Returns the failure domain {@code str2} of cluster {@code str}.
     *
     * @param str  the cluster name
     * @param str2 the failure domain name
     * @return the {@link FailureDomain} found in {@code failureDomainCache()} for the path
     *         built from {@code CLUSTER_FAILURE_DOMAIN_ROOT} and {@code str2}
     * @throws RestException with {@code NOT_FOUND} when the cache has no value for that path,
     *                       or wrapping any other failure
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "FailureDomain doesn't exist"), @ApiResponse(code = 412, message = "Cluster doesn't exist"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/{cluster}/failureDomains/{domainName}")
    @ApiOperation(value = "Get a domain in a cluster", response = FailureDomain.class, notes = "This operation requires Pulsar superuser privileges.")
    public FailureDomain getDomain(@PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str, @PathParam("domainName") @ApiParam(value = "The failure domain name", required = true) String str2) throws Exception {
        validateSuperUserAccess();
        validateClusterExists(str);
        try {
            String domainPath = joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT, str2);
            return failureDomainCache().get(domainPath).orElseThrow(() ->
                    new RestException(Response.Status.NOT_FOUND, "Domain " + str2 + " for cluster " + str + " does not exist"));
        } catch (RestException e) {
            // Handled before the generic catch so the NOT_FOUND raised above is passed
            // through untouched instead of being logged and re-wrapped.
            throw e;
        } catch (Exception e) {
            log.error("[{}] Failed to get domain {} for cluster {}", clientAppId(), str2, str, e);
            throw new RestException(e);
        }
    }

    /**
     * Deletes the failure domain {@code str2} of cluster {@code str}.
     *
     * <p>The domain znode is deleted through {@code globalZk()} (version {@code -1}), then
     * its cache entry is invalidated and the domain-list cache is cleared.
     *
     * @param str  the cluster name
     * @param str2 the failure domain name
     * @throws RestException with {@code NOT_FOUND} when ZooKeeper reports that the node does
     *                       not exist, or wrapping any other failure
     */
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission or policy is read only"), @ApiResponse(code = 404, message = "FailureDomain doesn't exist"), @ApiResponse(code = 412, message = "Cluster doesn't exist"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/{cluster}/failureDomains/{domainName}")
    @DELETE
    @ApiOperation(value = "Delete the failure domain of the cluster", notes = "This operation requires Pulsar superuser privileges.")
    public void deleteFailureDomain(@PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str, @PathParam("domainName") @ApiParam(value = "The failure domain name", required = true) String str2) throws Exception {
        validateSuperUserAccess();
        validateClusterExists(str);
        try {
            String domainPath = joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT, str2);
            globalZk().delete(domainPath, -1);
            failureDomainCache().invalidate(domainPath);
            failureDomainListCache().clear();
        } catch (KeeperException.NoNodeException e) {
            // Must be caught before the generic handler, otherwise this branch is unreachable.
            log.warn("[{}] Domain {} does not exist in {}", clientAppId(), str2, str);
            throw new RestException(Response.Status.NOT_FOUND, "Domain-name " + str2 + " or cluster " + str + " does not exist");
        } catch (Exception e) {
            log.error("[{}] Failed to delete domain {} in cluster {}", clientAppId(), str2, str, e);
            throw new RestException(e);
        }
    }

    /**
     * Rejects a failure-domain definition whose brokers are already listed in another domain.
     *
     * <p>Every domain name from {@code failureDomainListCache()} other than {@code str2} is
     * loaded from {@code failureDomainCache()} and its brokers are compared with
     * {@code failureDomain.brokers}. A domain that cannot be loaded is logged and skipped.
     * Nothing is checked when {@code failureDomain} or its broker set is {@code null}, and a
     * missing-node error while listing domains is ignored (logged at debug level only).
     *
     * @param str           the cluster name (used for logging only in this method)
     * @param str2          the name of the domain being set; it is excluded from the comparison
     * @param failureDomain the domain definition to validate
     * @throws RestException with {@code CONFLICT} when at least one broker is already part of
     *                       another domain, or wrapping any other failure while listing domains
     */
    private void validateBrokerExistsInOtherDomain(String str, String str2, FailureDomain failureDomain) {
        if (failureDomain == null || failureDomain.brokers == null) {
            return;
        }
        try {
            String rootPath = pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
            for (String domainName : failureDomainListCache().get()) {
                if (str2.equals(domainName)) {
                    continue;
                }
                try {
                    Optional<FailureDomain> existing = failureDomainCache().get(joinPath(rootPath, domainName));
                    if (!existing.isPresent() || existing.get().brokers == null) {
                        continue;
                    }
                    List<String> duplicates = existing.get().brokers.stream()
                            .filter(failureDomain.brokers::contains)
                            .collect(Collectors.toList());
                    if (!duplicates.isEmpty()) {
                        throw new RestException(Response.Status.CONFLICT, duplicates + " already exists in " + domainName);
                    }
                } catch (RestException e) {
                    // The conflict raised above must reach the caller, not be treated as a failed lookup.
                    throw e;
                } catch (Exception e) {
                    log.warn("Failed to get domain {}", domainName, e);
                }
            }
        } catch (RestException e) {
            // Pass the conflict through as-is rather than logging it as a lookup error and re-wrapping it.
            throw e;
        } catch (KeeperException.NoNodeException e) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Domain is not configured for cluster", clientAppId(), e);
            }
        } catch (Exception e) {
            log.error("[{}] Failed to get domains for cluster {}", clientAppId(), str, e);
            throw new RestException(e);
        }
    }
}
