/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.GenericFutureListener;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Broker-side handle for a single consumer attached to a {@link Subscription}.
 *
 * <p>It tracks the flow-control permits granted by the client, the number of dispatched but
 * not yet acknowledged messages, and (for {@code Shared} subscriptions only) the set of
 * positions that are still pending acknowledgement. The three integer counters are updated
 * through {@link AtomicIntegerFieldUpdater}s; the field names below are looked up reflectively
 * by those updaters and must not be renamed.
 */
public class Consumer {
    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    private static final AtomicIntegerFieldUpdater<Consumer> MESSAGE_PERMITS_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "messagePermits");
    private static final AtomicIntegerFieldUpdater<Consumer> PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "permitsReceivedWhileConsumerBlocked");
    private static final AtomicIntegerFieldUpdater<Consumer> UNACKED_MESSAGES_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "unackedMessages");

    private final Subscription subscription;
    private final PulsarApi.CommandSubscribe.SubType subType;
    private final ServerCnx cnx;
    private final String appId;
    private final long consumerId;
    private final int priorityLevel;
    private final String consumerName;
    private final Rate msgOut;
    private final Rate msgRedeliver;

    /** Permits currently available for dispatching; may go negative (see updatePermitsAndPendingAcks). */
    private volatile int messagePermits = 0;
    /** Permits that arrived while the consumer was blocked; handed over once it is unblocked. */
    private volatile int permitsReceivedWhileConsumerBlocked = 0;
    /** (ledgerId, entryId) -> (batchSize, 0); only allocated for Shared subscriptions, otherwise null. */
    private final ConcurrentLongLongPairHashMap pendingAcks;
    private final ConsumerStats stats;
    private final int maxUnackedMessages;
    private volatile int unackedMessages = 0;
    private volatile boolean blockedConsumerOnUnackedMsgs = false;

    /**
     * Creates the consumer and initialises its statistics from the connection.
     *
     * @param subscription       subscription this consumer is attached to
     * @param subType            subscription type; {@code Shared} enables pending-ack tracking
     * @param consumerId         id of the consumer, used in commands sent back to the client
     * @param priorityLevel      priority level exposed through {@link #getPriorityLevel()}
     * @param consumerName       name of the consumer
     * @param maxUnackedMessages unacked-message threshold; blocking is only active when &gt; 0
     * @param cnx                connection the consumer is bound to
     * @param appId              identity passed to the authorization manager in {@link #checkPermissions()}
     * @throws BrokerServiceException declared by the signature; not thrown by the visible code
     */
    public Consumer(Subscription subscription, PulsarApi.CommandSubscribe.SubType subType, long consumerId,
            int priorityLevel, String consumerName, int maxUnackedMessages, ServerCnx cnx, String appId)
            throws BrokerServiceException {
        this.subscription = subscription;
        this.subType = subType;
        this.consumerId = consumerId;
        this.priorityLevel = priorityLevel;
        this.consumerName = consumerName;
        this.maxUnackedMessages = maxUnackedMessages;
        this.cnx = cnx;
        this.msgOut = new Rate();
        this.msgRedeliver = new Rate();
        this.appId = appId;

        PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
        MESSAGE_PERMITS_UPDATER.set(this, 0);
        UNACKED_MESSAGES_UPDATER.set(this, 0);

        this.stats = new ConsumerStats();
        this.stats.address = cnx.clientAddress().toString();
        this.stats.consumerName = consumerName;
        this.stats.connectedSince = PersistentTopic.DATE_FORMAT.format(Instant.now());
        this.stats.clientVersion = cnx.getClientVersion();

        // Individual pending acks are only tracked for Shared subscriptions.
        this.pendingAcks = subType == PulsarApi.CommandSubscribe.SubType.Shared
                ? new ConcurrentLongLongPairHashMap(256, 1)
                : null;
    }

    public PulsarApi.CommandSubscribe.SubType subType() {
        return this.subType;
    }

    public long consumerId() {
        return this.consumerId;
    }

    public String consumerName() {
        return this.consumerName;
    }

    /**
     * Dispatches the given entries to the client on the channel's event loop.
     *
     * <p>Entries whose metadata cannot be parsed are removed from {@code entries} (the list is
     * mutated) before anything is written. Each written entry is released after the write has
     * been issued.
     *
     * @param entries entries to send; must be a mutable list
     * @return a pair of (promise completed when the last entry has been written, number of
     *         messages accounted for). The count is 0 when nothing is sent or when the consumer
     *         is disconnected because it cannot handle batch messages.
     */
    public Pair<ChannelPromise, Integer> sendMessages(List<Entry> entries) {
        final ChannelHandlerContext ctx = this.cnx.ctx();
        final ChannelPromise writePromise = ctx.newPromise();
        final MutablePair<ChannelPromise, Integer> sentMessages = new MutablePair<>();
        sentMessages.setLeft(writePromise);

        if (entries.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] List of messages is empty, triggering write future immediately for consumerId {}",
                        this.subscription, this.consumerId);
            }
            writePromise.setSuccess();
            sentMessages.setRight(0);
            return sentMessages;
        }

        try {
            sentMessages.setRight(this.updatePermitsAndPendingAcks(entries));
        } catch (PulsarServerException e) {
            log.warn("[{}] [{}] consumer doesn't support batch-message {}", this.subscription, this.consumerId,
                    this.cnx.getRemoteEndpointProtocolVersion());
            this.subscription.markTopicWithBatchMessagePublished();
            sentMessages.setRight(0);
            this.disconnect();
            return sentMessages;
        }

        if (entries.isEmpty()) {
            // Every entry was dropped as corrupted: the write loop below would never run and the
            // promise would stay pending forever, so complete it now like the empty-input case.
            writePromise.setSuccess();
            return sentMessages;
        }

        ctx.channel().eventLoop().execute(() -> {
            final int lastIndex = entries.size() - 1;
            for (int i = 0; i <= lastIndex; i++) {
                Entry entry = entries.get(i);
                PositionImpl pos = (PositionImpl) entry.getPosition();
                PulsarApi.MessageIdData.Builder messageIdBuilder = PulsarApi.MessageIdData.newBuilder();
                PulsarApi.MessageIdData messageId = messageIdBuilder
                        .setLedgerId(pos.getLedgerId())
                        .setEntryId(pos.getEntryId())
                        .build();

                ByteBuf metadataAndPayload = entry.getDataBuffer();
                // Extra reference for the outgoing write; the entry's own reference is dropped below.
                metadataAndPayload.retain();
                if (this.cnx.getRemoteEndpointProtocolVersion() < PulsarApi.ProtocolVersion.v6.getNumber()) {
                    Commands.readChecksum(metadataAndPayload);
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Sending message to consumerId {}, entry id {}", this.subscription,
                            this.consumerId, pos.getEntryId());
                }

                // Only the last write carries the real promise; the others use the void promise.
                ChannelPromise promise = (i == lastIndex) ? writePromise : ctx.voidPromise();
                ctx.write(Commands.newMessage(this.consumerId, messageId, metadataAndPayload), promise);
                messageId.recycle();
                messageIdBuilder.recycle();
                entry.release();
            }
            ctx.flush();
        });
        return sentMessages;
    }

    /**
     * Adds {@code numMessages} to the unacked counter and marks the consumer as blocked once the
     * counter reaches {@code maxUnackedMessages}. Does nothing when blocking is not applicable.
     */
    private void incrementUnackedMessages(int numMessages) {
        if (this.shouldBlockConsumerOnUnackMsgs()
                && this.addAndGetUnAckedMsgs(this, numMessages) >= this.maxUnackedMessages) {
            this.blockedConsumerOnUnackedMsgs = true;
        }
    }

    /**
     * Reads the number of messages in the batch from the entry's metadata.
     *
     * <p>The reader index is marked before parsing and reset afterwards on the success path.
     *
     * @param metadataAndPayload buffer holding the message metadata followed by the payload
     * @param subscription       subscription name, used for logging only
     * @param consumerId         consumer id, used for logging only
     * @return the batch size, or {@code -1} if the metadata could not be parsed
     */
    public static int getBatchSizeforEntry(ByteBuf metadataAndPayload, String subscription, long consumerId) {
        try {
            metadataAndPayload.markReaderIndex();
            PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
            metadataAndPayload.resetReaderIndex();
            int batchSize = metadata.getNumMessagesInBatch();
            metadata.recycle();
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] num messages in batch are {} ", subscription, consumerId, batchSize);
            }
            return batchSize;
        } catch (Throwable t) {
            log.error("[{}] [{}] Failed to parse message metadata", subscription, consumerId, t);
            return -1;
        }
    }

    /**
     * Accounts for the entries about to be dispatched: reduces the available permits, records
     * pending acks (when tracked), bumps the unacked counter and the outgoing rate.
     *
     * <p>Entries with unparsable metadata are removed from {@code entries}, released and
     * individually acknowledged on the subscription.
     *
     * @param entries entries about to be sent; unparsable ones are removed from this list
     * @return total number of messages (sum of batch sizes) in the remaining entries
     * @throws PulsarServerException if a batch larger than one message is present and the client
     *         version is not batch compatible; permits and pending acks have already been
     *         updated when this is thrown
     */
    int updatePermitsAndPendingAcks(List<Entry> entries) throws PulsarServerException {
        final boolean clientSupportBatchMessages = this.cnx.isBatchMessageCompatibleVersion();
        boolean unsupportedVersion = false;
        int permitsToReduce = 0;
        long totalReadableBytes = 0L;

        for (Iterator<Entry> iter = entries.iterator(); iter.hasNext();) {
            Entry entry = iter.next();
            ByteBuf metadataAndPayload = entry.getDataBuffer();
            int batchSize = Consumer.getBatchSizeforEntry(metadataAndPayload, this.subscription.toString(),
                    this.consumerId);
            if (batchSize == -1) {
                // Metadata could not be parsed: drop the entry and acknowledge it.
                iter.remove();
                PositionImpl pos = (PositionImpl) entry.getPosition();
                entry.release();
                this.subscription.acknowledgeMessage(pos, PulsarApi.CommandAck.AckType.Individual);
                continue;
            }
            if (this.pendingAcks != null) {
                this.pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), (long) batchSize, 0L);
            }
            if (batchSize > 1 && !clientSupportBatchMessages) {
                unsupportedVersion = true;
            }
            totalReadableBytes += metadataAndPayload.readableBytes();
            permitsToReduce += batchSize;
        }

        int permits = MESSAGE_PERMITS_UPDATER.addAndGet(this, -permitsToReduce);
        this.incrementUnackedMessages(permitsToReduce);
        if (unsupportedVersion) {
            throw new PulsarServerException("Consumer does not support batch-message");
        }
        if (permits < 0 && log.isDebugEnabled()) {
            log.debug("[{}] [{}] message permits dropped below 0 - {}", this.subscription, this.consumerId,
                    permits);
        }
        this.msgOut.recordMultipleEvents((long) permitsToReduce, totalReadableBytes);
        return permitsToReduce;
    }

    public boolean isWritable() {
        return this.cnx.isWritable();
    }

    /** Writes the given error buffer to the client and closes the channel once the write completes. */
    public void sendError(ByteBuf error) {
        this.cnx.ctx().writeAndFlush(error).addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * Removes this consumer from its subscription and from the connection.
     *
     * @throws BrokerServiceException propagated from {@code subscription.removeConsumer}
     */
    public void close() throws BrokerServiceException {
        this.subscription.removeConsumer(this);
        this.cnx.removedConsumer(this);
    }

    /** Asks the connection to close this consumer, then closes it locally; a close failure is only logged. */
    public void disconnect() {
        log.info("Disconnecting consumer: {}", this);
        this.cnx.closeConsumer(this);
        try {
            this.close();
        } catch (BrokerServiceException e) {
            log.warn("Consumer {} was already closed: {}", this, e.getMessage(), e);
        }
    }

    /**
     * Unsubscribes through the subscription and replies to the client with a success or an
     * error command for {@code requestId} once the operation completes.
     */
    void doUnsubscribe(long requestId) {
        final ChannelHandlerContext ctx = this.cnx.ctx();
        this.subscription.doUnsubscribe(this).thenAccept(v -> {
            log.info("Unsubscribed successfully from {}", this.subscription);
            this.cnx.removedConsumer(this);
            ctx.writeAndFlush(Commands.newSuccess(requestId));
        }).exceptionally(exception -> {
            log.warn("Unsubscribe failed for {}", this.subscription, exception);
            ctx.writeAndFlush(Commands.newError(requestId,
                    BrokerServiceException.getClientErrorCode(exception.getCause()),
                    exception.getCause().getMessage()));
            return null;
        });
    }

    /**
     * Handles an ack command from the client.
     *
     * <p>For Shared subscriptions only individual acks are accepted (an
     * {@link IllegalArgumentException} is raised otherwise), and the pending-ack bookkeeping is
     * updated before the ack is forwarded. For other types the ack is forwarded as received.
     */
    void messageAcked(PulsarApi.CommandAck ack) {
        PulsarApi.MessageIdData msgId = ack.getMessageId();
        PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());

        if (ack.hasValidationError()) {
            log.error("[{}] [{}] Received ack for corrupted message at {} - Reason: {}", this.subscription,
                    this.consumerId, position, ack.getValidationError());
        }

        if (this.subType == PulsarApi.CommandSubscribe.SubType.Shared) {
            Preconditions.checkArgument(ack.getAckType() == PulsarApi.CommandAck.AckType.Individual);
            this.removePendingAcks(position);
            this.subscription.acknowledgeMessage(position, PulsarApi.CommandAck.AckType.Individual);
        } else {
            this.subscription.acknowledgeMessage(position, ack.getAckType());
        }
    }

    /**
     * Handles a flow command carrying additional permits.
     *
     * <p>When the consumer is blocked on unacked messages the permits are parked in a separate
     * counter instead of being made available to the subscription.
     *
     * @param additionalNumberOfMessages number of new permits; must be positive
     */
    void flowPermits(int additionalNumberOfMessages) {
        Preconditions.checkArgument(additionalNumberOfMessages > 0);

        if (this.shouldBlockConsumerOnUnackMsgs()
                && UNACKED_MESSAGES_UPDATER.get(this) >= this.maxUnackedMessages) {
            this.blockedConsumerOnUnackedMsgs = true;
        }

        final int oldPermits;
        if (this.blockedConsumerOnUnackedMsgs) {
            oldPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this,
                    additionalNumberOfMessages);
        } else {
            oldPermits = MESSAGE_PERMITS_UPDATER.getAndAdd(this, additionalNumberOfMessages);
            this.subscription.consumerFlow(this, additionalNumberOfMessages);
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] Added more flow control message permits {} (old was: {})", this,
                    additionalNumberOfMessages, oldPermits);
        }
    }

    /**
     * Moves every permit parked while {@code consumer} was blocked into its available permits
     * and notifies the subscription of the new flow.
     */
    void flowConsumerBlockedPermits(Consumer consumer) {
        int additionalNumberOfPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(consumer, 0);
        MESSAGE_PERMITS_UPDATER.getAndAdd(consumer, additionalNumberOfPermits);
        this.subscription.consumerFlow(consumer, additionalNumberOfPermits);
    }

    public int getAvailablePermits() {
        return MESSAGE_PERMITS_UPDATER.get(this);
    }

    public boolean isBlocked() {
        return this.blockedConsumerOnUnackedMsgs;
    }

    /** Notifies the client that the end of the topic was reached, if its protocol version is at least 9. */
    public void reachedEndOfTopic() {
        if (this.cnx.getRemoteEndpointProtocolVersion() >= 9) {
            log.info("[{}] Notifying consumer that end of topic has been reached", this);
            this.cnx.ctx().writeAndFlush(Commands.newReachedEndOfTopic(this.consumerId));
        }
    }

    /** Blocking on unacked messages applies only to Shared subscriptions with a positive limit. */
    private boolean shouldBlockConsumerOnUnackMsgs() {
        return PulsarApi.CommandSubscribe.SubType.Shared.equals(this.subType) && this.maxUnackedMessages > 0;
    }

    /** Recomputes the outgoing and redelivery rates and copies them into the stats object. */
    public void updateRates() {
        this.msgOut.calculateRate();
        this.msgRedeliver.calculateRate();
        this.stats.msgRateOut = this.msgOut.getRate();
        this.stats.msgThroughputOut = this.msgOut.getValueRate();
        this.stats.msgRateRedeliver = this.msgRedeliver.getRate();
    }

    /**
     * Refreshes the live counters in the stats object and returns it.
     *
     * @return the same mutable {@link ConsumerStats} instance on every call
     */
    public ConsumerStats getStats() {
        this.stats.availablePermits = this.getAvailablePermits();
        this.stats.unackedMessages = UNACKED_MESSAGES_UPDATER.get(this);
        this.stats.blockedConsumerOnUnackedMsgs = this.blockedConsumerOnUnackedMsgs;
        return this.stats;
    }

    public int getUnackedMessages() {
        return UNACKED_MESSAGES_UPDATER.get(this);
    }

    @Override
    public String toString() {
        return Objects.toStringHelper(this)
                .add("subscription", this.subscription)
                .add("consumerId", this.consumerId)
                .add("consumerName", this.consumerName)
                .add("address", this.cnx.clientAddress())
                .toString();
    }

    public ChannelHandlerContext ctx() {
        return this.cnx.ctx();
    }

    /**
     * Re-checks whether {@code appId} may still consume from the destination and disconnects the
     * consumer if not. Nothing is checked when no authorization manager is configured. An
     * exception raised by the check is logged and then treated like a denial.
     */
    public void checkPermissions() {
        DestinationName destination = DestinationName.get(this.subscription.getDestination());
        if (this.cnx.getBrokerService().getAuthorizationManager() == null) {
            return;
        }
        try {
            if (this.cnx.getBrokerService().getAuthorizationManager().canConsume(destination, this.appId)) {
                return;
            }
        } catch (Exception e) {
            log.warn("[{}] Get unexpected error while autorizing [{}]  {}", this.appId,
                    this.subscription.getDestination(), e.getMessage(), e);
        }
        log.info("[{}] is not allowed to consume from Destination [{}] anymore", this.appId,
                this.subscription.getDestination());
        this.disconnect();
    }

    /** Two consumers are equal when they share the client address and the consumer id. */
    @Override
    public boolean equals(Object obj) {
        if (obj instanceof Consumer) {
            Consumer other = (Consumer) obj;
            return Objects.equal(this.cnx.clientAddress(), other.cnx.clientAddress())
                    && this.consumerId == other.consumerId;
        }
        return false;
    }

    /**
     * Hash derived from exactly the fields compared in {@link #equals(Object)} so that equal
     * consumers always produce equal hash codes (and a null consumer name cannot cause an NPE).
     */
    @Override
    public int hashCode() {
        return Objects.hashCode(this.cnx.clientAddress(), this.consumerId);
    }

    /**
     * Removes the acknowledged position from the pending acks of whichever consumer of the
     * subscription holds it (this one or another), decrements that consumer's unacked counter
     * and unblocks it once the counter falls to half of {@code maxUnackedMessages} or below.
     */
    private void removePendingAcks(PositionImpl position) {
        final long ledgerId = position.getLedgerId();
        final long entryId = position.getEntryId();

        Consumer ackOwnedConsumer = null;
        if (this.pendingAcks.get(ledgerId, entryId) != null) {
            ackOwnedConsumer = this;
        } else {
            // The message may have been dispatched to a different consumer of the subscription.
            for (Consumer consumer : this.subscription.getConsumers()) {
                if (!consumer.equals(this) && consumer.getPendingAcks().containsKey(ledgerId, entryId)) {
                    ackOwnedConsumer = consumer;
                    break;
                }
            }
        }
        if (ackOwnedConsumer == null) {
            return;
        }

        ConcurrentLongLongPairHashMap.LongPair ackedBatch = ackOwnedConsumer.getPendingAcks().get(ledgerId, entryId);
        if (ackedBatch == null) {
            // Entry disappeared between the lookup above and this read; nothing left to account for.
            return;
        }
        int totalAckedMsgs = (int) ackedBatch.first;
        ackOwnedConsumer.getPendingAcks().remove(ledgerId, entryId);

        if (this.addAndGetUnAckedMsgs(ackOwnedConsumer, -totalAckedMsgs) <= this.maxUnackedMessages / 2
                && ackOwnedConsumer.blockedConsumerOnUnackedMsgs
                && ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs()) {
            ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
            this.flowConsumerBlockedPermits(ackOwnedConsumer);
        }
    }

    /** @return the pending-ack map, or {@code null} when the subscription type is not Shared */
    public ConcurrentLongLongPairHashMap getPendingAcks() {
        return this.pendingAcks;
    }

    public int getPriorityLevel() {
        return this.priorityLevel;
    }

    /**
     * Requests redelivery of everything this consumer has not acknowledged: resets the unacked
     * counter, unblocks the consumer, delegates to the subscription, releases parked permits and
     * finally records and clears the pending acks (when tracked).
     */
    public void redeliverUnacknowledgedMessages() {
        this.clearUnAckedMsgs(this);
        this.blockedConsumerOnUnackedMsgs = false;
        // The pending acks are cleared only after the subscription has been asked to redeliver.
        this.subscription.redeliverUnacknowledgedMessages(this);
        this.flowConsumerBlockedPermits(this);

        if (this.pendingAcks != null) {
            AtomicInteger totalRedeliveryMessages = new AtomicInteger(0);
            this.pendingAcks.forEach((ledgerId, entryId, batchSize, none) ->
                    totalRedeliveryMessages.addAndGet((int) batchSize));
            this.msgRedeliver.recordMultipleEvents((long) totalRedeliveryMessages.get(),
                    (long) totalRedeliveryMessages.get());
            this.pendingAcks.clear();
        }
    }

    /**
     * Requests redelivery of the given message ids. Ids that are not in this consumer's pending
     * acks are ignored.
     *
     * <p>NOTE(review): this dereferences {@code pendingAcks} without a null check, so it
     * presumably must only be invoked for Shared subscriptions - confirm against callers.
     *
     * @param messageIds ids of the messages to redeliver
     */
    public void redeliverUnacknowledgedMessages(List<PulsarApi.MessageIdData> messageIds) {
        int totalRedeliveryMessages = 0;
        List<PositionImpl> pendingPositions = Lists.newArrayList();

        for (PulsarApi.MessageIdData msg : messageIds) {
            PositionImpl position = PositionImpl.get(msg.getLedgerId(), msg.getEntryId());
            ConcurrentLongLongPairHashMap.LongPair batchSize =
                    this.pendingAcks.get(position.getLedgerId(), position.getEntryId());
            if (batchSize == null) {
                continue;
            }
            this.pendingAcks.remove(position.getLedgerId(), position.getEntryId());
            totalRedeliveryMessages += batchSize.first;
            pendingPositions.add(position);
        }

        this.addAndGetUnAckedMsgs(this, -totalRedeliveryMessages);
        this.blockedConsumerOnUnackedMsgs = false;
        this.subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
        this.msgRedeliver.recordMultipleEvents((long) totalRedeliveryMessages, (long) totalRedeliveryMessages);

        // Release at most as many parked permits as messages were handed back for redelivery.
        int numberOfBlockedPermits = Math.min(totalRedeliveryMessages,
                PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.get(this));
        if (numberOfBlockedPermits > 0) {
            PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, -numberOfBlockedPermits);
            MESSAGE_PERMITS_UPDATER.getAndAdd(this, numberOfBlockedPermits);
            this.subscription.consumerFlow(this, numberOfBlockedPermits);
        }
    }

    public Subscription getSubscription() {
        return this.subscription;
    }

    /**
     * Applies {@code delta} to the subscription-wide unacked count and to the unacked counter of
     * {@code consumer} (which may be a different consumer than {@code this}).
     *
     * @return the updated unacked counter of {@code consumer}
     */
    private int addAndGetUnAckedMsgs(Consumer consumer, int delta) {
        this.subscription.addUnAckedMessages(delta);
        return UNACKED_MESSAGES_UPDATER.addAndGet(consumer, delta);
    }

    /** Resets the unacked counter of {@code consumer} to zero and subtracts the old value from the subscription. */
    private void clearUnAckedMsgs(Consumer consumer) {
        int previousUnacked = UNACKED_MESSAGES_UPDATER.getAndSet(consumer, 0);
        this.subscription.addUnAckedMessages(-previousUnacked);
    }
}

