/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.offload.impl;

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.pulsar.broker.offload.BackedInputStream;
import org.apache.pulsar.broker.offload.OffloadIndexBlock;
import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder;
import org.apache.pulsar.broker.offload.OffloadIndexEntry;
import org.apache.pulsar.broker.offload.impl.S3BackedInputStreamImpl;
import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3BackedReadHandleImpl
implements ReadHandle {
    private static final Logger log = LoggerFactory.getLogger(S3BackedReadHandleImpl.class);
    private final long ledgerId;
    private final OffloadIndexBlock index;
    private final BackedInputStream inputStream;
    private final DataInputStream dataStream;
    private final ExecutorService executor;

    private S3BackedReadHandleImpl(long ledgerId, OffloadIndexBlock index, BackedInputStream inputStream, ExecutorService executor) {
        this.ledgerId = ledgerId;
        this.index = index;
        this.inputStream = inputStream;
        this.dataStream = new DataInputStream(inputStream);
        this.executor = executor;
    }

    public long getId() {
        return this.ledgerId;
    }

    public LedgerMetadata getLedgerMetadata() {
        return this.index.getLedgerMetadata();
    }

    public CompletableFuture<Void> closeAsync() {
        CompletableFuture<Void> promise = new CompletableFuture<Void>();
        this.executor.submit(() -> {
            try {
                this.index.close();
                this.inputStream.close();
                promise.complete(null);
            }
            catch (IOException t) {
                promise.completeExceptionally(t);
            }
        });
        return promise;
    }

    public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
        log.debug("Ledger {}: reading {} - {}", new Object[]{this.getId(), firstEntry, lastEntry});
        CompletableFuture<LedgerEntries> promise = new CompletableFuture<LedgerEntries>();
        this.executor.submit(() -> {
            if (firstEntry > lastEntry || firstEntry < 0L || lastEntry > this.getLastAddConfirmed()) {
                promise.completeExceptionally((Throwable)new BKException.BKIncorrectParameterException());
                return;
            }
            long entriesToRead = lastEntry - firstEntry + 1L;
            ArrayList<LedgerEntryImpl> entries = new ArrayList<LedgerEntryImpl>();
            long nextExpectedId = firstEntry;
            try {
                OffloadIndexEntry entry = this.index.getIndexEntryForEntry(firstEntry);
                this.inputStream.seek(entry.getDataOffset());
                while (entriesToRead > 0L) {
                    long entryId;
                    int length = this.dataStream.readInt();
                    if (length < 0) {
                        this.inputStream.seekForward(this.index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
                        length = this.dataStream.readInt();
                    }
                    if ((entryId = this.dataStream.readLong()) == nextExpectedId) {
                        ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(length, length);
                        entries.add(LedgerEntryImpl.create((long)this.ledgerId, (long)entryId, (long)length, (ByteBuf)buf));
                        int toWrite = length;
                        while (toWrite > 0) {
                            toWrite -= buf.writeBytes((InputStream)this.dataStream, toWrite);
                        }
                        --entriesToRead;
                        ++nextExpectedId;
                        continue;
                    }
                    if (entryId > lastEntry) {
                        log.info("Expected to read {}, but read {}, which is greater than last entry {}", new Object[]{nextExpectedId, entryId, lastEntry});
                        throw new BKException.BKUnexpectedConditionException();
                    }
                    this.inputStream.skip(length);
                }
                promise.complete((LedgerEntries)LedgerEntriesImpl.create(entries));
            }
            catch (Throwable t) {
                promise.completeExceptionally(t);
                entries.forEach(LedgerEntry::close);
            }
        });
        return promise;
    }

    public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
        return this.readAsync(firstEntry, lastEntry);
    }

    public CompletableFuture<Long> readLastAddConfirmedAsync() {
        return CompletableFuture.completedFuture(this.getLastAddConfirmed());
    }

    public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
        return CompletableFuture.completedFuture(this.getLastAddConfirmed());
    }

    public long getLastAddConfirmed() {
        return this.getLedgerMetadata().getLastEntryId();
    }

    public long getLength() {
        return this.getLedgerMetadata().getLength();
    }

    public boolean isClosed() {
        return this.getLedgerMetadata().isClosed();
    }

    public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId, long timeOutInMillis, boolean parallel) {
        CompletableFuture<LastConfirmedAndEntry> promise = new CompletableFuture<LastConfirmedAndEntry>();
        promise.completeExceptionally(new UnsupportedOperationException());
        return promise;
    }

    public static ReadHandle open(ScheduledExecutorService executor, AmazonS3 s3client, String bucket, String key, String indexKey, S3ManagedLedgerOffloader.VersionCheck versionCheck, long ledgerId, int readBufferSize) throws AmazonClientException, IOException {
        GetObjectRequest req = new GetObjectRequest(bucket, indexKey);
        Throwable throwable = null;
        Object var11_11 = null;
        try (S3Object obj = s3client.getObject(req);){
            versionCheck.check(indexKey, obj.getObjectMetadata());
            OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
            OffloadIndexBlock index = indexBuilder.fromStream((InputStream)obj.getObjectContent());
            S3BackedInputStreamImpl inputStream = new S3BackedInputStreamImpl(s3client, bucket, key, versionCheck, index.getDataObjectLength(), readBufferSize);
            return new S3BackedReadHandleImpl(ledgerId, index, inputStream, executor);
        }
        catch (Throwable throwable2) {
            if (throwable == null) {
                throwable = throwable2;
            } else if (throwable != throwable2) {
                throwable.addSuppressed(throwable2);
            }
            throw throwable;
        }
    }
}

