/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.delayed.bucket;

import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.validation.constraints.NotNull;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotPersistenceException;
import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotSerializationException;
import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BookkeeperBucketSnapshotStorage
implements BucketSnapshotStorage {
    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
    private static final byte[] LedgerPassword = "".getBytes();
    private final PulsarService pulsar;
    private final ServiceConfiguration config;
    private BookKeeper bookKeeper;

    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
        this.pulsar = pulsar;
        this.config = pulsar.getConfig();
    }

    @Override
    public CompletableFuture<Long> createBucketSnapshot(DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata, List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments, String bucketKey, String topicName, String cursorName) {
        return this.createLedger(bucketKey, topicName, cursorName).thenCompose(ledgerHandle -> ((CompletableFuture)((CompletableFuture)this.addEntry((LedgerHandle)ledgerHandle, snapshotMetadata.toByteArray()).thenCompose(__ -> this.addSnapshotSegments((LedgerHandle)ledgerHandle, bucketSnapshotSegments))).thenCompose(__ -> this.closeLedger((LedgerHandle)ledgerHandle))).thenApply(__ -> ledgerHandle.getId()));
    }

    @Override
    public CompletableFuture<DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
        return this.openLedger(bucketId).thenCompose(ledgerHandle -> this.getLedgerEntry((LedgerHandle)ledgerHandle, 0L, 0L).thenApply(entryEnumeration -> this.parseSnapshotMetadataEntry((LedgerEntry)entryEnumeration.nextElement())));
    }

    @Override
    public CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId, long lastSegmentEntryId) {
        return this.openLedger(bucketId).thenCompose(ledgerHandle -> this.getLedgerEntry((LedgerHandle)ledgerHandle, firstSegmentEntryId, lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries));
    }

    @Override
    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
        return this.openLedger(bucketId).thenApply(LedgerHandle::getLength);
    }

    @Override
    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
        return this.deleteLedger(bucketId);
    }

    @Override
    public void start() throws Exception {
        this.bookKeeper = this.pulsar.getBookKeeperClientFactory().create(this.pulsar.getConfiguration(), this.pulsar.getLocalMetadataStore(), this.pulsar.getIoEventLoopGroup(), Optional.empty(), null);
    }

    @Override
    public void close() throws Exception {
        if (this.bookKeeper != null) {
            this.bookKeeper.close();
        }
    }

    private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle, List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments) {
        ArrayList<CompletableFuture<Void>> addFutures = new ArrayList<CompletableFuture<Void>>();
        for (DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
            addFutures.add(this.addEntry(ledgerHandle, bucketSnapshotSegment.toByteArray()));
        }
        return FutureUtil.waitForAll(addFutures);
    }

    private DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
        try {
            return DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.parseFrom(ledgerEntry.getEntry());
        }
        catch (InvalidProtocolBufferException e) {
            throw new BucketSnapshotSerializationException(e);
        }
    }

    private List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> parseSnapshotSegmentEntries(Enumeration<LedgerEntry> entryEnumeration) {
        ArrayList<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> snapshotMetadataList = new ArrayList<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>();
        try {
            while (entryEnumeration.hasMoreElements()) {
                LedgerEntry ledgerEntry = entryEnumeration.nextElement();
                snapshotMetadataList.add(DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.parseFrom(ledgerEntry.getEntry()));
            }
            return snapshotMetadataList;
        }
        catch (IOException e) {
            throw new BucketSnapshotSerializationException(e);
        }
    }

    @NotNull
    private CompletableFuture<LedgerHandle> createLedger(String bucketKey, String topicName, String cursorName) {
        CompletableFuture<LedgerHandle> future = new CompletableFuture<LedgerHandle>();
        Map metadata = LedgerMetadataUtils.buildMetadataForDelayedIndexBucket((String)bucketKey, (String)topicName, (String)cursorName);
        this.bookKeeper.asyncCreateLedger(this.config.getManagedLedgerDefaultEnsembleSize(), this.config.getManagedLedgerDefaultWriteQuorum(), this.config.getManagedLedgerDefaultAckQuorum(), BookKeeper.DigestType.fromApiDigestType((DigestType)this.config.getManagedLedgerDigestType()), LedgerPassword, (rc, handle, ctx) -> {
            if (rc != 0) {
                future.completeExceptionally(BookkeeperBucketSnapshotStorage.bkException("Create ledger", rc, -1L));
            } else {
                future.complete(handle);
            }
        }, null, metadata);
        return future;
    }

    private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
        CompletableFuture<LedgerHandle> future = new CompletableFuture<LedgerHandle>();
        this.bookKeeper.asyncOpenLedger(ledgerId.longValue(), BookKeeper.DigestType.fromApiDigestType((DigestType)this.config.getManagedLedgerDigestType()), LedgerPassword, (rc, handle, ctx) -> {
            if (rc != 0) {
                future.completeExceptionally(BookkeeperBucketSnapshotStorage.bkException("Open ledger", rc, ledgerId));
            } else {
                future.complete(handle);
            }
        }, null);
        return future;
    }

    private CompletableFuture<Void> closeLedger(LedgerHandle ledgerHandle) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        ledgerHandle.asyncClose((rc, handle, ctx) -> {
            if (rc != 0) {
                log.warn("Failed to close a Ledger Handle: {}", (Object)ledgerHandle.getId());
                future.completeExceptionally(BookkeeperBucketSnapshotStorage.bkException("Close ledger", rc, ledgerHandle.getId()));
            } else {
                future.complete(null);
            }
        }, null);
        return future;
    }

    private CompletableFuture<Void> addEntry(LedgerHandle ledgerHandle, byte[] data) {
        CompletableFuture future = new CompletableFuture();
        ledgerHandle.asyncAddEntry(data, (rc, handle, entryId, ctx) -> {
            if (rc != 0) {
                future.completeExceptionally(BookkeeperBucketSnapshotStorage.bkException("Add entry", rc, ledgerHandle.getId()));
            } else {
                future.complete(null);
            }
        }, null);
        return future.whenComplete((__, ex) -> {
            if (ex != null) {
                this.deleteLedger(ledgerHandle.getId());
            }
        });
    }

    CompletableFuture<Enumeration<LedgerEntry>> getLedgerEntry(LedgerHandle ledger, long firstEntryId, long lastEntryId) {
        CompletableFuture<Enumeration<LedgerEntry>> future = new CompletableFuture<Enumeration<LedgerEntry>>();
        ledger.asyncReadEntries(firstEntryId, lastEntryId, (rc, handle, entries, ctx) -> {
            if (rc != 0) {
                future.completeExceptionally(BookkeeperBucketSnapshotStorage.bkException("Read entry", rc, ledger.getId()));
            } else {
                future.complete(entries);
            }
        }, null);
        return future;
    }

    private CompletableFuture<Void> deleteLedger(long ledgerId) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.bookKeeper.asyncDeleteLedger(ledgerId, (rc, cnx) -> {
            if (rc != 0) {
                future.completeExceptionally(BookkeeperBucketSnapshotStorage.bkException("Delete ledger", rc, ledgerId));
            } else {
                future.complete(null);
            }
        }, null);
        return future;
    }

    private static BucketSnapshotPersistenceException bkException(String operation, int rc, long ledgerId) {
        String message = BKException.getMessage((int)rc) + " -  ledger=" + ledgerId + " - operation=" + operation;
        return new BucketSnapshotPersistenceException(message);
    }
}

