/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction.buffer.impl;

import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link AbortedTxnProcessor} that tracks the aborted transactions of one topic in an in-memory,
 * insertion-ordered map and persists all of them in a single {@link TransactionBufferSnapshot}
 * keyed by the topic name.
 *
 * <p>NOTE(review): the {@code aborts} map is not synchronized inside this class; callers are
 * presumably expected to serialize access to it - confirm against the call sites.
 */
public class SingleSnapshotAbortedTxnProcessorImpl implements AbortedTxnProcessor {
    private static final Logger log = LoggerFactory.getLogger(SingleSnapshotAbortedTxnProcessorImpl.class);

    private final PersistentTopic topic;
    private final SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter<TransactionBufferSnapshot>
            takeSnapshotWriter;
    /** Aborted transactions in insertion order, each mapped to the position of its abort marker. */
    private final LinkedMap<TxnID, PositionImpl> aborts = new LinkedMap<>();
    /** Wall-clock time (ms) of the last successfully written snapshot; 0 until one succeeds. */
    private volatile long lastSnapshotTimestamps;
    private volatile boolean isClosed = false;

    /**
     * Creates the processor and acquires a reference-counted snapshot writer for the topic's namespace.
     * If the writer cannot be created, the failure is logged and the topic is closed.
     *
     * @param topic the persistent topic whose aborted transactions are tracked
     */
    public SingleSnapshotAbortedTxnProcessorImpl(PersistentTopic topic) {
        this.topic = topic;
        this.takeSnapshotWriter = topic.getBrokerService().getPulsar()
                .getTransactionBufferSnapshotServiceFactory()
                .getTxnBufferSnapshotService()
                .getReferenceWriter(TopicName.get(topic.getName()).getNamespaceObject());
        this.takeSnapshotWriter.getFuture().exceptionally(ex -> {
            // Pass the throwable so the reason for the failure is not lost from the log.
            log.error("{} Failed to create snapshot writer", topic.getName(), ex);
            topic.close();
            return null;
        });
    }

    /**
     * Records an aborted transaction together with the position of its abort marker.
     *
     * @param abortedTxnId the id of the aborted transaction
     * @param abortedMarkerPersistentPosition the position at which the abort marker was persisted
     */
    @Override
    public void putAbortedTxnAndPosition(TxnID abortedTxnId, PositionImpl abortedMarkerPersistentPosition) {
        aborts.put(abortedTxnId, abortedMarkerPersistentPosition);
    }

