/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.core.impl;

import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
import org.apache.inlong.manager.common.enums.ConsumptionStatus;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
import org.apache.inlong.manager.common.pojo.business.BusinessTopicVO;
import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
import org.apache.inlong.manager.common.pojo.consumption.ConsumptionListVo;
import org.apache.inlong.manager.common.pojo.consumption.ConsumptionMqExtBase;
import org.apache.inlong.manager.common.pojo.consumption.ConsumptionPulsarInfo;
import org.apache.inlong.manager.common.pojo.consumption.ConsumptionQuery;
import org.apache.inlong.manager.common.pojo.consumption.ConsumptionSummary;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.BusinessEntity;
import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
import org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity;
import org.apache.inlong.manager.dao.mapper.BusinessEntityMapper;
import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
import org.apache.inlong.manager.dao.mapper.ConsumptionPulsarEntityMapper;
import org.apache.inlong.manager.service.core.BusinessService;
import org.apache.inlong.manager.service.core.ConsumptionService;
import org.apache.inlong.manager.service.core.DataStreamService;
import org.apache.inlong.manager.service.workflow.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowResult;
import org.apache.inlong.manager.service.workflow.WorkflowService;
import org.apache.inlong.manager.service.workflow.consumption.NewConsumptionWorkflowForm;
import org.apache.inlong.manager.workflow.model.view.CountByKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;

/**
 * {@link ConsumptionService} implementation that manages consumption records and, for
 * Pulsar-backed consumptions, the accompanying Pulsar-specific record (dead letter /
 * retry letter queue settings).
 *
 * <p>Middleware types are always matched case-insensitively so that a record written by
 * one method is recognised by every other method of this class.
 */
@Service
public class ConsumptionServiceImpl implements ConsumptionService {

    private static final Logger log = LoggerFactory.getLogger(ConsumptionServiceImpl.class);

    /** Middleware type value that marks a Pulsar consumption. */
    private static final String MIDDLEWARE_PULSAR = "PULSAR";
    /** Middleware type value that marks a Tube consumption. */
    private static final String MIDDLEWARE_TUBE = "TUBE";
    /** Prefix put in front of a user supplied dead letter topic name. */
    private static final String PREFIX_DLQ = "dlq_";
    /** Prefix put in front of a user supplied retry letter topic name. */
    private static final String PREFIX_RLQ = "rlq_";

    @Autowired
    private ClusterBean clusterBean;
    @Autowired
    private BusinessEntityMapper businessMapper;
    @Autowired
    private ConsumptionEntityMapper consumptionMapper;
    @Autowired
    private ConsumptionPulsarEntityMapper consumptionPulsarMapper;
    @Autowired
    private WorkflowService workflowService;
    @Autowired
    private BusinessService businessService;
    @Autowired
    private DataStreamService streamService;

    /**
     * Counts the consumptions matching the query, in total and per pending/rejected status.
     *
     * @param query filter passed to the mapper's count-by-status query
     * @return summary with the total count and the waiting-assign, waiting-approve and rejected counts
     */
    @Override
    public ConsumptionSummary getSummary(ConsumptionQuery query) {
        // Key is the status code rendered as a string, value is the number of records.
        Map<String, Integer> countByState = consumptionMapper.countByStatus(query).stream()
                .collect(Collectors.toMap(CountByKey::getKey, CountByKey::getValue));
        int total = countByState.values().stream().mapToInt(Integer::intValue).sum();

        return ConsumptionSummary.builder()
                .totalCount(total)
                .waitingAssignCount(countOf(countByState, ConsumptionStatus.WAITING_ASSIGN))
                .waitingApproveCount(countOf(countByState, ConsumptionStatus.WAITING_APPROVE))
                .rejectedCount(countOf(countByState, ConsumptionStatus.REJECTED))
                .build();
    }

