package com.hazelcast.jet.impl.execution.init;

import com.hazelcast.internal.cluster.MemberInfo;
import com.hazelcast.internal.cluster.impl.MembersView;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.config.EdgeConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.TopologyChangedException;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.impl.execution.init.Contexts;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.nio.Address;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.partition.IPartitionService;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/* loaded from: input_file:com/hazelcast/jet/impl/execution/init/ExecutionPlanBuilder.class */
/**
 * Static helper that builds one {@link ExecutionPlan} per cluster member for a given {@link DAG}.
 *
 * <p>The class is not instantiable; all functionality is exposed through
 * {@link #createExecutionPlans(NodeEngine, MembersView, DAG, JobConfig, long)}.
 */
public final class ExecutionPlanBuilder {

    /**
     * Sentinel meaning "no explicit local parallelism was configured".
     * NOTE(review): presumably mirrors the "use default" constant declared on {@link Vertex} - confirm.
     */
    private static final int PARALLELISM_NOT_SET = -1;

    private ExecutionPlanBuilder() {
    }

    /**
     * Creates an execution plan for every member that owns at least one partition.
     *
     * <p>Steps performed, in order:
     * <ol>
     *   <li>resolve the owner of each partition and collect the owning members;</li>
     *   <li>create an empty {@link ExecutionPlan} per member, numbering members from 0;</li>
     *   <li>assign an integer id to each vertex of the DAG;</li>
     *   <li>for each vertex, initialise its {@link ProcessorMetaSupplier}, obtain a
     *       {@link ProcessorSupplier} for every member and register the resulting
     *       {@link VertexDef} (with its inbound and outbound edges) in that member's plan.</li>
     * </ol>
     *
     * @param nodeEngine  source of the partition service, loggers and the Jet instance
     * @param membersView member list against which partition owners are validated
     * @param dag         the DAG whose vertices and edges are planned
     * @param jobConfig   job configuration handed to each {@link ExecutionPlan}
     * @param j           value passed through unchanged as the third constructor argument of each
     *                    {@link ExecutionPlan}; its meaning is not visible in this class
     * @return a map from member to the plan that member should execute
     * @throws TopologyChangedException if a partition owner is not part of {@code membersView}
     */
    public static Map<MemberInfo, ExecutionPlan> createExecutionPlans(NodeEngine nodeEngine, MembersView membersView, DAG dag, JobConfig jobConfig, long j) {
        JetInstance jetInstance = Util.getJetInstance(nodeEngine);
        int cooperativeThreadCount = jetInstance.getConfig().getInstanceConfig().getCooperativeThreadCount();

        // A HashSet de-duplicates members that own several partitions.
        Collection<MemberInfo> members = new HashSet<>(membersView.size());
        Address[] partitionOwners = new Address[nodeEngine.getPartitionService().getPartitionCount()];
        initPartitionOwnersAndMembers(nodeEngine, membersView, members, partitionOwners);

        List<Address> memberAddresses = members.stream()
                .map(MemberInfo::getAddress)
                .collect(Collectors.toList());
        int memberCount = members.size();
        // True when more than one member participates; forwarded to every EdgeDef.
        boolean multipleMembers = memberCount > 1;
        EdgeConfig defaultEdgeConfig = jetInstance.getConfig().getDefaultEdgeConfig();

        Map<MemberInfo, ExecutionPlan> plans = new HashMap<>();
        int memberIndex = 0;
        for (MemberInfo member : members) {
            plans.put(member, new ExecutionPlan(partitionOwners, jobConfig, j, memberIndex, memberCount));
            memberIndex++;
        }

        Map<String, Integer> vertexIds = assignVertexIds(dag);
        for (Map.Entry<String, Integer> idEntry : vertexIds.entrySet()) {
            Vertex vertex = dag.getVertex(idEntry.getKey());
            ProcessorMetaSupplier metaSupplier = vertex.getMetaSupplier();
            int vertexId = idEntry.getValue();
            int localParallelism = determineParallelism(vertex, metaSupplier.preferredLocalParallelism(), cooperativeThreadCount);
            int totalParallelism = localParallelism * memberCount;

            // Inbound edges are keyed by the id of their source vertex, outbound ones by their destination.
            List<EdgeDef> inboundEdges = toEdgeDefs(dag.getInboundEdges(vertex.getName()), defaultEdgeConfig,
                    edge -> vertexIds.get(edge.getSourceName()), multipleMembers);
            List<EdgeDef> outboundEdges = toEdgeDefs(dag.getOutboundEdges(vertex.getName()), defaultEdgeConfig,
                    edge -> vertexIds.get(edge.getDestName()), multipleMembers);

            String loggerName = String.format("%s.%s#ProcessorMetaSupplier", metaSupplier.getClass().getName(), vertex.getName());
            metaSupplier.init(new Contexts.MetaSupplierCtx(jetInstance, nodeEngine.getLogger(loggerName),
                    vertex.getName(), localParallelism, totalParallelism, memberCount));

            Function<Address, ProcessorSupplier> supplierByAddress = metaSupplier.get(memberAddresses);
            for (Map.Entry<MemberInfo, ExecutionPlan> planEntry : plans.entrySet()) {
                ProcessorSupplier processorSupplier = supplierByAddress.apply(planEntry.getKey().getAddress());
                Util.checkSerializable(processorSupplier, "ProcessorSupplier in vertex '" + vertex.getName() + '\'');
                VertexDef vertexDef = new VertexDef(vertexId, vertex.getName(), processorSupplier, localParallelism);
                vertexDef.addInboundEdges(inboundEdges);
                vertexDef.addOutboundEdges(outboundEdges);
                planEntry.getValue().addVertex(vertexDef);
            }
        }
        return plans;
    }

