package org.apache.inlong.manager.service.sink;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserManager;
import net.sf.jsqlparser.statement.create.table.ColumnDefinition;
import net.sf.jsqlparser.statement.create.table.CreateTable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
import org.apache.inlong.manager.pojo.sink.SinkApproveDTO;
import org.apache.inlong.manager.pojo.sink.SinkBriefInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.user.UserInfo;
import org.apache.inlong.manager.service.group.GroupCheckService;
import org.apache.inlong.manager.service.stream.InlongStreamProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
/* loaded from: input_file:org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.class */
public class StreamSinkServiceImpl implements StreamSinkService {
    // Class-wide SLF4J logger.
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamSinkServiceImpl.class);
    // Matches a single tab, whitespace character or comma.
    // NOTE(review): presumably used to split CSV-style field definitions; the usage is not visible in this chunk.
    private static final Pattern PARSE_FIELD_CSV_SPLITTER = Pattern.compile("[\t\\s,]");
    // NOTE(review): presumably the upper/lower bound on columns per parsed CSV field line; confirm against the parser.
    private static final int PARSE_FIELD_CSV_MAX_COLUMNS = 3;
    private static final int PARSE_FIELD_CSV_MIN_COLUMNS = 2;

    // Resolves the StreamSinkOperator implementation for a given sink type.
    @Autowired
    private SinkOperatorFactory operatorFactory;

    // Used to verify the owning group's status before mutating sinks.
    @Autowired
    private GroupCheckService groupCheckService;

    // Mapper for inlong stream records.
    @Autowired
    private InlongStreamEntityMapper streamMapper;

    // Mapper for inlong group records.
    @Autowired
    private InlongGroupEntityMapper groupMapper;

    // Mapper for stream sink records.
    @Autowired
    private StreamSinkEntityMapper sinkMapper;

    // Mapper for the fields that belong to a stream sink.
    @Autowired
    private StreamSinkFieldEntityMapper sinkFieldMapper;

    @Autowired
    private AutowireCapableBeanFactory autowireCapableBeanFactory;

    @Autowired
    private ObjectMapper objectMapper;
    // Not annotated with @Autowired.
    // NOTE(review): presumably populated elsewhere in this class (not visible in this chunk) - confirm before use.
    private InlongStreamProcessService streamProcessOperation;

    /**
     * Saves a new stream sink on behalf of the named operator.
     *
     * <p>The request is validated, the group status is checked, the target stream must exist and the
     * sink name must not already be used under the same group and stream. Field ids in the request
     * are cleared before the sink operator persists the sink. When the stream status is
     * CONFIG_SUCCESSFUL or CONFIG_FAILED the new sink's status is updated right away, and when the
     * stream is CONFIG_SUCCESSFUL and the request asks for it, {@code startProcessForSink} is invoked.
     *
     * @param sinkRequest sink definition to save
     * @param str name of the operator
     * @return the id produced by the sink operator's {@code saveOpt}
     * @throws BusinessException if the sink name already exists under the group and stream
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public Integer save(SinkRequest sinkRequest, String str) {
        LOGGER.info("begin to save sink info: {}", sinkRequest);
        checkParams(sinkRequest);

        final String groupId = sinkRequest.getInlongGroupId();
        this.groupCheckService.checkGroupStatus(groupId, str);

        final String streamId = sinkRequest.getInlongStreamId();
        final String sinkName = sinkRequest.getSinkName();
        InlongStreamEntity streamEntity = this.streamMapper.selectByIdentifier(groupId, streamId);
        Preconditions.expectNotNull(streamEntity, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());

        StreamSinkEntity sameNameSink = this.sinkMapper.selectByUniqueKey(groupId, streamId, sinkName);
        boolean nameTaken = sameNameSink != null && sameNameSink.getSinkName().equals(sinkName);
        if (nameTaken) {
            throw new BusinessException(String.format("sink name=%s already exists with the groupId=%s streamId=%s", sinkName, groupId, streamId));
        }

        StreamSinkOperator operator = this.operatorFactory.getInstance(sinkRequest.getSinkType());
        // Field ids are reset so every field of a new sink is stored without a caller-supplied id.
        List<SinkField> requestFields = sinkRequest.getSinkFieldList();
        if (CollectionUtils.isNotEmpty(requestFields)) {
            for (SinkField field : requestFields) {
                field.setId((Integer) null);
            }
        }
        int sinkId = operator.saveOpt(sinkRequest, str);

        Integer streamStatus = streamEntity.getStatus();
        boolean streamSuccessful = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamStatus);
        if (streamSuccessful || StreamStatus.CONFIG_FAILED.getCode().equals(streamStatus)) {
            boolean createResource = InlongConstants.ENABLE_CREATE_RESOURCE.equals(sinkRequest.getEnableCreateResource());
            boolean startProcess = sinkRequest.getStartProcess();
            SinkStatus nextStatus;
            if (!createResource) {
                nextStatus = SinkStatus.CONFIG_SUCCESSFUL;
            } else if (startProcess) {
                nextStatus = SinkStatus.CONFIG_ING;
            } else {
                nextStatus = SinkStatus.NEW;
            }
            StreamSinkEntity savedSink = this.sinkMapper.selectByPrimaryKey(sinkId);
            savedSink.setStatus(nextStatus.getCode());
            this.sinkMapper.updateStatus(savedSink);
        }

        if (streamSuccessful && sinkRequest.getStartProcess()) {
            startProcessForSink(groupId, streamId, str);
        }
        LOGGER.info("success to save sink info: {}", sinkRequest);
        return sinkId;
    }

    /**
     * Saves a new stream sink on behalf of the given user.
     *
     * <p>Requires that the group exists and that its status allows updates, that the stream exists,
     * and that no sink with the same name exists under the group and stream. Field ids in the
     * request are cleared before the sink operator persists the sink.
     *
     * @param sinkRequest sink definition to save
     * @param userInfo the operating user; its name is recorded as the operator
     * @return the id produced by the sink operator's {@code saveOpt}
     * @throws BusinessException if the group or stream is missing, the group status forbids updates,
     *         or the sink name is a duplicate
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public Integer save(SinkRequest sinkRequest, UserInfo userInfo) {
        checkSinkRequestParams(sinkRequest);

        final String groupId = sinkRequest.getInlongGroupId();
        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(groupId);
        if (groupEntity == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND, String.format("InlongGroup does not exist with InlongGroupId=%s", groupId));
        }
        GroupStatus groupStatus = GroupStatus.forCode(groupEntity.getStatus());
        if (GroupStatus.notAllowedUpdate(groupStatus)) {
            throw new BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), groupStatus));
        }

        final String streamId = sinkRequest.getInlongStreamId();
        InlongStreamEntity streamEntity = this.streamMapper.selectByIdentifier(groupId, streamId);
        if (streamEntity == null) {
            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
        }

        final String sinkName = sinkRequest.getSinkName();
        StreamSinkEntity sameNameSink = this.sinkMapper.selectByUniqueKey(groupId, streamId, sinkName);
        if (sameNameSink != null && sameNameSink.getSinkName().equals(sinkName)) {
            throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE, String.format("sink name=%s already exists with the groupId=%s streamId=%s", sinkName, groupId, streamId));
        }

        StreamSinkOperator operator = this.operatorFactory.getInstance(sinkRequest.getSinkType());
        // Field ids are reset so every field of a new sink is stored without a caller-supplied id.
        List<SinkField> requestFields = sinkRequest.getSinkFieldList();
        if (CollectionUtils.isNotEmpty(requestFields)) {
            for (SinkField field : requestFields) {
                field.setId((Integer) null);
            }
        }
        int sinkId = operator.saveOpt(sinkRequest, userInfo.getName());

        Integer streamStatus = streamEntity.getStatus();
        boolean streamSuccessful = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamStatus);
        if (streamSuccessful || StreamStatus.CONFIG_FAILED.getCode().equals(streamStatus)) {
            boolean createResource = InlongConstants.ENABLE_CREATE_RESOURCE.equals(sinkRequest.getEnableCreateResource());
            boolean startProcess = sinkRequest.getStartProcess();
            SinkStatus nextStatus;
            if (!createResource) {
                nextStatus = SinkStatus.CONFIG_SUCCESSFUL;
            } else if (startProcess) {
                nextStatus = SinkStatus.CONFIG_ING;
            } else {
                nextStatus = SinkStatus.NEW;
            }
            StreamSinkEntity savedSink = this.sinkMapper.selectByPrimaryKey(sinkId);
            savedSink.setStatus(nextStatus.getCode());
            this.sinkMapper.updateStatus(savedSink);
        }

        if (streamSuccessful && sinkRequest.getStartProcess()) {
            // Identifiers are re-read from the request here, exactly as before the restyle.
            startProcessForSink(sinkRequest.getInlongGroupId(), sinkRequest.getInlongStreamId(), userInfo.getName());
        }
        return sinkId;
    }

    /**
     * Loads a sink by id and converts it with the operator registered for its sink type.
     *
     * @param num sink id
     * @return the sink built from the stored entity
     * @throws BusinessException if the id is null or no sink exists for it
     */
    @Override
    public StreamSink get(Integer num) {
        if (num == null) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "sink id is empty");
        }

        StreamSinkEntity sinkEntity = this.sinkMapper.selectByPrimaryKey(num);
        if (sinkEntity == null) {
            throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND, String.format("sink not found by id=%s", num));
        }

        StreamSinkOperator operator = this.operatorFactory.getInstance(sinkEntity.getSinkType());
        return operator.getFromEntity(sinkEntity);
    }

    /**
     * Loads a sink by id after confirming that its owning group still exists.
     *
     * @param num sink id
     * @param userInfo the requesting user (not consulted by this method)
     * @return the sink built from the stored entity
     * @throws BusinessException if the sink or its group cannot be found
     */
    @Override
    public StreamSink get(Integer num, UserInfo userInfo) {
        StreamSinkEntity sinkEntity = this.sinkMapper.selectByPrimaryKey(num);
        if (sinkEntity == null) {
            throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
        }

        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(sinkEntity.getInlongGroupId());
        if (groupEntity == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        StreamSinkOperator operator = this.operatorFactory.getInstance(sinkEntity.getSinkType());
        return operator.getFromEntity(sinkEntity);
    }

    /**
     * Counts the sinks found by the sink mapper for the given group and stream.
     *
     * @param str inlong group id
     * @param str2 inlong stream id
     * @return the count reported by the mapper
     */
    @Override
    public Integer getCount(String str, String str2) {
        int sinkCount = this.sinkMapper.selectCount(str, str2);
        LOGGER.debug("sink count={} with groupId={}, streamId={}", sinkCount, str, str2);
        return sinkCount;
    }

    /**
     * Lists the sinks related to the given group and stream.
     *
     * <p>Each entity returned by the mapper is resolved through {@link #get(Integer)}.
     *
     * @param str inlong group id, must not be blank
     * @param str2 inlong stream id passed through to the mapper
     * @return the resolved sinks, or an empty list when the mapper returns nothing
     * @throws BusinessException if the group id is blank
     */
    @Override
    public List<StreamSink> listSink(String str, String str2) {
        if (StringUtils.isBlank(str)) {
            throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY, "groupId id is blank");
        }

        List<StreamSinkEntity> sinkEntities = this.sinkMapper.selectByRelatedId(str, str2);
        if (CollectionUtils.isEmpty(sinkEntities)) {
            return Collections.emptyList();
        }

        List<StreamSink> sinks = new ArrayList<>();
        for (StreamSinkEntity sinkEntity : sinkEntities) {
            sinks.add(get(sinkEntity.getId()));
        }
        return sinks;
    }

    /**
     * Returns the sink summaries selected by the mapper for a group and stream.
     *
     * @param str inlong group id, must not be blank
     * @param str2 inlong stream id, must not be blank
     * @return the summaries returned by the mapper
     */
    @Override
    public List<SinkBriefInfo> listBrief(String str, String str2) {
        // Both identifiers are mandatory for the summary query.
        Preconditions.expectNotBlank(str, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank(str2, ErrorCodeEnum.STREAM_ID_IS_EMPTY);

        final List<SinkBriefInfo> briefInfos = this.sinkMapper.selectSummary(str, str2);
        LOGGER.debug("success to list sink summary by groupId={}, streamId={}", str, str2);
        return briefInfos;
    }

    /**
     * Groups every sink of a group by its inlong stream id.
     *
     * @param inlongGroupInfo group whose sinks are listed
     * @param list stream infos (not consulted by this method)
     * @return a map from stream id to the sinks of that stream
     */
    @Override
    public Map<String, List<StreamSink>> getSinksMap(InlongGroupInfo inlongGroupInfo, List<InlongStreamInfo> list) {
        final String groupId = inlongGroupInfo.getInlongGroupId();
        LOGGER.debug("begin to get sink map for groupId={}", groupId);

        // A null stream id is passed so the lookup is keyed on the group only.
        List<StreamSink> groupSinks = listSink(groupId, null);
        Map<String, List<StreamSink>> sinksByStreamId = groupSinks.stream()
                .collect(Collectors.groupingBy(StreamSink::getInlongStreamId, HashMap::new,
                        Collectors.toCollection(ArrayList::new)));

        LOGGER.debug("success to get sink map, size={}, groupInfo={}", sinksByStreamId.size(), inlongGroupInfo);
        return sinksByStreamId;
    }

    /**
     * Returns one page of sinks matching the request.
     *
     * <p>Entities whose group no longer exists are dropped. The remaining entities are bucketed by
     * sink type so that each bucket is converted by the operator registered for that type. The
     * total, page number and page size of the result are those of the paged query.
     *
     * @param sinkPageRequest paging and filter conditions; the group id must not be blank
     * @param str name of the operator (not consulted by this method)
     * @return the page of converted sinks
     */
    @Override
    public PageResult<? extends StreamSink> listByCondition(SinkPageRequest sinkPageRequest, String str) {
        Preconditions.expectNotBlank(sinkPageRequest.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        OrderFieldEnum.checkOrderField(sinkPageRequest);
        OrderTypeEnum.checkOrderType(sinkPageRequest);

        // PageHelper keeps the paging parameters in a thread-local that is consumed by the next
        // MyBatis query on this thread, so startPage must sit directly before the paged query.
        // Running the checks first means a failing check cannot leave a stale page setting behind.
        PageHelper.startPage(sinkPageRequest.getPageNum(), sinkPageRequest.getPageSize());
        Page<StreamSinkEntity> entityPage = (Page<StreamSinkEntity>) this.sinkMapper.selectByCondition(sinkPageRequest);

        Map<String, Page<StreamSinkEntity>> pagesBySinkType = Maps.newHashMap();
        for (StreamSinkEntity sinkEntity : entityPage) {
            // Skip sinks whose owning group cannot be found.
            if (this.groupMapper.selectByGroupId(sinkEntity.getInlongGroupId()) != null) {
                pagesBySinkType.computeIfAbsent(sinkEntity.getSinkType(), sinkType -> new Page<>()).add(sinkEntity);
            }
        }

        List<StreamSink> sinks = Lists.newArrayList();
        for (Map.Entry<String, Page<StreamSinkEntity>> typeEntry : pagesBySinkType.entrySet()) {
            StreamSinkOperator operator = this.operatorFactory.getInstance(typeEntry.getKey());
            sinks.addAll(operator.getPageInfo(typeEntry.getValue()).getList());
        }

        PageResult<StreamSink> pageResult = new PageResult<>(sinks, entityPage.getTotal(),
                entityPage.getPageNum(), entityPage.getPageSize());
        LOGGER.debug("success to list sink page, result size {}", pageResult.getList().size());
        return pageResult;
    }

    /**
     * Lists the sinks matching the request that the given user may see.
     *
     * <p>A sink is kept when its group exists and the user either has the tenant-admin account type
     * or is named in the group's comma-separated in-charges.
     *
     * @param sinkPageRequest filter conditions; the group id must not be blank
     * @param userInfo the requesting user
     * @return the visible sinks
     * @throws BusinessException if the group id is blank
     */
    @Override
    public List<? extends StreamSink> listByCondition(SinkPageRequest sinkPageRequest, UserInfo userInfo) {
        if (StringUtils.isBlank(sinkPageRequest.getInlongGroupId())) {
            throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        }
        OrderFieldEnum.checkOrderField(sinkPageRequest);
        OrderTypeEnum.checkOrderType(sinkPageRequest);

        List<StreamSinkEntity> sinkEntities = this.sinkMapper.selectByCondition(sinkPageRequest);

        // Bucket by sink type so each bucket is converted by the matching operator.
        Map<String, Page<StreamSinkEntity>> pagesBySinkType = Maps.newHashMap();
        for (StreamSinkEntity sinkEntity : sinkEntities) {
            pagesBySinkType.computeIfAbsent(sinkEntity.getSinkType(), sinkType -> new Page<>()).add(sinkEntity);
        }

        List<StreamSink> visibleSinks = Lists.newArrayList();
        for (Map.Entry<String, Page<StreamSinkEntity>> typeEntry : pagesBySinkType.entrySet()) {
            StreamSinkOperator operator = this.operatorFactory.getInstance(typeEntry.getKey());
            for (StreamSink sink : operator.getPageInfo(typeEntry.getValue()).getList()) {
                InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(sink.getInlongGroupId());
                if (groupEntity == null) {
                    continue;
                }
                boolean tenantAdmin = userInfo.getAccountType().equals(TenantUserTypeEnum.TENANT_ADMIN.getCode());
                if (tenantAdmin || Arrays.asList(groupEntity.getInCharges().split(",")).contains(userInfo.getName())) {
                    visibleSinks.add(sink);
                }
            }
        }
        return visibleSinks;
    }

    /**
     * Updates an existing sink, identified by the id in the request, on behalf of the named operator.
     *
     * <p>When the stream status is CONFIG_SUCCESSFUL or CONFIG_FAILED, a target sink status is
     * passed to the sink operator (CONFIG_ING if resource creation is enabled, otherwise
     * CONFIG_SUCCESSFUL) and, if the request asks for it, {@code startProcessForSink} is invoked.
     * Otherwise a null status is passed to the operator.
     *
     * @param sinkRequest the new sink definition, including the id of the sink to update
     * @param str name of the operator
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException if the request or id is missing, the sink is not found, or the
     *         sink name is used by a different sink of the same group and stream
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean update(SinkRequest sinkRequest, String str) {
        LOGGER.info("begin to update sink by id: {}", sinkRequest);
        if (sinkRequest == null) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "inlong sink request is empty");
        }
        if (sinkRequest.getId() == null) {
            throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
        }

        StreamSinkEntity currentSink = this.sinkMapper.selectByPrimaryKey(sinkRequest.getId());
        if (currentSink == null) {
            throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
        }
        chkUnmodifiableParams(currentSink, sinkRequest);

        // Identifiers are read from the request only after chkUnmodifiableParams has run.
        final String groupId = sinkRequest.getInlongGroupId();
        this.groupCheckService.checkGroupStatus(groupId, str);

        final String streamId = sinkRequest.getInlongStreamId();
        InlongStreamEntity streamEntity = this.streamMapper.selectByIdentifier(groupId, streamId);
        Preconditions.expectNotNull(streamEntity, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());

        final String sinkName = sinkRequest.getSinkName();
        StreamSinkEntity sameNameSink = this.sinkMapper.selectByUniqueKey(groupId, streamId, sinkName);
        if (sameNameSink != null && !sameNameSink.getId().equals(sinkRequest.getId())) {
            throw new BusinessException(String.format("sink name=%s already exists with the groupId=%s streamId=%s", sinkName, groupId, streamId));
        }

        Integer streamStatus = streamEntity.getStatus();
        boolean streamConfigured = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamStatus)
                || StreamStatus.CONFIG_FAILED.getCode().equals(streamStatus);
        SinkStatus nextStatus = null;
        if (streamConfigured) {
            if (InlongConstants.ENABLE_CREATE_RESOURCE.equals(sinkRequest.getEnableCreateResource())) {
                nextStatus = SinkStatus.CONFIG_ING;
            } else {
                nextStatus = SinkStatus.CONFIG_SUCCESSFUL;
            }
        }

        StreamSinkOperator operator = this.operatorFactory.getInstance(sinkRequest.getSinkType());
        operator.updateOpt(sinkRequest, nextStatus, str);

        if (streamConfigured && sinkRequest.getStartProcess()) {
            startProcessForSink(sinkRequest.getInlongGroupId(), sinkRequest.getInlongStreamId(), str);
        }
        LOGGER.info("success to update sink by id: {}", sinkRequest);
        return true;
    }

    /**
     * Updates an existing sink, identified by the id in the request, on behalf of the given user.
     *
     * <p>The owning group must exist and its status must allow updates, the stream must exist, and
     * the sink name must not be used by a different sink of the same group and stream. When the
     * stream status is CONFIG_SUCCESSFUL or CONFIG_FAILED, a target sink status is passed to the
     * sink operator (CONFIG_ING if resource creation is enabled, otherwise CONFIG_SUCCESSFUL) and,
     * if the request asks for it, {@code startProcessForSink} is invoked.
     *
     * @param sinkRequest the new sink definition, including the id of the sink to update
     * @param userInfo the operating user; its name is recorded as the operator
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException if the request or id is missing, the sink, group or stream is not
     *         found, the group status forbids updates, or the sink name is a duplicate
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean update(SinkRequest sinkRequest, UserInfo userInfo) {
        // Same guard as update(SinkRequest, String): fail with a business error, not an NPE.
        if (sinkRequest == null) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "inlong sink request is empty");
        }
        if (sinkRequest.getId() == null) {
            throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
        }
        StreamSinkEntity sinkEntity = this.sinkMapper.selectByPrimaryKey(sinkRequest.getId());
        if (sinkEntity == null) {
            throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
        }
        chkUnmodifiableParams(sinkEntity, sinkRequest);

        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(sinkEntity.getInlongGroupId());
        if (groupEntity == null) {
            throw new BusinessException(ErrorCodeEnum.ILLEGAL_RECORD_FIELD_VALUE, String.format("InlongGroup does not exist with InlongGroupId=%s", sinkEntity.getInlongGroupId()));
        }
        // The gate must look at the GROUP's status. The sink entity's status was previously fed to
        // GroupStatus.forCode here, which evaluates a sink status code as if it were a group status.
        GroupStatus groupStatus = GroupStatus.forCode(groupEntity.getStatus().intValue());
        if (GroupStatus.notAllowedUpdate(groupStatus)) {
            throw new BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), groupStatus));
        }

        InlongStreamEntity streamEntity = this.streamMapper.selectByIdentifier(sinkRequest.getInlongGroupId(), sinkRequest.getInlongStreamId());
        if (streamEntity == null) {
            throw new BusinessException(ErrorCodeEnum.ILLEGAL_RECORD_FIELD_VALUE, String.format("stream record not found with the groupId=%s streamId=%s", sinkEntity.getInlongGroupId(), sinkEntity.getInlongStreamId()));
        }

        StreamSinkEntity sameNameSink = this.sinkMapper.selectByUniqueKey(sinkRequest.getInlongGroupId(), sinkRequest.getInlongStreamId(), sinkRequest.getSinkName());
        if (sameNameSink != null && !sameNameSink.getId().equals(sinkRequest.getId())) {
            throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE, String.format("sink name=%s already exists with the groupId=%s streamId=%s", sinkRequest.getSinkName(), sinkRequest.getInlongGroupId(), sinkRequest.getInlongStreamId()));
        }

        SinkStatus nextStatus = null;
        boolean streamConfigured = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus())
                || StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus());
        if (streamConfigured) {
            nextStatus = InlongConstants.ENABLE_CREATE_RESOURCE.equals(sinkRequest.getEnableCreateResource())
                    ? SinkStatus.CONFIG_ING
                    : SinkStatus.CONFIG_SUCCESSFUL;
        }
        this.operatorFactory.getInstance(sinkRequest.getSinkType()).updateOpt(sinkRequest, nextStatus, userInfo.getName());

        if (streamConfigured && sinkRequest.getStartProcess().booleanValue()) {
            startProcessForSink(sinkRequest.getInlongGroupId(), sinkRequest.getInlongStreamId(), userInfo.getName());
        }
        return true;
    }

    /**
     * Updates the sink identified by group id, stream id and sink name.
     *
     * <p>The sink id is resolved from the unique key, written into the request, and the update is
     * delegated to {@link #update(SinkRequest, String)}.
     *
     * @param sinkRequest the new sink definition carrying the unique key
     * @param str name of the operator
     * @return the sink id, the update outcome and the request version plus one
     * @throws BusinessException if no sink matches the unique key
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public UpdateResult updateByKey(SinkRequest sinkRequest, String str) {
        LOGGER.info("begin to update sink by key: {}", sinkRequest);
        final String groupId = sinkRequest.getInlongGroupId();
        final String streamId = sinkRequest.getInlongStreamId();
        final String sinkName = sinkRequest.getSinkName();

        StreamSinkEntity sinkEntity = this.sinkMapper.selectByUniqueKey(groupId, streamId, sinkName);
        if (sinkEntity == null) {
            String errMsg = String.format("stream sink not found with groupId=%s, streamId=%s, sinkName=%s", groupId, streamId, sinkName);
            LOGGER.error(errMsg);
            throw new BusinessException(errMsg);
        }

        sinkRequest.setId(sinkEntity.getId());
        Boolean updated = update(sinkRequest, str);
        LOGGER.info("success to update sink by key: {}", sinkRequest);

        int nextVersion = sinkRequest.getVersion() + 1;
        return new UpdateResult(sinkEntity.getId(), updated, nextVersion);
    }

    /**
     * Writes a new status and operate log for the sink with the given id.
     *
     * @param num sink id
     * @param i status code to store
     * @param str operate log to store
     */
    @Override
    public void updateStatus(Integer num, int i, String str) {
        // Only id, status and operate log are populated on the entity handed to the mapper.
        StreamSinkEntity statusUpdate = new StreamSinkEntity();
        statusUpdate.setId(num);
        statusUpdate.setStatus(i);
        statusUpdate.setOperateLog(str);

        this.sinkMapper.updateStatus(statusUpdate);
        LOGGER.info("success to update sink status={} for id={} with log: {}", i, num, str);
    }

    /**
     * Deletes the sink with the given id on behalf of the named operator.
     *
     * @param num sink id, must not be null
     * @param bool whether {@code deleteProcessForSink} is invoked after the delete
     * @param str name of the operator
     * @return always {@code true} when no exception is thrown
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean delete(Integer num, Boolean bool, String str) {
        LOGGER.info("begin to delete sink by id={}", num);
        Preconditions.expectNotNull(num, ErrorCodeEnum.ID_IS_EMPTY.getMessage());

        StreamSinkEntity sinkEntity = this.sinkMapper.selectByPrimaryKey(num);
        Preconditions.expectNotNull(sinkEntity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());

        final String groupId = sinkEntity.getInlongGroupId();
        this.groupCheckService.checkGroupStatus(groupId, str);

        StreamSinkOperator operator = this.operatorFactory.getInstance(sinkEntity.getSinkType());
        operator.deleteOpt(sinkEntity, str);

        if (bool) {
            deleteProcessForSink(sinkEntity.getInlongGroupId(), sinkEntity.getInlongStreamId(), str);
        }
        LOGGER.info("success to delete sink by id: {}", sinkEntity);
        return true;
    }

    /**
     * Deletes the sink with the given id on behalf of the given user.
     *
     * <p>The owning group must exist and its status must allow updates.
     *
     * @param num sink id
     * @param bool whether {@code deleteProcessForSink} is invoked after the delete
     * @param userInfo the operating user; its name is recorded as the operator
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException if the sink or group is missing or the group status forbids updates
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean delete(Integer num, Boolean bool, UserInfo userInfo) {
        StreamSinkEntity sinkEntity = this.sinkMapper.selectByPrimaryKey(num);
        if (sinkEntity == null) {
            throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
        }

        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(sinkEntity.getInlongGroupId());
        if (groupEntity == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND, String.format("InlongGroup does not exist with InlongGroupId=%s", sinkEntity.getInlongGroupId()));
        }

        GroupStatus groupStatus = GroupStatus.forCode(groupEntity.getStatus());
        if (GroupStatus.notAllowedUpdate(groupStatus)) {
            throw new BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), groupStatus));
        }

        StreamSinkOperator operator = this.operatorFactory.getInstance(sinkEntity.getSinkType());
        operator.deleteOpt(sinkEntity, userInfo.getName());

        if (bool) {
            deleteProcessForSink(sinkEntity.getInlongGroupId(), sinkEntity.getInlongStreamId(), userInfo.getName());
        }
        return true;
    }

    /**
     * Deletes the sink identified by group id, stream id and sink name.
     *
     * @param str inlong group id, must not be blank
     * @param str2 inlong stream id, must not be blank
     * @param str3 sink name, must not be blank
     * @param bool whether {@code deleteProcessForSink} is invoked after the delete
     * @param str4 name of the operator
     * @return always {@code true} when no exception is thrown
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean deleteByKey(String str, String str2, String str3, Boolean bool, String str4) {
        LOGGER.info("begin to delete sink by groupId={}, streamId={}, sinkName={}", str, str2, str3);
        Preconditions.expectNotBlank(str, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank(str2, ErrorCodeEnum.STREAM_ID_IS_EMPTY);
        Preconditions.expectNotBlank(str3, ErrorCodeEnum.INVALID_PARAMETER, "stream sink name is empty or null");

        StreamSinkEntity sinkEntity = this.sinkMapper.selectByUniqueKey(str, str2, str3);
        String notFoundMsg = String.format("stream sink not exist by groupId=%s streamId=%s sinkName=%s", str, str2, str3);
        Preconditions.expectNotNull(sinkEntity, notFoundMsg);

        this.groupCheckService.checkGroupStatus(sinkEntity.getInlongGroupId(), str4);

        StreamSinkOperator operator = this.operatorFactory.getInstance(sinkEntity.getSinkType());
        operator.deleteOpt(sinkEntity, str4);

        if (bool) {
            deleteProcessForSink(sinkEntity.getInlongGroupId(), sinkEntity.getInlongStreamId(), str4);
        }
        LOGGER.info("success to delete sink by key: {}", sinkEntity);
        return true;
    }

    /**
     * Logically deletes every sink related to the given group and stream, together with its fields.
     *
     * <p>For each sink the current status is kept as the previous status, the status becomes
     * {@code InlongConstants.DELETED_STATUS}, and {@code isDeleted} is set to the sink's own id.
     *
     * @param str inlong group id, must not be blank
     * @param str2 inlong stream id, must not be blank
     * @param str3 name of the operator, stored as modifier
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException with CONFIG_EXPIRED if a sink update does not affect exactly one row
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean logicDeleteAll(String str, String str2, String str3) {
        LOGGER.info("begin to logic delete all sink info by groupId={}, streamId={}", str, str2);
        Preconditions.expectNotBlank(str, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank(str2, ErrorCodeEnum.STREAM_ID_IS_EMPTY);
        this.groupCheckService.checkGroupStatus(str, str3);

        List<StreamSinkEntity> sinkEntities = this.sinkMapper.selectByRelatedId(str, str2);
        if (CollectionUtils.isNotEmpty(sinkEntities)) {
            for (StreamSinkEntity sinkEntity : sinkEntities) {
                final Integer sinkId = sinkEntity.getId();
                sinkEntity.setPreviousStatus(sinkEntity.getStatus());
                sinkEntity.setStatus(InlongConstants.DELETED_STATUS);
                sinkEntity.setIsDeleted(sinkId);
                sinkEntity.setModifier(str3);

                int affectedRows = this.sinkMapper.updateByIdSelective(sinkEntity);
                if (affectedRows != InlongConstants.AFFECTED_ONE_ROW) {
                    LOGGER.error("sink has already updated with groupId={}, streamId={}, name={}, curVersion={}",
                            sinkEntity.getInlongGroupId(), sinkEntity.getInlongStreamId(),
                            sinkEntity.getSinkName(), sinkEntity.getVersion());
                    throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
                }
                this.sinkFieldMapper.logicDeleteAll(sinkId);
            }
        }

        LOGGER.info("success to logic delete all sink by groupId={}, streamId={}", str, str2);
        return true;
    }

    /**
     * Deletes every sink related to the given group and stream, together with its fields.
     *
     * @param str inlong group id, must not be blank
     * @param str2 inlong stream id, must not be blank
     * @param str3 name of the operator, used for the group status check
     * @return always {@code true} when no exception is thrown
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean deleteAll(String str, String str2, String str3) {
        LOGGER.info("begin to delete all sink by groupId={}, streamId={}", str, str2);
        Preconditions.expectNotBlank(str, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank(str2, ErrorCodeEnum.STREAM_ID_IS_EMPTY);
        this.groupCheckService.checkGroupStatus(str, str3);

        List<StreamSinkEntity> sinkEntities = this.sinkMapper.selectByRelatedId(str, str2);
        if (CollectionUtils.isNotEmpty(sinkEntities)) {
            for (StreamSinkEntity sinkEntity : sinkEntities) {
                // The sink row is removed first, then the fields that belong to it.
                this.sinkMapper.deleteById(sinkEntity.getId());
                this.sinkFieldMapper.deleteAll(sinkEntity.getId());
            }
        }

        LOGGER.info("success to delete all sink by groupId={}, streamId={}", str, str2);
        return true;
    }

    /**
     * Returns the stream ids selected by the mapper for the given group, sink type and candidates.
     *
     * @param str inlong group id
     * @param str2 sink type
     * @param list candidate stream ids
     * @return the stream ids returned by the mapper, or an empty list when the sink type is empty
     *         or no candidates are given
     */
    @Override
    public List<String> getExistsStreamIdList(String str, String str2, List<String> list) {
        LOGGER.debug("begin to filter stream by groupId={}, type={}, streamId={}", str, str2, list);
        if (StringUtils.isEmpty(str2)) {
            return Collections.emptyList();
        }
        if (CollectionUtils.isEmpty(list)) {
            return Collections.emptyList();
        }

        final List<String> existingStreamIds = this.sinkMapper.selectExistsStreamId(str, str2, list);
        LOGGER.debug("success to filter stream id list, result streamId={}", existingStreamIds);
        return existingStreamIds;
    }

    /**
     * Returns the sink types selected by the mapper for the given group and stream.
     *
     * @param str inlong group id
     * @param str2 inlong stream id
     * @return the sink types returned by the mapper, or an empty list when the stream id is empty
     */
    @Override
    public List<String> getSinkTypeList(String str, String str2) {
        // Without a stream id there is nothing to look up.
        if (StringUtils.isEmpty(str2)) {
            return Collections.emptyList();
        }

        final List<String> sinkTypes = this.sinkMapper.selectSinkType(str, str2);
        LOGGER.debug("success to get sink type by groupId={}, streamId={}, result={}", str, str2, sinkTypes);
        return sinkTypes;
    }

    /**
     * Applies the approved status to each listed sink.
     *
     * <p>For every entry the sink is loaded by id, its current status is kept as the previous
     * status, and the new status is the one carried by the entry, or CONFIG_ING when the entry has
     * none.
     *
     * @param list approval entries; a null or empty list is a no-op
     * @param str name of the operator, stored as modifier
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException with SINK_INFO_NOT_FOUND if an entry refers to a sink that does not
     *         exist, or with CONFIG_EXPIRED if an update does not affect exactly one row
     */
    @Override
    public Boolean updateAfterApprove(List<SinkApproveDTO> list, String str) {
        LOGGER.info("begin to update sink after approve: {}", list);
        if (CollectionUtils.isEmpty(list)) {
            return true;
        }
        for (SinkApproveDTO approveInfo : list) {
            Preconditions.expectNotBlank(approveInfo.getSinkType(), ErrorCodeEnum.SINK_TYPE_IS_NULL);
            StreamSinkEntity sinkEntity = this.sinkMapper.selectByPrimaryKey(approveInfo.getId());
            // An unknown or stale id must fail with a business error, not an NPE on the next line.
            if (sinkEntity == null) {
                throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND,
                        String.format("sink not found by id=%s", approveInfo.getId()));
            }

            Integer approvedStatus = approveInfo.getStatus() == null
                    ? SinkStatus.CONFIG_ING.getCode()
                    : approveInfo.getStatus();
            sinkEntity.setPreviousStatus(sinkEntity.getStatus());
            sinkEntity.setStatus(approvedStatus);
            sinkEntity.setModifier(str);

            int affectedRows = this.sinkMapper.updateByIdSelective(sinkEntity);
            if (affectedRows != InlongConstants.AFFECTED_ONE_ROW.intValue()) {
                LOGGER.error("sink has already updated with groupId={}, streamId={}, name={}, curVersion={}",
                        sinkEntity.getInlongGroupId(), sinkEntity.getInlongStreamId(),
                        sinkEntity.getSinkName(), sinkEntity.getVersion());
                throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
            }
        }
        LOGGER.info("success to update sink after approve: {}", list);
        return true;
    }

    /**
     * Saves the given fields for a sink, skipping every field whose name is already stored for it.
     *
     * <p>A field without a comment gets its own name as the comment. Group id, stream id, sink type
     * and sink id are copied from the sink entity onto every new field entity.
     *
     * @param streamSinkEntity the sink the fields belong to
     * @param list fields to add; a null or empty list is a no-op
     * @return always {@code true} when the method completes normally
     */
    @Override
    public boolean addFields(StreamSinkEntity streamSinkEntity, List<SinkField> list) {
        LOGGER.debug("begin to save sink fields={}", list);
        // Check for "nothing to do" first so that no query is issued for an empty request.
        if (CollectionUtils.isEmpty(list)) {
            return true;
        }

        Set<String> existingFieldNames = sinkFieldMapper.selectBySinkId(streamSinkEntity.getId()).stream()
                .map(StreamSinkFieldEntity::getFieldName)
                .collect(Collectors.toSet());

        List<StreamSinkFieldEntity> newFields = new ArrayList<>();
        for (SinkField sinkField : list) {
            if (existingFieldNames.contains(sinkField.getFieldName())) {
                LOGGER.debug("current sink field={} is exist for groupId={}, streamId={}",
                        sinkField.getFieldName(), streamSinkEntity.getInlongGroupId(),
                        streamSinkEntity.getInlongStreamId());
                continue;
            }
            StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(sinkField, StreamSinkFieldEntity::new);
            if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
                // Fall back to the field name when no comment was supplied.
                fieldEntity.setFieldComment(fieldEntity.getFieldName());
            }
            fieldEntity.setInlongGroupId(streamSinkEntity.getInlongGroupId());
            fieldEntity.setInlongStreamId(streamSinkEntity.getInlongStreamId());
            fieldEntity.setSinkType(streamSinkEntity.getSinkType());
            fieldEntity.setSinkId(streamSinkEntity.getId());
            fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
            newFields.add(fieldEntity);
        }

        if (CollectionUtils.isNotEmpty(newFields)) {
            sinkFieldMapper.insertAll(newFields);
        }
        LOGGER.debug("success to save sink fields={}", newFields);
        return true;
    }

    /**
     * Parses sink fields out of a statement, using the parser selected by the request's method.
     *
     * <p>Supported methods are {@code "json"}, {@code "sql"} and {@code "csv"}. Any failure,
     * including an unsupported or null method, is reported as a {@link BusinessException} whose
     * message is prefixed with {@code "parse sink fields error: "}.
     *
     * @param parseFieldRequest request carrying the parse method and the statement to parse
     * @return the parsed sink fields
     * @throws BusinessException with {@code INVALID_PARAMETER} if the statement cannot be parsed
     */
    @Override
    public List<SinkField> parseFields(ParseFieldRequest parseFieldRequest) {
        try {
            String method = parseFieldRequest.getMethod();
            String statement = parseFieldRequest.getStatement();
            switch (method) {
                case "json":
                    return parseFieldsByJson(statement);
                case "sql":
                    return parseFieldsBySql(statement);
                case "csv":
                    return parseFieldsByCsv(statement);
                default:
                    throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                            String.format("Unsupported parse mode: %s", method));
            }
        } catch (Exception e) {
            // Only the message is propagated to the caller, so keep the full stack trace in the log.
            LOGGER.error("parse sink fields error", e);
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                    String.format("parse sink fields error: %s", e.getMessage()));
        }
    }

    /**
     * Parses sink fields from line-oriented text: one field per line, columns separated according
     * to {@code PARSE_FIELD_CSV_SPLITTER}, in the order name, type and optional comment.
     *
     * <p>Blank lines are skipped. Line numbers in error messages are 1-based.
     *
     * @param str the text to parse, lines separated by {@code "\n"}
     * @return one sink field per non-blank line, in input order
     * @throws BusinessException if a line has fewer than {@code PARSE_FIELD_CSV_MIN_COLUMNS} columns
     *         or its field name does not match {@code InlongConstants.PATTERN_NORMAL_CHARACTERS}
     */
    private List<SinkField> parseFieldsByCsv(String str) {
        List<SinkField> fields = new ArrayList<>();
        String[] lines = str.split("\n");
        for (int i = 0; i < lines.length; i++) {
            String line = lines[i];
            if (StringUtils.isBlank(line)) {
                continue;
            }

            String[] columns = PARSE_FIELD_CSV_SPLITTER.split(line, PARSE_FIELD_CSV_MAX_COLUMNS);
            if (columns.length < PARSE_FIELD_CSV_MIN_COLUMNS) {
                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                        "At least two fields are required, line number is " + (i + 1));
            }

            String fieldName = columns[0];
            boolean validName = InlongConstants.PATTERN_NORMAL_CHARACTERS.matcher(fieldName).matches();
            if (!validName) {
                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                        "Field names in line " + (i + 1) + " can only contain letters, underscores or numbers");
            }

            String fieldType = columns[1];
            String fieldComment = null;
            if (columns.length == PARSE_FIELD_CSV_MAX_COLUMNS) {
                // The comment is the column right after the mandatory ones.
                fieldComment = columns[PARSE_FIELD_CSV_MIN_COLUMNS];
            }

            SinkField field = new SinkField();
            field.setFieldName(fieldName);
            field.setFieldType(fieldType);
            field.setFieldComment(fieldComment);
            fields.add(field);
        }
        return fields;
    }

    /**
     * Parses sink fields from the column definitions of a SQL {@code CREATE TABLE} statement.
     *
     * <p>For each column the field name is the column name, the field type is the lower-cased data
     * type with anything from the first {@code "("} onwards removed, and the field comment is the
     * column spec following a spec that starts with {@code COMMENT}, stripped of quote characters.
     *
     * @param str the SQL statement to parse
     * @return one sink field per column definition, in declaration order
     * @throws JSQLParserException if the statement cannot be parsed
     * @throws BusinessException if the statement is not a table creation statement, or if it
     *         carries no column definitions
     */
    private List<SinkField> parseFieldsBySql(String str) throws JSQLParserException {
        // Keep the general statement type until it is verified; only then narrow it to CreateTable.
        net.sf.jsqlparser.statement.Statement parsed = new CCJSqlParserManager().parse(new StringReader(str));
        if (!(parsed instanceof CreateTable)) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                    "The SQL statement must be a table creation statement");
        }
        List<ColumnDefinition> columnDefinitions = ((CreateTable) parsed).getColumnDefinitions();
        if (columnDefinitions == null) {
            // NOTE(review): presumably the case for "CREATE TABLE ... AS SELECT"; confirm against JSqlParser.
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                    "The table creation statement must contain column definitions");
        }

        List<SinkField> fields = new ArrayList<>();
        for (ColumnDefinition columnDefinition : columnDefinitions) {
            String dataType = columnDefinition.getColDataType().getDataType();
            SinkField field = new SinkField();
            field.setFieldName(columnDefinition.getColumnName());
            // Locale.ROOT keeps the result independent of the JVM default locale (e.g. Turkish "I").
            field.setFieldType(StringUtils.substringBefore(dataType, "(").toLowerCase(java.util.Locale.ROOT));

            List<String> columnSpecs = columnDefinition.getColumnSpecs();
            if (CollectionUtils.isNotEmpty(columnSpecs)) {
                String comment = null;
                for (int i = 0; i < columnSpecs.size(); i++) {
                    if (columnSpecs.get(i).toUpperCase(java.util.Locale.ROOT).startsWith("COMMENT")) {
                        // Only the first COMMENT spec counts; its text is the spec right after it.
                        if (i + 1 < columnSpecs.size()) {
                            comment = columnSpecs.get(i + 1).replaceAll("['\"]", "");
                        }
                        break;
                    }
                }
                field.setFieldComment(comment);
            }
            fields.add(field);
        }
        return fields;
    }

    /**
     * Parses sink fields from a JSON array of objects, reading the {@code "name"}, {@code "type"}
     * and {@code "desc"} entries of each object as field name, field type and field comment.
     *
     * @param str the JSON text to parse
     * @return one sink field per array element, in input order
     * @throws JsonProcessingException if the text cannot be read as a list of string-to-string maps
     */
    private List<SinkField> parseFieldsByJson(String str) throws JsonProcessingException {
        List<Map<String, String>> rawFields = objectMapper.readValue(str,
                new TypeReference<List<Map<String, String>>>() {
                });
        List<SinkField> fields = new ArrayList<>();
        for (Map<String, String> rawField : rawFields) {
            SinkField field = new SinkField();
            field.setFieldName(rawField.get("name"));
            field.setFieldType(rawField.get("type"));
            field.setFieldComment(rawField.get("desc"));
            fields.add(field);
        }
        return fields;
    }

    /**
     * Verifies that group id, stream id, sink type and sink name of the request are not blank,
     * checking them in that order and failing on the first blank value.
     *
     * @param sinkRequest the request to validate
     * @throws BusinessException carrying the error code that corresponds to the first blank value
     */
    private void checkSinkRequestParams(SinkRequest sinkRequest) {
        final String groupId = sinkRequest.getInlongGroupId();
        if (StringUtils.isBlank(groupId)) {
            throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        }

        final String streamId = sinkRequest.getInlongStreamId();
        if (StringUtils.isBlank(streamId)) {
            throw new BusinessException(ErrorCodeEnum.STREAM_ID_IS_EMPTY);
        }

        final String sinkType = sinkRequest.getSinkType();
        if (StringUtils.isBlank(sinkType)) {
            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_IS_NULL);
        }

        final String sinkName = sinkRequest.getSinkName();
        if (StringUtils.isBlank(sinkName)) {
            throw new BusinessException(ErrorCodeEnum.SINK_NAME_IS_NULL);
        }
    }

    /**
     * Validates a sink request through {@code Preconditions}: the request itself must not be null,
     * and its group id, stream id, sink type and sink name must not be blank. Checks run in that
     * order.
     *
     * @param sinkRequest the request to validate
     */
    private void checkParams(SinkRequest sinkRequest) {
        Preconditions.expectNotNull(sinkRequest, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());

        final String groupId = sinkRequest.getInlongGroupId();
        Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);

        final String streamId = sinkRequest.getInlongStreamId();
        Preconditions.expectNotBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY);

        final String sinkType = sinkRequest.getSinkType();
        Preconditions.expectNotBlank(sinkType, ErrorCodeEnum.SINK_TYPE_IS_NULL);

        final String sinkName = sinkRequest.getSinkName();
        Preconditions.expectNotBlank(sinkName, ErrorCodeEnum.SINK_NAME_IS_NULL);
    }

    /**
     * Starts the stream process for the given group and stream, creating and autowiring the
     * {@code InlongStreamProcessService} on first use.
     *
     * @param str inlong group id
     * @param str2 inlong stream id
     * @param str3 third argument forwarded unchanged to {@code startProcess}
     */
    private void startProcessForSink(String str, String str2, String str3) {
        if (null == streamProcessOperation) {
            // Created lazily and wired by hand rather than injected.
            streamProcessOperation = new InlongStreamProcessService();
            autowireCapableBeanFactory.autowireBean(streamProcessOperation);
        }
        streamProcessOperation.startProcess(str, str2, str3, false);
        LOGGER.info("success to start the start-stream-process for groupId={} streamId={}", str, str2);
    }

    /**
     * Starts the delete process for the given group and stream, creating and autowiring the
     * {@code InlongStreamProcessService} on first use.
     *
     * @param str inlong group id
     * @param str2 inlong stream id
     * @param str3 third argument forwarded unchanged to {@code deleteProcess}
     */
    private void deleteProcessForSink(String str, String str2, String str3) {
        if (null == streamProcessOperation) {
            // Created lazily and wired by hand rather than injected.
            streamProcessOperation = new InlongStreamProcessService();
            autowireCapableBeanFactory.autowireBean(streamProcessOperation);
        }
        streamProcessOperation.deleteProcess(str, str2, str3, false);
        LOGGER.debug("success to start the delete-stream-process for groupId={} streamId={}", str, str2);
    }

    /**
     * Rejects a request that tries to change values that must stay fixed for an existing sink:
     * the sink type, the record version, the group id and the stream id.
     *
     * <p>A blank group id or stream id in the request is accepted. After validation both ids of the
     * request are overwritten with the values of the stored entity.
     *
     * @param streamSinkEntity the stored sink
     * @param sinkRequest the incoming request; its group id and stream id are updated in place
     * @throws BusinessException if the request's group id or stream id differs from the stored one
     */
    private void chkUnmodifiableParams(StreamSinkEntity streamSinkEntity, SinkRequest sinkRequest) {
        Preconditions.expectEquals(streamSinkEntity.getSinkType(), sinkRequest.getSinkType(),
                ErrorCodeEnum.INVALID_PARAMETER, "sinkType not allowed modify");

        String expiredMessage = String.format("record has expired with record version=%d, request version=%d",
                streamSinkEntity.getVersion(), sinkRequest.getVersion());
        Preconditions.expectEquals(streamSinkEntity.getVersion(), sinkRequest.getVersion(),
                ErrorCodeEnum.CONFIG_EXPIRED, expiredMessage);

        String requestedGroupId = sinkRequest.getInlongGroupId();
        if (StringUtils.isNotBlank(requestedGroupId)) {
            if (!streamSinkEntity.getInlongGroupId().equals(requestedGroupId)) {
                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "InlongGroupId not allowed modify");
            }
        }

        String requestedStreamId = sinkRequest.getInlongStreamId();
        if (StringUtils.isNotBlank(requestedStreamId)) {
            if (!streamSinkEntity.getInlongStreamId().equals(requestedStreamId)) {
                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "InlongStreamId not allowed modify");
            }
        }

        // The stored ids are authoritative; fill them in for the rest of the update flow.
        sinkRequest.setInlongGroupId(streamSinkEntity.getInlongGroupId());
        sinkRequest.setInlongStreamId(streamSinkEntity.getInlongStreamId());
    }
}
