package org.apache.samza.execution;

import com.google.common.base.Joiner;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.util.OperatorJsonUtils;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/samza/execution/JobGraphJsonGenerator.class */
public class JobGraphJsonGenerator {
    Multimap<MessageStream, Integer> outputStreamToOpIds = HashMultimap.create();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/samza/execution/JobGraphJsonGenerator$JobGraphJson.class */
    /**
     * Root object of the generated document. {@code toJson} creates one instance, fills every
     * field and passes it to Jackson; the JSON key of each field is its {@code @JsonProperty} value.
     */
    public static final class JobGraphJson {

        /** One entry per job node of the graph, each built by {@code buildJobNodeJson}. */
        @JsonProperty("jobs")
        List<JobNodeJson> jobs;

        /** Edges returned by {@code JobGraph.getSources()}, keyed by stream spec id. */
        @JsonProperty("sourceStreams")
        Map<String, StreamEdgeJson> sourceStreams;

        /** Edges returned by {@code JobGraph.getSinks()}, keyed by stream spec id. */
        @JsonProperty("sinkStreams")
        Map<String, StreamEdgeJson> sinkStreams;

        /** Edges returned by {@code JobGraph.getIntermediateStreamEdges()}, keyed by stream spec id. */
        @JsonProperty("intermediateStreams")
        Map<String, StreamEdgeJson> intermediateStreams;

        /** Value of {@code ApplicationConfig.getAppName()} for the graph. */
        @JsonProperty("applicationName")
        String applicationName;

        /** Value of {@code ApplicationConfig.getAppId()} for the graph. */
        @JsonProperty("applicationId")
        String applicationId;

        JobGraphJson() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/samza/execution/JobGraphJsonGenerator$JobNodeJson.class */
    /**
     * JSON holder for a single job node; populated by {@code buildJobNodeJson}.
     */
    public static final class JobNodeJson {

        /** Value of {@code JobNode.getJobName()}. */
        @JsonProperty("jobName")
        String jobName;

        /** Value of {@code JobNode.getJobId()}. */
        @JsonProperty("jobId")
        String jobId;

        /** Operator graph of the job node, built by {@code buildOperatorGraphJson}. */
        @JsonProperty("operatorGraph")
        OperatorGraphJson operatorGraph;

        JobNodeJson() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/samza/execution/JobGraphJsonGenerator$OperatorGraphJson.class */
    /**
     * JSON holder for the operator graph of one job node; populated by
     * {@code buildOperatorGraphJson} and {@code updateOperatorGraphJson}.
     */
    public static final class OperatorGraphJson {

        /** One entry per input stream of the job node's stream graph. */
        @JsonProperty("inputStreams")
        List<StreamJson> inputStreams;

        /** One entry per output stream of the job node's stream graph. */
        @JsonProperty("outputStreams")
        List<StreamJson> outputStreams;

        /** Operator id mapped to the description produced by {@code OperatorJsonUtils.operatorToMap}. */
        @JsonProperty("operators")
        Map<Integer, Map<String, Object>> operators = new HashMap<>();

        /**
         * Operator id mapped to the comma-joined, ascending list of ids of all JOIN/MERGE operators
         * that were recorded under the same next stream.
         */
        @JsonProperty("canonicalOpIds")
        Map<Integer, String> canonicalOpIds = new HashMap<>();

        OperatorGraphJson() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/samza/execution/JobGraphJsonGenerator$StreamEdgeJson.class */
    /**
     * JSON holder for one stream edge of the job graph; populated by {@code buildStreamEdgeJson}.
     */
    public static final class StreamEdgeJson {

        /** Description of the stream behind this edge. */
        @JsonProperty("streamSpec")
        StreamSpecJson streamSpec;

        /** Job names of the edge's source nodes, in the order the edge reports them. */
        @JsonProperty("sourceJobs")
        List<String> sourceJobs;

        /** Job names of the edge's target nodes, in the order the edge reports them. */
        @JsonProperty("targetJobs")
        List<String> targetJobs;

        StreamEdgeJson() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/samza/execution/JobGraphJsonGenerator$StreamJson.class */
    /**
     * JSON holder for an input or output stream inside an operator graph.
     */
    public static final class StreamJson {

        /** Id of the stream spec this entry describes. */
        @JsonProperty("streamId")
        String streamId;

