/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution.init;

import com.hazelcast.cluster.Address;
import com.hazelcast.internal.cluster.MemberInfo;
import com.hazelcast.internal.cluster.impl.MembersView;
import com.hazelcast.internal.partition.IPartitionService;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.config.EdgeConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.TopologyChangedException;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.impl.execution.init.Contexts;
import com.hazelcast.jet.impl.execution.init.EdgeDef;
import com.hazelcast.jet.impl.execution.init.ExecutionPlan;
import com.hazelcast.jet.impl.execution.init.VertexDef;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.NodeEngine;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Static utility that converts a {@link DAG} into one {@link ExecutionPlan}
 * per member that currently owns at least one partition.
 */
public final class ExecutionPlanBuilder {
    private ExecutionPlanBuilder() {
    }

    /**
     * Builds an {@link ExecutionPlan} for every member that owns a partition.
     * <p>
     * For each vertex of the DAG the vertex's {@link ProcessorMetaSupplier} is
     * initialized once, then asked for a {@link ProcessorSupplier} per member
     * address. The resulting {@link VertexDef} (with its inbound and outbound
     * {@link EdgeDef}s) is added to that member's plan.
     *
     * @param nodeEngine     provides the Jet instance, the partition service and loggers
     * @param membersView    member list against which partition owners are resolved
     * @param dag            the DAG whose vertices and edges are planned
     * @param jobId          job id handed to the meta-supplier context
     * @param executionId    execution id handed to the meta-supplier context
     * @param jobConfig      job configuration handed to each plan and to the meta-supplier context
     * @param lastSnapshotId snapshot id handed to each plan
     * @return a mutable map from member to the plan built for it
     * @throws TopologyChangedException if a partition owner is not present in {@code membersView}
     */
    public static Map<MemberInfo, ExecutionPlan> createExecutionPlans(NodeEngine nodeEngine, MembersView membersView, DAG dag, long jobId, long executionId, JobConfig jobConfig, long lastSnapshotId) {
        JetInstance instance = Util.getJetInstance(nodeEngine);
        int defaultParallelism = instance.getConfig().getInstanceConfig().getCooperativeThreadCount();
        Collection<MemberInfo> members = new HashSet<>(membersView.size());
        Address[] partitionOwners = new Address[nodeEngine.getPartitionService().getPartitionCount()];
        initPartitionOwnersAndMembers(nodeEngine, membersView, members, partitionOwners);

        List<Address> addresses = Util.toList(members, MemberInfo::getAddress);
        int clusterSize = members.size();
        boolean isJobDistributed = clusterSize > 1;
        EdgeConfig defaultEdgeConfig = instance.getConfig().getDefaultEdgeConfig();

        // One plan per member; memberIndex follows the iteration order of the
        // member set, which is a HashSet and therefore unspecified.
        Map<MemberInfo, ExecutionPlan> plans = new HashMap<>();
        int memberIndex = 0;
        for (MemberInfo member : members) {
            plans.put(member, new ExecutionPlan(partitionOwners, jobConfig, lastSnapshotId, memberIndex++, clusterSize));
        }

        Map<String, Integer> vertexIdMap = assignVertexIds(dag);
        for (Map.Entry<String, Integer> entry : vertexIdMap.entrySet()) {
            Vertex vertex = dag.getVertex(entry.getKey());
            ProcessorMetaSupplier metaSupplier = vertex.getMetaSupplier();
            int vertexId = entry.getValue();
            int localParallelism = vertex.determineLocalParallelism(defaultParallelism);
            int totalParallelism = localParallelism * clusterSize;

            // Edge definitions refer to the vertex at the other end by its numeric id.
            List<EdgeDef> inbound = toEdgeDefs(dag.getInboundEdges(vertex.getName()), defaultEdgeConfig,
                    e -> vertexIdMap.get(e.getSourceName()), isJobDistributed);
            List<EdgeDef> outbound = toEdgeDefs(dag.getOutboundEdges(vertex.getName()), defaultEdgeConfig,
                    e -> vertexIdMap.get(e.getDestName()), isJobDistributed);

            ILogger logger = nodeEngine.getLogger(String.format("%s.%s#ProcessorMetaSupplier", metaSupplier.getClass().getName(), vertex.getName()));
            try {
                metaSupplier.init(new Contexts.MetaSupplierCtx(instance, jobId, executionId, jobConfig, logger, vertex.getName(), localParallelism, totalParallelism, clusterSize, jobConfig.getProcessingGuarantee()));
            } catch (Exception e) {
                // Propagate whatever init() threw without wrapping it here.
                throw ExceptionUtil.sneakyThrow(e);
            }

            Function<? super Address, ? extends ProcessorSupplier> procSupplierFn = metaSupplier.get(addresses);
            for (Map.Entry<MemberInfo, ExecutionPlan> memberPlan : plans.entrySet()) {
                ProcessorSupplier processorSupplier = procSupplierFn.apply(memberPlan.getKey().getAddress());
                Util.checkSerializable(processorSupplier, "ProcessorSupplier in vertex '" + vertex.getName() + '\'');
                VertexDef vertexDef = new VertexDef(vertexId, vertex.getName(), processorSupplier, localParallelism);
                vertexDef.addInboundEdges(inbound);
                vertexDef.addOutboundEdges(outbound);
                memberPlan.getValue().addVertex(vertexDef);
            }
        }
        return plans;
    }

