/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.core.impl;

import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupState;
import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupApproveRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupCountResponse;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
import org.apache.inlong.manager.common.pojo.group.InlongGroupMqExtBase;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupTopicResponse;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupPulsarEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupPulsarEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.service.CommonOperateService;
import org.apache.inlong.manager.service.core.InlongGroupService;
import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.source.SourceOperationFactory;
import org.apache.inlong.manager.service.source.StreamSourceOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

/**
 * Spring-managed implementation of {@link InlongGroupService}.
 *
 * <p>Handles create / read / update / delete and status transitions of inlong groups,
 * together with their extended properties and, for Pulsar-type middleware, the
 * Pulsar-specific settings kept in a separate table.
 */
@Service
public class InlongGroupServiceImpl
implements InlongGroupService {
    private static final Logger LOGGER = LoggerFactory.getLogger(InlongGroupServiceImpl.class);
    // Mapper for the Pulsar-specific settings of a group.
    // NOTE(review): this is the only injected field that is package-private instead of private — confirm whether that is intentional.
    @Autowired
    InlongGroupPulsarEntityMapper groupPulsarMapper;
    // Mapper for the inlong group records themselves.
    @Autowired
    private InlongGroupEntityMapper groupMapper;
    // Mapper for the extended (key/value style) properties of a group.
    @Autowired
    private InlongGroupExtEntityMapper groupExtMapper;
    // Mapper used to load the stream sources of a set of groups when listing.
    @Autowired
    private StreamSourceEntityMapper streamSourceEntityMapper;
    // Resolves the StreamSourceOperation matching a given source type.
    @Autowired
    private SourceOperationFactory operationFactory;
    // Provides cluster-level parameters (Tube master URL, Pulsar cluster info).
    @Autowired
    private CommonOperateService commonOperateService;
    // Stream-level operations needed when deleting a group or collecting its topics.
    @Autowired
    private InlongStreamService streamService;

    /**
     * Creates a new inlong group and returns its generated group id.
     *
     * <p>The group id is built as {@code "b_"} followed by the lower-cased group name.
     * The group row, its extended properties and (for Pulsar / TDMQ Pulsar middleware)
     * the Pulsar settings are written in one transaction.
     *
     * @param groupInfo the group to create; must not be null and must carry a name
     * @param operator  the user performing the operation; used as creator / modifier
     *                  when the request does not provide them
     * @return the generated inlong group id
     * @throws BusinessException with {@code GROUP_DUPLICATE} if the id already exists, or
     *                           {@code GROUP_SAVE_FAILED} if ackQuorum exceeds writeQuorum
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public String save(InlongGroupRequest groupInfo, String operator) {
        LOGGER.debug("begin to save inlong group info={}", groupInfo);
        Preconditions.checkNotNull(groupInfo, "inlong group info is empty");
        String name = groupInfo.getName();
        Preconditions.checkNotNull(name, "inlong group name is empty");

        // Reject the request up front when the derived id is already taken.
        final String groupId = "b_" + name.toLowerCase(Locale.ROOT);
        Integer existing = this.groupMapper.selectIdentifierExist(groupId);
        if (existing > 0) {
            LOGGER.error("groupId [{}] has already exists", groupId);
            throw new BusinessException(ErrorCodeEnum.GROUP_DUPLICATE);
        }

        // Build the group row from the request and fill in the server-side defaults.
        InlongGroupEntity groupEntity = CommonBeanUtils.copyProperties(groupInfo, InlongGroupEntity::new);
        groupEntity.setInlongGroupId(groupId);
        if (StringUtils.isEmpty(groupEntity.getMqResourceObj())) {
            groupEntity.setMqResourceObj(groupId);
        }
        groupEntity.setSchemaName("m0_day");
        groupEntity.setStatus(GroupState.TO_BE_SUBMIT.getCode());
        groupEntity.setIsDeleted(EntityStatus.UN_DELETED.getCode());
        if (StringUtils.isEmpty(groupEntity.getCreator())) {
            groupEntity.setCreator(operator);
        }
        if (StringUtils.isEmpty(groupEntity.getModifier())) {
            groupEntity.setModifier(operator);
        }
        groupEntity.setCreateTime(new Date());
        this.groupMapper.insertSelective(groupEntity);
        this.saveOrUpdateExt(groupId, groupInfo.getExtList());

        MQType mqType = MQType.forType(groupInfo.getMiddlewareType());
        if (MQType.PULSAR == mqType || MQType.TDMQ_PULSAR == mqType) {
            InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) groupInfo.getMqExtInfo();
            Preconditions.checkNotNull(pulsarInfo, "Pulsar info cannot be empty, as the middleware is Pulsar");
            Integer ackQuorum = pulsarInfo.getAckQuorum();
            Integer writeQuorum = pulsarInfo.getWriteQuorum();
            Preconditions.checkNotNull(ackQuorum, "Pulsar ackQuorum cannot be empty");
            Preconditions.checkNotNull(writeQuorum, "Pulsar writeQuorum cannot be empty");
            if (ackQuorum > writeQuorum) {
                throw new BusinessException(ErrorCodeEnum.GROUP_SAVE_FAILED, "Pulsar params must meet: ackQuorum <= writeQuorum");
            }
            // The ensemble size is always forced to the write quorum.
            pulsarInfo.setEnsemble(writeQuorum);

            // Insert the Pulsar row, or overwrite the one already stored for this group id.
            InlongGroupPulsarEntity stored = this.groupPulsarMapper.selectByGroupId(groupId);
            InlongGroupPulsarEntity fresh = CommonBeanUtils.copyProperties(pulsarInfo, InlongGroupPulsarEntity::new);
            if (stored != null) {
                fresh.setId(stored.getId());
                this.groupPulsarMapper.updateByPrimaryKeySelective(fresh);
            } else {
                fresh.setIsDeleted(0);
                fresh.setInlongGroupId(groupId);
                this.groupPulsarMapper.insertSelective(fresh);
            }
        }
        LOGGER.debug("success to save inlong group info for groupId={}", groupId);
        return groupId;
    }

    /**
     * Loads one inlong group together with its extended properties and, for
     * Pulsar / TDMQ Pulsar middleware, its Pulsar settings.
     *
     * <p>When the group is in state {@code CONFIG_SUCCESSFUL}, the cluster addresses
     * (Tube master URL, or Pulsar admin / service URLs) are filled in as well.
     *
     * @param groupId the inlong group id; must not be null
     * @return the assembled group info
     * @throws BusinessException with {@code GROUP_NOT_FOUND} if no group matches the id
     */
    @Override
    public InlongGroupInfo get(String groupId) {
        LOGGER.debug("begin to get inlong group info by groupId={}", groupId);
        Preconditions.checkNotNull(groupId, "inlong group id is empty");
        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(groupId);
        if (groupEntity == null) {
            LOGGER.error("inlong group not found by groupId={}", groupId);
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        InlongGroupInfo result = CommonBeanUtils.copyProperties(groupEntity, InlongGroupInfo::new);
        result.setExtList(
                CommonBeanUtils.copyListProperties(this.groupExtMapper.selectByGroupId(groupId), InlongGroupExtInfo::new));

        MQType mqType = MQType.forType(groupEntity.getMiddlewareType());
        boolean pulsarBacked = mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR;
        if (pulsarBacked) {
            // A Pulsar-type group must have a companion Pulsar row.
            InlongGroupPulsarEntity pulsarEntity = this.groupPulsarMapper.selectByGroupId(groupId);
            Preconditions.checkNotNull(pulsarEntity, "Pulsar info not found by the groupId=" + groupId);
            InlongGroupPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(pulsarEntity, InlongGroupPulsarInfo::new);
            pulsarInfo.setMiddlewareType(mqType.name());
            result.setMqExtInfo(pulsarInfo);
        }

        // Cluster addresses are only attached once the group has been configured successfully.
        if (GroupState.forCode(result.getStatus()) == GroupState.CONFIG_SUCCESSFUL) {
            if (mqType == MQType.TUBE) {
                result.setTubeMaster(this.commonOperateService.getSpecifiedParam("tube_masterUrl"));
            } else if (pulsarBacked) {
                PulsarClusterInfo cluster = this.commonOperateService.getPulsarClusterInfo(mqType.name());
                result.setPulsarAdminUrl(cluster.getAdminUrl());
                result.setPulsarServiceUrl(cluster.getBrokerServiceUrl());
            }
        }
        LOGGER.debug("success to get inlong group for groupId={}", groupId);
        return result;
    }

    /**
     * Returns one page of inlong groups matching the given condition.
     *
     * <p>When {@code request.isListSources()} is set, the stream sources of all groups on
     * the page are loaded with a single query and attached to their group; groups without
     * sources receive an empty list.
     *
     * @param request paging parameters and filter condition
     * @return the page of group responses; its total is taken from the underlying query page
     */
    @Override
    public PageInfo<InlongGroupListResponse> listByCondition(InlongGroupPageRequest request) {
        LOGGER.debug("begin to list inlong group by {}", request);
        // startPage must stay directly in front of the query it is meant to paginate.
        PageHelper.startPage(request.getPageNum(), request.getPageSize());
        Page<?> entityPage = (Page<?>) this.groupMapper.selectByCondition(request);
        List<InlongGroupListResponse> groupList =
                CommonBeanUtils.copyListProperties(entityPage, InlongGroupListResponse::new);

        if (request.isListSources() && CollectionUtils.isNotEmpty(groupList)) {
            List<String> groupIds = groupList.stream()
                    .map(InlongGroupListResponse::getInlongGroupId)
                    .collect(Collectors.toList());
            List<StreamSourceEntity> sourceEntities = this.streamSourceEntityMapper.selectByGroupIds(groupIds);

            // Bucket the converted sources by the group they belong to.
            Map<String, List<SourceListResponse>> sourcesByGroup = Maps.newHashMap();
            for (StreamSourceEntity sourceEntity : sourceEntities) {
                SourceType sourceType = SourceType.forType(sourceEntity.getSourceType());
                StreamSourceOperation operation = this.operationFactory.getInstance(sourceType);
                SourceListResponse sourceResponse = operation.getFromEntity(sourceEntity, SourceListResponse::new);
                sourcesByGroup.computeIfAbsent(sourceEntity.getInlongGroupId(), k -> Lists.newArrayList())
                        .add(sourceResponse);
            }
            for (InlongGroupListResponse group : groupList) {
                List<SourceListResponse> sources =
                        sourcesByGroup.getOrDefault(group.getInlongGroupId(), Lists.newArrayList());
                group.setSourceListResponses(sources);
            }
        }

        PageInfo<InlongGroupListResponse> page = new PageInfo<>(groupList);
        // The converted list is a plain list, so the total is carried over from the query page.
        page.setTotal(entityPage.getTotal());
        LOGGER.debug("success to list inlong group");
        return page;
    }

    /**
     * Updates an existing inlong group, its extended properties and, for
     * Pulsar / TDMQ Pulsar middleware, its Pulsar settings.
     *
     * <p>The operator must be listed in the group's inCharges and the group's current
     * state must allow updates (see {@code checkGroupCanUpdate}).
     *
     * @param groupRequest the new group values; must not be null and must carry the group id
     * @param operator     the user performing the update; recorded as modifier
     * @return the id of the updated group
     * @throws BusinessException with {@code GROUP_NOT_FOUND} if the group does not exist,
     *                           with the codes raised by {@code checkGroupCanUpdate}, or with
     *                           {@code GROUP_SAVE_FAILED} if ackQuorum exceeds writeQuorum
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
    public String update(InlongGroupRequest groupRequest, String operator) {
        LOGGER.debug("begin to update inlong group={}", groupRequest);
        Preconditions.checkNotNull(groupRequest, "inlong group is empty");
        String groupId = groupRequest.getInlongGroupId();
        Preconditions.checkNotNull(groupId, "inlong group id is empty");
        InlongGroupEntity entity = this.groupMapper.selectByGroupId(groupId);
        if (entity == null) {
            LOGGER.error("inlong group not found by groupId={}", groupId);
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }
        this.checkGroupCanUpdate(entity, groupRequest, operator);

        // Merge the request into the stored entity, then persist it.
        CommonBeanUtils.copyProperties(groupRequest, entity, true);
        entity.setModifier(operator);
        this.groupMapper.updateByIdentifierSelective(entity);
        this.saveOrUpdateExt(groupId, groupRequest.getExtList());

        MQType mqType = MQType.forType(groupRequest.getMiddlewareType());
        if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
            InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) groupRequest.getMqExtInfo();
            Preconditions.checkNotNull(pulsarInfo, "Pulsar info cannot be empty, as the middleware is Pulsar");
            Integer writeQuorum = pulsarInfo.getWriteQuorum();
            Integer ackQuorum = pulsarInfo.getAckQuorum();
            // Validate both quorums before comparing them, exactly as save() does; without
            // these checks a missing value surfaced as a bare NullPointerException on unboxing.
            Preconditions.checkNotNull(ackQuorum, "Pulsar ackQuorum cannot be empty");
            Preconditions.checkNotNull(writeQuorum, "Pulsar writeQuorum cannot be empty");
            if (ackQuorum > writeQuorum) {
                throw new BusinessException(ErrorCodeEnum.GROUP_SAVE_FAILED, "Pulsar params must meet: ackQuorum <= writeQuorum");
            }
            InlongGroupPulsarEntity pulsarEntity = CommonBeanUtils.copyProperties(pulsarInfo, InlongGroupPulsarEntity::new);
            pulsarEntity.setInlongGroupId(groupId);
            this.groupPulsarMapper.updateByIdentifierSelective(pulsarEntity);
        }
        LOGGER.debug("success to update inlong group for groupId={}", groupId);
        return groupId;
    }

    /**
     * Verifies that {@code operator} may update the given group right now.
     *
     * <p>Does nothing when either {@code entity} or {@code groupInfo} is null. Otherwise the
     * operator must appear in the comma-separated inCharges of the stored group (exact
     * match, no trimming) and the group's current state must allow updates.
     *
     * @param entity    the stored group
     * @param groupInfo the incoming update request
     * @param operator  the user attempting the update
     * @throws BusinessException with {@code GROUP_INFO_INCONSISTENT} if the group has no
     *                           inCharges, {@code GROUP_PERMISSION_DENIED} if the operator is
     *                           not in charge, or {@code GROUP_UPDATE_NOT_ALLOWED} if the
     *                           current state forbids updates
     */
    private void checkGroupCanUpdate(InlongGroupEntity entity, InlongGroupRequest groupInfo, String operator) {
        if (entity == null || groupInfo == null) {
            return;
        }

        String inChargeNames = entity.getInCharges();
        if (StringUtils.isEmpty(inChargeNames)) {
            LOGGER.error("group [{}] has no inCharges", entity.getInlongGroupId());
            throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCONSISTENT);
        }

        boolean operatorInCharge = false;
        for (String candidate : inChargeNames.split(",")) {
            if (candidate.equals(operator)) {
                operatorInCharge = true;
                break;
            }
        }
        if (!operatorInCharge) {
            LOGGER.error("user [{}] has no privilege for the inlong group", operator);
            throw new BusinessException(ErrorCodeEnum.GROUP_PERMISSION_DENIED);
        }

        GroupState currentState = GroupState.forCode(entity.getStatus());
        if (GroupState.notAllowedUpdate(currentState)) {
            String message = String.format("Current state=%s is not allowed to update", currentState);
            LOGGER.error(message);
            throw new BusinessException(ErrorCodeEnum.GROUP_UPDATE_NOT_ALLOWED, message);
        }
    }

    /**
     * Moves an inlong group to a new status after checking that the transition from its
     * current state is allowed.
     *
     * <p>The group row is read through {@code selectByGroupIdForUpdate} before the check.
     *
     * @param groupId  the inlong group id; must not be null
     * @param status   the code of the target state
     * @param operator the user performing the change
     * @return always {@code true} when the status was written
     * @throws BusinessException with {@code GROUP_NOT_FOUND} if the group does not exist, or
     *                           with a descriptive message if the transition is not allowed
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
    public boolean updateStatus(String groupId, Integer status, String operator) {
        LOGGER.info("begin to update group status to [{}] by groupId={}, username={}", status, groupId, operator);
        Preconditions.checkNotNull(groupId, "inlong group id is empty");

        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupIdForUpdate(groupId);
        if (groupEntity == null) {
            LOGGER.error("inlong group not found by groupId={}", groupId);
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        GroupState fromState = GroupState.forCode(groupEntity.getStatus());
        GroupState toState = GroupState.forCode(status);
        if (GroupState.notAllowedTransition(fromState, toState)) {
            String message = String.format("Current state=%s is not allowed to transfer to state=%s", fromState, toState);
            LOGGER.error(message);
            throw new BusinessException(message);
        }

        this.groupMapper.updateStatus(groupId, status, operator);
        LOGGER.info("success to update inlong group status to [{}] for groupId={}", status, groupId);
        return true;
    }

    /**
     * Logically deletes an inlong group along with its extended properties and Pulsar
     * settings.
     *
     * <p>If the current state allows logical deletion, all streams of the group are
     * logically deleted first; otherwise the group must not have any streams left.
     * The group's {@code isDeleted} flag is set to its own primary key value.
     *
     * @param groupId  the inlong group id; must not be null
     * @param operator the user performing the deletion; recorded as modifier
     * @return always {@code true} when the deletion was written
     * @throws BusinessException with {@code GROUP_NOT_FOUND} if the group does not exist,
     *                           {@code GROUP_DELETE_NOT_ALLOWED} if the current state cannot
     *                           transition to deleted, or {@code GROUP_HAS_STREAM} if streams
     *                           still exist and cannot be removed implicitly
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public boolean delete(String groupId, String operator) {
        LOGGER.debug("begin to delete inlong group, groupId={}", groupId);
        Preconditions.checkNotNull(groupId, "inlong group id is empty");

        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(groupId);
        if (groupEntity == null) {
            LOGGER.error("inlong group not found by groupId={}", groupId);
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        GroupState currentState = GroupState.forCode(groupEntity.getStatus());
        if (GroupState.notAllowedTransition(currentState, GroupState.DELETED)) {
            String message = String.format("Current state=%s was not allowed to delete", currentState);
            LOGGER.error(message);
            throw new BusinessException(ErrorCodeEnum.GROUP_DELETE_NOT_ALLOWED, message);
        }

        if (!GroupState.isAllowedLogicDel(currentState)) {
            // Streams cannot be removed implicitly in this state, so none may remain.
            int streamCount = this.streamService.selectCountByGroupId(groupId);
            if (streamCount > 0) {
                LOGGER.error("groupId={} have [{}] inlong streams, deleted failed", groupId, streamCount);
                throw new BusinessException(ErrorCodeEnum.GROUP_HAS_STREAM);
            }
        } else {
            this.streamService.logicDeleteAll(groupEntity.getInlongGroupId(), operator);
        }

        // Mark the group row as deleted, then its dependent rows.
        groupEntity.setIsDeleted(groupEntity.getId());
        groupEntity.setStatus(GroupState.DELETED.getCode());
        groupEntity.setModifier(operator);
        this.groupMapper.updateByIdentifierSelective(groupEntity);
        this.groupExtMapper.logicDeleteAllByGroupId(groupId);
        this.groupPulsarMapper.logicDeleteByGroupId(groupId);
        LOGGER.info("success to delete inlong group and inlong group ext property for groupId={}", groupId);
        return true;
    }

    /**
     * Tells whether an inlong group with the given id exists.
     *
     * @param groupId the inlong group id; must not be null
     * @return {@code true} if the mapper reports at least one matching identifier
     */
    @Override
    public boolean exist(String groupId) {
        LOGGER.debug("begin to check inlong group, groupId={}", groupId);
        Preconditions.checkNotNull(groupId, "inlong group id is empty");
        Integer matches = this.groupMapper.selectIdentifierExist(groupId);
        LOGGER.info("success to check inlong group");
        return matches > 0;
    }

    /**
     * Counts the inlong groups of a user, broken down by selected states.
     *
     * <p>Every row returned by the mapper adds to the total count; rows in state
     * {@code CONFIG_ING}, {@code TO_BE_APPROVAL} and {@code APPROVE_REJECTED} additionally
     * add to the wait-assign, wait-approve and reject counters respectively.
     *
     * @param operator the user whose groups are counted
     * @return the aggregated counters
     */
    @Override
    public InlongGroupCountResponse countGroupByUser(String operator) {
        LOGGER.debug("begin to count inlong group by user={}", operator);
        InlongGroupCountResponse countVO = new InlongGroupCountResponse();
        List<Map<String, Object>> statusCount = this.groupMapper.countGroupByUser(operator);
        for (Map<String, Object> row : statusCount) {
            // Read through Number: the concrete numeric type of an aggregate column depends
            // on the JDBC driver, so a hard cast to Integer / Long is fragile.
            int status = ((Number) row.get("status")).intValue();
            long count = ((Number) row.get("count")).longValue();
            countVO.setTotalCount(countVO.getTotalCount() + count);
            if (status == GroupState.CONFIG_ING.getCode()) {
                countVO.setWaitAssignCount(countVO.getWaitAssignCount() + count);
            } else if (status == GroupState.TO_BE_APPROVAL.getCode()) {
                countVO.setWaitApproveCount(countVO.getWaitApproveCount() + count);
            } else if (status == GroupState.APPROVE_REJECTED.getCode()) {
                countVO.setRejectCount(countVO.getRejectCount() + count);
            }
        }
        LOGGER.info("success to count inlong group for operator={}", operator);
        return countVO;
    }

    /**
     * Returns the topic information of an inlong group.
     *
     * <p>For Tube the group's MQ resource object and the Tube master URL are returned; for
     * Pulsar / TDMQ Pulsar the topic list of the group's streams and the Pulsar admin and
     * service URLs are returned.
     *
     * @param groupId the inlong group id
     * @return the topic response for the group
     * @throws BusinessException with {@code MIDDLEWARE_TYPE_NOT_SUPPORTED} for any other
     *                           middleware type, or whatever {@link #get(String)} raises
     */
    @Override
    public InlongGroupTopicResponse getTopic(String groupId) {
        LOGGER.debug("begin to get topic by groupId={}", groupId);
        InlongGroupInfo groupInfo = this.get(groupId);
        MQType mqType = MQType.forType(groupInfo.getMiddlewareType());
        InlongGroupTopicResponse response = new InlongGroupTopicResponse();

        boolean tube = mqType == MQType.TUBE;
        boolean pulsar = mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR;
        if (!tube && !pulsar) {
            LOGGER.error("middleware type={} not supported", mqType);
            throw new BusinessException(ErrorCodeEnum.MIDDLEWARE_TYPE_NOT_SUPPORTED);
        }

        if (tube) {
            response.setMqResourceObj(groupInfo.getMqResourceObj());
            response.setTubeMasterUrl(this.commonOperateService.getSpecifiedParam("tube_masterUrl"));
        } else {
            response.setDsTopicList(this.streamService.getTopicList(groupId));
            response.setPulsarAdminUrl(this.commonOperateService.getSpecifiedParam("pulsar_adminUrl"));
            response.setPulsarServiceUrl(this.commonOperateService.getSpecifiedParam("pulsar_serviceUrl"));
        }
        response.setInlongGroupId(groupId);
        response.setMiddlewareType(mqType.name());
        return response;
    }

    /**
     * Marks an inlong group as approved by moving it to state {@code APPROVE_PASSED}.
     *
     * <p>The request must carry a group id and a middleware type; the middleware type is
     * only validated for presence here.
     *
     * <p>NOTE(review): {@code updateStatus} is declared with {@code REQUIRES_NEW}, but it is
     * invoked below through {@code this}. With Spring's default proxy-based transaction
     * handling such a self-invocation is presumably not intercepted, so the status change
     * would run inside this method's transaction — confirm.
     *
     * @param approveInfo the approval request; must not be null
     * @param operator    the user performing the approval
     * @return always {@code true} when the status was updated
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public boolean updateAfterApprove(InlongGroupApproveRequest approveInfo, String operator) {
        LOGGER.debug("begin to update inlong group after approve={}", approveInfo);
        Preconditions.checkNotNull(approveInfo, "InlongGroupApproveRequest is empty");

        final String approvedGroupId = approveInfo.getInlongGroupId();
        Preconditions.checkNotNull(approvedGroupId, "inlong group id is empty");
        Preconditions.checkNotNull(approveInfo.getMiddlewareType(), "Middleware type is empty");

        this.updateStatus(approvedGroupId, GroupState.APPROVE_PASSED.getCode(), operator);
        LOGGER.info("success to update inlong group status after approve for groupId={}", approvedGroupId);
        return true;
    }

    /**
     * Inserts or updates the extended properties of an inlong group.
     *
     * <p>Does nothing when {@code infoList} is null or empty. Otherwise every entry is
     * stamped with the group id and one shared modify time, and the whole batch is written
     * through {@code insertOnDuplicateKeyUpdate}.
     *
     * @param groupId  the inlong group id the properties belong to
     * @param infoList the extended properties to store; may be null or empty
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public void saveOrUpdateExt(String groupId, List<InlongGroupExtInfo> infoList) {
        LOGGER.debug("begin to save or update inlong group ext info, groupId={}, ext={}", groupId, infoList);
        if (CollectionUtils.isEmpty(infoList)) {
            return;
        }
        // Typed list: iterating a raw List with an InlongGroupExtEntity loop variable does not compile.
        List<InlongGroupExtEntity> entityList = CommonBeanUtils.copyListProperties(infoList, InlongGroupExtEntity::new);
        Date modifyTime = new Date();
        for (InlongGroupExtEntity extEntity : entityList) {
            extEntity.setInlongGroupId(groupId);
            extEntity.setModifyTime(modifyTime);
        }
        this.groupExtMapper.insertOnDuplicateKeyUpdate(entityList);
        LOGGER.info("success to save or update inlong group ext for groupId={}", groupId);
    }
}