        /**
         * Ids of the operators registered directly on the stream. {@code buildOperatorGraphJson}
         * sets this for input streams only; output streams keep the empty default.
         */
        @JsonProperty("nextOperatorIds")
        Set<Integer> nextOperatorIds = new HashSet<>();

        StreamJson() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/samza/execution/JobGraphJsonGenerator$StreamSpecJson.class */
    /**
     * JSON holder describing the stream behind an edge; populated by {@code buildStreamEdgeJson}.
     */
    public static final class StreamSpecJson {

        /** Value of {@code getId()} on the edge's stream spec. */
        @JsonProperty("id")
        String id;

        /** Value of {@code getSystemName()} on the edge's stream spec. */
        @JsonProperty("systemName")
        String systemName;

        /** Value of {@code getPhysicalName()} on the edge's stream spec. */
        @JsonProperty("physicalName")
        String physicalName;

        /** Value of {@code StreamEdge.getPartitionCount()}. */
        @JsonProperty("partitionCount")
        int partitionCount;

        StreamSpecJson() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Renders the given job graph as a JSON document.
     *
     * <p>The document contains the application name and id, the source, sink and intermediate
     * stream edges (each keyed by stream spec id) and one entry per job node.
     *
     * @param jobGraph the graph to describe
     * @return the JSON text
     * @throws Exception if Jackson cannot serialize the collected data
     */
    public String toJson(JobGraph jobGraph) throws Exception {
        JobGraphJson graphJson = new JobGraphJson();

        ApplicationConfig appConfig = jobGraph.getApplicationConfig();
        graphJson.applicationName = appConfig.getAppName();
        graphJson.applicationId = appConfig.getAppId();

        graphJson.sourceStreams = new HashMap<>();
        graphJson.sinkStreams = new HashMap<>();
        graphJson.intermediateStreams = new HashMap<>();
        jobGraph.getSources().forEach(edge -> buildStreamEdgeJson(edge, graphJson.sourceStreams));
        jobGraph.getSinks().forEach(edge -> buildStreamEdgeJson(edge, graphJson.sinkStreams));
        jobGraph.getIntermediateStreamEdges()
                .forEach(edge -> buildStreamEdgeJson(edge, graphJson.intermediateStreams));

        graphJson.jobs = jobGraph.getJobNodes().stream()
                .map(jobNode -> buildJobNodeJson(jobNode))
                .collect(Collectors.toList());

        ByteArrayOutputStream jsonBytes = new ByteArrayOutputStream();
        new ObjectMapper().writeValue(jsonBytes, graphJson);
        // Jackson encodes OutputStream targets as UTF-8, so decode with the same charset instead
        // of the platform default; otherwise non-ASCII names are corrupted on non-UTF-8 JVMs.
        return jsonBytes.toString("UTF-8");
    }

    /**
     * Creates the JSON holder for one job node: its name, its id and its operator graph.
     *
     * @param jobNode the job node to describe
     * @return a freshly populated {@link JobNodeJson}
     */
    private JobNodeJson buildJobNodeJson(JobNode jobNode) {
        final JobNodeJson result = new JobNodeJson();
        result.jobName = jobNode.getJobName();
        result.jobId = jobNode.getJobId();
        result.operatorGraph = buildOperatorGraphJson(jobNode);
        return result;
    }

    /**
     * Builds the operator-graph JSON for one job node.
     *
     * <p>For every input stream of the node's stream graph this records the stream id and the ids
     * of the operators registered directly on it, walks the downstream operators via
     * {@code updateOperatorGraphJson}, and then refreshes the canonical-id table. Output streams are
     * recorded with their id only.
     *
     * @param jobNode the job node whose stream graph is described
     * @return the populated operator graph holder
     */
    private OperatorGraphJson buildOperatorGraphJson(JobNode jobNode) {
        OperatorGraphJson opGraph = new OperatorGraphJson();

        opGraph.inputStreams = new ArrayList<>();
        jobNode.getStreamGraph().getInputStreams().forEach((streamSpec, inputStream) -> {
            StreamJson inputJson = new StreamJson();
            opGraph.inputStreams.add(inputJson);
            inputJson.streamId = streamSpec.getId();

            // Typed local so the element type is known to the stream pipeline below.
            Collection<OperatorSpec> directSpecs = ((MessageStreamImpl) inputStream).getRegisteredOperatorSpecs();
            inputJson.nextOperatorIds = directSpecs.stream()
                    .map(OperatorSpec::getOpId)
                    .collect(Collectors.toSet());

            updateOperatorGraphJson((MessageStreamImpl) inputStream, opGraph);

            // Recomputed after each input stream: a group may still be partial here, and a later
            // input that reaches the same JOIN/MERGE output overwrites it with the complete list.
            for (Collection<Integer> sharedOpIds : this.outputStreamToOpIds.asMap().values()) {
                List<Integer> sortedIds = new ArrayList<>(sharedOpIds);
                Collections.sort(sortedIds);
                String canonicalId = Joiner.on(',').join(sortedIds);
                for (Integer opId : sortedIds) {
                    opGraph.canonicalOpIds.put(opId, canonicalId);
                }
            }
        });

        opGraph.outputStreams = new ArrayList<>();
        jobNode.getStreamGraph().getOutputStreams().keySet().forEach(streamSpec -> {
            StreamJson outputJson = new StreamJson();
            outputJson.streamId = streamSpec.getId();
            opGraph.outputStreams.add(outputJson);
        });

        return opGraph;
    }

