/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.service.thrift.impl;

import io.airlift.concurrent.SetThreadName;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
import org.apache.iotdb.db.auth.AuthorizerManager;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.service.metrics.MetricsService;
import org.apache.iotdb.db.service.metrics.enums.Metric;
import org.apache.iotdb.db.service.metrics.enums.Tag;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.type.Gauge;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
import org.apache.iotdb.mpp.rpc.thrift.TCancelFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelResp;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionRequest;
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TMigrateRegionResp;
import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchRequest;
import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchResponse;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataNodeRPCServiceImpl
implements IDataNodeRPCService.Iface {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeRPCServiceImpl.class);
    private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
    private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
    private static final double loadBalanceThreshold = 0.1;

    public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req) {
        LOGGER.info("receive FragmentInstance to group[{}]", (Object)req.getConsensusGroupId());
        ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId((TConsensusGroupId)req.getConsensusGroupId());
        FragmentInstance fragmentInstance = FragmentInstance.deserializeFrom(req.fragmentInstance.body);
        ConsensusReadResponse readResponse = groupId instanceof DataRegionId ? DataRegionConsensusImpl.getInstance().read(groupId, (IConsensusRequest)fragmentInstance) : SchemaRegionConsensusImpl.getInstance().read(groupId, (IConsensusRequest)fragmentInstance);
        if (!readResponse.isSuccess()) {
            LOGGER.error("execute FragmentInstance in ConsensusGroup {} failed because {}", (Object)req.getConsensusGroupId(), (Object)readResponse.getException());
            return new TSendFragmentInstanceResp(false);
        }
        FragmentInstanceInfo info = (FragmentInstanceInfo)readResponse.getDataset();
        return new TSendFragmentInstanceResp(!info.getState().isFailed());
    }

    public TSendPlanNodeResp sendPlanNode(TSendPlanNodeReq req) {
        ConsensusWriteResponse writeResponse;
        LOGGER.info("receive PlanNode to group[{}]", (Object)req.getConsensusGroupId());
        ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId((TConsensusGroupId)req.getConsensusGroupId());
        TSendPlanNodeResp response = new TSendPlanNodeResp();
        PlanNode planNode = PlanNodeType.deserialize(req.planNode.body);
        boolean hasFailedMeasurement = false;
        if (planNode instanceof InsertNode) {
            InsertNode insertNode = (InsertNode)planNode;
            try {
                SchemaValidator.validate(insertNode);
            }
            catch (SemanticException e) {
                response.setAccepted(false);
                response.setMessage(e.getMessage());
                return response;
            }
            hasFailedMeasurement = insertNode.hasFailedMeasurements();
            if (hasFailedMeasurement) {
                LOGGER.warn("Fail to insert measurements {} caused by {}", insertNode.getFailedMeasurements(), insertNode.getFailedMessages());
            }
        }
        if ((writeResponse = groupId instanceof DataRegionId ? DataRegionConsensusImpl.getInstance().write(groupId, (IConsensusRequest)planNode) : SchemaRegionConsensusImpl.getInstance().write(groupId, (IConsensusRequest)planNode)).getStatus() != null) {
            response.setAccepted(!hasFailedMeasurement && TSStatusCode.SUCCESS_STATUS.getStatusCode() == writeResponse.getStatus().getCode());
            response.setMessage(writeResponse.getStatus().message);
        } else {
            LOGGER.error("Something wrong happened while calling consensus layer's write API.", (Throwable)writeResponse.getException());
            response.setAccepted(false);
            response.setMessage(writeResponse.getException().getMessage());
        }
        return response;
    }

    public TFragmentInstanceStateResp fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req) {
        FragmentInstanceId instanceId = FragmentInstanceId.fromThrift(req.fragmentInstanceId);
        try (SetThreadName threadName = new SetThreadName(instanceId.getFullId(), new Object[0]);){
            FragmentInstanceInfo info = FragmentInstanceManager.getInstance().getInstanceInfo(instanceId);
            TFragmentInstanceStateResp tFragmentInstanceStateResp = info != null ? new TFragmentInstanceStateResp(info.getState().toString()) : new TFragmentInstanceStateResp(FragmentInstanceState.NO_SUCH_INSTANCE.toString());
            return tFragmentInstanceStateResp;
        }
    }

    public TCancelResp cancelQuery(TCancelQueryReq req) {
        try (SetThreadName threadName = new SetThreadName(req.getQueryId(), new Object[0]);){
            LOGGER.info("start cancelling query.");
            List taskIds = req.getFragmentInstanceIds().stream().map(FragmentInstanceId::fromThrift).collect(Collectors.toList());
            for (FragmentInstanceId taskId : taskIds) {
                FragmentInstanceManager.getInstance().cancelTask(taskId);
            }
            LOGGER.info("finish cancelling query.");
            TCancelResp tCancelResp = new TCancelResp(true);
            return tCancelResp;
        }
    }

    public TCancelResp cancelPlanFragment(TCancelPlanFragmentReq req) {
        throw new NotImplementedException();
    }

    public TCancelResp cancelFragmentInstance(TCancelFragmentInstanceReq req) {
        throw new NotImplementedException();
    }

    public TSchemaFetchResponse fetchSchema(TSchemaFetchRequest req) {
        throw new UnsupportedOperationException();
    }

    public TSStatus createSchemaRegion(TCreateSchemaRegionReq req) {
        TSStatus tsStatus;
        try {
            PartialPath storageGroupPartitionPath = new PartialPath(req.getStorageGroup());
            TRegionReplicaSet regionReplicaSet = req.getRegionReplicaSet();
            SchemaRegionId schemaRegionId = new SchemaRegionId(regionReplicaSet.getRegionId().getId());
            this.schemaEngine.createSchemaRegion(storageGroupPartitionPath, schemaRegionId);
            ArrayList<Peer> peers = new ArrayList<Peer>();
            for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
                TEndPoint endpoint = new TEndPoint(dataNodeLocation.getSchemaRegionConsensusEndPoint().getIp(), dataNodeLocation.getSchemaRegionConsensusEndPoint().getPort());
                peers.add(new Peer((ConsensusGroupId)schemaRegionId, endpoint));
            }
            ConsensusGenericResponse consensusGenericResponse = SchemaRegionConsensusImpl.getInstance().addConsensusGroup((ConsensusGroupId)schemaRegionId, peers);
            if (consensusGenericResponse.isSuccess()) {
                tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
            } else {
                tsStatus = new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
                tsStatus.setMessage(consensusGenericResponse.getException().getMessage());
            }
        }
        catch (IllegalPathException e1) {
            LOGGER.error("Create Schema Region {} failed because path is illegal.", (Object)req.getStorageGroup());
            tsStatus = new TSStatus(TSStatusCode.PATH_ILLEGAL.getStatusCode());
            tsStatus.setMessage("Create Schema Region failed because storageGroup path is illegal.");
        }
        catch (MetadataException e2) {
            LOGGER.error("Create Schema Region {} failed because {}", (Object)req.getStorageGroup(), (Object)e2.getMessage());
            tsStatus = new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
            tsStatus.setMessage(String.format("Create Schema Region failed because of %s", e2.getMessage()));
        }
        return tsStatus;
    }

    public TSStatus createDataRegion(TCreateDataRegionReq req) {
        TSStatus tsStatus;
        try {
            TRegionReplicaSet regionReplicaSet = req.getRegionReplicaSet();
            DataRegionId dataRegionId = new DataRegionId(regionReplicaSet.getRegionId().getId());
            this.storageEngine.createDataRegion(dataRegionId, req.storageGroup, req.ttl);
            ArrayList<Peer> peers = new ArrayList<Peer>();
            for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
                TEndPoint endpoint = new TEndPoint(dataNodeLocation.getDataRegionConsensusEndPoint().getIp(), dataNodeLocation.getDataRegionConsensusEndPoint().getPort());
                peers.add(new Peer((ConsensusGroupId)dataRegionId, endpoint));
            }
            ConsensusGenericResponse consensusGenericResponse = DataRegionConsensusImpl.getInstance().addConsensusGroup((ConsensusGroupId)dataRegionId, peers);
            if (consensusGenericResponse.isSuccess()) {
                tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
            } else {
                tsStatus = new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
                tsStatus.setMessage(consensusGenericResponse.getException().getMessage());
            }
        }
        catch (DataRegionException e) {
            LOGGER.error("Create Data Region {} failed because {}", (Object)req.getStorageGroup(), (Object)e.getMessage());
            tsStatus = new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
            tsStatus.setMessage(String.format("Create Data Region failed because of %s", e.getMessage()));
        }
        return tsStatus;
    }

    public TSStatus invalidatePartitionCache(TInvalidateCacheReq req) {
        ClusterPartitionFetcher.getInstance().invalidAllCache();
        return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
    }

    public TSStatus invalidateSchemaCache(TInvalidateCacheReq req) {
        DataNodeSchemaCache.getInstance().cleanUp();
        return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
    }

    public THeartbeatResp getDataNodeHeartBeat(THeartbeatReq req) throws TException {
        THeartbeatResp resp = new THeartbeatResp(req.getHeartbeatTimestamp(), this.getJudgedLeaders());
        Random whetherToGetMetric = new Random();
        if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric().booleanValue() && whetherToGetMetric.nextDouble() < 0.1) {
            long cpuLoad = MetricsService.getInstance().getMetricManager().getOrCreateGauge(Metric.SYS_CPU_LOAD.toString(), MetricLevel.CORE, new String[]{Tag.NAME.toString(), "system"}).value();
            if (cpuLoad != 0L) {
                resp.setCpu((short)cpuLoad);
            }
            long usedMemory = this.getMemory("jvm.memory.used.bytes");
            long maxMemory = this.getMemory("jvm.memory.max.bytes");
            if (usedMemory != 0L && maxMemory != 0L) {
                resp.setMemory((short)(usedMemory * 100L / maxMemory));
            }
        }
        return resp;
    }

    private Map<TConsensusGroupId, Boolean> getJudgedLeaders() {
        HashMap<TConsensusGroupId, Boolean> result = new HashMap<TConsensusGroupId, Boolean>();
        if (DataRegionConsensusImpl.getInstance() != null) {
            DataRegionConsensusImpl.getInstance().getAllConsensusGroupIds().forEach(groupId -> result.put(groupId.convertToTConsensusGroupId(), DataRegionConsensusImpl.getInstance().isLeader(groupId)));
        }
        if (SchemaRegionConsensusImpl.getInstance() != null) {
            SchemaRegionConsensusImpl.getInstance().getAllConsensusGroupIds().forEach(groupId -> result.put(groupId.convertToTConsensusGroupId(), SchemaRegionConsensusImpl.getInstance().isLeader(groupId)));
        }
        return result;
    }

    private long getMemory(String gaugeName) {
        long result = 0L;
        try {
            Gauge gauge;
            List<String> heapIds = Arrays.asList("PS Eden Space", "PS Old Eden", "Ps Survivor Space");
            List<String> noHeapIds = Arrays.asList("Code Cache", "Compressed Class Space", "Metaspace");
            for (String id : heapIds) {
                gauge = MetricsService.getInstance().getMetricManager().getOrCreateGauge(gaugeName, MetricLevel.IMPORTANT, new String[]{"id", id, "area", "heap"});
                result += gauge.value();
            }
            for (String id : noHeapIds) {
                gauge = MetricsService.getInstance().getMetricManager().getOrCreateGauge(gaugeName, MetricLevel.IMPORTANT, new String[]{"id", id, "area", "noheap"});
                result += gauge.value();
            }
        }
        catch (Exception e) {
            LOGGER.error("Failed to get memory from metric because {}", (Object)e.getMessage());
            return 0L;
        }
        return result;
    }

    public TSStatus invalidatePermissionCache(TInvalidatePermissionCacheReq req) {
        if (AuthorizerManager.getInstance().invalidateCache(req.getUsername(), req.getRoleName())) {
            return RpcUtils.getStatus((TSStatusCode)TSStatusCode.SUCCESS_STATUS);
        }
        return RpcUtils.getStatus((TSStatusCode)TSStatusCode.INVALIDATE_PERMISSION_CACHE_ERROR);
    }

    public TSStatus flush(TFlushReq req) throws TException {
        return StorageEngineV2.getInstance().operateFlush(req);
    }

    public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) {
        ConsensusGroupId consensusGroupId = ConsensusGroupId.Factory.createFromTConsensusGroupId((TConsensusGroupId)tconsensusGroupId);
        if (consensusGroupId instanceof DataRegionId) {
            ConsensusGenericResponse response = DataRegionConsensusImpl.getInstance().removeConsensusGroup(consensusGroupId);
            if (!response.isSuccess() && !(response.getException() instanceof PeerNotInConsensusGroupException)) {
                return RpcUtils.getStatus((TSStatusCode)TSStatusCode.DELETE_REGION_ERROR, (String)response.getException().getMessage());
            }
            StorageEngineV2.getInstance().deleteDataRegion((DataRegionId)consensusGroupId);
        } else {
            ConsensusGenericResponse response = SchemaRegionConsensusImpl.getInstance().removeConsensusGroup(consensusGroupId);
            if (!response.isSuccess() && !(response.getException() instanceof PeerNotInConsensusGroupException)) {
                return RpcUtils.getStatus((TSStatusCode)TSStatusCode.DELETE_REGION_ERROR, (String)response.getException().getMessage());
            }
            try {
                SchemaEngine.getInstance().deleteSchemaRegion((SchemaRegionId)consensusGroupId);
            }
            catch (MetadataException e) {
                LOGGER.error("{}: MetaData error: ", (Object)"IoTDB", (Object)e);
                return RpcUtils.getStatus((TSStatusCode)TSStatusCode.METADATA_ERROR, (String)e.getMessage());
            }
        }
        return RpcUtils.getStatus((TSStatusCode)TSStatusCode.SUCCESS_STATUS, (String)"Execute successfully");
    }

    public TMigrateRegionResp migrateRegion(TMigrateRegionReq req) {
        TSStatus tsStatus;
        ConsensusGenericResponse consensusGenericResponse;
        TRegionReplicaSet regionReplicaSet = req.migrateRegion;
        switch (regionReplicaSet.regionId.type) {
            case DataRegion: {
                DataRegionId dataRegionId = new DataRegionId(regionReplicaSet.getRegionId().getId());
                ArrayList<Peer> newPeers = new ArrayList<Peer>();
                for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
                    TEndPoint endpoint = new TEndPoint(dataNodeLocation.getDataRegionConsensusEndPoint().getIp(), dataNodeLocation.getDataRegionConsensusEndPoint().getPort());
                    newPeers.add(new Peer((ConsensusGroupId)dataRegionId, endpoint));
                }
                consensusGenericResponse = DataRegionConsensusImpl.getInstance().changePeer((ConsensusGroupId)dataRegionId, newPeers);
                break;
            }
            case SchemaRegion: {
                SchemaRegionId schemaRegionId = new SchemaRegionId(regionReplicaSet.getRegionId().getId());
                ArrayList<Peer> newPeers = new ArrayList<Peer>();
                for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
                    TEndPoint endpoint = new TEndPoint(dataNodeLocation.getSchemaRegionConsensusEndPoint().getIp(), dataNodeLocation.getSchemaRegionConsensusEndPoint().getPort());
                    newPeers.add(new Peer((ConsensusGroupId)schemaRegionId, endpoint));
                }
                consensusGenericResponse = SchemaRegionConsensusImpl.getInstance().changePeer((ConsensusGroupId)schemaRegionId, newPeers);
                break;
            }
            default: {
                TSStatus tsStatus2 = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
                tsStatus2.setMessage("Region type is invalid");
                return new TMigrateRegionResp(tsStatus2);
            }
        }
        if (consensusGenericResponse.isSuccess()) {
            tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        } else {
            tsStatus = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
            tsStatus.setMessage(consensusGenericResponse.getException().getMessage());
        }
        return new TMigrateRegionResp(tsStatus);
    }

    public TSStatus createFunction(TCreateFunctionRequest request) {
        try {
            UDFRegistrationService.getInstance().register(request.getUdfName(), request.getClassName(), request.getUris(), UDFExecutableManager.getInstance(), true);
            return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        }
        catch (Exception e) {
            return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()).setMessage(e.getMessage());
        }
    }

    public TSStatus dropFunction(TDropFunctionRequest request) {
        try {
            UDFRegistrationService.getInstance().deregister(request.getUdfName());
            return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        }
        catch (Exception e) {
            return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()).setMessage(e.getMessage());
        }
    }

    public void handleClientExit() {
    }
}

