/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.consume;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarDTO;
import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo;
import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarTopicInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.consume.AbstractConsumeOperator;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Consume operator for Pulsar message queues.
 *
 * <p>Accepts the MQ types {@code "PULSAR"} and {@code "TDMQ_PULSAR"}, validates the requested
 * topic against the topics of the inlong group, converts stored entities into
 * {@link ConsumePulsarInfo}, and serializes the Pulsar-specific request settings
 * (dead letter / retry letter queue) into the entity's ext params.
 */
@Service
public class ConsumePulsarOperator extends AbstractConsumeOperator {

    /** MQ type reported by {@link #getMQType()}. */
    private static final String MQ_TYPE_PULSAR = "PULSAR";
    /** Additional MQ type that this operator also accepts. */
    private static final String MQ_TYPE_TDMQ_PULSAR = "TDMQ_PULSAR";

    /** Flag value meaning the dead letter / retry letter queue is enabled. */
    private static final Integer DLQ_RLQ_ENABLE = 1;
    /** Flag value written back to the request when the queue is not enabled. */
    private static final Integer DLQ_RLQ_DISABLE = 0;
    /** Prefix of the name that is checked for an enabled dead letter queue. */
    private static final String PREFIX_DLQ = "dlq";
    /** Prefix of the name that is checked for an enabled retry letter queue. */
    private static final String PREFIX_RLQ = "rlq";

    @Autowired
    private InlongGroupService groupService;
    @Autowired
    private InlongStreamEntityMapper streamMapper;
    @Autowired
    private InlongClusterService clusterService;
    @Autowired
    private InlongStreamService streamService;
    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Tells whether this operator handles the given MQ type.
     *
     * @param mqType MQ type to test, may be {@code null}
     * @return {@code true} if {@code mqType} equals {@link #getMQType()} or {@code "TDMQ_PULSAR"}
     */
    @Override
    public Boolean accept(String mqType) {
        return this.getMQType().equals(mqType) || MQ_TYPE_TDMQ_PULSAR.equals(mqType);
    }

    /**
     * Returns the MQ type of this operator.
     *
     * @return the constant {@code "PULSAR"}
     */
    @Override
    public String getMQType() {
        return MQ_TYPE_PULSAR;
    }

    /**
     * Checks that the topic named in the request is one of the topics of its inlong group and
     * replaces it with the full topic name {@code persistent://<tenant>/<namespace>/<topic>}.
     *
     * <p>The precondition checks fail when the group service returns no topic info for the
     * group id, or when the group's topics do not contain the requested topic.
     *
     * @param request consume request; its topic is overwritten with the full topic name
     */
    @Override
    public void checkTopicInfo(InlongConsumeRequest request) {
        String groupId = request.getInlongGroupId();
        InlongGroupTopicInfo topicInfo = this.groupService.getTopic(groupId);
        Preconditions.checkNotNull(topicInfo, "inlong group not exist for groupId=" + groupId);

        // The topic info is used as its Pulsar-specific subtype to reach tenant/namespace/topics.
        InlongPulsarTopicInfo pulsarTopic = (InlongPulsarTopicInfo) topicInfo;
        String originTopic = request.getTopic();
        Preconditions.checkTrue(pulsarTopic.getTopics().contains(originTopic),
                "Pulsar topic not exist for " + originTopic);

        String fullTopic = String.format("persistent://%s/%s/%s",
                pulsarTopic.getTenant(), pulsarTopic.getNamespace(), originTopic);
        request.setTopic(fullTopic);
    }

    /**
     * Builds a {@link ConsumePulsarInfo} from a consume entity.
     *
     * <p>The entity's properties are copied first; if the entity has non-blank ext params, they
     * are parsed into a {@link ConsumePulsarDTO} whose properties are copied on top.
     *
     * @param entity consume entity; the not-null precondition fails if it is {@code null}
     * @return the populated consume info
     */
    @Override
    public InlongConsumeInfo getFromEntity(InlongConsumeEntity entity) {
        Preconditions.checkNotNull(entity, ErrorCodeEnum.CONSUME_NOT_FOUND.getMessage());

        ConsumePulsarInfo consumeInfo = new ConsumePulsarInfo();
        CommonBeanUtils.copyProperties(entity, consumeInfo);

        String extParams = entity.getExtParams();
        if (StringUtils.isNotBlank(extParams)) {
            ConsumePulsarDTO dto = ConsumePulsarDTO.getFromJson(extParams);
            CommonBeanUtils.copyProperties(dto, consumeInfo);
        }
        return consumeInfo;
    }

    /**
     * Validates the dead letter / retry letter settings of a Pulsar consume request and stores
     * them, serialized as JSON, in the target entity's ext params.
     *
     * <ul>
     *   <li>Enabling the retry letter queue without the dead letter queue is rejected.</li>
     *   <li>For an enabled queue, the name {@code dlq_<topic>} / {@code rlq_<topic>} must not be
     *       reported as existing by the stream service for the entity's group id.</li>
     *   <li>For a queue that is not enabled, the flag is reset to {@code 0} and the topic is
     *       cleared on the request.</li>
     * </ul>
     *
     * @param request consume request, cast to {@link ConsumePulsarRequest}; may be modified
     * @param targetEntity entity whose ext params are set
     * @throws BusinessException if RLQ is enabled while DLQ is not, or if building or
     *         serializing the ext params fails
     */
    @Override
    protected void setTargetEntity(InlongConsumeRequest request, InlongConsumeEntity targetEntity) {
        ConsumePulsarRequest pulsarRequest = (ConsumePulsarRequest) request;
        boolean dlqEnable = DLQ_RLQ_ENABLE.equals(pulsarRequest.getIsDlq());
        boolean rlqEnable = DLQ_RLQ_ENABLE.equals(pulsarRequest.getIsRlq());
        if (rlqEnable && !dlqEnable) {
            throw new BusinessException(ErrorCodeEnum.PULSAR_DLQ_RLQ_ERROR);
        }

        String groupId = targetEntity.getInlongGroupId();
        if (dlqEnable) {
            String dlqTopic = PREFIX_DLQ + "_" + pulsarRequest.getDeadLetterTopic();
            Preconditions.checkTrue(!this.streamService.exist(groupId, dlqTopic),
                    ErrorCodeEnum.PULSAR_DLQ_DUPLICATED.getMessage());
        } else {
            // Normalize any other flag value (including null) to "disabled" and drop the topic.
            pulsarRequest.setIsDlq(DLQ_RLQ_DISABLE);
            pulsarRequest.setDeadLetterTopic(null);
        }

        if (rlqEnable) {
            String rlqTopic = PREFIX_RLQ + "_" + pulsarRequest.getRetryLetterTopic();
            Preconditions.checkTrue(!this.streamService.exist(groupId, rlqTopic),
                    ErrorCodeEnum.PULSAR_RLQ_DUPLICATED.getMessage());
        } else {
            // Normalize any other flag value (including null) to "disabled" and drop the topic.
            pulsarRequest.setIsRlq(DLQ_RLQ_DISABLE);
            pulsarRequest.setRetryLetterTopic(null);
        }

        try {
            ConsumePulsarDTO dto = ConsumePulsarDTO.getFromRequest(pulsarRequest);
            targetEntity.setExtParams(this.objectMapper.writeValueAsString(dto));
        } catch (Exception e) {
            // NOTE(review): only the message of the cause is kept; if BusinessException offers a
            // constructor taking a cause, passing `e` would preserve the stack trace - confirm.
            throw new BusinessException(ErrorCodeEnum.CONSUME_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
        }
    }
}

