/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.compaction;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.ComparisonChain;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.Compactor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CompactedTopicImpl
implements CompactedTopic {
    static final long NEWER_THAN_COMPACTED = -4276948922L;
    static final long COMPACT_LEDGER_EMPTY = -4276948923L;
    static final int DEFAULT_STARTPOINT_CACHE_SIZE = 100;
    private final BookKeeper bk;
    private PositionImpl compactionHorizon = null;
    private CompletableFuture<CompactedTopicContext> compactedTopicContext = null;
    private static final Logger log = LoggerFactory.getLogger(CompactedTopicImpl.class);

    public CompactedTopicImpl(BookKeeper bk) {
        this.bk = bk;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<?> newCompactedLedger(Position p, long compactedLedgerId) {
        CompactedTopicImpl compactedTopicImpl = this;
        synchronized (compactedTopicImpl) {
            this.compactionHorizon = (PositionImpl)p;
            CompletableFuture<CompactedTopicContext> previousContext = this.compactedTopicContext;
            this.compactedTopicContext = CompactedTopicImpl.openCompactedLedger(this.bk, compactedLedgerId);
            if (previousContext != null) {
                return ((CompletableFuture)this.compactedTopicContext.thenCompose(res -> previousContext)).thenCompose(res -> CompactedTopicImpl.tryDeleteCompactedLedger(this.bk, res.ledger.getId()));
            }
            return this.compactedTopicContext;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRead, AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
        CompactedTopicImpl compactedTopicImpl = this;
        synchronized (compactedTopicImpl) {
            PositionImpl cursorPosition = (PositionImpl)cursor.getReadPosition();
            if (this.compactionHorizon == null || this.compactionHorizon.compareTo(cursorPosition) < 0) {
                cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx);
            } else {
                ((CompletableFuture)this.compactedTopicContext.thenCompose(context -> CompactedTopicImpl.findStartPoint(cursorPosition, context.ledger.getLastAddConfirmed(), context.cache).thenCompose(startPoint -> {
                    if (startPoint == -4276948923L) {
                        cursor.seek((Position)this.compactionHorizon.getNext());
                        callback.readEntriesComplete(Collections.emptyList(), ctx);
                        return CompletableFuture.completedFuture(null);
                    }
                    if (startPoint == -4276948922L && this.compactionHorizon.compareTo(cursorPosition) < 0) {
                        cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx);
                        return CompletableFuture.completedFuture(null);
                    }
                    long endPoint = Math.min(context.ledger.getLastAddConfirmed(), startPoint + (long)numberOfEntriesToRead);
                    if (startPoint == -4276948922L) {
                        cursor.seek((Position)this.compactionHorizon.getNext());
                        callback.readEntriesComplete(Collections.emptyList(), ctx);
                        return CompletableFuture.completedFuture(null);
                    }
                    return CompactedTopicImpl.readEntries(context.ledger, startPoint, endPoint).thenAccept(entries -> {
                        Entry lastEntry = (Entry)entries.get(entries.size() - 1);
                        cursor.seek(lastEntry.getPosition().getNext());
                        callback.readEntriesComplete(entries, ctx);
                    });
                }))).exceptionally(exception -> {
                    if (exception.getCause() instanceof NoSuchElementException) {
                        cursor.seek((Position)this.compactionHorizon.getNext());
                        callback.readEntriesComplete(Collections.emptyList(), ctx);
                    } else {
                        callback.readEntriesFailed(new ManagedLedgerException(exception), ctx);
                    }
                    return null;
                });
            }
        }
    }

    static CompletableFuture<Long> findStartPoint(PositionImpl p, long lastEntryId, AsyncLoadingCache<Long, PulsarApi.MessageIdData> cache) {
        CompletableFuture<Long> promise = new CompletableFuture<Long>();
        if (lastEntryId < 0L) {
            promise.complete(-4276948923L);
        } else {
            CompactedTopicImpl.findStartPointLoop(p, 0L, lastEntryId, promise, cache);
        }
        return promise;
    }

    private static void findStartPointLoop(PositionImpl p, long start, long end, CompletableFuture<Long> promise, AsyncLoadingCache<Long, PulsarApi.MessageIdData> cache) {
        long midpoint = start + (end - start) / 2L;
        CompletableFuture startEntry = cache.get((Object)start);
        CompletableFuture middleEntry = cache.get((Object)midpoint);
        CompletableFuture endEntry = cache.get((Object)end);
        ((CompletableFuture)CompletableFuture.allOf(startEntry, middleEntry, endEntry).thenRun(() -> {
            if (CompactedTopicImpl.comparePositionAndMessageId(p, (PulsarApi.MessageIdData)startEntry.join()) <= 0) {
                promise.complete(start);
            } else if (CompactedTopicImpl.comparePositionAndMessageId(p, (PulsarApi.MessageIdData)middleEntry.join()) <= 0) {
                CompactedTopicImpl.findStartPointLoop(p, start, midpoint, promise, cache);
            } else if (CompactedTopicImpl.comparePositionAndMessageId(p, (PulsarApi.MessageIdData)endEntry.join()) <= 0) {
                CompactedTopicImpl.findStartPointLoop(p, midpoint + 1L, end, promise, cache);
            } else {
                promise.complete(-4276948922L);
            }
        })).exceptionally(exception -> {
            promise.completeExceptionally((Throwable)exception);
            return null;
        });
    }

    static AsyncLoadingCache<Long, PulsarApi.MessageIdData> createCache(LedgerHandle lh, long maxSize) {
        return Caffeine.newBuilder().maximumSize(maxSize).buildAsync((entryId, executor) -> CompactedTopicImpl.readOneMessageId(lh, entryId));
    }

    private static CompletableFuture<PulsarApi.MessageIdData> readOneMessageId(LedgerHandle lh, long entryId) {
        CompletableFuture<PulsarApi.MessageIdData> promise = new CompletableFuture<PulsarApi.MessageIdData>();
        lh.asyncReadEntries(entryId, entryId, (rc, _lh, seq, ctx) -> {
            if (rc != 0) {
                promise.completeExceptionally(BKException.create((int)rc));
            } else if (seq.hasMoreElements()) {
                LedgerEntry entry = (LedgerEntry)seq.nextElement();
                try (RawMessage m = RawMessageImpl.deserializeFrom(entry.getEntryBuffer());){
                    entry.getEntryBuffer().release();
                    while (seq.hasMoreElements()) {
                        ((LedgerEntry)seq.nextElement()).getEntryBuffer().release();
                    }
                    promise.complete(m.getMessageIdData());
                }
            } else {
                promise.completeExceptionally(new NoSuchElementException(String.format("No such entry %d in ledger %d", entryId, lh.getId())));
            }
        }, null);
        return promise;
    }

    private static CompletableFuture<CompactedTopicContext> openCompactedLedger(BookKeeper bk, long id) {
        CompletableFuture promise = new CompletableFuture();
        bk.asyncOpenLedger(id, Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD, (rc, ledger, ctx) -> {
            if (rc != 0) {
                promise.completeExceptionally(BKException.create((int)rc));
            } else {
                promise.complete(ledger);
            }
        }, null);
        return promise.thenApply(ledger -> new CompactedTopicContext((LedgerHandle)ledger, CompactedTopicImpl.createCache(ledger, 100L)));
    }

    private static CompletableFuture<Void> tryDeleteCompactedLedger(BookKeeper bk, long id) {
        CompletableFuture<Void> promise = new CompletableFuture<Void>();
        bk.asyncDeleteLedger(id, (rc, ctx) -> {
            if (rc != 0) {
                log.warn("Error deleting compacted topic ledger {}", (Object)id, (Object)BKException.create((int)rc));
            } else {
                log.debug("Compacted topic ledger deleted successfully");
            }
            promise.complete(null);
        }, null);
        return promise;
    }

    private static CompletableFuture<List<Entry>> readEntries(LedgerHandle lh, long from, long to) {
        CompletableFuture promise = new CompletableFuture();
        lh.asyncReadEntries(from, to, (rc, _lh, seq, ctx) -> {
            if (rc != 0) {
                promise.completeExceptionally(BKException.create((int)rc));
            } else {
                promise.complete(seq);
            }
        }, null);
        return promise.thenApply(seq -> {
            ArrayList<EntryImpl> entries = new ArrayList<EntryImpl>();
            while (seq.hasMoreElements()) {
                ByteBuf buf = ((LedgerEntry)seq.nextElement()).getEntryBuffer();
                try {
                    RawMessage m = RawMessageImpl.deserializeFrom(buf);
                    Throwable throwable = null;
                    try {
                        entries.add(EntryImpl.create((long)m.getMessageIdData().getLedgerId(), (long)m.getMessageIdData().getEntryId(), (ByteBuf)m.getHeadersAndPayload()));
                    }
                    catch (Throwable throwable2) {
                        throwable = throwable2;
                        throw throwable2;
                    }
                    finally {
                        if (m == null) continue;
                        if (throwable != null) {
                            try {
                                m.close();
                            }
                            catch (Throwable throwable3) {
                                throwable.addSuppressed(throwable3);
                            }
                            continue;
                        }
                        m.close();
                    }
                }
                finally {
                    buf.release();
                }
            }
            return entries;
        });
    }

    public Optional<CompactedTopicContext> getCompactedTopicContext() throws ExecutionException, InterruptedException {
        return this.compactedTopicContext == null ? Optional.empty() : Optional.of(this.compactedTopicContext.get());
    }

    private static int comparePositionAndMessageId(PositionImpl p, PulsarApi.MessageIdData m) {
        return ComparisonChain.start().compare(p.getLedgerId(), m.getLedgerId()).compare(p.getEntryId(), m.getEntryId()).result();
    }

    public static class CompactedTopicContext {
        final LedgerHandle ledger;
        final AsyncLoadingCache<Long, PulsarApi.MessageIdData> cache;

        CompactedTopicContext(LedgerHandle ledger, AsyncLoadingCache<Long, PulsarApi.MessageIdData> cache) {
            this.ledger = ledger;
            this.cache = cache;
        }

        public LedgerHandle getLedger() {
            return this.ledger;
        }

        public AsyncLoadingCache<Long, PulsarApi.MessageIdData> getCache() {
            return this.cache;
        }
    }
}

