/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderConfiguration;
import org.apache.pulsar.client.impl.BinaryProtoLookupService;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.HandlerBase;
import org.apache.pulsar.client.impl.HttpLookupService;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PartitionedConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarClientImpl
implements PulsarClient {
    private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class);

    /** Client configuration supplied to the constructor. */
    private final ClientConfiguration conf;
    /** Lookup service chosen in the constructor from the service URL prefix. */
    private final LookupService lookup;
    private final ConnectionPool cnxPool;
    private final Timer timer;
    /** Supplies the executors handed to consumers and readers created by this client. */
    private final ExecutorProvider externalExecutorProvider;
    /** Lifecycle state; unset until the constructor finishes and stores {@link State#Open}. */
    private final AtomicReference<State> state = new AtomicReference<>();
    /**
     * Producers registered with this client. Used as an identity set (values are always
     * {@link Boolean#TRUE}); every access synchronizes on the map itself.
     */
    private final IdentityHashMap<ProducerBase, Boolean> producers;
    /**
     * Consumers (including the consumers backing readers) registered with this client. Used as an
     * identity set; every access synchronizes on the map itself.
     */
    private final IdentityHashMap<ConsumerBase, Boolean> consumers;
    private final AtomicLong producerIdGenerator = new AtomicLong();
    private final AtomicLong consumerIdGenerator = new AtomicLong();
    private final AtomicLong requestIdGenerator = new AtomicLong();
    private final EventLoopGroup eventLoopGroup;

    /**
     * Creates a client that runs on its own event loop group, sized from
     * {@code conf.getIoThreads()}.
     *
     * @param serviceUrl service URL used for lookups
     * @param conf client configuration
     * @throws PulsarClientException if {@code serviceUrl} or {@code conf} is {@code null}
     *         (reported as {@link PulsarClientException.InvalidConfigurationException})
     */
    public PulsarClientImpl(String serviceUrl, ClientConfiguration conf) throws PulsarClientException {
        // Only build the event loop group when the arguments are usable. Passing null otherwise lets
        // the delegate constructor reject the call with InvalidConfigurationException instead of
        // failing with a NullPointerException on conf, and avoids creating a group nobody will own.
        this(serviceUrl, conf, (serviceUrl == null || conf == null) ? null : PulsarClientImpl.getEventLoopGroup(conf));
    }

    public PulsarClientImpl(String serviceUrl, ClientConfiguration conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
        if (serviceUrl == null || conf == null || eventLoopGroup == null) {
            throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
        }
        this.eventLoopGroup = eventLoopGroup;
        this.conf = conf;
        conf.getAuthentication().start();
        this.cnxPool = new ConnectionPool(this, eventLoopGroup);
        this.lookup = serviceUrl.startsWith("http") ? new HttpLookupService(serviceUrl, conf, eventLoopGroup) : new BinaryProtoLookupService(this, serviceUrl, conf.isUseTls());
        this.timer = new HashedWheelTimer((ThreadFactory)new DefaultThreadFactory("pulsar-timer"), 1L, TimeUnit.MILLISECONDS);
        this.externalExecutorProvider = new ExecutorProvider(conf.getListenerThreads(), "pulsar-external-listener");
        this.producers = Maps.newIdentityHashMap();
        this.consumers = Maps.newIdentityHashMap();
        this.state.set(State.Open);
    }

    /**
     * @return the configuration object this client was constructed with
     */
    public ClientConfiguration getConfiguration() {
        return conf;
    }

    /**
     * Blocking variant of {@link #createProducerAsync(String, ProducerConfiguration)} using a
     * default {@link ProducerConfiguration}.
     *
     * @param destination topic to produce to
     * @return the created producer
     * @throws PulsarClientException if creation fails (a non-Pulsar cause is wrapped) or the
     *         calling thread is interrupted while waiting
     */
    @Override
    public Producer createProducer(String destination) throws PulsarClientException {
        try {
            final ProducerConfiguration defaults = new ProducerConfiguration();
            final CompletableFuture<Producer> pending = this.createProducerAsync(destination, defaults);
            return pending.get();
        }
        catch (InterruptedException interrupted) {
            // Keep the interrupt visible to the caller's thread.
            Thread.currentThread().interrupt();
            throw new PulsarClientException(interrupted);
        }
        catch (ExecutionException failed) {
            final Throwable cause = failed.getCause();
            throw cause instanceof PulsarClientException
                    ? (PulsarClientException)cause
                    : new PulsarClientException(cause);
        }
    }

    /**
     * Blocking variant of {@link #createProducerAsync(String, ProducerConfiguration)}.
     *
     * @param destination topic to produce to
     * @param conf producer configuration
     * @return the created producer
     * @throws PulsarClientException if creation fails (a non-Pulsar cause is wrapped) or the
     *         calling thread is interrupted while waiting
     */
    @Override
    public Producer createProducer(String destination, ProducerConfiguration conf) throws PulsarClientException {
        final CompletableFuture<Producer> pending = this.createProducerAsync(destination, conf);
        try {
            return pending.get();
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarClientException(interrupted);
        }
        catch (ExecutionException failed) {
            final Throwable cause = failed.getCause();
            if (!(cause instanceof PulsarClientException)) {
                throw new PulsarClientException(cause);
            }
            throw (PulsarClientException)cause;
        }
    }

    /**
     * Starts creating a producer on {@code topic} with a default {@link ProducerConfiguration}.
     *
     * @param topic topic to produce to
     * @return future completed with the producer, or exceptionally on failure
     */
    @Override
    public CompletableFuture<Producer> createProducerAsync(String topic) {
        final ProducerConfiguration defaults = new ProducerConfiguration();
        return createProducerAsync(topic, defaults);
    }

    /**
     * Starts creating a producer on {@code topic} without an explicit producer name.
     *
     * @param topic topic to produce to
     * @param conf producer configuration
     * @return future completed with the producer, or exceptionally on failure
     */
    @Override
    public CompletableFuture<Producer> createProducerAsync(String topic, ProducerConfiguration conf) {
        final String noProducerName = null;
        return createProducerAsync(topic, conf, noProducerName);
    }

    /**
     * Starts creating a producer on {@code topic}.
     *
     * <p>The request is rejected immediately (with a failed future) when the client is not open,
     * the topic name is invalid, or {@code conf} is {@code null}. Otherwise the topic's partition
     * metadata is looked up: more than one partition yields a {@link PartitionedProducerImpl},
     * anything else a {@link ProducerImpl}. The new producer is registered with this client.
     *
     * @param topic topic to produce to
     * @param conf producer configuration; must not be {@code null}
     * @param producerName name passed to the non-partitioned producer; may be {@code null}
     * @return the future handed to the producer implementation; it is also completed
     *         exceptionally here when the metadata lookup stage fails
     */
    public CompletableFuture<Producer> createProducerAsync(String topic, ProducerConfiguration conf, String producerName) {
        if (this.state.get() != State.Open) {
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
        }
        if (!DestinationName.isValid(topic)) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name"));
        }
        if (conf == null) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Producer configuration undefined"));
        }
        CompletableFuture<Producer> producerCreatedFuture = new CompletableFuture<>();
        this.getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions);
            }
            ProducerBase producer = metadata.partitions > 1
                    ? new PartitionedProducerImpl(this, topic, conf, metadata.partitions, producerCreatedFuture)
                    : new ProducerImpl(this, topic, producerName, conf, producerCreatedFuture, -1);
            synchronized (this.producers) {
                this.producers.put(producer, Boolean.TRUE);
            }
        }).exceptionally(ex -> {
            // Pass the throwable itself (not just its message) so the stack trace is logged,
            // consistent with the consumer and reader creation paths.
            log.warn("[{}] Failed to get partitioned topic metadata", topic, ex);
            producerCreatedFuture.completeExceptionally(ex);
            return null;
        });
        return producerCreatedFuture;
    }

    /**
     * Blocking subscribe using a default {@link ConsumerConfiguration}.
     *
     * @param topic topic to consume from
     * @param subscription subscription name
     * @return the subscribed consumer
     * @throws PulsarClientException if the subscription fails or the wait is interrupted
     */
    @Override
    public Consumer subscribe(String topic, String subscription) throws PulsarClientException {
        final ConsumerConfiguration defaults = new ConsumerConfiguration();
        return subscribe(topic, subscription, defaults);
    }

    /**
     * Blocking variant of {@link #subscribeAsync(String, String, ConsumerConfiguration)}.
     *
     * @param topic topic to consume from
     * @param subscription subscription name
     * @param conf consumer configuration
     * @return the subscribed consumer
     * @throws PulsarClientException if the subscription fails (a non-Pulsar cause is wrapped) or
     *         the calling thread is interrupted while waiting
     */
    @Override
    public Consumer subscribe(String topic, String subscription, ConsumerConfiguration conf) throws PulsarClientException {
        final CompletableFuture<Consumer> pending = this.subscribeAsync(topic, subscription, conf);
        try {
            return pending.get();
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarClientException(interrupted);
        }
        catch (ExecutionException failed) {
            final Throwable cause = failed.getCause();
            throw cause instanceof PulsarClientException
                    ? (PulsarClientException)cause
                    : new PulsarClientException(cause);
        }
    }

    /**
     * Starts subscribing to {@code topic} with a default {@link ConsumerConfiguration}.
     *
     * @param topic topic to consume from
     * @param subscription subscription name
     * @return future completed with the consumer, or exceptionally on failure
     */
    @Override
    public CompletableFuture<Consumer> subscribeAsync(String topic, String subscription) {
        final ConsumerConfiguration defaults = new ConsumerConfiguration();
        return subscribeAsync(topic, subscription, defaults);
    }

    /**
     * Starts subscribing to {@code topic}.
     *
     * <p>Fails fast (with a failed future) when the client is not open, the topic name is invalid,
     * the subscription name is blank, or {@code conf} is {@code null}. Otherwise the topic's
     * partition metadata is looked up: more than one partition yields a
     * {@link PartitionedConsumerImpl}, anything else a {@link ConsumerImpl}. The new consumer is
     * registered with this client.
     *
     * @param topic topic to consume from
     * @param subscription subscription name; must not be blank
     * @param conf consumer configuration; must not be {@code null}
     * @return the future handed to the consumer implementation; it is also completed
     *         exceptionally here when the metadata lookup stage fails
     */
    @Override
    public CompletableFuture<Consumer> subscribeAsync(String topic, String subscription, ConsumerConfiguration conf) {
        if (this.state.get() != State.Open) {
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
        }
        if (!DestinationName.isValid(topic)) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name"));
        }
        if (StringUtils.isBlank(subscription)) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Empty subscription name"));
        }
        if (conf == null) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
        }

        final CompletableFuture<Consumer> subscribed = new CompletableFuture<>();
        final CompletableFuture<Void> registration = this.getPartitionedTopicMetadata(topic).thenAccept(topicMetadata -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Received topic metadata. partitions: {}", topic, topicMetadata.partitions);
            }
            final ExecutorService listenerExecutor = this.externalExecutorProvider.getExecutor();
            final ConsumerBase created;
            if (topicMetadata.partitions > 1) {
                created = new PartitionedConsumerImpl(this, topic, subscription, conf, topicMetadata.partitions, listenerExecutor, subscribed);
            } else {
                created = new ConsumerImpl(this, topic, subscription, conf, listenerExecutor, -1, subscribed);
            }
            synchronized (this.consumers) {
                this.consumers.put(created, Boolean.TRUE);
            }
        });
        registration.exceptionally(failure -> {
            log.warn("[{}] Failed to get partitioned topic metadata", topic, failure);
            subscribed.completeExceptionally(failure);
            return null;
        });
        return subscribed;
    }

    /**
     * Blocking variant of {@link #createReaderAsync(String, MessageId, ReaderConfiguration)}.
     *
     * @param topic topic to read from
     * @param startMessageId position the reader starts from
     * @param conf reader configuration
     * @return the created reader
     * @throws PulsarClientException if creation fails (a non-Pulsar cause is wrapped) or the
     *         calling thread is interrupted while waiting
     */
    @Override
    public Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf) throws PulsarClientException {
        final CompletableFuture<Reader> pending = this.createReaderAsync(topic, startMessageId, conf);
        try {
            return pending.get();
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarClientException(interrupted);
        }
        catch (ExecutionException failed) {
            final Throwable cause = failed.getCause();
            if (!(cause instanceof PulsarClientException)) {
                throw new PulsarClientException(cause);
            }
            throw (PulsarClientException)cause;
        }
    }

    /**
     * Starts creating a reader on {@code topic}.
     *
     * <p>Fails fast (with a failed future) when the client is not open, the topic name is invalid,
     * or {@code startMessageId} / {@code conf} is {@code null}. After the partition metadata
     * lookup, topics with more than one partition are rejected. Otherwise a {@link ReaderImpl} is
     * created, its underlying consumer is registered with this client, and the returned future
     * completes once that consumer's subscription future completes.
     *
     * @param topic topic to read from
     * @param startMessageId position the reader starts from; must not be {@code null}
     * @param conf reader configuration; must not be {@code null}
     * @return future completed with the reader, or exceptionally on failure
     */
    @Override
    public CompletableFuture<Reader> createReaderAsync(String topic, MessageId startMessageId, ReaderConfiguration conf) {
        if (this.state.get() != State.Open) {
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
        }
        if (!DestinationName.isValid(topic)) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name"));
        }
        if (startMessageId == null) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Invalid startMessageId"));
        }
        if (conf == null) {
            // The argument is a ReaderConfiguration; name it correctly in the error.
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Reader configuration undefined"));
        }
        CompletableFuture<Reader> readerFuture = new CompletableFuture<>();
        this.getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions);
            }
            if (metadata.partitions > 1) {
                readerFuture.completeExceptionally(new PulsarClientException("Topic reader cannot be created on a partitioned topic"));
                return;
            }
            CompletableFuture<Consumer> consumerSubscribedFuture = new CompletableFuture<>();
            ExecutorService listenerThread = this.externalExecutorProvider.getExecutor();
            ReaderImpl reader = new ReaderImpl(this, topic, startMessageId, conf, listenerThread, consumerSubscribedFuture);
            // Readers are tracked through their backing consumer so closeAsync() closes them too.
            synchronized (this.consumers) {
                this.consumers.put(reader.getConsumer(), Boolean.TRUE);
            }
            consumerSubscribedFuture.thenRun(() -> readerFuture.complete(reader)).exceptionally(ex -> {
                log.warn("[{}] Failed to get create topic reader", topic, ex);
                readerFuture.completeExceptionally(ex);
                return null;
            });
        }).exceptionally(ex -> {
            log.warn("[{}] Failed to get partitioned topic metadata", topic, ex);
            readerFuture.completeExceptionally(ex);
            return null;
        });
        return readerFuture;
    }

    /**
     * Blocking variant of {@link #closeAsync()}.
     *
     * @throws PulsarClientException if closing fails (a non-Pulsar cause is wrapped) or the
     *         calling thread is interrupted while waiting; in the latter case the thread's
     *         interrupt flag is restored before throwing
     */
    @Override
    public void close() throws PulsarClientException {
        try {
            this.closeAsync().get();
        }
        catch (ExecutionException e) {
            Throwable t = e.getCause();
            if (t instanceof PulsarClientException) {
                throw (PulsarClientException)t;
            }
            throw new PulsarClientException(t);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag, as every other blocking wrapper in this class does,
            // so callers further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new PulsarClientException(e);
        }
    }

    /**
     * Closes every registered producer and consumer, then shuts the client down.
     *
     * <p>The state moves from {@link State#Open} to {@link State#Closing} atomically; if the
     * client is in any other state a failed future carrying
     * {@link PulsarClientException.AlreadyClosedException} is returned. Once all close futures
     * have completed, {@link #shutdown()} runs and the state becomes {@link State#Closed}.
     *
     * @return future completed when shutdown has finished, or exceptionally if closing any
     *         producer/consumer or the shutdown itself failed
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        log.info("Client closing. URL: {}", this.lookup.getServiceUrl());
        if (!this.state.compareAndSet(State.Open, State.Closing)) {
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
        }
        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
        ArrayList<CompletableFuture<Void>> futures = Lists.newArrayList();
        synchronized (this.producers) {
            // Iterate over a snapshot: closing a producer presumably leads to cleanupProducer(),
            // which would otherwise modify the map while it is being traversed.
            ArrayList<ProducerBase> producersToClose = Lists.newArrayList(this.producers.keySet());
            producersToClose.forEach(p -> futures.add(p.closeAsync()));
        }
        synchronized (this.consumers) {
            // Same snapshot reasoning as for producers.
            ArrayList<ConsumerBase> consumersToClose = Lists.newArrayList(this.consumers.keySet());
            consumersToClose.forEach(c -> futures.add(c.closeAsync()));
        }
        FutureUtil.waitForAll(futures).thenRun(() -> {
            try {
                this.shutdown();
                // Publish the final state before completing the future, so code that runs on
                // completion never observes a still-"Closing" client.
                this.state.set(State.Closed);
                closeFuture.complete(null);
            }
            catch (PulsarClientException e) {
                closeFuture.completeExceptionally(e);
            }
        }).exceptionally(exception -> {
            closeFuture.completeExceptionally(exception);
            return null;
        });
        return closeFuture;
    }

    /**
     * Releases the client's resources: lookup service, connection pool, timer, listener executor
     * provider and authentication provider, in that order.
     *
     * <p>Every resource is attempted even when an earlier one fails, so one failing
     * {@code close()} no longer leaks the rest. The first failure is logged and rethrown; later
     * failures are attached to it as suppressed exceptions.
     *
     * @throws PulsarClientException wrapping the first failure, if any step failed
     */
    @Override
    public void shutdown() throws PulsarClientException {
        Throwable failure = null;
        try {
            this.lookup.close();
        }
        catch (Throwable t) {
            failure = PulsarClientImpl.mergeShutdownFailure(failure, t);
        }
        try {
            this.cnxPool.close();
        }
        catch (Throwable t) {
            failure = PulsarClientImpl.mergeShutdownFailure(failure, t);
        }
        try {
            this.timer.stop();
        }
        catch (Throwable t) {
            failure = PulsarClientImpl.mergeShutdownFailure(failure, t);
        }
        try {
            this.externalExecutorProvider.shutdownNow();
        }
        catch (Throwable t) {
            failure = PulsarClientImpl.mergeShutdownFailure(failure, t);
        }
        try {
            this.conf.getAuthentication().close();
        }
        catch (Throwable t) {
            failure = PulsarClientImpl.mergeShutdownFailure(failure, t);
        }
        if (failure != null) {
            log.warn("Failed to shutdown Pulsar client", failure);
            throw new PulsarClientException(failure);
        }
    }

    /** Keeps the first shutdown failure as primary and records any later one as suppressed. */
    private static Throwable mergeShutdownFailure(Throwable first, Throwable next) {
        if (first == null) {
            return next;
        }
        if (first != next) {
            // addSuppressed rejects self-suppression, hence the identity check.
            first.addSuppressed(next);
        }
        return first;
    }

    /**
     * Looks up the broker for {@code topic} and obtains a connection to it from the pool.
     *
     * @param topic topic name, parsed with {@link DestinationName#get(String)}
     * @return future for a connection obtained from the two addresses (the pair's left and right
     *         elements) returned by the broker lookup
     */
    protected CompletableFuture<ClientCnx> getConnection(String topic) {
        final DestinationName destination = DestinationName.get(topic);
        return this.lookup.getBroker(destination).thenCompose(addresses -> {
            final InetSocketAddress left = (InetSocketAddress)addresses.getLeft();
            final InetSocketAddress right = (InetSocketAddress)addresses.getRight();
            return this.cnxPool.getConnection(left, right);
        });
    }

    /**
     * @return the timer created in the constructor and stopped by {@link #shutdown()}
     */
    protected Timer timer() {
        return timer;
    }

    /**
     * @return the provider of listener executors created in the constructor
     */
    ExecutorProvider externalExecutorProvider() {
        return externalExecutorProvider;
    }

    /**
     * @return the next producer id; ids come from an atomic counter and increase by one per call
     */
    long newProducerId() {
        return producerIdGenerator.getAndIncrement();
    }

    /**
     * @return the next consumer id; ids come from an atomic counter and increase by one per call
     */
    long newConsumerId() {
        return consumerIdGenerator.getAndIncrement();
    }

    /**
     * @return the next request id; ids come from an atomic counter and increase by one per call
     */
    public long newRequestId() {
        return requestIdGenerator.getAndIncrement();
    }

    /**
     * @return the connection pool created in the constructor
     */
    public ConnectionPool getCnxPool() {
        return cnxPool;
    }

    /**
     * @return the event loop group this client was constructed with
     */
    EventLoopGroup eventLoopGroup() {
        return eventLoopGroup;
    }

    /**
     * Asks the lookup service for the partition metadata of {@code topic}.
     *
     * @param topic topic name, parsed with {@link DestinationName#get(String)}
     * @return the lookup's metadata future, or a failed future when parsing or the lookup call
     *         throws {@link IllegalArgumentException}
     */
    private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(String topic) {
        try {
            final DestinationName destination = DestinationName.get(topic);
            return this.lookup.getPartitionedTopicMetadata(destination);
        }
        catch (IllegalArgumentException invalid) {
            // Report the problem through the future rather than throwing to the caller.
            return FutureUtil.failedFuture(invalid);
        }
    }

    /**
     * Builds the event loop group used when the caller does not supply one.
     *
     * @param conf client configuration; {@code getIoThreads()} sets the thread count
     * @return a new event loop group whose threads come from a "pulsar-client-io" thread factory
     */
    private static EventLoopGroup getEventLoopGroup(ClientConfiguration conf) {
        final int ioThreads = conf.getIoThreads();
        final ThreadFactory ioThreadFactory = new DefaultThreadFactory("pulsar-client-io");
        return EventLoopUtil.newEventLoopGroup(ioThreads, ioThreadFactory);
    }

    /**
     * Removes {@code producer} from the set of producers tracked by this client.
     *
     * @param producer producer to stop tracking; removing an untracked producer is a no-op
     */
    void cleanupProducer(ProducerBase producer) {
        synchronized (this.producers) {
            this.producers.remove(producer);
        }
    }

    /**
     * Removes {@code consumer} from the set of consumers tracked by this client.
     *
     * @param consumer consumer to stop tracking; removing an untracked consumer is a no-op
     */
    void cleanupConsumer(ConsumerBase consumer) {
        synchronized (this.consumers) {
            this.consumers.remove(consumer);
        }
    }

    /** Lifecycle of the client. */
    enum State {
        /** Set at the end of construction; the only state in which new producers, consumers and readers are accepted. */
        Open,
        /** Entered by {@code closeAsync()} while producers and consumers are being closed. */
        Closing,
        /** Set by {@code closeAsync()} after {@code shutdown()} has succeeded. */
        Closed
    }
}

