/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.shade.org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BinaryProtoLookupService
implements LookupService {
    private final PulsarClientImpl client;
    protected final InetSocketAddress serviceAddress;
    private final boolean useTls;
    private static final Logger log = LoggerFactory.getLogger(BinaryProtoLookupService.class);

    public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, boolean useTls) throws PulsarClientException {
        this.client = client;
        this.useTls = useTls;
        try {
            URI uri = new URI(serviceUrl);
            this.serviceAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
        }
        catch (Exception e) {
            log.error("Invalid service-url {} provided {}", new Object[]{serviceUrl, e.getMessage(), e});
            throw new PulsarClientException.InvalidServiceURL(e);
        }
    }

    @Override
    public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(DestinationName destination) {
        return this.findBroker(this.serviceAddress, false, destination);
    }

    @Override
    public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(DestinationName destination) {
        return this.getPartitionedTopicMetadata(this.serviceAddress, destination);
    }

    private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker(InetSocketAddress socketAddress, boolean authoritative, DestinationName destination) {
        CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> addressFuture = new CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>>();
        ((CompletableFuture)this.client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
            long requestId = this.client.newRequestId();
            ByteBuf request = Commands.newLookup(destination.toString(), authoritative, requestId);
            ((CompletableFuture)clientCnx.newLookup(request, requestId).thenAccept(lookupDataResult -> {
                URI uri = null;
                try {
                    if (this.useTls) {
                        uri = new URI(lookupDataResult.brokerUrlTls);
                    } else {
                        String serviceUrl = lookupDataResult.brokerUrl;
                        uri = new URI(serviceUrl);
                    }
                    InetSocketAddress responseBrokerAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
                    if (lookupDataResult.redirect) {
                        ((CompletableFuture)this.findBroker(responseBrokerAddress, lookupDataResult.authoritative, destination).thenAccept(addressPair -> addressFuture.complete((Pair<InetSocketAddress, InetSocketAddress>)addressPair))).exceptionally(lookupException -> {
                            log.warn("[{}] lookup failed : {}", new Object[]{destination.toString(), lookupException.getMessage(), lookupException});
                            addressFuture.completeExceptionally((Throwable)lookupException);
                            return null;
                        });
                    } else if (lookupDataResult.proxyThroughServiceUrl) {
                        addressFuture.complete(Pair.of(responseBrokerAddress, this.serviceAddress));
                    } else {
                        addressFuture.complete(Pair.of(responseBrokerAddress, responseBrokerAddress));
                    }
                }
                catch (Exception parseUrlException) {
                    log.warn("[{}] invalid url {} : {}", new Object[]{destination.toString(), uri, parseUrlException.getMessage(), parseUrlException});
                    addressFuture.completeExceptionally(parseUrlException);
                }
            })).exceptionally(sendException -> {
                log.warn("[{}] failed to send lookup request : {}", new Object[]{destination.toString(), sendException.getMessage(), sendException});
                addressFuture.completeExceptionally((Throwable)sendException);
                return null;
            });
        })).exceptionally(connectionException -> {
            addressFuture.completeExceptionally((Throwable)connectionException);
            return null;
        });
        return addressFuture;
    }

    private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(InetSocketAddress socketAddress, DestinationName destination) {
        CompletableFuture<PartitionedTopicMetadata> partitionFuture = new CompletableFuture<PartitionedTopicMetadata>();
        ((CompletableFuture)this.client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
            long requestId = this.client.newRequestId();
            ByteBuf request = Commands.newPartitionMetadataRequest(destination.toString(), requestId);
            ((CompletableFuture)clientCnx.newLookup(request, requestId).thenAccept(lookupDataResult -> {
                try {
                    partitionFuture.complete(new PartitionedTopicMetadata(lookupDataResult.partitions));
                }
                catch (Exception e) {
                    partitionFuture.completeExceptionally(new PulsarClientException.LookupException(String.format("Failed to parse partition-response redirect=%s , partitions with %s", lookupDataResult.redirect, lookupDataResult.partitions, e.getMessage())));
                }
            })).exceptionally(e -> {
                log.warn("[{}] failed to get Partitioned metadata : {}", new Object[]{destination.toString(), e.getCause().getMessage(), e});
                partitionFuture.completeExceptionally((Throwable)e);
                return null;
            });
        })).exceptionally(connectionException -> {
            partitionFuture.completeExceptionally((Throwable)connectionException);
            return null;
        });
        return partitionFuture;
    }

    @Override
    public String getServiceUrl() {
        return this.serviceAddress.toString();
    }

    @Override
    public void close() throws Exception {
    }

    public static class LookupDataResult {
        public final String brokerUrl;
        public final String brokerUrlTls;
        public final int partitions;
        public final boolean authoritative;
        public final boolean proxyThroughServiceUrl;
        public final boolean redirect;

        public LookupDataResult(PulsarApi.CommandLookupTopicResponse result) {
            this.brokerUrl = result.getBrokerServiceUrl();
            this.brokerUrlTls = result.getBrokerServiceUrlTls();
            this.authoritative = result.getAuthoritative();
            this.redirect = result.getResponse() == PulsarApi.CommandLookupTopicResponse.LookupType.Redirect;
            this.proxyThroughServiceUrl = result.getProxyThroughServiceUrl();
            this.partitions = -1;
        }

        public LookupDataResult(int partitions) {
            this.partitions = partitions;
            this.brokerUrl = null;
            this.brokerUrlTls = null;
            this.authoritative = false;
            this.proxyThroughServiceUrl = false;
            this.redirect = false;
        }
    }
}

