/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction.buffer.impl;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionEntry;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionEntryImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.impl.common.TxnStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentTransactionBufferReader
implements TransactionBufferReader {
    private static final Logger log = LoggerFactory.getLogger(PersistentTransactionBufferReader.class);
    static final long DEFAULT_START_SEQUENCE_ID = -1L;
    private final ManagedLedger ledger;
    private final TransactionMeta meta;
    private volatile long currentSequenceId = -1L;

    PersistentTransactionBufferReader(TransactionMeta meta, ManagedLedger ledger) throws TransactionNotSealedException {
        if (TxnStatus.OPEN == meta.status()) {
            throw new TransactionNotSealedException("Transaction `" + meta.id() + "` is not sealed yet");
        }
        this.meta = meta;
        this.ledger = ledger;
    }

    @Override
    public CompletableFuture<List<TransactionEntry>> readNext(int numEntries) {
        return ((CompletableFuture)this.meta.readEntries(numEntries, this.currentSequenceId).thenCompose(this::readEntry)).thenApply(entries -> entries.stream().sorted(Comparator.comparingLong(entry -> entry.sequenceId())).collect(Collectors.toList()));
    }

    private CompletableFuture<List<TransactionEntry>> readEntry(SortedMap<Long, Position> entries) {
        CompletableFuture<List<TransactionEntry>> readFuture = new CompletableFuture<List<TransactionEntry>>();
        ArrayList txnEntries = new ArrayList(entries.size());
        ArrayList futures = new ArrayList();
        for (Map.Entry<Long, Position> longPositionEntry : entries.entrySet()) {
            CompletableFuture tmpFuture = new CompletableFuture();
            this.readEntry(longPositionEntry.getValue()).whenComplete((entry, throwable) -> {
                if (null != throwable) {
                    tmpFuture.completeExceptionally((Throwable)throwable);
                } else {
                    TransactionEntryImpl txnEntry = new TransactionEntryImpl(this.meta.id(), (Long)longPositionEntry.getKey(), entry.getDataBuffer(), this.meta.committedAtLedgerId(), this.meta.committedAtEntryId());
                    List list = txnEntries;
                    synchronized (list) {
                        txnEntries.add(txnEntry);
                    }
                    tmpFuture.complete(null);
                }
            });
            futures.add(tmpFuture);
        }
        FutureUtil.waitForAll(futures).whenComplete((ignore, error) -> {
            if (error != null) {
                readFuture.completeExceptionally((Throwable)error);
            } else {
                this.currentSequenceId = (Long)entries.lastKey();
                readFuture.complete(txnEntries);
            }
        });
        return readFuture;
    }

    private CompletableFuture<Entry> readEntry(Position position) {
        final CompletableFuture<Entry> readFuture = new CompletableFuture<Entry>();
        ManagedLedgerImpl readLedger = (ManagedLedgerImpl)this.ledger;
        readLedger.asyncReadEntry((PositionImpl)position, new AsyncCallbacks.ReadEntryCallback(){

            public void readEntryComplete(Entry entry, Object ctx) {
                readFuture.complete(entry);
            }

            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                readFuture.completeExceptionally((Throwable)exception);
            }
        }, null);
        return readFuture;
    }

    @Override
    public void close() {
        log.info("Txn {} reader closed.", (Object)this.meta.id());
    }
}

