/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction.pendingack.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.PositionAckSetUtil;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PendingAckHandleImpl
implements PendingAckHandle {
    private static final Logger log = LoggerFactory.getLogger(PendingAckHandleImpl.class);
    // Pending individual acks grouped by transaction: txn -> (position -> position object carrying
    // that transaction's ack set). Created lazily by individualAcknowledgeMessage, so it may be
    // null; a transaction's entry is removed by commitTxn / abortTxn.
    private Map<TxnID, HashMap<PositionImpl, PositionImpl>> individualAckOfTransaction;
    // Positions that currently have a pending individual ack, regardless of which transaction
    // made it. The pair's left is the position whose ack set is merged across acks; the pair's
    // right is an integer bound used when flipping/setting ack-set bits.
    // NOTE(review): the integer looks like the batch size - confirm against the callers.
    // Created lazily, so it may be null.
    private Map<PositionImpl, MutablePair<PositionImpl, Integer>> individualAckPositions;
    // The one transaction that holds a pending cumulative ack and the position it acked up to;
    // null when no cumulative ack is pending.
    private Pair<TxnID, PositionImpl> cumulativeAckOfTransaction;
    // Topic and subscription names read from the subscription once; used in log/error messages.
    private final String topicName;
    private final String subName;
    // Subscription whose cursor is consulted for conflicts and which receives the final acks
    // and redelivery requests.
    private final PersistentSubscription persistentSubscription;

    public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
        this.topicName = persistentSubscription.getTopicName();
        this.subName = persistentSubscription.getName();
        this.persistentSubscription = persistentSubscription;
    }

    /**
     * Records individual acknowledgements made inside a transaction as pending acks.
     *
     * <p>The request is handled in two passes. The first pass checks every position for a
     * conflict; the second pass records the acks. The first pass returns before anything is
     * recorded, so a rejected request leaves the pending-ack state untouched.
     *
     * @param txnID the transaction performing the acks; must not be null
     * @param positions pairs of (position, integer bound for that position's ack set); must not
     *     be null. NOTE(review): the integer looks like the batch size - confirm against callers.
     * @return a completed future on success; otherwise a future from
     *     {@code FutureUtil.failedFuture} carrying a
     *     {@link BrokerServiceException.NotAllowedException} for a null argument or a
     *     {@link TransactionConflictException} for a conflicting position
     */
    @Override
    public synchronized CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, List<MutablePair<PositionImpl, Integer>> positions) {
        if (txnID == null) {
            return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("TransactionID can not be null."));
        }
        if (positions == null) {
            return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Positions can not be null."));
        }

        // Pass 1: reject the whole request if any single position conflicts.
        for (MutablePair<PositionImpl, Integer> ack : positions) {
            String conflict = this.findIndividualAckConflict(txnID, ack);
            if (conflict != null) {
                log.error(conflict);
                return FutureUtil.failedFuture(new TransactionConflictException(conflict));
            }
        }

        // Nothing to record: return without creating the lazily initialised maps.
        if (positions.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }

        // Logged once per request; the message already contains the complete position list.
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] TxnID:[{}] Individual acks on {}", this.topicName, this.subName, txnID, positions);
        }
        if (this.individualAckOfTransaction == null) {
            this.individualAckOfTransaction = new HashMap<>();
        }
        if (this.individualAckPositions == null) {
            this.individualAckPositions = new HashMap<>();
        }
        HashMap<PositionImpl, PositionImpl> pendingAcksOfTxn =
                this.individualAckOfTransaction.computeIfAbsent(txnID, txn -> new HashMap<>());

        // Pass 2: record every ack.
        for (MutablePair<PositionImpl, Integer> ack : positions) {
            PositionImpl position = ack.left;
            if (!position.hasAckSet()) {
                pendingAcksOfTxn.put(position, position);
                this.individualAckPositions.putIfAbsent(position, ack);
                continue;
            }

            // Position carries an ack set: merge it into what this transaction already holds.
            PositionImpl alreadyPendingForTxn = pendingAcksOfTxn.get(position);
            if (alreadyPendingForTxn != null) {
                PositionAckSetUtil.andAckSet(alreadyPendingForTxn, position);
            } else {
                pendingAcksOfTxn.put(position, position);
            }

            // ...and into the view shared by all transactions.
            MutablePair<PositionImpl, Integer> pendingPair = this.individualAckPositions.get(position);
            if (pendingPair == null) {
                this.individualAckPositions.put(position, ack);
            } else {
                pendingPair.setRight(ack.right);
                PositionAckSetUtil.andAckSet(pendingPair.getLeft(), position);
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Checks one requested individual ack against the cursor and the pending acks.
     *
     * @return the conflict description, or {@code null} when the ack can be accepted
     */
    private String findIndividualAckConflict(TxnID txnID, MutablePair<PositionImpl, Integer> ack) {
        PositionImpl position = ack.left;
        ManagedCursorImpl cursor = (ManagedCursorImpl) this.persistentSubscription.getCursor();
        if (cursor.isMessageDeleted(position)) {
            return "[" + this.topicName + "][" + this.subName + "] Transaction:" + txnID + " try to ack message:" + position + " already acked before.";
        }

        if (!position.hasAckSet()) {
            if (this.individualAckPositions != null && this.individualAckPositions.containsKey(position)) {
                return "[" + this.topicName + "][" + this.subName + "] Transaction:" + txnID + " try to ack message:" + position + " in pending ack status.";
            }
            return null;
        }

        // Build the ack set used for overlap checks: the requested ack set with every bit from
        // the pair's integer bound up to the bit set's size switched on.
        int bound = ack.right;
        BitSetRecyclable bitSet = BitSetRecyclable.valueOf(position.getAckSet());
        if (bound > bitSet.size()) {
            bitSet.set(bound);
        }
        bitSet.set(bound, bitSet.size());
        long[] ackSetOverlap = bitSet.toLongArray();
        bitSet.recycle();

        if (PositionAckSetUtil.isAckSetOverlap(ackSetOverlap, cursor.getBatchPositionAckSet(position))) {
            return "[" + this.topicName + "][" + this.subName + "] Transaction:" + txnID + " try to ack message:" + position + " already acked before.";
        }
        if (this.individualAckPositions != null
                && this.individualAckPositions.containsKey(position)
                && PositionAckSetUtil.isAckSetOverlap(this.individualAckPositions.get(position).getLeft().getAckSet(), ackSetOverlap)) {
            return "[" + this.topicName + "][" + this.subName + "] Transaction:" + txnID + " try to ack batch message:" + position + " in pending ack status.";
        }
        return null;
    }

    /**
     * Records a cumulative acknowledgement made inside a transaction as pending.
     *
     * <p>Only one pending cumulative ack is tracked at a time. A request is accepted when no
     * cumulative ack is pending, or when the same transaction moves its pending position forward
     * (as judged by {@code PositionAckSetUtil.compareToWithAckSet}). Every other case - including
     * a different transaction already holding the pending cumulative ack - is reported as a
     * conflict.
     *
     * @param txnID the transaction performing the ack; must not be null
     * @param positions must not be null and must contain exactly one position (an empty list is
     *     rejected with the same message as a list with several elements)
     * @return a completed future on success; otherwise a future from
     *     {@code FutureUtil.failedFuture} carrying a
     *     {@link BrokerServiceException.NotAllowedException} for invalid arguments or a
     *     {@link TransactionConflictException} for a conflict
     */
    @Override
    public synchronized CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID, List<PositionImpl> positions) {
        if (txnID == null) {
            return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("TransactionID can not be null."));
        }
        if (positions == null) {
            return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Positions can not be null."));
        }
        if (positions.size() != 1) {
            String errorMsg = "[" + this.topicName + "][" + this.subName + "] Transaction:" + txnID + " invalid cumulative ack received with multiple message ids.";
            log.error(errorMsg);
            return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException(errorMsg));
        }

        PositionImpl position = positions.get(0);
        // A position at or before the cursor's mark-delete position cannot be acked again.
        if (position.compareTo((PositionImpl) this.persistentSubscription.getCursor().getMarkDeletedPosition()) <= 0) {
            String errorMsg = "[" + this.topicName + "][" + this.subName + "] Transaction:" + txnID + " try to cumulative ack position: " + position + " within range of cursor's markDeletePosition: " + this.persistentSubscription.getCursor().getMarkDeletedPosition();
            log.error(errorMsg);
            return FutureUtil.failedFuture(new TransactionConflictException(errorMsg));
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] TxnID:[{}] Cumulative ack on {}.", this.topicName, this.subName, txnID, position);
        }

        if (this.cumulativeAckOfTransaction == null) {
            // Typed factory call: no casts, so the pair is inferred as <TxnID, PositionImpl>.
            this.cumulativeAckOfTransaction = MutablePair.of(txnID, position);
            return CompletableFuture.completedFuture(null);
        }

        boolean sameTxn = this.cumulativeAckOfTransaction.getKey().equals(txnID);
        if (sameTxn && PositionAckSetUtil.compareToWithAckSet(position, this.cumulativeAckOfTransaction.getValue()) > 0) {
            this.cumulativeAckOfTransaction.setValue(position);
            return CompletableFuture.completedFuture(null);
        }

        String errorMsg = "[" + this.topicName + "][" + this.subName + "] Transaction:" + txnID + " try to cumulative batch ack position: " + position + " within range of current currentPosition: " + this.cumulativeAckOfTransaction.getValue();
        log.error(errorMsg);
        return FutureUtil.failedFuture(new TransactionConflictException(errorMsg));
    }

    /**
     * Commits the pending acks of a transaction by forwarding them to the subscription.
     *
     * <p>If a cumulative ack is pending, only that is considered: it is acknowledged and cleared
     * when it belongs to {@code txnID}, and nothing happens otherwise. Individual pending acks of
     * {@code txnID} are forwarded only when no cumulative ack is pending at all.
     * NOTE(review): this means individual acks of {@code txnID} are left pending while another
     * transaction holds the cumulative ack - confirm that mixing both kinds is not expected.
     *
     * @param txnID the transaction being committed
     * @param properties properties passed through to the subscription's acknowledge call
     * @return an already completed future
     */
    @Override
    public synchronized CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties) {
        if (this.cumulativeAckOfTransaction == null) {
            if (this.individualAckOfTransaction != null) {
                // remove() yields null for an unknown transaction, so no separate lookup is needed.
                HashMap<PositionImpl, PositionImpl> committedAcks = this.individualAckOfTransaction.remove(txnID);
                if (committedAcks != null) {
                    this.persistentSubscription.acknowledgeMessage(new ArrayList<>(committedAcks.values()), PulsarApi.CommandAck.AckType.Individual, properties);
                }
            }
        } else if (this.cumulativeAckOfTransaction.getKey().equals(txnID)) {
            this.persistentSubscription.acknowledgeMessage(Collections.singletonList(this.cumulativeAckOfTransaction.getValue()), PulsarApi.CommandAck.AckType.Cumulative, properties);
            this.cumulativeAckOfTransaction = null;
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Aborts a transaction: drops its pending acks and asks the subscription to redeliver.
     *
     * <p>If a cumulative ack is pending, it is cleared when it belongs to {@code txnId}, and the
     * subscription is asked to redeliver all unacknowledged messages of the consumer either way;
     * individual pending acks are not touched in that case.
     * NOTE(review): individual acks of {@code txnId} therefore stay pending while any cumulative
     * ack is pending - confirm that mixing both kinds is not expected.
     *
     * <p>Otherwise the transaction's individual pending acks are removed. For a position with an
     * ack set that is still tracked, the bits this transaction had acked are switched back on in
     * the tracked ack set; any other position is removed from the tracked positions. The aborted
     * positions are then passed to the subscription for redelivery.
     *
     * @param txnId the transaction being aborted
     * @param consumer the consumer passed to the subscription's redelivery calls
     * @return an already completed future
     */
    @Override
    public synchronized CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer) {
        if (this.cumulativeAckOfTransaction != null) {
            if (this.cumulativeAckOfTransaction.getKey().equals(txnId)) {
                this.cumulativeAckOfTransaction = null;
            }
            this.persistentSubscription.redeliverUnacknowledgedMessages(consumer);
            return CompletableFuture.completedFuture(null);
        }
        if (this.individualAckOfTransaction == null) {
            return CompletableFuture.completedFuture(null);
        }
        HashMap<PositionImpl, PositionImpl> abortedAcks = this.individualAckOfTransaction.remove(txnId);
        if (abortedAcks == null) {
            return CompletableFuture.completedFuture(null);
        }

        for (PositionImpl abortedPosition : abortedAcks.values()) {
            MutablePair<PositionImpl, Integer> trackedPair =
                    abortedPosition.hasAckSet() ? this.individualAckPositions.get(abortedPosition) : null;
            if (trackedPair == null) {
                // No ack set, or not tracked: just drop it from the tracked positions.
                this.individualAckPositions.remove(abortedPosition);
                continue;
            }
            // Invert the transaction's ack set over [0, bound) and OR the result into the
            // tracked ack set.
            BitSetRecyclable ackedByTxn = BitSetRecyclable.valueOf(abortedPosition.getAckSet());
            ackedByTxn.flip(0, trackedPair.right.intValue());
            BitSetRecyclable tracked = BitSetRecyclable.valueOf(trackedPair.left.getAckSet());
            tracked.or(ackedByTxn);
            trackedPair.left.setAckSet(tracked.toLongArray());
            tracked.recycle();
            ackedByTxn.recycle();
        }
        this.persistentSubscription.redeliverUnacknowledgedMessages(consumer, new ArrayList<>(abortedAcks.values()));
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Merges the ack set of {@code position} into the tracked pending position.
     *
     * <p>If the position is not tracked yet, it is registered with an integer bound of
     * {@code 0}; otherwise its ack set is combined into the tracked one with
     * {@code PositionAckSetUtil.andAckSet}. The tracking map is created on first use.
     *
     * @param position the position whose ack set should be synchronised
     */
    @Override
    public synchronized void syncBatchPositionAckSetForTransaction(PositionImpl position) {
        if (this.individualAckPositions == null) {
            this.individualAckPositions = new HashMap<>();
        }
        MutablePair<PositionImpl, Integer> tracked = this.individualAckPositions.get(position);
        if (tracked == null) {
            // Typed factory instead of a raw MutablePair plus unchecked cast.
            this.individualAckPositions.put(position, MutablePair.of(position, 0));
        } else {
            PositionAckSetUtil.andAckSet(tracked.left, position);
        }
    }

    /**
     * Tells whether the consumer-side pending ack for {@code position} may be deleted.
     *
     * <p>Returns {@code true} when the position is not tracked as a pending individual ack
     * (including when nothing has been tracked yet), when the tracked position has no ack set,
     * or when its ack set has no bits set. Returns {@code false} when the tracked ack set still
     * has bits set.
     *
     * <p>Synchronized like the methods that modify the tracking map, because that map is a
     * plain {@code HashMap}.
     *
     * @param position the position to check
     * @return whether the pending ack for the position can be deleted
     */
    @Override
    public synchronized boolean checkIsCanDeleteConsumerPendingAck(PositionImpl position) {
        // The map is created lazily, so it is still null until the first individual ack.
        if (this.individualAckPositions == null) {
            return true;
        }
        MutablePair<PositionImpl, Integer> tracked = this.individualAckPositions.get(position);
        if (tracked == null) {
            return true;
        }
        PositionImpl trackedPosition = tracked.left;
        if (!trackedPosition.hasAckSet()) {
            return true;
        }
        BitSetRecyclable ackSet = BitSetRecyclable.valueOf(trackedPosition.getAckSet());
        try {
            return ackSet.isEmpty();
        } finally {
            ackSet.recycle();
        }
    }

    /**
     * Stops tracking {@code position} as a pending individual ack.
     *
     * <p>Does nothing when no position has been tracked yet or when {@code position} is not a
     * {@link PositionImpl}.
     *
     * <p>Synchronized like the other methods that modify the tracking map, because that map is
     * a plain {@code HashMap}.
     *
     * @param position the position to remove from the tracked pending acks
     */
    @Override
    public synchronized void clearIndividualPosition(Position position) {
        if (this.individualAckPositions == null) {
            return;
        }
        if (position instanceof PositionImpl) {
            this.individualAckPositions.remove(position);
        }
    }
}

