/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.sort.util;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.shaded.com.google.common.collect.Maps;
import org.apache.inlong.manager.common.enums.TransformType;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.common.pojo.stream.StreamNode;
import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
import org.apache.inlong.manager.common.pojo.transform.joiner.JoinerDefinition;
import org.apache.inlong.manager.common.util.StreamParseUtils;
import org.apache.inlong.manager.service.sort.util.FieldInfoUtils;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.node.transform.TransformNode;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import org.apache.inlong.sort.protocol.transformation.FunctionParam;
import org.apache.inlong.sort.protocol.transformation.LogicOperator;
import org.apache.inlong.sort.protocol.transformation.SingleValueCompareOperator;
import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
import org.apache.inlong.sort.protocol.transformation.operator.AndOperator;
import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
import org.apache.inlong.sort.protocol.transformation.relation.InnerJoinNodeRelation;
import org.apache.inlong.sort.protocol.transformation.relation.LeftOuterJoinNodeRelation;
import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.apache.inlong.sort.protocol.transformation.relation.RightOuterJoinNodeRelation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers that build sort-protocol {@link NodeRelation}s from manager-side stream
 * definitions and rewrite plain relations that feed joiner transforms into join relations.
 */
public class NodeRelationUtils {
    private static final Logger log = LoggerFactory.getLogger(NodeRelationUtils.class);

    /**
     * Creates the node relations described by the pipeline stored in the stream's ext params.
     *
     * @param streamInfo stream whose ext params hold the serialized pipeline
     * @return one {@link NodeRelation} per pipeline entry, each holding copies of that entry's
     *         input and output node names; a new empty list (after a warning is logged) when the
     *         ext params are null or empty
     */
    public static List<NodeRelation> createNodeRelationsForStream(InlongStreamInfo streamInfo) {
        String extParams = streamInfo.getExtParams();
        if (StringUtils.isEmpty(extParams)) {
            log.warn("stream node relation is empty for {}", streamInfo);
            return Lists.newArrayList();
        }
        StreamPipeline pipeline = StreamParseUtils.parseStreamPipeline(extParams, streamInfo.getInlongStreamId());
        return pipeline.getPipeline().stream()
                .map(relation -> new NodeRelation(
                        Lists.newArrayList(relation.getInputNodes()),
                        Lists.newArrayList(relation.getOutputNodes())))
                .collect(Collectors.toList());
    }

    /**
     * Replaces, in place, every relation of {@code streamInfo} whose single output is a joiner
     * transform node with the matching join relation.
     *
     * <p>A relation is rewritten only when it has exactly one output, that output is the name of a
     * {@link TransformNode} in {@code streamInfo}, and the transform response with the same name
     * parses to a definition of type {@link TransformType#JOINER}. Rewritten relations are removed
     * from their original position and appended to the end of the relation list. Transform nodes
     * that have no matching transform response are left untouched.
     *
     * @param streamInfo stream whose relation list is modified
     * @param transformResponses transform definitions keyed by transform name; when null or empty
     *        this method does nothing
     * @throws IllegalArgumentException if a joiner definition has a null or unsupported join mode,
     *         fewer right join fields than left join fields, or an unsupported left/right node
     */
    public static void optimizeNodeRelation(StreamInfo streamInfo, List<TransformResponse> transformResponses) {
        if (CollectionUtils.isEmpty(transformResponses)) {
            return;
        }
        Map<String, TransformDefinition> definitionsByName = transformResponses.stream()
                .collect(Collectors.toMap(TransformResponse::getTransformName, NodeRelationUtils::parseDefinition));

        // Names of the transform nodes that are backed by a joiner definition.
        Set<String> joinNodeNames = streamInfo.getNodes().stream()
                .filter(TransformNode.class::isInstance)
                .map(TransformNode.class::cast)
                .map(TransformNode::getName)
                .filter(name -> isJoiner(definitionsByName.get(name)))
                .collect(Collectors.toSet());

        List<NodeRelation> relations = streamInfo.getRelations();
        List<NodeRelation> joinRelations = new ArrayList<>();
        ListIterator<NodeRelation> relationIterator = relations.listIterator();
        while (relationIterator.hasNext()) {
            NodeRelation relation = relationIterator.next();
            List<String> outputs = relation.getOutputs();
            if (outputs.size() != 1) {
                continue;
            }
            String nodeName = outputs.get(0);
            if (!joinNodeNames.contains(nodeName)) {
                continue;
            }
            JoinerDefinition joinerDefinition = (JoinerDefinition) definitionsByName.get(nodeName);
            joinRelations.add(getNodeRelation(joinerDefinition, relation));
            // Removal goes through the iterator so the list is not modified behind its back.
            relationIterator.remove();
        }
        relations.addAll(joinRelations);
    }

    /**
     * Parses the serialized definition carried by a transform response.
     */
    private static TransformDefinition parseDefinition(TransformResponse transformResponse) {
        TransformType transformType = TransformType.forType(transformResponse.getTransformType());
        return StreamParseUtils.parseTransformDefinition(transformResponse.getTransformDefinition(), transformType);
    }

