/*
 * Decompiled with CFR 0.152.
 */
package org.hornetq.core.paging.cursor.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PagedReference;
import org.hornetq.core.paging.cursor.PagedReferenceImpl;
import org.hornetq.core.paging.cursor.impl.PageCacheImpl;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.cursor.impl.PageSubscriptionImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.Future;
import org.hornetq.utils.SoftValueHashMap;
import org.jboss.netty.util.internal.ConcurrentHashMap;

/**
 * {@link PageCursorProvider} implementation that tracks one {@link PageSubscription} per cursor id
 * and keeps a soft-value cache of {@link PageCache} instances keyed by page id.
 *
 * <p>Locking used by this class:
 * <ul>
 *   <li>entry access on {@code softCache} (get/put/remove/size/values) is done while holding the
 *       cache's own monitor;</li>
 *   <li>{@link #createSubscription} and {@link #getSubscription} are synchronized on the provider,
 *       and so is the first phase of {@link #cleanup()}, which additionally holds the paging
 *       store lock.</li>
 * </ul>
 */
public class PageCursorProviderImpl
implements PageCursorProvider {
    private static final Logger log = Logger.getLogger(PageCursorProviderImpl.class);

    /** Interval, in milliseconds, between "still waiting" warnings while draining the executor. */
    private static final long EXECUTOR_WAIT_MILLIS = 10000L;

    /** Interval, in milliseconds, between warnings while waiting on pending storage operations. */
    private static final long STORAGE_WAIT_MILLIS = 5000L;

    private final PagingStore pagingStore;
    private final StorageManager storageManager;
    private final ExecutorFactory executorFactory;
    private final Executor executor;
    private final SoftValueHashMap<Long, PageCache> softCache;
    // NOTE(review): ConcurrentHashMap resolves to the Netty-internal class imported by this file,
    // not java.util.concurrent.ConcurrentHashMap - confirm whether that is intentional.
    private final ConcurrentMap<Long, PageSubscription> activeCursors = new ConcurrentHashMap<Long, PageSubscription>();

    /**
     * @param pagingStore     store whose pages this provider serves and cleans up
     * @param storageManager  used to initialise paged messages and to wait on pending operations
     * @param executorFactory source of the provider's own executor and of one executor per subscription
     * @param maxCacheSize    maximum number of elements handed to the soft-value page cache
     */
    public PageCursorProviderImpl(PagingStore pagingStore, StorageManager storageManager, ExecutorFactory executorFactory, int maxCacheSize) {
        this.pagingStore = pagingStore;
        this.storageManager = storageManager;
        this.executorFactory = executorFactory;
        this.executor = executorFactory.getExecutor();
        this.softCache = new SoftValueHashMap<Long, PageCache>(maxCacheSize);
    }

    /** @return the paging store passed to the constructor */
    @Override
    public PagingStore getAssociatedStore() {
        return this.pagingStore;
    }

    /**
     * Creates and registers a new subscription for {@code cursorID}.
     *
     * @throws IllegalStateException if a subscription with that id is already registered
     */
    @Override
    public synchronized PageSubscription createSubscription(long cursorID, Filter filter, boolean persistent) {
        if (this.activeCursors.get(cursorID) != null) {
            throw new IllegalStateException("Cursor " + cursorID + " had already been created");
        }
        PageSubscription created = new PageSubscriptionImpl(this, this.pagingStore, this.storageManager, this.executorFactory.getExecutor(), filter, cursorID, persistent);
        this.activeCursors.put(cursorID, created);
        return created;
    }

    /** @return the subscription registered under {@code cursorID}, or {@code null} if there is none */
    @Override
    public synchronized PageSubscription getSubscription(long cursorID) {
        return this.activeCursors.get(cursorID);
    }

    /**
     * Looks up the message at {@code pos} through the page cache.
     *
     * @throws IllegalStateException if no cache can be obtained for the page of {@code pos}, or if
     *                               the message number is not below the number of messages cached
     *                               for that page
     */
    @Override
    public PagedMessage getMessage(PagePosition pos) throws Exception {
        PageCache cache = this.getPageCache(pos);
        if (cache == null) {
            // getPageCache returns null for pages past the current writing page or pages the
            // store does not report; fail with a descriptive error instead of a bare NPE.
            throw new IllegalStateException("No page cache available for position " + pos);
        }
        if (pos.getMessageNr() >= cache.getNumberOfMessages()) {
            throw new IllegalStateException("Invalid messageNumber passed = " + pos + " on " + cache);
        }
        return cache.getMessage(pos.getMessageNr());
    }

    /** Wraps the given position, message and subscription in a new {@link PagedReferenceImpl}. */
    @Override
    public PagedReference newReference(PagePosition pos, PagedMessage msg, PageSubscription subscription) {
        return new PagedReferenceImpl(pos, msg, subscription);
    }

    /**
     * @return the cache for the page referenced by {@code pos}, or {@code null} when that page is
     *         past the current writing page or is not reported by the paging store
     */
    @Override
    public PageCache getPageCache(PagePosition pos) {
        return this.getPageCache(pos.getPageNr());
    }

    /** Registers {@code cache} in the soft cache under its own page id. */
    @Override
    public void addPageCache(PageCache cache) {
        synchronized (this.softCache) {
            this.softCache.put(cache.getPageId(), cache);
        }
    }

    /** @return the maximum number of elements configured on the soft cache */
    @Override
    public int getCacheMaxSize() {
        return this.softCache.getMaxEelements();
    }

    /** Sets the maximum number of elements on the soft cache. */
    @Override
    public void setCacheMaxSize(int size) {
        this.softCache.setMaxElements(size);
    }

    /** @return the current size reported by the soft cache */
    @Override
    public int getCacheSize() {
        synchronized (this.softCache) {
            return this.softCache.size();
        }
    }

    /** Asks every active subscription to process its reload, then runs {@link #cleanup()}. */
    @Override
    public void processReload() throws Exception {
        for (PageSubscription cursor : this.activeCursors.values()) {
            cursor.processReload();
        }
        this.cleanup();
    }

    /** Stops every active subscription and then waits for this provider's executor to drain. */
    @Override
    public void stop() {
        for (PageSubscription cursor : this.activeCursors.values()) {
            cursor.stop();
        }
        this.waitForExecutor();
    }

    /** Flushes the executors of every active subscription and then this provider's own executor. */
    @Override
    public void flushExecutors() {
        for (PageSubscription cursor : this.activeCursors.values()) {
            cursor.flushExecutors();
        }
        this.waitForExecutor();
    }

    /** Unregisters {@code cursor} and schedules an asynchronous cleanup. */
    @Override
    public void close(PageSubscription cursor) {
        this.activeCursors.remove(cursor.getId());
        this.scheduleCleanup();
    }

    /**
     * Submits {@link #cleanup()} to this provider's executor. The task installs a new
     * single-thread storage context before the cleanup and clears it afterwards.
     */
    @Override
    public void scheduleCleanup() {
        this.executor.execute(new Runnable(){

            @Override
            public void run() {
                PageCursorProviderImpl.this.storageManager.setContext(PageCursorProviderImpl.this.storageManager.newSingleThreadContext());
                try {
                    PageCursorProviderImpl.this.cleanup();
                }
                finally {
                    PageCursorProviderImpl.this.storageManager.clearContext();
                }
            }
        });
    }

    /**
     * Depages every page that precedes the lowest first-page of the active subscriptions and
     * deletes those pages afterwards.
     *
     * <p>Phase one runs with the paging store locked and this provider's monitor held: it decides
     * whether the store can leave page mode and collects the depaged pages. Phase two runs without
     * those locks: it deletes the collected pages and drops their cache entries. Failures in
     * either phase are logged and end the cleanup; they are not propagated.
     */
    @Override
    public void cleanup() {
        List<Page> depagedPages = new ArrayList<Page>();
        this.pagingStore.lock();
        synchronized (this) {
            try {
                if (!this.pagingStore.isStarted()) {
                    return;
                }
                if (this.pagingStore.getNumberOfPages() == 0) {
                    return;
                }
                ArrayList<PageSubscription> cursorList = new ArrayList<PageSubscription>(this.activeCursors.values());
                long minPage = this.checkMinPage(cursorList);
                boolean onWritingPage = minPage == (long)this.pagingStore.getCurrentWritingPage();
                if (onWritingPage && this.pagingStore.getCurrentPage().getNumberOfMessages() > 0
                        && this.allCursorsComplete(cursorList, minPage)) {
                    log.info("Address " + this.pagingStore.getAddress() + " is leaving page mode as all messages are consumed and acknowledged from the page store");
                    this.pagingStore.forceAnotherPage();
                    Page currentPage = this.pagingStore.getCurrentPage();
                    this.storePositions(cursorList, currentPage);
                    this.pagingStore.stopPaging();
                    for (PageSubscription cursor : cursorList) {
                        cursor.scheduleCleanupCheck();
                    }
                }
                for (long i = this.pagingStore.getFirstPage(); i < minPage; ++i) {
                    Page page = this.pagingStore.depage();
                    if (page == null) {
                        break;
                    }
                    depagedPages.add(page);
                }
                int remainingPages = this.pagingStore.getNumberOfPages();
                if (remainingPages == 0 || remainingPages == 1 && this.pagingStore.getCurrentPage().getNumberOfMessages() == 0) {
                    this.pagingStore.stopPaging();
                }
            }
            catch (Exception ex) {
                log.warn("Couldn't complete cleanup on paging", ex);
                return;
            }
            finally {
                this.pagingStore.unlock();
            }
        }
        try {
            for (Page depagedPage : depagedPages) {
                depagedPage.delete();
                synchronized (this.softCache) {
                    // The cache is keyed by Long and remove() takes an Object: box explicitly to
                    // Long so the lookup matches even if getPageId() yields an int (an Integer
                    // key would never equal a Long key and the entry would stay cached).
                    this.softCache.remove(Long.valueOf(depagedPage.getPageId()));
                }
            }
        }
        catch (Exception ex) {
            log.warn("Couldn't complete cleanup on paging", ex);
        }
    }

    /**
     * Confirms, on every subscription, a position at message number {@code -1} of
     * {@code currentPage}, then waits for the storage manager's pending operations, logging a
     * warning after each unsuccessful 5 second wait. Auto cleanup is re-enabled on every
     * subscription whether or not the above succeeds.
     */
    protected void storePositions(ArrayList<PageSubscription> cursorList, Page currentPage) throws Exception {
        try {
            for (PageSubscription cursor : cursorList) {
                cursor.confirmPosition(new PagePositionImpl(currentPage.getPageId(), -1));
            }
            while (!this.storageManager.waitOnOperations(STORAGE_WAIT_MILLIS)) {
                log.warn("Couldn't complete operations on IO context " + this.storageManager.getContext());
            }
        }
        finally {
            for (PageSubscription cursor : cursorList) {
                cursor.enableAutoCleanup();
            }
        }
    }

    /** Prints every cached page to standard output. */
    @Override
    public void printDebug() {
        // Snapshot under the cache monitor (as every other entry access does) and print outside it.
        List<PageCache> snapshot;
        synchronized (this.softCache) {
            snapshot = new ArrayList<PageCache>(this.softCache.values());
        }
        System.out.println("Debug information for PageCursorProviderImpl:");
        for (PageCache cache : snapshot) {
            System.out.println("Cache " + cache);
        }
    }

    /** Creates an empty cache around a page object obtained from the paging store for {@code pageId}. */
    protected PageCacheImpl createPageCache(long pageId) throws Exception {
        return new PageCacheImpl(this.pagingStore.createPage((int)pageId));
    }

    /**
     * @return the smallest {@code getFirstPage()} among {@code cursorList}, or
     *         {@link Long#MAX_VALUE} when the list is empty
     */
    private long checkMinPage(List<PageSubscription> cursorList) {
        long minPage = Long.MAX_VALUE;
        for (PageSubscription cursor : cursorList) {
            minPage = Math.min(minPage, cursor.getFirstPage());
        }
        return minPage;
    }

    /** @return {@code true} when every subscription reports {@code page} as complete */
    private boolean allCursorsComplete(List<PageSubscription> cursorList, long page) {
        for (PageSubscription cursor : cursorList) {
            if (!cursor.isComplete(page)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Submits a {@link Future} to this provider's executor and blocks until its {@code await}
     * succeeds, logging a warning after every unsuccessful 10 second wait.
     */
    private void waitForExecutor() {
        Future future = new Future();
        this.executor.execute(future);
        while (!future.await(EXECUTOR_WAIT_MILLIS)) {
            log.warn("Waiting cursor provider " + this + " to finish executors");
        }
    }

    /**
     * Returns the cache for {@code pageId}, reading the page from the store on a cache miss.
     *
     * <p>On a miss the new cache is locked and registered while holding the cache-map monitor, and
     * the page is read outside that monitor; the cache is unlocked once the read finishes. If the
     * read fails, the never-populated cache is removed from the map again so later lookups do not
     * get it back.
     *
     * @return the cache, or {@code null} when {@code pageId} is past the current writing page or
     *         the store's {@code checkPage} rejects it
     * @throws RuntimeException wrapping any exception raised while creating or reading the page
     */
    private PageCache getPageCache(long pageId) {
        try {
            boolean needToRead = false;
            PageCache cache;
            synchronized (this.softCache) {
                if (pageId > (long)this.pagingStore.getCurrentWritingPage()) {
                    return null;
                }
                cache = this.softCache.get(pageId);
                if (cache == null) {
                    if (!this.pagingStore.checkPage((int)pageId)) {
                        return null;
                    }
                    cache = this.createPageCache(pageId);
                    needToRead = true;
                    // Locked before it becomes visible through the map; released after the read below.
                    cache.lock();
                    this.softCache.put(pageId, cache);
                }
            }
            if (needToRead) {
                boolean loaded = false;
                Page page = null;
                try {
                    page = this.pagingStore.createPage((int)pageId);
                    page.open();
                    List<PagedMessage> pgdMessages = page.read();
                    for (PagedMessage pdgMessage : pgdMessages) {
                        pdgMessage.initMessage(this.storageManager);
                    }
                    cache.setMessages(pgdMessages.toArray(new PagedMessage[pgdMessages.size()]));
                    loaded = true;
                }
                finally {
                    try {
                        if (page != null) {
                            page.close();
                        }
                    }
                    catch (Throwable closeFailure) {
                        // Closing is best effort, but leave a trace instead of hiding the failure.
                        log.warn("Couldn't close page " + pageId + " after reading it into the cache", closeFailure);
                    }
                    cache.unlock();
                    if (!loaded) {
                        this.evictIfStillCached(pageId, cache);
                    }
                }
            }
            return cache;
        }
        catch (Exception e) {
            throw new RuntimeException("Couldn't complete paging due to an IO Exception on Paging - " + e.getMessage(), e);
        }
    }

    /** Removes the entry for {@code pageId} only if it still maps to {@code cache}. */
    private void evictIfStillCached(long pageId, PageCache cache) {
        synchronized (this.softCache) {
            if (this.softCache.get(pageId) == cache) {
                this.softCache.remove(Long.valueOf(pageId));
            }
        }
    }
}