    /**
     * Drops entries from the head of the aborted-transaction map for as long as the ledger holding
     * the head entry's abort marker no longer exists in the topic's managed ledger.
     */
    @Override
    public void trimExpiredAbortedTxns() {
        while (!aborts.isEmpty()) {
            TxnID oldestTxnId = aborts.firstKey();
            PositionImpl abortMarkerPosition = aborts.get(oldestTxnId);
            if (((ManagedLedgerImpl) topic.getManagedLedger()).ledgerExists(abortMarkerPosition.getLedgerId())) {
                // The head entry is still backed by a live ledger; stop trimming here.
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] Topic transaction buffer clear aborted transaction, TxnId : {}, Position : {}",
                        topic.getName(), oldestTxnId, abortMarkerPosition);
            }
            aborts.remove(oldestTxnId);
        }
    }

    /**
     * Tells whether the given transaction is currently recorded as aborted.
     *
     * @param txnID the transaction id to look up
     * @return {@code true} if the transaction is present in the aborted-transaction map
     */
    @Override
    public boolean checkAbortedTransaction(TxnID txnID) {
        return aborts.containsKey(txnID);
    }

    /**
     * Reads the operation timeout configured on the broker's internal Pulsar client.
     *
     * @return the operation timeout in milliseconds
     * @throws Exception if the broker client cannot be obtained
     */
    private long getSystemClientOperationTimeoutMs() throws Exception {
        PulsarClientImpl pulsarClient = (PulsarClientImpl) topic.getBrokerService().getPulsar().getClient();
        return pulsarClient.getConfiguration().getOperationTimeoutMs();
    }

    /**
     * Rebuilds the aborted-transaction map by reading every available snapshot event and applying
     * those whose key equals this topic's name. The reader is closed when reading finishes,
     * whether it succeeded or failed.
     *
     * @return a future completed with the max-read position stored in the last applied snapshot,
     *         or with {@code null} if no snapshot for this topic was found; completed exceptionally
     *         with {@link BrokerServiceException.ServiceUnitNotReadyException} on a read timeout,
     *         or with the original exception on any other failure
     */
    @Override
    public CompletableFuture<PositionImpl> recoverFromSnapshot() {
        return topic.getBrokerService().getPulsar()
                .getTransactionBufferSnapshotServiceFactory()
                .getTxnBufferSnapshotService()
                .createReader(TopicName.get(topic.getName()))
                .thenComposeAsync(reader -> {
                    try {
                        PositionImpl startReadCursorPosition = null;
                        while (reader.hasMoreEvents()) {
                            Message<TransactionBufferSnapshot> message = reader.readNextAsync()
                                    .get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
                            if (!topic.getName().equals(message.getKey())) {
                                // Snapshot belongs to a different topic.
                                continue;
                            }
                            TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
                            if (transactionBufferSnapshot == null) {
                                continue;
                            }
                            handleSnapshot(transactionBufferSnapshot);
                            startReadCursorPosition = PositionImpl.get(
                                    transactionBufferSnapshot.getMaxReadPositionLedgerId(),
                                    transactionBufferSnapshot.getMaxReadPositionEntryId());
                        }
                        return CompletableFuture.completedFuture(startReadCursorPosition);
                    } catch (TimeoutException ex) {
                        Throwable t = FutureUtil.unwrapCompletionException(ex);
                        String errorMessage = String.format("[%s] Transaction buffer recover fail by read "
                                + "transactionBufferSnapshot timeout!", topic.getName());
                        log.error(errorMessage, t);
                        return FutureUtil.failedFuture(
                                new BrokerServiceException.ServiceUnitNotReadyException(errorMessage, t));
                    } catch (Exception ex) {
                        if (ex instanceof InterruptedException) {
                            // The blocking get() cleared the flag; restore it for the executor thread.
                            Thread.currentThread().interrupt();
                        }
                        log.error("[{}] Transaction buffer recover fail when read transactionBufferSnapshot!",
                                topic.getName(), ex);
                        return FutureUtil.failedFuture(ex);
                    } finally {
                        closeReader(reader);
                    }
                }, topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor(this));
    }

    /**
     * Deletes this topic's snapshot through the snapshot writer.
     *
     * @return a future that completes once the delete has been issued successfully and logged
     */
    @Override
    public CompletableFuture<Void> clearAbortedTxnSnapshot() {
        return takeSnapshotWriter.getFuture().thenCompose(writer -> {
            TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot();
            snapshot.setTopicName(topic.getName());
            return writer.deleteAsync(snapshot.getTopicName(), snapshot);
        }).thenRun(() -> log.info("[{}] Successes to delete the aborted transaction snapshot", topic.getName()));
    }

    /**
     * Writes a snapshot containing the given max-read position and every aborted transaction
     * currently held in memory. A failed write is logged and otherwise ignored, so the future
     * returned for the write step still completes normally.
     *
     * @param maxReadPosition the max-read position to store in the snapshot
     * @return a future that completes when the write attempt has finished
     */
    @Override
    public CompletableFuture<Void> takeAbortedTxnsSnapshot(PositionImpl maxReadPosition) {
        return takeSnapshotWriter.getFuture().thenCompose(writer -> {
            TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot();
            snapshot.setTopicName(topic.getName());
            snapshot.setMaxReadPositionLedgerId(maxReadPosition.getLedgerId());
            snapshot.setMaxReadPositionEntryId(maxReadPosition.getEntryId());

            ArrayList<AbortTxnMetadata> abortTxnMetadataList = new ArrayList<>(aborts.size());
            aborts.forEach((txnId, position) -> {
                AbortTxnMetadata abortTxnMetadata = new AbortTxnMetadata();
                abortTxnMetadata.setTxnIdMostBits(txnId.getMostSigBits());
                abortTxnMetadata.setTxnIdLeastBits(txnId.getLeastSigBits());
                abortTxnMetadata.setLedgerId(position.getLedgerId());
                abortTxnMetadata.setEntryId(position.getEntryId());
                abortTxnMetadataList.add(abortTxnMetadata);
            });
            snapshot.setAborts(abortTxnMetadataList);

            return writer.writeAsync(snapshot.getTopicName(), snapshot).thenAccept(messageId -> {
                this.lastSnapshotTimestamps = System.currentTimeMillis();
                if (log.isDebugEnabled()) {
                    log.debug("[{}]Transaction buffer take snapshot success! messageId : {}",
                            topic.getName(), messageId);
                }
            }).exceptionally(e -> {
                // Unwrap rather than call getCause(): getCause() is null for an unwrapped failure,
                // which would drop the error from the log entirely.
                log.warn("[{}]Transaction buffer take snapshot fail! ", topic.getName(),
                        FutureUtil.unwrapCompletionException(e));
                return null;
            });
        });
    }

    /**
     * Returns the time of the last successful snapshot write.
     *
     * @return the value of {@link System#currentTimeMillis()} captured after the last successful write
     */
    @Override
    public long getLastSnapshotTimestamps() {
        return lastSnapshotTimestamps;
    }

    /**
     * Releases this processor's reference to the snapshot writer. Only the first call releases;
     * later calls do nothing.
     *
     * @return an already-completed future
     */
    @Override
    public synchronized CompletableFuture<Void> closeAsync() {
        if (!isClosed) {
            isClosed = true;
            takeSnapshotWriter.release();
        }
        return CompletableFuture.completedFuture(null);
    }

    /** Closes the snapshot reader asynchronously; a close failure is only logged. */
    private void closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> reader) {
        reader.closeAsync().exceptionally(e -> {
            log.error("[{}]Transaction buffer reader close error!", topic.getName(), e);
            return null;
        });
    }

    /** Copies every aborted transaction listed in the snapshot, if any, into the in-memory map. */
    private void handleSnapshot(TransactionBufferSnapshot snapshot) {
        if (snapshot.getAborts() == null) {
            return;
        }
        snapshot.getAborts().forEach(abortTxnMetadata -> aborts.put(
                new TxnID(abortTxnMetadata.getTxnIdMostBits(), abortTxnMetadata.getTxnIdLeastBits()),
                PositionImpl.get(abortTxnMetadata.getLedgerId(), abortTxnMetadata.getEntryId())));
    }
}

