/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.HandlerBase;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.ProducerStats;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.naming.DestinationName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionedProducerImpl
extends ProducerBase {
    private List<ProducerImpl> producers;
    private int numPartitions;
    private MessageRouter routerPolicy;
    private final ProducerStats stats;
    private static final Logger log = LoggerFactory.getLogger(PartitionedProducerImpl.class);

    public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration conf, int numPartitions, CompletableFuture<Producer> producerCreatedFuture) {
        super(client, topic, conf, producerCreatedFuture);
        this.producers = Lists.newArrayListWithCapacity((int)numPartitions);
        this.numPartitions = numPartitions;
        this.routerPolicy = conf.getMessageRouter(numPartitions);
        this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0L ? new ProducerStats() : null;
        this.start();
    }

    private void start() {
        AtomicReference createFail = new AtomicReference();
        AtomicInteger completed = new AtomicInteger();
        for (int partitionIndex = 0; partitionIndex < this.numPartitions; ++partitionIndex) {
            String partitionName = DestinationName.get((String)this.topic).getPartition(partitionIndex).toString();
            ProducerImpl producer = new ProducerImpl(this.client, partitionName, null, this.conf, new CompletableFuture<Producer>(), partitionIndex);
            this.producers.add(producer);
            producer.producerCreatedFuture().handle((prod, createException) -> {
                if (createException != null) {
                    this.setState(HandlerBase.State.Failed);
                    createFail.compareAndSet(null, createException);
                }
                if (completed.incrementAndGet() == this.numPartitions) {
                    if (createFail.get() == null) {
                        this.setState(HandlerBase.State.Ready);
                        this.producerCreatedFuture().complete(this);
                        log.info("[{}] Created partitioned producer", (Object)this.topic);
                    } else {
                        this.closeAsync().handle((ok, closeException) -> {
                            this.producerCreatedFuture().completeExceptionally((Throwable)createFail.get());
                            this.client.cleanupProducer(this);
                            return null;
                        });
                        log.error("[{}] Could not create partitioned producer.", (Object)this.topic, (Object)((Throwable)createFail.get()).getCause());
                    }
                }
                return null;
            });
        }
    }

    @Override
    public CompletableFuture<MessageId> sendAsync(Message message) {
        switch (this.getState()) {
            case Ready: 
            case Connecting: {
                break;
            }
            case Closing: 
            case Closed: {
                return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Producer already closed"));
            }
            case Failed: 
            case Uninitialized: {
                return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
            }
        }
        int partition = this.routerPolicy.choosePartition(message);
        Preconditions.checkArgument((partition >= 0 && partition < this.numPartitions ? 1 : 0) != 0, (Object)"Illegal partition index chosen by the message routing policy");
        return this.producers.get(partition).sendAsync(message);
    }

    @Override
    public boolean isConnected() {
        for (ProducerImpl producer : this.producers) {
            if (producer.isConnected()) continue;
            return false;
        }
        return true;
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        if (this.getState() == HandlerBase.State.Closing || this.getState() == HandlerBase.State.Closed) {
            return CompletableFuture.completedFuture(null);
        }
        this.setState(HandlerBase.State.Closing);
        AtomicReference closeFail = new AtomicReference();
        AtomicInteger completed = new AtomicInteger(this.numPartitions);
        CompletableFuture<Void> closeFuture = new CompletableFuture<Void>();
        for (Producer producer : this.producers) {
            if (producer == null) continue;
            producer.closeAsync().handle((closed, ex) -> {
                if (ex != null) {
                    closeFail.compareAndSet(null, ex);
                }
                if (completed.decrementAndGet() == 0) {
                    if (closeFail.get() == null) {
                        this.setState(HandlerBase.State.Closed);
                        closeFuture.complete(null);
                        log.info("[{}] Closed Partitioned Producer", (Object)this.topic);
                        this.client.cleanupProducer(this);
                    } else {
                        this.setState(HandlerBase.State.Failed);
                        closeFuture.completeExceptionally((Throwable)closeFail.get());
                        log.error("[{}] Could not close Partitioned Producer", (Object)this.topic, (Object)((Throwable)closeFail.get()).getCause());
                    }
                }
                return null;
            });
        }
        return closeFuture;
    }

    @Override
    public synchronized ProducerStats getStats() {
        if (this.stats == null) {
            return null;
        }
        this.stats.reset();
        for (int i = 0; i < this.numPartitions; ++i) {
            this.stats.updateCumulativeStats(this.producers.get(i).getStats());
        }
        return this.stats;
    }

    @Override
    void connectionFailed(PulsarClientException exception) {
    }

    @Override
    void connectionOpened(ClientCnx cnx) {
    }

    @Override
    String getHandlerName() {
        return "partition-producer";
    }
}

