package org.apache.iotdb.db.engine;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.ShutdownException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor;
import org.apache.iotdb.db.engine.cache.BloomFilterCache;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.flush.CloseFileListener;
import org.apache.iotdb.db.engine.flush.FlushListener;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.engine.load.LoadTsFileManager;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.utils.ThreadUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.db.wal.WALManager;
import org.apache.iotdb.db.wal.exception.WALException;
import org.apache.iotdb.db.wal.node.WALNode;
import org.apache.iotdb.db.wal.recover.WALRecoverManager;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/engine/StorageEngine.class */
public class StorageEngine implements IService {
    /** Initial delay and period of the TTL check task; scheduled with TimeUnit.MILLISECONDS in start(). */
    private static final long TTL_CHECK_INTERVAL = 60000;
    /** Root directory of all databases: the regularized system dir followed by "databases". */
    private final String systemDir;
    /** Data regions currently served by this engine, keyed by region id. */
    private final ConcurrentHashMap<DataRegionId, DataRegion> dataRegionMap;
    /** Regions moved out of dataRegionMap by deleteDataRegion while their deletion is in progress. */
    private final ConcurrentHashMap<DataRegionId, DataRegion> deletingDataRegionMap;
    /** Database name to TTL, filled by updateTTLInfo and consulted (then cleared) during recovery. */
    private final Map<String, Long> ttlMapForRecover;
    /** Number of regions whose recovery task has finished; re-created by asyncRecover. */
    private AtomicInteger readyDataRegionNum;
    /** Set to true once all recovery tasks submitted by recover() have been checked. */
    private AtomicBoolean isAllSgReady;
    /** Scheduler that runs checkTTL periodically; created in start(). */
    private ScheduledExecutorService ttlCheckThread;
    /** Scheduler for the sequence memtable timed flush; only created when enabled in the config. */
    private ScheduledExecutorService seqMemtableTimedFlushCheckThread;
    /** Scheduler for the unsequence memtable timed flush; only created when enabled in the config. */
    private ScheduledExecutorService unseqMemtableTimedFlushCheckThread;
    /** Flush policy handed to every DataRegion built by buildNewDataRegion. */
    private TsFileFlushPolicy fileFlushPolicy;
    /** Pool used for recovery and processor-closing tasks; created in recover(). */
    private ExecutorService cachedThreadPool;
    /** Listeners handed to every DataRegion built by buildNewDataRegion. */
    private List<CloseFileListener> customCloseFileListeners;
    /** Listeners handed to every DataRegion built by buildNewDataRegion. */
    private List<FlushListener> customFlushListeners;
    /** Number of region directories found on disk by asyncRecover. */
    private int recoverDataRegionNum;
    /** Created on first use by getLoadTsFileManager(). */
    private LoadTsFileManager loadTsFileManager;
    private static final Logger logger = LoggerFactory.getLogger(StorageEngine.class);
    private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
    /** Cached time partition interval; -1 means "not loaded from the config yet". */
    private static long timePartitionInterval = -1;

    /**
     * Holder for the single {@link StorageEngine} instance; the instance is
     * created when this nested class is initialized.
     */
    public static class InstanceHolder {
        private static final StorageEngine INSTANCE = new StorageEngine();

        /** Not instantiable: the class only carries the static instance. */
        private InstanceHolder() {}
    }

    /**
     * Creates the engine with empty region maps, no registered listeners and
     * the direct flush policy. Thread pools are created later by
     * {@link #start()} and {@link #recover()}.
     */
    private StorageEngine() {
        systemDir = FilePathUtils.regularizePath(config.getSystemDir()) + "databases";

        // Region bookkeeping.
        dataRegionMap = new ConcurrentHashMap<>();
        deletingDataRegionMap = new ConcurrentHashMap<>();

        // Recovery state.
        ttlMapForRecover = new ConcurrentHashMap<>();
        recoverDataRegionNum = 0;
        isAllSgReady = new AtomicBoolean(false);

        // Flush policy and listeners handed to every region.
        fileFlushPolicy = new TsFileFlushPolicy.DirectFlushPolicy();
        customCloseFileListeners = new ArrayList<>();
        customFlushListeners = new ArrayList<>();
    }

    /**
     * Returns the engine instance held by {@link InstanceHolder}.
     *
     * @return the shared engine instance
     */
    public static StorageEngine getInstance() {
        final StorageEngine engine = InstanceHolder.INSTANCE;
        return engine;
    }

    /** Loads the time partition interval from the current configuration into the static cache. */
    private static void initTimePartition() {
        IoTDBConfig currentConfig = IoTDBDescriptor.getInstance().getConfig();
        timePartitionInterval = currentConfig.getTimePartitionInterval();
    }

    /**
     * Returns the cached time partition interval, loading it from the
     * configuration first if it still holds the "unset" marker (-1).
     *
     * @return the time partition interval
     */
    public static long getTimePartitionInterval() {
        boolean notLoadedYet = timePartitionInterval == -1;
        if (notLoadedYet) {
            initTimePartition();
        }
        return timePartitionInterval;
    }

