package org.apache.samza.execution;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.system.StreamSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/samza/execution/JobGraph.class */
/**
 * Graph of {@link JobNode}s connected by {@link StreamEdge}s.
 *
 * <p>Nodes are keyed by the id produced by {@code JobNode.createId} and edges by the id of
 * their {@link StreamSpec}. Each edge registered through {@link #addSource},
 * {@link #addSink} or {@link #addIntermediateStream} is additionally tracked in the
 * matching source, sink or intermediate-stream set. The graph also serves as the
 * {@link ExecutionPlan}: it produces one {@link JobConfig} per node and a JSON rendering of
 * itself.
 */
public class JobGraph implements ExecutionPlan {
    private static final Logger log = LoggerFactory.getLogger(JobGraph.class);

    private final Config config;
    /** Job nodes keyed by the id returned from {@code JobNode.createId}. */
    private final Map<String, JobNode> nodes = new HashMap<>();
    /** Stream edges keyed by {@code StreamSpec.getId()}. */
    private final Map<String, StreamEdge> edges = new HashMap<>();
    private final Set<StreamEdge> sources = new HashSet<>();
    private final Set<StreamEdge> sinks = new HashSet<>();
    private final Set<StreamEdge> intermediateStreams = new HashSet<>();
    private final JobGraphJsonGenerator jsonGenerator = new JobGraphJsonGenerator();

    /**
     * Creates an empty graph.
     *
     * @param config configuration handed to every {@link JobNode} created by this graph and
     *               wrapped by {@link #getApplicationConfig()}
     */
    public JobGraph(Config config) {
        this.config = config;
    }

    /**
     * Generates the configuration of every job, in the order of {@link #getJobNodes()}.
     *
     * <p>The JSON plan is passed to each node's {@code generateConfig}. Producing the plan is
     * best effort: if it fails, the failure is logged and an empty string is passed instead.
     *
     * @return one {@link JobConfig} per job node
     */
    @Override
    public List<JobConfig> getJobConfigs() {
        String planJson = "";
        try {
            planJson = getPlanAsJson();
        } catch (Exception e) {
            // Deliberately non-fatal: configs are still generated, just without the plan.
            log.warn("Failed to generate plan JSON", e);
        }
        final String json = planJson;
        return getJobNodes().stream()
                .map(node -> node.generateConfig(json))
                .collect(Collectors.toList());
    }

    /**
     * @return the {@link StreamSpec} of every edge registered via
     *         {@link #addIntermediateStream}
     */
    @Override
    public List<StreamSpec> getIntermediateStreams() {
        return getIntermediateStreamEdges().stream()
                .map(edge -> edge.getStreamSpec())
                .collect(Collectors.toList());
    }

    /**
     * @return the JSON produced by {@link JobGraphJsonGenerator} for this graph
     * @throws Exception if the generator fails
     */
    @Override
    public String getPlanAsJson() throws Exception {
        return this.jsonGenerator.toJson(this);
    }

    /**
     * @return a new {@link ApplicationConfig} wrapping this graph's config
     */
    public ApplicationConfig getApplicationConfig() {
        return new ApplicationConfig(this.config);
    }

    /**
     * Registers a source stream consumed by the given node.
     *
     * @param streamSpec spec of the source stream; its edge is created on first use
     * @param jobNode    node that consumes the stream
     */
    public void addSource(StreamSpec streamSpec, JobNode jobNode) {
        StreamEdge edge = getOrCreateStreamEdge(streamSpec);
        edge.addTargetNode(jobNode);
        jobNode.addInEdge(edge);
        this.sources.add(edge);
    }

    /**
     * Registers a sink stream produced by the given node.
     *
     * @param streamSpec spec of the sink stream; its edge is created on first use
     * @param jobNode    node that produces the stream
     */
    public void addSink(StreamSpec streamSpec, JobNode jobNode) {
        StreamEdge edge = getOrCreateStreamEdge(streamSpec);
        edge.addSourceNode(jobNode);
        jobNode.addOutEdge(edge);
        this.sinks.add(edge);
    }

    /**
     * Registers an intermediate stream connecting two nodes.
     *
     * @param streamSpec spec of the intermediate stream; its edge is created on first use
     * @param jobNode    node that produces the stream
     * @param jobNode2   node that consumes the stream
     */
    public void addIntermediateStream(StreamSpec streamSpec, JobNode jobNode, JobNode jobNode2) {
        StreamEdge edge = getOrCreateStreamEdge(streamSpec);
        edge.addSourceNode(jobNode);
        edge.addTargetNode(jobNode2);
        jobNode.addOutEdge(edge);
        jobNode2.addInEdge(edge);
        this.intermediateStreams.add(edge);
    }

