/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction.buffer.impl;

import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionEntry;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionEntryImpl;
import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
import org.apache.pulsar.client.api.transaction.TxnID;

/**
 * A {@link TransactionBufferReader} that serves the entries of a single transaction
 * from an in-memory iterator of {@code (Long, ByteBuf)} pairs.
 *
 * <p>Both {@link #readNext(int)} and {@link #close()} are {@code synchronized} on this
 * instance, so calls on the same reader do not interleave while consuming the iterator.
 */
public class InMemTransactionBufferReader
implements TransactionBufferReader {
    private final TxnID txnId;
    // Consumed forward-only: readNext() and close() both advance this iterator.
    private final Iterator<Map.Entry<Long, ByteBuf>> entries;
    private final long committedAtLedgerId;
    private final long committedAtEntryId;

    /**
     * Creates a reader over the given entries.
     *
     * @param txnId               id of the transaction; attached to every returned entry
     * @param entries             iterator over the transaction's buffered entries; the key is
     *                            forwarded to {@link TransactionEntryImpl} as its second argument
     *                            (presumably a sequence id -- confirm against TransactionEntryImpl)
     * @param committedAtLedgerId value forwarded unchanged to every returned entry
     * @param committedAtEntryId  value forwarded unchanged to every returned entry
     */
    public InMemTransactionBufferReader(TxnID txnId, Iterator<Map.Entry<Long, ByteBuf>> entries, long committedAtLedgerId, long committedAtEntryId) {
        this.txnId = txnId;
        this.entries = entries;
        this.committedAtLedgerId = committedAtLedgerId;
        this.committedAtEntryId = committedAtEntryId;
    }

    /**
     * Reads up to {@code numEntries} entries from the underlying iterator.
     *
     * <p>The returned future is already completed when this method returns.
     *
     * @param numEntries maximum number of entries to read; must be greater than 0
     * @return a future completed with the entries read (at least one, at most
     *         {@code numEntries}); completed exceptionally with
     *         {@link IllegalArgumentException} when {@code numEntries <= 0}, or with
     *         {@link TransactionBufferException.EndOfTransactionException} when the
     *         iterator has no entries left
     */
    @Override
    public synchronized CompletableFuture<List<TransactionEntry>> readNext(int numEntries) {
        CompletableFuture<List<TransactionEntry>> readFuture = new CompletableFuture<>();
        if (numEntries <= 0) {
            readFuture.completeExceptionally(new IllegalArgumentException("`numEntries` should be larger than 0"));
            return readFuture;
        }

        // Declared as List<TransactionEntry> (not ArrayList<TransactionEntryImpl>) so that it
        // matches the future's type parameter; generic types are invariant.
        List<TransactionEntry> txnEntries = new ArrayList<>(numEntries);
        for (int i = 0; i < numEntries && this.entries.hasNext(); ++i) {
            Map.Entry<Long, ByteBuf> entry = this.entries.next();
            // The -1 arguments look like "not applicable" placeholders for an in-memory
            // entry -- NOTE(review): confirm against EntryImpl / TransactionEntryImpl.
            TransactionEntry txnEntry = new TransactionEntryImpl(
                    this.txnId,
                    entry.getKey(),
                    EntryImpl.create(-1L, -1L, entry.getValue()),
                    this.committedAtLedgerId,
                    this.committedAtEntryId,
                    -1);
            txnEntries.add(txnEntry);
        }

        if (txnEntries.isEmpty()) {
            readFuture.completeExceptionally(new TransactionBufferException.EndOfTransactionException("No more entries found in transaction `" + this.txnId + "`"));
        } else {
            readFuture.complete(txnEntries);
        }
        return readFuture;
    }

    /**
     * Drains the remaining entries of the iterator and calls {@code release()} on each
     * buffer. Entries already handed out by {@link #readNext(int)} are not touched here.
     */
    @Override
    public synchronized void close() {
        while (this.entries.hasNext()) {
            this.entries.next().getValue().release();
        }
    }
}

