/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.sink;

import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GlobalConstants;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.sink.SinkApproveDTO;
import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.service.core.operation.InlongStreamProcessOperation;
import org.apache.inlong.manager.service.group.GroupCheckService;
import org.apache.inlong.manager.service.sink.SinkOperationFactory;
import org.apache.inlong.manager.service.sink.StreamSinkOperation;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class StreamSinkServiceImpl
implements StreamSinkService {
    /** Class-wide SLF4J logger. */
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamSinkServiceImpl.class);
    /** Resolves the {@link StreamSinkOperation} that handles a given {@link SinkType}. */
    @Autowired
    private SinkOperationFactory operationFactory;
    /** Runs the group status check before sinks are saved, updated or deleted. */
    @Autowired
    private GroupCheckService groupCheckService;
    /** Mapper used to query and modify stream sink records. */
    @Autowired
    private StreamSinkEntityMapper sinkMapper;
    /** Mapper used to delete the field records belonging to a sink. */
    @Autowired
    private StreamSinkFieldEntityMapper sinkFieldMapper;
    /** Bean factory used by {@code update} to wire the lazily created {@link #streamProcessOperation}. */
    @Autowired
    private AutowireCapableBeanFactory autowireCapableBeanFactory;
    /**
     * Not injected by Spring: created and autowired on first use inside {@code update}.
     * NOTE(review): presumably lazy to avoid a circular bean dependency - confirm.
     */
    private InlongStreamProcessOperation streamProcessOperation;

    /**
     * Saves a new sink described by {@code request}.
     *
     * <p>The request is validated, the group status check is run, and a sink whose name
     * already exists under the same group and stream is rejected. The ids of all submitted
     * sink fields are cleared before persistence is delegated to the
     * {@link StreamSinkOperation} registered for the requested sink type.
     *
     * @param request sink info to save; group id, stream id, sink type and sink name are required
     * @param operator name of the user performing the operation
     * @return the value returned by the type-specific {@code saveOpt} call
     * @throws BusinessException if a sink with the same name already exists for the group and stream
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public Integer save(SinkRequest request, String operator) {
        LOGGER.info("begin to save sink info: {}", request);
        this.checkParams(request);

        String groupId = request.getInlongGroupId();
        this.groupCheckService.checkGroupStatus(groupId, operator);

        // The sink name must be unique within one group/stream pair.
        String streamId = request.getInlongStreamId();
        String sinkName = request.getSinkName();
        List<StreamSinkEntity> existingSinks = this.sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
        for (StreamSinkEntity existing : existingSinks) {
            if (existing != null && Objects.equals(existing.getSinkName(), sinkName)) {
                String err = "sink name=%s already exists with the groupId=%s streamId=%s";
                throw new BusinessException(String.format(err, sinkName, groupId, streamId));
            }
        }

        StreamSinkOperation operation = this.operationFactory.getInstance(SinkType.forType(request.getSinkType()));

        // Drop any ids carried by the submitted fields before they are handed to the operation.
        if (CollectionUtils.isNotEmpty(request.getSinkFieldList())) {
            request.getSinkFieldList().forEach(sinkField -> sinkField.setId(null));
        }

        // Keep the result boxed: it is returned as an Integer, so unboxing would only add an NPE risk.
        Integer sinkId = operation.saveOpt(request, operator);
        LOGGER.info("success to save sink info: {}", request);
        return sinkId;
    }

    /**
     * Loads a single sink by its primary key and converts it with the operation
     * registered for the stored sink type.
     *
     * @param id primary key of the sink; must not be null
     * @return the sink built by the type-specific {@code getByEntity} call
     * @throws BusinessException with {@code SINK_INFO_NOT_FOUND} if no record exists for {@code id}
     */
    @Override
    public StreamSink get(Integer id) {
        Preconditions.checkNotNull(id, "sink id is empty");

        StreamSinkEntity sinkEntity = this.sinkMapper.selectByPrimaryKey(id);
        if (sinkEntity != null) {
            SinkType type = SinkType.forType(sinkEntity.getSinkType());
            StreamSink result = this.operationFactory.getInstance(type).getByEntity(sinkEntity);
            LOGGER.debug("success to get sink info by id={}", id);
            return result;
        }

        LOGGER.error("sink not found by id={}", id);
        throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
    }

    /**
     * Returns the sink count reported by the mapper for the given group and stream.
     *
     * @param groupId inlong group id
     * @param streamId inlong stream id
     * @return the value returned by {@code sinkMapper.selectCount}
     */
    @Override
    public Integer getCount(String groupId, String streamId) {
        final Integer sinkCount = this.sinkMapper.selectCount(groupId, streamId);
        LOGGER.debug("sink count={} with groupId={}, streamId={}", sinkCount, groupId, streamId);
        return sinkCount;
    }

    /**
     * Lists the sinks found for a group and stream.
     *
     * @param groupId inlong group id; must not be null
     * @param streamId inlong stream id, passed to the mapper as-is
     * @return one {@link StreamSink} per matching record, or an empty list when nothing matches
     */
    @Override
    public List<StreamSink> listSink(String groupId, String streamId) {
        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());

        List<StreamSinkEntity> sinkEntities = this.sinkMapper.selectByRelatedId(groupId, streamId, null);
        if (CollectionUtils.isEmpty(sinkEntities)) {
            return Collections.emptyList();
        }

        // Each record is re-loaded through get(id), which also performs the type-specific conversion.
        List<StreamSink> sinks = new ArrayList<>();
        for (StreamSinkEntity sinkEntity : sinkEntities) {
            sinks.add(this.get(sinkEntity.getId()));
        }

        LOGGER.debug("success to list sink by groupId={}, streamId={}", groupId, streamId);
        return sinks;
    }

    /**
     * Lists the sink summaries of one stream.
     *
     * @param groupId inlong group id; must not be null
     * @param streamId inlong stream id; must not be null
     * @return the summaries returned by {@code sinkMapper.selectSummary}
     */
    @Override
    public List<SinkBriefResponse> listBrief(String groupId, String streamId) {
        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());

        List<SinkBriefResponse> summaryList = this.sinkMapper.selectSummary(groupId, streamId);

        // Parameterized logging: the message is only formatted when debug is enabled.
        LOGGER.debug("success to list sink summary by groupId={}, streamId={}", groupId, streamId);
        return summaryList;
    }

    /**
     * Queries one page of sinks matching {@code request}.
     *
     * <p>The queried entities are grouped by sink type so that each group can be converted
     * by its own {@link StreamSinkOperation}. Because of that grouping, rows of the same
     * type are adjacent in the result; groups appear in the order their type is first seen.
     * When the mapper result is a PageHelper {@link Page}, its page number, page size and
     * total are carried over to the returned {@link PageInfo}.
     *
     * @param request paging and filter conditions; the request and its group id must not be null
     * @return the converted sinks of the requested page together with the paging information
     */
    @Override
    public PageInfo<? extends SinkListResponse> listByCondition(SinkPageRequest request) {
        Preconditions.checkNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
        Preconditions.checkNotNull(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());

        PageHelper.startPage(request.getPageNum(), request.getPageSize());
        List<StreamSinkEntity> entityList = this.sinkMapper.selectByCondition(request);

        // Linked map keeps the group order deterministic (first occurrence of each type).
        Map<SinkType, Page<StreamSinkEntity>> entitiesByType = Maps.newLinkedHashMap();
        for (StreamSinkEntity entity : entityList) {
            SinkType sinkType = SinkType.forType(entity.getSinkType());
            entitiesByType.computeIfAbsent(sinkType, key -> new Page<>()).add(entity);
        }

        List<SinkListResponse> responses = Lists.newArrayList();
        for (Map.Entry<SinkType, Page<StreamSinkEntity>> entry : entitiesByType.entrySet()) {
            StreamSinkOperation operation = this.operationFactory.getInstance(entry.getKey());
            PageInfo<? extends SinkListResponse> typedPage = operation.getPageInfo(entry.getValue());
            responses.addAll(typedPage.getList());
        }

        PageInfo<SinkListResponse> pageInfo;
        if (entityList instanceof Page) {
            // Rebuild a Page around the converted rows so the total/page metadata of the
            // original query survives; a plain list would report total == rows on this page.
            Page<StreamSinkEntity> entityPage = (Page<StreamSinkEntity>) entityList;
            Page<SinkListResponse> responsePage = new Page<>(entityPage.getPageNum(), entityPage.getPageSize());
            responsePage.setTotal(entityPage.getTotal());
            responsePage.addAll(responses);
            pageInfo = PageInfo.of(responsePage);
        } else {
            pageInfo = PageInfo.of(responses);
        }

        LOGGER.debug("success to list sink page, result size {}", pageInfo.getSize());
        return pageInfo;
    }

    /**
     * Updates an existing sink.
     *
     * <p>The request is validated, the group status check is run, and the update is rejected
     * when another sink (different id) already uses the same name under the group and stream.
     * The ids of all submitted sink fields are cleared before the update is delegated to the
     * {@link StreamSinkOperation} registered for the requested sink type. If the group status
     * equals {@code GroupStatus.CONFIG_SUCCESSFUL}, the stream process is started afterwards.
     *
     * @param request sink info to update; id, group id, stream id, sink type and sink name are required
     * @param operator name of the user performing the operation
     * @return always {@code true} when no exception is thrown
     * @throws BusinessException if another sink with the same name exists for the group and stream
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public Boolean update(SinkRequest request, String operator) {
        LOGGER.info("begin to update sink info: {}", request);
        this.checkParams(request);
        Preconditions.checkNotNull(request.getId(), ErrorCodeEnum.ID_IS_EMPTY.getMessage());

        String groupId = request.getInlongGroupId();
        String streamId = request.getInlongStreamId();
        String sinkName = request.getSinkName();
        InlongGroupEntity groupEntity = this.groupCheckService.checkGroupStatus(groupId, operator);

        // The sink name must stay unique within the group/stream; the sink being updated is skipped.
        List<StreamSinkEntity> sameNameSinks = this.sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
        for (StreamSinkEntity existing : sameNameSinks) {
            if (existing == null) {
                continue;
            }
            boolean isSelf = Objects.equals(request.getId(), existing.getId());
            if (!isSelf && Objects.equals(existing.getSinkName(), sinkName)) {
                String err = "sink name=%s already exists with the groupId=%s streamId=%s";
                throw new BusinessException(String.format(err, sinkName, groupId, streamId));
            }
        }

        // Drop any ids carried by the submitted fields before they are handed to the operation.
        if (CollectionUtils.isNotEmpty(request.getSinkFieldList())) {
            request.getSinkFieldList().forEach(sinkField -> sinkField.setId(null));
        }

        StreamSinkOperation operation = this.operationFactory.getInstance(SinkType.forType(request.getSinkType()));
        operation.updateOpt(request, operator);

        if (GroupStatus.CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
            this.obtainStreamProcessOperation().startProcess(groupId, streamId, operator, true);
        }

        LOGGER.info("success to update sink info: {}", request);
        return true;
    }

    /**
     * Returns the lazily created stream process operation, creating and autowiring it on first use.
     *
     * <p>The instance is fully autowired before it is stored in the shared field, and the
     * method is synchronized, so concurrent callers never observe a half-initialized object.
     */
    private synchronized InlongStreamProcessOperation obtainStreamProcessOperation() {
        if (this.streamProcessOperation == null) {
            InlongStreamProcessOperation created = new InlongStreamProcessOperation();
            this.autowireCapableBeanFactory.autowireBean(created);
            this.streamProcessOperation = created;
        }
        return this.streamProcessOperation;
    }

    /**
     * Writes a new status and operate log for the sink with the given id.
     *
     * @param id primary key of the sink
     * @param status status code to store
     * @param log operate log text to store
     */
    @Override
    public void updateStatus(int id, int status, String log) {
        // Only the fields needed by the status update are populated.
        StreamSinkEntity statusUpdate = new StreamSinkEntity();
        statusUpdate.setId(id);
        statusUpdate.setStatus(status);
        statusUpdate.setOperateLog(log);

        this.sinkMapper.updateStatus(statusUpdate);
        LOGGER.info("success to update sink status={} for id={} with log: {}", status, id, log);
    }

    /**
     * Logically deletes one sink and its fields.
     *
     * <p>The record keeps its former status in {@code previousStatus}, is moved to
     * {@code GlobalConstants.DELETED_STATUS}, and has {@code isDeleted} set to its own id.
     *
     * @param id primary key of the sink; must not be null and must reference an existing record
     * @param operator name of the user performing the operation
     * @return always {@code true} when no exception is thrown
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public Boolean delete(Integer id, String operator) {
        LOGGER.info("begin to delete sink by id={}", id);
        Preconditions.checkNotNull(id, ErrorCodeEnum.ID_IS_EMPTY.getMessage());

        StreamSinkEntity target = this.sinkMapper.selectByPrimaryKey(id);
        Preconditions.checkNotNull(target, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
        this.groupCheckService.checkGroupStatus(target.getInlongGroupId(), operator);

        // Mark the sink itself as deleted, remembering the status it had before.
        target.setPreviousStatus(target.getStatus());
        target.setStatus(GlobalConstants.DELETED_STATUS);
        target.setIsDeleted(id);
        target.setModifier(operator);
        target.setModifyTime(new Date());
        this.sinkMapper.updateByPrimaryKeySelective(target);

        // Then logically delete the fields that belong to it.
        this.sinkFieldMapper.logicDeleteAll(id);

        LOGGER.info("success to delete sink info: {}", target);
        return Boolean.TRUE;
    }

    /**
     * Logically deletes every sink found for a group and stream, together with its fields.
     *
     * @param groupId inlong group id; must not be null
     * @param streamId inlong stream id; must not be null
     * @param operator name of the user performing the operation
     * @return always {@code true} when no exception is thrown
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public Boolean logicDeleteAll(String groupId, String streamId, String operator) {
        LOGGER.info("begin to logic delete all sink info by groupId={}, streamId={}", groupId, streamId);
        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
        this.groupCheckService.checkGroupStatus(groupId, operator);

        // One shared timestamp so all records of this call carry the same modify time.
        Date modifyTime = new Date();
        List<StreamSinkEntity> sinks = this.sinkMapper.selectByRelatedId(groupId, streamId, null);
        if (CollectionUtils.isNotEmpty(sinks)) {
            for (StreamSinkEntity sink : sinks) {
                Integer sinkId = sink.getId();
                sink.setPreviousStatus(sink.getStatus());
                sink.setStatus(GlobalConstants.DELETED_STATUS);
                sink.setIsDeleted(sinkId);
                sink.setModifier(operator);
                sink.setModifyTime(modifyTime);
                this.sinkMapper.updateByPrimaryKeySelective(sink);
                this.sinkFieldMapper.logicDeleteAll(sinkId);
            }
        }

        LOGGER.info("success to logic delete all sink by groupId={}, streamId={}", groupId, streamId);
        return Boolean.TRUE;
    }

    /**
     * Deletes every sink found for a group and stream via the mappers'
     * {@code deleteByPrimaryKey} and {@code deleteAll} calls.
     *
     * @param groupId inlong group id; must not be null
     * @param streamId inlong stream id; must not be null
     * @param operator name of the user performing the operation
     * @return always {@code true} when no exception is thrown
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public Boolean deleteAll(String groupId, String streamId, String operator) {
        LOGGER.info("begin to delete all sink by groupId={}, streamId={}", groupId, streamId);
        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
        this.groupCheckService.checkGroupStatus(groupId, operator);

        List<StreamSinkEntity> sinks = this.sinkMapper.selectByRelatedId(groupId, streamId, null);
        if (CollectionUtils.isNotEmpty(sinks)) {
            for (StreamSinkEntity sink : sinks) {
                // Remove the sink record first, then the field records attached to it.
                this.sinkMapper.deleteByPrimaryKey(sink.getId());
                this.sinkFieldMapper.deleteAll(sink.getId());
            }
        }

        LOGGER.info("success to delete all sink by groupId={}, streamId={}", groupId, streamId);
        return Boolean.TRUE;
    }

    /**
     * Filters {@code streamIdList} through {@code sinkMapper.selectExistsStreamId} for the
     * given group and sink type.
     *
     * @param groupId inlong group id
     * @param sinkType sink type to look for
     * @param streamIdList candidate stream ids
     * @return the mapper result, or an empty list when {@code sinkType} or {@code streamIdList} is empty
     */
    @Override
    public List<String> getExistsStreamIdList(String groupId, String sinkType, List<String> streamIdList) {
        LOGGER.debug("begin to filter stream by groupId={}, type={}, streamId={}", groupId, sinkType, streamIdList);

        boolean nothingToFilter = StringUtils.isEmpty(sinkType) || CollectionUtils.isEmpty(streamIdList);
        if (nothingToFilter) {
            return Collections.emptyList();
        }

        List<String> existingStreamIds = this.sinkMapper.selectExistsStreamId(groupId, sinkType, streamIdList);
        LOGGER.debug("success to filter stream id list, result streamId={}", existingStreamIds);
        return existingStreamIds;
    }

    /**
     * Returns the sink types reported by the mapper for a group and stream.
     *
     * @param groupId inlong group id
     * @param streamId inlong stream id
     * @return the mapper result, or an empty list when {@code streamId} is empty
     */
    @Override
    public List<String> getSinkTypeList(String groupId, String streamId) {
        if (!StringUtils.isEmpty(streamId)) {
            List<String> sinkTypes = this.sinkMapper.selectSinkType(groupId, streamId);
            LOGGER.debug("success to get sink type by groupId={}, streamId={}, result={}",
                    groupId, streamId, sinkTypes);
            return sinkTypes;
        }
        return Collections.emptyList();
    }

    /**
     * Updates the status of each approved sink.
     *
     * <p>For every entry the new status is the one carried by the DTO, or
     * {@code SinkStatus.CONFIG_ING} when the DTO has none. The status currently stored for
     * the sink is recorded as {@code previousStatus}.
     *
     * @param approveList approval results; a null or empty list is a no-op
     * @param operator name of the user performing the operation
     * @return always {@code true} when no exception is thrown
     */
    @Override
    public Boolean updateAfterApprove(List<SinkApproveDTO> approveList, String operator) {
        LOGGER.info("begin to update sink after approve: {}", approveList);
        if (CollectionUtils.isEmpty(approveList)) {
            return true;
        }

        Date now = new Date();
        for (SinkApproveDTO dto : approveList) {
            Preconditions.checkNotNull(dto.getSinkType(), ErrorCodeEnum.SINK_TYPE_IS_NULL.getMessage());

            Integer sinkId = dto.getId();
            // Partial entity: the selective update only touches the fields set below.
            StreamSinkEntity entity = new StreamSinkEntity();
            entity.setId(sinkId);

            // The previous status must come from the stored record; the partial entity built
            // above has no status yet, so reading it from there never records anything.
            StreamSinkEntity stored = sinkId == null ? null : this.sinkMapper.selectByPrimaryKey(sinkId);
            if (stored != null) {
                entity.setPreviousStatus(stored.getStatus());
            }

            Integer status = dto.getStatus() == null ? SinkStatus.CONFIG_ING.getCode() : dto.getStatus();
            entity.setStatus(status);
            entity.setModifier(operator);
            entity.setModifyTime(now);
            this.sinkMapper.updateByPrimaryKeySelective(entity);
        }

        LOGGER.info("success to update sink after approve: {}", approveList);
        return true;
    }

    /**
     * Verifies that the request and its group id, stream id, sink type and sink name are
     * all non-null, checking them in that order.
     *
     * @param request sink request to validate
     */
    private void checkParams(SinkRequest request) {
        Preconditions.checkNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
        Preconditions.checkNotNull(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
        Preconditions.checkNotNull(request.getInlongStreamId(), ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
        Preconditions.checkNotNull(request.getSinkType(), ErrorCodeEnum.SINK_TYPE_IS_NULL.getMessage());
        Preconditions.checkNotNull(request.getSinkName(), ErrorCodeEnum.SINK_NAME_IS_NULL.getMessage());
    }
}

