/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.resource.sort;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.sort.util.ExtractNodeUtils;
import org.apache.inlong.manager.pojo.sort.util.LoadNodeUtils;
import org.apache.inlong.manager.pojo.sort.util.NodeRelationUtils;
import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.transform.TransformResponse;
import org.apache.inlong.manager.service.core.AuditService;
import org.apache.inlong.manager.service.resource.sort.SortConfigOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.source.StreamSourceService;
import org.apache.inlong.manager.service.transform.StreamTransformService;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * {@link SortConfigOperator} that is selected when the ZooKeeper switch equals
 * {@link InlongConstants#DISABLE_ZK}.
 *
 * <p>It assembles a Sort {@link GroupInfo} (one {@link StreamInfo} per Inlong stream, each made of
 * nodes and node relations), serializes it to JSON and stores the JSON in the group's extension
 * list under the key {@code dataflow}.
 */
@Service
public class DefaultSortConfigOperator implements SortConfigOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSortConfigOperator.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    /** Extension key under which the serialized dataflow is stored on the group. */
    private static final String DATAFLOW_EXT_KEY = "dataflow";
    /** Source property that receives the '&'-joined audit ids of the stream's sinks. */
    private static final String AUDIT_KEY_PROPERTY = "metrics.audit.key";
    private static final String AUDIT_ID_DELIMITER = "&";

    @Autowired
    private StreamSourceService sourceService;
    @Autowired
    private StreamTransformService transformService;
    @Autowired
    private StreamSinkService sinkService;
    @Autowired
    private AuditService auditService;

    /**
     * Tells whether this operator handles the given ZooKeeper switch value.
     *
     * @param enableZk the ZooKeeper switch of the group, may be {@code null}
     * @return {@code true} only if {@code enableZk} equals {@link InlongConstants#DISABLE_ZK}
     */
    @Override
    public Boolean accept(Integer enableZk) {
        return InlongConstants.DISABLE_ZK.equals(enableZk);
    }

    /**
     * Builds the sort config for the group and attaches it to {@code groupInfo} as the
     * {@code dataflow} extension. Nothing is built when {@code isStream} is {@code true}, when
     * {@code groupInfo} is {@code null}, or when {@code streamInfos} is null/empty.
     *
     * @param groupInfo the group to build the config for; its ext list is modified in place
     * @param streamInfos the streams of the group
     * @param isStream whether the call originates from a stream-level process
     * @throws Exception if JSON serialization fails or the stream data is inconsistent
     */
    @Override
    public void buildConfig(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, boolean isStream)
            throws Exception {
        if (isStream) {
            LOGGER.warn("no need to build sort config for stream process when disable zk");
            return;
        }
        if (groupInfo == null || CollectionUtils.isEmpty(streamInfos)) {
            LOGGER.warn("no need to build sort config as the group is null or streams is empty when disable zk");
            return;
        }
        GroupInfo sortConfigInfo = this.getGroupInfo(groupInfo, streamInfos);
        String dataflow = OBJECT_MAPPER.writeValueAsString(sortConfigInfo);
        this.addToGroupExt(groupInfo, dataflow);
        // Parameterized logging already skips formatting when debug is disabled.
        LOGGER.debug("success to build sort config, isStream={}, dataflow={}", isStream, dataflow);
    }

    /**
     * Assembles the Sort {@link GroupInfo}: loads sources, sinks and transforms of the group and
     * converts every stream into a {@link StreamInfo}.
     *
     * @throws IllegalStateException if a stream has no entry in the source or sink map, or if a
     *         stream with transforms has no source in standard mode
     */
    private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
        Map<String, List<StreamSource>> sourceMap = this.sourceService.getSourcesMap(groupInfo, streamInfoList);
        Map<String, List<StreamSink>> sinkMap = this.sinkService.getSinksMap(groupInfo, streamInfoList);
        List<TransformResponse> transformList =
                this.transformService.listTransform(groupInfo.getInlongGroupId(), null);
        Map<String, List<TransformResponse>> transformMap = transformList.stream()
                .collect(Collectors.groupingBy(TransformResponse::getInlongStreamId, HashMap::new,
                        Collectors.toCollection(ArrayList::new)));

        List<StreamInfo> sortStreamInfos = new ArrayList<>();
        for (InlongStreamInfo inlongStream : streamInfoList) {
            String streamId = inlongStream.getInlongStreamId();
            List<TransformResponse> transforms = transformMap.get(streamId);
            Map<String, StreamField> constantFieldMap = this.collectConstantFields(inlongStream, transforms);

            // Fail with a diagnosable message instead of an anonymous NullPointerException
            // when the services returned nothing for this stream.
            List<StreamSource> sources = requirePresent(sourceMap.get(streamId), "sources", groupInfo, streamId);
            List<StreamSink> sinks = requirePresent(sinkMap.get(streamId), "sinks", groupInfo, streamId);

            this.prepareSources(sources, sinks, inlongStream);
            List<NodeRelation> relations =
                    this.buildRelations(groupInfo, inlongStream, sources, transforms, sinks);
            List<Node> nodes = this.createNodes(sources, transforms, sinks, constantFieldMap);

            StreamInfo streamInfo = new StreamInfo(streamId, nodes, relations);
            sortStreamInfos.add(streamInfo);
            NodeRelationUtils.optimizeNodeRelation(streamInfo, transforms);
        }
        return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
    }

    /** Returns {@code value}, or throws a descriptive exception when it is {@code null}. */
    private static <T> List<T> requirePresent(List<T> value, String what, InlongGroupInfo groupInfo,
            String streamId) {
        if (value == null) {
            throw new IllegalStateException(String.format("no %s found for inlong group [%s], stream [%s]",
                    what, groupInfo.getInlongGroupId(), streamId));
        }
        return value;
    }

    /** Collects the constant (valued) fields declared by the stream's own sources and by its transforms. */
    private Map<String, StreamField> collectConstantFields(InlongStreamInfo inlongStream,
            List<TransformResponse> transforms) {
        Map<String, StreamField> constantFieldMap = new HashMap<>();
        List<StreamSource> declaredSources = inlongStream.getSourceList();
        if (CollectionUtils.isNotEmpty(declaredSources)) {
            declaredSources.forEach(source ->
                    this.parseConstantFieldMap(source.getSourceName(), source.getFieldList(), constantFieldMap));
        }
        if (CollectionUtils.isNotEmpty(transforms)) {
            transforms.forEach(transform ->
                    this.parseConstantFieldMap(transform.getTransformName(), transform.getFieldList(),
                            constantFieldMap));
        }
        return constantFieldMap;
    }

    /**
     * Overwrites every source's field list with the stream's field list and, unless already set,
     * stores the '&'-joined audit ids of all sinks in the source properties.
     */
    private void prepareSources(List<StreamSource> sources, List<StreamSink> sinks, InlongStreamInfo inlongStream) {
        List<String> auditIds = new ArrayList<>();
        for (StreamSink sink : sinks) {
            auditIds.add(this.auditService.getAuditId(sink.getSinkType(), false));
        }
        // Loop-invariant, so it is computed once rather than per source.
        String auditKey = String.join(AUDIT_ID_DELIMITER, auditIds);
        for (StreamSource source : sources) {
            source.setFieldList(inlongStream.getFieldList());
            // NOTE(review): assumes getProperties() never returns null - confirm against StreamSource.
            source.getProperties().putIfAbsent(AUDIT_KEY_PROPERTY, auditKey);
        }
    }

    /**
     * Creates the node relations of one stream. In standard mode with at least one transform the
     * relations come from the stream definition and every reference to a node that is neither a
     * source nor a transform is redirected to the first source; otherwise sources are related to
     * sinks directly.
     */
    private List<NodeRelation> buildRelations(InlongGroupInfo groupInfo, InlongStreamInfo inlongStream,
            List<StreamSource> sources, List<TransformResponse> transforms, List<StreamSink> sinks) {
        boolean standardWithTransforms = InlongConstants.STANDARD_MODE.equals(groupInfo.getLightweight())
                && CollectionUtils.isNotEmpty(transforms);
        if (!standardWithTransforms) {
            return NodeRelationUtils.createNodeRelations(sources, sinks);
        }
        List<NodeRelation> relations = NodeRelationUtils.createNodeRelations(inlongStream);
        if (sources.isEmpty()) {
            throw new IllegalStateException(String.format(
                    "cannot adjust transform relations without any source for inlong group [%s], stream [%s]",
                    groupInfo.getInlongGroupId(), inlongStream.getInlongStreamId()));
        }
        String mqNodeName = sources.get(0).getSourceName();
        Set<String> nodeNameSet = this.getInputNodeNames(sources, transforms);
        this.adjustTransformField(transforms, nodeNameSet, mqNodeName);
        this.adjustNodeRelations(relations, nodeNameSet, mqNodeName);
        return relations;
    }

    /** Returns the names of all given sources and transforms; null or empty lists contribute nothing. */
    private Set<String> getInputNodeNames(List<StreamSource> sources, List<TransformResponse> transforms) {
        Set<String> result = new HashSet<>();
        if (CollectionUtils.isNotEmpty(sources)) {
            sources.stream().map(StreamSource::getSourceName).forEach(result::add);
        }
        if (CollectionUtils.isNotEmpty(transforms)) {
            transforms.stream().map(TransformResponse::getTransformName).forEach(result::add);
        }
        return result;
    }

    /**
     * Sets the origin node of every transform field to {@code mqNodeName} when its current origin
     * node name is not contained in {@code nodeNameSet}.
     */
    private void adjustTransformField(List<TransformResponse> transforms, Set<String> nodeNameSet,
            String mqNodeName) {
        for (TransformResponse transform : transforms) {
            List<StreamField> fields = transform.getFieldList();
            // Same tolerance for missing field lists as parseConstantFieldMap.
            if (CollectionUtils.isEmpty(fields)) {
                continue;
            }
            for (StreamField field : fields) {
                if (!nodeNameSet.contains(field.getOriginNodeName())) {
                    field.setOriginNodeName(mqNodeName);
                }
            }
        }
    }

    /**
     * Replaces, in place, every relation input that is not contained in {@code nodeNameSet} with
     * {@code mqNodeName}.
     */
    private void adjustNodeRelations(List<NodeRelation> relations, Set<String> nodeNameSet, String mqNodeName) {
        for (NodeRelation relation : relations) {
            ListIterator<String> inputs = relation.getInputs().listIterator();
            while (inputs.hasNext()) {
                if (!nodeNameSet.contains(inputs.next())) {
                    inputs.set(mqNodeName);
                }
            }
        }
    }

    /** Creates the extract, transform and load nodes of a stream, in that order. */
    private List<Node> createNodes(List<StreamSource> sources, List<TransformResponse> transformResponses,
            List<StreamSink> sinks, Map<String, StreamField> constantFieldMap) {
        List<Node> nodes = new ArrayList<>();
        nodes.addAll(ExtractNodeUtils.createExtractNodes(sources));
        nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses, constantFieldMap));
        nodes.addAll(LoadNodeUtils.createLoadNodes(sinks, constantFieldMap));
        return nodes;
    }

    /**
     * Registers every field that carries a non-null field value in {@code constantFieldMap} under
     * the key {@code "<nodeId>-<fieldName>"}. A null or empty field list is ignored.
     */
    private void parseConstantFieldMap(String nodeId, List<StreamField> fields,
            Map<String, StreamField> constantFieldMap) {
        if (CollectionUtils.isEmpty(fields)) {
            return;
        }
        for (StreamField field : fields) {
            if (field.getFieldValue() != null) {
                constantFieldMap.put(String.format("%s-%s", nodeId, field.getFieldName()), field);
            }
        }
    }

    /**
     * Stores {@code value} as the group's {@code dataflow} extension, replacing any extension that
     * already uses that key. Creates the ext list when the group has none.
     */
    private void addToGroupExt(InlongGroupInfo groupInfo, String value) {
        if (groupInfo.getExtList() == null) {
            groupInfo.setExtList(new ArrayList<>());
        }
        InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
        extInfo.setInlongGroupId(groupInfo.getInlongGroupId());
        extInfo.setKeyName(DATAFLOW_EXT_KEY);
        extInfo.setKeyValue(value);
        groupInfo.getExtList().removeIf(ext -> DATAFLOW_EXT_KEY.equals(ext.getKeyName()));
        groupInfo.getExtList().add(extInfo);
    }
}

