/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.service.thrift.impl;

import com.google.common.collect.ImmutableList;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.exception.sync.PipeException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.commons.sync.pipe.SyncOperation;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
import org.apache.iotdb.db.auth.AuthorizerManager;
import org.apache.iotdb.db.client.ConfigNodeInfo;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.engine.cache.BloomFilterCache;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
import org.apache.iotdb.db.metadata.template.TemplateInternalRPCUpdateType;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.executor.RegionExecutionResult;
import org.apache.iotdb.db.mpp.execution.executor.RegionReadExecutor;
import org.apache.iotdb.db.mpp.execution.executor.RegionWriteExecutor;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.ConstructSchemaBlackListNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.RollbackSchemaBlackListNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler;
import org.apache.iotdb.db.service.DataNode;
import org.apache.iotdb.db.service.RegionMigrateService;
import org.apache.iotdb.db.service.metrics.MetricService;
import org.apache.iotdb.db.service.metrics.enums.Metric;
import org.apache.iotdb.db.service.metrics.enums.Tag;
import org.apache.iotdb.db.service.thrift.impl.DataNodeRegionManager;
import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.trigger.service.TriggerManagementService;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.type.Gauge;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelResp;
import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePipeOnDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteTimeSeriesReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionRequest;
import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
import org.apache.iotdb.mpp.rpc.thrift.TLoadSample;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchRequest;
import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchResponse;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataNodeInternalRPCServiceImpl
implements IDataNodeRPCService.Iface {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeInternalRPCServiceImpl.class);
    private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
    private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
    private final DataNodeRegionManager regionManager = DataNodeRegionManager.getInstance();

    public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req) {
        FragmentInstance fragmentInstance;
        ConsensusGroupId groupId;
        LOGGER.info("receive FragmentInstance to group[{}]", (Object)req.getConsensusGroupId());
        try {
            groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId((TConsensusGroupId)req.getConsensusGroupId());
        }
        catch (Throwable t) {
            LOGGER.error("Deserialize ConsensusGroupId failed. ", t);
            TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
            resp.setMessage("Deserialize ConsensusGroupId failed: " + t.getMessage());
            return resp;
        }
        try {
            fragmentInstance = FragmentInstance.deserializeFrom(req.fragmentInstance.body);
        }
        catch (Throwable t) {
            LOGGER.error("Deserialize FragmentInstance failed.", t);
            TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
            resp.setMessage("Deserialize FragmentInstance failed: " + t.getMessage());
            return resp;
        }
        RegionReadExecutor executor = new RegionReadExecutor();
        RegionExecutionResult executionResult = executor.execute(groupId, fragmentInstance);
        TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp();
        resp.setAccepted(executionResult.isAccepted());
        resp.setMessage(executionResult.getMessage());
        return resp;
    }

    public TSendPlanNodeResp sendPlanNode(TSendPlanNodeReq req) {
        LOGGER.info("receive PlanNode to group[{}]", (Object)req.getConsensusGroupId());
        ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId((TConsensusGroupId)req.getConsensusGroupId());
        PlanNode planNode = PlanNodeType.deserialize(req.planNode.body);
        RegionWriteExecutor executor = new RegionWriteExecutor();
        TSendPlanNodeResp resp = new TSendPlanNodeResp();
        RegionExecutionResult executionResult = executor.execute(groupId, planNode);
        resp.setAccepted(executionResult.isAccepted());
        resp.setMessage(executionResult.getMessage());
        resp.setStatus(executionResult.getStatus());
        return resp;
    }

    public TFragmentInstanceStateResp fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req) {
        FragmentInstanceId instanceId = FragmentInstanceId.fromThrift(req.fragmentInstanceId);
        FragmentInstanceInfo info = FragmentInstanceManager.getInstance().getInstanceInfo(instanceId);
        if (info != null) {
            TFragmentInstanceStateResp resp = new TFragmentInstanceStateResp(info.getState().toString());
            resp.setFailedMessages((List)ImmutableList.of((Object)info.getMessage()));
            return resp;
        }
        return new TFragmentInstanceStateResp(FragmentInstanceState.NO_SUCH_INSTANCE.toString());
    }

    public TCancelResp cancelQuery(TCancelQueryReq req) {
        try (SetThreadName threadName = new SetThreadName(req.getQueryId());){
            List taskIds = req.getFragmentInstanceIds().stream().map(FragmentInstanceId::fromThrift).collect(Collectors.toList());
            for (FragmentInstanceId taskId : taskIds) {
                FragmentInstanceManager.getInstance().cancelTask(taskId);
            }
            TCancelResp tCancelResp = new TCancelResp(true);
            return tCancelResp;
        }
    }

    public TCancelResp cancelPlanFragment(TCancelPlanFragmentReq req) {
        throw new NotImplementedException();
    }

    public TCancelResp cancelFragmentInstance(TCancelFragmentInstanceReq req) {
        throw new NotImplementedException();
    }

    public TSchemaFetchResponse fetchSchema(TSchemaFetchRequest req) {
        throw new UnsupportedOperationException();
    }

    public TLoadResp sendTsFilePieceNode(TTsFilePieceReq req) throws TException {
        LOGGER.info(String.format("Receive load node from uuid %s.", req.uuid));
        ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId((TConsensusGroupId)req.consensusGroupId);
        LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode)PlanNodeType.deserialize(req.body);
        if (pieceNode == null) {
            return this.createTLoadResp(new TSStatus(TSStatusCode.NODE_DESERIALIZE_ERROR.getStatusCode()));
        }
        TSStatus resultStatus = StorageEngineV2.getInstance().writeLoadTsFileNode((DataRegionId)groupId, pieceNode, req.uuid);
        return this.createTLoadResp(resultStatus);
    }

    public TLoadResp sendLoadCommand(TLoadCommandReq req) throws TException {
        TSStatus resultStatus = StorageEngineV2.getInstance().executeLoadCommand(LoadTsFileScheduler.LoadCommand.values()[req.commandType], req.uuid);
        return this.createTLoadResp(resultStatus);
    }

    private TLoadResp createTLoadResp(TSStatus resultStatus) {
        boolean isAccepted = RpcUtils.SUCCESS_STATUS.equals(resultStatus);
        TLoadResp loadResp = new TLoadResp(isAccepted);
        if (!isAccepted) {
            loadResp.setMessage(resultStatus.getMessage());
            loadResp.setStatus(resultStatus);
        }
        return loadResp;
    }

    public TSStatus createSchemaRegion(TCreateSchemaRegionReq req) {
        return this.regionManager.createSchemaRegion(req.getRegionReplicaSet(), req.getStorageGroup());
    }

    public TSStatus createDataRegion(TCreateDataRegionReq req) {
        return this.regionManager.createDataRegion(req.getRegionReplicaSet(), req.getStorageGroup(), req.getTtl());
    }

    public TSStatus invalidatePartitionCache(TInvalidateCacheReq req) {
        ClusterPartitionFetcher.getInstance().invalidAllCache();
        return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
    }

    public TSStatus invalidateSchemaCache(TInvalidateCacheReq req) {
        DataNodeSchemaCache.getInstance().cleanUp();
        return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
    }

    public TSStatus constructSchemaBlackList(TConstructSchemaBlackListReq req) throws TException {
        PathPatternTree patternTree = PathPatternTree.deserialize((ByteBuffer)ByteBuffer.wrap(req.getPathPatternTree()));
        ArrayList<TSStatus> failureList = new ArrayList<TSStatus>();
        int preDeletedNum = 0;
        for (TConsensusGroupId consensusGroupId : req.getSchemaRegionIdList()) {
            String storageGroup = this.schemaEngine.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId())).getStorageGroupFullPath();
            PathPatternTree filteredPatternTree = this.filterPathPatternTree(patternTree, storageGroup);
            if (filteredPatternTree.isEmpty()) continue;
            RegionWriteExecutor executor = new RegionWriteExecutor();
            TSStatus status = executor.execute((ConsensusGroupId)new SchemaRegionId(consensusGroupId.getId()), new ConstructSchemaBlackListNode(new PlanNodeId(""), filteredPatternTree)).getStatus();
            if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                preDeletedNum += Integer.parseInt(status.getMessage());
                continue;
            }
            failureList.add(status);
        }
        if (!failureList.isEmpty()) {
            return RpcUtils.getStatus(failureList);
        }
        return RpcUtils.getStatus((TSStatusCode)TSStatusCode.SUCCESS_STATUS, (String)String.valueOf(preDeletedNum));
    }

    public TSStatus rollbackSchemaBlackList(TRollbackSchemaBlackListReq req) throws TException {
        PathPatternTree patternTree = PathPatternTree.deserialize((ByteBuffer)ByteBuffer.wrap(req.getPathPatternTree()));
        ArrayList<TSStatus> failureList = new ArrayList<TSStatus>();
        for (TConsensusGroupId consensusGroupId : req.getSchemaRegionIdList()) {
            String storageGroup = this.schemaEngine.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId())).getStorageGroupFullPath();
            PathPatternTree filteredPatternTree = this.filterPathPatternTree(patternTree, storageGroup);
            if (filteredPatternTree.isEmpty()) continue;
            RegionWriteExecutor executor = new RegionWriteExecutor();
            TSStatus status = executor.execute((ConsensusGroupId)new SchemaRegionId(consensusGroupId.getId()), new RollbackSchemaBlackListNode(new PlanNodeId(""), filteredPatternTree)).getStatus();
            if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) continue;
            failureList.add(status);
        }
        if (!failureList.isEmpty()) {
            return RpcUtils.getStatus(failureList);
        }
        return RpcUtils.SUCCESS_STATUS;
    }

    public TSStatus invalidateMatchedSchemaCache(TInvalidateMatchedSchemaCacheReq req) throws TException {
        DataNodeSchemaCache cache = DataNodeSchemaCache.getInstance();
        cache.takeWriteLock();
        try {
            cache.cleanUp();
        }
        finally {
            cache.releaseWriteLock();
        }
        return RpcUtils.SUCCESS_STATUS;
    }

    public TFetchSchemaBlackListResp fetchSchemaBlackList(TFetchSchemaBlackListReq req) throws TException {
        PathPatternTree patternTree = PathPatternTree.deserialize((ByteBuffer)req.pathPatternTree);
        TFetchSchemaBlackListResp resp = new TFetchSchemaBlackListResp();
        PathPatternTree result = new PathPatternTree();
        for (TConsensusGroupId consensusGroupId : req.getSchemaRegionIdList()) {
            try {
                ISchemaRegion schemaRegion = this.schemaEngine.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()));
                PathPatternTree filteredPatternTree = this.filterPathPatternTree(patternTree, schemaRegion.getStorageGroupFullPath());
                if (filteredPatternTree.isEmpty()) continue;
                for (PartialPath path : schemaRegion.fetchSchemaBlackList(filteredPatternTree)) {
                    result.appendFullPath(path);
                }
            }
            catch (MetadataException e) {
                LOGGER.error(e.getMessage(), (Throwable)e);
                resp.setStatus(RpcUtils.getStatus((int)e.getErrorCode(), (String)e.getMessage()));
                return resp;
            }
        }
        resp.setStatus(RpcUtils.SUCCESS_STATUS);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
        result.constructTree();
        try {
            result.serialize(dataOutputStream);
        }
        catch (IOException iOException) {
            // empty catch block
        }
        resp.setPathPatternTree(outputStream.toByteArray());
        return resp;
    }

    public TSStatus deleteDataForDeleteTimeSeries(TDeleteDataForDeleteTimeSeriesReq req) throws TException {
        PathPatternTree patternTree = PathPatternTree.deserialize((ByteBuffer)ByteBuffer.wrap(req.getPathPatternTree()));
        List pathList = patternTree.getAllPathPatterns();
        ArrayList<TSStatus> failureList = new ArrayList<TSStatus>();
        for (TConsensusGroupId consensusGroupId : req.getDataRegionIdList()) {
            RegionWriteExecutor executor = new RegionWriteExecutor();
            TSStatus status = executor.execute((ConsensusGroupId)new DataRegionId(consensusGroupId.getId()), new DeleteDataNode(new PlanNodeId(""), pathList, Long.MIN_VALUE, Long.MAX_VALUE)).getStatus();
            if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) continue;
            failureList.add(status);
        }
        if (!failureList.isEmpty()) {
            return RpcUtils.getStatus(failureList);
        }
        return RpcUtils.SUCCESS_STATUS;
    }

    public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) throws TException {
        PathPatternTree patternTree = PathPatternTree.deserialize((ByteBuffer)ByteBuffer.wrap(req.getPathPatternTree()));
        ArrayList<TSStatus> failureList = new ArrayList<TSStatus>();
        for (TConsensusGroupId consensusGroupId : req.getSchemaRegionIdList()) {
            String storageGroup = this.schemaEngine.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId())).getStorageGroupFullPath();
            PathPatternTree filteredPatternTree = this.filterPathPatternTree(patternTree, storageGroup);
            if (filteredPatternTree.isEmpty()) continue;
            RegionWriteExecutor executor = new RegionWriteExecutor();
            TSStatus status = executor.execute((ConsensusGroupId)new SchemaRegionId(consensusGroupId.getId()), new DeleteTimeSeriesNode(new PlanNodeId(""), filteredPatternTree)).getStatus();
            if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) continue;
            failureList.add(status);
        }
        if (!failureList.isEmpty()) {
            return RpcUtils.getStatus(failureList);
        }
        return RpcUtils.SUCCESS_STATUS;
    }

    public TSStatus createPipeOnDataNode(TCreatePipeOnDataNodeReq req) throws TException {
        try {
            PipeInfo pipeInfo = PipeInfo.deserializePipeInfo((ByteBuffer)req.pipeInfo);
            SyncService.getInstance().addPipe(pipeInfo);
            return RpcUtils.SUCCESS_STATUS;
        }
        catch (PipeException e) {
            return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
        }
    }

    public TSStatus operatePipeOnDataNode(TOperatePipeOnDataNodeReq req) throws TException {
        try {
            switch (SyncOperation.values()[req.getOperation()]) {
                case START_PIPE: {
                    SyncService.getInstance().startPipe(req.getPipeName());
                    break;
                }
                case STOP_PIPE: {
                    SyncService.getInstance().stopPipe(req.getPipeName());
                    break;
                }
                case DROP_PIPE: {
                    SyncService.getInstance().dropPipe(req.getPipeName());
                    break;
                }
                default: {
                    return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage("Unsupported operation.");
                }
            }
            return RpcUtils.SUCCESS_STATUS;
        }
        catch (PipeException e) {
            return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
        }
    }

    private PathPatternTree filterPathPatternTree(PathPatternTree patternTree, String storageGroup) {
        PathPatternTree filteredPatternTree = new PathPatternTree();
        try {
            PartialPath storageGroupPattern = new PartialPath(storageGroup).concatNode("**");
            for (PartialPath pathPattern : patternTree.getOverlappedPathPatterns(storageGroupPattern)) {
                filteredPatternTree.appendPathPattern(pathPattern);
            }
            filteredPatternTree.constructTree();
        }
        catch (IllegalPathException illegalPathException) {
            // empty catch block
        }
        return filteredPatternTree;
    }

    public THeartbeatResp getDataNodeHeartBeat(THeartbeatReq req) throws TException {
        THeartbeatResp resp = new THeartbeatResp();
        if (req.isNeedJudgeLeader()) {
            resp.setJudgedLeaders(this.getJudgedLeaders());
        }
        if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric().booleanValue() && req.isNeedSamplingLoad()) {
            TLoadSample loadSample = new TLoadSample();
            long cpuLoad = MetricService.getInstance().getOrCreateGauge(Metric.SYS_CPU_LOAD.toString(), MetricLevel.CORE, new String[]{Tag.NAME.toString(), "system"}).value();
            if (cpuLoad != 0L) {
                loadSample.setCpuUsageRate((short)cpuLoad);
            }
            long usedMemory = this.getMemory("jvm.memory.used.bytes");
            long maxMemory = this.getMemory("jvm.memory.max.bytes");
            if (usedMemory != 0L && maxMemory != 0L) {
                loadSample.setMemoryUsageRate((double)usedMemory * 100.0 / (double)maxMemory);
            }
            this.sampleDiskLoad(loadSample);
            resp.setLoadSample(loadSample);
        }
        resp.setHeartbeatTimestamp(req.getHeartbeatTimestamp());
        resp.setStatus(CommonDescriptor.getInstance().getConfig().getNodeStatus().getStatus());
        if (CommonDescriptor.getInstance().getConfig().getStatusReason() != null) {
            resp.setStatusReason(CommonDescriptor.getInstance().getConfig().getStatusReason());
        }
        return resp;
    }

    public TSStatus updateRegionCache(TRegionRouteReq req) throws TException {
        boolean result = ClusterPartitionFetcher.getInstance().updateRegionCache(req);
        if (result) {
            return RpcUtils.getStatus((TSStatusCode)TSStatusCode.SUCCESS_STATUS);
        }
        return RpcUtils.getStatus((TSStatusCode)TSStatusCode.CACHE_UPDATE_FAIL);
    }

    private Map<TConsensusGroupId, Boolean> getJudgedLeaders() {
        HashMap<TConsensusGroupId, Boolean> result = new HashMap<TConsensusGroupId, Boolean>();
        if (DataRegionConsensusImpl.getInstance() != null) {
            DataRegionConsensusImpl.getInstance().getAllConsensusGroupIds().forEach(groupId -> result.put(groupId.convertToTConsensusGroupId(), DataRegionConsensusImpl.getInstance().isLeader(groupId)));
        }
        if (SchemaRegionConsensusImpl.getInstance() != null) {
            SchemaRegionConsensusImpl.getInstance().getAllConsensusGroupIds().forEach(groupId -> result.put(groupId.convertToTConsensusGroupId(), SchemaRegionConsensusImpl.getInstance().isLeader(groupId)));
        }
        return result;
    }

    private long getMemory(String gaugeName) {
        long result = 0L;
        try {
            Gauge gauge;
            List<String> heapIds = Arrays.asList("PS Eden Space", "PS Old Eden", "Ps Survivor Space");
            List<String> noHeapIds = Arrays.asList("Code Cache", "Compressed Class Space", "Metaspace");
            for (String id : heapIds) {
                gauge = MetricService.getInstance().getOrCreateGauge(gaugeName, MetricLevel.IMPORTANT, new String[]{"id", id, "area", "heap"});
                result += gauge.value();
            }
            for (String id : noHeapIds) {
                gauge = MetricService.getInstance().getOrCreateGauge(gaugeName, MetricLevel.IMPORTANT, new String[]{"id", id, "area", "noheap"});
                result += gauge.value();
            }
        }
        catch (Exception e) {
            LOGGER.error("Failed to get memory from metric because {}", (Object)e.getMessage());
            return 0L;
        }
        return result;
    }

    private void sampleDiskLoad(TLoadSample loadSample) {
        CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();
        long freeDisk = MetricService.getInstance().getOrCreateGauge(Metric.SYS_DISK_FREE_SPACE.toString(), MetricLevel.CORE, new String[]{Tag.NAME.toString(), "system"}).value();
        long totalDisk = MetricService.getInstance().getOrCreateGauge(Metric.SYS_DISK_TOTAL_SPACE.toString(), MetricLevel.CORE, new String[]{Tag.NAME.toString(), "system"}).value();
        if (freeDisk != 0L && totalDisk != 0L) {
            double freeDiskRatio = (double)freeDisk * 100.0 / (double)totalDisk;
            loadSample.setDiskUsageRate(100.0 - freeDiskRatio);
            if (freeDiskRatio < commonConfig.getDiskSpaceWarningThreshold()) {
                commonConfig.setNodeStatus(NodeStatus.ReadOnly);
                commonConfig.setStatusReason("DiskFull");
            }
        }
    }

    public TSStatus invalidatePermissionCache(TInvalidatePermissionCacheReq req) {
        if (AuthorizerManager.getInstance().invalidateCache(req.getUsername(), req.getRoleName())) {
            return RpcUtils.getStatus((TSStatusCode)TSStatusCode.SUCCESS_STATUS);
        }
        return RpcUtils.getStatus((TSStatusCode)TSStatusCode.INVALIDATE_PERMISSION_CACHE_ERROR);
    }

    public TSStatus merge() throws TException {
        try {
            this.storageEngine.mergeAll();
        }
        catch (StorageEngineException e) {
            return RpcUtils.getStatus((TSStatusCode)TSStatusCode.EXECUTE_STATEMENT_ERROR, (String)e.getMessage());
        }
        return RpcUtils.getStatus((TSStatusCode)TSStatusCode.SUCCESS_STATUS);
    }

    public TSStatus flush(TFlushReq req) throws TException {
        return this.storageEngine.operateFlush(req);
    }

    public TSStatus clearCache() throws TException {
        ChunkCache.getInstance().clear();
        TimeSeriesMetadataCache.getInstance().clear();
        BloomFilterCache.getInstance().clear();
        return RpcUtils.getStatus((TSStatusCode)TSStatusCode.SUCCESS_STATUS);
    }

    public TSStatus loadConfiguration() throws TException {
        try {
            IoTDBDescriptor.getInstance().loadHotModifiedProps();
        }
        catch (QueryProcessException e) {
            return RpcUtils.getStatus((TSStatusCode)TSStatusCode.EXECUTE_STATEMENT_ERROR, (String)e.getMessage());
        }
        return RpcUtils.getStatus((TSStatusCode)TSStatusCode.SUCCESS_STATUS);
    }

    public TSStatus setSystemStatus(String status) throws TException {
        try {
            CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.parse((String)status));
        }
        catch (Exception e) {
            return RpcUtils.getStatus((TSStatusCode)TSStatusCode.EXECUTE_STATEMENT_ERROR, (String)e.getMessage());
        }
        return RpcUtils.getStatus((TSStatusCode)TSStatusCode.SUCCESS_STATUS);
    }

    public TSStatus setTTL(TSetTTLReq req) throws TException {
        return this.storageEngine.setTTL(req);
    }

    public TSStatus updateConfigNodeGroup(TUpdateConfigNodeGroupReq req) {
        List configNodeLocations = req.getConfigNodeLocations();
        if (configNodeLocations != null) {
            ConfigNodeInfo.getInstance().updateConfigNodeList(configNodeLocations.parallelStream().map(TConfigNodeLocation::getInternalEndPoint).collect(Collectors.toList()));
        }
        return RpcUtils.getStatus((TSStatusCode)TSStatusCode.SUCCESS_STATUS);
    }

    public TSStatus updateTemplate(TUpdateTemplateReq req) throws TException {
        switch (TemplateInternalRPCUpdateType.getType(req.type)) {
            case ADD_TEMPLATE_SET_INFO: {
                ClusterTemplateManager.getInstance().updateTemplateSetInfo(req.getTemplateInfo());
                break;
            }
            case INVALIDATE_TEMPLATE_SET_INFO: {
                ClusterTemplateManager.getInstance().invalidateTemplateSetInfo(req.getTemplateInfo());
            }
        }
        return RpcUtils.getStatus((TSStatusCode)TSStatusCode.SUCCESS_STATUS);
    }

    public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) {
        ConsensusGroupId consensusGroupId = ConsensusGroupId.Factory.createFromTConsensusGroupId((TConsensusGroupId)tconsensusGroupId);
        if (consensusGroupId instanceof DataRegionId) {
            ConsensusGenericResponse response = DataRegionConsensusImpl.getInstance().deletePeer(consensusGroupId);
            if (!response.isSuccess() && !(response.getException() instanceof PeerNotInConsensusGroupException)) {
                return RpcUtils.getStatus((TSStatusCode)TSStatusCode.DELETE_REGION_ERROR, (String)response.getException().getMessage());
            }
            return this.regionManager.deleteDataRegion((DataRegionId)consensusGroupId);
        }
        ConsensusGenericResponse response = SchemaRegionConsensusImpl.getInstance().deletePeer(consensusGroupId);
        if (!response.isSuccess() && !(response.getException() instanceof PeerNotInConsensusGroupException)) {
            return RpcUtils.getStatus((TSStatusCode)TSStatusCode.DELETE_REGION_ERROR, (String)response.getException().getMessage());
        }
        return this.regionManager.deleteSchemaRegion((SchemaRegionId)consensusGroupId);
    }

    public TSStatus changeRegionLeader(TRegionLeaderChangeReq req) throws TException {
        TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        TConsensusGroupId tgId = req.getRegionId();
        ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId((TConsensusGroupId)tgId);
        TEndPoint newNode = this.getConsensusEndPoint(req.getNewLeaderNode(), regionId);
        Peer newLeaderPeer = new Peer(regionId, req.getNewLeaderNode().getDataNodeId(), newNode);
        if (!this.isLeader(regionId)) {
            LOGGER.info("region {} is not leader, no need to change leader", (Object)regionId);
            return status;
        }
        LOGGER.info("region {} is leader, will change leader", (Object)regionId);
        return this.transferLeader(regionId, newLeaderPeer);
    }

    private TSStatus transferLeader(ConsensusGroupId regionId, Peer newLeaderPeer) {
        ConsensusGenericResponse resp;
        TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        if (regionId instanceof DataRegionId) {
            resp = DataRegionConsensusImpl.getInstance().transferLeader(regionId, newLeaderPeer);
        } else if (regionId instanceof SchemaRegionId) {
            resp = SchemaRegionConsensusImpl.getInstance().transferLeader(regionId, newLeaderPeer);
        } else {
            status.setCode(TSStatusCode.REGION_LEADER_CHANGE_FAILED.getStatusCode());
            status.setMessage("Error Region type. region: " + regionId);
            return status;
        }
        if (!resp.isSuccess()) {
            LOGGER.error("change region {} leader failed", (Object)regionId, (Object)resp.getException());
            status.setCode(TSStatusCode.REGION_LEADER_CHANGE_FAILED.getStatusCode());
            status.setMessage(resp.getException().getMessage());
            return status;
        }
        status.setMessage("change region " + regionId + " leader succeed");
        return status;
    }

    private boolean isLeader(ConsensusGroupId regionId) {
        if (regionId instanceof DataRegionId) {
            return DataRegionConsensusImpl.getInstance().isLeader(regionId);
        }
        if (regionId instanceof SchemaRegionId) {
            return SchemaRegionConsensusImpl.getInstance().isLeader(regionId);
        }
        LOGGER.error("region {} type is illegal", (Object)regionId);
        return false;
    }

    public TSStatus createNewRegionPeer(TCreatePeerReq req) throws TException {
        ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId((TConsensusGroupId)req.getRegionId());
        List<Peer> peers = req.getRegionLocations().stream().map(location -> new Peer(regionId, location.getDataNodeId(), this.getConsensusEndPoint((TDataNodeLocation)location, regionId))).collect(Collectors.toList());
        TSStatus status = this.createNewRegion(regionId, req.getStorageGroup(), req.getTtl());
        if (!this.isSucceed(status)) {
            return status;
        }
        return this.createNewRegionPeer(regionId, peers);
    }

    public TSStatus addRegionPeer(TMaintainPeerReq req) throws TException {
        TConsensusGroupId regionId = req.getRegionId();
        String selectedDataNodeIP = req.getDestNode().getInternalEndPoint().getIp();
        boolean submitSucceed = RegionMigrateService.getInstance().submitAddRegionPeerTask(req);
        TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        if (submitSucceed) {
            LOGGER.info("Successfully submit addRegionPeer task for region: {} on DataNode: {}", (Object)regionId, (Object)selectedDataNodeIP);
            return status;
        }
        status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
        status.setMessage("Submit addRegionPeer task failed, region: " + regionId);
        return status;
    }

    public TSStatus removeRegionPeer(TMaintainPeerReq req) throws TException {
        TConsensusGroupId regionId = req.getRegionId();
        String selectedDataNodeIP = req.getDestNode().getInternalEndPoint().getIp();
        boolean submitSucceed = RegionMigrateService.getInstance().submitRemoveRegionPeerTask(req);
        TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        if (submitSucceed) {
            LOGGER.info("Successfully submit removeRegionPeer task for region: {} on DataNode: {}", (Object)regionId, (Object)selectedDataNodeIP);
            return status;
        }
        status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
        status.setMessage("Submit removeRegionPeer task failed, region: " + regionId);
        return status;
    }

    public TSStatus deleteOldRegionPeer(TMaintainPeerReq req) throws TException {
        TConsensusGroupId regionId = req.getRegionId();
        String selectedDataNodeIP = req.getDestNode().getInternalEndPoint().getIp();
        boolean submitSucceed = RegionMigrateService.getInstance().submitDeleteOldRegionPeerTask(req);
        TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        if (submitSucceed) {
            LOGGER.info("Successfully submit deleteOldRegionPeer task for region: {} on DataNode: {}", (Object)regionId, (Object)selectedDataNodeIP);
            return status;
        }
        status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
        status.setMessage("Submit deleteOldRegionPeer task failed, region: " + regionId);
        return status;
    }

    private TSStatus createNewRegion(ConsensusGroupId regionId, String storageGroup, long ttl) {
        return this.regionManager.createNewRegion(regionId, storageGroup, ttl);
    }

    public TSStatus createFunction(TCreateFunctionRequest request) {
        try {
            UDFRegistrationService.getInstance().register(request.getUdfName(), request.getClassName(), request.getUris(), UDFExecutableManager.getInstance(), true);
            return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        }
        catch (Exception e) {
            return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()).setMessage(e.getMessage());
        }
    }

    public TSStatus dropFunction(TDropFunctionRequest request) {
        try {
            UDFRegistrationService.getInstance().deregister(request.getUdfName());
            return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        }
        catch (Exception e) {
            return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()).setMessage(e.getMessage());
        }
    }

    public TSStatus createTriggerInstance(TCreateTriggerInstanceReq req) throws TException {
        TriggerInformation triggerInformation = TriggerInformation.deserialize((ByteBuffer)req.triggerInformation);
        try {
            if (req.getJarFile() != null) {
                TriggerExecutableManager.getInstance().writeToLibDir(req.jarFile, triggerInformation.getJarName());
            }
            TriggerManagementService.getInstance().register(triggerInformation);
        }
        catch (Exception e) {
            LOGGER.warn("Error occurred when creating trigger instance for trigger: {}. The cause is {}.", (Object)triggerInformation.getTriggerName(), (Object)e);
            return new TSStatus(TSStatusCode.CREATE_TRIGGER_INSTANCE_ERROR.getStatusCode()).setMessage(e.getMessage());
        }
        return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
    }

    public TSStatus activeTriggerInstance(TActiveTriggerInstanceReq req) throws TException {
        try {
            TriggerManagementService.getInstance().activeTrigger(req.triggerName);
        }
        catch (Exception e) {
            LOGGER.error("Error occurred during active trigger instance for trigger: {}. The cause is {}.", (Object)req.triggerName, (Object)e);
            return new TSStatus(TSStatusCode.ACTIVE_TRIGGER_INSTANCE_ERROR.getStatusCode()).setMessage(e.getMessage());
        }
        return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
    }

    public TSStatus inactiveTriggerInstance(TInactiveTriggerInstanceReq req) throws TException {
        try {
            TriggerManagementService.getInstance().inactiveTrigger(req.triggerName);
        }
        catch (Exception e) {
            LOGGER.error("Error occurred during ");
            return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()).setMessage(e.getMessage());
        }
        return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
    }

    public TSStatus dropTriggerInstance(TDropTriggerInstanceReq req) throws TException {
        try {
            TriggerManagementService.getInstance().dropTrigger(req.triggerName, req.needToDeleteJarFile);
        }
        catch (Exception e) {
            LOGGER.error("Error occurred during drop trigger instance for trigger: {}. The cause is {}.", (Object)req.triggerName, (Object)e);
            return new TSStatus(TSStatusCode.DROP_TRIGGER_INSTANCE_ERROR.getStatusCode()).setMessage(e.getMessage());
        }
        return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
    }

    private TEndPoint getConsensusEndPoint(TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
        if (regionId instanceof DataRegionId) {
            return nodeLocation.getDataRegionConsensusEndPoint();
        }
        return nodeLocation.getSchemaRegionConsensusEndPoint();
    }

    private boolean isSucceed(TSStatus status) {
        return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
    }

    private TSStatus createNewRegionPeer(ConsensusGroupId regionId, List<Peer> peers) {
        LOGGER.info("Start to createNewRegionPeer {} to region {}", peers, (Object)regionId);
        TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        ConsensusGenericResponse resp = regionId instanceof DataRegionId ? DataRegionConsensusImpl.getInstance().createPeer(regionId, peers) : SchemaRegionConsensusImpl.getInstance().createPeer(regionId, peers);
        if (!resp.isSuccess()) {
            LOGGER.error("CreateNewRegionPeer error, peers: {}, regionId: {}, errorMessage", new Object[]{peers, regionId, resp.getException()});
            status.setCode(TSStatusCode.REGION_MIGRATE_FAILED.getStatusCode());
            status.setMessage(resp.getException().getMessage());
            return status;
        }
        LOGGER.info("Succeed to createNewRegionPeer {} for region {}", peers, (Object)regionId);
        status.setMessage("createNewRegionPeer succeed, regionId: " + regionId);
        return status;
    }

    public TSStatus disableDataNode(TDisableDataNodeReq req) throws TException {
        LOGGER.info("start disable data node in the request: {}", (Object)req);
        TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        status.setMessage("disable datanode succeed");
        ClusterPartitionFetcher.getInstance().invalidAllCache();
        DataNodeSchemaCache.getInstance().cleanUp();
        return status;
    }

    public TSStatus stopDataNode() {
        TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        LOGGER.info("Execute stopDataNode RPC method");
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(20L);
            }
            catch (InterruptedException e) {
                LOGGER.error("Meets InterruptedException in stopDataNode RPC method");
            }
            finally {
                LOGGER.info("Executing system.exit(0) in stopDataNode RPC method after 20 seconds");
                System.exit(0);
            }
        }).start();
        try {
            DataNode.getInstance().stop();
            status.setMessage("stop datanode succeed");
        }
        catch (Exception e) {
            LOGGER.error("stop Data Node error", (Throwable)e);
            status.setCode(TSStatusCode.DATANODE_STOP_ERROR.getStatusCode());
            status.setMessage(e.getMessage());
        }
        return status;
    }

    public void handleClientExit() {
    }
}

