/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.cluster;

import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.google.gson.Gson;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GlobalConstants;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeResponse;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterPageRequest;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterRequest;
import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterDTO;
import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyNodeInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupBriefInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.service.cluster.InlongClusterOperator;
import org.apache.inlong.manager.service.cluster.InlongClusterOperatorFactory;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.repository.DataProxyConfigRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class InlongClusterServiceImpl
implements InlongClusterService {
    private static final Logger LOGGER = LoggerFactory.getLogger(InlongClusterServiceImpl.class);
    private static final Gson GSON = new Gson();
    @Autowired
    private InlongGroupEntityMapper groupMapper;
    @Autowired
    private InlongStreamEntityMapper streamMapper;
    @Autowired
    private InlongClusterOperatorFactory clusterOperatorFactory;
    @Autowired
    private InlongClusterEntityMapper clusterMapper;
    @Autowired
    private InlongClusterNodeEntityMapper clusterNodeMapper;
    @Autowired
    private DataProxyConfigRepository proxyRepository;

    /**
     * Saves a new inlong cluster.
     *
     * <p>The (cluster tag, name, type) triple is looked up first; when any record
     * already matches it, the save is rejected. Otherwise persistence is delegated
     * to the operator registered for the cluster type.
     *
     * @param request cluster info to save, must not be null
     * @param operator name of the user performing the operation
     * @return the id returned by the type-specific operator
     * @throws BusinessException if a cluster with the same tag, name and type already exists
     */
    @Override
    public Integer save(InlongClusterRequest request, String operator) {
        LOGGER.debug("begin to save inlong cluster={}", request);
        Preconditions.checkNotNull(request, "inlong cluster info cannot be empty");

        // reject duplicates of the (tag, name, type) key
        final String tag = request.getClusterTag();
        final String clusterName = request.getName();
        final String clusterType = request.getType();
        List<InlongClusterEntity> duplicates = clusterMapper.selectByKey(tag, clusterName, clusterType);
        if (!CollectionUtils.isEmpty(duplicates)) {
            String errMsg = String.format("inlong cluster already exist for cluster tag=%s name=%s type=%s",
                    tag, clusterName, clusterType);
            LOGGER.error(errMsg);
            throw new BusinessException(errMsg);
        }

        // the operator is selected by cluster type and performs the actual save
        Integer savedId = clusterOperatorFactory.getInstance(request.getType()).saveOpt(request, operator);
        LOGGER.info("success to save inlong cluster={} by user={}", request, operator);
        return savedId;
    }

    /**
     * Gets the cluster info for the given id.
     *
     * @param id cluster id, must not be null
     * @return cluster info converted from the stored entity by the type-specific operator
     * @throws BusinessException with {@code CLUSTER_NOT_FOUND} if no entity exists for the id
     */
    @Override
    public InlongClusterInfo get(Integer id) {
        Preconditions.checkNotNull(id, "inlong cluster id cannot be empty");

        InlongClusterEntity found = clusterMapper.selectById(id);
        if (found != null) {
            // conversion from entity to info depends on the cluster type
            InlongClusterInfo info = clusterOperatorFactory.getInstance(found.getType()).getFromEntity(found);
            LOGGER.debug("success to get inlong cluster info by id={}", id);
            return info;
        }

        LOGGER.error("inlong cluster not found by id={}", id);
        throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
    }

    /**
     * Pages through inlong clusters matching the given condition.
     *
     * <p>The mapper query is intercepted by PageHelper, so the returned list is a
     * {@link Page} that holds only the requested page of entities plus the total
     * number of matching rows. The total reported to the caller is taken from that
     * page object, not from the size of the current page.
     *
     * @param request paging and filter condition
     * @return page info whose list holds the converted clusters of the requested page
     */
    @Override
    public PageInfo<InlongClusterInfo> list(InlongClusterPageRequest request) {
        PageHelper.startPage(request.getPageNum(), request.getPageSize());
        Page<InlongClusterEntity> entityPage = (Page<InlongClusterEntity>) this.clusterMapper.selectByCondition(request);
        List<InlongClusterInfo> list = entityPage.stream()
                .map(entity -> {
                    // conversion from entity to info depends on the cluster type
                    InlongClusterOperator instance = this.clusterOperatorFactory.getInstance(entity.getType());
                    return instance.getFromEntity(entity);
                })
                .collect(Collectors.toList());

        PageInfo<InlongClusterInfo> page = new PageInfo<>(list);
        // use the total row count of the query; list.size() is only the current page size
        page.setTotal(entityPage.getTotal());
        LOGGER.debug("success to list inlong cluster by {}", request);
        return page;
    }

    /**
     * Gets a single cluster by its tag, name and type.
     *
     * <p>When several entities match, the first one returned by the mapper is used.
     *
     * @param clusterTag cluster tag
     * @param name cluster name
     * @param type cluster type
     * @return cluster info converted from the first matching entity
     * @throws BusinessException if no entity matches the key
     */
    @Override
    public InlongClusterInfo getOne(String clusterTag, String name, String type) {
        List<InlongClusterEntity> matches = clusterMapper.selectByKey(clusterTag, name, type);
        if (CollectionUtils.isEmpty(matches)) {
            String errMsg = String.format("cluster not found by tag=%s, name=%s, type=%s", clusterTag, name, type);
            throw new BusinessException(errMsg);
        }

        InlongClusterEntity first = matches.get(0);
        InlongClusterOperator operatorForType = clusterOperatorFactory.getInstance(first.getType());
        InlongClusterInfo info = operatorForType.getFromEntity(first);
        LOGGER.debug("success to get inlong cluster by tag={}, name={}, type={}", clusterTag, name, type);
        return info;
    }

    /**
     * Updates an existing inlong cluster.
     *
     * <p>The update is rejected when the requested (tag, name, type) key is already
     * held by a record with a different id, or when no cluster exists for the
     * request id. The actual update is delegated to the type-specific operator.
     *
     * @param request cluster info to update, must not be null and must carry an id
     * @param operator name of the user performing the operation
     * @return {@code true} once the operator has finished the update
     * @throws BusinessException if the key conflicts with another cluster, or the cluster is not found
     */
    @Override
    public Boolean update(InlongClusterRequest request, String operator) {
        LOGGER.debug("begin to update inlong cluster={}", request);
        Preconditions.checkNotNull(request, "inlong cluster info cannot be empty");
        final Integer clusterId = request.getId();
        Preconditions.checkNotNull(clusterId, "inlong cluster id cannot be empty");

        // the key may only be held by the cluster that is being updated
        final String tag = request.getClusterTag();
        final String clusterName = request.getName();
        final String clusterType = request.getType();
        List<InlongClusterEntity> sameKey = clusterMapper.selectByKey(tag, clusterName, clusterType);
        boolean heldByOther = !CollectionUtils.isEmpty(sameKey)
                && !Objects.equals(clusterId, sameKey.get(0).getId());
        if (heldByOther) {
            String errMsg = String.format("inlong cluster already exist for cluster tag=%s name=%s type=%s",
                    tag, clusterName, clusterType);
            LOGGER.error(errMsg);
            throw new BusinessException(errMsg);
        }

        // the target cluster has to exist
        if (clusterMapper.selectById(clusterId) == null) {
            LOGGER.error("inlong cluster not found by id={}", clusterId);
            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
        }

        clusterOperatorFactory.getInstance(request.getType()).updateOpt(request, operator);
        LOGGER.info("success to update inlong cluster={}", request);
        return true;
    }

    /**
     * Logically deletes the cluster with the given id.
     *
     * <p>The record is kept; its {@code isDeleted} field is set to its own id and
     * the modifier is set to the operator.
     *
     * @param id cluster id, must not be null
     * @param operator name of the user performing the operation
     * @return {@code true} if the cluster was marked as deleted, {@code false} if it
     *         was not found or its {@code isDeleted} value is already above
     *         {@code GlobalConstants.UN_DELETED}
     */
    @Override
    public Boolean delete(Integer id, String operator) {
        Preconditions.checkNotNull(id, "cluster id cannot be empty");

        InlongClusterEntity target = clusterMapper.selectById(id);
        boolean missing = target == null;
        if (missing || target.getIsDeleted() > GlobalConstants.UN_DELETED) {
            LOGGER.error("inlong cluster not found by id={}, or was already deleted", id);
            return false;
        }

        // mark as deleted by storing the record's own id in the isDeleted field
        target.setIsDeleted(target.getId());
        target.setModifier(operator);
        clusterMapper.updateById(target);
        LOGGER.info("success to delete inlong cluster for id={} by user={}", id, operator);
        return true;
    }

    /**
     * Saves a new cluster node.
     *
     * @param request node info to save, must not be null
     * @param operator name of the user performing the operation, stored as creator
     * @return the id of the entity after insertion
     * @throws BusinessException if the unique-key lookup for the request finds an existing node
     */
    @Override
    public Integer saveNode(ClusterNodeRequest request, String operator) {
        LOGGER.debug("begin to insert inlong cluster node={}", request);
        Preconditions.checkNotNull(request, "cluster node info cannot be empty");

        // reject a node whose unique key is already present
        if (clusterNodeMapper.selectByUniqueKey(request) != null) {
            String errMsg = String.format("inlong cluster node already exist for type=%s ip=%s port=%s",
                    request.getType(), request.getIp(), request.getPort());
            LOGGER.error(errMsg);
            throw new BusinessException(errMsg);
        }

        InlongClusterNodeEntity nodeEntity = CommonBeanUtils.copyProperties(request, InlongClusterNodeEntity::new);
        nodeEntity.setCreator(operator);
        nodeEntity.setCreateTime(new Date());
        nodeEntity.setIsDeleted(GlobalConstants.UN_DELETED);
        clusterNodeMapper.insert(nodeEntity);

        LOGGER.info("success to add inlong cluster node={}", request);
        return nodeEntity.getId();
    }

    /**
     * Gets the cluster node for the given id.
     *
     * @param id cluster node id, must not be null
     * @return node response copied from the stored entity
     * @throws BusinessException with {@code CLUSTER_NOT_FOUND} if no node exists for the id
     */
    @Override
    public ClusterNodeResponse getNode(Integer id) {
        Preconditions.checkNotNull(id, "cluster node id cannot be empty");

        InlongClusterNodeEntity found = clusterNodeMapper.selectById(id);
        if (found != null) {
            ClusterNodeResponse nodeResponse = CommonBeanUtils.copyProperties(found, ClusterNodeResponse::new);
            LOGGER.debug("success to get inlong cluster node by id={}", id);
            return nodeResponse;
        }

        LOGGER.error("inlong cluster node not found by id={}", id);
        throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
    }

    /**
     * Pages through cluster nodes matching the given condition.
     *
     * <p>The mapper query is intercepted by PageHelper, so the returned list is a
     * {@link Page} that holds only the requested page of entities plus the total
     * number of matching rows. The total reported to the caller is taken from that
     * page object, not from the size of the current page.
     *
     * @param request paging and filter condition
     * @return page info whose list holds the node responses of the requested page
     */
    @Override
    public PageInfo<ClusterNodeResponse> listNode(InlongClusterPageRequest request) {
        PageHelper.startPage(request.getPageNum(), request.getPageSize());
        Page<InlongClusterNodeEntity> entityPage =
                (Page<InlongClusterNodeEntity>) this.clusterNodeMapper.selectByCondition(request);
        List<ClusterNodeResponse> nodeList = CommonBeanUtils.copyListProperties(entityPage, ClusterNodeResponse::new);

        PageInfo<ClusterNodeResponse> page = new PageInfo<>(nodeList);
        // use the total row count of the query; nodeList.size() is only the current page size
        page.setTotal(entityPage.getTotal());
        LOGGER.debug("success to list inlong cluster node by {}", request);
        return page;
    }

    /**
     * Lists the addresses of all cluster nodes of the given type.
     *
     * @param type cluster type, must not be null
     * @return one {@code ip:port} string per node, or an empty list when no node matches
     */
    @Override
    public List<String> listNodeIpByType(String type) {
        Preconditions.checkNotNull(type, "cluster type cannot be empty");

        // only the type is set as filter condition
        InlongClusterPageRequest condition = new InlongClusterPageRequest();
        condition.setType(type);
        List<InlongClusterNodeEntity> nodes = clusterNodeMapper.selectByCondition(condition);
        if (CollectionUtils.isEmpty(nodes)) {
            LOGGER.debug("not found any node for type={}", type);
            return Collections.emptyList();
        }

        List<String> addresses = new ArrayList<>();
        for (InlongClusterNodeEntity node : nodes) {
            addresses.add(String.format("%s:%d", node.getIp(), node.getPort()));
        }
        LOGGER.debug("success to list node by type={}, result={}", type, addresses);
        return addresses;
    }

    /**
     * Updates an existing cluster node.
     *
     * <p>The update is rejected when the unique-key lookup for the request finds a
     * node with a different id, or when no node exists for the request id.
     *
     * @param request node info to update, must not be null and must carry an id
     * @param operator name of the user performing the operation, stored as modifier
     * @return {@code true} once the node has been updated
     * @throws BusinessException if the key conflicts with another node, or the node is not found
     */
    @Override
    public Boolean updateNode(ClusterNodeRequest request, String operator) {
        LOGGER.debug("begin to update inlong cluster node={}", request);
        Preconditions.checkNotNull(request, "inlong cluster node cannot be empty");
        Integer id = request.getId();
        Preconditions.checkNotNull(id, "cluster node id cannot be empty");

        // the unique key may only be held by the node that is being updated
        InlongClusterNodeEntity exist = this.clusterNodeMapper.selectByUniqueKey(request);
        if (exist != null && !Objects.equals(id, exist.getId())) {
            // message kept identical to the one in saveNode (a stray trailing ')' was removed)
            String errMsg = String.format("inlong cluster node already exist for type=%s ip=%s port=%s",
                    request.getType(), request.getIp(), request.getPort());
            LOGGER.error(errMsg);
            throw new BusinessException(errMsg);
        }

        InlongClusterNodeEntity entity = this.clusterNodeMapper.selectById(id);
        if (entity == null) {
            LOGGER.error("cluster node not found by id={}", id);
            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
        }

        // NOTE(review): the third argument presumably makes the copy skip null properties - confirm
        CommonBeanUtils.copyProperties(request, entity, true);
        // the parent id is always taken from the request, even when it is null
        entity.setParentId(request.getParentId());
        entity.setModifier(operator);
        this.clusterNodeMapper.updateById(entity);
        LOGGER.info("success to update inlong cluster node={}", request);
        return true;
    }

    /**
     * Logically deletes the cluster node with the given id.
     *
     * <p>The record is kept; its {@code isDeleted} field is set to its own id and
     * the modifier is set to the operator.
     *
     * @param id cluster node id, must not be null
     * @param operator name of the user performing the operation
     * @return {@code true} if the node was marked as deleted, {@code false} if it
     *         was not found or its {@code isDeleted} value is already above
     *         {@code GlobalConstants.UN_DELETED}
     */
    @Override
    public Boolean deleteNode(Integer id, String operator) {
        Preconditions.checkNotNull(id, "cluster node id cannot be empty");

        InlongClusterNodeEntity target = clusterNodeMapper.selectById(id);
        boolean missing = target == null;
        if (missing || target.getIsDeleted() > GlobalConstants.UN_DELETED) {
            LOGGER.error("inlong cluster node not found by id={}", id);
            return false;
        }

        // mark as deleted by storing the record's own id in the isDeleted field
        target.setIsDeleted(target.getId());
        target.setModifier(operator);
        clusterNodeMapper.updateById(target);
        LOGGER.info("success to delete inlong cluster node by id={}", id);
        return true;
    }

    /**
     * Lists the nodes of every DATA_PROXY cluster matching the given tag and name.
     *
     * @param clusterTag cluster tag used as filter
     * @param clusterName cluster name used as filter
     * @return node info (id, parent cluster id, ip, port) of all nodes of the matching clusters
     */
    @Override
    public List<DataProxyNodeInfo> getDataProxyNodeList(String clusterTag, String clusterName) {
        LOGGER.debug("begin to list data proxy node for tag={} name={}", clusterTag, clusterName);

        InlongClusterPageRequest condition = new InlongClusterPageRequest();
        condition.setClusterTag(clusterTag);
        condition.setName(clusterName);
        condition.setType("DATA_PROXY");
        List<InlongClusterEntity> proxyClusters = clusterMapper.selectByCondition(condition);
        Preconditions.checkNotEmpty(proxyClusters,
                "data proxy node not found by tag=" + clusterTag + " name=" + clusterName);

        // collect the nodes of each cluster, keeping the cluster id as parent id
        List<DataProxyNodeInfo> nodeInfos = new ArrayList<>();
        for (InlongClusterEntity proxyCluster : proxyClusters) {
            final Integer parentId = proxyCluster.getId();
            List<InlongClusterNodeEntity> nodes = clusterNodeMapper.selectByParentId(parentId);
            for (InlongClusterNodeEntity node : nodes) {
                DataProxyNodeInfo info = new DataProxyNodeInfo();
                info.setId(node.getId());
                info.setParentId(parentId);
                info.setIp(node.getIp());
                info.setPort(node.getPort());
                nodeInfos.add(info);
            }
        }

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("success to list data proxy node for tag={} name={}, result={}",
                    clusterTag, clusterName, nodeInfos);
        }
        return nodeInfos;
    }

    /**
     * Builds the data proxy config (topic list and MQ cluster list) for the
     * DATA_PROXY clusters matching the given tag and name.
     *
     * <p>Steps:
     * <ol>
     *   <li>find the matching DATA_PROXY clusters and collect their cluster tags;</li>
     *   <li>find the inlong groups in {@code CONFIG_SUCCESSFUL} status under those tags;</li>
     *   <li>build one topic entry per stream for Pulsar / TDMQ Pulsar groups, and one
     *       topic entry per group for Tube groups (other MQ types are skipped);</li>
     *   <li>collect the TUBE and PULSAR clusters under the same tags as MQ cluster info.</li>
     * </ol>
     *
     * @param clusterTag cluster tag used as filter
     * @param clusterName cluster name used as filter
     * @return the config; it has no topic or MQ cluster list set when no data proxy
     *         cluster or no successfully configured group is found
     */
    @Override
    public DataProxyConfig getDataProxyConfig(String clusterTag, String clusterName) {
        LOGGER.debug("GetDPConfig: begin to get config by cluster tag={} name={}", clusterTag, clusterName);
        InlongClusterPageRequest request = InlongClusterPageRequest.builder()
                .clusterTag(clusterTag)
                .name(clusterName)
                .type("DATA_PROXY")
                .build();
        List<InlongClusterEntity> clusterList = this.clusterMapper.selectByCondition(request);
        DataProxyConfig result = new DataProxyConfig();
        if (CollectionUtils.isEmpty(clusterList)) {
            LOGGER.warn("GetDPConfig: data proxy cluster not found by tag={} name={}", clusterTag, clusterName);
            return result;
        }

        // only groups in successful status under the tags of the found clusters are considered
        List<String> clusterTagList = clusterList.stream()
                .map(InlongClusterEntity::getClusterTag)
                .collect(Collectors.toList());
        InlongGroupPageRequest groupRequest = InlongGroupPageRequest.builder()
                .status(GroupStatus.CONFIG_SUCCESSFUL.getCode())
                .clusterTagList(clusterTagList)
                .build();
        List<InlongGroupBriefInfo> groupList = this.groupMapper.selectBriefList(groupRequest);
        if (CollectionUtils.isEmpty(groupList)) {
            LOGGER.warn("GetDPConfig: no inlong group found with success status by cluster tags={}", clusterTagList);
            return result;
        }

        LOGGER.debug("GetDPConfig: begin to get config for cluster tags={}, associated group num={}",
                clusterTagList, groupList.size());
        List<DataProxyTopicInfo> topicList = new ArrayList<>();
        for (InlongGroupBriefInfo groupInfo : groupList) {
            MQType type = MQType.forType(groupInfo.getMqType());
            if (type == MQType.PULSAR || type == MQType.TDMQ_PULSAR) {
                topicList.addAll(this.buildPulsarTopics(groupInfo));
            } else if (type == MQType.TUBE) {
                // for Tube the group's MQ resource is used directly as the topic
                DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
                topicConfig.setInlongGroupId(groupInfo.getInlongGroupId());
                topicConfig.setTopic(groupInfo.getMqResource());
                topicList.add(topicConfig);
            }
        }

        LOGGER.debug("GetDPConfig: begin to get mq clusters by tags={}", clusterTagList);
        List<MQClusterInfo> mqSet = new ArrayList<>();
        List<String> typeList = Arrays.asList("TUBE", "PULSAR");
        InlongClusterPageRequest pageRequest = InlongClusterPageRequest.builder()
                .typeList(typeList)
                .clusterTagList(clusterTagList)
                .build();
        List<InlongClusterEntity> mqClusterList = this.clusterMapper.selectByCondition(pageRequest);
        for (InlongClusterEntity cluster : mqClusterList) {
            MQClusterInfo clusterInfo = new MQClusterInfo();
            clusterInfo.setUrl(cluster.getUrl());
            clusterInfo.setToken(cluster.getToken());
            // ext params are stored as JSON and handed over as a plain map;
            // kept raw because the parameter type of setParams is not visible here
            Map configParams = GSON.fromJson(cluster.getExtParams(), Map.class);
            clusterInfo.setParams(configParams);
            mqSet.add(clusterInfo);
        }
        result.setMqClusterList(mqSet);
        result.setTopicList(topicList);
        return result;
    }

    /**
     * Builds one topic entry per stream of a Pulsar (or TDMQ Pulsar) group.
     *
     * <p>The Pulsar cluster lookup and tenant resolution depend only on the group,
     * so they are done once per group instead of once per stream. The lookup is
     * skipped entirely when the group has no stream.
     *
     * @param groupInfo brief info of the group
     * @return topic entries of the group's streams; empty when the group has no stream
     *         or no Pulsar cluster is found under the group's cluster tag
     */
    private List<DataProxyTopicInfo> buildPulsarTopics(InlongGroupBriefInfo groupInfo) {
        String groupId = groupInfo.getInlongGroupId();
        List<InlongStreamBriefInfo> streamList = this.streamMapper.selectBriefList(groupId);
        if (CollectionUtils.isEmpty(streamList)) {
            return Collections.emptyList();
        }

        String realClusterTag = groupInfo.getInlongClusterTag();
        List<InlongClusterEntity> pulsarClusters = this.clusterMapper.selectByKey(realClusterTag, null, "PULSAR");
        if (CollectionUtils.isEmpty(pulsarClusters)) {
            LOGGER.error("GetDPConfig: pulsar cluster not found by cluster tag={}", realClusterTag);
            return Collections.emptyList();
        }

        // the first Pulsar cluster under the tag supplies the tenant; fall back to "public"
        PulsarClusterDTO pulsarCluster = PulsarClusterDTO.getFromJson(pulsarClusters.get(0).getExtParams());
        String tenant = pulsarCluster.getTenant();
        if (StringUtils.isBlank(tenant)) {
            tenant = "public";
        }

        String mqResource = groupInfo.getMqResource();
        List<DataProxyTopicInfo> topics = new ArrayList<>(streamList.size());
        for (InlongStreamBriefInfo streamInfo : streamList) {
            // topic format: persistent://{tenant}/{group mq resource}/{stream mq resource}
            String topic = String.format("persistent://%s/%s/%s", tenant, mqResource, streamInfo.getMqResource());
            DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
            topicConfig.setInlongGroupId(groupId + "/" + streamInfo.getInlongStreamId());
            topicConfig.setTopic(topic);
            topics.add(topicConfig);
        }
        return topics;
    }

    /**
     * Gets the full data proxy config of a cluster as a JSON string.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>the stored md5 equals the given md5: a response with result {@code true},
     *       error code {@code 1}, the md5 and an empty {@link DataProxyCluster};</li>
     *   <li>the stored md5 differs and a config JSON exists: that JSON as stored;</li>
     *   <li>no md5 or no config JSON is stored for the cluster: a response with
     *       result {@code false} and error code {@code -101}.</li>
     * </ul>
     *
     * @param clusterName name of the data proxy cluster
     * @param md5 md5 of the config currently held by the caller
     * @return JSON string as described above
     */
    @Override
    public String getAllConfig(String clusterName, String md5) {
        final DataProxyConfigResponse response = new DataProxyConfigResponse();
        final String storedMd5 = proxyRepository.getProxyMd5(clusterName);

        // caller already holds the current config
        if (storedMd5 != null && storedMd5.equals(md5)) {
            response.setResult(true);
            response.setErrCode(1);
            response.setMd5(storedMd5);
            response.setData(new DataProxyCluster());
            return GSON.toJson(response);
        }

        // the config JSON is only looked up when an md5 is known for the cluster
        String storedJson = null;
        if (storedMd5 != null) {
            storedJson = proxyRepository.getProxyConfigJson(clusterName);
        }
        if (storedJson != null) {
            return storedJson;
        }

        response.setResult(false);
        response.setErrCode(-101);
        return GSON.toJson(response);
    }
}

