package org.apache.inlong.manager.service.group;

import com.fasterxml.jackson.core.type.TypeReference;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.auth.Authentication;
import org.apache.inlong.manager.common.auth.SecretTokenAuthentication;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.UserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageRequest;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.group.InlongGroupApproveRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupCountResponse;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest;
import org.apache.inlong.manager.pojo.sort.BaseSortConf;
import org.apache.inlong.manager.pojo.sort.FlinkSortConf;
import org.apache.inlong.manager.pojo.sort.UserDefinedSortConf;
import org.apache.inlong.manager.pojo.user.UserInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.repository.DataProxyConfigRepository;
import org.apache.inlong.manager.service.source.SourceOperatorFactory;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.service.user.UserService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated;

@Service
@Validated
/* loaded from: input_file:org/apache/inlong/manager/service/group/InlongGroupServiceImpl.class */
public class InlongGroupServiceImpl implements InlongGroupService {
    /** Logger shared by every method of this service. */
    private static final Logger LOGGER = LoggerFactory.getLogger(InlongGroupServiceImpl.class);

    /**
     * Injected from the {@code sort.enable.zookeeper} property ({@code false} when the property is absent);
     * copied onto every request handled by the save and update methods.
     */
    @Value("${sort.enable.zookeeper:false}")
    private boolean enableZookeeper;

    /** Mapper used to read, update and count inlong group records. */
    @Autowired
    private InlongGroupEntityMapper groupMapper;

    /** Mapper used to read, upsert and logically delete the extension (key/value) records of a group. */
    @Autowired
    private InlongGroupExtEntityMapper groupExtMapper;

    /** Used to count the streams of a group and to logically delete them when the group is deleted. */
    @Autowired
    private InlongStreamService streamService;

    /** Used by the brief listings to load the stream sources of the listed groups. */
    @Autowired
    private StreamSourceEntityMapper streamSourceMapper;

    /** Used to look up cluster infos by cluster tag and MQ type when resolving topics. */
    @Autowired
    private InlongClusterService clusterService;

    /** Resolves the group operator to use for a given MQ type. */
    @Autowired
    private InlongGroupOperatorFactory groupOperatorFactory;

    /** Resolves the source operator to use for a given source type. */
    @Autowired
    private SourceOperatorFactory sourceOperatorFactory;

    /** Used by the {@code UserInfo}-based overloads to check a user against a group's in-charges. */
    @Autowired
    private UserService userService;

    /**
     * Decompiler rendering of a compiler-generated lookup table for a {@code switch} over
     * {@link BaseSortConf.SortType}. The array is indexed by enum ordinal and holds the case label:
     * 1 for {@code FLINK}, 2 for {@code USER_DEFINED}, and 0 for every other constant.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.inlong.manager.service.group.InlongGroupServiceImpl$4, reason: invalid class name */
    /* loaded from: input_file:org/apache/inlong/manager/service/group/InlongGroupServiceImpl$4.class */
    public static /* synthetic */ class AnonymousClass4 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$inlong$manager$pojo$sort$BaseSortConf$SortType;

