/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.resource.sink.cls;

import com.tencentcloudapi.cls.v20201016.ClsClient;
import com.tencentcloudapi.cls.v20201016.models.CreateIndexRequest;
import com.tencentcloudapi.cls.v20201016.models.CreateTopicRequest;
import com.tencentcloudapi.cls.v20201016.models.CreateTopicResponse;
import com.tencentcloudapi.cls.v20201016.models.FullTextInfo;
import com.tencentcloudapi.cls.v20201016.models.RuleInfo;
import com.tencentcloudapi.cls.v20201016.models.Tag;
import com.tencentcloudapi.common.Credential;
import com.tencentcloudapi.common.exception.TencentCloudSDKException;
import com.tencentcloudapi.common.profile.ClientProfile;
import com.tencentcloudapi.common.profile.HttpProfile;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.pojo.node.cls.ClsDataNodeDTO;
import org.apache.inlong.manager.pojo.sink.SinkInfo;
import org.apache.inlong.manager.pojo.sink.cls.ClsSinkDTO;
import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class ClsResourceOperator
implements SinkResourceOperator {
    private static final Logger LOG = LoggerFactory.getLogger(ClsResourceOperator.class);
    @Autowired
    private DataNodeEntityMapper dataNodeEntityMapper;
    @Autowired
    private StreamSinkService sinkService;
    @Autowired
    private StreamSinkEntityMapper streamSinkEntityMapper;

    @Override
    public Boolean accept(String sinkType) {
        return "CLS".equals(sinkType);
    }

    @Override
    public void createSinkResource(SinkInfo sinkInfo) {
        LOG.info("begin to create sink resources sinkId={}", (Object)sinkInfo.getId());
        if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
            LOG.warn("sink resource [" + sinkInfo.getId() + "] already success, skip to create");
            return;
        }
        if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
            LOG.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]");
            return;
        }
        this.createTopicID(sinkInfo);
    }

    private void createTopicID(SinkInfo sinkInfo) {
        ClsDataNodeDTO clsDataNode = this.getClsDataNode(sinkInfo);
        ClsSinkDTO clsSinkDTO = (ClsSinkDTO)JsonUtils.parseObject((String)sinkInfo.getExtParams(), ClsSinkDTO.class);
        try {
            ClsClient client = this.getClsClient(clsDataNode);
            CreateTopicRequest req = this.getCreateTopicRequest(clsDataNode, clsSinkDTO);
            CreateTopicResponse resp = client.CreateTopic(req);
            LOG.info("create cls topic {} success ,topicId {}", (Object)clsSinkDTO.getTopicName(), (Object)resp.getTopicId());
            clsSinkDTO.setTopicId(resp.getTopicId());
            sinkInfo.setExtParams(JsonUtils.toJsonString((Object)clsSinkDTO));
            this.createTopicIndex(sinkInfo);
            StreamSinkEntity streamSinkEntity = new StreamSinkEntity();
            CommonBeanUtils.copyProperties((Object)sinkInfo, (Object)streamSinkEntity, (boolean)true);
            this.streamSinkEntityMapper.updateByIdSelective(streamSinkEntity);
            String info = "success to create cls resource";
            this.sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
            LOG.info("update cls sink = {}info status  success ,topicName {}", (Object)streamSinkEntity.getSinkName(), (Object)clsSinkDTO.getTopicName());
        }
        catch (TencentCloudSDKException e) {
            String errMsg = "Create cls topic  failed: " + e.getMessage();
            LOG.error(errMsg, (Throwable)e);
            this.sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg);
            throw new BusinessException(errMsg);
        }
    }

    private CreateTopicRequest getCreateTopicRequest(ClsDataNodeDTO clsDataNode, ClsSinkDTO clsSinkDTO) {
        CreateTopicRequest req = new CreateTopicRequest();
        String[] allTags = clsSinkDTO.getTag().split("\\|");
        req.setTags(this.convertTags(allTags));
        req.setLogsetId(clsDataNode.getLogSetId());
        req.setTopicName(clsSinkDTO.getTopicName());
        return req;
    }

    private ClsClient getClsClient(ClsDataNodeDTO clsDataNode) {
        Credential cred = new Credential(clsDataNode.getManageSecretId(), clsDataNode.getManageSecretId());
        HttpProfile httpProfile = new HttpProfile();
        httpProfile.setEndpoint(clsDataNode.getEndpoint());
        ClientProfile clientProfile = new ClientProfile();
        clientProfile.setHttpProfile(httpProfile);
        return new ClsClient(cred, clsDataNode.getRegion(), clientProfile);
    }

    private void createTopicIndex(SinkInfo sinkInfo) throws BusinessException {
        ClsSinkDTO clsSinkDTO = (ClsSinkDTO)JsonUtils.parseObject((String)sinkInfo.getExtParams(), ClsSinkDTO.class);
        if (StringUtils.isNotBlank((CharSequence)clsSinkDTO.getTokenizer())) {
            LOG.warn("topic {} tokenizer is empty", (Object)clsSinkDTO.getTopicName());
            return;
        }
        ClsDataNodeDTO clsDataNode = this.getClsDataNode(sinkInfo);
        ClsClient clsClient = this.getClsClient(clsDataNode);
        RuleInfo ruleInfo = new RuleInfo();
        FullTextInfo fullTextInfo = new FullTextInfo();
        fullTextInfo.setTokenizer(clsSinkDTO.getTokenizer());
        ruleInfo.setFullText(fullTextInfo);
        CreateIndexRequest req = new CreateIndexRequest();
        req.setTopicId(clsSinkDTO.getTopicId());
        req.setRule(ruleInfo);
        try {
            clsClient.CreateIndex(req);
        }
        catch (TencentCloudSDKException e) {
            String errMsg = "Create cls topic index failed: " + e.getMessage();
            LOG.error(errMsg, (Throwable)e);
            this.sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg);
            throw new BusinessException(errMsg);
        }
        LOG.info("topic {} create index success tokenizer is {}", (Object)clsSinkDTO.getTopicName(), (Object)clsSinkDTO.getTokenizer());
    }

    private Tag[] convertTags(String[] allTags) {
        Tag[] tags = new Tag[allTags.length];
        for (int i = 0; i < allTags.length; ++i) {
            String tag = allTags[i];
            String[] keyAndValueOfTag = tag.split(":");
            Tag tagInfo = new Tag();
            tagInfo.set(keyAndValueOfTag[0], (Object)keyAndValueOfTag[1]);
            tags[i] = tagInfo;
        }
        return tags;
    }

    private ClsDataNodeDTO getClsDataNode(SinkInfo sinkInfo) {
        DataNodeEntity dataNodeEntity = this.dataNodeEntityMapper.selectByUniqueKey(sinkInfo.getDataNodeName(), "CLS");
        return (ClsDataNodeDTO)JsonUtils.parseObject((String)dataNodeEntity.getExtParams(), ClsDataNodeDTO.class);
    }
}

