/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.proxy.server;

import io.prometheus.client.Counter;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.apache.pulsar.proxy.server.ProxyConnection;
import org.apache.pulsar.proxy.server.ProxyService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LookupProxyHandler {
    private final ProxyService service;
    private final ProxyConnection proxyConnection;
    private final boolean connectWithTLS;
    private SocketAddress clientAddress;
    private static final Counter lookupRequests = (Counter)Counter.build((String)"pulsar_proxy_lookup_requests", (String)"Counter of topic lookup requests").create().register();
    private static final Counter partitionsMetadataRequests = (Counter)Counter.build((String)"pulsar_proxy_partitions_metadata_requests", (String)"Counter of partitions metadata requests").create().register();
    private static final Logger log = LoggerFactory.getLogger(LookupProxyHandler.class);

    public LookupProxyHandler(ProxyService proxy, ProxyConnection proxyConnection) {
        this.service = proxy;
        this.proxyConnection = proxyConnection;
        this.clientAddress = proxyConnection.clientAddress();
        this.connectWithTLS = proxy.getConfiguration().isTlsEnabledWithBroker();
    }

    public void handleLookup(PulsarApi.CommandLookupTopic lookup) {
        if (log.isDebugEnabled()) {
            log.debug("Received Lookup from {}", (Object)this.clientAddress);
        }
        lookupRequests.inc();
        long clientRequestId = lookup.getRequestId();
        String topic = lookup.getTopic();
        ServiceLookupData availableBroker = null;
        try {
            availableBroker = this.service.getDiscoveryProvider().nextBroker();
        }
        catch (Exception e) {
            log.warn("[{}] Failed to get next active broker {}", new Object[]{this.clientAddress, e.getMessage(), e});
            this.proxyConnection.ctx().writeAndFlush((Object)Commands.newLookupErrorResponse((PulsarApi.ServerError)PulsarApi.ServerError.ServiceNotReady, (String)e.getMessage(), (long)clientRequestId));
            return;
        }
        this.performLookup(clientRequestId, topic, availableBroker.getPulsarServiceUrl(), false, 10);
    }

    private void performLookup(long clientRequestId, String topic, String brokerServiceUrl, boolean authoritative, int numberOfRetries) {
        URI brokerURI;
        if (numberOfRetries == 0) {
            this.proxyConnection.ctx().writeAndFlush((Object)Commands.newLookupErrorResponse((PulsarApi.ServerError)PulsarApi.ServerError.ServiceNotReady, (String)"Reached max number of redirections", (long)clientRequestId));
            return;
        }
        try {
            brokerURI = new URI(brokerServiceUrl);
        }
        catch (URISyntaxException e) {
            this.proxyConnection.ctx().writeAndFlush((Object)Commands.newLookupErrorResponse((PulsarApi.ServerError)PulsarApi.ServerError.MetadataError, (String)e.getMessage(), (long)clientRequestId));
            return;
        }
        InetSocketAddress addr = new InetSocketAddress(brokerURI.getHost(), brokerURI.getPort());
        if (log.isDebugEnabled()) {
            log.debug("Getting connections to '{}'", (Object)addr);
        }
        ((CompletableFuture)this.service.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
            long requestId = this.service.newRequestId();
            ((CompletableFuture)clientCnx.newLookup(Commands.newLookup((String)topic, (boolean)authoritative, (long)requestId), requestId).thenAccept(result -> {
                if (result.redirect) {
                    this.performLookup(clientRequestId, topic, result.brokerUrl, authoritative, numberOfRetries - 1);
                } else {
                    String brokerUrl = this.connectWithTLS ? result.brokerUrlTls : result.brokerUrl;
                    this.proxyConnection.ctx().writeAndFlush((Object)Commands.newLookupResponse((String)brokerUrl, (String)brokerUrl, (boolean)true, (PulsarApi.CommandLookupTopicResponse.LookupType)PulsarApi.CommandLookupTopicResponse.LookupType.Connect, (long)clientRequestId, (boolean)true));
                }
            })).exceptionally(ex -> {
                log.warn("[{}] Failed to lookup topic {}: {}", new Object[]{this.clientAddress, topic, ex.getMessage()});
                this.proxyConnection.ctx().writeAndFlush((Object)Commands.newLookupErrorResponse((PulsarApi.ServerError)PulsarApi.ServerError.ServiceNotReady, (String)ex.getMessage(), (long)clientRequestId));
                return null;
            });
        })).exceptionally(ex -> {
            this.proxyConnection.ctx().writeAndFlush((Object)Commands.newLookupErrorResponse((PulsarApi.ServerError)PulsarApi.ServerError.ServiceNotReady, (String)ex.getMessage(), (long)clientRequestId));
            return null;
        });
    }

    void handlePartitionMetadataResponse(PulsarApi.CommandPartitionedTopicMetadata partitionMetadata) {
        partitionsMetadataRequests.inc();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received PartitionMetadataLookup", (Object)this.clientAddress);
        }
        long requestId = partitionMetadata.getRequestId();
        DestinationName dn = DestinationName.get((String)partitionMetadata.getTopic());
        ((CompletableFuture)this.service.getDiscoveryProvider().getPartitionedTopicMetadata(this.service, dn, this.proxyConnection.clientAuthRole).thenAccept(metadata -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Total number of partitions for topic {} is {}", new Object[]{this.proxyConnection.clientAuthRole, dn, metadata.partitions});
            }
            this.proxyConnection.ctx().writeAndFlush((Object)Commands.newPartitionMetadataResponse((int)metadata.partitions, (long)requestId));
        })).exceptionally(ex -> {
            log.warn("[{}] Failed to get partitioned metadata for topic {} {}", new Object[]{this.clientAddress, dn, ex.getMessage(), ex});
            this.proxyConnection.ctx().writeAndFlush((Object)Commands.newPartitionMetadataResponse((PulsarApi.ServerError)PulsarApi.ServerError.ServiceNotReady, (String)ex.getMessage(), (long)requestId));
            return null;
        });
    }
}

