package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.flush;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.CachedMTreeStore;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.lock.LockManager;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.IReleaseFlushStrategy;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.IMemoryManager;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.class */
/**
 * Dispatches flush and memory-release work for the registered {@link CachedMTreeStore}s onto a
 * fixed-size worker pool.
 *
 * <p>A region id is put into {@code flushingRegionSet} when a flush is scheduled for it and taken
 * out again when that flush finishes or is skipped, so a region is never scheduled for flushing
 * twice at the same time. The scheduling methods are {@code synchronized}; the work itself runs on
 * the pool threads.
 *
 * <p>NOTE(review): the pool is created with {@link ThreadPoolExecutor.DiscardPolicy}. If the
 * factory forwards that handler unchanged, tasks submitted after {@link #clear()} are dropped
 * silently, and a future created for such a task would never complete - confirm callers never
 * schedule after {@code clear()}.
 */
public class Scheduler {
    private static final Logger LOGGER = LoggerFactory.getLogger(Scheduler.class);

    /** Initial value of the per-call counter handed to the flush executor by {@link #scheduleFlush}. */
    private static final int BATCH_FLUSH_SUBTREE = 50;

    /** Number of threads in the worker pool. */
    private static final int FLUSH_WORKER_NUM = 10;

    /** A flush slower than this many milliseconds is logged at INFO instead of DEBUG. */
    private static final long SLOW_FLUSH_THRESHOLD_MS = 10000L;

    private final Map<Integer, CachedMTreeStore> regionToStore;
    private final Set<Integer> flushingRegionSet;
    private final IReleaseFlushStrategy releaseFlushStrategy;
    private final ExecutorService workerPool = IoTDBThreadPoolFactory.newFixedThreadPool(FLUSH_WORKER_NUM, ThreadName.PBTREE_WORKER_POOL.getName(), new ThreadPoolExecutor.DiscardPolicy());

    /**
     * Creates a scheduler working on the given shared state. The collections are used as-is (not
     * copied) and are read and modified from the pool threads.
     *
     * @param map schema region id to its store
     * @param set ids of the regions that currently have a flush scheduled or running
     * @param iReleaseFlushStrategy strategy consulted to decide whether memory release should go on
     */
    public Scheduler(Map<Integer, CachedMTreeStore> map, Set<Integer> set, IReleaseFlushStrategy iReleaseFlushStrategy) {
        this.regionToStore = map;
        this.flushingRegionSet = set;
        this.releaseFlushStrategy = iReleaseFlushStrategy;
    }

    /**
     * Flushes the volatile nodes of one store, then logs the elapsed time, records flush metrics
     * and clears the region's flushing marker - whether the flush succeeded or not.
     *
     * <p>A {@link MetadataException} is logged and swallowed; any other throwable propagates after
     * the bookkeeping has run.
     *
     * @param cachedMTreeStore store to flush
     * @param i schema region id of the store
     * @param atomicInteger shared counter passed to the flush executor, or {@code null} to create
     *     the executor without one
     */
    private void executeFlush(CachedMTreeStore cachedMTreeStore, int i, AtomicInteger atomicInteger) {
        IMemoryManager memoryManager = cachedMTreeStore.getMemoryManager();
        ISchemaFile schemaFile = cachedMTreeStore.getSchemaFile();
        LockManager lockManager = cachedMTreeStore.getLockManager();
        long startTime = System.currentTimeMillis();
        // Filled in by the flush executor and reported to the metrics afterwards.
        // NOTE(review): presumably flushed node count and flushed memory size - confirm against
        // CachedMTreeStore#recordFlushMetrics.
        AtomicLong flushNodeNum = new AtomicLong(0L);
        AtomicLong flushMemSize = new AtomicLong(0L);
        try {
            PBTreeFlushExecutor flushExecutor =
                    atomicInteger == null
                            ? new PBTreeFlushExecutor(memoryManager, schemaFile, lockManager)
                            : new PBTreeFlushExecutor(atomicInteger, memoryManager, schemaFile, lockManager);
            flushExecutor.flushVolatileNodes(flushNodeNum, flushMemSize);
        } catch (MetadataException e) {
            LOGGER.warn("Error occurred during MTree flush, current SchemaRegionId is {} because {}", i, e.getMessage(), e);
        } finally {
            long elapsed = System.currentTimeMillis() - startTime;
            if (elapsed > SLOW_FLUSH_THRESHOLD_MS) {
                LOGGER.info("It takes {}ms to flush MTree in SchemaRegion {}", elapsed, i);
            } else {
                LOGGER.debug("It takes {}ms to flush MTree in SchemaRegion {}", elapsed, i);
            }
            cachedMTreeStore.recordFlushMetrics(elapsed, flushNodeNum.get(), flushMemSize.get());
            this.flushingRegionSet.remove(Integer.valueOf(i));
        }
    }

