/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.engine.merge.manage;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.merge.task.MergeTask;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Singleton service that owns the thread pools used to run merge tasks.
 *
 * <p>Three executors are managed here:
 * <ul>
 *   <li>{@code mergeTaskPool} runs the main {@link MergeTask}s;</li>
 *   <li>{@code mergeChunkSubTaskPool} runs the chunk-level sub tasks submitted through
 *       {@link #submitChunkSubTask(Callable)};</li>
 *   <li>{@code timedMergeThreadPool} periodically triggers a global merge, and is only
 *       created when the configured merge interval is positive.</li>
 * </ul>
 *
 * <p>The pools only exist between {@link #start()} and {@link #stop()} /
 * {@link #waitAndStop(long)}. NOTE(review): the lifecycle methods are not synchronized, so
 * they are presumably expected to be called from a single thread - confirm against callers.
 */
public class MergeManager implements IService {
    private static final Logger logger = LoggerFactory.getLogger(MergeManager.class);
    private static final MergeManager INSTANCE = new MergeManager();

    /** How long to block between two "still waiting" warnings while the pools shut down. */
    private static final long STOP_WARN_INTERVAL_MS = 60_000L;

    /** Shared counter used to give every merge thread (main and chunk-sub) a unique suffix. */
    private final AtomicInteger threadCnt = new AtomicInteger();
    private ThreadPoolExecutor mergeTaskPool;
    private ThreadPoolExecutor mergeChunkSubTaskPool;
    private ScheduledExecutorService timedMergeThreadPool;

    private MergeManager() {
    }

    /**
     * Returns the process-wide instance.
     *
     * @return the singleton {@code MergeManager}
     */
    public static MergeManager getINSTANCE() {
        return INSTANCE;
    }

    /**
     * Submits a main merge task to the merge task pool.
     *
     * <p>The manager must have been started; before {@link #start()} or after a stop the
     * main pool reference is {@code null} and this call fails with a
     * {@link NullPointerException}.
     *
     * @param mergeTask the task to run
     */
    public void submitMainTask(MergeTask mergeTask) {
        this.mergeTaskPool.submit(mergeTask);
    }

    /**
     * Submits a chunk-level sub task to the chunk sub task pool.
     *
     * @param callable the sub task to run
     * @param <T> result type of the sub task
     * @return a future tracking the sub task
     */
    public <T> Future<T> submitChunkSubTask(Callable<T> callable) {
        return this.mergeChunkSubTaskPool.submit(callable);
    }

    /**
     * Creates the thread pools and, if a positive merge interval is configured, schedules
     * the periodic global merge. Does nothing when the manager is already started.
     */
    @Override
    public void start() {
        if (this.mergeTaskPool != null) {
            return;
        }
        // Non-positive configured sizes fall back to a single thread.
        int threadNum = Math.max(1, IoTDBDescriptor.getInstance().getConfig().getMergeThreadNum());
        int chunkSubThreadNum =
            Math.max(1, IoTDBDescriptor.getInstance().getConfig().getMergeChunkSubThreadNum());

        this.mergeTaskPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
            threadNum, r -> new Thread(r, "MergeThread-" + this.threadCnt.getAndIncrement()));
        // Sized so that every main merge thread can have chunkSubThreadNum sub tasks running.
        this.mergeChunkSubTaskPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
            threadNum * chunkSubThreadNum,
            r -> new Thread(r, "MergeChunkSubThread-" + this.threadCnt.getAndIncrement()));

        long mergeInterval = IoTDBDescriptor.getInstance().getConfig().getMergeIntervalSec();
        if (mergeInterval > 0L) {
            this.timedMergeThreadPool =
                Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "TimedMergeThread"));
            this.timedMergeThreadPool.scheduleAtFixedRate(
                this::mergeAll, mergeInterval, mergeInterval, TimeUnit.SECONDS);
        }
        logger.info("MergeManager started");
    }

    /**
     * Shuts all pools down immediately ({@code shutdownNow}) and blocks until both merge
     * pools have terminated. Does nothing when the manager is not started.
     */
    @Override
    public void stop() {
        if (this.mergeTaskPool == null) {
            return;
        }
        if (this.timedMergeThreadPool != null) {
            this.timedMergeThreadPool.shutdownNow();
            this.timedMergeThreadPool = null;
        }
        this.mergeTaskPool.shutdownNow();
        this.mergeChunkSubTaskPool.shutdownNow();
        this.waitUntilPoolsTerminated();
        this.mergeTaskPool = null;
        logger.info("MergeManager stopped");
    }

    /**
     * Shuts the pools down gracefully, giving each of them up to {@code millseconds} to
     * finish before forcing it with {@code shutdownNow}, then blocks until both merge pools
     * have terminated. Does nothing when the manager is not started.
     *
     * @param millseconds grace period, in milliseconds, granted to each pool in turn
     */
    @Override
    public void waitAndStop(long millseconds) {
        if (this.mergeTaskPool == null) {
            return;
        }
        if (this.timedMergeThreadPool != null) {
            this.awaitTermination(this.timedMergeThreadPool, millseconds);
            this.timedMergeThreadPool = null;
        }
        this.awaitTermination(this.mergeTaskPool, millseconds);
        this.awaitTermination(this.mergeChunkSubTaskPool, millseconds);
        this.waitUntilPoolsTerminated();
        this.mergeTaskPool = null;
        logger.info("MergeManager stopped");
    }

    /**
     * Blocks until both merge pools report termination, logging a warning with the elapsed
     * time each time another {@link #STOP_WARN_INTERVAL_MS} passes without success.
     *
     * <p>Uses a blocking {@code awaitTermination} instead of polling {@code isTerminated()},
     * so the calling thread does not spin. An interrupt does not abort the wait; it is
     * remembered and the interrupt flag is restored once the pools are down.
     */
    private void waitUntilPoolsTerminated() {
        logger.info("Waiting for task pool to shut down");
        long startTime = System.currentTimeMillis();
        boolean interrupted = false;
        while (!this.mergeTaskPool.isTerminated() || !this.mergeChunkSubTaskPool.isTerminated()) {
            // Wait on whichever pool is still alive; the loop condition re-checks both.
            ExecutorService pending =
                this.mergeTaskPool.isTerminated() ? this.mergeChunkSubTaskPool : this.mergeTaskPool;
            try {
                if (!pending.awaitTermination(STOP_WARN_INTERVAL_MS, TimeUnit.MILLISECONDS)) {
                    long elapsed = System.currentTimeMillis() - startTime;
                    logger.warn("MergeManager has wait for {} seconds to stop", elapsed / 1000L);
                }
            } catch (InterruptedException e) {
                // The exception cleared the flag, so the next awaitTermination blocks again.
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Requests an orderly shutdown of {@code service}, waits up to {@code millseconds} for
     * it to finish, and then forces the shutdown with {@code shutdownNow} regardless of the
     * outcome. A warning is logged when the pool did not terminate within the grace period
     * or when the wait was interrupted; in the latter case the interrupt flag is restored.
     *
     * @param service the executor to shut down
     * @param millseconds grace period in milliseconds
     */
    private void awaitTermination(ExecutorService service, long millseconds) {
        try {
            service.shutdown();
            if (!service.awaitTermination(millseconds, TimeUnit.MILLISECONDS)) {
                logger.warn("MergeThreadPool can not be closed in {} ms", millseconds);
            }
        } catch (InterruptedException e) {
            logger.warn("MergeThreadPool can not be closed in {} ms", millseconds);
            Thread.currentThread().interrupt();
        }
        service.shutdownNow();
    }

    @Override
    public ServiceType getID() {
        return ServiceType.MERGE_SERVICE;
    }

    /**
     * Triggers a merge on the storage engine, passing the configured "force full merge"
     * flag. Runs on the timed merge thread.
     *
     * <p>Failures are logged rather than propagated: an exception escaping a task scheduled
     * with {@code scheduleAtFixedRate} suppresses all of its subsequent executions, which
     * would silently stop the timed merge.
     */
    private void mergeAll() {
        try {
            StorageEngine.getInstance()
                .mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
        } catch (StorageEngineException e) {
            logger.error("Cannot perform a global merge because", e);
        } catch (RuntimeException e) {
            logger.error("Unexpected error during the timed global merge", e);
        }
    }
}

