/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.util.DateFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Broker-side representation of a single consumer attached to a {@link Subscription}.
 *
 * <p>The class tracks flow-control permits, the number of unacknowledged messages and, for
 * Shared subscriptions only, the individual entries that are still waiting for an ack. It also
 * writes messages and control commands to the client through the owning {@link ServerCnx}.
 */
public class Consumer {
    // Subscription this consumer is attached to.
    private final Subscription subscription;
    private final PulsarApi.CommandSubscribe.SubType subType;
    // Client connection used for every write to the remote consumer.
    private final ServerCnx cnx;
    // Application id and authentication data handed to the authorization service in checkPermissions().
    private final String appId;
    private AuthenticationDataSource authenticationData;
    private final String topicName;
    // Partition index derived from topicName; stamped on every outgoing message id.
    private final int partitionIdx;
    private final PulsarApi.CommandSubscribe.InitialPosition subscriptionInitialPosition;
    private final long consumerId;
    private final int priorityLevel;
    private final boolean readCompacted;
    private final String consumerName;
    // Rate trackers feeding ConsumerStats in updateRates().
    private final Rate msgOut;
    private final Rate msgRedeliver;
    private static final AtomicIntegerFieldUpdater<Consumer> MESSAGE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "messagePermits");
    // Permits currently available for dispatch; only modified through MESSAGE_PERMITS_UPDATER.
    private volatile int messagePermits = 0;
    private static final AtomicIntegerFieldUpdater<Consumer> PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "permitsReceivedWhileConsumerBlocked");
    // Permits parked while the consumer is blocked; moved into messagePermits when it is unblocked.
    private volatile int permitsReceivedWhileConsumerBlocked = 0;
    // (ledgerId, entryId) -> (batchSize, 0) for entries sent but not yet acked; null unless the subscription is Shared.
    private final ConcurrentLongLongPairHashMap pendingAcks;
    private final ConsumerStats stats;
    // Threshold at which a Shared consumer gets blocked; values <= 0 disable blocking.
    private final int maxUnackedMessages;
    private static final AtomicIntegerFieldUpdater<Consumer> UNACKED_MESSAGES_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "unackedMessages");
    // Messages dispatched to this consumer and not yet acknowledged; only modified through UNACKED_MESSAGES_UPDATER.
    private volatile int unackedMessages = 0;
    // True while the consumer is blocked because unackedMessages reached maxUnackedMessages.
    private volatile boolean blockedConsumerOnUnackedMsgs = false;
    // Client-supplied metadata; never null (an empty map is substituted in the constructor).
    private final Map<String, String> metadata;
    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    /**
     * Creates a consumer bound to the given subscription and client connection.
     *
     * <p>All permit and unacked-message counters start at zero. The pending-ack map is only
     * allocated when {@code subType} is Shared; otherwise it stays {@code null}.
     *
     * @param metadata client metadata; {@code null} is replaced by an empty map
     * @throws BrokerServiceException declared for callers; nothing in this body throws it directly
     */
    public Consumer(Subscription subscription, PulsarApi.CommandSubscribe.SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName, int maxUnackedMessages, ServerCnx cnx, String appId, Map<String, String> metadata, boolean readCompacted, PulsarApi.CommandSubscribe.InitialPosition subscriptionInitialPosition) throws BrokerServiceException {
        // Identity and configuration.
        this.subscription = subscription;
        this.subType = subType;
        this.topicName = topicName;
        this.partitionIdx = TopicName.getPartitionIndex(topicName);
        this.consumerId = consumerId;
        this.priorityLevel = priorityLevel;
        this.readCompacted = readCompacted;
        this.consumerName = consumerName;
        this.maxUnackedMessages = maxUnackedMessages;
        this.subscriptionInitialPosition = subscriptionInitialPosition;

        // Connection, rate trackers and credentials.
        this.cnx = cnx;
        this.msgOut = new Rate();
        this.msgRedeliver = new Rate();
        this.appId = appId;
        this.authenticationData = cnx.authenticationData;

        // Counters all start from zero.
        PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
        MESSAGE_PERMITS_UPDATER.set(this, 0);
        UNACKED_MESSAGES_UPDATER.set(this, 0);

        if (metadata != null) {
            this.metadata = metadata;
        } else {
            this.metadata = Collections.emptyMap();
        }

        // Statistics snapshot exposed through getStats().
        ConsumerStats consumerStats = new ConsumerStats();
        this.stats = consumerStats;
        consumerStats.setAddress(cnx.clientAddress().toString());
        consumerStats.consumerName = consumerName;
        consumerStats.setConnectedSince(DateFormatter.now());
        consumerStats.setClientVersion(cnx.getClientVersion());
        consumerStats.metadata = this.metadata;

        // Individual pending acks are only tracked for Shared subscriptions.
        if (subType == PulsarApi.CommandSubscribe.SubType.Shared) {
            this.pendingAcks = new ConcurrentLongLongPairHashMap(256, 1);
        } else {
            this.pendingAcks = null;
        }
    }

    /** @return the subscription type this consumer was created with */
    public PulsarApi.CommandSubscribe.SubType subType() {
        return subType;
    }

    /** @return the id passed to the constructor for this consumer */
    public long consumerId() {
        return consumerId;
    }

    /** @return the consumer name supplied at construction time */
    public String consumerName() {
        return consumerName;
    }

    /**
     * Tells the remote client whether it is the active consumer of the subscription.
     *
     * <p>Nothing is sent when the peer's protocol version does not support the
     * active-consumer listener.
     *
     * @param activeConsumer the consumer that is now active; compared by identity with this one
     */
    void notifyActiveConsumerChange(Consumer activeConsumer) {
        boolean peerSupportsListener = Commands.peerSupportsActiveConsumerListener(cnx.getRemoteEndpointProtocolVersion());
        if (!peerSupportsListener) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("notify consumer {} - that [{}] for subscription {} has new active consumer : {}", consumerId, topicName, subscription.getName(), activeConsumer);
        }
        ChannelHandlerContext context = cnx.ctx();
        boolean isActive = this == activeConsumer;
        context.writeAndFlush(Commands.newActiveConsumerChange(consumerId, isActive), context.voidPromise());
    }

    /** @return the read-compacted flag supplied at construction time */
    public boolean readCompacted() {
        return readCompacted;
    }

    /**
     * Sends the entries without a completion listener.
     *
     * @param entries entries to dispatch to the client
     * @return totals of messages and bytes accounted for this send
     */
    public SendMessageInfo sendMessages(List<Entry> entries) {
        final SendListener noListener = null;
        return sendMessages(entries, noListener);
    }

    /**
     * Dispatches the given entries to the remote consumer.
     *
     * <p>Permits and pending acks are updated synchronously; the actual writes are scheduled on
     * the channel's event loop. Only the write of the last entry carries the promise that notifies
     * {@code listener}; all other writes use the void promise. Each entry is released after its
     * buffer has been retained and handed to the channel.
     *
     * @param entries  entries to send; may be mutated (unparseable entries are removed)
     * @param listener optional callback invoked when the final write completes; may be {@code null}
     * @return message/byte totals for this send; both are zero when the list is empty or the
     *         consumer had to be disconnected because it does not support batch messages
     */
    public SendMessageInfo sendMessages(List<Entry> entries, SendListener listener) {
        final ChannelHandlerContext ctx = cnx.ctx();
        final SendMessageInfo sentMessages = new SendMessageInfo();

        // A real promise is only needed when somebody wants to hear about completion.
        final ChannelPromise writePromise;
        if (listener != null) {
            writePromise = ctx.newPromise();
            writePromise.addListener(future -> listener.sendComplete(writePromise, sentMessages));
        } else {
            writePromise = ctx.voidPromise();
        }

        if (entries.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] List of messages is empty, triggering write future immediately for consumerId {}", topicName, subscription, consumerId);
            }
            writePromise.setSuccess();
            sentMessages.totalSentMessages = 0;
            sentMessages.totalSentMessageBytes = 0L;
            return sentMessages;
        }

        try {
            updatePermitsAndPendingAcks(entries, sentMessages);
        } catch (PulsarServerException e) {
            log.warn("[{}] [{}] consumer doesn't support batch-message {}", subscription, consumerId, cnx.getRemoteEndpointProtocolVersion());
            subscription.markTopicWithBatchMessagePublished();
            sentMessages.totalSentMessages = 0;
            sentMessages.totalSentMessageBytes = 0L;
            disconnect();
            return sentMessages;
        }

        ctx.channel().eventLoop().execute(() -> {
            for (int i = 0; i < entries.size(); i++) {
                Entry entry = entries.get(i);
                PositionImpl pos = (PositionImpl) entry.getPosition();
                PulsarApi.MessageIdData.Builder messageIdBuilder = PulsarApi.MessageIdData.newBuilder();
                PulsarApi.MessageIdData messageId = messageIdBuilder
                        .setLedgerId(pos.getLedgerId())
                        .setEntryId(pos.getEntryId())
                        .setPartition(partitionIdx)
                        .build();

                // Keep the payload alive for the write; the entry itself is released below.
                ByteBuf metadataAndPayload = entry.getDataBuffer();
                metadataAndPayload.retain();
                if (cnx.getRemoteEndpointProtocolVersion() < PulsarApi.ProtocolVersion.v11.getNumber()) {
                    Commands.skipChecksumIfPresent(metadataAndPayload);
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}-{}] Sending message to consumerId {}, entry id {}", topicName, subscription, consumerId, pos.getEntryId());
                }

                // Only the final write reports back through the caller-visible promise.
                boolean isLastEntry = i == entries.size() - 1;
                ChannelPromise promise = isLastEntry ? writePromise : ctx.voidPromise();
                ctx.write(Commands.newMessage(consumerId, messageId, metadataAndPayload), promise);
                messageId.recycle();
                messageIdBuilder.recycle();
                entry.release();
            }
            ctx.flush();
        });
        return sentMessages;
    }

    /**
     * Adds to the unacked-message counter and blocks the consumer once the counter reaches
     * {@code maxUnackedMessages}. Does nothing when blocking does not apply to this consumer.
     *
     * @param ackedMessages number of messages to add to the unacked count
     */
    private void incrementUnackedMessages(int ackedMessages) {
        if (!shouldBlockConsumerOnUnackMsgs()) {
            return;
        }
        int unackedNow = addAndGetUnAckedMsgs(this, ackedMessages);
        if (unackedNow >= maxUnackedMessages) {
            blockedConsumerOnUnackedMsgs = true;
        }
    }

    /**
     * Reads the number of messages contained in an entry from its message metadata.
     *
     * <p>The buffer's reader index is marked before parsing and restored afterwards, so a
     * successful call leaves the buffer positioned where it started.
     *
     * @param metadataAndPayload entry buffer starting at the message metadata
     * @param subscription       used for logging only
     * @param consumerId         used for logging only
     * @return the batch size from the metadata, or {@code -1} if the metadata cannot be parsed
     */
    public static int getBatchSizeforEntry(ByteBuf metadataAndPayload, Subscription subscription, long consumerId) {
        final int batchSize;
        try {
            metadataAndPayload.markReaderIndex();
            PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
            metadataAndPayload.resetReaderIndex();
            batchSize = metadata.getNumMessagesInBatch();
            metadata.recycle();
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] num messages in batch are {} ", subscription, consumerId, batchSize);
            }
        } catch (Throwable t) {
            log.error("[{}] [{}] Failed to parse message metadata", subscription, consumerId, t);
            return -1;
        }
        return batchSize;
    }

    /**
     * Accounts for a batch of entries that is about to be sent.
     *
     * <p>Entries whose metadata cannot be parsed are removed from the list, released and
     * individually acknowledged on the subscription. For the remaining entries the pending-ack
     * map (when present), the permit counter, the unacked counter and the outgoing rate are
     * updated, and the totals are stored in {@code sentMessages}.
     *
     * @param entries      entries to account for; unparseable ones are removed in place
     * @param sentMessages receives the total message and byte counts
     * @throws PulsarServerException if a batched entry is present and the client does not support
     *         batch messages; permits and unacked counters have already been updated at that point
     */
    void updatePermitsAndPendingAcks(List<Entry> entries, SendMessageInfo sentMessages) throws PulsarServerException {
        final boolean clientSupportBatchMessages = cnx.isBatchMessageCompatibleVersion();
        int totalMessages = 0;
        long totalBytes = 0L;
        boolean sawUnsupportedBatch = false;

        for (Iterator<Entry> it = entries.iterator(); it.hasNext();) {
            Entry entry = it.next();
            ByteBuf metadataAndPayload = entry.getDataBuffer();
            int batchSize = getBatchSizeforEntry(metadataAndPayload, subscription, consumerId);

            if (batchSize == -1) {
                // Corrupted entry: drop it from this send and ack it so it is not delivered again.
                it.remove();
                PositionImpl pos = (PositionImpl) entry.getPosition();
                entry.release();
                subscription.acknowledgeMessage(Collections.singletonList(pos), PulsarApi.CommandAck.AckType.Individual, Collections.emptyMap());
                continue;
            }

            if (pendingAcks != null) {
                pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), batchSize, 0L);
            }
            if (!clientSupportBatchMessages && batchSize > 1) {
                sawUnsupportedBatch = true;
            }
            totalBytes += metadataAndPayload.readableBytes();
            totalMessages += batchSize;
        }

        int remainingPermits = MESSAGE_PERMITS_UPDATER.addAndGet(this, -totalMessages);
        incrementUnackedMessages(totalMessages);

        if (sawUnsupportedBatch) {
            throw new PulsarServerException("Consumer does not support batch-message");
        }
        if (remainingPermits < 0 && log.isDebugEnabled()) {
            log.debug("[{}-{}] [{}] message permits dropped below 0 - {}", topicName, subscription, consumerId, remainingPermits);
        }

        msgOut.recordMultipleEvents(totalMessages, totalBytes);
        sentMessages.totalSentMessages = totalMessages;
        sentMessages.totalSentMessageBytes = totalBytes;
    }

    /** @return whether the underlying connection reports itself as writable */
    public boolean isWritable() {
        return cnx.isWritable();
    }

    /**
     * Writes an error frame to the client and closes the channel once the write completes.
     *
     * @param error serialized error command to send
     */
    public void sendError(ByteBuf error) {
        ChannelFuture writeFuture = cnx.ctx().writeAndFlush(error);
        writeFuture.addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * Detaches this consumer from its subscription and then from its connection.
     *
     * @throws BrokerServiceException if the subscription rejects the removal
     */
    public void close() throws BrokerServiceException {
        subscription.removeConsumer(this);
        cnx.removedConsumer(this);
    }

    /**
     * Asks the connection to close this consumer on the client side and then closes it locally.
     * A failure of the local close is logged and otherwise ignored.
     */
    public void disconnect() {
        log.info("Disconnecting consumer: {}", this);
        cnx.closeConsumer(this);
        try {
            close();
        } catch (BrokerServiceException e) {
            log.warn("Consumer {} was already closed: {}", this, e.getMessage(), e);
        }
    }

    /**
     * Unsubscribes this consumer from its subscription and reports the outcome to the client.
     *
     * <p>On success the consumer is removed from the connection and a success response is
     * written. On failure an error response carrying the mapped error code and message is
     * written instead.
     *
     * @param requestId id of the client request, echoed in the response
     */
    void doUnsubscribe(long requestId) {
        final ChannelHandlerContext ctx = cnx.ctx();
        subscription.doUnsubscribe(this).thenAccept(v -> {
            log.info("Unsubscribed successfully from {}", subscription);
            cnx.removedConsumer(this);
            ctx.writeAndFlush(Commands.newSuccess(requestId));
        }).exceptionally(exception -> {
            log.warn("Unsubscribe failed for {}", subscription, exception);
            // Dependent stages normally wrap the failure, but fall back to the exception itself
            // so a missing cause cannot turn the error response into a NullPointerException.
            Throwable cause = exception.getCause() != null ? exception.getCause() : exception;
            ctx.writeAndFlush(Commands.newError(requestId, BrokerServiceException.getClientErrorCode(cause), cause.getMessage()));
            return null;
        });
    }

    /**
     * Applies an acknowledgement received from the client.
     *
     * <p>A cumulative ack must reference exactly one message id and is ignored on Shared
     * subscriptions. An individual ack may reference several ids; on Shared subscriptions each
     * acked position is also removed from the pending-ack tracking. Any properties attached to
     * the ack are forwarded to the subscription.
     *
     * @param ack the ack command sent by the client
     */
    void messageAcked(PulsarApi.CommandAck ack) {
        Map<String, Long> properties = Collections.emptyMap();
        if (ack.getPropertiesCount() > 0) {
            properties = ack.getPropertiesList().stream().collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
        }

        if (ack.getAckType() == PulsarApi.CommandAck.AckType.Cumulative) {
            if (ack.getMessageIdCount() != 1) {
                // The format string now matches the two arguments actually supplied.
                log.warn("[{}] [{}] Received multi-message ack", subscription, consumerId);
                return;
            }
            if (subType == PulsarApi.CommandSubscribe.SubType.Shared) {
                log.warn("[{}] [{}] Received cumulative ack on shared subscription, ignoring", subscription, consumerId);
                return;
            }
            PulsarApi.MessageIdData msgId = ack.getMessageId(0);
            PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
            subscription.acknowledgeMessage(Collections.singletonList(position), PulsarApi.CommandAck.AckType.Cumulative, properties);
            return;
        }

        List<Position> positionsAcked = new ArrayList<>();
        for (int i = 0; i < ack.getMessageIdCount(); i++) {
            PulsarApi.MessageIdData msgId = ack.getMessageId(i);
            PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
            positionsAcked.add(position);
            if (subType == PulsarApi.CommandSubscribe.SubType.Shared) {
                removePendingAcks(position);
            }
            if (ack.hasValidationError()) {
                log.error("[{}] [{}] Received ack for corrupted message at {} - Reason: {}", subscription, consumerId, position, ack.getValidationError());
            }
        }
        subscription.acknowledgeMessage(positionsAcked, PulsarApi.CommandAck.AckType.Individual, properties);
    }

    /**
     * Handles a flow command that grants additional permits.
     *
     * <p>While the consumer is blocked on unacked messages the permits are parked in a separate
     * counter and not handed to the subscription; otherwise they are added to the available
     * permits and the subscription is notified.
     *
     * @param additionalNumberOfMessages number of permits granted; must be positive
     * @throws IllegalArgumentException if {@code additionalNumberOfMessages} is not positive
     */
    void flowPermits(int additionalNumberOfMessages) {
        Preconditions.checkArgument(additionalNumberOfMessages > 0);

        // Re-evaluate the blocked state before deciding where the permits go.
        if (shouldBlockConsumerOnUnackMsgs() && unackedMessages >= maxUnackedMessages) {
            blockedConsumerOnUnackedMsgs = true;
        }

        int oldPermits;
        if (!blockedConsumerOnUnackedMsgs) {
            oldPermits = MESSAGE_PERMITS_UPDATER.getAndAdd(this, additionalNumberOfMessages);
            subscription.consumerFlow(this, additionalNumberOfMessages);
        } else {
            oldPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, additionalNumberOfMessages);
        }

        if (log.isDebugEnabled()) {
            // A trailing placeholder was added so the blocked flag is actually printed.
            log.debug("[{}-{}] Added more flow control message permits {} (old was: {}), blocked = {}", topicName, subscription, additionalNumberOfMessages, oldPermits, blockedConsumerOnUnackedMsgs);
        }
    }

    /**
     * Moves every permit that was parked while {@code consumer} was blocked into its available
     * permits and notifies this consumer's subscription about them.
     *
     * @param consumer the consumer whose parked permits are released
     */
    void flowConsumerBlockedPermits(Consumer consumer) {
        final int parkedPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(consumer, 0);
        MESSAGE_PERMITS_UPDATER.getAndAdd(consumer, parkedPermits);
        subscription.consumerFlow(consumer, parkedPermits);
    }

    /** @return the current value of the message-permit counter (may be negative) */
    public int getAvailablePermits() {
        // Volatile read; equivalent to going through the field updater.
        return messagePermits;
    }

    /** @return {@code true} while the consumer is blocked on unacknowledged messages */
    public boolean isBlocked() {
        return blockedConsumerOnUnackedMsgs;
    }

    /**
     * Notifies the client that the end of the topic has been reached. The notification is only
     * sent to peers whose protocol version is at least 9.
     */
    public void reachedEndOfTopic() {
        if (cnx.getRemoteEndpointProtocolVersion() < 9) {
            return;
        }
        log.info("[{}] Notifying consumer that end of topic has been reached", this);
        cnx.ctx().writeAndFlush(Commands.newReachedEndOfTopic(consumerId));
    }

    /**
     * @return {@code true} when the subscription is Shared and a positive unacked-message limit
     *         is configured
     */
    private boolean shouldBlockConsumerOnUnackMsgs() {
        boolean isShared = subType == PulsarApi.CommandSubscribe.SubType.Shared;
        return isShared && maxUnackedMessages > 0;
    }

    /** Recomputes the outgoing and redelivery rates and copies them into the stats object. */
    public void updateRates() {
        // Outgoing messages and throughput.
        msgOut.calculateRate();
        stats.msgRateOut = msgOut.getRate();
        stats.msgThroughputOut = msgOut.getValueRate();

        // Redelivered messages.
        msgRedeliver.calculateRate();
        stats.msgRateRedeliver = msgRedeliver.getRate();
    }

    /**
     * Refreshes the permit, unacked and blocked fields of the stats object and returns it.
     *
     * @return the shared, mutable stats instance of this consumer (not a copy)
     */
    public ConsumerStats getStats() {
        stats.availablePermits = MESSAGE_PERMITS_UPDATER.get(this);
        stats.unackedMessages = unackedMessages;
        stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
        return stats;
    }

    /** @return the current value of the unacked-message counter */
    public int getUnackedMessages() {
        return unackedMessages;
    }

    /** @return a description containing the subscription, consumer id, consumer name and client address */
    @Override
    public String toString() {
        MoreObjects.ToStringHelper description = MoreObjects.toStringHelper(this);
        description.add("subscription", subscription);
        description.add("consumerId", consumerId);
        description.add("consumerName", consumerName);
        description.add("address", cnx.clientAddress());
        return description.toString();
    }

    /** @return the channel handler context of the underlying connection */
    public ChannelHandlerContext ctx() {
        return cnx.ctx();
    }

    /**
     * Re-validates that this consumer may still consume from its topic.
     *
     * <p>When no authorization service is configured nothing happens. Otherwise the consumer is
     * disconnected unless the service confirms the permission; an exception during the check is
     * logged and treated like a denial.
     */
    public void checkPermissions() {
        TopicName topicName = TopicName.get(subscription.getTopicName());
        if (cnx.getBrokerService().getAuthorizationService() == null) {
            return;
        }

        boolean allowed = false;
        try {
            allowed = cnx.getBrokerService().getAuthorizationService().canConsume(topicName, appId, authenticationData, subscription.getName());
        } catch (Exception e) {
            log.warn("[{}] Get unexpected error while autorizing [{}]  {}", appId, subscription.getTopicName(), e.getMessage(), e);
        }
        if (allowed) {
            return;
        }

        log.info("[{}] is not allowed to consume from topic [{}] anymore", appId, subscription.getTopicName());
        disconnect();
    }

    /**
     * Two consumers are equal when they share the same client address and consumer id.
     *
     * @param obj object to compare with
     * @return {@code true} if {@code obj} is a {@code Consumer} with matching address and id
     */
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof Consumer)) {
            return false;
        }
        Consumer that = (Consumer) obj;
        boolean sameAddress = Objects.equals(cnx.clientAddress(), that.cnx.clientAddress());
        return sameAddress && consumerId == that.consumerId;
    }

    /**
     * Hash code derived from the same attributes that {@code equals} compares (client address and
     * consumer id), so that equal consumers always produce equal hash codes.
     *
     * @return hash of the client address and the consumer id
     */
    @Override
    public int hashCode() {
        return Objects.hash(cnx.clientAddress(), consumerId);
    }

    /**
     * Removes an acked position from the pending-ack tracking of whichever consumer of the
     * subscription currently owns it, and updates that consumer's unacked counter.
     *
     * <p>If the owner was blocked and its unacked count drops to at most half of
     * {@code maxUnackedMessages}, it is unblocked and its parked permits are released.
     *
     * @param position position that was acknowledged
     */
    private void removePendingAcks(PositionImpl position) {
        final long ledgerId = position.getLedgerId();
        final long entryId = position.getEntryId();

        // Find the owner: this consumer, or another consumer of the same subscription.
        Consumer ackOwnedConsumer = null;
        if (pendingAcks.get(ledgerId, entryId) == null) {
            for (Consumer consumer : subscription.getConsumers()) {
                if (!consumer.equals(this) && consumer.getPendingAcks().containsKey(ledgerId, entryId)) {
                    ackOwnedConsumer = consumer;
                    break;
                }
            }
        } else {
            ackOwnedConsumer = this;
        }
        if (ackOwnedConsumer == null) {
            return;
        }

        // The entry may disappear between the lookup above and this read; treat a missing
        // entry the same way as a failed remove instead of dereferencing null.
        ConcurrentLongLongPairHashMap.LongPair pendingAck = ackOwnedConsumer.getPendingAcks().get(ledgerId, entryId);
        if (pendingAck == null) {
            return;
        }
        int totalAckedMsgs = (int) pendingAck.first;
        if (!ackOwnedConsumer.getPendingAcks().remove(ledgerId, entryId)) {
            // Already removed by somebody else; the counters were adjusted there.
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position);
        }

        if (addAndGetUnAckedMsgs(ackOwnedConsumer, -totalAckedMsgs) <= maxUnackedMessages / 2 && ackOwnedConsumer.blockedConsumerOnUnackedMsgs && ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs()) {
            ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
            flowConsumerBlockedPermits(ackOwnedConsumer);
        }
    }

    /** @return the live pending-ack map, or {@code null} when the subscription is not Shared */
    public ConcurrentLongLongPairHashMap getPendingAcks() {
        return pendingAcks;
    }

    /** @return the priority level supplied at construction time */
    public int getPriorityLevel() {
        return priorityLevel;
    }

    /**
     * Requests redelivery of every message this consumer has not acknowledged.
     *
     * <p>The unacked counter is reset, the consumer is unblocked, the subscription is asked to
     * redeliver and any parked permits are released. When pending acks are tracked, their batch
     * sizes are summed into the redelivery rate and the map is cleared afterwards.
     */
    public void redeliverUnacknowledgedMessages() {
        clearUnAckedMsgs(this);
        blockedConsumerOnUnackedMsgs = false;
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] consumer {} received redelivery", topicName, subscription, consumerId);
        }
        subscription.redeliverUnacknowledgedMessages(this);
        flowConsumerBlockedPermits(this);

        if (pendingAcks == null) {
            return;
        }
        // AtomicInteger only serves as a mutable holder the lambda can capture.
        AtomicInteger redeliveredCount = new AtomicInteger(0);
        pendingAcks.forEach((ledgerId, entryId, batchSize, none) -> redeliveredCount.addAndGet((int) batchSize));
        msgRedeliver.recordMultipleEvents((long) redeliveredCount.get(), (long) redeliveredCount.get());
        pendingAcks.clear();
    }

    /**
     * Requests redelivery of specific messages.
     *
     * <p>Only ids that are present in this consumer's pending acks are considered. They are
     * removed from the map, subtracted from the unacked counter and passed to the subscription
     * for redelivery. Parked permits are then released, up to the number of redelivered messages.
     *
     * <p>NOTE(review): dereferences {@code pendingAcks} unconditionally, so this presumably must
     * only be called for Shared subscriptions; confirm against the caller.
     *
     * @param messageIds ids of the messages the client wants redelivered
     */
    public void redeliverUnacknowledgedMessages(List<PulsarApi.MessageIdData> messageIds) {
        int redeliveredCount = 0;
        List<PositionImpl> pendingPositions = Lists.newArrayList();
        for (PulsarApi.MessageIdData msg : messageIds) {
            PositionImpl position = PositionImpl.get(msg.getLedgerId(), msg.getEntryId());
            ConcurrentLongLongPairHashMap.LongPair batchSize = pendingAcks.get(position.getLedgerId(), position.getEntryId());
            if (batchSize != null) {
                pendingAcks.remove(position.getLedgerId(), position.getEntryId());
                // Compound assignment narrows the long sum back to int, as before.
                redeliveredCount += batchSize.first;
                pendingPositions.add(position);
            }
        }

        addAndGetUnAckedMsgs(this, -redeliveredCount);
        blockedConsumerOnUnackedMsgs = false;
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] consumer {} received {} msg-redelivery {}", topicName, subscription, consumerId, redeliveredCount, pendingPositions.size());
        }
        subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
        msgRedeliver.recordMultipleEvents(redeliveredCount, redeliveredCount);

        // Release at most as many parked permits as messages were redelivered.
        int parkedPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.get(this);
        int numberOfBlockedPermits = Math.min(redeliveredCount, parkedPermits);
        if (numberOfBlockedPermits <= 0) {
            return;
        }
        PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, -numberOfBlockedPermits);
        MESSAGE_PERMITS_UPDATER.getAndAdd(this, numberOfBlockedPermits);
        subscription.consumerFlow(this, numberOfBlockedPermits);
    }

    /** @return the subscription this consumer is attached to */
    public Subscription getSubscription() {
        return subscription;
    }

    /**
     * Applies a delta to this consumer's subscription-level unacked count and to the unacked
     * counter of {@code consumer}.
     *
     * @param consumer      consumer whose counter is adjusted
     * @param ackedMessages delta to apply; negative values decrease the counters
     * @return the consumer's unacked count after the update
     */
    private int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) {
        subscription.addUnAckedMessages(ackedMessages);
        final int updatedCount = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages);
        return updatedCount;
    }

    /**
     * Resets the unacked counter of {@code consumer} to zero and subtracts the previous value
     * from the subscription-level unacked count.
     *
     * @param consumer consumer whose unacked counter is cleared
     */
    private void clearUnAckedMsgs(Consumer consumer) {
        // Operate on the consumer that was passed in, mirroring addAndGetUnAckedMsgs.
        int previouslyUnacked = UNACKED_MESSAGES_UPDATER.getAndSet(consumer, 0);
        subscription.addUnAckedMessages(-previouslyUnacked);
    }

    /** Callback notified when the write of the last entry of a send completes. */
    public interface SendListener {
        /**
         * @param future          the future of the final write
         * @param sendMessageInfo totals of the send this callback belongs to
         */
        void sendComplete(ChannelFuture future, SendMessageInfo sendMessageInfo);
    }

    /** Mutable holder for the number of messages and bytes accounted for by one send. */
    public static final class SendMessageInfo {
        private int totalSentMessages;
        private long totalSentMessageBytes;

        /** @return number of messages counted for the send */
        public int getTotalSentMessages() {
            return totalSentMessages;
        }

        /** @param totalSentMessages number of messages counted for the send */
        public void setTotalSentMessages(int totalSentMessages) {
            this.totalSentMessages = totalSentMessages;
        }

        /** @return number of payload bytes counted for the send */
        public long getTotalSentMessageBytes() {
            return totalSentMessageBytes;
        }

        /** @param totalSentMessageBytes number of payload bytes counted for the send */
        public void setTotalSentMessageBytes(long totalSentMessageBytes) {
            this.totalSentMessageBytes = totalSentMessageBytes;
        }
    }
}

