package org.apache.inlong.manager.service.consume;

import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ConsumeStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
import org.apache.inlong.manager.dao.mapper.InlongConsumeEntityMapper;
import org.apache.inlong.manager.pojo.common.CountInfo;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageRequest;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.service.user.UserService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

@Service
/* loaded from: input_file:org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.class */
public class InlongConsumeServiceImpl implements InlongConsumeService {
    private static final Logger LOGGER = LoggerFactory.getLogger(InlongConsumeServiceImpl.class);
    private static final String AUTO_CREATE_MSG = "auto_create_by_system";

    @Autowired
    private InlongConsumeEntityMapper consumeMapper;

    @Autowired
    private InlongConsumeOperatorFactory consumeOperatorFactory;

    @Autowired
    private UserService userService;

    @Override // org.apache.inlong.manager.service.consume.InlongConsumeService
    public Integer save(InlongConsumeRequest inlongConsumeRequest, String str) {
        LOGGER.debug("begin to save inlong consume={} by user={}", inlongConsumeRequest, str);
        Preconditions.expectNotNull(inlongConsumeRequest, "inlong consume request cannot be null");
        Preconditions.expectNotBlank(inlongConsumeRequest.getTopic(), ErrorCodeEnum.INVALID_PARAMETER, "inlong consume topic cannot be null");
        String consumerGroup = inlongConsumeRequest.getConsumerGroup();
        Preconditions.expectNotBlank(consumerGroup, ErrorCodeEnum.INVALID_PARAMETER, "inlong consume topic cannot be null");
        if (consumerGroupExists(consumerGroup, inlongConsumeRequest.getId())) {
            throw new BusinessException(String.format("consumer group %s already exist", consumerGroup));
        }
        Integer saveOpt = this.consumeOperatorFactory.getInstance(inlongConsumeRequest.getMqType()).saveOpt(inlongConsumeRequest, str);
        LOGGER.info("success to save inlong consume for consumer group={} by user={}", consumerGroup, str);
        return saveOpt;
    }

    @Override // org.apache.inlong.manager.service.consume.InlongConsumeService
    public Integer saveBySystem(InlongGroupInfo inlongGroupInfo, String str, String str2) {
        String inlongGroupId = inlongGroupInfo.getInlongGroupId();
        InlongConsumeEntity selectExists = this.consumeMapper.selectExists(str2, str, inlongGroupId);
        if (selectExists != null) {
            LOGGER.warn("inlong consume already exists for groupId={} topic={} consumerGroup={}, skip to create", new Object[]{inlongGroupId, str, str2});
            return selectExists.getId();
        }
        LOGGER.debug("begin to save inlong consume for groupId={} topic={} group={}", new Object[]{inlongGroupId, str, str2});
        InlongConsumeEntity inlongConsumeEntity = new InlongConsumeEntity();
        inlongConsumeEntity.setConsumerGroup(str2);
        inlongConsumeEntity.setDescription(AUTO_CREATE_MSG);
        inlongConsumeEntity.setMqType(inlongGroupInfo.getMqType());
        inlongConsumeEntity.setTopic(str);
        inlongConsumeEntity.setInlongGroupId(inlongGroupId);
        inlongConsumeEntity.setFilterEnabled(0);
        inlongConsumeEntity.setInCharges(inlongGroupInfo.getInCharges());
        inlongConsumeEntity.setStatus(ConsumeStatus.APPROVE_PASSED.getCode());
        String creator = inlongGroupInfo.getCreator();
        inlongConsumeEntity.setCreator(creator);
        inlongConsumeEntity.setModifier(creator);
        this.consumeMapper.insert(inlongConsumeEntity);
        LOGGER.debug("success save inlong consume for groupId={} topic={} group={}", new Object[]{inlongGroupId, str, str2});
        return inlongConsumeEntity.getId();
    }

