package org.apache.inlong.manager.service.resource.sort;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.sort.node.NodeFactory;
import org.apache.inlong.manager.pojo.sort.util.NodeRelationUtils;
import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.transform.TransformResponse;
import org.apache.inlong.manager.service.core.AuditService;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.source.StreamSourceService;
import org.apache.inlong.manager.service.transform.StreamTransformService;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
 * Default {@link SortConfigOperator}: serializes the sort "dataflow" (a {@link GroupInfo} holding one
 * {@link StreamInfo}) of a stream to JSON and stores it in the stream's extension list.
 *
 * <p>Only sinks whose type is contained in {@code SinkType.SORT_FLINK_SINK} are considered.
 */
@Service
public class DefaultSortConfigOperator implements SortConfigOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSortConfigOperator.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Key name of the stream extension entry that carries the serialized dataflow. */
    private static final String KEY_DATAFLOW = "dataflow";
    /** Property key under which the audit id is stored on sources and sinks. */
    private static final String KEY_AUDIT_ID = "metrics.audit.key";
    /** Property key under which the configured audit proxy hosts are stored on sources and sinks. */
    private static final String KEY_AUDIT_HOSTS = "metrics.audit.proxy.hosts";

    @Value("${metrics.audit.proxy.hosts:127.0.0.1}")
    private String auditHost;

    @Autowired
    private StreamSourceService sourceService;

    @Autowired
    private StreamTransformService transformService;

    @Autowired
    private StreamSinkService sinkService;

    @Autowired
    private AuditService auditService;

    /**
     * Tells whether this operator handles at least one of the given sink types.
     *
     * @param list sink type names; {@code null} or empty yields {@code false}
     * @return {@code true} if any entry is contained in {@code SinkType.SORT_FLINK_SINK}
     */
    @Override
    public Boolean accept(List<String> list) {
        if (CollectionUtils.isEmpty(list)) {
            return false;
        }
        for (String sinkType : list) {
            if (SinkType.SORT_FLINK_SINK.contains(sinkType)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Builds the sort dataflow JSON for the stream and stores it in the stream's extension list under
     * the {@code "dataflow"} key, replacing any previous entry with that key.
     *
     * <p>Nothing is built when {@code z} is {@code true}, when the group or the stream is {@code null},
     * or when the stream has no sink whose type is in {@code SinkType.SORT_FLINK_SINK}.
     *
     * @param inlongGroupInfo group the stream belongs to
     * @param inlongStreamInfo stream whose sinks, sources and fields are read; its ext list is updated
     * @param z skip flag (logged as {@code isStream}); when {@code true} the method only logs and returns
     * @throws Exception if the dataflow cannot be assembled or serialized
     */
    @Override
    public void buildConfig(InlongGroupInfo inlongGroupInfo, InlongStreamInfo inlongStreamInfo, boolean z) throws Exception {
        if (z) {
            LOGGER.warn("no need to build sort config for stream process when disable zk");
            return;
        }
        if (inlongGroupInfo == null || inlongStreamInfo == null) {
            LOGGER.warn("no need to build sort config as the group is null or stream is empty when disable zk");
            return;
        }
        List<StreamSink> allSinks = inlongStreamInfo.getSinkList();
        if (CollectionUtils.isEmpty(allSinks)) {
            // A stream without sinks has nothing to build; previously a null list caused an NPE here.
            return;
        }
        List<StreamSink> flinkSinks = new ArrayList<>();
        for (StreamSink sink : allSinks) {
            if (SinkType.SORT_FLINK_SINK.contains(sink.getSinkType())) {
                flinkSinks.add(sink);
            }
        }
        if (flinkSinks.isEmpty()) {
            return;
        }
        String dataflow = OBJECT_MAPPER.writeValueAsString(getGroupInfo(inlongGroupInfo, inlongStreamInfo, flinkSinks));
        addToStreamExt(inlongStreamInfo, dataflow);
        LOGGER.debug("success to build sort config, isStream={}, dataflow={}", z, dataflow);
    }

    /**
     * Assembles the sort {@link GroupInfo} (with a single {@link StreamInfo}) for the given stream.
     *
     * <p>Side effects: stream properties are copied onto every sink, the stream's field list is set on
     * every source returned by the source service, and audit properties are added to the sinks' and
     * sources' property maps.
     *
     * @param groupInfo owning group
     * @param streamInfo stream to build the dataflow for
     * @param sinks sinks to include; must not be empty (the first sink's type is read in standard mode)
     * @return the group info wrapping the stream's nodes and node relations
     * @throws IllegalStateException if the source service returns no source list for the stream, or if
     *         a standard-mode stream has transforms but no source
     */
    private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, List<StreamSink> sinks) {
        String streamId = streamInfo.getInlongStreamId();
        Map<String, List<StreamSource>> sourcesMap =
                this.sourceService.getSourcesMap(groupInfo, Collections.singletonList(streamInfo));
        Map<String, List<TransformResponse>> transformMap = this.transformService
                .listTransform(groupInfo.getInlongGroupId(), streamId).stream()
                .collect(Collectors.groupingBy(TransformResponse::getInlongStreamId, HashMap::new,
                        Collectors.toCollection(ArrayList::new)));

        // Collect fields that carry a fixed value, keyed by "<nodeName>-<fieldName>".
        Map<String, StreamField> constantFieldMap = new HashMap<>();
        List<StreamSource> declaredSources = streamInfo.getSourceList();
        if (CollectionUtils.isNotEmpty(declaredSources)) {
            for (StreamSource source : declaredSources) {
                parseConstantFieldMap(source.getSourceName(), source.getFieldList(), constantFieldMap);
            }
        }
        List<TransformResponse> transforms = transformMap.get(streamId);
        boolean hasTransforms = CollectionUtils.isNotEmpty(transforms);
        if (hasTransforms) {
            for (TransformResponse transform : transforms) {
                parseConstantFieldMap(transform.getTransformName(), transform.getFieldList(), constantFieldMap);
            }
        }

        List<StreamSource> sources = sourcesMap.get(streamId);
        if (sources == null) {
            // Fail with a clear message before any sink is mutated, instead of an NPE further down.
            throw new IllegalStateException("no sources found for inlong stream " + streamId);
        }
        for (StreamSink sink : sinks) {
            CommonBeanUtils.copyProperties(streamInfo, sink, true);
            addAuditId(sink.getProperties(), sink.getSinkType(), true);
        }
        for (StreamSource source : sources) {
            source.setFieldList(streamInfo.getFieldList());
        }

        List<NodeRelation> relations;
        if (InlongConstants.STANDARD_MODE.equals(groupInfo.getInlongGroupMode())) {
            if (hasTransforms) {
                if (sources.isEmpty()) {
                    throw new IllegalStateException(
                            "at least one source is required to build transforms for inlong stream " + streamId);
                }
                relations = NodeRelationUtils.createNodeRelations(streamInfo);
                // Any input that is neither a known source nor a transform is redirected to the first source.
                String firstSourceName = sources.get(0).getSourceName();
                Set<String> inputNodeNames = getInputNodeNames(sources, transforms);
                adjustTransformField(transforms, inputNodeNames, firstSourceName);
                adjustNodeRelations(relations, inputNodeNames, firstSourceName);
            } else {
                relations = NodeRelationUtils.createNodeRelations(sources, sinks);
            }
            // In standard mode the sources' audit id is resolved from the first sink's type.
            for (StreamSource source : sources) {
                addAuditId(source.getProperties(), sinks.get(0).getSinkType(), false);
            }
        } else {
            if (hasTransforms) {
                List<String> sourceNames = sources.stream()
                        .map(StreamSource::getSourceName)
                        .collect(Collectors.toList());
                List<String> transformNames = transforms.stream()
                        .map(TransformResponse::getTransformName)
                        .collect(Collectors.toList());
                List<String> sinkNames = sinks.stream()
                        .map(StreamSink::getSinkName)
                        .collect(Collectors.toList());
                relations = Arrays.asList(
                        NodeRelationUtils.createNodeRelation(sourceNames, transformNames),
                        NodeRelationUtils.createNodeRelation(transformNames, sinkNames));
            } else {
                relations = NodeRelationUtils.createNodeRelations(sources, sinks);
            }
            for (StreamSource source : sources) {
                addAuditId(source.getProperties(), source.getSourceType(), false);
            }
        }

        StreamInfo sortStreamInfo =
                new StreamInfo(streamId, createNodes(sources, transforms, sinks, constantFieldMap), relations);
        List<StreamInfo> sortStreams = new ArrayList<>();
        sortStreams.add(sortStreamInfo);
        NodeRelationUtils.optimizeNodeRelation(sortStreamInfo, transforms);
        return new GroupInfo(groupInfo.getInlongGroupId(), sortStreams);
    }

    /**
     * Collects the names of all given sources and transforms.
     *
     * @param sources sources, may be {@code null} or empty
     * @param transforms transforms, may be {@code null} or empty
     * @return a new mutable set with every source name and transform name
     */
    private Set<String> getInputNodeNames(List<StreamSource> sources, List<TransformResponse> transforms) {
        Set<String> names = new HashSet<>();
        if (CollectionUtils.isNotEmpty(sources)) {
            for (StreamSource source : sources) {
                names.add(source.getSourceName());
            }
        }
        if (CollectionUtils.isNotEmpty(transforms)) {
            for (TransformResponse transform : transforms) {
                names.add(transform.getTransformName());
            }
        }
        return names;
    }

    /**
     * Rewrites the origin node name of every transform field whose origin is not a known input node.
     *
     * @param transforms transforms whose fields are updated in place
     * @param knownNodeNames names accepted as origin nodes
     * @param fallbackNodeName origin node name assigned to fields with an unknown origin
     */
    private void adjustTransformField(List<TransformResponse> transforms, Set<String> knownNodeNames,
            String fallbackNodeName) {
        for (TransformResponse transform : transforms) {
            List<StreamField> fields = transform.getFieldList();
            if (CollectionUtils.isEmpty(fields)) {
                // Same tolerance as parseConstantFieldMap: a transform may carry no field list.
                continue;
            }
            for (StreamField field : fields) {
                if (!knownNodeNames.contains(field.getOriginNodeName())) {
                    field.setOriginNodeName(fallbackNodeName);
                }
            }
        }
    }

    /**
     * Replaces, in place, every relation input that is not a known input node with the fallback name.
     *
     * @param relations node relations whose input lists are updated
     * @param knownNodeNames names accepted as inputs
     * @param fallbackNodeName name substituted for unknown inputs
     */
    private void adjustNodeRelations(List<NodeRelation> relations, Set<String> knownNodeNames,
            String fallbackNodeName) {
        for (NodeRelation relation : relations) {
            // ListIterator#set only touches the list when an entry actually has to change.
            ListIterator<String> inputs = relation.getInputs().listIterator();
            while (inputs.hasNext()) {
                if (!knownNodeNames.contains(inputs.next())) {
                    inputs.set(fallbackNodeName);
                }
            }
        }
    }

    /**
     * Creates the sort nodes for the stream.
     *
     * <p>With exactly one source and one sink the nodes come from {@code NodeFactory.addBuiltInField};
     * otherwise extract, transform and load nodes are created separately and concatenated in that order.
     *
     * @param sources stream sources
     * @param transforms stream transforms, passed through as-is (may be {@code null})
     * @param sinks stream sinks
     * @param constantFieldMap constant fields keyed by {@code "<nodeName>-<fieldName>"}
     * @return the nodes of the stream
     */
    private List<Node> createNodes(List<StreamSource> sources, List<TransformResponse> transforms,
            List<StreamSink> sinks, Map<String, StreamField> constantFieldMap) {
        if (sources.size() == 1 && sinks.size() == 1) {
            return NodeFactory.addBuiltInField(sources.get(0), sinks.get(0), transforms, constantFieldMap);
        }
        // Transform nodes are created first, as before; only the order in the result list differs.
        List<? extends Node> transformNodes = TransformNodeUtils.createTransformNodes(transforms, constantFieldMap);
        List<Node> nodes = new ArrayList<>();
        nodes.addAll(NodeFactory.createExtractNodes(sources));
        nodes.addAll(transformNodes);
        nodes.addAll(NodeFactory.createLoadNodes(sinks, constantFieldMap));
        return nodes;
    }

    /**
     * Registers every field that has a non-null field value in the constant field map.
     *
     * @param nodeName name of the node owning the fields; used as key prefix
     * @param fields fields to inspect, may be {@code null} or empty
     * @param constantFieldMap target map, keyed by {@code "<nodeName>-<fieldName>"}
     */
    private void parseConstantFieldMap(String nodeName, List<StreamField> fields,
            Map<String, StreamField> constantFieldMap) {
        if (CollectionUtils.isEmpty(fields)) {
            return;
        }
        for (StreamField field : fields) {
            if (field.getFieldValue() != null) {
                constantFieldMap.put(String.format("%s-%s", nodeName, field.getFieldName()), field);
            }
        }
    }

    /**
     * Stores the serialized dataflow in the stream's extension list, replacing any existing entry
     * whose key name is {@code "dataflow"}. The list is created if the stream has none.
     *
     * @param streamInfo stream whose extension list is updated
     * @param dataflow serialized dataflow to store
     */
    private void addToStreamExt(InlongStreamInfo streamInfo, String dataflow) {
        if (streamInfo.getExtList() == null) {
            streamInfo.setExtList(new ArrayList<>());
        }
        InlongStreamExtInfo dataflowExt = new InlongStreamExtInfo();
        dataflowExt.setInlongGroupId(streamInfo.getInlongGroupId());
        dataflowExt.setInlongStreamId(streamInfo.getInlongStreamId());
        dataflowExt.setKeyName(KEY_DATAFLOW);
        dataflowExt.setKeyValue(dataflow);

        List<InlongStreamExtInfo> extList = streamInfo.getExtList();
        extList.removeIf(ext -> KEY_DATAFLOW.equals(ext.getKeyName()));
        extList.add(dataflowExt);
    }

    /**
     * Adds the audit id and the audit proxy hosts to the given properties unless already present.
     *
     * <p>Best effort: any failure is logged and otherwise ignored, so a missing audit id does not
     * abort building the sort config.
     *
     * @param properties property map of a source or sink, updated in place
     * @param type type name passed to the audit service to resolve the audit id
     * @param isSent flag passed through to {@code AuditService.getAuditId}
     */
    private void addAuditId(Map<String, Object> properties, String type, boolean isSent) {
        try {
            properties.putIfAbsent(KEY_AUDIT_ID, this.auditService.getAuditId(type, isSent));
            properties.putIfAbsent(KEY_AUDIT_HOSTS, this.auditHost);
        } catch (Exception e) {
            // Pass the exception as the last argument so the cause and stack trace are not lost.
            LOGGER.error("Current type ={} is not set auditId", type, e);
        }
    }
}
