/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.group;

import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupApproveRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupCountResponse;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.group.InlongGroupOperator;
import org.apache.inlong.manager.service.group.InlongGroupOperatorFactory;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.source.SourceOperationFactory;
import org.apache.inlong.manager.service.source.StreamSourceOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

@Service
public class InlongGroupServiceImpl
implements InlongGroupService {
    /** Logger shared by all methods of this service. */
    private static final Logger LOGGER = LoggerFactory.getLogger(InlongGroupServiceImpl.class);
    /** Largest page size honoured by {@code listByPage}; bigger requested sizes are reduced to this value. */
    private static final Integer MAX_PAGE_SIZE = 100;
    /** Supplies the {@link InlongGroupOperator} for a given MQ type. */
    @Autowired
    private InlongGroupOperatorFactory groupOperatorFactory;
    /** Mapper used to read and update inlong group entities. */
    @Autowired
    private InlongGroupEntityMapper groupMapper;
    /** Mapper used to read, upsert and logically delete group extended-property entities. */
    @Autowired
    private InlongGroupExtEntityMapper groupExtMapper;
    /** Mapper used by {@code listByPage} to load stream sources for a set of group IDs. */
    @Autowired
    private StreamSourceEntityMapper streamSourceMapper;
    /** Supplies the {@link StreamSourceOperation} for a given {@link SourceType}. */
    @Autowired
    private SourceOperationFactory sourceOperationFactory;
    /** Stream service used by {@code delete} to logically delete or count the streams of a group. */
    @Autowired
    private InlongStreamService streamService;

    /**
     * Checks whether {@code operator} may apply {@code request} to the stored group.
     *
     * <p>Returns silently when either {@code entity} or {@code request} is {@code null}. Otherwise the
     * operator must appear in the entity's comma-separated in-charges value, the entity's current status
     * must allow updates, and, when the MQ type differs between entity and request, the current status
     * must also allow changing the MQ type.
     *
     * @param entity the stored group, may be {@code null}
     * @param request the update request, may be {@code null}
     * @param operator name of the user performing the update
     * @throws BusinessException with {@code GROUP_PERMISSION_DENIED} when the operator is not in charge,
     *         or with {@code GROUP_UPDATE_NOT_ALLOWED} when the current status forbids the update
     */
    private static void checkGroupCanUpdate(InlongGroupEntity entity, InlongGroupRequest request, String operator) {
        if (entity == null || request == null) {
            return;
        }

        // Permission check: the operator has to match one of the comma-separated in-charge names exactly.
        boolean isInCharge = Arrays.stream(entity.getInCharges().split(","))
                .anyMatch(name -> name.equals(operator));
        if (!isInCharge) {
            LOGGER.error("user [{}] has no privilege for the inlong group", operator);
            throw new BusinessException(ErrorCodeEnum.GROUP_PERMISSION_DENIED);
        }

        // Status checks: the general update restriction is evaluated before the MQ-type restriction.
        GroupStatus currentStatus = GroupStatus.forCode(entity.getStatus());
        String refusal = null;
        if (GroupStatus.notAllowedUpdate(currentStatus)) {
            refusal = String.format("Current status=%s is not allowed to update", currentStatus);
        } else {
            boolean mqTypeChanged = !entity.getMqType().equals(request.getMqType());
            if (mqTypeChanged && GroupStatus.notAllowedUpdateMQ(currentStatus)) {
                refusal = String.format("Current status=%s is not allowed to update MQ type", currentStatus);
            }
        }
        if (refusal != null) {
            LOGGER.error(refusal);
            throw new BusinessException(ErrorCodeEnum.GROUP_UPDATE_NOT_ALLOWED, refusal);
        }
    }

    /**
     * Creates a new inlong group together with its extended properties.
     *
     * <p>The request is validated, rejected if a group with the same ID is already stored, then saved
     * through the operator selected by the request's MQ type. The request's ext list is saved afterwards
     * under the group ID returned by that operator.
     *
     * @param request the group to create; must not be {@code null}
     * @param operator name of the user performing the save
     * @return the group ID returned by the operator's save
     * @throws BusinessException with {@code GROUP_DUPLICATE} when the group ID already exists
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public String save(InlongGroupRequest request, String operator) {
        LOGGER.debug("begin to save inlong group={} by user={}", request, operator);
        Preconditions.checkNotNull(request, "inlong group request cannot be empty");
        request.checkParams();

        // Refuse to overwrite an existing group with the same ID.
        String requestedGroupId = request.getInlongGroupId();
        if (this.groupMapper.selectByGroupId(requestedGroupId) != null) {
            LOGGER.error("groupId {} has already exists", requestedGroupId);
            throw new BusinessException(ErrorCodeEnum.GROUP_DUPLICATE);
        }

        InlongGroupOperator groupOperator = this.groupOperatorFactory.getInstance(request.getMqType());
        String savedGroupId = groupOperator.saveOpt(request, operator);
        this.saveOrUpdateExt(savedGroupId, request.getExtList());

        LOGGER.info("success to save inlong group for groupId={} by user={}", savedGroupId, operator);
        return savedGroupId;
    }

    /**
     * Loads a group by ID and returns it together with its extended properties.
     *
     * <p>The entity is converted by the operator matching the entity's MQ type; the ext entities found
     * for the group are copied into {@link InlongGroupExtInfo} objects and attached to the result.
     *
     * @param groupId ID of the group to load; must not be {@code null}
     * @return the group info with its ext list set
     * @throws BusinessException with {@code GROUP_NOT_FOUND} when no group is stored under {@code groupId}
     */
    @Override
    public InlongGroupInfo get(String groupId) {
        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
        InlongGroupEntity entity = this.groupMapper.selectByGroupId(groupId);
        if (entity == null) {
            LOGGER.error("inlong group not found by groupId={}", groupId);
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        InlongGroupOperator instance = this.groupOperatorFactory.getInstance(entity.getMqType());
        InlongGroupInfo groupInfo = instance.getFromEntity(entity);

        // Parameterized lists instead of raw types, so the copy and setExtList are type-checked.
        List<InlongGroupExtEntity> extEntityList = this.groupExtMapper.selectByGroupId(groupId);
        List<InlongGroupExtInfo> extList = CommonBeanUtils.copyListProperties(extEntityList, InlongGroupExtInfo::new);
        groupInfo.setExtList(extList);

        LOGGER.debug("success to get inlong group for groupId={}", groupId);
        return groupInfo;
    }

    /**
     * Lists groups matching the request, one page at a time.
     *
     * <p>A page size above {@code MAX_PAGE_SIZE} is reduced to that limit (and written back to the
     * request). When {@code request.isListSources()} is set and the page is not empty, the stream sources
     * of all listed groups are loaded in one query and attached to their group; groups without sources
     * receive an empty list.
     *
     * @param request paging and filter conditions
     * @return the page of group responses, with the total taken from the underlying entity page
     */
    @Override
    public PageInfo<InlongGroupListResponse> listByPage(InlongGroupPageRequest request) {
        if (request.getPageSize() > MAX_PAGE_SIZE) {
            LOGGER.warn("list group info, but page size is {}, change to {}", request.getPageSize(), MAX_PAGE_SIZE);
            request.setPageSize(MAX_PAGE_SIZE);
        }

        PageHelper.startPage(request.getPageNum(), request.getPageSize());
        // The query result is cast to Page so that its total count can be read below.
        Page<InlongGroupEntity> entityPage = (Page<InlongGroupEntity>) this.groupMapper.selectByCondition(request);
        List<InlongGroupListResponse> groupResponseList =
                CommonBeanUtils.copyListProperties(entityPage, InlongGroupListResponse::new);

        if (request.isListSources() && CollectionUtils.isNotEmpty(groupResponseList)) {
            Set<String> groupIds = groupResponseList.stream()
                    .map(InlongGroupListResponse::getInlongGroupId)
                    .collect(Collectors.toSet());
            List<StreamSourceEntity> sourceEntities = this.streamSourceMapper.selectByGroupIds(new ArrayList<>(groupIds));

            // Group the converted sources by their owning group ID.
            Map<String, List<SourceListResponse>> sourceMap = Maps.newHashMap();
            sourceEntities.forEach(sourceEntity -> {
                SourceType sourceType = SourceType.forType(sourceEntity.getSourceType());
                StreamSourceOperation operation = this.sourceOperationFactory.getInstance(sourceType);
                SourceListResponse sourceListResponse = operation.getFromEntity(sourceEntity, SourceListResponse::new);
                sourceMap.computeIfAbsent(sourceEntity.getInlongGroupId(), k -> Lists.newArrayList())
                        .add(sourceListResponse);
            });

            groupResponseList.forEach(group -> {
                List<SourceListResponse> sourceListResponses =
                        sourceMap.getOrDefault(group.getInlongGroupId(), Lists.newArrayList());
                group.setSourceResponses(sourceListResponses);
            });
        }

        PageInfo<InlongGroupListResponse> page = new PageInfo<>(groupResponseList);
        // The response list is a plain copy, so the total has to be carried over from the entity page.
        page.setTotal(entityPage.getTotal());
        LOGGER.debug("success to list inlong group for {}", request);
        return page;
    }

    /**
     * Updates an existing inlong group and its extended properties.
     *
     * <p>The request is validated, the stored group is looked up, and {@code checkGroupCanUpdate}
     * verifies the operator's permission and the group's status before the operator selected by the
     * request's MQ type performs the update. The request's ext list is saved or updated afterwards.
     *
     * @param request the new group data; must not be {@code null}
     * @param operator name of the user performing the update
     * @return the group ID taken from the request
     * @throws BusinessException with {@code GROUP_NOT_FOUND} when the group does not exist, or as thrown
     *         by the permission/status check
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ,
            propagation = Propagation.REQUIRES_NEW)
    public String update(InlongGroupRequest request, String operator) {
        LOGGER.debug("begin to update inlong group={} by user={}", request, operator);
        Preconditions.checkNotNull(request, "inlong group request cannot be empty");
        request.checkParams();

        final String targetGroupId = request.getInlongGroupId();
        InlongGroupEntity stored = this.groupMapper.selectByGroupId(targetGroupId);
        if (stored == null) {
            LOGGER.error("inlong group not found by groupId={}", targetGroupId);
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        // Permission and status validation happens before anything is written.
        checkGroupCanUpdate(stored, request, operator);

        this.groupOperatorFactory.getInstance(request.getMqType()).updateOpt(request, operator);
        this.saveOrUpdateExt(targetGroupId, request.getExtList());

        LOGGER.info("success to update inlong group for groupId={} by user={}", targetGroupId, operator);
        return targetGroupId;
    }

    /**
     * Moves a group to a new status after checking that the transition is allowed.
     *
     * <p>The group is read through {@code selectByGroupIdForUpdate}; the transition from its current
     * status to the requested one is validated with {@code GroupStatus.notAllowedTransition} before the
     * status is written.
     *
     * @param groupId ID of the group; must not be {@code null}
     * @param status code of the target status
     * @param operator name of the user performing the change
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException with {@code GROUP_NOT_FOUND} when the group does not exist, or with a
     *         descriptive message when the transition is not allowed
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ,
            propagation = Propagation.REQUIRES_NEW)
    public boolean updateStatus(String groupId, Integer status, String operator) {
        LOGGER.info("begin to update group status to [{}] for groupId={} by user={}", status, groupId, operator);
        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());

        InlongGroupEntity lockedEntity = this.groupMapper.selectByGroupIdForUpdate(groupId);
        if (lockedEntity == null) {
            LOGGER.error("inlong group not found by groupId={}", groupId);
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        // Resolve both ends of the transition, current status first, before validating it.
        GroupStatus fromStatus = GroupStatus.forCode(lockedEntity.getStatus());
        GroupStatus toStatus = GroupStatus.forCode(status);
        if (GroupStatus.notAllowedTransition(fromStatus, toStatus)) {
            String reason = String.format("Current status=%s is not allowed to transfer to state=%s",
                    fromStatus, toStatus);
            LOGGER.error(reason);
            throw new BusinessException(reason);
        }

        this.groupMapper.updateStatus(groupId, status, operator);
        LOGGER.info("success to update group status to [{}] for groupId={} by user={}", status, groupId, operator);
        return true;
    }

    /**
     * Logically deletes a group and its extended properties.
     *
     * <p>The transition from the group's current status to {@code DELETED} must be allowed. If the
     * current status permits logic deletion, all streams of the group are logically deleted through the
     * stream service; otherwise the deletion is refused while the group still has streams. The entity is
     * then marked deleted (its {@code isDeleted} field is set to its own ID), its status set to
     * {@code DELETED}, and the group's ext records are logically deleted.
     *
     * @param groupId ID of the group to delete; must not be {@code null}
     * @param operator name of the user performing the deletion
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException with {@code GROUP_NOT_FOUND}, {@code GROUP_DELETE_NOT_ALLOWED} or
     *         {@code GROUP_HAS_STREAM} depending on which check fails
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public boolean delete(String groupId, String operator) {
        LOGGER.info("begin to delete inlong group for groupId={} by user={}", groupId, operator);
        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());

        InlongGroupEntity target = this.groupMapper.selectByGroupId(groupId);
        if (target == null) {
            LOGGER.error("inlong group not found by groupId={}", groupId);
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        GroupStatus currentStatus = GroupStatus.forCode(target.getStatus());
        if (GroupStatus.notAllowedTransition(currentStatus, GroupStatus.DELETED)) {
            String reason = String.format("Current status=%s was not allowed to delete", currentStatus);
            LOGGER.error(reason);
            throw new BusinessException(ErrorCodeEnum.GROUP_DELETE_NOT_ALLOWED, reason);
        }

        if (!GroupStatus.allowedLogicDelete(currentStatus)) {
            // Streams cannot be removed implicitly in this status, so the group must already be empty.
            int streamCount = this.streamService.selectCountByGroupId(groupId);
            if (streamCount >= 1) {
                LOGGER.error("groupId={} have [{}] inlong streams, deleted failed", groupId, streamCount);
                throw new BusinessException(ErrorCodeEnum.GROUP_HAS_STREAM);
            }
        } else {
            this.streamService.logicDeleteAll(target.getInlongGroupId(), operator);
        }

        // Mark the group itself as deleted, then its ext properties.
        target.setIsDeleted(target.getId());
        target.setStatus(GroupStatus.DELETED.getCode());
        target.setModifier(operator);
        this.groupMapper.updateByIdentifierSelective(target);
        this.groupExtMapper.logicDeleteAllByGroupId(groupId);

        LOGGER.info("success to delete group and group ext property for groupId={} by user={}", groupId, operator);
        return true;
    }

    /**
     * Tells whether a group is stored under the given ID.
     *
     * @param groupId ID of the group to look up; must not be {@code null}
     * @return {@code true} when the mapper finds a group for {@code groupId}, {@code false} otherwise
     */
    @Override
    public Boolean exist(String groupId) {
        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
        boolean found = this.groupMapper.selectByGroupId(groupId) != null;
        // Log the flag as true/false rather than as an integer 1/0.
        LOGGER.debug("success to check inlong group {}, exist? {}", groupId, found);
        return found;
    }

    /**
     * Counts the groups returned by the mapper for {@code operator}, broken down by status.
     *
     * <p>Each row returned by the mapper is read through its {@code "status"} and {@code "count"} keys.
     * Every row contributes to the total; rows in status {@code CONFIG_ING}, {@code TO_BE_APPROVAL} and
     * {@code APPROVE_REJECTED} additionally contribute to the wait-assign, wait-approve and reject
     * counters respectively.
     *
     * @param operator name of the user whose groups are counted
     * @return the aggregated counters
     */
    @Override
    public InlongGroupCountResponse countGroupByUser(String operator) {
        InlongGroupCountResponse countVO = new InlongGroupCountResponse();
        List<Map<String, Object>> statusCount = this.groupMapper.countGroupByUser(operator);
        for (Map<String, Object> row : statusCount) {
            // Read through Number so that any numeric type in the row is accepted.
            int status = ((Number) row.get("status")).intValue();
            long count = ((Number) row.get("count")).longValue();

            countVO.setTotalCount(countVO.getTotalCount() + count);
            if (status == GroupStatus.CONFIG_ING.getCode()) {
                countVO.setWaitAssignCount(countVO.getWaitAssignCount() + count);
            } else if (status == GroupStatus.TO_BE_APPROVAL.getCode()) {
                countVO.setWaitApproveCount(countVO.getWaitApproveCount() + count);
            } else if (status == GroupStatus.APPROVE_REJECTED.getCode()) {
                countVO.setRejectCount(countVO.getRejectCount() + count);
            }
        }
        LOGGER.debug("success to count inlong group for operator={}", operator);
        return countVO;
    }

    /**
     * Returns the topic information of a group.
     *
     * <p>The group is loaded with {@link #get(String)} and handed to the operator matching its MQ type.
     *
     * @param groupId ID of the group
     * @return the topic info produced by the operator
     * @throws BusinessException with {@code GROUP_NOT_FOUND} when the group does not exist
     */
    @Override
    public InlongGroupTopicInfo getTopic(String groupId) {
        InlongGroupInfo groupInfo = this.get(groupId);
        InlongGroupOperator instance = this.groupOperatorFactory.getInstance(groupInfo.getMqType());
        InlongGroupTopicInfo topicInfo = instance.getTopic(groupInfo);
        // Pass topicInfo as a log argument: text inside its toString() can no longer be mistaken for a
        // "{}" placeholder, and it is only formatted when debug logging is enabled.
        LOGGER.debug("success to get topic for groupId={}, result={}", groupId, topicInfo);
        return topicInfo;
    }

    /**
     * Applies the result of an approval to a group.
     *
     * <p>The group's status is moved to {@code APPROVE_PASSED}; if the approval carries a non-blank
     * cluster tag, that tag is also written to the group.
     *
     * @param approveInfo approval data; must not be {@code null} and must carry a group ID and MQ type
     * @param operator name of the user performing the approval
     * @return always {@code true} when no exception is thrown
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
    public boolean updateAfterApprove(InlongGroupApproveRequest approveInfo, String operator) {
        LOGGER.debug("begin to update inlong group after approve={}", approveInfo);
        Preconditions.checkNotNull(approveInfo, "InlongGroupApproveRequest is empty");

        final String groupId = approveInfo.getInlongGroupId();
        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
        Preconditions.checkNotNull(approveInfo.getMqType(), "MQ type cannot by empty");

        // NOTE(review): updateStatus is invoked on `this`, so with proxy-based @Transactional handling its
        // own REQUIRES_NEW / REPEATABLE_READ settings would not take effect for this call — confirm intent.
        this.updateStatus(groupId, GroupStatus.APPROVE_PASSED.getCode(), operator);

        // Only touch the group row again when the approver supplied a cluster tag.
        if (StringUtils.isNotBlank(approveInfo.getInlongClusterTag())) {
            InlongGroupEntity tagUpdate = new InlongGroupEntity();
            tagUpdate.setInlongGroupId(approveInfo.getInlongGroupId());
            tagUpdate.setInlongClusterTag(approveInfo.getInlongClusterTag());
            tagUpdate.setModifier(operator);
            this.groupMapper.updateByIdentifierSelective(tagUpdate);
        }

        LOGGER.info("success to update inlong group status after approve for groupId={}", groupId);
        return true;
    }

    /**
     * Saves or updates the extended properties of a group.
     *
     * <p>Does nothing beyond logging when {@code exts} is null or empty. Otherwise each ext info is copied
     * into an {@link InlongGroupExtEntity}, stamped with {@code groupId}, and the whole list is written
     * with a single {@code insertOnDuplicateKeyUpdate} call.
     *
     * @param groupId ID of the group the properties belong to
     * @param exts extended properties to store; may be {@code null} or empty
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public void saveOrUpdateExt(String groupId, List<InlongGroupExtInfo> exts) {
        LOGGER.info("begin to save or update inlong group ext info, groupId={}, ext={}", groupId, exts);
        if (CollectionUtils.isEmpty(exts)) {
            return;
        }

        // Typed list so the loop variable can be an InlongGroupExtEntity.
        List<InlongGroupExtEntity> entityList = CommonBeanUtils.copyListProperties(exts, InlongGroupExtEntity::new);
        for (InlongGroupExtEntity entity : entityList) {
            entity.setInlongGroupId(groupId);
        }
        this.groupExtMapper.insertOnDuplicateKeyUpdate(entityList);
        LOGGER.info("success to save or update inlong group ext for groupId={}", groupId);
    }
}

