/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction.buffer.impl;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexesMetadata;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SegmentStats;
import org.apache.pulsar.common.policies.data.SegmentsStats;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnapshotSegmentAbortedTxnProcessorImpl
implements AbortedTxnProcessor {
    private static final Logger log = LoggerFactory.getLogger(SnapshotSegmentAbortedTxnProcessorImpl.class);
    private LinkedList<TxnID> unsealedTxnIds;
    private final LinkedMap<PositionImpl, TxnID> segmentIndex = new LinkedMap();
    private final LinkedMap<TxnID, TxnID> aborts = new LinkedMap();
    private final LinkedMap<PositionImpl, TransactionBufferSnapshotIndex> indexes = new LinkedMap();
    private final PersistentTopic topic;
    private volatile long lastSnapshotTimestamps;
    private volatile long lastTakedSnapshotSegmentTimestamp;
    private final int snapshotSegmentCapacity;
    private final PersistentWorker persistentWorker;
    private static final String SNAPSHOT_PREFIX = "multiple-";

    public SnapshotSegmentAbortedTxnProcessorImpl(PersistentTopic topic) {
        this.topic = topic;
        this.persistentWorker = new PersistentWorker(topic);
        this.snapshotSegmentCapacity = (topic.getBrokerService().getPulsar().getConfiguration().getTransactionBufferSnapshotSegmentSize() - 8 - topic.getName().length()) / 3;
        this.unsealedTxnIds = new LinkedList();
    }

    @Override
    public void putAbortedTxnAndPosition(TxnID txnID, PositionImpl position) {
        this.unsealedTxnIds.add(txnID);
        this.aborts.put((Object)txnID, (Object)txnID);
        if (this.unsealedTxnIds.size() >= this.snapshotSegmentCapacity) {
            LinkedList<TxnID> abortedSegment = this.unsealedTxnIds;
            this.segmentIndex.put((Object)position, (Object)txnID);
            this.persistentWorker.appendTask(PersistentWorker.OperationType.WriteSegment, () -> this.persistentWorker.takeSnapshotSegmentAsync(abortedSegment, position));
            this.unsealedTxnIds = new LinkedList();
        }
    }

    @Override
    public boolean checkAbortedTransaction(TxnID txnID) {
        return this.aborts.containsKey((Object)txnID);
    }

    @Override
    public void trimExpiredAbortedTxns() {
        ArrayList<PositionImpl> positionsNeedToDelete = new ArrayList<PositionImpl>();
        while (!this.segmentIndex.isEmpty() && !((ManagedLedgerImpl)this.topic.getManagedLedger()).ledgerExists(((PositionImpl)this.segmentIndex.firstKey()).getLedgerId())) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Topic transaction buffer clear aborted transactions, maxReadPosition : {}", (Object)this.topic.getName(), this.segmentIndex.firstKey());
            }
            PositionImpl positionNeedToDelete = (PositionImpl)this.segmentIndex.firstKey();
            positionsNeedToDelete.add(positionNeedToDelete);
            TxnID theLatestDeletedTxnID = (TxnID)this.segmentIndex.remove(0);
            while (!((TxnID)this.aborts.firstKey()).equals((Object)theLatestDeletedTxnID)) {
                this.aborts.remove(0);
            }
            this.aborts.remove(0);
        }
        if (!positionsNeedToDelete.isEmpty()) {
            this.persistentWorker.appendTask(PersistentWorker.OperationType.DeleteSegment, () -> this.persistentWorker.deleteSnapshotSegment(positionsNeedToDelete));
        }
    }

    private String buildKey(long sequenceId) {
        return SNAPSHOT_PREFIX + sequenceId + "-" + this.topic.getName();
    }

    @Override
    public CompletableFuture<Void> takeAbortedTxnsSnapshot(PositionImpl maxReadPosition) {
        TransactionBufferSnapshotIndexesMetadata metadata = new TransactionBufferSnapshotIndexesMetadata(maxReadPosition.getLedgerId(), maxReadPosition.getEntryId(), this.convertTypeToTxnIDData(this.unsealedTxnIds));
        return this.persistentWorker.appendTask(PersistentWorker.OperationType.UpdateIndex, () -> this.persistentWorker.updateSnapshotIndex(metadata));
    }

    @Override
    public CompletableFuture<PositionImpl> recoverFromSnapshot() {
        PulsarService pulsar = this.topic.getBrokerService().getPulsar();
        CompletableFuture<PositionImpl> future = new CompletableFuture<PositionImpl>();
        pulsar.getTransactionExecutorProvider().getExecutor((Object)this).execute(() -> {
            try {
                TransactionBufferSnapshotIndexes indexes = pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotIndexService().getTableView().readLatest(this.topic.getName());
                if (indexes == null) {
                    future.complete(this.recoverOldSnapshot());
                    return;
                }
                TransactionBufferSnapshotIndexesMetadata snapshot = indexes.getSnapshot();
                PositionImpl startReadCursorPosition = new PositionImpl(snapshot.getMaxReadPositionLedgerId(), snapshot.getMaxReadPositionEntryId());
                this.unsealedTxnIds = this.convertTypeToTxnID(snapshot.getAborts());
                TopicName snapshotSegmentTopicName = TopicName.get((String)TopicDomain.persistent.toString(), (NamespaceName)TopicName.get((String)this.topic.getName()).getNamespaceObject(), (String)"__transaction_buffer_snapshot_segments");
                this.readSegmentEntries(snapshotSegmentTopicName, indexes);
                if (!this.indexes.isEmpty()) {
                    this.persistentWorker.sequenceID.set(((TransactionBufferSnapshotIndex)this.indexes.get((Object)this.indexes.lastKey())).sequenceID + 1L);
                }
                this.unsealedTxnIds.forEach(txnID -> this.aborts.put(txnID, txnID));
                future.complete(startReadCursorPosition);
            }
            catch (Throwable throwable) {
                future.completeExceptionally(throwable);
            }
        });
        return future;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void readSegmentEntries(TopicName topicName, TransactionBufferSnapshotIndexes indexes) throws Exception {
        ReadOnlyManagedLedgerImpl managedLedger = this.openReadOnlyManagedLedger(topicName);
        boolean hasInvalidIndex = false;
        for (TransactionBufferSnapshotIndex index : indexes.getIndexList()) {
            PositionImpl position = new PositionImpl(index.getSegmentLedgerID(), index.getSegmentEntryID());
            PositionImpl abortedPosition = new PositionImpl(index.abortedMarkLedgerID, index.abortedMarkEntryID);
            try {
                Entry entry = this.readEntry(managedLedger, position);
                try {
                    this.handleSnapshotSegmentEntry(entry);
                    this.indexes.put((Object)abortedPosition, (Object)index);
                }
                finally {
                    entry.release();
                }
            }
            catch (Throwable throwable) {
                if (((ManagedLedgerImpl)this.topic.getManagedLedger()).ledgerExists(index.getAbortedMarkLedgerID())) {
                    log.error("[{}] Failed to read snapshot segment [{}:{}]", new Object[]{this.topic.getName(), index.segmentLedgerID, index.segmentEntryID, throwable});
                    throw throwable;
                }
                hasInvalidIndex = true;
            }
        }
        if (hasInvalidIndex) {
            this.persistentWorker.appendTask(PersistentWorker.OperationType.UpdateIndex, () -> this.persistentWorker.updateSnapshotIndex(indexes.getSnapshot()));
        }
    }

    private ReadOnlyManagedLedgerImpl openReadOnlyManagedLedger(TopicName topicName) throws Exception {
        final CompletableFuture future = new CompletableFuture();
        AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback = new AsyncCallbacks.OpenReadOnlyManagedLedgerCallback(){

            public void openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl managedLedger, Object ctx) {
                future.complete(managedLedger);
            }

            public void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Object ctx) {
                future.completeExceptionally(exception);
            }

            public String toString() {
                return String.format("Transaction buffer [%s] recover from snapshot", SnapshotSegmentAbortedTxnProcessorImpl.this.topic.getName());
            }
        };
        this.topic.getBrokerService().getPulsar().getManagedLedgerFactory().asyncOpenReadOnlyManagedLedger(topicName.getPersistenceNamingEncoding(), callback, this.topic.getManagedLedger().getConfig(), null);
        return (ReadOnlyManagedLedgerImpl)this.wait(future, "open read only ml for " + topicName);
    }

    private Entry readEntry(ReadOnlyManagedLedgerImpl managedLedger, PositionImpl position) throws Exception {
        final CompletableFuture future = new CompletableFuture();
        managedLedger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback(){

            public void readEntryComplete(Entry entry, Object ctx) {
                future.complete(entry);
            }

            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                future.completeExceptionally(exception);
            }
        }, null);
        return (Entry)this.wait(future, "read entry from " + position);
    }

    private PositionImpl recoverOldSnapshot() throws Exception {
        PulsarService pulsar = this.topic.getBrokerService().getPulsar();
        TopicName topicName = TopicName.get((String)this.topic.getName());
        List topics = (List)this.wait(pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(NamespaceName.get((String)topicName.getNamespace())), "list persistent topics");
        if (!topics.contains(TopicDomain.persistent + "://" + topicName.getNamespace() + "/__transaction_buffer_snapshot")) {
            return null;
        }
        TransactionBufferSnapshot snapshot = pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService().getTableView().readLatest(this.topic.getName());
        if (snapshot == null) {
            return null;
        }
        this.handleOldSnapshot(snapshot);
        return new PositionImpl(snapshot.getMaxReadPositionLedgerId(), snapshot.getMaxReadPositionEntryId());
    }

    private void handleOldSnapshot(TransactionBufferSnapshot snapshot) {
        if (snapshot.getAborts() != null) {
            snapshot.getAborts().forEach(abortTxnMetadata -> {
                TxnID txnID = new TxnID(abortTxnMetadata.getTxnIdMostBits(), abortTxnMetadata.getTxnIdLeastBits());
                this.aborts.put((Object)txnID, (Object)txnID);
                this.unsealedTxnIds.add(txnID);
            });
        }
    }

    @Override
    public CompletableFuture<Void> clearAbortedTxnSnapshot() {
        return this.persistentWorker.appendTask(PersistentWorker.OperationType.Clear, this.persistentWorker::clearSnapshotSegmentAndIndexes);
    }

    @Override
    public TransactionBufferStats generateSnapshotStats(boolean segmentStats) {
        TransactionBufferStats transactionBufferStats = new TransactionBufferStats();
        transactionBufferStats.totalAbortedTransactions = this.aborts.size();
        transactionBufferStats.lastSnapshotTimestamps = this.lastSnapshotTimestamps;
        SegmentsStats segmentsStats = new SegmentsStats();
        segmentsStats.currentSegmentCapacity = this.snapshotSegmentCapacity;
        segmentsStats.lastTookSnapshotSegmentTimestamp = this.lastTakedSnapshotSegmentTimestamp;
        segmentsStats.unsealedAbortTxnIDSize = this.unsealedTxnIds.size();
        segmentsStats.segmentsSize = this.indexes.size();
        if (segmentStats) {
            ArrayList statsList = new ArrayList();
            this.segmentIndex.forEach((position, txnID) -> {
                SegmentStats stats = new SegmentStats(txnID.toString(), position.toString());
                statsList.add(stats);
            });
            segmentsStats.segmentStats = statsList;
        }
        transactionBufferStats.segmentsStats = segmentsStats;
        return transactionBufferStats;
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        return this.persistentWorker.closeAsync();
    }

    private void handleSnapshotSegmentEntry(Entry entry) {
        ByteBuf headersAndPayload = entry.getDataBuffer();
        Commands.parseMessageMetadata((ByteBuf)headersAndPayload);
        TransactionBufferSnapshotSegment snapshotSegment = (TransactionBufferSnapshotSegment)Schema.AVRO(TransactionBufferSnapshotSegment.class).decode(Unpooled.wrappedBuffer((ByteBuf)headersAndPayload).nioBuffer());
        TxnIDData lastTxn = snapshotSegment.getAborts().get(snapshotSegment.getAborts().size() - 1);
        this.segmentIndex.put((Object)new PositionImpl(snapshotSegment.getPersistentPositionLedgerId(), snapshotSegment.getPersistentPositionEntryId()), (Object)new TxnID(lastTxn.getMostSigBits(), lastTxn.getLeastSigBits()));
        this.convertTypeToTxnID(snapshotSegment.getAborts()).forEach(txnID -> this.aborts.put(txnID, txnID));
    }

    private long getSystemClientOperationTimeoutMs() throws Exception {
        PulsarClientImpl pulsarClient = (PulsarClientImpl)this.topic.getBrokerService().getPulsar().getClient();
        return pulsarClient.getConfiguration().getOperationTimeoutMs();
    }

    private <R> R wait(CompletableFuture<R> future, String msg) throws Exception {
        try {
            return future.get(this.getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw new CompletionException("Failed to " + msg, e.getCause());
        }
    }

    private <T> void closeReader(SystemTopicClient.Reader<T> reader) {
        reader.closeAsync().exceptionally(e -> {
            log.warn("[{}] Failed to close reader: {}", (Object)this.topic.getName(), (Object)e.getMessage());
            return null;
        });
    }

    private LinkedList<TxnID> convertTypeToTxnID(List<TxnIDData> snapshotSegment) {
        LinkedList<TxnID> abortedTxns = new LinkedList<TxnID>();
        snapshotSegment.forEach(txnIDData -> abortedTxns.add(new TxnID(txnIDData.getMostSigBits(), txnIDData.getLeastSigBits())));
        return abortedTxns;
    }

    private List<TxnIDData> convertTypeToTxnIDData(List<TxnID> abortedTxns) {
        LinkedList<TxnIDData> segment = new LinkedList<TxnIDData>();
        abortedTxns.forEach(txnID -> segment.add(new TxnIDData(txnID.getMostSigBits(), txnID.getLeastSigBits())));
        return segment;
    }

    public class PersistentWorker {
        protected final AtomicLong sequenceID = new AtomicLong(0L);
        private final PersistentTopic topic;
        private final SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter<TransactionBufferSnapshotSegment> snapshotSegmentsWriter;
        private final SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter<TransactionBufferSnapshotIndexes> snapshotIndexWriter;
        private volatile boolean closed = false;
        private static final AtomicReferenceFieldUpdater<PersistentWorker, OperationState> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PersistentWorker.class, OperationState.class, "operationState");
        private volatile OperationState operationState = OperationState.None;
        ConcurrentLinkedDeque<Pair<OperationType, Pair<CompletableFuture<Void>, Supplier<CompletableFuture<Void>>>>> taskQueue = new ConcurrentLinkedDeque();

        public PersistentWorker(PersistentTopic topic) {
            this.topic = topic;
            this.snapshotSegmentsWriter = this.topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotSegmentService().getReferenceWriter(TopicName.get((String)topic.getName()).getNamespaceObject());
            this.snapshotSegmentsWriter.getFuture().exceptionally(ex -> {
                log.error("{} Failed to create snapshot index writer", (Object)topic.getName());
                topic.close();
                return null;
            });
            this.snapshotIndexWriter = this.topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotIndexService().getReferenceWriter(TopicName.get((String)topic.getName()).getNamespaceObject());
            this.snapshotIndexWriter.getFuture().exceptionally(ex -> {
                log.error("{} Failed to create snapshot writer", (Object)topic.getName());
                topic.close();
                return null;
            });
        }

        public CompletableFuture<Void> appendTask(OperationType operationType, Supplier<CompletableFuture<Void>> task) {
            CompletableFuture<Void> taskExecutedResult = new CompletableFuture<Void>();
            switch (operationType) {
                case UpdateIndex: {
                    if (!this.taskQueue.isEmpty()) {
                        this.executeTask();
                        return this.cancelUpdateIndexTask();
                    }
                    if (STATE_UPDATER.compareAndSet(this, OperationState.None, OperationState.Operating)) {
                        return task.get().whenComplete((ignore, throwable) -> {
                            if (throwable != null && log.isDebugEnabled()) {
                                log.debug("[{}] Failed to update index snapshot", (Object)this.topic.getName(), throwable);
                            }
                            STATE_UPDATER.compareAndSet(this, OperationState.Operating, OperationState.None);
                        });
                    }
                    return this.cancelUpdateIndexTask();
                }
                case WriteSegment: 
                case DeleteSegment: {
                    if (!STATE_UPDATER.get(this).equals((Object)OperationState.Closed)) {
                        this.taskQueue.add((Pair<OperationType, Pair<CompletableFuture<Void>, Supplier<CompletableFuture<Void>>>>)new MutablePair((Object)operationType, (Object)new MutablePair(taskExecutedResult, task)));
                        this.executeTask();
                        return taskExecutedResult;
                    }
                    return CompletableFuture.completedFuture(null);
                }
                case Clear: {
                    if (STATE_UPDATER.compareAndSet(this, OperationState.None, OperationState.Closed)) {
                        this.taskQueue.forEach((Consumer<Pair<OperationType, Pair<CompletableFuture<Void>, Supplier<CompletableFuture<Void>>>>>)((Consumer<Pair>)pair -> ((CompletableFuture)((Supplier)((Pair)pair.getRight()).getRight()).get()).completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(String.format("Cancel the operation [%s] due to the transaction buffer of the topic[%s] already closed", ((OperationType)((Object)((Object)pair.getLeft()))).name(), this.topic.getName())))));
                        this.taskQueue.clear();
                        return task.get();
                    }
                    return FutureUtil.failedFuture((Throwable)new BrokerServiceException.NotAllowedException(String.format("Failed to clear the snapshot of topic [%s] due to the topic is used. Please stop the using of the topic and try it again", this.topic.getName())));
                }
            }
            return FutureUtil.failedFuture((Throwable)new BrokerServiceException.NotAllowedException(String.format("Th operation [%s] is unsupported", operationType.name())));
        }

        private CompletableFuture<Void> cancelUpdateIndexTask() {
            if (log.isDebugEnabled()) {
                log.debug("The operation of updating index is canceled due there is other operation executing");
            }
            return FutureUtil.failedFuture((Throwable)new BrokerServiceException.ServiceUnitNotReadyException("The operation of updating index is canceled"));
        }

        private void executeTask() {
            if (this.taskQueue.isEmpty()) {
                return;
            }
            if (STATE_UPDATER.compareAndSet(this, OperationState.None, OperationState.Operating)) {
                if (this.taskQueue.isEmpty()) {
                    return;
                }
                Pair<OperationType, Pair<CompletableFuture<Void>, Supplier<CompletableFuture<Void>>>> firstTask = this.taskQueue.getFirst();
                ((CompletableFuture)((Supplier)((Pair)firstTask.getValue()).getRight()).get()).whenComplete((ignore, throwable) -> {
                    if (throwable != null) {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Failed to do operation do operation of [{}]", new Object[]{this.topic.getName(), ((OperationType)((Object)((Object)firstTask.getKey()))).name(), throwable});
                        }
                        ((CompletableFuture)((Pair)firstTask.getRight()).getKey()).completeExceptionally((Throwable)throwable);
                    } else {
                        ((CompletableFuture)((Pair)firstTask.getRight()).getKey()).complete(null);
                        this.taskQueue.removeFirst();
                        this.topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor((Object)this).submit(this::executeTask);
                    }
                    STATE_UPDATER.compareAndSet(this, OperationState.Operating, OperationState.None);
                });
            }
        }

        private CompletableFuture<Void> takeSnapshotSegmentAsync(LinkedList<TxnID> sealedAbortedTxnIdSegment, PositionImpl abortedMarkerPersistentPosition) {
            CompletionStage res = this.writeSnapshotSegmentAsync(sealedAbortedTxnIdSegment, abortedMarkerPersistentPosition).thenRun(() -> {
                if (log.isDebugEnabled()) {
                    log.debug("Successes to take snapshot segment [{}] at maxReadPosition [{}] for the topic [{}], and the size of the segment is [{}]", new Object[]{this.sequenceID, abortedMarkerPersistentPosition, this.topic.getName(), sealedAbortedTxnIdSegment.size()});
                }
                this.sequenceID.getAndIncrement();
            });
            ((CompletableFuture)res).exceptionally(e -> {
                log.error("Failed to take snapshot segment [{}] at maxReadPosition [{}] for the topic [{}], and the size of the segment is [{}]", new Object[]{this.sequenceID, abortedMarkerPersistentPosition, this.topic.getName(), sealedAbortedTxnIdSegment.size(), e});
                return null;
            });
            return res;
        }

        private CompletableFuture<Void> writeSnapshotSegmentAsync(LinkedList<TxnID> segment, PositionImpl abortedMarkerPersistentPosition) {
            TransactionBufferSnapshotSegment transactionBufferSnapshotSegment = new TransactionBufferSnapshotSegment();
            transactionBufferSnapshotSegment.setAborts(SnapshotSegmentAbortedTxnProcessorImpl.this.convertTypeToTxnIDData(segment));
            transactionBufferSnapshotSegment.setTopicName(this.topic.getName());
            transactionBufferSnapshotSegment.setPersistentPositionEntryId(abortedMarkerPersistentPosition.getEntryId());
            transactionBufferSnapshotSegment.setPersistentPositionLedgerId(abortedMarkerPersistentPosition.getLedgerId());
            return ((CompletableFuture)this.snapshotSegmentsWriter.getFuture().thenCompose(segmentWriter -> {
                transactionBufferSnapshotSegment.setSequenceId(this.sequenceID.get());
                return segmentWriter.writeAsync(SnapshotSegmentAbortedTxnProcessorImpl.this.buildKey(this.sequenceID.get()), transactionBufferSnapshotSegment);
            })).thenCompose(messageId -> {
                SnapshotSegmentAbortedTxnProcessorImpl.this.lastTakedSnapshotSegmentTimestamp = System.currentTimeMillis();
                TransactionBufferSnapshotIndex index = new TransactionBufferSnapshotIndex();
                index.setSequenceID(transactionBufferSnapshotSegment.getSequenceId());
                index.setAbortedMarkLedgerID(abortedMarkerPersistentPosition.getLedgerId());
                index.setAbortedMarkEntryID(abortedMarkerPersistentPosition.getEntryId());
                index.setSegmentLedgerID(((MessageIdImpl)messageId).getLedgerId());
                index.setSegmentEntryID(((MessageIdImpl)messageId).getEntryId());
                SnapshotSegmentAbortedTxnProcessorImpl.this.indexes.put((Object)abortedMarkerPersistentPosition, (Object)index);
                return this.updateIndexWhenExecuteTheLatestTask();
            });
        }

        private CompletionStage<Void> updateIndexWhenExecuteTheLatestTask() {
            PositionImpl maxReadPosition = this.topic.getMaxReadPosition();
            List<TxnIDData> aborts = SnapshotSegmentAbortedTxnProcessorImpl.this.convertTypeToTxnIDData(SnapshotSegmentAbortedTxnProcessorImpl.this.unsealedTxnIds);
            if (this.taskQueue.size() != 1) {
                return CompletableFuture.completedFuture(null);
            }
            return this.updateSnapshotIndex(new TransactionBufferSnapshotIndexesMetadata(maxReadPosition.getLedgerId(), maxReadPosition.getEntryId(), aborts));
        }

        private CompletableFuture<Void> deleteSnapshotSegment(List<PositionImpl> positionNeedToDeletes) {
            ArrayList<CompletionStage> results = new ArrayList<CompletionStage>();
            for (PositionImpl positionNeedToDelete : positionNeedToDeletes) {
                long sequenceIdNeedToDelete = ((TransactionBufferSnapshotIndex)SnapshotSegmentAbortedTxnProcessorImpl.this.indexes.get((Object)positionNeedToDelete)).getSequenceID();
                CompletionStage res = ((CompletableFuture)this.snapshotSegmentsWriter.getFuture().thenCompose(writer -> writer.deleteAsync(SnapshotSegmentAbortedTxnProcessorImpl.this.buildKey(sequenceIdNeedToDelete), null))).thenCompose(messageId -> {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Successes to delete the snapshot segment, whose sequenceId is [{}] and maxReadPosition is [{}]", new Object[]{this.topic.getName(), this.sequenceID, positionNeedToDelete});
                    }
                    SnapshotSegmentAbortedTxnProcessorImpl.this.indexes.remove((Object)positionNeedToDelete);
                    return this.updateIndexWhenExecuteTheLatestTask();
                });
                ((CompletableFuture)res).exceptionally(e -> {
                    log.warn("[{}] Failed to delete the snapshot segment, whose sequenceId is [{}] and maxReadPosition is [{}]", new Object[]{this.topic.getName(), this.sequenceID, positionNeedToDelete, e});
                    return null;
                });
                results.add(res);
            }
            return FutureUtil.waitForAll(results);
        }

        private CompletableFuture<Void> updateSnapshotIndex(TransactionBufferSnapshotIndexesMetadata snapshotSegment) {
            TransactionBufferSnapshotIndexes snapshotIndexes = new TransactionBufferSnapshotIndexes();
            CompletionStage res = this.snapshotIndexWriter.getFuture().thenCompose(indexesWriter -> {
                snapshotIndexes.setIndexList(SnapshotSegmentAbortedTxnProcessorImpl.this.indexes.values().stream().toList());
                snapshotIndexes.setSnapshot(snapshotSegment);
                snapshotIndexes.setTopicName(this.topic.getName());
                return indexesWriter.writeAsync(this.topic.getName(), snapshotIndexes).thenCompose(messageId -> CompletableFuture.completedFuture(null));
            });
            ((CompletableFuture)((CompletableFuture)res).thenRun(() -> {
                SnapshotSegmentAbortedTxnProcessorImpl.this.lastSnapshotTimestamps = System.currentTimeMillis();
            })).exceptionally(e -> {
                log.error("[{}] Failed to update snapshot segment index", (Object)snapshotIndexes.getTopicName(), e);
                return null;
            });
            return res;
        }

        private CompletableFuture<Void> clearSnapshotSegmentAndIndexes() {
            CompletionStage res = ((CompletableFuture)SnapshotSegmentAbortedTxnProcessorImpl.this.persistentWorker.clearAllSnapshotSegments().thenCompose(ignore -> this.snapshotIndexWriter.getFuture().thenCompose(indexesWriter -> indexesWriter.writeAsync(this.topic.getName(), null)))).thenRun(() -> log.debug("Successes to clear the snapshot segment and indexes for the topic [{}]", (Object)this.topic.getName()));
            ((CompletableFuture)res).exceptionally(e -> {
                log.error("Failed to clear the snapshot segment and indexes for the topic [{}]", (Object)this.topic.getName(), e);
                return null;
            });
            return res;
        }

        private CompletableFuture<Void> clearAllSnapshotSegments() {
            CompletableFuture<Void> future = new CompletableFuture<Void>();
            PulsarService pulsar = this.topic.getBrokerService().getPulsar();
            pulsar.getTransactionExecutorProvider().getExecutor((Object)this).execute(() -> {
                try {
                    SystemTopicClient.Reader<TransactionBufferSnapshotSegment> reader = this.wait(pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotSegmentService().createReader(TopicName.get((String)this.topic.getName())), "create reader");
                    try {
                        while (this.wait(reader.hasMoreEventsAsync(), "has more events").booleanValue()) {
                            Message<TransactionBufferSnapshotSegment> message = this.wait(reader.readNextAsync(), "read next");
                            if (!this.topic.getName().equals(((TransactionBufferSnapshotSegment)message.getValue()).getTopicName())) continue;
                            this.snapshotSegmentsWriter.getFuture().get().write(message.getKey(), null);
                        }
                        future.complete(null);
                    }
                    finally {
                        SnapshotSegmentAbortedTxnProcessorImpl.this.closeReader(reader);
                    }
                }
                catch (Throwable throwable) {
                    future.completeExceptionally(throwable);
                }
            });
            return future;
        }

        private <R> R wait(CompletableFuture<R> future, String msg) throws Exception {
            try {
                return future.get(SnapshotSegmentAbortedTxnProcessorImpl.this.getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
            }
            catch (ExecutionException e) {
                throw new CompletionException("Failed to " + msg, e.getCause());
            }
        }

        synchronized CompletableFuture<Void> closeAsync() {
            if (!this.closed) {
                this.closed = true;
                this.snapshotSegmentsWriter.release();
                this.snapshotIndexWriter.release();
            }
            return CompletableFuture.completedFuture(null);
        }

        private static enum OperationState {
            None,
            Operating,
            Closed;

        }

        public static enum OperationType {
            UpdateIndex,
            WriteSegment,
            DeleteSegment,
            Clear;

        }
    }
}

