package org.apache.pulsar.broker.lookup;

import io.netty.buffer.ByteBuf;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import javax.ws.rs.Encoded;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/lookup/TopicLookupBase.class */
public class TopicLookupBase extends PulsarWebResource {
    private static final String LOOKUP_PATH_V1 = "/lookup/v2/destination/";
    private static final String LOOKUP_PATH_V2 = "/lookup/v2/topic/";
    private static final Logger log = LoggerFactory.getLogger(TopicLookupBase.class);

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Resolves the broker serving {@code topicName} for an HTTP lookup request.
     *
     * <p>A permit is taken from the broker's lookup-request semaphore before any work starts. If no
     * permit is available the call fails immediately with {@code 503 Service Unavailable}. Once a
     * permit is held it is returned exactly once, when the returned future completes, whatever the
     * outcome. This covers failures in the validation stages and the "Topic not found." case as well
     * as failures of the broker lookup itself.
     *
     * <p>Stages, in order: cluster ownership validation, global namespace ownership validation,
     * {@link TopicOperation#LOOKUP} authorization, topic existence (or auto-creation allowance) for
     * persistent or partitioned topics, then the broker lookup.
     *
     * @param topicName topic to look up
     * @param z value passed to {@code LookupOptions.authoritative(...)}
     * @param str advertised listener name passed to the lookup options; when non-null it is also
     *            appended as the {@code listenerName} query parameter of a redirect URL
     * @return a future holding the lookup data when this broker's lookup yields a non-redirect result;
     *         it fails with a {@link WebApplicationException} carrying a temporary redirect when the
     *         lookup result is a redirect, with a 503 when no broker is found, and with a
     *         {@link RestException} for "Topic not found." or an unusable redirect URL
     */
    public CompletableFuture<LookupData> internalLookupTopicAsync(TopicName topicName, boolean z, String str) {
        if (!pulsar().getBrokerService().getLookupRequestSemaphore().tryAcquire()) {
            log.warn("No broker was found available for topic {}", topicName);
            return FutureUtil.failedFuture(new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
        }
        return validateClusterOwnershipAsync(topicName.getCluster())
                .thenCompose(ignored -> validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject()))
                .thenCompose(ignored -> validateTopicOperationAsync(topicName, TopicOperation.LOOKUP, null))
                .thenCompose(ignored -> {
                    // Topics that are neither persistent nor partitioned skip the existence check.
                    if (!topicName.isPersistent() && !topicName.isPartitioned()) {
                        return CompletableFuture.completedFuture(true);
                    }
                    return pulsar().getNamespaceService().checkTopicExists(topicName).thenCompose(exists -> {
                        if (exists) {
                            return CompletableFuture.completedFuture(true);
                        }
                        // A missing topic is still acceptable when auto-creation is allowed.
                        return pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName);
                    });
                })
                .thenCompose(allowed -> {
                    if (!allowed) {
                        throw new RestException(Response.Status.NOT_FOUND, "Topic not found.");
                    }
                    LookupOptions options = LookupOptions.builder()
                            .advertisedListenerName(str)
                            .authoritative(z)
                            .loadTopicsInBundle(false)
                            .build();
                    return pulsar().getNamespaceService().getBrokerServiceUrlAsync(topicName, options).thenApply(optional -> {
                        if (optional == null || !optional.isPresent()) {
                            log.warn("No broker was found available for topic {}", topicName);
                            throw new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE);
                        }
                        LookupResult lookupResult = optional.get();
                        if (!lookupResult.isRedirect()) {
                            if (log.isDebugEnabled()) {
                                log.debug("Lookup succeeded for topic {} -- broker: {}", topicName, lookupResult.getLookupData());
                            }
                            return lookupResult.getLookupData();
                        }

                        // Redirect: signalled to the HTTP layer by failing with a 307 response.
                        boolean isAuthoritativeRedirect = lookupResult.isAuthoritativeRedirect();
                        String redirectBase = isRequestHttps()
                                ? lookupResult.getLookupData().getHttpUrlTls()
                                : lookupResult.getLookupData().getHttpUrl();
                        if (redirectBase == null) {
                            log.error("Redirected cluster's service url is not configured");
                            throw new RestException(Response.Status.PRECONDITION_FAILED, "Redirected cluster's service url is not configured.");
                        }
                        URI redirectUri;
                        try {
                            String redirectUrl = String.format("%s%s%s?authoritative=%s", redirectBase,
                                    topicName.isV2() ? LOOKUP_PATH_V2 : LOOKUP_PATH_V1, topicName.getLookupName(), isAuthoritativeRedirect);
                            redirectUri = new URI(str == null ? redirectUrl : redirectUrl + "&listenerName=" + str);
                        } catch (URISyntaxException e) {
                            log.error("Error in preparing redirect url for {}: {}", topicName, e.getMessage(), e);
                            throw new RestException(Response.Status.PRECONDITION_FAILED, e.getMessage());
                        }
                        if (log.isDebugEnabled()) {
                            log.debug("Redirect lookup for topic {} to {}", topicName, redirectUri);
                        }
                        throw new WebApplicationException(Response.temporaryRedirect(redirectUri).build());
                    });
                })
                // Single release point for the permit acquired above: runs on success and on any
                // failure of any stage, so a failed validation can no longer leak a permit.
                .whenComplete((result, failure) -> pulsar().getBrokerService().getLookupRequestSemaphore().release());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the bundle range of the namespace bundle that the namespace service reports for the
     * given topic.
     *
     * <p>The {@link NamespaceOperation#GET_BUNDLE} check on the topic's namespace runs first and is
     * not covered by the error translation below.
     *
     * @param topicName topic whose bundle is requested
     * @return the bundle range string of the resolved bundle
     * @throws RestException wrapping any exception raised while resolving the bundle
     */
    public String internalGetNamespaceBundle(TopicName topicName) {
        validateNamespaceOperation(topicName.getNamespaceObject(), NamespaceOperation.GET_BUNDLE);
        final String bundleRange;
        try {
            bundleRange = pulsar().getNamespaceService().getBundle(topicName).getBundleRange();
        } catch (Exception failure) {
            // The trailing throwable is logged with its stack trace by SLF4J.
            log.error("[{}] Failed to get namespace bundle for {}", clientAppId(), topicName, failure);
            throw new RestException(failure);
        }
        return bundleRange;
    }

    /**
     * Convenience overload of the seven-argument {@code lookupTopicAsync} that passes {@code null}
     * as the final (advertised listener name) argument.
     *
     * @return the future produced by the seven-argument overload
     */
    public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarService, TopicName topicName, boolean z, String str, AuthenticationDataSource authenticationDataSource, long j) {
        final String noAdvertisedListener = null;
        return lookupTopicAsync(pulsarService, topicName, z, str, authenticationDataSource, j, noAdvertisedListener);
    }