    /**
     * Lists consumptions matching the query, one page at a time.
     *
     * @param query filter carrying the page number and page size
     * @return page of list view objects, with the total taken from the underlying page
     */
    @Override
    public PageInfo<ConsumptionListVo> list(ConsumptionQuery query) {
        PageHelper.startPage(query.getPageNum(), query.getPageSize());
        Page<ConsumptionEntity> pageResult = (Page<ConsumptionEntity>) consumptionMapper.listByQuery(query);
        PageInfo<ConsumptionListVo> pageInfo = pageResult.toPageInfo(
                entity -> CommonBeanUtils.copyProperties(entity, ConsumptionListVo::new));
        pageInfo.setTotal(pageResult.getTotal());
        return pageInfo;
    }

    /**
     * Loads a consumption by id. For Pulsar consumptions the Pulsar extension info is attached
     * and the topic is replaced by its full {@code persistent://tenant/namespace/topic} form.
     *
     * @param id consumption id, must not be null
     * @return the consumption info
     */
    @Override
    public ConsumptionInfo get(Integer id) {
        Preconditions.checkNotNull(id, "consumption id cannot be null");
        ConsumptionEntity entity = consumptionMapper.selectByPrimaryKey(id);
        Preconditions.checkNotNull(entity, "consumption not exist with id:" + id);

        ConsumptionInfo info = CommonBeanUtils.copyProperties(entity, ConsumptionInfo::new);
        if (isPulsar(info.getMiddlewareType())) {
            ConsumptionPulsarEntity pulsarEntity = consumptionPulsarMapper.selectByConsumptionId(info.getId());
            Preconditions.checkNotNull(pulsarEntity,
                    "Pulsar consumption cannot be empty, as the middleware is Pulsar");
            ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(pulsarEntity,
                    ConsumptionPulsarInfo::new);
            info.setMqExtInfo(pulsarInfo);
            info.setTopic(getFullPulsarTopic(info.getInlongGroupId(), info.getTopic()));
        }
        return info;
    }

    /**
     * Checks whether a consumer group id is already used by a consumption.
     *
     * @param consumerGroup consumer group id to look for
     * @param excludeSelfId id of a consumption to ignore in the check, or null to ignore none
     * @return true if at least one other consumption uses the consumer group id
     */
    @Override
    public boolean isConsumerGroupIdExists(String consumerGroup, Integer excludeSelfId) {
        ConsumptionQuery consumptionQuery = new ConsumptionQuery();
        consumptionQuery.setConsumerGroupId(consumerGroup);
        List<ConsumptionEntity> result = consumptionMapper.listByQuery(consumptionQuery);
        if (CollectionUtils.isEmpty(result)) {
            return false;
        }
        if (excludeSelfId == null) {
            return true;
        }
        return result.stream().anyMatch(consumer -> !excludeSelfId.equals(consumer.getId()));
    }

    /**
     * Validates and saves a consumption, plus its Pulsar extension info when the middleware is Pulsar.
     *
     * @param info consumption to save; an existing record is overwritten when its id is set
     * @param operator name recorded as creator and modifier
     * @return id of the saved consumption
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public Integer save(ConsumptionInfo info, String operator) {
        fullConsumptionInfo(info);
        ConsumptionEntity entity = saveConsumption(info, operator, new Date());
        if (isPulsar(entity.getMiddlewareType())) {
            savePulsarInfo(info.getMqExtInfo(), entity);
        }
        return entity.getId();
    }

    /**
     * Validates the DLQ/RLQ settings and inserts or updates the Pulsar record of a consumption.
     *
     * @param mqExtBase Pulsar extension info, must be a non-null {@link ConsumptionPulsarInfo}
     * @param entity the already persisted consumption the Pulsar record belongs to
     */
    private void savePulsarInfo(ConsumptionMqExtBase mqExtBase, ConsumptionEntity entity) {
        Preconditions.checkNotNull(mqExtBase, "Pulsar info cannot be empty, as the middleware is Pulsar");
        ConsumptionPulsarInfo pulsarInfo = (ConsumptionPulsarInfo) mqExtBase;

        boolean dlqEnable = isEnabled(pulsarInfo.getIsDlq());
        boolean rlqEnable = isEnabled(pulsarInfo.getIsRlq());
        checkRlqRequiresDlq(dlqEnable, rlqEnable);

        String groupId = entity.getInlongGroupId();
        if (dlqEnable) {
            String dlqTopic = PREFIX_DLQ + pulsarInfo.getDeadLetterTopic();
            if (streamService.exist(groupId, dlqTopic)) {
                throw new BusinessException(BizErrorCodeEnum.PULSAR_DLQ_DUPLICATED);
            }
        } else {
            // Normalise a disabled DLQ so that no stale topic name is stored.
            pulsarInfo.setIsDlq(0);
            pulsarInfo.setDeadLetterTopic(null);
        }
        if (rlqEnable) {
            String rlqTopic = PREFIX_RLQ + pulsarInfo.getRetryLetterTopic();
            if (streamService.exist(groupId, rlqTopic)) {
                throw new BusinessException(BizErrorCodeEnum.PULSAR_RLQ_DUPLICATED);
            }
        } else {
            pulsarInfo.setIsRlq(0);
            pulsarInfo.setRetryLetterTopic(null);
        }

        ConsumptionPulsarEntity pulsar = CommonBeanUtils.copyProperties(pulsarInfo, ConsumptionPulsarEntity::new);
        Integer consumptionId = entity.getId();
        pulsar.setConsumptionId(consumptionId);
        pulsar.setInlongGroupId(groupId);
        pulsar.setConsumerGroupId(entity.getConsumerGroupId());
        pulsar.setIsDeleted(0);

        ConsumptionPulsarEntity exists = consumptionPulsarMapper.selectByConsumptionId(consumptionId);
        if (exists == null) {
            consumptionPulsarMapper.insert(pulsar);
        } else {
            pulsar.setId(exists.getId());
            consumptionPulsarMapper.updateByPrimaryKey(pulsar);
        }
    }

