package org.apache.pulsar.broker.transaction.buffer.impl;

import io.netty.buffer.ByteBuf;
import java.util.List;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionCursor;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.common.api.proto.PulsarMarkers;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.impl.common.TxnID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.class */
public class PersistentTransactionBuffer extends PersistentTopic implements TransactionBuffer {
    private static final Logger log = LoggerFactory.getLogger(PersistentTransactionBuffer.class);
    private TransactionCursor txnCursor;
    private ManagedCursor retentionCursor;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer$Marker.class */
    public static final class Marker {
        long sequenceId;
        ByteBuf marker;

        /* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer$Marker$MarkerBuilder.class */
        public static class MarkerBuilder {
            private long sequenceId;
            private ByteBuf marker;

            MarkerBuilder() {
            }

            public MarkerBuilder sequenceId(long j) {
                this.sequenceId = j;
                return this;
            }

            public MarkerBuilder marker(ByteBuf byteBuf) {
                this.marker = byteBuf;
                return this;
            }

            public Marker build() {
                return new Marker(this.sequenceId, this.marker);
            }

            public String toString() {
                return "PersistentTransactionBuffer.Marker.MarkerBuilder(sequenceId=" + this.sequenceId + ", marker=" + this.marker + ")";
            }
        }

        Marker(long j, ByteBuf byteBuf) {
            this.sequenceId = j;
            this.marker = byteBuf;
        }

        public static MarkerBuilder builder() {
            return new MarkerBuilder();
        }
    }

    /* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer$TxnCtx.class */
    static abstract class TxnCtx implements Topic.PublishContext {
        private final long sequenceId;
        private final CompletableFuture<Position> completableFuture;
        private final String producerName;

        TxnCtx(String str, long j, CompletableFuture<Position> completableFuture) {
            this.sequenceId = j;
            this.completableFuture = completableFuture;
            this.producerName = str;
        }

        @Override // org.apache.pulsar.broker.service.Topic.PublishContext
        public String getProducerName() {
            return this.producerName;
        }

        @Override // org.apache.pulsar.broker.service.Topic.PublishContext
        public long getSequenceId() {
            return this.sequenceId;
        }
    }

