/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.sink.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageInfo;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.function.Supplier;
import javax.validation.constraints.NotNull;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.sink.SinkFieldRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkDTO;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkRequest;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.service.sink.StreamSinkOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class KafkaSinkOperation
implements StreamSinkOperation {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSinkOperation.class);
    @Autowired
    private ObjectMapper objectMapper;
    @Autowired
    private StreamSinkEntityMapper sinkMapper;
    @Autowired
    private StreamSinkFieldEntityMapper sinkFieldMapper;

    @Override
    public Boolean accept(SinkType sinkType) {
        return SinkType.KAFKA.equals((Object)sinkType);
    }

    @Override
    public Integer saveOpt(SinkRequest request, String operator) {
        String sinkType = request.getSinkType();
        Preconditions.checkTrue((boolean)"KAFKA".equals(sinkType), (String)(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + sinkType));
        KafkaSinkRequest kafkaSinkRequest = (KafkaSinkRequest)request;
        StreamSinkEntity entity = (StreamSinkEntity)CommonBeanUtils.copyProperties((Object)kafkaSinkRequest, StreamSinkEntity::new);
        entity.setStatus(EntityStatus.SINK_NEW.getCode());
        entity.setIsDeleted(EntityStatus.UN_DELETED.getCode());
        entity.setCreator(operator);
        entity.setModifier(operator);
        Date now = new Date();
        entity.setCreateTime(now);
        entity.setModifyTime(now);
        KafkaSinkDTO dto = KafkaSinkDTO.getFromRequest((KafkaSinkRequest)kafkaSinkRequest);
        try {
            entity.setExtParams(this.objectMapper.writeValueAsString((Object)dto));
        }
        catch (Exception e) {
            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED);
        }
        this.sinkMapper.insert(entity);
        Integer sinkId = entity.getId();
        request.setId(sinkId);
        this.saveFieldOpt(request);
        return sinkId;
    }

    @Override
    public void saveFieldOpt(SinkRequest request) {
        List fieldList = request.getFieldList();
        LOGGER.info("begin to save field={}", (Object)fieldList);
        if (CollectionUtils.isEmpty((Collection)fieldList)) {
            return;
        }
        int size = fieldList.size();
        ArrayList<StreamSinkFieldEntity> entityList = new ArrayList<StreamSinkFieldEntity>(size);
        String groupId = request.getInlongGroupId();
        String streamId = request.getInlongStreamId();
        String sinkType = request.getSinkType();
        Integer sinkId = request.getId();
        for (SinkFieldRequest fieldInfo : fieldList) {
            StreamSinkFieldEntity fieldEntity = (StreamSinkFieldEntity)CommonBeanUtils.copyProperties((Object)fieldInfo, StreamSinkFieldEntity::new);
            if (StringUtils.isEmpty((CharSequence)fieldEntity.getFieldComment())) {
                fieldEntity.setFieldComment(fieldEntity.getFieldName());
            }
            fieldEntity.setInlongGroupId(groupId);
            fieldEntity.setInlongStreamId(streamId);
            fieldEntity.setSinkType(sinkType);
            fieldEntity.setSinkId(sinkId);
            fieldEntity.setIsDeleted(EntityStatus.UN_DELETED.getCode());
            entityList.add(fieldEntity);
        }
        this.sinkFieldMapper.insertAll(entityList);
        LOGGER.info("success to save field");
    }

    @Override
    public SinkResponse getById(@NotNull String sinkType, @NotNull Integer id) {
        StreamSinkEntity entity = this.sinkMapper.selectByPrimaryKey(id);
        Preconditions.checkNotNull((Object)entity, (String)ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
        String existType = entity.getSinkType();
        Preconditions.checkTrue((boolean)"KAFKA".equals(existType), (String)String.format("Expected sink type is %s, but found %s", "KAFKA", existType));
        SinkResponse response = (SinkResponse)this.getFromEntity(entity, KafkaSinkResponse::new);
        List entities = this.sinkFieldMapper.selectBySinkId(id);
        List infos = CommonBeanUtils.copyListProperties((List)entities, SinkFieldResponse::new);
        response.setFieldList(infos);
        return response;
    }

    @Override
    public <T> T getFromEntity(StreamSinkEntity entity, Supplier<T> target) {
        T result = target.get();
        if (entity == null) {
            return result;
        }
        String existType = entity.getSinkType();
        Preconditions.checkTrue((boolean)"KAFKA".equals(existType), (String)String.format("Expected sink type is %s, but found %s", "KAFKA", existType));
        KafkaSinkDTO dto = KafkaSinkDTO.getFromJson((String)entity.getExtParams());
        CommonBeanUtils.copyProperties((Object)entity, result, (boolean)true);
        CommonBeanUtils.copyProperties((Object)dto, result, (boolean)true);
        return result;
    }

    @Override
    public PageInfo<? extends SinkListResponse> getPageInfo(Page<StreamSinkEntity> entityPage) {
        if (CollectionUtils.isEmpty(entityPage)) {
            return new PageInfo();
        }
        return entityPage.toPageInfo(entity -> this.getFromEntity((StreamSinkEntity)entity, KafkaSinkListResponse::new));
    }

    @Override
    public void updateOpt(SinkRequest request, String operator) {
        String sinkType = request.getSinkType();
        Preconditions.checkTrue((boolean)"KAFKA".equals(sinkType), (String)String.format("Expected sink type is %s, but found %s", "KAFKA", sinkType));
        StreamSinkEntity entity = this.sinkMapper.selectByPrimaryKey(request.getId());
        Preconditions.checkNotNull((Object)entity, (String)ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
        KafkaSinkRequest kafkaSinkRequest = (KafkaSinkRequest)request;
        CommonBeanUtils.copyProperties((Object)kafkaSinkRequest, (Object)entity, (boolean)true);
        try {
            KafkaSinkDTO dto = KafkaSinkDTO.getFromRequest((KafkaSinkRequest)kafkaSinkRequest);
            entity.setExtParams(this.objectMapper.writeValueAsString((Object)dto));
        }
        catch (Exception e) {
            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
        }
        entity.setPreviousStatus(entity.getStatus());
        entity.setStatus(EntityStatus.GROUP_CONFIG_ING.getCode());
        entity.setModifier(operator);
        entity.setModifyTime(new Date());
        this.sinkMapper.updateByPrimaryKeySelective(entity);
        boolean onlyAdd = EntityStatus.SINK_CONFIG_SUCCESSFUL.getCode().equals(entity.getPreviousStatus());
        this.updateFieldOpt(onlyAdd, (SinkRequest)kafkaSinkRequest);
        LOGGER.info("success to update sink of type={}", (Object)sinkType);
    }

    @Override
    public void updateFieldOpt(Boolean onlyAdd, SinkRequest request) {
        Integer sinkId = request.getId();
        List fieldRequestList = request.getFieldList();
        if (CollectionUtils.isEmpty((Collection)fieldRequestList)) {
            return;
        }
        if (onlyAdd.booleanValue()) {
            List existsFieldList = this.sinkFieldMapper.selectBySinkId(sinkId);
            if (existsFieldList.size() > fieldRequestList.size()) {
                throw new BusinessException(ErrorCodeEnum.SINK_FIELD_UPDATE_NOT_ALLOWED);
            }
            for (int i = 0; i < existsFieldList.size(); ++i) {
                if (((StreamSinkFieldEntity)existsFieldList.get(i)).getFieldName().equals(((SinkFieldRequest)fieldRequestList.get(i)).getFieldName())) continue;
                throw new BusinessException(ErrorCodeEnum.SINK_FIELD_UPDATE_NOT_ALLOWED);
            }
        }
        this.sinkFieldMapper.deleteAll(sinkId);
        this.saveFieldOpt(request);
        LOGGER.info("success to update field");
    }
}