        static {
            int[] caseLabels = new int[BaseSortConf.SortType.values().length];
            try {
                caseLabels[BaseSortConf.SortType.FLINK.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant unavailable at run time: its slot keeps the default label 0.
            }
            try {
                caseLabels[BaseSortConf.SortType.USER_DEFINED.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant unavailable at run time: its slot keeps the default label 0.
            }
            $SwitchMap$org$apache$inlong$manager$pojo$sort$BaseSortConf$SortType = caseLabels;
        }
    }

    /**
     * Checks that the operator may update the group and that the group's current status permits it.
     * Returns silently when either the entity or the request is {@code null}.
     *
     * @param inlongGroupEntity  the stored group
     * @param inlongGroupRequest the incoming update request
     * @param str                name of the operator performing the update
     * @throws BusinessException {@code GROUP_PERMISSION_DENIED} when the operator is not listed in the
     *                           entity's comma-separated in-charges; {@code GROUP_UPDATE_NOT_ALLOWED} when
     *                           the current status forbids updates, or forbids changing the MQ type
     */
    private static void doUpdateCheck(InlongGroupEntity inlongGroupEntity, InlongGroupRequest inlongGroupRequest, String str) {
        if (inlongGroupEntity == null || inlongGroupRequest == null) {
            return;
        }

        List<String> inCharges = Arrays.asList(inlongGroupEntity.getInCharges().split(","));
        if (!inCharges.contains(str)) {
            LOGGER.error("user [{}] has no privilege for the inlong group", str);
            throw new BusinessException(ErrorCodeEnum.GROUP_PERMISSION_DENIED);
        }

        GroupStatus currentStatus = GroupStatus.forCode(inlongGroupEntity.getStatus().intValue());
        if (GroupStatus.notAllowedUpdate(currentStatus)) {
            String message = String.format("Current status=%s is not allowed to update", currentStatus);
            LOGGER.error(message);
            throw new BusinessException(ErrorCodeEnum.GROUP_UPDATE_NOT_ALLOWED, message);
        }

        // The MQ-type rule is only consulted when the request actually changes the MQ type.
        boolean mqTypeChanged = !inlongGroupEntity.getMqType().equals(inlongGroupRequest.getMqType());
        if (mqTypeChanged && !GroupStatus.allowedUpdateMQ(currentStatus)) {
            String message = String.format("Current status=%s is not allowed to update MQ type", currentStatus);
            LOGGER.error(message);
            throw new BusinessException(ErrorCodeEnum.GROUP_UPDATE_NOT_ALLOWED, message);
        }
    }

    /**
     * Creates a new inlong group together with its extension properties.
     *
     * @param inlongGroupRequest the group to create; checked with {@code Preconditions.expectNotNull}
     * @param str                name of the operator
     * @return the group id returned by the MQ-type specific operator
     * @throws BusinessException {@code GROUP_DUPLICATE} when a group with the requested id already exists
     */
    @Override // org.apache.inlong.manager.service.group.InlongGroupService
    @Transactional(rollbackFor = {Throwable.class})
    public String save(InlongGroupRequest inlongGroupRequest, String str) {
        LOGGER.debug("begin to save inlong group={} by user={}", inlongGroupRequest, str);
        Preconditions.expectNotNull(inlongGroupRequest, "inlong group request cannot be empty");

        String requestedGroupId = inlongGroupRequest.getInlongGroupId();
        InlongGroupEntity existing = this.groupMapper.selectByGroupId(requestedGroupId);
        if (existing != null) {
            LOGGER.error("groupId={} has already exists", requestedGroupId);
            throw new BusinessException(ErrorCodeEnum.GROUP_DUPLICATE);
        }

        // The ZooKeeper flag always comes from this service's configuration, not from the caller.
        if (this.enableZookeeper) {
            inlongGroupRequest.setEnableZookeeper(InlongConstants.ENABLE_ZK);
        } else {
            inlongGroupRequest.setEnableZookeeper(InlongConstants.DISABLE_ZK);
        }

        String savedGroupId = this.groupOperatorFactory.getInstance(inlongGroupRequest.getMqType())
                .saveOpt(inlongGroupRequest, str);
        saveOrUpdateExt(savedGroupId, inlongGroupRequest.getExtList());
        LOGGER.info("success to save inlong group for groupId={} by user={}", savedGroupId, str);
        return savedGroupId;
    }

    /**
     * Creates a new inlong group on behalf of the given user, together with its extension properties.
     *
     * @param inlongGroupRequest the group to create
     * @param userInfo           the operator; its name is recorded by the MQ-type specific operator
     * @return the group id returned by the MQ-type specific operator
     * @throws BusinessException {@code GROUP_DUPLICATE} when a group with the requested id already exists
     */
    @Override // org.apache.inlong.manager.service.group.InlongGroupService
    @Transactional(rollbackFor = {Throwable.class})
    public String save(InlongGroupRequest inlongGroupRequest, UserInfo userInfo) {
        InlongGroupEntity existing = this.groupMapper.selectByGroupId(inlongGroupRequest.getInlongGroupId());
        if (existing != null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_DUPLICATE);
        }

        // The ZooKeeper flag always comes from this service's configuration, not from the caller.
        if (this.enableZookeeper) {
            inlongGroupRequest.setEnableZookeeper(InlongConstants.ENABLE_ZK);
        } else {
            inlongGroupRequest.setEnableZookeeper(InlongConstants.DISABLE_ZK);
        }

        String savedGroupId = this.groupOperatorFactory.getInstance(inlongGroupRequest.getMqType())
                .saveOpt(inlongGroupRequest, userInfo.getName());
        saveOrUpdateExt(savedGroupId, inlongGroupRequest.getExtList());
        return savedGroupId;
    }

