/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.resource.sort;

import com.google.common.base.Joiner;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.inlong.audit.entity.AuditComponent;
import org.apache.inlong.audit.entity.AuditProxy;
import org.apache.inlong.common.enums.IndicatorType;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.sort.node.NodeFactory;
import org.apache.inlong.manager.pojo.sort.util.NodeRelationUtils;
import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.transform.TransformResponse;
import org.apache.inlong.manager.service.core.AuditService;
import org.apache.inlong.manager.service.resource.sort.SortConfigOperator;
import org.apache.inlong.manager.service.source.StreamSourceService;
import org.apache.inlong.manager.service.transform.StreamTransformService;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class SortFlinkConfigOperator
implements SortConfigOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(SortFlinkConfigOperator.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    @Autowired
    private StreamSourceService sourceService;
    @Autowired
    private StreamTransformService transformService;
    @Autowired
    private AuditService auditService;
    @Autowired
    private NodeFactory nodeFactory;

    @Override
    public Boolean accept(List<String> sinkTypeList) {
        for (String sinkType : sinkTypeList) {
            if (!SinkType.SORT_FLINK_SINK.contains(sinkType)) continue;
            return true;
        }
        return false;
    }

    @Override
    public void buildConfig(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, boolean isStream) throws Exception {
        if (isStream) {
            LOGGER.warn("no need to build sort config for stream process when disable zk");
            return;
        }
        if (groupInfo == null || streamInfo == null) {
            LOGGER.warn("no need to build sort config as the group is null or stream is empty when disable zk");
            return;
        }
        ArrayList<StreamSink> sinkList = new ArrayList<StreamSink>();
        for (StreamSink sink : streamInfo.getSinkList()) {
            if (!SinkType.SORT_FLINK_SINK.contains(sink.getSinkType())) continue;
            sinkList.add(sink);
        }
        if (CollectionUtils.isEmpty(sinkList)) {
            return;
        }
        GroupInfo sortConfigInfo = this.getGroupInfo(groupInfo, streamInfo, sinkList);
        String dataflow = OBJECT_MAPPER.writeValueAsString((Object)sortConfigInfo);
        this.addToStreamExt(streamInfo, dataflow);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("success to build sort config, isStream={}, dataflow={}", (Object)isStream, (Object)dataflow);
        }
    }

    private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, InlongStreamInfo inlongStreamInfo, List<StreamSink> sinkInfos) {
        List<NodeRelation> relations;
        String streamId = inlongStreamInfo.getInlongStreamId();
        Map<String, List<StreamSource>> sourceMap = this.sourceService.getSourcesMap(groupInfo, Collections.singletonList(inlongStreamInfo));
        List<TransformResponse> transformList = this.transformService.listTransform(groupInfo.getInlongGroupId(), streamId);
        Map transformMap = transformList.stream().collect(Collectors.groupingBy(TransformResponse::getInlongStreamId, HashMap::new, Collectors.toCollection(ArrayList::new)));
        ArrayList<StreamInfo> sortStreamInfos = new ArrayList<StreamInfo>();
        HashMap<String, StreamField> fieldMap = new HashMap<String, StreamField>();
        inlongStreamInfo.getSourceList().forEach(source -> this.parseConstantFieldMap(source.getSourceName(), source.getFieldList(), fieldMap));
        List transformResponses = (List)transformMap.get(streamId);
        if (CollectionUtils.isNotEmpty((Collection)transformResponses)) {
            transformResponses.forEach(trans -> this.parseConstantFieldMap(trans.getTransformName(), trans.getFieldList(), fieldMap));
        }
        List<StreamSource> sources = sourceMap.get(streamId);
        for (StreamSink sinkInfo : sinkInfos) {
            CommonBeanUtils.copyProperties((Object)inlongStreamInfo, (Object)sinkInfo, (boolean)true);
            this.addAuditId(sinkInfo.getProperties(), sinkInfo.getSinkType(), IndicatorType.SEND_SUCCESS);
        }
        for (StreamSource source2 : sources) {
            source2.setFieldList(new ArrayList(inlongStreamInfo.getFieldList()));
        }
        if (InlongConstants.STANDARD_MODE.equals(groupInfo.getInlongGroupMode())) {
            if (CollectionUtils.isNotEmpty((Collection)transformResponses)) {
                relations = NodeRelationUtils.createNodeRelations((InlongStreamInfo)inlongStreamInfo);
                String mqNodeName = sources.get(0).getSourceName();
                Set<String> nodeNameSet = this.getInputNodeNames(sources, transformResponses);
                this.adjustTransformField(transformResponses, nodeNameSet, mqNodeName);
                this.adjustNodeRelations(relations, nodeNameSet, mqNodeName);
            } else {
                relations = NodeRelationUtils.createNodeRelations(sources, sinkInfos);
            }
            for (int i = 0; i < sources.size(); ++i) {
                this.addAuditId(sources.get(i).getProperties(), sinkInfos.get(0).getSinkType(), IndicatorType.RECEIVED_SUCCESS);
            }
        } else {
            if (CollectionUtils.isNotEmpty((Collection)transformResponses)) {
                List sourcesNames = sources.stream().map(StreamSource::getSourceName).collect(Collectors.toList());
                List transFormNames = transformResponses.stream().map(TransformResponse::getTransformName).collect(Collectors.toList());
                relations = Arrays.asList(NodeRelationUtils.createNodeRelation((List)sourcesNames, transFormNames), NodeRelationUtils.createNodeRelation(transFormNames, sinkInfos.stream().map(StreamSink::getSinkName).collect(Collectors.toList())));
            } else {
                relations = NodeRelationUtils.createNodeRelations(sources, sinkInfos);
            }
            for (StreamSource source3 : sources) {
                this.addAuditId(source3.getProperties(), source3.getSourceType(), IndicatorType.RECEIVED_SUCCESS);
            }
        }
        List<Node> nodes = this.createNodes(sources, transformResponses, sinkInfos, fieldMap);
        StreamInfo streamInfo = new StreamInfo(streamId, nodes, (List)relations);
        sortStreamInfos.add(streamInfo);
        NodeRelationUtils.optimizeNodeRelation((StreamInfo)streamInfo, (List)transformResponses);
        return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
    }

    private Set<String> getInputNodeNames(List<StreamSource> sources, List<TransformResponse> transforms) {
        HashSet<String> result = new HashSet<String>();
        if (CollectionUtils.isNotEmpty(sources)) {
            result.addAll(sources.stream().map(StreamSource::getSourceName).collect(Collectors.toSet()));
        }
        if (CollectionUtils.isNotEmpty(transforms)) {
            result.addAll(transforms.stream().map(TransformResponse::getTransformName).collect(Collectors.toSet()));
        }
        return result;
    }

    private void adjustTransformField(List<TransformResponse> transforms, Set<String> nodeNameSet, String mqNodeName) {
        for (TransformResponse transform : transforms) {
            for (StreamField field : transform.getFieldList()) {
                if (nodeNameSet.contains(field.getOriginNodeName())) continue;
                field.setOriginNodeName(mqNodeName);
            }
        }
    }

    private void adjustNodeRelations(List<NodeRelation> relations, Set<String> nodeNameSet, String mqNodeName) {
        for (NodeRelation relation : relations) {
            ListIterator<String> iterator = relation.getInputs().listIterator();
            while (iterator.hasNext()) {
                if (nodeNameSet.contains(iterator.next())) continue;
                iterator.set(mqNodeName);
            }
        }
    }

    private List<Node> createNodes(List<StreamSource> sources, List<TransformResponse> transformResponses, List<StreamSink> sinks, Map<String, StreamField> constantFieldMap) {
        ArrayList<Node> nodes = new ArrayList<Node>();
        if (Objects.equals(sources.size(), sinks.size()) && Objects.equals(sources.size(), 1)) {
            return this.nodeFactory.addBuiltInField(sources.get(0), sinks.get(0), transformResponses, constantFieldMap);
        }
        List transformNodes = TransformNodeUtils.createTransformNodes(transformResponses, constantFieldMap);
        nodes.addAll(this.nodeFactory.createExtractNodes(sources));
        nodes.addAll(transformNodes);
        nodes.addAll(this.nodeFactory.createLoadNodes(sinks, constantFieldMap));
        return nodes;
    }

    private void parseConstantFieldMap(String nodeId, List<StreamField> fields, Map<String, StreamField> constantFieldMap) {
        if (CollectionUtils.isEmpty(fields)) {
            return;
        }
        for (StreamField field : fields) {
            if (field.getFieldValue() == null) continue;
            constantFieldMap.put(String.format("%s-%s", nodeId, field.getFieldName()), field);
        }
    }

    private void addToStreamExt(InlongStreamInfo streamInfo, String value) {
        if (streamInfo.getExtList() == null) {
            streamInfo.setExtList(new ArrayList());
        }
        InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
        extInfo.setInlongGroupId(streamInfo.getInlongGroupId());
        extInfo.setInlongStreamId(streamInfo.getInlongStreamId());
        extInfo.setKeyName("dataflow");
        extInfo.setKeyValue(value);
        streamInfo.getExtList().removeIf(ext -> extInfo.getKeyName().equals(ext.getKeyName()));
        streamInfo.getExtList().add(extInfo);
    }

    private void addAuditId(Map<String, Object> properties, String type, IndicatorType indicatorType) {
        try {
            String auditId = this.auditService.getAuditId(type, indicatorType);
            List<AuditProxy> auditProxyList = this.auditService.getAuditProxy(AuditComponent.SORT.getComponent());
            properties.putIfAbsent("metrics.audit.key", auditId);
            properties.putIfAbsent("metrics.audit.proxy.hosts", Joiner.on((String)"&").join(auditProxyList));
        }
        catch (Exception e) {
            LOGGER.error("Current type ={} is not set auditId", (Object)type);
        }
    }
}

