/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.sink;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.enums.UserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageRequest;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.SinkApproveDTO;
import org.apache.inlong.manager.pojo.sink.SinkBriefInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.user.UserInfo;
import org.apache.inlong.manager.service.group.GroupCheckService;
import org.apache.inlong.manager.service.sink.SinkOperatorFactory;
import org.apache.inlong.manager.service.sink.StreamSinkOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.stream.InlongStreamProcessService;
import org.apache.inlong.manager.service.user.UserService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class StreamSinkServiceImpl
implements StreamSinkService {
    /** Class-wide SLF4J logger. */
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamSinkServiceImpl.class);
    /** Resolves the {@link StreamSinkOperator} for a given sink type via {@code getInstance(sinkType)}. */
    @Autowired
    private SinkOperatorFactory operatorFactory;
    /** Used to verify the group status before a sink is saved, updated or deleted. */
    @Autowired
    private GroupCheckService groupCheckService;
    /** Mapper used to look up the stream a sink belongs to. */
    @Autowired
    private InlongStreamEntityMapper streamMapper;
    /** Mapper used to look up the group a sink belongs to (for permission and status checks). */
    @Autowired
    private InlongGroupEntityMapper groupMapper;
    /** Mapper for the sink records themselves. */
    @Autowired
    private StreamSinkEntityMapper sinkMapper;
    /** Mapper for sink field records; used here only when deleting all fields of a sink. */
    @Autowired
    private StreamSinkFieldEntityMapper sinkFieldMapper;
    /** Used to autowire the lazily created {@link #streamProcessOperation}. */
    @Autowired
    private AutowireCapableBeanFactory autowireCapableBeanFactory;
    /** Used to check that an operator is allowed to act on a group. */
    @Autowired
    private UserService userService;
    /** Jackson mapper used to parse the sink fields JSON. */
    @Autowired
    private ObjectMapper objectMapper;
    // Not injected by Spring: created and autowired on first use in startProcessForSink.
    // NOTE(review): that lazy initialisation is not synchronised - confirm this is acceptable.
    private InlongStreamProcessService streamProcessOperation;

    /**
     * Saves a new sink under an existing stream.
     *
     * <p>The request is validated, the group status is checked, and the sink name must be unique
     * within the group/stream. When the stream is already in {@code CONFIG_SUCCESSFUL} or
     * {@code CONFIG_FAILED} status, the new sink's status is advanced immediately; when the stream
     * is {@code CONFIG_SUCCESSFUL} and the request asks for it, the sink process is started.
     *
     * @param request sink to save
     * @param operator name of the operator
     * @return id of the saved sink
     * @throws BusinessException if a sink with the same name already exists for the group/stream
     */
    @Override
    @Transactional(rollbackFor={Throwable.class})
    public Integer save(SinkRequest request, String operator) {
        LOGGER.info("begin to save sink info: {}", request);
        this.checkParams(request);

        String groupId = request.getInlongGroupId();
        this.groupCheckService.checkGroupStatus(groupId, operator);

        String streamId = request.getInlongStreamId();
        String sinkName = request.getSinkName();
        InlongStreamEntity streamEntity = this.streamMapper.selectByIdentifier(groupId, streamId);
        Preconditions.expectNotNull(streamEntity, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());

        // The sink name must be unique under the same groupId and streamId.
        StreamSinkEntity exists = this.sinkMapper.selectByUniqueKey(groupId, streamId, sinkName);
        if (exists != null && exists.getSinkName().equals(sinkName)) {
            String err = "sink name=%s already exists with the groupId=%s streamId=%s";
            throw new BusinessException(String.format(err, sinkName, groupId, streamId));
        }

        StreamSinkOperator sinkOperator = this.operatorFactory.getInstance(request.getSinkType());
        // Clear any ids on the submitted fields before the operator saves them.
        List<SinkField> fields = request.getSinkFieldList();
        if (CollectionUtils.isNotEmpty(fields)) {
            for (SinkField field : fields) {
                field.setId(null);
            }
        }
        int id = sinkOperator.saveOpt(request, operator);

        boolean streamSuccess = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus());
        if (streamSuccess || StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus())) {
            boolean enableCreateResource =
                    InlongConstants.ENABLE_CREATE_RESOURCE.equals(request.getEnableCreateResource());
            SinkStatus nextStatus = enableCreateResource ? SinkStatus.CONFIG_ING : SinkStatus.CONFIG_SUCCESSFUL;
            StreamSinkEntity sinkEntity = this.sinkMapper.selectByPrimaryKey(id);
            sinkEntity.setStatus(nextStatus.getCode());
            this.sinkMapper.updateStatus(sinkEntity);
        }

        // Null-safe: a missing startProcess flag means "do not start" instead of an NPE on unboxing.
        if (streamSuccess && Boolean.TRUE.equals(request.getStartProcess())) {
            this.startProcessForSink(groupId, streamId, operator);
        }
        LOGGER.info("success to save sink info: {}", request);
        return id;
    }

    /**
     * Saves a new sink on behalf of the given user.
     *
     * <p>Validates the request, verifies that the group exists, that the user passes the
     * in-charge check and that the group status allows updates, then stores the sink through the
     * operator for its sink type. Status handling and process start follow the same rules as the
     * operator-name variant of this method.
     *
     * @param request sink to save
     * @param opInfo user performing the operation
     * @return id of the saved sink
     * @throws BusinessException if the group or stream is missing, the group status forbids
     *         updates, or the sink name already exists for the group/stream
     */
    @Override
    @Transactional(rollbackFor={Throwable.class})
    public Integer save(SinkRequest request, UserInfo opInfo) {
        this.checkSinkRequestParams(request);
        String groupId = request.getInlongGroupId();
        String streamId = request.getInlongStreamId();
        String sinkName = request.getSinkName();

        InlongGroupEntity entity = this.groupMapper.selectByGroupId(groupId);
        if (entity == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
                    String.format("InlongGroup does not exist with InlongGroupId=%s", groupId));
        }
        this.userService.checkUser(entity.getInCharges(), opInfo.getName(),
                ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage());

        GroupStatus curState = GroupStatus.forCode(entity.getStatus());
        if (GroupStatus.notAllowedUpdate(curState)) {
            throw new BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), curState));
        }

        InlongStreamEntity streamEntity = this.streamMapper.selectByIdentifier(groupId, streamId);
        if (streamEntity == null) {
            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
        }

        // The sink name must be unique under the same groupId and streamId.
        StreamSinkEntity exists = this.sinkMapper.selectByUniqueKey(groupId, streamId, sinkName);
        if (exists != null && exists.getSinkName().equals(sinkName)) {
            throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
                    String.format("sink name=%s already exists with the groupId=%s streamId=%s",
                            sinkName, groupId, streamId));
        }

        StreamSinkOperator sinkOperator = this.operatorFactory.getInstance(request.getSinkType());
        // Clear any ids on the submitted fields before the operator saves them.
        List<SinkField> fields = request.getSinkFieldList();
        if (CollectionUtils.isNotEmpty(fields)) {
            for (SinkField field : fields) {
                field.setId(null);
            }
        }
        int id = sinkOperator.saveOpt(request, opInfo.getName());

        boolean streamSuccess = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus());
        if (streamSuccess || StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus())) {
            boolean enableCreateResource =
                    InlongConstants.ENABLE_CREATE_RESOURCE.equals(request.getEnableCreateResource());
            SinkStatus nextStatus = enableCreateResource ? SinkStatus.CONFIG_ING : SinkStatus.CONFIG_SUCCESSFUL;
            StreamSinkEntity sinkEntity = this.sinkMapper.selectByPrimaryKey(id);
            sinkEntity.setStatus(nextStatus.getCode());
            this.sinkMapper.updateStatus(sinkEntity);
        }

        // Null-safe: a missing startProcess flag means "do not start" instead of an NPE on unboxing.
        if (streamSuccess && Boolean.TRUE.equals(request.getStartProcess())) {
            this.startProcessForSink(groupId, streamId, opInfo.getName());
        }
        return id;
    }

    /**
     * Loads a sink by its id and converts it with the operator for its sink type.
     *
     * @param id sink id, must not be null
     * @return the sink
     * @throws BusinessException if the id is null or no sink exists for it
     */
    @Override
    public StreamSink get(Integer id) {
        if (id == null) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "sink id is empty");
        }
        StreamSinkEntity sinkEntity = this.sinkMapper.selectByPrimaryKey(id);
        if (sinkEntity == null) {
            String notFound = String.format("sink not found by id=%s", id);
            throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND, notFound);
        }
        return this.operatorFactory.getInstance(sinkEntity.getSinkType()).getFromEntity(sinkEntity);
    }

    /**
     * Loads a sink by its id after checking the user against the owning group's in-charges.
     *
     * @param id sink id
     * @param opInfo user performing the operation
     * @return the sink
     * @throws BusinessException if the sink or its group cannot be found
     */
    @Override
    public StreamSink get(Integer id, UserInfo opInfo) {
        StreamSinkEntity sinkEntity = this.sinkMapper.selectByPrimaryKey(id);
        if (sinkEntity == null) {
            throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
        }

        InlongGroupEntity owningGroup = this.groupMapper.selectByGroupId(sinkEntity.getInlongGroupId());
        if (owningGroup == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        this.userService.checkUser(
                owningGroup.getInCharges(),
                opInfo.getName(),
                ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage());

        return this.operatorFactory.getInstance(sinkEntity.getSinkType()).getFromEntity(sinkEntity);
    }

    /**
     * Counts the sinks selected by the mapper for the given group and stream.
     *
     * @param groupId inlong group id
     * @param streamId inlong stream id
     * @return the count returned by the mapper
     */
    @Override
    public Integer getCount(String groupId, String streamId) {
        Integer sinkCount = this.sinkMapper.selectCount(groupId, streamId);
        LOGGER.debug("sink count={} with groupId={}, streamId={}", sinkCount, groupId, streamId);
        return sinkCount;
    }

    /**
     * Lists the sinks related to the given group and stream.
     *
     * <p>Each entity found is re-loaded through {@link #get(Integer)} so that it is converted by
     * the operator for its sink type.
     *
     * @param groupId inlong group id, must not be blank
     * @param streamId inlong stream id, passed to the mapper as-is
     * @return the sinks, or an empty list when none are found
     * @throws BusinessException if the group id is blank
     */
    @Override
    public List<StreamSink> listSink(String groupId, String streamId) {
        if (StringUtils.isBlank(groupId)) {
            throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY, "groupId id is blank");
        }

        List<StreamSinkEntity> sinkEntities = this.sinkMapper.selectByRelatedId(groupId, streamId);
        if (CollectionUtils.isEmpty(sinkEntities)) {
            return Collections.emptyList();
        }

        List<StreamSink> sinks = new ArrayList<>();
        for (StreamSinkEntity sinkEntity : sinkEntities) {
            sinks.add(this.get(sinkEntity.getId()));
        }
        return sinks;
    }

    /**
     * Lists the sink summaries for the given group and stream.
     *
     * @param groupId inlong group id, must not be blank
     * @param streamId inlong stream id, must not be blank
     * @return the summary list returned by the mapper
     */
    @Override
    public List<SinkBriefInfo> listBrief(String groupId, String streamId) {
        Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY);

        List<SinkBriefInfo> summaryList = this.sinkMapper.selectSummary(groupId, streamId);
        // Parameterized logging: the message is only assembled when debug is enabled.
        LOGGER.debug("success to list sink summary by groupId={}, streamId={}", groupId, streamId);
        return summaryList;
    }

    /**
     * Builds a map from stream id to the sinks of that stream, for every sink of the given group.
     *
     * @param groupInfo group whose sinks are listed
     * @param streamInfos not used by this implementation
     * @return sinks of the group keyed by their inlong stream id
     */
    @Override
    public Map<String, List<StreamSink>> getSinksMap(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos) {
        String groupId = groupInfo.getInlongGroupId();
        LOGGER.debug("begin to get sink map for groupId={}", groupId);

        // A null stream id asks listSink for all sinks of the group.
        Map<String, List<StreamSink>> sinksByStream = this.listSink(groupId, null)
                .stream()
                .collect(Collectors.groupingBy(
                        sink -> sink.getInlongStreamId(),
                        HashMap::new,
                        Collectors.toCollection(ArrayList::new)));

        LOGGER.debug("success to get sink map, size={}, groupInfo={}", sinksByStream.size(), groupInfo);
        return sinksByStream;
    }

    /**
     * Lists one page of sinks matching the request.
     *
     * <p>The queried entities are grouped by sink type so that each group can be converted by the
     * operator for that type; the converted sinks are then merged into a single result.
     *
     * @param request paging and filter conditions; the group id must not be blank
     * @return the sinks of the requested page
     */
    @Override
    public PageResult<? extends StreamSink> listByCondition(SinkPageRequest request) {
        Preconditions.expectNotBlank(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        OrderFieldEnum.checkOrderField(request);
        OrderTypeEnum.checkOrderType(request);

        // startPage only applies to the next query on this thread, so call it directly before
        // the select; nothing in between can then consume or leak the paging state.
        PageHelper.startPage(request.getPageNum(), request.getPageSize());
        List<StreamSinkEntity> entityPage = this.sinkMapper.selectByCondition(request);

        // Group by sink type so each operator only converts the entities it understands.
        Map<String, Page<StreamSinkEntity>> sinkMap = Maps.newHashMap();
        for (StreamSinkEntity sinkEntity : entityPage) {
            sinkMap.computeIfAbsent(sinkEntity.getSinkType(), k -> new Page<>()).add(sinkEntity);
        }

        List<StreamSink> responseList = Lists.newArrayList();
        for (Map.Entry<String, Page<StreamSinkEntity>> entry : sinkMap.entrySet()) {
            StreamSinkOperator sinkOperator = this.operatorFactory.getInstance(entry.getKey());
            PageResult<? extends StreamSink> pageInfo = sinkOperator.getPageInfo(entry.getValue());
            responseList.addAll(pageInfo.getList());
        }

        // NOTE(review): the result is built from the merged list only; paging metadata of the
        // queried page (e.g. total) does not appear to be carried over - confirm this is intended.
        PageResult<StreamSink> pageResult = new PageResult<>(responseList);
        LOGGER.debug("success to list sink page, result size {}", pageResult.getList().size());
        return pageResult;
    }

    /**
     * Lists the sinks matching the request that the given user is allowed to see.
     *
     * <p>A sink is kept when its group exists and the user is either an admin or listed in the
     * group's comma-separated in-charges.
     *
     * @param request filter conditions; the group id must not be blank
     * @param opInfo user performing the operation
     * @return the visible sinks
     * @throws BusinessException if the group id is blank
     */
    @Override
    public List<? extends StreamSink> listByCondition(SinkPageRequest request, UserInfo opInfo) {
        if (StringUtils.isBlank(request.getInlongGroupId())) {
            throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        }
        OrderFieldEnum.checkOrderField(request);
        OrderTypeEnum.checkOrderType(request);
        List<StreamSinkEntity> sinkEntityList = this.sinkMapper.selectByCondition(request);

        // Group by sink type so each operator only converts the entities it understands.
        Map<String, Page<StreamSinkEntity>> sinkMap = Maps.newHashMap();
        for (StreamSinkEntity sinkEntity : sinkEntityList) {
            sinkMap.computeIfAbsent(sinkEntity.getSinkType(), k -> new Page<>()).add(sinkEntity);
        }

        // Look each group up at most once per call instead of once per sink.
        Map<String, InlongGroupEntity> groupCache = new HashMap<>();
        List<StreamSink> filterResult = Lists.newArrayList();
        for (Map.Entry<String, Page<StreamSinkEntity>> entry : sinkMap.entrySet()) {
            StreamSinkOperator sinkOperator = this.operatorFactory.getInstance(entry.getKey());
            PageResult<? extends StreamSink> pageInfo = sinkOperator.getPageInfo(entry.getValue());
            for (StreamSink sink : pageInfo.getList()) {
                InlongGroupEntity groupEntity = groupCache.computeIfAbsent(
                        sink.getInlongGroupId(), this.groupMapper::selectByGroupId);
                if (groupEntity == null) {
                    continue;
                }
                // Constant-first equals: a user without an account type is simply not an admin.
                boolean isAdmin = UserTypeEnum.ADMIN.getCode().equals(opInfo.getAccountType());
                if (!isAdmin) {
                    // A group without in-charges grants access to admins only.
                    String inCharges = groupEntity.getInCharges();
                    if (inCharges == null || !Arrays.asList(inCharges.split(",")).contains(opInfo.getName())) {
                        continue;
                    }
                }
                filterResult.add(sink);
            }
        }
        return filterResult;
    }

    /**
     * Updates an existing sink identified by the id in the request.
     *
     * <p>Checks the unmodifiable parameters, the group status and the uniqueness of the sink name,
     * then delegates the update to the operator for the sink type. When the stream is in
     * {@code CONFIG_SUCCESSFUL} status and the request asks for it, the sink process is started.
     *
     * @param request sink to update; must carry the sink id
     * @param operator name of the operator
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException if the request or id is missing, the sink is not found, or another
     *         sink already uses the same name for the group/stream
     */
    @Override
    @Transactional(rollbackFor={Throwable.class})
    public Boolean update(SinkRequest request, String operator) {
        LOGGER.info("begin to update sink by id: {}", request);
        if (request == null) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "inlong sink request is empty");
        }
        if (request.getId() == null) {
            throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
        }
        StreamSinkEntity curEntity = this.sinkMapper.selectByPrimaryKey(request.getId());
        if (curEntity == null) {
            throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
        }
        this.chkUnmodifiableParams(curEntity, request);

        String groupId = request.getInlongGroupId();
        String streamId = request.getInlongStreamId();
        this.groupCheckService.checkGroupStatus(groupId, operator);

        InlongStreamEntity streamEntity = this.streamMapper.selectByIdentifier(groupId, streamId);
        Preconditions.expectNotNull(streamEntity, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());

        // Another sink (different id) must not already own the requested name.
        StreamSinkEntity existEntity = this.sinkMapper.selectByUniqueKey(groupId, streamId, request.getSinkName());
        if (existEntity != null && !existEntity.getId().equals(request.getId())) {
            String errMsg = "sink name=%s already exists with the groupId=%s streamId=%s";
            throw new BusinessException(String.format(errMsg, request.getSinkName(), groupId, streamId));
        }

        // nextStatus stays null unless the stream has already been configured (successfully or not).
        SinkStatus nextStatus = null;
        boolean streamSuccess = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus());
        if (streamSuccess || StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus())) {
            boolean enableCreateResource =
                    InlongConstants.ENABLE_CREATE_RESOURCE.equals(request.getEnableCreateResource());
            nextStatus = enableCreateResource ? SinkStatus.CONFIG_ING : SinkStatus.CONFIG_SUCCESSFUL;
        }

        StreamSinkOperator sinkOperator = this.operatorFactory.getInstance(request.getSinkType());
        sinkOperator.updateOpt(request, nextStatus, operator);

        // Null-safe: a missing startProcess flag means "do not start" instead of an NPE on unboxing.
        if (streamSuccess && Boolean.TRUE.equals(request.getStartProcess())) {
            this.startProcessForSink(groupId, streamId, operator);
        }
        LOGGER.info("success to update sink by id: {}", request);
        return true;
    }

    /**
     * Updates an existing sink on behalf of the given user.
     *
     * <p>Checks the unmodifiable parameters, the user's permission on the owning group, the group
     * status and the uniqueness of the sink name, then delegates the update to the operator for
     * the sink type. When the stream is in {@code CONFIG_SUCCESSFUL} status and the request asks
     * for it, the sink process is started.
     *
     * @param request sink to update; must carry the sink id
     * @param opInfo user performing the operation
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException if the request or id is missing, the sink, group or stream is not
     *         found, the group status forbids updates, or the sink name is already taken
     */
    @Override
    @Transactional(rollbackFor={Throwable.class})
    public Boolean update(SinkRequest request, UserInfo opInfo) {
        // Same guard as the operator-name variant, instead of an NPE on a null request.
        if (request == null) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "inlong sink request is empty");
        }
        if (request.getId() == null) {
            throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
        }
        StreamSinkEntity curEntity = this.sinkMapper.selectByPrimaryKey(request.getId());
        if (curEntity == null) {
            throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
        }
        this.chkUnmodifiableParams(curEntity, request);

        InlongGroupEntity curGroupEntity = this.groupMapper.selectByGroupId(curEntity.getInlongGroupId());
        if (curGroupEntity == null) {
            throw new BusinessException(ErrorCodeEnum.ILLEGAL_RECORD_FIELD_VALUE,
                    String.format("InlongGroup does not exist with InlongGroupId=%s", curEntity.getInlongGroupId()));
        }
        this.userService.checkUser(curGroupEntity.getInCharges(), opInfo.getName(),
                ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage());

        // The update permission depends on the GROUP status; the sink's own status code must not
        // be interpreted as a group status.
        GroupStatus curState = GroupStatus.forCode(curGroupEntity.getStatus());
        if (GroupStatus.notAllowedUpdate(curState)) {
            throw new BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), curState));
        }

        InlongStreamEntity streamEntity =
                this.streamMapper.selectByIdentifier(request.getInlongGroupId(), request.getInlongStreamId());
        if (streamEntity == null) {
            throw new BusinessException(ErrorCodeEnum.ILLEGAL_RECORD_FIELD_VALUE,
                    String.format("stream record not found with the groupId=%s streamId=%s",
                            curEntity.getInlongGroupId(), curEntity.getInlongStreamId()));
        }

        // Another sink (different id) must not already own the requested name.
        StreamSinkEntity existEntity = this.sinkMapper.selectByUniqueKey(
                request.getInlongGroupId(), request.getInlongStreamId(), request.getSinkName());
        if (existEntity != null && !existEntity.getId().equals(request.getId())) {
            throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
                    String.format("sink name=%s already exists with the groupId=%s streamId=%s",
                            request.getSinkName(), request.getInlongGroupId(), request.getInlongStreamId()));
        }

        // nextStatus stays null unless the stream has already been configured (successfully or not).
        SinkStatus nextStatus = null;
        boolean streamSuccess = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus());
        if (streamSuccess || StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus())) {
            boolean enableCreateResource =
                    InlongConstants.ENABLE_CREATE_RESOURCE.equals(request.getEnableCreateResource());
            nextStatus = enableCreateResource ? SinkStatus.CONFIG_ING : SinkStatus.CONFIG_SUCCESSFUL;
        }

        StreamSinkOperator sinkOperator = this.operatorFactory.getInstance(request.getSinkType());
        sinkOperator.updateOpt(request, nextStatus, opInfo.getName());

        // Null-safe: a missing startProcess flag means "do not start" instead of an NPE on unboxing.
        if (streamSuccess && Boolean.TRUE.equals(request.getStartProcess())) {
            this.startProcessForSink(request.getInlongGroupId(), request.getInlongStreamId(), opInfo.getName());
        }
        return true;
    }

    /**
     * Updates a sink located by its unique key (group id, stream id, sink name).
     *
     * <p>The id of the located sink is written into the request before delegating to
     * {@link #update(SinkRequest, String)}.
     *
     * @param request sink to update; its group id, stream id and sink name identify the record
     * @param operator name of the operator
     * @return the sink id, the update outcome and the request version plus one
     * @throws BusinessException if no sink exists for the key
     */
    @Override
    @Transactional(rollbackFor={Throwable.class})
    public UpdateResult updateByKey(SinkRequest request, String operator) {
        LOGGER.info("begin to update sink by key: {}", request);

        StreamSinkEntity located = this.sinkMapper.selectByUniqueKey(
                request.getInlongGroupId(), request.getInlongStreamId(), request.getSinkName());
        if (located == null) {
            String errMsg = String.format("stream sink not found with groupId=%s, streamId=%s, sinkName=%s",
                    request.getInlongGroupId(), request.getInlongStreamId(), request.getSinkName());
            LOGGER.error(errMsg);
            throw new BusinessException(errMsg);
        }

        request.setId(located.getId());
        Boolean updated = this.update(request, operator);
        LOGGER.info("success to update sink by key: {}", request);

        Integer sinkId = located.getId();
        Integer nextVersion = Integer.valueOf(request.getVersion() + 1);
        return new UpdateResult(sinkId, updated, nextVersion);
    }

    /**
     * Writes a new status and operate log for the sink with the given id.
     *
     * @param id sink id
     * @param status status code to store
     * @param log operate log to store
     */
    @Override
    public void updateStatus(Integer id, int status, String log) {
        // Only the id, status and operate log are populated on the entity handed to the mapper.
        StreamSinkEntity statusUpdate = new StreamSinkEntity();
        statusUpdate.setId(id);
        statusUpdate.setStatus(status);
        statusUpdate.setOperateLog(log);
        this.sinkMapper.updateStatus(statusUpdate);

        LOGGER.info("success to update sink status={} for id={} with log: {}", status, id, log);
    }

    /**
     * Deletes the sink with the given id through the operator for its sink type.
     *
     * @param id sink id, must not be null
     * @param startProcess whether to run the delete process for the sink afterwards; {@code null}
     *        is treated as {@code false}
     * @param operator name of the operator
     * @return always {@code true} when no exception is thrown
     */
    @Override
    @Transactional(rollbackFor={Throwable.class})
    public Boolean delete(Integer id, Boolean startProcess, String operator) {
        LOGGER.info("begin to delete sink by id={}", id);
        Preconditions.expectNotNull(id, ErrorCodeEnum.ID_IS_EMPTY.getMessage());
        StreamSinkEntity entity = this.sinkMapper.selectByPrimaryKey(id);
        Preconditions.expectNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
        this.groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);

        StreamSinkOperator sinkOperator = this.operatorFactory.getInstance(entity.getSinkType());
        sinkOperator.deleteOpt(entity, operator);

        // Null-safe: a missing flag means "no process" instead of an NPE on unboxing.
        if (Boolean.TRUE.equals(startProcess)) {
            this.deleteProcessForSink(entity.getInlongGroupId(), entity.getInlongStreamId(), operator);
        }
        LOGGER.info("success to delete sink by id: {}", entity);
        return true;
    }

    /**
     * Deletes the sink with the given id on behalf of the given user.
     *
     * <p>The user is checked against the owning group's in-charges and the group status must
     * allow updates.
     *
     * @param id sink id
     * @param startProcess whether to run the delete process for the sink afterwards; {@code null}
     *        is treated as {@code false}
     * @param opInfo user performing the operation
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException if the sink or group is not found, or the group status forbids it
     */
    @Override
    @Transactional(rollbackFor={Throwable.class})
    public Boolean delete(Integer id, Boolean startProcess, UserInfo opInfo) {
        StreamSinkEntity sinkEntity = this.sinkMapper.selectByPrimaryKey(id);
        if (sinkEntity == null) {
            throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
        }
        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(sinkEntity.getInlongGroupId());
        if (groupEntity == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
                    String.format("InlongGroup does not exist with InlongGroupId=%s", sinkEntity.getInlongGroupId()));
        }
        this.userService.checkUser(groupEntity.getInCharges(), opInfo.getName(),
                ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage());

        GroupStatus curState = GroupStatus.forCode(groupEntity.getStatus());
        if (GroupStatus.notAllowedUpdate(curState)) {
            throw new BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), curState));
        }

        StreamSinkOperator sinkOperator = this.operatorFactory.getInstance(sinkEntity.getSinkType());
        sinkOperator.deleteOpt(sinkEntity, opInfo.getName());

        // Null-safe: a missing flag means "no process" instead of an NPE on unboxing.
        if (Boolean.TRUE.equals(startProcess)) {
            this.deleteProcessForSink(sinkEntity.getInlongGroupId(), sinkEntity.getInlongStreamId(),
                    opInfo.getName());
        }
        return true;
    }

    /**
     * Deletes the sink identified by its unique key (group id, stream id, sink name).
     *
     * @param groupId inlong group id, must not be blank
     * @param streamId inlong stream id, must not be blank
     * @param sinkName sink name, must not be blank
     * @param startProcess whether to run the delete process for the sink afterwards; {@code null}
     *        is treated as {@code false}
     * @param operator name of the operator
     * @return always {@code true} when no exception is thrown
     */
    @Override
    @Transactional(rollbackFor={Throwable.class})
    public Boolean deleteByKey(String groupId, String streamId, String sinkName, Boolean startProcess, String operator) {
        LOGGER.info("begin to delete sink by groupId={}, streamId={}, sinkName={}", groupId, streamId, sinkName);
        Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY);
        Preconditions.expectNotBlank(sinkName, ErrorCodeEnum.INVALID_PARAMETER, "stream sink name is empty or null");

        StreamSinkEntity entity = this.sinkMapper.selectByUniqueKey(groupId, streamId, sinkName);
        Preconditions.expectNotNull(entity, String.format(
                "stream sink not exist by groupId=%s streamId=%s sinkName=%s", groupId, streamId, sinkName));
        this.groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);

        StreamSinkOperator sinkOperator = this.operatorFactory.getInstance(entity.getSinkType());
        sinkOperator.deleteOpt(entity, operator);

        // Null-safe: a missing flag means "no process" instead of an NPE on unboxing.
        if (Boolean.TRUE.equals(startProcess)) {
            this.deleteProcessForSink(entity.getInlongGroupId(), entity.getInlongStreamId(), operator);
        }
        LOGGER.info("success to delete sink by key: {}", entity);
        return true;
    }

    /**
     * Logically deletes every sink (and its fields) related to the given group and stream.
     *
     * <p>For each sink the current status is remembered as the previous status, the status is set
     * to {@code InlongConstants.DELETED_STATUS} and {@code isDeleted} is set to the sink's own id.
     *
     * @param groupId inlong group id, must not be blank
     * @param streamId inlong stream id, must not be blank
     * @param operator name of the operator, stored as modifier
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException with {@code CONFIG_EXPIRED} if an update does not affect one row
     */
    @Override
    @Transactional(rollbackFor={Throwable.class})
    public Boolean logicDeleteAll(String groupId, String streamId, String operator) {
        LOGGER.info("begin to logic delete all sink info by groupId={}, streamId={}", groupId, streamId);
        Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY);
        this.groupCheckService.checkGroupStatus(groupId, operator);

        List<StreamSinkEntity> sinkEntities = this.sinkMapper.selectByRelatedId(groupId, streamId);
        if (CollectionUtils.isNotEmpty(sinkEntities)) {
            for (StreamSinkEntity sinkEntity : sinkEntities) {
                Integer sinkId = sinkEntity.getId();
                sinkEntity.setPreviousStatus(sinkEntity.getStatus());
                sinkEntity.setStatus(InlongConstants.DELETED_STATUS);
                sinkEntity.setIsDeleted(sinkId);
                sinkEntity.setModifier(operator);

                int affected = this.sinkMapper.updateByIdSelective(sinkEntity);
                if (affected != InlongConstants.AFFECTED_ONE_ROW) {
                    LOGGER.error("sink has already updated with groupId={}, streamId={}, name={}, curVersion={}",
                            sinkEntity.getInlongGroupId(), sinkEntity.getInlongStreamId(),
                            sinkEntity.getSinkName(), sinkEntity.getVersion());
                    throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
                }
                this.sinkFieldMapper.logicDeleteAll(sinkId);
            }
        }

        LOGGER.info("success to logic delete all sink by groupId={}, streamId={}", groupId, streamId);
        return true;
    }

    /**
     * Deletes every sink, and then its fields, related to the given group and stream.
     *
     * @param groupId inlong group id, must not be blank
     * @param streamId inlong stream id, must not be blank
     * @param operator name of the operator, used for the group status check
     * @return always {@code true} when no exception is thrown
     */
    @Override
    @Transactional(rollbackFor={Throwable.class})
    public Boolean deleteAll(String groupId, String streamId, String operator) {
        LOGGER.info("begin to delete all sink by groupId={}, streamId={}", groupId, streamId);
        Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY);
        this.groupCheckService.checkGroupStatus(groupId, operator);

        List<StreamSinkEntity> sinkEntities = this.sinkMapper.selectByRelatedId(groupId, streamId);
        if (CollectionUtils.isNotEmpty(sinkEntities)) {
            for (StreamSinkEntity sinkEntity : sinkEntities) {
                this.sinkMapper.deleteById(sinkEntity.getId());
                this.sinkFieldMapper.deleteAll(sinkEntity.getId());
            }
        }

        LOGGER.info("success to delete all sink by groupId={}, streamId={}", groupId, streamId);
        return true;
    }

    /**
     * Asks the mapper which of the given stream ids exist for the group and sink type.
     *
     * @param groupId inlong group id
     * @param sinkType sink type to filter by
     * @param streamIdList candidate stream ids
     * @return the mapper's result, or an empty list when the sink type or the candidate list is empty
     */
    @Override
    public List<String> getExistsStreamIdList(String groupId, String sinkType, List<String> streamIdList) {
        LOGGER.debug("begin to filter stream by groupId={}, type={}, streamId={}", groupId, sinkType, streamIdList);

        // Nothing to filter without a sink type or without candidates.
        if (StringUtils.isEmpty(sinkType)) {
            return Collections.emptyList();
        }
        if (CollectionUtils.isEmpty(streamIdList)) {
            return Collections.emptyList();
        }

        List<String> existingIds = this.sinkMapper.selectExistsStreamId(groupId, sinkType, streamIdList);
        LOGGER.debug("success to filter stream id list, result streamId={}", existingIds);
        return existingIds;
    }

    /**
     * Returns the sink types selected by the mapper for the given group and stream.
     *
     * @param groupId inlong group id
     * @param streamId inlong stream id
     * @return the sink types, or an empty list when the stream id is empty
     */
    @Override
    public List<String> getSinkTypeList(String groupId, String streamId) {
        boolean hasStreamId = !StringUtils.isEmpty(streamId);
        if (hasStreamId) {
            List<String> sinkTypes = this.sinkMapper.selectSinkType(groupId, streamId);
            LOGGER.debug("success to get sink type by groupId={}, streamId={}, result={}",
                    groupId, streamId, sinkTypes);
            return sinkTypes;
        }
        return Collections.emptyList();
    }

    /**
     * Updates the status of each approved sink.
     *
     * <p>For every entry the current status is remembered as the previous status and the new
     * status is taken from the entry, defaulting to {@code SinkStatus.CONFIG_ING} when absent.
     *
     * @param approveList approved sinks; an empty or null list is a no-op
     * @param operator name of the operator, stored as modifier
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException with {@code SINK_INFO_NOT_FOUND} if an approved sink no longer
     *         exists, or {@code CONFIG_EXPIRED} if an update does not affect one row
     */
    @Override
    public Boolean updateAfterApprove(List<SinkApproveDTO> approveList, String operator) {
        LOGGER.info("begin to update sink after approve: {}", approveList);
        if (CollectionUtils.isEmpty(approveList)) {
            return true;
        }
        for (SinkApproveDTO dto : approveList) {
            String sinkType = dto.getSinkType();
            Preconditions.expectNotBlank(sinkType, ErrorCodeEnum.SINK_TYPE_IS_NULL);

            StreamSinkEntity entity = this.sinkMapper.selectByPrimaryKey(dto.getId());
            // Report a vanished sink with the service's own error instead of an NPE below.
            if (entity == null) {
                throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND,
                        String.format("sink not found by id=%s", dto.getId()));
            }

            int status = dto.getStatus() == null ? SinkStatus.CONFIG_ING.getCode() : dto.getStatus();
            entity.setPreviousStatus(entity.getStatus());
            entity.setStatus(status);
            entity.setModifier(operator);

            int rowCount = this.sinkMapper.updateByIdSelective(entity);
            if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
                LOGGER.error("sink has already updated with groupId={}, streamId={}, name={}, curVersion={}",
                        entity.getInlongGroupId(), entity.getInlongStreamId(),
                        entity.getSinkName(), entity.getVersion());
                throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
            }
        }
        LOGGER.info("success to update sink after approve: {}", approveList);
        return true;
    }

    /**
     * Parses a JSON object of {@code fieldName -> fieldType} pairs into sink fields.
     *
     * @param fieldsJson JSON text holding a string-to-string map
     * @return one {@link SinkField} per map entry, carrying the field name and type
     * @throws BusinessException with {@code INVALID_PARAMETER} if anything goes wrong while parsing
     */
    @Override
    public List<SinkField> parseFields(String fieldsJson) {
        try {
            Map<String, String> typeByName =
                    this.objectMapper.readValue(fieldsJson, new TypeReference<Map<String, String>>() {});
            return typeByName.entrySet().stream()
                    .map(nameAndType -> {
                        SinkField sinkField = new SinkField();
                        sinkField.setFieldName(nameAndType.getKey());
                        sinkField.setFieldType(nameAndType.getValue());
                        return sinkField;
                    })
                    .collect(Collectors.toList());
        }
        catch (Exception e) {
            LOGGER.error("parse sink fields error", e);
            String detail = String.format("parse sink fields error : %s", e.getMessage());
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, detail);
        }
    }

    /**
     * Verifies that the identifying attributes of a sink request are all non-blank.
     *
     * <p>The checks run in a fixed order (group id, stream id, sink type, sink name) and the
     * first blank value aborts with its matching error code.
     *
     * @param request sink request to validate
     * @throws BusinessException if the group id, stream id, sink type or sink name is blank
     */
    private void checkSinkRequestParams(SinkRequest request) {
        if (StringUtils.isBlank(request.getInlongGroupId())) {
            throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        }
        if (StringUtils.isBlank(request.getInlongStreamId())) {
            throw new BusinessException(ErrorCodeEnum.STREAM_ID_IS_EMPTY);
        }
        if (StringUtils.isBlank(request.getSinkType())) {
            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_IS_NULL);
        }
        if (StringUtils.isBlank(request.getSinkName())) {
            throw new BusinessException(ErrorCodeEnum.SINK_NAME_IS_NULL);
        }
    }

    /**
     * Validates a sink request through {@link Preconditions}: the request itself must not be
     * null, and its group id, stream id, sink type and sink name must not be blank.
     *
     * <p>Checks are applied in that order, so the first violated expectation is the one reported.
     *
     * @param request sink request to validate
     */
    private void checkParams(SinkRequest request) {
        Preconditions.expectNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
        Preconditions.expectNotBlank(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank(request.getInlongStreamId(), ErrorCodeEnum.STREAM_ID_IS_EMPTY);
        Preconditions.expectNotBlank(request.getSinkType(), ErrorCodeEnum.SINK_TYPE_IS_NULL);
        Preconditions.expectNotBlank(request.getSinkName(), ErrorCodeEnum.SINK_NAME_IS_NULL);
    }

    /**
     * Starts the stream process for the given group and stream on behalf of a sink.
     *
     * <p>The stream process service is created and autowired on first use when the field is
     * still unset.
     *
     * @param groupId inlong group id
     * @param streamId inlong stream id
     * @param operator name of the user triggering the process
     */
    private void startProcessForSink(String groupId, String streamId, String operator) {
        if (null == this.streamProcessOperation) {
            // Publish the new instance to the field first, then let the bean factory wire it.
            InlongStreamProcessService processService = new InlongStreamProcessService();
            this.streamProcessOperation = processService;
            this.autowireCapableBeanFactory.autowireBean(processService);
        }

        this.streamProcessOperation.startProcess(groupId, streamId, operator, false);
        LOGGER.info("success to start the start-stream-process for groupId={} streamId={}", groupId, streamId);
    }

    /**
     * Triggers the delete stream process for the given group and stream on behalf of a sink.
     *
     * <p>The stream process service is created and autowired on first use when the field is
     * still unset.
     *
     * @param groupId inlong group id
     * @param streamId inlong stream id
     * @param operator name of the user triggering the process
     */
    private void deleteProcessForSink(String groupId, String streamId, String operator) {
        if (null == this.streamProcessOperation) {
            // Publish the new instance to the field first, then let the bean factory wire it.
            InlongStreamProcessService processService = new InlongStreamProcessService();
            this.streamProcessOperation = processService;
            this.autowireCapableBeanFactory.autowireBean(processService);
        }

        this.streamProcessOperation.deleteProcess(groupId, streamId, operator, false);
        LOGGER.debug("success to start the delete-stream-process for groupId={} streamId={}", groupId, streamId);
    }

    /**
     * Rejects an update request that tries to change attributes which must stay fixed, then
     * copies the stored group id and stream id onto the request.
     *
     * <p>The sink type and version must equal the stored values. The group id and stream id are
     * only compared when the request supplies a non-blank value.
     *
     * @param curEntity the currently stored sink entity
     * @param request the incoming update request; its group id and stream id are overwritten
     *        with the stored values once all checks pass
     * @throws BusinessException with {@link ErrorCodeEnum#INVALID_PARAMETER} if a non-blank group id
     *         or stream id in the request differs from the stored one
     */
    private void chkUnmodifiableParams(StreamSinkEntity curEntity, SinkRequest request) {
        Preconditions.expectEquals(curEntity.getSinkType(), request.getSinkType(),
                ErrorCodeEnum.INVALID_PARAMETER, "sinkType not allowed modify");

        String expiredMessage = String.format("record has expired with record version=%d, request version=%d",
                curEntity.getVersion(), request.getVersion());
        Preconditions.expectEquals(curEntity.getVersion(), request.getVersion(),
                ErrorCodeEnum.CONFIG_EXPIRED, expiredMessage);

        String storedGroupId = curEntity.getInlongGroupId();
        String requestedGroupId = request.getInlongGroupId();
        if (StringUtils.isNotBlank(requestedGroupId) && !storedGroupId.equals(requestedGroupId)) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "InlongGroupId not allowed modify");
        }

        String storedStreamId = curEntity.getInlongStreamId();
        String requestedStreamId = request.getInlongStreamId();
        if (StringUtils.isNotBlank(requestedStreamId) && !storedStreamId.equals(requestedStreamId)) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "InlongStreamId not allowed modify");
        }

        // Blank ids in the request are filled in from the stored record.
        request.setInlongGroupId(storedGroupId);
        request.setInlongStreamId(storedStreamId);
    }
}

