package org.apache.inlong.manager.service.node;

import java.util.HashMap;
import java.util.Map;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;

/* loaded from: input_file:org/apache/inlong/manager/service/node/AbstractDataNodeOperator.class */
public abstract class AbstractDataNodeOperator implements DataNodeOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDataNodeOperator.class);

    @Autowired
    protected DataNodeEntityMapper dataNodeEntityMapper;

    @Autowired
    protected StreamSourceEntityMapper sourceMapper;

    @Autowired
    protected InlongGroupEntityMapper groupMapper;

    @Autowired
    protected InlongStreamEntityMapper streamMapper;

    @Override // org.apache.inlong.manager.service.node.DataNodeOperator
    @Transactional(rollbackFor = {Throwable.class})
    public Integer saveOpt(DataNodeRequest dataNodeRequest, String str) {
        DataNodeEntity dataNodeEntity = (DataNodeEntity) CommonBeanUtils.copyProperties(dataNodeRequest, DataNodeEntity::new);
        setTargetEntity(dataNodeRequest, dataNodeEntity);
        dataNodeEntity.setCreator(str);
        dataNodeEntity.setModifier(str);
        this.dataNodeEntityMapper.insert(dataNodeEntity);
        return dataNodeEntity.getId();
    }

    protected abstract void setTargetEntity(DataNodeRequest dataNodeRequest, DataNodeEntity dataNodeEntity);

    @Override // org.apache.inlong.manager.service.node.DataNodeOperator
    @Transactional(rollbackFor = {Throwable.class}, isolation = Isolation.REPEATABLE_READ)
    public void updateOpt(DataNodeRequest dataNodeRequest, String str) {
        DataNodeEntity dataNodeEntity = (DataNodeEntity) CommonBeanUtils.copyProperties(dataNodeRequest, DataNodeEntity::new);
        setTargetEntity(dataNodeRequest, dataNodeEntity);
        dataNodeEntity.setModifier(str);
        int updateByIdSelective = this.dataNodeEntityMapper.updateByIdSelective(dataNodeEntity);
        if (updateByIdSelective != InlongConstants.AFFECTED_ONE_ROW.intValue()) {
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED, String.format("failure to update data node with name=%s, type=%s, request version=%d, updated row=%d", dataNodeRequest.getName(), dataNodeRequest.getType(), dataNodeRequest.getVersion(), Integer.valueOf(updateByIdSelective)));
        }
    }

    @Override // org.apache.inlong.manager.service.node.DataNodeOperator
    public Map<String, String> parse2SinkParams(DataNodeInfo dataNodeInfo) {
        return (Map) JsonUtils.parseObject(dataNodeInfo.getExtParams(), HashMap.class);
    }

    @Override // org.apache.inlong.manager.service.node.DataNodeOperator
    public Boolean testConnection(DataNodeRequest dataNodeRequest) {
        throw new BusinessException(String.format(ErrorCodeEnum.DATA_NODE_TYPE_NOT_SUPPORTED.getMessage(), dataNodeRequest.getType()));
    }

    @Override // org.apache.inlong.manager.service.node.DataNodeOperator
    public void updateRelatedStreamSource(DataNodeRequest dataNodeRequest, DataNodeEntity dataNodeEntity, String str) {
        LOGGER.info("do nothing for the data node type ={}", dataNodeRequest.getType());
    }

    public void retryStreamSourceByDataNodeNameAndType(String str, String str2, String str3) {
        Integer code = SourceStatus.TO_BE_ISSUED_RETRY.getCode();
        LOGGER.info("begin to update stream source status by dataNodeName={}, status={}, by operator={}", new Object[]{str, code, str3});
        try {
            this.sourceMapper.updateStatusByIds(this.sourceMapper.selectNeedUpdateIdsByClusterAndDataNode((String) null, str, str2), code, str3);
            LOGGER.info("success to update stream source status by dataNodeName={}, status={}, by operator={}", new Object[]{str, code, str3});
        } catch (Exception e) {
            LOGGER.error("failed to update stream source status by dataNodeName={}, status={}, by operator={}", new Object[]{str, code, str3, e});
        }
    }
}