    /**
     * Updates an existing consumption and, for Pulsar, its DLQ/RLQ settings.
     *
     * @param info new values; the id must reference an existing consumption
     * @param operator user performing the update, must be among the in-charges of the consumption
     * @return always true; failures are reported through exceptions
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public Boolean update(ConsumptionInfo info, String operator) {
        Preconditions.checkNotNull(info, "consumption info cannot be null");
        Integer consumptionId = info.getId();
        Preconditions.checkNotNull(consumptionId, "consumption id cannot be null");

        ConsumptionEntity exists = consumptionMapper.selectByPrimaryKey(consumptionId);
        Preconditions.checkNotNull(exists, "consumption not exist with id " + consumptionId);
        // NOTE(review): this is a substring match on the in-charges string, so a name that is a
        // prefix of another in-charge also passes - confirm the stored format and tighten if needed.
        Preconditions.checkTrue(exists.getInCharges().contains(operator),
                "operator " + operator + " has no privilege for the consumption");

        ConsumptionEntity entity = new ConsumptionEntity();
        // NOTE(review): the 'true' flag presumably skips null source properties - confirm in CommonBeanUtils.
        CommonBeanUtils.copyProperties(info, entity, true);
        entity.setModifier(operator);
        entity.setModifyTime(new Date());

        if (isPulsar(info.getMiddlewareType())) {
            updatePulsarInfo(info, exists, operator);
        }
        consumptionMapper.updateByPrimaryKeySelective(entity);
        return true;
    }

    /**
     * Applies the DLQ/RLQ changes of an update to the stored Pulsar record. Topics are only
     * created or logically deleted when the existing consumption is already approved.
     *
     * @param info update request carrying the new Pulsar extension info
     * @param exists the currently stored consumption
     * @param operator user performing the update
     */
    private void updatePulsarInfo(ConsumptionInfo info, ConsumptionEntity exists, String operator) {
        ConsumptionPulsarEntity pulsarEntity = consumptionPulsarMapper.selectByConsumptionId(info.getId());
        Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be null");
        pulsarEntity.setConsumerGroupId(info.getConsumerGroupId());

        Preconditions.checkNotNull(info.getMqExtInfo(),
                "Pulsar info cannot be empty, as the middleware is Pulsar");
        ConsumptionPulsarInfo update = (ConsumptionPulsarInfo) info.getMqExtInfo();
        boolean dlqEnable = isEnabled(update.getIsDlq());
        boolean rlqEnable = isEnabled(update.getIsRlq());
        checkRlqRequiresDlq(dlqEnable, rlqEnable);

        if (ConsumptionStatus.APPROVED.getStatus() == exists.getStatus().intValue()) {
            String groupId = info.getInlongGroupId();

            // NOTE(review): the stored names carry the dlq_/rlq_ prefix while the incoming names
            // apparently do not, so the comparisons below may always report a change - confirm.
            String dlqNameOld = pulsarEntity.getDeadLetterTopic();
            String dlqNameNew = update.getDeadLetterTopic();
            if (!dlqEnable) {
                pulsarEntity.setIsDlq(0);
                pulsarEntity.setDeadLetterTopic(null);
                streamService.logicDeleteDlqOrRlq(groupId, dlqNameOld, operator);
            } else if (!Objects.equals(dlqNameNew, dlqNameOld)) {
                pulsarEntity.setIsDlq(1);
                String topic = (PREFIX_DLQ + dlqNameNew).toLowerCase(Locale.ROOT);
                pulsarEntity.setDeadLetterTopic(topic);
                streamService.insertDlqOrRlq(groupId, topic, operator);
            }

            String rlqNameOld = pulsarEntity.getRetryLetterTopic();
            String rlqNameNew = update.getRetryLetterTopic();
            if (!rlqEnable) {
                pulsarEntity.setIsRlq(0);
                pulsarEntity.setRetryLetterTopic(null);
                streamService.logicDeleteDlqOrRlq(groupId, rlqNameOld, operator);
            } else if (!Objects.equals(rlqNameNew, rlqNameOld)) {
                pulsarEntity.setIsRlq(1);
                String topic = (PREFIX_RLQ + rlqNameNew).toLowerCase(Locale.ROOT);
                pulsarEntity.setRetryLetterTopic(topic);
                streamService.insertDlqOrRlq(groupId, topic, operator);
            }
        }
        consumptionPulsarMapper.updateByConsumptionId(pulsarEntity);
    }

