/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.node;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;
import org.apache.inlong.manager.service.node.DataNodeOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;

/**
 * Base implementation of {@link DataNodeOperator} that handles the persistence
 * of data node entities through {@link DataNodeEntityMapper} and offers default
 * (mostly no-op / unsupported) behavior for the type-specific operations.
 *
 * <p>Subclasses supply the type-specific mapping from a request to an entity by
 * implementing {@link #setTargetEntity(DataNodeRequest, DataNodeEntity)}.
 */
public abstract class AbstractDataNodeOperator
implements DataNodeOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDataNodeOperator.class);
    @Autowired
    protected DataNodeEntityMapper dataNodeEntityMapper;
    @Autowired
    protected StreamSourceEntityMapper sourceMapper;
    @Autowired
    protected InlongGroupEntityMapper groupMapper;
    @Autowired
    protected InlongStreamEntityMapper streamMapper;

    /**
     * Creates a new data node record from the given request.
     *
     * <p>The request properties are copied into a fresh entity, the subclass hook
     * {@link #setTargetEntity(DataNodeRequest, DataNodeEntity)} is applied, and the
     * operator is recorded as both creator and modifier before the insert.
     *
     * @param request  the data node to save
     * @param operator name of the user performing the operation
     * @return the id read from the entity after the insert
     *         (presumably populated by the mapper; verify against the mapper XML)
     */
    @Override
    @Transactional(rollbackFor={Throwable.class})
    public Integer saveOpt(DataNodeRequest request, String operator) {
        DataNodeEntity entity = CommonBeanUtils.copyProperties(request, DataNodeEntity::new);
        this.setTargetEntity(request, entity);
        entity.setCreator(operator);
        entity.setModifier(operator);
        this.dataNodeEntityMapper.insert(entity);
        return entity.getId();
    }

    /**
     * Applies the type-specific content of {@code request} to {@code entity}.
     *
     * @param var1 the incoming request
     * @param var2 the entity that is about to be inserted or updated
     */
    protected abstract void setTargetEntity(DataNodeRequest var1, DataNodeEntity var2);

    /**
     * Updates an existing data node record.
     *
     * <p>The current entity is loaded by the request id, the subclass hook
     * {@link #setTargetEntity(DataNodeRequest, DataNodeEntity)} is applied, the
     * modifier is set, and a selective update is issued.
     *
     * @param request  the data node changes; its id selects the record to update
     * @param operator name of the user performing the operation
     * @throws BusinessException if no record exists for the request id, or if the
     *         update does not affect exactly one row
     */
    @Override
    @Transactional(rollbackFor={Throwable.class}, isolation=Isolation.REPEATABLE_READ)
    public void updateOpt(DataNodeRequest request, String operator) {
        DataNodeEntity entity = this.dataNodeEntityMapper.selectById(request.getId());
        if (entity == null) {
            // Fail with a meaningful error rather than a NullPointerException when the
            // record is missing (e.g. removed between the caller's check and this call).
            throw new BusinessException(String.format("data node not found by id=%s", request.getId()));
        }
        this.setTargetEntity(request, entity);
        entity.setModifier(operator);
        int rowCount = this.dataNodeEntityMapper.updateByIdSelective(entity);
        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED, String.format("failure to update data node with name=%s, type=%s, request version=%d, updated row=%d", request.getName(), request.getType(), request.getVersion(), rowCount));
        }
    }

    /**
     * Converts the JSON ext params of a data node into a flat string map.
     *
     * <p>Every value is rendered with {@link String#valueOf(Object)}, so nested
     * structures end up as their {@code toString()} form and JSON {@code null}
     * becomes the string {@code "null"}.
     *
     * @param info the data node whose ext params are parsed
     * @return a new mutable map; empty when the parser returns {@code null}
     */
    @Override
    public Map<String, String> parse2SinkParams(DataNodeInfo info) {
        Map<String, String> params = new HashMap<>();
        // Wildcard-typed view avoids raw-type iteration; keys and values are stringified below.
        Map<?, ?> extParams = JsonUtils.parseObject(info.getExtParams(), HashMap.class);
        if (extParams == null) {
            return params;
        }
        for (Map.Entry<?, ?> entry : extParams.entrySet()) {
            params.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
        }
        return params;
    }

    /**
     * Default connection test: always rejects the request as an unsupported data
     * node type. Subclasses that can test connectivity are expected to override it.
     *
     * @param request the data node to test
     * @return never returns normally in this default implementation
     * @throws BusinessException always, with the unsupported-type message
     */
    @Override
    public Boolean testConnection(DataNodeRequest request) {
        throw new BusinessException(String.format(ErrorCodeEnum.DATA_NODE_TYPE_NOT_SUPPORTED.getMessage(), request.getType()));
    }

    /**
     * Default hook for propagating a data node change to related stream sources:
     * only logs that nothing is done for this data node type.
     *
     * @param request        the data node request
     * @param dataNodeEntity the data node entity (unused here)
     * @param operator       name of the user performing the operation (unused here)
     */
    @Override
    public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity dataNodeEntity, String operator) {
        LOGGER.info("do nothing for the data node type ={}", request.getType());
    }

    /**
     * Moves the stream sources selected for the given data node name and type to
     * {@link SourceStatus#TO_BE_ISSUED_RETRY}.
     *
     * <p>This is best-effort: a failure of the status update is logged and not
     * rethrown. When no source ids are selected a warning is logged and the
     * method returns.
     *
     * @param dataNodeName name of the data node
     * @param type         type passed to the source id selection
     * @param operator     name of the user performing the operation
     */
    public void retryStreamSourceByDataNodeNameAndType(String dataNodeName, String type, String operator) {
        Integer status = SourceStatus.TO_BE_ISSUED_RETRY.getCode();
        LOGGER.info("begin to update stream source status by dataNodeName={}, status={}, by operator={}", dataNodeName, status, operator);
        // The first argument (cluster) is left null, so selection is driven by data node name and type.
        List<Integer> needUpdateIds = this.sourceMapper.selectNeedUpdateIdsByClusterAndDataNode(null, dataNodeName, type);
        if (CollectionUtils.isEmpty(needUpdateIds)) {
            LOGGER.warn("no found stream source by dataNodeName = {}", dataNodeName);
            return;
        }
        try {
            this.sourceMapper.updateStatusByIds(needUpdateIds, status, operator);
            LOGGER.info("success to update stream source status by dataNodeName={}, status={}, by operator={}", dataNodeName, status, operator);
        }
        catch (Exception e) {
            // Deliberately not rethrown: the retry is best-effort. The trailing
            // exception argument is logged with its stack trace by SLF4J.
            LOGGER.error("failed to update stream source status by dataNodeName={}, status={}, by operator={}", dataNodeName, status, operator, e);
        }
    }
}

