package org.apache.inlong.manager.service.stream;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.pagehelper.PageHelper;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserManager;
import net.sf.jsqlparser.statement.create.table.ColumnDefinition;
import net.sf.jsqlparser.statement.create.table.CreateTable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.tool.excel.ExcelTool;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.AddFieldRequest;
import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
import org.apache.inlong.manager.pojo.stream.InlongStreamApproveRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamRequest;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.user.UserInfo;
import org.apache.inlong.manager.service.group.InlongGroupOperatorFactory;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperatorFactory;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.source.StreamSourceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;

@Service
/* loaded from: input_file:org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.class */
public class InlongStreamServiceImpl implements InlongStreamService {
    /** Logger shared by all methods of this service. */
    private static final Logger LOGGER = LoggerFactory.getLogger(InlongStreamServiceImpl.class);
    // Regex alternation: a tab, any single whitespace character, or a comma.
    // NOTE(review): presumably the column separator used when parsing CSV-style field
    // definitions elsewhere in this class - usage is not visible here, confirm.
    private static final String PARSE_FIELD_CSV_SPLITTER = "\t|\\s|,";
    // NOTE(review): presumably the maximum / minimum number of columns accepted per parsed
    // line - usage is not visible here, confirm.
    private static final int PARSE_FIELD_CSV_MAX_COLUMNS = 3;
    private static final int PARSE_FIELD_CSV_MIN_COLUMNS = 2;

    /** Mapper for the inlong stream entities themselves. */
    @Autowired
    private InlongStreamEntityMapper streamMapper;

    /** Mapper for the fields that belong to a stream. */
    @Autowired
    private InlongStreamFieldEntityMapper streamFieldMapper;

    /** Mapper for the extended properties that belong to a stream. */
    @Autowired
    private InlongStreamExtEntityMapper streamExtMapper;

    /** Mapper used to look up the owning inlong group. */
    @Autowired
    private InlongGroupEntityMapper groupMapper;

    /** Service used to list, count and delete the sources of a stream. */
    @Autowired
    private StreamSourceService sourceService;

    /** Service used to list, count, approve and delete the sinks of a stream. */
    @Autowired
    private StreamSinkService sinkService;

    // Not referenced by the methods visible in this part of the file.
    @Autowired
    private StreamSinkEntityMapper sinkMapper;

    // Not referenced by the methods visible in this part of the file.
    @Autowired
    private ObjectMapper objectMapper;

    // The two factories below are injected with @Lazy.
    // NOTE(review): presumably to break a circular bean dependency - confirm.
    @Autowired
    @Lazy
    private QueueResourceOperatorFactory queueOperatorFactory;

    @Autowired
    @Lazy
    private InlongGroupOperatorFactory groupOperatorFactory;