    /**
     * Assigns consecutive ids, starting at 0, to the vertices of {@code dag} in the order
     * {@code dag.forEach} visits them.
     *
     * @param dag the DAG whose vertices are numbered
     * @return an insertion-ordered map from vertex name to its id
     */
    private static Map<String, Integer> assignVertexIds(DAG dag) {
        Map<String, Integer> vertexIds = new LinkedHashMap<>();
        // Single-element array so the lambda can mutate the counter.
        int[] nextId = {0};
        dag.forEach(vertex -> {
            // The id must actually be recorded; without this put the map stays empty
            // and no vertex would ever be added to an execution plan.
            vertexIds.put(vertex.getName(), nextId[0]);
            nextId[0]++;
        });
        return vertexIds;
    }

    /**
     * Picks the local parallelism for a vertex.
     *
     * <p>Precedence: the value configured on the vertex itself; otherwise the meta supplier's
     * preference capped at {@code defaultParallelism}; otherwise {@code defaultParallelism}.
     * Both candidate values are validated with {@code Vertex.checkLocalParallelism} first.
     *
     * @param vertex               the vertex being planned
     * @param preferredParallelism the meta supplier's preferred local parallelism
     * @param defaultParallelism   fallback and upper bound for the supplier's preference
     * @return the local parallelism to use
     */
    private static int determineParallelism(Vertex vertex, int preferredParallelism, int defaultParallelism) {
        int vertexParallelism = vertex.getLocalParallelism();
        Vertex.checkLocalParallelism(preferredParallelism);
        Vertex.checkLocalParallelism(vertexParallelism);
        if (vertexParallelism != PARALLELISM_NOT_SET) {
            return vertexParallelism;
        }
        if (preferredParallelism != PARALLELISM_NOT_SET) {
            return Integer.min(preferredParallelism, defaultParallelism);
        }
        return defaultParallelism;
    }

    /**
     * Converts DAG edges into {@link EdgeDef}s.
     *
     * @param edges            the edges to convert
     * @param defaultConfig    configuration used for edges that carry none of their own
     * @param oppositeVertexId maps an edge to the id of the vertex on its other end
     * @param multipleMembers  flag forwarded unchanged to each {@link EdgeDef}
     * @return one {@link EdgeDef} per input edge, in the same order
     */
    private static List<EdgeDef> toEdgeDefs(List<Edge> edges, EdgeConfig defaultConfig, Function<Edge, Integer> oppositeVertexId, boolean multipleMembers) {
        return edges.stream()
                .map(edge -> {
                    EdgeConfig config = edge.getConfig() == null ? defaultConfig : edge.getConfig();
                    int otherEndId = oppositeVertexId.apply(edge);
                    return new EdgeDef(edge, config, otherEndId, multipleMembers);
                })
                .collect(Collectors.toList());
    }

    /**
     * Fills {@code partitionOwners} with the owner address of every partition and adds each
     * owning member to {@code members}.
     *
     * @param nodeEngine      provides the partition service
     * @param membersView     member list used to look up each owner
     * @param members         output collection receiving the owning members
     * @param partitionOwners output array, indexed by partition id; its length defines how many
     *                        partitions are resolved
     * @throws TopologyChangedException if a partition owner is not found in {@code membersView}
     */
    private static void initPartitionOwnersAndMembers(NodeEngine nodeEngine, MembersView membersView, Collection<MemberInfo> members, Address[] partitionOwners) {
        IPartitionService partitionService = nodeEngine.getPartitionService();
        for (int partitionId = 0; partitionId < partitionOwners.length; partitionId++) {
            Address owner = partitionService.getPartitionOwnerOrWait(partitionId);
            MemberInfo member = membersView.getMember(owner);
            if (member == null) {
                throw new TopologyChangedException("Topology changed, " + owner + " is not in original member list");
            }
            members.add(member);
            partitionOwners[partitionId] = owner;
        }
    }
}
