package org.apache.iotdb.db.engine.merge.manage;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.merge.task.MergeTask;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/engine/merge/manage/MergeManager.class */
/**
 * Singleton service that owns the executor pools used to run merge work.
 *
 * <p>{@link #start()} creates a fixed pool for main merge tasks, a fixed pool for chunk sub-tasks
 * and, when the configured merge interval is positive, a single-threaded scheduler that
 * periodically asks the {@link StorageEngine} to merge everything. {@link #stop()} and
 * {@link #waitAndStop(long)} shut those pools down and block until both task pools have
 * terminated.
 *
 * <p>The lifecycle methods perform no synchronization of their own.
 */
public class MergeManager implements IService {
    private static final Logger logger = LoggerFactory.getLogger(MergeManager.class);
    private static final MergeManager INSTANCE = new MergeManager();

    /** Interval, in milliseconds, between progress warnings while waiting for the pools to stop. */
    private static final long STOP_WARN_INTERVAL_MS = 60_000L;

    /** Counter shared by both task pools to give every worker thread a unique name suffix. */
    private final AtomicInteger threadCnt = new AtomicInteger();
    private ThreadPoolExecutor mergeTaskPool;
    private ThreadPoolExecutor mergeChunkSubTaskPool;
    private ScheduledExecutorService timedMergeThreadPool;

    private MergeManager() {
    }

    /**
     * Returns the process-wide instance.
     *
     * @return the singleton {@code MergeManager}
     */
    public static MergeManager getINSTANCE() {
        return INSTANCE;
    }

    /**
     * Submits a main merge task to the merge task pool.
     *
     * @param mergeTask the task to run
     */
    public void submitMainTask(MergeTask mergeTask) {
        this.mergeTaskPool.submit(mergeTask);
    }

    /**
     * Submits a chunk sub-task to the chunk sub-task pool.
     *
     * @param callable the sub-task to run
     * @return the future tracking the submitted sub-task
     */
    public Future submitChunkSubTask(Callable callable) {
        return this.mergeChunkSubTaskPool.submit(callable);
    }