    public PersistentTransactionBuffer(String str, ManagedLedger managedLedger, BrokerService brokerService) throws BrokerServiceException.NamingException, ManagedLedgerException {
        super(str, managedLedger, brokerService);
        this.txnCursor = new TransactionCursorImpl();
        this.retentionCursor = managedLedger.newNonDurableCursor(PositionImpl.earliest, "txn-buffer-retention");
    }

    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
        return this.txnCursor.getTxnMeta(txnID, false);
    }

    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public CompletableFuture<Void> appendBufferToTxn(TxnID txnID, long j, ByteBuf byteBuf) {
        return publishMessage(txnID, byteBuf, j).thenCompose(position -> {
            return appendBuffer(txnID, position, j);
        });
    }

    private CompletableFuture<Void> appendBuffer(TxnID txnID, Position position, long j) {
        return this.txnCursor.getTxnMeta(txnID, true).thenCompose(transactionMeta -> {
            return transactionMeta.appendEntry(j, position);
        });
    }

    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public CompletableFuture<TransactionBufferReader> openTransactionBufferReader(TxnID txnID, long j) {
        return this.txnCursor.getTxnMeta(txnID, false).thenCompose(this::createNewReader);
    }

    private CompletableFuture<TransactionBufferReader> createNewReader(TransactionMeta transactionMeta) {
        CompletableFuture<TransactionBufferReader> completableFuture = new CompletableFuture<>();
        try {
            completableFuture.complete(new PersistentTransactionBufferReader(transactionMeta, this.ledger));
        } catch (TransactionNotSealedException e) {
            completableFuture.completeExceptionally(e);
        }
        return completableFuture;
    }

    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public CompletableFuture<Void> commitTxn(TxnID txnID, long j, long j2) {
        return this.txnCursor.getTxnMeta(txnID, false).thenApply(transactionMeta -> {
            return createCommitMarker(transactionMeta, j, j2);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) marker -> {
            return publishMessage(txnID, marker.marker, marker.sequenceId);
        }).thenCompose(position -> {
            return this.txnCursor.commitTxn(j, j2, txnID, position);
        });
    }

    private Marker createCommitMarker(TransactionMeta transactionMeta, long j, long j2) {
        if (log.isDebugEnabled()) {
            log.debug("Transaction {} create a commit marker", transactionMeta.id());
        }
        long lastSequenceId = transactionMeta.lastSequenceId() + 1;
        return Marker.builder().sequenceId(lastSequenceId).marker(Markers.newTxnCommitMarker(lastSequenceId, transactionMeta.id().getMostSigBits(), transactionMeta.id().getLeastSigBits(), PulsarMarkers.MessageIdData.newBuilder().setLedgerId(j).setEntryId(j2).build())).build();
    }

    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public CompletableFuture<Void> abortTxn(TxnID txnID) {
        return this.txnCursor.getTxnMeta(txnID, false).thenApply(transactionMeta -> {
            return createAbortMarker(transactionMeta);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) marker -> {
            return publishMessage(txnID, marker.marker, marker.sequenceId);
        }).thenCompose(position -> {
            return this.txnCursor.abortTxn(txnID);
        });
    }

    private Marker createAbortMarker(TransactionMeta transactionMeta) {
        if (log.isDebugEnabled()) {
            log.debug("Transaction {} create a abort marker", transactionMeta.id());
        }
        long lastSequenceId = transactionMeta.lastSequenceId() + 1;
        return Marker.builder().sequenceId(lastSequenceId).marker(Markers.newTxnAbortMarker(lastSequenceId, transactionMeta.id().getMostSigBits(), transactionMeta.id().getLeastSigBits())).build();
    }

    private CompletableFuture<Position> publishMessage(TxnID txnID, ByteBuf byteBuf, long j) {
        final CompletableFuture<Position> completableFuture = new CompletableFuture<>();
        publishMessage(byteBuf, new TxnCtx(txnID.toString(), j, completableFuture) { // from class: org.apache.pulsar.broker.transaction.buffer.impl.PersistentTransactionBuffer.1
            @Override // org.apache.pulsar.broker.service.Topic.PublishContext
            public void completed(Exception exc, long j2, long j3) {
                if (exc != null) {
                    completableFuture.completeExceptionally(exc);
                } else {
                    completableFuture.complete(PositionImpl.get(j2, j3));
                }
            }
        });
        return completableFuture;
    }

    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public CompletableFuture<Void> purgeTxns(List<Long> list) {
        if (log.isDebugEnabled()) {
            log.debug("Begin to purge the ledgers {}", list);
        }
        return FutureUtil.waitForAll((List) list.stream().map(l -> {
            return cleanTxnsOnLedger(l.longValue());
        }).collect(Collectors.toList())).thenCompose(r5 -> {
            return removeCommittedLedgerFromIndex(list);
        });
    }

    private CompletableFuture<Void> removeCommittedLedgerFromIndex(List<Long> list) {
        return FutureUtil.waitForAll((List) list.stream().map(l -> {
            return this.txnCursor.removeTxnsCommittedAtLedger(l.longValue());
        }).collect(Collectors.toList()));
    }

    private CompletableFuture<Void> cleanTxnsOnLedger(long j) {
        if (log.isDebugEnabled()) {
            log.debug("Start to clean ledger {}", Long.valueOf(j));
        }
        return this.txnCursor.getAllTxnsCommittedAtLedger(j).thenCompose(set -> {
            return deleteTxns(set);
        });
    }

    private CompletableFuture<Void> deleteTxns(Set<TxnID> set) {
        if (log.isDebugEnabled()) {
            log.debug("Start delete txns {} under ledger", set);
        }
        return FutureUtil.waitForAll((List) set.stream().map(txnID -> {
            return deleteTxn(txnID);
        }).collect(Collectors.toList()));
    }

    private CompletableFuture<Void> deleteTxn(TxnID txnID) {
        if (log.isDebugEnabled()) {
            log.debug("Start to delete txn {} entries", txnID);
        }
        return this.txnCursor.getTxnMeta(txnID, false).thenCompose(transactionMeta -> {
            return transactionMeta.readEntries(transactionMeta.numEntries(), -1L);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) sortedMap -> {
            return deleteEntries(sortedMap, txnID);
        });
    }

    private CompletableFuture<Void> deleteEntries(SortedMap<Long, Position> sortedMap, TxnID txnID) {
        if (log.isDebugEnabled()) {
            log.debug("Delete entries {}", sortedMap);
        }
        return FutureUtil.waitForAll((List) sortedMap.values().stream().map(position -> {
            return asyncDeletePosition(position, txnID);
        }).collect(Collectors.toList()));
    }

    private CompletableFuture<Void> asyncDeletePosition(final Position position, final TxnID txnID) {
        if (log.isDebugEnabled()) {
            log.debug("Ready to delete position {} for txn {}", position, txnID);
        }
        final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.retentionCursor.asyncMarkDelete(position, new AsyncCallbacks.MarkDeleteCallback() { // from class: org.apache.pulsar.broker.transaction.buffer.impl.PersistentTransactionBuffer.2
            public void markDeleteComplete(Object obj) {
                if (PersistentTransactionBuffer.log.isDebugEnabled()) {
                    PersistentTransactionBuffer.log.debug("Success delete transaction `{}` entry on position {}", txnID, position);
                }
                completableFuture.complete(null);
            }

            public void markDeleteFailed(ManagedLedgerException managedLedgerException, Object obj) {
                PersistentTransactionBuffer.log.error("Failed delete transaction `{}` entry on position {}", new Object[]{txnID, position, managedLedgerException});
                completableFuture.completeExceptionally(managedLedgerException);
            }
        }, (Object) null);
        return completableFuture;
    }

    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public CompletableFuture<Void> closeAsync() {
        return FutureUtil.failedFuture(new UnsupportedOperationException());
    }
}