    /**
     * Creates a new inlong stream together with its fields and extended properties.
     *
     * <p>The group status is validated through {@code checkGroupStatusIsTemp} (defined
     * elsewhere in this class). When the request carries no MQ resource, the stream id is
     * used as the MQ resource.
     *
     * @param inlongStreamRequest stream to create; must not be null and must carry non-blank
     *        group and stream ids
     * @param str name of the operator, recorded as both creator and modifier
     * @return id of the inserted stream entity
     * @throws BusinessException with {@code STREAM_ID_DUPLICATE} when the group/stream id
     *         pair already exists
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public Integer save(InlongStreamRequest inlongStreamRequest, String str) {
        LOGGER.debug("begin to save inlong stream info={}", inlongStreamRequest);
        Preconditions.expectNotNull(inlongStreamRequest, "inlong stream info is empty");

        final String groupId = inlongStreamRequest.getInlongGroupId();
        final String streamId = inlongStreamRequest.getInlongStreamId();
        Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY);
        checkGroupStatusIsTemp(groupId);

        // Reject the request when the identifier pair is already taken.
        int existingCount = this.streamMapper.selectExistByIdentifier(groupId, streamId).intValue();
        if (existingCount >= 1) {
            LOGGER.error("inlong stream id [{}] has already exists", streamId);
            throw new BusinessException(ErrorCodeEnum.STREAM_ID_DUPLICATE);
        }

        // Fall back to the stream id when no MQ resource was supplied.
        if (StringUtils.isEmpty(inlongStreamRequest.getMqResource())) {
            inlongStreamRequest.setMqResource(streamId);
        }

        InlongStreamEntity streamEntity = CommonBeanUtils.copyProperties(inlongStreamRequest, InlongStreamEntity::new);
        streamEntity.setStatus(StreamStatus.NEW.getCode());
        streamEntity.setCreator(str);
        streamEntity.setModifier(str);
        streamEntity.setExtParams(InlongStreamExtParam.packExtParams(inlongStreamRequest));
        this.streamMapper.insertSelective(streamEntity);

        saveField(groupId, streamId, inlongStreamRequest.getFieldList());

        List<InlongStreamExtInfo> extInfos = inlongStreamRequest.getExtList();
        if (CollectionUtils.isNotEmpty(extInfos)) {
            saveOrUpdateExt(groupId, streamId, extInfos);
        }

        LOGGER.info("success to save inlong stream info for groupId={}", groupId);
        return streamEntity.getId();
    }

    /**
     * Creates a new inlong stream on behalf of the given user, together with its fields and
     * extended properties.
     *
     * <p>The stream row, its fields and its ext properties are written by separate mapper
     * calls, so the method runs in a transaction (like the operator-name overload) to avoid
     * leaving a partially saved stream behind when a later insert fails.
     *
     * @param inlongStreamRequest stream to create
     * @param userInfo current user; its name is recorded as creator and modifier
     * @return id of the inserted stream entity
     * @throws BusinessException with {@code GROUP_NOT_FOUND} when the group does not exist,
     *         {@code STREAM_OPT_NOT_ALLOWED} when the group is in a temporary status, or
     *         {@code STREAM_ID_DUPLICATE} when the group/stream id pair already exists
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public Integer save(InlongStreamRequest inlongStreamRequest, UserInfo userInfo) {
        String groupId = inlongStreamRequest.getInlongGroupId();
        String streamId = inlongStreamRequest.getInlongStreamId();

        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(groupId);
        if (groupEntity == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }
        // Streams may not be added while the group is in a temporary status.
        GroupStatus groupStatus = GroupStatus.forCode(groupEntity.getStatus().intValue());
        if (GroupStatus.isTempStatus(groupStatus)) {
            throw new BusinessException(ErrorCodeEnum.STREAM_OPT_NOT_ALLOWED, String.format("inlong groupId=%s status=%s was not allowed to add/update/delete stream", groupId, groupStatus));
        }
        if (this.streamMapper.selectExistByIdentifier(groupId, streamId).intValue() >= 1) {
            throw new BusinessException(ErrorCodeEnum.STREAM_ID_DUPLICATE);
        }
        // Fall back to the stream id when no MQ resource was supplied.
        if (StringUtils.isEmpty(inlongStreamRequest.getMqResource())) {
            inlongStreamRequest.setMqResource(streamId);
        }
        // Pack the ext params onto the request first so they are copied into the entity below.
        inlongStreamRequest.setExtParams(InlongStreamExtParam.packExtParams(inlongStreamRequest));
        InlongStreamEntity streamEntity = CommonBeanUtils.copyProperties(inlongStreamRequest, InlongStreamEntity::new);
        streamEntity.setStatus(StreamStatus.NEW.getCode());
        streamEntity.setCreator(userInfo.getName());
        streamEntity.setModifier(userInfo.getName());
        this.streamMapper.insertSelective(streamEntity);

        saveField(groupId, streamId, inlongStreamRequest.getFieldList());
        List<InlongStreamExtInfo> extInfos = inlongStreamRequest.getExtList();
        if (CollectionUtils.isNotEmpty(extInfos)) {
            saveOrUpdateExt(groupId, streamId, extInfos);
        }
        return streamEntity.getId();
    }

    /**
     * Tells whether a stream is stored for the given identifiers.
     *
     * @param str inlong group id; must not be blank
     * @param str2 inlong stream id; must not be blank
     * @return {@code true} when the stream mapper finds an entity for the pair
     */
    @Override
    public Boolean exist(String str, String str2) {
        Preconditions.expectNotBlank(str, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank(str2, ErrorCodeEnum.STREAM_ID_IS_EMPTY);
        InlongStreamEntity found = this.streamMapper.selectByIdentifier(str, str2);
        if (found == null) {
            return Boolean.FALSE;
        }
        return Boolean.TRUE;
    }

    /**
     * Loads one stream with its fields, ext properties, sinks and sources.
     *
     * @param str inlong group id; must not be blank
     * @param str2 inlong stream id; must not be blank
     * @return the fully populated stream info
     * @throws BusinessException with {@code STREAM_NOT_FOUND} when no stream matches
     */
    @Override
    public InlongStreamInfo get(String str, String str2) {
        LOGGER.debug("begin to get inlong stream by groupId={}, streamId={}", str, str2);
        Preconditions.expectNotBlank(str, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank(str2, ErrorCodeEnum.STREAM_ID_IS_EMPTY);

        InlongStreamEntity streamEntity = this.streamMapper.selectByIdentifier(str, str2);
        if (streamEntity == null) {
            LOGGER.error("inlong stream not found by groupId={}, streamId={}", str, str2);
            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
        }

        InlongStreamInfo streamInfo = CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);

        List<StreamField> fields = getStreamFields(str, str2);
        streamInfo.setFieldList(fields);

        List<InlongStreamExtEntity> extEntities = this.streamExtMapper.selectByRelatedId(str, str2);
        streamInfo.setExtList(CommonBeanUtils.copyListProperties(extEntities, InlongStreamExtInfo::new));

        // Expand the packed ext params stored on the entity into the info object.
        InlongStreamExtParam.unpackExtParams(streamEntity.getExtParams(), streamInfo);

        streamInfo.setSinkList(this.sinkService.listSink(str, str2));
        streamInfo.setSourceList(this.sourceService.listSource(str, str2));
        return streamInfo;
    }

    /**
     * Loads one stream with its fields, ext properties, sinks and sources, after verifying
     * that the owning group exists.
     *
     * @param str inlong group id
     * @param str2 inlong stream id
     * @param userInfo current user (not read by this method)
     * @return the fully populated stream info
     * @throws BusinessException with {@code GROUP_NOT_FOUND} or {@code STREAM_NOT_FOUND}
     */
    @Override
    public InlongStreamInfo get(String str, String str2, UserInfo userInfo) {
        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(str);
        if (groupEntity == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        InlongStreamEntity streamEntity = this.streamMapper.selectByIdentifier(str, str2);
        if (streamEntity == null) {
            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
        }

        InlongStreamInfo streamInfo = CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
        // Expand the packed ext params stored on the entity into the info object.
        InlongStreamExtParam.unpackExtParams(streamEntity.getExtParams(), streamInfo);

        List<StreamField> fields = getStreamFields(str, str2);
        streamInfo.setFieldList(fields);

        List<InlongStreamExtEntity> extEntities = this.streamExtMapper.selectByRelatedId(str, str2);
        streamInfo.setExtList(CommonBeanUtils.copyListProperties(extEntities, InlongStreamExtInfo::new));

        streamInfo.setSinkList(this.sinkService.listSink(str, str2));
        streamInfo.setSourceList(this.sourceService.listSource(str, str2));
        return streamInfo;
    }

    /**
     * Loads the brief view of one stream: base properties, unpacked ext params, fields and
     * ext properties (no sinks or sources).
     *
     * @param str inlong group id
     * @param str2 inlong stream id
     * @param str3 not read by this method
     * @return the brief stream info
     * @throws BusinessException with {@code GROUP_NOT_FOUND} or {@code STREAM_NOT_FOUND}
     */
    @Override
    public InlongStreamBriefInfo getBrief(String str, String str2, String str3) {
        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(str);
        if (groupEntity == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        InlongStreamEntity streamEntity = this.streamMapper.selectByIdentifier(str, str2);
        if (streamEntity == null) {
            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
        }

        InlongStreamBriefInfo briefInfo = CommonBeanUtils.copyProperties(streamEntity, InlongStreamBriefInfo::new);
        InlongStreamExtParam.unpackExtParams(streamEntity.getExtParams(), briefInfo);

        List<StreamField> fields = getStreamFields(str, str2);
        briefInfo.setFieldList(fields);

        List<InlongStreamExtEntity> extEntities = this.streamExtMapper.selectByRelatedId(str, str2);
        briefInfo.setExtList(CommonBeanUtils.copyListProperties(extEntities, InlongStreamExtInfo::new));
        return briefInfo;
    }

    /**
     * Lists every stream of a group with its fields, ext properties, sinks and sources.
     *
     * <p>Fields and ext properties are fetched once for the whole group and grouped by
     * stream id. A stream that has no fields or no ext properties gets an empty list rather
     * than {@code null}, matching what {@code get} returns for fields.
     *
     * @param str inlong group id
     * @return the streams of the group, each fully populated
     */
    @Override
    public List<InlongStreamInfo> list(String str) {
        LOGGER.debug("begin to list inlong streams by groupId={}", str);
        List<InlongStreamInfo> streamInfos = CommonBeanUtils.copyListProperties(this.streamMapper.selectByGroupId(str), InlongStreamInfo::new);

        // All fields of the group, keyed by the stream they belong to.
        Map<String, List<StreamField>> fieldsByStream = getStreamFields(str, null).stream()
                .collect(Collectors.groupingBy(StreamField::getInlongStreamId, HashMap::new,
                        Collectors.toCollection(ArrayList::new)));

        // All ext properties of the group, keyed by the stream they belong to.
        Map<String, List<InlongStreamExtInfo>> extsByStream = this.streamExtMapper.selectByRelatedId(str, (String) null).stream()
                .map(extEntity -> CommonBeanUtils.copyProperties(extEntity, InlongStreamExtInfo::new))
                .collect(Collectors.groupingBy(InlongStreamExtInfo::getInlongStreamId, HashMap::new,
                        Collectors.toCollection(ArrayList::new)));

        for (InlongStreamInfo streamInfo : streamInfos) {
            String streamId = streamInfo.getInlongStreamId();
            InlongStreamExtParam.unpackExtParams(streamInfo.getExtParams(), streamInfo);
            // Never hand out null lists for streams without fields / ext properties.
            streamInfo.setFieldList(fieldsByStream.getOrDefault(streamId, Collections.emptyList()));
            streamInfo.setExtList(extsByStream.getOrDefault(streamId, Collections.emptyList()));
            streamInfo.setSinkList(this.sinkService.listSink(str, streamId));
            streamInfo.setSourceList(this.sourceService.listSource(str, streamId));
        }
        return streamInfos;
    }

    /**
     * Reads the stream fields stored for the given identifiers and converts them to
     * {@link StreamField} objects.
     *
     * @param str inlong group id
     * @param str2 inlong stream id, passed to the field mapper unchanged (callers in this
     *        class also pass {@code null})
     * @return the converted fields, or an immutable empty list when nothing is stored
     */
    private List<StreamField> getStreamFields(String str, String str2) {
        List<InlongStreamFieldEntity> fieldEntities = this.streamFieldMapper.selectByIdentifier(str, str2);
        if (CollectionUtils.isEmpty(fieldEntities)) {
            return Collections.emptyList();
        }
        return CommonBeanUtils.copyListProperties(fieldEntities, StreamField::new);
    }

    /**
     * Returns one page of brief stream infos matching the request.
     *
     * <p>The order field/type checks run before {@code PageHelper.startPage}: PageHelper
     * keeps the paging parameters in a thread-local that is consumed by the next mapper
     * query, so nothing that might throw should sit between {@code startPage} and the
     * query, otherwise the stale page could be applied to an unrelated later query on the
     * same thread.
     *
     * @param inlongStreamPageRequest paging, ordering and filter conditions
     * @return the requested page of brief stream infos
     */
    @Override
    public PageResult<InlongStreamBriefInfo> listBrief(InlongStreamPageRequest inlongStreamPageRequest) {
        LOGGER.debug("begin to list inlong stream page by {}", inlongStreamPageRequest);
        OrderFieldEnum.checkOrderField(inlongStreamPageRequest);
        OrderTypeEnum.checkOrderType(inlongStreamPageRequest);
        // startPage must be directly followed by the query it is meant to page.
        PageHelper.startPage(inlongStreamPageRequest.getPageNum(), inlongStreamPageRequest.getPageSize());
        PageResult<InlongStreamBriefInfo> pageResult = PageResult.fromPage(this.streamMapper.selectByCondition(inlongStreamPageRequest)).map(inlongStreamEntity -> {
            return (InlongStreamBriefInfo) CommonBeanUtils.copyProperties(inlongStreamEntity, InlongStreamBriefInfo::new);
        });
        LOGGER.debug("success to list inlong stream info for groupId={}", inlongStreamPageRequest.getInlongGroupId());
        return pageResult;
    }

    /**
     * Lists the brief stream infos matching the request on behalf of a user (no paging is
     * started here).
     *
     * <p>The user's name and whether its roles contain {@code "TENANT_ADMIN"} are written
     * onto the request before the query runs.
     *
     * @param inlongStreamPageRequest ordering and filter conditions; mutated by this method
     * @param userInfo current user
     * @return the matching brief stream infos
     */
    @Override
    public List<InlongStreamBriefInfo> listBrief(InlongStreamPageRequest inlongStreamPageRequest, UserInfo userInfo) {
        inlongStreamPageRequest.setCurrentUser(userInfo.getName());
        boolean tenantAdmin = userInfo.getRoles().contains("TENANT_ADMIN");
        inlongStreamPageRequest.setIsAdminRole(Boolean.valueOf(tenantAdmin));

        OrderFieldEnum.checkOrderField(inlongStreamPageRequest);
        OrderTypeEnum.checkOrderType(inlongStreamPageRequest);

        List<InlongStreamEntity> streamEntities = this.streamMapper.selectByCondition(inlongStreamPageRequest);
        return CommonBeanUtils.copyListProperties(streamEntities, InlongStreamBriefInfo::new);
    }

    /**
     * Returns one page of fully populated stream infos (fields, ext properties, sources and
     * sinks) for a group.
     *
     * <p>All validation, the group lookup and the order field/type checks run before
     * {@code PageHelper.startPage}: PageHelper keeps the paging parameters in a thread-local
     * that is consumed by the next mapper query, so {@code startPage} is placed directly
     * in front of the query it is meant to page.
     *
     * @param inlongStreamPageRequest paging, ordering and filter conditions; must not be
     *        null and must carry a non-blank group id of an existing group
     * @return the requested page of stream infos
     */
    @Override
    public PageResult<InlongStreamInfo> listAll(InlongStreamPageRequest inlongStreamPageRequest) {
        LOGGER.debug("begin to list full inlong stream page by {}", inlongStreamPageRequest);
        Preconditions.expectNotNull(inlongStreamPageRequest, "request is empty");
        String inlongGroupId = inlongStreamPageRequest.getInlongGroupId();
        Preconditions.expectNotBlank(inlongGroupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotNull(this.groupMapper.selectByGroupId(inlongGroupId), "inlong group not found by groupId=" + inlongGroupId);
        OrderFieldEnum.checkOrderField(inlongStreamPageRequest);
        OrderTypeEnum.checkOrderType(inlongStreamPageRequest);
        // startPage must be directly followed by the query it is meant to page.
        PageHelper.startPage(inlongStreamPageRequest.getPageNum(), inlongStreamPageRequest.getPageSize());
        PageResult<InlongStreamInfo> pageResult = PageResult.fromPage(this.streamMapper.selectByCondition(inlongStreamPageRequest)).map(inlongStreamEntity -> {
            return (InlongStreamInfo) CommonBeanUtils.copyProperties(inlongStreamEntity, InlongStreamInfo::new);
        }).foreach(inlongStreamInfo -> {
            // Enrich every stream on the page with its related data.
            String inlongStreamId = inlongStreamInfo.getInlongStreamId();
            InlongStreamExtParam.unpackExtParams(inlongStreamInfo);
            inlongStreamInfo.setFieldList(getStreamFields(inlongGroupId, inlongStreamId));
            inlongStreamInfo.setExtList(CommonBeanUtils.copyListProperties(this.streamExtMapper.selectByRelatedId(inlongGroupId, inlongStreamId), InlongStreamExtInfo::new));
            inlongStreamInfo.setSourceList(this.sourceService.listSource(inlongGroupId, inlongStreamId));
            inlongStreamInfo.setSinkList(this.sinkService.listSink(inlongGroupId, inlongStreamId));
        });
        LOGGER.debug("success to list full inlong stream info by {}", inlongStreamPageRequest);
        return pageResult;
    }

    /**
     * Lists the brief infos of all streams of a group, each with its brief sink list.
     *
     * @param str inlong group id; must not be blank
     * @return the brief stream infos with sinks attached
     */
    @Override
    public List<InlongStreamBriefInfo> listBriefWithSink(String str) {
        LOGGER.debug("begin to get inlong stream brief list by groupId={}", str);
        Preconditions.expectNotBlank(str, ErrorCodeEnum.GROUP_ID_IS_EMPTY);

        List<InlongStreamEntity> streamEntities = this.streamMapper.selectByGroupId(str);
        List<InlongStreamBriefInfo> briefInfos = CommonBeanUtils.copyListProperties(streamEntities, InlongStreamBriefInfo::new);
        // Attach the sinks of every stream.
        briefInfos.forEach(briefInfo ->
                briefInfo.setSinkList(this.sinkService.listBrief(str, briefInfo.getInlongStreamId())));

        LOGGER.info("success to get inlong stream brief list for groupId={}", str);
        return briefInfos;
    }

    /**
     * Validates the request and the group status, then delegates to
     * {@link #updateWithoutCheck(InlongStreamRequest, String)}.
     *
     * @param inlongStreamRequest stream changes; must not be null and must carry non-blank
     *        group and stream ids
     * @param str name of the operator
     * @return the result of {@code updateWithoutCheck}
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean update(InlongStreamRequest inlongStreamRequest, String str) {
        LOGGER.debug("begin to update inlong stream info={}", inlongStreamRequest);
        Preconditions.expectNotNull(inlongStreamRequest, "inlong stream request is empty");

        final String groupId = inlongStreamRequest.getInlongGroupId();
        Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        final String streamId = inlongStreamRequest.getInlongStreamId();
        Preconditions.expectNotBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY);

        checkGroupStatusIsTemp(groupId);
        Boolean updated = updateWithoutCheck(inlongStreamRequest, str);
        return updated;
    }

    /**
     * Updates a stream, its fields and its ext properties on behalf of the given user.
     *
     * @param inlongStreamRequest stream changes, including the version the caller last read
     * @param userInfo current user; its name is recorded as modifier
     * @return {@code true} when the update succeeded
     * @throws BusinessException with {@code GROUP_NOT_FOUND}, {@code STREAM_OPT_NOT_ALLOWED}
     *         (group in a temporary status), {@code STREAM_NOT_FOUND}, or
     *         {@code CONFIG_EXPIRED} (version mismatch or no row updated)
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean update(InlongStreamRequest inlongStreamRequest, UserInfo userInfo) {
        final String groupId = inlongStreamRequest.getInlongGroupId();
        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(groupId);
        if (groupEntity == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        GroupStatus groupStatus = GroupStatus.forCode(groupEntity.getStatus().intValue());
        if (GroupStatus.isTempStatus(groupStatus)) {
            String notAllowedMsg = String.format("inlong groupId=%s status=%s was not allowed to add/update/delete stream", groupId, groupStatus);
            throw new BusinessException(ErrorCodeEnum.STREAM_OPT_NOT_ALLOWED, notAllowedMsg);
        }

        final String streamId = inlongStreamRequest.getInlongStreamId();
        InlongStreamEntity streamEntity = this.streamMapper.selectByIdentifier(groupId, streamId);
        if (streamEntity == null) {
            String notFoundMsg = String.format("inlong stream not found by groupId=%s, streamId=%s", groupId, streamId);
            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND, notFoundMsg);
        }

        // The caller must have read the version that is currently stored.
        boolean sameVersion = Objects.equals(streamEntity.getVersion(), inlongStreamRequest.getVersion());
        if (!sameVersion) {
            String expiredMsg = String.format("stream has already updated with groupId=%s, streamId=%s, curVersion=%s", streamEntity.getInlongGroupId(), streamEntity.getInlongStreamId(), inlongStreamRequest.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED, expiredMsg);
        }

        inlongStreamRequest.setExtParams(InlongStreamExtParam.packExtParams(inlongStreamRequest));
        CommonBeanUtils.copyProperties(inlongStreamRequest, streamEntity, true);
        streamEntity.setModifier(userInfo.getName());

        int expectedRows = InlongConstants.AFFECTED_ONE_ROW.intValue();
        int updatedRows = this.streamMapper.updateByIdentifierSelective(streamEntity);
        if (expectedRows != updatedRows) {
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }

        updateField(groupId, streamId, inlongStreamRequest.getFieldList());
        saveOrUpdateExt(groupId, streamId, inlongStreamRequest.getExtList());
        return true;
    }

    /**
     * Updates a stream, its fields and its ext properties without validating the request
     * or the status of the owning group.
     *
     * @param inlongStreamRequest stream changes, including the version the caller last read
     * @param str name of the operator, recorded as modifier
     * @return {@code true} when the update succeeded
     * @throws BusinessException with {@code STREAM_NOT_FOUND} when the stream does not
     *         exist, or {@code CONFIG_EXPIRED} on a version mismatch or when no single row
     *         was updated
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean updateWithoutCheck(InlongStreamRequest inlongStreamRequest, String str) {
        LOGGER.debug("begin to update inlong stream without check, request={}", inlongStreamRequest);
        final String groupId = inlongStreamRequest.getInlongGroupId();
        final String streamId = inlongStreamRequest.getInlongStreamId();

        InlongStreamEntity streamEntity = this.streamMapper.selectByIdentifier(groupId, streamId);
        if (streamEntity == null) {
            LOGGER.error("inlong stream not found by groupId={}, streamId={}", groupId, streamId);
            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
        }

        // Shared by both expiry paths below.
        final String expiredMsg = String.format("stream has already updated with groupId=%s, streamId=%s, curVersion=%s", streamEntity.getInlongGroupId(), streamEntity.getInlongStreamId(), inlongStreamRequest.getVersion());
        boolean sameVersion = Objects.equals(streamEntity.getVersion(), inlongStreamRequest.getVersion());
        if (!sameVersion) {
            LOGGER.error(expiredMsg);
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }

        inlongStreamRequest.setExtParams(InlongStreamExtParam.packExtParams(inlongStreamRequest));
        CommonBeanUtils.copyProperties(inlongStreamRequest, streamEntity, true);
        streamEntity.setModifier(str);

        int updatedRows = this.streamMapper.updateByIdentifierSelective(streamEntity);
        if (updatedRows != InlongConstants.AFFECTED_ONE_ROW.intValue()) {
            LOGGER.error(expiredMsg);
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }

        updateField(groupId, streamId, inlongStreamRequest.getFieldList());
        saveOrUpdateExt(groupId, streamId, inlongStreamRequest.getExtList());
        LOGGER.info("success to update inlong stream without check for groupId={} streamId={}", groupId, streamId);
        return true;
    }

    /**
     * Logically deletes a stream together with its fields and ext properties.
     *
     * <p>The stream is marked deleted by setting {@code isDeleted} to its own id. Deletion
     * is refused while the stream still has sources or sinks.
     *
     * @param str inlong group id; must not be blank
     * @param str2 inlong stream id; must not be blank
     * @param str3 name of the operator, recorded as modifier
     * @return {@code true} when the deletion succeeded
     * @throws BusinessException with {@code STREAM_NOT_FOUND},
     *         {@code STREAM_DELETE_HAS_SOURCE}, {@code STREAM_DELETE_HAS_SINK}, or
     *         {@code CONFIG_EXPIRED} when no single row was updated
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean delete(String str, String str2, String str3) {
        LOGGER.debug("begin to delete inlong stream, groupId={}, streamId={}", str, str2);
        Preconditions.expectNotBlank(str, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        Preconditions.expectNotBlank(str2, ErrorCodeEnum.STREAM_ID_IS_EMPTY);
        checkGroupStatusIsTemp(str);

        InlongStreamEntity streamEntity = this.streamMapper.selectByIdentifier(str, str2);
        if (streamEntity == null) {
            LOGGER.error("inlong stream not found by groupId={}, streamId={}", str, str2);
            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
        }

        // A stream that still owns sources or sinks may not be deleted.
        int sourceCount = this.sourceService.getCount(str, str2).intValue();
        if (sourceCount > 0) {
            LOGGER.error("inlong stream has undeleted sources, delete failed");
            throw new BusinessException(ErrorCodeEnum.STREAM_DELETE_HAS_SOURCE);
        }
        int sinkCount = this.sinkService.getCount(str, str2).intValue();
        if (sinkCount > 0) {
            LOGGER.error("inlong stream has undeleted sinks, delete failed");
            throw new BusinessException(ErrorCodeEnum.STREAM_DELETE_HAS_SINK);
        }

        streamEntity.setIsDeleted(streamEntity.getId());
        streamEntity.setModifier(str3);
        int updatedRows = this.streamMapper.updateByPrimaryKey(streamEntity);
        if (updatedRows != InlongConstants.AFFECTED_ONE_ROW.intValue()) {
            LOGGER.error("stream has already updated with group id={}, stream id={}, curVersion={}",
                    streamEntity.getInlongGroupId(), streamEntity.getInlongStreamId(), streamEntity.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }

        LOGGER.debug("begin to delete inlong stream field, streamId={}", str2);
        this.streamFieldMapper.logicDeleteAllByIdentifier(str, str2);
        this.streamExtMapper.logicDeleteAllByRelatedId(str, str2);
        LOGGER.info("success to delete inlong stream, ext property and fields for groupId={}", str);
        return true;
    }

    /**
     * Logically deletes a stream together with its fields and ext properties on behalf of
     * the given user.
     *
     * <p>The stream is marked deleted by setting {@code isDeleted} to its own id. Deletion
     * is refused while the stream still has sources or sinks.
     *
     * @param str inlong group id
     * @param str2 inlong stream id
     * @param userInfo current user; its name is recorded as modifier
     * @return {@code true} when the deletion succeeded
     * @throws BusinessException with {@code GROUP_NOT_FOUND}, {@code STREAM_OPT_NOT_ALLOWED}
     *         (group in a temporary status), {@code STREAM_NOT_FOUND},
     *         {@code STREAM_DELETE_HAS_SOURCE}, {@code STREAM_DELETE_HAS_SINK}, or
     *         {@code CONFIG_EXPIRED} when no single row was updated
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean delete(String str, String str2, UserInfo userInfo) {
        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(str);
        if (groupEntity == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }

        GroupStatus groupStatus = GroupStatus.forCode(groupEntity.getStatus().intValue());
        if (GroupStatus.isTempStatus(groupStatus)) {
            String notAllowedMsg = String.format("inlong groupId=%s status=%s was not allowed to add/update/delete stream", str, groupStatus);
            throw new BusinessException(ErrorCodeEnum.STREAM_OPT_NOT_ALLOWED, notAllowedMsg);
        }

        InlongStreamEntity streamEntity = this.streamMapper.selectByIdentifier(str, str2);
        if (streamEntity == null) {
            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
        }

        // A stream that still owns sources or sinks may not be deleted.
        int sourceCount = this.sourceService.getCount(str, str2).intValue();
        if (sourceCount > 0) {
            throw new BusinessException(ErrorCodeEnum.STREAM_DELETE_HAS_SOURCE);
        }
        int sinkCount = this.sinkService.getCount(str, str2).intValue();
        if (sinkCount > 0) {
            throw new BusinessException(ErrorCodeEnum.STREAM_DELETE_HAS_SINK);
        }

        streamEntity.setIsDeleted(streamEntity.getId());
        streamEntity.setModifier(userInfo.getName());
        int updatedRows = this.streamMapper.updateByPrimaryKey(streamEntity);
        if (updatedRows != InlongConstants.AFFECTED_ONE_ROW.intValue()) {
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }

        this.streamFieldMapper.logicDeleteAllByIdentifier(str, str2);
        this.streamExtMapper.logicDeleteAllByRelatedId(str, str2);
        return true;
    }

    /**
     * Logically deletes every stream of a group, along with each stream's fields, ext
     * properties, sources and sinks.
     *
     * @param str inlong group id; must not be blank
     * @param str2 name of the operator, recorded as modifier
     * @return {@code true}, also when the group has no streams
     * @throws BusinessException with {@code CONFIG_EXPIRED} when a stream update does not
     *         affect exactly one row
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public Boolean logicDeleteAll(String str, String str2) {
        LOGGER.debug("begin to delete all inlong stream by groupId={}", str);
        Preconditions.expectNotBlank(str, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        checkGroupStatusIsTemp(str);

        List<InlongStreamEntity> streamEntities = this.streamMapper.selectByGroupId(str);
        if (CollectionUtils.isEmpty(streamEntities)) {
            LOGGER.info("inlong stream not found by groupId={}", str);
            return true;
        }

        final int expectedRows = InlongConstants.AFFECTED_ONE_ROW.intValue();
        for (InlongStreamEntity streamEntity : streamEntities) {
            // Mark the stream itself as deleted (isDeleted is set to the row's own id).
            streamEntity.setIsDeleted(streamEntity.getId());
            streamEntity.setModifier(str2);
            int updatedRows = this.streamMapper.updateByIdentifierSelective(streamEntity);
            if (updatedRows != expectedRows) {
                LOGGER.error("stream has already updated with group id={}, stream id={}, curVersion={}",
                        streamEntity.getInlongGroupId(), streamEntity.getInlongStreamId(), streamEntity.getVersion());
                throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
            }

            // Then everything that hangs off the stream.
            String streamId = streamEntity.getInlongStreamId();
            this.streamFieldMapper.logicDeleteAllByIdentifier(str, streamId);
            this.streamExtMapper.logicDeleteAllByRelatedId(str, streamId);
            this.sourceService.logicDeleteAll(str, streamId, str2);
            this.sinkService.logicDeleteAll(str, streamId, str2);
        }

        LOGGER.info("success to delete all inlong stream, ext property and fields by groupId={}", str);
        return true;
    }

    /**
     * Counts the streams of a group.
     *
     * @param str inlong group id
     * @return the count reported by the stream mapper, or {@code 0} when the group id is
     *         null or empty (the mapper is not queried in that case)
     */
    @Override
    public int selectCountByGroupId(String str) {
        LOGGER.debug("begin to get count by groupId={}", str);
        if (!StringUtils.isEmpty(str)) {
            int streamCount = this.streamMapper.selectCountByGroupId(str);
            LOGGER.info("success to get count");
            return streamCount;
        }
        return 0;
    }

    /**
     * Returns the brief stream list of a group as provided by the stream mapper's
     * {@code selectBriefList} query.
     *
     * @param str inlong group id; must not be blank
     * @return the brief stream infos of the group
     */
    @Override
    public List<InlongStreamBriefInfo> getTopicList(String str) {
        // Log message typo fixed: "begin bo get" -> "begin to get".
        LOGGER.debug("begin to get topic list by group id={}", str);
        Preconditions.expectNotBlank(str, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
        List<InlongStreamBriefInfo> briefInfos = this.streamMapper.selectBriefList(str);
        LOGGER.debug("success to get topic list by groupId={}, result size={}", str, briefInfos.size());
        return briefInfos;
    }

    /**
     * Moves every approved stream to {@code CONFIG_ING} and forwards its sink list to the
     * sink service's own post-approval update.
     *
     * @param list approved streams; a null or empty list is a no-op
     * @param str name of the operator, passed on to the sink service
     * @return {@code true} when all streams were updated (or there was nothing to do)
     * @throws BusinessException with {@code STREAM_NOT_FOUND} when an approved stream does
     *         not exist, or {@code CONFIG_EXPIRED} when a stream update does not affect
     *         exactly one row
     */
    @Override
    public boolean updateAfterApprove(List<InlongStreamApproveRequest> list, String str) {
        if (CollectionUtils.isEmpty(list)) {
            return true;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("begin to update stream after approve={}", list);
        }
        // Remembers the group id of the last processed request for the final log line.
        String groupId = null;
        for (InlongStreamApproveRequest approveRequest : list) {
            groupId = approveRequest.getInlongGroupId();
            String streamId = approveRequest.getInlongStreamId();
            InlongStreamEntity streamEntity = this.streamMapper.selectByIdentifier(groupId, streamId);
            // Fail with a business error instead of a NullPointerException on unknown streams.
            if (streamEntity == null) {
                LOGGER.error("inlong stream not found by groupId={}, streamId={}", groupId, streamId);
                throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND, String.format("inlong stream not found by groupId=%s, streamId=%s", groupId, streamId));
            }
            streamEntity.setStatus(StreamStatus.CONFIG_ING.getCode());
            if (this.streamMapper.updateByIdentifierSelective(streamEntity) != InlongConstants.AFFECTED_ONE_ROW.intValue()) {
                LOGGER.error("stream has already updated with group id={}, stream id={}, curVersion={}",
                        streamEntity.getInlongGroupId(), streamEntity.getInlongStreamId(), streamEntity.getVersion());
                throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
            }
            this.sinkService.updateAfterApprove(approveRequest.getSinkList(), str);
        }
        LOGGER.info("success to update stream after approve for groupId={}", groupId);
        return true;
    }

    /**
     * Updates the status of one inlong stream in its own (new) transaction.
     *
     * @param str  inlong group id
     * @param str2 inlong stream id
     * @param num  value passed to {@code updateStatusByIdentifier} (presumably the new status code - TODO confirm)
     * @param str3 value passed to {@code updateStatusByIdentifier} (presumably the operator - TODO confirm)
     * @return always {@code true} when the mapper call does not throw
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public boolean updateStatus(String str, String str2, Integer num, String str3) {
        this.streamMapper.updateStatusByIdentifier(str, str2, num, str3);
        // Parameterized logging; the previous message was copied from the approve flow.
        LOGGER.info("success to update stream status for groupId={}, streamId={}", str, str2);
        return true;
    }

    /**
     * Inserts a system-created stream record that represents a DLQ / RLQ topic.
     *
     * @param str  inlong group id the topic belongs to
     * @param str2 topic name; used both as the stream id and as the MQ resource
     * @param str3 stored as creator and modifier of the new record
     * @throws BusinessException with {@code STREAM_ID_DUPLICATE} when a stream with the same
     *                           group id and topic name already exists
     */
    @Override
    public void insertDlqOrRlq(String str, String str2, String str3) {
        final String groupId = str;
        final String topicName = str2;
        final String operator = str3;

        Integer existing = this.streamMapper.selectExistByIdentifier(groupId, topicName);
        if (existing >= 1) {
            LOGGER.error("DLQ/RLQ topic already exists with name={}", topicName);
            throw new BusinessException(ErrorCodeEnum.STREAM_ID_DUPLICATE, "DLQ/RLQ topic already exists");
        }

        // The same fixed value is used for all four capacity-related columns.
        final int fixedCapacity = 1000;

        InlongStreamEntity topicEntity = new InlongStreamEntity();
        topicEntity.setInlongGroupId(groupId);
        topicEntity.setInlongStreamId(topicName);
        topicEntity.setMqResource(topicName);
        topicEntity.setDescription("This is DLQ / RLQ topic created by SYSTEM");
        topicEntity.setDailyRecords(fixedCapacity);
        topicEntity.setDailyStorage(fixedCapacity);
        topicEntity.setPeakRecords(fixedCapacity);
        topicEntity.setMaxLength(fixedCapacity);
        topicEntity.setStatus(StreamStatus.CONFIG_SUCCESSFUL.getCode());
        topicEntity.setCreator(operator);
        topicEntity.setModifier(operator);
        this.streamMapper.insert(topicEntity);
    }

    /**
     * Logically deletes the stream record of a DLQ / RLQ topic via the stream mapper.
     *
     * @param str  inlong group id
     * @param str2 topic name
     * @param str3 third argument of {@code streamMapper.logicDeleteDlqOrRlq}
     *             (presumably the operator - TODO confirm)
     */
    @Override
    public void logicDeleteDlqOrRlq(String str, String str2, String str3) {
        final String groupId = str;
        final String topicName = str2;
        this.streamMapper.logicDeleteDlqOrRlq(groupId, topicName, str3);
        LOGGER.info("success to logic delete dlq or rlq by groupId={}, topicName={}", groupId, topicName);
    }

    /**
     * Adds the source fields of the requested sink fields to the stream, skipping names the
     * stream already has, and then forwards the sink fields to every sink of the stream.
     *
     * @param addFieldRequest request carrying the group id, stream id and sink field list
     * @return {@code true} when all steps finish without an exception
     * @throws BusinessException with {@code INVALID_PARAMETER} wrapping the message of any
     *                           exception raised while adding the fields
     */
    @Override
    public boolean addFields(AddFieldRequest addFieldRequest) {
        final String groupId = addFieldRequest.getInlongGroupId();
        final String streamId = addFieldRequest.getInlongStreamId();
        try {
            LOGGER.info("begin to add inlong stream fields ={}", addFieldRequest.getSinkFieldList());

            // Names of the fields currently stored for this stream.
            List<InlongStreamFieldEntity> storedFields = this.streamFieldMapper.selectByIdentifier(groupId, streamId);
            Set<String> storedNames = storedFields.stream()
                    .map(InlongStreamFieldEntity::getFieldName)
                    .collect(Collectors.toSet());

            List<InlongStreamFieldEntity> fieldsToInsert = new ArrayList<>();
            for (SinkField sinkField : addFieldRequest.getSinkFieldList()) {
                if (storedNames.contains(sinkField.getSourceFieldName())) {
                    LOGGER.info("current stream field={} is exist for groupId={}, streamId={}",
                            sinkField.getSourceFieldName(), groupId, streamId);
                    continue;
                }
                InlongStreamFieldEntity fieldEntity = new InlongStreamFieldEntity();
                fieldEntity.setFieldName(sinkField.getSourceFieldName());
                fieldEntity.setFieldType(sinkField.getSourceFieldType());
                fieldEntity.setFieldComment(sinkField.getFieldComment());
                fieldEntity.setInlongGroupId(groupId);
                fieldEntity.setInlongStreamId(streamId);
                fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
                fieldsToInsert.add(fieldEntity);
            }
            if (!fieldsToInsert.isEmpty()) {
                this.streamFieldMapper.insertAll(fieldsToInsert);
            }

            // Every sink of the stream receives the complete requested field list.
            List<StreamSinkEntity> sinks = this.sinkMapper.selectByRelatedId(groupId, streamId);
            for (StreamSinkEntity sink : sinks) {
                this.sinkService.addFields(sink, addFieldRequest.getSinkFieldList());
            }
            LOGGER.debug("success add inlong stream fields={}", fieldsToInsert);
            return true;
        } catch (Exception e) {
            LOGGER.error("add inlong stream fields error for groupId={}, streamId={}", groupId, streamId, e);
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                    String.format("add stream fields error : %s", e.getMessage()));
        }
    }

    /**
     * Parses stream fields from a textual statement using the mode named in the request.
     *
     * <p>Supported modes are {@code "json"}, {@code "sql"} and {@code "csv"}. Every failure,
     * including an unsupported or missing mode, is logged and reported as a
     * {@code BusinessException} whose message starts with "parse stream fields error".
     *
     * @param parseFieldRequest request carrying the parse mode and the statement text
     * @return the parsed stream fields
     * @throws BusinessException with {@code INVALID_PARAMETER} when parsing fails
     */
    @Override
    public List<StreamField> parseFields(ParseFieldRequest parseFieldRequest) {
        try {
            String method = parseFieldRequest.getMethod();
            String statement = parseFieldRequest.getStatement();
            // A null mode is routed to the "unsupported" branch instead of failing inside the switch.
            String mode = method == null ? "" : method;
            switch (mode) {
                case "json":
                    return parseFieldsByJson(statement);
                case "sql":
                    return parseFieldsBySql(statement);
                case "csv":
                    return parseFieldsByCsv(statement);
                default:
                    throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                            String.format("Unsupported parse field mode: %s", method));
            }
        } catch (Exception e) {
            LOGGER.error("parse inlong stream fields error", e);
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                    String.format("parse stream fields error : %s", e.getMessage()));
        }
    }

    /**
     * Parses stream fields from an uploaded Excel file.
     *
     * <p>The upload stream is opened and closed here. A failure to open (or close) the
     * stream and a failure to parse its content are reported with different messages.
     *
     * @param multipartFile uploaded file whose content is read through {@code ExcelTool}
     * @return the parsed, non-empty list of stream fields
     * @throws BusinessException with {@code INVALID_PARAMETER} when the file cannot be read,
     *                           cannot be parsed, or contains no rows
     */
    @Override
    public List<StreamField> parseFields(MultipartFile multipartFile) {
        List<StreamField> fields;
        try (java.io.InputStream inputStream = multipartFile.getInputStream()) {
            try {
                fields = ExcelTool.read(inputStream, StreamField.class);
            } catch (IOException | IllegalAccessException | InstantiationException | NoSuchMethodException e) {
                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                        "Can not properly parse excel, message: " + e.getClass().getName() + ":" + e.getMessage());
            }
        } catch (IOException e) {
            // Only reached when the upload stream itself cannot be opened or closed.
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "Can not properly read update file");
        }
        if (CollectionUtils.isEmpty(fields)) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                    "The content of uploaded Excel file is empty, please check!");
        }
        return fields;
    }

    /**
     * Parses stream fields from CSV-like text, one field per line.
     *
     * <p>Each non-blank line is split with {@code PARSE_FIELD_CSV_SPLITTER} into at most
     * {@code PARSE_FIELD_CSV_MAX_COLUMNS} columns: field name, field type and an optional
     * comment. Blank lines are skipped. Both LF and CRLF line endings are accepted.
     *
     * @param str the CSV text
     * @return the parsed stream fields in input order
     * @throws BusinessException with {@code INVALID_PARAMETER} when a line has too few columns,
     *                           an illegal field name or an unknown field type
     */
    private List<StreamField> parseFieldsByCsv(String str) {
        // "\r?\n" keeps a trailing carriage return out of the last column on CRLF input.
        String[] lines = str.split("\\r?\\n");
        List<StreamField> fields = new ArrayList<>();
        for (int i = 0; i < lines.length; i++) {
            String line = lines[i];
            if (StringUtils.isBlank(line)) {
                continue;
            }
            int lineNumber = i + 1;
            String[] columns = line.split(PARSE_FIELD_CSV_SPLITTER, PARSE_FIELD_CSV_MAX_COLUMNS);
            if (columns.length < PARSE_FIELD_CSV_MIN_COLUMNS) {
                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                        "At least two fields are required, line number is " + lineNumber);
            }
            String fieldName = columns[0];
            if (!InlongConstants.PATTERN_NORMAL_CHARACTERS.matcher(fieldName).matches()) {
                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                        "Field names in line " + lineNumber + " can only contain letters, underscores or numbers");
            }
            String fieldType = columns[1];
            if (!InlongConstants.STREAM_FIELD_TYPES.contains(fieldType)) {
                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                        "The field type in line " + lineNumber + " must be one of " + InlongConstants.STREAM_FIELD_TYPES);
            }
            // The comment is the third column (index 2) and only present on full-width lines.
            String fieldComment = columns.length == PARSE_FIELD_CSV_MAX_COLUMNS ? columns[2] : null;

            StreamField streamField = new StreamField();
            streamField.setFieldName(fieldName);
            streamField.setFieldType(fieldType);
            streamField.setFieldComment(fieldComment);
            fields.add(streamField);
        }
        return fields;
    }

    /**
     * Parses stream fields from a SQL {@code CREATE TABLE} statement.
     *
     * <p>Each column definition becomes one stream field. The field type is the lower-cased
     * simple name of the class returned by {@code FieldInfoUtils.sqlTypeToJavaType}; a column
     * whose SQL type maps to {@code Object} is rejected. When a column spec starting with
     * {@code COMMENT} is followed by another spec, that following spec (with single and
     * double quotes removed) becomes the field comment.
     *
     * @param str the SQL text
     * @return the parsed stream fields in column order
     * @throws JSQLParserException when the SQL text cannot be parsed
     * @throws BusinessException   with {@code INVALID_PARAMETER} when the statement is not a table
     *                             creation statement, defines no columns, or uses an unknown type
     */
    private List<StreamField> parseFieldsBySql(String str) throws JSQLParserException {
        // Keep the parser result as the general statement type so that the instanceof
        // check below is the place where non-CREATE-TABLE input is rejected.
        net.sf.jsqlparser.statement.Statement parsed = new CCJSqlParserManager().parse(new StringReader(str));
        if (!(parsed instanceof CreateTable)) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                    "The SQL statement must be a table creation statement");
        }
        List<ColumnDefinition> columnDefinitions = ((CreateTable) parsed).getColumnDefinitions();
        // A statement without explicit columns (e.g. CREATE TABLE ... AS SELECT) has nothing to map.
        if (CollectionUtils.isEmpty(columnDefinitions)) {
            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                    "The table creation statement must contain column definitions");
        }

        List<StreamField> fields = new ArrayList<>(columnDefinitions.size());
        for (int i = 0; i < columnDefinitions.size(); i++) {
            ColumnDefinition columnDefinition = columnDefinitions.get(i);
            StreamField streamField = new StreamField();
            streamField.setFieldName(columnDefinition.getColumnName());

            String sqlType = columnDefinition.getColDataType().getDataType();
            Class<?> javaType = FieldInfoUtils.sqlTypeToJavaType(sqlType);
            if (javaType == Object.class) {
                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
                        "Unrecognized SQL field type, line: " + (i + 1) + ", type: " + sqlType);
            }
            // Locale.ROOT keeps the type name stable regardless of the JVM default locale.
            streamField.setFieldType(javaType.getSimpleName().toLowerCase(java.util.Locale.ROOT));

            List<String> columnSpecs = columnDefinition.getColumnSpecs();
            if (CollectionUtils.isNotEmpty(columnSpecs)) {
                String comment = null;
                // Only the first COMMENT spec is considered; it needs a following spec to carry the text.
                for (int j = 0; j < columnSpecs.size(); j++) {
                    if (columnSpecs.get(j).toUpperCase(java.util.Locale.ROOT).startsWith("COMMENT")) {
                        if (j + 1 < columnSpecs.size()) {
                            comment = columnSpecs.get(j + 1).replaceAll("['\"]", "");
                        }
                        break;
                    }
                }
                streamField.setFieldComment(comment);
            }
            fields.add(streamField);
        }
        return fields;
    }

    /**
     * Parses stream fields from a JSON array of objects.
     *
     * <p>From each object the values under {@code "name"}, {@code "type"} and {@code "desc"}
     * are copied to the field name, field type and field comment respectively.
     *
     * @param str the JSON text
     * @return the parsed stream fields in array order
     * @throws JsonProcessingException when the text cannot be read as a list of string maps
     */
    private List<StreamField> parseFieldsByJson(String str) throws JsonProcessingException {
        List<Map<String, String>> rawFields =
                this.objectMapper.readValue(str, new TypeReference<List<Map<String, String>>>() {
                });

        List<StreamField> fields = new ArrayList<>();
        for (Map<String, String> rawField : rawFields) {
            StreamField streamField = new StreamField();
            streamField.setFieldName(rawField.get("name"));
            streamField.setFieldType(rawField.get("type"));
            streamField.setFieldComment(rawField.get("desc"));
            fields.add(streamField);
        }
        return fields;
    }

    /**
     * Replaces the stored fields of a stream: all existing fields are deleted through the
     * field mapper and the given fields are saved afterwards.
     *
     * @param str  inlong group id
     * @param str2 inlong stream id
     * @param list fields to store after the existing ones were removed
     * @throws BusinessException with {@code STREAM_FIELD_SAVE_FAILED} when deleting or saving fails
     */
    @Transactional(rollbackFor = {Throwable.class})
    public void updateField(String str, String str2, List<StreamField> list) {
        final String groupId = str;
        final String streamId = str2;
        LOGGER.debug("begin to update inlong stream field, groupId={}, streamId={}, field={}", groupId, streamId, list);
        try {
            // Delete first, then insert the replacement set.
            this.streamFieldMapper.deleteAllByIdentifier(groupId, streamId);
            saveField(groupId, streamId, list);
            LOGGER.info("success to update inlong stream field for groupId={}", groupId);
        } catch (Exception e) {
            LOGGER.error("failed to update inlong stream field: ", e);
            throw new BusinessException(ErrorCodeEnum.STREAM_FIELD_SAVE_FAILED);
        }
    }

    /**
     * Stores the given fields for a stream.
     *
     * <p>The id of every passed-in field is reset to {@code null} before the fields are copied
     * into entities, so the caller's objects are modified. A null or empty list is a no-op.
     *
     * @param str  inlong group id set on every stored field
     * @param str2 inlong stream id set on every stored field
     * @param list fields to store
     */
    @Transactional(rollbackFor = {Throwable.class})
    public void saveField(String str, String str2, List<StreamField> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        // Clear the ids on the source objects before they are copied into entities.
        for (StreamField field : list) {
            field.setId((Integer) null);
        }
        List<InlongStreamFieldEntity> entities = CommonBeanUtils.copyListProperties(list, InlongStreamFieldEntity::new);
        entities.forEach(entity -> {
            entity.setInlongGroupId(str);
            entity.setInlongStreamId(str2);
            entity.setIsDeleted(InlongConstants.UN_DELETED);
        });
        this.streamFieldMapper.insertAll(entities);
    }

    /**
     * Saves the extended properties of a stream through
     * {@code streamExtMapper.insertOnDuplicateKeyUpdate}.
     *
     * @param str  inlong group id set on every ext entity
     * @param str2 inlong stream id set on every ext entity
     * @param list ext properties to store; a null or empty list is a no-op
     */
    @Transactional(rollbackFor = {Throwable.class})
    public void saveOrUpdateExt(String str, String str2, List<InlongStreamExtInfo> list) {
        LOGGER.info("begin to save or update inlong stream ext info, groupId={}, streamId={}, ext={}", str, str2, list);
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        List<InlongStreamExtEntity> extEntities = CommonBeanUtils.copyListProperties(list, InlongStreamExtEntity::new);
        for (InlongStreamExtEntity extEntity : extEntities) {
            extEntity.setInlongGroupId(str);
            extEntity.setInlongStreamId(str2);
        }
        this.streamExtMapper.insertOnDuplicateKeyUpdate(extEntities);
        LOGGER.info("success to save or update inlong stream ext for groupId={}", str);
    }

    /**
     * Loads the inlong group and rejects it when its status is a temporary one.
     *
     * @param str inlong group id
     * @return the group entity when its status is not temporary
     * @throws BusinessException with {@code STREAM_OPT_NOT_ALLOWED} when
     *                           {@code GroupStatus.isTempStatus} holds for the group status
     */
    private InlongGroupEntity checkGroupStatusIsTemp(String str) {
        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(str);
        Preconditions.expectNotNull(groupEntity, "groupId is invalid");

        GroupStatus currentStatus = GroupStatus.forCode(groupEntity.getStatus().intValue());
        if (GroupStatus.isTempStatus(currentStatus)) {
            LOGGER.error("inlong groupId={} status={} was not allowed to add/update/delete stream", str, currentStatus);
            throw new BusinessException(ErrorCodeEnum.STREAM_OPT_NOT_ALLOWED);
        }
        return groupEntity;
    }

    /**
     * Queries the latest messages of a stream from the message queue of its group.
     *
     * <p>The query is best-effort: when the queue operator throws, the error is logged and an
     * empty list is returned.
     *
     * @param str  inlong group id; must refer to an existing group
     * @param str2 inlong stream id
     * @param num  value passed to {@code queryLatestMessages} (presumably the number of
     *             messages to fetch - TODO confirm)
     * @param str3 not used by this implementation
     * @return the messages returned by the queue operator, or an empty list when the query failed
     */
    @Override
    public List<BriefMQMessage> listMessages(String str, String str2, Integer num, String str3) {
        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(str);
        // Fail with an explicit message instead of a NullPointerException for an unknown group id.
        Preconditions.expectNotNull(groupEntity, "groupId is invalid");

        String mqType = groupEntity.getMqType();
        // NOTE(review): "mo32getFromEntity" looks like a decompiler-generated method name - confirm
        // the real operator method name before building against the original sources.
        InlongGroupInfo groupInfo = this.groupOperatorFactory.getInstance(mqType).mo32getFromEntity(groupEntity);
        InlongStreamInfo streamInfo = get(str, str2);

        List<BriefMQMessage> messages = new ArrayList<>();
        try {
            messages = this.queueOperatorFactory.getInstance(mqType).queryLatestMessages(groupInfo, streamInfo, num);
        } catch (Exception e) {
            // Deliberately best-effort: a failing queue query yields an empty result.
            LOGGER.error("query message error ", e);
        }
        return messages;
    }
}
