package org.apache.inlong.manager.service.resource.sink.pulsar;

import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.node.pulsar.PulsarDataNodeDTO;
import org.apache.inlong.manager.pojo.node.pulsar.PulsarDataNodeInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
import org.apache.inlong.manager.pojo.sink.SinkInfo;
import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkDTO;
import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.class */
public class PulsarResourceOperator implements SinkResourceOperator {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarResourceOperator.class);

    @Autowired
    private DataNodeOperateHelper dataNodeHelper;

    @Autowired
    private PulsarOperator pulsarOperator;

    @Autowired
    private StreamSinkService sinkService;

    @Override // org.apache.inlong.manager.service.resource.sink.SinkResourceOperator
    public Boolean accept(String str) {
        return Boolean.valueOf("PULSAR".equals(str));
    }

    @Override // org.apache.inlong.manager.service.resource.sink.SinkResourceOperator
    public void createSinkResource(SinkInfo sinkInfo) {
        LOG.info("begin to create sink resources sinkId={}", sinkInfo.getId());
        if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
            LOG.warn("sink resource [" + sinkInfo.getId() + "] already success, skip to create");
        } else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
            LOG.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]");
        } else {
            createTopic(sinkInfo);
        }
    }

    private void createTopic(SinkInfo sinkInfo) {
        PulsarSinkDTO fromJson = PulsarSinkDTO.getFromJson(sinkInfo.getExtParams());
        PulsarDataNodeDTO pulsarDataNodeInfo = getPulsarDataNodeInfo(sinkInfo);
        try {
            PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(PulsarClusterInfo.builder().adminUrl(pulsarDataNodeInfo.getAdminUrl()).token(pulsarDataNodeInfo.getToken()).build());
            this.pulsarOperator.createTenant(pulsarAdmin, fromJson.getPulsarTenant());
            this.pulsarOperator.createNamespace(pulsarAdmin, new InlongPulsarInfo(), fromJson.getPulsarTenant(), fromJson.getNamespace());
            this.pulsarOperator.createTopic(pulsarAdmin, PulsarTopicInfo.builder().pulsarTenant(fromJson.getPulsarTenant()).namespace(fromJson.getNamespace()).topicName(fromJson.getTopic()).numPartitions(fromJson.getPartitionNum()).queueModule(fromJson.getPartitionNum().intValue() > 0 ? "PARALLEL" : "SERIAL").build());
            this.sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode().intValue(), "success to create Pulsar resource");
            LOG.info("success to create Pulsar resource for sinkInfo={}", sinkInfo);
        } catch (PulsarClientException | PulsarAdminException e) {
            LOG.error("init pulsar admin error", e);
            throw new BusinessException();
        }
    }

    private PulsarDataNodeDTO getPulsarDataNodeInfo(SinkInfo sinkInfo) {
        String dataNodeName = sinkInfo.getDataNodeName();
        Preconditions.expectNotBlank(dataNodeName, ErrorCodeEnum.INVALID_PARAMETER, "Pulsar admin url not specified and data node is empty");
        PulsarDataNodeInfo dataNodeInfo = this.dataNodeHelper.getDataNodeInfo(dataNodeName, sinkInfo.getSinkType());
        PulsarDataNodeDTO pulsarDataNodeDTO = new PulsarDataNodeDTO();
        CommonBeanUtils.copyProperties(dataNodeInfo, pulsarDataNodeDTO, true);
        return pulsarDataNodeDTO;
    }
}
