/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.resource.sort;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.sort.util.ExtractNodeUtils;
import org.apache.inlong.manager.pojo.sort.util.LoadNodeUtils;
import org.apache.inlong.manager.pojo.sort.util.NodeRelationUtils;
import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.transform.TransformResponse;
import org.apache.inlong.manager.service.resource.sort.SortConfigOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.source.StreamSourceService;
import org.apache.inlong.manager.service.transform.StreamTransformService;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.node.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class DefaultSortConfigOperator
implements SortConfigOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSortConfigOperator.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    @Autowired
    private StreamSourceService sourceService;
    @Autowired
    private StreamTransformService transformService;
    @Autowired
    private StreamSinkService sinkService;

    @Override
    public Boolean accept(Integer enableZk) {
        return InlongConstants.DISABLE_ZK.equals(enableZk);
    }

    @Override
    public void buildConfig(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, boolean isStream) throws Exception {
        if (groupInfo == null || CollectionUtils.isEmpty(streamInfos)) {
            LOGGER.warn("group info is null or stream infos is empty, no need to build sort config for disable zk");
            return;
        }
        GroupInfo configInfo = InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight()) ? this.getLightweightGroupInfo(groupInfo, streamInfos) : this.getStandardGroupInfo(groupInfo, streamInfos);
        String dataflow = OBJECT_MAPPER.writeValueAsString((Object)configInfo);
        if (isStream) {
            this.addToStreamExt(streamInfos, dataflow);
        } else {
            this.addToGroupExt(groupInfo, dataflow);
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("success to build sort config, isStream={}, dataflow={}", (Object)isStream, (Object)dataflow);
        }
    }

    private GroupInfo getLightweightGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
        Map<String, List<StreamSource>> sourceMap = this.sourceService.getSourcesMap(groupInfo, streamInfoList);
        Map<String, List<StreamSink>> sinkMap = this.sinkService.getSinksMap(groupInfo, streamInfoList);
        List<TransformResponse> transformResponses = this.transformService.listTransform(groupInfo.getInlongGroupId(), null);
        Map transformMap = transformResponses.stream().collect(Collectors.groupingBy(TransformResponse::getInlongStreamId, HashMap::new, Collectors.toCollection(ArrayList::new)));
        ArrayList<StreamInfo> sortStreamInfos = new ArrayList<StreamInfo>();
        for (InlongStreamInfo inlongStream : streamInfoList) {
            String streamId = inlongStream.getInlongStreamId();
            HashMap<String, StreamField> fieldMap = new HashMap<String, StreamField>();
            inlongStream.getSourceList().forEach(source -> this.parseConstantFieldMap(source.getSourceName(), source.getFieldList(), fieldMap));
            List transformResponseList = (List)transformMap.get(streamId);
            if (CollectionUtils.isNotEmpty((Collection)transformResponseList)) {
                transformResponseList.forEach(trans -> this.parseConstantFieldMap(trans.getTransformName(), trans.getFieldList(), fieldMap));
            }
            List<Node> nodes = this.createNodesWithTransform(sourceMap.get(streamId), transformResponseList, sinkMap.get(streamId), fieldMap);
            List relations = NodeRelationUtils.createNodeRelations((InlongStreamInfo)inlongStream);
            StreamInfo streamInfo = new StreamInfo(streamId, nodes, relations);
            sortStreamInfos.add(streamInfo);
            NodeRelationUtils.optimizeNodeRelation((StreamInfo)streamInfo, (List)transformResponseList);
        }
        return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
    }

    private GroupInfo getStandardGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
        Map<String, List<StreamSource>> sourceMap = this.sourceService.getSourcesMap(groupInfo, streamInfoList);
        Map<String, List<StreamSink>> sinkMap = this.sinkService.getSinksMap(groupInfo, streamInfoList);
        ArrayList<StreamInfo> sortStreamInfos = new ArrayList<StreamInfo>();
        for (InlongStreamInfo inlongStream : streamInfoList) {
            String streamId = inlongStream.getInlongStreamId();
            HashMap<String, StreamField> fieldMap = new HashMap<String, StreamField>();
            inlongStream.getSourceList().forEach(source -> this.parseConstantFieldMap(source.getSourceName(), source.getFieldList(), fieldMap));
            List<StreamSource> sources = sourceMap.get(streamId);
            List<StreamSink> sinks = sinkMap.get(streamId);
            StreamInfo sortStream = new StreamInfo(streamId, this.createNodesWithoutTransform(sources, sinks, fieldMap), NodeRelationUtils.createNodeRelations(sources, sinks));
            sortStreamInfos.add(sortStream);
        }
        return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
    }

    private List<Node> createNodesWithoutTransform(List<StreamSource> sources, List<StreamSink> sinks, Map<String, StreamField> constantFieldMap) {
        ArrayList nodes = Lists.newArrayList();
        nodes.addAll(ExtractNodeUtils.createExtractNodes(sources));
        nodes.addAll(LoadNodeUtils.createLoadNodes(sinks, constantFieldMap));
        return nodes;
    }

    private List<Node> createNodesWithTransform(List<StreamSource> sources, List<TransformResponse> transformResponses, List<StreamSink> sinks, Map<String, StreamField> constantFieldMap) {
        ArrayList nodes = Lists.newArrayList();
        nodes.addAll(ExtractNodeUtils.createExtractNodes(sources));
        nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses, constantFieldMap));
        nodes.addAll(LoadNodeUtils.createLoadNodes(sinks, constantFieldMap));
        return nodes;
    }

    private void parseConstantFieldMap(String nodeId, List<StreamField> fields, Map<String, StreamField> constantFieldMap) {
        if (CollectionUtils.isEmpty(fields)) {
            return;
        }
        for (StreamField field : fields) {
            if (field.getFieldValue() == null) continue;
            constantFieldMap.put(String.format("%s-%s", nodeId, field.getFieldName()), field);
        }
    }

    private void addToGroupExt(InlongGroupInfo groupInfo, String value) {
        if (groupInfo.getExtList() == null) {
            groupInfo.setExtList((List)Lists.newArrayList());
        }
        InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
        extInfo.setInlongGroupId(groupInfo.getInlongGroupId());
        extInfo.setKeyName("dataflow");
        extInfo.setKeyValue(value);
        groupInfo.getExtList().removeIf(ext -> extInfo.getKeyName().equals(ext.getKeyName()));
        groupInfo.getExtList().add(extInfo);
    }

    private void addToStreamExt(List<InlongStreamInfo> streamInfos, String value) {
        streamInfos.forEach(streamInfo -> {
            if (streamInfo.getExtList() == null) {
                streamInfo.setExtList((List)Lists.newArrayList());
            }
            InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
            extInfo.setInlongGroupId(streamInfo.getInlongGroupId());
            extInfo.setInlongStreamId(streamInfo.getInlongStreamId());
            extInfo.setKeyName("dataflow");
            extInfo.setKeyValue(value);
            streamInfo.getExtList().removeIf(ext -> extInfo.getKeyName().equals(ext.getKeyName()));
            streamInfo.getExtList().add(extInfo);
        });
    }
}