    /**
     * Maps a timestamp to its time partition by integer division with the
     * time partition interval (loaded lazily on first use).
     *
     * @param j the timestamp
     * @return {@code j} divided by the time partition interval
     */
    public static long getTimePartition(long j) {
        // getTimePartitionInterval() performs the same lazy initialization.
        return j / getTimePartitionInterval();
    }

    /**
     * Blocks the caller while {@link SystemInfo} reports that insertions are
     * rejected, polling every {@code config.getCheckPeriodWhenInsertBlocked()}
     * milliseconds.
     *
     * <p>Returns as soon as the system stops rejecting, or when the given
     * processor reports that it should flush.
     *
     * @param tsFileProcessor processor about to be written to; may be null
     * @throws WriteProcessRejectException if the rejection lasts longer than
     *     {@code config.getMaxWaitingTimeWhenInsertBlocked()} milliseconds
     */
    public static void blockInsertionIfReject(TsFileProcessor tsFileProcessor) throws WriteProcessRejectException {
        long startTime = System.currentTimeMillis();
        while (SystemInfo.getInstance().isRejected()) {
            if (tsFileProcessor != null && tsFileProcessor.shouldFlush()) {
                return;
            }
            try {
                TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller; the timeout below still bounds the wait.
                Thread.currentThread().interrupt();
            }
            long waited = System.currentTimeMillis() - startTime;
            if (waited > config.getMaxWaitingTimeWhenInsertBlocked()) {
                // No statement may follow this throw: a trailing `break` is unreachable and does not compile.
                throw new WriteProcessRejectException("System rejected over " + waited + "ms");
            }
        }
    }

    /**
     * Decodes serialized TTL information into the recovery TTL map.
     *
     * <p>The buffer is read as an int entry count followed by that many
     * (string, long) pairs. A null array is ignored.
     *
     * @param bArr serialized TTL entries, or null
     */
    public void updateTTLInfo(byte[] bArr) {
        if (bArr != null) {
            ByteBuffer buffer = ByteBuffer.wrap(bArr);
            int remaining = ReadWriteIOUtils.readInt(buffer);
            while (remaining-- > 0) {
                // Read order matters: name first, then its TTL.
                String database = Objects.requireNonNull(ReadWriteIOUtils.readString(buffer));
                long ttl = ReadWriteIOUtils.readLong(buffer);
                ttlMapForRecover.put(database, ttl);
            }
        }
    }

    /**
     * Reports the current value of the readiness flag.
     *
     * @return true once the flag has been set, e.g. after recovery finished
     */
    public boolean isAllSgReady() {
        final boolean ready = isAllSgReady.get();
        return ready;
    }

    /**
     * Overwrites the readiness flag.
     *
     * @param z the new flag value
     */
    public void setAllSgReady(boolean z) {
        isAllSgReady.set(z);
    }

    /**
     * Starts recovery of all data regions found on disk.
     *
     * <p>Region recovery tasks are submitted to a newly created cached thread
     * pool. WAL recovery is run here unless the node is in cluster mode with
     * the Ratis consensus protocol for data regions. A separate thread waits
     * for the tasks, then sets the readiness flag and clears the recovery TTL
     * map.
     */
    public void recover() {
        setAllSgReady(false);
        cachedThreadPool = IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.STORAGE_ENGINE_CACHED_SERVICE.getName());

        List<Future<Void>> recoveryTasks = new LinkedList<>();
        asyncRecover(recoveryTasks);

        boolean clusterWithRatis = config.isClusterMode()
                && config.getDataRegionConsensusProtocolClass().equals("org.apache.iotdb.consensus.ratis.RatisConsensus");
        if (!clusterWithRatis) {
            try {
                WALRecoverManager.getInstance().recover();
            } catch (WALException e) {
                logger.error("Fail to recover wal.", e);
            }
        }