    /**
     * Returns the node registered under {@code JobNode.createId(str, str2)}, creating and
     * registering it when absent.
     *
     * @param str             first argument of {@code JobNode.createId} and of the
     *                        {@link JobNode} constructor (presumably the job name; confirm
     *                        against {@link JobNode})
     * @param str2            second argument of {@code JobNode.createId} and of the
     *                        {@link JobNode} constructor (presumably the job id; confirm
     *                        against {@link JobNode})
     * @param streamGraphImpl stream graph passed to a newly created node; ignored when the
     *                        node already exists
     * @return the existing or newly created node
     */
    public JobNode getOrCreateJobNode(String str, String str2, StreamGraphImpl streamGraphImpl) {
        String nodeId = JobNode.createId(str, str2);
        return this.nodes.computeIfAbsent(
                nodeId, id -> new JobNode(str, str2, streamGraphImpl, this.config));
    }

    /**
     * Returns the edge registered under {@code streamSpec.getId()}, creating and registering
     * it when absent.
     *
     * @param streamSpec spec identifying the stream
     * @return the existing or newly created edge
     */
    public StreamEdge getOrCreateStreamEdge(StreamSpec streamSpec) {
        return this.edges.computeIfAbsent(streamSpec.getId(), id -> new StreamEdge(streamSpec));
    }

    /**
     * @return an unmodifiable list of all job nodes in the order computed by
     *         {@link #topologicalSort()}
     */
    public List<JobNode> getJobNodes() {
        return Collections.unmodifiableList(topologicalSort());
    }

    /**
     * @return an unmodifiable view of the edges registered via {@link #addSource}
     */
    public Set<StreamEdge> getSources() {
        return Collections.unmodifiableSet(this.sources);
    }

    /**
     * @return an unmodifiable view of the edges registered via {@link #addSink}
     */
    public Set<StreamEdge> getSinks() {
        return Collections.unmodifiableSet(this.sinks);
    }

    /**
     * @return an unmodifiable view of the edges registered via
     *         {@link #addIntermediateStream}
     */
    public Set<StreamEdge> getIntermediateStreamEdges() {
        return Collections.unmodifiableSet(this.intermediateStreams);
    }

    /**
     * Checks the structural consistency of the graph: sources, then sinks, then internal
     * streams, then reachability of every node from the sources.
     *
     * @throws IllegalArgumentException on the first violated rule
     */
    public void validate() {
        validateSources();
        validateSinks();
        validateInternalStreams();
        validateReachability();
    }

    /** A source edge must have no producer nodes and at least one consumer node. */
    private void validateSources() {
        for (StreamEdge edge : this.sources) {
            if (!edge.getSourceNodes().isEmpty()) {
                throw new IllegalArgumentException(String.format(
                        "Source stream %s should not have producers.", edge.getFormattedSystemStream()));
            }
            if (edge.getTargetNodes().isEmpty()) {
                throw new IllegalArgumentException(String.format(
                        "Source stream %s should have consumers.", edge.getFormattedSystemStream()));
            }
        }
    }

    /** A sink edge must have no consumer nodes and at least one producer node. */
    private void validateSinks() {
        for (StreamEdge edge : this.sinks) {
            if (!edge.getTargetNodes().isEmpty()) {
                throw new IllegalArgumentException(String.format(
                        "Sink stream %s should not have consumers", edge.getFormattedSystemStream()));
            }
            if (edge.getSourceNodes().isEmpty()) {
                throw new IllegalArgumentException(String.format(
                        "Sink stream %s should have producers", edge.getFormattedSystemStream()));
            }
        }
    }

    /** Every edge that is neither a source nor a sink must have producers and consumers. */
    private void validateInternalStreams() {
        Set<StreamEdge> internalEdges = new HashSet<>(this.edges.values());
        internalEdges.removeAll(this.sources);
        internalEdges.removeAll(this.sinks);
        for (StreamEdge edge : internalEdges) {
            if (edge.getSourceNodes().isEmpty() || edge.getTargetNodes().isEmpty()) {
                throw new IllegalArgumentException(String.format(
                        "Internal stream %s should have both producers and consumers",
                        edge.getFormattedSystemStream()));
            }
        }
    }

    /** Every registered node must be in the set returned by {@link #findReachable()}. */
    private void validateReachability() {
        Set<JobNode> reachable = findReachable();
        if (reachable.size() != this.nodes.size()) {
            Set<JobNode> unreachable = new HashSet<>(this.nodes.values());
            unreachable.removeAll(reachable);
            String unreachableIds = unreachable.stream()
                    .map(JobNode::getId)
                    .collect(Collectors.joining(", "));
            throw new IllegalArgumentException(
                    String.format("Jobs %s cannot be reached from Sources.", unreachableIds));
        }
    }

