/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.cluster;

import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeInfo;
import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.UserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
import org.apache.inlong.manager.dao.entity.InlongClusterTagEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.UserEntity;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterTagEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.UserEntityMapper;
import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterTagPageRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterTagResponse;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterOperator;
import org.apache.inlong.manager.service.cluster.InlongClusterOperatorFactory;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.repository.DataProxyConfigRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;

@Service
public class InlongClusterServiceImpl
implements InlongClusterService {
    /** Class-wide SLF4J logger. */
    private static final Logger LOGGER = LoggerFactory.getLogger(InlongClusterServiceImpl.class);
    /** Shared Gson instance. */
    private static final Gson GSON = new Gson();
    /** Looks up users by name; used for the admin part of permission checks. */
    @Autowired
    private UserEntityMapper userMapper;
    /** Mapper for inlong group entities. */
    @Autowired
    private InlongGroupEntityMapper groupMapper;
    /** Mapper for inlong stream entities. */
    @Autowired
    private InlongStreamEntityMapper streamMapper;
    /** Resolves the {@link InlongClusterOperator} that handles a given cluster type. */
    @Autowired
    private InlongClusterOperatorFactory clusterOperatorFactory;
    /** Mapper for cluster tag entities. */
    @Autowired
    private InlongClusterTagEntityMapper clusterTagMapper;
    /** Mapper for cluster entities. */
    @Autowired
    private InlongClusterEntityMapper clusterMapper;
    /** Mapper for cluster node entities. */
    @Autowired
    private InlongClusterNodeEntityMapper clusterNodeMapper;
    // NOTE(review): @Lazy presumably avoids a circular bean dependency with the repository - confirm.
    @Lazy
    @Autowired
    private DataProxyConfigRepository proxyRepository;

    /**
     * Creates a new cluster tag.
     *
     * @param request tag definition; the request and its cluster tag are validated as non-null
     * @param operator user name recorded as both creator and modifier
     * @return the id carried by the entity after it has been inserted
     * @throws BusinessException if a tag with the same name is already stored
     */
    @Override
    public Integer saveTag(ClusterTagRequest request, String operator) {
        LOGGER.debug("begin to save cluster tag {}", request);
        Preconditions.checkNotNull(request, "inlong cluster request cannot be empty");
        String tagName = request.getClusterTag();
        Preconditions.checkNotNull(tagName, "cluster tag cannot be empty");

        // Tag names must be unique: refuse to create a duplicate.
        if (this.clusterTagMapper.selectByTag(tagName) != null) {
            String duplicateMsg = String.format("inlong cluster tag [%s] already exist", tagName);
            LOGGER.error(duplicateMsg);
            throw new BusinessException(duplicateMsg);
        }

        InlongClusterTagEntity tagEntity = CommonBeanUtils.copyProperties(request, InlongClusterTagEntity::new);
        tagEntity.setCreator(operator);
        tagEntity.setModifier(operator);
        this.clusterTagMapper.insert(tagEntity);
        LOGGER.info("success to save cluster tag={} by user={}", request, operator);
        return tagEntity.getId();
    }

    /**
     * Fetches a cluster tag by id.
     *
     * <p>The caller must either be listed in the tag's comma-separated in-charges or have the
     * admin account type.
     *
     * @param id id of the cluster tag; validated as non-null
     * @param currentUser name of the user issuing the request
     * @return the tag converted to a {@link ClusterTagResponse}
     * @throws BusinessException with {@code CLUSTER_NOT_FOUND} if no tag has this id
     */
    @Override
    public ClusterTagResponse getTag(Integer id, String currentUser) {
        Preconditions.checkNotNull(id, "inlong cluster tag id cannot be empty");
        InlongClusterTagEntity tagEntity = this.clusterTagMapper.selectById(id);
        if (tagEntity == null) {
            LOGGER.error("inlong cluster tag not found by id={}", id);
            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
        }

        UserEntity user = this.userMapper.selectByName(currentUser);
        // The account type is only consulted when the user is not one of the in-charges.
        boolean permitted = Preconditions.inSeparatedString(currentUser, tagEntity.getInCharges(), ",")
                || user.getAccountType().equals(UserTypeEnum.ADMIN.getCode());
        Preconditions.checkTrue(permitted, "Current user does not have permission to get cluster tag");

        ClusterTagResponse tagResponse = CommonBeanUtils.copyProperties(tagEntity, ClusterTagResponse::new);
        LOGGER.debug("success to get cluster tag info by id={}", id);
        return tagResponse;
    }

    /**
     * Lists cluster tags matching the given condition, one page at a time.
     *
     * @param request query condition carrying the page number and page size
     * @return the converted tags together with total count, page number and page size
     */
    @Override
    public PageResult<ClusterTagResponse> listTag(ClusterTagPageRequest request) {
        // PageHelper must be armed right before the mapper query it is meant to paginate.
        PageHelper.startPage(request.getPageNum(), request.getPageSize());
        Page<InlongClusterTagEntity> page =
                (Page<InlongClusterTagEntity>) this.clusterTagMapper.selectByCondition(request);
        List<ClusterTagResponse> responses = CommonBeanUtils.copyListProperties(page, ClusterTagResponse::new);
        PageResult<ClusterTagResponse> result =
                new PageResult<>(responses, page.getTotal(), page.getPageNum(), page.getPageSize());
        LOGGER.debug("success to list cluster tag by {}", request);
        return result;
    }

    /**
     * Updates a cluster tag and, when the tag is renamed, rewrites the tag list of every cluster
     * that was selected by the old tag name.
     *
     * <p>Returns {@code true} without changing anything when no tag exists for the given id.
     *
     * @param request new tag data; request, cluster tag and id are validated as non-null
     * @param operator user performing the update; must be an in-charge of the tag or an admin
     * @return always {@code true} when the method returns normally
     * @throws BusinessException with {@code CONFIG_EXPIRED} on a version mismatch or when an
     *         update does not affect exactly one row, or with a message when the new tag name is
     *         already taken
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
    public Boolean updateTag(ClusterTagRequest request, String operator) {
        LOGGER.debug("begin to update cluster tag={}", request);
        Preconditions.checkNotNull(request, "inlong cluster request cannot be empty");
        String renamedTag = request.getClusterTag();
        Preconditions.checkNotNull(renamedTag, "inlong cluster tag cannot be empty");
        Integer tagId = request.getId();
        Preconditions.checkNotNull(tagId, "cluster tag id cannot be empty");

        InlongClusterTagEntity current = this.clusterTagMapper.selectById(tagId);
        if (current == null) {
            LOGGER.warn("inlong cluster tag was not exist for id={}", tagId);
            return true;
        }

        // Built up-front because 'current' is overwritten with the request values further down.
        String expiredMsg = String.format("cluster tag has already updated with name=%s, curVersion=%s",
                current.getClusterTag(), request.getVersion());
        if (!Objects.equals(current.getVersion(), request.getVersion())) {
            LOGGER.error(expiredMsg);
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }

        UserEntity user = this.userMapper.selectByName(operator);
        boolean permitted = Preconditions.inSeparatedString(operator, current.getInCharges(), ",")
                || user.getAccountType().equals(UserTypeEnum.ADMIN.getCode());
        Preconditions.checkTrue(permitted, "Current user does not have permission to update cluster tag");

        String previousTag = current.getClusterTag();
        if (!renamedTag.equals(previousTag)) {
            // The new name must not collide with another tag.
            if (this.clusterTagMapper.selectByTag(renamedTag) != null) {
                String conflictMsg = String.format("inlong cluster tag [%s] already exist", renamedTag);
                LOGGER.error(conflictMsg);
                throw new BusinessException(conflictMsg);
            }
            this.assertNoInlongGroupExists(previousTag);

            // Swap the old tag name for the new one in every cluster selected by the old tag.
            List<InlongClusterEntity> boundClusters = this.clusterMapper.selectByKey(previousTag, null, null);
            if (CollectionUtils.isNotEmpty(boundClusters)) {
                for (InlongClusterEntity cluster : boundClusters) {
                    HashSet<String> tags = Sets.newHashSet(cluster.getClusterTags().split(","));
                    tags.remove(previousTag);
                    tags.add(renamedTag);
                    cluster.setClusterTags(Joiner.on(",").join(tags));
                    cluster.setModifier(operator);
                    int affected = this.clusterMapper.updateById(cluster);
                    if (InlongConstants.AFFECTED_ONE_ROW.intValue() != affected) {
                        LOGGER.error("cluster has already updated with name={}, type={}, curVersion={}",
                                cluster.getName(), cluster.getType(), cluster.getVersion());
                        throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
                    }
                }
            }
        }

        CommonBeanUtils.copyProperties(request, current, true);
        current.setModifier(operator);
        int affectedTags = this.clusterTagMapper.updateById(current);
        if (InlongConstants.AFFECTED_ONE_ROW.intValue() != affectedTags) {
            LOGGER.error(expiredMsg);
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }
        LOGGER.info("success to update cluster tag={}", request);
        return true;
    }

    /**
     * Marks a cluster tag as deleted and removes it from the clusters selected by that tag.
     *
     * @param id id of the cluster tag; validated as non-null
     * @param operator user performing the deletion; must be an in-charge of the tag or an admin
     * @return {@code false} if the tag is missing or already marked deleted, {@code true} otherwise
     * @throws BusinessException with {@code CONFIG_EXPIRED} if the tag update does not affect
     *         exactly one row
     */
    @Override
    public Boolean deleteTag(Integer id, String operator) {
        Preconditions.checkNotNull(id, "cluster tag id cannot be empty");
        InlongClusterTagEntity tagEntity = this.clusterTagMapper.selectById(id);
        boolean missing = tagEntity == null || tagEntity.getIsDeleted() > InlongConstants.UN_DELETED;
        if (missing) {
            LOGGER.error("inlong cluster tag not found by id={}", id);
            return false;
        }

        UserEntity user = this.userMapper.selectByName(operator);
        boolean permitted = Preconditions.inSeparatedString(operator, tagEntity.getInCharges(), ",")
                || user.getAccountType().equals(UserTypeEnum.ADMIN.getCode());
        Preconditions.checkTrue(permitted, "Current user does not have permission to delete cluster tag");

        String tagName = tagEntity.getClusterTag();
        this.assertNoInlongGroupExists(tagName);

        // Detach the tag from every cluster selected by it before marking the tag deleted.
        List<InlongClusterEntity> boundClusters = this.clusterMapper.selectByKey(tagName, null, null);
        if (CollectionUtils.isNotEmpty(boundClusters)) {
            for (InlongClusterEntity cluster : boundClusters) {
                this.removeClusterTag(cluster, tagName, operator);
            }
        }

        // The deletion marker is the entity's own id rather than a fixed flag value.
        tagEntity.setIsDeleted(tagEntity.getId());
        tagEntity.setModifier(operator);
        int affected = this.clusterTagMapper.updateById(tagEntity);
        if (InlongConstants.AFFECTED_ONE_ROW.intValue() != affected) {
            LOGGER.error("cluster tag has already updated with name={}, curVersion={}",
                    tagEntity.getClusterTag(), tagEntity.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }
        LOGGER.info("success to delete cluster tag by id={}", id);
        return true;
    }

    /**
     * Creates a cluster through the operator registered for the request's cluster type.
     *
     * @param request cluster definition; validated as non-null
     * @param operator user performing the save
     * @return the id returned by the type-specific operator
     * @throws BusinessException if a cluster with the same tags, name and type is already stored
     */
    @Override
    public Integer save(ClusterRequest request, String operator) {
        LOGGER.debug("begin to save inlong cluster={}", request);
        Preconditions.checkNotNull(request, "inlong cluster request cannot be empty");

        String tags = request.getClusterTags();
        String clusterName = request.getName();
        String clusterType = request.getType();
        List<InlongClusterEntity> duplicates = this.clusterMapper.selectByKey(tags, clusterName, clusterType);
        if (CollectionUtils.isNotEmpty(duplicates)) {
            String duplicateMsg = String.format("inlong cluster already exist for cluster tag=%s name=%s type=%s",
                    tags, clusterName, clusterType);
            LOGGER.error(duplicateMsg);
            throw new BusinessException(duplicateMsg);
        }

        // Persisting is delegated to the operator that matches the cluster type.
        Integer clusterId = this.clusterOperatorFactory.getInstance(request.getType()).saveOpt(request, operator);
        LOGGER.info("success to save inlong cluster={} by user={}", request, operator);
        return clusterId;
    }

    /**
     * Fetches a cluster by id after checking the caller's permission on it.
     *
     * @param id id of the cluster; validated as non-null
     * @param currentUser name of the user issuing the request
     * @return the cluster info produced by the operator for the cluster's type
     * @throws BusinessException with {@code CLUSTER_NOT_FOUND} if no cluster has this id
     */
    @Override
    public ClusterInfo get(Integer id, String currentUser) {
        Preconditions.checkNotNull(id, "inlong cluster id cannot be empty");
        InlongClusterEntity clusterEntity = this.clusterMapper.selectById(id);
        if (clusterEntity == null) {
            LOGGER.error("inlong cluster not found by id={}", id);
            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
        }

        this.checkUser(clusterEntity, currentUser, "Current user does not have permission to get cluster info");

        InlongClusterOperator typeOperator = this.clusterOperatorFactory.getInstance(clusterEntity.getType());
        ClusterInfo info = typeOperator.getFromEntity(clusterEntity);
        LOGGER.debug("success to get inlong cluster info by id={}", id);
        return info;
    }

    /**
     * Lists clusters matching the given condition, one page at a time.
     *
     * @param request query condition carrying the page number and page size
     * @return the converted clusters together with total count, page number and page size
     */
    @Override
    public PageResult<ClusterInfo> list(ClusterPageRequest request) {
        PageHelper.startPage(request.getPageNum(), request.getPageSize());
        Page<InlongClusterEntity> page = (Page<InlongClusterEntity>) this.clusterMapper.selectByCondition(request);

        // Each entity is converted by the operator registered for its own type.
        List<ClusterInfo> infos = new ArrayList<>(page.size());
        for (InlongClusterEntity clusterEntity : page) {
            InlongClusterOperator typeOperator = this.clusterOperatorFactory.getInstance(clusterEntity.getType());
            infos.add(typeOperator.getFromEntity(clusterEntity));
        }

        PageResult<ClusterInfo> result =
                new PageResult<>(infos, page.getTotal(), page.getPageNum(), page.getPageSize());
        LOGGER.debug("success to list inlong cluster by {}", request);
        return result;
    }

    /**
     * Lists all clusters selected by the given tag and type.
     *
     * @param clusterTag cluster tag to select by
     * @param clusterType cluster type to select by
     * @return the matching clusters, converted by the operator for each entity's type
     * @throws BusinessException if the selection yields no cluster
     */
    @Override
    public List<ClusterInfo> listByTagAndType(String clusterTag, String clusterType) {
        List<InlongClusterEntity> matches = this.clusterMapper.selectByKey(clusterTag, null, clusterType);
        if (CollectionUtils.isEmpty(matches)) {
            throw new BusinessException(
                    String.format("cannot find any cluster by tag %s and type %s", clusterTag, clusterType));
        }

        List<ClusterInfo> infos = new ArrayList<>(matches.size());
        for (InlongClusterEntity clusterEntity : matches) {
            InlongClusterOperator typeOperator = this.clusterOperatorFactory.getInstance(clusterEntity.getType());
            infos.add(typeOperator.getFromEntity(clusterEntity));
        }
        LOGGER.debug("success to list inlong cluster by tag={}", clusterTag);
        return infos;
    }

    /**
     * Fetches the first cluster selected by tag, name and type.
     *
     * @param clusterTag cluster tag to select by
     * @param name cluster name to select by
     * @param type cluster type to select by
     * @return info for the first entity of the selection
     * @throws BusinessException if the selection yields no cluster
     */
    @Override
    public ClusterInfo getOne(String clusterTag, String name, String type) {
        List<InlongClusterEntity> matches = this.clusterMapper.selectByKey(clusterTag, name, type);
        if (CollectionUtils.isEmpty(matches)) {
            throw new BusinessException(
                    String.format("cluster not found by tag=%s, name=%s, type=%s", clusterTag, name, type));
        }

        // Only the first match is used, even if the selection returned several.
        InlongClusterEntity first = matches.get(0);
        ClusterInfo info = this.clusterOperatorFactory.getInstance(first.getType()).getFromEntity(first);
        LOGGER.debug("success to get inlong cluster by tag={}, name={}, type={}", clusterTag, name, type);
        return info;
    }

    /**
     * Updates a cluster identified by the id in the request.
     *
     * @param request new cluster data; the request and its id are validated as non-null
     * @param operator user performing the update
     * @return always {@code true} when the method returns normally
     * @throws BusinessException if another cluster already uses the same tags, name and type,
     *         with {@code CLUSTER_NOT_FOUND} if the id is unknown, or with {@code CONFIG_EXPIRED}
     *         if the request version differs from the stored one
     */
    @Override
    public Boolean update(ClusterRequest request, String operator) {
        LOGGER.debug("begin to update inlong cluster: {}", request);
        Preconditions.checkNotNull(request, "inlong cluster info cannot be empty");
        Integer clusterId = request.getId();
        Preconditions.checkNotNull(clusterId, "inlong cluster id cannot be empty");

        String tags = request.getClusterTags();
        String clusterName = request.getName();
        String clusterType = request.getType();
        List<InlongClusterEntity> sameKey = this.clusterMapper.selectByKey(tags, clusterName, clusterType);
        // A hit is a conflict only if its first entity is a different cluster than the one updated.
        boolean conflict = CollectionUtils.isNotEmpty(sameKey)
                && !Objects.equals(clusterId, sameKey.get(0).getId());
        if (conflict) {
            String conflictMsg = String.format("inlong cluster already exist for cluster tag=%s name=%s type=%s",
                    tags, clusterName, clusterType);
            LOGGER.error(conflictMsg);
            throw new BusinessException(conflictMsg);
        }

        InlongClusterEntity stored = this.clusterMapper.selectById(clusterId);
        if (stored == null) {
            LOGGER.error("inlong cluster not found by id={}", clusterId);
            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
        }
        if (!Objects.equals(stored.getVersion(), request.getVersion())) {
            LOGGER.error("cluster has already updated with name={}, type={}, curVersion={}",
                    request.getName(), request.getType(), request.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }

        this.checkUser(stored, operator, "Current user does not have permission to update cluster info");

        this.clusterOperatorFactory.getInstance(request.getType()).updateOpt(request, operator);
        LOGGER.info("success to update inlong cluster: {} by {}", request, operator);
        return true;
    }

    /**
     * Updates a cluster identified by its name and type instead of its id.
     *
     * <p>The id of the stored cluster is written into the request before the update is delegated.
     *
     * @param request new cluster data; name and type are validated as non-empty
     * @param operator user performing the update
     * @return the cluster id, a success flag and the request version incremented by one
     * @throws BusinessException with {@code CLUSTER_NOT_FOUND} if no cluster matches, or with a
     *         message if the request version differs from the stored one
     */
    @Override
    public UpdateResult updateByKey(ClusterRequest request, String operator) {
        LOGGER.debug("begin to update inlong cluster: {}", request);
        Preconditions.checkNotNull(request, "inlong cluster info cannot be null");
        String clusterName = request.getName();
        String clusterType = request.getType();
        Preconditions.checkNotEmpty(clusterName, "inlong cluster name cannot be empty");
        Preconditions.checkNotEmpty(clusterType, "inlong cluster type cannot be empty");

        InlongClusterEntity stored = this.clusterMapper.selectByNameAndType(clusterName, clusterType);
        if (stored == null) {
            LOGGER.error("inlong cluster not found by name={}, type={}", clusterName, clusterType);
            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
        }
        if (!Objects.equals(stored.getVersion(), request.getVersion())) {
            String expiredMsg = String.format("cluster has already updated with name=%s, type=%s, curVersion=%s",
                    request.getName(), request.getType(), request.getVersion());
            LOGGER.error(expiredMsg);
            throw new BusinessException(expiredMsg);
        }

        // The operator updates by id, so fill it in from the entity found by key.
        request.setId(stored.getId());
        this.checkUser(stored, operator, "Current user does not have permission to update cluster info");

        this.clusterOperatorFactory.getInstance(request.getType()).updateOpt(request, operator);
        LOGGER.info("success to update inlong cluster: {} by {}", request, operator);
        return new UpdateResult(stored.getId(), true, request.getVersion() + 1);
    }

    /**
     * Binds a cluster tag to, and/or unbinds it from, the clusters listed in the request.
     *
     * <p>The operator must be listed in the tag's comma-separated in-charges or have the admin
     * account type.
     *
     * @param request carries the cluster tag plus the ids of clusters to bind and to unbind
     * @param operator user performing the operation
     * @return always {@code true} when the method returns normally
     * @throws BusinessException with {@code CLUSTER_NOT_FOUND} if the tag or a cluster to bind
     *         does not exist, or with {@code CONFIG_EXPIRED} if a cluster update does not affect
     *         exactly one row
     */
    @Override
    public Boolean bindTag(BindTagRequest request, String operator) {
        LOGGER.info("begin to bind or unbind cluster tag: {}", request);
        Preconditions.checkNotNull(request, "inlong cluster info cannot be empty");
        String clusterTag = request.getClusterTag();
        Preconditions.checkNotNull(clusterTag, "cluster tag cannot be empty");

        InlongClusterTagEntity exist = this.clusterTagMapper.selectByTag(clusterTag);
        // An unknown tag is reported explicitly instead of failing with a NullPointerException below.
        if (exist == null) {
            LOGGER.error("inlong cluster tag not found by tag={}", clusterTag);
            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
        }

        UserEntity userEntity = this.userMapper.selectByName(operator);
        boolean permitted = Preconditions.inSeparatedString(operator, exist.getInCharges(), ",")
                || userEntity.getAccountType().equals(UserTypeEnum.ADMIN.getCode());
        Preconditions.checkTrue(permitted, "Current user does not have permission to bind or unbind cluster tag");

        if (CollectionUtils.isNotEmpty(request.getBindClusters())) {
            request.getBindClusters().forEach(id -> {
                // Load the cluster once; the same entity is used for reading the tags and for the update.
                InlongClusterEntity entity = this.clusterMapper.selectById(id);
                if (entity == null) {
                    LOGGER.error("inlong cluster not found by id={}", id);
                    throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
                }
                // A blank tag string must start from an empty set, otherwise splitting it would
                // leave an empty entry (and a stray comma) in the joined result.
                String currentTags = entity.getClusterTags();
                HashSet<String> tagSet = new HashSet<>();
                if (StringUtils.isNotBlank(currentTags)) {
                    tagSet = Sets.newHashSet(currentTags.split(","));
                }
                tagSet.add(clusterTag);
                entity.setClusterTags(Joiner.on(",").join(tagSet));
                entity.setModifier(operator);
                if (InlongConstants.AFFECTED_ONE_ROW.intValue() != this.clusterMapper.updateById(entity)) {
                    LOGGER.error("cluster has already updated with name={}, type={}, curVersion={}",
                            entity.getName(), entity.getType(), entity.getVersion());
                    throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
                }
            });
        }

        if (CollectionUtils.isNotEmpty(request.getUnbindClusters())) {
            request.getUnbindClusters().forEach(id -> {
                InlongClusterEntity entity = this.clusterMapper.selectById(id);
                this.removeClusterTag(entity, clusterTag, operator);
            });
        }
        LOGGER.info("success to bind or unbind cluster tag {} by {}", request, operator);
        return true;
    }

    /**
     * Marks the cluster identified by name and type as deleted.
     *
     * <p>The operator must be listed in the cluster's comma-separated in-charges or have the
     * admin account type. Deletion is refused while the cluster still has nodes.
     *
     * @param name cluster name; validated as non-null
     * @param type cluster type; validated as non-null
     * @param operator user performing the deletion
     * @return {@code false} if the cluster is missing or already marked deleted, {@code true}
     *         otherwise
     * @throws BusinessException if nodes still exist under the cluster, or with
     *         {@code CONFIG_EXPIRED} if the update does not affect exactly one row
     */
    @Override
    public Boolean deleteByKey(String name, String type, String operator) {
        Preconditions.checkNotNull(name, "cluster name should not be empty or null");
        // Validate 'type' here; the previous code checked 'name' a second time by mistake.
        Preconditions.checkNotNull(type, "cluster type should not be empty or null");
        InlongClusterEntity entity = this.clusterMapper.selectByNameAndType(name, type);
        if (entity == null || entity.getIsDeleted() > InlongConstants.UN_DELETED) {
            LOGGER.error("inlong cluster not found by clusterName={}, type={} or was already deleted", name, type);
            return false;
        }

        UserEntity userEntity = this.userMapper.selectByName(operator);
        boolean permitted = Preconditions.inSeparatedString(operator, entity.getInCharges(), ",")
                || userEntity.getAccountType().equals(UserTypeEnum.ADMIN.getCode());
        Preconditions.checkTrue(permitted, "Current user does not have permission to delete cluster info");

        List<InlongClusterNodeEntity> nodeEntities = this.clusterNodeMapper.selectByParentId(entity.getId(), null);
        if (CollectionUtils.isNotEmpty(nodeEntities)) {
            String errMsg = String.format(
                    "there are undeleted nodes under the cluster [%s], please delete the node first",
                    entity.getName());
            throw new BusinessException(errMsg);
        }

        // The deletion marker is the entity's own id rather than a fixed flag value.
        entity.setIsDeleted(entity.getId());
        entity.setModifier(operator);
        if (InlongConstants.AFFECTED_ONE_ROW.intValue() != this.clusterMapper.updateById(entity)) {
            LOGGER.error("cluster has already updated with name={}, type={}, curVersion={}",
                    entity.getName(), entity.getType(), entity.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }
        LOGGER.info("success to delete inlong cluster for clusterName={}, type={} by user={}", name, type, operator);
        return true;
    }

    /**
     * Marks the cluster with the given id as deleted.
     *
     * <p>Deletion is refused while the cluster still has nodes.
     *
     * @param id id of the cluster; validated as non-null
     * @param operator user performing the deletion; checked against the cluster
     * @return {@code false} if the cluster is missing or already marked deleted, {@code true}
     *         otherwise
     * @throws BusinessException if nodes still exist under the cluster, or with
     *         {@code CONFIG_EXPIRED} if the update does not affect exactly one row
     */
    @Override
    public Boolean delete(Integer id, String operator) {
        Preconditions.checkNotNull(id, "cluster id cannot be empty");
        InlongClusterEntity clusterEntity = this.clusterMapper.selectById(id);
        boolean missing = clusterEntity == null || clusterEntity.getIsDeleted() > InlongConstants.UN_DELETED;
        if (missing) {
            LOGGER.error("inlong cluster not found by id={}, or was already deleted", id);
            return false;
        }

        this.checkUser(clusterEntity, operator, "Current user does not have permission to delete cluster info");

        List<InlongClusterNodeEntity> remainingNodes = this.clusterNodeMapper.selectByParentId(id, null);
        if (CollectionUtils.isNotEmpty(remainingNodes)) {
            throw new BusinessException(String.format(
                    "there are undeleted nodes under the cluster [%s], please delete the node first",
                    clusterEntity.getName()));
        }

        // The deletion marker is the entity's own id rather than a fixed flag value.
        clusterEntity.setIsDeleted(clusterEntity.getId());
        clusterEntity.setModifier(operator);
        int affected = this.clusterMapper.updateById(clusterEntity);
        if (InlongConstants.AFFECTED_ONE_ROW.intValue() != affected) {
            LOGGER.error("cluster has already updated with name={}, type={}, curVersion={}",
                    clusterEntity.getName(), clusterEntity.getType(), clusterEntity.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }
        LOGGER.info("success to delete inlong cluster for id={} by user={}", id, operator);
        return true;
    }

    /**
     * Creates a new cluster node.
     *
     * @param request node definition; validated as non-null
     * @param operator user name recorded as both creator and modifier
     * @return the id carried by the entity after it has been inserted
     * @throws BusinessException if a node with the same unique key is already stored
     */
    @Override
    public Integer saveNode(ClusterNodeRequest request, String operator) {
        LOGGER.debug("begin to insert inlong cluster node={}", request);
        Preconditions.checkNotNull(request, "cluster node info cannot be empty");

        if (this.clusterNodeMapper.selectByUniqueKey(request) != null) {
            String duplicateMsg = String.format("inlong cluster node already exist for type=%s ip=%s port=%s",
                    request.getType(), request.getIp(), request.getPort());
            LOGGER.error(duplicateMsg);
            throw new BusinessException(duplicateMsg);
        }

        InlongClusterNodeEntity nodeEntity = CommonBeanUtils.copyProperties(request, InlongClusterNodeEntity::new);
        nodeEntity.setCreator(operator);
        nodeEntity.setModifier(operator);
        this.clusterNodeMapper.insert(nodeEntity);
        LOGGER.info("success to add inlong cluster node={}", request);
        return nodeEntity.getId();
    }

    /**
     * Fetches a cluster node by id after checking the caller's permission on its parent cluster.
     *
     * @param id id of the cluster node; validated as non-null
     * @param currentUser name of the user issuing the request
     * @return the node converted to a {@link ClusterNodeResponse}
     * @throws BusinessException with {@code CLUSTER_NOT_FOUND} if no node has this id
     */
    @Override
    public ClusterNodeResponse getNode(Integer id, String currentUser) {
        Preconditions.checkNotNull(id, "cluster node id cannot be empty");
        InlongClusterNodeEntity nodeEntity = this.clusterNodeMapper.selectById(id);
        if (nodeEntity == null) {
            LOGGER.error("inlong cluster node not found by id={}", id);
            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
        }

        // Permission is evaluated against the cluster the node belongs to.
        InlongClusterEntity parentCluster = this.clusterMapper.selectById(nodeEntity.getParentId());
        this.checkUser(parentCluster, currentUser, "Current user does not have permission to get cluster node");

        ClusterNodeResponse nodeResponse = CommonBeanUtils.copyProperties(nodeEntity, ClusterNodeResponse::new);
        LOGGER.debug("success to get inlong cluster node by id={}", id);
        return nodeResponse;
    }

    /**
     * Lists cluster nodes.
     *
     * <p>If the request carries a non-blank cluster tag, all nodes of the clusters selected by
     * that tag are returned in a single result, without paging and without the permission check.
     * Otherwise the parent cluster id is required, the caller is checked against that cluster,
     * and the nodes are returned one page at a time.
     *
     * @param request query condition
     * @param currentUser name of the user issuing the request
     * @return the matching nodes wrapped in a {@link PageResult}
     */
    @Override
    public PageResult<ClusterNodeResponse> listNode(ClusterPageRequest request, String currentUser) {
        if (StringUtils.isNotBlank(request.getClusterTag())) {
            List<ClusterNodeResponse> taggedNodes = this.listNodeByClusterTag(request);
            return new PageResult<>(taggedNodes, Long.valueOf(taggedNodes.size()));
        }

        Integer clusterId = request.getParentId();
        Preconditions.checkNotNull(clusterId, "Cluster id cannot be empty");
        InlongClusterEntity parentCluster = this.clusterMapper.selectById(clusterId);
        this.checkUser(parentCluster, currentUser, "Current user does not have permission to get cluster node list");

        PageHelper.startPage(request.getPageNum(), request.getPageSize());
        Page<InlongClusterNodeEntity> page =
                (Page<InlongClusterNodeEntity>) this.clusterNodeMapper.selectByCondition(request);
        List<ClusterNodeResponse> responses = CommonBeanUtils.copyListProperties(page, ClusterNodeResponse::new);
        PageResult<ClusterNodeResponse> result =
                new PageResult<>(responses, page.getTotal(), page.getPageNum(), page.getPageSize());
        LOGGER.debug("success to list inlong cluster node by {}", request);
        return result;
    }

    /**
     * Lists the cluster nodes resolved for an inlong group, cluster type and protocol type.
     *
     * @param groupId inlong group id
     * @param clusterType cluster type
     * @param protocolType protocol type
     * @return the converted nodes, or an empty list if none were resolved
     */
    @Override
    public List<ClusterNodeResponse> listNodeByGroupId(String groupId, String clusterType, String protocolType) {
        LOGGER.debug("begin to get cluster nodes for groupId={}, clusterType={}, protocol={}",
                groupId, clusterType, protocolType);
        List<InlongClusterNodeEntity> nodeEntities = this.getClusterNodes(groupId, clusterType, protocolType);
        if (CollectionUtils.isEmpty(nodeEntities)) {
            LOGGER.debug("not any cluster node for groupId={}, clusterType={}, protocol={}",
                    groupId, clusterType, protocolType);
            return Collections.emptyList();
        }

        List<ClusterNodeResponse> result = CommonBeanUtils.copyListProperties(nodeEntities, ClusterNodeResponse::new);
        // Log the size, as the message says, rather than dumping the whole node list.
        LOGGER.debug("success to get nodes for groupId={}, clusterType={}, protocol={}, result size={}",
                groupId, clusterType, protocolType, result.size());
        return result;
    }

    /**
     * Collects the nodes of every cluster selected by the request's cluster tag, name and type.
     *
     * @param request query condition supplying cluster tag, name and type
     * @return the nodes of all selected clusters, converted to {@link ClusterNodeResponse}
     */
    public List<ClusterNodeResponse> listNodeByClusterTag(ClusterPageRequest request) {
        List<InlongClusterEntity> clusters =
                this.clusterMapper.selectByKey(request.getClusterTag(), request.getName(), request.getType());
        // One node query per cluster, flattened into a single list in cluster order.
        List<InlongClusterNodeEntity> collected = clusters.stream()
                .map(cluster -> this.clusterNodeMapper.selectByParentId(cluster.getId(), null))
                .flatMap(List::stream)
                .collect(Collectors.toList());
        return CommonBeanUtils.copyListProperties(collected, ClusterNodeResponse::new);
    }

    /**
     * Lists the {@code ip:port} addresses of all nodes of the given type.
     *
     * @param type node type to filter by; validated as non-null
     * @return one {@code ip:port} string per node, or an empty list if no node matches
     */
    @Override
    public List<String> listNodeIpByType(String type) {
        Preconditions.checkNotNull(type, "cluster type cannot be empty");
        ClusterPageRequest query = new ClusterPageRequest();
        query.setType(type);

        List<InlongClusterNodeEntity> nodes = this.clusterNodeMapper.selectByCondition(query);
        if (CollectionUtils.isEmpty(nodes)) {
            LOGGER.debug("not found any node for type={}", type);
            return Collections.emptyList();
        }

        List<String> addresses = new ArrayList<>(nodes.size());
        for (InlongClusterNodeEntity node : nodes) {
            addresses.add(String.format("%s:%d", node.getIp(), node.getPort()));
        }
        LOGGER.debug("success to list node by type={}, result={}", type, addresses);
        return addresses;
    }

    /**
     * Updates an existing cluster node with the values carried by the request.
     *
     * <p>The update is rejected when another node already owns the request's unique key, when the
     * node id is unknown, when the request version differs from the stored version, or when the
     * update statement does not affect exactly one row. The operator is checked against the
     * node's parent cluster before anything is written.
     *
     * @param request new node values; its id must be set
     * @param operator name of the user performing the update
     * @return {@code true} once the node has been updated
     * @throws BusinessException if the node is duplicated, missing, or its configuration has expired
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
    public Boolean updateNode(ClusterNodeRequest request, String operator) {
        LOGGER.debug("begin to update inlong cluster node={}", request);
        Preconditions.checkNotNull(request, "inlong cluster node cannot be empty");
        Integer nodeId = request.getId();
        Preconditions.checkNotNull(nodeId, "cluster node id cannot be empty");

        // A record with the same unique key that is not this node means the update would create a duplicate.
        InlongClusterNodeEntity sameKeyNode = this.clusterNodeMapper.selectByUniqueKey(request);
        boolean ownedByAnotherNode = sameKeyNode != null && !Objects.equals(nodeId, sameKeyNode.getId());
        if (ownedByAnotherNode) {
            String duplicateMsg = "inlong cluster node already exist for " + request;
            LOGGER.error(duplicateMsg);
            throw new BusinessException(duplicateMsg);
        }

        InlongClusterNodeEntity storedNode = this.clusterNodeMapper.selectById(nodeId);
        if (storedNode == null) {
            LOGGER.error("cluster node not found by id={}", nodeId);
            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
        }

        // Same message is used for both the version mismatch and the failed row update.
        String expiredMsg = "cluster node has already updated for " + request;
        boolean sameVersion = Objects.equals(storedNode.getVersion(), request.getVersion());
        if (!sameVersion) {
            LOGGER.warn(expiredMsg);
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }

        // Permission is evaluated against the cluster the node currently belongs to.
        InlongClusterEntity parentCluster = this.clusterMapper.selectById(storedNode.getParentId());
        this.checkUser(parentCluster, operator, "Current user does not have permission to update cluster node");

        CommonBeanUtils.copyProperties(request, storedNode, true);
        storedNode.setParentId(request.getParentId());
        storedNode.setModifier(operator);
        int affectedRows = this.clusterNodeMapper.updateById(storedNode);
        if (affectedRows != InlongConstants.AFFECTED_ONE_ROW.intValue()) {
            LOGGER.warn(expiredMsg);
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }

        LOGGER.info("success to update inlong cluster node={}", request);
        return true;
    }

    /**
     * Logically deletes a cluster node by writing the node's own id into its {@code isDeleted} field.
     *
     * @param id id of the node to delete; must not be null
     * @param operator name of the user performing the deletion
     * @return {@code false} when the node does not exist or is already marked deleted,
     *         {@code true} once the deletion has been written
     * @throws BusinessException if the update does not affect exactly one row
     */
    @Override
    public Boolean deleteNode(Integer id, String operator) {
        Preconditions.checkNotNull(id, "cluster node id cannot be empty");

        InlongClusterNodeEntity node = this.clusterNodeMapper.selectById(id);
        boolean missingOrDeleted = node == null || node.getIsDeleted() > InlongConstants.UN_DELETED;
        if (missingOrDeleted) {
            LOGGER.error("inlong cluster node not found by id={}", id);
            return false;
        }

        // Permission is evaluated against the cluster the node belongs to.
        InlongClusterEntity parentCluster = this.clusterMapper.selectById(node.getParentId());
        this.checkUser(parentCluster, operator, "Current user does not have permission to delete cluster node");

        node.setIsDeleted(node.getId());
        node.setModifier(operator);
        int affectedRows = this.clusterNodeMapper.updateById(node);
        if (affectedRows != InlongConstants.AFFECTED_ONE_ROW.intValue()) {
            LOGGER.error("cluster node has already updated with parentId={}, type={}, ip={}, port={}, protocolType={}",
                    node.getParentId(), node.getType(), node.getIp(), node.getPort(), node.getProtocolType());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }

        LOGGER.info("success to delete inlong cluster node by id={}", id);
        return true;
    }

    /**
     * Builds the data proxy node response for the given inlong group.
     *
     * <p>The cluster id of the response is taken from the first node's parent id.
     *
     * @param groupId inlong group id used to resolve the data proxy cluster
     * @param protocolType protocol type handed to the node query
     * @return a response holding the cluster id and node list, or an empty response when no node was found
     */
    @Override
    public DataProxyNodeResponse getDataProxyNodes(String groupId, String protocolType) {
        LOGGER.debug("begin to get data proxy nodes for groupId={}, protocol={}", groupId, protocolType);
        List<InlongClusterNodeEntity> nodeEntities = this.getClusterNodes(groupId, "DATAPROXY", protocolType);

        DataProxyNodeResponse response = new DataProxyNodeResponse();
        if (CollectionUtils.isEmpty(nodeEntities)) {
            LOGGER.debug("not any data proxy node for groupId={}, protocol={}", groupId, protocolType);
            return response;
        }

        response.setClusterId(nodeEntities.get(0).getParentId());

        // Copy id, ip, port and protocol type of every entity into a DataProxyNodeInfo.
        ArrayList<DataProxyNodeInfo> proxyNodes = nodeEntities.stream()
                .map(entity -> {
                    DataProxyNodeInfo info = new DataProxyNodeInfo();
                    info.setId(entity.getId());
                    info.setIp(entity.getIp());
                    info.setPort(entity.getPort());
                    info.setProtocolType(entity.getProtocolType());
                    return info;
                })
                .collect(Collectors.toCollection(ArrayList::new));
        response.setNodeList(proxyNodes);

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("success to get dp nodes for groupId={}, protocol={}, result={}",
                    groupId, protocolType, response);
        }
        return response;
    }

    /**
     * Resolves the cluster nodes for an inlong group.
     *
     * <p>The group's cluster tag is used to select clusters of the requested type; only the first
     * selected cluster is used, and its nodes are queried with the given protocol type.
     *
     * @param groupId inlong group id
     * @param clusterType type of the cluster to select
     * @param protocolType protocol type handed to the node query
     * @return the nodes returned by the node mapper for the first selected cluster
     * @throws BusinessException if the group does not exist, has no cluster tag,
     *         or no cluster of the requested type is found for that tag
     */
    private List<InlongClusterNodeEntity> getClusterNodes(String groupId, String clusterType, String protocolType) {
        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(groupId);
        if (groupEntity == null) {
            String msg = "inlong group not exists for groupId=" + groupId;
            LOGGER.debug(msg);
            throw new BusinessException(msg);
        }

        String clusterTag = groupEntity.getInlongClusterTag();
        if (StringUtils.isBlank(clusterTag)) {
            String msg = "not found any cluster tag for groupId=" + groupId;
            LOGGER.debug(msg);
            throw new BusinessException(msg);
        }

        List<InlongClusterEntity> clusterList = this.clusterMapper.selectByKey(clusterTag, null, clusterType);
        if (CollectionUtils.isEmpty(clusterList)) {
            // Report the requested type: this lookup is not limited to data proxy clusters.
            String msg = "not found any cluster with type=" + clusterType
                    + " for groupId=" + groupId + " and clusterTag=" + clusterTag;
            LOGGER.debug(msg);
            throw new BusinessException(msg);
        }

        return this.clusterNodeMapper.selectByParentId(clusterList.get(0).getId(), protocolType);
    }

    /**
     * Builds the data proxy configuration (topics and MQ clusters) for the data proxy clusters
     * selected by the given cluster tag and cluster name.
     *
     * <p>Steps: select the DATAPROXY clusters, collect all of their cluster tags, select the groups
     * in {@code CONFIG_SUCCESSFUL} status for those tags, derive one topic entry per Pulsar stream
     * or per TubeMQ group, and finally attach the TUBEMQ/PULSAR clusters found for the same tags.
     *
     * @param clusterTag cluster tag used to select the data proxy clusters
     * @param clusterName cluster name used to select the data proxy clusters
     * @return the assembled config; an empty config when no data proxy cluster or no matching group exists
     */
    @Override
    public DataProxyConfig getDataProxyConfig(String clusterTag, String clusterName) {
        LOGGER.debug("GetDPConfig: begin to get config by cluster tag={} name={}", clusterTag, clusterName);
        ClusterPageRequest request = ClusterPageRequest.builder()
                .clusterTag(clusterTag)
                .name(clusterName)
                .type("DATAPROXY")
                .build();
        List<InlongClusterEntity> clusterEntityList = this.clusterMapper.selectByCondition(request);
        DataProxyConfig result = new DataProxyConfig();
        if (CollectionUtils.isEmpty(clusterEntityList)) {
            LOGGER.warn("GetDPConfig: not found data proxy cluster by tag={} name={}", clusterTag, clusterName);
            return result;
        }

        // A cluster may carry several comma-separated tags; gather the distinct ones.
        HashSet<String> tagSet = new HashSet<>(16);
        for (InlongClusterEntity clusterEntity : clusterEntityList) {
            tagSet.addAll(Arrays.asList(clusterEntity.getClusterTags().split(",")));
        }
        List<String> clusterTagList = new ArrayList<>(tagSet);

        InlongGroupPageRequest groupRequest = InlongGroupPageRequest.builder()
                .status(GroupStatus.CONFIG_SUCCESSFUL.getCode())
                .clusterTagList(clusterTagList)
                .build();
        List<InlongGroupBriefInfo> groupList = this.groupMapper.selectBriefList(groupRequest);
        if (CollectionUtils.isEmpty(groupList)) {
            LOGGER.warn("GetDPConfig: not found inlong group with success status by cluster tags={}", clusterTagList);
            return result;
        }

        LOGGER.debug("GetDPConfig: begin to get config for cluster tags={}, associated group num={}",
                clusterTagList, groupList.size());
        List<DataProxyTopicInfo> topicList = new ArrayList<>();
        for (InlongGroupBriefInfo groupInfo : groupList) {
            String groupId = groupInfo.getInlongGroupId();
            String mqResource = groupInfo.getMqResource();
            String realClusterTag = groupInfo.getInlongClusterTag();
            String mqType = groupInfo.getMqType();

            if ("PULSAR".equals(mqType) || "TDMQ_PULSAR".equals(mqType)) {
                List<InlongStreamBriefInfo> streamList = this.streamMapper.selectBriefList(groupId);
                if (CollectionUtils.isEmpty(streamList)) {
                    // No stream means no topic; skip the cluster lookup entirely.
                    continue;
                }

                // The Pulsar cluster and its tenant depend only on the group's cluster tag,
                // so resolve them once per group instead of once per stream.
                List<InlongClusterEntity> pulsarClusters = this.clusterMapper.selectByKey(realClusterTag, null, "PULSAR");
                if (CollectionUtils.isEmpty(pulsarClusters)) {
                    LOGGER.error("GetDPConfig: not found pulsar cluster by cluster tag={}", realClusterTag);
                    continue;
                }
                PulsarClusterDTO pulsarCluster = PulsarClusterDTO.getFromJson(pulsarClusters.get(0).getExtParams());
                String tenant = pulsarCluster.getTenant();
                if (StringUtils.isBlank(tenant)) {
                    tenant = "public";
                }

                for (InlongStreamBriefInfo streamInfo : streamList) {
                    // Full topic name: persistent://<tenant>/<group mq resource>/<stream mq resource>
                    String topic = String.format("persistent://%s/%s/%s", tenant, mqResource, streamInfo.getMqResource());
                    DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
                    topicConfig.setInlongGroupId(groupId + "/" + streamInfo.getInlongStreamId());
                    topicConfig.setTopic(topic);
                    topicList.add(topicConfig);
                }
            } else if ("TUBEMQ".equals(mqType)) {
                // TubeMQ uses one topic per group: the group's mq resource.
                DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
                topicConfig.setInlongGroupId(groupId);
                topicConfig.setTopic(mqResource);
                topicList.add(topicConfig);
            }
            // Groups with any other MQ type contribute no topic.
        }

        LOGGER.debug("GetDPConfig: begin to get mq clusters by tags={}", clusterTagList);
        List<MQClusterInfo> mqClusterInfoList = new ArrayList<>();
        List<String> typeList = Arrays.asList("TUBEMQ", "PULSAR");
        ClusterPageRequest pageRequest = ClusterPageRequest.builder()
                .typeList(typeList)
                .clusterTagList(clusterTagList)
                .build();
        List<InlongClusterEntity> mqClusterList = this.clusterMapper.selectByCondition(pageRequest);
        for (InlongClusterEntity cluster : mqClusterList) {
            MQClusterInfo clusterInfo = new MQClusterInfo();
            clusterInfo.setUrl(cluster.getUrl());
            clusterInfo.setToken(cluster.getToken());
            clusterInfo.setMqType(cluster.getType());
            // The ext params JSON is parsed into a plain Map and handed over as the cluster params.
            clusterInfo.setParams(GSON.fromJson(cluster.getExtParams(), Map.class));
            mqClusterInfoList.add(clusterInfo);
        }

        result.setMqClusterList(mqClusterInfoList);
        result.setTopicList(topicList);
        return result;
    }

    /**
     * Returns the data proxy configuration of a cluster as a JSON string.
     *
     * <ul>
     *   <li>If the caller's md5 equals the stored md5, a response with result {@code true},
     *       error code {@code 1}, the md5 and an empty {@code DataProxyCluster} is returned.</li>
     *   <li>If no md5 or no config JSON is stored for the cluster, a response with result
     *       {@code false} and error code {@code -101} is returned.</li>
     *   <li>Otherwise the stored config JSON is returned as is.</li>
     * </ul>
     *
     * @param clusterName name of the data proxy cluster
     * @param md5 md5 of the configuration currently held by the caller
     * @return the JSON described above
     */
    @Override
    public String getAllConfig(String clusterName, String md5) {
        String storedMd5 = this.proxyRepository.getProxyMd5(clusterName);

        if (storedMd5 != null && storedMd5.equals(md5)) {
            DataProxyConfigResponse unchanged = new DataProxyConfigResponse();
            unchanged.setResult(true);
            unchanged.setErrCode(1);
            unchanged.setMd5(storedMd5);
            unchanged.setData(new DataProxyCluster());
            return GSON.toJson(unchanged);
        }

        // The config JSON is only fetched when an md5 is stored for the cluster.
        String storedJson = storedMd5 == null ? null : this.proxyRepository.getProxyConfigJson(clusterName);
        if (storedJson != null) {
            return storedJson;
        }

        DataProxyConfigResponse failure = new DataProxyConfigResponse();
        failure.setResult(false);
        failure.setErrCode(-101);
        return GSON.toJson(failure);
    }

    /**
     * Removes one tag from the comma-separated cluster tags of the entity and saves the entity.
     *
     * @param entity cluster whose tags are updated
     * @param clusterTag tag to remove
     * @param operator name of the user recorded as modifier
     * @throws BusinessException if the update does not affect exactly one row
     */
    private void removeClusterTag(InlongClusterEntity entity, String clusterTag, String operator) {
        String[] currentTags = entity.getClusterTags().split(",");
        HashSet<String> remainingTags = Sets.newHashSet(currentTags);
        remainingTags.remove(clusterTag);

        entity.setClusterTags(String.join(",", remainingTags));
        entity.setModifier(operator);

        int affectedRows = this.clusterMapper.updateById(entity);
        if (affectedRows != InlongConstants.AFFECTED_ONE_ROW.intValue()) {
            LOGGER.error("cluster has already updated with name={}, type={}, curVersion={}",
                    entity.getName(), entity.getType(), entity.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }
    }

    /**
     * Ensures that no inlong group is selected for the given cluster tag.
     *
     * @param clusterTag cluster tag to check
     * @throws BusinessException listing the ids of the groups found for the tag, if any
     */
    private void assertNoInlongGroupExists(String clusterTag) {
        List<InlongGroupEntity> groupEntities = this.groupMapper.selectByClusterTag(clusterTag);
        if (CollectionUtils.isEmpty(groupEntities)) {
            return;
        }

        // Typed pipeline: a raw stream would collect to Object and not be assignable to a List.
        List<String> groupIds = groupEntities.stream()
                .map(InlongGroupEntity::getInlongGroupId)
                .collect(Collectors.toList());
        String errMsg = String.format("inlong cluster tag [%s] was used by inlong group %s", clusterTag, groupIds);
        LOGGER.error(errMsg);
        throw new BusinessException(errMsg + ", please delete them first");
    }

    /**
     * Verifies that the user may operate on the cluster: the user must either be listed in the
     * cluster's comma-separated in-charges or have the {@code ADMIN} account type.
     *
     * <p>A user name with no user record is treated as non-admin, so it is rejected with
     * {@code errMsg} instead of failing with a {@code NullPointerException}.
     *
     * @param cluster cluster whose in-charges are consulted
     * @param user name of the operating user
     * @param errMsg message passed to {@code Preconditions.checkTrue} when the check fails
     */
    private void checkUser(InlongClusterEntity cluster, String user, String errMsg) {
        UserEntity userEntity = this.userMapper.selectByName(user);
        boolean isInCharge = Preconditions.inSeparatedString(user, cluster.getInCharges(), ",");
        boolean isAdmin = userEntity != null
                && UserTypeEnum.ADMIN.getCode().equals(userEntity.getAccountType());
        Preconditions.checkTrue(isInCharge || isAdmin, errMsg);
    }
}