    @Override // org.apache.inlong.manager.service.consume.InlongConsumeService
    public boolean consumerGroupExists(String str, Integer num) {
        List selectByCondition = this.consumeMapper.selectByCondition(InlongConsumePageRequest.builder().consumerGroup(str).isAdminRole(true).build());
        if (num != null) {
            selectByCondition = (List) selectByCondition.stream().filter(inlongConsumeEntity -> {
                return !num.equals(inlongConsumeEntity.getId());
            }).collect(Collectors.toList());
        }
        return CollectionUtils.isNotEmpty(selectByCondition);
    }

    @Override // org.apache.inlong.manager.service.consume.InlongConsumeService
    public InlongConsumeInfo get(Integer num, String str) {
        Preconditions.expectNotNull(num, "inlong consume id cannot be null");
        InlongConsumeEntity selectById = this.consumeMapper.selectById(num);
        if (selectById == null) {
            LOGGER.error("inlong consume not found with id={}", num);
            throw new BusinessException(ErrorCodeEnum.CONSUME_NOT_FOUND);
        }
        this.userService.checkUser(selectById.getInCharges(), str, "Current user does not have permission to get inlong consume");
        InlongConsumeInfo fromEntity = this.consumeOperatorFactory.getInstance(selectById.getMqType()).getFromEntity(selectById);
        LOGGER.debug("success to get inlong consume for id={}", num);
        return fromEntity;
    }

    @Override // org.apache.inlong.manager.service.consume.InlongConsumeService
    public InlongConsumeCountInfo countStatus(String str) {
        List<CountInfo> countByUser = this.consumeMapper.countByUser(str);
        InlongConsumeCountInfo inlongConsumeCountInfo = new InlongConsumeCountInfo();
        for (CountInfo countInfo : countByUser) {
            int parseInt = Integer.parseInt(countInfo.getKey());
            int intValue = countInfo.getValue().intValue();
            inlongConsumeCountInfo.setTotalCount(Integer.valueOf(inlongConsumeCountInfo.getTotalCount().intValue() + intValue));
            if (parseInt == ConsumeStatus.TO_BE_SUBMIT.getCode().intValue()) {
                inlongConsumeCountInfo.setWaitAssignCount(Integer.valueOf(inlongConsumeCountInfo.getWaitAssignCount().intValue() + intValue));
            } else if (parseInt == ConsumeStatus.TO_BE_APPROVAL.getCode().intValue()) {
                inlongConsumeCountInfo.setWaitApproveCount(Integer.valueOf(inlongConsumeCountInfo.getWaitApproveCount().intValue() + intValue));
            } else if (parseInt == ConsumeStatus.APPROVE_REJECTED.getCode().intValue()) {
                inlongConsumeCountInfo.setRejectCount(Integer.valueOf(inlongConsumeCountInfo.getRejectCount().intValue() + intValue));
            }
        }
        LOGGER.debug("success to count inlong consume for user={}", str);
        return inlongConsumeCountInfo;
    }

    @Override // org.apache.inlong.manager.service.consume.InlongConsumeService
    public PageResult<InlongConsumeBriefInfo> list(InlongConsumePageRequest inlongConsumePageRequest) {
        if (inlongConsumePageRequest.getPageSize() > PageRequest.MAX_PAGE_SIZE.intValue()) {
            LOGGER.warn("list inlong consumes, change page size from {} to {}", Integer.valueOf(inlongConsumePageRequest.getPageSize()), PageRequest.MAX_PAGE_SIZE);
            inlongConsumePageRequest.setPageSize(PageRequest.MAX_PAGE_SIZE.intValue());
        }
        PageHelper.startPage(inlongConsumePageRequest.getPageNum(), inlongConsumePageRequest.getPageSize());
        OrderFieldEnum.checkOrderField(inlongConsumePageRequest);
        OrderTypeEnum.checkOrderType(inlongConsumePageRequest);
        Page selectBriefList = this.consumeMapper.selectBriefList(inlongConsumePageRequest);
        PageResult<InlongConsumeBriefInfo> pageResult = new PageResult<>(selectBriefList, Long.valueOf(selectBriefList.getTotal()), Integer.valueOf(selectBriefList.getPageNum()), Integer.valueOf(selectBriefList.getPageSize()));
        LOGGER.debug("success to list inlong consume for {}", inlongConsumePageRequest);
        return pageResult;
    }

