/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.source;

import com.github.pagehelper.Page;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.service.source.StreamSourceOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;

/**
 * Base implementation of {@link StreamSourceOperator} that persists stream sources and
 * their fields through {@link StreamSourceEntityMapper} and {@link StreamSourceFieldEntityMapper}.
 *
 * <p>Subclasses supply the source type and fill the type-specific part of the entity via
 * {@link #setTargetEntity(SourceRequest, StreamSourceEntity)}.
 */
public abstract class AbstractSourceOperator implements StreamSourceOperator {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSourceOperator.class);

    @Autowired
    protected StreamSourceEntityMapper sourceMapper;
    @Autowired
    protected StreamSourceFieldEntityMapper sourceFieldMapper;

    /**
     * Returns the source type of the concrete operator.
     * NOTE(review): presumably the type identifier this operator is registered for - confirm
     * against the subclasses; this class itself never calls the method.
     *
     * @return the source type string
     */
    protected abstract String getSourceType();

    /**
     * Hook invoked by {@link #saveOpt} and {@link #updateOpt} right before the entity is
     * written, giving the subclass the chance to populate {@code targetEntity} from {@code request}.
     *
     * @param request      the incoming source request
     * @param targetEntity the entity that is about to be inserted or updated
     */
    protected abstract void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity);

    /**
     * Inserts a new source together with its fields.
     *
     * <p>The initial status is {@code TO_BE_ISSUED_ADD} when the group status is
     * {@code CONFIG_SUCCESSFUL}, otherwise {@code SOURCE_NEW}.
     *
     * @param request     the source to save
     * @param groupStatus status code of the owning group
     * @param operator    user name stored as creator and modifier
     * @return the id of the inserted entity
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public Integer saveOpt(SourceRequest request, Integer groupStatus, String operator) {
        StreamSourceEntity entity = CommonBeanUtils.copyProperties(request, StreamSourceEntity::new);
        if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
            entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
        } else {
            entity.setStatus(SourceStatus.SOURCE_NEW.getCode());
        }
        entity.setCreator(operator);
        entity.setModifier(operator);
        setTargetEntity(request, entity);
        sourceMapper.insert(entity);
        // Fields are saved after the insert because they reference entity.getId().
        saveFieldOpt(entity, request.getFieldList());
        return entity.getId();
    }

    /**
     * Loads the fields stored for the given source and converts them to {@link StreamField}s.
     *
     * @param sourceId id of the source
     * @return the converted field list
     */
    @Override
    public List<StreamField> getSourceFields(Integer sourceId) {
        List<StreamSourceFieldEntity> sourceFieldEntities = sourceFieldMapper.selectBySourceId(sourceId);
        return CommonBeanUtils.copyListProperties(sourceFieldEntities, StreamField::new);
    }

    /**
     * Converts a page of entities into a page of {@link StreamSource}s using
     * {@code getFromEntity}.
     *
     * @param entityPage the page of entities; an empty page yields {@link PageResult#empty()}
     * @return the converted page
     */
    @Override
    public PageResult<? extends StreamSource> getPageInfo(Page<StreamSourceEntity> entityPage) {
        if (CollectionUtils.isEmpty(entityPage)) {
            return PageResult.empty();
        }
        List<StreamSource> streamSources = entityPage.stream()
                .map(this::getFromEntity)
                .collect(Collectors.toList());
        // NOTE(review): the last argument is the number of rows on this page (size()), not the
        // requested page size (getPageSize()) - confirm which one PageResult expects.
        return new PageResult<>(streamSources, Long.valueOf(entityPage.getTotal()),
                Integer.valueOf(entityPage.getPageNum()), Integer.valueOf(entityPage.size()));
    }

    /**
     * Updates an existing source and replaces its fields.
     *
     * <p>The update is rejected when the source does not exist, its status is not in
     * {@code SourceStatus.ALLOWED_UPDATE}, the request version differs from the stored version,
     * the source type would change, or another source with the same name already exists for
     * the same group and stream.
     *
     * @param request     the new source configuration
     * @param groupStatus status code of the owning group
     * @param operator    user name stored as modifier
     * @throws BusinessException when any of the checks above fails or no row was updated
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class}, isolation = Isolation.REPEATABLE_READ)
    public void updateOpt(SourceRequest request, Integer groupStatus, String operator) {
        StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(request.getId());
        Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
        if (!SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus())) {
            throw new BusinessException(String.format("source=%s is not allowed to update, please wait until its changed to final status or stop / frozen / delete it firstly", entity));
        }

        String groupId = request.getInlongGroupId();
        String streamId = request.getInlongStreamId();
        String sourceName = request.getSourceName();
        String errMsg = String.format("source has already updated with groupId=%s, streamId=%s, name=%s, curVersion=%s",
                groupId, streamId, sourceName, request.getVersion());
        if (!Objects.equals(entity.getVersion(), request.getVersion())) {
            LOGGER.error(errMsg);
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }
        if (!Objects.equals(entity.getSourceType(), request.getSourceType())) {
            throw new BusinessException(String.format("source type=%s cannot change to %s",
                    entity.getSourceType(), request.getSourceType()));
        }

        // The source name must stay unique within the group/stream; the record being
        // updated is the only one allowed to carry this name.
        List<StreamSourceEntity> sourceList = sourceMapper.selectByRelatedId(groupId, streamId, sourceName);
        for (StreamSourceEntity sourceEntity : sourceList) {
            if (!Objects.equals(sourceEntity.getId(), request.getId())) {
                String err = "source name=%s already exists with the groupId=%s streamId=%s";
                throw new BusinessException(String.format(err, sourceName, groupId, streamId));
            }
        }

        setTargetEntity(request, entity);
        entity.setModifier(operator);
        entity.setPreviousStatus(entity.getStatus());
        if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
            entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
        } else {
            // Any other current status is left untouched.
            switch (SourceStatus.forCode(entity.getStatus())) {
                case SOURCE_NORMAL:
                    entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
                    break;
                case SOURCE_FAILED:
                    entity.setStatus(SourceStatus.SOURCE_NEW.getCode());
                    break;
                default:
                    break;
            }
        }

        int rowCount = sourceMapper.updateByPrimaryKeySelective(entity);
        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
            LOGGER.warn(errMsg);
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }
        updateFieldOpt(entity, request.getFieldList());
        LOGGER.info("success to update source of type={}", request.getSourceType());
    }

    /**
     * Moves the source to {@code TO_BE_ISSUED_FROZEN} if that transition is allowed.
     *
     * @param request  the source to stop; its id selects the stored record
     * @param operator user name stored as modifier
     * @throws BusinessException when the transition is not allowed or no row was updated
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class}, isolation = Isolation.REPEATABLE_READ)
    public void stopOpt(SourceRequest request, String operator) {
        changeStatus(request, operator, SourceStatus.TO_BE_ISSUED_FROZEN, "source=%s is not allowed to stop");
    }

    /**
     * Moves the source to {@code TO_BE_ISSUED_ACTIVE} if that transition is allowed.
     *
     * @param request  the source to restart; its id selects the stored record
     * @param operator user name stored as modifier
     * @throws BusinessException when the transition is not allowed or no row was updated
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class}, isolation = Isolation.REPEATABLE_READ)
    public void restartOpt(SourceRequest request, String operator) {
        changeStatus(request, operator, SourceStatus.TO_BE_ISSUED_ACTIVE, "Source=%s is not allowed to restart");
    }

    /**
     * Shared implementation of {@link #stopOpt} and {@link #restartOpt}: validates the
     * transition from the stored status to {@code nextState} and writes the new status.
     *
     * @param request         the request whose id selects the stored record
     * @param operator        user name stored as modifier
     * @param nextState       the status to move to
     * @param rejectMsgFormat {@code String.format} pattern (one {@code %s} for the stored
     *                        entity) used when the transition is not allowed
     */
    private void changeStatus(SourceRequest request, String operator, SourceStatus nextState,
            String rejectMsgFormat) {
        StreamSourceEntity existEntity = sourceMapper.selectByIdForUpdate(request.getId());
        // Same guard as updateOpt: without it an unknown id fails with a bare NullPointerException.
        Preconditions.checkNotNull(existEntity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());

        SourceStatus curState = SourceStatus.forCode(existEntity.getStatus());
        if (!SourceStatus.isAllowedTransition(curState, nextState)) {
            throw new BusinessException(String.format(rejectMsgFormat, existEntity));
        }

        StreamSourceEntity curEntity = CommonBeanUtils.copyProperties(request, StreamSourceEntity::new);
        // Record who triggered the change, as saveOpt and updateOpt do.
        curEntity.setModifier(operator);
        curEntity.setPreviousStatus(curState.getCode());
        curEntity.setStatus(nextState.getCode());
        int rowCount = sourceMapper.updateByPrimaryKeySelective(curEntity);
        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
            LOGGER.error("source has already updated with groupId={}, streamId={}, name={}, curVersion={}",
                    curEntity.getInlongGroupId(), curEntity.getInlongStreamId(),
                    curEntity.getSourceName(), curEntity.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }
    }

    /**
     * Replaces the stored fields of a source with {@code fieldInfos}.
     * An empty or {@code null} list leaves the stored fields unchanged.
     *
     * @param entity     the owning source
     * @param fieldInfos the new fields
     */
    private void updateFieldOpt(StreamSourceEntity entity, List<StreamField> fieldInfos) {
        if (CollectionUtils.isEmpty(fieldInfos)) {
            return;
        }
        sourceFieldMapper.deleteAll(entity.getId());
        saveFieldOpt(entity, fieldInfos);
        LOGGER.info("success to update source fields");
    }

    /**
     * Inserts {@code fieldInfos} as fields of the given source. Does nothing for an empty
     * or {@code null} list.
     *
     * @param entity     the owning source; supplies group id, stream id, source type and source id
     * @param fieldInfos the fields to insert
     */
    private void saveFieldOpt(StreamSourceEntity entity, List<StreamField> fieldInfos) {
        LOGGER.info("begin to save source fields={}", fieldInfos);
        if (CollectionUtils.isEmpty(fieldInfos)) {
            return;
        }
        String groupId = entity.getInlongGroupId();
        String streamId = entity.getInlongStreamId();
        String sourceType = entity.getSourceType();
        Integer sourceId = entity.getId();

        List<StreamSourceFieldEntity> entityList = new ArrayList<>(fieldInfos.size());
        for (StreamField fieldInfo : fieldInfos) {
            StreamSourceFieldEntity fieldEntity =
                    CommonBeanUtils.copyProperties(fieldInfo, StreamSourceFieldEntity::new);
            // Fall back to the field name when no comment was given.
            if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
                fieldEntity.setFieldComment(fieldEntity.getFieldName());
            }
            fieldEntity.setInlongGroupId(groupId);
            fieldEntity.setInlongStreamId(streamId);
            fieldEntity.setSourceId(sourceId);
            fieldEntity.setSourceType(sourceType);
            fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
            entityList.add(fieldEntity);
        }
        sourceFieldMapper.insertAll(entityList);
        LOGGER.info("success to save source fields");
    }
}