    /**
     * Tells whether the definition exists and is of type {@link TransformType#JOINER}.
     */
    private static boolean isJoiner(TransformDefinition transformDefinition) {
        return transformDefinition != null && transformDefinition.getTransformType() == TransformType.JOINER;
    }

    /**
     * Builds the join relation for a joiner definition.
     *
     * <p>The inputs of the result are the left and right node names of the definition (in that
     * order), the outputs are taken from {@code nodeRelation}, and the join conditions are one
     * equality filter per left/right join-field pair, registered under the right node name.
     *
     * @param joinerDefinition definition supplying join mode, join nodes and join fields
     * @param nodeRelation relation being replaced; only its outputs are used
     * @return a left-outer, inner or right-outer join relation according to the join mode
     * @throws IllegalArgumentException if the join mode is null or unsupported, or if there are
     *         fewer right join fields than left join fields
     */
    private static NodeRelation getNodeRelation(JoinerDefinition joinerDefinition, NodeRelation nodeRelation) {
        JoinerDefinition.JoinMode joinMode = joinerDefinition.getJoinMode();
        if (joinMode == null) {
            // Switching on a null enum would fail with a bare NullPointerException.
            throw new IllegalArgumentException(String.format("Unsupported join mode=%s for inlong", joinMode));
        }
        String leftNode = getNodeName(joinerDefinition.getLeftNode());
        String rightNode = getNodeName(joinerDefinition.getRightNode());
        List<String> preNodes = Lists.newArrayList(leftNode, rightNode);

        List<StreamField> leftJoinFields = joinerDefinition.getLeftJoinFields();
        List<StreamField> rightJoinFields = joinerDefinition.getRightJoinFields();
        int fieldCount = leftJoinFields.size();
        if (rightJoinFields.size() < fieldCount) {
            // Fields are paired by index; surplus right fields are ignored, missing ones are an error.
            throw new IllegalArgumentException(String.format(
                    "Join fields mismatch between %s and %s: %d left fields but only %d right fields",
                    leftNode, rightNode, fieldCount, rightJoinFields.size()));
        }

        List<FilterFunction> filterFunctions = new ArrayList<>(fieldCount);
        for (int index = 0; index < fieldCount; index++) {
            // NOTE(review): every condition except the last gets AndOperator and the last gets
            // EmptyOperator. If the logic operator is rendered in front of its condition, the
            // first condition (not the last) should carry EmptyOperator - confirm against
            // SingleValueFilterFunction before relying on multi-field joins.
            LogicOperator operator;
            if (index != fieldCount - 1) {
                operator = AndOperator.getInstance();
            } else {
                operator = EmptyOperator.getInstance();
            }
            filterFunctions.add(createFilterFunction(leftJoinFields.get(index), rightJoinFields.get(index), operator));
        }
        Map<String, List<FilterFunction>> joinConditions = new HashMap<>();
        joinConditions.put(rightNode, filterFunctions);

        List<String> outputs = nodeRelation.getOutputs();
        switch (joinMode) {
            case LEFT_JOIN:
                return new LeftOuterJoinNodeRelation(preNodes, outputs, joinConditions);
            case INNER_JOIN:
                return new InnerJoinNodeRelation(preNodes, outputs, joinConditions);
            case RIGHT_JOIN:
                return new RightOuterJoinNodeRelation(preNodes, outputs, joinConditions);
            default:
                throw new IllegalArgumentException(String.format("Unsupported join mode=%s for inlong", joinMode));
        }
    }

    /**
     * Creates the filter {@code <operator> leftField = rightField} for one join-field pair.
     *
     * <p>Each side is turned into a {@link FieldInfo} from the field's origin field name, origin
     * node name and converted field format.
     */
    private static SingleValueFilterFunction createFilterFunction(StreamField leftField, StreamField rightField, LogicOperator operator) {
        FieldInfo sourceField = new FieldInfo(
                leftField.getOriginFieldName(),
                leftField.getOriginNodeName(),
                FieldInfoUtils.convertFieldFormat(leftField.getFieldType(), leftField.getFieldFormat()));
        FieldInfo targetField = new FieldInfo(
                rightField.getOriginFieldName(),
                rightField.getOriginNodeName(),
                FieldInfoUtils.convertFieldFormat(rightField.getFieldType(), rightField.getFieldFormat()));
        return new SingleValueFilterFunction(operator, sourceField, EqualOperator.getInstance(), targetField);
    }

    /**
     * Returns the source, sink or transform name of the node, depending on its concrete type.
     *
     * @throws IllegalArgumentException if the node is null or of any other type
     */
    private static String getNodeName(StreamNode node) {
        if (node instanceof StreamSource) {
            return ((StreamSource) node).getSourceName();
        }
        if (node instanceof StreamSink) {
            return ((StreamSink) node).getSinkName();
        }
        if (node instanceof StreamTransform) {
            return ((StreamTransform) node).getTransformName();
        }
        throw new IllegalArgumentException(String.format("Unsupported stream node=%s for inlong", node));
    }
}