    @Override // org.apache.inlong.manager.service.consume.InlongConsumeService
    @Transactional(rollbackFor = {Throwable.class}, isolation = Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
    public Integer update(InlongConsumeRequest inlongConsumeRequest, String str) {
        LOGGER.debug("begin to update inlong consume={} by user={}", inlongConsumeRequest, str);
        Preconditions.expectNotNull(inlongConsumeRequest, "inlong consume request cannot be null");
        Integer id = inlongConsumeRequest.getId();
        InlongConsumeEntity selectById = this.consumeMapper.selectById(id);
        Preconditions.expectNotNull(selectById, "inlong consume not exist with id " + id);
        this.userService.checkUser(selectById.getInCharges(), str, "Current user does not have permission to update inlong consume");
        if (!Objects.equals(selectById.getVersion(), inlongConsumeRequest.getVersion())) {
            LOGGER.error(String.format("inlong consume has already updated, id=%s, curVersion=%s", selectById.getId(), inlongConsumeRequest.getVersion()));
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }
        ConsumeStatus forCode = ConsumeStatus.forCode(selectById.getStatus().intValue());
        Preconditions.expectTrue(ConsumeStatus.allowedUpdate(forCode), "inlong consume not allowed update when status is " + forCode.name());
        this.consumeOperatorFactory.getInstance(inlongConsumeRequest.getMqType()).updateOpt(inlongConsumeRequest, str);
        LOGGER.info("success to update inlong consume={} by user={}", inlongConsumeRequest, str);
        return id;
    }

    @Override // org.apache.inlong.manager.service.consume.InlongConsumeService
    @Transactional(rollbackFor = {Throwable.class}, isolation = Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
    public Boolean updateStatus(Integer num, Integer num2, String str) {
        LOGGER.info("begin to update consume status to [{}] for id={} by user={}", new Object[]{num2, num, str});
        Preconditions.expectNotNull(num, ErrorCodeEnum.ID_IS_EMPTY.getMessage());
        InlongConsumeEntity selectById = this.consumeMapper.selectById(num);
        if (selectById == null) {
            LOGGER.error("inlong consume not found by id={}", num);
            throw new BusinessException(ErrorCodeEnum.CONSUME_NOT_FOUND);
        }
        ConsumeStatus forCode = ConsumeStatus.forCode(selectById.getStatus().intValue());
        ConsumeStatus forCode2 = ConsumeStatus.forCode(num2.intValue());
        if (ConsumeStatus.notAllowedTransfer(forCode, forCode2)) {
            String format = String.format("Current status=%s cannot transfer to status=%s", forCode, forCode2);
            LOGGER.error(format);
            throw new BusinessException(format);
        }
        this.consumeMapper.updateStatus(num, num2, str);
        LOGGER.info("success to update consume status to [{}] for id={} by user={}", new Object[]{num2, num, str});
        return true;
    }

    @Override // org.apache.inlong.manager.service.consume.InlongConsumeService
    public Boolean delete(Integer num, String str) {
        LOGGER.info("begin to delete inlong consume for id={} by user={}", num, str);
        Preconditions.expectNotNull(num, "inlong consume id cannot be null");
        InlongConsumeEntity selectById = this.consumeMapper.selectById(num);
        Preconditions.expectNotNull(selectById, "inlong consume not exist with id " + num);
        this.userService.checkUser(selectById.getInCharges(), str, "Current user does not have permission to delete inlong consume");
        selectById.setIsDeleted(num);
        selectById.setStatus(ConsumeStatus.DELETED.getCode());
        selectById.setModifier(str);
        if (this.consumeMapper.updateByIdSelective(selectById) != InlongConstants.AFFECTED_ONE_ROW.intValue()) {
            LOGGER.error("inlong consume has already updated with id={}, curVersion={}", num, selectById.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }
        LOGGER.info("success to delete inlong consume for id={} by user={}", num, str);
        return true;
    }
}
