/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.resource.sort;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.common.pojo.sort.dataflow.SourceConfig;
import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
import org.apache.inlong.common.pojo.sort.dataflow.deserialization.DeserializationConfig;
import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.SortConfigEntity;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.SortConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.datatype.DataTypeOperator;
import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory;
import org.apache.inlong.manager.service.message.DeserializeOperator;
import org.apache.inlong.manager.service.message.DeserializeOperatorFactory;
import org.apache.inlong.manager.service.resource.sort.SortConfigOperator;
import org.apache.inlong.manager.service.sink.SinkOperatorFactory;
import org.apache.inlong.manager.service.sink.StreamSinkOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * {@link SortConfigOperator} that builds one {@link DataFlowConfig} for every sink of a stream whose
 * type is listed in {@link SinkType#SORT_STANDALONE_SINK}, and stores it as JSON through
 * {@link SortConfigEntityMapper}.
 */
@Service
public class DefaultSortConfigOperator implements SortConfigOperator {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSortConfigOperator.class);

    /** Shared JSON serializer, created once instead of on every save. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Task name written to the sort config when none is set. */
    private static final String DEFAULT_SORT_TASK_NAME = "DEFAULT_TASK";

    /** MQ / cluster type this operator requires. */
    private static final String MQ_TYPE_PULSAR = "PULSAR";

    /** Tenant used when neither the group nor the cluster configures one. */
    private static final String DEFAULT_PULSAR_TENANT = "public";

    @Autowired
    public DeserializeOperatorFactory deserializeOperatorFactory;
    @Autowired
    public DataTypeOperatorFactory dataTypeOperatorFactory;
    @Autowired
    private StreamSinkFieldEntityMapper sinkFieldMapper;
    @Autowired
    private InlongClusterEntityMapper clusterMapper;
    @Autowired
    private SortConfigEntityMapper sortConfigEntityMapper;
    @Autowired
    private InlongGroupEntityMapper groupEntityMapper;
    @Autowired
    private SinkOperatorFactory operatorFactory;

    /**
     * Tells whether this operator handles at least one of the given sink types.
     *
     * @param sinkTypeList sink types to check; {@code null} or empty yields {@code false}
     * @return {@code true} if any entry is contained in {@link SinkType#SORT_STANDALONE_SINK}
     */
    @Override
    public Boolean accept(List<String> sinkTypeList) {
        if (CollectionUtils.isEmpty(sinkTypeList)) {
            return false;
        }
        return sinkTypeList.stream().anyMatch(sinkType -> SinkType.SORT_STANDALONE_SINK.contains(sinkType));
    }

    /**
     * Builds and saves a sort config for each standalone-sort sink of the stream.
     *
     * <p>Returns without doing anything when either argument is {@code null}, when {@code isStream}
     * is {@code true}, or when the stream has no sink of a standalone-sort type.
     *
     * @param groupInfo  group the stream belongs to
     * @param streamInfo stream whose sinks are inspected
     * @param isStream   when {@code true} the method returns early
     * @throws Exception if the group cannot be found or its MQ type is not PULSAR
     */
    @Override
    public void buildConfig(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, boolean isStream) throws Exception {
        if (groupInfo == null || streamInfo == null) {
            LOGGER.warn("group info is null or stream infos is empty, no need to build sort config");
            return;
        }
        if (isStream) {
            // NOTE(review): the message says "not stream level" although this branch runs when
            // isStream is true - confirm which of the two is intended.
            LOGGER.info("no need to build all sort config since the workflow is not stream level, groupId={}",
                    groupInfo.getInlongGroupId());
            return;
        }

        List<StreamSink> allSinks = streamInfo.getSinkList();
        if (CollectionUtils.isEmpty(allSinks)) {
            return;
        }
        List<StreamSink> standaloneSinks = allSinks.stream()
                .filter(sink -> SinkType.SORT_STANDALONE_SINK.contains(sink.getSinkType()))
                .collect(Collectors.toList());
        if (standaloneSinks.isEmpty()) {
            return;
        }

        String groupId = groupInfo.getInlongGroupId();
        InlongGroupEntity groupEntity = this.groupEntityMapper.selectByGroupId(groupId);
        // Fail with a descriptive message instead of a NullPointerException when the group row is missing.
        Preconditions.expectTrue(groupEntity != null, "inlong group not found for groupId=" + groupId);
        Preconditions.expectTrue(MQ_TYPE_PULSAR.equals(groupEntity.getMqType()), "standalone only support pulsar");

        for (StreamSink sink : standaloneSinks) {
            this.saveDataFlow(groupInfo, streamInfo, sink);
        }
    }

    /**
     * Inserts or updates the sort config row of one sink.
     *
     * <p>A new row gets data flow version 0; an existing row gets its stored version plus one.
     * Any exception is logged and not rethrown, so a failure for one sink does not stop the
     * remaining sinks from being processed.
     */
    private void saveDataFlow(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, StreamSink sink) {
        try {
            DataFlowConfig dataFlowConfig = this.getDataFlowConfig(groupInfo, streamInfo, sink);
            SortConfigEntity sortConfigEntity = this.sortConfigEntityMapper.selectBySinkId(sink.getId());
            String clusterTags = groupInfo.getInlongClusterTag();

            boolean isNew = sortConfigEntity == null;
            if (isNew) {
                dataFlowConfig.setVersion(0);
                sortConfigEntity = CommonBeanUtils.copyProperties(sink, SortConfigEntity::new);
                // The copied id is the sink's id; clear it and keep the link in sinkId instead.
                sortConfigEntity.setId(null);
                sortConfigEntity.setSinkId(sink.getId());
            } else {
                dataFlowConfig.setVersion(sortConfigEntity.getVersion() + 1);
            }

            if (StringUtils.isBlank(sortConfigEntity.getSortTaskName())) {
                sortConfigEntity.setSortTaskName(DEFAULT_SORT_TASK_NAME);
            }
            sortConfigEntity.setConfigParams(OBJECT_MAPPER.writeValueAsString(dataFlowConfig));
            sortConfigEntity.setInlongClusterTag(clusterTags);

            if (isNew) {
                this.sortConfigEntityMapper.insert(sortConfigEntity);
            } else {
                this.sortConfigEntityMapper.updateByIdSelective(sortConfigEntity);
            }
        } catch (Exception e) {
            // The trailing Throwable is passed after the four placeholder arguments so SLF4J logs it as the exception.
            LOGGER.error("failed to parse id params of groupId={}, streamId={} name={}, type={}",
                    sink.getInlongGroupId(), sink.getInlongStreamId(), sink.getSinkName(), sink.getSinkType(), e);
        }
    }

    /**
     * Assembles the data flow config of a sink. The sink id is used both as the data flow id and
     * as the audit tag; the properties map starts empty.
     */
    private DataFlowConfig getDataFlowConfig(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, StreamSink sink) {
        String sinkId = String.valueOf(sink.getId());
        return DataFlowConfig.builder()
                .dataflowId(sinkId)
                .sourceConfig(this.getSourceConfig(groupInfo, streamInfo, sink))
                .auditTag(sinkId)
                .sinkConfig(this.getSinkConfig(groupInfo, streamInfo, sink))
                .inlongGroupId(groupInfo.getInlongGroupId())
                .inlongStreamId(streamInfo.getInlongStreamId())
                .properties(new HashMap<>())
                .build();
    }

    /** Delegates sink config creation to the operator registered for the sink's type. */
    private SinkConfig getSinkConfig(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, StreamSink sink) {
        StreamSinkOperator sinkOperator = this.operatorFactory.getInstance(sink.getSinkType());
        return sinkOperator.getSinkConfig(groupInfo, streamInfo, sink);
    }

    /**
     * Builds the source side of the data flow: topic, subscription, deserialization, data type,
     * encoding and the field list taken from the sink's field mappings.
     *
     * @throws WorkflowListenerException if no PULSAR cluster exists for the group's cluster tag
     * @throws BusinessException         if {@code groupInfo} is not an {@link InlongPulsarInfo}
     */
    private SourceConfig getSourceConfig(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, StreamSink sink) {
        List<InlongClusterEntity> pulsarClusters =
                this.clusterMapper.selectByKey(groupInfo.getInlongClusterTag(), null, MQ_TYPE_PULSAR);
        if (CollectionUtils.isEmpty(pulsarClusters)) {
            throw new WorkflowListenerException("pulsar cluster not found for groupId=" + groupInfo.getInlongGroupId());
        }
        // Only the first matching cluster is consulted.
        InlongClusterEntity pulsarCluster = pulsarClusters.get(0);
        PulsarClusterDTO pulsarClusterDTO = PulsarClusterDTO.getFromJson(pulsarCluster.getExtParams());
        if (!(groupInfo instanceof InlongPulsarInfo)) {
            throw new BusinessException("the mqType must be PULSAR for inlongGroupId=" + groupInfo.getInlongGroupId());
        }

        String tenant = resolveTenant((InlongPulsarInfo) groupInfo, pulsarClusterDTO);
        String namespace = groupInfo.getMqResource();
        String topic = streamInfo.getMqResource();
        String fullTopic = "persistent://" + tenant + "/" + namespace + "/" + topic;
        String subs = String.format("%s_%s_%s_consumer_group", groupInfo.getInlongClusterTag(), topic, sink.getId());

        DeserializeOperator deserializeOperator =
                this.deserializeOperatorFactory.getInstance(MessageWrapType.forType(streamInfo.getWrapType()));
        DeserializationConfig deserializationConfig = deserializeOperator.getDeserializationConfig(streamInfo);
        DataTypeOperator dataTypeOperator =
                this.dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamInfo.getDataType()));
        DataTypeConfig dataTypeConfig = dataTypeOperator.getDataTypeConfig(streamInfo);

        // Source fields are derived from the sink's field mappings (source name + source type).
        List<FieldConfig> fields = this.sinkFieldMapper.selectBySinkId(sink.getId()).stream()
                .map(field -> {
                    FieldConfig fieldConfig = new FieldConfig();
                    // Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
                    FormatInfo formatInfo =
                            FieldInfoUtils.convertFieldFormat(field.getSourceFieldType().toLowerCase(Locale.ROOT));
                    fieldConfig.setName(field.getSourceFieldName());
                    fieldConfig.setFormatInfo(formatInfo);
                    return fieldConfig;
                })
                .collect(Collectors.toList());

        SourceConfig sourceConfig = new SourceConfig();
        sourceConfig.setFieldConfigs(fields);
        sourceConfig.setDeserializationConfig(deserializationConfig);
        sourceConfig.setDataTypeConfig(dataTypeConfig);
        sourceConfig.setEncodingType(streamInfo.getDataEncoding());
        sourceConfig.setTopic(fullTopic);
        sourceConfig.setSubscription(subs);
        return sourceConfig;
    }

    /** Picks the tenant: the group's value first, then the cluster's, then {@code "public"}. */
    private static String resolveTenant(InlongPulsarInfo pulsarInfo, PulsarClusterDTO pulsarClusterDTO) {
        String tenant = pulsarInfo.getPulsarTenant();
        if (StringUtils.isBlank(tenant)) {
            tenant = pulsarClusterDTO.getPulsarTenant();
        }
        return StringUtils.isBlank(tenant) ? DEFAULT_PULSAR_TENANT : tenant;
    }
}