    /**
     * Builds the full Pulsar topic name {@code persistent://tenant/namespace/topic}, using the
     * cluster's default tenant and the MQ resource object of the business as namespace.
     *
     * @param groupId business group id used to look up the namespace
     * @param topic short topic name
     * @return the full topic name
     */
    private String getFullPulsarTopic(String groupId, String topic) {
        BusinessEntity businessEntity = businessMapper.selectByIdentifier(groupId);
        Preconditions.checkNotNull(businessEntity, "business not exist: " + groupId);
        String tenant = clusterBean.getDefaultTenant();
        String namespace = businessEntity.getMqResourceObj();
        return "persistent://" + tenant + "/" + namespace + "/" + topic;
    }

    /**
     * Deletes a consumption and its Pulsar record.
     *
     * @param id id of the consumption to delete
     * @param operator user requesting the deletion (currently not evaluated)
     * @return always true; a missing consumption is reported through an exception
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public Boolean delete(Integer id, String operator) {
        ConsumptionEntity consumptionEntity = consumptionMapper.selectByPrimaryKey(id);
        Preconditions.checkNotNull(consumptionEntity, "consumption not exist with id: " + id);
        // NOTE(review): unlike update(), no in-charge check is done for the operator - confirm intended.
        consumptionMapper.deleteByPrimaryKey(id);
        consumptionPulsarMapper.deleteByConsumptionId(id);
        return true;
    }

    /**
     * Moves a consumption to the waiting-approve status and starts the new-consumption workflow.
     *
     * @param id id of the consumption
     * @param operation value handed to the workflow service as second argument of {@code start}
     * @return result of the started workflow
     */
    @Override
    public WorkflowResult startProcess(Integer id, String operation) {
        ConsumptionInfo consumptionInfo = get(id);
        ConsumptionStatus currentStatus = ConsumptionStatus.fromStatus(consumptionInfo.getStatus());
        Preconditions.checkTrue(ConsumptionStatus.ALLOW_START_WORKFLOW_STATUS.contains(currentStatus),
                "current status not allow start workflow");

        // Selective update: only the fields set here are meant to be written.
        ConsumptionEntity updateConsumptionEntity = new ConsumptionEntity();
        updateConsumptionEntity.setId(consumptionInfo.getId());
        updateConsumptionEntity.setModifyTime(new Date());
        updateConsumptionEntity.setStatus(ConsumptionStatus.WAITING_APPROVE.getStatus());
        int success = consumptionMapper.updateByPrimaryKeySelective(updateConsumptionEntity);
        Preconditions.checkTrue(success == 1, "update consumption failed");

        // NOTE(review): not transactional here, so a failing workflow start leaves the status
        // at waiting-approve - confirm whether that is intended.
        return workflowService.start(ProcessName.NEW_CONSUMPTION_WORKFLOW, operation,
                genNewConsumptionWorkflowForm(consumptionInfo));
    }

