/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.service.thrift.impl;

import com.google.common.collect.ImmutableList;
import io.airlift.concurrent.SetThreadName;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
import org.apache.iotdb.db.auth.AuthorizerManager;
import org.apache.iotdb.db.client.ConfigNodeInfo;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.engine.cache.BloomFilterCache;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
import org.apache.iotdb.db.metadata.template.TemplateInternalRPCUpdateType;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.service.DataNode;
import org.apache.iotdb.db.service.RegionMigrateService;
import org.apache.iotdb.db.service.metrics.MetricService;
import org.apache.iotdb.db.service.metrics.enums.Metric;
import org.apache.iotdb.db.service.metrics.enums.Tag;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.type.Gauge;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
import org.apache.iotdb.mpp.rpc.thrift.TCancelFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelResp;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionRequest;
import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchRequest;
import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchResponse;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TcreateTriggerInstanceReq;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataNodeInternalRPCServiceImpl
implements IDataNodeRPCService.Iface {
    /** Logger shared by the RPC handlers of this class. */
    private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeInternalRPCServiceImpl.class);
    /** Schema engine singleton; the handlers use it to create schema regions. */
    private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
    /** Storage engine singleton; the handlers use it for data-region creation, merge, flush and TTL requests. */
    private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();

    /**
     * Hands a serialized {@link FragmentInstance} to the consensus layer of the target region as a
     * read request.
     *
     * <p>Failures while converting the group id, deserializing the instance, or executing the read
     * are caught, logged and reported through a non-accepted response carrying a message.
     *
     * @param req request carrying the target consensus group id and the serialized fragment instance
     * @return response whose accepted flag is {@code false} on any failure, or when the state of the
     *     resulting instance reports failure
     */
    public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req) {
        LOGGER.info("receive FragmentInstance to group[{}]", req.getConsensusGroupId());

        ConsensusGroupId groupId;
        try {
            groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
        } catch (Throwable t) {
            LOGGER.error("Deserialize ConsensusGroupId failed. ", t);
            TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
            resp.setMessage("Deserialize ConsensusGroupId failed: " + t.getMessage());
            return resp;
        }

        FragmentInstance fragmentInstance;
        try {
            fragmentInstance = FragmentInstance.deserializeFrom(req.fragmentInstance.body);
        } catch (Throwable t) {
            LOGGER.error("Deserialize FragmentInstance failed.", t);
            TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
            resp.setMessage("Deserialize FragmentInstance failed: " + t.getMessage());
            return resp;
        }

        // try-with-resources closes the SetThreadName on every path, and a failure raised by
        // close() itself is routed through the same catch block as an execution failure.
        try (SetThreadName threadName = new SetThreadName(fragmentInstance.getId().getFullId())) {
            ConsensusReadResponse readResponse = groupId instanceof DataRegionId
                    ? DataRegionConsensusImpl.getInstance().read(groupId, (IConsensusRequest) fragmentInstance)
                    : SchemaRegionConsensusImpl.getInstance().read(groupId, (IConsensusRequest) fragmentInstance);
            TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp();
            if (!readResponse.isSuccess()) {
                LOGGER.error("Execute FragmentInstance in ConsensusGroup {} failed.", req.getConsensusGroupId(), readResponse.getException());
                resp.setAccepted(false);
                // The consensus layer may report a failure without attaching an exception.
                resp.setMessage("Execute FragmentInstance failed: " + (readResponse.getException() == null ? "" : readResponse.getException().getMessage()));
            } else {
                FragmentInstanceInfo info = (FragmentInstanceInfo) readResponse.getDataset();
                resp.setAccepted(!info.getState().isFailed());
                resp.setMessage(info.getMessage());
            }
            return resp;
        } catch (Throwable t) {
            LOGGER.error("Execute FragmentInstance in ConsensusGroup {} failed.", req.getConsensusGroupId(), t);
            TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
            resp.setMessage("Execute FragmentInstance failed: " + t.getMessage());
            return resp;
        }
    }

    /**
     * Deserializes the plan node carried by the request and writes it through the consensus layer
     * of the target region.
     *
     * <p>Insert nodes are validated against the schema first; a {@link SemanticException} rejects
     * the request immediately. An insert that still has failed measurements is written, but the
     * response is marked as not accepted and carries a metadata-error status.
     *
     * @param req request carrying the target consensus group id and the serialized plan node
     * @return response describing whether the write was accepted, with a message and, on failure,
     *     a status
     */
    public TSendPlanNodeResp sendPlanNode(TSendPlanNodeReq req) {
        LOGGER.info("receive PlanNode to group[{}]", req.getConsensusGroupId());
        ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
        TSendPlanNodeResp response = new TSendPlanNodeResp();
        PlanNode planNode = PlanNodeType.deserialize(req.planNode.body);

        boolean hasFailedMeasurement = false;
        String partialInsertMessage = null;
        if (planNode instanceof InsertNode) {
            InsertNode insertNode = (InsertNode) planNode;
            try {
                SchemaValidator.validate(insertNode);
            } catch (SemanticException e) {
                response.setAccepted(false);
                response.setMessage(e.getMessage());
                return response;
            }
            hasFailedMeasurement = insertNode.hasFailedMeasurements();
            if (hasFailedMeasurement) {
                partialInsertMessage = String.format("Fail to insert measurements %s caused by %s", insertNode.getFailedMeasurements(), insertNode.getFailedMessages());
                LOGGER.warn(partialInsertMessage);
            }
        }

        ConsensusWriteResponse writeResponse;
        if (groupId instanceof DataRegionId) {
            writeResponse = DataRegionConsensusImpl.getInstance().write(groupId, (IConsensusRequest) planNode);
        } else {
            writeResponse = SchemaRegionConsensusImpl.getInstance().write(groupId, (IConsensusRequest) planNode);
        }

        TSStatus writeStatus = writeResponse.getStatus();
        if (writeStatus == null) {
            // No status at all: the consensus call itself went wrong.
            LOGGER.error("Something wrong happened while calling consensus layer's write API.", writeResponse.getException());
            response.setAccepted(false);
            response.setMessage(writeResponse.getException().getMessage());
            return response;
        }

        boolean writeSucceeded = TSStatusCode.SUCCESS_STATUS.getStatusCode() == writeStatus.getCode();
        response.setAccepted(!hasFailedMeasurement && writeSucceeded);
        if (!writeSucceeded) {
            response.setMessage(writeStatus.message);
            response.setStatus(writeStatus);
        } else if (hasFailedMeasurement) {
            response.setMessage(partialInsertMessage);
            response.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(), partialInsertMessage));
        } else {
            response.setMessage(writeStatus.message);
        }
        return response;
    }

    /**
     * Looks up the current state of a fragment instance.
     *
     * @param req request carrying the thrift fragment instance id
     * @return the state name of the instance together with its message (when one is present), or
     *     {@code NO_SUCH_INSTANCE} when the manager has no info for the id
     */
    public TFragmentInstanceStateResp fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req) {
        FragmentInstanceId instanceId = FragmentInstanceId.fromThrift(req.fragmentInstanceId);
        FragmentInstanceInfo info = FragmentInstanceManager.getInstance().getInstanceInfo(instanceId);
        if (info == null) {
            return new TFragmentInstanceStateResp(FragmentInstanceState.NO_SUCH_INSTANCE.toString());
        }
        TFragmentInstanceStateResp resp = new TFragmentInstanceStateResp(info.getState().toString());
        // ImmutableList rejects null elements, so an instance without a message is reported with
        // an empty list instead of failing the whole state lookup.
        // NOTE(review): whether getMessage() can actually be null is not visible here - confirm.
        String message = info.getMessage();
        List<String> failedMessages = message == null ? ImmutableList.<String>of() : ImmutableList.of(message);
        resp.setFailedMessages(failedMessages);
        return resp;
    }

    /**
     * Cancels every fragment instance listed in the request.
     *
     * <p>The current thread is renamed after the query id for the duration of the call.
     *
     * @param req request carrying the query id and the thrift ids of the fragment instances
     * @return a response constructed with {@code true} once every listed task has been handed to
     *     the fragment instance manager for cancellation
     */
    public TCancelResp cancelQuery(TCancelQueryReq req) {
        try (SetThreadName threadName = new SetThreadName(req.getQueryId())) {
            // The list must be typed: iterating a raw List as FragmentInstanceId does not compile.
            List<FragmentInstanceId> taskIds = req.getFragmentInstanceIds().stream()
                    .map(FragmentInstanceId::fromThrift)
                    .collect(Collectors.toList());
            for (FragmentInstanceId taskId : taskIds) {
                FragmentInstanceManager.getInstance().cancelTask(taskId);
            }
            return new TCancelResp(true);
        }
    }

    /**
     * Not implemented: every call fails.
     *
     * @param req ignored
     * @return never returns normally
     * @throws NotImplementedException always
     */
    public TCancelResp cancelPlanFragment(TCancelPlanFragmentReq req) {
        throw new NotImplementedException();
    }

    /**
     * Not implemented: every call fails.
     *
     * @param req ignored
     * @return never returns normally
     * @throws NotImplementedException always
     */
    public TCancelResp cancelFragmentInstance(TCancelFragmentInstanceReq req) {
        throw new NotImplementedException();
    }

    /**
     * Unsupported operation: every call fails.
     *
     * @param req ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    public TSchemaFetchResponse fetchSchema(TSchemaFetchRequest req) {
        throw new UnsupportedOperationException();
    }

    /**
     * Creates a schema region in the local schema engine and then registers a consensus peer group
     * for it, built from the schema-region consensus endpoints of the replica set.
     *
     * @param req request carrying the storage group path and the region replica set
     * @return success; {@code PATH_ILLEGAL} when the storage group path is illegal; or
     *     {@code CREATE_REGION_ERROR} when the schema engine or the consensus layer fails
     */
    public TSStatus createSchemaRegion(TCreateSchemaRegionReq req) {
        try {
            PartialPath storageGroupPath = new PartialPath(req.getStorageGroup());
            TRegionReplicaSet replicaSet = req.getRegionReplicaSet();
            SchemaRegionId regionId = new SchemaRegionId(replicaSet.getRegionId().getId());
            schemaEngine.createSchemaRegion(storageGroupPath, regionId);

            List<Peer> peers = new ArrayList<>();
            for (TDataNodeLocation location : replicaSet.getDataNodeLocations()) {
                TEndPoint consensusEndPoint = location.getSchemaRegionConsensusEndPoint();
                peers.add(new Peer(regionId, new TEndPoint(consensusEndPoint.getIp(), consensusEndPoint.getPort())));
            }

            ConsensusGenericResponse createPeerResponse = SchemaRegionConsensusImpl.getInstance().createPeer(regionId, peers);
            if (createPeerResponse.isSuccess()) {
                return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
            }
            TSStatus failure = new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
            failure.setMessage(createPeerResponse.getException().getMessage());
            return failure;
        } catch (IllegalPathException illegalPath) {
            LOGGER.error("Create Schema Region {} failed because path is illegal.", req.getStorageGroup());
            TSStatus failure = new TSStatus(TSStatusCode.PATH_ILLEGAL.getStatusCode());
            failure.setMessage("Create Schema Region failed because storageGroup path is illegal.");
            return failure;
        } catch (MetadataException metadataError) {
            LOGGER.error("Create Schema Region {} failed because {}", req.getStorageGroup(), metadataError.getMessage());
            TSStatus failure = new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
            failure.setMessage(String.format("Create Schema Region failed because of %s", metadataError.getMessage()));
            return failure;
        }
    }

    /**
     * Creates a data region in the local storage engine and then registers a consensus peer group
     * for it, built from the data-region consensus endpoints of the replica set.
     *
     * @param req request carrying the storage group, its TTL and the region replica set
     * @return success, or {@code CREATE_REGION_ERROR} when the storage engine or the consensus
     *     layer fails
     */
    public TSStatus createDataRegion(TCreateDataRegionReq req) {
        try {
            TRegionReplicaSet replicaSet = req.getRegionReplicaSet();
            DataRegionId regionId = new DataRegionId(replicaSet.getRegionId().getId());
            storageEngine.createDataRegion(regionId, req.storageGroup, req.ttl);

            List<Peer> peers = new ArrayList<>();
            for (TDataNodeLocation location : replicaSet.getDataNodeLocations()) {
                TEndPoint consensusEndPoint = location.getDataRegionConsensusEndPoint();
                peers.add(new Peer(regionId, new TEndPoint(consensusEndPoint.getIp(), consensusEndPoint.getPort())));
            }

            ConsensusGenericResponse createPeerResponse = DataRegionConsensusImpl.getInstance().createPeer(regionId, peers);
            if (createPeerResponse.isSuccess()) {
                return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
            }
            TSStatus failure = new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
            failure.setMessage(createPeerResponse.getException().getMessage());
            return failure;
        } catch (DataRegionException regionError) {
            LOGGER.error("Create Data Region {} failed because {}", req.getStorageGroup(), regionError.getMessage());
            TSStatus failure = new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
            failure.setMessage(String.format("Create Data Region failed because of %s", regionError.getMessage()));
            return failure;
        }
    }

    /**
     * Asks the {@link ClusterPartitionFetcher} to invalidate all of its cache.
     *
     * @param req invalidation request; not inspected by this handler
     * @return a status carrying the success code
     */
    public TSStatus invalidatePartitionCache(TInvalidateCacheReq req) {
        ClusterPartitionFetcher.getInstance().invalidAllCache();
        TSStatus success = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        return success;
    }

    /**
     * Asks the {@link DataNodeSchemaCache} to clean itself up.
     *
     * @param req invalidation request; not inspected by this handler
     * @return a status carrying the success code
     */
    public TSStatus invalidateSchemaCache(TInvalidateCacheReq req) {
        DataNodeSchemaCache.getInstance().cleanUp();
        TSStatus success = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        return success;
    }

    /**
     * Answers a heartbeat by echoing its timestamp and reporting the node status.
     *
     * <p>Leadership information is attached when the request asks for it. CPU and memory load are
     * attached only when metrics are enabled, the request asks for load sampling, and the sampled
     * values are non-zero.
     *
     * @param req heartbeat request
     * @return the heartbeat response
     * @throws TException declared by the RPC interface; not thrown explicitly here
     */
    public THeartbeatResp getDataNodeHeartBeat(THeartbeatReq req) throws TException {
        THeartbeatResp heartbeat = new THeartbeatResp();
        heartbeat.setHeartbeatTimestamp(req.getHeartbeatTimestamp());
        heartbeat.setStatus(CommonDescriptor.getInstance().getConfig().getNodeStatus().getStatus());
        if (req.isNeedJudgeLeader()) {
            heartbeat.setJudgedLeaders(getJudgedLeaders());
        }

        boolean metricEnabled = MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric();
        if (!metricEnabled || !req.isNeedSamplingLoad()) {
            return heartbeat;
        }

        Gauge cpuGauge = MetricService.getInstance().getOrCreateGauge(Metric.SYS_CPU_LOAD.toString(), MetricLevel.CORE, new String[]{Tag.NAME.toString(), "system"});
        long cpuLoad = cpuGauge.value();
        if (cpuLoad != 0L) {
            heartbeat.setCpu((short) cpuLoad);
        }

        long usedMemory = getMemory("jvm.memory.used.bytes");
        long maxMemory = getMemory("jvm.memory.max.bytes");
        boolean memoryKnown = usedMemory != 0L && maxMemory != 0L;
        if (memoryKnown) {
            // Reported as a percentage of the maximum.
            heartbeat.setMemory((short) (usedMemory * 100L / maxMemory));
        }
        return heartbeat;
    }

    /**
     * Forwards a region route update to the {@link ClusterPartitionFetcher}.
     *
     * @param req region route request
     * @return success when the fetcher reports the update as applied, {@code CACHE_UPDATE_FAIL}
     *     otherwise
     * @throws TException declared by the RPC interface; not thrown explicitly here
     */
    public TSStatus updateRegionCache(TRegionRouteReq req) throws TException {
        boolean updated = ClusterPartitionFetcher.getInstance().updateRegionCache(req);
        if (!updated) {
            return RpcUtils.getStatus(TSStatusCode.CACHE_UPDATE_FAIL);
        }
        return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
    }

    /**
     * Collects, for every consensus group known to the data-region and schema-region consensus
     * implementations, whether this node is currently its leader.
     *
     * @return map from thrift consensus group id to the leadership flag; a consensus implementation
     *     whose instance is {@code null} contributes no entries
     */
    private Map<TConsensusGroupId, Boolean> getJudgedLeaders() {
        Map<TConsensusGroupId, Boolean> leaders = new HashMap<>();
        if (DataRegionConsensusImpl.getInstance() != null) {
            for (ConsensusGroupId groupId : DataRegionConsensusImpl.getInstance().getAllConsensusGroupIds()) {
                leaders.put(groupId.convertToTConsensusGroupId(), DataRegionConsensusImpl.getInstance().isLeader(groupId));
            }
        }
        if (SchemaRegionConsensusImpl.getInstance() != null) {
            for (ConsensusGroupId groupId : SchemaRegionConsensusImpl.getInstance().getAllConsensusGroupIds()) {
                leaders.put(groupId.convertToTConsensusGroupId(), SchemaRegionConsensusImpl.getInstance().isLeader(groupId));
            }
        }
        return leaders;
    }

    /**
     * Sums the values of the named JVM memory gauge over a fixed set of memory pools.
     *
     * <p>The pool ids are hard-coded: the heap ids are the pool names HotSpot reports under the
     * Parallel collector ("PS ..."). A pool id that matches no populated gauge is expected to
     * contribute nothing to the sum.
     *
     * @param gaugeName name of the gauge to sum, e.g. {@code jvm.memory.used.bytes}
     * @return the summed gauge values, or {@code 0} when reading any gauge throws
     */
    private long getMemory(String gaugeName) {
        long total = 0L;
        try {
            // HotSpot names these pools "PS Old Gen" and "PS Survivor Space"; the previous ids
            // ("PS Old Eden", "Ps Survivor Space") matched no pool, so only Eden was ever counted.
            List<String> heapIds = Arrays.asList("PS Eden Space", "PS Old Gen", "PS Survivor Space");
            List<String> nonHeapIds = Arrays.asList("Code Cache", "Compressed Class Space", "Metaspace");
            for (String id : heapIds) {
                Gauge gauge = MetricService.getInstance().getOrCreateGauge(gaugeName, MetricLevel.IMPORTANT, new String[]{"id", id, "area", "heap"});
                total += gauge.value();
            }
            // NOTE(review): JVM metric binders commonly tag this area "nonheap"; "noheap" is kept
            // unchanged here - confirm against the code that registers these gauges.
            for (String id : nonHeapIds) {
                Gauge gauge = MetricService.getInstance().getOrCreateGauge(gaugeName, MetricLevel.IMPORTANT, new String[]{"id", id, "area", "noheap"});
                total += gauge.value();
            }
        } catch (Exception e) {
            LOGGER.error("Failed to get memory from metric because {}", e.getMessage());
            return 0L;
        }
        return total;
    }

    /**
     * Asks the {@link AuthorizerManager} to invalidate its cache for the given user and role.
     *
     * @param req request carrying the user name and role name
     * @return success when the manager reports the invalidation as done,
     *     {@code INVALIDATE_PERMISSION_CACHE_ERROR} otherwise
     */
    public TSStatus invalidatePermissionCache(TInvalidatePermissionCacheReq req) {
        boolean invalidated = AuthorizerManager.getInstance().invalidateCache(req.getUsername(), req.getRoleName());
        TSStatusCode outcome = invalidated
                ? TSStatusCode.SUCCESS_STATUS
                : TSStatusCode.INVALIDATE_PERMISSION_CACHE_ERROR;
        return RpcUtils.getStatus(outcome);
    }

    /**
     * Asks the storage engine to run {@code mergeAll}.
     *
     * @return success, or {@code EXECUTE_STATEMENT_ERROR} with the exception message when the
     *     storage engine throws a {@link StorageEngineException}
     * @throws TException declared by the RPC interface; not thrown explicitly here
     */
    public TSStatus merge() throws TException {
        try {
            storageEngine.mergeAll();
            return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
        } catch (StorageEngineException mergeFailure) {
            return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, mergeFailure.getMessage());
        }
    }

    /**
     * Delegates the flush request to the storage engine.
     *
     * @param req flush request, passed through unchanged
     * @return the status produced by the storage engine
     * @throws TException declared by the RPC interface; not thrown explicitly here
     */
    public TSStatus flush(TFlushReq req) throws TException {
        TSStatus flushStatus = storageEngine.operateFlush(req);
        return flushStatus;
    }

    /**
     * Clears the chunk cache, the time series metadata cache and the bloom filter cache, in that
     * order.
     *
     * @return a success status
     * @throws TException declared by the RPC interface; not thrown explicitly here
     */
    public TSStatus clearCache() throws TException {
        ChunkCache.getInstance().clear();
        TimeSeriesMetadataCache.getInstance().clear();
        BloomFilterCache.getInstance().clear();
        TSStatus success = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
        return success;
    }

    /**
     * Asks the {@link IoTDBDescriptor} to reload its hot-modifiable properties.
     *
     * @return success, or {@code EXECUTE_STATEMENT_ERROR} with the exception message when the
     *     reload throws a {@link QueryProcessException}
     * @throws TException declared by the RPC interface; not thrown explicitly here
     */
    public TSStatus loadConfiguration() throws TException {
        try {
            IoTDBDescriptor.getInstance().loadHotModifiedProps();
            return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
        } catch (QueryProcessException reloadFailure) {
            return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, reloadFailure.getMessage());
        }
    }

    /**
     * Parses the given text into a {@link NodeStatus} and stores it in the common configuration.
     *
     * @param status textual node status
     * @return success, or {@code EXECUTE_STATEMENT_ERROR} with the exception message when parsing
     *     or storing the status throws
     * @throws TException declared by the RPC interface; not thrown explicitly here
     */
    public TSStatus setSystemStatus(String status) throws TException {
        try {
            CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.parse(status));
        } catch (Exception failure) {
            TSStatus error = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, failure.getMessage());
            return error;
        }
        return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
    }

    /**
     * Delegates the TTL request to the storage engine.
     *
     * @param req TTL request, passed through unchanged
     * @return the status produced by the storage engine
     * @throws TException declared by the RPC interface; not thrown explicitly here
     */
    public TSStatus setTTL(TSetTTLReq req) throws TException {
        TSStatus ttlStatus = storageEngine.setTTL(req);
        return ttlStatus;
    }

    /**
     * Replaces the known config node list with the internal endpoints of the locations carried by
     * the request.
     *
     * @param req request carrying the config node locations; a {@code null} list leaves the current
     *     config node list untouched
     * @return a success status in every case
     */
    public TSStatus updateConfigNodeGroup(TUpdateConfigNodeGroupReq req) {
        // Typed list: a method reference cannot be applied to the elements of a raw List stream.
        List<TConfigNodeLocation> configNodeLocations = req.getConfigNodeLocations();
        if (configNodeLocations != null) {
            // A sequential stream yields the same ordered result; this short list gains nothing
            // from parallel execution.
            List<TEndPoint> internalEndPoints = configNodeLocations.stream()
                    .map(TConfigNodeLocation::getInternalEndPoint)
                    .collect(Collectors.toList());
            ConfigNodeInfo.getInstance().updateConfigNodeList(internalEndPoints);
        }
        return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
    }

    /**
     * Applies a template update to the {@link ClusterTemplateManager} according to the update type
     * carried by the request.
     *
     * @param req request carrying the update type and the template info
     * @return a success status; update types other than the two handled below are ignored
     * @throws TException declared by the RPC interface; not thrown explicitly here
     */
    public TSStatus updateTemplate(TUpdateTemplateReq req) throws TException {
        TemplateInternalRPCUpdateType updateType = TemplateInternalRPCUpdateType.getType(req.type);
        switch (updateType) {
            case ADD_TEMPLATE_SET_INFO:
                ClusterTemplateManager.getInstance().updateTemplateSetInfo(req.getTemplateInfo());
                break;
            case INVALIDATE_TEMPLATE_SET_INFO:
                ClusterTemplateManager.getInstance().invalidateTemplateSetInfo(req.getTemplateInfo());
                break;
            default:
                break;
        }
        return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
    }

    /**
     * Removes the local consensus peer of a region and then deletes the region itself from the
     * storage engine (data regions) or the schema engine (all other region ids).
     *
     * <p>A failed peer removal aborts the deletion unless the failure is a
     * {@link PeerNotInConsensusGroupException}, in which case the region is still deleted.
     *
     * @param tconsensusGroupId thrift id of the region to delete
     * @return success; {@code DELETE_REGION_ERROR} when the peer removal fails; or
     *     {@code METADATA_ERROR} when the schema engine fails to delete the schema region
     */
    public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) {
        ConsensusGroupId consensusGroupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tconsensusGroupId);
        boolean isDataRegion = consensusGroupId instanceof DataRegionId;

        ConsensusGenericResponse deletePeerResponse = isDataRegion
                ? DataRegionConsensusImpl.getInstance().deletePeer(consensusGroupId)
                : SchemaRegionConsensusImpl.getInstance().deletePeer(consensusGroupId);
        boolean peerAlreadyGone = deletePeerResponse.getException() instanceof PeerNotInConsensusGroupException;
        if (!deletePeerResponse.isSuccess() && !peerAlreadyGone) {
            return RpcUtils.getStatus(TSStatusCode.DELETE_REGION_ERROR, deletePeerResponse.getException().getMessage());
        }

        if (isDataRegion) {
            StorageEngineV2.getInstance().deleteDataRegion((DataRegionId) consensusGroupId);
        } else {
            try {
                SchemaEngine.getInstance().deleteSchemaRegion((SchemaRegionId) consensusGroupId);
            } catch (MetadataException metadataError) {
                LOGGER.error("{}: MetaData error: ", "IoTDB", metadataError);
                return RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, metadataError.getMessage());
            }
        }
        return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
    }

    /**
     * Transfers the leadership of a region to another node, but only when this node is currently
     * the leader of that region.
     *
     * @param req request carrying the region id and the location of the new leader node
     * @return a plain success status when this node is not the leader; otherwise the outcome of
     *     the leadership transfer
     * @throws TException declared by the RPC interface; not thrown explicitly here
     */
    public TSStatus changeRegionLeader(TRegionLeaderChangeReq req) throws TException {
        ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getRegionId());
        TEndPoint newLeaderEndPoint = getConsensusEndPoint(req.getNewLeaderNode(), regionId);
        Peer newLeaderPeer = new Peer(regionId, newLeaderEndPoint);
        if (isLeader(regionId)) {
            LOGGER.info("region {} is leader, will change leader", regionId);
            return transferLeader(regionId, newLeaderPeer);
        }
        LOGGER.info("region {} is not leader, no need to change leader", regionId);
        return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
    }

    /**
     * Asks the consensus implementation matching the region type to transfer leadership to the
     * given peer.
     *
     * @param regionId region whose leader should change
     * @param newLeaderPeer peer that should become the leader
     * @return success with a confirmation message, or {@code REGION_LEADER_CHANGE_FAILED} when the
     *     region id is neither a data nor a schema region id or the transfer fails
     */
    private TSStatus transferLeader(ConsensusGroupId regionId, Peer newLeaderPeer) {
        ConsensusGenericResponse transferResponse;
        if (regionId instanceof DataRegionId) {
            transferResponse = DataRegionConsensusImpl.getInstance().transferLeader(regionId, newLeaderPeer);
        } else if (regionId instanceof SchemaRegionId) {
            transferResponse = SchemaRegionConsensusImpl.getInstance().transferLeader(regionId, newLeaderPeer);
        } else {
            TSStatus wrongType = new TSStatus(TSStatusCode.REGION_LEADER_CHANGE_FAILED.getStatusCode());
            wrongType.setMessage("Error Region type. region: " + regionId);
            return wrongType;
        }

        if (transferResponse.isSuccess()) {
            TSStatus succeeded = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
            succeeded.setMessage("change region " + regionId + " leader succeed");
            return succeeded;
        }

        LOGGER.error("change region {} leader failed", regionId, transferResponse.getException());
        TSStatus failed = new TSStatus(TSStatusCode.REGION_LEADER_CHANGE_FAILED.getStatusCode());
        failed.setMessage(transferResponse.getException().getMessage());
        return failed;
    }

    /**
     * Tells whether this node is the leader of the given region, asking the consensus
     * implementation that matches the region type.
     *
     * @param regionId region to check
     * @return the leadership flag; {@code false} (after logging an error) when the id is neither a
     *     data nor a schema region id
     */
    private boolean isLeader(ConsensusGroupId regionId) {
        boolean leader;
        if (regionId instanceof DataRegionId) {
            leader = DataRegionConsensusImpl.getInstance().isLeader(regionId);
        } else if (regionId instanceof SchemaRegionId) {
            leader = SchemaRegionConsensusImpl.getInstance().isLeader(regionId);
        } else {
            LOGGER.error("region {} type is illegal", regionId);
            leader = false;
        }
        return leader;
    }

    /**
     * Creates a new local region and, when that succeeds, adds the consensus group built from the
     * region locations carried by the request.
     *
     * @param req request carrying the region id, its locations, the storage group and the TTL
     * @return the failed region-creation status, or the outcome of adding the consensus group
     * @throws TException declared by the RPC interface; not thrown explicitly here
     */
    public TSStatus createPeerToConsensusGroup(TCreatePeerReq req) throws TException {
        ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getRegionId());

        // The peers are resolved before the region is created, one per region location.
        List<Peer> peers = new ArrayList<>();
        for (TDataNodeLocation location : req.getRegionLocations()) {
            TEndPoint consensusEndPoint = getConsensusEndPoint(location, regionId);
            peers.add(new Peer(regionId, consensusEndPoint));
        }

        TSStatus createStatus = createNewRegion(regionId, req.getStorageGroup(), req.getTtl());
        if (isSucceed(createStatus)) {
            return addConsensusGroup(regionId, peers);
        }
        return createStatus;
    }

    /**
     * Submits a task that removes a region peer to the {@link RegionMigrateService}.
     *
     * @param req migration request carrying the region id and the node the peer is removed from
     * @return success when the task was submitted, {@code MIGRATE_REGION_ERROR} otherwise
     * @throws TException declared by the RPC interface; not thrown explicitly here
     */
    public TSStatus removeRegionPeer(TMigrateRegionReq req) throws TException {
        TConsensusGroupId regionId = req.getRegionId();
        // Resolved once, before submitting, so a request without a source node fails before any
        // task is queued; the same value feeds both the log line and the failure message.
        TEndPoint fromEndPoint = req.getFromNode().getInternalEndPoint();
        boolean submitSucceed = RegionMigrateService.getInstance().submitRemoveRegionPeerTask(req);
        if (submitSucceed) {
            LOGGER.info("succeed to submit a remove region peer task. region: {}, from {}", regionId, fromEndPoint);
            return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        }
        TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
        status.setMessage("submit region remove region peer task failed, region: " + regionId + ", from " + fromEndPoint);
        return status;
    }

    /**
     * Submits a task that removes a region consensus group to the {@link RegionMigrateService}.
     *
     * @param req migration request carrying the region id and the source node
     * @return success when the task was submitted, {@code MIGRATE_REGION_ERROR} otherwise
     * @throws TException declared by the RPC interface; not thrown explicitly here
     */
    public TSStatus deletePeerToConsensusGroup(TMigrateRegionReq req) throws TException {
        TConsensusGroupId regionId = req.getRegionId();
        String fromNodeIp = req.getFromNode().getInternalEndPoint().getIp();
        boolean submitted = RegionMigrateService.getInstance().submitRemoveRegionConsensusGroupTask(req);
        if (!submitted) {
            TSStatus failure = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
            failure.setMessage("submit region remove region consensus group task failed, region: " + regionId);
            return failure;
        }
        LOGGER.info("succeed to submit a remove region consensus group task. region: {}, from {}", regionId, fromNodeIp);
        return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
    }

    /**
     * Creates a region locally: a data region in the storage engine when the id is a
     * {@link DataRegionId}, otherwise a schema region in the schema engine.
     *
     * @param regionId id of the region to create
     * @param storageGroup storage group the region belongs to
     * @param ttl TTL handed to the storage engine for data regions
     * @return success with a confirmation message, or {@code CREATE_REGION_ERROR} with the
     *     exception message when creation throws
     */
    private TSStatus createNewRegion(ConsensusGroupId regionId, String storageGroup, long ttl) {
        LOGGER.info("start to create new region {}", regionId);
        try {
            if (regionId instanceof DataRegionId) {
                storageEngine.createDataRegion((DataRegionId) regionId, storageGroup, ttl);
            } else {
                schemaEngine.createSchemaRegion(new PartialPath(storageGroup), (SchemaRegionId) regionId);
            }
        } catch (Exception creationFailure) {
            LOGGER.error("create new region {} error", regionId, creationFailure);
            TSStatus failure = new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
            failure.setMessage("create new region " + regionId + "error,  exception:" + creationFailure.getMessage());
            return failure;
        }
        TSStatus success = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        success.setMessage("create new region " + regionId + " succeed");
        LOGGER.info("succeed to create new region {}", regionId);
        return success;
    }

    /**
     * Registers a UDF with the {@link UDFRegistrationService}.
     *
     * @param request request carrying the UDF name, its class name and its URIs
     * @return success, or {@code EXECUTE_STATEMENT_ERROR} with the exception message when the
     *     registration throws
     */
    public TSStatus createFunction(TCreateFunctionRequest request) {
        try {
            UDFRegistrationService.getInstance().register(request.getUdfName(), request.getClassName(), request.getUris(), UDFExecutableManager.getInstance(), true);
            return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        } catch (Exception registrationFailure) {
            TSStatus failure = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
            failure.setMessage(registrationFailure.getMessage());
            return failure;
        }
    }

    /**
     * Deregisters the UDF named in the request through {@link UDFRegistrationService}.
     *
     * @param request drop request supplying the UDF name
     * @return a SUCCESS_STATUS status, or an EXECUTE_STATEMENT_ERROR status whose message is the
     *     message of the exception raised during deregistration
     */
    public TSStatus dropFunction(TDropFunctionRequest request) {
        TSStatus outcome;
        try {
            UDFRegistrationService registrar = UDFRegistrationService.getInstance();
            registrar.deregister(request.getUdfName());
            outcome = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        } catch (Exception deregistrationFailure) {
            outcome = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
            outcome.setMessage(deregistrationFailure.getMessage());
        }
        return outcome;
    }

    /**
     * Stub: the request is not inspected and a SUCCESS_STATUS status is always returned.
     *
     * @param req trigger-instance creation request (currently unused)
     * @return a status carrying the SUCCESS_STATUS code
     * @throws TException declared by the signature; this body does not throw it
     */
    public TSStatus createTriggerInstance(TcreateTriggerInstanceReq req) throws TException {
        final int successCode = TSStatusCode.SUCCESS_STATUS.getStatusCode();
        return new TSStatus(successCode);
    }

    /**
     * Stub: the request is not inspected and a SUCCESS_STATUS status is always returned.
     *
     * @param req trigger-instance drop request (currently unused)
     * @return a status carrying the SUCCESS_STATUS code
     * @throws TException declared by the signature; this body does not throw it
     */
    public TSStatus dropTriggerInstance(TDropTriggerInstanceReq req) throws TException {
        final int successCode = TSStatusCode.SUCCESS_STATUS.getStatusCode();
        return new TSStatus(successCode);
    }

    /**
     * Hands an add-region-peer task to {@link RegionMigrateService} and reports whether the
     * submission was accepted.
     *
     * @param req migration request; its region id and target node are read for messages/logging
     * @return a SUCCESS_STATUS status when the task was submitted, otherwise a
     *     MIGRATE_REGION_ERROR status naming the region
     * @throws TException declared by the signature; this body does not throw it explicitly
     */
    public TSStatus addRegionPeer(TMigrateRegionReq req) throws TException {
        final TConsensusGroupId regionId = req.getRegionId();
        // Resolved before the submission, as the original flow does; only used by the success log.
        final String toNodeIp = req.getToNode().getInternalEndPoint().getIp();

        if (!RegionMigrateService.getInstance().submitAddRegionPeerTask(req)) {
            TSStatus failure = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
            failure.setMessage("submit add region peer task failed, region: " + regionId);
            return failure;
        }

        LOGGER.info("succeed to submit a add region peer task. region: {}, to {}", regionId, toNodeIp);
        return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
    }

    /**
     * Picks the consensus endpoint of {@code nodeLocation} that matches the kind of region.
     *
     * @param nodeLocation data node whose endpoints are consulted
     * @param regionId region id; a {@link DataRegionId} selects the data-region endpoint, anything
     *     else selects the schema-region endpoint
     * @return the matching consensus endpoint of the node
     */
    private TEndPoint getConsensusEndPoint(TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
        final boolean isDataRegion = regionId instanceof DataRegionId;
        return isDataRegion
            ? nodeLocation.getDataRegionConsensusEndPoint()
            : nodeLocation.getSchemaRegionConsensusEndPoint();
    }

    /**
     * Tells whether {@code status} carries the SUCCESS_STATUS code.
     *
     * @param status status to inspect
     * @return {@code true} iff the status code equals the SUCCESS_STATUS code
     */
    private boolean isSucceed(TSStatus status) {
        final int actualCode = status.getCode();
        final int successCode = TSStatusCode.SUCCESS_STATUS.getStatusCode();
        return actualCode == successCode;
    }

    /**
     * Creates the local consensus peer for {@code regionId} with the given peer list.
     *
     * <p>A {@link DataRegionId} goes through {@link DataRegionConsensusImpl}; any other id goes
     * through {@code SchemaRegionConsensusImpl}.
     *
     * @param regionId id of the region whose consensus group is created
     * @param peers peers passed to {@code createPeer}
     * @return a SUCCESS_STATUS status with a confirmation message, or a REGION_MIGRATE_FAILED
     *     status describing the failure when the consensus layer reports no success
     */
    private TSStatus addConsensusGroup(ConsensusGroupId regionId, List<Peer> peers) {
        LOGGER.info("Start to add consensus group {} to region {}", peers, regionId);
        TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        ConsensusGenericResponse resp = regionId instanceof DataRegionId
            ? DataRegionConsensusImpl.getInstance().createPeer(regionId, peers)
            : SchemaRegionConsensusImpl.getInstance().createPeer(regionId, peers);
        if (!resp.isSuccess()) {
            // The exception (possibly null) is the trailing argument so the logger can record its stack trace.
            LOGGER.error("add peers {} to region {} consensus group error", peers, regionId, resp.getException());
            status.setCode(TSStatusCode.REGION_MIGRATE_FAILED.getStatusCode());
            // Defensive: a failed response without an attached exception must not turn into an NPE here.
            String failureDetail = resp.getException() == null
                ? "add peers to region consensus group " + regionId + " failed"
                : resp.getException().getMessage();
            status.setMessage(failureDetail);
            return status;
        }
        LOGGER.info("succeed to add peers {} to region {} consensus group", peers, regionId);
        // The separator before "succeed" was missing, which glued it to the region id.
        status.setMessage("add peers to region consensus group " + regionId + " succeed");
        return status;
    }

    /**
     * Handles a disable-data-node request by calling {@code invalidAllCache()} on the
     * {@code ClusterPartitionFetcher} and {@code cleanUp()} on the {@code DataNodeSchemaCache}.
     *
     * @param req the disable request; only logged here
     * @return a SUCCESS_STATUS status with the message {@code "disable datanode succeed"}
     * @throws TException declared by the signature; this body does not throw it explicitly
     */
    public TSStatus disableDataNode(TDisableDataNodeReq req) throws TException {
        LOGGER.info("start disable data node in the request: {}", req);

        ClusterPartitionFetcher.getInstance().invalidAllCache();
        DataNodeSchemaCache.getInstance().cleanUp();

        TSStatus result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        result.setMessage("disable datanode succeed");
        return result;
    }

    /**
     * Stops the local {@code DataNode} instance and reports the outcome.
     *
     * @return a SUCCESS_STATUS status with the message {@code "stop datanode succeed"}, or a
     *     DATANODE_STOP_ERROR status carrying the exception message when stopping throws
     */
    public TSStatus stopDataNode() {
        LOGGER.info("stopping Data Node");
        try {
            DataNode.getInstance().stop();
        } catch (Exception stopFailure) {
            LOGGER.error("stop Data Node error", stopFailure);
            return new TSStatus(TSStatusCode.DATANODE_STOP_ERROR.getStatusCode())
                .setMessage(stopFailure.getMessage());
        }
        return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())
            .setMessage("stop datanode succeed");
    }

    /**
     * Intentionally empty: this implementation performs no work here.
     *
     * <p>NOTE(review): presumably a callback required by the service handler interface and invoked
     * when a client disconnects; confirm against the interface declaration.
     */
    public void handleClientExit() {
    }
}

