package org.apache.pulsar.broker.admin.impl;

import com.google.common.collect.Maps;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.BrokerInfo;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ThreadDumpUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/admin/impl/BrokersBase.class */
public class BrokersBase extends AdminResource {
    /** Last path segment of the topic used by the health check (see internalRunHealthCheck). */
    public static final String HEALTH_CHECK_TOPIC_SUFFIX = "healthcheck";
    // Minimum gap, in milliseconds (compared against System.currentTimeMillis()), between two
    // full thread dumps logged by checkDeadlockedThreads; 600000 ms = 10 minutes.
    private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 600000;
    // Wall-clock time (ms) at which checkDeadlockedThreads last logged a full thread dump.
    // NOTE(review): this is an instance field; if the resource is instantiated per request the
    // throttle would never take effect - confirm the resource lifecycle.
    private volatile long threadDumpLoggedTimestamp;
    private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class);
    // Upper bound passed to FutureUtil.addTimeoutHandling while waiting to read back the health-check message.
    private static final Duration HEALTH_CHECK_READ_TIMEOUT = Duration.ofSeconds(58);
    // Single shared exception instance supplied when the health-check read times out.
    private static final TimeoutException HEALTH_CHECK_TIMEOUT_EXCEPTION = FutureUtil.createTimeoutException("Timeout", BrokersBase.class, "healthCheckRecursiveReadNext(...)");

    /**
     * Resumes the suspended response with the brokers that the load manager reports as available.
     *
     * <p>Super-user access and cluster ownership are validated first. Any failure is forwarded
     * through {@code resumeAsyncResponseExceptionally}; it is logged unless
     * {@code isRedirectException} classifies it as a redirect.
     *
     * @param asyncResponse suspended JAX-RS response to resume
     * @param str cluster name taken from the request path
     */
    @GET
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve this cluster"), @ApiResponse(code = 401, message = "Authentication required"), @ApiResponse(code = 403, message = "This operation requires super-user access"), @ApiResponse(code = 404, message = "Cluster does not exist: cluster={clustername}")})
    @Path("/{cluster}")
    @ApiOperation(value = "Get the list of active brokers (broker ids) in the cluster.If authorization is not enabled, any cluster name is valid.", response = String.class, responseContainer = "Set")
    public void getActiveBrokers(@Suspended AsyncResponse asyncResponse, @PathParam("cluster") String str) {
        validateSuperUserAccessAsync()
                .thenCompose(ignored -> validateClusterOwnershipAsync(str))
                .thenCompose(ignored -> pulsar().getLoadManager().get().getAvailableBrokersAsync())
                .thenAccept(activeBrokers -> {
                    LOG.info("[{}] Successfully to get active brokers, cluster={}", clientAppId(), str);
                    asyncResponse.resume(activeBrokers);
                })
                .exceptionally(error -> {
                    // Redirects are part of normal request routing, so they are not logged as errors.
                    if (!isRedirectException(error)) {
                        LOG.error("[{}] Fail to get active brokers, cluster={}", clientAppId(), str, error);
                    }
                    resumeAsyncResponseExceptionally(asyncResponse, error);
                    return null;
                });
    }

    /**
     * Path-less variant of the active-broker listing: delegates to
     * {@link #getActiveBrokers(AsyncResponse, String)} with a {@code null} cluster name.
     *
     * @param asyncResponse suspended JAX-RS response to resume
     */
    @GET
    @ApiResponses({@ApiResponse(code = 401, message = "Authentication required"), @ApiResponse(code = 403, message = "This operation requires super-user access")})
    @ApiOperation(value = "Get the list of active brokers (broker ids) in the local cluster.If authorization is not enabled", response = String.class, responseContainer = "Set")
    public void getActiveBrokers(@Suspended AsyncResponse asyncResponse) throws Exception {
        getActiveBrokers(asyncResponse, null);
    }

    /**
     * Resumes the suspended response with a {@link BrokerInfo} describing the current leader.
     *
     * <p>After super-user validation the current leader is looked up through the leader election
     * service; when none is present a {@code NOT_FOUND} {@link RestException} is raised. Every
     * failure is logged and forwarded through {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended JAX-RS response to resume
     */
    @GET
    @ApiResponses({@ApiResponse(code = 401, message = "Authentication required"), @ApiResponse(code = 403, message = "This operation requires super-user access"), @ApiResponse(code = 404, message = "Leader broker not found")})
    @Path("/leaderBroker")
    @ApiOperation(value = "Get the information of the leader broker.", response = BrokerInfo.class)
    public void getLeaderBroker(@Suspended AsyncResponse asyncResponse) {
        validateSuperUserAccessAsync()
                .thenAccept(ignored -> {
                    LeaderBroker leader = pulsar().getLeaderElectionService().getCurrentLeader()
                            .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Couldn't find leader broker"));
                    BrokerInfo leaderInfo = BrokerInfo.builder()
                            .serviceUrl(leader.getServiceUrl())
                            .brokerId(leader.getBrokerId())
                            .build();
                    LOG.info("[{}] Successfully to get the information of the leader broker.", clientAppId());
                    asyncResponse.resume(leaderInfo);
                })
                .exceptionally(error -> {
                    LOG.error("[{}] Failed to get the information of the leader broker.", clientAppId(), error);
                    resumeAsyncResponseExceptionally(asyncResponse, error);
                    return null;
                });
    }

    /**
     * Resumes the suspended response with the namespace ownership status reported by the
     * namespace service.
     *
     * <p>The chain validates super-user access, gives {@code maybeRedirectToBroker} the chance to
     * redirect to the requested broker, validates cluster ownership and finally fetches the
     * ownership status. Failures are forwarded through {@code resumeAsyncResponseExceptionally};
     * they are logged (with their cause) unless they are redirects.
     *
     * @param asyncResponse suspended JAX-RS response to resume
     * @param str cluster name taken from the request path
     * @param str2 broker id taken from the request path
     */
    @GET
    @ApiResponses({@ApiResponse(code = 307, message = "Current broker doesn't serve the cluster"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Cluster doesn't exist")})
    @Path("/{clusterName}/{brokerId}/ownedNamespaces")
    @ApiOperation(value = "Get the list of namespaces served by the specific broker id", response = NamespaceOwnershipStatus.class, responseContainer = "Map")
    public void getOwnedNamespaces(@Suspended AsyncResponse asyncResponse, @PathParam("clusterName") String str, @PathParam("brokerId") String str2) {
        validateSuperUserAccessAsync()
                .thenCompose(ignored -> maybeRedirectToBroker(str2))
                .thenCompose(ignored -> validateClusterOwnershipAsync(str))
                .thenCompose(ignored -> pulsar().getNamespaceService().getOwnedNameSpacesStatusAsync())
                .thenAccept(asyncResponse::resume)
                .exceptionally(error -> {
                    // Redirects are part of normal request routing, so they are not logged as errors.
                    if (!isRedirectException(error)) {
                        LOG.error("[{}] Failed to get the namespace ownership status. cluster={}, broker={}", clientAppId(), str, str2, error);
                    }
                    resumeAsyncResponseExceptionally(asyncResponse, error);
                    return null;
                });
    }

    /**
     * Persists a new value for a dynamic configuration entry and resumes the response with an
     * OK response once {@code persistDynamicConfigurationAsync} completes.
     *
     * <p>Failures (including validation failures reported by the persistence step) are logged
     * and forwarded through {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended JAX-RS response to resume
     * @param str configuration name taken from the request path
     * @param str2 configuration value taken from the request path
     */
    @ApiResponses({@ApiResponse(code = 204, message = "Service configuration updated successfully"), @ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"), @ApiResponse(code = 404, message = "Configuration not found"), @ApiResponse(code = 412, message = "Invalid dynamic-config value"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/configuration/{configName}/{configValue}")
    @ApiOperation("Update dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
    @POST
    public void updateDynamicConfiguration(@Suspended AsyncResponse asyncResponse, @PathParam("configName") String str, @PathParam("configValue") String str2) {
        validateSuperUserAccessAsync()
                .thenCompose(ignored -> persistDynamicConfigurationAsync(str, str2))
                .thenAccept(ignored -> {
                    LOG.info("[{}] Updated Service configuration {}/{}", clientAppId(), str, str2);
                    asyncResponse.resume(Response.ok().build());
                })
                .exceptionally(error -> {
                    LOG.error("[{}] Failed to update configuration {}/{}", clientAppId(), str, str2, error);
                    resumeAsyncResponseExceptionally(asyncResponse, error);
                    return null;
                });
    }

    /**
     * Removes a dynamic configuration entry through
     * {@code internalDeleteDynamicConfigurationOnMetadataAsync} and resumes the response with an
     * OK response on success.
     *
     * <p>Failures are logged and forwarded through {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended JAX-RS response to resume
     * @param str configuration name taken from the request path
     */
    @ApiResponses({@ApiResponse(code = 204, message = "Service configuration updated successfully"), @ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"), @ApiResponse(code = 412, message = "Invalid dynamic-config value"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/configuration/{configName}")
    @DELETE
    @ApiOperation("Delete dynamic ServiceConfiguration into metadata only. This operation requires Pulsar super-user privileges.")
    public void deleteDynamicConfiguration(@Suspended AsyncResponse asyncResponse, @PathParam("configName") String str) {
        validateSuperUserAccessAsync()
                .thenCompose(ignored -> internalDeleteDynamicConfigurationOnMetadataAsync(str))
                .thenAccept(ignored -> {
                    LOG.info("[{}] Successfully to delete dynamic configuration {}", clientAppId(), str);
                    asyncResponse.resume(Response.ok().build());
                })
                .exceptionally(error -> {
                    LOG.error("[{}] Failed to delete dynamic configuration {}", clientAppId(), str, error);
                    resumeAsyncResponseExceptionally(asyncResponse, error);
                    return null;
                });
    }

    /**
     * Resumes the suspended response with the stored dynamic configuration, or with an empty map
     * when the dynamic configuration resources return an empty {@code Optional}.
     *
     * <p>Failures are logged and forwarded through {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended JAX-RS response to resume
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "You don't have admin permission to view configuration"), @ApiResponse(code = 404, message = "Configuration not found"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/configuration/values")
    @ApiOperation("Get value of all dynamic configurations' value overridden on local config")
    public void getAllDynamicConfigurations(@Suspended AsyncResponse asyncResponse) {
        validateSuperUserAccessAsync()
                .thenCompose(ignored -> dynamicConfigurationResources().getDynamicConfigurationAsync())
                .thenAccept(stored -> asyncResponse.resume(stored.orElseGet(Collections::emptyMap)))
                .exceptionally(error -> {
                    LOG.error("[{}] Failed to get all dynamic configuration.", clientAppId(), error);
                    resumeAsyncResponseExceptionally(asyncResponse, error);
                    return null;
                });
    }

    /**
     * Resumes the suspended response with whatever the broker service returns from
     * {@code getDynamicConfiguration()}, after super-user validation.
     *
     * <p>Failures are logged and forwarded through {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended JAX-RS response to resume
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "You don't have admin permission to get configuration")})
    @Path("/configuration")
    @ApiOperation("Get all updatable dynamic configurations's name")
    public void getDynamicConfigurationName(@Suspended AsyncResponse asyncResponse) {
        validateSuperUserAccessAsync()
                .thenAccept(ignored -> asyncResponse.resume(pulsar().getBrokerService().getDynamicConfiguration()))
                .exceptionally(error -> {
                    LOG.error("[{}] Failed to get all dynamic configuration names.", clientAppId(), error);
                    resumeAsyncResponseExceptionally(asyncResponse, error);
                    return null;
                });
    }

    /**
     * Resumes the suspended response with whatever the broker service returns from
     * {@code getRuntimeConfiguration()}, after super-user validation.
     *
     * <p>Failures are logged and forwarded through {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended JAX-RS response to resume
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission")})
    @Path("/configuration/runtime")
    @ApiOperation("Get all runtime configurations. This operation requires Pulsar super-user privileges.")
    public void getRuntimeConfiguration(@Suspended AsyncResponse asyncResponse) {
        validateSuperUserAccessAsync()
                .thenAccept(ignored -> asyncResponse.resume(pulsar().getBrokerService().getRuntimeConfiguration()))
                .exceptionally(error -> {
                    LOG.error("[{}] Failed to get runtime configuration.", clientAppId(), error);
                    resumeAsyncResponseExceptionally(asyncResponse, error);
                    return null;
                });
    }

    /**
     * Stores {@code str = str2} in the dynamic configuration, creating the stored map if absent.
     *
     * <p>The value is validated first, then the name is checked to be a dynamic configuration;
     * either check failing yields a future failed with a {@code PRECONDITION_FAILED}
     * {@link RestException}.
     *
     * @param str configuration name
     * @param str2 configuration value
     * @return future completing when the update has been handed to the configuration resources
     */
    private synchronized CompletableFuture<Void> persistDynamicConfigurationAsync(String str, String str2) {
        if (!pulsar().getBrokerService().validateDynamicConfiguration(str, str2)) {
            return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, " Invalid dynamic-config value"));
        }
        if (!pulsar().getBrokerService().isDynamicConfiguration(str)) {
            return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, "Can't update non-dynamic configuration"));
        }
        return dynamicConfigurationResources().setDynamicConfigurationWithCreateAsync(existing -> {
            Map<String, String> updated = existing.orElseGet(Maps::newHashMap);
            updated.put(str, str2);
            return updated;
        });
    }

    /**
     * Resumes the suspended response with {@code pulsar().getInternalConfigurationData()},
     * after super-user validation.
     *
     * <p>Failures are logged and forwarded through {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended JAX-RS response to resume
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission")})
    @Path("/internal-configuration")
    @ApiOperation(value = "Get the internal configuration data", response = InternalConfigurationData.class)
    public void getInternalConfigurationData(@Suspended AsyncResponse asyncResponse) {
        validateSuperUserAccessAsync()
                .thenAccept(ignored -> asyncResponse.resume(pulsar().getInternalConfigurationData()))
                .exceptionally(error -> {
                    LOG.error("[{}] Failed to get internal configuration data.", clientAppId(), error);
                    resumeAsyncResponseExceptionally(asyncResponse, error);
                    return null;
                });
    }

    /**
     * Triggers {@code monitorBacklogQuota()} on the broker service and resumes the response with
     * a no-content response.
     *
     * <p>The check runs on the executor returned by {@code getBacklogQuotaChecker()} once
     * super-user access has been validated. Failures are logged and forwarded through
     * {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended JAX-RS response to resume
     */
    @GET
    @ApiResponses({@ApiResponse(code = 200, message = "Everything is OK"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/backlog-quota-check")
    @ApiOperation("An REST endpoint to trigger backlogQuotaCheck")
    public void backlogQuotaCheck(@Suspended AsyncResponse asyncResponse) {
        // Keep the original evaluation order: start the access check before resolving the executor.
        CompletableFuture<Void> accessCheck = validateSuperUserAccessAsync();
        Executor quotaExecutor = pulsar().getBrokerService().getBacklogQuotaChecker();
        accessCheck
                .thenAcceptAsync(ignored -> {
                    pulsar().getBrokerService().monitorBacklogQuota();
                    asyncResponse.resume(Response.noContent().build());
                }, quotaExecutor)
                .exceptionally(error -> {
                    LOG.error("[{}] Failed to trigger backlog quota check.", clientAppId(), error);
                    resumeAsyncResponseExceptionally(asyncResponse, error);
                    return null;
                });
    }

    /**
     * Reports readiness: resumes with an OK response carrying {@code "ok"} when the service state
     * is {@code Started}, otherwise with a server-error response.
     *
     * @param asyncResponse suspended JAX-RS response to resume
     */
    @GET
    @ApiResponses({@ApiResponse(code = 200, message = "Broker is ready"), @ApiResponse(code = 500, message = "Broker is not ready")})
    @Path("/ready")
    @ApiOperation("Check if the broker is fully initialized")
    public void isReady(@Suspended AsyncResponse asyncResponse) {
        boolean started = pulsar().getState() == PulsarService.State.Started;
        Response readiness = started ? Response.ok("ok").build() : Response.serverError().build();
        asyncResponse.resume(readiness);
    }

    /**
     * Runs the broker health check and resumes the response with {@code "ok"} on success.
     *
     * <p>After super-user validation the check first looks for deadlocked threads
     * ({@code checkDeadlockedThreads}) and then runs {@code internalRunHealthCheck}. Failures
     * are logged and forwarded through {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended JAX-RS response to resume
     * @param topicVersion value of the {@code topicVersion} query parameter, passed to
     *                     {@code internalRunHealthCheck}
     */
    @GET
    @ApiResponses({@ApiResponse(code = 200, message = "Everything is OK"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Cluster doesn't exist"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/health")
    @ApiOperation("Run a healthCheck against the broker")
    @ApiParam("Topic Version")
    public void healthCheck(@Suspended AsyncResponse asyncResponse, @QueryParam("topicVersion") TopicVersion topicVersion) {
        validateSuperUserAccessAsync()
                .thenAccept(ignored -> checkDeadlockedThreads())
                .thenCompose(ignored -> internalRunHealthCheck(topicVersion))
                .thenAccept(ignored -> {
                    LOG.info("[{}] Successfully run health check.", clientAppId());
                    asyncResponse.resume("ok");
                })
                .exceptionally(error -> {
                    LOG.error("[{}] Fail to run health check.", clientAppId(), error);
                    resumeAsyncResponseExceptionally(asyncResponse, error);
                    return null;
                });
    }

    /**
     * Fails the health check when the JVM reports deadlocked threads.
     *
     * <p>When deadlocked threads are found, their names and ids are logged; a full thread
     * diagnostic is included only if more than
     * {@code LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED} milliseconds have passed since the
     * last one was logged.
     *
     * @throws IllegalStateException if at least one deadlocked thread is reported
     */
    private void checkDeadlockedThreads() {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        long[] deadlockedIds = threadBean.findDeadlockedThreads();
        if (deadlockedIds == null || deadlockedIds.length == 0) {
            return;
        }
        String deadlockedThreads = Arrays.stream(threadBean.getThreadInfo(deadlockedIds, false, false))
                // getThreadInfo leaves a null slot for any thread that is no longer alive.
                .filter(Objects::nonNull)
                .map(info -> info.getThreadName() + "(tid=" + info.getThreadId() + ")")
                .collect(Collectors.joining(", "));
        // Read the clock once so the comparison and the stored timestamp agree.
        long now = System.currentTimeMillis();
        if (now - this.threadDumpLoggedTimestamp > LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED) {
            this.threadDumpLoggedTimestamp = now;
            LOG.error("Deadlocked threads detected. {}\n{}", deadlockedThreads, ThreadDumpUtil.buildThreadDiagnosticString());
        } else {
            LOG.error("Deadlocked threads detected. {}", deadlockedThreads);
        }
        throw new IllegalStateException("Deadlocked threads detected. " + deadlockedThreads);
    }

    /**
     * Publishes a random message to the broker's health-check topic and waits to read it back.
     *
     * <p>Steps: resolve the heartbeat namespace for this broker (V2 or legacy, depending on
     * {@code topicVersion}), get or create the health-check topic, create a producer and a reader
     * positioned at the latest message, send a fresh UUID and read until that UUID is seen or
     * {@code HEALTH_CHECK_READ_TIMEOUT} elapses. Producer and reader are always closed through
     * {@code closeAndReCheck} before the returned future completes.
     *
     * @param topicVersion selects the heartbeat namespace flavour; {@code TopicVersion.V2} uses
     *                     the V2 namespace, anything else (including {@code null}) the legacy one
     * @return future that completes normally when the message was read back, exceptionally otherwise
     */
    private CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion) {
        String brokerId = pulsar().getBrokerId();
        String topicName = String.format("persistent://%s/%s",
                topicVersion == TopicVersion.V2
                        ? NamespaceService.getHeartbeatNamespaceV2(brokerId, pulsar().getConfiguration())
                        : NamespaceService.getHeartbeatNamespace(brokerId, pulsar().getConfiguration()),
                HEALTH_CHECK_TOPIC_SUFFIX);
        LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), topicName);
        String messageText = UUID.randomUUID().toString();
        String subscriptionName = "healthCheck-" + messageText;
        return pulsar().getBrokerService().getTopic(topicName, true).thenCompose(topicOptional -> {
            if (!topicOptional.isPresent()) {
                LOG.error("[{}] Fail to run health check while get topic {}. because get null value.", clientAppId(), topicName);
                throw new RestException(Response.Status.NOT_FOUND, String.format("Topic [%s] not found after create.", topicName));
            }
            final PulsarClient client;
            try {
                client = pulsar().getClient();
            } catch (PulsarServerException e) {
                LOG.error("[{}] Fail to run health check while get client.", clientAppId());
                throw new RestException((Throwable) e);
            }
            CompletableFuture<Void> resultFuture = new CompletableFuture<>();
            client.newProducer(Schema.STRING).topic(topicName).createAsync()
                    .thenCompose(producer -> client.newReader(Schema.STRING)
                            .topic(topicName)
                            .subscriptionName(subscriptionName)
                            .startMessageId(MessageId.latest)
                            .createAsync()
                            .exceptionally(createError -> {
                                // The reader could not be created: release the producer (best effort)
                                // and propagate the original creation failure.
                                producer.closeAsync().exceptionally(closeError -> {
                                    LOG.error("[{}] Close producer fail while heath check.", clientAppId(), closeError);
                                    return null;
                                });
                                throw FutureUtil.wrapToCompletionException(createError);
                            })
                            .thenCompose(reader -> producer.sendAsync(messageText)
                                    .thenCompose(messageId -> FutureUtil.addTimeoutHandling(
                                            healthCheckRecursiveReadNext(reader, messageText),
                                            HEALTH_CHECK_READ_TIMEOUT,
                                            pulsar().getBrokerService().executor(),
                                            () -> HEALTH_CHECK_TIMEOUT_EXCEPTION))
                                    .whenComplete((ignored, checkError) -> {
                                        // Always clean up, then report the outcome of the check itself.
                                        // closeAndReCheck handles its own failures, so the cleanup
                                        // result must not decide the health-check result.
                                        closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName)
                                                .whenComplete((unused, cleanupError) -> {
                                                    if (checkError != null) {
                                                        resultFuture.completeExceptionally(checkError);
                                                    } else {
                                                        resultFuture.complete(null);
                                                    }
                                                });
                                    })))
                    .exceptionally(error -> {
                        // Covers producer/reader creation and send failures that never reach whenComplete.
                        resultFuture.completeExceptionally(error);
                        return null;
                    });
            return resultFuture;
        });
    }

    /**
     * Closes the health-check producer and reader and cleans up a leftover subscription.
     *
     * <p>Both close operations are started unconditionally. If waiting for them fails and the
     * reader close is the one that completed exceptionally, the subscription named {@code str} is
     * looked up on {@code topic} and, when it still exists, force-deleted (failures of that
     * deletion are only logged). Otherwise the failure is attributed to the producer close and
     * only logged. The returned future therefore never completes exceptionally because of a
     * close failure.
     *
     * @param producer producer to close
     * @param reader reader to close
     * @param topic topic on which the reader's subscription is re-checked
     * @param str name of the reader's subscription
     * @return future that completes once both close attempts have been handled
     */
    private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader, Topic topic, String str) {
        CompletableFuture<Void> producerClose = producer.closeAsync();
        CompletableFuture<Void> readerClose = reader.closeAsync();
        return FutureUtil.waitForAll(Collections.unmodifiableList(Arrays.asList(producerClose, readerClose)))
                .exceptionally(closeError -> {
                    if (!readerClose.isCompletedExceptionally()) {
                        LOG.error("[{}] Close producer fail while heath check.", clientAppId(), closeError);
                        return null;
                    }
                    LOG.error("[{}] Close reader fail while heath check.", clientAppId(), closeError);
                    // Re-check the subscription: a failed reader close may have left it behind.
                    Subscription subscription = topic.getSubscription(str);
                    if (subscription == null) {
                        return null;
                    }
                    LOG.warn("[{}] Force delete subscription {} when it still exists after the reader is closed.", clientAppId(), subscription);
                    subscription.deleteForcefully().exceptionally(deleteError -> {
                        LOG.error("[{}] Force delete subscription fail while health check", clientAppId(), deleteError);
                        return null;
                    });
                    return null;
                });
    }

    /**
     * Reads messages from {@code reader} one at a time until one whose value equals {@code str}
     * is received.
     *
     * @param reader reader to consume from
     * @param str expected message value
     * @return future completing (with {@code null}) once the expected value has been read
     */
    private CompletableFuture<Void> healthCheckRecursiveReadNext(Reader<String> reader, String str) {
        return reader.readNextAsync().thenCompose(received -> {
            if (Objects.equals(str, received.getValue())) {
                return CompletableFuture.completedFuture(null);
            }
            // Not our message: keep reading.
            return healthCheckRecursiveReadNext(reader, str);
        });
    }

    /**
     * Removes {@code str} from the stored dynamic configuration map.
     *
     * <p>When {@code str} is not a dynamic configuration the returned future is failed with a
     * {@code PRECONDITION_FAILED} {@link RestException} rather than the exception being thrown
     * synchronously, matching {@code persistDynamicConfigurationAsync}.
     *
     * @param str configuration name to remove
     * @return future completing when the update has been handed to the configuration resources
     */
    private CompletableFuture<Void> internalDeleteDynamicConfigurationOnMetadataAsync(String str) {
        if (!pulsar().getBrokerService().isDynamicConfiguration(str)) {
            return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration"));
        }
        return dynamicConfigurationResources().setDynamicConfigurationAsync(stored -> {
            // The stored map may be null; in that case there is nothing to remove.
            if (stored != null) {
                stored.remove(str);
            }
            return stored;
        });
    }

    /**
     * Returns the broker version string.
     *
     * @return the value of {@code PulsarVersion.getVersion()}
     */
    @GET
    @ApiResponses({@ApiResponse(code = 200, message = "Everything is OK"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/version")
    @ApiOperation("Get version of current broker")
    public String version() throws Exception {
        return PulsarVersion.getVersion();
    }

    /**
     * Shuts the broker down and resumes the response with a no-content response once
     * {@code doShutDownBrokerGracefullyAsync} completes.
     *
     * <p>Super-user access is validated synchronously before the shutdown starts. Failures of the
     * shutdown are logged and forwarded through {@code resumeAsyncResponseExceptionally}.
     *
     * @param i value of the {@code maxConcurrentUnloadPerSec} query parameter
     * @param z value of the {@code forcedTerminateTopic} query parameter (defaults to {@code true})
     * @param asyncResponse suspended JAX-RS response to resume
     */
    @ApiResponses({@ApiResponse(code = 204, message = "Execute shutdown command successfully"), @ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/shutdown")
    @ApiOperation("Shutdown broker gracefully.")
    @POST
    public void shutDownBrokerGracefully(@QueryParam("maxConcurrentUnloadPerSec") @ApiParam(name = "maxConcurrentUnloadPerSec", value = "if the value absent(value=0) means no concurrent limitation.") int i, @QueryParam("forcedTerminateTopic") @DefaultValue("true") boolean z, @Suspended AsyncResponse asyncResponse) {
        validateSuperUserAccess();
        CompletableFuture<Void> shutdown = doShutDownBrokerGracefullyAsync(i, z);
        shutdown
                .thenAccept(ignored -> {
                    LOG.info("[{}] Successfully shutdown broker gracefully", clientAppId());
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(error -> {
                    LOG.error("[{}] Failed to shutdown broker gracefully", clientAppId(), error);
                    resumeAsyncResponseExceptionally(asyncResponse, error);
                    return null;
                });
    }

    /**
     * Unloads namespace bundles through the broker service, then starts closing the Pulsar service.
     *
     * @param i forwarded as the first argument of {@code unloadNamespaceBundlesGracefully}
     * @param z forwarded as the second argument of {@code unloadNamespaceBundlesGracefully}
     * @return the future returned by {@code pulsar().closeAsync()}
     */
    private CompletableFuture<Void> doShutDownBrokerGracefullyAsync(int i, boolean z) {
        // The unload call returns before closeAsync() is invoked; statement order matters here.
        pulsar().getBrokerService().unloadNamespaceBundlesGracefully(i, z);
        return pulsar().closeAsync();
    }
}
