package org.apache.inlong.manager.service.source;

import com.github.pagehelper.Page;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity;
import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.service.node.DataNodeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;

/* loaded from: input_file:org/apache/inlong/manager/service/source/AbstractSourceOperator.class */
public abstract class AbstractSourceOperator implements StreamSourceOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSourceOperator.class);

    @Autowired
    protected StreamSourceEntityMapper sourceMapper;

    @Autowired
    protected StreamSourceFieldEntityMapper sourceFieldMapper;

    @Autowired
    protected InlongStreamFieldEntityMapper streamFieldMapper;

    @Autowired
    protected DataNodeService dataNodeService;

    /* renamed from: org.apache.inlong.manager.service.source.AbstractSourceOperator$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/inlong/manager/service/source/AbstractSourceOperator$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$inlong$manager$common$enums$SourceStatus = new int[SourceStatus.values().length];

        static {
            try {
                $SwitchMap$org$apache$inlong$manager$common$enums$SourceStatus[SourceStatus.SOURCE_NORMAL.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$inlong$manager$common$enums$SourceStatus[SourceStatus.HEARTBEAT_TIMEOUT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$inlong$manager$common$enums$SourceStatus[SourceStatus.SOURCE_FAILED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    protected abstract String getSourceType();

    protected abstract void setTargetEntity(SourceRequest sourceRequest, StreamSourceEntity streamSourceEntity);

    @Override // org.apache.inlong.manager.service.source.StreamSourceOperator
    public String getExtParams(StreamSourceEntity streamSourceEntity) {
        return streamSourceEntity.getExtParams();
    }

    @Override // org.apache.inlong.manager.service.source.StreamSourceOperator
    @Transactional(rollbackFor = {Throwable.class})
    public Integer saveOpt(SourceRequest sourceRequest, Integer num, String str) {
        StreamSourceEntity streamSourceEntity = (StreamSourceEntity) CommonBeanUtils.copyProperties(sourceRequest, StreamSourceEntity::new);
        if ("AUTO_PUSH".equals(sourceRequest.getSourceType())) {
            streamSourceEntity.setStatus(SourceStatus.SOURCE_NORMAL.getCode());
        } else if (GroupStatus.forCode(num.intValue()).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
            streamSourceEntity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
        } else {
            streamSourceEntity.setStatus(SourceStatus.SOURCE_NEW.getCode());
        }
        streamSourceEntity.setCreator(str);
        streamSourceEntity.setModifier(str);
        setTargetEntity(sourceRequest, streamSourceEntity);
        this.sourceMapper.insert(streamSourceEntity);
        saveFieldOpt(streamSourceEntity, sourceRequest.getFieldList());
        if (sourceRequest.getEnableSyncSchema().booleanValue()) {
            syncSourceFieldInfo(sourceRequest, str);
        }
        return streamSourceEntity.getId();
    }

    @Override // org.apache.inlong.manager.service.source.StreamSourceOperator
    public List<StreamField> getSourceFields(Integer num) {
        return CommonBeanUtils.copyListProperties(this.sourceFieldMapper.selectBySourceId(num), StreamField::new);
    }

    @Override // org.apache.inlong.manager.service.source.StreamSourceOperator
    public PageResult<? extends StreamSource> getPageInfo(Page<StreamSourceEntity> page) {
        return CollectionUtils.isEmpty(page) ? PageResult.empty() : PageResult.fromPage(page).map(this::getFromEntity);
    }

    @Override // org.apache.inlong.manager.service.source.StreamSourceOperator
    @Transactional(rollbackFor = {Throwable.class}, isolation = Isolation.REPEATABLE_READ)
    public void updateOpt(SourceRequest sourceRequest, Integer num, Integer num2, String str) {
        StreamSourceEntity selectByIdForUpdate = this.sourceMapper.selectByIdForUpdate(sourceRequest.getId());
        if (selectByIdForUpdate == null) {
            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND, String.format("not found source record by id=%d", sourceRequest.getId()));
        }
        if ("AUTO_PUSH".equals(selectByIdForUpdate.getSourceType())) {
            updateFieldOpt(selectByIdForUpdate, sourceRequest.getFieldList());
            return;
        }
        if (!(InlongConstants.DATASYNC_MODE.equals(num2) || SourceStatus.ALLOWED_UPDATE.contains(selectByIdForUpdate.getStatus()))) {
            throw new BusinessException(ErrorCodeEnum.SOURCE_OPT_NOT_ALLOWED, String.format("source=%s is not allowed to update, please wait until its changed to final status or stop / frozen / delete it firstly", selectByIdForUpdate));
        }
        String format = String.format("source has already updated with groupId=%s, streamId=%s, name=%s, curVersion=%s", sourceRequest.getInlongGroupId(), sourceRequest.getInlongStreamId(), sourceRequest.getSourceName(), sourceRequest.getVersion());
        if (!Objects.equals(selectByIdForUpdate.getVersion(), sourceRequest.getVersion())) {
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED, format);
        }
        if (!Objects.equals(selectByIdForUpdate.getSourceType(), sourceRequest.getSourceType())) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, String.format("source type=%s cannot change to %s", selectByIdForUpdate.getSourceType(), sourceRequest.getSourceType()));
        }
        String inlongGroupId = sourceRequest.getInlongGroupId();
        String inlongStreamId = sourceRequest.getInlongStreamId();
        String sourceName = sourceRequest.getSourceName();
        Iterator it = this.sourceMapper.selectByRelatedId(inlongGroupId, inlongStreamId, sourceName).iterator();
        while (it.hasNext()) {
            if (!Objects.equals(((StreamSourceEntity) it.next()).getId(), sourceRequest.getId())) {
                throw new BusinessException(ErrorCodeEnum.SOURCE_ALREADY_EXISTS, String.format("source name=%s already exists with the groupId=%s streamId=%s", sourceName, inlongGroupId, inlongStreamId));
            }
        }
        setTargetEntity(sourceRequest, selectByIdForUpdate);
        selectByIdForUpdate.setModifier(str);
        selectByIdForUpdate.setPreviousStatus(selectByIdForUpdate.getStatus());
        if (InlongConstants.STANDARD_MODE.equals(num2)) {
            SourceStatus.forCode(selectByIdForUpdate.getStatus().intValue());
            Integer status = selectByIdForUpdate.getStatus();
            if (!GroupStatus.forCode(num.intValue()).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
                switch (AnonymousClass1.$SwitchMap$org$apache$inlong$manager$common$enums$SourceStatus[SourceStatus.forCode(selectByIdForUpdate.getStatus().intValue()).ordinal()]) {
                    case 1:
                    case 2:
                        status = SourceStatus.TO_BE_ISSUED_RETRY.getCode();
                        break;
                    case 3:
                        status = SourceStatus.SOURCE_NEW.getCode();
                        break;
                }
            } else {
                status = SourceStatus.TO_BE_ISSUED_RETRY.getCode();
            }
            selectByIdForUpdate.setStatus(status);
        }
        if (this.sourceMapper.updateByPrimaryKeySelective(selectByIdForUpdate) != InlongConstants.AFFECTED_ONE_ROW.intValue()) {
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED, format);
        }
        updateFieldOpt(selectByIdForUpdate, sourceRequest.getFieldList());
        LOGGER.debug("success to update source of type={}", sourceRequest.getSourceType());
    }

    @Override // org.apache.inlong.manager.service.source.StreamSourceOperator
    @Transactional(rollbackFor = {Throwable.class}, isolation = Isolation.REPEATABLE_READ)
    public void stopOpt(SourceRequest sourceRequest, String str) {
        StreamSourceEntity selectByIdForUpdate = this.sourceMapper.selectByIdForUpdate(sourceRequest.getId());
        SourceStatus forCode = SourceStatus.forCode(selectByIdForUpdate.getStatus().intValue());
        SourceStatus sourceStatus = SourceStatus.TO_BE_ISSUED_STOP;
        if (forCode == SourceStatus.SOURCE_STOP) {
            return;
        }
        if (!SourceStatus.isAllowedTransition(forCode, sourceStatus)) {
            throw new BusinessException(String.format("current source status=%s for id=%s is not allowed to stop", selectByIdForUpdate.getStatus(), selectByIdForUpdate.getId()));
        }
        StreamSourceEntity streamSourceEntity = (StreamSourceEntity) CommonBeanUtils.copyProperties(sourceRequest, StreamSourceEntity::new);
        streamSourceEntity.setPreviousStatus(forCode.getCode());
        streamSourceEntity.setStatus(sourceStatus.getCode());
        if (this.sourceMapper.updateByPrimaryKeySelective(streamSourceEntity) != InlongConstants.AFFECTED_ONE_ROW.intValue()) {
            LOGGER.error("source has already updated with groupId={}, streamId={}, name={}, curVersion={}", new Object[]{streamSourceEntity.getInlongGroupId(), streamSourceEntity.getInlongStreamId(), streamSourceEntity.getSourceName(), streamSourceEntity.getVersion()});
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }
    }

    @Override // org.apache.inlong.manager.service.source.StreamSourceOperator
    @Transactional(rollbackFor = {Throwable.class}, isolation = Isolation.REPEATABLE_READ)
    public void restartOpt(SourceRequest sourceRequest, String str) {
        StreamSourceEntity selectByIdForUpdate = this.sourceMapper.selectByIdForUpdate(sourceRequest.getId());
        SourceStatus forCode = SourceStatus.forCode(selectByIdForUpdate.getStatus().intValue());
        SourceStatus sourceStatus = SourceStatus.TO_BE_ISSUED_ACTIVE;
        if (!SourceStatus.isAllowedTransition(forCode, sourceStatus)) {
            throw new BusinessException(String.format("current source status=%s for id=%s is not allowed to restart", selectByIdForUpdate.getStatus(), selectByIdForUpdate.getId()));
        }
        StreamSourceEntity streamSourceEntity = (StreamSourceEntity) CommonBeanUtils.copyProperties(sourceRequest, StreamSourceEntity::new);
        streamSourceEntity.setPreviousStatus(forCode.getCode());
        streamSourceEntity.setStatus(sourceStatus.getCode());
        if (this.sourceMapper.updateByPrimaryKeySelective(streamSourceEntity) != InlongConstants.AFFECTED_ONE_ROW.intValue()) {
            LOGGER.error("source has already updated with groupId={}, streamId={}, name={}, curVersion={}", new Object[]{streamSourceEntity.getInlongGroupId(), streamSourceEntity.getInlongStreamId(), streamSourceEntity.getSourceName(), streamSourceEntity.getVersion()});
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }
    }

    protected void updateFieldOpt(StreamSourceEntity streamSourceEntity, List<StreamField> list) {
        Integer id = streamSourceEntity.getId();
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        this.sourceFieldMapper.deleteAll(id);
        saveFieldOpt(streamSourceEntity, list);
        String inlongGroupId = streamSourceEntity.getInlongGroupId();
        String inlongStreamId = streamSourceEntity.getInlongStreamId();
        this.streamFieldMapper.deleteAllByIdentifier(inlongGroupId, inlongStreamId);
        saveStreamField(inlongGroupId, inlongStreamId, list);
        LOGGER.debug("success to update source fields");
    }

    protected void saveStreamField(String str, String str2, List<StreamField> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        list.forEach(streamField -> {
            streamField.setId((Integer) null);
        });
        List<InlongStreamFieldEntity> copyListProperties = CommonBeanUtils.copyListProperties(list, InlongStreamFieldEntity::new);
        for (InlongStreamFieldEntity inlongStreamFieldEntity : copyListProperties) {
            inlongStreamFieldEntity.setInlongGroupId(str);
            inlongStreamFieldEntity.setInlongStreamId(str2);
            inlongStreamFieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
        }
        this.streamFieldMapper.insertAll(copyListProperties);
    }

    protected void saveFieldOpt(StreamSourceEntity streamSourceEntity, List<StreamField> list) {
        LOGGER.debug("begin to save source fields={}", list);
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        ArrayList arrayList = new ArrayList(list.size());
        String inlongGroupId = streamSourceEntity.getInlongGroupId();
        String inlongStreamId = streamSourceEntity.getInlongStreamId();
        String sourceType = streamSourceEntity.getSourceType();
        Integer id = streamSourceEntity.getId();
        Iterator<StreamField> it = list.iterator();
        while (it.hasNext()) {
            StreamSourceFieldEntity streamSourceFieldEntity = (StreamSourceFieldEntity) CommonBeanUtils.copyProperties(it.next(), StreamSourceFieldEntity::new);
            if (StringUtils.isEmpty(streamSourceFieldEntity.getFieldComment())) {
                streamSourceFieldEntity.setFieldComment(streamSourceFieldEntity.getFieldName());
            }
            streamSourceFieldEntity.setInlongGroupId(inlongGroupId);
            streamSourceFieldEntity.setInlongStreamId(inlongStreamId);
            streamSourceFieldEntity.setSourceId(id);
            streamSourceFieldEntity.setSourceType(sourceType);
            streamSourceFieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
            arrayList.add(streamSourceFieldEntity);
        }
        this.sourceFieldMapper.insertAll(arrayList);
        LOGGER.debug("success to save source fields");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String getSerializationType(StreamSource streamSource, String str) {
        return StringUtils.isNotBlank(streamSource.getSerializationType()) ? streamSource.getSerializationType() : DataTypeEnum.forType(str).getType();
    }

    @Override // org.apache.inlong.manager.service.source.StreamSourceOperator
    @Transactional(rollbackFor = {Throwable.class}, isolation = Isolation.REPEATABLE_READ)
    public void syncSourceFieldInfo(SourceRequest sourceRequest, String str) {
        LOGGER.info("not support sync source field info for type ={}", sourceRequest.getSourceType());
    }
}
