package org.apache.pulsar.broker.transaction.buffer.impl;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexesMetadata;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.class */
public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcessor {
    private static final Logger log = LoggerFactory.getLogger(SnapshotSegmentAbortedTxnProcessorImpl.class);
    private final PersistentTopic topic;
    private volatile long lastSnapshotTimestamps;
    private final int snapshotSegmentCapacity;
    private final PersistentWorker persistentWorker;
    private static final String SNAPSHOT_PREFIX = "multiple-";
    private final LinkedMap<PositionImpl, TxnID> segmentIndex = new LinkedMap<>();
    private final LinkedMap<TxnID, TxnID> aborts = new LinkedMap<>();
    private final LinkedMap<PositionImpl, TransactionBufferSnapshotIndex> indexes = new LinkedMap<>();
    private LinkedList<TxnID> unsealedTxnIds = new LinkedList<>();

    /* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl$PersistentWorker.class */
    public class PersistentWorker {
        private final PersistentTopic topic;
        private final SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter<TransactionBufferSnapshotSegment> snapshotSegmentsWriter;
        private final SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter<TransactionBufferSnapshotIndexes> snapshotIndexWriter;
        private static final AtomicReferenceFieldUpdater<PersistentWorker, OperationState> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PersistentWorker.class, OperationState.class, "operationState");
        protected final AtomicLong sequenceID = new AtomicLong(0);
        private volatile boolean closed = false;
        private volatile OperationState operationState = OperationState.None;
        ConcurrentLinkedDeque<Pair<OperationType, Pair<CompletableFuture<Void>, Supplier<CompletableFuture<Void>>>>> taskQueue = new ConcurrentLinkedDeque<>();

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl$PersistentWorker$OperationState.class */
        public enum OperationState {
            None,
            Operating,
            Closed
        }

        /* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl$PersistentWorker$OperationType.class */
        public enum OperationType {
            UpdateIndex,
            WriteSegment,
            DeleteSegment,
            Clear
        }

        public PersistentWorker(PersistentTopic persistentTopic) {
            this.topic = persistentTopic;
            this.snapshotSegmentsWriter = this.topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotSegmentService().getReferenceWriter(TopicName.get(persistentTopic.getName()).getNamespaceObject());
            this.snapshotSegmentsWriter.getFuture().exceptionally(th -> {
                SnapshotSegmentAbortedTxnProcessorImpl.log.error("{} Failed to create snapshot index writer", persistentTopic.getName());
                persistentTopic.close();
                return null;
            });
            this.snapshotIndexWriter = this.topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotIndexService().getReferenceWriter(TopicName.get(persistentTopic.getName()).getNamespaceObject());
            this.snapshotIndexWriter.getFuture().exceptionally(th2 -> {
                SnapshotSegmentAbortedTxnProcessorImpl.log.error("{} Failed to create snapshot writer", persistentTopic.getName());
                persistentTopic.close();
                return null;
            });
        }

        public CompletableFuture<Void> appendTask(OperationType operationType, Supplier<CompletableFuture<Void>> supplier) {
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            switch (operationType) {
                case UpdateIndex:
                    if (this.taskQueue.isEmpty()) {
                        return STATE_UPDATER.compareAndSet(this, OperationState.None, OperationState.Operating) ? supplier.get().whenComplete((r6, th) -> {
                            if (th != null && SnapshotSegmentAbortedTxnProcessorImpl.log.isDebugEnabled()) {
                                SnapshotSegmentAbortedTxnProcessorImpl.log.debug("[{}] Failed to update index snapshot", this.topic.getName(), th);
                            }
                            STATE_UPDATER.compareAndSet(this, OperationState.Operating, OperationState.None);
                        }) : cancelUpdateIndexTask();
                    }
                    executeTask();
                    return cancelUpdateIndexTask();
                case WriteSegment:
                case DeleteSegment:
                    if (STATE_UPDATER.get(this).equals(OperationState.Closed)) {
                        return CompletableFuture.completedFuture(null);
                    }
                    this.taskQueue.add(new MutablePair(operationType, new MutablePair(completableFuture, supplier)));
                    executeTask();
                    return completableFuture;
                case Clear:
                    if (!STATE_UPDATER.compareAndSet(this, OperationState.None, OperationState.Closed)) {
                        return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException(String.format("Failed to clear the snapshot of topic [%s] due to the topic is used. Please stop the using of the topic and try it again", this.topic.getName())));
                    }
                    this.taskQueue.forEach(pair -> {
                        ((CompletableFuture) ((Supplier) ((Pair) pair.getRight()).getRight()).get()).completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(String.format("Cancel the operation [%s] due to the transaction buffer of the topic[%s] already closed", ((OperationType) pair.getLeft()).name(), this.topic.getName())));
                    });
                    this.taskQueue.clear();
                    return supplier.get();
                default:
                    return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException(String.format("Th operation [%s] is unsupported", operationType.name())));
            }
        }

        private CompletableFuture<Void> cancelUpdateIndexTask() {
            if (SnapshotSegmentAbortedTxnProcessorImpl.log.isDebugEnabled()) {
                SnapshotSegmentAbortedTxnProcessorImpl.log.debug("The operation of updating index is canceled due there is other operation executing");
            }
            return FutureUtil.failedFuture(new BrokerServiceException.ServiceUnitNotReadyException("The operation of updating index is canceled"));
        }

        private void executeTask() {
            if (this.taskQueue.isEmpty() || !STATE_UPDATER.compareAndSet(this, OperationState.None, OperationState.Operating) || this.taskQueue.isEmpty()) {
                return;
            }
            Pair<OperationType, Pair<CompletableFuture<Void>, Supplier<CompletableFuture<Void>>>> first = this.taskQueue.getFirst();
            ((CompletableFuture) ((Supplier) ((Pair) first.getValue()).getRight()).get()).whenComplete((r9, th) -> {
                if (th != null) {
                    if (SnapshotSegmentAbortedTxnProcessorImpl.log.isDebugEnabled()) {
                        SnapshotSegmentAbortedTxnProcessorImpl.log.debug("[{}] Failed to do operation do operation of [{}]", new Object[]{this.topic.getName(), ((OperationType) first.getKey()).name(), th});
                    }
                    ((CompletableFuture) ((Pair) first.getRight()).getKey()).completeExceptionally(th);
                } else {
                    ((CompletableFuture) ((Pair) first.getRight()).getKey()).complete(null);
                    this.taskQueue.removeFirst();
                    this.topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor(this).submit(this::executeTask);
                }
                STATE_UPDATER.compareAndSet(this, OperationState.Operating, OperationState.None);
            });
        }

        private CompletableFuture<Void> takeSnapshotSegmentAsync(LinkedList<TxnID> linkedList, PositionImpl positionImpl) {
            CompletableFuture<Void> thenRun = writeSnapshotSegmentAsync(linkedList, positionImpl).thenRun(() -> {
                if (SnapshotSegmentAbortedTxnProcessorImpl.log.isDebugEnabled()) {
                    SnapshotSegmentAbortedTxnProcessorImpl.log.debug("Successes to take snapshot segment [{}] at maxReadPosition [{}] for the topic [{}], and the size of the segment is [{}]", new Object[]{this.sequenceID, positionImpl, this.topic.getName(), Integer.valueOf(linkedList.size())});
                }
                this.sequenceID.getAndIncrement();
            });
            thenRun.exceptionally(th -> {
                SnapshotSegmentAbortedTxnProcessorImpl.log.error("Failed to take snapshot segment [{}] at maxReadPosition [{}] for the topic [{}], and the size of the segment is [{}]", new Object[]{this.sequenceID, positionImpl, this.topic.getName(), Integer.valueOf(linkedList.size()), th});
                return null;
            });
            return thenRun;
        }

        private CompletableFuture<Void> writeSnapshotSegmentAsync(LinkedList<TxnID> linkedList, PositionImpl positionImpl) {
            TransactionBufferSnapshotSegment transactionBufferSnapshotSegment = new TransactionBufferSnapshotSegment();
            transactionBufferSnapshotSegment.setAborts(SnapshotSegmentAbortedTxnProcessorImpl.this.convertTypeToTxnIDData(linkedList));
            transactionBufferSnapshotSegment.setTopicName(this.topic.getName());
            transactionBufferSnapshotSegment.setPersistentPositionEntryId(positionImpl.getEntryId());
            transactionBufferSnapshotSegment.setPersistentPositionLedgerId(positionImpl.getLedgerId());
            return this.snapshotSegmentsWriter.getFuture().thenCompose(writer -> {
                transactionBufferSnapshotSegment.setSequenceId(this.sequenceID.get());
                return writer.writeAsync(SnapshotSegmentAbortedTxnProcessorImpl.this.buildKey(this.sequenceID.get()), transactionBufferSnapshotSegment);
            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) messageId -> {
                TransactionBufferSnapshotIndex transactionBufferSnapshotIndex = new TransactionBufferSnapshotIndex();
                transactionBufferSnapshotIndex.setSequenceID(transactionBufferSnapshotSegment.getSequenceId());
                transactionBufferSnapshotIndex.setAbortedMarkLedgerID(positionImpl.getLedgerId());
                transactionBufferSnapshotIndex.setAbortedMarkEntryID(positionImpl.getEntryId());
                transactionBufferSnapshotIndex.setSegmentLedgerID(((MessageIdImpl) messageId).getLedgerId());
                transactionBufferSnapshotIndex.setSegmentEntryID(((MessageIdImpl) messageId).getEntryId());
                SnapshotSegmentAbortedTxnProcessorImpl.this.indexes.put(positionImpl, transactionBufferSnapshotIndex);
                return updateIndexWhenExecuteTheLatestTask();
            });
        }

        private CompletionStage<Void> updateIndexWhenExecuteTheLatestTask() {
            PositionImpl maxReadPosition = this.topic.getMaxReadPosition();
            return this.taskQueue.size() != 1 ? CompletableFuture.completedFuture(null) : updateSnapshotIndex(new TransactionBufferSnapshotIndexesMetadata(maxReadPosition.getLedgerId(), maxReadPosition.getEntryId(), SnapshotSegmentAbortedTxnProcessorImpl.this.convertTypeToTxnIDData(SnapshotSegmentAbortedTxnProcessorImpl.this.unsealedTxnIds)));
        }

        private CompletableFuture<Void> deleteSnapshotSegment(List<PositionImpl> list) {
            ArrayList arrayList = new ArrayList();
            for (PositionImpl positionImpl : list) {
                long sequenceID = ((TransactionBufferSnapshotIndex) SnapshotSegmentAbortedTxnProcessorImpl.this.indexes.get(positionImpl)).getSequenceID();
                CompletableFuture thenCompose = this.snapshotSegmentsWriter.getFuture().thenCompose(writer -> {
                    return writer.deleteAsync(SnapshotSegmentAbortedTxnProcessorImpl.this.buildKey(sequenceID), null);
                }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) messageId -> {
                    if (SnapshotSegmentAbortedTxnProcessorImpl.log.isDebugEnabled()) {
                        SnapshotSegmentAbortedTxnProcessorImpl.log.debug("[{}] Successes to delete the snapshot segment, whose sequenceId is [{}] and maxReadPosition is [{}]", new Object[]{this.topic.getName(), this.sequenceID, positionImpl});
                    }
                    SnapshotSegmentAbortedTxnProcessorImpl.this.indexes.remove(positionImpl);
                    return updateIndexWhenExecuteTheLatestTask();
                });
                thenCompose.exceptionally(th -> {
                    SnapshotSegmentAbortedTxnProcessorImpl.log.warn("[{}] Failed to delete the snapshot segment, whose sequenceId is [{}] and maxReadPosition is [{}]", new Object[]{this.topic.getName(), this.sequenceID, positionImpl, th});
                    return null;
                });
                arrayList.add(thenCompose);
            }
            return FutureUtil.waitForAll(arrayList);
        }

        private CompletableFuture<Void> updateSnapshotIndex(TransactionBufferSnapshotIndexesMetadata transactionBufferSnapshotIndexesMetadata) {
            TransactionBufferSnapshotIndexes transactionBufferSnapshotIndexes = new TransactionBufferSnapshotIndexes();
            CompletableFuture thenCompose = this.snapshotIndexWriter.getFuture().thenCompose(writer -> {
                transactionBufferSnapshotIndexes.setIndexList(SnapshotSegmentAbortedTxnProcessorImpl.this.indexes.values().stream().toList());
                transactionBufferSnapshotIndexes.setSnapshot(transactionBufferSnapshotIndexesMetadata);
                transactionBufferSnapshotIndexes.setTopicName(this.topic.getName());
                return writer.writeAsync(this.topic.getName(), transactionBufferSnapshotIndexes).thenCompose(messageId -> {
                    return CompletableFuture.completedFuture(null);
                });
            });
            thenCompose.thenRun(() -> {
                SnapshotSegmentAbortedTxnProcessorImpl.this.lastSnapshotTimestamps = System.currentTimeMillis();
            }).exceptionally(th -> {
                SnapshotSegmentAbortedTxnProcessorImpl.log.error("[{}] Failed to update snapshot segment index", transactionBufferSnapshotIndexes.getTopicName(), th);
                return null;
            });
            return thenCompose;
        }

        private CompletableFuture<Void> clearSnapshotSegmentAndIndexes() {
            CompletableFuture<Void> thenRun = SnapshotSegmentAbortedTxnProcessorImpl.this.persistentWorker.clearAllSnapshotSegments().thenCompose(r4 -> {
                return this.snapshotIndexWriter.getFuture().thenCompose(writer -> {
                    return writer.writeAsync(this.topic.getName(), null);
                });
            }).thenRun(() -> {
                SnapshotSegmentAbortedTxnProcessorImpl.log.debug("Successes to clear the snapshot segment and indexes for the topic [{}]", this.topic.getName());
            });
            thenRun.exceptionally(th -> {
                SnapshotSegmentAbortedTxnProcessorImpl.log.error("Failed to clear the snapshot segment and indexes for the topic [{}]", this.topic.getName(), th);
                return null;
            });
            return thenRun;
        }

        private CompletableFuture<Void> clearAllSnapshotSegments() {
            return this.topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotSegmentService().createReader(TopicName.get(this.topic.getName())).thenComposeAsync(reader -> {
                while (reader.hasMoreEvents()) {
                    try {
                        try {
                            Message message = (Message) reader.readNextAsync().get(SnapshotSegmentAbortedTxnProcessorImpl.this.getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
                            if (this.topic.getName().equals(((TransactionBufferSnapshotSegment) message.getValue()).getTopicName())) {
                                this.snapshotSegmentsWriter.getFuture().get().write(message.getKey(), null);
                            }
                        } catch (Exception e) {
                            SnapshotSegmentAbortedTxnProcessorImpl.log.error("[{}] Transaction buffer clear snapshot segments fail!", this.topic.getName(), e);
                            CompletableFuture failedFuture = FutureUtil.failedFuture(e);
                            SnapshotSegmentAbortedTxnProcessorImpl.this.closeReader(reader);
                            return failedFuture;
                        }
                    } catch (Throwable th) {
                        SnapshotSegmentAbortedTxnProcessorImpl.this.closeReader(reader);
                        throw th;
                    }
                }
                CompletableFuture completedFuture = CompletableFuture.completedFuture(null);
                SnapshotSegmentAbortedTxnProcessorImpl.this.closeReader(reader);
                return completedFuture;
            });
        }

        synchronized CompletableFuture<Void> closeAsync() {
            if (!this.closed) {
                this.closed = true;
                this.snapshotSegmentsWriter.release();
                this.snapshotIndexWriter.release();
            }
            return CompletableFuture.completedFuture(null);
        }
    }

    public SnapshotSegmentAbortedTxnProcessorImpl(PersistentTopic persistentTopic) {
        this.topic = persistentTopic;
        this.persistentWorker = new PersistentWorker(persistentTopic);
        this.snapshotSegmentCapacity = ((persistentTopic.getBrokerService().getPulsar().getConfiguration().getTransactionBufferSnapshotSegmentSize() - 8) - persistentTopic.getName().length()) / 3;
    }

    @Override // org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor
    public void putAbortedTxnAndPosition(TxnID txnID, PositionImpl positionImpl) {
        this.unsealedTxnIds.add(txnID);
        this.aborts.put(txnID, txnID);
        if (this.unsealedTxnIds.size() >= this.snapshotSegmentCapacity) {
            LinkedList<TxnID> linkedList = this.unsealedTxnIds;
            this.segmentIndex.put(positionImpl, txnID);
            this.persistentWorker.appendTask(PersistentWorker.OperationType.WriteSegment, () -> {
                return this.persistentWorker.takeSnapshotSegmentAsync(linkedList, positionImpl);
            });
            this.unsealedTxnIds = new LinkedList<>();
        }
    }

    @Override // org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor
    public boolean checkAbortedTransaction(TxnID txnID) {
        return this.aborts.containsKey(txnID);
    }

    @Override // org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor
    public void trimExpiredAbortedTxns() {
        ArrayList arrayList = new ArrayList();
        while (!this.segmentIndex.isEmpty() && !this.topic.getManagedLedger().ledgerExists(((PositionImpl) this.segmentIndex.firstKey()).getLedgerId())) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Topic transaction buffer clear aborted transactions, maxReadPosition : {}", this.topic.getName(), this.segmentIndex.firstKey());
            }
            arrayList.add((PositionImpl) this.segmentIndex.firstKey());
            TxnID txnID = (TxnID) this.segmentIndex.remove(0);
            while (!((TxnID) this.aborts.firstKey()).equals(txnID)) {
                this.aborts.remove(0);
            }
            this.aborts.remove(0);
        }
        if (arrayList.isEmpty()) {
            return;
        }
        this.persistentWorker.appendTask(PersistentWorker.OperationType.DeleteSegment, () -> {
            return this.persistentWorker.deleteSnapshotSegment(arrayList);
        });
    }

    private String buildKey(long j) {
        this.topic.getName();
        return "multiple-" + j + "-" + j;
    }

    @Override // org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor
    public CompletableFuture<Void> takeAbortedTxnsSnapshot(PositionImpl positionImpl) {
        TransactionBufferSnapshotIndexesMetadata transactionBufferSnapshotIndexesMetadata = new TransactionBufferSnapshotIndexesMetadata(positionImpl.getLedgerId(), positionImpl.getEntryId(), convertTypeToTxnIDData(this.unsealedTxnIds));
        return this.persistentWorker.appendTask(PersistentWorker.OperationType.UpdateIndex, () -> {
            return this.persistentWorker.updateSnapshotIndex(transactionBufferSnapshotIndexesMetadata);
        });
    }

    @Override // org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor
    public CompletableFuture<PositionImpl> recoverFromSnapshot() {
        return this.topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotIndexService().createReader(TopicName.get(this.topic.getName())).thenComposeAsync(reader -> {
            TransactionBufferSnapshotIndexes transactionBufferSnapshotIndexes;
            PositionImpl positionImpl = null;
            TransactionBufferSnapshotIndexes transactionBufferSnapshotIndexes2 = null;
            while (reader.hasMoreEvents()) {
                try {
                    try {
                        try {
                            Message message = (Message) reader.readNextAsync().get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
                            if (this.topic.getName().equals(message.getKey()) && (transactionBufferSnapshotIndexes = (TransactionBufferSnapshotIndexes) message.getValue()) != null) {
                                transactionBufferSnapshotIndexes2 = transactionBufferSnapshotIndexes;
                                positionImpl = PositionImpl.get(transactionBufferSnapshotIndexes.getSnapshot().getMaxReadPositionLedgerId(), transactionBufferSnapshotIndexes.getSnapshot().getMaxReadPositionEntryId());
                            }
                        } catch (TimeoutException e) {
                            Throwable unwrapCompletionException = FutureUtil.unwrapCompletionException(e);
                            String format = String.format("[%s] Transaction buffer recover fail by read transactionBufferSnapshot timeout!", this.topic.getName());
                            log.error(format, unwrapCompletionException);
                            CompletableFuture failedFuture = FutureUtil.failedFuture(new BrokerServiceException.ServiceUnitNotReadyException(format, unwrapCompletionException));
                            closeReader(reader);
                            return failedFuture;
                        }
                    } catch (Exception e2) {
                        log.error("[{}] Transaction buffer recover fail when read transactionBufferSnapshot!", this.topic.getName(), e2);
                        CompletableFuture failedFuture2 = FutureUtil.failedFuture(e2);
                        closeReader(reader);
                        return failedFuture2;
                    }
                } catch (Throwable th) {
                    closeReader(reader);
                    throw th;
                }
            }
            closeReader(reader);
            PositionImpl positionImpl2 = positionImpl;
            final TransactionBufferSnapshotIndexes transactionBufferSnapshotIndexes3 = transactionBufferSnapshotIndexes2;
            if (transactionBufferSnapshotIndexes2 == null) {
                return CompletableFuture.completedFuture(null);
            }
            this.unsealedTxnIds = convertTypeToTxnID(transactionBufferSnapshotIndexes2.getSnapshot().getAborts());
            final ArrayList arrayList = new ArrayList();
            final CompletableFuture completableFuture = new CompletableFuture();
            final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            this.topic.getBrokerService().getPulsar().getManagedLedgerFactory().asyncOpenReadOnlyManagedLedger(TopicName.get(TopicDomain.persistent.toString(), TopicName.get(this.topic.getName()).getNamespaceObject(), "__transaction_buffer_snapshot_segments").getPersistenceNamingEncoding(), new AsyncCallbacks.OpenReadOnlyManagedLedgerCallback() { // from class: org.apache.pulsar.broker.transaction.buffer.impl.SnapshotSegmentAbortedTxnProcessorImpl.1
                public void openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl readOnlyManagedLedgerImpl, Object obj) {
                    List<TransactionBufferSnapshotIndex> indexList = transactionBufferSnapshotIndexes3.getIndexList();
                    ArrayList arrayList2 = arrayList;
                    AtomicBoolean atomicBoolean2 = atomicBoolean;
                    indexList.forEach(transactionBufferSnapshotIndex -> {
                        final CompletableFuture completableFuture2 = new CompletableFuture();
                        arrayList2.add(completableFuture2);
                        readOnlyManagedLedgerImpl.asyncReadEntry(new PositionImpl(transactionBufferSnapshotIndex.getSegmentLedgerID(), transactionBufferSnapshotIndex.getSegmentEntryID()), new AsyncCallbacks.ReadEntryCallback() { // from class: org.apache.pulsar.broker.transaction.buffer.impl.SnapshotSegmentAbortedTxnProcessorImpl.1.1
                            public void readEntryComplete(Entry entry, Object obj2) {
                                SnapshotSegmentAbortedTxnProcessorImpl.this.handleSnapshotSegmentEntry(entry);
                                SnapshotSegmentAbortedTxnProcessorImpl.this.indexes.put(new PositionImpl(transactionBufferSnapshotIndex.abortedMarkLedgerID, transactionBufferSnapshotIndex.abortedMarkEntryID), transactionBufferSnapshotIndex);
                                entry.release();
                                completableFuture2.complete(null);
                            }

                            public void readEntryFailed(ManagedLedgerException managedLedgerException, Object obj2) {
                                if (!SnapshotSegmentAbortedTxnProcessorImpl.this.topic.getManagedLedger().ledgerExists(transactionBufferSnapshotIndex.getAbortedMarkLedgerID())) {
                                    atomicBoolean2.set(true);
                                } else {
                                    SnapshotSegmentAbortedTxnProcessorImpl.log.error("[{}] Failed to read snapshot segment [{}:{}]", new Object[]{SnapshotSegmentAbortedTxnProcessorImpl.this.topic.getName(), Long.valueOf(transactionBufferSnapshotIndex.segmentLedgerID), Long.valueOf(transactionBufferSnapshotIndex.segmentEntryID), managedLedgerException});
                                    completableFuture2.completeExceptionally(managedLedgerException);
                                }
                            }
                        }, (Object) null);
                    });
                    completableFuture.complete(null);
                }

                public void openReadOnlyManagedLedgerFailed(ManagedLedgerException managedLedgerException, Object obj) {
                    SnapshotSegmentAbortedTxnProcessorImpl.log.error("[{}] Failed to open readOnly managed ledger", SnapshotSegmentAbortedTxnProcessorImpl.this.topic, managedLedgerException);
                    completableFuture.completeExceptionally(managedLedgerException);
                }
            }, this.topic.getManagedLedger().getConfig(), (Object) null);
            return completableFuture.thenCompose(r3 -> {
                return FutureUtil.waitForAll(arrayList);
            }).thenCompose(r10 -> {
                if (atomicBoolean.get()) {
                    this.persistentWorker.appendTask(PersistentWorker.OperationType.UpdateIndex, () -> {
                        return this.persistentWorker.updateSnapshotIndex(transactionBufferSnapshotIndexes3.getSnapshot());
                    });
                }
                if (this.indexes.size() != 0) {
                    this.persistentWorker.sequenceID.set(((TransactionBufferSnapshotIndex) this.indexes.get(this.indexes.lastKey())).sequenceID + 1);
                }
                this.unsealedTxnIds.forEach(txnID -> {
                    this.aborts.put(txnID, txnID);
                });
                return CompletableFuture.completedFuture(positionImpl2);
            }).exceptionally(th2 -> {
                log.error("[{}] Failed to recover snapshot segment", this.topic.getName(), th2);
                return null;
            });
        }, (Executor) this.topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor(this));
    }

    @Override // org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor
    public CompletableFuture<Void> clearAbortedTxnSnapshot() {
        PersistentWorker persistentWorker = this.persistentWorker;
        PersistentWorker.OperationType operationType = PersistentWorker.OperationType.Clear;
        PersistentWorker persistentWorker2 = this.persistentWorker;
        Objects.requireNonNull(persistentWorker2);
        return persistentWorker.appendTask(operationType, persistentWorker2::clearSnapshotSegmentAndIndexes);
    }

    @Override // org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor
    public long getLastSnapshotTimestamps() {
        return this.lastSnapshotTimestamps;
    }

    @Override // org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor
    public CompletableFuture<Void> closeAsync() {
        return this.persistentWorker.closeAsync();
    }

    private void handleSnapshotSegmentEntry(Entry entry) {
        ByteBuf dataBuffer = entry.getDataBuffer();
        Commands.parseMessageMetadata(dataBuffer);
        TransactionBufferSnapshotSegment transactionBufferSnapshotSegment = (TransactionBufferSnapshotSegment) Schema.AVRO(TransactionBufferSnapshotSegment.class).decode(Unpooled.wrappedBuffer(dataBuffer).nioBuffer());
        TxnIDData txnIDData = transactionBufferSnapshotSegment.getAborts().get(transactionBufferSnapshotSegment.getAborts().size() - 1);
        this.segmentIndex.put(new PositionImpl(transactionBufferSnapshotSegment.getPersistentPositionLedgerId(), transactionBufferSnapshotSegment.getPersistentPositionEntryId()), new TxnID(txnIDData.getMostSigBits(), txnIDData.getLeastSigBits()));
        convertTypeToTxnID(transactionBufferSnapshotSegment.getAborts()).forEach(txnID -> {
            this.aborts.put(txnID, txnID);
        });
    }

    private long getSystemClientOperationTimeoutMs() throws Exception {
        return this.topic.getBrokerService().getPulsar().getClient().getConfiguration().getOperationTimeoutMs();
    }

    private <T> void closeReader(SystemTopicClient.Reader<T> reader) {
        reader.closeAsync().exceptionally(th -> {
            log.error("[{}]Transaction buffer snapshot reader close error!", this.topic.getName(), th);
            return null;
        });
    }

    private LinkedList<TxnID> convertTypeToTxnID(List<TxnIDData> list) {
        LinkedList<TxnID> linkedList = new LinkedList<>();
        list.forEach(txnIDData -> {
            linkedList.add(new TxnID(txnIDData.getMostSigBits(), txnIDData.getLeastSigBits()));
        });
        return linkedList;
    }

    private List<TxnIDData> convertTypeToTxnIDData(List<TxnID> list) {
        LinkedList linkedList = new LinkedList();
        list.forEach(txnID -> {
            linkedList.add(new TxnIDData(txnID.getMostSigBits(), txnID.getLeastSigBits()));
        });
        return linkedList;
    }
}
