/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.sink;

import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupState;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkApproveDTO;
import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.service.CommonOperateService;
import org.apache.inlong.manager.service.sink.SinkOperationFactory;
import org.apache.inlong.manager.service.sink.StreamSinkOperation;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.workflow.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * Implementation of {@link StreamSinkService}: create, query, update and delete operations
 * for stream sinks, delegating type-specific work to the {@link StreamSinkOperation}
 * obtained from {@link SinkOperationFactory}.
 */
@Service
public class StreamSinkServiceImpl
implements StreamSinkService {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamSinkServiceImpl.class);

    /**
     * Pool used to start the stream-resource workflow asynchronously: 10 core / 20 max threads,
     * a bounded queue of 100 tasks, threads named {@code stream-workflow-N}. With
     * {@link ThreadPoolExecutor.CallerRunsPolicy} a task rejected by a saturated pool runs on
     * the submitting thread instead of being dropped.
     */
    public final ExecutorService executorService = new ThreadPoolExecutor(
            10,
            20,
            0L,
            TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(100),
            new ThreadFactoryBuilder().setNameFormat("stream-workflow-%s").build(),
            new ThreadPoolExecutor.CallerRunsPolicy());

    @Autowired
    private SinkOperationFactory operationFactory;
    @Autowired
    private CommonOperateService commonOperateService;
    @Autowired
    private StreamSinkEntityMapper sinkMapper;
    @Autowired
    private StreamSinkFieldEntityMapper sinkFieldMapper;
    @Autowired
    private WorkflowService workflowService;

    /**
     * Saves a new sink after validating the request and the group status.
     *
     * <p>Fails the precondition check if a sink with the same group id, stream id and sink type
     * already exists. If the group status equals {@code GroupState.CONFIG_SUCCESSFUL}, the
     * stream-resource workflow is submitted to {@link #executorService}.
     *
     * @param request sink information; group id, stream id and sink type must be non-null
     * @param operator name of the operator performing the save
     * @return the id returned by the type-specific save operation
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public Integer save(SinkRequest request, String operator) {
        LOGGER.info("begin to save sink info: {}", request);
        this.checkParams(request);

        String groupId = request.getInlongGroupId();
        InlongGroupEntity groupEntity = this.commonOperateService.checkGroupStatus(groupId, operator);

        // Reject the request when the same sink type is already registered for this stream
        String streamId = request.getInlongStreamId();
        String sinkType = request.getSinkType();
        List<StreamSinkEntity> sinkExist = this.sinkMapper.selectByIdAndType(groupId, streamId, sinkType);
        Preconditions.checkEmpty(sinkExist, ErrorCodeEnum.SINK_ALREADY_EXISTS.getMessage());

        StreamSinkOperation operation = this.operationFactory.getInstance(SinkType.forType(sinkType));
        int id = operation.saveOpt(request, operator);

        // NOTE(review): update() compares against EntityStatus.GROUP_CONFIG_SUCCESSFUL instead;
        // presumably both constants carry the same code - confirm.
        if (GroupState.CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
            this.executorService.execute(new WorkflowStartRunnable(operator, groupEntity, streamId));
        }
        LOGGER.info("success to save sink info: {}", request);
        return id;
    }

    /**
     * Gets a sink by id using the operation registered for the given sink type.
     *
     * @param id sink id
     * @param sinkType sink type name
     * @return the sink response produced by the type-specific operation
     */
    @Override
    public SinkResponse get(Integer id, String sinkType) {
        StreamSinkOperation operation = this.operationFactory.getInstance(SinkType.forType(sinkType));
        SinkResponse sinkResponse = operation.getById(sinkType, id);
        LOGGER.debug("success to get sink by id={}, sinkType={}", id, sinkType);
        return sinkResponse;
    }

    /**
     * Returns the sink count reported by the mapper for the given group and stream.
     *
     * @param groupId inlong group id
     * @param streamId inlong stream id
     * @return the count returned by the mapper
     */
    @Override
    public Integer getCount(String groupId, String streamId) {
        Integer count = this.sinkMapper.selectCount(groupId, streamId);
        LOGGER.debug("sink count={} with groupId={}, streamId={}", count, groupId, streamId);
        return count;
    }

    /**
     * Lists the full sink details for a group and stream.
     *
     * @param groupId inlong group id, must be non-null
     * @param streamId inlong stream id
     * @return one response per selected sink, in mapper order; an empty list if none were selected
     */
    @Override
    public List<SinkResponse> listSink(String groupId, String streamId) {
        Preconditions.checkNotNull(groupId, "inlong group id is empty");
        List<StreamSinkEntity> entityList = this.sinkMapper.selectByRelatedId(groupId, streamId);
        if (CollectionUtils.isEmpty(entityList)) {
            return Collections.emptyList();
        }

        // Each entity is resolved through get() so the type-specific operation builds the response
        List<SinkResponse> responseList = new ArrayList<SinkResponse>(entityList.size());
        for (StreamSinkEntity entity : entityList) {
            responseList.add(this.get(entity.getId(), entity.getSinkType()));
        }
        LOGGER.debug("success to list sink by groupId={}, streamId={}", groupId, streamId);
        return responseList;
    }

    /**
     * Lists sink summaries for a group and stream.
     *
     * @param groupId inlong group id, must be non-null
     * @param streamId inlong stream id, must be non-null
     * @return the summary list returned by the mapper
     */
    @Override
    public List<SinkBriefResponse> listBrief(String groupId, String streamId) {
        Preconditions.checkNotNull(groupId, "inlong group id is empty");
        Preconditions.checkNotNull(streamId, "inlong stream id is empty");
        List<SinkBriefResponse> summaryList = this.sinkMapper.selectSummary(groupId, streamId);
        LOGGER.debug("success to list sink summary by groupId={}, streamId={}", groupId, streamId);
        return summaryList;
    }

    /**
     * Queries sinks by condition with paging, then converts them per sink type.
     *
     * @param request paging request; its group id must be non-null
     * @return page info wrapping the converted list responses
     */
    @Override
    public PageInfo<? extends SinkListResponse> listByCondition(SinkPageRequest request) {
        Preconditions.checkNotNull(request.getInlongGroupId(), "inlong group id is empty");
        PageHelper.startPage(request.getPageNum(), request.getPageSize());
        List<StreamSinkEntity> entityPage = this.sinkMapper.selectByCondition(request);

        // Group the selected entities by sink type so each type is converted by its own operation
        Map<SinkType, Page<StreamSinkEntity>> sinkMap = Maps.newHashMap();
        for (StreamSinkEntity streamSink : entityPage) {
            SinkType sinkType = SinkType.forType(streamSink.getSinkType());
            sinkMap.computeIfAbsent(sinkType, k -> new Page<StreamSinkEntity>()).add(streamSink);
        }

        List<SinkListResponse> sinkListResponses = Lists.newArrayList();
        for (Map.Entry<SinkType, Page<StreamSinkEntity>> entry : sinkMap.entrySet()) {
            StreamSinkOperation operation = this.operationFactory.getInstance(entry.getKey());
            PageInfo<? extends SinkListResponse> typedPage = operation.getPageInfo(entry.getValue());
            sinkListResponses.addAll(typedPage.getList());
        }

        // NOTE(review): the result is built from a plain list rather than the queried page, so
        // paging metadata such as the total presumably reflects only this list - confirm.
        PageInfo<SinkListResponse> pageInfo = PageInfo.of(sinkListResponses);
        LOGGER.debug("success to list sink page, result size {}", pageInfo.getSize());
        return pageInfo;
    }

    /**
     * Updates an existing sink after validating the request and the group status.
     *
     * <p>If the group status equals {@code EntityStatus.GROUP_CONFIG_SUCCESSFUL}, the
     * stream-resource workflow is submitted to {@link #executorService}.
     *
     * @param request sink information; id, group id, stream id and sink type must be non-null
     * @param operator name of the operator performing the update
     * @return always {@code true} when no exception is thrown
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public Boolean update(SinkRequest request, String operator) {
        LOGGER.info("begin to update sink info: {}", request);
        this.checkParams(request);
        Preconditions.checkNotNull(request.getId(), "primary key is empty");

        String groupId = request.getInlongGroupId();
        InlongGroupEntity groupEntity = this.commonOperateService.checkGroupStatus(groupId, operator);

        String streamId = request.getInlongStreamId();
        String sinkType = request.getSinkType();
        StreamSinkOperation operation = this.operationFactory.getInstance(SinkType.forType(sinkType));
        operation.updateOpt(request, operator);

        // NOTE(review): save() compares against GroupState.CONFIG_SUCCESSFUL instead;
        // presumably both constants carry the same code - confirm.
        if (EntityStatus.GROUP_CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
            this.executorService.execute(new WorkflowStartRunnable(operator, groupEntity, streamId));
        }
        LOGGER.info("success to update sink info: {}", request);
        return true;
    }

    /**
     * Updates the status and operate log of the sink with the given id.
     *
     * @param id sink id
     * @param status new status code
     * @param log operate log text to store
     */
    @Override
    public void updateStatus(int id, int status, String log) {
        StreamSinkEntity entity = new StreamSinkEntity();
        entity.setId(id);
        entity.setStatus(status);
        entity.setOperateLog(log);
        this.sinkMapper.updateStatus(entity);
        LOGGER.info("success to update sink status={} for id={} with log: {}", status, id, log);
    }

    /**
     * Logically deletes one sink and its fields.
     *
     * <p>The current status is kept as the previous status, the status becomes
     * {@code EntityStatus.DELETED}, and {@code isDeleted} is set to the sink id.
     *
     * @param id sink id, must be non-null and refer to an existing sink
     * @param sinkType sink type name (only logged here)
     * @param operator name of the operator performing the delete
     * @return always {@code true} when no exception is thrown
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public Boolean delete(Integer id, String sinkType, String operator) {
        LOGGER.info("begin to delete sink by id={}, sinkType={}", id, sinkType);
        Preconditions.checkNotNull(id, "primary key is empty");
        StreamSinkEntity entity = this.sinkMapper.selectByPrimaryKey(id);
        Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
        this.commonOperateService.checkGroupStatus(entity.getInlongGroupId(), operator);

        entity.setPreviousStatus(entity.getStatus());
        entity.setStatus(EntityStatus.DELETED.getCode());
        entity.setIsDeleted(id);
        entity.setModifier(operator);
        entity.setModifyTime(new Date());
        this.sinkMapper.updateByPrimaryKeySelective(entity);

        this.sinkFieldMapper.logicDeleteAll(id);
        LOGGER.info("success to delete sink info: {}", entity);
        return true;
    }

    /**
     * Logically deletes every sink (and its fields) of the given group and stream.
     *
     * <p>Each sink is marked exactly as in {@link #delete(Integer, String, String)} and the
     * change is persisted with an update; rows are not removed. Physical removal is done by
     * {@link #deleteAll(String, String, String)}.
     *
     * @param groupId inlong group id, must be non-null
     * @param streamId inlong stream id, must be non-null
     * @param operator name of the operator performing the delete
     * @return always {@code true} when no exception is thrown
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public Boolean logicDeleteAll(String groupId, String streamId, String operator) {
        LOGGER.info("begin to logic delete all sink info by groupId={}, streamId={}", groupId, streamId);
        Preconditions.checkNotNull(groupId, "inlong group id is empty");
        Preconditions.checkNotNull(streamId, "inlong stream id is empty");
        this.commonOperateService.checkGroupStatus(groupId, operator);

        Date now = new Date();
        List<StreamSinkEntity> entityList = this.sinkMapper.selectByRelatedId(groupId, streamId);
        if (CollectionUtils.isNotEmpty(entityList)) {
            for (StreamSinkEntity entity : entityList) {
                Integer id = entity.getId();
                entity.setPreviousStatus(entity.getStatus());
                entity.setStatus(EntityStatus.DELETED.getCode());
                entity.setIsDeleted(id);
                entity.setModifier(operator);
                entity.setModifyTime(now);
                // Persist the deletion markers; a physical delete here would discard them
                this.sinkMapper.updateByPrimaryKeySelective(entity);
                this.sinkFieldMapper.logicDeleteAll(id);
            }
        }
        LOGGER.info("success to logic delete all sink by groupId={}, streamId={}", groupId, streamId);
        return true;
    }

    /**
     * Physically deletes every sink (and its fields) of the given group and stream.
     *
     * @param groupId inlong group id, must be non-null
     * @param streamId inlong stream id, must be non-null
     * @param operator name of the operator performing the delete
     * @return always {@code true} when no exception is thrown
     */
    @Override
    @Transactional(rollbackFor = Throwable.class)
    public Boolean deleteAll(String groupId, String streamId, String operator) {
        LOGGER.info("begin to delete all sink by groupId={}, streamId={}", groupId, streamId);
        Preconditions.checkNotNull(groupId, "inlong group id is empty");
        Preconditions.checkNotNull(streamId, "inlong stream id is empty");
        this.commonOperateService.checkGroupStatus(groupId, operator);

        List<StreamSinkEntity> entityList = this.sinkMapper.selectByRelatedId(groupId, streamId);
        if (CollectionUtils.isNotEmpty(entityList)) {
            for (StreamSinkEntity entity : entityList) {
                this.sinkMapper.deleteByPrimaryKey(entity.getId());
                this.sinkFieldMapper.deleteAll(entity.getId());
            }
        }
        LOGGER.info("success to delete all sink by groupId={}, streamId={}", groupId, streamId);
        return true;
    }

    /**
     * Filters the given stream ids down to those the mapper reports as having a sink of the
     * given type in the group.
     *
     * @param groupId inlong group id
     * @param sinkType sink type name
     * @param streamIdList candidate stream ids
     * @return the stream ids returned by the mapper; an empty list if the sink type or the
     *         candidate list is empty
     */
    @Override
    public List<String> getExistsStreamIdList(String groupId, String sinkType, List<String> streamIdList) {
        LOGGER.debug("begin to filter stream by groupId={}, type={}, streamId={}", groupId, sinkType, streamIdList);
        if (StringUtils.isEmpty(sinkType) || CollectionUtils.isEmpty(streamIdList)) {
            return Collections.emptyList();
        }

        List<String> resultList = this.sinkMapper.selectExistsStreamId(groupId, sinkType, streamIdList);
        LOGGER.debug("success to filter stream id list, result streamId={}", resultList);
        return resultList;
    }

    /**
     * Gets the sink types configured for the given group and stream.
     *
     * @param groupId inlong group id
     * @param streamId inlong stream id
     * @return the sink types returned by the mapper; an empty list if the stream id is empty
     */
    @Override
    public List<String> getSinkTypeList(String groupId, String streamId) {
        if (StringUtils.isEmpty(streamId)) {
            return Collections.emptyList();
        }

        List<String> resultList = this.sinkMapper.selectSinkType(groupId, streamId);
        LOGGER.debug("success to get sink type by groupId={}, streamId={}, result={}", groupId, streamId, resultList);
        return resultList;
    }

    /**
     * Updates the status of each approved sink.
     *
     * <p>When a DTO carries no status, {@code EntityStatus.SINK_CONFIG_ING} is used.
     *
     * @param approveList approved sinks; each must have a non-null sink type
     * @param operator name of the operator recorded as modifier
     * @return always {@code true} when no exception is thrown
     */
    @Override
    public Boolean updateAfterApprove(List<SinkApproveDTO> approveList, String operator) {
        LOGGER.info("begin to update sink after approve: {}", approveList);
        if (CollectionUtils.isEmpty(approveList)) {
            return true;
        }

        Date now = new Date();
        for (SinkApproveDTO dto : approveList) {
            String sinkType = dto.getSinkType();
            Preconditions.checkNotNull(sinkType, "Sink type is empty");

            StreamSinkEntity entity = new StreamSinkEntity();
            entity.setId(dto.getId());
            int status = dto.getStatus() == null ? EntityStatus.SINK_CONFIG_ING.getCode() : dto.getStatus();
            // NOTE(review): entity was just constructed, so getStatus() is not the stored status;
            // the previous status is presumably never recorded here - confirm intent.
            entity.setPreviousStatus(entity.getStatus());
            entity.setStatus(status);
            entity.setModifier(operator);
            entity.setModifyTime(now);
            this.sinkMapper.updateByPrimaryKeySelective(entity);
        }
        LOGGER.info("success to update sink after approve: {}", approveList);
        return true;
    }

    /**
     * Checks that the request and its group id, stream id and sink type are all non-null.
     *
     * @param request the sink request to validate
     */
    private void checkParams(SinkRequest request) {
        Preconditions.checkNotNull(request, "request is empty");
        String groupId = request.getInlongGroupId();
        Preconditions.checkNotNull(groupId, "inlong group id is empty");
        String streamId = request.getInlongStreamId();
        Preconditions.checkNotNull(streamId, "inlong stream id is empty");
        String sinkType = request.getSinkType();
        Preconditions.checkNotNull(sinkType, "Sink type is empty");
    }

    /**
     * Task that starts the {@code CREATE_STREAM_RESOURCE} workflow for one stream of a group.
     * It is an inner class because it uses the enclosing service's workflow service.
     */
    class WorkflowStartRunnable
    implements Runnable {
        private final String operator;
        private final InlongGroupEntity inlongGroupEntity;
        private final String streamId;

        /**
         * @param operator operator passed to the workflow service
         * @param inlongGroupEntity group entity copied into the workflow form
         * @param streamId id of the stream the workflow is started for
         */
        public WorkflowStartRunnable(String operator, InlongGroupEntity inlongGroupEntity, String streamId) {
            this.operator = operator;
            this.inlongGroupEntity = inlongGroupEntity;
            this.streamId = streamId;
        }

        /**
         * Copies the group entity into an {@link InlongGroupInfo}, builds the process form and
         * starts the workflow.
         */
        @Override
        public void run() {
            String groupId = this.inlongGroupEntity.getInlongGroupId();
            LOGGER.info("begin start inlong stream workflow for groupId={}, streamId={}", groupId, this.streamId);

            InlongGroupInfo groupInfo = CommonBeanUtils.copyProperties(this.inlongGroupEntity, InlongGroupInfo::new);
            GroupResourceProcessForm form = this.genGroupResourceProcessForm(groupInfo, this.streamId);

            StreamSinkServiceImpl.this.workflowService.start(ProcessName.CREATE_STREAM_RESOURCE, this.operator, form);
            LOGGER.info("success start inlong stream workflow for groupId={}, streamId={}", groupId, this.streamId);
        }

        /**
         * Builds the workflow form carrying the group info and the stream id.
         *
         * @param groupInfo group information to attach to the form
         * @param streamId stream id to attach to the form
         * @return the populated form
         */
        private GroupResourceProcessForm genGroupResourceProcessForm(InlongGroupInfo groupInfo, String streamId) {
            GroupResourceProcessForm form = new GroupResourceProcessForm();
            form.setGroupInfo(groupInfo);
            form.setInlongStreamId(streamId);
            return form;
        }
    }
}

