/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.ConsumerStats;
import org.apache.pulsar.client.impl.HandlerBase;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.UnAckedMessageTracker;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionedConsumerImpl
extends ConsumerBase {
    private final List<ConsumerImpl> consumers;
    private final ConcurrentLinkedQueue<ConsumerImpl> pausedConsumers;
    private final int sharedQueueResumeThreshold;
    private final int numPartitions;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final ConsumerStats stats;
    private final UnAckedMessageTracker unAckedMessageTracker;
    private static final Logger log = LoggerFactory.getLogger(PartitionedConsumerImpl.class);

    PartitionedConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf, int numPartitions, ExecutorService listenerExecutor, CompletableFuture<Consumer> subscribeFuture) {
        super(client, topic, subscription, conf, Math.max(Math.max(2, numPartitions), conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture);
        this.consumers = Lists.newArrayListWithCapacity(numPartitions);
        this.pausedConsumers = new ConcurrentLinkedQueue();
        this.sharedQueueResumeThreshold = this.maxReceiverQueueSize / 2;
        this.numPartitions = numPartitions;
        this.unAckedMessageTracker = conf.getAckTimeoutMillis() != 0L ? new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis()) : UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
        this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0L ? new ConsumerStats() : null;
        Preconditions.checkArgument(conf.getReceiverQueueSize() > 0, "Receiver queue size needs to be greater than 0 for Partitioned Topics");
        this.start();
    }

    private void start() {
        AtomicReference subscribeFail = new AtomicReference();
        AtomicInteger completed = new AtomicInteger();
        ConsumerConfiguration internalConfig = this.getInternalConsumerConfig();
        for (int partitionIndex = 0; partitionIndex < this.numPartitions; ++partitionIndex) {
            String partitionName = DestinationName.get(this.topic).getPartition(partitionIndex).toString();
            ConsumerImpl consumer = new ConsumerImpl(this.client, partitionName, this.subscription, internalConfig, this.client.externalExecutorProvider().getExecutor(), partitionIndex, new CompletableFuture<Consumer>());
            this.consumers.add(consumer);
            consumer.subscribeFuture().handle((cons, subscribeException) -> {
                if (subscribeException != null) {
                    this.setState(HandlerBase.State.Failed);
                    subscribeFail.compareAndSet(null, subscribeException);
                    this.client.cleanupConsumer(this);
                }
                if (completed.incrementAndGet() == this.numPartitions) {
                    if (subscribeFail.get() == null) {
                        try {
                            this.starReceivingMessages();
                            this.setState(HandlerBase.State.Ready);
                            this.subscribeFuture().complete(this);
                            log.info("[{}] [{}] Created partitioned consumer", (Object)this.topic, (Object)this.subscription);
                            return null;
                        }
                        catch (PulsarClientException e) {
                            subscribeFail.set(e);
                        }
                    }
                    this.closeAsync().handle((ok, closeException) -> {
                        this.subscribeFuture().completeExceptionally((Throwable)subscribeFail.get());
                        this.client.cleanupConsumer(this);
                        return null;
                    });
                    log.error("[{}] [{}] Could not create partitioned consumer.", new Object[]{this.topic, this.subscription, ((Throwable)subscribeFail.get()).getCause()});
                }
                return null;
            });
        }
    }

    private void starReceivingMessages() throws PulsarClientException {
        for (ConsumerImpl consumer : this.consumers) {
            consumer.sendFlowPermitsToBroker(consumer.cnx(), this.conf.getReceiverQueueSize());
            this.receiveMessageFromConsumer(consumer);
        }
    }

    private void receiveMessageFromConsumer(ConsumerImpl consumer) {
        consumer.receiveAsync().thenAccept(message -> {
            this.messageReceived((Message)message);
            this.lock.writeLock().lock();
            try {
                int size = this.incomingMessages.size();
                if (size >= this.maxReceiverQueueSize || size > this.sharedQueueResumeThreshold && !this.pausedConsumers.isEmpty()) {
                    this.pausedConsumers.add(consumer);
                } else {
                    this.client.eventLoopGroup().execute(() -> this.receiveMessageFromConsumer(consumer));
                }
            }
            finally {
                this.lock.writeLock().unlock();
            }
        });
    }

    private void resumeReceivingFromPausedConsumersIfNeeded() {
        block5: {
            this.lock.readLock().lock();
            try {
                ConsumerImpl consumer;
                if (this.incomingMessages.size() > this.sharedQueueResumeThreshold || this.pausedConsumers.isEmpty()) break block5;
                while ((consumer = this.pausedConsumers.poll()) != null) {
                    this.client.eventLoopGroup().execute(() -> this.receiveMessageFromConsumer(consumer));
                }
            }
            finally {
                this.lock.readLock().unlock();
            }
        }
    }

    @Override
    protected Message internalReceive() throws PulsarClientException {
        try {
            Message message = (Message)this.incomingMessages.take();
            this.unAckedMessageTracker.add((MessageIdImpl)message.getMessageId());
            this.resumeReceivingFromPausedConsumersIfNeeded();
            return message;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarClientException(e);
        }
    }

    @Override
    protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClientException {
        try {
            Message message = (Message)this.incomingMessages.poll(timeout, unit);
            if (message != null) {
                this.unAckedMessageTracker.add((MessageIdImpl)message.getMessageId());
            }
            this.resumeReceivingFromPausedConsumersIfNeeded();
            return message;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarClientException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected CompletableFuture<Message> internalReceiveAsync() {
        CompletableFuture<Message> result = new CompletableFuture<Message>();
        try {
            this.lock.writeLock().lock();
            Message message = (Message)this.incomingMessages.poll(0L, TimeUnit.SECONDS);
            if (message == null) {
                this.pendingReceives.add(result);
            } else {
                this.unAckedMessageTracker.add((MessageIdImpl)message.getMessageId());
                this.resumeReceivingFromPausedConsumersIfNeeded();
                result.complete(message);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            result.completeExceptionally(new PulsarClientException(e));
        }
        finally {
            this.lock.writeLock().unlock();
        }
        return result;
    }

    @Override
    protected CompletableFuture<Void> doAcknowledge(MessageId messageId, PulsarApi.CommandAck.AckType ackType, Map<String, Long> properties) {
        Preconditions.checkArgument(messageId instanceof MessageIdImpl);
        if (this.getState() != HandlerBase.State.Ready) {
            return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed"));
        }
        if (ackType == PulsarApi.CommandAck.AckType.Cumulative) {
            return FutureUtil.failedFuture(new PulsarClientException.NotSupportedException("Cumulative acknowledge not supported for partitioned topics"));
        }
        ConsumerImpl consumer = this.consumers.get(((MessageIdImpl)messageId).getPartitionIndex());
        return consumer.doAcknowledge(messageId, ackType, properties).thenRun(() -> this.unAckedMessageTracker.remove((MessageIdImpl)messageId));
    }

    @Override
    public CompletableFuture<Void> unsubscribeAsync() {
        if (this.getState() == HandlerBase.State.Closing || this.getState() == HandlerBase.State.Closed) {
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Partitioned Consumer was already closed"));
        }
        this.setState(HandlerBase.State.Closing);
        AtomicReference unsubscribeFail = new AtomicReference();
        AtomicInteger completed = new AtomicInteger(this.numPartitions);
        CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<Void>();
        for (Consumer consumer : this.consumers) {
            if (consumer == null) continue;
            consumer.unsubscribeAsync().handle((unsubscribed, ex) -> {
                if (ex != null) {
                    unsubscribeFail.compareAndSet(null, ex);
                }
                if (completed.decrementAndGet() == 0) {
                    if (unsubscribeFail.get() == null) {
                        this.setState(HandlerBase.State.Closed);
                        this.unAckedMessageTracker.close();
                        unsubscribeFuture.complete(null);
                        log.info("[{}] [{}] Unsubscribed Partitioned Consumer", (Object)this.topic, (Object)this.subscription);
                    } else {
                        this.setState(HandlerBase.State.Failed);
                        unsubscribeFuture.completeExceptionally((Throwable)unsubscribeFail.get());
                        log.error("[{}] [{}] Could not unsubscribe Partitioned Consumer", new Object[]{this.topic, this.subscription, ((Throwable)unsubscribeFail.get()).getCause()});
                    }
                }
                return null;
            });
        }
        return unsubscribeFuture;
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        if (this.getState() == HandlerBase.State.Closing || this.getState() == HandlerBase.State.Closed) {
            this.unAckedMessageTracker.close();
            return CompletableFuture.completedFuture(null);
        }
        this.setState(HandlerBase.State.Closing);
        AtomicReference closeFail = new AtomicReference();
        AtomicInteger completed = new AtomicInteger(this.numPartitions);
        CompletableFuture<Void> closeFuture = new CompletableFuture<Void>();
        for (Consumer consumer : this.consumers) {
            if (consumer == null) continue;
            consumer.closeAsync().handle((closed, ex) -> {
                if (ex != null) {
                    closeFail.compareAndSet(null, ex);
                }
                if (completed.decrementAndGet() == 0) {
                    if (closeFail.get() == null) {
                        this.setState(HandlerBase.State.Closed);
                        this.unAckedMessageTracker.close();
                        closeFuture.complete(null);
                        log.info("[{}] [{}] Closed Partitioned Consumer", (Object)this.topic, (Object)this.subscription);
                        this.client.cleanupConsumer(this);
                        this.failPendingReceive();
                    } else {
                        this.setState(HandlerBase.State.Failed);
                        closeFuture.completeExceptionally((Throwable)closeFail.get());
                        log.error("[{}] [{}] Could not close Partitioned Consumer", new Object[]{this.topic, this.subscription, ((Throwable)closeFail.get()).getCause()});
                    }
                }
                return null;
            });
        }
        return closeFuture;
    }

    private void failPendingReceive() {
        this.lock.readLock().lock();
        try {
            if (this.listenerExecutor != null && !this.listenerExecutor.isShutdown()) {
                CompletableFuture receiveFuture;
                while (!this.pendingReceives.isEmpty() && (receiveFuture = (CompletableFuture)this.pendingReceives.poll()) != null) {
                    receiveFuture.completeExceptionally(new PulsarClientException.AlreadyClosedException("Consumer is already closed"));
                }
            }
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    @Override
    public boolean isConnected() {
        for (ConsumerImpl consumer : this.consumers) {
            if (consumer.isConnected()) continue;
            return false;
        }
        return true;
    }

    @Override
    void connectionFailed(PulsarClientException exception) {
    }

    @Override
    void connectionOpened(ClientCnx cnx) {
    }

    void messageReceived(Message message) {
        this.lock.writeLock().lock();
        try {
            this.unAckedMessageTracker.add((MessageIdImpl)message.getMessageId());
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Received message from partitioned-consumer {}", new Object[]{this.topic, this.subscription, message.getMessageId()});
            }
            if (!this.pendingReceives.isEmpty()) {
                CompletableFuture receivedFuture = (CompletableFuture)this.pendingReceives.poll();
                this.listenerExecutor.execute(() -> receivedFuture.complete(message));
            } else {
                this.incomingMessages.put(message);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        finally {
            this.lock.writeLock().unlock();
        }
        if (this.listener != null) {
            this.listenerExecutor.execute(() -> {
                Message msg;
                try {
                    msg = this.internalReceive();
                }
                catch (PulsarClientException e) {
                    log.warn("[{}] [{}] Failed to dequeue the message for listener", new Object[]{this.topic, this.subscription, e});
                    return;
                }
                try {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}][{}] Calling message listener for message {}", new Object[]{this.topic, this.subscription, message.getMessageId()});
                    }
                    this.listener.received(this, msg);
                }
                catch (Throwable t) {
                    log.error("[{}][{}] Message listener error in processing message: {}", new Object[]{this.topic, this.subscription, message, t});
                }
            });
        }
    }

    @Override
    String getHandlerName() {
        return this.subscription;
    }

    private ConsumerConfiguration getInternalConsumerConfig() {
        ConsumerConfiguration internalConsumerConfig = new ConsumerConfiguration();
        internalConsumerConfig.setReceiverQueueSize(this.conf.getReceiverQueueSize());
        internalConsumerConfig.setSubscriptionType(this.conf.getSubscriptionType());
        internalConsumerConfig.setConsumerName(this.consumerName);
        int receiverQueueSize = Math.min(this.conf.getReceiverQueueSize(), this.conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / this.numPartitions);
        internalConsumerConfig.setReceiverQueueSize(receiverQueueSize);
        if (this.conf.getCryptoKeyReader() != null) {
            internalConsumerConfig.setCryptoKeyReader(this.conf.getCryptoKeyReader());
            internalConsumerConfig.setCryptoFailureAction(this.conf.getCryptoFailureAction());
        }
        if (this.conf.getAckTimeoutMillis() != 0L) {
            internalConsumerConfig.setAckTimeout(this.conf.getAckTimeoutMillis(), TimeUnit.MILLISECONDS);
        }
        return internalConsumerConfig;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void redeliverUnacknowledgedMessages() {
        PartitionedConsumerImpl partitionedConsumerImpl = this;
        synchronized (partitionedConsumerImpl) {
            for (ConsumerImpl c : this.consumers) {
                c.redeliverUnacknowledgedMessages();
            }
            this.incomingMessages.clear();
            this.unAckedMessageTracker.clear();
            this.resumeReceivingFromPausedConsumersIfNeeded();
        }
    }

    @Override
    public void redeliverUnacknowledgedMessages(Set<MessageIdImpl> messageIds) {
        if (this.conf.getSubscriptionType() != SubscriptionType.Shared) {
            this.redeliverUnacknowledgedMessages();
            return;
        }
        this.removeExpiredMessagesFromQueue(messageIds);
        messageIds.stream().collect(Collectors.groupingBy(MessageIdImpl::getPartitionIndex, Collectors.toSet())).forEach((partitionIndex, messageIds1) -> this.consumers.get((int)partitionIndex).redeliverUnacknowledgedMessages((Set<MessageIdImpl>)messageIds1));
        this.resumeReceivingFromPausedConsumersIfNeeded();
    }

    @Override
    public void seek(MessageId messageId) throws PulsarClientException {
        try {
            this.seekAsync(messageId).get();
        }
        catch (ExecutionException e) {
            throw new PulsarClientException(e.getCause());
        }
        catch (InterruptedException e) {
            throw new PulsarClientException(e);
        }
    }

    @Override
    public CompletableFuture<Void> seekAsync(MessageId messageId) {
        return FutureUtil.failedFuture(new PulsarClientException("Seek operation not supported on partitioned topics"));
    }

    public boolean isBatchingAckTrackerEmpty() {
        boolean state = true;
        for (Consumer consumer : this.consumers) {
            state &= ((ConsumerImpl)consumer).isBatchingAckTrackerEmpty();
        }
        return state;
    }

    List<ConsumerImpl> getConsumers() {
        return this.consumers;
    }

    @Override
    public int getAvailablePermits() {
        return this.consumers.stream().mapToInt(ConsumerImpl::getAvailablePermits).sum();
    }

    @Override
    public boolean hasReachedEndOfTopic() {
        return this.consumers.stream().allMatch(Consumer::hasReachedEndOfTopic);
    }

    @Override
    public int numMessagesInQueue() {
        return this.incomingMessages.size() + this.consumers.stream().mapToInt(ConsumerImpl::numMessagesInQueue).sum();
    }

    @Override
    public synchronized ConsumerStats getStats() {
        if (this.stats == null) {
            return null;
        }
        this.stats.reset();
        for (int i = 0; i < this.numPartitions; ++i) {
            this.stats.updateCumulativeStats(this.consumers.get(i).getStats());
        }
        return this.stats;
    }

    public UnAckedMessageTracker getUnAckedMessageTracker() {
        return this.unAckedMessageTracker;
    }

    private void removeExpiredMessagesFromQueue(Set<MessageIdImpl> messageIds) {
        Message peek = (Message)this.incomingMessages.peek();
        if (peek != null) {
            if (!messageIds.contains((MessageIdImpl)peek.getMessageId())) {
                return;
            }
            Message message = (Message)this.incomingMessages.poll();
            while (message != null) {
                MessageIdImpl messageId = (MessageIdImpl)message.getMessageId();
                if (!messageIds.contains(messageId)) {
                    messageIds.add(messageId);
                    break;
                }
                message = (Message)this.incomingMessages.poll();
            }
        }
    }
}

