/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Broker-side handle for one consumer attached to a {@link Subscription}.
 *
 * <p>The class keeps the flow-control permits granted by the client, the count of delivered but not yet
 * acknowledged messages, the map of entries awaiting acknowledgement (only allocated for individual-ack
 * subscription types) and the delivery statistics. Acknowledgements, redelivery requests and flow commands
 * are forwarded to the owning subscription.
 */
public class Consumer {
    private final Subscription subscription;
    private final PulsarApi.CommandSubscribe.SubType subType;
    private final TransportCnx cnx;
    private final String appId;
    private final AuthenticationDataSource authenticationData;
    private final String topicName;
    private final int partitionIdx;
    private final PulsarApi.CommandSubscribe.InitialPosition subscriptionInitialPosition;
    private final long consumerId;
    private final int priorityLevel;
    private final boolean readCompacted;
    private final String consumerName;
    private final Rate msgOut;
    private final Rate msgRedeliver;
    private final LongAdder msgOutCounter;
    private final LongAdder bytesOutCounter;
    private long lastConsumedTimestamp;
    private long lastAckedTimestamp;
    private final Rate chunkedMessageRate;

    // Permits granted by the client that have not been consumed by dispatched messages yet.
    private static final AtomicIntegerFieldUpdater<Consumer> MESSAGE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "messagePermits");
    private volatile int messagePermits = 0;

    // Permits that arrived while the consumer was blocked on unacked messages; released on unblock.
    private static final AtomicIntegerFieldUpdater<Consumer> PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "permitsReceivedWhileConsumerBlocked");
    private volatile int permitsReceivedWhileConsumerBlocked = 0;

    // (ledgerId, entryId) -> (batchSize, 0); null unless the subscription type acks individually.
    private final ConcurrentLongLongPairHashMap pendingAcks;
    private final ConsumerStats stats;
    private final int maxUnackedMessages;

    private static final AtomicIntegerFieldUpdater<Consumer> UNACKED_MESSAGES_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "unackedMessages");
    private volatile int unackedMessages = 0;
    private volatile boolean blockedConsumerOnUnackedMsgs = false;
    private final Map<String, String> metadata;
    private final PulsarApi.KeySharedMeta keySharedMeta;

    private static final AtomicIntegerFieldUpdater<Consumer> AVG_MESSAGES_PER_ENTRY = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "avgMessagesPerEntry");
    private volatile int avgMessagesPerEntry = 1000;

    // Weight given to the previous average when folding a new batch into avgMessagesPerEntry.
    private static final double avgPercent = 0.9;
    private final boolean preciseDispatcherFlowControl;
    private PositionImpl readPositionWhenJoining;
    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    /**
     * Creates a consumer bound to the given subscription and connection.
     *
     * <p>Authentication data, the precise-flow-control flag, the client address and the client version are
     * read from {@code cnx}. A {@code null} {@code metadata} map is replaced by an empty map.
     *
     * @throws BrokerServiceException declared for callers; not thrown directly by this constructor body
     */
    public Consumer(Subscription subscription, PulsarApi.CommandSubscribe.SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName, int maxUnackedMessages, TransportCnx cnx, String appId, Map<String, String> metadata, boolean readCompacted, PulsarApi.CommandSubscribe.InitialPosition subscriptionInitialPosition, PulsarApi.KeySharedMeta keySharedMeta) throws BrokerServiceException {
        this.subscription = subscription;
        this.subType = subType;
        this.topicName = topicName;
        this.partitionIdx = TopicName.getPartitionIndex(topicName);
        this.consumerId = consumerId;
        this.priorityLevel = priorityLevel;
        this.readCompacted = readCompacted;
        this.consumerName = consumerName;
        this.maxUnackedMessages = maxUnackedMessages;
        this.subscriptionInitialPosition = subscriptionInitialPosition;
        this.keySharedMeta = keySharedMeta;
        this.cnx = cnx;
        this.msgOut = new Rate();
        this.chunkedMessageRate = new Rate();
        this.msgRedeliver = new Rate();
        this.bytesOutCounter = new LongAdder();
        this.msgOutCounter = new LongAdder();
        this.appId = appId;
        this.authenticationData = cnx.getAuthenticationData();
        this.preciseDispatcherFlowControl = cnx.isPreciseDispatcherFlowControl();

        PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
        MESSAGE_PERMITS_UPDATER.set(this, 0);
        UNACKED_MESSAGES_UPDATER.set(this, 0);
        AVG_MESSAGES_PER_ENTRY.set(this, 1000);

        this.metadata = metadata != null ? metadata : Collections.emptyMap();

        this.stats = new ConsumerStats();
        if (cnx.hasHAProxyMessage()) {
            // Behind a proxy-protocol front end, report the original source address instead of the proxy's.
            this.stats.setAddress(cnx.getHAProxyMessage().sourceAddress() + ":" + cnx.getHAProxyMessage().sourcePort());
        } else {
            this.stats.setAddress(cnx.clientAddress().toString());
        }
        this.stats.consumerName = consumerName;
        this.stats.setConnectedSince(DateFormatter.now());
        this.stats.setClientVersion(cnx.getClientVersion());
        this.stats.metadata = this.metadata;

        this.pendingAcks = Subscription.isIndividualAckMode(subType) ? new ConcurrentLongLongPairHashMap(256, 1) : null;
    }

    /** @return the subscription type this consumer was created with */
    public PulsarApi.CommandSubscribe.SubType subType() {
        return this.subType;
    }

    /** @return the consumer id supplied at construction */
    public long consumerId() {
        return this.consumerId;
    }

    /** @return the consumer name supplied at construction */
    public String consumerName() {
        return this.consumerName;
    }

    /**
     * Sends an active-consumer-change command to the client; the flag sent is {@code true} only when
     * {@code activeConsumer} is this very instance.
     */
    void notifyActiveConsumerChange(Consumer activeConsumer) {
        if (log.isDebugEnabled()) {
            log.debug("notify consumer {} - that [{}] for subscription {} has new active consumer : {}", this.consumerId, this.topicName, this.subscription.getName(), activeConsumer);
        }
        this.cnx.getCommandSender().sendActiveConsumerChange(this.consumerId, this == activeConsumer);
    }

    /** @return the read-compacted flag supplied at construction */
    public boolean readCompacted() {
        return this.readCompacted;
    }

    /**
     * Dispatches a batch of entries to the client and updates the local accounting.
     *
     * <p>For an empty batch (no entries or {@code totalMessages == 0}) the size/ack holders are recycled and an
     * already-succeeded promise is returned. Otherwise every non-null entry is recorded in {@code pendingAcks}
     * (when that map exists), the moving average of messages per entry is refreshed, permits are reduced by
     * {@code totalMessages} minus the already-acked batch indexes, and the unacked counter and rate meters are
     * advanced before the entries are handed to the connection's command sender.
     *
     * @return the future returned by the command sender, or a completed promise for an empty batch
     */
    public Future<Void> sendMessages(List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks, int totalMessages, long totalBytes, long totalChunkedMessages, RedeliveryTracker redeliveryTracker) {
        this.lastConsumedTimestamp = System.currentTimeMillis();

        if (entries.isEmpty() || totalMessages == 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] List of messages is empty, triggering write future immediately for consumerId {}", this.topicName, this.subscription, this.consumerId);
            }
            batchSizes.recyle();
            if (batchIndexesAcks != null) {
                batchIndexesAcks.recycle();
            }
            Promise<Void> writePromise = this.cnx.newPromise();
            writePromise.setSuccess(null);
            return writePromise;
        }

        if (this.pendingAcks != null) {
            for (int i = 0; i < entries.size(); ++i) {
                Entry entry = entries.get(i);
                if (entry == null) {
                    continue;
                }
                int batchSize = batchSizes.getBatchSize(i);
                this.pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), (long) batchSize, 0L);
                if (log.isDebugEnabled()) {
                    log.debug("[{}-{}] Added {}:{} ledger entry with batchSize of {} to pendingAcks in broker.service.Consumer for consumerId: {}", this.topicName, this.subscription, entry.getLedgerId(), entry.getEntryId(), batchSize, this.consumerId);
                }
            }
        }

        // Exponential moving average: keep avgPercent of the old value, blend in this batch's ratio.
        int tmpAvgMessagesPerEntry = AVG_MESSAGES_PER_ENTRY.get(this);
        tmpAvgMessagesPerEntry = (int) Math.round(tmpAvgMessagesPerEntry * avgPercent + (1 - avgPercent) * totalMessages / entries.size());
        AVG_MESSAGES_PER_ENTRY.set(this, tmpAvgMessagesPerEntry);

        // Batch indexes that were already acked do not consume a permit.
        int ackedCount = batchIndexesAcks == null ? 0 : batchIndexesAcks.getTotalAckedIndexCount();
        MESSAGE_PERMITS_UPDATER.addAndGet(this, ackedCount - totalMessages);
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Added {} minus {} messages to MESSAGE_PERMITS_UPDATER in broker.service.Consumer for consumerId: {}; avgMessagesPerEntry is {}", this.topicName, this.subscription, ackedCount, totalMessages, this.consumerId, tmpAvgMessagesPerEntry);
        }

        this.incrementUnackedMessages(totalMessages);
        this.msgOut.recordMultipleEvents((long) totalMessages, totalBytes);
        this.msgOutCounter.add(totalMessages);
        this.bytesOutCounter.add(totalBytes);
        this.chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0L);

        return this.cnx.getCommandSender().sendMessagesToConsumer(this.consumerId, this.topicName, this.subscription, this.partitionIdx, entries, batchSizes, batchIndexesAcks, redeliveryTracker);
    }

    /**
     * Adds {@code ackedMessages} to the unacked counters (individual-ack mode only) and marks the consumer as
     * blocked once the configured positive limit is reached.
     */
    private void incrementUnackedMessages(int ackedMessages) {
        // Order matters: the counter is only touched when the subscription type acks individually.
        if (Subscription.isIndividualAckMode(this.subType) && this.addAndGetUnAckedMsgs(this, ackedMessages) >= this.maxUnackedMessages && this.maxUnackedMessages > 0) {
            this.blockedConsumerOnUnackedMsgs = true;
        }
    }

    /** @return whether the underlying connection reports itself writable */
    public boolean isWritable() {
        return this.cnx.isWritable();
    }

    /**
     * Closes the consumer without signalling a cursor reset.
     *
     * @throws BrokerServiceException if the subscription rejects the removal
     */
    public void close() throws BrokerServiceException {
        this.close(false);
    }

    /**
     * Removes this consumer from its subscription and then from its connection.
     *
     * @param isResetCursor forwarded to {@code Subscription.removeConsumer}
     * @throws BrokerServiceException if the subscription rejects the removal
     */
    public void close(boolean isResetCursor) throws BrokerServiceException {
        this.subscription.removeConsumer(this, isResetCursor);
        this.cnx.removedConsumer(this);
    }

    /** Disconnects the consumer without signalling a cursor reset. */
    public void disconnect() {
        this.disconnect(false);
    }

    /**
     * Asks the connection to close this consumer and then closes it locally; a failure of the local close is
     * logged and not propagated.
     */
    public void disconnect(boolean isResetCursor) {
        log.info("Disconnecting consumer: {}", this);
        this.cnx.closeConsumer(this);
        try {
            this.close(isResetCursor);
        } catch (BrokerServiceException e) {
            log.warn("Consumer {} was already closed: {}", this, e.getMessage(), e);
        }
    }

    /**
     * Unsubscribes through the subscription and reports the outcome to the client for {@code requestId}.
     *
     * <p>On success the consumer is removed from the connection and a success response is sent; on failure an
     * error response carrying the mapped client error code and the failure message is sent.
     */
    public void doUnsubscribe(long requestId) {
        this.subscription.doUnsubscribe(this).thenAccept(v -> {
            log.info("Unsubscribed successfully from {}", this.subscription);
            this.cnx.removedConsumer(this);
            this.cnx.getCommandSender().sendSuccess(requestId);
        }).exceptionally(exception -> {
            log.warn("Unsubscribe failed for {}", this.subscription, exception);
            // The throwable is normally a wrapper with a cause, but guard against a bare exception so the
            // error response is still sent instead of failing with a NullPointerException here.
            Throwable cause = exception.getCause() != null ? exception.getCause() : exception;
            this.cnx.getCommandSender().sendError(requestId, BrokerServiceException.getClientErrorCode(exception), cause.getMessage());
            return null;
        });
    }

    /**
     * Handles an acknowledgement command from the client.
     *
     * <p>Cumulative acks on an individual-ack subscription type are logged and ignored. Other cumulative acks
     * are applied to the subscription, or through the transactional path when both transaction id halves are
     * present. Individual acks are routed to the transactional or the plain individual-ack handler.
     *
     * @return a future for the acknowledgement; already completed on the non-transactional paths
     */
    public CompletableFuture<Void> messageAcked(PulsarApi.CommandAck ack) {
        this.lastAckedTimestamp = System.currentTimeMillis();

        Map<String, Long> properties = Collections.emptyMap();
        if (ack.getPropertiesCount() > 0) {
            properties = ack.getPropertiesList().stream().collect(Collectors.toMap(PulsarApi.KeyLongValue::getKey, PulsarApi.KeyLongValue::getValue));
        }

        if (ack.getAckType() != PulsarApi.CommandAck.AckType.Cumulative) {
            if (ack.hasTxnidLeastBits() && ack.hasTxnidMostBits()) {
                return this.individualAckWithTransaction(ack);
            }
            return this.individualAckNormal(ack, properties);
        }

        if (ack.getMessageIdCount() != 1) {
            log.warn("[{}] [{}] Received multi-message ack", this.subscription, this.consumerId);
        }
        if (Subscription.isIndividualAckMode(this.subType)) {
            log.warn("[{}] [{}] Received cumulative ack on shared subscription, ignoring", this.subscription, this.consumerId);
            // Actually ignore it, as the message says: do not apply a cumulative ack to this subscription type.
            return CompletableFuture.completedFuture(null);
        }

        // With anything other than exactly one message id the ack falls back to the earliest position.
        PositionImpl position = PositionImpl.earliest;
        if (ack.getMessageIdCount() == 1) {
            PulsarApi.MessageIdData msgId = ack.getMessageId(0);
            position = msgId.getAckSetCount() > 0
                    ? PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), SafeCollectionUtils.longListToArray(msgId.getAckSetList()))
                    : PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
        }

        if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
            List<PositionImpl> txnPositions = Collections.singletonList(position);
            return this.transactionCumulativeAcknowledge(ack.getTxnidMostBits(), ack.getTxnidLeastBits(), txnPositions);
        }
        List<Position> positionsAcked = Collections.<Position>singletonList(position);
        this.subscription.acknowledgeMessage(positionsAcked, PulsarApi.CommandAck.AckType.Cumulative, properties);
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Applies a non-transactional individual ack: builds a position per message id, drops whole-entry acks from
     * {@code pendingAcks}, forwards all positions to the subscription and returns a completed future.
     */
    private CompletableFuture<Void> individualAckNormal(PulsarApi.CommandAck ack, Map<String, Long> properties) {
        List<Position> positionsAcked = new ArrayList<>();
        for (int i = 0; i < ack.getMessageIdCount(); ++i) {
            PulsarApi.MessageIdData msgId = ack.getMessageId(i);
            PositionImpl position;
            if (msgId.getAckSetCount() > 0) {
                position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), SafeCollectionUtils.longListToArray(msgId.getAckSetList()));
                if (this.isTransactionEnabled() && Subscription.isIndividualAckMode(this.subType)) {
                    ((PersistentSubscription) this.subscription).syncBatchPositionBitSetForPendingAck(position);
                }
            } else {
                position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
            }
            positionsAcked.add(position);
            this.checkCanRemovePendingAcksAndHandle(position, msgId);
            this.checkAckValidationError(ack, position);
        }

        this.subscription.acknowledgeMessage(positionsAcked, PulsarApi.CommandAck.AckType.Individual, properties);

        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        completableFuture.complete(null);
        if (this.isTransactionEnabled() && Subscription.isIndividualAckMode(this.subType)) {
            // The future is already complete, so this callback runs right away on the calling thread.
            completableFuture.whenComplete((v, e) -> positionsAcked.forEach(position -> {
                PositionImpl acked = (PositionImpl) position;
                if (acked.getAckSet() != null && ((PersistentSubscription) this.subscription).checkIsCanDeleteConsumerPendingAck(acked)) {
                    this.removePendingAcks(acked);
                }
            }));
        }
        return completableFuture;
    }

    /**
     * Applies an individual ack that belongs to a transaction.
     *
     * @return the future of the transactional acknowledge, or a failed future when transactions are disabled
     */
    private CompletableFuture<Void> individualAckWithTransaction(PulsarApi.CommandAck ack) {
        if (!this.isTransactionEnabled()) {
            return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Server don't support transaction ack!"));
        }

        List<MutablePair<PositionImpl, Integer>> positionsAcked = new ArrayList<>();
        for (int i = 0; i < ack.getMessageIdCount(); ++i) {
            PulsarApi.MessageIdData msgId = ack.getMessageId(i);
            PositionImpl position = msgId.getAckSetCount() > 0
                    ? PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), SafeCollectionUtils.longListToArray(msgId.getAckSetList()))
                    : PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
            // The paired integer is the batch size when a batch index is present, otherwise 0.
            int batchSize = msgId.hasBatchIndex() ? msgId.getBatchSize() : 0;
            positionsAcked.add(new MutablePair<>(position, batchSize));
            this.checkCanRemovePendingAcksAndHandle(position, msgId);
            this.checkAckValidationError(ack, position);
        }

        CompletableFuture<Void> completableFuture = this.transactionIndividualAcknowledge(ack.getTxnidMostBits(), ack.getTxnidLeastBits(), positionsAcked);
        if (Subscription.isIndividualAckMode(this.subType)) {
            completableFuture.whenComplete((v, e) -> positionsAcked.forEach(pair -> {
                if (pair.getLeft().getAckSet() != null && ((PersistentSubscription) this.subscription).checkIsCanDeleteConsumerPendingAck(pair.left)) {
                    this.removePendingAcks(pair.left);
                }
            }));
        }
        return completableFuture;
    }

    /** Logs an error when the ack command carries a validation error for {@code position}. */
    private void checkAckValidationError(PulsarApi.CommandAck ack, PositionImpl position) {
        if (ack.hasValidationError()) {
            log.error("[{}] [{}] Received ack for corrupted message at {} - Reason: {}", this.subscription, this.consumerId, position, ack.getValidationError());
        }
    }

    /** Drops the pending-ack record when the whole entry is acked (no ack set) in individual-ack mode. */
    private void checkCanRemovePendingAcksAndHandle(PositionImpl position, PulsarApi.MessageIdData msgId) {
        if (Subscription.isIndividualAckMode(this.subType) && msgId.getAckSetCount() == 0) {
            this.removePendingAcks(position);
        }
    }

    /** @return true for a persistent subscription whose broker configuration enables the transaction coordinator */
    private boolean isTransactionEnabled() {
        return this.subscription instanceof PersistentSubscription && ((PersistentTopic) this.subscription.getTopic()).getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled();
    }

    /** Delegates a transactional individual ack to the persistent subscription; fails for other subscription kinds. */
    private CompletableFuture<Void> transactionIndividualAcknowledge(long txnidMostBits, long txnidLeastBits, List<MutablePair<PositionImpl, Integer>> positionList) {
        if (this.subscription instanceof PersistentSubscription) {
            TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
            return ((PersistentSubscription) this.subscription).transactionIndividualAcknowledge(txnID, positionList);
        }
        String error = "Transaction acknowledge only support the `PersistentSubscription`.";
        log.error(error);
        return FutureUtil.failedFuture(new TransactionConflictException(error));
    }

    /**
     * Delegates a transactional cumulative ack to the persistent subscription; fails when transactions are
     * disabled or the subscription is of another kind.
     */
    private CompletableFuture<Void> transactionCumulativeAcknowledge(long txnidMostBits, long txnidLeastBits, List<PositionImpl> positionList) {
        if (!this.isTransactionEnabled()) {
            return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Server don't support transaction ack!"));
        }
        if (this.subscription instanceof PersistentSubscription) {
            TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
            return ((PersistentSubscription) this.subscription).transactionCumulativeAcknowledge(txnID, positionList);
        }
        String error = "Transaction acknowledge only support the `PersistentSubscription`.";
        log.error(error);
        return FutureUtil.failedFuture(new TransactionConflictException(error));
    }

    /**
     * Registers additional permits sent by the client.
     *
     * <p>While the consumer is blocked on unacked messages the permits are parked and released later;
     * otherwise they are added to the available permits and the subscription is notified.
     *
     * @param additionalNumberOfMessages number of permits to add; must be positive
     * @throws IllegalArgumentException if {@code additionalNumberOfMessages} is not positive
     */
    public void flowPermits(int additionalNumberOfMessages) {
        Preconditions.checkArgument(additionalNumberOfMessages > 0);

        if (this.shouldBlockConsumerOnUnackMsgs() && this.unackedMessages >= this.maxUnackedMessages) {
            this.blockedConsumerOnUnackedMsgs = true;
        }

        int oldPermits;
        if (!this.blockedConsumerOnUnackedMsgs) {
            oldPermits = MESSAGE_PERMITS_UPDATER.getAndAdd(this, additionalNumberOfMessages);
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Added {} message permits in broker.service.Consumer before updating dispatcher for consumer", this.topicName, this.subscription, additionalNumberOfMessages, this.consumerId);
            }
            this.subscription.consumerFlow(this, additionalNumberOfMessages);
        } else {
            oldPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, additionalNumberOfMessages);
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Added more flow control message permits {} (old was: {}), blocked = {} ", this.topicName, this.subscription, additionalNumberOfMessages, oldPermits, this.blockedConsumerOnUnackedMsgs);
        }
    }

    /**
     * Moves the permits parked while {@code consumer} was blocked into its available permits and notifies the
     * subscription with that amount.
     */
    void flowConsumerBlockedPermits(Consumer consumer) {
        int additionalNumberOfPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(consumer, 0);
        MESSAGE_PERMITS_UPDATER.getAndAdd(consumer, additionalNumberOfPermits);
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Added {} blocked permits to broker.service.Consumer for consumer", this.topicName, this.subscription, additionalNumberOfPermits, this.consumerId);
        }
        this.subscription.consumerFlow(consumer, additionalNumberOfPermits);
    }

    /** @return the current value of the message-permit counter */
    public int getAvailablePermits() {
        return MESSAGE_PERMITS_UPDATER.get(this);
    }

    /** @return the current moving average of messages per dispatched entry */
    public int getAvgMessagesPerEntry() {
        return AVG_MESSAGES_PER_ENTRY.get(this);
    }

    /** @return whether the consumer is currently flagged as blocked on unacked messages */
    public boolean isBlocked() {
        return this.blockedConsumerOnUnackedMsgs;
    }

    /** Sends the reached-end-of-topic command for this consumer to the client. */
    public void reachedEndOfTopic() {
        this.cnx.getCommandSender().sendReachedEndOfTopic(this.consumerId);
    }

    /** @return true when the subscription type acks individually and a positive unacked limit is configured */
    private boolean shouldBlockConsumerOnUnackMsgs() {
        return Subscription.isIndividualAckMode(this.subType) && this.maxUnackedMessages > 0;
    }

    /** Recomputes the rate meters and copies their values into the stats object. */
    public void updateRates() {
        this.msgOut.calculateRate();
        this.chunkedMessageRate.calculateRate();
        this.msgRedeliver.calculateRate();

        this.stats.msgRateOut = this.msgOut.getRate();
        this.stats.msgThroughputOut = this.msgOut.getValueRate();
        this.stats.msgRateRedeliver = this.msgRedeliver.getRate();
        this.stats.chuckedMessageRate = this.chunkedMessageRate.getRate();
    }

    /**
     * Folds an external stats snapshot into this consumer: the out counters are added, while timestamps,
     * permits, unacked count, blocked flag and average messages per entry are overwritten.
     */
    public void updateStats(ConsumerStats consumerStats) {
        this.msgOutCounter.add(consumerStats.msgOutCounter);
        this.bytesOutCounter.add(consumerStats.bytesOutCounter);
        this.msgOut.recordMultipleEvents(consumerStats.msgOutCounter, consumerStats.bytesOutCounter);
        this.lastAckedTimestamp = consumerStats.lastAckedTimestamp;
        this.lastConsumedTimestamp = consumerStats.lastConsumedTimestamp;
        MESSAGE_PERMITS_UPDATER.set(this, consumerStats.availablePermits);
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Setting broker.service.Consumer's messagePermits to {} for consumer", this.topicName, this.subscription, consumerStats.availablePermits, this.consumerId);
        }
        this.unackedMessages = consumerStats.unackedMessages;
        this.blockedConsumerOnUnackedMsgs = consumerStats.blockedConsumerOnUnackedMsgs;
        AVG_MESSAGES_PER_ENTRY.set(this, consumerStats.avgMessagesPerEntry);
    }

    /**
     * Refreshes the counters held in this consumer's stats object and returns that same object
     * (not a copy).
     */
    public ConsumerStats getStats() {
        this.stats.msgOutCounter = this.msgOutCounter.longValue();
        this.stats.bytesOutCounter = this.bytesOutCounter.longValue();
        this.stats.lastAckedTimestamp = this.lastAckedTimestamp;
        this.stats.lastConsumedTimestamp = this.lastConsumedTimestamp;
        this.stats.availablePermits = this.getAvailablePermits();
        this.stats.unackedMessages = this.unackedMessages;
        this.stats.blockedConsumerOnUnackedMsgs = this.blockedConsumerOnUnackedMsgs;
        this.stats.avgMessagesPerEntry = this.getAvgMessagesPerEntry();
        if (this.readPositionWhenJoining != null) {
            this.stats.readPositionWhenJoining = this.readPositionWhenJoining.toString();
        }
        return this.stats;
    }

    /** @return the current unacked-message count of this consumer */
    public int getUnackedMessages() {
        return this.unackedMessages;
    }

    /** @return the key-shared metadata supplied at construction */
    public PulsarApi.KeySharedMeta getKeySharedMeta() {
        return this.keySharedMeta;
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("subscription", this.subscription)
                .add("consumerId", this.consumerId)
                .add("consumerName", this.consumerName)
                .add("address", this.cnx.clientAddress())
                .toString();
    }

    /**
     * Re-validates that this consumer may still consume from the topic and disconnects it otherwise.
     *
     * <p>Nothing happens when the broker has no authorization service. An exception raised by the check is
     * logged and then handled like a denied permission.
     */
    public void checkPermissions() {
        TopicName topicName = TopicName.get(this.subscription.getTopicName());
        if (this.cnx.getBrokerService().getAuthorizationService() == null) {
            return;
        }
        try {
            if (this.cnx.getBrokerService().getAuthorizationService().canConsume(topicName, this.appId, this.authenticationData, this.subscription.getName())) {
                return;
            }
        } catch (Exception e) {
            log.warn("[{}] Get unexpected error while autorizing [{}]  {}", this.appId, this.subscription.getTopicName(), e.getMessage(), e);
        }
        log.info("[{}] is not allowed to consume from topic [{}] anymore", this.appId, this.subscription.getTopicName());
        this.disconnect();
    }

    /** Two consumers are equal when they share the client address and the consumer id. */
    @Override
    public boolean equals(Object obj) {
        if (obj instanceof Consumer) {
            Consumer other = (Consumer) obj;
            return Objects.equals(this.cnx.clientAddress(), other.cnx.clientAddress()) && this.consumerId == other.consumerId;
        }
        return false;
    }

    /**
     * Hash built from exactly the fields compared by {@link #equals(Object)} so that equal consumers always
     * share a hash code; also tolerates a {@code null} consumer name.
     */
    @Override
    public int hashCode() {
        return Objects.hash(this.cnx.clientAddress(), this.consumerId);
    }

    /**
     * Removes the pending-ack record for {@code position} from whichever consumer of the subscription holds it
     * (this one first, then the others), decrements the unacked counters by the recorded batch size and
     * unblocks the owning consumer once its count drops to half of this consumer's limit or below.
     *
     * <p>Requires {@code pendingAcks} to be non-null, i.e. an individual-ack subscription type.
     */
    private void removePendingAcks(PositionImpl position) {
        Consumer ackOwnedConsumer = null;
        if (this.pendingAcks.get(position.getLedgerId(), position.getEntryId()) == null) {
            // Not ours: the entry may have been dispatched to another consumer of the same subscription.
            for (Consumer consumer : this.subscription.getConsumers()) {
                if (!consumer.equals(this) && consumer.getPendingAcks().containsKey(position.getLedgerId(), position.getEntryId())) {
                    ackOwnedConsumer = consumer;
                    break;
                }
            }
        } else {
            ackOwnedConsumer = this;
        }

        ConcurrentLongLongPairHashMap.LongPair ackedPosition = ackOwnedConsumer != null
                ? ackOwnedConsumer.getPendingAcks().get(position.getLedgerId(), position.getEntryId())
                : null;
        if (ackedPosition == null) {
            return;
        }

        int totalAckedMsgs = (int) ackedPosition.first;
        if (!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), position.getEntryId())) {
            // The record was removed in the meantime; do not decrement the counters twice.
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] consumer {} received ack {}", this.topicName, this.subscription, this.consumerId, position);
        }
        if (this.addAndGetUnAckedMsgs(ackOwnedConsumer, -totalAckedMsgs) <= this.maxUnackedMessages / 2 && ackOwnedConsumer.blockedConsumerOnUnackedMsgs && ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs()) {
            ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
            this.flowConsumerBlockedPermits(ackOwnedConsumer);
        }
    }

    /** @return the pending-ack map, or {@code null} when the subscription type does not ack individually */
    public ConcurrentLongLongPairHashMap getPendingAcks() {
        return this.pendingAcks;
    }

    /** @return the priority level supplied at construction */
    public int getPriorityLevel() {
        return this.priorityLevel;
    }

    /**
     * Requests redelivery of everything this consumer has not acknowledged.
     *
     * <p>The unacked counter is reset and the consumer unblocked. With a pending-ack map, all recorded
     * positions are removed from it and passed to the subscription; without one, the subscription is asked for
     * a full redelivery. Finally any parked permits are released.
     */
    public void redeliverUnacknowledgedMessages() {
        this.clearUnAckedMsgs();
        this.blockedConsumerOnUnackedMsgs = false;
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] consumer {} received redelivery", this.topicName, this.subscription, this.consumerId);
        }

        if (this.pendingAcks != null) {
            List<PositionImpl> pendingPositions = new ArrayList<>((int) this.pendingAcks.size());
            MutableInt totalRedeliveryMessages = new MutableInt(0);
            this.pendingAcks.forEach((ledgerId, entryId, batchSize, none) -> {
                totalRedeliveryMessages.add((int) batchSize);
                pendingPositions.add(new PositionImpl(ledgerId, entryId));
            });
            // Remove after the traversal rather than while iterating the map.
            for (PositionImpl p : pendingPositions) {
                this.pendingAcks.remove(p.getLedgerId(), p.getEntryId());
            }
            this.msgRedeliver.recordMultipleEvents((long) totalRedeliveryMessages.intValue(), (long) totalRedeliveryMessages.intValue());
            this.subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
        } else {
            this.subscription.redeliverUnacknowledgedMessages(this);
        }

        this.flowConsumerBlockedPermits(this);
    }

    /**
     * Requests redelivery of the given messages.
     *
     * <p>Only ids that are present in this consumer's pending-ack map are removed from it and redelivered; the
     * unacked counters drop by the sum of their recorded batch sizes, the consumer is unblocked and parked
     * permits are released.
     *
     * <p>Requires {@code pendingAcks} to be non-null (individual-ack subscription type) whenever
     * {@code messageIds} is non-empty.
     */
    public void redeliverUnacknowledgedMessages(List<PulsarApi.MessageIdData> messageIds) {
        int totalRedeliveryMessages = 0;
        List<PositionImpl> pendingPositions = Lists.newArrayList();
        for (PulsarApi.MessageIdData msg : messageIds) {
            PositionImpl position = PositionImpl.get(msg.getLedgerId(), msg.getEntryId());
            ConcurrentLongLongPairHashMap.LongPair batchSize = this.pendingAcks.get(position.getLedgerId(), position.getEntryId());
            if (batchSize == null) {
                continue;
            }
            this.pendingAcks.remove(position.getLedgerId(), position.getEntryId());
            totalRedeliveryMessages += batchSize.first;
            pendingPositions.add(position);
        }

        this.addAndGetUnAckedMsgs(this, -totalRedeliveryMessages);
        this.blockedConsumerOnUnackedMsgs = false;
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] consumer {} received {} msg-redelivery {}", this.topicName, this.subscription, this.consumerId, totalRedeliveryMessages, pendingPositions.size());
        }

        this.subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
        this.msgRedeliver.recordMultipleEvents((long) totalRedeliveryMessages, (long) totalRedeliveryMessages);

        int numberOfBlockedPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(this, 0);
        if (numberOfBlockedPermits > 0) {
            MESSAGE_PERMITS_UPDATER.getAndAdd(this, numberOfBlockedPermits);
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Added {} blockedPermits to broker.service.Consumer's messagePermits for consumer {}", this.topicName, this.subscription, numberOfBlockedPermits, this.consumerId);
            }
            this.subscription.consumerFlow(this, numberOfBlockedPermits);
        }
    }

    /** @return the subscription this consumer belongs to */
    public Subscription getSubscription() {
        return this.subscription;
    }

    /**
     * Adds {@code ackedMessages} (may be negative) to the subscription-level counter and to the unacked
     * counter of {@code consumer}.
     *
     * @return the updated unacked count of {@code consumer}
     */
    private int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) {
        this.subscription.addUnAckedMessages(ackedMessages);
        return UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages);
    }

    /** Resets this consumer's unacked count to zero and subtracts the previous value from the subscription. */
    private void clearUnAckedMsgs() {
        int previousUnacked = UNACKED_MESSAGES_UPDATER.getAndSet(this, 0);
        this.subscription.addUnAckedMessages(-previousUnacked);
    }

    /** @return the precise-dispatcher-flow-control flag read from the connection at construction */
    public boolean isPreciseDispatcherFlowControl() {
        return this.preciseDispatcherFlowControl;
    }

    /** Records the read position observed when this consumer joined; exposed through {@link #getStats()}. */
    public void setReadPositionWhenJoining(PositionImpl readPositionWhenJoining) {
        this.readPositionWhenJoining = readPositionWhenJoining;
    }

    /** @return the connection this consumer is attached to */
    public TransportCnx cnx() {
        return this.cnx;
    }
}

