/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction.pendingack.impl;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.PositionAckSetUtil;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleState;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.stats.PositionInPendingAckStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RecoverTimeRecord;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PendingAckHandleImpl
extends PendingAckHandleState
implements PendingAckHandle {
    private static final Logger log = LoggerFactory.getLogger(PendingAckHandleImpl.class);
    // Individually acked positions grouped by transaction. Created lazily, so it may be null.
    private LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>> individualAckOfTransaction;
    // Positions currently held by pending individual acks. The pair's right value is read as the
    // batch size when an abort restores ack-set bits. Created lazily, so it may be null.
    private ConcurrentSkipListMap<PositionImpl, MutablePair<PositionImpl, Integer>> individualAckPositions;
    // The single pending cumulative ack (owning transaction and position), or null when there is none.
    private Pair<TxnID, PositionImpl> cumulativeAckOfTransaction;
    private final String topicName;
    private final String subName;
    private final PersistentSubscription persistentSubscription;
    // Assigned by initPendingAckStore(); remains null until initialization has been started.
    private CompletableFuture<PendingAckStore> pendingAckStoreFuture;
    private final CompletableFuture<PendingAckHandle> pendingAckHandleCompletableFuture = new CompletableFuture();
    private final TransactionPendingAckStoreProvider pendingAckStoreProvider;
    // Requests that arrived while the handle was in the None or Initializing state.
    private final BlockingQueue<Runnable> acceptQueue = new LinkedBlockingDeque<Runnable>();
    // Highest low water mark seen so far, keyed by TxnID.getMostSigBits().
    private final ConcurrentHashMap<Long, Long> lowWaterMarks = new ConcurrentHashMap();
    // Single permit: at most one low-water-mark triggered abort is in flight at a time.
    private final Semaphore handleLowWaterMark = new Semaphore(1);
    // Executor on which public operations and initialization callbacks of this handle are run.
    private final ExecutorService internalPinnedExecutor;
    private final PendingAckHandleStats handleStats;
    public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();

    /**
     * Creates the pending-ack handle for a subscription, starting in the {@code None} state.
     *
     * <p>The constructor asks the store provider whether a pending-ack store was initialized
     * before. If so, the store is initialized right away; otherwise the handle future is
     * completed without creating a store. Both continuations run on the pinned executor.
     *
     * @param persistentSubscription the subscription this handle tracks pending acks for
     */
    public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
        super(PendingAckHandleState.State.None);
        this.topicName = persistentSubscription.getTopicName();
        this.subName = persistentSubscription.getName();
        this.persistentSubscription = persistentSubscription;
        this.internalPinnedExecutor = persistentSubscription.getTopic().getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor((Object)this);
        ServiceConfiguration config = persistentSubscription.getTopic().getBrokerService().pulsar().getConfig();
        boolean exposeTopicLevelMetrics = config.isExposeTopicLevelMetricsInPrometheus();
        this.handleStats = PendingAckHandleStats.create(this.topicName, this.subName, exposeTopicLevelMetrics);
        this.pendingAckStoreProvider = this.persistentSubscription.getTopic().getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
        this.pendingAckStoreProvider.checkInitializedBefore(persistentSubscription).thenAcceptAsync(init -> {
            if (init.booleanValue()) {
                this.initPendingAckStore();
            } else {
                this.completeHandleFuture();
            }
        }, this.internalPinnedExecutor).exceptionallyAsync(e -> {
            Throwable t = FutureUtil.unwrapCompletionException(e);
            this.changeToErrorState();
            this.exceptionHandleFuture(t);
            // The store future is only assigned by initPendingAckStore(). When the check itself
            // failed it is still null, and dereferencing it would throw an NPE that is silently
            // swallowed by this stage.
            CompletableFuture<PendingAckStore> storeFuture = this.pendingAckStoreFuture;
            if (storeFuture != null) {
                storeFuture.completeExceptionally(t);
            }
            return null;
        }, this.internalPinnedExecutor);
    }

    /**
     * Starts pending-ack store creation and replay.
     *
     * <p>Does nothing unless the state can be moved to {@code Initializing} and the handle is
     * not closed. On success the store is asked to replay into this handle on the pinned
     * executor. On failure the cached requests are handled, the state is moved to
     * {@code Error} and the handle future is failed with the unwrapped cause.
     */
    private void initPendingAckStore() {
        if (this.changeToInitializingState() && !this.checkIfClose()) {
            this.pendingAckStoreFuture = this.pendingAckStoreProvider.newPendingAckStore(this.persistentSubscription);
            this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
                this.recoverTime.setRecoverStartTime(System.currentTimeMillis());
                pendingAckStore.replayAsync(this, this.internalPinnedExecutor);
            }).exceptionallyAsync(e -> {
                // Cached requests are handled before the state flips to Error, as in the original order.
                this.handleCacheRequest();
                this.changeToErrorState();
                log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", new Object[]{this.topicName, this.subName, e});
                // Unwrap the same way the constructor does. A bare getCause() can return null,
                // and completing a future with a null exception throws.
                this.exceptionHandleFuture(FutureUtil.unwrapCompletionException(e));
                return null;
            }, this.internalPinnedExecutor);
        }
    }

    /**
     * Queues an individual-ack request on {@code acceptQueue} so it can be run later
     * instead of being processed immediately.
     */
    private void addIndividualAcknowledgeMessageRequest(TxnID txnID, List<MutablePair<PositionImpl, Integer>> positions, CompletableFuture<Void> completableFuture) {
        Runnable deferredAck = () -> this.internalIndividualAcknowledgeMessage(txnID, positions, completableFuture);
        this.acceptQueue.add(deferredAck);
    }

    /**
     * Appends an individual ack for a transaction to the pending-ack store and, once the
     * append succeeds, validates the positions and records them in memory.
     *
     * <p>The future is failed with {@code NotAllowedException} when {@code txnID} or
     * {@code positions} is null, and with {@code TransactionConflictException} when a position
     * is already deleted on the cursor, overlaps the cursor's batch ack set, or is already
     * held by a pending ack.
     *
     * @param txnID the transaction performing the ack
     * @param positions pairs of position and an integer used as an ack-set bit index
     *                  (presumably the batch size; confirm against callers)
     * @param completableFuture completed with null on success or exceptionally on failure
     */
    public void internalIndividualAcknowledgeMessage(TxnID txnID, List<MutablePair<PositionImpl, Integer>> positions, CompletableFuture<Void> completableFuture) {
        if (txnID == null) {
            completableFuture.completeExceptionally(new BrokerServiceException.NotAllowedException("txnID can not be null."));
            return;
        }
        if (positions == null) {
            completableFuture.completeExceptionally(new BrokerServiceException.NotAllowedException("Positions can not be null."));
            return;
        }
        // The ack is appended to the store first; validation and the in-memory update run only after the append completes.
        ((CompletableFuture)this.pendingAckStoreFuture.thenAccept(pendingAckStore -> ((CompletableFuture)pendingAckStore.appendIndividualAck(txnID, positions).thenAccept(v -> {
            PendingAckHandleImpl pendingAckHandleImpl = this;
            synchronized (pendingAckHandleImpl) {
                for (MutablePair positionIntegerMutablePair : positions) {
                    String errorMsg;
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] individualAcknowledgeMessage position: [{}], txnId: [{}], subName: [{}]", new Object[]{this.topicName, positionIntegerMutablePair.left, txnID, this.subName});
                    }
                    PositionImpl position = (PositionImpl)positionIntegerMutablePair.left;
                    // Conflict: the cursor reports this message as already deleted.
                    if (((ManagedCursorImpl)this.persistentSubscription.getCursor()).isMessageDeleted((Position)position)) {
                        errorMsg = "[" + this.topicName + "][" + this.subName + "] Transaction:" + txnID + " try to ack message:" + position + " already acked before.";
                        log.error(errorMsg);
                        completableFuture.completeExceptionally((Throwable)new TransactionConflictException(errorMsg));
                        return;
                    }
                    if (position.hasAckSet()) {
                        // Work on a copy of the ack set with every bit from index `right` up to the bit set's size set.
                        BitSetRecyclable bitSetRecyclable = BitSetRecyclable.valueOf((long[])position.getAckSet());
                        if ((Integer)positionIntegerMutablePair.right > bitSetRecyclable.size()) {
                            bitSetRecyclable.set(((Integer)positionIntegerMutablePair.right).intValue());
                        }
                        bitSetRecyclable.set(((Integer)positionIntegerMutablePair.right).intValue(), bitSetRecyclable.size());
                        long[] ackSetOverlap = bitSetRecyclable.toLongArray();
                        bitSetRecyclable.recycle();
                        // Conflict: overlaps the ack set the cursor already holds for this batch position.
                        if (PositionAckSetUtil.isAckSetOverlap((long[])ackSetOverlap, (long[])((ManagedCursorImpl)this.persistentSubscription.getCursor()).getBatchPositionAckSet((Position)position))) {
                            String errorMsg2 = "[" + this.topicName + "][" + this.subName + "] Transaction:" + txnID + " try to ack message:" + position + " already acked before.";
                            log.error(errorMsg2);
                            completableFuture.completeExceptionally((Throwable)new TransactionConflictException(errorMsg2));
                            return;
                        }
                        // No overlap with a pending ack on the same batch position: this position is fine.
                        if (this.individualAckPositions == null || !this.individualAckPositions.containsKey(position) || !PositionAckSetUtil.isAckSetOverlap((long[])((PositionImpl)this.individualAckPositions.get(position).getLeft()).getAckSet(), (long[])ackSetOverlap)) continue;
                        String errorMsg3 = "[" + this.topicName + "][" + this.subName + "] Transaction:" + txnID + " try to ack batch message:" + position + " in pending ack status.";
                        log.error(errorMsg3);
                        completableFuture.completeExceptionally((Throwable)new TransactionConflictException(errorMsg3));
                        return;
                    }
                    // Position without an ack set: it conflicts only if it is already held by a pending ack.
                    if (this.individualAckPositions == null || !this.individualAckPositions.containsKey(position)) continue;
                    errorMsg = "[" + this.topicName + "][" + this.subName + "] Transaction:" + txnID + " try to ack message:" + position + " in pending ack status.";
                    log.error(errorMsg);
                    completableFuture.completeExceptionally((Throwable)new TransactionConflictException(errorMsg));
                    return;
                }
                // Every position passed validation: record the acks in memory.
                this.handleIndividualAck(txnID, positions);
                completableFuture.complete(null);
            }
        })).exceptionally(e -> {
            PendingAckHandleImpl pendingAckHandleImpl = this;
            synchronized (pendingAckHandleImpl) {
                // NOTE(review): the in-memory state is updated even though this stage failed;
                // this looks deliberate (persisted state unknown) but confirm with the owners.
                this.handleIndividualAck(txnID, positions);
                completableFuture.completeExceptionally(e.getCause());
            }
            return null;
        }))).exceptionally(e -> {
            // The store future itself failed.
            completableFuture.completeExceptionally((Throwable)e);
            return null;
        });
    }

    @Override
    /**
     * Individually acknowledges positions on behalf of a transaction.
     *
     * <p>The work runs on the pinned executor. A ready handle processes the request at once.
     * In {@code Initializing} the request is queued; in {@code None} it is queued and
     * initialization is started; in {@code Error} or {@code Close} the future is failed with
     * {@code ServiceUnitNotReadyException}.
     *
     * @param txnID the transaction performing the ack
     * @param positions pairs of position and an associated integer
     * @return a future completed when the ack has been handled
     */
    @Override
    public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, List<MutablePair<PositionImpl, Integer>> positions) {
        final CompletableFuture<Void> ackFuture = new CompletableFuture<>();
        this.internalPinnedExecutor.execute(() -> {
            if (this.checkIfReady()) {
                this.internalIndividualAcknowledgeMessage(txnID, positions, ackFuture);
                return;
            }
            switch (this.state) {
                case Initializing:
                    this.addIndividualAcknowledgeMessageRequest(txnID, positions, ackFuture);
                    return;
                case None:
                    this.addIndividualAcknowledgeMessageRequest(txnID, positions, ackFuture);
                    this.initPendingAckStore();
                    return;
                case Error:
                    ackFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException("PendingAckHandle not replay error!"));
                    return;
                case Close:
                    ackFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException("PendingAckHandle have been closed!"));
                    return;
                default:
                    // Any other state is processed directly.
                    this.internalIndividualAcknowledgeMessage(txnID, positions, ackFuture);
            }
        });
        return ackFuture;
    }

    /**
     * Queues a cumulative-ack request on {@code acceptQueue} so it can be run later
     * instead of being processed immediately.
     */
    private void addCumulativeAcknowledgeMessageRequest(TxnID txnID, List<PositionImpl> positions, CompletableFuture<Void> completableFuture) {
        Runnable deferredAck = () -> this.internalCumulativeAcknowledgeMessage(txnID, positions, completableFuture);
        this.acceptQueue.add(deferredAck);
    }

    /**
     * Appends a cumulative ack for a transaction to the pending-ack store and, once the append
     * succeeds, validates it and records it in memory.
     *
     * <p>The future is failed with {@code NotAllowedException} when {@code txnID} or
     * {@code positions} is null or when {@code positions} does not hold exactly one element.
     * It is failed with {@code TransactionConflictException} when the position is not beyond
     * the cursor's mark-delete position, or when a pending cumulative ack exists that either
     * belongs to another transaction or is not strictly behind the new position.
     *
     * @param txnID the transaction performing the ack
     * @param positions a single-element list holding the cumulative ack position
     * @param completableFuture completed with null on success or exceptionally on failure
     */
    public void internalCumulativeAcknowledgeMessage(TxnID txnID, List<PositionImpl> positions, CompletableFuture<Void> completableFuture) {
        if (txnID == null) {
            completableFuture.completeExceptionally(new BrokerServiceException.NotAllowedException("TransactionID can not be null."));
            return;
        }
        if (positions == null) {
            completableFuture.completeExceptionally(new BrokerServiceException.NotAllowedException("Positions can not be null."));
            return;
        }
        if (positions.size() != 1) {
            String errorMsg = "[" + this.topicName + "][" + this.subName + "] Transaction:" + txnID + " invalid cumulative ack received with multiple message ids.";
            log.error(errorMsg);
            completableFuture.completeExceptionally(new BrokerServiceException.NotAllowedException(errorMsg));
            return;
        }
        PositionImpl position = positions.get(0);
        this.pendingAckStoreFuture.thenAccept(pendingAckStore -> pendingAckStore.appendCumulativeAck(txnID, position).thenAccept(v -> {
            if (log.isDebugEnabled()) {
                // Argument order follows the placeholders: position first, then txnID.
                log.debug("[{}] cumulativeAcknowledgeMessage position: [{}], txnID:[{}], subName: [{}].", new Object[]{this.topicName, position, txnID, this.subName});
            }
            // Conflict: the position is already covered by the cursor's mark-delete position.
            if (position.compareTo((PositionImpl)this.persistentSubscription.getCursor().getMarkDeletedPosition()) <= 0) {
                String errorMsg = "[" + this.topicName + "][" + this.subName + "] Transaction:" + txnID + " try to cumulative ack position: " + position + " within range of cursor's markDeletePosition: " + this.persistentSubscription.getCursor().getMarkDeletedPosition();
                log.error(errorMsg);
                completableFuture.completeExceptionally((Throwable)new TransactionConflictException(errorMsg));
                return;
            }
            // Conflict: a pending cumulative ack exists and either belongs to another
            // transaction or is not strictly behind the new position.
            if (this.cumulativeAckOfTransaction != null
                    && (!this.cumulativeAckOfTransaction.getKey().equals(txnID)
                    || PositionAckSetUtil.compareToWithAckSet(position, this.cumulativeAckOfTransaction.getValue()) <= 0)) {
                String errorMsg = "[" + this.topicName + "][" + this.subName + "] Transaction:" + txnID + " try to cumulative batch ack position: " + position + " within range of current currentPosition: " + this.cumulativeAckOfTransaction.getValue();
                log.error(errorMsg);
                completableFuture.completeExceptionally((Throwable)new TransactionConflictException(errorMsg));
                return;
            }
            this.handleCumulativeAck(txnID, position);
            completableFuture.complete(null);
        }).exceptionally(e -> {
            // NOTE(review): the in-memory state is updated even though this stage failed;
            // kept from the original, confirm the intent with the owners.
            this.handleCumulativeAck(txnID, position);
            completableFuture.completeExceptionally(e.getCause());
            return null;
        })).exceptionally(e -> {
            // The store future itself failed.
            completableFuture.completeExceptionally(e);
            return null;
        });
    }

    @Override
    /**
     * Cumulatively acknowledges a position on behalf of a transaction.
     *
     * <p>The work runs on the pinned executor. A ready handle processes the request at once.
     * In {@code Initializing} the request is queued; in {@code None} it is queued and
     * initialization is started; in {@code Error} or {@code Close} the future is failed with
     * {@code ServiceUnitNotReadyException}.
     *
     * @param txnID the transaction performing the ack
     * @param positions the positions of the cumulative ack
     * @return a future completed when the ack has been handled
     */
    @Override
    public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID, List<PositionImpl> positions) {
        final CompletableFuture<Void> ackFuture = new CompletableFuture<>();
        this.internalPinnedExecutor.execute(() -> {
            if (this.checkIfReady()) {
                this.internalCumulativeAcknowledgeMessage(txnID, positions, ackFuture);
                return;
            }
            switch (this.state) {
                case Initializing:
                    this.addCumulativeAcknowledgeMessageRequest(txnID, positions, ackFuture);
                    return;
                case None:
                    this.addCumulativeAcknowledgeMessageRequest(txnID, positions, ackFuture);
                    this.initPendingAckStore();
                    return;
                case Error:
                    ackFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException("PendingAckHandle not replay error!"));
                    return;
                case Close:
                    ackFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException("PendingAckHandle have been closed!"));
                    return;
                default:
                    // Any other state is processed directly.
                    this.internalCumulativeAcknowledgeMessage(txnID, positions, ackFuture);
            }
        });
        return ackFuture;
    }

    /**
     * Queues a commit request on {@code acceptQueue} so it can be run later
     * instead of being processed immediately.
     */
    private void addCommitTxnRequest(TxnID txnId, Map<String, Long> properties, long lowWaterMark, CompletableFuture<Void> completableFuture) {
        Runnable deferredCommit = () -> this.internalCommitTxn(txnId, properties, lowWaterMark, completableFuture);
        this.acceptQueue.add(deferredCommit);
    }

    /**
     * Commits a transaction's pending acks.
     *
     * <p>When a cumulative ack is pending and owned by {@code txnID}, a cumulative commit mark
     * is appended, the position is acknowledged on the subscription and the pending cumulative
     * ack is cleared. When the pending cumulative ack is owned by another transaction the
     * future is completed immediately. Otherwise an individual commit mark is appended and,
     * if the transaction has individual acks recorded, they are committed and the low water
     * mark is processed.
     *
     * @param txnID the transaction to commit
     * @param properties properties passed through to the subscription acknowledgement
     * @param lowWaterMark low water mark handed to {@code handleLowWaterMark} after an individual commit
     * @param commitFuture completed with null on success or exceptionally on failure
     */
    private void internalCommitTxn(TxnID txnID, Map<String, Long> properties, long lowWaterMark, CompletableFuture<Void> commitFuture) {
        if (this.cumulativeAckOfTransaction != null) {
            if (((TxnID)this.cumulativeAckOfTransaction.getKey()).equals((Object)txnID)) {
                ((CompletableFuture)this.pendingAckStoreFuture.thenAccept(pendingAckStore -> ((CompletableFuture)pendingAckStore.appendCommitMark(txnID, CommandAck.AckType.Cumulative).thenAccept(v -> {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Transaction pending ack store commit txnId : [{}] success! subName: [{}]", new Object[]{this.topicName, txnID, this.subName});
                    }
                    // The commit mark is stored: apply the cumulative ack to the subscription, then clear it.
                    this.persistentSubscription.acknowledgeMessage(Collections.singletonList((Position)this.cumulativeAckOfTransaction.getValue()), CommandAck.AckType.Cumulative, properties);
                    this.cumulativeAckOfTransaction = null;
                    commitFuture.complete(null);
                })).exceptionally(e -> {
                    log.error("[{}] Transaction pending ack store commit txnId : [{}] fail!", new Object[]{this.topicName, txnID, e});
                    // NOTE(review): this branch forwards e as-is while the individual branch below forwards e.getCause().
                    commitFuture.completeExceptionally((Throwable)e);
                    return null;
                }))).exceptionally(e -> {
                    commitFuture.completeExceptionally((Throwable)e);
                    return null;
                });
            } else {
                // The pending cumulative ack belongs to a different transaction: nothing is committed here.
                commitFuture.complete(null);
            }
        } else {
            ((CompletableFuture)this.pendingAckStoreFuture.thenAccept(pendingAckStore -> ((CompletableFuture)pendingAckStore.appendCommitMark(txnID, CommandAck.AckType.Individual).thenAccept(v -> {
                PendingAckHandleImpl pendingAckHandleImpl = this;
                synchronized (pendingAckHandleImpl) {
                    if (this.individualAckOfTransaction != null && this.individualAckOfTransaction.containsKey((Object)txnID)) {
                        HashMap pendingAckMessageForCurrentTxn = (HashMap)this.individualAckOfTransaction.get((Object)txnID);
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Transaction pending ack store commit txnId : [{}] success! subName: [{}]", new Object[]{this.topicName, txnID, this.subName});
                        }
                        this.individualAckCommitCommon(txnID, pendingAckMessageForCurrentTxn, properties);
                        commitFuture.complete(null);
                        // The low water mark is processed only after the commit future has been completed.
                        this.handleLowWaterMark(txnID, lowWaterMark);
                    } else {
                        // The transaction has no individual acks recorded.
                        commitFuture.complete(null);
                    }
                }
            })).exceptionally(e -> {
                log.error("[{}] Transaction pending ack store commit txnId : [{}] fail!", new Object[]{this.topicName, txnID, e});
                commitFuture.completeExceptionally(e.getCause());
                return null;
            }))).exceptionally(e -> {
                commitFuture.completeExceptionally((Throwable)e);
                return null;
            });
        }
    }

    @Override
    /**
     * Commits a transaction's pending acks and records the outcome and latency in the stats.
     *
     * <p>The work runs on the pinned executor. A ready handle commits at once. In
     * {@code Initializing} the request is queued; in {@code None} it is queued and
     * initialization is started; in {@code Error} or {@code Close} the future is failed with
     * {@code ServiceUnitNotReadyException}.
     *
     * @param txnID the transaction to commit
     * @param properties properties passed through to the subscription acknowledgement
     * @param lowWaterMark low water mark associated with the commit
     * @return a future completed when the commit has been handled
     */
    @Override
    public CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties, long lowWaterMark) {
        long start = System.nanoTime();
        CompletableFuture<Void> commitFuture = new CompletableFuture<>();
        this.internalPinnedExecutor.execute(() -> {
            if (!this.checkIfReady()) {
                switch (this.state) {
                    case Initializing: {
                        this.addCommitTxnRequest(txnID, properties, lowWaterMark, commitFuture);
                        return;
                    }
                    case None: {
                        this.addCommitTxnRequest(txnID, properties, lowWaterMark, commitFuture);
                        this.initPendingAckStore();
                        return;
                    }
                    case Error: {
                        commitFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException("PendingAckHandle not replay error!"));
                        return;
                    }
                    case Close: {
                        // A closed handle must reject the commit, as the ack methods do. Without
                        // this case the request fell through to internalCommitTxn.
                        commitFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException("PendingAckHandle have been closed!"));
                        return;
                    }
                    default: {
                        // Any other state is processed directly below.
                        break;
                    }
                }
            }
            this.internalCommitTxn(txnID, properties, lowWaterMark, commitFuture);
        });
        return commitFuture.whenComplete((__, t) -> this.handleStats.recordCommitTxn(t == null, System.nanoTime() - start));
    }

    /**
     * Queues an abort request on {@code acceptQueue} so it can be run later
     * instead of being processed immediately.
     */
    private void addAbortTxnRequest(TxnID txnId, Consumer consumer, long lowWaterMark, CompletableFuture<Void> completableFuture) {
        Runnable deferredAbort = () -> this.internalAbortTxn(txnId, consumer, lowWaterMark, completableFuture);
        this.acceptQueue.add(deferredAbort);
    }

    /**
     * Aborts a transaction's pending acks.
     *
     * <p>When a cumulative ack is pending, a cumulative abort mark is appended and the pending
     * cumulative ack is cleared if it is owned by {@code txnId}. Otherwise, when individual
     * acks are tracked, an individual abort mark is appended; if the transaction has acks
     * recorded they are rolled back, redelivered to {@code consumer}, and the low water mark
     * is processed. With nothing pending the future is completed immediately.
     *
     * @param txnId the transaction to abort
     * @param consumer consumer passed to the redelivery call
     * @param lowWaterMark low water mark handed to {@code handleLowWaterMark} after an individual abort
     * @param abortFuture completed with null on success or exceptionally on failure
     * @return {@code abortFuture} with abort stats recording attached
     */
    public CompletableFuture<Void> internalAbortTxn(TxnID txnId, Consumer consumer, long lowWaterMark, CompletableFuture<Void> abortFuture) {
        if (this.cumulativeAckOfTransaction != null) {
            this.pendingAckStoreFuture.thenAccept(store -> {
                store.appendAbortMark(txnId, CommandAck.AckType.Cumulative).thenAccept(ignored -> {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Transaction pending ack store abort txnId : [{}] success! subName: [{}]", new Object[]{this.topicName, txnId, this.subName});
                    }
                    // Only the owning transaction clears the pending cumulative ack.
                    if (this.cumulativeAckOfTransaction.getKey().equals(txnId)) {
                        this.cumulativeAckOfTransaction = null;
                    }
                    abortFuture.complete(null);
                }).exceptionally(appendError -> {
                    log.error("[{}] Transaction pending ack store abort txnId : [{}] fail!", new Object[]{this.topicName, txnId, appendError});
                    abortFuture.completeExceptionally(appendError);
                    return null;
                });
            }).exceptionally(storeError -> {
                abortFuture.completeExceptionally(storeError);
                return null;
            });
        } else if (this.individualAckOfTransaction != null) {
            this.pendingAckStoreFuture.thenAccept(store -> {
                store.appendAbortMark(txnId, CommandAck.AckType.Individual).thenAccept(ignored -> {
                    synchronized (this) {
                        HashMap<PositionImpl, PositionImpl> txnAcks = this.individualAckOfTransaction.get(txnId);
                        if (txnAcks == null) {
                            // The transaction has no individual acks recorded.
                            abortFuture.complete(null);
                        } else {
                            if (log.isDebugEnabled()) {
                                log.debug("[{}] Transaction pending ack store abort txnId : [{}] success! subName: [{}]", new Object[]{this.topicName, txnId, this.subName});
                            }
                            this.individualAckAbortCommon(txnId, txnAcks);
                            this.persistentSubscription.redeliverUnacknowledgedMessages(consumer, new ArrayList<>(txnAcks.values()));
                            abortFuture.complete(null);
                            this.handleLowWaterMark(txnId, lowWaterMark);
                        }
                    }
                }).exceptionally(appendError -> {
                    log.error("[{}] Transaction pending ack store abort txnId : [{}] fail!", new Object[]{this.topicName, txnId, appendError});
                    abortFuture.completeExceptionally(appendError);
                    return null;
                });
            }).exceptionally(storeError -> {
                log.error("[{}] abortTxn", (Object)txnId, storeError);
                abortFuture.completeExceptionally(storeError);
                return null;
            });
        } else {
            // Nothing is pending at all.
            abortFuture.complete(null);
        }
        return abortFuture.whenComplete((ignoredResult, failure) -> this.handleStats.recordAbortTxn(failure == null));
    }

    @Override
    /**
     * Aborts a transaction's pending acks.
     *
     * <p>The work runs on the pinned executor. A ready handle aborts at once. In
     * {@code Initializing} the request is queued; in {@code None} it is queued and
     * initialization is started. In any other state the future is failed with
     * {@code ServiceUnitNotReadyException}, using the replay-error message for {@code Error}
     * and the closed message otherwise.
     *
     * @param txnId the transaction to abort
     * @param consumer consumer passed on to the abort processing
     * @param lowWaterMark low water mark associated with the abort
     * @return a future completed when the abort has been handled
     */
    @Override
    public CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer, long lowWaterMark) {
        final CompletableFuture<Void> abortFuture = new CompletableFuture<>();
        this.internalPinnedExecutor.execute(() -> {
            if (this.checkIfReady()) {
                this.internalAbortTxn(txnId, consumer, lowWaterMark, abortFuture);
                return;
            }
            switch (this.state) {
                case Initializing:
                    this.addAbortTxnRequest(txnId, consumer, lowWaterMark, abortFuture);
                    return;
                case None:
                    this.addAbortTxnRequest(txnId, consumer, lowWaterMark, abortFuture);
                    this.initPendingAckStore();
                    return;
                default:
                    String reason = this.state == PendingAckHandleState.State.Error
                            ? "PendingAckHandle not replay error!"
                            : "PendingAckHandle have been closed!";
                    abortFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(reason));
            }
        });
        return abortFuture;
    }

    /**
     * Records the low water mark for the transaction's coordinator (keyed by the txn's most
     * significant bits, keeping the larger value) and, if the semaphore permit is free, aborts
     * the first tracked individual-ack transaction when its least significant bits are at or
     * below the recorded mark for its coordinator.
     *
     * <p>The permit is released when the abort finishes, or immediately when no abort is started.
     */
    private void handleLowWaterMark(TxnID txnID, long lowWaterMark) {
        this.lowWaterMarks.compute(txnID.getMostSigBits(), (coordinatorId, recorded) ->
                (recorded == null || recorded < lowWaterMark) ? Long.valueOf(lowWaterMark) : recorded);
        if (!this.handleLowWaterMark.tryAcquire()) {
            return;
        }
        if (this.individualAckOfTransaction != null && !this.individualAckOfTransaction.isEmpty()) {
            TxnID oldestTxn = this.individualAckOfTransaction.firstKey();
            Long coordinatorLowWaterMark = this.lowWaterMarks.get(oldestTxn.getMostSigBits());
            if (coordinatorLowWaterMark != null && oldestTxn.getLeastSigBits() <= coordinatorLowWaterMark) {
                this.abortTxn(oldestTxn, null, coordinatorLowWaterMark).thenRun(() -> {
                    log.warn("[{}] Transaction pending ack handle low water mark success! txnId : [{}], lowWaterMark : [{}]", new Object[]{this.topicName, oldestTxn, coordinatorLowWaterMark});
                    this.handleLowWaterMark.release();
                }).exceptionally(abortError -> {
                    log.warn("[{}] Transaction pending ack handle low water mark fail! txnId : [{}], lowWaterMark : [{}]", new Object[]{this.topicName, oldestTxn, coordinatorLowWaterMark});
                    this.handleLowWaterMark.release();
                    return null;
                });
                return;
            }
        }
        this.handleLowWaterMark.release();
    }

    @Override
    /**
     * Tracks a batch position in {@code individualAckPositions}, creating the map on first use.
     * A position not yet tracked is stored paired with 0; for a tracked one, the stored
     * position's ack set is combined with the given position via {@code PositionAckSetUtil.andAckSet}.
     *
     * @param position the batch position to synchronize
     */
    @Override
    public synchronized void syncBatchPositionAckSetForTransaction(PositionImpl position) {
        if (this.individualAckPositions == null) {
            this.individualAckPositions = new ConcurrentSkipListMap<>();
        }
        if (this.individualAckPositions.containsKey(position)) {
            PositionImpl tracked = this.individualAckPositions.get(position).left;
            PositionAckSetUtil.andAckSet(tracked, position);
            return;
        }
        this.individualAckPositions.put(position, new MutablePair<>(position, 0));
    }

    @Override
    /**
     * Tells whether the consumer's pending ack for a position can be deleted.
     *
     * @param position the position to check
     * @return {@code true} when the position is not tracked by a pending individual ack, when
     *         the tracked position has no ack set, or when its ack set has no bits set;
     *         {@code false} otherwise
     */
    @Override
    public synchronized boolean checkIsCanDeleteConsumerPendingAck(PositionImpl position) {
        // The map is created lazily, so it can still be null when nothing has been tracked
        // yet. That is treated like "position not tracked".
        if (this.individualAckPositions == null) {
            return true;
        }
        MutablePair<PositionImpl, Integer> pending = this.individualAckPositions.get(position);
        if (pending == null) {
            return true;
        }
        PositionImpl tracked = pending.left;
        if (!tracked.hasAckSet()) {
            return true;
        }
        BitSetRecyclable ackBits = BitSetRecyclable.valueOf(tracked.getAckSet());
        try {
            return ackBits.isEmpty();
        } finally {
            // Always hand the recyclable bit set back.
            ackBits.recycle();
        }
    }

    /**
     * Applies an abort to the in-memory state: a cumulative abort clears the pending cumulative
     * ack; any other ack type rolls back the transaction's individual acks if it has any recorded.
     *
     * @param txnID the aborted transaction
     * @param ackType the ack type the abort applies to
     */
    protected void handleAbort(TxnID txnID, CommandAck.AckType ackType) {
        if (ackType == CommandAck.AckType.Cumulative) {
            this.cumulativeAckOfTransaction = null;
            return;
        }
        if (this.individualAckOfTransaction == null) {
            return;
        }
        HashMap<PositionImpl, PositionImpl> txnAcks = this.individualAckOfTransaction.get(txnID);
        if (txnAcks != null) {
            this.individualAckAbortCommon(txnID, txnAcks);
        }
    }

    /**
     * Rolls back a transaction's individual acks in {@code individualAckPositions} and removes
     * the transaction from {@code individualAckOfTransaction}.
     *
     * <p>For a tracked position with an ack set, the transaction's bits (flipped over the batch
     * size) are OR-ed into the tracked ack set. If every bit up to the batch size is then set,
     * the position is dropped; otherwise the tracked ack set is replaced with the merged one.
     * Any other position is simply removed.
     */
    private void individualAckAbortCommon(TxnID txnID, HashMap<PositionImpl, PositionImpl> currentTxn) {
        for (Map.Entry<PositionImpl, PositionImpl> ackEntry : currentTxn.entrySet()) {
            PositionImpl ackedPosition = ackEntry.getValue();
            if (!ackedPosition.hasAckSet() || !this.individualAckPositions.containsKey(ackedPosition)) {
                this.individualAckPositions.remove(ackedPosition);
                continue;
            }
            BitSetRecyclable txnBits = BitSetRecyclable.valueOf(ackedPosition.getAckSet());
            int batchSize = this.individualAckPositions.get(ackedPosition).right;
            txnBits.flip(0, batchSize);
            BitSetRecyclable pendingBits = BitSetRecyclable.valueOf(this.individualAckPositions.get(ackedPosition).left.getAckSet());
            pendingBits.or(txnBits);
            if (pendingBits.cardinality() != batchSize) {
                this.individualAckPositions.get(ackEntry.getKey()).left.setAckSet(pendingBits.toLongArray());
            } else {
                this.individualAckPositions.remove(ackedPosition);
            }
            pendingBits.recycle();
            txnBits.recycle();
        }
        this.individualAckOfTransaction.remove(txnID);
    }

    /**
     * Applies the pending acknowledgements of a committed transaction to the subscription.
     *
     * <p>For a cumulative ack, the pending cumulative position (if one exists) is acknowledged
     * on the subscription with {@code properties} and the pending entry is cleared. For any other
     * ack type, the positions recorded for {@code txnID} are committed through
     * {@code individualAckCommitCommon}.
     *
     * @param txnID      transaction being committed
     * @param ackType    ack type the commit applies to
     * @param properties properties forwarded with the cumulative acknowledgement
     */
    protected void handleCommit(TxnID txnID, CommandAck.AckType ackType, Map<String, Long> properties) {
        if (ackType == CommandAck.AckType.Cumulative) {
            if (this.cumulativeAckOfTransaction != null) {
                List<Position> committed =
                        Collections.singletonList((Position) this.cumulativeAckOfTransaction.getValue());
                this.persistentSubscription.acknowledgeMessage(committed, CommandAck.AckType.Cumulative, properties);
            }
            this.cumulativeAckOfTransaction = null;
            return;
        }
        if (this.individualAckOfTransaction == null) {
            return;
        }
        HashMap<PositionImpl, PositionImpl> txnPositions = this.individualAckOfTransaction.get(txnID);
        if (txnPositions != null) {
            // NOTE(review): properties are not forwarded on the individual path (null is passed) — confirm intended.
            this.individualAckCommitCommon(txnID, txnPositions, null);
        }
    }

    /**
     * Acknowledges on the subscription every position a committed transaction had individually
     * acknowledged, then forgets the transaction. Does nothing when {@code currentTxn} is null.
     *
     * @param txnID      transaction being committed
     * @param currentTxn positions recorded for that transaction, may be {@code null}
     * @param properties properties forwarded with the acknowledgement
     */
    private void individualAckCommitCommon(TxnID txnID, HashMap<PositionImpl, PositionImpl> currentTxn, Map<String, Long> properties) {
        if (currentTxn == null) {
            return;
        }
        // Built as List<Position>: generics are invariant, so a List<PositionImpl> would not be
        // accepted where the subscription expects a list of Position.
        List<Position> committed = new ArrayList<>(currentTxn.values());
        this.persistentSubscription.acknowledgeMessage(committed, CommandAck.AckType.Individual, properties);
        this.individualAckOfTransaction.remove(txnID);
    }

    /**
     * Records the individually acknowledged positions of a transaction.
     *
     * <p>Each position is stored in the per-transaction map ({@code individualAckOfTransaction})
     * and in the handle-wide map ({@code individualAckPositions}). Both maps are created lazily
     * on the first non-empty call.
     * <ul>
     *   <li>Position without an ack set: stored in the transaction map and added to
     *       {@code individualAckPositions} only if absent.</li>
     *   <li>Position with an ack set: merged (via {@code PositionAckSetUtil.andAckSet}) into an
     *       existing entry of either map, or inserted when absent. On first insertion into
     *       {@code individualAckPositions} the pair's left side is replaced by a copy of the position
     *       with its own ack-set array, so later merges on the tracked pair do not modify the
     *       array of the position stored in the transaction map.</li>
     * </ul>
     *
     * @param txnID     transaction that issued the acknowledgements
     * @param positions pairs of (position, batch size); the left side of a pair may be replaced
     *                  by a copy as described above
     */
    private void handleIndividualAck(TxnID txnID, List<MutablePair<PositionImpl, Integer>> positions) {
        if (positions.isEmpty()) {
            // Nothing to record; leave the lazily created maps untouched.
            return;
        }
        // Logged once per call rather than once per element: the message prints the whole list.
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] TxnID:[{}] Individual acks on {}", this.topicName, this.subName, txnID.toString(), positions);
        }
        if (this.individualAckOfTransaction == null) {
            this.individualAckOfTransaction = new LinkedMap<>();
        }
        if (this.individualAckPositions == null) {
            this.individualAckPositions = new ConcurrentSkipListMap<>();
        }
        HashMap<PositionImpl, PositionImpl> txnPositions =
                this.individualAckOfTransaction.computeIfAbsent(txnID, txn -> new HashMap<>());

        for (MutablePair<PositionImpl, Integer> incoming : positions) {
            PositionImpl position = incoming.left;

            if (!position.hasAckSet()) {
                txnPositions.put(position, position);
                this.individualAckPositions.putIfAbsent(position, incoming);
                continue;
            }

            // Per-transaction view: merge with what this transaction already acked on the entry.
            PositionImpl alreadyInTxn = txnPositions.get(position);
            if (alreadyInTxn != null) {
                PositionAckSetUtil.andAckSet(alreadyInTxn, position);
            } else {
                txnPositions.put(position, position);
            }

            // Handle-wide view: merge with what any transaction already acked on the entry.
            MutablePair<PositionImpl, Integer> tracked = this.individualAckPositions.get(position);
            if (tracked == null) {
                long[] ackSet = position.getAckSet();
                incoming.left = PositionImpl.get(position.getLedgerId(), position.getEntryId(),
                        Arrays.copyOf(ackSet, ackSet.length));
                this.individualAckPositions.put(position, incoming);
            } else {
                tracked.setRight(incoming.right);
                PositionAckSetUtil.andAckSet(tracked.getLeft(), position);
            }
        }
    }

    /**
     * Records a cumulative acknowledgement for a transaction.
     *
     * <p>When no cumulative ack is pending, {@code (txnID, position)} becomes the pending entry.
     * Otherwise the pending position is replaced only if it belongs to the same transaction and
     * {@code position} compares greater (per {@code PositionAckSetUtil.compareToWithAckSet}).
     *
     * @param txnID    transaction that issued the acknowledgement
     * @param position cumulatively acknowledged position
     */
    private void handleCumulativeAck(TxnID txnID, PositionImpl position) {
        if (this.cumulativeAckOfTransaction == null) {
            this.cumulativeAckOfTransaction = MutablePair.of(txnID, position);
            return;
        }
        if (!this.cumulativeAckOfTransaction.getKey().equals(txnID)) {
            // A different transaction owns the pending cumulative ack; leave it untouched.
            return;
        }
        if (PositionAckSetUtil.compareToWithAckSet(position, this.cumulativeAckOfTransaction.getValue()) > 0) {
            this.cumulativeAckOfTransaction.setValue(position);
        }
    }

    /**
     * Replays a cumulative acknowledgement during recovery.
     *
     * <p>The ack is applied only when {@code position} is beyond the cursor's mark-delete position
     * and either no cumulative ack is pending, or the pending one belongs to {@code txnID} and
     * {@code position} compares greater than it.
     *
     * @param txnID    transaction that issued the acknowledgement
     * @param position cumulatively acknowledged position
     */
    protected void handleCumulativeAckRecover(TxnID txnID, PositionImpl position) {
        PositionImpl markDeleted = (PositionImpl) this.persistentSubscription.getCursor().getMarkDeletedPosition();
        if (position.compareTo(markDeleted) <= 0) {
            return;
        }
        if (this.cumulativeAckOfTransaction != null) {
            boolean advancesSameTxn = this.cumulativeAckOfTransaction.getKey().equals(txnID)
                    && PositionAckSetUtil.compareToWithAckSet(position, this.cumulativeAckOfTransaction.getValue()) > 0;
            if (!advancesSameTxn) {
                return;
            }
        }
        this.handleCumulativeAck(txnID, position);
    }

    /**
     * Replays a set of individual acknowledgements during recovery.
     *
     * <p>The whole list is silently dropped (nothing is recorded) as soon as one position:
     * <ul>
     *   <li>is reported deleted by the cursor;</li>
     *   <li>has an ack set that overlaps the cursor's batch ack set for that position;</li>
     *   <li>has an ack set that overlaps the one already tracked in {@code individualAckPositions};</li>
     *   <li>has no ack set and is already tracked in {@code individualAckPositions}.</li>
     * </ul>
     * Otherwise the list is handed to {@code handleIndividualAck}.
     *
     * @param txnID     transaction that issued the acknowledgements
     * @param positions pairs of (position, batch size)
     */
    protected void handleIndividualAckRecover(TxnID txnID, List<MutablePair<PositionImpl, Integer>> positions) {
        for (MutablePair<PositionImpl, Integer> recovered : positions) {
            PositionImpl position = recovered.left;
            if (((ManagedCursorImpl) this.persistentSubscription.getCursor()).isMessageDeleted((Position) position)) {
                return;
            }

            if (!position.hasAckSet()) {
                if (this.individualAckPositions != null && this.individualAckPositions.containsKey(position)) {
                    return;
                }
            } else {
                BitSetRecyclable paddedBits = BitSetRecyclable.valueOf(position.getAckSet());
                int batchSize = recovered.right;
                if (batchSize > paddedBits.size()) {
                    paddedBits.set(batchSize);
                }
                // Set every bit from batchSize up to the bit set's size before the overlap checks.
                paddedBits.set(batchSize, paddedBits.size());
                long[] ackSetOverlap = paddedBits.toLongArray();
                paddedBits.recycle();

                long[] cursorAckSet = ((ManagedCursorImpl) this.persistentSubscription.getCursor())
                        .getBatchPositionAckSet((Position) position);
                if (PositionAckSetUtil.isAckSetOverlap(ackSetOverlap, cursorAckSet)) {
                    return;
                }
                if (this.individualAckPositions != null
                        && this.individualAckPositions.containsKey(position)
                        && PositionAckSetUtil.isAckSetOverlap(
                                this.individualAckPositions.get(position).getLeft().getAckSet(), ackSetOverlap)) {
                    return;
                }
            }
        }
        this.handleIndividualAck(txnID, positions);
    }

    /**
     * @return the topic name held by this handle
     */
    public String getTopicName() {
        return topicName;
    }

    /**
     * @return the subscription name held by this handle
     */
    public String getSubName() {
        return subName;
    }

    /**
     * Removes {@code position} from the tracked individual-ack positions and then evicts every
     * tracked position that sorts before the cursor's current mark-delete position.
     *
     * <p>The head of the map is read once per iteration through {@code firstEntry()}. The earlier
     * {@code firstEntry() != null} check followed by {@code firstKey()} could throw
     * {@code NoSuchElementException} if the map was emptied between the two calls.
     *
     * @param position position to stop tracking; ignored unless it is a {@code PositionImpl}
     */
    @Override
    public synchronized void clearIndividualPosition(Position position) {
        if (this.individualAckPositions == null) {
            return;
        }
        if (position instanceof PositionImpl) {
            this.individualAckPositions.remove(position);
        }
        Map.Entry<PositionImpl, MutablePair<PositionImpl, Integer>> oldest;
        while ((oldest = this.individualAckPositions.firstEntry()) != null
                && oldest.getKey().compareTo(
                        (PositionImpl) this.persistentSubscription.getCursor().getMarkDeletedPosition()) < 0) {
            this.individualAckPositions.remove(oldest.getKey());
        }
    }

    /**
     * @return the future held in {@code pendingAckHandleCompletableFuture}; it is completed by
     *         {@code completeHandleFuture} or failed by {@code exceptionHandleFuture}
     */
    @Override
    public CompletableFuture<PendingAckHandle> pendingAckHandleFuture() {
        return pendingAckHandleCompletableFuture;
    }

    /**
     * Builds a snapshot of this handle's statistics.
     *
     * @param lowWaterMarks whether to include the low-water-mark data in the result
     * @return a new stats object carrying the state name, the number of transactions with
     *         individual acks (0 when none were ever recorded) and the recover start/end times
     */
    @Override
    public TransactionPendingAckStats getStats(boolean lowWaterMarks) {
        TransactionPendingAckStats stats = new TransactionPendingAckStats();
        stats.state = this.getState().name();
        if (lowWaterMarks) {
            stats.lowWaterMarks = this.lowWaterMarks;
        }
        long ongoingTxnCount = 0L;
        if (this.individualAckOfTransaction != null) {
            ongoingTxnCount = this.individualAckOfTransaction.size();
        }
        stats.ongoingTxnSize = ongoingTxnCount;
        stats.recoverStartTime = this.recoverTime.getRecoverStartTime();
        stats.recoverEndTime = this.recoverTime.getRecoverEndTime();
        return stats;
    }

    /**
     * Completes the handle future with this instance and, when a recovery has started but has no
     * end time yet, stamps the recover end time with the current wall-clock time.
     */
    public void completeHandleFuture() {
        this.pendingAckHandleCompletableFuture.complete(this);
        if (this.recoverTime.getRecoverStartTime() == 0L) {
            // Recovery never started, so there is no end time to record.
            return;
        }
        if (this.recoverTime.getRecoverEndTime() == 0L) {
            this.recoverTime.setRecoverEndTime(System.currentTimeMillis());
        }
    }

    /**
     * Fails the handle future with {@code t}. The recover end time is stamped only when this call
     * is the one that actually completed the future.
     *
     * @param t failure to propagate to waiters of the handle future
     */
    public void exceptionHandleFuture(Throwable t) {
        if (this.pendingAckHandleCompletableFuture.completeExceptionally(t)) {
            this.recoverTime.setRecoverEndTime(System.currentTimeMillis());
        }
    }

    /**
     * Describes the pending cumulative ack of a transaction.
     *
     * <p>When the pending cumulative ack belongs to {@code txnID}, {@code cumulativeAckPosition}
     * is set to {@code ledgerId:entryId}. If the position carries a non-empty ack set,
     * {@code :N} is appended, where {@code N} is the index of the first set bit minus one.
     * Otherwise the returned stats object is left at its defaults.
     *
     * @param txnID transaction to look up
     * @return a new stats object, never {@code null}
     */
    @Override
    public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID) {
        TransactionInPendingAckStats stats = new TransactionInPendingAckStats();
        // Read the field once so a concurrent reset to null cannot happen between check and use.
        Pair<TxnID, PositionImpl> pendingCumulative = this.cumulativeAckOfTransaction;
        if (pendingCumulative == null || !pendingCumulative.getLeft().equals(txnID)) {
            return stats;
        }
        PositionImpl position = pendingCumulative.getRight();
        StringBuilder text = new StringBuilder()
                .append(position.getLedgerId())
                .append(':')
                .append(position.getEntryId());
        if (position.hasAckSet()) {
            BitSetRecyclable ackBits = BitSetRecyclable.valueOf(position.getAckSet());
            try {
                if (!ackBits.isEmpty()) {
                    text.append(":").append(ackBits.nextSetBit(0) - 1);
                }
            } finally {
                // Hand the bit set back, as the other users of BitSetRecyclable in this class do.
                ackBits.recycle();
            }
        }
        stats.cumulativeAckPosition = text.toString();
        return stats;
    }

    /**
     * Moves the handle to the closed state and closes the pending-ack store if one was requested.
     *
     * <p>The returned future completes normally when no store future exists, when the store
     * future failed (there is no store to close), or when the store closed successfully. It
     * completes exceptionally with the unwrapped cause when closing the store fails.
     *
     * @return future signalling the end of the close sequence
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        this.changeToCloseState();
        synchronized (this) {
            if (this.pendingAckStoreFuture == null) {
                return CompletableFuture.completedFuture(null);
            }
            CompletableFuture<Void> closeResult = new CompletableFuture<>();
            this.pendingAckStoreFuture.whenComplete((store, initError) -> {
                if (initError != null) {
                    // The store was never created, so there is nothing to close.
                    closeResult.complete(null);
                    return;
                }
                store.closeAsync().whenComplete((ignored, closeError) -> {
                    if (closeError == null) {
                        closeResult.complete(null);
                    } else {
                        closeResult.completeExceptionally(FutureUtil.unwrapCompletionException(closeError));
                    }
                });
            });
            return closeResult;
        }
    }

    /**
     * Exposes the managed ledger backing the pending-ack store.
     *
     * @return the store's managed ledger future; a future failed with
     *         {@code ServiceUnitNotReadyException} when the store future is absent or not yet done;
     *         or a future failed with {@code NotAllowedException} when the store is not an
     *         {@code MLPendingAckStore}
     */
    public CompletableFuture<ManagedLedger> getStoreManageLedger() {
        if (this.pendingAckStoreFuture == null || !this.pendingAckStoreFuture.isDone()) {
            return FutureUtil.failedFuture(
                    (Throwable) new BrokerServiceException.ServiceUnitNotReadyException("Pending ack have not init success!"));
        }
        return this.pendingAckStoreFuture.thenCompose(store -> {
            if (!(store instanceof MLPendingAckStore)) {
                return FutureUtil.failedFuture(
                        (Throwable) new BrokerServiceException.NotAllowedException("Pending ack handle don't use managedLedger!"));
            }
            return ((MLPendingAckStore) store).getManagedLedger();
        });
    }

    /**
     * Reports whether a position (optionally a single index inside a batch) is currently held by
     * a pending transactional acknowledgement.
     *
     * <p>Results, in evaluation order:
     * <ul>
     *   <li>{@code PendingAckNotReady} — the handle is not in the {@code Ready} state;</li>
     *   <li>{@code MarkDelete} — the position is at or before the cursor's persistent mark-delete
     *       position;</li>
     *   <li>{@code NotInPendingAck} — the position is not tracked;</li>
     *   <li>{@code PendingAck} — the position is tracked and no batch index was supplied;</li>
     *   <li>{@code InvalidPosition} — the batch index is negative or not smaller than the tracked
     *       batch size;</li>
     *   <li>{@code NotInPendingAck} / {@code PendingAck} — the bit at {@code batchIndex} in the
     *       tracked ack set is set / clear.</li>
     * </ul>
     *
     * @param position   position to check
     * @param batchIndex index inside the batch, or {@code null} to check the whole entry
     * @return the state of the position with respect to pending acks
     */
    @Override
    public PositionInPendingAckStats checkPositionInPendingAckState(PositionImpl position, Integer batchIndex) {
        if (!this.state.equals(PendingAckHandleState.State.Ready)) {
            return new PositionInPendingAckStats(PositionInPendingAckStats.State.PendingAckNotReady);
        }
        // Read once so the null check and the comparison see the same value.
        Position persistentMarkDeleted = this.persistentSubscription.getCursor().getPersistentMarkDeletedPosition();
        if (persistentMarkDeleted != null && position.compareTo((PositionImpl) persistentMarkDeleted) <= 0) {
            return new PositionInPendingAckStats(PositionInPendingAckStats.State.MarkDelete);
        }
        if (this.individualAckPositions == null) {
            return new PositionInPendingAckStats(PositionInPendingAckStats.State.NotInPendingAck);
        }
        MutablePair<PositionImpl, Integer> tracked = this.individualAckPositions.get(position);
        if (tracked == null) {
            return new PositionInPendingAckStats(PositionInPendingAckStats.State.NotInPendingAck);
        }
        if (batchIndex == null) {
            return new PositionInPendingAckStats(PositionInPendingAckStats.State.PendingAck);
        }
        // A negative index would make the bit lookup below throw; report it as invalid instead.
        if (batchIndex < 0 || batchIndex >= tracked.right) {
            return new PositionInPendingAckStats(PositionInPendingAckStats.State.InvalidPosition);
        }
        BitSetRecyclable ackBits = BitSetRecyclable.valueOf(tracked.left.getAckSet());
        try {
            PositionInPendingAckStats.State result = ackBits.get(batchIndex)
                    ? PositionInPendingAckStats.State.NotInPendingAck
                    : PositionInPendingAckStats.State.PendingAck;
            return new PositionInPendingAckStats(result);
        } finally {
            ackBits.recycle();
        }
    }

    /**
     * Test hook exposing the live map of tracked individual-ack positions (not a copy).
     *
     * @return the internal map, or {@code null} when it has not been created yet
     */
    @VisibleForTesting
    public Map<PositionImpl, MutablePair<PositionImpl, Integer>> getIndividualAckPositions() {
        return individualAckPositions;
    }

    /**
     * @return {@code true} when a pending-ack store future exists and is done (normally or
     *         exceptionally), {@code false} otherwise
     */
    @Override
    public boolean checkIfPendingAckStoreInit() {
        if (this.pendingAckStoreFuture == null) {
            return false;
        }
        return this.pendingAckStoreFuture.isDone();
    }

    /**
     * Looks up the tracked pending-ack position equal to {@code position}.
     *
     * @param position position to look up
     * @return the position stored on the left of the tracked pair, or {@code null} when nothing
     *         is tracked for it
     */
    @Override
    public PositionImpl getPositionInPendingAck(PositionImpl position) {
        if (this.individualAckPositions == null) {
            return null;
        }
        MutablePair<PositionImpl, Integer> tracked = this.individualAckPositions.get(position);
        return tracked == null ? null : tracked.getLeft();
    }

    /**
     * Drains {@code acceptQueue}, running each queued task in order on the calling thread until
     * the queue reports empty.
     */
    protected void handleCacheRequest() {
        for (Runnable task = (Runnable) this.acceptQueue.poll();
                task != null;
                task = (Runnable) this.acceptQueue.poll()) {
            task.run();
        }
    }

    /**
     * @return the executor held in {@code internalPinnedExecutor}
     */
    public ExecutorService getInternalPinnedExecutor() {
        return internalPinnedExecutor;
    }
}