    /**
     * Repeatedly asks the store to release memory and records release metrics afterwards.
     *
     * <p>The loop keeps going while {@code z} is set or the strategy reports that the release
     * threshold is exceeded, and stops as soon as {@code executeMemoryRelease} returns
     * {@code true}.
     *
     * @param cachedMTreeStore store to release memory from
     * @param z when {@code true}, the release threshold is not consulted
     */
    private void executeRelease(CachedMTreeStore cachedMTreeStore, boolean z) {
        // NOTE(review): presumably released node count and released memory size - confirm against
        // CachedMTreeStore#recordReleaseMetrics.
        AtomicLong releaseNodeNum = new AtomicLong(0L);
        AtomicLong releaseMemorySize = new AtomicLong(0L);
        long startTime = System.currentTimeMillis();
        while (z || this.releaseFlushStrategy.isExceedReleaseThreshold()) {
            if (cachedMTreeStore.executeMemoryRelease(releaseNodeNum, releaseMemorySize)) {
                break;
            }
        }
        cachedMTreeStore.recordReleaseMetrics(System.currentTimeMillis() - startTime, releaseNodeNum.get(), releaseMemorySize.get());
    }

    /**
     * Worker-side body of a flush task: takes the store's global read lock, re-checks that the
     * region is still registered, flushes it and optionally releases memory afterwards.
     *
     * <p>The lock is released on every path, and the flushing marker is cleared when the flush is
     * skipped (otherwise {@link #executeFlush} clears it).
     *
     * @param store store to flush, or {@code null} if it is no longer available
     * @param regionId schema region id of the store
     * @param remainingBatch counter forwarded to {@link #executeFlush}; may be {@code null}
     * @param releaseAfterFlush whether to run a non-forced memory release after the flush
     */
    private void flushRegion(CachedMTreeStore store, int regionId, AtomicInteger remainingBatch, boolean releaseAfterFlush) {
        if (store == null) {
            // Nothing to flush; drop the marker so the id is not blocked from future flushes.
            this.flushingRegionSet.remove(Integer.valueOf(regionId));
            return;
        }
        LockManager lockManager = store.getLockManager();
        lockManager.globalReadLock();
        try {
            // Re-check under the lock: the region may have been removed since scheduling.
            if (!this.regionToStore.containsKey(Integer.valueOf(regionId))) {
                this.flushingRegionSet.remove(Integer.valueOf(regionId));
                return;
            }
            executeFlush(store, regionId, remainingBatch);
            if (releaseAfterFlush) {
                executeRelease(store, false);
            }
        } finally {
            lockManager.globalReadUnlock();
        }
    }

