package org.apache.inlong.manager.service.source;

import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.source.SourcePageRequest;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.user.UserInfo;
import org.apache.inlong.manager.service.group.GroupCheckService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

@Service
/* loaded from: input_file:org/apache/inlong/manager/service/source/StreamSourceServiceImpl.class */
public class StreamSourceServiceImpl implements StreamSourceService {
    // Class-wide SLF4J logger.
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamSourceServiceImpl.class);

    // Resolves the StreamSourceOperator that handles a given source type.
    @Autowired
    private SourceOperatorFactory operatorFactory;

    // Looks up inlong group records by group id.
    @Autowired
    private InlongGroupEntityMapper groupMapper;

    // Looks up inlong stream records by (group id, stream id).
    @Autowired
    private InlongStreamEntityMapper streamMapper;

    // Reads and writes stream source records.
    @Autowired
    private StreamSourceEntityMapper sourceMapper;

    // Deletes/updates the field records that belong to stream sources.
    @Autowired
    private StreamSourceFieldEntityMapper sourceFieldMapper;

    // Checks the group status on behalf of an operator and returns the group entity.
    @Autowired
    private GroupCheckService groupCheckService;

    /**
     * Saves a new stream source on behalf of the given operator.
     *
     * <p>Runs in its own transaction ({@code REQUIRES_NEW}) that rolls back on any {@link Throwable}.
     *
     * @param sourceRequest the source to create; group id, stream id, source type and source name are required
     * @param str name of the operator performing the save
     * @return the id returned by the source operator's {@code saveOpt}
     * @throws BusinessException if a source with the same name already exists under the same group and stream
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
    public Integer save(SourceRequest sourceRequest, String str) {
        LOGGER.info("begin to save source info: {}", sourceRequest);
        checkParams(sourceRequest);

        final String groupId = sourceRequest.getInlongGroupId();
        final InlongGroupEntity groupEntity = this.groupCheckService.checkGroupStatus(groupId, str);

        // A source name must be unique within its (group, stream) pair.
        final String streamId = sourceRequest.getInlongStreamId();
        final String sourceName = sourceRequest.getSourceName();
        final List<StreamSourceEntity> sameNamed = this.sourceMapper.selectByRelatedId(groupId, streamId, sourceName);
        if (CollectionUtils.isNotEmpty(sameNamed)) {
            throw new BusinessException(String.format(
                    "source name=%s already exists with groupId=%s streamId=%s", sourceName, groupId, streamId));
        }

        final StreamSourceOperator operator = this.operatorFactory.getInstance(sourceRequest.getSourceType());

        // Clear any ids carried by the submitted fields before handing the request to the operator.
        if (CollectionUtils.isNotEmpty(sourceRequest.getFieldList())) {
            sourceRequest.getFieldList().forEach(field -> field.setId(null));
        }

        final int sourceId = operator.saveOpt(sourceRequest, groupEntity.getStatus(), str);
        LOGGER.info("success to save source info: {}", sourceRequest);
        return sourceId;
    }

    /**
     * Saves a new stream source on behalf of the given user.
     *
     * <p>Checks, in order: the group exists, the stream exists, the source name is not already used
     * under that group and stream, and the group status allows updates.
     *
     * <p>Runs in its own transaction ({@code REQUIRES_NEW}) that rolls back on any {@link Throwable}.
     *
     * @param sourceRequest the source to create
     * @param userInfo the requesting user; its name is passed to the operator
     * @return the id returned by the source operator's {@code saveOpt}
     * @throws BusinessException with {@code GROUP_NOT_FOUND}, {@code STREAM_NOT_FOUND},
     *         {@code RECORD_DUPLICATE} or {@code OPT_NOT_ALLOWED_BY_STATUS} when a check fails
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
    public Integer save(SourceRequest sourceRequest, UserInfo userInfo) {
        final String groupId = sourceRequest.getInlongGroupId();
        final InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(groupId);
        if (groupEntity == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
                    String.format("InlongGroup does not exist with InlongGroupId=%s", groupId));
        }

        final String streamId = sourceRequest.getInlongStreamId();
        if (this.streamMapper.selectByIdentifier(groupId, streamId) == null) {
            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND,
                    String.format("InlongStream does not exist with InlongGroupId=%s, InLongStreamId=%s",
                            groupId, streamId));
        }

        final String sourceName = sourceRequest.getSourceName();
        final List<StreamSourceEntity> sameNamed = this.sourceMapper.selectByRelatedId(groupId, streamId, sourceName);
        if (CollectionUtils.isNotEmpty(sameNamed)) {
            throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
                    String.format("source name=%s already exists with groupId=%s streamId=%s",
                            sourceName, groupId, streamId));
        }

        final GroupStatus groupStatus = GroupStatus.forCode(groupEntity.getStatus());
        if (GroupStatus.notAllowedUpdate(groupStatus)) {
            throw new BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
                    String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), groupStatus));
        }

        final StreamSourceOperator operator = this.operatorFactory.getInstance(sourceRequest.getSourceType());

        // Clear any ids carried by the submitted fields before handing the request to the operator.
        if (CollectionUtils.isNotEmpty(sourceRequest.getFieldList())) {
            sourceRequest.getFieldList().forEach(field -> field.setId(null));
        }
        return operator.saveOpt(sourceRequest, groupEntity.getStatus(), userInfo.getName());
    }

    /**
     * Loads a stream source by its id.
     *
     * @param num the source id
     * @return the source converted from its entity by the operator for its source type
     * @throws BusinessException with {@code INVALID_PARAMETER} if the id is null, or
     *         {@code SOURCE_INFO_NOT_FOUND} if no record has that id
     */
    @Override
    public StreamSource get(Integer num) {
        if (num == null) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "source id is empty");
        }

        final StreamSourceEntity entity = this.sourceMapper.selectById(num);
        if (entity == null) {
            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
                    String.format("source not found by id=%s", num));
        }

        final StreamSourceOperator operator = this.operatorFactory.getInstance(entity.getSourceType());
        final StreamSource source = operator.getFromEntity(entity);
        LOGGER.debug("success to get source by id={}", num);
        return source;
    }

    /**
     * Loads a stream source by its id, additionally requiring that its group still exists.
     *
     * @param num the source id
     * @param userInfo the requesting user (not read by this method)
     * @return the source converted from its entity by the operator for its source type
     * @throws BusinessException with {@code SOURCE_INFO_NOT_FOUND} if no record has that id, or
     *         {@code GROUP_NOT_FOUND} if the source's group does not exist
     */
    @Override
    public StreamSource get(Integer num, UserInfo userInfo) {
        final StreamSourceEntity entity = this.sourceMapper.selectById(num);
        if (entity == null) {
            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
                    String.format("source not found by id=%s", num));
        }

        final InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(entity.getInlongGroupId());
        if (groupEntity == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        final StreamSourceOperator operator = this.operatorFactory.getInstance(entity.getSourceType());
        return operator.getFromEntity(entity);
    }

    /**
     * Counts the sources recorded for a group and stream.
     *
     * @param str the inlong group id
     * @param str2 the inlong stream id
     * @return the count reported by the source mapper
     */
    @Override
    public Integer getCount(String str, String str2) {
        final int count = this.sourceMapper.selectCount(str, str2);
        LOGGER.debug("source count={} with groupId={}, streamId={}", count, str, str2);
        return count;
    }

    /**
     * Lists the sources of a group, optionally narrowed to one stream.
     *
     * @param str the inlong group id; must not be blank
     * @param str2 the inlong stream id passed to the mapper as a filter (callers in this class also pass null)
     * @return the matching sources, each loaded through {@link #get(Integer)}; an empty list if none match
     */
    @Override
    public List<StreamSource> listSource(String str, String str2) {
        Preconditions.expectNotBlank(str, ErrorCodeEnum.GROUP_ID_IS_EMPTY);

        final List<StreamSourceEntity> entities = this.sourceMapper.selectByRelatedId(str, str2, null);
        if (CollectionUtils.isEmpty(entities)) {
            return Collections.emptyList();
        }

        // Each entity is re-read by id so the type-specific operator builds the response object.
        final List<StreamSource> sources = new ArrayList<>();
        for (StreamSourceEntity entity : entities) {
            sources.add(get(entity.getId()));
        }
        LOGGER.debug("success to list source by groupId={}, streamId={}", str, str2);
        return sources;
    }

    /**
     * Builds a map from stream id to the sources of the given group.
     *
     * <p>When the group mode equals {@code InlongConstants.DATASYNC_MODE}, the group's sources are simply
     * grouped by their stream id. Otherwise the operator registered for the group's MQ type builds the map.
     *
     * @param inlongGroupInfo the group whose sources are wanted
     * @param list the streams of the group, forwarded to the operator in the non-datasync case
     * @return stream id mapped to that stream's sources
     */
    @Override
    public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo inlongGroupInfo, List<InlongStreamInfo> list) {
        final String groupId = inlongGroupInfo.getInlongGroupId();
        LOGGER.debug("begin to get source map for groupId={}", groupId);

        final List<StreamSource> groupSources = listSource(groupId, null);
        final Map<String, List<StreamSource>> sourcesByStream;
        if (InlongConstants.DATASYNC_MODE.equals(inlongGroupInfo.getInlongGroupMode())) {
            sourcesByStream = groupSources.stream().collect(Collectors.groupingBy(
                    StreamSource::getInlongStreamId, HashMap::new, Collectors.toCollection(ArrayList::new)));
        } else {
            final StreamSourceOperator operator = this.operatorFactory.getInstance(inlongGroupInfo.getMqType());
            sourcesByStream = operator.getSourcesMap(inlongGroupInfo, list, groupSources);
        }

        LOGGER.debug("success to get source map, size={}, groupInfo={}", sourcesByStream.size(), inlongGroupInfo);
        return sourcesByStream;
    }

    /**
     * Returns one page of sources matching the given condition.
     *
     * <p>The matching entities are bucketed by source type so that each type's operator can convert its
     * own entities; the converted sources are then merged into a single result that carries the total,
     * page number and page size of the underlying query.
     *
     * @param sourcePageRequest the paging, ordering and filter condition
     * @return the page of sources
     */
    @Override
    public PageResult<? extends StreamSource> listByCondition(SourcePageRequest sourcePageRequest) {
        // Validate ordering first: PageHelper keeps the page setting in thread-local state until the next
        // query consumes it, so nothing that might fail should run between startPage and the query.
        OrderFieldEnum.checkOrderField(sourcePageRequest);
        OrderTypeEnum.checkOrderType(sourcePageRequest);
        PageHelper.startPage(sourcePageRequest.getPageNum(), sourcePageRequest.getPageSize());
        Page<StreamSourceEntity> entityPage = this.sourceMapper.selectByCondition(sourcePageRequest);

        // Bucket the entities by source type; each type has its own operator.
        Map<String, Page<StreamSourceEntity>> entitiesByType = Maps.newHashMap();
        for (StreamSourceEntity entity : entityPage) {
            entitiesByType.computeIfAbsent(entity.getSourceType(), type -> new Page<>()).add(entity);
        }

        List<StreamSource> sources = Lists.newArrayList();
        for (Map.Entry<String, Page<StreamSourceEntity>> entry : entitiesByType.entrySet()) {
            StreamSourceOperator operator = this.operatorFactory.getInstance(entry.getKey());
            PageResult<? extends StreamSource> converted = operator.getPageInfo(entry.getValue());
            if (converted != null && CollectionUtils.isNotEmpty(converted.getList())) {
                sources.addAll(converted.getList());
            }
        }

        PageResult<StreamSource> pageResult = new PageResult<>(sources, entityPage.getTotal(),
                entityPage.getPageNum(), entityPage.getPageSize());
        LOGGER.debug("success to list source page, result size {}", pageResult.getList().size());
        return pageResult;
    }

    /**
     * Returns the sources matching the given condition that the user is allowed to see.
     *
     * <p>A user whose account type equals {@code TenantUserTypeEnum.TENANT_ADMIN} sees every matching
     * source. Any other user only sees sources whose group lists the user's name in its comma-separated
     * in-charges value.
     *
     * <p>NOTE(review): the result is built from the filtered list only, so it does not carry the
     * total/page number/page size of the underlying query.
     *
     * @param sourcePageRequest the paging, ordering and filter condition
     * @param userInfo the requesting user
     * @return the visible sources of the requested page
     */
    @Override
    public PageResult<? extends StreamSource> listByCondition(SourcePageRequest sourcePageRequest, UserInfo userInfo) {
        // Validate ordering first: PageHelper keeps the page setting in thread-local state until the next
        // query consumes it, so nothing that might fail should run between startPage and the query.
        OrderFieldEnum.checkOrderField(sourcePageRequest);
        OrderTypeEnum.checkOrderType(sourcePageRequest);
        PageHelper.startPage(sourcePageRequest.getPageNum(), sourcePageRequest.getPageSize());
        List<StreamSourceEntity> entities = this.sourceMapper.selectByCondition(sourcePageRequest);

        List<StreamSourceEntity> visibleEntities = Lists.newArrayList();
        if (userInfo.getAccountType().equals(TenantUserTypeEnum.TENANT_ADMIN.getCode())) {
            visibleEntities.addAll(entities);
        } else {
            // Look each distinct group up once and remember whether the user is in charge of it.
            Map<String, Boolean> inChargeByGroupId = new HashMap<>();
            for (StreamSourceEntity entity : entities) {
                String groupId = entity.getInlongGroupId();
                boolean inCharge = inChargeByGroupId.computeIfAbsent(groupId,
                        id -> isInChargeOfGroup(id, userInfo.getName()));
                if (inCharge) {
                    visibleEntities.add(entity);
                }
            }
        }

        // Bucket the entities by source type; each type has its own operator.
        Map<String, Page<StreamSourceEntity>> entitiesByType = Maps.newHashMap();
        for (StreamSourceEntity entity : visibleEntities) {
            entitiesByType.computeIfAbsent(entity.getSourceType(), type -> new Page<>()).add(entity);
        }

        List<StreamSource> sources = Lists.newArrayList();
        for (Map.Entry<String, Page<StreamSourceEntity>> entry : entitiesByType.entrySet()) {
            StreamSourceOperator operator = this.operatorFactory.getInstance(entry.getKey());
            PageResult<? extends StreamSource> converted = operator.getPageInfo(entry.getValue());
            if (converted != null && CollectionUtils.isNotEmpty(converted.getList())) {
                sources.addAll(converted.getList());
            }
        }
        return new PageResult<>(sources);
    }

    /**
     * Tells whether the named user appears in the in-charges value of the given group.
     * A missing group, or a group without an in-charges value, counts as "not in charge".
     */
    private boolean isInChargeOfGroup(String groupId, String userName) {
        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(groupId);
        if (groupEntity == null || groupEntity.getInCharges() == null) {
            return false;
        }
        return Arrays.asList(groupEntity.getInCharges().split(",")).contains(userName);
    }

    /**
     * Updates an existing stream source on behalf of the given operator.
     *
     * <p>Runs in its own {@code READ_COMMITTED} transaction ({@code REQUIRES_NEW}) that rolls back on
     * any {@link Throwable}.
     *
     * @param sourceRequest the new source configuration; its id and version must match the stored record
     * @param str name of the operator performing the update
     * @return always {@code true}; failures are reported by exception
     * @throws BusinessException if an unmodifiable attribute was changed, or with {@code GROUP_NOT_FOUND}
     *         if the group check yields no group
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
            isolation = Isolation.READ_COMMITTED)
    public Boolean update(SourceRequest sourceRequest, String str) {
        LOGGER.info("begin to update source info: {}", sourceRequest);
        chkUnmodifiableParams(sourceRequest);

        String groupId = sourceRequest.getInlongGroupId();
        InlongGroupEntity groupEntity = this.groupCheckService.checkGroupStatus(groupId, str);
        if (groupEntity == null) {
            // Report the requested group id; the entity is null here and must not be dereferenced.
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
                    String.format("InlongGroup does not exist with InlongGroupId=%s", groupId));
        }

        StreamSourceOperator operator = this.operatorFactory.getInstance(sourceRequest.getSourceType());

        // Clear any ids carried by the submitted fields before handing the request to the operator.
        if (CollectionUtils.isNotEmpty(sourceRequest.getFieldList())) {
            sourceRequest.getFieldList().forEach(field -> field.setId(null));
        }

        operator.updateOpt(sourceRequest, groupEntity.getStatus(), groupEntity.getInlongGroupMode(), str);
        LOGGER.info("success to update source info: {}", sourceRequest);
        return true;
    }

    /**
     * Updates an existing stream source on behalf of the given user.
     *
     * <p>Runs in its own {@code READ_COMMITTED} transaction ({@code REQUIRES_NEW}) that rolls back on
     * any {@link Throwable}.
     *
     * @param sourceRequest the new source configuration; its id and version must match the stored record
     * @param userInfo the requesting user; its name is passed to the operator
     * @return always {@code true}; failures are reported by exception
     * @throws BusinessException if an unmodifiable attribute was changed, with
     *         {@code ILLEGAL_RECORD_FIELD_VALUE} if the group does not exist, or with
     *         {@code OPT_NOT_ALLOWED_BY_STATUS} if the group status forbids updates
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
            isolation = Isolation.READ_COMMITTED)
    public Boolean update(SourceRequest sourceRequest, UserInfo userInfo) {
        chkUnmodifiableParams(sourceRequest);

        final String groupId = sourceRequest.getInlongGroupId();
        final InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(groupId);
        if (groupEntity == null) {
            throw new BusinessException(ErrorCodeEnum.ILLEGAL_RECORD_FIELD_VALUE,
                    String.format("InlongGroup does not exist with InlongGroupId=%s", groupId));
        }

        final GroupStatus groupStatus = GroupStatus.forCode(groupEntity.getStatus());
        if (GroupStatus.notAllowedUpdate(groupStatus)) {
            throw new BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
                    String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), groupStatus));
        }

        final StreamSourceOperator operator = this.operatorFactory.getInstance(sourceRequest.getSourceType());

        // Clear any ids carried by the submitted fields before handing the request to the operator.
        if (CollectionUtils.isNotEmpty(sourceRequest.getFieldList())) {
            sourceRequest.getFieldList().forEach(field -> field.setId(null));
        }

        operator.updateOpt(sourceRequest, groupEntity.getStatus(), groupEntity.getInlongGroupMode(),
                userInfo.getName());
        return Boolean.TRUE;
    }

    /**
     * Sets the status of the sources related to a group and stream.
     *
     * <p>Runs in its own {@code READ_COMMITTED} transaction ({@code REQUIRES_NEW}) that rolls back on
     * any {@link Throwable}.
     *
     * @param str the inlong group id
     * @param str2 the inlong stream id
     * @param num the status code to set
     * @param str3 name of the operator; only used in the log message
     * @return always {@code true}
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
            isolation = Isolation.READ_COMMITTED)
    public Boolean updateStatus(String str, String str2, Integer num, String str3) {
        this.sourceMapper.updateStatusByRelatedId(str, str2, num);
        LOGGER.info("success to update source status={} for groupId={}, streamId={} by {}",
                num, str, str2, str3);
        return Boolean.TRUE;
    }

    /**
     * Logically deletes a stream source on behalf of the given operator.
     *
     * <p>The target status is {@code SOURCE_DISABLE} when the source is stopped, failed or new, when
     * other sources reference it as their template, or when its type is {@code AUTO_PUSH}; otherwise it
     * is {@code TO_BE_ISSUED_DELETE}. The record's {@code isDeleted} column is set to its own id and its
     * field records are removed.
     *
     * <p>Runs in its own {@code READ_COMMITTED} transaction ({@code REQUIRES_NEW}) that rolls back on
     * any {@link Throwable}.
     *
     * @param num the source id; must not be null
     * @param str name of the operator; only used in log messages
     * @return always {@code true}; failures are reported by exception
     * @throws BusinessException if the source or its group is missing, the status transition is not
     *         allowed, or the update did not affect exactly one row ({@code CONFIG_EXPIRED})
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
            isolation = Isolation.READ_COMMITTED)
    public Boolean delete(Integer num, String str) {
        LOGGER.info("begin to delete source for id={} by user={}", num, str);
        Preconditions.expectNotNull(num, ErrorCodeEnum.ID_IS_EMPTY.getMessage());

        final StreamSourceEntity entity = this.sourceMapper.selectByIdForUpdate(num);
        Preconditions.expectNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
                ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());

        final boolean usedAsTemplate = CollectionUtils.isNotEmpty(this.sourceMapper.selectByTemplateId(num));

        final InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(entity.getInlongGroupId());
        if (groupEntity == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
                    String.format("InlongGroup does not exist with InlongGroupId=%s", entity.getInlongGroupId()));
        }

        final SourceStatus currentStatus = SourceStatus.forCode(entity.getStatus());
        final boolean disableDirectly = currentStatus == SourceStatus.SOURCE_STOP
                || currentStatus == SourceStatus.SOURCE_FAILED
                || currentStatus == SourceStatus.SOURCE_NEW
                || usedAsTemplate
                || "AUTO_PUSH".equals(entity.getSourceType());
        final SourceStatus targetStatus =
                disableDirectly ? SourceStatus.SOURCE_DISABLE : SourceStatus.TO_BE_ISSUED_DELETE;

        if (!SourceStatus.isAllowedTransition(currentStatus, targetStatus)) {
            throw new BusinessException(String.format(
                    "current source status=%s for id=%s is not allowed to delete",
                    entity.getStatus(), entity.getId()));
        }

        entity.setPreviousStatus(currentStatus.getCode());
        entity.setStatus(targetStatus.getCode());
        // The deletion marker is the record's own id.
        entity.setIsDeleted(num);

        final int affectedRows = this.sourceMapper.updateByPrimaryKeySelective(entity);
        if (affectedRows != InlongConstants.AFFECTED_ONE_ROW) {
            LOGGER.error("source has already updated with groupId={}, streamId={}, name={}, curVersion={}",
                    entity.getInlongGroupId(), entity.getInlongStreamId(), entity.getSourceName(),
                    entity.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }

        this.sourceFieldMapper.deleteAll(num);
        LOGGER.info("success to delete source for id={} by user={}", num, str);
        return Boolean.TRUE;
    }

    /**
     * Logically deletes a stream source on behalf of the given user.
     *
     * <p>The target status is {@code SOURCE_DISABLE} when the source is stopped, failed or new, when
     * other sources reference it as their template, or when its type is {@code AUTO_PUSH}; otherwise it
     * is {@code TO_BE_ISSUED_DELETE}. The record's {@code isDeleted} column is set to its own id, the
     * user is recorded as modifier, and the source's field records are removed.
     *
     * <p>Runs in its own {@code READ_COMMITTED} transaction ({@code REQUIRES_NEW}) that rolls back on
     * any {@link Throwable}.
     *
     * @param num the source id
     * @param userInfo the requesting user; its name is stored as the modifier
     * @return always {@code true}; failures are reported by exception
     * @throws BusinessException if the source or its group is missing, with
     *         {@code SOURCE_OPT_NOT_ALLOWED} if the status transition is not allowed, or with
     *         {@code CONFIG_EXPIRED} if the update did not affect exactly one row
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
            isolation = Isolation.READ_COMMITTED)
    public Boolean delete(Integer num, UserInfo userInfo) {
        final StreamSourceEntity entity = this.sourceMapper.selectByIdForUpdate(num);
        Preconditions.expectNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
                ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());

        final InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(entity.getInlongGroupId());
        if (groupEntity == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
                    String.format("InlongGroup does not exist with InlongGroupId=%s", entity.getInlongGroupId()));
        }

        final boolean usedAsTemplate = CollectionUtils.isNotEmpty(this.sourceMapper.selectByTemplateId(num));
        final SourceStatus currentStatus = SourceStatus.forCode(entity.getStatus());
        final boolean disableDirectly = currentStatus == SourceStatus.SOURCE_STOP
                || currentStatus == SourceStatus.SOURCE_FAILED
                || currentStatus == SourceStatus.SOURCE_NEW
                || usedAsTemplate
                || "AUTO_PUSH".equals(entity.getSourceType());
        final SourceStatus targetStatus =
                disableDirectly ? SourceStatus.SOURCE_DISABLE : SourceStatus.TO_BE_ISSUED_DELETE;

        if (!SourceStatus.isAllowedTransition(currentStatus, targetStatus)) {
            throw new BusinessException(ErrorCodeEnum.SOURCE_OPT_NOT_ALLOWED, String.format(
                    "current source status=%s for id=%s is not allowed to delete",
                    entity.getStatus(), entity.getId()));
        }

        entity.setPreviousStatus(currentStatus.getCode());
        entity.setStatus(targetStatus.getCode());
        // The deletion marker is the record's own id.
        entity.setIsDeleted(num);
        entity.setModifier(userInfo.getName());

        final int affectedRows = this.sourceMapper.updateByPrimaryKeySelective(entity);
        if (affectedRows != InlongConstants.AFFECTED_ONE_ROW) {
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED, String.format(
                    "source has already updated with groupId=%s, streamId=%s, name=%s, curVersion=%s",
                    entity.getInlongGroupId(), entity.getInlongStreamId(), entity.getSourceName(),
                    entity.getVersion()));
        }

        this.sourceFieldMapper.deleteAll(num);
        return Boolean.TRUE;
    }

    /**
     * Logically deletes every source of a group and stream without checking their current status.
     *
     * <p>The sources are moved to {@code TO_BE_ISSUED_DELETE} and the related field records are updated
     * through the field mapper.
     *
     * @param str the inlong group id; must not be blank
     * @param str2 the inlong stream id; must not be blank
     * @param str3 name of the operator; only used in log messages
     * @return always {@code true}
     */
    @Override
    public Boolean forceDelete(String str, String str2, String str3) {
        LOGGER.info("begin to force delete source for groupId={} and streamId={} by user={}", str, str2, str3);
        Preconditions.expectNotBlank(str, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank(str2, ErrorCodeEnum.STREAM_ID_IS_EMPTY);

        final int sourceCount = this.sourceMapper.logicalDeleteByRelatedId(
                str, str2, SourceStatus.TO_BE_ISSUED_DELETE.getCode());
        final int fieldCount = this.sourceFieldMapper.updateByRelatedId(str, str2);

        LOGGER.info("success to force delete source for groupId={} and streamId={} by user={}, update {} sources and {} fields",
                str, str2, str3, sourceCount, fieldCount);
        return Boolean.TRUE;
    }

    /**
     * Restarts a stream source.
     *
     * <p>The stored record is copied into a fresh {@link SourceRequest} and handed to the {@code restartOpt}
     * of the operator for its source type.
     *
     * <p>Runs in its own {@code READ_COMMITTED} transaction ({@code REQUIRES_NEW}) that rolls back on
     * any {@link Throwable}.
     *
     * @param num the source id
     * @param str name of the operator performing the restart
     * @return always {@code true}; failures are reported by exception
     * @throws BusinessException with {@code GROUP_NOT_FOUND} if the source's group does not exist
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
            isolation = Isolation.READ_COMMITTED)
    public Boolean restart(Integer num, String str) {
        LOGGER.info("begin to restart source by id={}", num);

        final StreamSourceEntity entity = this.sourceMapper.selectByIdForUpdate(num);
        Preconditions.expectNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());

        final String groupId = entity.getInlongGroupId();
        if (this.groupMapper.selectByGroupId(groupId) == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
                    String.format("InlongGroup does not exist with InlongGroupId=%s", groupId));
        }

        final StreamSourceOperator operator = this.operatorFactory.getInstance(entity.getSourceType());
        final SourceRequest restartRequest = new SourceRequest();
        CommonBeanUtils.copyProperties(entity, restartRequest, true);
        operator.restartOpt(restartRequest, str);

        LOGGER.info("success to restart source info: {}", entity);
        return Boolean.TRUE;
    }

    /**
     * Stops a stream source.
     *
     * <p>The stored record is copied into a fresh {@link SourceRequest} and handed to the {@code stopOpt}
     * of the operator for its source type.
     *
     * <p>Runs in its own {@code READ_COMMITTED} transaction ({@code REQUIRES_NEW}) that rolls back on
     * any {@link Throwable}.
     *
     * @param num the source id
     * @param str name of the operator performing the stop
     * @return always {@code true}; failures are reported by exception
     * @throws BusinessException with {@code GROUP_NOT_FOUND} if the source's group does not exist
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
            isolation = Isolation.READ_COMMITTED)
    public Boolean stop(Integer num, String str) {
        LOGGER.info("begin to stop source by id={}", num);

        final StreamSourceEntity entity = this.sourceMapper.selectByIdForUpdate(num);
        Preconditions.expectNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());

        final String groupId = entity.getInlongGroupId();
        if (this.groupMapper.selectByGroupId(groupId) == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
                    String.format("InlongGroup does not exist with InlongGroupId=%s", groupId));
        }

        final StreamSourceOperator operator = this.operatorFactory.getInstance(entity.getSourceType());
        final SourceRequest stopRequest = new SourceRequest();
        CommonBeanUtils.copyProperties(entity, stopRequest, true);
        operator.stopOpt(stopRequest, str);

        LOGGER.info("success to stop source info: {}", entity);
        return Boolean.TRUE;
    }

    /**
     * Logically deletes every source of a group, optionally narrowed to one stream.
     *
     * <p>Each source keeps its current status as previous status, moves to {@code TO_BE_ISSUED_DELETE},
     * gets its {@code isDeleted} column set to its own id and records the operator as modifier.
     *
     * <p>Runs in its own {@code READ_COMMITTED} transaction ({@code REQUIRES_NEW}) that rolls back on
     * any {@link Throwable}.
     *
     * @param str the inlong group id; must not be blank
     * @param str2 the inlong stream id passed to the mapper as a filter
     * @param str3 name of the operator, stored as the modifier
     * @return always {@code true}; failures are reported by exception
     * @throws BusinessException with {@code CONFIG_EXPIRED} if an update did not affect exactly one row
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
            isolation = Isolation.READ_COMMITTED)
    public Boolean logicDeleteAll(String str, String str2, String str3) {
        LOGGER.info("begin to logic delete all source info by groupId={}, streamId={}", str, str2);
        Preconditions.expectNotBlank(str, ErrorCodeEnum.GROUP_ID_IS_EMPTY);

        final Integer deleteStatus = SourceStatus.TO_BE_ISSUED_DELETE.getCode();
        final List<StreamSourceEntity> found = this.sourceMapper.selectByRelatedId(str, str2, null);
        final List<StreamSourceEntity> targets =
                CollectionUtils.isNotEmpty(found) ? found : Collections.<StreamSourceEntity>emptyList();

        for (StreamSourceEntity entity : targets) {
            entity.setPreviousStatus(entity.getStatus());
            entity.setStatus(deleteStatus);
            // The deletion marker is the record's own id.
            entity.setIsDeleted(entity.getId());
            entity.setModifier(str3);

            final int affectedRows = this.sourceMapper.updateByPrimaryKeySelective(entity);
            if (affectedRows != InlongConstants.AFFECTED_ONE_ROW) {
                LOGGER.error("source has already updated with groupId={}, streamId={}, name={}, curVersion={}",
                        entity.getInlongGroupId(), entity.getInlongStreamId(), entity.getSourceName(),
                        entity.getVersion());
                throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
            }
        }

        LOGGER.info("success to logic delete all source by groupId={}, streamId={}", str, str2);
        return Boolean.TRUE;
    }

    /**
     * Deletes every source of a group and stream through the mapper's {@code deleteByRelatedId},
     * after the group status check has passed.
     *
     * <p>Runs in its own {@code READ_COMMITTED} transaction ({@code REQUIRES_NEW}) that rolls back on
     * any {@link Throwable}.
     *
     * @param str the inlong group id; must not be blank
     * @param str2 the inlong stream id; must not be blank
     * @param str3 name of the operator, used for the group status check
     * @return always {@code true}; failures are reported by exception
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
            isolation = Isolation.READ_COMMITTED)
    public Boolean deleteAll(String str, String str2, String str3) {
        LOGGER.info("begin to delete all source by groupId={}, streamId={}", str, str2);

        Preconditions.expectNotBlank(str, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank(str2, ErrorCodeEnum.STREAM_ID_IS_EMPTY);
        this.groupCheckService.checkGroupStatus(str, str3);

        this.sourceMapper.deleteByRelatedId(str, str2);

        LOGGER.info("success to delete all source by groupId={}, streamId={}", str, str2);
        return Boolean.TRUE;
    }

    /**
     * Lists the source types present under a group and stream.
     *
     * @param str the inlong group id
     * @param str2 the inlong stream id; when null or empty no query is made
     * @return the source types reported by the mapper, or an empty list when the stream id is empty
     */
    @Override
    public List<String> getSourceTypeList(String str, String str2) {
        LOGGER.debug("begin to get source type list by groupId={}, streamId={}", str, str2);
        if (!StringUtils.isEmpty(str2)) {
            final List<String> sourceTypes = this.sourceMapper.selectSourceType(str, str2);
            LOGGER.debug("success to get source type list, result sourceType={}", sourceTypes);
            return sourceTypes;
        }
        return Collections.emptyList();
    }

    /**
     * Verifies that a save request is present and carries a group id, stream id, source type and
     * source name, checked in that order.
     *
     * @param sourceRequest the request to verify
     */
    private void checkParams(SourceRequest sourceRequest) {
        Preconditions.expectNotNull(sourceRequest, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());

        final String groupId = sourceRequest.getInlongGroupId();
        Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);

        final String streamId = sourceRequest.getInlongStreamId();
        Preconditions.expectNotBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY);

        final String sourceType = sourceRequest.getSourceType();
        Preconditions.expectNotBlank(sourceType, ErrorCodeEnum.SOURCE_TYPE_IS_NULL);

        final String sourceName = sourceRequest.getSourceName();
        Preconditions.expectNotBlank(sourceName, ErrorCodeEnum.SOURCE_NAME_IS_NULL);
    }

    /**
     * Verifies that an update request does not change attributes that are fixed after creation, then
     * fills the request's group id, stream id and source name from the stored record.
     *
     * <p>The source type and version must equal the stored values. Group id, stream id and source name
     * may be left blank in the request, but if given they must equal the stored values.
     *
     * @param sourceRequest the update request; modified in place
     * @throws BusinessException with {@code SOURCE_INFO_NOT_FOUND} if no record has the request's id, or
     *         {@code INVALID_PARAMETER} if group id, stream id or source name differ from the stored values
     */
    private void chkUnmodifiableParams(SourceRequest sourceRequest) {
        final StreamSourceEntity stored = this.sourceMapper.selectById(sourceRequest.getId());
        if (stored == null) {
            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
                    String.format("not found source record by id=%d", sourceRequest.getId()));
        }

        Preconditions.expectEquals(stored.getSourceType(), sourceRequest.getSourceType(),
                ErrorCodeEnum.INVALID_PARAMETER, "sourceType not allowed modify");

        final String expiredMessage = String.format(
                "record has expired with record version=%d, request version=%d",
                stored.getVersion(), sourceRequest.getVersion());
        Preconditions.expectEquals(stored.getVersion(), sourceRequest.getVersion(),
                ErrorCodeEnum.CONFIG_EXPIRED, expiredMessage);

        expectUnchangedIfGiven(stored.getInlongGroupId(), sourceRequest.getInlongGroupId(),
                "InlongGroupId not allowed modify");
        expectUnchangedIfGiven(stored.getInlongStreamId(), sourceRequest.getInlongStreamId(),
                "InlongStreamId not allowed modify");
        expectUnchangedIfGiven(stored.getSourceName(), sourceRequest.getSourceName(),
                "sourceName not allowed modify");

        // Downstream code reads these from the request, so always use the stored values.
        sourceRequest.setInlongGroupId(stored.getInlongGroupId());
        sourceRequest.setInlongStreamId(stored.getInlongStreamId());
        sourceRequest.setSourceName(stored.getSourceName());
    }

    /**
     * Rejects a non-blank requested value that differs from the stored one.
     */
    private void expectUnchangedIfGiven(String storedValue, String requestedValue, String message) {
        if (StringUtils.isNotBlank(requestedValue) && !storedValue.equals(requestedValue)) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, message);
        }
    }
}
