package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.schemaengine.metric.SchemaEngineCachedMetric;
import org.apache.iotdb.db.schemaengine.rescon.CachedSchemaEngineStatistics;
import org.apache.iotdb.db.schemaengine.rescon.ISchemaEngineStatistics;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.CachedMTreeStore;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.IReleaseFlushStrategy;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.MemManager;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.ReleaseFlushStrategyNumBasedImpl;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.ReleaseFlushStrategySizeBasedImpl;
import org.apache.iotdb.db.utils.concurrent.FiniteSemaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheMemoryManager.class */
public class CacheMemoryManager {
    private static final Logger logger = LoggerFactory.getLogger(CacheMemoryManager.class);
    private final List<CachedMTreeStore> storeList;
    private CachedSchemaEngineStatistics engineStatistics;
    private SchemaEngineCachedMetric engineMetric;
    private static final int CONCURRENT_NUM = 10;
    private ExecutorService flushTaskProcessor;
    private ExecutorService flushTaskMonitor;
    private ExecutorService releaseTaskProcessor;
    private ExecutorService releaseTaskMonitor;
    private FiniteSemaphore flushSemaphore;
    private FiniteSemaphore releaseSemaphore;
    private volatile boolean hasFlushTask;
    private volatile boolean hasReleaseTask;
    private IReleaseFlushStrategy releaseFlushStrategy;
    private static final int MAX_WAITING_TIME_WHEN_RELEASING = 3000;
    private final Object blockObject;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/cache/CacheMemoryManager$GlobalCacheManagerHolder.class */
    public static class GlobalCacheManagerHolder {
        private static final CacheMemoryManager INSTANCE = new CacheMemoryManager();

        private GlobalCacheManagerHolder() {
        }
    }

    public ICacheManager createLRUCacheManager(CachedMTreeStore cachedMTreeStore, MemManager memManager) {
        LRUCacheManager lRUCacheManager = new LRUCacheManager(memManager);
        this.storeList.add(cachedMTreeStore);
        return lRUCacheManager;
    }

    public void clearCachedMTreeStore(CachedMTreeStore cachedMTreeStore) {
        this.storeList.remove(cachedMTreeStore);
    }

