/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.impl.cache;

import io.prometheus.client.Counter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PendingReadsManager {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(PendingReadsManager.class);
    private static final Counter COUNT_ENTRIES_READ_FROM_BK = (Counter)((Counter.Builder)((Counter.Builder)Counter.build().name("pulsar_ml_cache_pendingreads_entries_read")).help("Total number of entries read from BK")).register();
    private static final Counter COUNT_ENTRIES_NOTREAD_FROM_BK = (Counter)((Counter.Builder)((Counter.Builder)Counter.build().name("pulsar_ml_cache_pendingreads_entries_notread")).help("Total number of entries not read from BK")).register();
    private static final Counter COUNT_PENDING_READS_MATCHED = (Counter)((Counter.Builder)((Counter.Builder)Counter.build().name("pulsar_ml_cache_pendingreads_matched")).help("Pending reads reused with perfect range match")).register();
    private static final Counter COUNT_PENDING_READS_MATCHED_INCLUDED = (Counter)((Counter.Builder)((Counter.Builder)Counter.build().name("pulsar_ml_cache_pendingreads_matched_included")).help("Pending reads reused by attaching to a read with a larger range")).register();
    private static final Counter COUNT_PENDING_READS_MISSED = (Counter)((Counter.Builder)((Counter.Builder)Counter.build().name("pulsar_ml_cache_pendingreads_missed")).help("Pending reads that didn't find a match")).register();
    private static final Counter COUNT_PENDING_READS_MATCHED_OVERLAPPING_MISS_LEFT = (Counter)((Counter.Builder)((Counter.Builder)Counter.build().name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_left")).help("Pending reads that didn't find a match but they partially overlap with another read")).register();
    private static final Counter COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_RIGHT = (Counter)((Counter.Builder)((Counter.Builder)Counter.build().name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_right")).help("Pending reads that didn't find a match but they partially overlap with another read")).register();
    private static final Counter COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_BOTH = (Counter)((Counter.Builder)((Counter.Builder)Counter.build().name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_both")).help("Pending reads that didn't find a match but they partially overlap with another read")).register();
    private final RangeEntryCacheImpl rangeEntryCache;
    private final ConcurrentHashMap<Long, ConcurrentHashMap<PendingReadKey, PendingRead>> cachedPendingReads = new ConcurrentHashMap();

    public PendingReadsManager(RangeEntryCacheImpl rangeEntryCache) {
        this.rangeEntryCache = rangeEntryCache;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private FindPendingReadOutcome findPendingRead(PendingReadKey key, Map<PendingReadKey, PendingRead> ledgerCache, AtomicBoolean created) {
        Map<PendingReadKey, PendingRead> map = ledgerCache;
        synchronized (map) {
            PendingRead existing = ledgerCache.get(key);
            if (existing != null) {
                COUNT_PENDING_READS_MATCHED.inc((double)key.size());
                COUNT_ENTRIES_NOTREAD_FROM_BK.inc((double)key.size());
                return new FindPendingReadOutcome(existing, null, null);
            }
            FindPendingReadOutcome foundButMissingSomethingOnLeft = null;
            FindPendingReadOutcome foundButMissingSomethingOnRight = null;
            FindPendingReadOutcome foundButMissingSomethingOnBoth = null;
            for (Map.Entry<PendingReadKey, PendingRead> entry : ledgerCache.entrySet()) {
                PendingReadKey entryKey = entry.getKey();
                if (entryKey.includes(key)) {
                    COUNT_PENDING_READS_MATCHED_INCLUDED.inc((double)key.size());
                    COUNT_ENTRIES_NOTREAD_FROM_BK.inc((double)key.size());
                    return new FindPendingReadOutcome(entry.getValue(), null, null);
                }
                if (!entryKey.overlaps(key)) continue;
                PendingReadKey reminderOnLeft = key.reminderOnLeft(entryKey);
                PendingReadKey reminderOnRight = key.reminderOnRight(entryKey);
                if (reminderOnLeft != null && reminderOnRight != null) {
                    foundButMissingSomethingOnBoth = new FindPendingReadOutcome(entry.getValue(), reminderOnLeft, reminderOnRight);
                    continue;
                }
                if (reminderOnRight != null && reminderOnLeft == null) {
                    foundButMissingSomethingOnRight = new FindPendingReadOutcome(entry.getValue(), null, reminderOnRight);
                    continue;
                }
                if (reminderOnLeft == null || reminderOnRight != null) continue;
                foundButMissingSomethingOnLeft = new FindPendingReadOutcome(entry.getValue(), reminderOnLeft, null);
            }
            if (foundButMissingSomethingOnRight != null) {
                long delta = key.size() - foundButMissingSomethingOnRight.missingOnRight.size();
                COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_RIGHT.inc((double)delta);
                COUNT_ENTRIES_NOTREAD_FROM_BK.inc((double)delta);
                return foundButMissingSomethingOnRight;
            }
            if (foundButMissingSomethingOnLeft != null) {
                long delta = key.size() - foundButMissingSomethingOnLeft.missingOnLeft.size();
                COUNT_PENDING_READS_MATCHED_OVERLAPPING_MISS_LEFT.inc((double)delta);
                COUNT_ENTRIES_NOTREAD_FROM_BK.inc((double)delta);
                return foundButMissingSomethingOnLeft;
            }
            if (foundButMissingSomethingOnBoth != null) {
                long delta = key.size() - foundButMissingSomethingOnBoth.missingOnRight.size() - foundButMissingSomethingOnBoth.missingOnLeft.size();
                COUNT_ENTRIES_NOTREAD_FROM_BK.inc((double)delta);
                COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_BOTH.inc((double)delta);
                return foundButMissingSomethingOnBoth;
            }
            created.set(true);
            PendingRead newRead = new PendingRead(key, ledgerCache);
            ledgerCache.put(key, newRead);
            long delta = key.size();
            COUNT_PENDING_READS_MISSED.inc((double)delta);
            COUNT_ENTRIES_READ_FROM_BK.inc((double)delta);
            return new FindPendingReadOutcome(newRead, null, null);
        }
    }

    void readEntries(final ReadHandle lh, long firstEntry, long lastEntry, final boolean shouldCacheEntry, final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
        PendingReadKey key = new PendingReadKey(firstEntry, lastEntry);
        Map pendingReadsForLedger = this.cachedPendingReads.computeIfAbsent(lh.getId(), l -> new ConcurrentHashMap());
        boolean listenerAdded = false;
        while (!listenerAdded) {
            AtomicBoolean createdByThisThread = new AtomicBoolean();
            final FindPendingReadOutcome findBestCandidateOutcome = this.findPendingRead(key, pendingReadsForLedger, createdByThisThread);
            PendingRead pendingRead = findBestCandidateOutcome.pendingRead;
            if (findBestCandidateOutcome.needsAdditionalReads()) {
                AsyncCallbacks.ReadEntriesCallback wrappedCallback = new AsyncCallbacks.ReadEntriesCallback(){

                    @Override
                    public void readEntriesComplete(final List<Entry> entries, final Object ctx) {
                        PendingReadKey missingOnLeft = findBestCandidateOutcome.missingOnLeft;
                        final PendingReadKey missingOnRight = findBestCandidateOutcome.missingOnRight;
                        if (missingOnRight != null && missingOnLeft != null) {
                            AsyncCallbacks.ReadEntriesCallback readFromLeftCallback = new AsyncCallbacks.ReadEntriesCallback(){

                                @Override
                                public void readEntriesComplete(final List<Entry> entriesFromLeft, Object dummyCtx1) {
                                    AsyncCallbacks.ReadEntriesCallback readFromRightCallback = new AsyncCallbacks.ReadEntriesCallback(){

                                        @Override
                                        public void readEntriesComplete(List<Entry> entriesFromRight, Object dummyCtx2) {
                                            ArrayList<Entry> finalResult = new ArrayList<Entry>(entriesFromLeft.size() + entries.size() + entriesFromRight.size());
                                            finalResult.addAll(entriesFromLeft);
                                            finalResult.addAll(entries);
                                            finalResult.addAll(entriesFromRight);
                                            callback.readEntriesComplete(finalResult, ctx);
                                        }

                                        @Override
                                        public void readEntriesFailed(ManagedLedgerException exception, Object dummyCtx3) {
                                            entries.forEach(Entry::release);
                                            entriesFromLeft.forEach(Entry::release);
                                            callback.readEntriesFailed(exception, ctx);
                                        }
                                    };
                                    PendingReadsManager.this.rangeEntryCache.asyncReadEntry0(lh, missingOnRight.startEntry, missingOnRight.endEntry, shouldCacheEntry, readFromRightCallback, null);
                                }

                                @Override
                                public void readEntriesFailed(ManagedLedgerException exception, Object dummyCtx4) {
                                    entries.forEach(Entry::release);
                                    callback.readEntriesFailed(exception, ctx);
                                }
                            };
                            PendingReadsManager.this.rangeEntryCache.asyncReadEntry0(lh, missingOnLeft.startEntry, missingOnLeft.endEntry, shouldCacheEntry, readFromLeftCallback, null);
                        } else if (missingOnLeft != null) {
                            AsyncCallbacks.ReadEntriesCallback readFromLeftCallback = new AsyncCallbacks.ReadEntriesCallback(){

                                @Override
                                public void readEntriesComplete(List<Entry> entriesFromLeft, Object dummyCtx5) {
                                    ArrayList<Entry> finalResult = new ArrayList<Entry>(entriesFromLeft.size() + entries.size());
                                    finalResult.addAll(entriesFromLeft);
                                    finalResult.addAll(entries);
                                    callback.readEntriesComplete(finalResult, ctx);
                                }

                                @Override
                                public void readEntriesFailed(ManagedLedgerException exception, Object dummyCtx6) {
                                    entries.forEach(Entry::release);
                                    callback.readEntriesFailed(exception, ctx);
                                }
                            };
                            PendingReadsManager.this.rangeEntryCache.asyncReadEntry0(lh, missingOnLeft.startEntry, missingOnLeft.endEntry, shouldCacheEntry, readFromLeftCallback, null);
                        } else if (missingOnRight != null) {
                            AsyncCallbacks.ReadEntriesCallback readFromRightCallback = new AsyncCallbacks.ReadEntriesCallback(){

                                @Override
                                public void readEntriesComplete(List<Entry> entriesFromRight, Object dummyCtx7) {
                                    ArrayList<Entry> finalResult = new ArrayList<Entry>(entriesFromRight.size() + entries.size());
                                    finalResult.addAll(entries);
                                    finalResult.addAll(entriesFromRight);
                                    callback.readEntriesComplete(finalResult, ctx);
                                }

                                @Override
                                public void readEntriesFailed(ManagedLedgerException exception, Object dummyCtx8) {
                                    entries.forEach(Entry::release);
                                    callback.readEntriesFailed(exception, ctx);
                                }
                            };
                            PendingReadsManager.this.rangeEntryCache.asyncReadEntry0(lh, missingOnRight.startEntry, missingOnRight.endEntry, shouldCacheEntry, readFromRightCallback, null);
                        }
                    }

                    @Override
                    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                        callback.readEntriesFailed(exception, ctx);
                    }
                };
                listenerAdded = pendingRead.addListener(wrappedCallback, ctx, key.startEntry, key.endEntry);
            } else {
                listenerAdded = pendingRead.addListener(callback, ctx, key.startEntry, key.endEntry);
            }
            if (!createdByThisThread.get()) continue;
            CompletableFuture<List<EntryImpl>> readResult = this.rangeEntryCache.readFromStorage(lh, firstEntry, lastEntry, shouldCacheEntry);
            pendingRead.attach(readResult);
        }
    }

    void clear() {
        this.cachedPendingReads.clear();
    }

    void invalidateLedger(long id) {
        this.cachedPendingReads.remove(id);
    }

    private class PendingRead {
        final PendingReadKey key;
        final Map<PendingReadKey, PendingRead> ledgerCache;
        final List<ReadEntriesCallbackWithContext> callbacks = new ArrayList<ReadEntriesCallbackWithContext>(1);
        boolean completed = false;

        public PendingRead(PendingReadKey key, Map<PendingReadKey, PendingRead> ledgerCache) {
            this.key = key;
            this.ledgerCache = ledgerCache;
        }

        private List<EntryImpl> keepEntries(List<EntryImpl> list, long startEntry, long endEntry) {
            ArrayList<EntryImpl> result = new ArrayList<EntryImpl>((int)(endEntry - startEntry));
            for (EntryImpl entry : list) {
                long entryId = entry.getEntryId();
                if (startEntry <= entryId && entryId <= endEntry) {
                    result.add(entry);
                    continue;
                }
                entry.release();
            }
            return result;
        }

        public void attach(CompletableFuture<List<EntryImpl>> handle) {
            handle.whenComplete((___, error) -> {
                PendingRead pendingRead = this;
                synchronized (pendingRead) {
                    this.completed = true;
                    Map<PendingReadKey, PendingRead> map = this.ledgerCache;
                    synchronized (map) {
                        this.ledgerCache.remove(this.key, this);
                    }
                }
            });
            ((CompletableFuture)handle.thenAcceptAsync(entriesToReturn -> {
                PendingRead pendingRead = this;
                synchronized (pendingRead) {
                    if (this.callbacks.size() == 1) {
                        ReadEntriesCallbackWithContext first = this.callbacks.get(0);
                        if (first.startEntry == this.key.startEntry && first.endEntry == this.key.endEntry) {
                            first.callback.readEntriesComplete((List<Entry>)entriesToReturn, first.ctx);
                        } else {
                            first.callback.readEntriesComplete(this.keepEntries((List<EntryImpl>)entriesToReturn, first.startEntry, first.endEntry), first.ctx);
                        }
                    } else {
                        for (ReadEntriesCallbackWithContext callback : this.callbacks) {
                            long callbackStartEntry = callback.startEntry;
                            long callbackEndEntry = callback.endEntry;
                            ArrayList<Entry> copy = new ArrayList<Entry>((int)(callbackEndEntry - callbackStartEntry + 1L));
                            for (EntryImpl entry : entriesToReturn) {
                                long entryId = entry.getEntryId();
                                if (callbackStartEntry > entryId || entryId > callbackEndEntry) continue;
                                EntryImpl entryCopy = EntryImpl.create(entry);
                                copy.add(entryCopy);
                            }
                            callback.callback.readEntriesComplete(copy, callback.ctx);
                        }
                        for (EntryImpl entry : entriesToReturn) {
                            entry.release();
                        }
                    }
                }
            }, PendingReadsManager.this.rangeEntryCache.getManagedLedger().getExecutor())).exceptionally(exception -> {
                PendingRead pendingRead = this;
                synchronized (pendingRead) {
                    for (ReadEntriesCallbackWithContext callback : this.callbacks) {
                        ManagedLedgerException mlException = ManagedLedgerImpl.createManagedLedgerException(exception);
                        callback.callback.readEntriesFailed(mlException, callback.ctx);
                    }
                }
                return null;
            });
        }

        synchronized boolean addListener(AsyncCallbacks.ReadEntriesCallback callback, Object ctx, long startEntry, long endEntry) {
            if (this.completed) {
                return false;
            }
            this.callbacks.add(new ReadEntriesCallbackWithContext(callback, ctx, startEntry, endEntry));
            return true;
        }
    }

    private static final class PendingReadKey {
        private final long startEntry;
        private final long endEntry;

        long size() {
            return this.endEntry - this.startEntry + 1L;
        }

        boolean includes(PendingReadKey other) {
            return this.startEntry <= other.startEntry && other.endEntry <= this.endEntry;
        }

        boolean overlaps(PendingReadKey other) {
            return other.startEntry <= this.startEntry && this.startEntry <= other.endEntry || other.startEntry <= this.endEntry && this.endEntry <= other.endEntry;
        }

        PendingReadKey reminderOnLeft(PendingReadKey other) {
            if (other.startEntry <= this.endEntry && other.startEntry > this.startEntry) {
                return new PendingReadKey(this.startEntry, other.startEntry - 1L);
            }
            return null;
        }

        PendingReadKey reminderOnRight(PendingReadKey other) {
            if (this.startEntry <= other.endEntry && other.endEntry < this.endEntry) {
                return new PendingReadKey(other.endEntry + 1L, this.endEntry);
            }
            return null;
        }

        @Generated
        public PendingReadKey(long startEntry, long endEntry) {
            this.startEntry = startEntry;
            this.endEntry = endEntry;
        }

        @Generated
        public long getStartEntry() {
            return this.startEntry;
        }

        @Generated
        public long getEndEntry() {
            return this.endEntry;
        }

        @Generated
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof PendingReadKey)) {
                return false;
            }
            PendingReadKey other = (PendingReadKey)o;
            if (this.getStartEntry() != other.getStartEntry()) {
                return false;
            }
            return this.getEndEntry() == other.getEndEntry();
        }

        @Generated
        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            long $startEntry = this.getStartEntry();
            result = result * 59 + (int)($startEntry >>> 32 ^ $startEntry);
            long $endEntry = this.getEndEntry();
            result = result * 59 + (int)($endEntry >>> 32 ^ $endEntry);
            return result;
        }

        @Generated
        public String toString() {
            return "PendingReadsManager.PendingReadKey(startEntry=" + this.getStartEntry() + ", endEntry=" + this.getEndEntry() + ")";
        }
    }

    private static final class FindPendingReadOutcome {
        final PendingRead pendingRead;
        final PendingReadKey missingOnLeft;
        final PendingReadKey missingOnRight;

        boolean needsAdditionalReads() {
            return this.missingOnLeft != null || this.missingOnRight != null;
        }

        @Generated
        public FindPendingReadOutcome(PendingRead pendingRead, PendingReadKey missingOnLeft, PendingReadKey missingOnRight) {
            this.pendingRead = pendingRead;
            this.missingOnLeft = missingOnLeft;
            this.missingOnRight = missingOnRight;
        }
    }

    private static final class ReadEntriesCallbackWithContext {
        final AsyncCallbacks.ReadEntriesCallback callback;
        final Object ctx;
        final long startEntry;
        final long endEntry;

        @Generated
        public ReadEntriesCallbackWithContext(AsyncCallbacks.ReadEntriesCallback callback, Object ctx, long startEntry, long endEntry) {
            this.callback = callback;
            this.ctx = ctx;
            this.startEntry = startEntry;
            this.endEntry = endEntry;
        }
    }
}

