package org.apache.inlong.manager.service.consume;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarDTO;
import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo;
import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarTopicInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:org/apache/inlong/manager/service/consume/ConsumePulsarOperator.class */
public class ConsumePulsarOperator extends AbstractConsumeOperator {
    private static final Integer DLQ_RLQ_ENABLE = 1;
    private static final Integer DLQ__RLQ_DISABLE = 0;
    private static final String PREFIX_DLQ = "dlq";
    private static final String PREFIX_RLQ = "rlq";

    @Autowired
    private InlongGroupService groupService;

    @Autowired
    private InlongStreamEntityMapper streamMapper;

    @Autowired
    private InlongClusterService clusterService;

    @Autowired
    private InlongStreamService streamService;

    @Autowired
    private ObjectMapper objectMapper;

    @Override // org.apache.inlong.manager.service.consume.InlongConsumeOperator
    public Boolean accept(String str) {
        return Boolean.valueOf(getMQType().equals(str) || "TDMQ_PULSAR".equals(str));
    }

    @Override // org.apache.inlong.manager.service.consume.InlongConsumeOperator
    public String getMQType() {
        return "PULSAR";
    }

    @Override // org.apache.inlong.manager.service.consume.InlongConsumeOperator
    public void checkTopicInfo(InlongConsumeRequest inlongConsumeRequest) {
        String inlongGroupId = inlongConsumeRequest.getInlongGroupId();
        InlongPulsarTopicInfo topic = this.groupService.getTopic(inlongGroupId);
        Preconditions.expectNotNull(topic, "inlong group not exist for groupId=" + inlongGroupId);
        InlongPulsarTopicInfo inlongPulsarTopicInfo = topic;
        String topic2 = inlongConsumeRequest.getTopic();
        if (topic2.startsWith("persistent")) {
            topic2 = topic2.substring(topic2.lastIndexOf("/") + 1);
            inlongConsumeRequest.setTopic(topic2);
        }
        Preconditions.expectTrue(inlongPulsarTopicInfo.getTopics().contains(topic2), "Pulsar topic not exist for " + topic2);
    }

    @Override // org.apache.inlong.manager.service.consume.InlongConsumeOperator
    public InlongConsumeInfo getFromEntity(InlongConsumeEntity inlongConsumeEntity) {
        Preconditions.expectNotNull(inlongConsumeEntity, ErrorCodeEnum.CONSUME_NOT_FOUND.getMessage());
        ConsumePulsarInfo consumePulsarInfo = new ConsumePulsarInfo();
        CommonBeanUtils.copyProperties(inlongConsumeEntity, consumePulsarInfo);
        if (StringUtils.isNotBlank(inlongConsumeEntity.getExtParams())) {
            CommonBeanUtils.copyProperties(ConsumePulsarDTO.getFromJson(inlongConsumeEntity.getExtParams()), consumePulsarInfo);
        }
        String inlongGroupId = inlongConsumeEntity.getInlongGroupId();
        InlongPulsarInfo inlongPulsarInfo = this.groupService.get(inlongGroupId);
        List<ClusterInfo> listByTagAndType = this.clusterService.listByTagAndType(inlongPulsarInfo.getInlongClusterTag(), "PULSAR");
        Preconditions.expectNotEmpty(listByTagAndType, "pulsar cluster not exist for groupId=" + inlongGroupId);
        consumePulsarInfo.setClusterInfos(listByTagAndType);
        String pulsarTenant = inlongPulsarInfo.getPulsarTenant();
        if (StringUtils.isBlank(pulsarTenant)) {
            pulsarTenant = listByTagAndType.get(0).getPulsarTenant();
        }
        consumePulsarInfo.setTopic(getFullPulsarTopic(inlongPulsarInfo, pulsarTenant, inlongConsumeEntity.getTopic()));
        return consumePulsarInfo;
    }

    @Override // org.apache.inlong.manager.service.consume.AbstractConsumeOperator
    protected void setTargetEntity(InlongConsumeRequest inlongConsumeRequest, InlongConsumeEntity inlongConsumeEntity) {
        ConsumePulsarRequest consumePulsarRequest = (ConsumePulsarRequest) inlongConsumeRequest;
        boolean equals = DLQ_RLQ_ENABLE.equals(consumePulsarRequest.getIsDlq());
        boolean equals2 = DLQ_RLQ_ENABLE.equals(consumePulsarRequest.getIsRlq());
        if (equals2 && !equals) {
            throw new BusinessException(ErrorCodeEnum.PULSAR_DLQ_RLQ_ERROR);
        }
        String inlongGroupId = inlongConsumeEntity.getInlongGroupId();
        if (equals) {
            Preconditions.expectTrue(!this.streamService.exist(inlongGroupId, new StringBuilder().append("dlq_").append(consumePulsarRequest.getDeadLetterTopic()).toString()).booleanValue(), ErrorCodeEnum.PULSAR_DLQ_DUPLICATED.getMessage());
        } else {
            consumePulsarRequest.setIsDlq(DLQ__RLQ_DISABLE);
            consumePulsarRequest.setDeadLetterTopic((String) null);
        }
        if (equals2) {
            Preconditions.expectTrue(!this.streamService.exist(inlongGroupId, new StringBuilder().append("rlq_").append(consumePulsarRequest.getRetryLetterTopic()).toString()).booleanValue(), ErrorCodeEnum.PULSAR_RLQ_DUPLICATED.getMessage());
        } else {
            consumePulsarRequest.setIsRlq(DLQ__RLQ_DISABLE);
            consumePulsarRequest.setRetryLetterTopic((String) null);
        }
        try {
            inlongConsumeEntity.setExtParams(this.objectMapper.writeValueAsString(ConsumePulsarDTO.getFromRequest(consumePulsarRequest, inlongConsumeEntity.getExtParams())));
        } catch (Exception e) {
            throw new BusinessException(ErrorCodeEnum.CONSUME_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
        }
    }

    private String getFullPulsarTopic(InlongGroupInfo inlongGroupInfo, String str, String str2) {
        if (StringUtils.isEmpty(str)) {
            str = "public";
        }
        return String.format("persistent://%s/%s/%s", str, inlongGroupInfo.getMqResource(), str2);
    }
}
