/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.AcknowledgmentsGroupingTracker;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Groups a consumer's acknowledgments and writes them to the broker connection in batches.
 *
 * <p>Three kinds of pending state are kept:
 * <ul>
 *   <li>the highest cumulative ack seen so far (plus an optional batch-index bit set),</li>
 *   <li>a sorted set of individually acknowledged entry ids,</li>
 *   <li>a map of entry id to bit set for individually acknowledged batch indexes.</li>
 * </ul>
 *
 * <p>Pending state is written out by {@link #flush()}, which is invoked periodically when the
 * configured group time is positive, and eagerly once {@link #MAX_ACK_GROUP_SIZE} entries are
 * pending. When the group time is {@code 0}, or an ack carries properties, the ack is written
 * immediately instead of being grouped.
 */
public class PersistentAcknowledgmentsGroupingTracker
implements AcknowledgmentsGroupingTracker {
    private static final Logger log = LoggerFactory.getLogger(PersistentAcknowledgmentsGroupingTracker.class);

    /** Number of pending individual acks at which a flush is forced without waiting for the timer. */
    private static final int MAX_ACK_GROUP_SIZE = 1000;

    private final ConsumerImpl<?> consumer;
    private final long acknowledgementGroupTimeMicros;

    // The three fields below are read by flush() and written by doCumulativeAck()/flushAndClean().
    // The field names are referenced reflectively by the updaters further down; do not rename them.
    private volatile MessageIdImpl lastCumulativeAck = (MessageIdImpl) MessageId.earliest;
    private volatile BitSetRecyclable lastCumulativeAckSet = null;
    private volatile boolean cumulativeAckFlushRequired = false;

    private static final AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker, MessageIdImpl> LAST_CUMULATIVE_ACK_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PersistentAcknowledgmentsGroupingTracker.class, MessageIdImpl.class, "lastCumulativeAck");
    private static final AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker, BitSetRecyclable> LAST_CUMULATIVE_ACK_SET_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PersistentAcknowledgmentsGroupingTracker.class, BitSetRecyclable.class, "lastCumulativeAckSet");

    /** Entry-level individual acks waiting to be flushed. */
    private final ConcurrentSkipListSet<MessageIdImpl> pendingIndividualAcks;

    /** Batch-index individual acks waiting to be flushed, keyed by the plain (non-batch) entry id. */
    private final ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable> pendingIndividualBatchIndexAcks;

    /** Periodic flush task; {@code null} when grouping is disabled (group time not positive). */
    private final ScheduledFuture<?> scheduledTask;

    /**
     * Creates the tracker and, when the configured group time is positive, schedules a
     * fixed-delay {@link #flush()} on one of the given event loops.
     *
     * @param consumer       consumer whose connection and consumer id are used for ack commands
     * @param conf           consumer configuration supplying the ack group time in microseconds
     * @param eventLoopGroup event loop group used to schedule the periodic flush
     */
    public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, ConsumerConfigurationData<?> conf, EventLoopGroup eventLoopGroup) {
        this.consumer = consumer;
        this.pendingIndividualAcks = new ConcurrentSkipListSet<>();
        this.pendingIndividualBatchIndexAcks = new ConcurrentHashMap<>();
        this.acknowledgementGroupTimeMicros = conf.getAcknowledgementsGroupTimeMicros();
        if (this.acknowledgementGroupTimeMicros > 0L) {
            this.scheduledTask = eventLoopGroup.next().scheduleWithFixedDelay(this::flush, this.acknowledgementGroupTimeMicros, this.acknowledgementGroupTimeMicros, TimeUnit.MICROSECONDS);
        } else {
            this.scheduledTask = null;
        }
    }

    /**
     * Tells whether the given message id is already covered by a pending or past ack tracked here.
     *
     * @param messageId id to check
     * @return {@code true} if the id is not greater than the last cumulative ack, or is present
     *         in the pending individual acks
     */
    @Override
    public boolean isDuplicate(MessageId messageId) {
        if (messageId.compareTo(this.lastCumulativeAck) <= 0) {
            return true;
        }
        return this.pendingIndividualAcks.contains(messageId);
    }

    /**
     * Records an entry-level acknowledgment.
     *
     * <p>The ack is written immediately when grouping is disabled or {@code properties} is
     * non-empty. Otherwise a cumulative ack updates the cumulative position and any other ack
     * type is queued as an individual ack.
     *
     * @param msgId      id being acknowledged
     * @param ackType    cumulative or individual
     * @param properties ack properties; must not be {@code null}
     */
    @Override
    public void addAcknowledgment(MessageIdImpl msgId, PulsarApi.CommandAck.AckType ackType, Map<String, Long> properties) {
        if (this.acknowledgementGroupTimeMicros == 0L || !properties.isEmpty()) {
            this.doImmediateAck(msgId, ackType, properties);
            return;
        }
        if (ackType == PulsarApi.CommandAck.AckType.Cumulative) {
            this.doCumulativeAck(msgId, null);
            return;
        }

        // Batch ids are reduced to their entry-level id before being queued.
        MessageIdImpl entryLevelId = msgId instanceof BatchMessageIdImpl
                ? new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), msgId.getPartitionIndex())
                : msgId;
        this.pendingIndividualAcks.add(entryLevelId);

        // The whole entry is now acked, so pending per-index acks for it are redundant. The map
        // is keyed by entry-level ids (see addBatchIndexAcknowledgment), so the lookup uses the
        // same entry-level id rather than the batch id that was passed in.
        this.pendingIndividualBatchIndexAcks.remove(entryLevelId);

        if (this.pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
            this.flush();
        }
    }

    /**
     * Records an acknowledgment for a single index (or index prefix) inside a batch entry.
     *
     * <p>In the bit sets built here a set bit marks an index that has NOT been acknowledged:
     * all {@code batchSize} bits start set and acknowledged indexes are cleared.
     *
     * @param msgId      batch message id being acknowledged
     * @param batchIndex index inside the batch; for cumulative acks, indexes {@code 0..batchIndex}
     *                   are cleared
     * @param batchSize  number of messages in the batch
     * @param ackType    cumulative or individual; any other value is ignored when grouping
     * @param properties ack properties; must not be {@code null}
     */
    @Override
    public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int batchSize, PulsarApi.CommandAck.AckType ackType, Map<String, Long> properties) {
        if (this.acknowledgementGroupTimeMicros == 0L || !properties.isEmpty()) {
            this.doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType, properties);
            return;
        }
        if (ackType == PulsarApi.CommandAck.AckType.Cumulative) {
            BitSetRecyclable remaining = BitSetRecyclable.create();
            remaining.set(0, batchSize);
            remaining.clear(0, batchIndex + 1);
            this.doCumulativeAck(msgId, remaining);
        } else if (ackType == PulsarApi.CommandAck.AckType.Individual) {
            MessageIdImpl entryLevelId = new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), msgId.getPartitionIndex());
            ConcurrentBitSetRecyclable remaining = this.pendingIndividualBatchIndexAcks.computeIfAbsent(entryLevelId, key -> {
                ConcurrentBitSetRecyclable fresh = ConcurrentBitSetRecyclable.create();
                // Exactly batchSize bits (indexes 0..batchSize-1), matching the cumulative and
                // immediate paths; an extra bit at index batchSize could never be cleared.
                fresh.set(0, batchSize);
                return fresh;
            });
            remaining.clear(batchIndex);
            if (this.pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
                this.flush();
            }
        }
    }

    /**
     * Advances the cumulative ack position to {@code msgId} if it is greater than the current
     * one, and marks a cumulative flush as required.
     *
     * <p>NOTE(review): the message id and the bit set are swapped by two separate
     * compare-and-set operations, so the pair is not updated atomically. If the first succeeds
     * and the second fails, the retry sees {@code msgId} already installed and returns without
     * installing {@code bitSet}. Confirm whether concurrent cumulative acks are possible here.
     *
     * @param msgId  candidate cumulative position
     * @param bitSet batch-index bit set that goes with {@code msgId}, or {@code null}
     */
    private void doCumulativeAck(MessageIdImpl msgId, BitSetRecyclable bitSet) {
        while (true) {
            MessageIdImpl previousAck = this.lastCumulativeAck;
            BitSetRecyclable previousSet = this.lastCumulativeAckSet;
            if (msgId.compareTo(previousAck) <= 0) {
                // Not newer than what is already recorded: nothing to do.
                return;
            }
            if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, previousAck, msgId)
                    && LAST_CUMULATIVE_ACK_SET_UPDATER.compareAndSet(this, previousSet, bitSet)) {
                if (previousSet != null) {
                    try {
                        previousSet.recycle();
                    }
                    catch (Exception e) {
                        // Recycling is best-effort; a failure must not prevent the ack.
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Failed to recycle previous cumulative ack bit set", this.consumer, e);
                        }
                    }
                }
                this.cumulativeAckFlushRequired = true;
                return;
            }
        }
    }

    /**
     * Writes and flushes a single ack command right away.
     *
     * @return {@code false} if the consumer currently has no connection, {@code true} otherwise
     */
    private boolean doImmediateAck(MessageIdImpl msgId, PulsarApi.CommandAck.AckType ackType, Map<String, Long> properties) {
        ClientCnx cnx = this.consumer.getClientCnx();
        if (cnx == null) {
            return false;
        }
        this.newAckCommand(this.consumer.consumerId, msgId, null, ackType, null, properties, cnx, true);
        return true;
    }

    /**
     * Writes and flushes a single batch-index ack command right away.
     *
     * <p>The bit set sent has all {@code batchSize} bits set except the acknowledged ones:
     * indexes {@code 0..batchIndex} for a cumulative ack, only {@code batchIndex} otherwise.
     *
     * @return {@code false} if the consumer currently has no connection, {@code true} otherwise
     */
    private boolean doImmediateBatchIndexAck(BatchMessageIdImpl msgId, int batchIndex, int batchSize, PulsarApi.CommandAck.AckType ackType, Map<String, Long> properties) {
        ClientCnx cnx = this.consumer.getClientCnx();
        if (cnx == null) {
            return false;
        }
        BitSetRecyclable remaining = BitSetRecyclable.create();
        remaining.set(0, batchSize);
        if (ackType == PulsarApi.CommandAck.AckType.Cumulative) {
            remaining.clear(0, batchIndex + 1);
        } else {
            remaining.clear(batchIndex);
        }
        ByteBuf cmd = Commands.newAck(this.consumer.consumerId, msgId.ledgerId, msgId.entryId, remaining, ackType, null, properties);
        cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
        return true;
    }

    /**
     * Writes all pending acks to the consumer's current connection and flushes the channel.
     *
     * <p>Does nothing (pending state is kept) when the consumer has no connection. Otherwise:
     * the cumulative ack is written if one is pending; pending individual acks are drained into
     * one multi-message ack when the peer supports it, or written one command each when it does
     * not; pending batch-index acks are drained into the multi-message ack.
     */
    @Override
    public void flush() {
        ClientCnx cnx = this.consumer.getClientCnx();
        if (cnx == null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Cannot flush pending acks since we're not connected to broker", this.consumer);
            }
            return;
        }

        boolean shouldFlush = false;
        if (this.cumulativeAckFlushRequired) {
            this.newAckCommand(this.consumer.consumerId, this.lastCumulativeAck, this.lastCumulativeAckSet, PulsarApi.CommandAck.AckType.Cumulative, null, Collections.emptyMap(), cnx, false);
            shouldFlush = true;
            this.cumulativeAckFlushRequired = false;
        }

        ArrayList<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<Triple<Long, Long, ConcurrentBitSetRecyclable>>(this.pendingIndividualAcks.size() + this.pendingIndividualBatchIndexAcks.size());

        if (!this.pendingIndividualAcks.isEmpty()) {
            boolean multiAckSupported = Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion());
            MessageIdImpl msgId;
            while ((msgId = this.pendingIndividualAcks.pollFirst()) != null) {
                if (!multiAckSupported) {
                    // No multi-message ack on this peer: one command per id.
                    this.newAckCommand(this.consumer.consumerId, msgId, null, PulsarApi.CommandAck.AckType.Individual, null, Collections.emptyMap(), cnx, false);
                    shouldFlush = true;
                    continue;
                }
                MessageIdImpl[] chunkMsgIds = (MessageIdImpl[])this.consumer.unAckedChunckedMessageIdSequenceMap.get(msgId);
                if (chunkMsgIds != null && chunkMsgIds.length > 1) {
                    // A chunked message: ack every recorded chunk id instead of msgId alone.
                    for (MessageIdImpl cMsgId : chunkMsgIds) {
                        if (cMsgId != null) {
                            entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
                        }
                    }
                    this.consumer.unAckedChunckedMessageIdSequenceMap.remove(msgId);
                } else {
                    entriesToAck.add(Triple.of(msgId.getLedgerId(), msgId.getEntryId(), null));
                }
            }
        }

        if (!this.pendingIndividualBatchIndexAcks.isEmpty()) {
            Iterator<Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable>> iterator = this.pendingIndividualBatchIndexAcks.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable> entry = iterator.next();
                entriesToAck.add(Triple.of(entry.getKey().ledgerId, entry.getKey().entryId, entry.getValue()));
                iterator.remove();
            }
        }

        if (entriesToAck.size() > 0) {
            cnx.ctx().write(Commands.newMultiMessageAck(this.consumer.consumerId, entriesToAck), cnx.ctx().voidPromise());
            shouldFlush = true;
        }

        if (shouldFlush) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {} -- individual-batch-index-acks: {}", new Object[]{this.consumer, this.lastCumulativeAck, this.pendingIndividualAcks, this.pendingIndividualBatchIndexAcks});
            }
            cnx.ctx().flush();
        }
    }

    /**
     * Flushes what can be flushed, then discards all remaining pending ack state.
     *
     * <p>{@link #flush()} returns without sending anything when there is no connection, so
     * every kind of pending state is reset here explicitly: otherwise leftover batch-index acks,
     * or a cumulative ack for the reset position, could be written by a later flush.
     */
    @Override
    public void flushAndClean() {
        this.flush();
        this.cumulativeAckFlushRequired = false;
        this.lastCumulativeAck = (MessageIdImpl) MessageId.earliest;
        this.pendingIndividualAcks.clear();
        this.pendingIndividualBatchIndexAcks.clear();
    }

    /** Flushes pending acks one last time and cancels the periodic flush task, if any. */
    @Override
    public void close() {
        this.flush();
        if (this.scheduledTask != null && !this.scheduledTask.isCancelled()) {
            this.scheduledTask.cancel(true);
        }
    }

    /**
     * Builds and writes the ack command(s) for {@code msgId}.
     *
     * <p>If the consumer has a chunk-id sequence recorded for {@code msgId}, the chunk ids are
     * acknowledged (as one multi-message ack when the peer supports it and the ack is not
     * cumulative, otherwise one command per chunk id) and the sequence is removed from the
     * consumer's map. Without a recorded sequence a single ack for {@code msgId} is written.
     *
     * @param consumerId           consumer id placed in single-ack commands
     * @param msgId                id being acknowledged
     * @param lastCumulativeAckSet batch-index bit set for single-ack commands, may be {@code null}
     * @param ackType              cumulative or individual
     * @param validationError      validation error to report, may be {@code null}
     * @param map                  ack properties
     * @param cnx                  connection to write to
     * @param flush                whether to flush the channel after each write
     */
    private void newAckCommand(long consumerId, MessageIdImpl msgId, BitSetRecyclable lastCumulativeAckSet, PulsarApi.CommandAck.AckType ackType, PulsarApi.CommandAck.ValidationError validationError, Map<String, Long> map, ClientCnx cnx, boolean flush) {
        MessageIdImpl[] chunkMsgIds = (MessageIdImpl[])this.consumer.unAckedChunckedMessageIdSequenceMap.get(msgId);
        if (chunkMsgIds == null) {
            this.writeCommand(cnx, Commands.newAck(consumerId, msgId.getLedgerId(), msgId.getEntryId(), lastCumulativeAckSet, ackType, validationError, map), flush);
            return;
        }

        if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()) && ackType != PulsarApi.CommandAck.AckType.Cumulative) {
            ArrayList<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<Triple<Long, Long, ConcurrentBitSetRecyclable>>(chunkMsgIds.length);
            if (chunkMsgIds.length > 1) {
                for (MessageIdImpl cMsgId : chunkMsgIds) {
                    if (cMsgId != null) {
                        entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
                    }
                }
            }
            if (entriesToAck.isEmpty()) {
                // No usable chunk ids: ack msgId itself (as flush() does) rather than sending an
                // empty multi-message ack that would acknowledge nothing.
                this.writeCommand(cnx, Commands.newAck(consumerId, msgId.getLedgerId(), msgId.getEntryId(), lastCumulativeAckSet, ackType, validationError, map), flush);
            } else {
                this.writeCommand(cnx, Commands.newMultiMessageAck(this.consumer.consumerId, entriesToAck), flush);
            }
        } else {
            for (MessageIdImpl cMsgId : chunkMsgIds) {
                if (cMsgId == null) {
                    // Same null guard as the multi-message branch; avoids an NPE on empty slots.
                    continue;
                }
                this.writeCommand(cnx, Commands.newAck(consumerId, cMsgId.getLedgerId(), cMsgId.getEntryId(), lastCumulativeAckSet, ackType, validationError, map), flush);
            }
        }
        this.consumer.unAckedChunckedMessageIdSequenceMap.remove(msgId);
    }

    /** Writes {@code cmd} to the connection, flushing the channel as well when {@code flush} is set. */
    private void writeCommand(ClientCnx cnx, ByteBuf cmd, boolean flush) {
        if (flush) {
            cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
        } else {
            cnx.ctx().write(cmd, cnx.ctx().voidPromise());
        }
    }
}