    /**
     * Creates an already approved consumption for the given business, topic and consumer group,
     * unless such a consumption exists.
     *
     * @param bizInfo business the consumption belongs to
     * @param topic topic to consume
     * @param consumerGroup consumer group, used as both id and name
     */
    @Override
    public void saveSortConsumption(BusinessInfo bizInfo, String topic, String consumerGroup) {
        String groupId = bizInfo.getInlongGroupId();
        ConsumptionEntity exists = consumptionMapper.selectConsumptionExists(groupId, topic, consumerGroup);
        if (exists != null) {
            log.warn("consumption with groupId={}, topic={}, consumer group={} already exists, skip to create",
                    groupId, topic, consumerGroup);
            return;
        }

        log.debug("begin to save consumption, groupId={}, topic={}, consumer group={}",
                groupId, topic, consumerGroup);
        String middlewareType = bizInfo.getMiddlewareType();
        ConsumptionEntity entity = new ConsumptionEntity();
        entity.setInlongGroupId(groupId);
        entity.setMiddlewareType(middlewareType);
        entity.setTopic(topic);
        entity.setConsumerGroupId(consumerGroup);
        entity.setConsumerGroupName(consumerGroup);
        entity.setInCharges(bizInfo.getInCharges());
        entity.setFilterEnabled(0);
        entity.setStatus(ConsumptionStatus.APPROVED.getStatus());
        entity.setIsDeleted(EntityStatus.UN_DELETED.getCode());
        entity.setCreator(bizInfo.getCreator());
        entity.setCreateTime(new Date());
        consumptionMapper.insert(entity);

        if (isPulsar(middlewareType)) {
            ConsumptionPulsarEntity pulsarEntity = new ConsumptionPulsarEntity();
            pulsarEntity.setConsumptionId(entity.getId());
            pulsarEntity.setConsumerGroupId(consumerGroup);
            pulsarEntity.setInlongGroupId(groupId);
            pulsarEntity.setIsDeleted(EntityStatus.UN_DELETED.getCode());
            consumptionPulsarMapper.insert(pulsarEntity);
        }
        log.debug("success save consumption, groupId={}, topic={}, consumer group={}",
                groupId, topic, consumerGroup);
    }

    /**
     * Wraps a consumption into the form of the new-consumption workflow, attaching the stored
     * Pulsar extension info for Pulsar consumptions.
     */
    private NewConsumptionWorkflowForm genNewConsumptionWorkflowForm(ConsumptionInfo consumptionInfo) {
        NewConsumptionWorkflowForm form = new NewConsumptionWorkflowForm();
        if (isPulsar(consumptionInfo.getMiddlewareType())) {
            ConsumptionPulsarEntity consumptionPulsarEntity =
                    consumptionPulsarMapper.selectByConsumptionId(consumptionInfo.getId());
            ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(consumptionPulsarEntity,
                    ConsumptionPulsarInfo::new);
            consumptionInfo.setMqExtInfo(pulsarInfo);
        }
        form.setConsumptionInfo(consumptionInfo);
        return form;
    }

