/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.io.IOException;
import java.lang.invoke.LambdaMetafactory;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ConsumerStats;
import org.apache.pulsar.client.impl.ConsumerStatsDisabled;
import org.apache.pulsar.client.impl.HandlerBase;
import org.apache.pulsar.client.impl.MessageCrypto;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.UnAckedMessageTracker;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Iterables;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.util.Timeout;
import org.apache.pulsar.shade.io.netty.util.concurrent.Future;
import org.apache.pulsar.shade.io.netty.util.concurrent.GenericFutureListener;
import org.apache.pulsar.shade.org.apache.pulsar.checksum.utils.Crc32cChecksum;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.shade.org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.shade.org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerImpl
extends ConsumerBase {
    // Not referenced in the visible portion of this class; presumably an upper bound on a redelivery batch — TODO confirm.
    private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
    // Id obtained from PulsarClientImpl.newConsumerId(); passed in the commands this consumer sends to the broker.
    private final long consumerId;
    // Atomic accessor for the volatile availablePermits counter declared below.
    private static final AtomicIntegerFieldUpdater<ConsumerImpl> AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ConsumerImpl.class, "availablePermits");
    // Reset to 0 at construction and again after every successful (re)subscribe.
    private volatile int availablePermits = 0;
    // Id of the last message taken by the zero-queue receive path; used as the restart position
    // when the receiver queue is empty at reconnect time.
    private MessageIdImpl lastDequeuedMessage;
    // Wall-clock deadline in ms (construction time + operation timeout). Before it, retriable subscribe
    // failures are retried; after it, connection failures fail the subscribe future.
    private long subscribeTimeout;
    // Partition index supplied at construction; a value > -1 suppresses the initial flow-permit grant on first connect.
    private final int partitionIndex;
    // Half of the configured receiver queue size. Its use is outside the visible part of the class.
    private final int receiverQueueRefillThreshold;
    // Created in the constructor; its use is outside the visible part of the class.
    private final CompressionCodecProvider codecProvider;
    // True while fetchSingleMessageFromBroker() is blocked waiting for its single message.
    private volatile boolean waitingOnReceiveForZeroQueueSize = false;
    // Held around the hand-off between incomingMessages and pendingReceives and around batch bit-set updates.
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // Non-null only when the configured receiver queue size is <= 1; serializes zero-queue receives.
    private final ReadWriteLock zeroQueueLock;
    // Real tracker when an ack timeout is configured, otherwise the shared disabled instance.
    private final UnAckedMessageTracker unAckedMessageTracker;
    // Per batch entry (keyed by entry-level message id): bits that are still set mark batch indexes not yet acked.
    private final ConcurrentNavigableMap<MessageIdImpl, BitSet> batchMessageAckTracker;
    // Real stats recorder when the client's stats interval is > 0, otherwise the shared disabled instance.
    protected final ConsumerStats stats;
    // Priority level from the consumer configuration; sent in the subscribe command.
    private final int priorityLevel;
    // Durable subscriptions let the broker pick the restart position; otherwise startMessageId is sent.
    private final SubscriptionMode subscriptionMode;
    // Position to (re)start from; recomputed on every connectionOpened() via clearReceiverQueue().
    private BatchMessageIdImpl startMessageId;
    // Not assigned in the visible portion of this class.
    private volatile boolean hasReachedEndOfTopic;
    // Created only when the configuration supplies a crypto key reader; null otherwise.
    private MessageCrypto msgCrypto = null;
    // Unmodifiable copy of the configured consumer properties; sent in the subscribe command.
    private final Map<String, String> metadata;
    // Read-compacted flag from the consumer configuration; sent in the subscribe command.
    private final boolean readCompacted;
    private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);

    /**
     * Creates a consumer for a durable subscription.
     *
     * <p>Delegates to the full constructor with {@code SubscriptionMode.Durable} and no explicit
     * start message id.
     */
    ConsumerImpl(
            PulsarClientImpl client,
            String topic,
            String subscription,
            ConsumerConfiguration conf,
            ExecutorService listenerExecutor,
            int partitionIndex,
            CompletableFuture<Consumer> subscribeFuture) {
        this(client, topic, subscription, conf, listenerExecutor, partitionIndex, subscribeFuture,
                SubscriptionMode.Durable, null);
    }

    /**
     * Creates a consumer and immediately starts connecting to the broker.
     *
     * @param client           owning client; supplies the consumer id, timeouts and stats settings
     * @param topic            topic to subscribe to
     * @param subscription     subscription name
     * @param conf             consumer configuration (queue size, ack timeout, crypto, properties, ...)
     * @param listenerExecutor executor handed to the base class
     * @param partitionIndex   partition index of this consumer
     * @param subscribeFuture  future handed to the base class; completed once the subscribe succeeds
     * @param subscriptionMode durable or non-durable
     * @param startMessageId   optional start position; wrapped into a {@code BatchMessageIdImpl} when present
     */
    ConsumerImpl(
            PulsarClientImpl client,
            String topic,
            String subscription,
            ConsumerConfiguration conf,
            ExecutorService listenerExecutor,
            int partitionIndex,
            CompletableFuture<Consumer> subscribeFuture,
            SubscriptionMode subscriptionMode,
            MessageId startMessageId) {
        super(client, topic, subscription, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture);
        this.consumerId = client.newConsumerId();
        this.subscriptionMode = subscriptionMode;
        if (startMessageId == null) {
            this.startMessageId = null;
        } else {
            this.startMessageId = new BatchMessageIdImpl((MessageIdImpl) startMessageId);
        }
        AVAILABLE_PERMITS_UPDATER.set(this, 0);
        this.subscribeTimeout = System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs();
        this.partitionIndex = partitionIndex;
        this.receiverQueueRefillThreshold = conf.getReceiverQueueSize() / 2;
        this.codecProvider = new CompressionCodecProvider();
        this.priorityLevel = conf.getPriorityLevel();
        this.batchMessageAckTracker = new ConcurrentSkipListMap<MessageIdImpl, BitSet>();
        this.readCompacted = conf.getReadCompacted();

        // Stats are only recorded when the client has a positive stats interval.
        if (client.getConfiguration().getStatsIntervalSeconds() > 0L) {
            this.stats = new ConsumerStats(client, conf, this);
        } else {
            this.stats = ConsumerStats.CONSUMER_STATS_DISABLED;
        }

        // The zero-queue lock only exists for receiver queue sizes of 0 or 1.
        if (conf.getReceiverQueueSize() <= 1) {
            this.zeroQueueLock = new ReentrantReadWriteLock();
        } else {
            this.zeroQueueLock = null;
        }

        // An ack timeout of 0 disables unacked-message tracking.
        if (conf.getAckTimeoutMillis() != 0L) {
            this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis());
        } else {
            this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
        }

        if (conf.getCryptoKeyReader() != null) {
            this.msgCrypto = new MessageCrypto("[" + topic + "] [" + subscription + "]", false);
        }

        if (conf.getProperties().isEmpty()) {
            this.metadata = Collections.emptyMap();
        } else {
            this.metadata = Collections.unmodifiableMap(new HashMap<String, String>(conf.getProperties()));
        }

        this.grabCnx();
    }

    /**
     * @return the tracker used for unacknowledged messages (the disabled instance when no ack
     *         timeout is configured)
     */
    public UnAckedMessageTracker getUnAckedMessageTracker() {
        return unAckedMessageTracker;
    }

    /**
     * Asks the broker to remove this consumer's subscription.
     *
     * <p>On success the consumer is deregistered from the connection, its ack trackers are released
     * and the state becomes {@code Closed}. On failure the state is restored to {@code Ready}.
     *
     * @return a future completed when the broker confirms; failed with
     *         {@code AlreadyClosedException} if the consumer is closing/closed, or with
     *         {@code PulsarClientException} if there is no broker connection or the request fails
     */
    @Override
    public CompletableFuture<Void> unsubscribeAsync() {
        if (this.getState() == HandlerBase.State.Closing || this.getState() == HandlerBase.State.Closed) {
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed"));
        }
        CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<Void>();
        if (!this.isConnected()) {
            unsubscribeFuture.completeExceptionally(new PulsarClientException("Not connected to broker"));
            return unsubscribeFuture;
        }
        this.setState(HandlerBase.State.Closing);
        long requestId = this.client.newRequestId();
        ByteBuf unsubscribe = Commands.newUnsubscribe(this.consumerId, requestId);
        ClientCnx cnx = this.cnx();
        cnx.sendRequestWithId(unsubscribe, requestId).thenRun(() -> {
            cnx.removeConsumer(this.consumerId);
            log.info("[{}][{}] Successfully unsubscribed from topic", this.topic, this.subscription);
            this.batchMessageAckTracker.clear();
            this.unAckedMessageTracker.close();
            unsubscribeFuture.complete(null);
            this.setState(HandlerBase.State.Closed);
        }).exceptionally(e -> {
            // Failures normally arrive wrapped; fall back to the throwable itself so a missing cause
            // cannot raise an NPE here and leave the future incomplete with the state stuck in Closing.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            log.error("[{}][{}] Failed to unsubscribe: {}", this.topic, this.subscription, cause.getMessage());
            unsubscribeFuture.completeExceptionally(cause);
            this.setState(HandlerBase.State.Ready);
            return null;
        });
        return unsubscribeFuture;
    }

    /**
     * Blocks until a message is available and returns it.
     *
     * <p>With a non-zero receiver queue the message is taken from the local queue. With a zero-size
     * queue a single message is requested from the broker while holding the zero-queue write lock.
     *
     * @return the received message
     * @throws PulsarClientException if the calling thread is interrupted while waiting
     */
    @Override
    protected Message internalReceive() throws PulsarClientException {
        if (this.conf.getReceiverQueueSize() != 0) {
            try {
                Message received = (Message) this.incomingMessages.take();
                this.messageProcessed(received);
                return received;
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                this.stats.incrementNumReceiveFailed();
                throw new PulsarClientException(interrupted);
            }
        }

        // Zero-size queue: the lock only exists if the queue size was <= 1 at construction time.
        Preconditions.checkArgument(this.zeroQueueLock != null, "Receiver queue size can't be modified");
        this.zeroQueueLock.writeLock().lock();
        try {
            return this.fetchSingleMessageFromBroker();
        } finally {
            this.zeroQueueLock.writeLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns a future for the next message without blocking.
     *
     * <p>If a message is already queued the future is completed immediately. Otherwise the future
     * is parked in {@code pendingReceives}; for a zero-size receiver queue one flow permit is then
     * sent so the broker delivers a message for it.
     *
     * @return a future completed with the next message, or completed exceptionally with the
     *         {@link InterruptedException} if the calling thread was interrupted
     */
    @Override
    protected CompletableFuture<Message> internalReceiveAsync() {
        CompletableFuture<Message> result = new CompletableFuture<Message>();
        Message message;
        // Acquire before entering try so that finally never unlocks a lock that was not obtained.
        this.lock.writeLock().lock();
        try {
            message = (Message) this.incomingMessages.poll(0L, TimeUnit.MILLISECONDS);
            if (message == null) {
                this.pendingReceives.add(result);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            result.completeExceptionally(e);
            // The future has already failed and was never queued: do not request a message for it.
            return result;
        } finally {
            this.lock.writeLock().unlock();
        }
        if (message != null) {
            this.messageProcessed(message);
            result.complete(message);
        } else if (this.conf.getReceiverQueueSize() == 0) {
            this.sendFlowPermitsToBroker(this.cnx(), 1);
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Zero-queue receive: requests exactly one message from the broker and blocks until it arrives.
     *
     * <p>Messages that were delivered over a connection other than the current one are discarded
     * and the wait continues. The incoming queue is cleared on exit, whatever the outcome.
     *
     * @return the message received over the current connection
     * @throws PulsarClientException if the calling thread is interrupted while waiting
     */
    private Message fetchSingleMessageFromBroker() throws PulsarClientException {
        Preconditions.checkArgument(this.conf.getReceiverQueueSize() == 0);
        if (this.incomingMessages.size() > 0) {
            log.error("The incoming message queue should never be greater than 0 when Queue size is 0");
            this.incomingMessages.clear();
        }
        try {
            Message message;
            this.waitingOnReceiveForZeroQueueSize = true;
            synchronized (this) {
                if (this.isConnected()) {
                    this.sendFlowPermitsToBroker(this.cnx(), 1);
                }
            }
            while (true) {
                message = (Message) this.incomingMessages.take();
                this.lastDequeuedMessage = (MessageIdImpl) message.getMessageId();
                ClientCnx msgCnx = ((MessageImpl) message).getCnx();
                // Compare under the monitor that connectionOpened() also takes, so the connection
                // cannot be swapped between reading it and deciding to accept the message.
                synchronized (this) {
                    if (msgCnx == this.cnx()) {
                        this.waitingOnReceiveForZeroQueueSize = false;
                        break;
                    }
                }
            }
            this.stats.updateNumMsgsReceived(message);
            return message;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.stats.incrementNumReceiveFailed();
            throw new PulsarClientException(e);
        } finally {
            this.waitingOnReceiveForZeroQueueSize = false;
            this.incomingMessages.clear();
        }
    }

    /**
     * Waits up to the given timeout for a message from the local queue.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the message, or {@code null} if none arrived in time or the wait was interrupted
     *         while the consumer was closing/closed
     * @throws PulsarClientException if interrupted while the consumer is neither closing nor closed
     */
    @Override
    protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClientException {
        try {
            Message polled = (Message) this.incomingMessages.poll(timeout, unit);
            if (polled == null) {
                return null;
            }
            this.messageProcessed(polled);
            return polled;
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            HandlerBase.State current = this.getState();
            boolean shuttingDown = current == HandlerBase.State.Closing || current == HandlerBase.State.Closed;
            if (shuttingDown) {
                // An interrupt during shutdown is expected; report "no message" instead of failing.
                return null;
            }
            this.stats.incrementNumReceiveFailed();
            throw new PulsarClientException(interrupted);
        }
    }

    /**
     * On a cumulative ack inside a partially acked batch, acknowledges everything that precedes
     * that batch entry.
     *
     * <p>Drops all tracker entries up to and including the greatest key strictly below
     * {@code message}, then sends a cumulative ack for that key to the broker.
     *
     * @param batchMessageId the batch message id being acked (used for logging only)
     * @param message        entry-level id of the batch being acked
     * @param properties     properties forwarded with the ack command
     */
    private void ackMessagesInEarlierBatch(BatchMessageIdImpl batchMessageId, MessageIdImpl message, Map<String, Long> properties) {
        MessageIdImpl lowerKey = this.batchMessageAckTracker.lowerKey(message);
        if (lowerKey == null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] no messages prior to message {}", new Object[]{this.subscription, this.consumerId, batchMessageId});
            }
            return;
        }
        // Typed head view; clearing it removes the matching entries from the backing tracker.
        this.batchMessageAckTracker.headMap(lowerKey, true).clear();
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] ack prior message {} to broker on cumulative ack for message {}", new Object[]{this.subscription, this.consumerId, lowerKey, batchMessageId});
        }
        this.sendAcknowledge(lowerKey, PulsarApi.CommandAck.AckType.Cumulative, properties);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records an ack for one message of a batch and reports whether the whole batch entry may now
     * be acknowledged to the broker.
     *
     * <p>An individual ack clears the bit at the batch index; any other ack type clears all bits
     * up to and including that index. When no bits remain, the entry is removed from the tracker
     * (for cumulative acks, together with every entry at or before it).
     *
     * @param batchMessageId id of the message within the batch
     * @param ackType        individual or cumulative
     * @param properties     properties forwarded when earlier batches are acked
     * @return {@code true} if the batch is untracked or fully acked, {@code false} if messages of
     *         the batch are still outstanding
     */
    boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, PulsarApi.CommandAck.AckType ackType, Map<String, Long> properties) {
        MessageIdImpl entryKey = new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(), batchMessageId.getPartitionIndex());
        BitSet pending = (BitSet) this.batchMessageAckTracker.get(entryKey);
        if (pending == null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] message not found {} for ack {}", this.subscription, this.consumerId, batchMessageId, ackType);
            }
            return true;
        }

        boolean individual = ackType == PulsarApi.CommandAck.AckType.Individual;
        boolean cumulative = ackType == PulsarApi.CommandAck.AckType.Cumulative;
        int index = batchMessageId.getBatchIndex();
        int lengthBeforeAck;
        int stillPending = 0;
        boolean fullyAcked;

        this.lock.writeLock().lock();
        try {
            lengthBeforeAck = pending.length();
            if (individual) {
                pending.clear(index);
            } else {
                pending.clear(0, index + 1);
            }
            fullyAcked = pending.isEmpty();
            // The cardinality is only needed for the debug messages below.
            if (log.isDebugEnabled()) {
                stillPending = pending.cardinality();
            }
        } finally {
            this.lock.writeLock().unlock();
        }

        if (!fullyAcked) {
            if (cumulative) {
                this.ackMessagesInEarlierBatch(batchMessageId, entryKey, properties);
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] cannot ack message to broker {}, acktype {}, pending acks - {}", this.subscription, this.consumerName, batchMessageId, ackType, stillPending);
            }
            return false;
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] can ack message to broker {}, acktype {}, cardinality {}, length {}", this.subscription, this.consumerName, batchMessageId, ackType, stillPending, lengthBeforeAck);
        }
        if (cumulative) {
            this.batchMessageAckTracker.keySet().removeIf(tracked -> tracked.compareTo(entryKey) <= 0);
        }
        this.batchMessageAckTracker.remove(entryKey);
        if (individual) {
            this.stats.incrementNumAcksSent(lengthBeforeAck);
        }
        return true;
    }

    /**
     * After a cumulative ack of a non-batch message, forgets every tracked batch entry that lies
     * before that message.
     *
     * <p>Removes all tracker entries up to and including the greatest key strictly below
     * {@code message}. Nothing is sent to the broker here.
     *
     * @param message the message id being cumulatively acked
     * @param ackType ack type of the triggering ack (not consulted by this method)
     */
    private void updateBatchAckTracker(MessageIdImpl message, PulsarApi.CommandAck.AckType ackType) {
        if (this.batchMessageAckTracker.isEmpty()) {
            return;
        }
        MessageIdImpl lowerKey = this.batchMessageAckTracker.lowerKey(message);
        if (lowerKey == null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] no messages to clean up prior to message {}", new Object[]{this.subscription, this.consumerId, message});
            }
            return;
        }
        // Typed head view; clearing it removes the matching entries from the backing tracker.
        this.batchMessageAckTracker.headMap(lowerKey, true).clear();
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] updated batch ack tracker up to message {} on cumulative ack for message {}", new Object[]{this.subscription, this.consumerId, lowerKey, message});
        }
    }

    /**
     * @return {@code true} when no batch entry is currently being tracked for acknowledgement
     */
    public boolean isBatchingAckTrackerEmpty() {
        return batchMessageAckTracker.isEmpty();
    }

    /**
     * Acknowledges a message, taking batch bookkeeping into account.
     *
     * <p>For a batch message the ack is first recorded locally; it is only forwarded to the broker
     * once the whole batch entry is acked. For a cumulative ack of a non-batch message, earlier
     * batch entries are dropped from the tracker before the ack is sent.
     *
     * @param messageId  id to acknowledge; must be a {@code MessageIdImpl}
     * @param ackType    individual or cumulative
     * @param properties properties forwarded with the ack command
     * @return a future for the ack; already completed when the ack was only recorded locally,
     *         failed when the consumer is neither ready nor connecting
     */
    @Override
    protected CompletableFuture<Void> doAcknowledge(MessageId messageId, PulsarApi.CommandAck.AckType ackType, Map<String, Long> properties) {
        Preconditions.checkArgument(messageId instanceof MessageIdImpl);
        if (this.getState() != HandlerBase.State.Ready && this.getState() != HandlerBase.State.Connecting) {
            this.stats.incrementNumAcksFailed();
            return FutureUtil.failedFuture(new PulsarClientException("Consumer not ready. State: " + this.getState()));
        }

        if (messageId instanceof BatchMessageIdImpl) {
            boolean batchComplete = this.markAckForBatchMessage((BatchMessageIdImpl) messageId, ackType, properties);
            if (!batchComplete) {
                // Other messages of the batch are still outstanding: nothing goes to the broker yet.
                return CompletableFuture.completedFuture(null);
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] acknowledging message - {}, acktype {}", this.subscription, this.consumerName, messageId, ackType);
            }
        } else if (ackType == PulsarApi.CommandAck.AckType.Cumulative) {
            this.updateBatchAckTracker((MessageIdImpl) messageId, ackType);
        }
        return this.sendAcknowledge(messageId, ackType, properties);
    }

    /**
     * Writes an ack command to the broker connection and updates trackers/stats once the write
     * completes.
     *
     * @param messageId  id to acknowledge; cast to {@code MessageIdImpl}
     * @param ackType    individual or cumulative
     * @param properties properties included in the ack command
     * @return a future completed when the write succeeds, or failed when the write fails or the
     *         consumer is not connected
     */
    private CompletableFuture<Void> sendAcknowledge(final MessageId messageId, final PulsarApi.CommandAck.AckType ackType, Map<String, Long> properties) {
        final MessageIdImpl ackedId = (MessageIdImpl) messageId;
        final CompletableFuture<Void> promise = new CompletableFuture<Void>();

        if (!this.isConnected()) {
            this.stats.incrementNumAcksFailed();
            promise.completeExceptionally(new PulsarClientException("Not connected to broker. State: " + this.getState()));
            return promise;
        }

        ByteBuf ackCommand = Commands.newAck(this.consumerId, ackedId.getLedgerId(), ackedId.getEntryId(), ackType, null, properties);
        this.cnx().ctx().writeAndFlush(ackCommand).addListener((GenericFutureListener<? extends Future<? super Void>>) new GenericFutureListener<Future<Void>>() {

            @Override
            public void operationComplete(Future<Void> writeResult) throws Exception {
                if (!writeResult.isSuccess()) {
                    ConsumerImpl.this.stats.incrementNumAcksFailed();
                    promise.completeExceptionally(new PulsarClientException(writeResult.cause()));
                    return;
                }
                if (ackType == PulsarApi.CommandAck.AckType.Individual) {
                    ConsumerImpl.this.unAckedMessageTracker.remove(ackedId);
                    // Acks for batch messages are counted where the batch is marked complete.
                    if (!(messageId instanceof BatchMessageIdImpl)) {
                        ConsumerImpl.this.stats.incrementNumAcksSent(1L);
                    }
                } else if (ackType == PulsarApi.CommandAck.AckType.Cumulative) {
                    ConsumerImpl.this.stats.incrementNumAcksSent(ConsumerImpl.this.unAckedMessageTracker.removeMessagesTill(ackedId));
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] [{}] Successfully acknowledged message - {}, acktype {}", ConsumerImpl.this.subscription, ConsumerImpl.this.topic, ConsumerImpl.this.consumerName, messageId, ackType);
                }
                promise.complete(null);
            }
        });
        return promise;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Called when a broker connection becomes available: registers this consumer on the connection
     * and (re)subscribes to the topic.
     *
     * <p>Before subscribing, the receiver queue and both ack trackers are cleared and the restart
     * position is recomputed. Durable subscriptions send no start position; otherwise the
     * recomputed {@code startMessageId} is sent.
     *
     * <p>On success the consumer becomes ready and flow permits are granted. On failure the
     * subscribe is retried, or the subscribe future is failed, depending on the error, the
     * subscribe deadline and whether the consumer had connected before.
     *
     * @param cnx the newly opened connection
     */
    @Override
    void connectionOpened(ClientCnx cnx) {
        this.setClientCnx(cnx);
        cnx.registerConsumer(this.consumerId, this);
        log.info("[{}][{}] Subscribing to topic on cnx {}", new Object[]{this.topic, this.subscription, cnx.ctx().channel()});
        long requestId = this.client.newRequestId();

        final int currentSize;
        synchronized (this) {
            currentSize = this.incomingMessages.size();
            this.startMessageId = this.clearReceiverQueue();
            this.unAckedMessageTracker.clear();
            this.batchMessageAckTracker.clear();
        }

        boolean isDurable = this.subscriptionMode == SubscriptionMode.Durable;
        PulsarApi.MessageIdData startMessageIdData = null;
        if (!isDurable) {
            PulsarApi.MessageIdData.Builder builder = PulsarApi.MessageIdData.newBuilder();
            builder.setLedgerId(this.startMessageId.getLedgerId());
            builder.setEntryId(this.startMessageId.getEntryId());
            // The field is declared as BatchMessageIdImpl, so the batch index is always available.
            builder.setBatchIndex(this.startMessageId.getBatchIndex());
            startMessageIdData = builder.build();
            builder.recycle();
        }

        ByteBuf request = Commands.newSubscribe(this.topic, this.subscription, this.consumerId, requestId, this.getSubType(), this.priorityLevel, this.consumerName, isDurable, startMessageIdData, this.metadata, this.readCompacted);
        if (startMessageIdData != null) {
            startMessageIdData.recycle();
        }

        cnx.sendRequestWithId(request, requestId).thenRun(() -> {
            synchronized (this) {
                if (this.changeToReadyState()) {
                    log.info("[{}][{}] Subscribed to topic on {} -- consumer: {}", new Object[]{this.topic, this.subscription, cnx.channel().remoteAddress(), this.consumerId});
                    AVAILABLE_PERMITS_UPDATER.set(this, 0);
                    // Zero-queue mode: re-request the single message that was pending (or dropped) at reconnect.
                    if (this.waitingOnReceiveForZeroQueueSize || this.conf.getReceiverQueueSize() == 0 && currentSize > 0) {
                        this.sendFlowPermitsToBroker(cnx, 1);
                    }
                } else {
                    // The consumer could not move to the ready state: undo the registration.
                    this.setState(HandlerBase.State.Closed);
                    cnx.removeConsumer(this.consumerId);
                    cnx.channel().close();
                    return;
                }
            }
            this.resetBackoff();
            boolean firstTimeConnect = this.subscribeFuture.complete(this);
            // No initial permits on the first connect of a consumer with a partition index > -1,
            // nor in zero-queue mode.
            if (!(firstTimeConnect && this.partitionIndex > -1 || this.conf.getReceiverQueueSize() == 0)) {
                this.sendFlowPermitsToBroker(cnx, this.conf.getReceiverQueueSize());
            }
        }).exceptionally(e -> {
            cnx.removeConsumer(this.consumerId);
            if (this.getState() == HandlerBase.State.Closing || this.getState() == HandlerBase.State.Closed) {
                cnx.channel().close();
                return null;
            }
            log.warn("[{}][{}] Failed to subscribe to topic on {}", new Object[]{this.topic, this.subscription, cnx.channel().remoteAddress()});
            if (e.getCause() instanceof PulsarClientException && this.isRetriableError((PulsarClientException) e.getCause()) && System.currentTimeMillis() < this.subscribeTimeout) {
                this.reconnectLater(e.getCause());
                return null;
            }
            if (!this.subscribeFuture.isDone()) {
                // Never connected: report the failure to whoever is waiting on the subscribe.
                this.setState(HandlerBase.State.Failed);
                this.subscribeFuture.completeExceptionally(e);
                this.client.cleanupConsumer(this);
            } else {
                // Connected before: keep retrying in the background.
                this.reconnectLater(e.getCause());
            }
            return null;
        });
    }

    /**
     * Drains the receiver queue and computes the position to restart from.
     *
     * <p>If messages were queued, the position is the one just before the first queued message
     * (previous batch index for a batch message, previous entry otherwise). If the queue was empty,
     * the last dequeued message is used when known, else the current {@code startMessageId}.
     *
     * @return the restart position; may be {@code null} if none of the above is available
     */
    private BatchMessageIdImpl clearReceiverQueue() {
        List<Object> drained = new ArrayList<Object>(this.incomingMessages.size());
        this.incomingMessages.drainTo(drained);

        if (drained.isEmpty()) {
            if (this.lastDequeuedMessage != null) {
                return new BatchMessageIdImpl(this.lastDequeuedMessage);
            }
            return this.startMessageId;
        }

        MessageIdImpl head = (MessageIdImpl) ((Message) drained.get(0)).getMessageId();
        if (head instanceof BatchMessageIdImpl) {
            int previousIndex = ((BatchMessageIdImpl) head).getBatchIndex() - 1;
            return new BatchMessageIdImpl(head.getLedgerId(), head.getEntryId(), head.getPartitionIndex(), previousIndex);
        }
        return new BatchMessageIdImpl(head.getLedgerId(), head.getEntryId() - 1L, head.getPartitionIndex(), -1);
    }

    /**
     * Sends a flow command granting the broker permission to deliver more messages.
     *
     * @param cnx         connection to write to; nothing is sent when {@code null}
     * @param numMessages number of additional permits
     */
    void sendFlowPermitsToBroker(ClientCnx cnx, int numMessages) {
        if (cnx == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Adding {} additional permits", this.topic, this.subscription, numMessages);
        }
        ByteBuf flowCommand = Commands.newFlow(this.consumerId, numMessages);
        cnx.ctx().writeAndFlush(flowCommand, cnx.ctx().voidPromise());
    }

    /**
     * Called when connecting to the broker failed.
     *
     * <p>Once the subscribe deadline has passed, the subscribe future is failed; if this call is
     * the one that fails it, the consumer is marked {@code Failed} and removed from the client.
     *
     * @param exception the connection failure
     */
    @Override
    void connectionFailed(PulsarClientException exception) {
        if (System.currentTimeMillis() <= this.subscribeTimeout) {
            return;
        }
        if (!this.subscribeFuture.completeExceptionally(exception)) {
            // The subscribe future was already completed earlier.
            return;
        }
        this.setState(HandlerBase.State.Failed);
        log.info("[{}] Consumer creation failed for consumer {}", this.topic, this.consumerId);
        this.client.cleanupConsumer(this);
    }

    /**
     * Closes this consumer.
     *
     * <p>If it is already closing/closed, or not connected, local resources are released and a
     * completed future is returned. Otherwise a close command is sent to the broker; the consumer
     * is treated as closed when the broker confirms or when the channel is no longer active.
     *
     * @return a future completed when the consumer is closed, or failed when the broker rejects
     *         the close while the channel is still active
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        if (this.getState() == HandlerBase.State.Closing || this.getState() == HandlerBase.State.Closed) {
            this.batchMessageAckTracker.clear();
            this.unAckedMessageTracker.close();
            return CompletableFuture.completedFuture(null);
        }

        if (!this.isConnected()) {
            log.info("[{}] [{}] Closed Consumer (not connected)", this.topic, this.subscription);
            this.setState(HandlerBase.State.Closed);
            this.batchMessageAckTracker.clear();
            this.unAckedMessageTracker.close();
            this.client.cleanupConsumer(this);
            return CompletableFuture.completedFuture(null);
        }

        Timeout statTimeout = this.stats.getStatTimeout();
        if (statTimeout != null) {
            statTimeout.cancel();
        }

        this.setState(HandlerBase.State.Closing);
        long requestId = this.client.newRequestId();
        ByteBuf closeCommand = Commands.newCloseConsumer(this.consumerId, requestId);
        CompletableFuture<Void> closeFuture = new CompletableFuture<Void>();
        ClientCnx cnx = this.cnx();
        cnx.sendRequestWithId(closeCommand, requestId).handle((ignored, exception) -> {
            cnx.removeConsumer(this.consumerId);
            boolean closedOnBroker = exception == null || !cnx.ctx().channel().isActive();
            if (!closedOnBroker) {
                closeFuture.completeExceptionally((Throwable) exception);
                return null;
            }
            log.info("[{}] [{}] Closed consumer", this.topic, this.subscription);
            this.setState(HandlerBase.State.Closed);
            this.batchMessageAckTracker.clear();
            this.unAckedMessageTracker.close();
            closeFuture.complete(null);
            this.client.cleanupConsumer(this);
            this.failPendingReceive();
            return null;
        });
        return closeFuture;
    }

    /**
     * Fails every pending asynchronous receive with {@code AlreadyClosedException}.
     *
     * <p>Does nothing when the listener executor is absent or already shut down.
     */
    private void failPendingReceive() {
        this.lock.readLock().lock();
        try {
            if (this.listenerExecutor == null || this.listenerExecutor.isShutdown()) {
                return;
            }
            while (!this.pendingReceives.isEmpty()) {
                CompletableFuture<?> pending = (CompletableFuture<?>) this.pendingReceives.poll();
                if (pending == null) {
                    // Someone else drained the queue between the emptiness check and the poll.
                    break;
                }
                pending.completeExceptionally(new PulsarClientException.AlreadyClosedException("Consumer is already closed"));
            }
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    // Handles one message entry delivered by the broker on connection `cnx`.
    //
    // Pipeline: checksum verification -> metadata parsing -> decryption -> decompression -> delivery.
    // A single (non-batch) message is either queued in incomingMessages or handed to a pending
    // async receive; a batch is expanded via receiveIndividualMessagesFromBatch(), except in
    // zero-queue mode where a batch causes the consumer to be closed.
    //
    // NOTE(review): this method is raw decompiler output and is NOT valid Java as written: locals
    // are used without declarations, the `** GOTO lbl51` jump and the LambdaMetafactory expressions
    // are CFR placeholders, and the bodies of lambda$messageReceived$6 / $7 are not visible here.
    // It must be restored from the original source before it can compile.
    void messageReceived(PulsarApi.MessageIdData messageId, ByteBuf headersAndPayload, ClientCnx cnx) {
        if (ConsumerImpl.log.isDebugEnabled()) {
            ConsumerImpl.log.debug("[{}][{}] Received message: {}/{}", new Object[]{this.topic, this.subscription, messageId.getLedgerId(), messageId.getEntryId()});
        }
        msgMetadata = null;
        payload = headersAndPayload;
        // Drop the message and report it to the broker when the checksum does not verify.
        if (!this.verifyChecksum(headersAndPayload, messageId)) {
            this.discardCorruptedMessage(messageId, cnx, PulsarApi.CommandAck.ValidationError.ChecksumMismatch);
            return;
        }
        try {
            msgMetadata = Commands.parseMessageMetadata(payload);
        }
        catch (Throwable t) {
            // Unparseable metadata is reported with the same ChecksumMismatch validation error.
            this.discardCorruptedMessage(messageId, cnx, PulsarApi.CommandAck.ValidationError.ChecksumMismatch);
            return;
        }
        // A null result means the payload cannot be processed further; presumably the helper has
        // already dealt with the message (discard/failure handling) — TODO confirm.
        decryptedPayload = this.decryptPayloadIfNeeded(messageId, msgMetadata, payload, cnx);
        if (decryptedPayload == null) {
            return;
        }
        uncompressedPayload = this.uncompressPayloadIfNeeded(messageId, msgMetadata, decryptedPayload, cnx);
        // The decrypted buffer is released whether or not decompression succeeded.
        decryptedPayload.release();
        if (uncompressedPayload == null) {
            return;
        }
        numMessages = msgMetadata.getNumMessagesInBatch();
        // A count of 1 with the batch field absent denotes a plain (non-batch) message.
        if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) {
            message = new MessageImpl(messageId, msgMetadata, uncompressedPayload, this.getPartitionIndex(), cnx);
            uncompressedPayload.release();
            msgMetadata.recycle();
            this.lock.readLock().lock();
            try {
                this.unAckedMessageTracker.add((MessageIdImpl)message.getMessageId());
                v0 = asyncReceivedWaiting = this.pendingReceives.isEmpty() == false;
                // Queue the message only when no async receive is waiting and either the queue is
                // in use or a zero-queue receive is currently blocked waiting for it.
                if ((this.conf.getReceiverQueueSize() != 0 || this.waitingOnReceiveForZeroQueueSize) && !asyncReceivedWaiting) {
                    this.incomingMessages.add(message);
                }
                // Decompiler placeholder: skips the notification when no async receive is waiting.
                if (!asyncReceivedWaiting) ** GOTO lbl51
                this.notifyPendingReceivedCallback(message, null);
            }
            finally {
                this.lock.readLock().unlock();
            }
        } else {
            if (this.conf.getReceiverQueueSize() == 0) {
                // Batches cannot be consumed with a zero-size receiver queue: close the consumer.
                // The follow-up action lives in lambda$messageReceived$6, which is not visible here.
                ConsumerImpl.log.warn("Closing consumer [{}]-[{}] due to unsupported received batch-message with zero receiver queue size", (Object)this.subscription, (Object)this.consumerName);
                this.closeAsync().handle((BiFunction<Void, Throwable, Object>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, lambda$messageReceived$6(java.lang.Void java.lang.Throwable ), (Ljava/lang/Void;Ljava/lang/Throwable;)Ljava/lang/Object;)((ConsumerImpl)this));
            } else {
                this.receiveIndividualMessagesFromBatch(msgMetadata, uncompressedPayload, messageId, cnx);
            }
            uncompressedPayload.release();
            msgMetadata.recycle();
        }
lbl51:
        // 3 sources

        // With a message listener configured, schedule lambda$messageReceived$7 (body not visible
        // here) on the listener executor, passing the number of messages received.
        if (this.listener != null) {
            this.listenerExecutor.execute((Runnable)LambdaMetafactory.metafactory(null, null, null, ()V, lambda$messageReceived$7(int ), ()V)((ConsumerImpl)this, (int)numMessages));
        }
    }

    /**
     * Completes the oldest pending asynchronous receive, if there is one, with either
     * the given message or the given failure.
     *
     * <p>With a zero receiver queue size the future is completed inline; otherwise the
     * message is first accounted for via {@link #messageProcessed(Message)} and the
     * completion is handed to {@code listenerExecutor}. Failures are always dispatched
     * on {@code listenerExecutor}.
     *
     * @param message the received message; must be non-null when {@code exception} is {@code null}
     * @param exception the failure to propagate, or {@code null} to deliver {@code message}
     */
    void notifyPendingReceivedCallback(MessageImpl message, Exception exception) {
        if (this.pendingReceives.isEmpty()) {
            return;
        }
        // poll() may hand back null even after the isEmpty() check above (the success path
        // has always guarded for that), so both paths must tolerate a missing future.
        CompletableFuture receivedFuture = (CompletableFuture)this.pendingReceives.poll();
        if (exception != null) {
            if (receivedFuture != null) {
                this.listenerExecutor.execute(() -> receivedFuture.completeExceptionally(exception));
            }
            return;
        }
        Preconditions.checkNotNull(message, "received message can't be null");
        if (receivedFuture == null) {
            return;
        }
        if (this.conf.getReceiverQueueSize() == 0) {
            receivedFuture.complete(message);
        } else {
            this.messageProcessed(message);
            this.listenerExecutor.execute(() -> receivedFuture.complete(message));
        }
    }

    /**
     * Splits a batch entry into its individual messages and hands each one either to a
     * waiting asynchronous receive or to {@code incomingMessages}.
     *
     * <p>The batch is registered in {@code batchMessageAckTracker} with one bit set per
     * contained message and in {@code unAckedMessageTracker} under its entry-level id.
     * For a non-durable subscription, messages of the start entry whose batch index is
     * at or before {@code startMessageId} are skipped; their count is returned to the
     * available permits at the end.
     *
     * <p>If a message cannot be deserialized the batch is dropped from
     * {@code batchMessageAckTracker} and the entry is discarded as corrupted.
     *
     * @param msgMetadata metadata of the batch entry
     * @param uncompressedPayload the batch payload; not released by this method
     * @param messageId ledger/entry id of the batch entry
     * @param cnx connection the batch arrived on
     */
    void receiveIndividualMessagesFromBatch(PulsarApi.MessageMetadata msgMetadata, ByteBuf uncompressedPayload, PulsarApi.MessageIdData messageId, ClientCnx cnx) {
        int batchSize = msgMetadata.getNumMessagesInBatch();
        BitSet bitSet = new BitSet(batchSize);
        MessageIdImpl batchMessage = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), this.getPartitionIndex());
        bitSet.set(0, batchSize);
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] added bit set for message {}, cardinality {}, length {}", new Object[]{this.subscription, this.consumerName, batchMessage, bitSet.cardinality(), bitSet.length()});
        }
        this.batchMessageAckTracker.put(batchMessage, bitSet);
        this.unAckedMessageTracker.add(batchMessage);
        int skippedMessages = 0;
        try {
            for (int i = 0; i < batchSize; ++i) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] processing message num - {} in batch", new Object[]{this.subscription, this.consumerName, i});
                }
                PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata.newBuilder();
                ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload, singleMessageMetadataBuilder, i, batchSize);
                try {
                    if (this.subscriptionMode == SubscriptionMode.NonDurable && this.startMessageId != null && messageId.getLedgerId() == this.startMessageId.getLedgerId() && messageId.getEntryId() == this.startMessageId.getEntryId() && i <= this.startMessageId.getBatchIndex()) {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] [{}] Ignoring message from before the startMessageId", (Object)this.subscription, (Object)this.consumerName);
                        }
                        ++skippedMessages;
                        continue;
                    }
                    BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), this.getPartitionIndex(), i);
                    MessageImpl message = new MessageImpl(batchMessageIdImpl, msgMetadata, singleMessageMetadataBuilder.build(), singleMessagePayload, cnx);
                    // The read lock is held while choosing between queueing and direct hand-off.
                    this.lock.readLock().lock();
                    try {
                        if (this.pendingReceives.isEmpty()) {
                            this.incomingMessages.add(message);
                        } else {
                            this.notifyPendingReceivedCallback(message, null);
                        }
                    }
                    finally {
                        this.lock.readLock().unlock();
                    }
                }
                finally {
                    // Always give back the per-message buffer and builder, including on the
                    // skip path above, which previously leaked both.
                    singleMessagePayload.release();
                    singleMessageMetadataBuilder.recycle();
                }
            }
        }
        catch (IOException e) {
            // Trailing Throwable argument so the cause of the failure is not lost.
            log.warn("[{}] [{}] unable to obtain message in batch", new Object[]{this.subscription, this.consumerName, e});
            this.batchMessageAckTracker.remove(batchMessage);
            this.discardCorruptedMessage(messageId, cnx, PulsarApi.CommandAck.ValidationError.BatchDeSerializeError);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] enqueued messages in batch. queue size - {}, available queue size - {}", new Object[]{this.subscription, this.consumerName, this.incomingMessages.size(), this.incomingMessages.remainingCapacity()});
        }
        if (skippedMessages > 0) {
            this.increaseAvailablePermits(cnx, skippedMessages);
        }
    }

    /**
     * Book-keeping performed once a message has been handed to the application.
     *
     * <p>Records the message id as {@code lastDequeuedMessage}. If the message arrived
     * on the connection that is still current, one permit is returned, receive stats
     * are updated and, when an ack timeout is configured, the unacked-message tracker
     * is adjusted for the entry-level id of the message.
     *
     * @param msg the message that was just dequeued; expected to be a {@code MessageImpl}
     */
    protected synchronized void messageProcessed(Message msg) {
        final ClientCnx activeCnx = cnx();
        final ClientCnx originCnx = ((MessageImpl) msg).getCnx();
        lastDequeuedMessage = (MessageIdImpl) msg.getMessageId();

        // Messages from a connection that is no longer current get no permit or stats.
        if (originCnx != activeCnx) {
            return;
        }

        increaseAvailablePermits(activeCnx);
        stats.updateNumMsgsReceived(msg);

        if (conf.getAckTimeoutMillis() == 0L) {
            return;
        }

        // Batch ids are tracked at entry granularity.
        final MessageIdImpl trackedId = getMessageIdImpl(msg);
        if (partitionIndex == -1) {
            unAckedMessageTracker.add(trackedId);
        } else {
            // NOTE(review): a non-negative partition index presumably means an enclosing
            // partitioned consumer owns ack-timeout tracking from here on; confirm.
            unAckedMessageTracker.remove(trackedId);
        }
    }

    /**
     * Returns a single permit to the available-permits counter.
     *
     * @param currentCnx connection used if a flow command has to be sent
     */
    void increaseAvailablePermits(ClientCnx currentCnx) {
        final int singlePermit = 1;
        increaseAvailablePermits(currentCnx, singlePermit);
    }

    /**
     * Adds {@code delta} to the available-permits counter and, once the counter has
     * reached {@code receiverQueueRefillThreshold}, atomically resets it to zero and
     * sends the accumulated permits to the broker.
     *
     * @param currentCnx connection on which the flow command is sent
     * @param delta number of permits to add
     */
    private void increaseAvailablePermits(ClientCnx currentCnx, int delta) {
        // Retry the compare-and-set with a fresh read whenever another thread got in first.
        for (int permits = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta);
                permits >= receiverQueueRefillThreshold;
                permits = AVAILABLE_PERMITS_UPDATER.get(this)) {
            if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, permits, 0)) {
                sendFlowPermitsToBroker(currentCnx, permits);
                return;
            }
        }
    }

    /**
     * Produces the payload to continue processing with, decrypting it when the
     * metadata carries encryption keys.
     *
     * <p>Unencrypted payloads are returned retained. When no {@code CryptoKeyReader}
     * is configured, or decryption yields {@code null}, the configured crypto failure
     * action decides: {@code CONSUME} returns the still-encrypted payload retained,
     * {@code DISCARD} acks the message with a decryption error, and anything else only
     * logs an error.
     *
     * @param messageId id of the message, used for logging and for the discard ack
     * @param msgMetadata metadata of the message
     * @param payload the raw payload
     * @param currentCnx connection used when the message is discarded
     * @return the buffer to process, or {@code null} when the message must not be delivered
     */
    private ByteBuf decryptPayloadIfNeeded(PulsarApi.MessageIdData messageId, PulsarApi.MessageMetadata msgMetadata, ByteBuf payload, ClientCnx currentCnx) {
        final boolean encrypted = msgMetadata.getEncryptionKeysCount() != 0;
        if (!encrypted) {
            return payload.retain();
        }

        if (conf.getCryptoKeyReader() != null) {
            final ByteBuf plainText = msgCrypto.decrypt(msgMetadata, payload, conf.getCryptoKeyReader());
            if (plainText != null) {
                return plainText;
            }

            // Decryption was attempted and failed.
            ByteBuf fallback = null;
            if (conf.getCryptoFailureAction() == ConsumerCryptoFailureAction.CONSUME) {
                log.warn("[{}][{}][{}][{}] Decryption failed. Consuming encrypted message since config is set to consume.", new Object[]{topic, subscription, consumerName, messageId});
                fallback = payload.retain();
            } else if (conf.getCryptoFailureAction() == ConsumerCryptoFailureAction.DISCARD) {
                log.warn("[{}][{}][{}][{}] Discarding message since decryption failed and config is set to discard", new Object[]{topic, subscription, consumerName, messageId});
                discardMessage(messageId, currentCnx, PulsarApi.CommandAck.ValidationError.DecryptionError);
            } else {
                log.error("[{}][{}][{}][{}] Message delivery failed since unable to decrypt incoming message", new Object[]{topic, subscription, consumerName, messageId});
            }
            return fallback;
        }

        // Encrypted message but no key reader configured.
        ByteBuf fallback = null;
        if (conf.getCryptoFailureAction() == ConsumerCryptoFailureAction.CONSUME) {
            log.warn("[{}][{}][{}] CryptoKeyReader interface is not implemented. Consuming encrypted message.", new Object[]{topic, subscription, consumerName});
            fallback = payload.retain();
        } else if (conf.getCryptoFailureAction() == ConsumerCryptoFailureAction.DISCARD) {
            log.warn("[{}][{}][{}] Skipping decryption since CryptoKeyReader interface is not implemented and config is set to discard", new Object[]{topic, subscription, consumerName});
            discardMessage(messageId, currentCnx, PulsarApi.CommandAck.ValidationError.DecryptionError);
        } else {
            log.error("[{}][{}][{}] Message delivery failed since CryptoKeyReader interface is not implemented to consume encrypted message", new Object[]{topic, subscription, consumerName});
        }
        return fallback;
    }

    /**
     * Decodes the payload with the codec matching the compression type in the metadata.
     *
     * <p>A payload with more readable bytes than the size limit below is treated as
     * corrupted and discarded; a decoding {@link IOException} likewise discards the message.
     *
     * @param messageId id of the message, used for logging and for the discard ack
     * @param msgMetadata metadata providing compression type and uncompressed size
     * @param payload the (possibly compressed) payload
     * @param currentCnx connection used when the message is discarded
     * @return the decoded buffer, or {@code null} when the message was discarded
     */
    private ByteBuf uncompressPayloadIfNeeded(PulsarApi.MessageIdData messageId, PulsarApi.MessageMetadata msgMetadata, ByteBuf payload, ClientCnx currentCnx) {
        // 5232640 bytes; NOTE(review): presumably the inlined protocol max message size, confirm.
        final int maxPayloadBytes = 5 * 1024 * 1024 - 10 * 1024;

        final PulsarApi.CompressionType compression = msgMetadata.getCompression();
        final CompressionCodec decoder = codecProvider.getCodec(compression);
        final int expectedSize = msgMetadata.getUncompressedSize();
        final int actualSize = payload.readableBytes();

        if (actualSize > maxPayloadBytes) {
            log.error("[{}][{}] Got corrupted payload message size {} at {}", new Object[]{topic, subscription, actualSize, messageId});
            discardCorruptedMessage(messageId, currentCnx, PulsarApi.CommandAck.ValidationError.UncompressedSizeCorruption);
            return null;
        }

        try {
            return decoder.decode(payload, expectedSize);
        } catch (IOException e) {
            log.error("[{}][{}] Failed to decompress message with {} at {}: {}", new Object[]{topic, subscription, compression, messageId, e.getMessage(), e});
            discardCorruptedMessage(messageId, currentCnx, PulsarApi.CommandAck.ValidationError.DecompressionError);
            return null;
        }
    }

    /**
     * Validates the checksum carried in the buffer, when one is present.
     *
     * @param headersAndPayload buffer holding the optional checksum followed by headers and payload
     * @param messageId id of the message, used for logging only
     * @return {@code true} when there is no checksum or it matches the computed one,
     *         {@code false} on a mismatch
     */
    private boolean verifyChecksum(ByteBuf headersAndPayload, PulsarApi.MessageIdData messageId) {
        if (!Commands.hasChecksum(headersAndPayload)) {
            return true;
        }
        // Read the stored checksum before computing over the buffer, as the original does.
        int checksum = Commands.readChecksum(headersAndPayload).intValue();
        int computedChecksum = Crc32cChecksum.computeChecksum(headersAndPayload);
        if (checksum == computedChecksum) {
            return true;
        }
        // Both values are ints: format both with Integer.toHexString. Long.toHexString would
        // sign-extend a negative received checksum into a misleading 16-digit value.
        log.error("[{}][{}] Checksum mismatch for message at {}:{}. Received checksum: 0x{}, Computed checksum: 0x{}", new Object[]{this.topic, this.subscription, messageId.getLedgerId(), messageId.getEntryId(), Integer.toHexString(checksum), Integer.toHexString(computedChecksum)});
        return false;
    }

    /**
     * Logs that the message at the given position is corrupted and then discards it.
     *
     * @param messageId id of the corrupted message
     * @param currentCnx connection on which the discard ack is sent
     * @param validationError reason reported in the ack
     */
    private void discardCorruptedMessage(PulsarApi.MessageIdData messageId, ClientCnx currentCnx, PulsarApi.CommandAck.ValidationError validationError) {
        final Object[] logArgs = new Object[]{topic, subscription, messageId.getLedgerId(), messageId.getEntryId()};
        log.error("[{}][{}] Discarding corrupted message at {}:{}", logArgs);
        discardMessage(messageId, currentCnx, validationError);
    }

    /**
     * Acknowledges a message individually with a validation error, returns one permit
     * and counts the message as a receive failure.
     *
     * @param messageId id of the message to discard
     * @param currentCnx connection on which the ack is written
     * @param validationError reason reported in the ack
     */
    private void discardMessage(PulsarApi.MessageIdData messageId, ClientCnx currentCnx, PulsarApi.CommandAck.ValidationError validationError) {
        final ByteBuf ackCommand = Commands.newAck(
                consumerId,
                messageId.getLedgerId(),
                messageId.getEntryId(),
                PulsarApi.CommandAck.AckType.Individual,
                validationError,
                Collections.emptyMap());
        currentCnx.ctx().writeAndFlush(ackCommand, currentCnx.ctx().voidPromise());

        increaseAvailablePermits(currentCnx);
        stats.incrementNumReceiveFailed();
    }

    /**
     * @return the handler name, which for this consumer is its subscription name
     */
    @Override
    String getHandlerName() {
        return subscription;
    }

    /**
     * @return {@code true} only when a client connection is present and the handler
     *         state is {@code Ready}
     */
    @Override
    public boolean isConnected() {
        if (getClientCnx() == null) {
            return false;
        }
        return getState() == HandlerBase.State.Ready;
    }

    /**
     * @return the partition index stored on this consumer
     */
    int getPartitionIndex() {
        return partitionIndex;
    }

    /**
     * @return the current value of the available-permits counter
     */
    @Override
    public int getAvailablePermits() {
        final int permits = AVAILABLE_PERMITS_UPDATER.get(this);
        return permits;
    }

    /**
     * @return the number of messages currently held in {@code incomingMessages}
     */
    @Override
    public int numMessagesInQueue() {
        return incomingMessages.size();
    }

    /**
     * Asks the broker to redeliver every unacknowledged message of this consumer.
     *
     * <p>When connected to a broker speaking at least protocol {@code v2}, the local
     * queue and both ack trackers are cleared under this object's monitor, the
     * redeliver command is written, and the number of dropped queued messages is
     * returned as permits. Otherwise a warning is logged and, unless there is no
     * connection or the handler is still connecting, the channel is closed.
     */
    @Override
    public void redeliverUnacknowledgedMessages() {
        final ClientCnx cnx = cnx();
        final boolean canRedeliver = isConnected()
                && cnx.getRemoteEndpointProtocolVersion() >= PulsarApi.ProtocolVersion.v2.getNumber();

        if (!canRedeliver) {
            if (cnx == null || getState() == HandlerBase.State.Connecting) {
                log.warn("[{}] Client Connection needs to be establised for redelivery of unacknowledged messages", this);
            } else {
                log.warn("[{}] Reconnecting the client to redeliver the messages.", this);
                cnx.ctx().close();
            }
            return;
        }

        final int dropped;
        synchronized (this) {
            dropped = incomingMessages.size();
            incomingMessages.clear();
            unAckedMessageTracker.clear();
            batchMessageAckTracker.clear();
        }

        cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId), cnx.ctx().voidPromise());
        if (dropped > 0) {
            increaseAvailablePermits(cnx, dropped);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", new Object[]{subscription, topic, consumerName, dropped});
        }
    }

    /**
     * Asks the broker to redeliver the given messages.
     *
     * <p>For any subscription type other than {@code Shared} this falls back to
     * redelivering everything. Otherwise, when connected to a broker speaking at least
     * protocol {@code v2}, matching messages are first removed from the head of the
     * local queue (which may add one more id to {@code messageIds}), the ids are sent
     * in chunks of 1000, and the number of messages removed from the queue is returned
     * as permits.
     *
     * @param messageIds ids to redeliver; may be modified by this call
     */
    @Override
    public void redeliverUnacknowledgedMessages(Set<MessageIdImpl> messageIds) {
        if (conf.getSubscriptionType() != SubscriptionType.Shared) {
            redeliverUnacknowledgedMessages();
            return;
        }

        final ClientCnx cnx = cnx();
        final boolean canRedeliver = isConnected()
                && cnx.getRemoteEndpointProtocolVersion() >= PulsarApi.ProtocolVersion.v2.getNumber();

        if (!canRedeliver) {
            if (cnx == null || getState() == HandlerBase.State.Connecting) {
                log.warn("[{}] Client Connection needs to be establised for redelivery of unacknowledged messages", this);
            } else {
                log.warn("[{}] Reconnecting the client to redeliver the messages.", this);
                cnx.ctx().close();
            }
            return;
        }

        final int removedFromQueue = removeExpiredMessagesFromQueue(messageIds);
        final Iterable<List<MessageIdImpl>> chunks = Iterables.partition(messageIds, 1000);
        final PulsarApi.MessageIdData.Builder idBuilder = PulsarApi.MessageIdData.newBuilder();

        for (List<MessageIdImpl> chunk : chunks) {
            // One builder is reused to produce every id of every chunk.
            final List<PulsarApi.MessageIdData> wireIds = new ArrayList<>(chunk.size());
            for (MessageIdImpl id : chunk) {
                batchMessageAckTracker.remove(id);
                idBuilder.setPartition(id.getPartitionIndex());
                idBuilder.setLedgerId(id.getLedgerId());
                idBuilder.setEntryId(id.getEntryId());
                wireIds.add(idBuilder.build());
            }
            final ByteBuf redeliverCommand = Commands.newRedeliverUnacknowledgedMessages(consumerId, wireIds);
            cnx.ctx().writeAndFlush(redeliverCommand, cnx.ctx().voidPromise());
            for (PulsarApi.MessageIdData wireId : wireIds) {
                wireId.recycle();
            }
        }

        if (removedFromQueue > 0) {
            increaseAvailablePermits(cnx, removedFromQueue);
        }
        idBuilder.recycle();
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] [{}] Redeliver unacked messages and increase {} permits", new Object[]{subscription, topic, consumerName, removedFromQueue});
        }
    }

    /**
     * Blocking variant of {@link #seekAsync(MessageId)}.
     *
     * @param messageId the message id to reset the subscription to
     * @throws PulsarClientException wrapping the {@link ExecutionException} of a failed
     *         seek, or the {@link InterruptedException} if the calling thread was
     *         interrupted while waiting (the interrupt flag is restored first)
     */
    @Override
    public void seek(MessageId messageId) throws PulsarClientException {
        try {
            this.seekAsync(messageId).get();
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new PulsarClientException(e);
        }
        catch (ExecutionException e) {
            throw new PulsarClientException(e);
        }
    }

    /**
     * Asynchronously resets the subscription to the given message id.
     *
     * @param messageId the target position; must be a {@code MessageIdImpl}
     * @return a future completed with {@code null} on success; failed immediately when
     *         the consumer is closing/closed or not connected, and failed with the
     *         underlying cause when the broker request fails
     */
    @Override
    public CompletableFuture<Void> seekAsync(MessageId messageId) {
        if (this.getState() == HandlerBase.State.Closing || this.getState() == HandlerBase.State.Closed) {
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed"));
        }
        if (!this.isConnected()) {
            return FutureUtil.failedFuture(new PulsarClientException("Not connected to broker"));
        }
        CompletableFuture<Void> seekFuture = new CompletableFuture<Void>();
        long requestId = this.client.newRequestId();
        MessageIdImpl msgId = (MessageIdImpl)messageId;
        ByteBuf seek = Commands.newSeek(this.consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId());
        ClientCnx cnx = this.cnx();
        log.info("[{}][{}] Seek subscription to message id {}", new Object[]{this.topic, this.subscription, messageId});
        // No raw-type cast on the chain: it would erase the lambda parameter to Object.
        cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
            log.info("[{}][{}] Successfully reset subscription to message id {}", new Object[]{this.topic, this.subscription, messageId});
            seekFuture.complete(null);
        }).exceptionally(e -> {
            // Normally e is a wrapper carrying the real failure as its cause; fall back to e
            // itself so a cause-less throwable cannot NPE here and leave seekFuture pending.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            log.error("[{}][{}] Failed to reset subscription: {}", new Object[]{this.topic, this.subscription, cause.getMessage()});
            seekFuture.completeExceptionally(cause);
            return null;
        });
        return seekFuture;
    }

    /**
     * Returns the id of the given message at entry granularity.
     *
     * @param msg message whose id is a {@code MessageIdImpl}
     * @return the id itself, or for a {@code BatchMessageIdImpl} a new
     *         {@code MessageIdImpl} with the same ledger and entry id and this
     *         consumer's partition index
     */
    private MessageIdImpl getMessageIdImpl(Message msg) {
        final MessageIdImpl rawId = (MessageIdImpl) msg.getMessageId();
        if (!(rawId instanceof BatchMessageIdImpl)) {
            return rawId;
        }
        return new MessageIdImpl(rawId.getLedgerId(), rawId.getEntryId(), getPartitionIndex());
    }

    /**
     * Removes messages from the head of {@code incomingMessages} for as long as their
     * entry-level ids are contained in {@code messageIds}.
     *
     * <p>Nothing is removed when the queue is empty or its head is not in the set.
     * The first polled message whose id is not in the set is still counted as removed
     * and its id is added to {@code messageIds} before the loop stops.
     *
     * @param messageIds ids considered expired; may gain one extra id
     * @return the number of messages taken out of the queue
     */
    private int removeExpiredMessagesFromQueue(Set<MessageIdImpl> messageIds) {
        final Message head = (Message) incomingMessages.peek();
        if (head == null || !messageIds.contains(getMessageIdImpl(head))) {
            return 0;
        }

        int removed = 0;
        for (Message polled = (Message) incomingMessages.poll();
                polled != null;
                polled = (Message) incomingMessages.poll()) {
            ++removed;
            // add() returns true only when the id was not yet in the set, so this both
            // detects the first non-expired message and records it for redelivery.
            if (messageIds.add(getMessageIdImpl(polled))) {
                break;
            }
        }
        return removed;
    }

    /**
     * @return the consumer stats, or {@code null} when the stats object is a
     *         {@code ConsumerStatsDisabled}
     */
    @Override
    public ConsumerStats getStats() {
        final boolean statsDisabled = stats instanceof ConsumerStatsDisabled;
        return statsDisabled ? null : stats;
    }

    /**
     * Marks this consumer as having reached the end of the topic and, when a listener
     * is set, notifies it.
     */
    void setTerminated() {
        log.info("[{}] [{}] [{}] Consumer has reached the end of topic", new Object[]{subscription, topic, consumerName});
        hasReachedEndOfTopic = true;
        if (listener == null) {
            return;
        }
        listener.reachedEndOfTopic(this);
    }

    /**
     * @return the current value of the end-of-topic flag
     */
    @Override
    public boolean hasReachedEndOfTopic() {
        return hasReachedEndOfTopic;
    }

    /**
     * @return a hash combining topic, subscription and consumer name, in that order
     */
    @Override
    public int hashCode() {
        return Objects.hash(topic, subscription, consumerName);
    }

    /**
     * Body of the listener task scheduled from {@code messageReceived}: dequeues up to
     * {@code numMessages} messages without waiting and passes each to the listener.
     *
     * <p>Stops early when the queue yields no message or dequeuing fails. Anything the
     * listener throws is logged and does not stop the loop.
     *
     * @param numMessages maximum number of messages to dispatch
     */
    private /* synthetic */ void lambda$messageReceived$7(int numMessages) {
        int dispatched = 0;
        while (dispatched < numMessages) {
            final Message msg;
            try {
                msg = internalReceive(0, TimeUnit.MILLISECONDS);
            } catch (PulsarClientException e) {
                log.warn("[{}] [{}] Failed to dequeue the message for listener", new Object[]{topic, subscription, e});
                return;
            }

            if (msg == null) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
                }
                return;
            }

            try {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Calling message listener for message {}", new Object[]{topic, subscription, msg.getMessageId()});
                }
                listener.received(this, msg);
            } catch (Throwable t) {
                log.error("[{}][{}] Message listener error in processing message: {}", new Object[]{topic, subscription, msg.getMessageId(), t});
            }
            ++dispatched;
        }
    }

    /**
     * Completion handler run after the consumer was closed because a batch message
     * arrived with a zero-size receiver queue: fails the oldest pending receive with
     * an {@code InvalidMessageException}.
     *
     * @param ok unused result of the close
     * @param e unused failure of the close
     * @return always {@code null}
     */
    private /* synthetic */ Object lambda$messageReceived$6(Void ok, Throwable e) {
        final String reason = String.format("Unsupported Batch message with 0 size receiver queue for [%s]-[%s] ", subscription, consumerName);
        final PulsarClientException.InvalidMessageException failure = new PulsarClientException.InvalidMessageException(reason);
        notifyPendingReceivedCallback(null, failure);
        return null;
    }

    /**
     * Mode of a subscription. The batch-receive path compares against
     * {@link #NonDurable} when deciding whether to skip messages at or before the
     * start message id.
     */
    enum SubscriptionMode {
        Durable,
        NonDurable
    }
}

