/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.resource.sink.cls;

import com.tencentcloudapi.common.exception.TencentCloudSDKException;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.pojo.node.cls.ClsDataNodeDTO;
import org.apache.inlong.manager.pojo.sink.SinkInfo;
import org.apache.inlong.manager.pojo.sink.cls.ClsSinkDTO;
import org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator;
import org.apache.inlong.manager.service.resource.sink.cls.ClsOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class ClsResourceOperator
extends AbstractStandaloneSinkResourceOperator {
    private static final Logger LOG = LoggerFactory.getLogger(ClsResourceOperator.class);
    @Autowired
    private DataNodeEntityMapper dataNodeEntityMapper;
    @Autowired
    private StreamSinkService sinkService;
    @Autowired
    private StreamSinkEntityMapper streamSinkEntityMapper;
    @Autowired
    private ClsOperator clsOperator;

    @Override
    public Boolean accept(String sinkType) {
        return "CLS".equals(sinkType);
    }

    @Override
    public void createSinkResource(SinkInfo sinkInfo) {
        LOG.info("begin to create sink resources sinkId={}", (Object)sinkInfo.getId());
        if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
            LOG.warn("sink resource [" + sinkInfo.getId() + "] already success, skip to create");
            return;
        }
        if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
            LOG.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]");
            return;
        }
        this.createClsResource(sinkInfo);
        this.assignCluster(sinkInfo);
    }

    private void createClsResource(SinkInfo sinkInfo) {
        ClsDataNodeDTO clsDataNode = this.getClsDataNode(sinkInfo);
        ClsSinkDTO clsSinkDTO = (ClsSinkDTO)JsonUtils.parseObject((String)sinkInfo.getExtParams(), ClsSinkDTO.class);
        try {
            String topicId = this.getTopicID(clsDataNode, clsSinkDTO);
            clsSinkDTO.setTopicId(topicId);
            sinkInfo.setExtParams(JsonUtils.toJsonString((Object)clsSinkDTO));
            this.clsOperator.createTopicIndex(clsSinkDTO.getTokenizer(), clsSinkDTO.getTopicId(), clsDataNode.getManageSecretId(), clsDataNode.getManageSecretKey(), clsDataNode.getRegion());
            this.updateSinkInfo(sinkInfo, clsSinkDTO);
            String info = "success to create cls resource";
            this.sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
            LOG.info("update cls info status success for sinkId= {}, topicName = {}", (Object)sinkInfo.getSinkName(), (Object)clsSinkDTO.getTopicName());
        }
        catch (TencentCloudSDKException e) {
            String errMsg = "Create cls topic failed: " + e.getMessage();
            LOG.error(errMsg, (Throwable)e);
            this.sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg);
            throw new BusinessException(errMsg);
        }
    }

    private String getTopicID(ClsDataNodeDTO clsDataNode, ClsSinkDTO clsSinkDTO) throws TencentCloudSDKException {
        String topicId = this.clsOperator.describeTopicIDByTopicName(clsSinkDTO.getTopicName(), clsDataNode.getLogSetId(), clsDataNode.getManageSecretId(), clsDataNode.getManageSecretKey(), clsDataNode.getRegion());
        if (StringUtils.isBlank((CharSequence)topicId)) {
            topicId = this.clsOperator.createTopicReturnTopicId(clsSinkDTO.getTopicName(), clsDataNode.getLogSetId(), clsSinkDTO.getTag(), clsSinkDTO.getStorageDuration(), clsDataNode.getManageSecretId(), clsDataNode.getManageSecretKey(), clsDataNode.getRegion());
        }
        return topicId;
    }

    private void updateSinkInfo(SinkInfo sinkInfo, ClsSinkDTO clsSinkDTO) {
        StreamSinkEntity streamSinkEntity = this.streamSinkEntityMapper.selectByPrimaryKey(sinkInfo.getId());
        streamSinkEntity.setExtParams(JsonUtils.toJsonString((Object)clsSinkDTO));
        this.streamSinkEntityMapper.updateByIdSelective(streamSinkEntity);
    }

    private ClsDataNodeDTO getClsDataNode(SinkInfo sinkInfo) {
        DataNodeEntity dataNodeEntity = this.dataNodeEntityMapper.selectByUniqueKey(sinkInfo.getDataNodeName(), "CLS");
        return (ClsDataNodeDTO)JsonUtils.parseObject((String)dataNodeEntity.getExtParams(), ClsDataNodeDTO.class);
    }
}

