/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.AcknowledgmentsGroupingTracker;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentAcknowledgmentsGroupingTracker
implements AcknowledgmentsGroupingTracker {
    private static final Logger log = LoggerFactory.getLogger(PersistentAcknowledgmentsGroupingTracker.class);
    private static final int MAX_ACK_GROUP_SIZE = 1000;
    private final ConsumerImpl<?> consumer;
    private final long acknowledgementGroupTimeMicros;
    private volatile MessageIdImpl lastCumulativeAck = (MessageIdImpl)MessageId.earliest;
    private volatile boolean cumulativeAckFulshRequired = false;
    private static final AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker, MessageIdImpl> LAST_CUMULATIVE_ACK_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PersistentAcknowledgmentsGroupingTracker.class, MessageIdImpl.class, "lastCumulativeAck");
    private final ConcurrentSkipListSet<MessageIdImpl> pendingIndividualAcks;
    private final ScheduledFuture<?> scheduledTask;

    public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, ConsumerConfigurationData<?> conf, EventLoopGroup eventLoopGroup) {
        this.consumer = consumer;
        this.pendingIndividualAcks = new ConcurrentSkipListSet();
        this.acknowledgementGroupTimeMicros = conf.getAcknowledgementsGroupTimeMicros();
        this.scheduledTask = this.acknowledgementGroupTimeMicros > 0L ? eventLoopGroup.next().scheduleWithFixedDelay(this::flush, this.acknowledgementGroupTimeMicros, this.acknowledgementGroupTimeMicros, TimeUnit.MICROSECONDS) : null;
    }

    @Override
    public boolean isDuplicate(MessageId messageId) {
        if (messageId.compareTo(this.lastCumulativeAck) <= 0) {
            return true;
        }
        return this.pendingIndividualAcks.contains(messageId);
    }

    @Override
    public void addAcknowledgment(MessageIdImpl msgId, PulsarApi.CommandAck.AckType ackType, Map<String, Long> properties) {
        if (this.acknowledgementGroupTimeMicros == 0L || !properties.isEmpty()) {
            this.doImmediateAck(msgId, ackType, properties);
        } else if (ackType == PulsarApi.CommandAck.AckType.Cumulative) {
            this.doCumulativeAck(msgId);
        } else {
            this.pendingIndividualAcks.add(msgId);
            if (this.pendingIndividualAcks.size() >= 1000) {
                this.flush();
            }
        }
    }

    private void doCumulativeAck(MessageIdImpl msgId) {
        MessageIdImpl lastCumlativeAck;
        while (msgId.compareTo(lastCumlativeAck = this.lastCumulativeAck) > 0) {
            if (!LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, lastCumlativeAck, msgId)) continue;
            this.cumulativeAckFulshRequired = true;
            return;
        }
    }

    private boolean doImmediateAck(MessageIdImpl msgId, PulsarApi.CommandAck.AckType ackType, Map<String, Long> properties) {
        ClientCnx cnx = this.consumer.getClientCnx();
        if (cnx == null) {
            return false;
        }
        ByteBuf cmd = Commands.newAck(this.consumer.consumerId, msgId.getLedgerId(), msgId.getEntryId(), ackType, null, properties);
        cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
        return true;
    }

    @Override
    public void flush() {
        ClientCnx cnx;
        if (log.isDebugEnabled()) {
            log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {}", new Object[]{this.consumer, this.lastCumulativeAck, this.pendingIndividualAcks});
        }
        if ((cnx = this.consumer.getClientCnx()) == null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Cannot flush pending acks since we're not connected to broker", this.consumer);
            }
            return;
        }
        if (this.cumulativeAckFulshRequired) {
            ByteBuf cmd = Commands.newAck(this.consumer.consumerId, this.lastCumulativeAck.ledgerId, this.lastCumulativeAck.entryId, PulsarApi.CommandAck.AckType.Cumulative, null, Collections.emptyMap());
            cnx.ctx().write(cmd, cnx.ctx().voidPromise());
            this.cumulativeAckFulshRequired = false;
        }
        if (!this.pendingIndividualAcks.isEmpty()) {
            if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
                MessageIdImpl msgId;
                ArrayList<Pair<Long, Long>> entriesToAck = new ArrayList<Pair<Long, Long>>(this.pendingIndividualAcks.size());
                while ((msgId = this.pendingIndividualAcks.pollFirst()) != null) {
                    entriesToAck.add(Pair.of(msgId.getLedgerId(), msgId.getEntryId()));
                }
                cnx.ctx().write(Commands.newMultiMessageAck(this.consumer.consumerId, entriesToAck), cnx.ctx().voidPromise());
            } else {
                MessageIdImpl msgId;
                while ((msgId = this.pendingIndividualAcks.pollFirst()) != null) {
                    cnx.ctx().write(Commands.newAck(this.consumer.consumerId, msgId.getLedgerId(), msgId.getEntryId(), PulsarApi.CommandAck.AckType.Individual, null, Collections.emptyMap()), cnx.ctx().voidPromise());
                }
            }
        }
        cnx.ctx().flush();
    }

    @Override
    public void close() {
        this.flush();
        if (this.scheduledTask != null) {
            this.scheduledTask.cancel(true);
        }
    }
}

