/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.sink.iceberg;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageInfo;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.function.Supplier;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.enums.GlobalConstants;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.sink.SinkField;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergColumnInfo;
import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSink;
import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSinkDTO;
import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSinkRequest;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.service.sink.StreamSinkOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class IcebergSinkOperation
implements StreamSinkOperation {
    private static final Logger LOGGER = LoggerFactory.getLogger(IcebergSinkOperation.class);
    @Autowired
    private ObjectMapper objectMapper;
    @Autowired
    private StreamSinkEntityMapper sinkMapper;
    @Autowired
    private StreamSinkFieldEntityMapper sinkFieldMapper;

    @Override
    public Boolean accept(SinkType sinkType) {
        return SinkType.ICEBERG.equals((Object)sinkType);
    }

    @Override
    public Integer saveOpt(SinkRequest request, String operator) {
        String sinkType = request.getSinkType();
        Preconditions.checkTrue((boolean)"ICEBERG".equals(sinkType), (String)(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + sinkType));
        IcebergSinkRequest icebergSinkRequest = (IcebergSinkRequest)request;
        StreamSinkEntity entity = (StreamSinkEntity)CommonBeanUtils.copyProperties((Object)icebergSinkRequest, StreamSinkEntity::new);
        entity.setStatus(SinkStatus.NEW.getCode());
        entity.setIsDeleted(GlobalConstants.UN_DELETED);
        entity.setCreator(operator);
        entity.setModifier(operator);
        Date now = new Date();
        entity.setCreateTime(now);
        entity.setModifyTime(now);
        IcebergSinkDTO dto = IcebergSinkDTO.getFromRequest((IcebergSinkRequest)icebergSinkRequest);
        try {
            entity.setExtParams(this.objectMapper.writeValueAsString((Object)dto));
        }
        catch (Exception e) {
            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED);
        }
        this.sinkMapper.insert(entity);
        Integer sinkId = entity.getId();
        request.setId(sinkId);
        this.saveFieldOpt(request);
        return sinkId;
    }

    @Override
    public void saveFieldOpt(SinkRequest request) {
        List fieldList = request.getSinkFieldList();
        LOGGER.info("begin to save iceberg field={}", (Object)fieldList);
        if (CollectionUtils.isEmpty((Collection)fieldList)) {
            return;
        }
        int size = fieldList.size();
        ArrayList<StreamSinkFieldEntity> entityList = new ArrayList<StreamSinkFieldEntity>(size);
        String groupId = request.getInlongGroupId();
        String streamId = request.getInlongStreamId();
        String sinkType = request.getSinkType();
        Integer sinkId = request.getId();
        for (SinkField fieldInfo : fieldList) {
            this.checkFieldInfo(fieldInfo);
            StreamSinkFieldEntity fieldEntity = (StreamSinkFieldEntity)CommonBeanUtils.copyProperties((Object)fieldInfo, StreamSinkFieldEntity::new);
            if (StringUtils.isEmpty((CharSequence)fieldEntity.getFieldComment())) {
                fieldEntity.setFieldComment(fieldEntity.getFieldName());
            }
            fieldEntity.setInlongGroupId(groupId);
            fieldEntity.setInlongStreamId(streamId);
            fieldEntity.setSinkType(sinkType);
            fieldEntity.setSinkId(sinkId);
            fieldEntity.setIsDeleted(GlobalConstants.UN_DELETED);
            entityList.add(fieldEntity);
        }
        this.sinkFieldMapper.insertAll(entityList);
        LOGGER.info("success to save iceberg field");
    }

    private void checkFieldInfo(SinkField field) {
        if (FieldType.forName((String)field.getFieldType()) == FieldType.DECIMAL) {
            IcebergColumnInfo info = IcebergColumnInfo.getFromJson((String)field.getExtParams());
            if (info.getPrecision() == null || info.getScale() == null) {
                String errorMsg = String.format("precision or scale not specified for decimal field (%s)", field.getFieldName());
                LOGGER.error("field info check error: {}", (Object)errorMsg);
                throw new BusinessException(errorMsg);
            }
            if (info.getPrecision() < info.getScale()) {
                String errorMsg = String.format("precision (%d) must be greater or equal than scale (%d) for decimal field (%s)", info.getPrecision(), info.getScale(), field.getFieldName());
                LOGGER.error("field info check error: {}", (Object)errorMsg);
                throw new BusinessException(errorMsg);
            }
        }
    }

    @Override
    public StreamSink getByEntity(StreamSinkEntity entity) {
        Preconditions.checkNotNull((Object)entity, (String)ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
        String existType = entity.getSinkType();
        Preconditions.checkTrue((boolean)"ICEBERG".equals(existType), (String)String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), "ICEBERG", existType));
        StreamSink response = (StreamSink)this.getFromEntity(entity, IcebergSink::new);
        List entities = this.sinkFieldMapper.selectBySinkId(entity.getId());
        List infos = CommonBeanUtils.copyListProperties((List)entities, SinkField::new);
        response.setSinkFieldList(infos);
        return response;
    }

    @Override
    public <T> T getFromEntity(StreamSinkEntity entity, Supplier<T> target) {
        T result = target.get();
        if (entity == null) {
            return result;
        }
        String existType = entity.getSinkType();
        Preconditions.checkTrue((boolean)"ICEBERG".equals(existType), (String)String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), "ICEBERG", existType));
        IcebergSinkDTO dto = IcebergSinkDTO.getFromJson((String)entity.getExtParams());
        CommonBeanUtils.copyProperties((Object)entity, result, (boolean)true);
        CommonBeanUtils.copyProperties((Object)dto, result, (boolean)true);
        return result;
    }

    @Override
    public PageInfo<? extends SinkListResponse> getPageInfo(Page<StreamSinkEntity> entityPage) {
        if (CollectionUtils.isEmpty(entityPage)) {
            return new PageInfo();
        }
        return entityPage.toPageInfo(entity -> this.getFromEntity((StreamSinkEntity)entity, IcebergSinkListResponse::new));
    }

    @Override
    public void updateOpt(SinkRequest request, String operator) {
        String sinkType = request.getSinkType();
        Preconditions.checkTrue((boolean)"ICEBERG".equals(sinkType), (String)String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), "ICEBERG", sinkType));
        StreamSinkEntity entity = this.sinkMapper.selectByPrimaryKey(request.getId());
        Preconditions.checkNotNull((Object)entity, (String)ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
        IcebergSinkRequest icebergSinkRequest = (IcebergSinkRequest)request;
        CommonBeanUtils.copyProperties((Object)icebergSinkRequest, (Object)entity, (boolean)true);
        try {
            IcebergSinkDTO dto = IcebergSinkDTO.getFromRequest((IcebergSinkRequest)icebergSinkRequest);
            entity.setExtParams(this.objectMapper.writeValueAsString((Object)dto));
        }
        catch (Exception e) {
            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
        }
        entity.setPreviousStatus(entity.getStatus());
        entity.setStatus(SinkStatus.CONFIG_ING.getCode());
        entity.setModifier(operator);
        entity.setModifyTime(new Date());
        this.sinkMapper.updateByPrimaryKeySelective(entity);
        boolean onlyAdd = SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(entity.getPreviousStatus());
        this.updateFieldOpt(onlyAdd, (SinkRequest)icebergSinkRequest);
        LOGGER.info("success to update sink of type={}", (Object)sinkType);
    }

    @Override
    public void updateFieldOpt(Boolean onlyAdd, SinkRequest request) {
        Integer sinkId = request.getId();
        List fieldRequestList = request.getSinkFieldList();
        if (CollectionUtils.isEmpty((Collection)fieldRequestList)) {
            return;
        }
        if (onlyAdd.booleanValue()) {
            List existsFieldList = this.sinkFieldMapper.selectBySinkId(sinkId);
            if (existsFieldList.size() > fieldRequestList.size()) {
                throw new BusinessException(ErrorCodeEnum.SINK_FIELD_UPDATE_NOT_ALLOWED);
            }
            for (int i = 0; i < existsFieldList.size(); ++i) {
                if (((StreamSinkFieldEntity)existsFieldList.get(i)).getFieldName().equals(((SinkField)fieldRequestList.get(i)).getFieldName())) continue;
                throw new BusinessException(ErrorCodeEnum.SINK_FIELD_UPDATE_NOT_ALLOWED);
            }
        }
        this.sinkFieldMapper.deleteAll(sinkId);
        this.saveFieldOpt(request);
        LOGGER.info("success to update field");
    }
}

