/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.core.impl;

import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ConsumptionStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
import org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
import org.apache.inlong.manager.dao.mapper.ConsumptionPulsarEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
import org.apache.inlong.manager.pojo.consumption.ConsumptionListVo;
import org.apache.inlong.manager.pojo.consumption.ConsumptionMqExtBase;
import org.apache.inlong.manager.pojo.consumption.ConsumptionPulsarInfo;
import org.apache.inlong.manager.pojo.consumption.ConsumptionQuery;
import org.apache.inlong.manager.pojo.consumption.ConsumptionSummary;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.core.ConsumptionService;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.service.user.LoginUserUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;

@Service
public class ConsumptionServiceImpl
implements ConsumptionService {
    // Two SLF4J loggers are declared for the same class (and therefore share one logger name);
    // methods in this class use either of them.
    private static final Logger log = LoggerFactory.getLogger(ConsumptionServiceImpl.class);
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumptionServiceImpl.class);
    // Prefixes for dead-letter / retry-letter topic names.
    // NOTE(review): neither constant is referenced by name in this class; the methods spell out the
    // literals "dlq_" and "rlq_" instead - presumably the compiler folded the constants in. Confirm.
    private static final String PREFIX_DLQ = "dlq";
    private static final String PREFIX_RLQ = "rlq";
    // Mapper used to look up an inlong group entity by its group id.
    @Autowired
    private InlongGroupEntityMapper groupMapper;
    // Mapper used for all reads and writes of consumption rows.
    @Autowired
    private ConsumptionEntityMapper consumptionMapper;
    // Mapper used for the Pulsar extension row that belongs to a consumption.
    @Autowired
    private ConsumptionPulsarEntityMapper consumptionPulsarMapper;
    // Supplies the topic information (MQ type, MQ resource, stream topics) of an inlong group.
    @Autowired
    private InlongGroupService groupService;
    // Used to check for, create and logically delete DLQ/RLQ entries of a group.
    @Autowired
    private InlongStreamService streamService;
    // Used to fetch the Pulsar cluster of a group's cluster tag.
    @Autowired
    private InlongClusterService clusterService;

    /**
     * Summarizes how many consumptions match the query, in total and for the pending statuses.
     *
     * @param query filter that is handed unchanged to the mapper's {@code countByQuery}
     * @return summary whose total is the sum of every count returned by the mapper, plus the counts
     *         for the WAIT_ASSIGN, WAIT_APPROVE and REJECTED statuses (0 when a status has no entry)
     */
    @Override
    public ConsumptionSummary getSummary(ConsumptionQuery query) {
        // Typed map: with a raw HashMap the values are plain Objects, so they could neither be
        // summed as ints nor passed to the builder without unchecked conversions.
        HashMap<String, Integer> countMap = new HashMap<>();
        this.consumptionMapper.countByQuery(query)
                .forEach(countInfo -> countMap.put(countInfo.getKey(), countInfo.getValue()));

        int totalCount = countMap.values().stream().mapToInt(Integer::intValue).sum();
        // The map is keyed by the status code rendered as a string.
        return ConsumptionSummary.builder()
                .totalCount(totalCount)
                .waitingAssignCount(countMap.getOrDefault(String.valueOf(ConsumptionStatus.WAIT_ASSIGN.getStatus()), 0))
                .waitingApproveCount(countMap.getOrDefault(String.valueOf(ConsumptionStatus.WAIT_APPROVE.getStatus()), 0))
                .rejectedCount(countMap.getOrDefault(String.valueOf(ConsumptionStatus.REJECTED.getStatus()), 0))
                .build();
    }

    /**
     * Returns one page of consumptions that match the query.
     *
     * <p>The query's admin flag is set from the logged-in user's roles (true when they contain
     * {@code "ADMIN"}) before the mapper is called.
     *
     * @param query filter and paging parameters (page number and page size are read from it)
     * @return the converted rows of the requested page together with total, page number and page size
     */
    @Override
    public PageResult<ConsumptionListVo> list(ConsumptionQuery query) {
        // Resolve the caller's role BEFORE starting the page: if this lookup threw after
        // PageHelper.startPage, the paging settings would stay bound to the current thread and be
        // applied to whatever query that thread runs next.
        query.setIsAdminRole(LoginUserUtils.getLoginUser().getRoles().contains("ADMIN"));

        // startPage is immediately followed by the mapper call it is meant to paginate.
        PageHelper.startPage(query.getPageNum(), query.getPageSize());
        Page<ConsumptionEntity> pageResult = (Page<ConsumptionEntity>) this.consumptionMapper.listByQuery(query);

        List<ConsumptionListVo> consumptionListVos =
                CommonBeanUtils.copyListProperties(pageResult.getResult(), ConsumptionListVo::new);
        return new PageResult<>(consumptionListVos, pageResult.getTotal(), pageResult.getPageNum(),
                pageResult.getPageSize());
    }

    /**
     * Loads a consumption by its primary key.
     *
     * <p>When the consumption's MQ type is {@code PULSAR} or {@code TDMQ_PULSAR}, the matching Pulsar
     * extension row is loaded as well and attached as the MQ extension info.
     *
     * @param id primary key of the consumption, must not be null
     * @return the consumption info; a precondition check fails when the id is null, the consumption
     *         does not exist, or a Pulsar-type consumption has no Pulsar extension row
     */
    @Override
    public ConsumptionInfo get(Integer id) {
        Preconditions.checkNotNull(id, "consumption id cannot be null");
        ConsumptionEntity entity = this.consumptionMapper.selectByPrimaryKey(id);
        Preconditions.checkNotNull(entity, "consumption not exist with id:" + id);

        ConsumptionInfo info = CommonBeanUtils.copyProperties(entity, ConsumptionInfo::new);
        String mqType = info.getMqType();
        boolean pulsarBased = "PULSAR".equals(mqType) || "TDMQ_PULSAR".equals(mqType);
        if (!pulsarBased) {
            return info;
        }

        ConsumptionPulsarEntity pulsarEntity = this.consumptionPulsarMapper.selectByConsumptionId(info.getId());
        Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be empty, as the middleware is Pulsar");
        ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(pulsarEntity, ConsumptionPulsarInfo::new);
        info.setMqExtInfo(pulsarInfo);
        return info;
    }

    /**
     * Tells whether a consumption with the given consumer group already exists.
     *
     * @param consumerGroup consumer group to look for
     * @param excludeSelfId id of a consumption to leave out of the check; null means nothing is excluded
     * @return true when at least one consumption (other than the excluded one) uses the consumer group
     */
    @Override
    public boolean isConsumerGroupExists(String consumerGroup, Integer excludeSelfId) {
        ConsumptionQuery groupQuery = new ConsumptionQuery();
        groupQuery.setConsumerGroup(consumerGroup);
        // The lookup is always issued with the admin flag set.
        groupQuery.setIsAdminRole(true);

        List<ConsumptionEntity> found = this.consumptionMapper.listByQuery(groupQuery);
        List<ConsumptionEntity> others = excludeSelfId == null
                ? found
                : found.stream()
                        .filter(consumer -> !excludeSelfId.equals(consumer.getId()))
                        .collect(Collectors.toList());
        return !CollectionUtils.isEmpty(others);
    }

    /**
     * Creates a consumption, or overwrites an existing one when the info carries an id.
     *
     * <p>Steps, in order: reject a consumer group that is already used by another consumption; for an
     * existing id, require that the stored status allows saving; resolve and validate the topic against
     * the inlong group; persist the consumption; for {@code PULSAR} / {@code TDMQ_PULSAR} also persist
     * the Pulsar extension info.
     *
     * @param info consumption to save, must not be null and must carry a topic
     * @param operator user performing the save
     * @return id of the saved consumption
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public Integer save(ConsumptionInfo info, String operator) {
        log.debug("begin to save consumption info={}", info);
        Preconditions.checkNotNull(info, "consumption info cannot be null");
        Preconditions.checkNotNull(info.getTopic(), "consumption topic cannot be empty");

        boolean groupTaken = this.isConsumerGroupExists(info.getConsumerGroup(), info.getId());
        if (groupTaken) {
            throw new BusinessException(String.format("consumer group %s already exist", info.getConsumerGroup()));
        }
        if (info.getId() != null) {
            this.checkStatusAllowsSave(info.getId());
        }

        this.setTopicInfo(info);
        ConsumptionEntity saved = this.saveConsumption(info, operator);

        String mqType = saved.getMqType();
        boolean pulsarBased = "PULSAR".equals(mqType) || "TDMQ_PULSAR".equals(mqType);
        if (pulsarBased) {
            this.savePulsarInfo(info.getMqExtInfo(), saved);
        }
        log.info("success to save consumption info by user={}", operator);
        return saved.getId();
    }

    /** Requires that the consumption with this id exists and that its stored status permits saving. */
    private void checkStatusAllowsSave(Integer id) {
        ConsumptionEntity stored = this.consumptionMapper.selectByPrimaryKey(id);
        Preconditions.checkNotNull(stored, "consumption not exist with id: " + id);
        ConsumptionStatus storedStatus = ConsumptionStatus.fromStatus(stored.getStatus());
        Preconditions.checkTrue(ConsumptionStatus.ALLOW_SAVE_UPDATE_STATUS.contains(storedStatus),
                "consumption not allow update when status is " + storedStatus.name());
    }

    /**
     * Validates the Pulsar extension info of a consumption and inserts or updates its row.
     *
     * <p>A retry-letter queue may only be enabled together with a dead-letter queue. For each enabled
     * queue the prefixed topic name ({@code dlq_} / {@code rlq_}) must not exist yet in the group; for
     * each disabled queue the flag is reset to 0 and the topic name is cleared.
     *
     * @param mqExtBase extension info from the request, must not be null; when it is not a
     *        {@link ConsumptionPulsarInfo} an empty one is used instead
     * @param entity the already persisted consumption the Pulsar row belongs to
     */
    private void savePulsarInfo(ConsumptionMqExtBase mqExtBase, ConsumptionEntity entity) {
        Preconditions.checkNotNull(mqExtBase, "Pulsar info cannot be empty, as the middleware is Pulsar");
        ConsumptionPulsarInfo pulsarInfo;
        if (mqExtBase instanceof ConsumptionPulsarInfo) {
            pulsarInfo = (ConsumptionPulsarInfo) mqExtBase;
        } else {
            pulsarInfo = new ConsumptionPulsarInfo();
        }

        boolean dlqEnable = pulsarInfo.getIsDlq() != null && pulsarInfo.getIsDlq() == 1;
        boolean rlqEnable = pulsarInfo.getIsRlq() != null && pulsarInfo.getIsRlq() == 1;
        if (rlqEnable && !dlqEnable) {
            throw new BusinessException(ErrorCodeEnum.PULSAR_DLQ_RLQ_ERROR);
        }

        String groupId = entity.getInlongGroupId();
        if (!dlqEnable) {
            pulsarInfo.setIsDlq(0);
            pulsarInfo.setDeadLetterTopic(null);
        } else if (this.streamService.exist(groupId, "dlq_" + pulsarInfo.getDeadLetterTopic())) {
            throw new BusinessException(ErrorCodeEnum.PULSAR_DLQ_DUPLICATED);
        }
        if (!rlqEnable) {
            pulsarInfo.setIsRlq(0);
            pulsarInfo.setRetryLetterTopic(null);
        } else if (this.streamService.exist(groupId, "rlq_" + pulsarInfo.getRetryLetterTopic())) {
            throw new BusinessException(ErrorCodeEnum.PULSAR_RLQ_DUPLICATED);
        }

        Integer consumptionId = entity.getId();
        ConsumptionPulsarEntity row = CommonBeanUtils.copyProperties(pulsarInfo, ConsumptionPulsarEntity::new);
        row.setConsumptionId(consumptionId);
        row.setInlongGroupId(groupId);
        row.setConsumerGroup(entity.getConsumerGroup());
        row.setIsDeleted(0);

        // Reuse the id of an existing row so the write becomes an update instead of a second insert.
        ConsumptionPulsarEntity stored = this.consumptionPulsarMapper.selectByConsumptionId(consumptionId);
        if (stored != null) {
            row.setId(stored.getId());
            this.consumptionPulsarMapper.updateByPrimaryKey(row);
            return;
        }
        this.consumptionPulsarMapper.insert(row);
    }

    /**
     * Updates an existing consumption and, for {@code PULSAR} / {@code TDMQ_PULSAR}, its Pulsar
     * extension row.
     *
     * <p>The operator must be listed in the stored consumption's in-charges, and the version carried by
     * {@code info} must equal the stored version. For an APPROVED Pulsar consumption the dead-letter and
     * retry-letter topics are kept in step with the request: a disabled queue is logically deleted, a
     * renamed or newly enabled queue is inserted under its prefixed, lower-cased name.
     *
     * @param info new values; must not be null and must carry the id and current version
     * @param operator user performing the update
     * @return always {@code true} when the update went through
     * @throws BusinessException with {@code CONFIG_EXPIRED} when the version does not match or the row
     *         update does not affect exactly one row; with {@code PULSAR_DLQ_RLQ_ERROR} when a
     *         retry-letter queue is requested without a dead-letter queue
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public Boolean update(ConsumptionInfo info, String operator) {
        Preconditions.checkNotNull(info, "consumption info cannot be null");
        Integer consumptionId = info.getId();
        Preconditions.checkNotNull(consumptionId, "consumption id cannot be null");
        ConsumptionEntity exists = this.consumptionMapper.selectByPrimaryKey(consumptionId);
        Preconditions.checkNotNull(exists, "consumption not exist with id " + consumptionId);
        // Exact match against the in-charge list: a plain substring test would also accept an empty
        // operator or one whose name is merely contained in another user's name.
        Preconditions.checkTrue(isInCharge(exists.getInCharges(), operator),
                "operator " + operator + " has no privilege for the consumption");

        String errMsg = String.format("consumption information has already updated, id=%s, curVersion=%s",
                exists.getId(), info.getVersion());
        if (!Objects.equals(exists.getVersion(), info.getVersion())) {
            LOGGER.error(errMsg);
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }

        ConsumptionEntity entity = new ConsumptionEntity();
        Date now = new Date();
        CommonBeanUtils.copyProperties(info, entity, true);
        entity.setModifier(operator);
        entity.setModifyTime(now);

        String mqType = info.getMqType();
        if ("PULSAR".equals(mqType) || "TDMQ_PULSAR".equals(mqType)) {
            ConsumptionPulsarEntity pulsarEntity = this.consumptionPulsarMapper.selectByConsumptionId(consumptionId);
            Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be null");
            pulsarEntity.setConsumerGroup(info.getConsumerGroup());

            // Same guard and message as the save path, instead of a bare NullPointerException below.
            ConsumptionMqExtBase mqExtInfo = info.getMqExtInfo();
            Preconditions.checkNotNull(mqExtInfo, "Pulsar info cannot be empty, as the middleware is Pulsar");
            ConsumptionPulsarInfo update = (ConsumptionPulsarInfo) mqExtInfo;

            boolean dlqEnable = update.getIsDlq() != null && update.getIsDlq() == 1;
            boolean rlqEnable = update.getIsRlq() != null && update.getIsRlq() == 1;
            if (rlqEnable && !dlqEnable) {
                // Same error code the save path reports for this very condition.
                throw new BusinessException(ErrorCodeEnum.PULSAR_DLQ_RLQ_ERROR);
            }

            // Queue topics are only touched once the consumption has been approved.
            if (ConsumptionStatus.APPROVED.getStatus() == exists.getStatus().intValue()) {
                String groupId = info.getInlongGroupId();

                String dlqNameOld = pulsarEntity.getDeadLetterTopic();
                String dlqNameNew = update.getDeadLetterTopic();
                if (!dlqEnable) {
                    pulsarEntity.setIsDlq(0);
                    pulsarEntity.setDeadLetterTopic(null);
                    this.streamService.logicDeleteDlqOrRlq(groupId, dlqNameOld, operator);
                } else if (!Objects.equals(dlqNameNew, dlqNameOld)) {
                    pulsarEntity.setIsDlq(1);
                    String topic = ("dlq_" + dlqNameNew).toLowerCase(Locale.ROOT);
                    pulsarEntity.setDeadLetterTopic(topic);
                    this.streamService.insertDlqOrRlq(groupId, topic, operator);
                }

                String rlqNameOld = pulsarEntity.getRetryLetterTopic();
                String rlqNameNew = update.getRetryLetterTopic();
                if (!rlqEnable) {
                    pulsarEntity.setIsRlq(0);
                    pulsarEntity.setRetryLetterTopic(null);
                    this.streamService.logicDeleteDlqOrRlq(groupId, rlqNameOld, operator);
                } else if (!Objects.equals(rlqNameNew, rlqNameOld)) {
                    pulsarEntity.setIsRlq(1);
                    String topic = ("rlq_" + rlqNameNew).toLowerCase(Locale.ROOT);
                    pulsarEntity.setRetryLetterTopic(topic);
                    this.streamService.insertDlqOrRlq(groupId, topic, operator);
                }
            }
            this.consumptionPulsarMapper.updateByConsumptionId(pulsarEntity);
        }

        int rowCount = this.consumptionMapper.updateByPrimaryKeySelective(entity);
        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
            LOGGER.error(errMsg);
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }
        return true;
    }

    /**
     * Tells whether the operator is one of the entries of a comma-separated in-charge list.
     * Entries are compared exactly after trimming; a null list or a null/empty operator never matches.
     */
    private static boolean isInCharge(String inCharges, String operator) {
        if (inCharges == null || operator == null || operator.isEmpty()) {
            return false;
        }
        for (String name : inCharges.split(",")) {
            if (operator.equals(name.trim())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Deletes a consumption and the Pulsar extension row that belongs to it.
     *
     * @param id primary key of the consumption, must not be null
     * @param operator user requesting the deletion (not used by this method)
     * @return always {@code true} when both delete calls returned; a precondition check fails when the
     *         id is null or no consumption exists for it
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public Boolean delete(Integer id, String operator) {
        // Reject a null id up front, as get() and update() do, rather than querying with it and
        // reporting a misleading "not exist" message.
        Preconditions.checkNotNull(id, "consumption id cannot be null");
        ConsumptionEntity consumptionEntity = this.consumptionMapper.selectByPrimaryKey(id);
        Preconditions.checkNotNull(consumptionEntity, "consumption not exist with id: " + id);
        this.consumptionMapper.deleteByPrimaryKey(id);
        this.consumptionPulsarMapper.deleteByConsumptionId(id);
        return true;
    }

    /**
     * Creates an already-approved consumption for the given group, topic and consumer group, unless
     * one exists for exactly that combination.
     *
     * <p>Creator, modifier and in-charges are taken from the group info. For {@code PULSAR} /
     * {@code TDMQ_PULSAR} groups a Pulsar extension row is inserted as well. Both inserts run in one
     * transaction: without it, a failure of the second insert would leave a consumption row behind
     * that {@code get} rejects (missing Pulsar row) and that makes every later call skip creation.
     *
     * @param groupInfo inlong group the consumption belongs to
     * @param topic topic to consume
     * @param consumerGroup consumer group name
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public void saveSortConsumption(InlongGroupInfo groupInfo, String topic, String consumerGroup) {
        String groupId = groupInfo.getInlongGroupId();
        ConsumptionEntity exists = this.consumptionMapper.selectConsumptionExists(groupId, topic, consumerGroup);
        if (exists != null) {
            log.warn("consumption with groupId={}, topic={}, consumer group={} already exists, skip to create",
                    groupId, topic, consumerGroup);
            return;
        }

        log.debug("begin to save consumption, groupId={}, topic={}, consumer group={}", groupId, topic, consumerGroup);
        String mqType = groupInfo.getMqType();
        String operator = groupInfo.getCreator();

        ConsumptionEntity entity = new ConsumptionEntity();
        entity.setInlongGroupId(groupId);
        entity.setMqType(mqType);
        entity.setTopic(topic);
        entity.setConsumerGroup(consumerGroup);
        entity.setInCharges(groupInfo.getInCharges());
        entity.setFilterEnabled(0);
        // Created directly in APPROVED state; the WAIT_ASSIGN status used by save() is not applied here.
        entity.setStatus(ConsumptionStatus.APPROVED.getStatus());
        entity.setCreator(operator);
        entity.setModifier(operator);
        this.consumptionMapper.insert(entity);

        if ("PULSAR".equals(mqType) || "TDMQ_PULSAR".equals(mqType)) {
            ConsumptionPulsarEntity pulsarEntity = new ConsumptionPulsarEntity();
            // The insert above is expected to have populated the entity's id.
            pulsarEntity.setConsumptionId(entity.getId());
            pulsarEntity.setConsumerGroup(consumerGroup);
            pulsarEntity.setInlongGroupId(groupId);
            pulsarEntity.setIsDeleted(InlongConstants.UN_DELETED);
            this.consumptionPulsarMapper.insert(pulsarEntity);
        }
        log.debug("success save consumption, groupId={}, topic={}, consumer group={}", groupId, topic, consumerGroup);
    }

    /**
     * Persists the consumption row for {@code save}: inserts it when the info has no id, otherwise
     * overwrites the row with that id.
     *
     * <p>In both cases the status is set to WAIT_ASSIGN and creator and modifier are set to the operator.
     *
     * @param info values to persist
     * @param operator user performing the save
     * @return the persisted entity, whose id is checked to be non-null
     * @throws BusinessException with {@code CONFIG_EXPIRED} when the overwrite does not affect exactly one row
     */
    private ConsumptionEntity saveConsumption(ConsumptionInfo info, String operator) {
        ConsumptionEntity entity = CommonBeanUtils.copyProperties(info, ConsumptionEntity::new);
        entity.setStatus(ConsumptionStatus.WAIT_ASSIGN.getStatus());
        entity.setCreator(operator);
        entity.setModifier(operator);

        if (info.getId() == null) {
            this.consumptionMapper.insert(entity);
        } else {
            int affected = this.consumptionMapper.updateByPrimaryKey(entity);
            if (affected != InlongConstants.AFFECTED_ONE_ROW) {
                LOGGER.error("consumption information has already updated, id={}, curVersion={}",
                        entity.getId(), entity.getVersion());
                throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
            }
        }

        Preconditions.checkNotNull(entity.getId(), "save consumption failed");
        return entity;
    }

    /**
     * Validates the requested topic against the inlong group and writes the group's MQ type, and for
     * Pulsar the full topic name, back into {@code info}.
     *
     * <ul>
     * <li>{@code TUBEMQ}: when the group has an MQ resource, the topic must equal it.</li>
     * <li>{@code PULSAR} / {@code TDMQ_PULSAR}: when the group has stream topics, every comma-separated
     * requested topic must be the MQ resource of one of them. If the group entity is found, the topic
     * is rewritten to {@code persistent://tenant/groupMqResource/topic}, with tenant {@code public}
     * when the Pulsar cluster has none.</li>
     * </ul>
     *
     * @param info consumption being saved; its topic and MQ type may be modified
     */
    private void setTopicInfo(ConsumptionInfo info) {
        String groupId = info.getInlongGroupId();
        InlongGroupTopicInfo topicVO = this.groupService.getTopic(groupId);
        Preconditions.checkNotNull(topicVO, "inlong group not exist: " + groupId);

        String mqType = topicVO.getMqType();
        if ("TUBEMQ".equals(mqType)) {
            String mqResource = topicVO.getMqResource();
            boolean topicMatches = mqResource == null || mqResource.equals(info.getTopic());
            Preconditions.checkTrue(topicMatches,
                    "topic [" + info.getTopic() + "] not belong to inlong group " + groupId);
        } else if ("PULSAR".equals(mqType) || "TDMQ_PULSAR".equals(mqType)) {
            // The stream topics are read through the getter so that their element type is inferred.
            if (!CollectionUtils.isEmpty(topicVO.getStreamTopics())) {
                // Start from all requested topics and drop every one the group knows; leftovers are foreign.
                HashSet<String> unknownTopics = new HashSet<>(Arrays.asList(info.getTopic().split(",")));
                topicVO.getStreamTopics().forEach(stream -> unknownTopics.remove(stream.getMqResource()));
                Preconditions.checkEmpty(unknownTopics,
                        "topic [" + unknownTopics + "] not belong to inlong group " + groupId);
            }

            InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(groupId);
            if (groupEntity != null) {
                PulsarClusterInfo pulsarCluster =
                        (PulsarClusterInfo) this.clusterService.getOne(groupEntity.getInlongClusterTag(), null, "PULSAR");
                String tenant = pulsarCluster.getTenant();
                if (StringUtils.isEmpty(tenant)) {
                    tenant = "public";
                }
                info.setTopic(String.format("persistent://%s/%s/%s", tenant, groupEntity.getMqResource(),
                        info.getTopic()));
            }
        }
        info.setMqType(mqType);
    }
}