    /**
     * Performs a binary-protocol topic lookup and encodes the outcome as a lookup response buffer.
     *
     * <p>The work happens in two phases:
     * <ol>
     *   <li><b>Validation</b>: checks whether the topic's cluster is a different cluster, authorizes
     *       the client, and checks whether the namespace must be served by a peer replication
     *       cluster. This phase yields either a ready-made response (redirect or error) or
     *       {@code null}, meaning the lookup may proceed.</li>
     *   <li><b>Lookup</b>: only when validation yielded {@code null}, asks the namespace service for
     *       the broker URL and encodes a Connect or Redirect response, or a
     *       {@link ServerError#ServiceNotReady} error.</li>
     * </ol>
     *
     * <p>Apart from a failure while the lookup request itself is being issued, the returned future
     * is completed normally; errors are reported inside the response buffer.
     *
     * @param pulsarService broker service used for cluster data, authorization and namespace lookup
     * @param topicName topic being looked up
     * @param z value passed to {@code LookupOptions.authoritative(...)}
     * @param str client identifier passed to the cluster and authorization checks and used in log
     *            messages
     * @param authenticationDataSource authentication data passed to the authorization check
     * @param j request id echoed in every response built by this method
     * @param str2 advertised listener name passed to the lookup options
     * @return future holding the encoded lookup response
     */
    public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarService, TopicName topicName, boolean z, String str, AuthenticationDataSource authenticationDataSource, long j, String str2) {
        // Completed with a response to send as-is, or with null when the lookup should proceed.
        final CompletableFuture<ByteBuf> validationFuture = new CompletableFuture<>();
        final CompletableFuture<ByteBuf> lookupFuture = new CompletableFuture<>();
        final String cluster = topicName.getCluster();

        getClusterDataIfDifferentCluster(pulsarService, cluster, str).thenAccept(clusterData -> {
            if (clusterData != null) {
                // Cluster data was returned for a different cluster: redirect the client there.
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Redirecting the lookup call to {}/{} cluster={}", str, clusterData.getBrokerServiceUrl(), clusterData.getBrokerServiceUrlTls(), cluster);
                }
                validationFuture.complete(Commands.newLookupResponse(clusterData.getBrokerServiceUrl(), clusterData.getBrokerServiceUrlTls(), true, CommandLookupTopicResponse.LookupType.Redirect, j, false));
                return;
            }
            checkAuthorizationAsync(pulsarService, topicName, str, authenticationDataSource).thenRun(() -> {
                checkLocalOrGetPeerReplicationCluster(pulsarService, topicName.getNamespaceObject(), SystemTopicNames.isSystemTopic(topicName)).thenAccept(peerClusterData -> {
                    if (peerClusterData == null) {
                        // Every validation passed; null tells the next phase to run the lookup.
                        validationFuture.complete(null);
                    } else if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl()) && StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
                        validationFuture.complete(Commands.newLookupErrorResponse(ServerError.MetadataError, "Redirected cluster's brokerService url is not configured", j));
                    } else {
                        validationFuture.complete(Commands.newLookupResponse(peerClusterData.getBrokerServiceUrl(), peerClusterData.getBrokerServiceUrlTls(), true, CommandLookupTopicResponse.LookupType.Redirect, j, false));
                    }
                }).exceptionally(th -> {
                    Throwable cause = FutureUtil.unwrapCompletionException(th);
                    boolean notFound = (cause instanceof RestException)
                            && ((RestException) cause).getResponse().getStatus() == Response.Status.NOT_FOUND.getStatusCode();
                    ServerError errorCode = notFound ? ServerError.TopicNotFound : ServerError.MetadataError;
                    validationFuture.complete(Commands.newLookupErrorResponse(errorCode, cause.getMessage(), j));
                    return null;
                });
            }).exceptionally(th -> {
                Throwable cause = FutureUtil.unwrapCompletionException(th);
                if (cause instanceof RestException) {
                    log.warn("Failed to authorized {} on cluster {}", str, topicName);
                    validationFuture.complete(Commands.newLookupErrorResponse(ServerError.AuthorizationError, cause.getMessage(), j));
                    return null;
                }
                log.warn("Unknown error while authorizing {} on cluster {}", str, topicName);
                validationFuture.completeExceptionally(cause);
                return null;
            });
        }).exceptionally(th -> {
            validationFuture.completeExceptionally(FutureUtil.unwrapCompletionException(th));
            return null;
        });

        validationFuture.thenAccept(validationResponse -> {
            if (validationResponse != null) {
                lookupFuture.complete(validationResponse);
                return;
            }
            LookupOptions options = LookupOptions.builder()
                    .authoritative(z)
                    .advertisedListenerName(str2)
                    .loadTopicsInBundle(true)
                    .build();
            pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, options).thenAccept(optional -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Lookup result {}", topicName.toString(), optional);
                }
                if (!optional.isPresent()) {
                    lookupFuture.complete(Commands.newLookupErrorResponse(ServerError.ServiceNotReady, "No broker was available to own " + topicName, j));
                    return;
                }
                LookupResult lookupResult = optional.get();
                LookupData lookupData = lookupResult.getLookupData();
                if (lookupResult.isRedirect()) {
                    lookupFuture.complete(Commands.newLookupResponse(lookupData.getBrokerUrl(), lookupData.getBrokerUrlTls(), lookupResult.isAuthoritativeRedirect(), CommandLookupTopicResponse.LookupType.Redirect, j, false));
                } else {
                    lookupFuture.complete(Commands.newLookupResponse(lookupData.getBrokerUrl(), lookupData.getBrokerUrlTls(), true, CommandLookupTopicResponse.LookupType.Connect, j, shouldRedirectThroughServiceUrl(pulsarService.getConfiguration(), lookupData)));
                }
            }).exceptionally(th -> {
                completeWithLookupError(lookupFuture, topicName, str, j, th);
                return null;
            });
        }).exceptionally(th -> {
            // Reached when validation failed exceptionally or when starting the lookup threw.
            completeWithLookupError(lookupFuture, topicName, str, j, th);
            return null;
        });
        return lookupFuture;
    }

    /**
     * Logs a lookup failure and completes {@code lookupFuture} with a
     * {@link ServerError#ServiceNotReady} error response carrying the failure's message.
     *
     * <p>An {@link IllegalStateException} wrapped in a {@link CompletionException} is logged at INFO
     * with the cause's message only; anything else is logged at WARN with the stack trace.
     */
    private static void completeWithLookupError(CompletableFuture<ByteBuf> lookupFuture, TopicName topicName, String clientAppId, long requestId, Throwable error) {
        if ((error instanceof CompletionException) && (error.getCause() instanceof IllegalStateException)) {
            log.info("Failed to lookup {} for topic {} with error {}", clientAppId, topicName.toString(), error.getCause().getMessage());
        } else {
            log.warn("Failed to lookup {} for topic {} with error {}", clientAppId, topicName.toString(), error.getMessage(), error);
        }
        lookupFuture.complete(Commands.newLookupErrorResponse(ServerError.ServiceNotReady, error.getMessage(), requestId));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds a {@link TopicName} from five path components.
     *
     * <p>{@code str} is mapped through {@link TopicDomain#getEnum(String)} to its value, {@code str5}
     * is decoded with {@link Codec#decode(String)}, and {@code str2}, {@code str3}, {@code str4} are
     * passed through unchanged in that order.
     *
     * @return the topic name produced by {@code TopicName.get(...)}
     */
    public TopicName getTopicName(String str, String str2, String str3, String str4, @Encoded String str5) {
        final String domainValue = TopicDomain.getEnum(str).value();
        final String decodedTopic = Codec.decode(str5);
        return TopicName.get(domainValue, str2, str3, str4, decodedTopic);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds a {@link TopicName} from four path components.
     *
     * <p>{@code str} is mapped through {@link TopicDomain#getEnum(String)} to its value, {@code str4}
     * is decoded with {@link Codec#decode(String)}, and {@code str2}, {@code str3} are passed through
     * unchanged in that order.
     *
     * @return the topic name produced by {@code TopicName.get(...)}
     */
    public TopicName getTopicName(String str, String str2, String str3, @Encoded String str4) {
        final String domainValue = TopicDomain.getEnum(str).value();
        final String decodedTopic = Codec.decode(str4);
        return TopicName.get(domainValue, str2, str3, decodedTopic);
    }

    /**
     * Decides the last flag of a Connect lookup response: {@code true} only when the broker runs
     * standalone and its advertised broker URL points at a loopback address.
     *
     * <p>The plain broker URL is used when present; the TLS URL is consulted only when the plain URL
     * is empty. When both are empty, or the address cannot be resolved, the result is {@code false}.
     *
     * @param serviceConfiguration broker configuration, read for the standalone flag
     * @param lookupData lookup data carrying the advertised broker URLs
     * @return whether the advertised address is a loopback address of a standalone broker
     */
    private static boolean shouldRedirectThroughServiceUrl(ServiceConfiguration serviceConfiguration, LookupData lookupData) {
        if (!serviceConfiguration.isRunningStandalone()) {
            return false;
        }
        String advertisedUrl = lookupData.getBrokerUrl();
        if (StringUtils.isEmpty(advertisedUrl)) {
            advertisedUrl = lookupData.getBrokerUrlTls();
        }
        if (StringUtils.isEmpty(advertisedUrl)) {
            return false;
        }
        return isLoopbackAdvertisedUrl(advertisedUrl);
    }

    /**
     * Returns whether the host of {@code advertisedUrl} resolves to a loopback address; any failure
     * to parse or resolve the URL is logged and reported as {@code false}.
     */
    private static boolean isLoopbackAdvertisedUrl(String advertisedUrl) {
        try {
            String host = URI.create(advertisedUrl).getHost();
            if (StringUtils.isEmpty(host)) {
                // InetAddress.getByName(null or "") returns the loopback address, which would
                // misreport a URL with no parseable host as loopback.
                log.info("Failed to resolve advertised address {}: {}", advertisedUrl, "no host component in URL");
                return false;
            }
            return InetAddress.getByName(host).isLoopbackAddress();
        } catch (Exception e) {
            log.info("Failed to resolve advertised address {}: {}", advertisedUrl, e.getMessage());
            return false;
        }
    }
}