    /**
     * Tells whether a group with the given id can be found by the group mapper.
     *
     * @param str the inlong group id; checked with {@code Preconditions.expectNotNull}
     * @return {@code true} when the mapper returns an entity for the id, {@code false} otherwise
     */
    @Override // org.apache.inlong.manager.service.group.InlongGroupService
    public Boolean exist(String str) {
        Preconditions.expectNotNull(str, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
        boolean found = this.groupMapper.selectByGroupId(str) != null;
        LOGGER.debug("success to check inlong group {}, exist? {}", str, found);
        return found;
    }

    /**
     * Loads a group with its extension properties and the sort configuration derived from them.
     *
     * @param str the inlong group id; checked with {@code Preconditions.expectNotNull}
     * @return the group info built by the MQ-type specific operator
     * @throws BusinessException {@code GROUP_NOT_FOUND} when no group exists for the id
     */
    @Override // org.apache.inlong.manager.service.group.InlongGroupService
    public InlongGroupInfo get(String str) {
        Preconditions.expectNotNull(str, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());

        InlongGroupEntity entity = this.groupMapper.selectByGroupId(str);
        if (entity == null) {
            LOGGER.error("inlong group not found by groupId={}", str);
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        InlongGroupInfo groupInfo = this.groupOperatorFactory.getInstance(entity.getMqType())
                .mo30getFromEntity(entity);
        List<InlongGroupExtInfo> extInfos =
                CommonBeanUtils.copyListProperties(this.groupExtMapper.selectByGroupId(str), InlongGroupExtInfo::new);
        groupInfo.setExtList(extInfos);
        groupInfo.setSortConf(buildSortConfig(extInfos));

        LOGGER.debug("success to get inlong group for groupId={}", str);
        return groupInfo;
    }

    /**
     * Loads a group for a specific user, after passing the group's in-charges and the user's name
     * to {@code UserService.checkUser}.
     *
     * @param str      the inlong group id
     * @param userInfo the requesting user
     * @return the group info with its extension properties and derived sort configuration
     * @throws BusinessException {@code GROUP_NOT_FOUND} when no group exists for the id
     */
    @Override // org.apache.inlong.manager.service.group.InlongGroupService
    public InlongGroupInfo get(String str, UserInfo userInfo) {
        InlongGroupEntity entity = this.groupMapper.selectByGroupId(str);
        if (entity == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        String deniedMessage = ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage();
        this.userService.checkUser(entity.getInCharges(), userInfo.getName(), deniedMessage);

        InlongGroupInfo groupInfo = this.groupOperatorFactory.getInstance(entity.getMqType())
                .mo30getFromEntity(entity);
        List<InlongGroupExtInfo> extInfos =
                CommonBeanUtils.copyListProperties(this.groupExtMapper.selectByGroupId(str), InlongGroupExtInfo::new);
        groupInfo.setExtList(extInfos);
        groupInfo.setSortConf(buildSortConfig(extInfos));
        return groupInfo;
    }

    /**
     * Aggregates the per-status group counts returned by the mapper into a single response.
     * Every row adds to the total; rows whose status is {@code CONFIG_ING}, {@code TO_BE_APPROVAL}
     * or {@code APPROVE_REJECTED} additionally add to the wait-assign, wait-approve or reject counter.
     *
     * @param str operator name passed to the mapper
     * @param num second argument passed through to the mapper unchanged
     * @return the accumulated counters
     */
    @Override // org.apache.inlong.manager.service.group.InlongGroupService
    public InlongGroupCountResponse countGroupByUser(String str, Integer num) {
        InlongGroupCountResponse response = new InlongGroupCountResponse();
        for (Map<?, ?> row : this.groupMapper.countGroupByUser(str, num)) {
            // Each row carries an Integer under "status" and a Long under "count".
            int status = ((Integer) row.get("status")).intValue();
            long count = ((Long) row.get("count")).longValue();

            response.setTotalCount(response.getTotalCount() + count);
            if (status == GroupStatus.CONFIG_ING.getCode().intValue()) {
                response.setWaitAssignCount(response.getWaitAssignCount() + count);
                continue;
            }
            if (status == GroupStatus.TO_BE_APPROVAL.getCode().intValue()) {
                response.setWaitApproveCount(response.getWaitApproveCount() + count);
                continue;
            }
            if (status == GroupStatus.APPROVE_REJECTED.getCode().intValue()) {
                response.setRejectCount(response.getRejectCount() + count);
            }
        }
        LOGGER.debug("success to count inlong group for operator={}", str);
        return response;
    }

    /**
     * Resolves the topic info of a group and enriches it with the group id, the group's cluster tag
     * and the clusters found for that tag and the group's MQ type.
     *
     * @param str the inlong group id
     * @return the topic info produced by the MQ-type specific operator
     */
    @Override // org.apache.inlong.manager.service.group.InlongGroupService
    public InlongGroupTopicInfo getTopic(String str) {
        InlongGroupInfo groupInfo = get(str);
        String mqType = groupInfo.getMqType();
        InlongGroupTopicInfo topicInfo = this.groupOperatorFactory.getInstance(mqType).getTopic(groupInfo);

        topicInfo.setInlongGroupId(str);
        String clusterTag = groupInfo.getInlongClusterTag();
        topicInfo.setInlongClusterTag(clusterTag);
        topicInfo.setClusterInfos(this.clusterService.listByTagAndType(clusterTag, groupInfo.getMqType()));

        LOGGER.debug("success to get topic for groupId={}, result={}", str, topicInfo);
        return topicInfo;
    }

    /**
     * Resolves the backup topic info of a group. The backup cluster tag is read from the group's
     * extension record keyed by {@code DataProxyConfigRepository.KEY_BACKUP_CLUSTER_TAG}.
     *
     * @param str the inlong group id
     * @return the backup topic info, or {@code null} when the extension record is missing or its value is blank
     */
    @Override // org.apache.inlong.manager.service.group.InlongGroupService
    public InlongGroupTopicInfo getBackupTopic(String str) {
        InlongGroupExtEntity backupTagExt =
                this.groupExtMapper.selectByUniqueKey(str, DataProxyConfigRepository.KEY_BACKUP_CLUSTER_TAG);
        boolean hasBackupTag = backupTagExt != null && !StringUtils.isBlank(backupTagExt.getKeyValue());
        if (!hasBackupTag) {
            LOGGER.warn("not found any backup topic for groupId={}", str);
            return null;
        }

        InlongGroupInfo groupInfo = get(str);
        InlongGroupTopicInfo backupTopicInfo =
                this.groupOperatorFactory.getInstance(groupInfo.getMqType()).getBackupTopic(groupInfo);
        backupTopicInfo.setInlongGroupId(str);

        String backupClusterTag = backupTagExt.getKeyValue();
        backupTopicInfo.setInlongClusterTag(backupClusterTag);
        backupTopicInfo.setClusterInfos(this.clusterService.listByTagAndType(backupClusterTag, groupInfo.getMqType()));

        LOGGER.debug("success to get backup topic for groupId={}, result={}", str, backupTopicInfo);
        return backupTopicInfo;
    }

    /**
     * Lists one page of brief group infos matching the request, optionally with each group's stream sources.
     * A page size above {@code PageRequest.MAX_PAGE_SIZE} is lowered to that maximum on the request itself.
     *
     * @param inlongGroupPageRequest paging, ordering and filter conditions; mutated by the page-size cap
     *                               and by the order field/type checks
     * @return the requested page with total, page number and page size taken from the query result
     */
    @Override // org.apache.inlong.manager.service.group.InlongGroupService
    @SuppressWarnings({"unchecked", "rawtypes"})
    public PageResult<InlongGroupBriefInfo> listBrief(InlongGroupPageRequest inlongGroupPageRequest) {
        if (inlongGroupPageRequest.getPageSize() > PageRequest.MAX_PAGE_SIZE.intValue()) {
            LOGGER.warn("list inlong groups, change page size from {} to {}", Integer.valueOf(inlongGroupPageRequest.getPageSize()), PageRequest.MAX_PAGE_SIZE);
            inlongGroupPageRequest.setPageSize(PageRequest.MAX_PAGE_SIZE.intValue());
        }

        // Run the order checks BEFORE arming PageHelper. startPage stores the paging parameters in a
        // thread-local that the next mapper call consumes; if anything threw in between, the stale
        // paging state would stay on the thread and be applied to an unrelated later query.
        OrderFieldEnum.checkOrderField(inlongGroupPageRequest);
        OrderTypeEnum.checkOrderType(inlongGroupPageRequest);
        PageHelper.startPage(inlongGroupPageRequest.getPageNum(), inlongGroupPageRequest.getPageSize());
        Page<InlongGroupEntity> entityPage =
                (Page<InlongGroupEntity>) this.groupMapper.selectByCondition(inlongGroupPageRequest);

        List<InlongGroupBriefInfo> briefInfos = CommonBeanUtils.copyListProperties(entityPage, InlongGroupBriefInfo::new);
        if (inlongGroupPageRequest.isListSources() && CollectionUtils.isNotEmpty(briefInfos)) {
            // Load the sources of all listed groups with a single query, keyed by distinct group id.
            Set<String> distinctGroupIds = briefInfos.stream()
                    .map(InlongGroupBriefInfo::getInlongGroupId)
                    .collect(Collectors.toSet());
            // The source element types are not imported by this file, so this map stays raw.
            Map sourcesByGroupId = Maps.newHashMap();
            this.streamSourceMapper.selectByGroupIds(new ArrayList<>(distinctGroupIds)).forEach(sourceEntity -> {
                List groupSources = (List) sourcesByGroupId.computeIfAbsent(
                        sourceEntity.getInlongGroupId(), groupId -> Lists.newArrayList());
                groupSources.add(this.sourceOperatorFactory.getInstance(sourceEntity.getSourceType())
                        .getFromEntity(sourceEntity));
            });
            // Groups without any source get an empty list rather than null.
            briefInfos.forEach(briefInfo -> briefInfo.setStreamSources(
                    (List) sourcesByGroupId.getOrDefault(briefInfo.getInlongGroupId(), Lists.newArrayList())));
        }

        PageResult<InlongGroupBriefInfo> pageResult = new PageResult<>(briefInfos, Long.valueOf(entityPage.getTotal()),
                Integer.valueOf(entityPage.getPageNum()), Integer.valueOf(entityPage.getPageSize()));
        LOGGER.debug("success to list inlong group for {}", inlongGroupPageRequest);
        return pageResult;
    }

    /**
     * Lists the brief infos of all groups matching the request that the user may see, optionally with
     * each group's stream sources. A group is kept when the user's account type equals the
     * {@code ADMIN} code or the user's name appears in the group's comma-separated in-charges.
     *
     * @param inlongGroupPageRequest ordering and filter conditions; mutated by the order field/type checks
     * @param userInfo               the requesting user
     * @return the visible groups as brief infos
     */
    @Override // org.apache.inlong.manager.service.group.InlongGroupService
    @SuppressWarnings({"unchecked", "rawtypes"})
    public List<InlongGroupBriefInfo> listBrief(InlongGroupPageRequest inlongGroupPageRequest, UserInfo userInfo) {
        List<InlongGroupEntity> visibleEntities = new ArrayList<>();
        OrderFieldEnum.checkOrderField(inlongGroupPageRequest);
        OrderTypeEnum.checkOrderType(inlongGroupPageRequest);
        for (InlongGroupEntity candidate : this.groupMapper.selectByCondition(inlongGroupPageRequest)) {
            boolean visible = userInfo.getAccountType().equals(UserTypeEnum.ADMIN.getCode())
                    || Arrays.asList(candidate.getInCharges().split(",")).contains(userInfo.getName());
            if (visible) {
                visibleEntities.add(candidate);
            }
        }

        List<InlongGroupBriefInfo> briefInfos = CommonBeanUtils.copyListProperties(visibleEntities, InlongGroupBriefInfo::new);
        if (!inlongGroupPageRequest.isListSources() || !CollectionUtils.isNotEmpty(briefInfos)) {
            return briefInfos;
        }

        // Load the sources of all visible groups with a single query, keyed by distinct group id.
        Set<String> distinctGroupIds = briefInfos.stream()
                .map(InlongGroupBriefInfo::getInlongGroupId)
                .collect(Collectors.toSet());
        Map sourcesByGroupId = Maps.newHashMap();
        this.streamSourceMapper.selectByGroupIds(new ArrayList<>(distinctGroupIds)).forEach(sourceEntity -> {
            List groupSources = (List) sourcesByGroupId.computeIfAbsent(
                    sourceEntity.getInlongGroupId(), groupId -> Lists.newArrayList());
            groupSources.add(this.sourceOperatorFactory.getInstance(sourceEntity.getSourceType())
                    .getFromEntity(sourceEntity));
        });
        // Groups without any source get an empty list rather than null.
        briefInfos.forEach(briefInfo -> briefInfo.setStreamSources(
                (List) sourcesByGroupId.getOrDefault(briefInfo.getInlongGroupId(), Lists.newArrayList())));
        return briefInfos;
    }

    /**
     * Updates an existing group and upserts its extension properties.
     * The request must carry the stored MQ type and version, and the operator must pass
     * {@code doUpdateCheck} (in-charge membership and status rules).
     *
     * @param inlongGroupRequest the new group values
     * @param str                name of the operator
     * @return the id of the updated group
     * @throws BusinessException {@code GROUP_NOT_FOUND} when no group exists for the requested id
     */
    @Override // org.apache.inlong.manager.service.group.InlongGroupService
    @Transactional(rollbackFor = {Throwable.class}, isolation = Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
    public String update(InlongGroupRequest inlongGroupRequest, String str) {
        LOGGER.debug("begin to update inlong group={} by user={}", inlongGroupRequest, str);

        String groupId = inlongGroupRequest.getInlongGroupId();
        InlongGroupEntity storedEntity = this.groupMapper.selectByGroupId(groupId);
        if (storedEntity == null) {
            LOGGER.error("inlong group not found by groupId={}", groupId);
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        chkUnmodifiableParams(storedEntity, inlongGroupRequest);
        doUpdateCheck(storedEntity, inlongGroupRequest, str);

        // The ZooKeeper flag always comes from this service's configuration, not from the caller.
        if (this.enableZookeeper) {
            inlongGroupRequest.setEnableZookeeper(InlongConstants.ENABLE_ZK);
        } else {
            inlongGroupRequest.setEnableZookeeper(InlongConstants.DISABLE_ZK);
        }

        this.groupOperatorFactory.getInstance(inlongGroupRequest.getMqType()).updateOpt(inlongGroupRequest, str);
        saveOrUpdateExt(groupId, inlongGroupRequest.getExtList());
        LOGGER.info("success to update inlong group for groupId={} by user={}", groupId, str);
        return groupId;
    }

    /**
     * Updates an existing group on behalf of the given user and upserts its extension properties.
     * The request must carry the stored MQ type and version; the user is checked against the group's
     * in-charges through {@code UserService.checkUser}.
     *
     * @param inlongGroupRequest the new group values
     * @param userInfo           the operator
     * @return the id of the updated group
     * @throws BusinessException {@code GROUP_NOT_FOUND} when no group exists for the requested id;
     *                           {@code GROUP_UPDATE_NOT_ALLOWED} when the current status forbids updates,
     *                           or forbids changing the MQ type
     */
    @Override // org.apache.inlong.manager.service.group.InlongGroupService
    @Transactional(rollbackFor = {Throwable.class}, isolation = Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
    public String update(InlongGroupRequest inlongGroupRequest, UserInfo userInfo) {
        String groupId = inlongGroupRequest.getInlongGroupId();
        InlongGroupEntity storedEntity = this.groupMapper.selectByGroupId(groupId);
        if (storedEntity == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        chkUnmodifiableParams(storedEntity, inlongGroupRequest);
        this.userService.checkUser(storedEntity.getInCharges(), userInfo.getName(),
                ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage());

        GroupStatus currentStatus = GroupStatus.forCode(storedEntity.getStatus().intValue());
        if (GroupStatus.notAllowedUpdate(currentStatus)) {
            String message = String.format("Current status=%s is not allowed to update", currentStatus);
            throw new BusinessException(ErrorCodeEnum.GROUP_UPDATE_NOT_ALLOWED, message);
        }
        // The MQ-type rule is only consulted when the request actually changes the MQ type.
        boolean mqTypeChanged = !storedEntity.getMqType().equals(inlongGroupRequest.getMqType());
        if (mqTypeChanged && !GroupStatus.allowedUpdateMQ(currentStatus)) {
            String message = String.format("Current status=%s is not allowed to update MQ type", currentStatus);
            throw new BusinessException(ErrorCodeEnum.GROUP_UPDATE_NOT_ALLOWED, message);
        }

        // The ZooKeeper flag always comes from this service's configuration, not from the caller.
        if (this.enableZookeeper) {
            inlongGroupRequest.setEnableZookeeper(InlongConstants.ENABLE_ZK);
        } else {
            inlongGroupRequest.setEnableZookeeper(InlongConstants.DISABLE_ZK);
        }

        this.groupOperatorFactory.getInstance(inlongGroupRequest.getMqType())
                .updateOpt(inlongGroupRequest, userInfo.getName());
        saveOrUpdateExt(groupId, inlongGroupRequest.getExtList());
        return groupId;
    }

    /**
     * Moves a group to a new status after checking that the transition from its current status is allowed.
     * The group row is read through the mapper's {@code selectByGroupIdForUpdate}.
     *
     * @param str  the inlong group id; checked with {@code Preconditions.expectNotNull}
     * @param num  code of the target status
     * @param str2 name of the operator
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException {@code GROUP_NOT_FOUND} when no group exists for the id, or a
     *                           message-only exception when the transition is not allowed
     */
    @Override // org.apache.inlong.manager.service.group.InlongGroupService
    @Transactional(rollbackFor = {Throwable.class}, isolation = Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
    public Boolean updateStatus(String str, Integer num, String str2) {
        LOGGER.info("begin to update group status to [{}] for groupId={} by user={}", num, str, str2);
        Preconditions.expectNotNull(str, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());

        InlongGroupEntity lockedEntity = this.groupMapper.selectByGroupIdForUpdate(str);
        if (lockedEntity == null) {
            LOGGER.error("inlong group not found by groupId={}", str);
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        GroupStatus currentStatus = GroupStatus.forCode(lockedEntity.getStatus().intValue());
        GroupStatus targetStatus = GroupStatus.forCode(num.intValue());
        if (GroupStatus.notAllowedTransition(currentStatus, targetStatus)) {
            String message = String.format("Current status=%s is not allowed to transfer to state=%s",
                    currentStatus, targetStatus);
            LOGGER.error(message);
            throw new BusinessException(message);
        }

        this.groupMapper.updateStatus(str, num, str2);
        LOGGER.info("success to update group status to [{}] for groupId={} by user={}", num, str, str2);
        return Boolean.TRUE;
    }

    /**
     * Applies the outcome of an approval to a group that is currently in status {@code TO_BE_APPROVAL}.
     * When the approval carries a non-blank cluster tag, the tag, the {@code APPROVE_PASSED} status, the
     * modifier and (if given and different) the data report type are written in a single update;
     * otherwise only the status is moved to {@code APPROVE_PASSED} via {@link #updateStatus}.
     *
     * @param inlongGroupApproveRequest the approval result
     * @param str                       name of the operator
     * @throws BusinessException when the group does not exist, is not in status {@code TO_BE_APPROVAL},
     *                           or ({@code CONFIG_EXPIRED}) the update did not affect exactly one row
     */
    @Override // org.apache.inlong.manager.service.group.InlongGroupService
    @Transactional(rollbackFor = {Throwable.class}, propagation = Propagation.REQUIRES_NEW)
    public void updateAfterApprove(InlongGroupApproveRequest inlongGroupApproveRequest, String str) {
        LOGGER.debug("begin to update inlong group after approve={}", inlongGroupApproveRequest);

        String groupId = inlongGroupApproveRequest.getInlongGroupId();
        InlongGroupEntity entity = this.groupMapper.selectByGroupId(groupId);
        if (entity == null) {
            throw new BusinessException("inlong group not found with group id=" + groupId);
        }
        if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(), entity.getStatus())) {
            throw new BusinessException("inlong group status [wait_approval] not allowed to approve again");
        }

        if (StringUtils.isBlank(inlongGroupApproveRequest.getInlongClusterTag())) {
            // Nothing but the status changes.
            updateStatus(groupId, GroupStatus.APPROVE_PASSED.getCode(), str);
        } else {
            entity.setInlongGroupId(groupId);
            entity.setInlongClusterTag(inlongGroupApproveRequest.getInlongClusterTag());
            entity.setStatus(GroupStatus.APPROVE_PASSED.getCode());

            boolean reportTypeGiven = inlongGroupApproveRequest.getDataReportType() != null;
            if (reportTypeGiven
                    && !Objects.equals(inlongGroupApproveRequest.getDataReportType(), entity.getDataReportType())) {
                entity.setDataReportType(inlongGroupApproveRequest.getDataReportType());
            }
            entity.setModifier(str);

            int affectedRows = this.groupMapper.updateByIdentifierSelective(entity);
            if (affectedRows != InlongConstants.AFFECTED_ONE_ROW.intValue()) {
                LOGGER.error("inlong group has already updated with group id={}, curVersion={}", groupId, entity.getVersion());
                throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
            }
        }
        LOGGER.info("success to update inlong group status after approve for groupId={}", groupId);
    }

    /**
     * Upserts the extension properties of a group. Does nothing when the list is null or empty.
     *
     * @param str  the inlong group id stamped onto every extension record
     * @param list the extension properties to store
     */
    @Override // org.apache.inlong.manager.service.group.InlongGroupService
    @Transactional(rollbackFor = {Throwable.class})
    public void saveOrUpdateExt(String str, List<InlongGroupExtInfo> list) {
        if (!CollectionUtils.isEmpty(list)) {
            List<InlongGroupExtEntity> extEntities = CommonBeanUtils.copyListProperties(list, InlongGroupExtEntity::new);
            for (InlongGroupExtEntity extEntity : extEntities) {
                extEntity.setInlongGroupId(str);
            }
            this.groupExtMapper.insertOnDuplicateKeyUpdate(extEntities);
        }
    }

    /**
     * Lists the topic infos of every group selected by the topic request.
     * Each group's topic is resolved individually through {@link #getTopic(String)}.
     *
     * @param inlongGroupTopicRequest the selection criteria; its cluster tag is checked with
     *                                {@code Preconditions.expectNotEmpty}
     * @return one topic info per selected group, in the order returned by the mapper
     */
    @Override // org.apache.inlong.manager.service.group.InlongGroupService
    public List<InlongGroupTopicInfo> listTopics(InlongGroupTopicRequest inlongGroupTopicRequest) {
        LOGGER.info("start to list group topic infos, request={}", inlongGroupTopicRequest);
        Preconditions.expectNotEmpty(inlongGroupTopicRequest.getClusterTag(), "cluster tag should not be empty");

        List<InlongGroupTopicInfo> topicInfos = new ArrayList<>();
        for (InlongGroupEntity groupEntity : this.groupMapper.selectByTopicRequest(inlongGroupTopicRequest)) {
            topicInfos.add(getTopic(groupEntity.getInlongGroupId()));
        }

        LOGGER.info("success list group topic infos under clusterTag={}, size={}", inlongGroupTopicRequest.getClusterTag(), topicInfos.size());
        return topicInfos;
    }

    /**
     * Checks whether the operator may delete the group and returns the group when all checks pass.
     *
     * @param str  the inlong group id
     * @param str2 name of the operator
     * @return the loaded group info
     * @throws BusinessException {@code GROUP_PERMISSION_DENIED} when the operator is not in the group's
     *                           comma-separated in-charges; {@code GROUP_DELETE_NOT_ALLOWED} when the
     *                           current status may not move to {@code DELETING};
     *                           {@code GROUP_DELETE_HAS_STREAM} when the status requires streams to be
     *                           deleted first and at least one stream is still counted for the group
     */
    @Override // org.apache.inlong.manager.service.group.InlongGroupService
    public InlongGroupInfo doDeleteCheck(String str, String str2) {
        InlongGroupInfo groupInfo = get(str);

        List<String> inCharges = Arrays.asList(groupInfo.getInCharges().split(","));
        if (!inCharges.contains(str2)) {
            throw new BusinessException(ErrorCodeEnum.GROUP_PERMISSION_DENIED,
                    String.format("user [%s] has no privilege for the inlong group", str2));
        }

        GroupStatus currentStatus = GroupStatus.forCode(groupInfo.getStatus().intValue());
        if (GroupStatus.notAllowedTransition(currentStatus, GroupStatus.DELETING)) {
            throw new BusinessException(ErrorCodeEnum.GROUP_DELETE_NOT_ALLOWED,
                    String.format("current group status=%s was not allowed to delete", currentStatus));
        }

        // The stream count is only queried for statuses that require streams to be removed first.
        if (GroupStatus.deleteStreamFirst(currentStatus)) {
            int streamCount = this.streamService.selectCountByGroupId(str);
            if (streamCount >= 1) {
                throw new BusinessException(ErrorCodeEnum.GROUP_DELETE_HAS_STREAM,
                        String.format("groupId=%s have [%s] inlong streams, deleted failed", str, Integer.valueOf(streamCount)));
            }
        }
        return groupInfo;
    }

    /**
     * Logically deletes a group and its extension properties. When the group's status allows deleting
     * sub-infos, all of its streams are logically deleted first. The group's {@code isDeleted} field is
     * set to the group's own id and its status to {@code DELETED}.
     *
     * @param str  the inlong group id
     * @param str2 name of the operator, recorded as modifier
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException {@code CONFIG_EXPIRED} when the update did not affect exactly one row
     */
    @Override // org.apache.inlong.manager.service.group.InlongGroupService
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean delete(String str, String str2) {
        LOGGER.info("begin to delete inlong group for groupId={} by user={}", str, str2);

        InlongGroupEntity entity = this.groupMapper.selectByGroupId(str);
        Preconditions.expectNotNull(entity, ErrorCodeEnum.GROUP_NOT_FOUND.getMessage());

        GroupStatus currentStatus = GroupStatus.forCode(entity.getStatus().intValue());
        if (GroupStatus.allowedDeleteSubInfos(currentStatus)) {
            this.streamService.logicDeleteAll(str, str2);
        }

        entity.setIsDeleted(entity.getId());
        entity.setStatus(GroupStatus.DELETED.getCode());
        entity.setModifier(str2);
        int affectedRows = this.groupMapper.updateByIdentifierSelective(entity);
        if (affectedRows != InlongConstants.AFFECTED_ONE_ROW.intValue()) {
            LOGGER.error("inlong group has already updated for groupId={} curVersion={}", str, entity.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }

        this.groupExtMapper.logicDeleteAllByGroupId(str);
        LOGGER.info("success to delete group and group ext property for groupId={} by user={}", str, str2);
        return Boolean.TRUE;
    }

    /**
     * Derives the sort configuration of a group from its extension properties.
     * The properties are flattened into a key/value map (a later entry overwrites an earlier one with
     * the same key) and the {@code "sort.type"} entry selects which configuration is built.
     *
     * @param list the group's extension properties
     * @return a {@link FlinkSortConf} for {@code FLINK}, a {@link UserDefinedSortConf} for
     *         {@code USER_DEFINED}, or {@code null} when {@code "sort.type"} is blank or names any other type
     */
    private BaseSortConf buildSortConfig(List<InlongGroupExtInfo> list) {
        // Typed map: the values are read back as String and handed to helpers taking Map<String, String>.
        Map<String, String> extProperties = new HashMap<>();
        for (InlongGroupExtInfo extInfo : list) {
            extProperties.put(extInfo.getKeyName(), extInfo.getKeyValue());
        }

        String sortTypeName = extProperties.get("sort.type");
        if (StringUtils.isBlank(sortTypeName)) {
            return null;
        }

        BaseSortConf.SortType sortType = BaseSortConf.SortType.forType(sortTypeName);
        // Switch on the enum constants themselves rather than on an ordinal-indexed lookup table.
        switch (sortType) {
            case FLINK:
                return createFlinkSortConfig(extProperties);
            case USER_DEFINED:
                return createUserDefinedSortConfig(extProperties);
            default:
                LOGGER.warn("unsupported sort config for sortType: {}", sortType);
                return null;
        }
    }

    /**
     * Builds a Flink sort configuration from flattened extension properties.
     * Reads {@code "sort.url"} as the service URL and {@code "sort.properties"} as a JSON object of
     * string properties (an empty map when blank). When {@code "sort.authentication.type"} is non-blank,
     * {@code "sort.authentication"} is parsed as a JSON object and used to configure a
     * {@link SecretTokenAuthentication}.
     *
     * @param map the flattened extension properties
     * @return the populated configuration
     */
    private FlinkSortConf createFlinkSortConfig(Map<String, String> map) {
        FlinkSortConf sortConf = new FlinkSortConf();
        sortConf.setServiceUrl(map.get("sort.url"));

        String propertiesJson = map.get("sort.properties");
        Map<String, String> properties = StringUtils.isNotBlank(propertiesJson)
                ? (Map<String, String>) JsonUtils.parseObject(propertiesJson, new TypeReference<Map<String, String>>() {
                })
                : Maps.newHashMap();
        sortConf.setProperties(properties);

        String authTypeName = map.get("sort.authentication.type");
        if (StringUtils.isBlank(authTypeName)) {
            // No authentication configured.
            return sortConf;
        }

        Preconditions.expectTrue(Authentication.AuthType.forType(authTypeName) == Authentication.AuthType.SECRET_AND_TOKEN, "Only support SECRET_AND_TOKEN for flink sort auth");
        Map<String, String> authProperties =
                (Map<String, String>) JsonUtils.parseObject(map.get("sort.authentication"), new TypeReference<Map<String, String>>() {
                });
        SecretTokenAuthentication authentication = new SecretTokenAuthentication();
        authentication.configure(authProperties);
        sortConf.setAuthentication(authentication);
        return sortConf;
    }

    /**
     * Builds a user-defined sort configuration from flattened extension properties.
     * Reads {@code "sort.name"} as the sort name and {@code "sort.properties"} as a JSON object of
     * string properties (an empty map when blank).
     *
     * @param map the flattened extension properties
     * @return the populated configuration
     */
    private UserDefinedSortConf createUserDefinedSortConfig(Map<String, String> map) {
        UserDefinedSortConf sortConf = new UserDefinedSortConf();
        sortConf.setSortName(map.get("sort.name"));

        String propertiesJson = map.get("sort.properties");
        Map<String, String> properties = StringUtils.isNotBlank(propertiesJson)
                ? (Map<String, String>) JsonUtils.parseObject(propertiesJson, new TypeReference<Map<String, String>>() {
                })
                : Maps.newHashMap();
        sortConf.setProperties(properties);
        return sortConf;
    }

    /**
     * Verifies that the request leaves the group's immutable values untouched: the MQ type must equal
     * the stored one ({@code INVALID_PARAMETER} otherwise) and the version must equal the stored one
     * ({@code CONFIG_EXPIRED} otherwise). Both comparisons are delegated to {@code Preconditions.expectEquals}.
     *
     * @param inlongGroupEntity  the stored group
     * @param inlongGroupRequest the incoming update request
     */
    private void chkUnmodifiableParams(InlongGroupEntity inlongGroupEntity, InlongGroupRequest inlongGroupRequest) {
        Preconditions.expectEquals(inlongGroupEntity.getMqType(), inlongGroupRequest.getMqType(),
                ErrorCodeEnum.INVALID_PARAMETER, "mqType not allowed modify");

        String expiredMessage = String.format("record has expired with record version=%d, request version=%d",
                inlongGroupEntity.getVersion(), inlongGroupRequest.getVersion());
        Preconditions.expectEquals(inlongGroupEntity.getVersion(), inlongGroupRequest.getVersion(),
                ErrorCodeEnum.CONFIG_EXPIRED, expiredMessage);
    }
}