    /**
     * Breadth-first traversal starting at the consumers of the source edges and following
     * each node's out-edges to their target nodes.
     *
     * @return every node visited by the traversal
     */
    Set<JobNode> findReachable() {
        Queue<JobNode> pending = new ArrayDeque<>();
        Set<JobNode> visited = new HashSet<>();

        for (StreamEdge source : this.sources) {
            List<JobNode> consumers = source.getTargetNodes();
            pending.addAll(consumers);
            visited.addAll(consumers);
        }

        while (!pending.isEmpty()) {
            JobNode node = pending.poll();
            for (StreamEdge edge : node.getOutEdges()) {
                for (JobNode target : edge.getTargetNodes()) {
                    // Set.add reports whether the node was new, so each node is queued once.
                    if (visited.add(target)) {
                        pending.offer(target);
                    }
                }
            }
        }
        return visited;
    }

    /**
     * Orders the job nodes with an in-degree based traversal that tolerates cycles.
     *
     * <p>Only in-edges that are not source edges count towards a node's in-degree, so nodes fed
     * exclusively by sources start the traversal. Each processed node decrements the counter
     * of every target of its out-edges; a target is queued when its counter reaches zero. When
     * the queue drains before all nodes are ordered, the traversal is restarted from the
     * already-reached node with the smallest remaining counter, or, if there is none, from an
     * unvisited consumer of a source edge.
     *
     * <p>NOTE(review): counters are initialised per in-edge but decremented once per processed
     * producer of that edge, so an edge shared by several producers may release its consumer
     * before all of them are ordered. Left unchanged here; confirm whether that is intended.
     *
     * @return a new mutable list containing every registered node exactly once
     * @throws NoSuchElementException if nodes remain that are neither targets of already
     *                                ordered nodes nor consumers of a source edge
     */
    List<JobNode> topologicalSort() {
        Collection<JobNode> allNodes = this.nodes.values();
        if (allNodes.size() == 1) {
            return new ArrayList<>(allNodes);
        }

        Queue<JobNode> ready = new ArrayDeque<>();
        Map<String, Long> inDegrees = new HashMap<>();
        Set<JobNode> visited = new HashSet<>();
        for (JobNode node : allNodes) {
            long degree = node.getInEdges().stream()
                    .filter(edge -> !this.sources.contains(edge))
                    .count();
            inDegrees.put(node.getId(), degree);
            if (degree == 0) {
                ready.add(node);
                visited.add(node);
            }
        }

        List<JobNode> sorted = new ArrayList<>();
        // Nodes seen as a target of some processed node; candidates for breaking a cycle.
        Set<JobNode> reachable = new HashSet<>();
        while (sorted.size() < allNodes.size()) {
            while (!ready.isEmpty()) {
                JobNode node = ready.poll();
                sorted.add(node);
                for (StreamEdge edge : node.getOutEdges()) {
                    for (JobNode target : edge.getTargetNodes()) {
                        String targetId = target.getId();
                        long remaining = inDegrees.get(targetId) - 1;
                        inDegrees.put(targetId, remaining);
                        if (remaining == 0 && !visited.contains(target)) {
                            ready.add(target);
                            visited.add(target);
                        }
                        reachable.add(target);
                    }
                }
            }

            if (sorted.size() < allNodes.size()) {
                // The queue drained early, so the remaining nodes are blocked; pick a restart node.
                reachable.removeAll(sorted);
                JobNode restart = reachable.isEmpty()
                        ? nextUnvisitedSourceConsumer(visited)
                        : minInDegreeNode(reachable, inDegrees);
                ready.add(restart);
                visited.add(restart);
            }
        }
        return sorted;
    }

    /** Returns the first candidate, in iteration order, with the smallest remaining in-degree. */
    private static JobNode minInDegreeNode(Set<JobNode> candidates, Map<String, Long> inDegrees) {
        long minDegree = Long.MAX_VALUE;
        JobNode minNode = null;
        for (JobNode candidate : candidates) {
            long degree = inDegrees.get(candidate.getId());
            if (degree < minDegree) {
                minDegree = degree;
                minNode = candidate;
            }
        }
        return minNode;
    }

    /** Returns any consumer of a source edge that has not been visited yet, or throws. */
    private JobNode nextUnvisitedSourceConsumer(Set<JobNode> visited) {
        return this.sources.stream()
                .flatMap(source -> source.getTargetNodes().stream())
                .filter(node -> !visited.contains(node))
                .findAny()
                .orElseThrow(() -> new NoSuchElementException(
                        "Cannot order job graph: remaining job nodes are neither targets of already "
                                + "sorted nodes nor consumers of a source stream"));
    }
}
