package org.apache.inlong.manager.service.resource.sink.cls;

import com.tencentcloudapi.cls.v20201016.ClsClient;
import com.tencentcloudapi.cls.v20201016.models.CreateIndexRequest;
import com.tencentcloudapi.cls.v20201016.models.CreateTopicRequest;
import com.tencentcloudapi.cls.v20201016.models.CreateTopicResponse;
import com.tencentcloudapi.cls.v20201016.models.FullTextInfo;
import com.tencentcloudapi.cls.v20201016.models.RuleInfo;
import com.tencentcloudapi.cls.v20201016.models.Tag;
import com.tencentcloudapi.common.Credential;
import com.tencentcloudapi.common.exception.TencentCloudSDKException;
import com.tencentcloudapi.common.profile.ClientProfile;
import com.tencentcloudapi.common.profile.HttpProfile;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.pojo.node.cls.ClsDataNodeDTO;
import org.apache.inlong.manager.pojo.sink.SinkInfo;
import org.apache.inlong.manager.pojo.sink.cls.ClsSinkDTO;
import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Sink resource operator for sinks of type {@code "CLS"}.
 *
 * <p>For a sink whose resources may be created, this operator creates a CLS topic through the
 * Tencent Cloud SDK client, optionally creates a full-text index on that topic, stores the
 * returned topic id back into the sink's extended parameters, and records the outcome through
 * {@link StreamSinkService#updateStatus}.
 */
@Service
public class ClsResourceOperator implements SinkResourceOperator {

    private static final Logger LOG = LoggerFactory.getLogger(ClsResourceOperator.class);

    /** Sink type accepted by this operator; also the data node type used for the lookup. */
    private static final String CLS_TYPE = "CLS";

    /** Regex separating individual tags inside the sink's tag string. */
    private static final String TAG_SEPARATOR_REGEX = "\\|";

    /** Separator between the key and the value of a single tag. */
    private static final String TAG_KEY_VALUE_SEPARATOR = ":";

    @Autowired
    private DataNodeEntityMapper dataNodeEntityMapper;

    @Autowired
    private StreamSinkService sinkService;

    @Autowired
    private StreamSinkEntityMapper streamSinkEntityMapper;

    /**
     * Tells whether this operator handles the given sink type.
     *
     * @param str the sink type to test, may be {@code null}
     * @return {@code true} only when {@code str} equals {@code "CLS"}
     */
    @Override
    public Boolean accept(String str) {
        return CLS_TYPE.equals(str);
    }

    /**
     * Creates the CLS resources for the given sink unless the sink is already configured
     * successfully or resource creation is disabled for it.
     *
     * @param sinkInfo the sink to create resources for
     * @throws BusinessException if the topic or its index cannot be created, or if the
     *         configured tags are malformed
     */
    @Override
    public void createSinkResource(SinkInfo sinkInfo) {
        LOG.info("begin to create sink resources sinkId={}", sinkInfo.getId());
        if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
            LOG.warn("sink resource [{}] already success, skip to create", sinkInfo.getId());
            return;
        }
        if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
            LOG.warn("create resource was disabled, skip to create for [{}]", sinkInfo.getId());
            return;
        }
        createTopicID(sinkInfo);
    }

    /**
     * Creates the topic, writes the returned topic id into the sink's extended parameters,
     * creates the index, persists the sink and marks it as successfully configured.
     *
     * <p>When the topic creation call fails, the sink is marked as failed and a
     * {@link BusinessException} carrying the SDK error message is thrown.
     *
     * @param sinkInfo the sink whose topic is to be created; its extended parameters are updated
     * @throws BusinessException if the SDK call fails, the index cannot be created, or the
     *         configured tags are malformed
     */
    private void createTopicID(SinkInfo sinkInfo) {
        ClsDataNodeDTO clsDataNode = getClsDataNode(sinkInfo);
        ClsSinkDTO clsSinkDTO = JsonUtils.parseObject(sinkInfo.getExtParams(), ClsSinkDTO.class);
        try {
            ClsClient client = getClsClient(clsDataNode);
            CreateTopicResponse response = client.CreateTopic(getCreateTopicRequest(clsDataNode, clsSinkDTO));
            LOG.info("create cls topic {} success ,topicId {}", clsSinkDTO.getTopicName(), response.getTopicId());

            // The index creation below re-reads the ext params, so the topic id must be stored first.
            clsSinkDTO.setTopicId(response.getTopicId());
            sinkInfo.setExtParams(JsonUtils.toJsonString(clsSinkDTO));
            createTopicIndex(sinkInfo);

            StreamSinkEntity streamSinkEntity = new StreamSinkEntity();
            CommonBeanUtils.copyProperties(sinkInfo, streamSinkEntity, true);
            this.streamSinkEntityMapper.updateByIdSelective(streamSinkEntity);
            this.sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(),
                    "success to create cls resource");
            LOG.info("update cls sink = {}info status  success ,topicName {}", streamSinkEntity.getSinkName(),
                    clsSinkDTO.getTopicName());
        } catch (TencentCloudSDKException e) {
            String errMsg = "Create cls topic  failed: " + e.getMessage();
            LOG.error(errMsg, e);
            this.sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg);
            throw new BusinessException(errMsg);
        }
    }

    /**
     * Builds the topic creation request from the data node's log set id and the sink's topic
     * name and tags.
     *
     * <p>Tags are only attached when the sink's tag string is not blank.
     *
     * @param clsDataNodeDTO data node configuration supplying the log set id
     * @param clsSinkDTO sink configuration supplying the topic name and the tag string
     * @return the populated request
     * @throws BusinessException if the tag string contains a malformed entry
     */
    private CreateTopicRequest getCreateTopicRequest(ClsDataNodeDTO clsDataNodeDTO, ClsSinkDTO clsSinkDTO) {
        CreateTopicRequest createTopicRequest = new CreateTopicRequest();
        String rawTags = clsSinkDTO.getTag();
        if (StringUtils.isNotBlank(rawTags)) {
            createTopicRequest.setTags(convertTags(rawTags.split(TAG_SEPARATOR_REGEX)));
        }
        createTopicRequest.setLogsetId(clsDataNodeDTO.getLogSetId());
        createTopicRequest.setTopicName(clsSinkDTO.getTopicName());
        return createTopicRequest;
    }

    /**
     * Creates an SDK client from the data node's management credentials, endpoint and region.
     *
     * @param clsDataNodeDTO data node configuration
     * @return a new client instance
     */
    private ClsClient getClsClient(ClsDataNodeDTO clsDataNodeDTO) {
        // The credential is (secret id, secret key); the id must not be passed for both.
        // NOTE(review): relies on ClsDataNodeDTO exposing getManageSecretKey() - confirm.
        Credential credential = new Credential(clsDataNodeDTO.getManageSecretId(),
                clsDataNodeDTO.getManageSecretKey());
        HttpProfile httpProfile = new HttpProfile();
        httpProfile.setEndpoint(clsDataNodeDTO.getEndpoint());
        ClientProfile clientProfile = new ClientProfile();
        clientProfile.setHttpProfile(httpProfile);
        return new ClsClient(credential, clsDataNodeDTO.getRegion(), clientProfile);
    }

    /**
     * Creates a full-text index on the sink's topic using the configured tokenizer.
     *
     * <p>Does nothing except log a warning when no tokenizer is configured. When the index
     * creation call fails, the sink is marked as failed before the exception is thrown.
     *
     * @param sinkInfo the sink whose extended parameters already contain the topic id
     * @throws BusinessException if the SDK call fails
     */
    private void createTopicIndex(SinkInfo sinkInfo) throws BusinessException {
        ClsSinkDTO clsSinkDTO = JsonUtils.parseObject(sinkInfo.getExtParams(), ClsSinkDTO.class);
        // Without a tokenizer there is nothing to index with.
        if (StringUtils.isBlank(clsSinkDTO.getTokenizer())) {
            LOG.warn("topic {} tokenizer is empty", clsSinkDTO.getTopicName());
            return;
        }

        FullTextInfo fullTextInfo = new FullTextInfo();
        fullTextInfo.setTokenizer(clsSinkDTO.getTokenizer());
        RuleInfo ruleInfo = new RuleInfo();
        ruleInfo.setFullText(fullTextInfo);

        CreateIndexRequest createIndexRequest = new CreateIndexRequest();
        createIndexRequest.setTopicId(clsSinkDTO.getTopicId());
        createIndexRequest.setRule(ruleInfo);

        ClsClient clsClient = getClsClient(getClsDataNode(sinkInfo));
        try {
            clsClient.CreateIndex(createIndexRequest);
            LOG.info("topic {} create index success tokenizer is {}", clsSinkDTO.getTopicName(),
                    clsSinkDTO.getTokenizer());
        } catch (TencentCloudSDKException e) {
            String errMsg = "Create cls topic index failed: " + e.getMessage();
            LOG.error(errMsg, e);
            this.sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg);
            throw new BusinessException(errMsg);
        }
    }

    /**
     * Converts {@code key:value} strings into SDK tag objects.
     *
     * <p>Each entry is split on the first {@code ':'} only, so a value may itself contain
     * {@code ':'}.
     *
     * @param strArr the tag entries, one {@code key:value} pair per element
     * @return one tag per input element, in the same order
     * @throws BusinessException if an entry is {@code null}, has no {@code ':'} or has a blank key
     */
    private Tag[] convertTags(String[] strArr) {
        Tag[] tagArr = new Tag[strArr.length];
        for (int i = 0; i < strArr.length; i++) {
            String entry = strArr[i];
            String[] keyAndValue = entry == null ? new String[0] : entry.split(TAG_KEY_VALUE_SEPARATOR, 2);
            if (keyAndValue.length != 2 || StringUtils.isBlank(keyAndValue[0])) {
                throw new BusinessException("Invalid cls tag [" + entry + "], expected format is key:value");
            }
            Tag tag = new Tag();
            // Fill the tag's Key/Value members explicitly instead of the generic set(name, value).
            // NOTE(review): set(name, value) appears to add a custom request parameter named
            // after the tag key - confirm against the SDK version in use.
            tag.setKey(keyAndValue[0]);
            tag.setValue(keyAndValue[1]);
            tagArr[i] = tag;
        }
        return tagArr;
    }

    /**
     * Loads the data node referenced by the sink and parses its extended parameters.
     *
     * <p>The looked-up entity is dereferenced without a null check, so a missing data node
     * surfaces as a {@link NullPointerException}.
     *
     * @param sinkInfo the sink supplying the data node name
     * @return the parsed data node configuration
     */
    private ClsDataNodeDTO getClsDataNode(SinkInfo sinkInfo) {
        String extParams = this.dataNodeEntityMapper
                .selectByUniqueKey(sinkInfo.getDataNodeName(), CLS_TYPE)
                .getExtParams();
        return JsonUtils.parseObject(extParams, ClsDataNodeDTO.class);
    }
}