    /**
     * Assigns consecutive ids, starting at 0, to the vertices of the DAG in
     * the order {@code dag.forEach} visits them.
     *
     * @param dag the DAG to enumerate
     * @return an insertion-ordered map from vertex name to assigned id
     */
    private static Map<String, Integer> assignVertexIds(DAG dag) {
        Map<String, Integer> vertexIdMap = new LinkedHashMap<>();
        // Single-element array so the lambda can mutate the counter.
        int[] vertexId = {0};
        dag.forEach(v -> vertexIdMap.put(v.getName(), vertexId[0]++));
        return vertexIdMap;
    }

    /**
     * Converts edges to {@link EdgeDef}s, substituting {@code defaultEdgeConfig}
     * for edges that carry no config of their own.
     *
     * @param edges             edges to convert
     * @param defaultEdgeConfig config used when {@code edge.getConfig()} is {@code null}
     * @param oppositeVtxId     maps an edge to the id of the vertex at its other end
     * @param isJobDistributed  flag passed through to every {@link EdgeDef}
     * @return the edge definitions, in the same order as {@code edges}
     */
    private static List<EdgeDef> toEdgeDefs(List<Edge> edges, EdgeConfig defaultEdgeConfig, Function<Edge, Integer> oppositeVtxId, boolean isJobDistributed) {
        return edges.stream()
                .map(edge -> new EdgeDef(
                        edge,
                        edge.getConfig() == null ? defaultEdgeConfig : edge.getConfig(),
                        oppositeVtxId.apply(edge),
                        isJobDistributed))
                .collect(Collectors.toList());
    }

    /**
     * Fills {@code partitionOwners} with the owner address of every partition
     * and adds each owner's {@link MemberInfo} to {@code members}.
     *
     * @param nodeEngine      provides the partition service
     * @param membersView     member list used to resolve owner addresses
     * @param members         output collection receiving the owning members
     * @param partitionOwners output array indexed by partition id; its length is the number of partitions queried
     * @throws TopologyChangedException if an owner address is not present in {@code membersView}
     */
    private static void initPartitionOwnersAndMembers(NodeEngine nodeEngine, MembersView membersView, Collection<MemberInfo> members, Address[] partitionOwners) {
        IPartitionService partitionService = nodeEngine.getPartitionService();
        for (int partitionId = 0; partitionId < partitionOwners.length; ++partitionId) {
            // NOTE(review): presumably waits until an owner is assigned - confirm against IPartitionService.
            Address address = partitionService.getPartitionOwnerOrWait(partitionId);
            MemberInfo member = membersView.getMember(address);
            if (member == null) {
                throw new TopologyChangedException("Topology changed, " + address + " is not in original member list");
            }
            members.add(member);
            partitionOwners[partitionId] = address;
        }
    }
}

