/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.lookup;

import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import javax.ws.rs.Encoded;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicLookupBase
extends PulsarWebResource {
    private static final String LOOKUP_PATH_V1 = "/lookup/v2/destination/";
    private static final String LOOKUP_PATH_V2 = "/lookup/v2/topic/";
    private static final Logger log = LoggerFactory.getLogger(TopicLookupBase.class);

    protected void internalLookupTopicAsync(TopicName topicName, boolean authoritative, AsyncResponse asyncResponse, String listenerName) {
        if (!this.pulsar().getBrokerService().getLookupRequestSemaphore().tryAcquire()) {
            log.warn("No broker was found available for topic {}", (Object)topicName);
            asyncResponse.resume((Throwable)new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
            return;
        }
        try {
            this.validateClusterOwnership(topicName.getCluster());
            this.validateAdminAndClientPermission(topicName);
            this.validateGlobalNamespaceOwnership(topicName.getNamespaceObject());
        }
        catch (WebApplicationException we) {
            log.error("Validation check failed: {}", (Object)we.getMessage());
            this.completeLookupResponseExceptionally(asyncResponse, we);
            return;
        }
        catch (Throwable t) {
            log.error("Validation check failed: {}", (Object)t.getMessage(), (Object)t);
            this.completeLookupResponseExceptionally(asyncResponse, (Throwable)((Object)new RestException(t)));
            return;
        }
        CompletionStage existFuture = this.pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName).thenCompose(isAllowAutoTopicCreation -> isAllowAutoTopicCreation != false || !topicName.isPersistent() && !topicName.isPartitioned() ? CompletableFuture.completedFuture(true) : this.pulsar().getNamespaceService().checkTopicExists(topicName));
        ((CompletableFuture)((CompletableFuture)existFuture).thenAccept(exist -> {
            if (!exist.booleanValue()) {
                this.completeLookupResponseExceptionally(asyncResponse, (Throwable)((Object)new RestException(Response.Status.NOT_FOUND, String.format("Topic not found %s", topicName.toString()))));
                return;
            }
            CompletableFuture<Optional<LookupResult>> lookupFuture = this.pulsar().getNamespaceService().getBrokerServiceUrlAsync(topicName, LookupOptions.builder().advertisedListenerName(listenerName).authoritative(authoritative).loadTopicsInBundle(false).build());
            ((CompletableFuture)lookupFuture.thenAccept(optionalResult -> {
                if (optionalResult == null || !optionalResult.isPresent()) {
                    log.warn("No broker was found available for topic {}", (Object)topicName);
                    this.completeLookupResponseExceptionally(asyncResponse, new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
                    return;
                }
                LookupResult result = (LookupResult)optionalResult.get();
                if (result.isRedirect()) {
                    URI redirect;
                    boolean newAuthoritative = result.isAuthoritativeRedirect();
                    try {
                        String redirectUrl = this.isRequestHttps() ? result.getLookupData().getHttpUrlTls() : result.getLookupData().getHttpUrl();
                        Preconditions.checkNotNull((Object)redirectUrl, (Object)"Redirected cluster's service url is not configured");
                        String lookupPath = topicName.isV2() ? LOOKUP_PATH_V2 : LOOKUP_PATH_V1;
                        String path = String.format("%s%s%s?authoritative=%s", redirectUrl, lookupPath, topicName.getLookupName(), newAuthoritative);
                        path = listenerName == null ? path : path + "&listenerName=" + listenerName;
                        redirect = new URI(path);
                    }
                    catch (NullPointerException | URISyntaxException e) {
                        log.error("Error in preparing redirect url for {}: {}", new Object[]{topicName, e.getMessage(), e});
                        this.completeLookupResponseExceptionally(asyncResponse, e);
                        return;
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("Redirect lookup for topic {} to {}", (Object)topicName, (Object)redirect);
                    }
                    this.completeLookupResponseExceptionally(asyncResponse, new WebApplicationException(Response.temporaryRedirect((URI)redirect).build()));
                } else {
                    if (log.isDebugEnabled()) {
                        log.debug("Lookup succeeded for topic {} -- broker: {}", (Object)topicName, (Object)result.getLookupData());
                    }
                    this.completeLookupResponseSuccessfully(asyncResponse, result.getLookupData());
                }
            })).exceptionally(exception -> {
                log.warn("Failed to lookup broker for topic {}: {}", new Object[]{topicName, exception.getMessage(), exception});
                this.completeLookupResponseExceptionally(asyncResponse, (Throwable)exception);
                return null;
            });
        })).exceptionally(e -> {
            log.warn("Failed to check exist for topic {}: {} when lookup", new Object[]{topicName, e.getMessage(), e});
            this.completeLookupResponseExceptionally(asyncResponse, (Throwable)e);
            return null;
        });
    }

    private void validateAdminAndClientPermission(TopicName topic) throws RestException, Exception {
        try {
            this.validateTopicOperation(topic, TopicOperation.LOOKUP);
        }
        catch (Exception e) {
            throw new RestException(e);
        }
    }

    protected String internalGetNamespaceBundle(TopicName topicName) {
        this.validateNamespaceOperation(topicName.getNamespaceObject(), NamespaceOperation.GET_BUNDLE);
        try {
            NamespaceBundle bundle = this.pulsar().getNamespaceService().getBundle(topicName);
            return bundle.getBundleRange();
        }
        catch (Exception e) {
            log.error("[{}] Failed to get namespace bundle for {}", new Object[]{this.clientAppId(), topicName, e});
            throw new RestException(e);
        }
    }

    public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarService, TopicName topicName, boolean authoritative, String clientAppId, AuthenticationDataSource authenticationData, long requestId) {
        return TopicLookupBase.lookupTopicAsync(pulsarService, topicName, authoritative, clientAppId, authenticationData, requestId, null);
    }

    public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarService, TopicName topicName, boolean authoritative, String clientAppId, AuthenticationDataSource authenticationData, long requestId, String advertisedListenerName) {
        CompletableFuture validationFuture = new CompletableFuture();
        CompletableFuture<ByteBuf> lookupfuture = new CompletableFuture<ByteBuf>();
        String cluster = topicName.getCluster();
        ((CompletableFuture)TopicLookupBase.getClusterDataIfDifferentCluster(pulsarService, cluster, clientAppId).thenAccept(differentClusterData -> {
            if (differentClusterData != null) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Redirecting the lookup call to {}/{} cluster={}", new Object[]{clientAppId, differentClusterData.getBrokerServiceUrl(), differentClusterData.getBrokerServiceUrlTls(), cluster});
                }
                validationFuture.complete(Commands.newLookupResponse((String)differentClusterData.getBrokerServiceUrl(), (String)differentClusterData.getBrokerServiceUrlTls(), (boolean)true, (CommandLookupTopicResponse.LookupType)CommandLookupTopicResponse.LookupType.Redirect, (long)requestId, (boolean)false));
            } else {
                ((CompletableFuture)TopicLookupBase.checkAuthorizationAsync(pulsarService, topicName, clientAppId, authenticationData).thenRun(() -> ((CompletableFuture)TopicLookupBase.checkLocalOrGetPeerReplicationCluster(pulsarService, topicName.getNamespaceObject()).thenAccept(peerClusterData -> {
                    if (peerClusterData == null) {
                        validationFuture.complete(null);
                        return;
                    }
                    if (StringUtils.isBlank((CharSequence)peerClusterData.getBrokerServiceUrl()) && StringUtils.isBlank((CharSequence)peerClusterData.getBrokerServiceUrlTls())) {
                        validationFuture.complete(Commands.newLookupErrorResponse((ServerError)ServerError.MetadataError, (String)"Redirected cluster's brokerService url is not configured", (long)requestId));
                        return;
                    }
                    validationFuture.complete(Commands.newLookupResponse((String)peerClusterData.getBrokerServiceUrl(), (String)peerClusterData.getBrokerServiceUrlTls(), (boolean)true, (CommandLookupTopicResponse.LookupType)CommandLookupTopicResponse.LookupType.Redirect, (long)requestId, (boolean)false));
                })).exceptionally(ex -> {
                    validationFuture.complete(Commands.newLookupErrorResponse((ServerError)ServerError.MetadataError, (String)FutureUtil.unwrapCompletionException((Throwable)ex).getMessage(), (long)requestId));
                    return null;
                }))).exceptionally(e -> {
                    Throwable throwable = FutureUtil.unwrapCompletionException((Throwable)e);
                    if (throwable instanceof RestException) {
                        log.warn("Failed to authorized {} on cluster {}", (Object)clientAppId, (Object)topicName);
                        validationFuture.complete(Commands.newLookupErrorResponse((ServerError)ServerError.AuthorizationError, (String)throwable.getMessage(), (long)requestId));
                    } else {
                        log.warn("Unknown error while authorizing {} on cluster {}", (Object)clientAppId, (Object)topicName);
                        validationFuture.completeExceptionally(throwable);
                    }
                    return null;
                });
            }
        })).exceptionally(ex -> {
            validationFuture.completeExceptionally(FutureUtil.unwrapCompletionException((Throwable)ex));
            return null;
        });
        ((CompletableFuture)validationFuture.thenAccept(validationFailureResponse -> {
            if (validationFailureResponse != null) {
                lookupfuture.complete((ByteBuf)validationFailureResponse);
            } else {
                LookupOptions options = LookupOptions.builder().authoritative(authoritative).advertisedListenerName(advertisedListenerName).loadTopicsInBundle(true).build();
                ((CompletableFuture)pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, options).thenAccept(lookupResult -> {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Lookup result {}", (Object)topicName.toString(), lookupResult);
                    }
                    if (!lookupResult.isPresent()) {
                        lookupfuture.complete(Commands.newLookupErrorResponse((ServerError)ServerError.ServiceNotReady, (String)("No broker was available to own " + topicName), (long)requestId));
                        return;
                    }
                    LookupData lookupData = ((LookupResult)lookupResult.get()).getLookupData();
                    if (((LookupResult)lookupResult.get()).isRedirect()) {
                        boolean newAuthoritative = ((LookupResult)lookupResult.get()).isAuthoritativeRedirect();
                        lookupfuture.complete(Commands.newLookupResponse((String)lookupData.getBrokerUrl(), (String)lookupData.getBrokerUrlTls(), (boolean)newAuthoritative, (CommandLookupTopicResponse.LookupType)CommandLookupTopicResponse.LookupType.Redirect, (long)requestId, (boolean)false));
                    } else {
                        ServiceConfiguration conf = pulsarService.getConfiguration();
                        lookupfuture.complete(Commands.newLookupResponse((String)lookupData.getBrokerUrl(), (String)lookupData.getBrokerUrlTls(), (boolean)true, (CommandLookupTopicResponse.LookupType)CommandLookupTopicResponse.LookupType.Connect, (long)requestId, (boolean)TopicLookupBase.shouldRedirectThroughServiceUrl(conf, lookupData)));
                    }
                })).exceptionally(ex -> {
                    TopicLookupBase.handleLookupError(lookupfuture, topicName.toString(), clientAppId, requestId, ex);
                    return null;
                });
            }
        })).exceptionally(ex -> {
            TopicLookupBase.handleLookupError(lookupfuture, topicName.toString(), clientAppId, requestId, ex);
            return null;
        });
        return lookupfuture;
    }

    private void completeLookupResponseExceptionally(AsyncResponse asyncResponse, Throwable t) {
        this.pulsar().getBrokerService().getLookupRequestSemaphore().release();
        asyncResponse.resume(t);
    }

    private void completeLookupResponseSuccessfully(AsyncResponse asyncResponse, LookupData lookupData) {
        this.pulsar().getBrokerService().getLookupRequestSemaphore().release();
        asyncResponse.resume((Object)lookupData);
    }

    private static void handleLookupError(CompletableFuture<ByteBuf> lookupFuture, String topicName, String clientAppId, long requestId, Throwable ex) {
        Throwable unwrapEx = FutureUtil.unwrapCompletionException((Throwable)ex);
        String errorMsg = unwrapEx.getMessage();
        if (unwrapEx instanceof IllegalStateException) {
            log.info("Failed to lookup {} for topic {} with error {}", new Object[]{clientAppId, topicName, errorMsg});
            lookupFuture.complete(Commands.newLookupErrorResponse((ServerError)ServerError.MetadataError, (String)errorMsg, (long)requestId));
        } else if (unwrapEx instanceof MetadataStoreException) {
            log.warn("Failed to lookup {} for topic {} with error {}", new Object[]{clientAppId, topicName, errorMsg});
            lookupFuture.complete(Commands.newLookupErrorResponse((ServerError)ServerError.MetadataError, (String)errorMsg, (long)requestId));
        } else {
            log.warn("Failed to lookup {} for topic {} with error {}", new Object[]{clientAppId, topicName, errorMsg});
            lookupFuture.complete(Commands.newLookupErrorResponse((ServerError)ServerError.ServiceNotReady, (String)errorMsg, (long)requestId));
        }
    }

    protected TopicName getTopicName(String topicDomain, String tenant, String cluster, String namespace, @Encoded String encodedTopic) {
        String decodedName = Codec.decode((String)encodedTopic);
        return TopicName.get((String)TopicDomain.getEnum((String)topicDomain).value(), (String)tenant, (String)cluster, (String)namespace, (String)decodedName);
    }

    protected TopicName getTopicName(String topicDomain, String tenant, String namespace, @Encoded String encodedTopic) {
        String decodedName = Codec.decode((String)encodedTopic);
        return TopicName.get((String)TopicDomain.getEnum((String)topicDomain).value(), (String)tenant, (String)namespace, (String)decodedName);
    }

    private static boolean shouldRedirectThroughServiceUrl(ServiceConfiguration conf, LookupData lookupData) {
        if (!conf.isRunningStandalone()) {
            return false;
        }
        if (!StringUtils.isEmpty((CharSequence)lookupData.getBrokerUrl())) {
            try {
                URI host = URI.create(lookupData.getBrokerUrl());
                return InetAddress.getByName(host.getHost()).isLoopbackAddress();
            }
            catch (Exception e) {
                log.info("Failed to resolve advertised address {}: {}", (Object)lookupData.getBrokerUrl(), (Object)e.getMessage());
                return false;
            }
        }
        if (!StringUtils.isEmpty((CharSequence)lookupData.getBrokerUrlTls())) {
            try {
                URI host = URI.create(lookupData.getBrokerUrlTls());
                return InetAddress.getByName(host.getHost()).isLoopbackAddress();
            }
            catch (Exception e) {
                log.info("Failed to resolve advertised address {}: {}", (Object)lookupData.getBrokerUrlTls(), (Object)e.getMessage());
                return false;
            }
        }
        return false;
    }
}

