package org.apache.samza.execution;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections4.ListUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.application.LegacyTaskApplication;
import org.apache.samza.application.descriptors.ApplicationDescriptor;
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.table.descriptors.LocalTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.util.StreamUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/execution/ExecutionPlanner.class */
public class ExecutionPlanner {
    private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class);
    private final Config config;
    private final StreamManager streamManager;
    private final StreamConfig streamConfig;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/samza/execution/ExecutionPlanner$StreamSet.class */
    /**
     * An identified group of {@link StreamEdge}s.
     *
     * <p>The edges handed to the constructor are copied into an immutable set, so later
     * changes to the source iterable are not reflected in this object.
     */
    public static class StreamSet {
        private final String setId;
        private final Set<StreamEdge> streamEdges;

        /**
         * @param str identifier of this set
         * @param iterable edges to snapshot into this set
         */
        StreamSet(String str, Iterable<StreamEdge> iterable) {
            this.streamEdges = ImmutableSet.copyOf(iterable);
            this.setId = str;
        }

        /**
         * @return an unmodifiable view of the edges held by this set
         */
        public Set<StreamEdge> getStreamEdges() {
            return Collections.unmodifiableSet(streamEdges);
        }

        /**
         * @return the identifier supplied at construction time
         */
        String getSetId() {
            return setId;
        }
    }

    /**
     * Creates a planner for the given configuration.
     *
     * @param config configuration used for planning; also wrapped in a {@link StreamConfig}
     * @param streamManager used later to look up stream partition counts
     */
    public ExecutionPlanner(Config config, StreamManager streamManager) {
        this.streamManager = streamManager;
        this.streamConfig = new StreamConfig(config);
        this.config = config;
    }

    /**
     * Builds the execution plan for an application.
     *
     * <p>Steps, in order: validate the configuration, build the job graph, fetch partition
     * counts for input/side-input/output streams, group streams by the joins they take part
     * in, calculate intermediate stream partitions (only when the graph has intermediate
     * streams), and finally verify that every group of joined streams agrees on its
     * partition count.
     *
     * @param applicationDescriptorImpl descriptor of the application to plan
     * @return the job graph built for the application
     * @throws SamzaException if the configuration is invalid or joined streams disagree on
     *         their partition counts
     */
    public ExecutionPlan plan(ApplicationDescriptorImpl<? extends ApplicationDescriptor> applicationDescriptorImpl) {
        validateConfig();

        final JobGraph jobGraph = createJobGraph(applicationDescriptorImpl);
        setInputAndOutputStreamPartitionCount(jobGraph);

        final List<StreamSet> joinedStreamSets = groupJoinedStreams(jobGraph);
        final boolean hasIntermediateStreams = !jobGraph.getIntermediateStreamEdges().isEmpty();
        if (hasIntermediateStreams) {
            IntermediateStreamManager intermediateStreamManager = new IntermediateStreamManager(this.config);
            intermediateStreamManager.calculatePartitions(jobGraph, joinedStreamSets);
        }

        for (StreamSet joinedStreams : joinedStreamSets) {
            validatePartitions(joinedStreams);
        }
        return jobGraph;
    }

    /**
     * Rejects configurations that enable host affinity while the application runs in batch mode.
     *
     * @throws SamzaException if the application mode is BATCH and host affinity is enabled
     */
    private void validateConfig() {
        ApplicationConfig appConfig = new ApplicationConfig(this.config);
        ClusterManagerConfig clusterConfig = new ClusterManagerConfig(this.config);

        // Only batch applications are restricted.
        if (appConfig.getAppMode() != ApplicationConfig.ApplicationMode.BATCH) {
            return;
        }
        if (!clusterConfig.getHostAffinityEnabled()) {
            return;
        }
        throw new SamzaException(String.format("Host affinity is not supported in batch mode. Please configure %s=false.", ClusterManagerConfig.JOB_HOST_AFFINITY_ENABLED));
    }

    /**
     * Builds a {@link JobGraph} containing a single job node for the application.
     *
     * <p>Stream specs are resolved for the application's input and output stream ids. Specs
     * present in both sets are added as intermediate streams, with the single job node passed
     * for both node arguments; the remaining input specs are added as input streams and the
     * remaining output specs as output streams. Every table descriptor is added to the graph,
     * and for {@link LocalTableDescriptor}s each side input id is resolved to a stream spec and
     * added as a side-input stream.
     *
     * <p>The graph is validated unless the application class is assignable to
     * {@link LegacyTaskApplication}.
     *
     * @param applicationDescriptorImpl descriptor of the application to build the graph for
     * @return the populated job graph
     */
    JobGraph createJobGraph(ApplicationDescriptorImpl<? extends ApplicationDescriptor> applicationDescriptorImpl) {
        JobGraph jobGraph = new JobGraph(this.config, applicationDescriptorImpl);

        Set<StreamSpec> inputSpecs = StreamUtil.getStreamSpecs(applicationDescriptorImpl.getInputStreamIds(), this.streamConfig);
        Set<StreamSpec> outputSpecs = StreamUtil.getStreamSpecs(applicationDescriptorImpl.getOutputStreamIds(), this.streamConfig);

        // Typed views (instead of raw SetView) so the elements are StreamSpec rather than Object.
        Set<StreamSpec> intermediateSpecs = Sets.intersection(inputSpecs, outputSpecs);
        Set<StreamSpec> pureInputSpecs = Sets.difference(inputSpecs, intermediateSpecs);
        Set<StreamSpec> pureOutputSpecs = Sets.difference(outputSpecs, intermediateSpecs);

        Set<TableDescriptor> tableDescriptors = applicationDescriptorImpl.getTableDescriptors();

        MapConfig singleJobConfig = JobPlanner.generateSingleJobConfig(this.config);
        JobNode jobNode = jobGraph.getOrCreateJobNode(
                singleJobConfig.get(JobConfig.JOB_NAME), singleJobConfig.get(JobConfig.JOB_ID, "1"));

        for (StreamSpec inputSpec : pureInputSpecs) {
            jobGraph.addInputStream(inputSpec, jobNode);
        }
        for (StreamSpec outputSpec : pureOutputSpecs) {
            jobGraph.addOutputStream(outputSpec, jobNode);
        }
        for (StreamSpec intermediateSpec : intermediateSpecs) {
            jobGraph.addIntermediateStream(intermediateSpec, jobNode, jobNode);
        }

        for (TableDescriptor tableDescriptor : tableDescriptors) {
            jobGraph.addTable(tableDescriptor, jobNode);
            if (tableDescriptor instanceof LocalTableDescriptor) {
                // Downcast only after the instanceof check. The raw type mirrors the raw
                // TableDescriptor usage in this file, hence the Object loop variable and cast.
                LocalTableDescriptor localTable = (LocalTableDescriptor) tableDescriptor;
                for (Object sideInputId : ListUtils.emptyIfNull(localTable.getSideInputs())) {
                    jobGraph.addSideInputStream(StreamUtil.getStreamSpec((String) sideInputId, this.streamConfig));
                }
            }
        }

        if (!LegacyTaskApplication.class.isAssignableFrom(applicationDescriptorImpl.getAppClass())) {
            jobGraph.validate();
        }
        return jobGraph;
    }

    /**
     * Fetches partition counts for the graph's input, side-input and output streams and stores
     * them on the corresponding {@link StreamEdge}s.
     *
     * <p>Edges are grouped by system name so that the stream manager is queried once per system
     * with all of that system's stream names.
     *
     * @param jobGraph graph whose input, side-input and output stream edges are updated in place
     */
    void setInputAndOutputStreamPartitionCount(JobGraph jobGraph) {
        Set<StreamEdge> existingStreams = new HashSet<>();
        existingStreams.addAll(jobGraph.getInputStreams());
        existingStreams.addAll(jobGraph.getSideInputStreams());
        existingStreams.addAll(jobGraph.getOutputStreams());

        // System name -> edges that live on that system.
        Multimap<String, StreamEdge> edgesBySystem = HashMultimap.create();
        for (StreamEdge edge : existingStreams) {
            edgesBySystem.put(edge.getSystemStream().getSystem(), edge);
        }

        for (String systemName : edgesBySystem.keySet()) {
            // Physical stream name -> edge, used to map the returned counts back to their edges.
            Map<String, StreamEdge> edgeByStreamName = new HashMap<>();
            for (StreamEdge edge : edgesBySystem.get(systemName)) {
                edgeByStreamName.put(edge.getSystemStream().getStream(), edge);
            }

            for (Map.Entry<String, Integer> entry : this.streamManager.getStreamPartitionCounts(systemName, edgeByStreamName.keySet()).entrySet()) {
                String streamName = entry.getKey();
                Integer partitionCount = entry.getValue();
                edgeByStreamName.get(streamName).setPartitionCount(partitionCount.intValue());
                log.info("Fetched partition count value {} for stream {}", partitionCount, streamName);
            }
        }
    }

    /**
     * Groups the graph's stream edges by the join operators they feed.
     *
     * <p>One {@link StreamSet} is produced per key of the multimap returned by
     * {@code OperatorSpecGraphAnalyzer.getJoinToInputOperatorSpecs}; its id is the operator's
     * op id. When the operator is a {@link StreamTableJoinOperatorSpec} whose table is a
     * {@link LocalTableDescriptor}, the edges of that table's side inputs are added to the set.
     *
     * @param jobGraph graph providing the application descriptor, tables and stream edges
     * @return an unmodifiable list with one stream set per join operator
     */
    private static List<StreamSet> groupJoinedStreams(JobGraph jobGraph) {
        Multimap<OperatorSpec, InputOperatorSpec> joinToInputOperatorSpecs = OperatorSpecGraphAnalyzer.getJoinToInputOperatorSpecs(jobGraph.getApplicationDescriptorImpl().getInputOperators().values());

        Map<String, TableDescriptor> tablesById = jobGraph.getTables().stream()
                .collect(Collectors.toMap(TableDescriptor::getTableId, Function.identity()));

        List<StreamSet> streamSets = new ArrayList<>();
        for (OperatorSpec joinOperatorSpec : joinToInputOperatorSpecs.keySet()) {
            Collection<InputOperatorSpec> joinedInputs = joinToInputOperatorSpecs.get(joinOperatorSpec);
            StreamSet streamSet = getStreamSet(joinOperatorSpec.getOpId(), joinedInputs, jobGraph);

            if (joinOperatorSpec instanceof StreamTableJoinOperatorSpec) {
                TableDescriptor tableDescriptor = tablesById.get(((StreamTableJoinOperatorSpec) joinOperatorSpec).getTableId());
                if (tableDescriptor instanceof LocalTableDescriptor) {
                    // Downcast only after the instanceof check. The raw type mirrors the raw
                    // TableDescriptor usage in this file, hence the Object loop variable and cast.
                    LocalTableDescriptor localTable = (LocalTableDescriptor) tableDescriptor;
                    List<StreamEdge> sideInputEdges = new ArrayList<>();
                    for (Object sideInputId : ListUtils.emptyIfNull(localTable.getSideInputs())) {
                        sideInputEdges.add(jobGraph.getStreamEdge((String) sideInputId));
                    }
                    // StreamSet copies its input right away, so collecting the edges eagerly is
                    // equivalent to handing it a lazily mapped iterable.
                    streamSet = new StreamSet(streamSet.getSetId(), Iterables.concat(streamSet.getStreamEdges(), sideInputEdges));
                }
            }
            streamSets.add(streamSet);
        }
        return Collections.unmodifiableList(streamSets);
    }

    /**
     * Builds a {@link StreamSet} from the stream edges of the given input operator specs.
     *
     * @param str identifier to give the resulting set
     * @param iterable input operator specs whose stream ids are looked up in the graph
     * @param jobGraph graph used to resolve each stream id to its {@link StreamEdge}
     * @return a stream set holding the resolved edges
     */
    private static StreamSet getStreamSet(String str, Iterable<InputOperatorSpec> iterable, JobGraph jobGraph) {
        Set<StreamEdge> edges = new HashSet<>();
        for (InputOperatorSpec inputOperatorSpec : iterable) {
            edges.add(jobGraph.getStreamEdge(inputOperatorSpec.getStreamId()));
        }
        return new StreamSet(str, edges);
    }

    /**
     * Verifies that all stream edges in the set have the same partition count.
     *
     * <p>The first edge in iteration order serves as the reference. An empty set has nothing
     * to compare and is accepted as valid.
     *
     * @param streamSet the group of stream edges to check
     * @throws SamzaException if an edge's partition count differs from the reference count;
     *         the message names the mismatching stream and the set id
     */
    private static void validatePartitions(StreamSet streamSet) {
        Set<StreamEdge> streamEdges = streamSet.getStreamEdges();
        if (streamEdges.isEmpty()) {
            // Avoids an unchecked Optional.get() failure on an empty set.
            return;
        }

        Iterator<StreamEdge> remaining = streamEdges.iterator();
        int expectedPartitions = remaining.next().getPartitionCount();
        while (remaining.hasNext()) {
            StreamEdge candidate = remaining.next();
            int actualPartitions = candidate.getPartitionCount();
            if (actualPartitions != expectedPartitions) {
                // Name the stream whose count is reported as "Actual", not the reference stream.
                throw new SamzaException(String.format("Unable to resolve input partitions of stream %s for the join %s. Expected: %d, Actual: %d", candidate.getName(), streamSet.getSetId(), Integer.valueOf(expectedPartitions), Integer.valueOf(actualPartitions)));
            }
        }
    }
}
