/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.source;

import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.enums.UserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageRequest;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.source.SourcePageRequest;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.user.UserInfo;
import org.apache.inlong.manager.service.group.GroupCheckService;
import org.apache.inlong.manager.service.source.SourceOperatorFactory;
import org.apache.inlong.manager.service.source.StreamSourceOperator;
import org.apache.inlong.manager.service.source.StreamSourceService;
import org.apache.inlong.manager.service.user.UserService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

/**
 * Spring-managed implementation of {@link StreamSourceService}.
 *
 * <p>Validates requests against the owning group/stream records and then delegates the
 * type-specific work to the {@link StreamSourceOperator} obtained from the operator factory.
 */
@Service
public class StreamSourceServiceImpl
implements StreamSourceService {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamSourceServiceImpl.class);
    // Supplies the operator matching a given type key (see the getInstance calls below).
    @Autowired
    private SourceOperatorFactory operatorFactory;
    // Looks up group records by group id.
    @Autowired
    private InlongGroupEntityMapper groupMapper;
    // Looks up stream records by group id + stream id.
    @Autowired
    private InlongStreamEntityMapper streamMapper;
    // Reads and updates stream source records.
    @Autowired
    private StreamSourceEntityMapper sourceMapper;
    // Removes/updates the field records attached to a source.
    @Autowired
    private StreamSourceFieldEntityMapper sourceFieldMapper;
    // Checks the group status for an operator and returns the group entity.
    @Autowired
    private GroupCheckService groupCheckService;
    // Checks whether a user is among a group's in-charges.
    @Autowired
    private UserService userService;

    /**
     * Saves a new stream source after validating the request and the status of its group.
     *
     * @param request source to create; group id, stream id, source type and source name must not be blank
     * @param operator name of the user performing the operation
     * @return the id returned by the type-specific operator's {@code saveOpt}
     * @throws BusinessException if a source with the same name already exists for the group and stream
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
    public Integer save(SourceRequest request, String operator) {
        LOGGER.info("begin to save source info: {}", request);
        this.checkParams(request);

        String groupId = request.getInlongGroupId();
        InlongGroupEntity groupEntity = this.groupCheckService.checkGroupStatus(groupId, operator);

        // The source name has to be unique within the group + stream.
        String streamId = request.getInlongStreamId();
        String sourceName = request.getSourceName();
        List<StreamSourceEntity> existList = this.sourceMapper.selectByRelatedId(groupId, streamId, sourceName);
        if (CollectionUtils.isNotEmpty(existList)) {
            String err = "source name=%s already exists with groupId=%s streamId=%s";
            throw new BusinessException(String.format(err, sourceName, groupId, streamId));
        }

        StreamSourceOperator sourceOperator = this.operatorFactory.getInstance(request.getSourceType());
        // Discard any field ids supplied by the caller before handing the request to the operator
        // (presumably so the fields are stored as new records - confirm against the operator).
        if (CollectionUtils.isNotEmpty(request.getFieldList())) {
            request.getFieldList().forEach(field -> field.setId(null));
        }

        Integer id = sourceOperator.saveOpt(request, groupEntity.getStatus(), operator);
        LOGGER.info("success to save source info: {}", request);
        return id;
    }

    /**
     * Saves a new stream source on behalf of the given user.
     *
     * <p>Checks, in order: the group exists, the user is accepted by {@code userService.checkUser}
     * for the group's in-charges, the stream exists, the source name is not yet used for the
     * group + stream, and the group status allows updates.
     *
     * @param request source to create
     * @param opInfo user performing the operation
     * @return the id returned by the type-specific operator's {@code saveOpt}
     * @throws BusinessException if any of the checks above fails
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
    public Integer save(SourceRequest request, UserInfo opInfo) {
        String groupId = request.getInlongGroupId();
        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(groupId);
        if (groupEntity == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
                    String.format("InlongGroup does not exist with InlongGroupId=%s", groupId));
        }
        this.userService.checkUser(groupEntity.getInCharges(), opInfo.getName(),
                ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage());

        String streamId = request.getInlongStreamId();
        InlongStreamEntity streamEntity = this.streamMapper.selectByIdentifier(groupId, streamId);
        if (streamEntity == null) {
            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND,
                    String.format("InlongStream does not exist with InlongGroupId=%s, InLongStreamId=%s",
                            groupId, streamId));
        }

        // The source name has to be unique within the group + stream.
        String sourceName = request.getSourceName();
        List<StreamSourceEntity> existList = this.sourceMapper.selectByRelatedId(groupId, streamId, sourceName);
        if (CollectionUtils.isNotEmpty(existList)) {
            throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
                    String.format("source name=%s already exists with groupId=%s streamId=%s",
                            sourceName, groupId, streamId));
        }

        GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
        if (GroupStatus.notAllowedUpdate(status)) {
            throw new BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
                    String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
        }

        StreamSourceOperator sourceOperator = this.operatorFactory.getInstance(request.getSourceType());
        // Discard any field ids supplied by the caller before handing the request to the operator.
        if (CollectionUtils.isNotEmpty(request.getFieldList())) {
            request.getFieldList().forEach(field -> field.setId(null));
        }
        return sourceOperator.saveOpt(request, groupEntity.getStatus(), opInfo.getName());
    }

    /**
     * Loads a single stream source by its id.
     *
     * @param id source id, must not be null
     * @return the source converted by the operator registered for its source type
     * @throws BusinessException if the id is null or no record exists for it
     */
    @Override
    public StreamSource get(Integer id) {
        if (id == null) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "source id is empty");
        }

        StreamSourceEntity found = this.sourceMapper.selectById(id);
        if (found == null) {
            String notFound = String.format("source not found by id=%s", id);
            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND, notFound);
        }

        StreamSource source = this.operatorFactory.getInstance(found.getSourceType()).getFromEntity(found);
        LOGGER.debug("success to get source by id={}", id);
        return source;
    }

    /**
     * Loads a single stream source by its id on behalf of the given user.
     *
     * @param id source id
     * @param opInfo user requesting the source; checked against the in-charges of the owning group
     * @return the source converted by the operator registered for its source type
     * @throws BusinessException if the source or its group does not exist
     */
    @Override
    public StreamSource get(Integer id, UserInfo opInfo) {
        StreamSourceEntity found = this.sourceMapper.selectById(id);
        if (found == null) {
            String notFound = String.format("source not found by id=%s", id);
            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND, notFound);
        }

        InlongGroupEntity owningGroup = this.groupMapper.selectByGroupId(found.getInlongGroupId());
        if (owningGroup == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }
        this.userService.checkUser(owningGroup.getInCharges(), opInfo.getName(),
                ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage());

        return this.operatorFactory.getInstance(found.getSourceType()).getFromEntity(found);
    }

    /**
     * Counts the sources selected by the mapper for the given group and stream.
     *
     * @param groupId inlong group id
     * @param streamId inlong stream id
     * @return the count reported by the source mapper
     */
    @Override
    public Integer getCount(String groupId, String streamId) {
        Integer total = this.sourceMapper.selectCount(groupId, streamId);
        LOGGER.debug("source count={} with groupId={}, streamId={}", total, groupId, streamId);
        return total;
    }

    /**
     * Lists the sources related to a group and, optionally, a stream.
     *
     * @param groupId inlong group id, must not be blank
     * @param streamId inlong stream id passed through to the mapper query; callers in this class may pass null
     * @return the matching sources, or an empty list if none were found
     */
    @Override
    public List<StreamSource> listSource(String groupId, String streamId) {
        Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);

        List<StreamSourceEntity> entityList = this.sourceMapper.selectByRelatedId(groupId, streamId, null);
        if (CollectionUtils.isEmpty(entityList)) {
            return Collections.emptyList();
        }

        // Each entity is resolved through get(id), which reloads the record by id and converts it
        // with the operator registered for its source type.
        List<StreamSource> responseList = new ArrayList<>(entityList.size());
        for (StreamSourceEntity entity : entityList) {
            responseList.add(this.get(entity.getId()));
        }

        LOGGER.debug("success to list source by groupId={}, streamId={}", groupId, streamId);
        return responseList;
    }

    /**
     * Builds a map from stream id to the sources of that stream for the given group.
     *
     * <p>For a group whose lightweight flag equals {@code InlongConstants.LIGHTWEIGHT_MODE} the
     * sources are simply grouped by their stream id; otherwise the grouping is delegated to the
     * operator obtained for the group's MQ type.
     *
     * @param groupInfo group whose sources are listed
     * @param streamInfos streams of the group, forwarded to the operator
     * @return sources keyed by stream id
     */
    @Override
    public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos) {
        String groupId = groupInfo.getInlongGroupId();
        LOGGER.debug("begin to get source map for groupId={}", groupId);

        List<StreamSource> allSources = this.listSource(groupId, null);
        boolean lightweight = InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight());

        Map<String, List<StreamSource>> sourcesByStream;
        if (!lightweight) {
            sourcesByStream = this.operatorFactory.getInstance(groupInfo.getMqType())
                    .getSourcesMap(groupInfo, streamInfos, allSources);
        } else {
            sourcesByStream = allSources.stream().collect(
                    Collectors.groupingBy(StreamSource::getInlongStreamId, HashMap::new,
                            Collectors.toCollection(ArrayList::new)));
        }

        LOGGER.debug("success to get source map, size={}, groupInfo={}", sourcesByStream.size(), groupInfo);
        return sourcesByStream;
    }

    /**
     * Lists one page of sources matching the request.
     *
     * @param request paging and filter conditions; the group id must not be blank
     * @return the converted sources of the selected page
     */
    @Override
    public PageResult<? extends StreamSource> listByCondition(SourcePageRequest request) {
        Preconditions.expectNotBlank(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY);

        List<StreamSourceEntity> entityList = this.selectPageByCondition(request);
        PageResult<StreamSource> pageResult = new PageResult<>(this.convertToSources(entityList));
        LOGGER.debug("success to list source page, result size {}", pageResult.getList().size());
        return pageResult;
    }

    /**
     * Lists one page of sources matching the request, restricted to what the given user may see.
     *
     * <p>Users whose account type equals the ADMIN code see every selected source; any other user
     * only sees sources whose group lists the user name among its in-charges.
     *
     * @param request paging and filter conditions; the group id must not be blank
     * @param opInfo user requesting the list
     * @return the converted sources of the selected page that are visible to the user
     * @throws BusinessException if the group id is blank
     */
    @Override
    public PageResult<? extends StreamSource> listByCondition(SourcePageRequest request, UserInfo opInfo) {
        if (StringUtils.isBlank(request.getInlongGroupId())) {
            throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        }

        List<StreamSourceEntity> entityList = this.selectPageByCondition(request);
        List<StreamSourceEntity> visibleEntities;
        if (opInfo.getAccountType().equals(UserTypeEnum.ADMIN.getCode())) {
            visibleEntities = entityList;
        } else {
            visibleEntities = this.filterByInCharge(entityList, opInfo.getName());
        }

        PageResult<StreamSource> pageResult = new PageResult<>(this.convertToSources(visibleEntities));
        return pageResult;
    }

    /**
     * Normalizes the ordering settings of the request and runs the paged query.
     */
    private List<StreamSourceEntity> selectPageByCondition(SourcePageRequest request) {
        OrderFieldEnum.checkOrderField(request);
        OrderTypeEnum.checkOrderType(request);
        // startPage is kept directly in front of the query it applies to, so that nothing in
        // between can fail or run another statement while the paging state is pending.
        PageHelper.startPage(request.getPageNum(), request.getPageSize());
        return this.sourceMapper.selectByCondition(request);
    }

    /**
     * Keeps only the entities whose group lists {@code userName} among its comma-separated in-charges.
     * Each distinct group id is looked up once; a missing group or missing in-charges grants nothing.
     */
    private List<StreamSourceEntity> filterByInCharge(List<StreamSourceEntity> entityList, String userName) {
        Map<String, Boolean> allowedByGroupId = new HashMap<>();
        List<StreamSourceEntity> permitted = new ArrayList<>();
        for (StreamSourceEntity entity : entityList) {
            String groupId = entity.getInlongGroupId();
            Boolean allowed = allowedByGroupId.get(groupId);
            if (allowed == null) {
                InlongGroupEntity group = this.groupMapper.selectByGroupId(groupId);
                allowed = group != null
                        && group.getInCharges() != null
                        && Arrays.asList(group.getInCharges().split(",")).contains(userName);
                allowedByGroupId.put(groupId, allowed);
            }
            if (allowed) {
                permitted.add(entity);
            }
        }
        return permitted;
    }

    /**
     * Groups the entities by source type and lets the operator of each type convert its group.
     */
    private List<StreamSource> convertToSources(List<StreamSourceEntity> entityList) {
        Map<String, Page<StreamSourceEntity>> pageBySourceType = new HashMap<>();
        for (StreamSourceEntity entity : entityList) {
            pageBySourceType.computeIfAbsent(entity.getSourceType(), k -> new Page<>()).add(entity);
        }

        List<StreamSource> responseList = new ArrayList<>();
        for (Map.Entry<String, Page<StreamSourceEntity>> entry : pageBySourceType.entrySet()) {
            StreamSourceOperator sourceOperator = this.operatorFactory.getInstance(entry.getKey());
            PageResult<? extends StreamSource> pageInfo = sourceOperator.getPageInfo(entry.getValue());
            if (pageInfo != null && CollectionUtils.isNotEmpty(pageInfo.getList())) {
                responseList.addAll(pageInfo.getList());
            }
        }
        return responseList;
    }

    /**
     * Updates an existing stream source.
     *
     * @param request new source settings; id and version must match the stored record
     * @param operator name of the user performing the operation
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException if the record does not exist or an unmodifiable property was changed
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
            isolation = Isolation.READ_COMMITTED)
    public Boolean update(SourceRequest request, String operator) {
        LOGGER.info("begin to update source info: {}", request);
        // Also fills group id, stream id and source name of the request from the stored record.
        this.chkUnmodifiableParams(request);

        String groupId = request.getInlongGroupId();
        InlongGroupEntity groupEntity = this.groupCheckService.checkGroupStatus(groupId, operator);

        StreamSourceOperator sourceOperator = this.operatorFactory.getInstance(request.getSourceType());
        // Discard any field ids supplied by the caller before handing the request to the operator.
        if (CollectionUtils.isNotEmpty(request.getFieldList())) {
            request.getFieldList().forEach(field -> field.setId(null));
        }

        sourceOperator.updateOpt(request, groupEntity.getStatus(), groupEntity.getLightweight(), operator);
        LOGGER.info("success to update source info: {}", request);
        return true;
    }

    /**
     * Updates an existing stream source on behalf of the given user.
     *
     * @param request new source settings; id and version must match the stored record
     * @param opInfo user performing the operation; checked against the in-charges of the group
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException if the record or its group does not exist, an unmodifiable property
     *         was changed, or the group status does not allow updates
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
            isolation = Isolation.READ_COMMITTED)
    public Boolean update(SourceRequest request, UserInfo opInfo) {
        // Also fills group id, stream id and source name of the request from the stored record.
        this.chkUnmodifiableParams(request);

        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(request.getInlongGroupId());
        if (groupEntity == null) {
            throw new BusinessException(ErrorCodeEnum.ILLEGAL_RECORD_FIELD_VALUE,
                    String.format("InlongGroup does not exist with InlongGroupId=%s", request.getInlongGroupId()));
        }
        this.userService.checkUser(groupEntity.getInCharges(), opInfo.getName(),
                ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage());

        GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
        if (GroupStatus.notAllowedUpdate(status)) {
            throw new BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
                    String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
        }

        StreamSourceOperator sourceOperator = this.operatorFactory.getInstance(request.getSourceType());
        // Discard any field ids supplied by the caller before handing the request to the operator.
        if (CollectionUtils.isNotEmpty(request.getFieldList())) {
            request.getFieldList().forEach(field -> field.setId(null));
        }

        sourceOperator.updateOpt(request, groupEntity.getStatus(), groupEntity.getLightweight(), opInfo.getName());
        return true;
    }

    /**
     * Sets the status of the sources related to the given group and stream.
     *
     * @param groupId inlong group id
     * @param streamId inlong stream id
     * @param targetStatus status code to store
     * @param operator name of the user performing the operation (only logged here)
     * @return always {@code true}
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
            isolation = Isolation.READ_COMMITTED)
    public Boolean updateStatus(String groupId, String streamId, Integer targetStatus, String operator) {
        this.sourceMapper.updateStatusByRelatedId(groupId, streamId, targetStatus);
        LOGGER.info("success to update source status={} for groupId={}, streamId={} by {}",
                targetStatus, groupId, streamId, operator);
        return Boolean.TRUE;
    }

    /**
     * Logically deletes a stream source and removes its field records.
     *
     * <p>The source moves to {@code SOURCE_DISABLE} when it is frozen, failed, new, returned
     * non-empty by {@code selectByTemplateId}, or of type {@code AUTO_PUSH}; otherwise it moves to
     * {@code TO_BE_ISSUED_DELETE}.
     *
     * @param id source id, must not be null
     * @param operator name of the user performing the operation; stored as the record's modifier
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException if the status transition is not allowed or the update did not
     *         affect exactly one row
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
            isolation = Isolation.READ_COMMITTED)
    public Boolean delete(Integer id, String operator) {
        LOGGER.info("begin to delete source for id={} by user={}", id, operator);
        Preconditions.expectNotNull(id, ErrorCodeEnum.ID_IS_EMPTY.getMessage());

        StreamSourceEntity entity = this.sourceMapper.selectByIdForUpdate(id);
        Preconditions.expectNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
                ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());

        boolean isTemplateSource = CollectionUtils.isNotEmpty(this.sourceMapper.selectByTemplateId(id));
        SourceStatus curStatus = SourceStatus.forCode(entity.getStatus());
        SourceStatus nextStatus = SourceStatus.TO_BE_ISSUED_DELETE;
        if (curStatus == SourceStatus.SOURCE_FROZEN
                || curStatus == SourceStatus.SOURCE_FAILED
                || curStatus == SourceStatus.SOURCE_NEW
                || isTemplateSource
                || "AUTO_PUSH".equals(entity.getSourceType())) {
            nextStatus = SourceStatus.SOURCE_DISABLE;
        }
        if (!SourceStatus.isAllowedTransition(curStatus, nextStatus)) {
            throw new BusinessException(String.format("Source=%s is not allowed to delete", entity));
        }

        entity.setPreviousStatus(curStatus.getCode());
        entity.setStatus(nextStatus.getCode());
        // The record is flagged as deleted by storing its own id in isDeleted.
        entity.setIsDeleted(id);
        // Record who performed the deletion, as the other delete paths of this class do.
        entity.setModifier(operator);
        int rowCount = this.sourceMapper.updateByPrimaryKeySelective(entity);
        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
            LOGGER.error("source has already updated with groupId={}, streamId={}, name={}, curVersion={}",
                    entity.getInlongGroupId(), entity.getInlongStreamId(), entity.getSourceName(),
                    entity.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }

        this.sourceFieldMapper.deleteAll(id);
        LOGGER.info("success to delete source for id={} by user={}", id, operator);
        return true;
    }

    /**
     * Logically deletes a stream source on behalf of the given user and removes its field records.
     *
     * <p>The source moves to {@code SOURCE_DISABLE} when it is frozen, failed, new, returned
     * non-empty by {@code selectByTemplateId}, or of type {@code AUTO_PUSH}; otherwise it moves to
     * {@code TO_BE_ISSUED_DELETE}.
     *
     * @param id source id
     * @param opInfo user performing the operation; checked against the in-charges of the group
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException if the source or group is missing, the status transition is not
     *         allowed, or the update did not affect exactly one row
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
            isolation = Isolation.READ_COMMITTED)
    public Boolean delete(Integer id, UserInfo opInfo) {
        StreamSourceEntity source = this.sourceMapper.selectByIdForUpdate(id);
        Preconditions.expectNotNull(source, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
                ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());

        InlongGroupEntity owningGroup = this.groupMapper.selectByGroupId(source.getInlongGroupId());
        if (owningGroup == null) {
            String missingGroup = String.format("InlongGroup does not exist with InlongGroupId=%s",
                    source.getInlongGroupId());
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND, missingGroup);
        }
        this.userService.checkUser(owningGroup.getInCharges(), opInfo.getName(),
                ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage());

        boolean usedAsTemplate = CollectionUtils.isNotEmpty(this.sourceMapper.selectByTemplateId(id));
        SourceStatus current = SourceStatus.forCode(source.getStatus());
        boolean disableDirectly = current == SourceStatus.SOURCE_FROZEN
                || current == SourceStatus.SOURCE_FAILED
                || current == SourceStatus.SOURCE_NEW
                || usedAsTemplate
                || "AUTO_PUSH".equals(source.getSourceType());
        SourceStatus target = disableDirectly ? SourceStatus.SOURCE_DISABLE : SourceStatus.TO_BE_ISSUED_DELETE;
        if (!SourceStatus.isAllowedTransition(current, target)) {
            throw new BusinessException(ErrorCodeEnum.SOURCE_OPT_NOT_ALLOWED,
                    String.format("Source=%s is not allowed to delete", source));
        }

        source.setPreviousStatus(current.getCode());
        source.setStatus(target.getCode());
        // The record is flagged as deleted by storing its own id in isDeleted.
        source.setIsDeleted(id);
        source.setModifier(opInfo.getName());
        int affected = this.sourceMapper.updateByPrimaryKeySelective(source);
        if (affected != InlongConstants.AFFECTED_ONE_ROW) {
            String expired = String.format(
                    "source has already updated with groupId=%s, streamId=%s, name=%s, curVersion=%s",
                    source.getInlongGroupId(), source.getInlongStreamId(), source.getSourceName(),
                    source.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED, expired);
        }

        this.sourceFieldMapper.deleteAll(id);
        return Boolean.TRUE;
    }

    /**
     * Force-deletes the sources of a stream: updates the sources related to the group and stream
     * with the {@code TO_BE_ISSUED_DELETE} status code and updates the related field records.
     *
     * @param groupId inlong group id, must not be blank
     * @param streamId inlong stream id, must not be blank
     * @param operator name of the user performing the operation (only logged here)
     * @return always {@code true} when no exception is thrown
     */
    @Override
    public Boolean forceDelete(String groupId, String streamId, String operator) {
        LOGGER.info("begin to force delete source for groupId={} and streamId={} by user={}",
                groupId, streamId, operator);
        Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY);

        Integer deleteStatus = SourceStatus.TO_BE_ISSUED_DELETE.getCode();
        int updatedSources = this.sourceMapper.updateByRelatedId(groupId, streamId, deleteStatus);
        int updatedFields = this.sourceFieldMapper.updateByRelatedId(groupId, streamId);

        LOGGER.info("success to force delete source for groupId={} and streamId={} by user={}, update {} sources and {} fields",
                groupId, streamId, operator, updatedSources, updatedFields);
        return Boolean.TRUE;
    }

    /**
     * Restarts a stream source through the operator registered for its source type.
     *
     * @param id source id
     * @param operator name of the user performing the operation
     * @return always {@code true} when no exception is thrown
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
            isolation = Isolation.READ_COMMITTED)
    public Boolean restart(Integer id, String operator) {
        LOGGER.info("begin to restart source by id={}", id);
        StreamSourceEntity stored = this.sourceMapper.selectByIdForUpdate(id);
        Preconditions.expectNotNull(stored, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());

        // The operator works on a request object, so the stored record is copied into one.
        SourceRequest restartRequest = new SourceRequest();
        CommonBeanUtils.copyProperties(stored, restartRequest, true);
        this.operatorFactory.getInstance(stored.getSourceType()).restartOpt(restartRequest, operator);

        LOGGER.info("success to restart source info: {}", stored);
        return Boolean.TRUE;
    }

    /**
     * Stops a stream source through the operator registered for its source type.
     *
     * @param id source id
     * @param operator name of the user performing the operation
     * @return always {@code true} when no exception is thrown
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
            isolation = Isolation.READ_COMMITTED)
    public Boolean stop(Integer id, String operator) {
        LOGGER.info("begin to stop source by id={}", id);
        StreamSourceEntity stored = this.sourceMapper.selectByIdForUpdate(id);
        Preconditions.expectNotNull(stored, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());

        // The operator works on a request object, so the stored record is copied into one.
        SourceRequest stopRequest = new SourceRequest();
        CommonBeanUtils.copyProperties(stored, stopRequest, true);
        this.operatorFactory.getInstance(stored.getSourceType()).stopOpt(stopRequest, operator);

        LOGGER.info("success to stop source info: {}", stored);
        return Boolean.TRUE;
    }

    /**
     * Logically deletes every source related to the given group and stream by moving it to the
     * {@code TO_BE_ISSUED_DELETE} status.
     *
     * @param groupId inlong group id, must not be blank
     * @param streamId inlong stream id passed through to the mapper query
     * @param operator name of the user performing the operation; stored as the modifier
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException if an update did not affect exactly one row
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
            isolation = Isolation.READ_COMMITTED)
    public Boolean logicDeleteAll(String groupId, String streamId, String operator) {
        LOGGER.info("begin to logic delete all source info by groupId={}, streamId={}", groupId, streamId);
        Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);

        Integer nextStatus = SourceStatus.TO_BE_ISSUED_DELETE.getCode();
        List<StreamSourceEntity> entityList = this.sourceMapper.selectByRelatedId(groupId, streamId, null);
        if (CollectionUtils.isNotEmpty(entityList)) {
            for (StreamSourceEntity entity : entityList) {
                entity.setPreviousStatus(entity.getStatus());
                entity.setStatus(nextStatus);
                // The record is flagged as deleted by storing its own id in isDeleted.
                entity.setIsDeleted(entity.getId());
                entity.setModifier(operator);
                int rowCount = this.sourceMapper.updateByPrimaryKeySelective(entity);
                if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
                    LOGGER.error("source has already updated with groupId={}, streamId={}, name={}, curVersion={}",
                            entity.getInlongGroupId(), entity.getInlongStreamId(), entity.getSourceName(),
                            entity.getVersion());
                    throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
                }
            }
        }

        LOGGER.info("success to logic delete all source by groupId={}, streamId={}", groupId, streamId);
        return true;
    }

    /**
     * Deletes every source related to the given group and stream via the source mapper, after
     * checking the group status for the operator.
     *
     * @param groupId inlong group id, must not be blank
     * @param streamId inlong stream id, must not be blank
     * @param operator name of the user performing the operation
     * @return always {@code true} when no exception is thrown
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
            isolation = Isolation.READ_COMMITTED)
    public Boolean deleteAll(String groupId, String streamId, String operator) {
        LOGGER.info("begin to delete all source by groupId={}, streamId={}", groupId, streamId);

        Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY);
        this.groupCheckService.checkGroupStatus(groupId, operator);

        this.sourceMapper.deleteByRelatedId(groupId, streamId);

        LOGGER.info("success to delete all source by groupId={}, streamId={}", groupId, streamId);
        return Boolean.TRUE;
    }

    /**
     * Returns the source types selected by the mapper for the given group and stream.
     *
     * @param groupId inlong group id
     * @param streamId inlong stream id; when null or empty no query is run
     * @return the source types, or an empty list when the stream id is empty
     */
    @Override
    public List<String> getSourceTypeList(String groupId, String streamId) {
        LOGGER.debug("begin to get source type list by groupId={}, streamId={}", groupId, streamId);
        if (StringUtils.isEmpty(streamId)) {
            return Collections.emptyList();
        }

        List<String> resultList = this.sourceMapper.selectSourceType(groupId, streamId);
        LOGGER.debug("success to get source type list, result sourceType={}", resultList);
        return resultList;
    }

    /**
     * Verifies that the request is present and carries a non-blank group id, stream id,
     * source type and source name, in that order.
     *
     * @param request request to validate
     */
    private void checkParams(SourceRequest request) {
        Preconditions.expectNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
        Preconditions.expectNotBlank(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank(request.getInlongStreamId(), ErrorCodeEnum.STREAM_ID_IS_EMPTY);
        Preconditions.expectNotBlank(request.getSourceType(), ErrorCodeEnum.SOURCE_TYPE_IS_NULL);
        Preconditions.expectNotBlank(request.getSourceName(), ErrorCodeEnum.SOURCE_NAME_IS_NULL);
    }

    /**
     * Checks an update request against the stored record and rejects changes to properties that
     * must stay fixed: source type, version, and - when supplied - group id, stream id and
     * source name. Afterwards the request's group id, stream id and source name are overwritten
     * with the stored values.
     *
     * @param request update request identifying the record by id
     * @throws BusinessException if the record does not exist or a fixed property differs
     */
    private void chkUnmodifiableParams(SourceRequest request) {
        StreamSourceEntity stored = this.sourceMapper.selectById(request.getId());
        if (stored == null) {
            String notFound = String.format("not found source record by id=%d", request.getId());
            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND, notFound);
        }

        Preconditions.expectEquals(stored.getSourceType(), request.getSourceType(),
                ErrorCodeEnum.INVALID_PARAMETER, "sourceType not allowed modify");
        String expired = String.format("record has expired with record version=%d, request version=%d",
                stored.getVersion(), request.getVersion());
        Preconditions.expectEquals(stored.getVersion(), request.getVersion(),
                ErrorCodeEnum.CONFIG_EXPIRED, expired);

        // Blank values mean "not supplied" and are accepted; supplied values must match the record.
        String requestedGroupId = request.getInlongGroupId();
        if (StringUtils.isNotBlank(requestedGroupId) && !stored.getInlongGroupId().equals(requestedGroupId)) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "InlongGroupId not allowed modify");
        }
        String requestedStreamId = request.getInlongStreamId();
        if (StringUtils.isNotBlank(requestedStreamId) && !stored.getInlongStreamId().equals(requestedStreamId)) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "InlongStreamId not allowed modify");
        }
        String requestedName = request.getSourceName();
        if (StringUtils.isNotBlank(requestedName) && !stored.getSourceName().equals(requestedName)) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "sourceName not allowed modify");
        }

        request.setInlongGroupId(stored.getInlongGroupId());
        request.setInlongStreamId(stored.getInlongStreamId());
        request.setSourceName(stored.getSourceName());
    }
}

