package org.apache.iotdb.db.engine.merge.manage;

import com.google.common.util.concurrent.RateLimiter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.merge.task.MergeMultiChunkTask;
import org.apache.iotdb.db.engine.merge.task.MergeTask;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.JMXService;
import org.apache.iotdb.db.service.ServiceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/engine/merge/manage/MergeManager.class */
public class MergeManager implements IService, MergeManagerMBean {
    private static final Logger logger = LoggerFactory.getLogger(MergeManager.class);

    /** The single, eagerly created instance returned by {@link #getINSTANCE()}. */
    private static final MergeManager INSTANCE = new MergeManager();

    /** Pool that runs main merge tasks; created in {@link #start()} and reset to {@code null} on stop. */
    private ThreadPoolExecutor mergeTaskPool;

    /** Pool that runs chunk-level sub tasks; created in {@link #start()} together with the main pool. */
    private ThreadPoolExecutor mergeChunkSubTaskPool;

    /** Scheduler that periodically runs {@link #cleanFinishedTask()}; {@code null} while stopped. */
    private ScheduledExecutorService taskCleanerThreadPool;

    /** JMX object name under which this manager is registered in {@link #start()}. */
    private final String mbeanName = String.format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE, getID().getJmxName());

    /**
     * Limiter handed out by {@link #getMergeWriteRateLimiter()}. It starts effectively unlimited;
     * the configured rate is applied each time the getter is called.
     */
    private final RateLimiter mergeWriteRateLimiter = RateLimiter.create(Double.MAX_VALUE);

    /** Counter shared by the thread factories of both pools to number the thread names. */
    private final AtomicInteger threadCnt = new AtomicInteger();

    /** Futures of submitted main tasks, keyed by storage group name. */
    private final Map<String, Set<MergeFuture>> storageGroupMainTasks = new ConcurrentHashMap<>();

    /** Futures of submitted chunk sub tasks, keyed by storage group name. */
    private final Map<String, Set<MergeFuture>> storageGroupSubTasks = new ConcurrentHashMap<>();

    /**
     * Point-in-time snapshot of a {@link MergeFuture}: its name, creation time, progress text and
     * done/cancelled flags are copied once in the constructor and never updated afterwards.
     */
    public static class TaskStatus {
        private final String taskName;
        private final String createdTime;
        private final String progress;
        private final boolean isDone;
        private final boolean isCancelled;

        /**
         * Captures the current state of the given future.
         *
         * @param mergeFuture the future to read the state from
         */
        public TaskStatus(MergeFuture mergeFuture) {
            this.taskName = mergeFuture.getTaskName();
            this.createdTime = mergeFuture.getCreatedTime();
            this.progress = mergeFuture.getProgress();
            this.isCancelled = mergeFuture.isCancelled();
            this.isDone = mergeFuture.isDone();
        }

        /**
         * Renders the snapshot as {@code "<name>, <created>, <progress>, done:<b>, cancelled:<b>"}.
         */
        @Override
        public String toString() {
            return String.format("%s, %s, %s, done:%s, cancelled:%s", this.taskName, this.createdTime, this.progress, this.isDone, this.isCancelled);
        }

        /** @return the task name captured at construction */
        public String getTaskName() {
            return this.taskName;
        }

        /** @return the creation time string captured at construction */
        public String getCreatedTime() {
            return this.createdTime;
        }

        /** @return the progress string captured at construction */
        public String getProgress() {
            return this.progress;
        }

        /** @return whether the future reported itself done when the snapshot was taken */
        public boolean isDone() {
            return this.isDone;
        }

        /** @return whether the future reported itself cancelled when the snapshot was taken */
        public boolean isCancelled() {
            return this.isCancelled;
        }
    }

    /** Private so that the only instance is the one held in {@code INSTANCE}. */
    private MergeManager() {
        // Nothing to do: all state is set up by field initialisers and start().
    }

    /**
     * Returns the shared merge-write rate limiter after re-applying the currently configured
     * merge write throughput to it.
     *
     * @return the limiter instance owned by this manager (always the same object)
     */
    public RateLimiter getMergeWriteRateLimiter() {
        double configuredMbPerSec = IoTDBDescriptor.getInstance().getConfig().getMergeWriteThroughputMbPerSec();
        setWriteMergeRate(configuredMbPerSec);
        return this.mergeWriteRateLimiter;
    }

    /**
     * Acquires {@code j} permits from the limiter. Because the limiter takes an {@code int}
     * permit count, a larger request is split into chunks of {@link Integer#MAX_VALUE} followed
     * by one call for the remainder. Nothing is acquired when {@code j} is zero or negative.
     *
     * @param rateLimiter the limiter to acquire from
     * @param j total number of permits to acquire
     */
    public static void mergeRateLimiterAcquire(RateLimiter rateLimiter, long j) {
        long remaining = j;
        for (; remaining >= Integer.MAX_VALUE; remaining -= Integer.MAX_VALUE) {
            rateLimiter.acquire(Integer.MAX_VALUE);
        }
        if (remaining <= 0) {
            return;
        }
        rateLimiter.acquire((int) remaining);
    }

    /**
     * Applies a throughput limit, given in MB per second, to the merge-write rate limiter.
     *
     * <p>The value is converted to bytes per second. Any value that is not strictly positive
     * (zero, negative or NaN) disables limiting by using {@link Double#MAX_VALUE} as the rate;
     * passing such a value on would make {@code RateLimiter.setRate} throw
     * {@link IllegalArgumentException}.
     *
     * @param throughputMbPerSec the desired throughput in MB/s; non-positive means "unlimited"
     */
    private void setWriteMergeRate(double throughputMbPerSec) {
        double bytesPerSec = throughputMbPerSec * 1024.0d * 1024.0d;
        if (Double.isNaN(bytesPerSec) || bytesPerSec <= 0.0d) {
            bytesPerSec = Double.MAX_VALUE;
        }
        // Only touch the limiter when the rate actually changed.
        if (this.mergeWriteRateLimiter.getRate() != bytesPerSec) {
            this.mergeWriteRateLimiter.setRate(bytesPerSec);
        }
    }

    /**
     * Gives access to the process-wide manager.
     *
     * @return the singleton instance
     */
    public static MergeManager getINSTANCE() {
        return MergeManager.INSTANCE;
    }

    /**
     * Submits a main merge task to the main pool and records its future under the task's
     * storage group.
     *
     * @param mergeTask the task to run
     */
    public void submitMainTask(MergeTask mergeTask) {
        // The per-group set is looked up (or created) before the task is handed to the pool.
        Set<MergeFuture> groupTasks =
                this.storageGroupMainTasks.computeIfAbsent(
                        mergeTask.getStorageGroupName(), groupName -> new ConcurrentSkipListSet<>());
        MergeFuture future = (MergeFuture) this.mergeTaskPool.submit(mergeTask);
        groupTasks.add(future);
    }

    /**
     * Submits a chunk-level sub task to the sub-task pool and records its future under the
     * task's storage group.
     *
     * @param mergeChunkHeapTask the sub task to run
     * @return the future of the submitted sub task
     */
    public Future<Void> submitChunkSubTask(MergeMultiChunkTask.MergeChunkHeapTask mergeChunkHeapTask) {
        // Submission happens first; the future is registered afterwards.
        MergeFuture submitted = (MergeFuture) this.mergeChunkSubTaskPool.submit(mergeChunkHeapTask);
        Set<MergeFuture> groupTasks =
                this.storageGroupSubTasks.computeIfAbsent(
                        mergeChunkHeapTask.getStorageGroupName(), groupName -> new ConcurrentSkipListSet<>());
        groupTasks.add(submitted);
        return submitted;
    }

    /**
     * Registers the JMX bean and, if the pools do not exist yet, creates the main pool, the
     * sub-task pool and the periodic cleaner.
     *
     * <p>Configured thread counts below one are treated as one. The sub-task pool is sized as
     * main threads times sub threads. The cleaner runs every 30 minutes after an initial
     * 30-minute delay.
     */
    @Override
    public void start() {
        JMXService.registerMBean(this, this.mbeanName);
        if (this.mergeTaskPool != null) {
            // Already running; nothing else to set up.
            return;
        }

        int mainThreads = Math.max(1, IoTDBDescriptor.getInstance().getConfig().getMergeThreadNum());
        int subThreadsPerTask = Math.max(1, IoTDBDescriptor.getInstance().getConfig().getMergeChunkSubThreadNum());

        this.mergeTaskPool =
                new MergeThreadPool(
                        mainThreads,
                        work -> new Thread(work, "MergeThread-" + this.threadCnt.getAndIncrement()));
        this.mergeChunkSubTaskPool =
                new MergeThreadPool(
                        mainThreads * subThreadsPerTask,
                        work -> new Thread(work, "MergeChunkSubThread-" + this.threadCnt.getAndIncrement()));

        this.taskCleanerThreadPool = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("MergeTaskCleaner");
        this.taskCleanerThreadPool.scheduleAtFixedRate(this::cleanFinishedTask, 30L, 30L, TimeUnit.MINUTES);
        logger.info("MergeManager started");
    }

    /**
     * Stops the service immediately.
     *
     * <p>If the pools exist, the cleaner and both task pools are shut down with
     * {@code shutdownNow()}, and the calling thread then polls every 200 ms until both task
     * pools report termination. A warning is logged for every full minute spent waiting.
     * Afterwards the recorded futures are dropped. The JMX bean is deregistered in every case.
     *
     * <p>If the calling thread is interrupted while waiting, the interruption is logged, the
     * wait continues, and the interrupt flag is restored once the pools have terminated.
     */
    @Override
    public void stop() {
        if (this.mergeTaskPool != null) {
            this.taskCleanerThreadPool.shutdownNow();
            this.taskCleanerThreadPool = null;
            this.mergeTaskPool.shutdownNow();
            this.mergeChunkSubTaskPool.shutdownNow();
            logger.info("Waiting for task pool to shut down");

            final long warnIntervalMillis = 60_000L;
            long startTime = System.currentTimeMillis();
            long nextWarnAtMillis = warnIntervalMillis;
            boolean interrupted = false;
            while (!this.mergeTaskPool.isTerminated() || !this.mergeChunkSubTaskPool.isTerminated()) {
                try {
                    Thread.sleep(200L);
                } catch (InterruptedException e) {
                    logger.error("CompactionMergeTaskPoolManager {} shutdown", ThreadName.COMPACTION_SERVICE.getName(), e);
                    // Restoring the flag here would make every following sleep() throw at once
                    // and turn this loop into a busy spin, so it is restored after the loop.
                    interrupted = true;
                }
                long waitedMillis = System.currentTimeMillis() - startTime;
                if (waitedMillis >= nextWarnAtMillis) {
                    logger.warn("MergeManager has wait for {} seconds to stop", waitedMillis / 1000);
                    nextWarnAtMillis += warnIntervalMillis;
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }

            this.mergeTaskPool = null;
            this.storageGroupMainTasks.clear();
            this.storageGroupSubTasks.clear();
            logger.info("MergeManager stopped");
        }
        JMXService.deregisterMBean(this.mbeanName);
    }

    /**
     * Stops the service, giving each pool up to {@code milliseconds} to finish on its own.
     *
     * <p>Does nothing when the pools do not exist. Otherwise the cleaner, the main pool and the
     * sub-task pool are each passed to {@link #awaitTermination(ExecutorService, long)}. The
     * calling thread then polls every 200 ms until both task pools report termination, logging
     * a warning for every full minute spent waiting, and finally drops the recorded futures.
     *
     * <p>If the calling thread is interrupted while waiting, the interruption is logged, the
     * wait continues, and the interrupt flag is restored once the pools have terminated.
     *
     * <p>NOTE(review): unlike {@link #stop()}, this method does not deregister the JMX bean;
     * confirm whether that is intentional.
     *
     * @param milliseconds graceful-shutdown timeout applied to each pool, in milliseconds
     */
    @Override
    public void waitAndStop(long milliseconds) {
        if (this.mergeTaskPool == null) {
            return;
        }
        awaitTermination(this.taskCleanerThreadPool, milliseconds);
        this.taskCleanerThreadPool = null;
        awaitTermination(this.mergeTaskPool, milliseconds);
        awaitTermination(this.mergeChunkSubTaskPool, milliseconds);
        logger.info("Waiting for task pool to shut down");

        final long warnIntervalMillis = 60_000L;
        long startTime = System.currentTimeMillis();
        long nextWarnAtMillis = warnIntervalMillis;
        boolean interrupted = false;
        while (!this.mergeTaskPool.isTerminated() || !this.mergeChunkSubTaskPool.isTerminated()) {
            try {
                Thread.sleep(200L);
            } catch (InterruptedException e) {
                logger.error("CompactionMergeTaskPoolManager {} shutdown", ThreadName.COMPACTION_SERVICE.getName(), e);
                // Restoring the flag here would make every following sleep() throw at once
                // and turn this loop into a busy spin, so it is restored after the loop.
                interrupted = true;
            }
            long waitedMillis = System.currentTimeMillis() - startTime;
            if (waitedMillis >= nextWarnAtMillis) {
                logger.warn("MergeManager has wait for {} seconds to stop", waitedMillis / 1000);
                nextWarnAtMillis += warnIntervalMillis;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        this.mergeTaskPool = null;
        this.storageGroupMainTasks.clear();
        this.storageGroupSubTasks.clear();
        logger.info("MergeManager stopped");
    }

    /**
     * Shuts an executor down gracefully, waits up to the given time for it to terminate, and
     * then forces shutdown with {@code shutdownNow()} regardless of the outcome.
     *
     * <p>A warning is logged both when the wait times out and when the calling thread is
     * interrupted while waiting; in the latter case the interrupt flag is restored.
     *
     * @param executorService the executor to shut down
     * @param timeoutMillis maximum time to wait for graceful termination, in milliseconds
     */
    private void awaitTermination(ExecutorService executorService, long timeoutMillis) {
        executorService.shutdown();
        try {
            // awaitTermination returns false on timeout; that case used to pass silently.
            boolean terminated = executorService.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            if (!terminated) {
                logger.warn("MergeThreadPool can not be closed in {} ms", timeoutMillis);
            }
        } catch (InterruptedException e) {
            logger.warn("MergeThreadPool can not be closed in {} ms", timeoutMillis);
            Thread.currentThread().interrupt();
        }
        executorService.shutdownNow();
    }

    /**
     * Identifies this service. The returned type's JMX name is also used to build the MBean
     * name of this manager.
     *
     * @return {@link ServiceType#MERGE_SERVICE}
     */
    @Override
    public ServiceType getID() {
        return ServiceType.MERGE_SERVICE;
    }

    /**
     * Asks the storage engine to merge everything, using the configured "force full merge"
     * flag. A {@link StorageEngineException} is logged and not propagated.
     */
    private void mergeAll() {
        try {
            StorageEngine engine = StorageEngine.getInstance();
            boolean forceFullMerge = IoTDBDescriptor.getInstance().getConfig().isForceFullMerge();
            engine.mergeAll(forceFullMerge);
        } catch (StorageEngineException e) {
            logger.error("Cannot perform a global merge because", e);
        }
    }

    /**
     * Cancels and forgets every recorded task of one storage group: the sub tasks are handled
     * first, then the main tasks. Unknown storage groups are a no-op.
     *
     * @param str the storage group name
     */
    @Override
    public void abortMerge(String str) {
        cancelAndForget(this.storageGroupSubTasks.getOrDefault(str, Collections.emptySet()));
        cancelAndForget(this.storageGroupMainTasks.getOrDefault(str, Collections.emptySet()));
    }

    /** Cancels (with interruption) each future that is still pending and removes all of them from the set. */
    private static void cancelAndForget(Set<MergeFuture> futures) {
        for (Iterator<MergeFuture> cursor = futures.iterator(); cursor.hasNext(); ) {
            MergeFuture future = cursor.next();
            boolean finished = future.isDone() || future.isCancelled();
            if (!finished) {
                future.cancel(true);
            }
            cursor.remove();
        }
    }

    /**
     * Drops every recorded future that is done or cancelled, first from the sub-task sets and
     * then from the main-task sets. Scheduled periodically by {@link #start()}.
     */
    private void cleanFinishedTask() {
        for (Set<MergeFuture> subTasks : this.storageGroupSubTasks.values()) {
            subTasks.removeIf(future -> future.isDone() || future.isCancelled());
        }
        for (Set<MergeFuture> mainTasks : this.storageGroupMainTasks.values()) {
            mainTasks.removeIf(future -> future.isDone() || future.isCancelled());
        }
    }

    /**
     * Takes a snapshot of all recorded tasks.
     *
     * @return a two-element array: index 0 maps storage group name to the statuses of its main
     *     tasks, index 1 does the same for sub tasks. A storage group whose task set is empty
     *     has no entry.
     */
    public Map<String, List<TaskStatus>>[] collectTaskStatus() {
        // An array with a parameterised component type cannot be created directly (and cannot
        // use a bare array initialiser), so a raw Map[] is created and converted unchecked.
        @SuppressWarnings("unchecked")
        Map<String, List<TaskStatus>>[] result = new Map[] {new HashMap<>(), new HashMap<>()};
        snapshotInto(this.storageGroupMainTasks, result[0]);
        snapshotInto(this.storageGroupSubTasks, result[1]);
        return result;
    }

    /** Adds a {@link TaskStatus} for every future in {@code source} to {@code target}, grouped by storage group. */
    private static void snapshotInto(Map<String, Set<MergeFuture>> source, Map<String, List<TaskStatus>> target) {
        for (Map.Entry<String, Set<MergeFuture>> entry : source.entrySet()) {
            String storageGroup = entry.getKey();
            for (MergeFuture future : entry.getValue()) {
                // The list is created lazily so that empty task sets leave no entry behind.
                target.computeIfAbsent(storageGroup, group -> new ArrayList<>()).add(new TaskStatus(future));
            }
        }
    }

    /**
     * Builds a human-readable report of all recorded tasks: a "Main tasks:" section followed by
     * a "Sub tasks:" section, each listing its storage groups and, beneath every group, one
     * line per task.
     *
     * @return the multi-line report
     */
    public String genMergeTaskReport() {
        Map<String, List<TaskStatus>>[] statuses = collectTaskStatus();
        StringBuilder report = new StringBuilder();
        appendReportSection(report, "Main tasks:", statuses[0]);
        appendReportSection(report, "Sub tasks:", statuses[1]);
        return report.toString();
    }

    /** Appends one titled section: the heading line, then each storage group with its task lines. */
    private static void appendReportSection(
            StringBuilder report, String heading, Map<String, List<TaskStatus>> statusByGroup) {
        report.append(heading).append(System.lineSeparator());
        statusByGroup.forEach((storageGroup, tasks) -> {
            report.append("\t").append("Storage group: ").append(storageGroup).append(System.lineSeparator());
            for (TaskStatus task : tasks) {
                report.append("\t\t").append(task).append(System.lineSeparator());
            }
        });
    }

    /**
     * Logs the task report at INFO level. The report is only generated when INFO logging is
     * enabled.
     */
    @Override
    public void printMergeStatus() {
        if (!logger.isInfoEnabled()) {
            return;
        }
        String report = genMergeTaskReport();
        logger.info("Running tasks:\n {}", report);
    }
}