    /**
     * Worker-side body of a release task: takes the store's global read lock via
     * {@code globalReadLock(true)}, re-checks that the region is still registered and runs the
     * memory release. The lock is released on every path.
     *
     * @param store store to release memory from, or {@code null} if it is no longer available
     * @param regionId schema region id of the store
     * @param force forwarded to {@link #executeRelease}
     */
    private void releaseRegion(CachedMTreeStore store, int regionId, boolean force) {
        if (store == null) {
            return;
        }
        LockManager lockManager = store.getLockManager();
        lockManager.globalReadLock(true);
        try {
            if (this.regionToStore.containsKey(Integer.valueOf(regionId))) {
                executeRelease(store, force);
            }
        } finally {
            lockManager.globalReadUnlock();
        }
    }

    /**
     * Schedules a flush followed by a non-forced memory release for every registered region that
     * is not already marked as flushing.
     *
     * @return a future that completes when all tasks scheduled by this call have completed; the
     *     method itself does not wait for them
     */
    public synchronized CompletableFuture<Void> scheduleFlushAll() {
        List<Map.Entry<Integer, CachedMTreeStore>> regionsToFlush = new ArrayList<>();
        for (Map.Entry<Integer, CachedMTreeStore> entry : this.regionToStore.entrySet()) {
            // add() returns false when the id is already marked, i.e. a flush is in progress.
            if (this.flushingRegionSet.add(entry.getKey())) {
                regionsToFlush.add(entry);
            }
        }
        CompletableFuture<?>[] flushTasks =
                regionsToFlush.stream()
                        .map(entry -> CompletableFuture.runAsync(() -> flushRegion(entry.getValue(), entry.getKey(), null, true), this.workerPool))
                        .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(flushTasks);
    }

    /**
     * Runs a memory release on every registered region in parallel and blocks until all of them
     * have finished.
     *
     * @param z when {@code true}, each release ignores the release threshold
     */
    public synchronized void scheduleRelease(boolean z) {
        CompletableFuture<?>[] releaseTasks =
                this.regionToStore.entrySet().stream()
                        .map(entry -> CompletableFuture.runAsync(() -> releaseRegion(entry.getValue(), entry.getKey(), z), this.workerPool))
                        .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(releaseTasks).join();
    }

    /**
     * Submits a flush task for each listed region that is not already marked as flushing. All
     * tasks of one call share a counter that starts at {@code BATCH_FLUSH_SUBTREE}; submission
     * stops once that counter is observed to be zero or less. The method does not wait for the
     * submitted tasks.
     *
     * <p>NOTE(review): the counter is only read here; it is presumably decremented by the flush
     * executor - confirm in {@code PBTreeFlushExecutor}.
     *
     * @param list ids of the regions to flush; must not contain {@code null}
     */
    public synchronized void scheduleFlush(List<Integer> list) {
        AtomicInteger remainingBatch = new AtomicInteger(BATCH_FLUSH_SUBTREE);
        for (int regionId : list) {
            if (!this.flushingRegionSet.add(Integer.valueOf(regionId))) {
                // Already scheduled or running.
                continue;
            }
            // The store is looked up on the worker thread, at execution time.
            this.workerPool.submit(() -> flushRegion(this.regionToStore.get(Integer.valueOf(regionId)), regionId, remainingBatch, false));
            if (remainingBatch.get() <= 0) {
                return;
            }
        }
    }

    /**
     * Returns the approximate number of pool threads that are actively executing tasks.
     *
     * <p>Assumes the pool created by {@code IoTDBThreadPoolFactory.newFixedThreadPool} is a
     * {@link ThreadPoolExecutor} (or subclass) - TODO confirm; otherwise this throws
     * {@link ClassCastException}.
     *
     * @return the active thread count reported by the pool
     */
    public int getActiveWorkerNum() {
        return ((ThreadPoolExecutor) this.workerPool).getActiveCount();
    }

    /** Initiates shutdown of the worker pool; does not wait for running tasks to finish. */
    public void clear() {
        this.workerPool.shutdown();
    }

    /**
     * Reports whether the worker pool has terminated.
     *
     * @return {@code true} if all tasks have completed following {@link #clear()}
     */
    public boolean isTerminated() {
        return this.workerPool.isTerminated();
    }
}