        Thread recoveryWaiter = new Thread(() -> {
            checkResults(recoveryTasks, "StorageEngine failed to recover.");
            setAllSgReady(true);
            ttlMapForRecover.clear();
        });
        recoveryWaiter.start();
    }

    /**
     * Submits one recovery task per data region directory found on disk.
     *
     * <p>Each task builds the region with the TTL recorded for its database in
     * the recovery TTL map (or a default when none is recorded) and registers
     * it in the region map. A region that fails to build is logged and
     * skipped: {@link ConcurrentHashMap} rejects null values, so registering
     * the failed (null) region would throw a NullPointerException from the
     * task.
     *
     * @param list receives the future of every submitted task
     */
    private void asyncRecover(List<Future<Void>> list) {
        Map<String, List<DataRegionId>> localDataRegionInfo = getLocalDataRegionInfo();

        // Assigned rather than accumulated, so a repeated recover() does not inflate the count.
        int regionCount = 0;
        for (List<DataRegionId> regionIds : localDataRegionInfo.values()) {
            regionCount += regionIds.size();
        }
        this.recoverDataRegionNum = regionCount;
        this.readyDataRegionNum = new AtomicInteger(0);
        WALRecoverManager.getInstance().setAllDataRegionScannedLatch(new CountDownLatch(this.recoverDataRegionNum));

        for (Map.Entry<String, List<DataRegionId>> entry : localDataRegionInfo.entrySet()) {
            String key = entry.getKey();
            for (DataRegionId dataRegionId : entry.getValue()) {
                list.add(this.cachedThreadPool.submit(() -> {
                    DataRegion dataRegion;
                    try {
                        dataRegion = buildNewDataRegion(key, dataRegionId, this.ttlMapForRecover.getOrDefault(key, Long.valueOf(WALNode.DEFAULT_SAFELY_DELETED_SEARCH_INDEX)).longValue());
                    } catch (DataRegionException e) {
                        logger.error("Failed to recover data region {}[{}]", new Object[]{key, Integer.valueOf(dataRegionId.getId()), e});
                        // Nothing to register for this region.
                        return null;
                    }
                    this.dataRegionMap.put(dataRegionId, dataRegion);
                    logger.info("Data regions have been recovered {}/{}", Integer.valueOf(this.readyDataRegionNum.incrementAndGet()), Integer.valueOf(this.recoverDataRegionNum));
                    return null;
                }));
            }
        }
    }

    /**
     * Scans the system directory for data regions present on local disk.
     *
     * <p>Every sub-directory of the system directory is treated as a database,
     * and every sub-directory inside it as a data region whose name is the
     * numeric region id.
     *
     * @return database name to region ids; empty if the system directory
     *     cannot be listed
     * @throws NumberFormatException if a region directory name is not an integer
     */
    public Map<String, List<DataRegionId>> getLocalDataRegionInfo() {
        Map<String, List<DataRegionId>> regionsByDatabase = new HashMap<>();
        File[] databaseDirs = SystemFileFactory.INSTANCE.getFile(this.systemDir).listFiles();
        if (databaseDirs == null) {
            return regionsByDatabase;
        }
        for (File databaseDir : databaseDirs) {
            if (!databaseDir.isDirectory()) {
                continue;
            }
            List<DataRegionId> regionIds = new ArrayList<>();
            // listFiles() returns null on an I/O error or if the directory vanished meanwhile.
            File[] regionDirs = databaseDir.listFiles();
            if (regionDirs != null) {
                for (File regionDir : regionDirs) {
                    if (regionDir.isDirectory()) {
                        regionIds.add(new DataRegionId(Integer.parseInt(regionDir.getName())));
                    }
                }
            }
            regionsByDatabase.put(databaseDir.getName(), regionIds);
        }
        return regionsByDatabase;
    }

    /**
     * Starts the engine: loads the time partition interval, creates the system
     * directory, recovers upgrade state and data regions, then schedules the
     * TTL check and the optional timed flush services.
     *
     * @throws StorageEngineFailureException if the system directory cannot be
     *     created or upgrade recovery fails with an I/O error
     */
    public void start() {
        initTimePartition();
        try {
            File systemFolder = SystemFileFactory.INSTANCE.getFile(systemDir);
            FileUtils.forceMkdir(systemFolder);
            UpgradeUtils.recoverUpgrade();
            recover();

            ttlCheckThread = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("TTL-Check");
            ScheduledExecutorUtil.safelyScheduleAtFixedRate(
                    ttlCheckThread, this::checkTTL, TTL_CHECK_INTERVAL, TTL_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
            logger.info("start ttl check thread successfully.");

            startTimedService();
        } catch (IOException e) {
            throw new StorageEngineFailureException(e);
        }
    }

    /**
     * Asks every registered data region to check its files against the TTL.
     *
     * <p>A ConcurrentModificationException is deliberately ignored; any other
     * exception is logged so the periodic task keeps running.
     */
    private void checkTTL() {
        try {
            dataRegionMap.values().forEach(region -> {
                if (region != null) {
                    region.checkFilesTTL();
                }
            });
        } catch (ConcurrentModificationException ignored) {
            // Intentionally ignored: regions are checked again on the next run.
        } catch (Exception e) {
            logger.error("An error occurred when checking TTL", e);
        }
    }

    /**
     * Creates and schedules the timed flush checkers that are enabled in the
     * configuration: one for sequence memtables, one for unsequence memtables.
     * Each uses its configured check interval as both initial delay and period.
     */
    private void startTimedService() {
        // Sequence memtables.
        if (config.isEnableTimedFlushSeqMemtable()) {
            seqMemtableTimedFlushCheckThread =
                    IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(ThreadName.TIMED_FLUSH_SEQ_MEMTABLE.getName());
            ScheduledExecutorUtil.safelyScheduleAtFixedRate(
                    seqMemtableTimedFlushCheckThread,
                    this::timedFlushSeqMemTable,
                    config.getSeqMemtableFlushCheckInterval(),
                    config.getSeqMemtableFlushCheckInterval(),
                    TimeUnit.MILLISECONDS);
            logger.info("start sequence memtable timed flush check thread successfully.");
        }

        // Unsequence memtables.
        if (config.isEnableTimedFlushUnseqMemtable()) {
            unseqMemtableTimedFlushCheckThread =
                    IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(ThreadName.TIMED_FLUSH_UNSEQ_MEMTABLE.getName());
            ScheduledExecutorUtil.safelyScheduleAtFixedRate(
                    unseqMemtableTimedFlushCheckThread,
                    this::timedFlushUnseqMemTable,
                    config.getUnseqMemtableFlushCheckInterval(),
                    config.getUnseqMemtableFlushCheckInterval(),
                    TimeUnit.MILLISECONDS);
            logger.info("start unsequence memtable timed flush check thread successfully.");
        }
    }

    /** Runs the timed sequence-memtable flush on every registered data region. */
    private void timedFlushSeqMemTable() {
        dataRegionMap.values().forEach(region -> {
            if (region != null) {
                region.timedFlushSeqMemTable();
            }
        });
    }

    /** Runs the timed unsequence-memtable flush on every registered data region. */
    private void timedFlushUnseqMemTable() {
        dataRegionMap.values().forEach(region -> {
            if (region != null) {
                region.timedFlushUnseqMemTable();
            }
        });
    }

    /**
     * Stops the engine: stops each region's timed compaction task, closes all
     * working processors synchronously, stops the TTL and timed flush
     * schedulers, shuts down the cached pool and clears the region map.
     */
    public void stop() {
        dataRegionMap.values().forEach(region -> {
            if (region != null) {
                ThreadUtils.stopThreadPool(region.getTimedCompactionScheduleTask(), ThreadName.COMPACTION_SCHEDULE);
            }
        });

        syncCloseAllProcessor();

        ThreadUtils.stopThreadPool(ttlCheckThread, ThreadName.TTL_CHECK_SERVICE);
        ThreadUtils.stopThreadPool(seqMemtableTimedFlushCheckThread, ThreadName.TIMED_FLUSH_SEQ_MEMTABLE);
        ThreadUtils.stopThreadPool(unseqMemtableTimedFlushCheckThread, ThreadName.TIMED_FLUSH_UNSEQ_MEMTABLE);

        // The pool only exists after recover() has run.
        if (cachedThreadPool != null) {
            cachedThreadPool.shutdownNow();
        }
        dataRegionMap.clear();
    }

    /**
     * Shuts the engine down forcibly: stops each region's timed compaction
     * task, force-closes all working processors, shuts down the timed
     * services and the cached pool, and clears the region map.
     *
     * @param j not used by this implementation
     * @throws ShutdownException if force-closing the processors fails with a
     *     {@link TsFileProcessorException}
     */
    public void shutdown(long j) throws ShutdownException {
        try {
            for (DataRegion dataRegion : this.dataRegionMap.values()) {
                ThreadUtils.stopThreadPool(dataRegion.getTimedCompactionScheduleTask(), ThreadName.COMPACTION_SCHEDULE);
            }
            forceCloseAllProcessor();
            shutdownTimedService(this.ttlCheckThread, "TTlCheckThread");
            shutdownTimedService(this.seqMemtableTimedFlushCheckThread, "SeqMemtableTimedFlushCheckThread");
            shutdownTimedService(this.unseqMemtableTimedFlushCheckThread, "UnseqMemtableTimedFlushCheckThread");
            // Same guard as stop(): the pool is only created by recover().
            if (this.cachedThreadPool != null) {
                this.cachedThreadPool.shutdownNow();
            }
            this.dataRegionMap.clear();
        } catch (TsFileProcessorException e) {
            throw new ShutdownException(e);
        }
    }

    /**
     * Shuts a scheduler down immediately and waits up to 30 seconds for it to
     * terminate. A null scheduler is ignored.
     *
     * <p>The "still doesn't exit" warning is logged when the wait actually
     * times out ({@code awaitTermination} returns false). An interrupt during
     * the wait is logged separately and the interrupt flag is restored.
     *
     * @param scheduledExecutorService the scheduler to stop, or null
     * @param str name of the scheduler, used in log messages
     */
    private void shutdownTimedService(ScheduledExecutorService scheduledExecutorService, String str) {
        if (scheduledExecutorService == null) {
            return;
        }
        scheduledExecutorService.shutdownNow();
        try {
            if (!scheduledExecutorService.awaitTermination(30L, TimeUnit.SECONDS)) {
                logger.warn("{} still doesn't exit after 30s", str);
            }
        } catch (InterruptedException e) {
            logger.warn("Interrupted while waiting for {} to exit", str);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Identifies this service.
     *
     * @return {@link ServiceType#STORAGE_ENGINE_SERVICE}
     */
    public ServiceType getID() {
        final ServiceType id = ServiceType.STORAGE_ENGINE_SERVICE;
        return id;
    }

    /**
     * Constructs a data region under {@code <systemDir>/<database>} and
     * applies the given TTL and this engine's flush/close listeners to it.
     * The region is not registered in the region map here.
     *
     * @param str database name
     * @param dataRegionId id of the region to build
     * @param j TTL to apply to the region
     * @return the newly built region
     * @throws DataRegionException if constructing the region or applying the TTL fails
     */
    public DataRegion buildNewDataRegion(String str, DataRegionId dataRegionId, long j) throws DataRegionException {
        logger.info("construct a data region instance, the database is {}, Thread is {}", str, Long.valueOf(Thread.currentThread().getId()));

        String databaseDir = systemDir + File.separator + str;
        String regionName = String.valueOf(dataRegionId.getId());
        DataRegion region = new DataRegion(databaseDir, regionName, fileFlushPolicy, str);

        region.setDataTTLWithTimePrecisionCheck(j);
        region.setCustomFlushListeners(customFlushListeners);
        region.setCustomCloseFileListeners(customCloseFileListeners);
        return region;
    }

    /**
     * Executes a plan node against the data region registered under the id.
     *
     * <p>The looked-up region (null if the id is not registered) is passed to
     * a new {@link DataExecutionVisitor} as the visit context.
     *
     * @param dataRegionId id of the target region
     * @param planNode the plan node to execute
     * @return the status produced by the visitor
     */
    public TSStatus write(DataRegionId dataRegionId, PlanNode planNode) {
        DataExecutionVisitor visitor = new DataExecutionVisitor();
        DataRegion target = dataRegionMap.get(dataRegionId);
        return (TSStatus) planNode.accept(visitor, target);
    }

    /** Removes every entry from the region map; the regions themselves are not closed here. */
    public synchronized void reset() {
        dataRegionMap.clear();
    }

    /**
     * Closes the working TsFile processors of every registered region, one
     * task per region on the cached pool, and waits for all tasks.
     *
     * @throws StorageEngineFailureException if a task fails or the wait is interrupted
     */
    public void syncCloseAllProcessor() {
        logger.info("Start closing all database processor");
        List<Future<Void>> closeTasks = new ArrayList<>();
        for (DataRegion region : dataRegionMap.values()) {
            if (region == null) {
                continue;
            }
            closeTasks.add(cachedThreadPool.submit(() -> {
                region.syncCloseAllWorkingTsFileProcessors();
                return null;
            }));
        }
        checkResults(closeTasks, "Failed to sync close processor.");
    }

    /**
     * Force-closes the working TsFile processors of every registered region,
     * one task per region on the cached pool, and waits for all tasks.
     *
     * @throws TsFileProcessorException declared by the signature
     * @throws StorageEngineFailureException if a task fails or the wait is interrupted
     */
    public void forceCloseAllProcessor() throws TsFileProcessorException {
        logger.info("Start force closing all database processor");
        List<Future<Void>> closeTasks = new ArrayList<>();
        for (DataRegion region : dataRegionMap.values()) {
            if (region == null) {
                continue;
            }
            closeTasks.add(cachedThreadPool.submit(() -> {
                region.forceCloseAllWorkingTsFileProcessors();
                return null;
            }));
        }
        checkResults(closeTasks, "Failed to force close processor.");
    }

    /**
     * Closes the working processors of one kind (sequence or unsequence) for
     * every region belonging to the given database, one task per processor,
     * and waits for all tasks.
     *
     * @param str database name to match against each region
     * @param z true for sequence processors, false for unsequence processors
     * @throws StorageEngineFailureException if a task fails or the wait is interrupted
     */
    public void closeStorageGroupProcessor(String str, boolean z) {
        List<Future<Void>> closeTasks = new ArrayList<>();
        for (DataRegion region : dataRegionMap.values()) {
            if (!region.getDatabaseName().equals(str)) {
                continue;
            }
            Collection<TsFileProcessor> processors =
                    z ? region.getWorkSequenceTsFileProcessors() : region.getWorkUnsequenceTsFileProcessors();
            for (TsFileProcessor processor : processors) {
                closeTasks.add(cachedThreadPool.submit(() -> {
                    region.syncCloseOneTsFileProcessor(z, processor);
                    return null;
                }));
            }
        }
        checkResults(closeTasks, "Failed to close database processor.");
    }

    /**
     * Waits for every future in order and fails on the first one that did not
     * complete normally.
     *
     * @param list futures to wait for
     * @param str message for the exception thrown on failure
     * @throws StorageEngineFailureException wrapping the ExecutionException of
     *     a failed task, or the InterruptedException if the wait was
     *     interrupted (the interrupt flag is restored first)
     */
    private <V> void checkResults(List<Future<V>> list, String str) {
        for (Future<V> pending : list) {
            try {
                pending.get();
            } catch (ExecutionException e) {
                throw new StorageEngineFailureException(str, e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new StorageEngineFailureException(str, e);
            }
        }
    }

    /**
     * Triggers compaction on every registered data region.
     *
     * @throws StorageEngineException if the system is in read-only mode
     */
    public void mergeAll() throws StorageEngineException {
        boolean readOnly = CommonDescriptor.getInstance().getConfig().isReadOnly();
        if (readOnly) {
            throw new StorageEngineException("Current system mode is read only, does not support merge");
        }
        for (DataRegion region : dataRegionMap.values()) {
            region.compact();
        }
    }

    /**
     * Handles a flush request.
     *
     * <p>Without a database list, all processors are closed and outdated WAL
     * files are deleted. Otherwise each listed database is flushed: both
     * sequence and unsequence processors when {@code isSeq} is absent, or only
     * the kind selected by parsing {@code isSeq} as a boolean.
     *
     * @param tFlushReq the flush request
     */
    public void operateFlush(TFlushReq tFlushReq) {
        StorageEngine engine = getInstance();
        if (tFlushReq.storageGroups != null) {
            for (String database : tFlushReq.storageGroups) {
                if (tFlushReq.isSeq != null) {
                    engine.closeStorageGroupProcessor(database, Boolean.parseBoolean(tFlushReq.isSeq));
                } else {
                    engine.closeStorageGroupProcessor(database, true);
                    engine.closeStorageGroupProcessor(database, false);
                }
            }
            return;
        }
        engine.syncCloseAllProcessor();
        WALManager.getInstance().deleteOutdatedWALFiles();
    }

    /** Clears the chunk cache, the time series metadata cache and the bloom filter cache, in that order. */
    public void clearCache() {
        ChunkCache chunkCache = ChunkCache.getInstance();
        chunkCache.clear();

        TimeSeriesMetadataCache metadataCache = TimeSeriesMetadataCache.getInstance();
        metadataCache.clear();

        BloomFilterCache bloomFilterCache = BloomFilterCache.getInstance();
        bloomFilterCache.clear();
    }

    /**
     * Applies a TTL to each listed data region that is currently registered;
     * ids without a registered region are skipped.
     *
     * @param list ids of the regions to update
     * @param j the TTL to apply
     */
    public void setTTL(List<DataRegionId> list, long j) {
        for (DataRegionId regionId : list) {
            DataRegion region = dataRegionMap.get(regionId);
            if (region == null) {
                continue;
            }
            region.setDataTTLWithTimePrecisionCheck(j);
        }
    }

    /**
     * Adds a listener to the list handed to data regions built by this engine.
     *
     * @param flushListener the listener to add
     */
    public void registerFlushListener(FlushListener flushListener) {
        customFlushListeners.add(flushListener);
    }

    /**
     * Adds a listener to the list handed to data regions built by this engine.
     *
     * @param closeFileListener the listener to add
     */
    public void registerCloseFileListener(CloseFileListener closeFileListener) {
        customCloseFileListeners.add(closeFileListener);
    }

    /**
     * Blocks until no region with the given id is recorded as being deleted,
     * waiting on the old region for as long as one is present.
     *
     * @param dataRegionId id that must no longer be in deletion
     */
    private void makeSureNoOldRegion(DataRegionId dataRegionId) {
        DataRegion beingDeleted = deletingDataRegionMap.get(dataRegionId);
        while (beingDeleted != null) {
            beingDeleted.waitForDeleted();
            // Look again: the entry is removed once deletion has finished.
            beingDeleted = deletingDataRegionMap.get(dataRegionId);
        }
    }

    /**
     * Returns the region registered under the id, building and registering a
     * new one if none exists. Waits first for any in-progress deletion of a
     * region with the same id.
     *
     * @param dataRegionId id of the region
     * @param str database name used when a new region is built
     * @param j TTL used when a new region is built
     * @return the existing or newly built region
     * @throws DataRegionException if building the new region fails
     */
    public DataRegion createDataRegion(DataRegionId dataRegionId, String str, long j) throws DataRegionException {
        makeSureNoOldRegion(dataRegionId);

        // The mapping function cannot throw a checked exception, so the failure is carried out here.
        AtomicReference<DataRegionException> buildFailure = new AtomicReference<>();
        DataRegion region = dataRegionMap.computeIfAbsent(dataRegionId, id -> {
            try {
                return buildNewDataRegion(str, id, j);
            } catch (DataRegionException e) {
                buildFailure.set(e);
                return null;
            }
        });

        DataRegionException failure = buildFailure.get();
        if (failure != null) {
            throw failure;
        }
        return region;
    }

    /**
     * Deletes a data region and its on-disk data.
     *
     * <p>The region is moved from the region map to the deleting map, marked
     * deleted, and then its compaction is aborted and its data files and
     * folder are removed. In cluster mode with the IoT consensus protocol its
     * WAL node and per-data-dir snapshot directories are deleted as well.
     * Errors are logged, and the region is always removed from the deleting
     * map at the end. Nothing happens if the id is not registered or is
     * already being deleted.
     *
     * @param dataRegionId id of the region to delete
     */
    public void deleteDataRegion(DataRegionId dataRegionId) {
        if (!dataRegionMap.containsKey(dataRegionId) || deletingDataRegionMap.containsKey(dataRegionId)) {
            return;
        }
        DataRegion region = deletingDataRegionMap.computeIfAbsent(dataRegionId, id -> dataRegionMap.remove(dataRegionId));
        if (region == null) {
            return;
        }

        region.markDeleted();
        try {
            region.abortCompaction();
            region.syncDeleteDataFiles();
            region.deleteFolder(systemDir);

            boolean iotConsensusCluster = config.isClusterMode()
                    && config.getDataRegionConsensusProtocolClass().equals("org.apache.iotdb.consensus.iot.IoTConsensus");
            if (iotConsensusCluster) {
                WALManager.getInstance().deleteWALNode(region.getDatabaseName() + "-" + region.getDataRegionId());
                for (String dataDir : config.getDataDirs()) {
                    File snapshotDir = new File(dataDir + File.separator + "snapshot", region.getDatabaseName() + "-" + dataRegionId.getId());
                    if (!snapshotDir.exists()) {
                        continue;
                    }
                    try {
                        FileUtils.deleteDirectory(snapshotDir);
                    } catch (IOException e) {
                        logger.error("Failed to delete snapshot dir {}", snapshotDir, e);
                    }
                }
            }
            SyncService.getInstance().unregisterDataRegion(region.getDataRegionId());
        } catch (Exception e) {
            logger.error("Error occurs when deleting data region {}-{}", new Object[]{region.getDatabaseName(), region.getDataRegionId(), e});
        } finally {
            deletingDataRegionMap.remove(dataRegionId);
        }
    }

    /**
     * Looks up a registered data region.
     *
     * @param dataRegionId id of the region
     * @return the region, or null if none is registered under the id
     */
    public DataRegion getDataRegion(DataRegionId dataRegionId) {
        final DataRegion region = dataRegionMap.get(dataRegionId);
        return region;
    }

    /**
     * Returns a new list holding the regions registered at the time of the call.
     *
     * @return a copy of the region map's values
     */
    public List<DataRegion> getAllDataRegions() {
        List<DataRegion> snapshot = new ArrayList<>(dataRegionMap.values());
        return snapshot;
    }

    /**
     * Returns a new list holding the region ids registered at the time of the call.
     *
     * @return a copy of the region map's keys
     */
    public List<DataRegionId> getAllDataRegionIds() {
        List<DataRegionId> snapshot = new ArrayList<>(dataRegionMap.keySet());
        return snapshot;
    }

    /**
     * Registers a region under the id, replacing any existing one.
     *
     * <p>A region already registered under the id has its working processors
     * closed and its compaction aborted before it is replaced. The old region
     * is fetched with a single lookup: a separate {@code containsKey} followed
     * by {@code get} can observe a concurrent removal in between and
     * dereference null.
     *
     * @param dataRegionId id to register under
     * @param dataRegion the region to register
     */
    public void setDataRegion(DataRegionId dataRegionId, DataRegion dataRegion) {
        DataRegion previous = this.dataRegionMap.get(dataRegionId);
        if (previous != null) {
            previous.syncCloseAllWorkingTsFileProcessors();
            previous.abortCompaction();
        }
        this.dataRegionMap.put(dataRegionId, dataRegion);
    }

    /**
     * Applies the requested TTL to every registered local data region of the
     * databases named in the request.
     *
     * <p>Databases with no directory on local disk are skipped; passing the
     * missing (null) id list to {@code addAll} would throw a
     * NullPointerException. Region ids found on disk but not registered in
     * the region map are skipped as well.
     *
     * @param tSetTTLReq request carrying the database names and the TTL
     * @return a success status
     */
    public TSStatus setTTL(TSetTTLReq tSetTTLReq) {
        Map<String, List<DataRegionId>> localDataRegionInfo = getInstance().getLocalDataRegionInfo();
        List<DataRegionId> targetRegionIds = new ArrayList<>();
        for (String database : tSetTTLReq.storageGroupPathPattern) {
            List<DataRegionId> regionIds = localDataRegionInfo.get(database);
            if (regionIds != null) {
                targetRegionIds.addAll(regionIds);
            }
        }
        for (DataRegionId regionId : targetRegionIds) {
            DataRegion dataRegion = this.dataRegionMap.get(regionId);
            if (dataRegion != null) {
                dataRegion.setDataTTLWithTimePrecisionCheck(tSetTTLReq.TTL);
            }
        }
        return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
    }

    /**
     * Returns the flush policy handed to data regions built by this engine.
     *
     * @return the flush policy
     */
    public TsFileFlushPolicy getFileFlushPolicy() {
        final TsFileFlushPolicy policy = fileFlushPolicy;
        return policy;
    }

    /**
     * Writes one piece of a TsFile being loaded to the given data region.
     *
     * @param dataRegionId id of the target region
     * @param loadTsFilePieceNode the piece to write
     * @param str identifier passed through to the load manager
     * @return the shared success status, or a status carrying
     *     LOAD_FILE_ERROR (I/O failure) or LOAD_PIECE_OF_TSFILE_ERROR (page
     *     failure) and the exception message
     */
    public TSStatus writeLoadTsFileNode(DataRegionId dataRegionId, LoadTsFilePieceNode loadTsFilePieceNode, String str) {
        try {
            getLoadTsFileManager().writeToDataRegion(getDataRegion(dataRegionId), loadTsFilePieceNode, str);
        } catch (IOException e) {
            logger.error(String.format("IO error when writing piece node of TsFile %s to DataRegion %s.", loadTsFilePieceNode.getTsFile(), dataRegionId), e);
            TSStatus failure = new TSStatus();
            failure.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
            failure.setMessage(e.getMessage());
            return failure;
        } catch (PageException e) {
            logger.error(String.format("Parse Page error when writing piece node of TsFile %s to DataRegion %s.", loadTsFilePieceNode.getTsFile(), dataRegionId), e);
            TSStatus failure = new TSStatus();
            failure.setCode(TSStatusCode.LOAD_PIECE_OF_TSFILE_ERROR.getStatusCode());
            failure.setMessage(e.getMessage());
            return failure;
        }
        return RpcUtils.SUCCESS_STATUS;
    }

    /**
     * Executes the second phase of a TsFile load: EXECUTE loads everything
     * recorded under the uuid, ROLLBACK deletes it.
     *
     * @param loadCommand the command to run
     * @param str uuid of the load task
     * @return the shared success status; LOAD_FILE_ERROR if nothing is
     *     recorded under the uuid or the operation throws; ILLEGAL_PARAMETER
     *     for any other command
     */
    public TSStatus executeLoadCommand(LoadTsFileScheduler.LoadCommand loadCommand, String str) {
        TSStatus result = new TSStatus();
        try {
            switch (loadCommand) {
                case EXECUTE:
                    if (getLoadTsFileManager().loadAll(str)) {
                        result = RpcUtils.SUCCESS_STATUS;
                    } else {
                        result.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
                        result.setMessage(String.format("No load TsFile uuid %s recorded for execute load command %s.", str, loadCommand));
                    }
                    break;
                case ROLLBACK:
                    if (getLoadTsFileManager().deleteAll(str)) {
                        result = RpcUtils.SUCCESS_STATUS;
                    } else {
                        result.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
                        result.setMessage(String.format("No load TsFile uuid %s recorded for execute load command %s.", str, loadCommand));
                    }
                    break;
                default:
                    result.setCode(TSStatusCode.ILLEGAL_PARAMETER.getStatusCode());
                    result.setMessage(String.format("Wrong load command %s.", loadCommand));
                    break;
            }
        } catch (IOException | LoadFileException e) {
            logger.error(String.format("Execute load command %s error.", loadCommand), e);
            result.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
            result.setMessage(e.getMessage());
        }
        return result;
    }

    /**
     * Restarts the timed flush services: stops the sequence and unsequence
     * flush schedulers, then creates them again according to the current
     * configuration. The TTL check scheduler is not touched.
     *
     * @throws ShutdownException if stopping a scheduler is interrupted
     */
    public void rebootTimedService() throws ShutdownException {
        logger.info("Start rebooting all timed service.");

        // Stop phase.
        stopTimedServiceAndThrow(seqMemtableTimedFlushCheckThread, "SeqMemtableTimedFlushCheckThread");
        stopTimedServiceAndThrow(unseqMemtableTimedFlushCheckThread, "UnseqMemtableTimedFlushCheckThread");
        logger.info("Stop all timed service successfully, and now restart them.");

        // Restart phase.
        startTimedService();
        logger.info("Reboot all timed service successfully");
    }

    /**
     * Shuts a scheduler down immediately and waits up to 30 seconds for it to
     * terminate. A null scheduler is ignored.
     *
     * <p>A timeout is logged as a warning ({@code awaitTermination} returns
     * false). An interrupt restores the thread's interrupt flag and is
     * reported to the caller as a {@link ShutdownException}.
     *
     * @param scheduledExecutorService the scheduler to stop, or null
     * @param str name of the scheduler, used in log messages
     * @throws ShutdownException if the wait is interrupted
     */
    private void stopTimedServiceAndThrow(ScheduledExecutorService scheduledExecutorService, String str) throws ShutdownException {
        if (scheduledExecutorService == null) {
            return;
        }
        scheduledExecutorService.shutdownNow();
        try {
            if (!scheduledExecutorService.awaitTermination(30L, TimeUnit.SECONDS)) {
                logger.warn("{} still doesn't exit after 30s", str);
            }
        } catch (InterruptedException e) {
            logger.warn("Interrupted while waiting for {} to exit", str);
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new ShutdownException(e);
        }
    }

    /**
     * Returns the load manager, creating it on first use.
     *
     * <p>The field is not volatile, so an unsynchronized first check could
     * observe a reference that was published before the object was fully
     * constructed. Every access therefore goes through the monitor.
     *
     * @return the load manager of this engine
     */
    private LoadTsFileManager getLoadTsFileManager() {
        synchronized (LoadTsFileManager.class) {
            if (this.loadTsFileManager == null) {
                this.loadTsFileManager = new LoadTsFileManager();
            }
            return this.loadTsFileManager;
        }
    }
}
