/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.service;

import java.util.HashMap;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionMigrateFailedType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.rescon.AbstractPoolManager;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RegionMigrateService
implements IService {
    private static final Logger LOGGER = LoggerFactory.getLogger(RegionMigrateService.class);
    private static final int RETRY = 5;
    private static final int SLEEP_MILLIS = 5000;
    private RegionMigratePool regionMigratePool;

    private RegionMigrateService() {
    }

    public static RegionMigrateService getInstance() {
        return Holder.INSTANCE;
    }

    public synchronized boolean submitAddRegionPeerTask(TMaintainPeerReq req) {
        boolean submitSucceed = true;
        try {
            this.regionMigratePool.submit(new AddRegionPeerTask(req.getRegionId(), req.getDestNode()));
        }
        catch (Exception e) {
            LOGGER.error("Submit addRegionPeer task error for Region: {} on DataNode: {}.", new Object[]{req.getRegionId(), req.getDestNode().getInternalEndPoint().getIp(), e});
            submitSucceed = false;
        }
        return submitSucceed;
    }

    public synchronized boolean submitRemoveRegionPeerTask(TMaintainPeerReq req) {
        boolean submitSucceed = true;
        try {
            this.regionMigratePool.submit(new RemoveRegionPeerTask(req.getRegionId(), req.getDestNode()));
        }
        catch (Exception e) {
            LOGGER.error("Submit removeRegionPeer task error for Region: {} on DataNode: {}.", new Object[]{req.getRegionId(), req.getDestNode().getInternalEndPoint().getIp(), e});
            submitSucceed = false;
        }
        return submitSucceed;
    }

    public synchronized boolean submitDeleteOldRegionPeerTask(TMaintainPeerReq req) {
        boolean submitSucceed = true;
        try {
            this.regionMigratePool.submit(new DeleteOldRegionPeerTask(req.getRegionId(), req.getDestNode()));
        }
        catch (Exception e) {
            LOGGER.error("Submit deleteOldRegionPeerTask error for Region: {} on DataNode: {}.", new Object[]{req.getRegionId(), req.getDestNode().getInternalEndPoint().getIp(), e});
            submitSucceed = false;
        }
        return submitSucceed;
    }

    public void start() throws StartupException {
        this.regionMigratePool = new RegionMigratePool();
        this.regionMigratePool.start();
        LOGGER.info("Region migrate service start");
    }

    public void stop() {
        if (this.regionMigratePool != null) {
            this.regionMigratePool.stop();
        }
        LOGGER.info("Region migrate service stop");
    }

    public ServiceType getID() {
        return ServiceType.DATA_NODE_REGION_MIGRATE_SERVICE;
    }

    private static void reportSucceed(TConsensusGroupId tRegionId) {
        TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        status.setMessage("Region: " + tRegionId + " migrated succeed");
        TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status);
        try {
            RegionMigrateService.reportRegionMigrateResultToConfigNode(req);
        }
        catch (Throwable e) {
            LOGGER.error("Report region {} migrate successful result error, result:{}", new Object[]{tRegionId, req, e});
        }
    }

    private static void reportFailed(TConsensusGroupId tRegionId, TDataNodeLocation failedNode, TRegionMigrateFailedType failedType, TSStatus status) {
        TRegionMigrateResultReportReq req = RegionMigrateService.createFailedRequest(tRegionId, failedNode, failedType, status);
        try {
            RegionMigrateService.reportRegionMigrateResultToConfigNode(req);
        }
        catch (Throwable e) {
            LOGGER.error("Report region {} migrate failed result error, result:{}", new Object[]{tRegionId, req, e});
        }
    }

    private static TRegionMigrateResultReportReq createFailedRequest(TConsensusGroupId tRegionId, TDataNodeLocation failedNode, TRegionMigrateFailedType failedType, TSStatus status) {
        HashMap<TDataNodeLocation, TRegionMigrateFailedType> failedNodeAndReason = new HashMap<TDataNodeLocation, TRegionMigrateFailedType>();
        failedNodeAndReason.put(failedNode, failedType);
        TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status);
        req.setFailedNodeAndReason(failedNodeAndReason);
        return req;
    }

    private static void reportRegionMigrateResultToConfigNode(TRegionMigrateResultReportReq req) throws TException {
        try (ConfigNodeClient client = new ConfigNodeClient();){
            TSStatus status = client.reportRegionMigrateResult(req);
            LOGGER.info("Report region {} migrate result {} to Config node succeed, result: {}", new Object[]{req.getRegionId(), req, status});
        }
    }

    private static class DeleteOldRegionPeerTask
    implements Runnable {
        private static final Logger taskLogger = LoggerFactory.getLogger(DeleteOldRegionPeerTask.class);
        private final TConsensusGroupId tRegionId;
        private final TDataNodeLocation fromNode;

        public DeleteOldRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation fromNode) {
            this.tRegionId = tRegionId;
            this.fromNode = fromNode;
        }

        @Override
        public void run() {
            TSStatus runResult = this.deleteOldRegionPeer();
            if (this.isFailed(runResult)) {
                RegionMigrateService.reportFailed(this.tRegionId, this.fromNode, TRegionMigrateFailedType.RemoveConsensusGroupFailed, runResult);
            }
            if (this.isFailed(runResult = this.deleteRegion())) {
                RegionMigrateService.reportFailed(this.tRegionId, this.fromNode, TRegionMigrateFailedType.DeleteRegionFailed, runResult);
            }
            RegionMigrateService.reportSucceed(this.tRegionId);
        }

        private TSStatus deleteOldRegionPeer() {
            ConsensusGenericResponse resp;
            ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId((TConsensusGroupId)this.tRegionId);
            taskLogger.info("Start to deleteOldRegionPeer: {}", (Object)regionId);
            TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
            try {
                resp = regionId instanceof DataRegionId ? DataRegionConsensusImpl.getInstance().deletePeer(regionId) : SchemaRegionConsensusImpl.getInstance().deletePeer(regionId);
            }
            catch (Throwable e) {
                taskLogger.error("DeleteOldRegionPeer error, regionId: {}", (Object)regionId, (Object)e);
                status.setCode(TSStatusCode.REGION_MIGRATE_FAILED.getStatusCode());
                status.setMessage("deleteOldRegionPeer for region: " + regionId + " error. exception: " + e.getMessage());
                return status;
            }
            if (!resp.isSuccess()) {
                taskLogger.error("deleteOldRegionPeer error, regionId: {}", (Object)regionId, (Object)resp.getException());
                status.setCode(TSStatusCode.REGION_MIGRATE_FAILED.getStatusCode());
                status.setMessage(String.format("deleteOldRegionPeer error, regionId: %s, errorMessage: %s", regionId, resp.getException().getMessage()));
                return status;
            }
            taskLogger.info("succeed to remove region {} consensus group", (Object)regionId);
            status.setMessage("remove region consensus group " + regionId + "succeed");
            return status;
        }

        private TSStatus deleteRegion() {
            TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
            ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId((TConsensusGroupId)this.tRegionId);
            taskLogger.debug("start to delete region {}", (Object)regionId);
            try {
                if (regionId instanceof DataRegionId) {
                    StorageEngineV2.getInstance().deleteDataRegion((DataRegionId)regionId);
                } else {
                    SchemaEngine.getInstance().deleteSchemaRegion((SchemaRegionId)regionId);
                }
            }
            catch (Throwable e) {
                taskLogger.error("delete the region {} failed", (Object)regionId, (Object)e);
                status.setCode(TSStatusCode.DELETE_REGION_ERROR.getStatusCode());
                status.setMessage("delete region " + regionId + "failed, " + e.getMessage());
                return status;
            }
            status.setMessage("delete region " + regionId + " succeed");
            taskLogger.info("Finished to delete region {}", (Object)regionId);
            return status;
        }

        private boolean isSucceed(TSStatus status) {
            return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
        }

        private boolean isFailed(TSStatus status) {
            return !this.isSucceed(status);
        }
    }

    private static class RemoveRegionPeerTask
    implements Runnable {
        private static final Logger taskLogger = LoggerFactory.getLogger(RemoveRegionPeerTask.class);
        private final TConsensusGroupId tRegionId;
        private final TDataNodeLocation selectedDataNode;

        public RemoveRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation selectedDataNode) {
            this.tRegionId = tRegionId;
            this.selectedDataNode = selectedDataNode;
        }

        @Override
        public void run() {
            TSStatus runResult = this.removePeer();
            if (this.isFailed(runResult)) {
                RegionMigrateService.reportFailed(this.tRegionId, this.selectedDataNode, TRegionMigrateFailedType.RemovePeerFailed, runResult);
            }
            RegionMigrateService.reportSucceed(this.tRegionId);
        }

        private ConsensusGenericResponse removeRegionPeer(ConsensusGroupId regionId, Peer oldPeer) {
            ConsensusGenericResponse resp = regionId instanceof DataRegionId ? DataRegionConsensusImpl.getInstance().removePeer(regionId, oldPeer) : SchemaRegionConsensusImpl.getInstance().removePeer(regionId, oldPeer);
            return resp;
        }

        private TSStatus removePeer() {
            ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId((TConsensusGroupId)this.tRegionId);
            TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
            TEndPoint oldPeerNode = this.getConsensusEndPoint(this.selectedDataNode, regionId);
            taskLogger.info("Start to remove peer {} for region {}", (Object)oldPeerNode, (Object)regionId);
            ConsensusGenericResponse resp = null;
            boolean removePeerSucceed = true;
            for (int i = 0; i < 5; ++i) {
                try {
                    if (!removePeerSucceed) {
                        Thread.sleep(5000L);
                    }
                    resp = this.removeRegionPeer(regionId, new Peer(regionId, this.selectedDataNode.getDataNodeId(), oldPeerNode));
                    removePeerSucceed = true;
                }
                catch (Throwable e) {
                    removePeerSucceed = false;
                    taskLogger.error("remove peer {} for region {} error, retry times: {}", new Object[]{oldPeerNode, regionId, i, e});
                    status.setCode(TSStatusCode.REGION_MIGRATE_FAILED.getStatusCode());
                    status.setMessage("remove peer: " + oldPeerNode + " for region: " + regionId + " error. exception: " + e.getMessage());
                }
                if (removePeerSucceed && resp != null && resp.isSuccess()) break;
            }
            if (!removePeerSucceed || resp == null || !resp.isSuccess()) {
                taskLogger.error("Remove old peer {} for region {} failed, resp: {}", new Object[]{oldPeerNode, regionId, resp});
                status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
                status.setMessage("remove old peer " + oldPeerNode + " for region " + regionId + " failed");
                return status;
            }
            taskLogger.info("Succeed to remove peer {} for region {}", (Object)oldPeerNode, (Object)regionId);
            status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
            status.setMessage("Remove peer " + oldPeerNode + " for region " + regionId + " succeed");
            return status;
        }

        private TEndPoint getConsensusEndPoint(TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
            if (regionId instanceof DataRegionId) {
                return nodeLocation.getDataRegionConsensusEndPoint();
            }
            return nodeLocation.getSchemaRegionConsensusEndPoint();
        }

        private boolean isSucceed(TSStatus status) {
            return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
        }

        private boolean isFailed(TSStatus status) {
            return !this.isSucceed(status);
        }
    }

    private static class AddRegionPeerTask
    implements Runnable {
        private static final Logger taskLogger = LoggerFactory.getLogger(AddRegionPeerTask.class);
        private final TConsensusGroupId tRegionId;
        private final TDataNodeLocation selectedDataNode;

        public AddRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation selectedDataNode) {
            this.tRegionId = tRegionId;
            this.selectedDataNode = selectedDataNode;
        }

        @Override
        public void run() {
            TSStatus runResult = this.addPeer();
            if (this.isFailed(runResult)) {
                RegionMigrateService.reportFailed(this.tRegionId, this.selectedDataNode, TRegionMigrateFailedType.AddPeerFailed, runResult);
                return;
            }
            RegionMigrateService.reportSucceed(this.tRegionId);
        }

        private TSStatus addPeer() {
            ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId((TConsensusGroupId)this.tRegionId);
            TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
            ConsensusGenericResponse resp = null;
            TEndPoint newPeerNode = this.getConsensusEndPoint(this.selectedDataNode, regionId);
            taskLogger.info("Start to add peer {} for region {}", (Object)newPeerNode, (Object)this.tRegionId);
            boolean addPeerSucceed = true;
            for (int i = 0; i < 5; ++i) {
                try {
                    if (!addPeerSucceed) {
                        Thread.sleep(5000L);
                    }
                    resp = this.addRegionPeer(regionId, new Peer(regionId, this.selectedDataNode.getDataNodeId(), newPeerNode));
                    addPeerSucceed = true;
                }
                catch (Throwable e) {
                    addPeerSucceed = false;
                    taskLogger.error("Add new peer {} for region {} error, retry times: {}", new Object[]{newPeerNode, regionId, i, e});
                    status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
                    status.setMessage(String.format("Add peer for region error, peerId: %s, regionId: %s, errorMessage: %s", newPeerNode, regionId, e.getMessage()));
                }
                if (addPeerSucceed && resp != null && resp.isSuccess()) break;
            }
            if (!addPeerSucceed || resp == null || !resp.isSuccess()) {
                taskLogger.error("Add new peer {} for region {} failed, resp: {}", new Object[]{newPeerNode, regionId, resp});
                status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
                status.setMessage(String.format("Add peer for region error, peerId: %s, regionId: %s, resp: %s", newPeerNode, regionId, resp));
                return status;
            }
            taskLogger.info("Succeed to add peer {} for region {}", (Object)newPeerNode, (Object)regionId);
            status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
            status.setMessage("add peer " + newPeerNode + " for region " + regionId + " succeed");
            return status;
        }

        private ConsensusGenericResponse addRegionPeer(ConsensusGroupId regionId, Peer newPeer) {
            ConsensusGenericResponse resp = regionId instanceof DataRegionId ? DataRegionConsensusImpl.getInstance().addPeer(regionId, newPeer) : SchemaRegionConsensusImpl.getInstance().addPeer(regionId, newPeer);
            return resp;
        }

        private TEndPoint getConsensusEndPoint(TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
            if (regionId instanceof DataRegionId) {
                return nodeLocation.getDataRegionConsensusEndPoint();
            }
            return nodeLocation.getSchemaRegionConsensusEndPoint();
        }

        private boolean isSucceed(TSStatus status) {
            return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
        }

        private boolean isFailed(TSStatus status) {
            return !this.isSucceed(status);
        }
    }

    private static class RegionMigratePool
    extends AbstractPoolManager {
        private final Logger poolLogger = LoggerFactory.getLogger(RegionMigratePool.class);

        private RegionMigratePool() {
            this.pool = IoTDBThreadPoolFactory.newSingleThreadExecutor((String)"Region-Migrate-Pool");
        }

        @Override
        public Logger getLogger() {
            return this.poolLogger;
        }

        @Override
        public void start() {
            if (this.pool != null) {
                this.poolLogger.info("Data Node region migrate pool start");
            }
        }

        @Override
        public String getName() {
            return "migrate region";
        }
    }

    private static class Holder {
        private static final RegionMigrateService INSTANCE = new RegionMigrateService();

        private Holder() {
        }
    }
}

