/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.group;

import com.fasterxml.jackson.core.type.TypeReference;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.auth.Authentication;
import org.apache.inlong.manager.common.auth.SecretTokenAuthentication;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.UserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageRequest;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.group.InlongGroupApproveRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupCountResponse;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest;
import org.apache.inlong.manager.pojo.sort.BaseSortConf;
import org.apache.inlong.manager.pojo.sort.FlinkSortConf;
import org.apache.inlong.manager.pojo.sort.UserDefinedSortConf;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.user.UserInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.group.InlongGroupOperator;
import org.apache.inlong.manager.service.group.InlongGroupOperatorFactory;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.source.SourceOperatorFactory;
import org.apache.inlong.manager.service.source.StreamSourceOperator;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.service.user.UserService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated;

@Service
@Validated
public class InlongGroupServiceImpl
implements InlongGroupService {
    /** Logger shared by every method of this service. */
    private static final Logger LOGGER = LoggerFactory.getLogger(InlongGroupServiceImpl.class);
    /** Bound to the {@code sort.enable.zookeeper} property; {@code false} when the property is not set. */
    @Value(value="${sort.enable.zookeeper:false}")
    private boolean enableZookeeper;
    /** Mapper used to read and write inlong group records. */
    @Autowired
    private InlongGroupEntityMapper groupMapper;
    /** Mapper used to read and write the extended key/value properties of a group. */
    @Autowired
    private InlongGroupExtEntityMapper groupExtMapper;
    /** Stream service; used here to count and logically delete the streams of a group. */
    @Autowired
    private InlongStreamService streamService;
    /** Mapper used to load stream sources by group id when listing groups. */
    @Autowired
    private StreamSourceEntityMapper streamSourceMapper;
    /** Cluster service; used here to list clusters by cluster tag and MQ type. */
    @Autowired
    private InlongClusterService clusterService;
    /** Resolves the {@link InlongGroupOperator} matching a group's MQ type. */
    @Autowired
    private InlongGroupOperatorFactory groupOperatorFactory;
    /** Resolves the {@link StreamSourceOperator} matching a source type. */
    @Autowired
    private SourceOperatorFactory sourceOperatorFactory;
    /** User service; used here to check that a user is among a group's in-charges. */
    @Autowired
    private UserService userService;

    /**
     * Checks whether {@code operator} may update the given group and whether the group's
     * current status permits the update. Returns silently when {@code entity} or
     * {@code request} is {@code null}.
     *
     * @param entity the stored group record
     * @param request the incoming update request
     * @param operator name of the user performing the update
     * @throws BusinessException if the operator is not one of the group's in-charges, if the
     *         current status forbids updates, or if the MQ type differs and the current status
     *         forbids changing it
     */
    private static void doUpdateCheck(InlongGroupEntity entity, InlongGroupRequest request, String operator) {
        if (entity == null || request == null) {
            return;
        }

        // The in-charges value is split on commas and matched against the operator name.
        boolean operatorInCharge = Arrays.asList(entity.getInCharges().split(",")).contains(operator);
        if (!operatorInCharge) {
            LOGGER.error("user [{}] has no privilege for the inlong group", operator);
            throw new BusinessException(ErrorCodeEnum.GROUP_PERMISSION_DENIED);
        }

        GroupStatus status = GroupStatus.forCode(entity.getStatus());
        if (GroupStatus.notAllowedUpdate(status)) {
            String message = String.format("Current status=%s is not allowed to update", status);
            LOGGER.error(message);
            throw new BusinessException(ErrorCodeEnum.GROUP_UPDATE_NOT_ALLOWED, message);
        }

        boolean mqTypeChanged = !entity.getMqType().equals(request.getMqType());
        if (mqTypeChanged && !GroupStatus.allowedUpdateMQ(status)) {
            String message = String.format("Current status=%s is not allowed to update MQ type", status);
            LOGGER.error(message);
            throw new BusinessException(ErrorCodeEnum.GROUP_UPDATE_NOT_ALLOWED, message);
        }
    }

    /**
     * Creates a new inlong group.
     *
     * @param request the group to create; must not be {@code null}
     * @param operator name of the user creating the group
     * @return the group id returned by the MQ-specific operator
     * @throws BusinessException with {@code GROUP_DUPLICATE} if a group with the requested id already exists
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public String save(InlongGroupRequest request, String operator) {
        LOGGER.debug("begin to save inlong group={} by user={}", request, operator);
        Preconditions.expectNotNull(request, "inlong group request cannot be empty");

        String requestedId = request.getInlongGroupId();
        if (this.groupMapper.selectByGroupId(requestedId) != null) {
            LOGGER.error("groupId={} has already exists", requestedId);
            throw new BusinessException(ErrorCodeEnum.GROUP_DUPLICATE);
        }

        // The ZooKeeper flag always follows this service's configuration, not the caller's value.
        if (this.enableZookeeper) {
            request.setEnableZookeeper(InlongConstants.ENABLE_ZK);
        } else {
            request.setEnableZookeeper(InlongConstants.DISABLE_ZK);
        }

        InlongGroupOperator groupOperator = this.groupOperatorFactory.getInstance(request.getMqType());
        String savedId = groupOperator.saveOpt(request, operator);
        this.saveOrUpdateExt(savedId, request.getExtList());
        LOGGER.info("success to save inlong group for groupId={} by user={}", savedId, operator);
        return savedId;
    }

    /**
     * Creates a new inlong group on behalf of the given user.
     *
     * @param request the group to create
     * @param opInfo the operating user; its name is recorded as the creator
     * @return the group id returned by the MQ-specific operator
     * @throws BusinessException with {@code GROUP_DUPLICATE} if a group with the requested id already exists
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public String save(InlongGroupRequest request, UserInfo opInfo) {
        String requestedId = request.getInlongGroupId();
        if (this.groupMapper.selectByGroupId(requestedId) != null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_DUPLICATE);
        }

        // The ZooKeeper flag always follows this service's configuration, not the caller's value.
        if (this.enableZookeeper) {
            request.setEnableZookeeper(InlongConstants.ENABLE_ZK);
        } else {
            request.setEnableZookeeper(InlongConstants.DISABLE_ZK);
        }

        InlongGroupOperator groupOperator = this.groupOperatorFactory.getInstance(request.getMqType());
        String savedId = groupOperator.saveOpt(request, opInfo.getName());
        this.saveOrUpdateExt(savedId, request.getExtList());
        return savedId;
    }

    /**
     * Tells whether a group with the given id is stored.
     *
     * @param groupId the group id to look up; must not be {@code null}
     * @return {@code true} if the mapper finds a record for {@code groupId}, otherwise {@code false}
     */
    @Override
    public Boolean exist(String groupId) {
        Preconditions.expectNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
        boolean found = this.groupMapper.selectByGroupId(groupId) != null;
        // Log the boolean itself so the message reads "exist? true/false" rather than "exist? 1/0".
        LOGGER.debug("success to check inlong group {}, exist? {}", groupId, found);
        return found;
    }

    /**
     * Loads a group together with its extended properties and the sort configuration derived from them.
     *
     * @param groupId the group id; must not be {@code null}
     * @return the populated group info
     * @throws BusinessException with {@code GROUP_NOT_FOUND} if no record exists for {@code groupId}
     */
    @Override
    public InlongGroupInfo get(String groupId) {
        Preconditions.expectNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());

        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(groupId);
        if (groupEntity == null) {
            LOGGER.error("inlong group not found by groupId={}", groupId);
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        InlongGroupInfo result = this.groupOperatorFactory.getInstance(groupEntity.getMqType())
                .getFromEntity(groupEntity);

        List<InlongGroupExtEntity> extEntities = this.groupExtMapper.selectByGroupId(groupId);
        List<InlongGroupExtInfo> extInfos = CommonBeanUtils.copyListProperties(extEntities, InlongGroupExtInfo::new);
        result.setExtList(extInfos);
        result.setSortConf(this.buildSortConfig(extInfos));

        LOGGER.debug("success to get inlong group for groupId={}", groupId);
        return result;
    }

    /**
     * Loads a group for the given user, after checking the user against the group's in-charges.
     *
     * @param groupId the group id
     * @param opInfo the operating user
     * @return the populated group info, including extended properties and sort configuration
     * @throws BusinessException with {@code GROUP_NOT_FOUND} if no record exists for {@code groupId}
     */
    @Override
    public InlongGroupInfo get(String groupId, UserInfo opInfo) {
        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(groupId);
        if (groupEntity == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        this.userService.checkUser(
                groupEntity.getInCharges(),
                opInfo.getName(),
                ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage());

        InlongGroupInfo result = this.groupOperatorFactory.getInstance(groupEntity.getMqType())
                .getFromEntity(groupEntity);

        List<InlongGroupExtEntity> extEntities = this.groupExtMapper.selectByGroupId(groupId);
        List<InlongGroupExtInfo> extInfos = CommonBeanUtils.copyListProperties(extEntities, InlongGroupExtInfo::new);
        result.setExtList(extInfos);
        result.setSortConf(this.buildSortConfig(extInfos));
        return result;
    }

    /**
     * Aggregates the per-status group counts returned by the mapper into a single response.
     *
     * <p>Every row contributes to the total count; rows whose status is {@code CONFIG_ING},
     * {@code TO_BE_APPROVAL} or {@code APPROVE_REJECTED} additionally contribute to the
     * wait-assign, wait-approve and reject counters respectively.
     *
     * @param operator user name passed to the mapper query
     * @param lightweight lightweight flag passed to the mapper query
     * @return the aggregated counters
     */
    @Override
    public InlongGroupCountResponse countGroupByUser(String operator, Integer lightweight) {
        InlongGroupCountResponse response = new InlongGroupCountResponse();
        // Typed rows: a raw List cannot be iterated as Map without failing type checking.
        List<Map<String, Object>> statusCounts = this.groupMapper.countGroupByUser(operator, lightweight);
        for (Map<String, Object> row : statusCounts) {
            int status = (Integer) row.get("status");
            long count = (Long) row.get("count");
            response.setTotalCount(response.getTotalCount() + count);
            if (status == GroupStatus.CONFIG_ING.getCode()) {
                response.setWaitAssignCount(response.getWaitAssignCount() + count);
            } else if (status == GroupStatus.TO_BE_APPROVAL.getCode()) {
                response.setWaitApproveCount(response.getWaitApproveCount() + count);
            } else if (status == GroupStatus.APPROVE_REJECTED.getCode()) {
                response.setRejectCount(response.getRejectCount() + count);
            }
        }
        LOGGER.debug("success to count inlong group for operator={}", operator);
        return response;
    }

    /**
     * Builds the topic info of a group and attaches the clusters that match the group's
     * cluster tag and MQ type.
     *
     * @param groupId the group id
     * @return the topic info produced by the MQ-specific operator, completed with group id,
     *         cluster tag and cluster list
     */
    @Override
    public InlongGroupTopicInfo getTopic(String groupId) {
        InlongGroupInfo group = this.get(groupId);
        InlongGroupTopicInfo result = this.groupOperatorFactory.getInstance(group.getMqType()).getTopic(group);
        result.setInlongGroupId(groupId);

        String tag = group.getInlongClusterTag();
        result.setInlongClusterTag(tag);
        List<ClusterInfo> clusters = this.clusterService.listByTagAndType(tag, group.getMqType());
        result.setClusterInfos(clusters);

        LOGGER.debug("success to get topic for groupId={}, result={}", groupId, result);
        return result;
    }

    /**
     * Builds the backup topic info of a group, using the cluster tag stored in the group's
     * {@code backup_cluster_tag} extended property.
     *
     * @param groupId the group id
     * @return the backup topic info, or {@code null} when the group has no non-blank
     *         {@code backup_cluster_tag} property
     */
    @Override
    public InlongGroupTopicInfo getBackupTopic(String groupId) {
        InlongGroupExtEntity backupTagExt = this.groupExtMapper.selectByUniqueKey(groupId, "backup_cluster_tag");
        boolean hasBackupTag = backupTagExt != null && !StringUtils.isBlank(backupTagExt.getKeyValue());
        if (!hasBackupTag) {
            LOGGER.warn("not found any backup topic for groupId={}", groupId);
            return null;
        }

        InlongGroupInfo group = this.get(groupId);
        InlongGroupTopicInfo result = this.groupOperatorFactory.getInstance(group.getMqType()).getBackupTopic(group);
        result.setInlongGroupId(groupId);

        String backupTag = backupTagExt.getKeyValue();
        result.setInlongClusterTag(backupTag);
        List<ClusterInfo> clusters = this.clusterService.listByTagAndType(backupTag, group.getMqType());
        result.setClusterInfos(clusters);

        LOGGER.debug("success to get backup topic for groupId={}, result={}", groupId, result);
        return result;
    }

    /**
     * Returns one page of brief group infos matching the request.
     *
     * <p>A page size above {@code PageRequest.MAX_PAGE_SIZE} is reduced to that maximum. When
     * {@code request.isListSources()} is set, each returned group also carries the stream
     * sources registered under its group id (an empty list when it has none).
     *
     * @param request paging, ordering and filter conditions; its page size may be modified
     * @return the page content together with total count, page number and page size
     */
    @Override
    public PageResult<InlongGroupBriefInfo> listBrief(InlongGroupPageRequest request) {
        if (request.getPageSize() > PageRequest.MAX_PAGE_SIZE) {
            LOGGER.warn("list inlong groups, change page size from {} to {}",
                    request.getPageSize(), PageRequest.MAX_PAGE_SIZE);
            request.setPageSize(PageRequest.MAX_PAGE_SIZE.intValue());
        }
        OrderFieldEnum.checkOrderField(request);
        OrderTypeEnum.checkOrderType(request);

        // PageHelper stores the paging parameters per thread until the next mapper query runs,
        // so startPage is issued immediately before the query it is meant for.
        PageHelper.startPage(request.getPageNum(), request.getPageSize());
        Page<InlongGroupEntity> entityPage = (Page<InlongGroupEntity>) this.groupMapper.selectByCondition(request);

        List<InlongGroupBriefInfo> briefInfos =
                CommonBeanUtils.copyListProperties(entityPage, InlongGroupBriefInfo::new);

        if (request.isListSources() && CollectionUtils.isNotEmpty(briefInfos)) {
            Set<String> groupIds = briefInfos.stream()
                    .map(InlongGroupBriefInfo::getInlongGroupId)
                    .collect(Collectors.toSet());
            List<StreamSourceEntity> sourceEntities =
                    this.streamSourceMapper.selectByGroupIds(new ArrayList<>(groupIds));

            // Group the converted sources by the id of the group they belong to.
            Map<String, List<StreamSource>> sourcesByGroup = Maps.newHashMap();
            for (StreamSourceEntity sourceEntity : sourceEntities) {
                StreamSourceOperator sourceOperator =
                        this.sourceOperatorFactory.getInstance(sourceEntity.getSourceType());
                StreamSource source = sourceOperator.getFromEntity(sourceEntity);
                sourcesByGroup.computeIfAbsent(sourceEntity.getInlongGroupId(), k -> Lists.newArrayList())
                        .add(source);
            }
            for (InlongGroupBriefInfo briefInfo : briefInfos) {
                List<StreamSource> sources =
                        sourcesByGroup.getOrDefault(briefInfo.getInlongGroupId(), Lists.newArrayList());
                briefInfo.setStreamSources(sources);
            }
        }

        PageResult<InlongGroupBriefInfo> pageResult = new PageResult<>(
                briefInfos, entityPage.getTotal(), entityPage.getPageNum(), entityPage.getPageSize());
        LOGGER.debug("success to list inlong group for {}", request);
        return pageResult;
    }

    /**
     * Lists the brief infos of the groups visible to the given user, without paging.
     *
     * <p>A group is kept when the user's account type equals {@code UserTypeEnum.ADMIN}'s code,
     * or when the user's name appears in the group's comma-separated in-charges. When
     * {@code request.isListSources()} is set, each returned group also carries the stream
     * sources registered under its group id (an empty list when it has none).
     *
     * @param request ordering and filter conditions
     * @param opInfo the operating user
     * @return the visible groups
     */
    @Override
    public List<InlongGroupBriefInfo> listBrief(InlongGroupPageRequest request, UserInfo opInfo) {
        OrderFieldEnum.checkOrderField(request);
        OrderTypeEnum.checkOrderType(request);

        List<InlongGroupEntity> visibleEntities = new ArrayList<>();
        for (InlongGroupEntity groupEntity : this.groupMapper.selectByCondition(request)) {
            boolean isAdmin = opInfo.getAccountType().equals(UserTypeEnum.ADMIN.getCode());
            // The in-charges are only inspected for non-admin users.
            if (isAdmin || Arrays.asList(groupEntity.getInCharges().split(",")).contains(opInfo.getName())) {
                visibleEntities.add(groupEntity);
            }
        }

        List<InlongGroupBriefInfo> briefInfos =
                CommonBeanUtils.copyListProperties(visibleEntities, InlongGroupBriefInfo::new);

        if (request.isListSources() && CollectionUtils.isNotEmpty(briefInfos)) {
            Set<String> groupIds = briefInfos.stream()
                    .map(InlongGroupBriefInfo::getInlongGroupId)
                    .collect(Collectors.toSet());
            List<StreamSourceEntity> sourceEntities =
                    this.streamSourceMapper.selectByGroupIds(new ArrayList<>(groupIds));

            // Group the converted sources by the id of the group they belong to.
            Map<String, List<StreamSource>> sourcesByGroup = Maps.newHashMap();
            for (StreamSourceEntity sourceEntity : sourceEntities) {
                StreamSourceOperator sourceOperator =
                        this.sourceOperatorFactory.getInstance(sourceEntity.getSourceType());
                StreamSource source = sourceOperator.getFromEntity(sourceEntity);
                sourcesByGroup.computeIfAbsent(sourceEntity.getInlongGroupId(), k -> Lists.newArrayList())
                        .add(source);
            }
            for (InlongGroupBriefInfo briefInfo : briefInfos) {
                List<StreamSource> sources =
                        sourcesByGroup.getOrDefault(briefInfo.getInlongGroupId(), Lists.newArrayList());
                briefInfo.setStreamSources(sources);
            }
        }
        return briefInfos;
    }

    /**
     * Updates an existing group after validating the unmodifiable fields, the operator's
     * privilege and the group's current status.
     *
     * @param request the new group settings
     * @param operator name of the user performing the update
     * @return the id of the updated group
     * @throws BusinessException with {@code GROUP_NOT_FOUND} if the group does not exist, or
     *         whatever the validation steps raise
     */
    @Override
    @Transactional(
            rollbackFor = Throwable.class,
            isolation = Isolation.REPEATABLE_READ,
            propagation = Propagation.REQUIRES_NEW)
    public String update(InlongGroupRequest request, String operator) {
        LOGGER.debug("begin to update inlong group={} by user={}", request, operator);

        String groupId = request.getInlongGroupId();
        InlongGroupEntity stored = this.groupMapper.selectByGroupId(groupId);
        if (stored == null) {
            LOGGER.error("inlong group not found by groupId={}", groupId);
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        this.chkUnmodifiableParams(stored, request);
        InlongGroupServiceImpl.doUpdateCheck(stored, request, operator);

        // The ZooKeeper flag always follows this service's configuration, not the caller's value.
        if (this.enableZookeeper) {
            request.setEnableZookeeper(InlongConstants.ENABLE_ZK);
        } else {
            request.setEnableZookeeper(InlongConstants.DISABLE_ZK);
        }

        this.groupOperatorFactory.getInstance(request.getMqType()).updateOpt(request, operator);
        this.saveOrUpdateExt(groupId, request.getExtList());

        LOGGER.info("success to update inlong group for groupId={} by user={}", groupId, operator);
        return groupId;
    }

    /**
     * Updates an existing group on behalf of the given user, after validating the unmodifiable
     * fields, the user's privilege and the group's current status.
     *
     * @param request the new group settings
     * @param opInfo the operating user
     * @return the id of the updated group
     * @throws BusinessException with {@code GROUP_NOT_FOUND} if the group does not exist, or with
     *         {@code GROUP_UPDATE_NOT_ALLOWED} if the current status forbids the update
     */
    @Override
    @Transactional(
            rollbackFor = Throwable.class,
            isolation = Isolation.REPEATABLE_READ,
            propagation = Propagation.REQUIRES_NEW)
    public String update(InlongGroupRequest request, UserInfo opInfo) {
        String groupId = request.getInlongGroupId();
        InlongGroupEntity stored = this.groupMapper.selectByGroupId(groupId);
        if (stored == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        this.chkUnmodifiableParams(stored, request);
        this.userService.checkUser(
                stored.getInCharges(),
                opInfo.getName(),
                ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage());

        GroupStatus status = GroupStatus.forCode(stored.getStatus());
        if (GroupStatus.notAllowedUpdate(status)) {
            String message = String.format("Current status=%s is not allowed to update", status);
            throw new BusinessException(ErrorCodeEnum.GROUP_UPDATE_NOT_ALLOWED, message);
        }
        boolean mqTypeChanged = !stored.getMqType().equals(request.getMqType());
        if (mqTypeChanged && !GroupStatus.allowedUpdateMQ(status)) {
            String message = String.format("Current status=%s is not allowed to update MQ type", status);
            throw new BusinessException(ErrorCodeEnum.GROUP_UPDATE_NOT_ALLOWED, message);
        }

        // The ZooKeeper flag always follows this service's configuration, not the caller's value.
        if (this.enableZookeeper) {
            request.setEnableZookeeper(InlongConstants.ENABLE_ZK);
        } else {
            request.setEnableZookeeper(InlongConstants.DISABLE_ZK);
        }

        this.groupOperatorFactory.getInstance(request.getMqType()).updateOpt(request, opInfo.getName());
        this.saveOrUpdateExt(groupId, request.getExtList());
        return groupId;
    }

    /**
     * Moves a group to a new status, provided the transition from its current status is allowed.
     * The group record is read through {@code selectByGroupIdForUpdate}.
     *
     * @param groupId the group id; must not be {@code null}
     * @param status code of the target status
     * @param operator name of the user performing the change
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException if the group does not exist or the transition is not allowed
     */
    @Override
    @Transactional(
            rollbackFor = Throwable.class,
            isolation = Isolation.REPEATABLE_READ,
            propagation = Propagation.REQUIRES_NEW)
    public Boolean updateStatus(String groupId, Integer status, String operator) {
        LOGGER.info("begin to update group status to [{}] for groupId={} by user={}", status, groupId, operator);
        Preconditions.expectNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());

        InlongGroupEntity stored = this.groupMapper.selectByGroupIdForUpdate(groupId);
        if (stored == null) {
            LOGGER.error("inlong group not found by groupId={}", groupId);
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        GroupStatus current = GroupStatus.forCode(stored.getStatus());
        GroupStatus target = GroupStatus.forCode(status);
        if (GroupStatus.notAllowedTransition(current, target)) {
            String message = String.format(
                    "Current status=%s is not allowed to transfer to state=%s", current, target);
            LOGGER.error(message);
            throw new BusinessException(message);
        }

        this.groupMapper.updateStatus(groupId, status, operator);
        LOGGER.info("success to update group status to [{}] for groupId={} by user={}", status, groupId, operator);
        return true;
    }

    /**
     * Marks a group as approved. The group must currently be in {@code TO_BE_APPROVAL} status.
     *
     * <p>When the approval carries a non-blank cluster tag, the tag, the {@code APPROVE_PASSED}
     * status, the modifier and (if present and different) the data report type are written in
     * one update. Otherwise only the status is changed through {@link #updateStatus}.
     *
     * @param approveRequest the approval details
     * @param operator name of the approving user
     * @throws BusinessException if the group does not exist, is not waiting for approval, or the
     *         update does not affect exactly one row
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
    public void updateAfterApprove(InlongGroupApproveRequest approveRequest, String operator) {
        LOGGER.debug("begin to update inlong group after approve={}", approveRequest);

        String groupId = approveRequest.getInlongGroupId();
        InlongGroupEntity stored = this.groupMapper.selectByGroupId(groupId);
        if (stored == null) {
            throw new BusinessException("inlong group not found with group id=" + groupId);
        }
        boolean waitingForApproval = Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(), stored.getStatus());
        if (!waitingForApproval) {
            throw new BusinessException("inlong group status [wait_approval] not allowed to approve again");
        }

        if (StringUtils.isBlank(approveRequest.getInlongClusterTag())) {
            // No cluster tag supplied: only the status changes.
            this.updateStatus(groupId, GroupStatus.APPROVE_PASSED.getCode(), operator);
        } else {
            stored.setInlongGroupId(groupId);
            stored.setInlongClusterTag(approveRequest.getInlongClusterTag());
            stored.setStatus(GroupStatus.APPROVE_PASSED.getCode());
            boolean reportTypeChanged = approveRequest.getDataReportType() != null
                    && !Objects.equals(approveRequest.getDataReportType(), stored.getDataReportType());
            if (reportTypeChanged) {
                stored.setDataReportType(approveRequest.getDataReportType());
            }
            stored.setModifier(operator);

            int affectedRows = this.groupMapper.updateByIdentifierSelective(stored);
            if (affectedRows != InlongConstants.AFFECTED_ONE_ROW) {
                LOGGER.error("inlong group has already updated with group id={}, curVersion={}",
                        groupId, stored.getVersion());
                throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
            }
        }
        LOGGER.info("success to update inlong group status after approve for groupId={}", groupId);
    }

    /**
     * Stores the extended properties of a group through the mapper's
     * {@code insertOnDuplicateKeyUpdate}. Does nothing when {@code exts} is {@code null} or empty.
     *
     * @param groupId the group id stamped on every stored property
     * @param exts the extended properties to store
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public void saveOrUpdateExt(String groupId, List<InlongGroupExtInfo> exts) {
        if (CollectionUtils.isEmpty(exts)) {
            return;
        }
        // Typed list: a raw List cannot be iterated as InlongGroupExtEntity without failing type checking.
        List<InlongGroupExtEntity> extEntities = CommonBeanUtils.copyListProperties(exts, InlongGroupExtEntity::new);
        for (InlongGroupExtEntity extEntity : extEntities) {
            extEntity.setInlongGroupId(groupId);
        }
        this.groupExtMapper.insertOnDuplicateKeyUpdate(extEntities);
    }

    /**
     * Lists the topic info of every group selected by the request.
     *
     * @param request the selection criteria; its cluster tag must not be empty
     * @return one topic info per selected group, each obtained through {@link #getTopic(String)}
     */
    @Override
    public List<InlongGroupTopicInfo> listTopics(InlongGroupTopicRequest request) {
        LOGGER.info("start to list group topic infos, request={}", request);
        Preconditions.expectNotEmpty(request.getClusterTag(), "cluster tag should not be empty");

        // Typed list: a raw List cannot be iterated as InlongGroupEntity without failing type checking.
        List<InlongGroupEntity> groupEntities = this.groupMapper.selectByTopicRequest(request);
        List<InlongGroupTopicInfo> topicInfos = new ArrayList<>(groupEntities.size());
        for (InlongGroupEntity groupEntity : groupEntities) {
            topicInfos.add(this.getTopic(groupEntity.getInlongGroupId()));
        }

        LOGGER.info("success list group topic infos under clusterTag={}, size={}",
                request.getClusterTag(), topicInfos.size());
        return topicInfos;
    }

    /**
     * Verifies that the given user may delete the group and that the group is in a deletable state.
     *
     * @param groupId the group id
     * @param operator name of the user requesting the deletion
     * @return the loaded group info when every check passes
     * @throws BusinessException if the operator is not one of the group's in-charges, if the
     *         current status cannot transition to {@code DELETING}, or if streams must be
     *         deleted first and at least one still exists
     */
    @Override
    public InlongGroupInfo doDeleteCheck(String groupId, String operator) {
        InlongGroupInfo group = this.get(groupId);

        boolean operatorInCharge = Arrays.asList(group.getInCharges().split(",")).contains(operator);
        if (!operatorInCharge) {
            throw new BusinessException(ErrorCodeEnum.GROUP_PERMISSION_DENIED,
                    String.format("user [%s] has no privilege for the inlong group", operator));
        }

        GroupStatus current = GroupStatus.forCode(group.getStatus());
        if (GroupStatus.notAllowedTransition(current, GroupStatus.DELETING)) {
            throw new BusinessException(ErrorCodeEnum.GROUP_DELETE_NOT_ALLOWED,
                    String.format("current group status=%s was not allowed to delete", current));
        }

        // Streams are counted only for statuses that require them to be removed first.
        if (GroupStatus.deleteStreamFirst(current)) {
            int streamCount = this.streamService.selectCountByGroupId(groupId);
            if (streamCount >= 1) {
                throw new BusinessException(ErrorCodeEnum.GROUP_DELETE_HAS_STREAM,
                        String.format("groupId=%s have [%s] inlong streams, deleted failed", groupId, streamCount));
            }
        }
        return group;
    }

    /**
     * Logically deletes a group: sets its {@code isDeleted} field to its own id, sets its status
     * to {@code DELETED}, and logically deletes its extended properties. Streams are logically
     * deleted as well when the group's status allows deleting sub infos.
     *
     * @param groupId the group id
     * @param operator name of the user performing the deletion
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException with {@code CONFIG_EXPIRED} if the update does not affect exactly one row
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public Boolean delete(String groupId, String operator) {
        LOGGER.info("begin to delete inlong group for groupId={} by user={}", groupId, operator);

        InlongGroupEntity stored = this.groupMapper.selectByGroupId(groupId);
        Preconditions.expectNotNull(stored, ErrorCodeEnum.GROUP_NOT_FOUND.getMessage());

        GroupStatus current = GroupStatus.forCode(stored.getStatus());
        if (GroupStatus.allowedDeleteSubInfos(current)) {
            this.streamService.logicDeleteAll(groupId, operator);
        }

        stored.setIsDeleted(stored.getId());
        stored.setStatus(GroupStatus.DELETED.getCode());
        stored.setModifier(operator);
        int affectedRows = this.groupMapper.updateByIdentifierSelective(stored);
        if (affectedRows != InlongConstants.AFFECTED_ONE_ROW) {
            LOGGER.error("inlong group has already updated for groupId={} curVersion={}",
                    groupId, stored.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }

        this.groupExtMapper.logicDeleteAllByGroupId(groupId);
        LOGGER.info("success to delete group and group ext property for groupId={} by user={}", groupId, operator);
        return true;
    }

    /**
     * Derives the sort configuration from a group's extended properties.
     *
     * @param extInfos the extended properties; when a key occurs more than once the last value wins
     * @return a Flink or user-defined sort configuration according to the {@code sort.type}
     *         property, or {@code null} when that property is blank or names another sort type
     */
    private BaseSortConf buildSortConfig(List<InlongGroupExtInfo> extInfos) {
        Map<String, String> extMap = new HashMap<>();
        for (InlongGroupExtInfo ext : extInfos) {
            extMap.put(ext.getKeyName(), ext.getKeyValue());
        }

        String typeName = extMap.get("sort.type");
        if (StringUtils.isBlank(typeName)) {
            return null;
        }

        BaseSortConf.SortType sortType = BaseSortConf.SortType.forType(typeName);
        switch (sortType) {
            case FLINK:
                return this.createFlinkSortConfig(extMap);
            case USER_DEFINED:
                return this.createUserDefinedSortConfig(extMap);
            default:
                LOGGER.warn("unsupported sort config for sortType: {}", sortType);
                return null;
        }
    }

    /**
     * Builds a Flink sort configuration from the {@code sort.url}, {@code sort.properties},
     * {@code sort.authentication.type} and {@code sort.authentication} properties.
     *
     * @param extMap the extended properties keyed by name
     * @return the configuration; its properties map is empty when {@code sort.properties} is
     *         blank, and its authentication is set only when {@code sort.authentication.type}
     *         is not blank
     */
    private FlinkSortConf createFlinkSortConfig(Map<String, String> extMap) {
        FlinkSortConf flinkConf = new FlinkSortConf();
        flinkConf.setServiceUrl(extMap.get("sort.url"));

        String propertiesJson = extMap.get("sort.properties");
        if (StringUtils.isBlank(propertiesJson)) {
            flinkConf.setProperties(Maps.newHashMap());
        } else {
            Map<String, String> parsed =
                    JsonUtils.parseObject(propertiesJson, new TypeReference<Map<String, String>>() {});
            flinkConf.setProperties(parsed);
        }

        String authTypeName = extMap.get("sort.authentication.type");
        if (StringUtils.isBlank(authTypeName)) {
            return flinkConf;
        }

        Authentication.AuthType authType = Authentication.AuthType.forType(authTypeName);
        Preconditions.expectTrue(authType == Authentication.AuthType.SECRET_AND_TOKEN,
                "Only support SECRET_AND_TOKEN for flink sort auth");

        Map<String, String> authProperties =
                JsonUtils.parseObject(extMap.get("sort.authentication"), new TypeReference<Map<String, String>>() {});
        SecretTokenAuthentication authentication = new SecretTokenAuthentication();
        authentication.configure(authProperties);
        flinkConf.setAuthentication(authentication);
        return flinkConf;
    }

    /**
     * Builds a user-defined sort configuration from the {@code sort.name} and
     * {@code sort.properties} properties.
     *
     * @param extMap the extended properties keyed by name
     * @return the configuration; its properties map is empty when {@code sort.properties} is blank
     */
    private UserDefinedSortConf createUserDefinedSortConfig(Map<String, String> extMap) {
        UserDefinedSortConf userConf = new UserDefinedSortConf();
        userConf.setSortName(extMap.get("sort.name"));

        String propertiesJson = extMap.get("sort.properties");
        if (StringUtils.isBlank(propertiesJson)) {
            userConf.setProperties(Maps.newHashMap());
        } else {
            Map<String, String> parsed =
                    JsonUtils.parseObject(propertiesJson, new TypeReference<Map<String, String>>() {});
            userConf.setProperties(parsed);
        }
        return userConf;
    }

    /**
     * Rejects an update request whose MQ type or version differs from the stored record.
     *
     * @param entity the stored group record
     * @param request the incoming update request
     */
    private void chkUnmodifiableParams(InlongGroupEntity entity, InlongGroupRequest request) {
        Preconditions.expectEquals(
                entity.getMqType(),
                request.getMqType(),
                ErrorCodeEnum.INVALID_PARAMETER,
                "mqType not allowed modify");

        // A version mismatch is reported as an expired configuration.
        String expiredMessage = String.format(
                "record has expired with record version=%d, request version=%d",
                entity.getVersion(), request.getVersion());
        Preconditions.expectEquals(
                entity.getVersion(),
                request.getVersion(),
                ErrorCodeEnum.CONFIG_EXPIRED,
                expiredMessage);
    }
}

