package org.apache.inlong.manager.service.sink;

import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.enums.UserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.SinkApproveDTO;
import org.apache.inlong.manager.pojo.sink.SinkBriefInfo;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.user.UserInfo;
import org.apache.inlong.manager.service.group.GroupCheckService;
import org.apache.inlong.manager.service.stream.InlongStreamProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
/* loaded from: input_file:org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.class */
public class StreamSinkServiceImpl implements StreamSinkService {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamSinkServiceImpl.class);

    @Autowired
    private SinkOperatorFactory operatorFactory;

    @Autowired
    private GroupCheckService groupCheckService;

    @Autowired
    private InlongStreamEntityMapper streamMapper;

    @Autowired
    private InlongGroupEntityMapper groupMapper;

    @Autowired
    private StreamSinkEntityMapper sinkMapper;

    @Autowired
    private StreamSinkFieldEntityMapper sinkFieldMapper;

    @Autowired
    private AutowireCapableBeanFactory autowireCapableBeanFactory;
    private InlongStreamProcessService streamProcessOperation;

    @Override // org.apache.inlong.manager.service.sink.StreamSinkService
    @Transactional(rollbackFor = {Throwable.class})
    public Integer save(SinkRequest sinkRequest, String str) {
        LOGGER.info("begin to save sink info: {}", sinkRequest);
        checkParams(sinkRequest);
        String inlongGroupId = sinkRequest.getInlongGroupId();
        this.groupCheckService.checkGroupStatus(inlongGroupId, str);
        String inlongStreamId = sinkRequest.getInlongStreamId();
        String sinkName = sinkRequest.getSinkName();
        InlongStreamEntity selectByIdentifier = this.streamMapper.selectByIdentifier(inlongGroupId, inlongStreamId);
        Preconditions.checkNotNull(selectByIdentifier, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
        StreamSinkEntity selectByUniqueKey = this.sinkMapper.selectByUniqueKey(inlongGroupId, inlongStreamId, sinkName);
        if (selectByUniqueKey != null && selectByUniqueKey.getSinkName().equals(sinkName)) {
            throw new BusinessException(String.format("sink name=%s already exists with the groupId=%s streamId=%s", sinkName, inlongGroupId, inlongStreamId));
        }
        StreamSinkOperator sinkOperatorFactory = this.operatorFactory.getInstance(sinkRequest.getSinkType());
        List sinkFieldList = sinkRequest.getSinkFieldList();
        if (CollectionUtils.isNotEmpty(sinkFieldList)) {
            sinkFieldList.forEach(sinkField -> {
                sinkField.setId((Integer) null);
            });
        }
        int intValue = sinkOperatorFactory.saveOpt(sinkRequest, str).intValue();
        boolean equals = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(selectByIdentifier.getStatus());
        if (equals || StreamStatus.CONFIG_FAILED.getCode().equals(selectByIdentifier.getStatus())) {
            SinkStatus sinkStatus = InlongConstants.ENABLE_CREATE_RESOURCE.equals(sinkRequest.getEnableCreateResource()) ? SinkStatus.CONFIG_ING : SinkStatus.CONFIG_SUCCESSFUL;
            StreamSinkEntity selectByPrimaryKey = this.sinkMapper.selectByPrimaryKey(Integer.valueOf(intValue));
            selectByPrimaryKey.setStatus(sinkStatus.getCode());
            this.sinkMapper.updateStatus(selectByPrimaryKey);
        }
        if (equals && sinkRequest.getStartProcess().booleanValue()) {
            startProcessForSink(inlongGroupId, inlongStreamId, str);
        }
        LOGGER.info("success to save sink info: {}", sinkRequest);
        return Integer.valueOf(intValue);
    }

    @Override // org.apache.inlong.manager.service.sink.StreamSinkService
    @Transactional(rollbackFor = {Throwable.class})
    public Integer save(SinkRequest sinkRequest, UserInfo userInfo) {
        checkSinkRequestParams(sinkRequest);
        if (userInfo == null) {
            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
        }
        InlongGroupEntity selectByGroupId = this.groupMapper.selectByGroupId(sinkRequest.getInlongGroupId());
        if (selectByGroupId == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND, String.format("InlongGroup does not exist with InlongGroupId=%s", sinkRequest.getInlongGroupId()));
        }
        if (!userInfo.getRoles().contains(UserTypeEnum.ADMIN.name()) && !Arrays.asList(selectByGroupId.getInCharges().split(",")).contains(userInfo.getName())) {
            throw new BusinessException(ErrorCodeEnum.GROUP_PERMISSION_DENIED);
        }
        GroupStatus forCode = GroupStatus.forCode(selectByGroupId.getStatus().intValue());
        if (GroupStatus.notAllowedUpdate(forCode)) {
            throw new BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), forCode));
        }
        InlongStreamEntity selectByIdentifier = this.streamMapper.selectByIdentifier(sinkRequest.getInlongGroupId(), sinkRequest.getInlongStreamId());
        if (selectByIdentifier == null) {
            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
        }
        StreamSinkEntity selectByUniqueKey = this.sinkMapper.selectByUniqueKey(sinkRequest.getInlongGroupId(), sinkRequest.getInlongStreamId(), sinkRequest.getSinkName());
        if (selectByUniqueKey != null && selectByUniqueKey.getSinkName().equals(sinkRequest.getSinkName())) {
            throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE, String.format("sink name=%s already exists with the groupId=%s streamId=%s", sinkRequest.getSinkName(), sinkRequest.getInlongGroupId(), sinkRequest.getInlongStreamId()));
        }
        StreamSinkOperator sinkOperatorFactory = this.operatorFactory.getInstance(sinkRequest.getSinkType());
        List sinkFieldList = sinkRequest.getSinkFieldList();
        if (CollectionUtils.isNotEmpty(sinkFieldList)) {
            sinkFieldList.forEach(sinkField -> {
                sinkField.setId((Integer) null);
            });
        }
        int intValue = sinkOperatorFactory.saveOpt(sinkRequest, userInfo.getName()).intValue();
        boolean equals = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(selectByIdentifier.getStatus());
        if (equals || StreamStatus.CONFIG_FAILED.getCode().equals(selectByIdentifier.getStatus())) {
            SinkStatus sinkStatus = InlongConstants.ENABLE_CREATE_RESOURCE.equals(sinkRequest.getEnableCreateResource()) ? SinkStatus.CONFIG_ING : SinkStatus.CONFIG_SUCCESSFUL;
            StreamSinkEntity selectByPrimaryKey = this.sinkMapper.selectByPrimaryKey(Integer.valueOf(intValue));
            selectByPrimaryKey.setStatus(sinkStatus.getCode());
            this.sinkMapper.updateStatus(selectByPrimaryKey);
        }
        if (equals && sinkRequest.getStartProcess().booleanValue()) {
            startProcessForSink(sinkRequest.getInlongGroupId(), sinkRequest.getInlongStreamId(), userInfo.getName());
        }
        return Integer.valueOf(intValue);
    }

    @Override // org.apache.inlong.manager.service.sink.StreamSinkService
    public StreamSink get(Integer num) {
        Preconditions.checkNotNull(num, "sink id is empty");
        StreamSinkEntity selectByPrimaryKey = this.sinkMapper.selectByPrimaryKey(num);
        if (selectByPrimaryKey == null) {
            LOGGER.error("sink not found by id={}", num);
            throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
        }
        StreamSink fromEntity = this.operatorFactory.getInstance(selectByPrimaryKey.getSinkType()).getFromEntity(selectByPrimaryKey);
        LOGGER.debug("success to get sink info by id={}", num);
        return fromEntity;
    }

    @Override // org.apache.inlong.manager.service.sink.StreamSinkService
    public StreamSink get(Integer num, UserInfo userInfo) {
        if (userInfo == null) {
            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
        }
        if (num == null) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "sink id is empty");
        }
        StreamSinkEntity selectByPrimaryKey = this.sinkMapper.selectByPrimaryKey(num);
        if (selectByPrimaryKey == null) {
            throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
        }
        InlongGroupEntity selectByGroupId = this.groupMapper.selectByGroupId(selectByPrimaryKey.getInlongGroupId());
        if (selectByGroupId == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }
        if (userInfo.getRoles().contains(UserTypeEnum.ADMIN.name()) || Arrays.asList(selectByGroupId.getInCharges().split(",")).contains(userInfo.getName())) {
            return this.operatorFactory.getInstance(selectByPrimaryKey.getSinkType()).getFromEntity(selectByPrimaryKey);
        }
        throw new BusinessException(ErrorCodeEnum.GROUP_PERMISSION_DENIED);
    }

    @Override // org.apache.inlong.manager.service.sink.StreamSinkService
    public Integer getCount(String str, String str2) {
        Integer valueOf = Integer.valueOf(this.sinkMapper.selectCount(str, str2));
        LOGGER.debug("sink count={} with groupId={}, streamId={}", new Object[]{valueOf, str, str2});
        return valueOf;
    }

    @Override // org.apache.inlong.manager.service.sink.StreamSinkService
    public List<StreamSink> listSink(String str, String str2) {
        Preconditions.checkNotNull(str, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
        List selectByRelatedId = this.sinkMapper.selectByRelatedId(str, str2);
        if (CollectionUtils.isEmpty(selectByRelatedId)) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList();
        selectByRelatedId.forEach(streamSinkEntity -> {
            arrayList.add(get(streamSinkEntity.getId()));
        });
        LOGGER.debug("success to list sink by groupId={}, streamId={}", str, str2);
        return arrayList;
    }

    @Override // org.apache.inlong.manager.service.sink.StreamSinkService
    public List<SinkBriefInfo> listBrief(String str, String str2) {
        Preconditions.checkNotNull(str, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
        Preconditions.checkNotNull(str2, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
        List<SinkBriefInfo> selectSummary = this.sinkMapper.selectSummary(str, str2);
        LOGGER.debug("success to list sink summary by groupId=" + str + ", streamId=" + str2);
        return selectSummary;
    }

    @Override // org.apache.inlong.manager.service.sink.StreamSinkService
    public Map<String, List<StreamSink>> getSinksMap(InlongGroupInfo inlongGroupInfo, List<InlongStreamInfo> list) {
        String inlongGroupId = inlongGroupInfo.getInlongGroupId();
        LOGGER.debug("begin to get sink map for groupId={}", inlongGroupId);
        Map<String, List<StreamSink>> map = (Map) listSink(inlongGroupId, null).stream().collect(Collectors.groupingBy((v0) -> {
            return v0.getInlongStreamId();
        }, HashMap::new, Collectors.toCollection(ArrayList::new)));
        LOGGER.debug("success to get sink map, size={}, groupInfo={}", Integer.valueOf(map.size()), inlongGroupInfo);
        return map;
    }

    @Override // org.apache.inlong.manager.service.sink.StreamSinkService
    public PageResult<? extends StreamSink> listByCondition(SinkPageRequest sinkPageRequest) {
        Preconditions.checkNotNull(sinkPageRequest.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
        PageHelper.startPage(sinkPageRequest.getPageNum(), sinkPageRequest.getPageSize());
        OrderFieldEnum.checkOrderField(sinkPageRequest);
        OrderTypeEnum.checkOrderType(sinkPageRequest);
        List<StreamSinkEntity> selectByCondition = this.sinkMapper.selectByCondition(sinkPageRequest);
        HashMap newHashMap = Maps.newHashMap();
        for (StreamSinkEntity streamSinkEntity : selectByCondition) {
            ((Page) newHashMap.computeIfAbsent(streamSinkEntity.getSinkType(), str -> {
                return new Page();
            })).add(streamSinkEntity);
        }
        ArrayList newArrayList = Lists.newArrayList();
        for (Map.Entry entry : newHashMap.entrySet()) {
            newArrayList.addAll(this.operatorFactory.getInstance((String) entry.getKey()).getPageInfo((Page) entry.getValue()).getList());
        }
        PageResult<? extends StreamSink> pageResult = new PageResult<>(newArrayList);
        LOGGER.debug("success to list sink page, result size {}", Integer.valueOf(pageResult.getList().size()));
        return pageResult;
    }

    @Override // org.apache.inlong.manager.service.sink.StreamSinkService
    public List<? extends StreamSink> listByCondition(SinkPageRequest sinkPageRequest, UserInfo userInfo) {
        if (sinkPageRequest == null) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "group query request cannot be empty");
        }
        if (StringUtils.isBlank(sinkPageRequest.getInlongGroupId())) {
            throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        }
        if (userInfo == null) {
            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
        }
        List<StreamSinkEntity> selectByCondition = this.sinkMapper.selectByCondition(sinkPageRequest);
        HashMap newHashMap = Maps.newHashMap();
        for (StreamSinkEntity streamSinkEntity : selectByCondition) {
            ((Page) newHashMap.computeIfAbsent(streamSinkEntity.getSinkType(), str -> {
                return new Page();
            })).add(streamSinkEntity);
        }
        ArrayList newArrayList = Lists.newArrayList();
        for (Map.Entry entry : newHashMap.entrySet()) {
            for (StreamSink streamSink : this.operatorFactory.getInstance((String) entry.getKey()).getPageInfo((Page) entry.getValue()).getList()) {
                InlongGroupEntity selectByGroupId = this.groupMapper.selectByGroupId(streamSink.getInlongGroupId());
                if (selectByGroupId != null && (userInfo.getRoles().contains(UserTypeEnum.ADMIN.name()) || Arrays.asList(selectByGroupId.getInCharges().split(",")).contains(userInfo.getName()))) {
                    newArrayList.add(streamSink);
                }
            }
        }
        return newArrayList;
    }

    @Override // org.apache.inlong.manager.service.sink.StreamSinkService
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean update(SinkRequest sinkRequest, String str) {
        LOGGER.info("begin to update sink by id: {}", sinkRequest);
        checkParams(sinkRequest);
        Preconditions.checkNotNull(sinkRequest.getId(), ErrorCodeEnum.ID_IS_EMPTY.getMessage());
        String inlongGroupId = sinkRequest.getInlongGroupId();
        String inlongStreamId = sinkRequest.getInlongStreamId();
        String sinkName = sinkRequest.getSinkName();
        this.groupCheckService.checkGroupStatus(inlongGroupId, str);
        InlongStreamEntity selectByIdentifier = this.streamMapper.selectByIdentifier(inlongGroupId, inlongStreamId);
        Preconditions.checkNotNull(selectByIdentifier, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
        StreamSinkEntity selectByUniqueKey = this.sinkMapper.selectByUniqueKey(inlongGroupId, inlongStreamId, sinkName);
        if (selectByUniqueKey != null && !selectByUniqueKey.getId().equals(sinkRequest.getId())) {
            throw new BusinessException(String.format("sink name=%s already exists with the groupId=%s streamId=%s", sinkName, inlongGroupId, inlongStreamId));
        }
        SinkStatus sinkStatus = null;
        boolean equals = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(selectByIdentifier.getStatus());
        if (equals || StreamStatus.CONFIG_FAILED.getCode().equals(selectByIdentifier.getStatus())) {
            sinkStatus = InlongConstants.ENABLE_CREATE_RESOURCE.equals(sinkRequest.getEnableCreateResource()) ? SinkStatus.CONFIG_ING : SinkStatus.CONFIG_SUCCESSFUL;
        }
        this.operatorFactory.getInstance(sinkRequest.getSinkType()).updateOpt(sinkRequest, sinkStatus, str);
        if (equals && sinkRequest.getStartProcess().booleanValue()) {
            startProcessForSink(inlongGroupId, inlongStreamId, str);
        }
        LOGGER.info("success to update sink by id: {}", sinkRequest);
        return true;
    }

    @Override // org.apache.inlong.manager.service.sink.StreamSinkService
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean update(SinkRequest sinkRequest, UserInfo userInfo) {
        checkSinkRequestParams(sinkRequest);
        if (sinkRequest.getId() == null) {
            throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
        }
        if (userInfo == null) {
            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
        }
        InlongGroupEntity selectByGroupId = this.groupMapper.selectByGroupId(sinkRequest.getInlongGroupId());
        if (selectByGroupId == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND, String.format("InlongGroup does not exist with InlongGroupId=%s", sinkRequest.getInlongGroupId()));
        }
        if (!userInfo.getRoles().contains(UserTypeEnum.ADMIN.name()) && !Arrays.asList(selectByGroupId.getInCharges().split(",")).contains(userInfo.getName())) {
            throw new BusinessException(ErrorCodeEnum.GROUP_PERMISSION_DENIED);
        }
        GroupStatus forCode = GroupStatus.forCode(selectByGroupId.getStatus().intValue());
        if (GroupStatus.notAllowedUpdate(forCode)) {
            throw new BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), forCode));
        }
        InlongStreamEntity selectByIdentifier = this.streamMapper.selectByIdentifier(sinkRequest.getInlongGroupId(), sinkRequest.getInlongStreamId());
        if (selectByIdentifier == null) {
            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
        }
        StreamSinkEntity selectByUniqueKey = this.sinkMapper.selectByUniqueKey(sinkRequest.getInlongGroupId(), sinkRequest.getInlongStreamId(), sinkRequest.getSinkName());
        if (selectByUniqueKey != null && !selectByUniqueKey.getId().equals(sinkRequest.getId())) {
            throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE, String.format("sink name=%s already exists with the groupId=%s streamId=%s", sinkRequest.getSinkName(), sinkRequest.getInlongGroupId(), sinkRequest.getInlongStreamId()));
        }
        SinkStatus sinkStatus = null;
        boolean equals = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(selectByIdentifier.getStatus());
        if (equals || StreamStatus.CONFIG_FAILED.getCode().equals(selectByIdentifier.getStatus())) {
            sinkStatus = InlongConstants.ENABLE_CREATE_RESOURCE.equals(sinkRequest.getEnableCreateResource()) ? SinkStatus.CONFIG_ING : SinkStatus.CONFIG_SUCCESSFUL;
        }
        this.operatorFactory.getInstance(sinkRequest.getSinkType()).updateOpt(sinkRequest, sinkStatus, userInfo.getName());
        if (equals && sinkRequest.getStartProcess().booleanValue()) {
            startProcessForSink(sinkRequest.getInlongGroupId(), sinkRequest.getInlongStreamId(), userInfo.getName());
        }
        return true;
    }

    @Override // org.apache.inlong.manager.service.sink.StreamSinkService
    @Transactional(rollbackFor = {Throwable.class})
    public UpdateResult updateByKey(SinkRequest sinkRequest, String str) {
        LOGGER.info("begin to update sink by key: {}", sinkRequest);
        String inlongGroupId = sinkRequest.getInlongGroupId();
        String inlongStreamId = sinkRequest.getInlongStreamId();
        String sinkName = sinkRequest.getSinkName();
        StreamSinkEntity selectByUniqueKey = this.sinkMapper.selectByUniqueKey(inlongGroupId, inlongStreamId, sinkName);
        if (selectByUniqueKey == null) {
            String format = String.format("stream sink not found with groupId=%s, streamId=%s, sinkName=%s", inlongGroupId, inlongStreamId, sinkName);
            LOGGER.error(format);
            throw new BusinessException(format);
        }
        sinkRequest.setId(selectByUniqueKey.getId());
        Boolean update = update(sinkRequest, str);
        LOGGER.info("success to update sink by key: {}", sinkRequest);
        return new UpdateResult(selectByUniqueKey.getId(), update, Integer.valueOf(sinkRequest.getVersion().intValue() + 1));
    }

    @Override // org.apache.inlong.manager.service.sink.StreamSinkService
    public void updateStatus(Integer num, int i, String str) {
        StreamSinkEntity streamSinkEntity = new StreamSinkEntity();
        streamSinkEntity.setId(num);
        streamSinkEntity.setStatus(Integer.valueOf(i));
        streamSinkEntity.setOperateLog(str);
        this.sinkMapper.updateStatus(streamSinkEntity);
        LOGGER.info("success to update sink status={} for id={} with log: {}", new Object[]{Integer.valueOf(i), num, str});
    }

    @Override // org.apache.inlong.manager.service.sink.StreamSinkService
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean delete(Integer num, Boolean bool, String str) {
        LOGGER.info("begin to delete sink by id={}", num);
        Preconditions.checkNotNull(num, ErrorCodeEnum.ID_IS_EMPTY.getMessage());
        StreamSinkEntity selectByPrimaryKey = this.sinkMapper.selectByPrimaryKey(num);
        Preconditions.checkNotNull(selectByPrimaryKey, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
        this.groupCheckService.checkGroupStatus(selectByPrimaryKey.getInlongGroupId(), str);
        this.operatorFactory.getInstance(selectByPrimaryKey.getSinkType()).deleteOpt(selectByPrimaryKey, str);
        if (bool.booleanValue()) {
            deleteProcessForSink(selectByPrimaryKey.getInlongGroupId(), selectByPrimaryKey.getInlongStreamId(), str);
        }
        LOGGER.info("success to delete sink by id: {}", selectByPrimaryKey);
        return true;
    }

    @Override // org.apache.inlong.manager.service.sink.StreamSinkService
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean delete(Integer num, Boolean bool, UserInfo userInfo) {
        if (num == null) {
            throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
        }
        if (userInfo == null) {
            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
        }
        StreamSinkEntity selectByPrimaryKey = this.sinkMapper.selectByPrimaryKey(num);
        if (selectByPrimaryKey == null) {
            throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
        }
        InlongGroupEntity selectByGroupId = this.groupMapper.selectByGroupId(selectByPrimaryKey.getInlongGroupId());
        if (selectByGroupId == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND, String.format("InlongGroup does not exist with InlongGroupId=%s", selectByPrimaryKey.getInlongGroupId()));
        }
        if (!userInfo.getRoles().contains(UserTypeEnum.ADMIN.name()) && !Arrays.asList(selectByGroupId.getInCharges().split(",")).contains(userInfo.getName())) {
            throw new BusinessException(ErrorCodeEnum.GROUP_PERMISSION_DENIED);
        }
        GroupStatus forCode = GroupStatus.forCode(selectByGroupId.getStatus().intValue());
        if (GroupStatus.notAllowedUpdate(forCode)) {
            throw new BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), forCode));
        }
        this.operatorFactory.getInstance(selectByPrimaryKey.getSinkType()).deleteOpt(selectByPrimaryKey, userInfo.getName());
        if (bool.booleanValue()) {
            deleteProcessForSink(selectByPrimaryKey.getInlongGroupId(), selectByPrimaryKey.getInlongStreamId(), userInfo.getName());
        }
        return true;
    }

    @Override // org.apache.inlong.manager.service.sink.StreamSinkService
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean deleteByKey(String str, String str2, String str3, Boolean bool, String str4) {
        LOGGER.info("begin to delete sink by groupId={}, streamId={}, sinkName={}", new Object[]{str, str2, str3});
        Preconditions.checkNotNull(str, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
        Preconditions.checkNotNull(str2, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
        Preconditions.checkNotNull(str3, "stream sink name is empty or null");
        StreamSinkEntity selectByUniqueKey = this.sinkMapper.selectByUniqueKey(str, str2, str3);
        Preconditions.checkNotNull(selectByUniqueKey, String.format("stream sink not exist by groupId=%s streamId=%s sinkName=%s", str, str2, str3));
        this.groupCheckService.checkGroupStatus(selectByUniqueKey.getInlongGroupId(), str4);
        this.operatorFactory.getInstance(selectByUniqueKey.getSinkType()).deleteOpt(selectByUniqueKey, str4);
        if (bool.booleanValue()) {
            deleteProcessForSink(selectByUniqueKey.getInlongGroupId(), selectByUniqueKey.getInlongStreamId(), str4);
        }
        LOGGER.info("success to delete sink by key: {}", selectByUniqueKey);
        return true;
    }

    @Override // org.apache.inlong.manager.service.sink.StreamSinkService
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean logicDeleteAll(String str, String str2, String str3) {
        LOGGER.info("begin to logic delete all sink info by groupId={}, streamId={}", str, str2);
        Preconditions.checkNotNull(str, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
        Preconditions.checkNotNull(str2, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
        this.groupCheckService.checkGroupStatus(str, str3);
        List selectByRelatedId = this.sinkMapper.selectByRelatedId(str, str2);
        if (CollectionUtils.isNotEmpty(selectByRelatedId)) {
            selectByRelatedId.forEach(streamSinkEntity -> {
                Integer id = streamSinkEntity.getId();
                streamSinkEntity.setPreviousStatus(streamSinkEntity.getStatus());
                streamSinkEntity.setStatus(InlongConstants.DELETED_STATUS);
                streamSinkEntity.setIsDeleted(id);
                streamSinkEntity.setModifier(str3);
                if (this.sinkMapper.updateByIdSelective(streamSinkEntity) != InlongConstants.AFFECTED_ONE_ROW.intValue()) {
                    LOGGER.error("sink has already updated with groupId={}, streamId={}, name={}, curVersion={}", new Object[]{streamSinkEntity.getInlongGroupId(), streamSinkEntity.getInlongStreamId(), streamSinkEntity.getSinkName(), streamSinkEntity.getVersion()});
                    throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
                }
                this.sinkFieldMapper.logicDeleteAll(id);
            });
        }
        LOGGER.info("success to logic delete all sink by groupId={}, streamId={}", str, str2);
        return true;
    }

    @Override // org.apache.inlong.manager.service.sink.StreamSinkService
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean deleteAll(String str, String str2, String str3) {
        LOGGER.info("begin to delete all sink by groupId={}, streamId={}", str, str2);
        Preconditions.checkNotNull(str, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
        Preconditions.checkNotNull(str2, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
        this.groupCheckService.checkGroupStatus(str, str3);
        List selectByRelatedId = this.sinkMapper.selectByRelatedId(str, str2);
        if (CollectionUtils.isNotEmpty(selectByRelatedId)) {
            selectByRelatedId.forEach(streamSinkEntity -> {
                this.sinkMapper.deleteById(streamSinkEntity.getId());
                this.sinkFieldMapper.deleteAll(streamSinkEntity.getId());
            });
        }
        LOGGER.info("success to delete all sink by groupId={}, streamId={}", str, str2);
        return true;
    }

    @Override // org.apache.inlong.manager.service.sink.StreamSinkService
    public List<String> getExistsStreamIdList(String str, String str2, List<String> list) {
        LOGGER.debug("begin to filter stream by groupId={}, type={}, streamId={}", new Object[]{str, str2, list});
        if (StringUtils.isEmpty(str2) || CollectionUtils.isEmpty(list)) {
            return Collections.emptyList();
        }
        List<String> selectExistsStreamId = this.sinkMapper.selectExistsStreamId(str, str2, list);
        LOGGER.debug("success to filter stream id list, result streamId={}", selectExistsStreamId);
        return selectExistsStreamId;
    }

    @Override // org.apache.inlong.manager.service.sink.StreamSinkService
    public List<String> getSinkTypeList(String str, String str2) {
        if (StringUtils.isEmpty(str2)) {
            return Collections.emptyList();
        }
        List<String> selectSinkType = this.sinkMapper.selectSinkType(str, str2);
        LOGGER.debug("success to get sink type by groupId={}, streamId={}, result={}", new Object[]{str, str2, selectSinkType});
        return selectSinkType;
    }

    @Override // org.apache.inlong.manager.service.sink.StreamSinkService
    public Boolean updateAfterApprove(List<SinkApproveDTO> list, String str) {
        LOGGER.info("begin to update sink after approve: {}", list);
        if (CollectionUtils.isEmpty(list)) {
            return true;
        }
        for (SinkApproveDTO sinkApproveDTO : list) {
            Preconditions.checkNotNull(sinkApproveDTO.getSinkType(), ErrorCodeEnum.SINK_TYPE_IS_NULL.getMessage());
            StreamSinkEntity selectByPrimaryKey = this.sinkMapper.selectByPrimaryKey(sinkApproveDTO.getId());
            int intValue = (sinkApproveDTO.getStatus() == null ? SinkStatus.CONFIG_ING.getCode() : sinkApproveDTO.getStatus()).intValue();
            selectByPrimaryKey.setPreviousStatus(selectByPrimaryKey.getStatus());
            selectByPrimaryKey.setStatus(Integer.valueOf(intValue));
            selectByPrimaryKey.setModifier(str);
            if (this.sinkMapper.updateByIdSelective(selectByPrimaryKey) != InlongConstants.AFFECTED_ONE_ROW.intValue()) {
                LOGGER.error("sink has already updated with groupId={}, streamId={}, name={}, curVersion={}", new Object[]{selectByPrimaryKey.getInlongGroupId(), selectByPrimaryKey.getInlongStreamId(), selectByPrimaryKey.getSinkName(), selectByPrimaryKey.getVersion()});
                throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
            }
        }
        LOGGER.info("success to update sink after approve: {}", list);
        return true;
    }

    private void checkSinkRequestParams(SinkRequest sinkRequest) {
        if (sinkRequest == null) {
            throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
        }
        if (StringUtils.isBlank(sinkRequest.getInlongGroupId())) {
            throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        }
        if (StringUtils.isBlank(sinkRequest.getInlongStreamId())) {
            throw new BusinessException(ErrorCodeEnum.STREAM_ID_IS_EMPTY);
        }
        if (StringUtils.isBlank(sinkRequest.getSinkType())) {
            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_IS_NULL);
        }
        if (StringUtils.isBlank(sinkRequest.getSinkName())) {
            throw new BusinessException(ErrorCodeEnum.SINK_NAME_IS_NULL);
        }
    }

    private void checkParams(SinkRequest sinkRequest) {
        Preconditions.checkNotNull(sinkRequest, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
        Preconditions.checkNotNull(sinkRequest.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
        Preconditions.checkNotNull(sinkRequest.getInlongStreamId(), ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
        Preconditions.checkNotNull(sinkRequest.getSinkType(), ErrorCodeEnum.SINK_TYPE_IS_NULL.getMessage());
        Preconditions.checkNotNull(sinkRequest.getSinkName(), ErrorCodeEnum.SINK_NAME_IS_NULL.getMessage());
    }

    private void startProcessForSink(String str, String str2, String str3) {
        if (this.streamProcessOperation == null) {
            this.streamProcessOperation = new InlongStreamProcessService();
            this.autowireCapableBeanFactory.autowireBean(this.streamProcessOperation);
        }
        this.streamProcessOperation.startProcess(str, str2, str3, false);
        LOGGER.info("success to start the start-stream-process for groupId={} streamId={}", str, str2);
    }

    private void deleteProcessForSink(String str, String str2, String str3) {
        if (this.streamProcessOperation == null) {
            this.streamProcessOperation = new InlongStreamProcessService();
            this.autowireCapableBeanFactory.autowireBean(this.streamProcessOperation);
        }
        this.streamProcessOperation.deleteProcess(str, str2, str3, false);
        LOGGER.info("success to start the delete-stream-process for groupId={} streamId={}", str, str2);
    }
}