    /**
     * Creates the thread pools if they do not exist yet. Configured thread counts that are not
     * positive are replaced by 1. The chunk sub-task pool is sized as
     * {@code mergeThreadNum * mergeChunkSubThreadNum}. The periodic merge is scheduled only when
     * the configured merge interval is greater than zero.
     */
    @Override // org.apache.iotdb.db.service.IService
    public void start() {
        if (this.mergeTaskPool != null) {
            // Already started; nothing to do.
            return;
        }
        int mergeThreadNum =
                Math.max(1, IoTDBDescriptor.getInstance().getConfig().getMergeThreadNum());
        int mergeChunkSubThreadNum =
                Math.max(1, IoTDBDescriptor.getInstance().getConfig().getMergeChunkSubThreadNum());

        this.mergeTaskPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
                mergeThreadNum,
                runnable -> new Thread(runnable, "MergeThread-" + this.threadCnt.getAndIncrement()));
        this.mergeChunkSubTaskPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
                mergeThreadNum * mergeChunkSubThreadNum,
                runnable -> new Thread(
                        runnable, "MergeChunkSubThread-" + this.threadCnt.getAndIncrement()));

        long mergeIntervalSec = IoTDBDescriptor.getInstance().getConfig().getMergeIntervalSec();
        if (mergeIntervalSec > 0) {
            this.timedMergeThreadPool = Executors.newSingleThreadScheduledExecutor(
                    runnable -> new Thread(runnable, "TimedMergeThread"));
            this.timedMergeThreadPool.scheduleAtFixedRate(
                    this::mergeAll, mergeIntervalSec, mergeIntervalSec, TimeUnit.SECONDS);
        }
        logger.info("MergeManager started");
    }

    /**
     * Shuts all pools down immediately and blocks until both task pools have terminated.
     * Does nothing when the manager has not been started.
     */
    @Override // org.apache.iotdb.db.service.IService
    public void stop() {
        if (this.mergeTaskPool == null) {
            return;
        }
        if (this.timedMergeThreadPool != null) {
            this.timedMergeThreadPool.shutdownNow();
            this.timedMergeThreadPool = null;
        }
        this.mergeTaskPool.shutdownNow();
        this.mergeChunkSubTaskPool.shutdownNow();
        logger.info("Waiting for task pool to shut down");
        waitUntilTaskPoolsTerminated();
        this.mergeTaskPool = null;
        logger.info("MergeManager stopped");
    }

    /**
     * Shuts each pool down gracefully, giving each one up to {@code milliseconds} to finish before
     * it is shut down forcibly, then blocks until both task pools have terminated.
     * Does nothing when the manager has not been started.
     *
     * @param milliseconds grace period granted to each pool, in milliseconds
     */
    @Override // org.apache.iotdb.db.service.IService
    public void waitAndStop(long milliseconds) {
        if (this.mergeTaskPool == null) {
            return;
        }
        if (this.timedMergeThreadPool != null) {
            awaitTermination(this.timedMergeThreadPool, milliseconds);
            this.timedMergeThreadPool = null;
        }
        awaitTermination(this.mergeTaskPool, milliseconds);
        awaitTermination(this.mergeChunkSubTaskPool, milliseconds);
        logger.info("Waiting for task pool to shut down");
        waitUntilTaskPoolsTerminated();
        this.mergeTaskPool = null;
        logger.info("MergeManager stopped");
    }

    /**
     * Blocks until both task pools report termination, emitting a warning each time another
     * {@link #STOP_WARN_INTERVAL_MS} elapses without both pools having terminated.
     *
     * <p>The wait blocks inside {@code awaitTermination} instead of polling
     * {@code isTerminated()} in a tight loop, so the calling thread does not burn CPU. An
     * interrupt does not abort the wait; it is remembered and the thread's interrupt status is
     * restored before returning.
     */
    private void waitUntilTaskPoolsTerminated() {
        long startTime = System.currentTimeMillis();
        boolean interrupted = false;
        while (true) {
            try {
                long deadlineNanos =
                        System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(STOP_WARN_INTERVAL_MS);
                if (awaitUntil(this.mergeTaskPool, deadlineNanos)
                        && awaitUntil(this.mergeChunkSubTaskPool, deadlineNanos)) {
                    break;
                }
                logger.warn("MergeManager has wait for {} seconds to stop",
                        (System.currentTimeMillis() - startTime) / 1000);
            } catch (InterruptedException e) {
                // The exception cleared the interrupt flag, so the next wait blocks normally.
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Waits for {@code pool} to terminate until the given {@link System#nanoTime()} deadline.
     *
     * @param pool the executor to wait for
     * @param deadlineNanos deadline expressed on the {@code System.nanoTime()} clock
     * @return {@code true} if the pool terminated, {@code false} if the deadline passed first
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static boolean awaitUntil(ExecutorService pool, long deadlineNanos)
            throws InterruptedException {
        return pool.awaitTermination(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    /**
     * Initiates an orderly shutdown of {@code executorService}, waits up to {@code j}
     * milliseconds for it to finish, and then shuts it down forcibly regardless of the outcome.
     *
     * @param executorService the executor to shut down
     * @param j maximum time to wait for the orderly shutdown, in milliseconds
     */
    private void awaitTermination(ExecutorService executorService, long j) {
        try {
            executorService.shutdown();
            executorService.awaitTermination(j, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            logger.warn("MergeThreadPool can not be closed in {} ms", j);
            Thread.currentThread().interrupt();
        }
        executorService.shutdownNow();
    }

    @Override // org.apache.iotdb.db.service.IService
    public ServiceType getID() {
        return ServiceType.MERGE_SERVICE;
    }

    /**
     * Asks the storage engine to merge everything, using the configured force-full-merge flag.
     * Failures are logged rather than propagated.
     */
    private void mergeAll() {
        try {
            StorageEngine.getInstance()
                    .mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
        } catch (StorageEngineException | RuntimeException e) {
            // This method runs under scheduleAtFixedRate, which suppresses every subsequent
            // execution once one run throws; unchecked failures are therefore logged as well so
            // the periodic merge keeps running.
            logger.error("Cannot perform a global merge because", e);
        }
    }
}