    public void init(ISchemaEngineStatistics iSchemaEngineStatistics) {
        this.flushSemaphore = new FiniteSemaphore(2, 0);
        this.releaseSemaphore = new FiniteSemaphore(2, 0);
        this.engineStatistics = iSchemaEngineStatistics.getAsCachedSchemaEngineStatistics();
        if (IoTDBDescriptor.getInstance().getConfig().getCachedMNodeSizeInPBTreeMode() >= 0) {
            this.releaseFlushStrategy = new ReleaseFlushStrategyNumBasedImpl(this.engineStatistics);
        } else {
            this.releaseFlushStrategy = new ReleaseFlushStrategySizeBasedImpl(this.engineStatistics);
        }
        this.flushTaskMonitor = IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.SCHEMA_FLUSH_MONITOR.getName());
        this.flushTaskProcessor = IoTDBThreadPoolFactory.newFixedThreadPool(CONCURRENT_NUM, ThreadName.SCHEMA_REGION_FLUSH_PROCESSOR.getName());
        this.releaseTaskMonitor = IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.SCHEMA_RELEASE_MONITOR.getName());
        this.releaseTaskProcessor = IoTDBThreadPoolFactory.newFixedThreadPool(CONCURRENT_NUM, ThreadName.SCHEMA_REGION_RELEASE_PROCESSOR.getName());
        this.releaseTaskMonitor.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    this.releaseSemaphore.acquire();
                    try {
                        if (isExceedReleaseThreshold()) {
                            this.hasReleaseTask = true;
                            tryExecuteMemoryRelease();
                        }
                    } catch (Throwable th) {
                        this.hasReleaseTask = false;
                        logger.error("Something wrong happened during MTree release.", th);
                    }
                } catch (InterruptedException e) {
                    logger.info("ReleaseTaskMonitor thread is interrupted.");
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        this.flushTaskMonitor.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    this.flushSemaphore.acquire();
                    try {
                        if (isExceedFlushThreshold()) {
                            this.hasFlushTask = true;
                            tryFlushVolatileNodes();
                        }
                    } catch (Throwable th) {
                        this.hasFlushTask = false;
                        logger.error("Something wrong happened during MTree flush.", th);
                    }
                } catch (InterruptedException e) {
                    logger.info("FlushTaskMonitor thread is interrupted.");
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
    }

    public void setEngineMetric(SchemaEngineCachedMetric schemaEngineCachedMetric) {
        this.engineMetric = schemaEngineCachedMetric;
    }

    public boolean isExceedReleaseThreshold() {
        return this.releaseFlushStrategy.isExceedReleaseThreshold();
    }

    public boolean isExceedFlushThreshold() {
        return this.releaseFlushStrategy.isExceedFlushThreshold();
    }

    public void ensureMemoryStatus() {
        if (isExceedReleaseThreshold()) {
            registerReleaseTask();
        }
    }

    public void waitIfReleasing() {
        synchronized (this.blockObject) {
            if (this.hasReleaseTask || this.hasFlushTask) {
                try {
                    this.blockObject.wait(3000L);
                } catch (InterruptedException e) {
                    logger.warn("Interrupt because the release task and flush task did not finish within {} milliseconds.", Integer.valueOf(MAX_WAITING_TIME_WHEN_RELEASING));
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private void registerReleaseTask() {
        this.releaseSemaphore.release();
    }

    private void tryExecuteMemoryRelease() {
        long currentTimeMillis = System.currentTimeMillis();
        CompletableFuture.allOf((CompletableFuture[]) this.storeList.stream().map(cachedMTreeStore -> {
            return CompletableFuture.runAsync(() -> {
                cachedMTreeStore.getLock().threadReadLock(true);
                try {
                    executeMemoryRelease(cachedMTreeStore);
                } finally {
                    cachedMTreeStore.getLock().threadReadUnlock();
                }
            }, this.releaseTaskProcessor);
        }).toArray(i -> {
            return new CompletableFuture[i];
        })).join();
        if (this.engineMetric != null) {
            this.engineMetric.recordRelease(System.currentTimeMillis() - currentTimeMillis);
        }
        synchronized (this.blockObject) {
            this.hasReleaseTask = false;
            if (isExceedFlushThreshold()) {
                registerFlushTask();
            } else {
                this.blockObject.notifyAll();
            }
        }
    }

    private void executeMemoryRelease(CachedMTreeStore cachedMTreeStore) {
        while (isExceedReleaseThreshold() && !cachedMTreeStore.executeMemoryRelease()) {
        }
    }

    private void registerFlushTask() {
        this.flushSemaphore.release();
    }

    private void tryFlushVolatileNodes() {
        long currentTimeMillis = System.currentTimeMillis();
        CompletableFuture.allOf((CompletableFuture[]) this.storeList.stream().map(cachedMTreeStore -> {
            return CompletableFuture.runAsync(() -> {
                cachedMTreeStore.getLock().writeLock();
                try {
                    cachedMTreeStore.flushVolatileNodes();
                    executeMemoryRelease(cachedMTreeStore);
                } finally {
                    cachedMTreeStore.getLock().unlockWrite();
                }
            }, this.flushTaskProcessor);
        }).toArray(i -> {
            return new CompletableFuture[i];
        })).join();
        if (this.engineMetric != null) {
            this.engineMetric.recordFlush(System.currentTimeMillis() - currentTimeMillis);
        }
        synchronized (this.blockObject) {
            this.hasFlushTask = false;
            this.blockObject.notifyAll();
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:18:0x004e, code lost:
    
        if (r3.releaseTaskProcessor != null) goto L18;
     */
    /* JADX WARN: Code restructure failed: missing block: B:20:0x0055, code lost:
    
        if (r3.hasReleaseTask != false) goto L43;
     */
    /* JADX WARN: Code restructure failed: missing block: B:22:0x005b, code lost:
    
        r3.releaseTaskProcessor.shutdown();
     */
    /* JADX WARN: Code restructure failed: missing block: B:24:0x006d, code lost:
    
        if (r3.releaseTaskProcessor.isTerminated() == false) goto L45;
     */
    /* JADX WARN: Code restructure failed: missing block: B:26:0x0073, code lost:
    
        r3.releaseTaskProcessor = null;
     */
    /* JADX WARN: Code restructure failed: missing block: B:30:0x007c, code lost:
    
        if (r3.flushTaskProcessor == null) goto L36;
     */
    /* JADX WARN: Code restructure failed: missing block: B:32:0x0083, code lost:
    
        if (r3.hasFlushTask != false) goto L47;
     */
    /* JADX WARN: Code restructure failed: missing block: B:34:0x0089, code lost:
    
        r3.flushTaskProcessor.shutdown();
     */
    /* JADX WARN: Code restructure failed: missing block: B:36:0x009b, code lost:
    
        if (r3.flushTaskProcessor.isTerminated() == false) goto L49;
     */
    /* JADX WARN: Code restructure failed: missing block: B:38:0x00a1, code lost:
    
        r3.flushTaskProcessor = null;
     */
    /* JADX WARN: Code restructure failed: missing block: B:41:0x00a6, code lost:
    
        r3.storeList.clear();
        r3.releaseFlushStrategy = null;
        r3.engineStatistics = null;
        r3.releaseSemaphore = null;
        r3.flushSemaphore = null;
        r3.engineMetric = null;
     */
    /* JADX WARN: Code restructure failed: missing block: B:42:0x00c8, code lost:
    
        return;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void clear() {
        /*
            r3 = this;
            r0 = r3
            java.util.concurrent.ExecutorService r0 = r0.releaseTaskMonitor
            if (r0 == 0) goto L25
            r0 = r3
            java.util.concurrent.ExecutorService r0 = r0.releaseTaskMonitor
            java.util.List r0 = r0.shutdownNow()
        L11:
            r0 = r3
            java.util.concurrent.ExecutorService r0 = r0.releaseTaskMonitor
            boolean r0 = r0.isTerminated()
            if (r0 == 0) goto L11
            goto L20
        L20:
            r0 = r3
            r1 = 0
            r0.releaseTaskMonitor = r1
        L25:
            r0 = r3
            java.util.concurrent.ExecutorService r0 = r0.flushTaskMonitor
            if (r0 == 0) goto L4a
            r0 = r3
            java.util.concurrent.ExecutorService r0 = r0.flushTaskMonitor
            java.util.List r0 = r0.shutdownNow()
        L36:
            r0 = r3
            java.util.concurrent.ExecutorService r0 = r0.flushTaskMonitor
            boolean r0 = r0.isTerminated()
            if (r0 == 0) goto L36
            goto L45
        L45:
            r0 = r3
            r1 = 0
            r0.releaseTaskMonitor = r1
        L4a:
            r0 = r3
            java.util.concurrent.ExecutorService r0 = r0.releaseTaskProcessor
            if (r0 == 0) goto L78
        L51:
            r0 = r3
            boolean r0 = r0.hasReleaseTask
            if (r0 != 0) goto L51
            goto L5b
        L5b:
            r0 = r3
            java.util.concurrent.ExecutorService r0 = r0.releaseTaskProcessor
            r0.shutdown()
        L64:
            r0 = r3
            java.util.concurrent.ExecutorService r0 = r0.releaseTaskProcessor
            boolean r0 = r0.isTerminated()
            if (r0 == 0) goto L64
            goto L73
        L73:
            r0 = r3
            r1 = 0
            r0.releaseTaskProcessor = r1
        L78:
            r0 = r3
            java.util.concurrent.ExecutorService r0 = r0.flushTaskProcessor
            if (r0 == 0) goto La6
        L7f:
            r0 = r3
            boolean r0 = r0.hasFlushTask
            if (r0 != 0) goto L7f
            goto L89
        L89:
            r0 = r3
            java.util.concurrent.ExecutorService r0 = r0.flushTaskProcessor
            r0.shutdown()
        L92:
            r0 = r3
            java.util.concurrent.ExecutorService r0 = r0.flushTaskProcessor
            boolean r0 = r0.isTerminated()
            if (r0 == 0) goto L92
            goto La1
        La1:
            r0 = r3
            r1 = 0
            r0.flushTaskProcessor = r1
        La6:
            r0 = r3
            java.util.List<org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.CachedMTreeStore> r0 = r0.storeList
            r0.clear()
            r0 = r3
            r1 = 0
            r0.releaseFlushStrategy = r1
            r0 = r3
            r1 = 0
            r0.engineStatistics = r1
            r0 = r3
            r1 = 0
            r0.releaseSemaphore = r1
            r0 = r3
            r1 = 0
            r0.flushSemaphore = r1
            r0 = r3
            r1 = 0
            r0.engineMetric = r1
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.CacheMemoryManager.clear():void");
    }

    public int getReleaseThreadNum() {
        return this.releaseTaskProcessor.getActiveCount();
    }

    public int getFlushThreadNum() {
        return this.flushTaskProcessor.getActiveCount();
    }

    private CacheMemoryManager() {
        this.storeList = new CopyOnWriteArrayList();
        this.blockObject = new Object();
    }

    public static CacheMemoryManager getInstance() {
        return GlobalCacheManagerHolder.INSTANCE;
    }

    public void forceFlushAndRelease() {
        this.releaseFlushStrategy = new IReleaseFlushStrategy() { // from class: org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.cache.CacheMemoryManager.1
            @Override // org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.IReleaseFlushStrategy
            public boolean isExceedReleaseThreshold() {
                return true;
            }

            @Override // org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.IReleaseFlushStrategy
            public boolean isExceedFlushThreshold() {
                return true;
            }
        };
        registerFlushTask();
    }
}