    /**
     * Persists a consumption in waiting-assign status: updates by primary key when the info
     * carries an id, inserts otherwise.
     *
     * @param info source values
     * @param operator name recorded as creator and modifier
     * @param now timestamp recorded as create and modify time
     * @return the persisted entity, with a non-null id
     */
    private ConsumptionEntity saveConsumption(ConsumptionInfo info, String operator, Date now) {
        ConsumptionEntity entity = CommonBeanUtils.copyProperties(info, ConsumptionEntity::new);
        entity.setStatus(ConsumptionStatus.WAITING_ASSIGN.getStatus());
        entity.setIsDeleted(0);
        // NOTE(review): creator and create time are also overwritten on the update path - confirm intended.
        entity.setCreator(operator);
        entity.setModifier(operator);
        entity.setCreateTime(now);
        entity.setModifyTime(now);

        if (info.getId() != null) {
            consumptionMapper.updateByPrimaryKey(entity);
        } else {
            consumptionMapper.insert(entity);
        }
        Preconditions.checkNotNull(entity.getId(), "save consumption failed");
        return entity;
    }

    /**
     * Completes and validates a consumption before it is saved: derives the consumer group id
     * from the group name, rejects duplicate consumer groups, checks that an existing record is
     * in a status that allows saving, and verifies the topic(s) belong to the business.
     *
     * @param info consumption to complete and validate, must not be null
     */
    private void fullConsumptionInfo(ConsumptionInfo info) {
        Preconditions.checkNotNull(info, "consumption info cannot be null");
        info.setConsumerGroupId(info.getConsumerGroupName());
        Preconditions.checkFalse(isConsumerGroupIdExists(info.getConsumerGroupId(), info.getId()),
                "consumer group " + info.getConsumerGroupId() + " already exist");

        if (info.getId() != null) {
            ConsumptionEntity consumptionEntity = consumptionMapper.selectByPrimaryKey(info.getId());
            Preconditions.checkNotNull(consumptionEntity, "consumption not exist with id: " + info.getId());
            ConsumptionStatus consumptionStatus = ConsumptionStatus.fromStatus(consumptionEntity.getStatus());
            Preconditions.checkTrue(ConsumptionStatus.ALLOW_SAVE_UPDATE_STATUS.contains(consumptionStatus),
                    "consumption not allow update when status is " + consumptionStatus.name());
        }

        Preconditions.checkNotNull(info.getTopic(), "consumption topic cannot be empty");
        String groupId = info.getInlongGroupId();
        BusinessTopicVO topicVO = businessService.getTopic(groupId);
        Preconditions.checkNotNull(topicVO, "business not exist: " + groupId);

        if (MIDDLEWARE_TUBE.equalsIgnoreCase(topicVO.getMiddlewareType())) {
            // A business without a topic accepts any topic; otherwise the topic must match exactly.
            String bizTopic = topicVO.getMqResourceObj();
            Preconditions.checkTrue(bizTopic == null || bizTopic.equals(info.getTopic()),
                    "topic [" + info.getTopic() + "] not belong to business " + groupId);
        } else if (isPulsar(topicVO.getMiddlewareType()) && !CollectionUtils.isEmpty(topicVO.getDsTopicList())) {
            // Every comma separated topic must be one of the business' data stream topics;
            // whatever is left in the set after removing the known ones is foreign.
            HashSet<String> topicSet = new HashSet<String>(Arrays.asList(info.getTopic().split(",")));
            topicVO.getDsTopicList().forEach(ds -> topicSet.remove(ds.getMqResourceObj()));
            Preconditions.checkEmpty(topicSet, "topic [" + topicSet + "] not belong to business " + groupId);
        }
    }

    /** Returns the count stored for the given status, or 0 when the status has no entry. */
    private static Integer countOf(Map<String, Integer> countByState, ConsumptionStatus status) {
        return countByState.getOrDefault(String.valueOf(status.getStatus()), 0);
    }

    /** Tells whether the middleware type denotes Pulsar, ignoring case; null is not Pulsar. */
    private static boolean isPulsar(String middlewareType) {
        return MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType);
    }

    /** Interprets a nullable 0/1 flag: only a non-null 1 means enabled. */
    private static boolean isEnabled(Integer flag) {
        return flag != null && flag == 1;
    }

    /** Rejects the combination "retry letter queue enabled while dead letter queue disabled". */
    private static void checkRlqRequiresDlq(boolean dlqEnable, boolean rlqEnable) {
        if (rlqEnable && !dlqEnable) {
            throw new BusinessException(BizErrorCodeEnum.PULSAR_DLQ_RLQ_ERROR);
        }
    }
}