    /**
     * Recursively records every operator reachable from the given stream.
     *
     * <p>Each operator registered on the stream is added to {@code operatorGraphJson.operators}.
     * JOIN and MERGE operators are additionally remembered in {@code outputStreamToOpIds} under
     * their next stream, so operators feeding the same stream can later share a canonical id. The
     * walk then continues into the operator's next stream, if it has one.
     *
     * @param messageStreamImpl the stream whose registered operators are visited
     * @param operatorGraphJson the holder that collects the operator descriptions
     */
    private void updateOperatorGraphJson(MessageStreamImpl messageStreamImpl, OperatorGraphJson operatorGraphJson) {
        // Typed local so the lambda parameter is an OperatorSpec rather than a raw element.
        Collection<OperatorSpec> registeredSpecs = messageStreamImpl.getRegisteredOperatorSpecs();
        registeredSpecs.forEach(opSpec -> {
            Integer opId = Integer.valueOf(opSpec.getOpId());
            operatorGraphJson.operators.put(opId, OperatorJsonUtils.operatorToMap(opSpec));

            MessageStreamImpl nextStream = opSpec.getNextStream();
            if (nextStream == null) {
                // No downstream stream: nothing to group by and nothing further to visit.
                return;
            }

            OperatorSpec.OpCode opCode = opSpec.getOpCode();
            if (opCode == OperatorSpec.OpCode.JOIN || opCode == OperatorSpec.OpCode.MERGE) {
                // Registered only for a real stream, so a null key can never lump unrelated
                // operators into one canonical group.
                this.outputStreamToOpIds.put(nextStream, opId);
            }
            updateOperatorGraphJson(nextStream, operatorGraphJson);
        });
    }

    /**
     * Returns the JSON holder for a stream edge, creating and registering it on first use.
     *
     * <p>The holder is looked up in {@code map} by stream spec id. When none exists yet, a new one
     * is filled with the stream spec details, the partition count and the job names of the edge's
     * source and target nodes, then stored in {@code map} under that id.
     *
     * @param streamEdge the edge to describe
     * @param map        holders created so far, keyed by stream spec id; may be modified
     * @return the existing or newly created holder for the edge
     */
    private StreamEdgeJson buildStreamEdgeJson(StreamEdge streamEdge, Map<String, StreamEdgeJson> map) {
        final String streamId = streamEdge.getStreamSpec().getId();

        final StreamEdgeJson known = map.get(streamId);
        if (known != null) {
            return known;
        }

        final StreamEdgeJson edgeJson = new StreamEdgeJson();

        final StreamSpecJson specJson = new StreamSpecJson();
        specJson.id = streamId;
        specJson.systemName = streamEdge.getStreamSpec().getSystemName();
        specJson.physicalName = streamEdge.getStreamSpec().getPhysicalName();
        specJson.partitionCount = streamEdge.getPartitionCount();
        edgeJson.streamSpec = specJson;

        final List<String> sourceJobNames = new ArrayList<>();
        streamEdge.getSourceNodes().forEach(node -> sourceJobNames.add(node.getJobName()));
        edgeJson.sourceJobs = sourceJobNames;

        final List<String> targetJobNames = new ArrayList<>();
        streamEdge.getTargetNodes().forEach(node -> targetJobNames.add(node.getJobName()));
        edgeJson.targetJobs = targetJobNames;

        map.put(streamId, edgeJson);
        return edgeJson;
    }
}
