package org.apache.inlong.manager.service.core.impl;

import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ConsumptionStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
import org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
import org.apache.inlong.manager.dao.mapper.ConsumptionPulsarEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
import org.apache.inlong.manager.pojo.consumption.ConsumptionListVo;
import org.apache.inlong.manager.pojo.consumption.ConsumptionMqExtBase;
import org.apache.inlong.manager.pojo.consumption.ConsumptionPulsarInfo;
import org.apache.inlong.manager.pojo.consumption.ConsumptionQuery;
import org.apache.inlong.manager.pojo.consumption.ConsumptionSummary;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.core.ConsumptionService;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.service.user.LoginUserUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;

@Service
/* loaded from: input_file:org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.class */
public class ConsumptionServiceImpl implements ConsumptionService {
    private static final Logger log = LoggerFactory.getLogger(ConsumptionServiceImpl.class);
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumptionServiceImpl.class);
    private static final String PREFIX_DLQ = "dlq";
    private static final String PREFIX_RLQ = "rlq";

    @Autowired
    private InlongGroupEntityMapper groupMapper;

    @Autowired
    private ConsumptionEntityMapper consumptionMapper;

    @Autowired
    private ConsumptionPulsarEntityMapper consumptionPulsarMapper;

    @Autowired
    private InlongGroupService groupService;

    @Autowired
    private InlongStreamService streamService;

    @Autowired
    private InlongClusterService clusterService;

    @Override // org.apache.inlong.manager.service.core.ConsumptionService
    public ConsumptionSummary getSummary(ConsumptionQuery consumptionQuery) {
        HashMap hashMap = new HashMap();
        this.consumptionMapper.countByQuery(consumptionQuery).forEach(countInfo -> {
        });
        return ConsumptionSummary.builder().totalCount(Integer.valueOf(hashMap.values().stream().mapToInt(num -> {
            return num.intValue();
        }).sum())).waitingAssignCount((Integer) hashMap.getOrDefault(ConsumptionStatus.WAIT_ASSIGN.getStatus() + "", 0)).waitingApproveCount((Integer) hashMap.getOrDefault(ConsumptionStatus.WAIT_APPROVE.getStatus() + "", 0)).rejectedCount((Integer) hashMap.getOrDefault(ConsumptionStatus.REJECTED.getStatus() + "", 0)).build();
    }

    @Override // org.apache.inlong.manager.service.core.ConsumptionService
    public PageResult<ConsumptionListVo> list(ConsumptionQuery consumptionQuery) {
        PageHelper.startPage(consumptionQuery.getPageNum(), consumptionQuery.getPageSize());
        consumptionQuery.setIsAdminRole(Boolean.valueOf(LoginUserUtils.getLoginUser().getRoles().contains("ADMIN")));
        Page listByQuery = this.consumptionMapper.listByQuery(consumptionQuery);
        return new PageResult<>(CommonBeanUtils.copyListProperties(listByQuery.getResult(), ConsumptionListVo::new), Long.valueOf(listByQuery.getTotal()), Integer.valueOf(listByQuery.getPageNum()), Integer.valueOf(listByQuery.getPageSize()));
    }

    @Override // org.apache.inlong.manager.service.core.ConsumptionService
    public ConsumptionInfo get(Integer num) {
        Preconditions.checkNotNull(num, "consumption id cannot be null");
        ConsumptionEntity selectByPrimaryKey = this.consumptionMapper.selectByPrimaryKey(num);
        Preconditions.checkNotNull(selectByPrimaryKey, "consumption not exist with id:" + num);
        ConsumptionInfo consumptionInfo = (ConsumptionInfo) CommonBeanUtils.copyProperties(selectByPrimaryKey, ConsumptionInfo::new);
        String mqType = consumptionInfo.getMqType();
        if ("PULSAR".equals(mqType) || "TDMQ_PULSAR".equals(mqType)) {
            ConsumptionPulsarEntity selectByConsumptionId = this.consumptionPulsarMapper.selectByConsumptionId(consumptionInfo.getId());
            Preconditions.checkNotNull(selectByConsumptionId, "Pulsar consumption cannot be empty, as the middleware is Pulsar");
            consumptionInfo.setMqExtInfo((ConsumptionPulsarInfo) CommonBeanUtils.copyProperties(selectByConsumptionId, ConsumptionPulsarInfo::new));
        }
        return consumptionInfo;
    }

    @Override // org.apache.inlong.manager.service.core.ConsumptionService
    public boolean isConsumerGroupExists(String str, Integer num) {
        ConsumptionQuery consumptionQuery = new ConsumptionQuery();
        consumptionQuery.setConsumerGroup(str);
        consumptionQuery.setIsAdminRole(true);
        List listByQuery = this.consumptionMapper.listByQuery(consumptionQuery);
        if (num != null) {
            listByQuery = (List) listByQuery.stream().filter(consumptionEntity -> {
                return !num.equals(consumptionEntity.getId());
            }).collect(Collectors.toList());
        }
        return !CollectionUtils.isEmpty(listByQuery);
    }

    @Override // org.apache.inlong.manager.service.core.ConsumptionService
    @Transactional(rollbackFor = {Throwable.class})
    public Integer save(ConsumptionInfo consumptionInfo, String str) {
        log.debug("begin to save consumption info={}", consumptionInfo);
        Preconditions.checkNotNull(consumptionInfo, "consumption info cannot be null");
        Preconditions.checkNotNull(consumptionInfo.getTopic(), "consumption topic cannot be empty");
        if (isConsumerGroupExists(consumptionInfo.getConsumerGroup(), consumptionInfo.getId())) {
            throw new BusinessException(String.format("consumer group %s already exist", consumptionInfo.getConsumerGroup()));
        }
        if (consumptionInfo.getId() != null) {
            ConsumptionEntity selectByPrimaryKey = this.consumptionMapper.selectByPrimaryKey(consumptionInfo.getId());
            Preconditions.checkNotNull(selectByPrimaryKey, "consumption not exist with id: " + consumptionInfo.getId());
            ConsumptionStatus fromStatus = ConsumptionStatus.fromStatus(selectByPrimaryKey.getStatus().intValue());
            Preconditions.checkTrue(ConsumptionStatus.ALLOW_SAVE_UPDATE_STATUS.contains(fromStatus), "consumption not allow update when status is " + fromStatus.name());
        }
        setTopicInfo(consumptionInfo);
        ConsumptionEntity saveConsumption = saveConsumption(consumptionInfo, str);
        String mqType = saveConsumption.getMqType();
        if ("PULSAR".equals(mqType) || "TDMQ_PULSAR".equals(mqType)) {
            savePulsarInfo(consumptionInfo.getMqExtInfo(), saveConsumption);
        }
        log.info("success to save consumption info by user={}", str);
        return saveConsumption.getId();
    }

    private void savePulsarInfo(ConsumptionMqExtBase consumptionMqExtBase, ConsumptionEntity consumptionEntity) {
        Preconditions.checkNotNull(consumptionMqExtBase, "Pulsar info cannot be empty, as the middleware is Pulsar");
        ConsumptionPulsarInfo consumptionPulsarInfo = consumptionMqExtBase instanceof ConsumptionPulsarInfo ? (ConsumptionPulsarInfo) consumptionMqExtBase : new ConsumptionPulsarInfo();
        boolean z = consumptionPulsarInfo.getIsDlq() != null && consumptionPulsarInfo.getIsDlq().intValue() == 1;
        boolean z2 = consumptionPulsarInfo.getIsRlq() != null && consumptionPulsarInfo.getIsRlq().intValue() == 1;
        if (z2 && !z) {
            throw new BusinessException(ErrorCodeEnum.PULSAR_DLQ_RLQ_ERROR);
        }
        String inlongGroupId = consumptionEntity.getInlongGroupId();
        if (z) {
            if (this.streamService.exist(inlongGroupId, "dlq_" + consumptionPulsarInfo.getDeadLetterTopic()).booleanValue()) {
                throw new BusinessException(ErrorCodeEnum.PULSAR_DLQ_DUPLICATED);
            }
        } else {
            consumptionPulsarInfo.setIsDlq(0);
            consumptionPulsarInfo.setDeadLetterTopic((String) null);
        }
        if (z2) {
            if (this.streamService.exist(inlongGroupId, "rlq_" + consumptionPulsarInfo.getRetryLetterTopic()).booleanValue()) {
                throw new BusinessException(ErrorCodeEnum.PULSAR_RLQ_DUPLICATED);
            }
        } else {
            consumptionPulsarInfo.setIsRlq(0);
            consumptionPulsarInfo.setRetryLetterTopic((String) null);
        }
        ConsumptionPulsarEntity consumptionPulsarEntity = (ConsumptionPulsarEntity) CommonBeanUtils.copyProperties(consumptionPulsarInfo, ConsumptionPulsarEntity::new);
        Integer id = consumptionEntity.getId();
        consumptionPulsarEntity.setConsumptionId(id);
        consumptionPulsarEntity.setInlongGroupId(inlongGroupId);
        consumptionPulsarEntity.setConsumerGroup(consumptionEntity.getConsumerGroup());
        consumptionPulsarEntity.setIsDeleted(0);
        ConsumptionPulsarEntity selectByConsumptionId = this.consumptionPulsarMapper.selectByConsumptionId(id);
        if (selectByConsumptionId == null) {
            this.consumptionPulsarMapper.insert(consumptionPulsarEntity);
        } else {
            consumptionPulsarEntity.setId(selectByConsumptionId.getId());
            this.consumptionPulsarMapper.updateByPrimaryKey(consumptionPulsarEntity);
        }
    }

    @Override // org.apache.inlong.manager.service.core.ConsumptionService
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean update(ConsumptionInfo consumptionInfo, String str) {
        Preconditions.checkNotNull(consumptionInfo, "consumption info cannot be null");
        Integer id = consumptionInfo.getId();
        Preconditions.checkNotNull(id, "consumption id cannot be null");
        ConsumptionEntity selectByPrimaryKey = this.consumptionMapper.selectByPrimaryKey(id);
        Preconditions.checkNotNull(selectByPrimaryKey, "consumption not exist with id " + id);
        Preconditions.checkTrue(selectByPrimaryKey.getInCharges().contains(str), "operator" + str + " has no privilege for the consumption");
        String format = String.format("consumption information has already updated, id=%s, curVersion=%s", selectByPrimaryKey.getId(), consumptionInfo.getVersion());
        if (!Objects.equals(selectByPrimaryKey.getVersion(), consumptionInfo.getVersion())) {
            LOGGER.error(format);
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }
        ConsumptionEntity consumptionEntity = new ConsumptionEntity();
        Date date = new Date();
        CommonBeanUtils.copyProperties(consumptionInfo, consumptionEntity, true);
        consumptionEntity.setModifier(str);
        consumptionEntity.setModifyTime(date);
        String mqType = consumptionInfo.getMqType();
        if ("PULSAR".equals(mqType) || "TDMQ_PULSAR".equals(mqType)) {
            ConsumptionPulsarEntity selectByConsumptionId = this.consumptionPulsarMapper.selectByConsumptionId(id);
            Preconditions.checkNotNull(selectByConsumptionId, "Pulsar consumption cannot be null");
            selectByConsumptionId.setConsumerGroup(consumptionInfo.getConsumerGroup());
            ConsumptionPulsarInfo mqExtInfo = consumptionInfo.getMqExtInfo();
            boolean z = mqExtInfo.getIsDlq() != null && mqExtInfo.getIsDlq().intValue() == 1;
            boolean z2 = mqExtInfo.getIsRlq() != null && mqExtInfo.getIsRlq().intValue() == 1;
            if (z2 && !z) {
                throw new BusinessException(ErrorCodeEnum.PULSAR_TOPIC_CREATE_FAILED);
            }
            if (ConsumptionStatus.APPROVED.getStatus() == selectByPrimaryKey.getStatus().intValue()) {
                String inlongGroupId = consumptionInfo.getInlongGroupId();
                String deadLetterTopic = selectByConsumptionId.getDeadLetterTopic();
                String deadLetterTopic2 = mqExtInfo.getDeadLetterTopic();
                if (!z) {
                    selectByConsumptionId.setIsDlq(0);
                    selectByConsumptionId.setDeadLetterTopic((String) null);
                    this.streamService.logicDeleteDlqOrRlq(inlongGroupId, deadLetterTopic, str);
                } else if (!Objects.equals(deadLetterTopic2, deadLetterTopic)) {
                    selectByConsumptionId.setIsDlq(1);
                    String lowerCase = ("dlq_" + deadLetterTopic2).toLowerCase(Locale.ROOT);
                    selectByConsumptionId.setDeadLetterTopic(lowerCase);
                    this.streamService.insertDlqOrRlq(inlongGroupId, lowerCase, str);
                }
                String retryLetterTopic = selectByConsumptionId.getRetryLetterTopic();
                String retryLetterTopic2 = mqExtInfo.getRetryLetterTopic();
                if (!z2) {
                    selectByConsumptionId.setIsRlq(0);
                    selectByConsumptionId.setRetryLetterTopic((String) null);
                    this.streamService.logicDeleteDlqOrRlq(inlongGroupId, retryLetterTopic, str);
                } else if (!Objects.equals(retryLetterTopic2, selectByConsumptionId.getRetryLetterTopic())) {
                    selectByConsumptionId.setIsRlq(1);
                    String lowerCase2 = ("rlq_" + retryLetterTopic2).toLowerCase(Locale.ROOT);
                    selectByConsumptionId.setRetryLetterTopic(lowerCase2);
                    this.streamService.insertDlqOrRlq(inlongGroupId, lowerCase2, str);
                }
            }
            this.consumptionPulsarMapper.updateByConsumptionId(selectByConsumptionId);
        }
        if (this.consumptionMapper.updateByPrimaryKeySelective(consumptionEntity) == InlongConstants.AFFECTED_ONE_ROW.intValue()) {
            return true;
        }
        LOGGER.error(format);
        throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
    }

    @Override // org.apache.inlong.manager.service.core.ConsumptionService
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean delete(Integer num, String str) {
        Preconditions.checkNotNull(this.consumptionMapper.selectByPrimaryKey(num), "consumption not exist with id: " + num);
        this.consumptionMapper.deleteByPrimaryKey(num);
        this.consumptionPulsarMapper.deleteByConsumptionId(num);
        return true;
    }

    @Override // org.apache.inlong.manager.service.core.ConsumptionService
    public void saveSortConsumption(InlongGroupInfo inlongGroupInfo, String str, String str2) {
        String inlongGroupId = inlongGroupInfo.getInlongGroupId();
        if (this.consumptionMapper.selectConsumptionExists(inlongGroupId, str, str2) != null) {
            log.warn("consumption with groupId={}, topic={}, consumer group={} already exists, skip to create", new Object[]{inlongGroupId, str, str2});
            return;
        }
        log.debug("begin to save consumption, groupId={}, topic={}, consumer group={}", new Object[]{inlongGroupId, str, str2});
        String mqType = inlongGroupInfo.getMqType();
        ConsumptionEntity consumptionEntity = new ConsumptionEntity();
        consumptionEntity.setInlongGroupId(inlongGroupId);
        consumptionEntity.setMqType(mqType);
        consumptionEntity.setTopic(str);
        consumptionEntity.setConsumerGroup(str2);
        consumptionEntity.setInCharges(inlongGroupInfo.getInCharges());
        consumptionEntity.setFilterEnabled(0);
        consumptionEntity.setStatus(Integer.valueOf(ConsumptionStatus.APPROVED.getStatus()));
        String creator = inlongGroupInfo.getCreator();
        consumptionEntity.setCreator(creator);
        consumptionEntity.setModifier(creator);
        this.consumptionMapper.insert(consumptionEntity);
        if ("PULSAR".equals(mqType) || "TDMQ_PULSAR".equals(mqType)) {
            ConsumptionPulsarEntity consumptionPulsarEntity = new ConsumptionPulsarEntity();
            consumptionPulsarEntity.setConsumptionId(consumptionEntity.getId());
            consumptionPulsarEntity.setConsumerGroup(str2);
            consumptionPulsarEntity.setInlongGroupId(inlongGroupId);
            consumptionPulsarEntity.setIsDeleted(InlongConstants.UN_DELETED);
            this.consumptionPulsarMapper.insert(consumptionPulsarEntity);
        }
        log.debug("success save consumption, groupId={}, topic={}, consumer group={}", new Object[]{inlongGroupId, str, str2});
    }

    private ConsumptionEntity saveConsumption(ConsumptionInfo consumptionInfo, String str) {
        ConsumptionEntity consumptionEntity = (ConsumptionEntity) CommonBeanUtils.copyProperties(consumptionInfo, ConsumptionEntity::new);
        consumptionEntity.setStatus(Integer.valueOf(ConsumptionStatus.WAIT_ASSIGN.getStatus()));
        consumptionEntity.setCreator(str);
        consumptionEntity.setModifier(str);
        if (consumptionInfo.getId() == null) {
            this.consumptionMapper.insert(consumptionEntity);
        } else if (this.consumptionMapper.updateByPrimaryKey(consumptionEntity) != InlongConstants.AFFECTED_ONE_ROW.intValue()) {
            LOGGER.error("consumption information has already updated, id={}, curVersion={}", consumptionEntity.getId(), consumptionEntity.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }
        Preconditions.checkNotNull(consumptionEntity.getId(), "save consumption failed");
        return consumptionEntity;
    }

    private void setTopicInfo(ConsumptionInfo consumptionInfo) {
        String inlongGroupId = consumptionInfo.getInlongGroupId();
        InlongGroupTopicInfo topic = this.groupService.getTopic(inlongGroupId);
        Preconditions.checkNotNull(topic, "inlong group not exist: " + inlongGroupId);
        String mqType = topic.getMqType();
        if ("TUBEMQ".equals(mqType)) {
            String mqResource = topic.getMqResource();
            Preconditions.checkTrue(mqResource == null || mqResource.equals(consumptionInfo.getTopic()), "topic [" + consumptionInfo.getTopic() + "] not belong to inlong group " + inlongGroupId);
        } else if ("PULSAR".equals(mqType) || "TDMQ_PULSAR".equals(mqType)) {
            List streamTopics = topic.getStreamTopics();
            if (streamTopics != null && streamTopics.size() > 0) {
                HashSet hashSet = new HashSet(Arrays.asList(consumptionInfo.getTopic().split(",")));
                streamTopics.forEach(inlongStreamBriefInfo -> {
                    hashSet.remove(inlongStreamBriefInfo.getMqResource());
                });
                Preconditions.checkEmpty(hashSet, "topic [" + hashSet + "] not belong to inlong group " + inlongGroupId);
            }
            InlongGroupEntity selectByGroupId = this.groupMapper.selectByGroupId(inlongGroupId);
            if (null != selectByGroupId) {
                PulsarClusterInfo one = this.clusterService.getOne(selectByGroupId.getInlongClusterTag(), null, "PULSAR");
                consumptionInfo.setTopic(String.format("persistent://%s/%s/%s", StringUtils.isEmpty(one.getTenant()) ? "public" : one.getTenant(), selectByGroupId.getMqResource(), consumptionInfo.getTopic()));
            }
        }
        consumptionInfo.setMqType(mqType);
    }
}
