package com.hazelcast.jet.impl.execution.init;

import com.hazelcast.core.Member;
import com.hazelcast.internal.util.concurrent.ConcurrentConveyor;
import com.hazelcast.internal.util.concurrent.OneToOneConcurrentArrayQueue;
import com.hazelcast.internal.util.concurrent.QueuedPipe;
import com.hazelcast.jet.DAG;
import com.hazelcast.jet.Edge;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Processor;
import com.hazelcast.jet.ProcessorMetaSupplier;
import com.hazelcast.jet.ProcessorSupplier;
import com.hazelcast.jet.Vertex;
import com.hazelcast.jet.config.EdgeConfig;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.impl.JetService;
import com.hazelcast.jet.impl.execution.ConcurrentInboundEdgeStream;
import com.hazelcast.jet.impl.execution.ConveyorCollector;
import com.hazelcast.jet.impl.execution.ConveyorCollectorWithPartition;
import com.hazelcast.jet.impl.execution.ConveyorEmitter;
import com.hazelcast.jet.impl.execution.InboundEdgeStream;
import com.hazelcast.jet.impl.execution.InboundEmitter;
import com.hazelcast.jet.impl.execution.OutboundCollector;
import com.hazelcast.jet.impl.execution.OutboundEdgeStream;
import com.hazelcast.jet.impl.execution.ProcessorTasklet;
import com.hazelcast.jet.impl.execution.ReceiverTasklet;
import com.hazelcast.jet.impl.execution.SenderTasklet;
import com.hazelcast.jet.impl.execution.Tasklet;
import com.hazelcast.jet.impl.execution.init.Contexts;
import com.hazelcast.jet.impl.util.DoneItem;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.partition.IPartitionService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

/* loaded from: input_file:com/hazelcast/jet/impl/execution/init/ExecutionPlan.class */
public class ExecutionPlan implements IdentifiedDataSerializable {
    // Assigned in the static initializer; nothing else in this class reads it.
    private static final ILogger LOGGER;
    // Every tasklet created for this plan: processor and receiver tasklets are added in
    // initialize(), sender tasklets in memberToSenderConveyorMap().
    private final List<Tasklet> tasklets = new ArrayList();
    // destination vertex id -> destination ordinal -> receiver tasklet
    // (populated by createIfAbsentReceiverTasklet()).
    private final Map<Integer, Map<Integer, ReceiverTasklet>> receiverMap = new HashMap();
    // destination vertex id -> destination ordinal -> remote member address -> sender tasklet
    // (populated by memberToSenderConveyorMap()).
    private final Map<Integer, Map<Integer, Map<Address, SenderTasklet>>> senderMap = new HashMap();
    // The only state written by writeData() and restored by readData().
    private List<VertexDef> vertices = new ArrayList();
    // edge id -> conveyors of that edge, one array slot per processor of the destination vertex
    // (created lazily in createOutboundEdgeStreams()).
    private final Map<String, ConcurrentConveyor<Object>[]> localConveyorMap = new HashMap();
    // edge id -> remote member address -> conveyor drained by the sender tasklet for that member.
    private final Map<String, Map<Address, ConcurrentConveyor<Object>>> edgeSenderConveyorMap = new HashMap();
    // Created in initialize() from the node engine; used to assign partitions to processors.
    private PartitionArrangement ptionArrgmt;
    // Set in initialize().
    private NodeEngine nodeEngine;
    // Set in initialize(); passed to every SenderTasklet created by this plan.
    private long executionId;
    // Assertion flag surfaced by the decompiler: set in the static initializer to the negation of
    // ExecutionPlan.class.desiredAssertionStatus() and consulted by memberToSenderConveyorMap().
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Builds one {@link ExecutionPlan} per current cluster member for the given DAG.
     *
     * <p>For every vertex id assigned by {@code assignVertexIds(dag)} the vertex's
     * {@link ProcessorMetaSupplier} is initialized and asked for a per-address
     * {@link ProcessorSupplier}; a {@link VertexDef} carrying that supplier and the
     * vertex's inbound and outbound edge definitions is then appended to each member's plan.
     *
     * @param nodeEngine node engine used to look up the Jet instance and the cluster members
     * @param dag        the DAG to plan
     * @param i          local parallelism used for vertices whose own local parallelism is {@code -1}
     * @return a map from cluster member to the plan created for it
     */
    public static Map<Member, ExecutionPlan> createExecutionPlans(NodeEngine nodeEngine, DAG dag, int i) {
        final JetInstance instance = getJetInstance(nodeEngine);
        final List<Member> members = new ArrayList<>(nodeEngine.getClusterService().getMembers());
        final int clusterSize = members.size();
        // Forwarded to every EdgeDef; true when the cluster has more than one member.
        final boolean multiMember = clusterSize > 1;
        final EdgeConfig defaultEdgeConfig = instance.getConfig().getDefaultEdgeConfig();
        final Map<Member, ExecutionPlan> plans = members.stream()
                .collect(Collectors.toMap(Function.identity(), member -> new ExecutionPlan()));
        final Map<String, Integer> vertexIds = assignVertexIds(dag);

        for (Map.Entry<String, Integer> idEntry : vertexIds.entrySet()) {
            final Vertex vertex = dag.getVertex(idEntry.getKey());
            final int vertexId = idEntry.getValue();
            final int configuredParallelism = vertex.getLocalParallelism();
            final int localParallelism = configuredParallelism != -1 ? configuredParallelism : i;
            final int totalParallelism = localParallelism * clusterSize;

            final List<EdgeDef> inboundDefs = toEdgeDefs(dag.getInboundEdges(vertex.getName()), defaultEdgeConfig,
                    edge -> vertexIds.get(edge.getSourceName()), multiMember);
            final List<EdgeDef> outboundDefs = toEdgeDefs(dag.getOutboundEdges(vertex.getName()), defaultEdgeConfig,
                    edge -> vertexIds.get(edge.getDestName()), multiMember);

            final ProcessorMetaSupplier metaSupplier = vertex.getSupplier();
            metaSupplier.init(new Contexts.MetaSupplierCtx(instance, totalParallelism, localParallelism));
            final List<Address> addresses = plans.keySet().stream()
                    .map(Member::getAddress)
                    .collect(Collectors.toList());
            final Function<Address, ProcessorSupplier> supplierByAddress = metaSupplier.get(addresses);

            for (Map.Entry<Member, ExecutionPlan> planEntry : plans.entrySet()) {
                final VertexDef vertexDef = new VertexDef(vertexId, vertex.getName(),
                        supplierByAddress.apply(planEntry.getKey().getAddress()), localParallelism);
                vertexDef.addInboundEdges(inboundDefs);
                vertexDef.addOutboundEdges(outboundDefs);
                planEntry.getValue().vertices.add(vertexDef);
            }
        }
        return plans;
    }

    /**
     * Looks up the {@link JetService} registered under {@code JetService.SERVICE_NAME}
     * on the given node engine and returns its Jet instance.
     */
    private static JetInstance getJetInstance(NodeEngine nodeEngine) {
        final JetService jetService = (JetService) nodeEngine.getService(JetService.SERVICE_NAME);
        return jetService.getJetInstance();
    }

    /**
     * Prepares this plan for execution on the local member.
     *
     * <p>Stores the node engine and execution id, initializes the processor suppliers and
     * the edge definitions, then creates one {@link ProcessorTasklet} per processor of every
     * vertex. Receiver tasklets registered while wiring the outbound edges are appended to
     * the tasklet list at the end.
     *
     * @param nodeEngine the local node engine
     * @param j          the execution id stored in this plan
     */
    public void initialize(NodeEngine nodeEngine, long j) {
        this.nodeEngine = nodeEngine;
        this.executionId = j;
        initProcSuppliers();
        initDag();
        this.ptionArrgmt = new PartitionArrangement(nodeEngine);
        final JetInstance instance = getJetInstance(nodeEngine);

        for (VertexDef vertex : vertices) {
            int processorIdx = 0;
            for (Processor processor : createProcessors(vertex, vertex.parallelism())) {
                // Outbound streams are created first, as in the original statement order.
                final List<OutboundEdgeStream> outboundStreams = createOutboundEdgeStreams(vertex, processorIdx);
                final String vertexName = vertex.name();
                final String loggerName =
                        vertexName + '(' + processor.getClass().getSimpleName() + ")#" + processorIdx;
                final Contexts.ProcCtx context =
                        new Contexts.ProcCtx(instance, nodeEngine.getLogger(loggerName), vertexName, processorIdx);
                final List<InboundEdgeStream> inboundStreams = createInboundEdgeStreams(vertex, processorIdx);
                tasklets.add(new ProcessorTasklet(vertexName, context, processor, inboundStreams, outboundStreams));
                processorIdx++;
            }
        }

        for (Map<Integer, ReceiverTasklet> receiversByOrdinal : receiverMap.values()) {
            tasklets.addAll(receiversByOrdinal.values());
        }
    }

    /**
     * Returns a new list holding the processor supplier of every vertex in this plan,
     * in vertex order.
     */
    public List<ProcessorSupplier> getProcessorSuppliers() {
        final List<ProcessorSupplier> suppliers = new ArrayList<>();
        for (VertexDef vertex : vertices) {
            suppliers.add(vertex.processorSupplier());
        }
        return suppliers;
    }

    /**
     * Returns the receiver tasklets of this plan, keyed by destination vertex id and then
     * by destination ordinal. The returned map is the plan's own instance, not a copy.
     */
    public Map<Integer, Map<Integer, ReceiverTasklet>> getReceiverMap() {
        return receiverMap;
    }

    /**
     * Returns the sender tasklets of this plan, keyed by destination vertex id, then by
     * destination ordinal, then by remote member address. The returned map is the plan's
     * own instance, not a copy.
     */
    public Map<Integer, Map<Integer, Map<Address, SenderTasklet>>> getSenderMap() {
        return senderMap;
    }

    /**
     * Returns the list of all tasklets created for this plan. The returned list is the
     * plan's own instance, not a copy.
     */
    public List<Tasklet> getTasklets() {
        return tasklets;
    }

    /** Returns {@code JetImplDataSerializerHook.FACTORY_ID} as this class's serialization factory id. */
    @Override
    public int getFactoryId() {
        return JetImplDataSerializerHook.FACTORY_ID;
    }

    /** Returns the serialization type id of this class within its factory, which is {@code 0}. */
    @Override
    public int getId() {
        return 0;
    }

    /**
     * Serializes this plan. Only the vertex list is written; all other fields are rebuilt
     * by {@code initialize}.
     *
     * @throws IOException if writing to the output fails
     */
    @Override
    public void writeData(ObjectDataOutput objectDataOutput) throws IOException {
        Util.writeList(objectDataOutput, vertices);
    }

    /**
     * Deserializes this plan by replacing the vertex list with the one read from the input.
     *
     * @throws IOException if reading from the input fails
     */
    @Override
    public void readData(ObjectDataInput objectDataInput) throws IOException {
        vertices = Util.readList(objectDataInput);
    }

    /**
     * Assigns consecutive integer ids, starting at 0, to the vertices of the DAG in the
     * order {@code dag.forEach} visits them.
     *
     * <p>The previous body computed the name and the id for each vertex but never stored
     * them, so the returned map was always empty and no vertex was ever planned.
     *
     * @param dag the DAG whose vertices receive ids
     * @return an insertion-ordered map from vertex name to assigned id
     */
    private static Map<String, Integer> assignVertexIds(DAG dag) {
        final Map<String, Integer> vertexIds = new LinkedHashMap<>();
        // Single-element array so the lambda can advance the counter.
        final int[] nextId = {0};
        dag.forEach(vertex -> vertexIds.put(vertex.getName(), nextId[0]++));
        return vertexIds;
    }

    /**
     * Converts DAG edges into {@link EdgeDef}s.
     *
     * @param list       the edges to convert
     * @param edgeConfig configuration used for edges whose own configuration is {@code null}
     * @param function   maps an edge to the integer passed as the third {@code EdgeDef}
     *                   constructor argument (callers supply the id of the vertex at the
     *                   other end of the edge)
     * @param z          flag forwarded unchanged to every {@code EdgeDef}
     * @return a new list with one definition per edge, in the same order
     */
    private static List<EdgeDef> toEdgeDefs(List<Edge> list, EdgeConfig edgeConfig, Function<Edge, Integer> function, boolean z) {
        final List<EdgeDef> defs = new ArrayList<>();
        for (Edge edge : list) {
            final EdgeConfig effectiveConfig = edge.getConfig() == null ? edgeConfig : edge.getConfig();
            final int otherEndVertexId = function.apply(edge);
            defs.add(new EdgeDef(edge, effectiveConfig, otherEndVertexId, z));
        }
        return defs;
    }

    /**
     * Initializes the processor supplier of every vertex with a context built from the
     * local Jet instance and the vertex's parallelism.
     */
    private void initProcSuppliers() {
        final JetService service = (JetService) nodeEngine.getService(JetService.SERVICE_NAME);
        for (VertexDef vertex : vertices) {
            final ProcessorSupplier supplier = vertex.processorSupplier();
            supplier.init(new Contexts.ProcSupplierCtx(service.getJetInstance(), vertex.parallelism()));
        }
    }

    /**
     * Completes the deserialized vertex and edge definitions.
     *
     * <p>First every inbound and outbound edge gets its transient fields initialized from
     * a vertex-id lookup table. Then every non-null partitioner found on an outbound edge
     * is initialized with the partition service's {@code getPartitionId} function.
     */
    private void initDag() {
        final Map<Integer, VertexDef> vertexById = vertices.stream()
                .collect(Collectors.toMap(VertexDef::vertexId, Function.identity()));

        for (VertexDef vertex : vertices) {
            for (EdgeDef inbound : vertex.inboundEdges()) {
                inbound.initTransientFields(vertexById, vertex, false);
            }
            for (EdgeDef outbound : vertex.outboundEdges()) {
                outbound.initTransientFields(vertexById, vertex, true);
            }
        }

        final IPartitionService partitionService = nodeEngine.getPartitionService();
        vertices.stream()
                .map(VertexDef::outboundEdges)
                .flatMap(Collection::stream)
                .map(EdgeDef::partitioner)
                .filter(Objects::nonNull)
                .forEach(partitioner -> partitioner.init(partitionService::getPartitionId));
    }

    /**
     * Asks the vertex's processor supplier for {@code i} processors and verifies the count.
     *
     * @param vertexDef the vertex whose supplier is queried
     * @param i         the number of processors requested
     * @return the processors returned by the supplier
     * @throws JetException if the supplier returns a collection whose size differs from {@code i}
     */
    private static Collection<? extends Processor> createProcessors(VertexDef vertexDef, int i) {
        final Collection<? extends Processor> processors = vertexDef.processorSupplier().get(i);
        final int returned = processors.size();
        if (returned == i) {
            return processors;
        }
        throw new JetException("ProcessorSupplier failed to return the requested number of processors. Requested: " + i + ", returned: " + returned);
    }

    /**
     * Creates the outbound edge streams of one processor, one per outbound edge of its vertex.
     *
     * <p>The first call for an edge creates that edge's local conveyors: one conveyor per
     * processor of the destination vertex, each with one queue per processor of this vertex
     * plus one extra queue when the edge is distributed.
     *
     * @param vertexDef the vertex owning the processor
     * @param i         index of the processor within the vertex
     * @return the outbound streams, in outbound-edge order
     */
    private List<OutboundEdgeStream> createOutboundEdgeStreams(VertexDef vertexDef, int i) {
        final List<OutboundEdgeStream> streams = new ArrayList<>();
        for (EdgeDef edge : vertexDef.outboundEdges()) {
            localConveyorMap.computeIfAbsent(edge.edgeId(), edgeId -> {
                final int conveyorCount = edge.destVertex().parallelism();
                int queueCount = vertexDef.parallelism();
                if (edge.isDistributed()) {
                    queueCount++;
                }
                return createConveyorArray(conveyorCount, queueCount, edge.getConfig().getQueueSize());
            });

            Map<Address, ConcurrentConveyor<Object>> senderConveyors = null;
            if (edge.isDistributed()) {
                senderConveyors = memberToSenderConveyorMap(edgeSenderConveyorMap, edge);
            }
            streams.add(createOutboundEdgeStream(edge, i, senderConveyors));
        }
        return streams;
    }

    /**
     * Returns the per-remote-member sender conveyors of a distributed edge, creating them
     * on first use.
     *
     * <p>On creation, each address returned by {@code Util.getRemoteMembers} gets one
     * conveyor with a queue per processor of the source vertex, and a {@link SenderTasklet}
     * draining that conveyor. The tasklet is registered in {@code senderMap} and appended to
     * {@code tasklets}.
     *
     * @param map     cache of sender conveyors keyed by edge id
     * @param edgeDef the edge; expected to be distributed
     * @return map from remote member address to the conveyor feeding its sender tasklet
     * @throws AssertionError if assertions are enabled and the edge is not distributed
     */
    private Map<Address, ConcurrentConveyor<Object>> memberToSenderConveyorMap(Map<String, Map<Address, ConcurrentConveyor<Object>>> map, EdgeDef edgeDef) {
        if (!$assertionsDisabled && !edgeDef.isDistributed()) {
            throw new AssertionError("Edge is not distributed");
        }
        return map.computeIfAbsent(edgeDef.edgeId(), edgeId -> {
            final Map<Address, ConcurrentConveyor<Object>> conveyorByMember = new HashMap<>();
            for (Address remote : Util.getRemoteMembers(nodeEngine)) {
                final ConcurrentConveyor<Object> conveyor = createConveyorArray(
                        1, edgeDef.sourceVertex().parallelism(), edgeDef.getConfig().getQueueSize())[0];
                final ConcurrentInboundEdgeStream inboundStream =
                        createInboundEdgeStream(edgeDef.destOrdinal(), edgeDef.priority(), conveyor);
                final int destVertexId = edgeDef.destVertex().vertexId();
                final SenderTasklet sender = new SenderTasklet(inboundStream, nodeEngine, remote, executionId,
                        destVertexId, edgeDef.getConfig().getPacketSizeLimit());
                senderMap.computeIfAbsent(destVertexId, id -> new HashMap<>())
                        .computeIfAbsent(edgeDef.destOrdinal(), ordinal -> new HashMap<>())
                        .put(remote, sender);
                tasklets.add(sender);
                conveyorByMember.put(remote, conveyor);
            }
            return conveyorByMember;
        });
    }

    /**
     * Creates an array of conveyors, each backed by its own set of one-to-one queues and
     * using {@code DoneItem.DONE_ITEM} as the first argument to
     * {@code ConcurrentConveyor.concurrentConveyor}.
     *
     * <p>The decompiler reported that this method's access modifier was changed from
     * {@code private}; it stays {@code public} here so that no existing caller breaks.
     *
     * <p>The previous body nested two lambdas that both declared a parameter named
     * {@code i4}, which does not compile; plain loops avoid the clash.
     *
     * @param i  number of conveyors to create
     * @param i2 number of queues in each conveyor
     * @param i3 value passed to the constructor of each queue (callers pass the edge's
     *           configured queue size)
     * @return the newly created conveyors
     */
    @SuppressWarnings("unchecked") // generic array creation; elements are only ever of the declared type
    public static ConcurrentConveyor<Object>[] createConveyorArray(int i, int i2, int i3) {
        final ConcurrentConveyor<Object>[] conveyors = new ConcurrentConveyor[i];
        for (int conveyorIdx = 0; conveyorIdx < i; conveyorIdx++) {
            final QueuedPipe<Object>[] queues = new QueuedPipe[i2];
            for (int queueIdx = 0; queueIdx < i2; queueIdx++) {
                queues[queueIdx] = new OneToOneConcurrentArrayQueue<>(i3);
            }
            conveyors[conveyorIdx] = ConcurrentConveyor.concurrentConveyor(DoneItem.DONE_ITEM, queues);
        }
        return conveyors;
    }

    /**
     * Creates the outbound stream of one processor for one edge.
     *
     * <p>A {@link ConveyorCollector} is created for each processor of the destination
     * vertex, writing to that processor's local conveyor at queue index {@code i}. For a
     * distributed edge the receiver tasklet is ensured to exist, the local collectors are
     * wrapped into a single composite collector, and one
     * {@link ConveyorCollectorWithPartition} is added per entry of the remote partition
     * assignment.
     *
     * @param edgeDef the edge to create the stream for
     * @param i       index of the processor within the source vertex
     * @param map     sender conveyors by remote member address; only read when the edge is distributed
     * @return the outbound edge stream
     */
    private OutboundEdgeStream createOutboundEdgeStream(EdgeDef edgeDef, int i, Map<Address, ConcurrentConveyor<Object>> map) {
        final int totalPartitions = nodeEngine.getPartitionService().getPartitionCount();
        final int destParallelism = edgeDef.destVertex().parallelism();
        final int[][] partitionsPerProcessor =
                ptionArrgmt.assignPartitionsToProcessors(destParallelism, edgeDef.isDistributed());

        final ConcurrentConveyor<Object>[] localConveyors = localConveyorMap.get(edgeDef.edgeId());
        final OutboundCollector[] localCollectors = new OutboundCollector[destParallelism];
        for (int p = 0; p < destParallelism; p++) {
            localCollectors[p] = new ConveyorCollector(localConveyors[p], i, partitionsPerProcessor[p]);
        }

        final OutboundCollector[] allCollectors;
        if (!edgeDef.isDistributed()) {
            allCollectors = localCollectors;
        } else {
            createIfAbsentReceiverTasklet(edgeDef, partitionsPerProcessor, totalPartitions);
            final Map<Address, int[]> remoteAssignment = ptionArrgmt.remotePartitionAssignment.get();
            allCollectors = new OutboundCollector[remoteAssignment.size() + 1];
            // Slot 0 covers all local processors; the remaining slots cover one remote member each.
            allCollectors[0] = OutboundCollector.compositeCollector(localCollectors, edgeDef, totalPartitions);
            int slot = 1;
            for (Map.Entry<Address, int[]> assignment : remoteAssignment.entrySet()) {
                allCollectors[slot++] =
                        new ConveyorCollectorWithPartition(map.get(assignment.getKey()), i, assignment.getValue());
            }
        }

        return new OutboundEdgeStream(
                edgeDef.sourceOrdinal(),
                edgeDef.isBuffered() ? Integer.MAX_VALUE : edgeDef.getConfig().getHighWaterMark(),
                OutboundCollector.compositeCollector(allCollectors, edgeDef, totalPartitions));
    }

    /**
     * Registers a {@link ReceiverTasklet} for the edge's destination vertex id and
     * destination ordinal unless one is already present in {@code receiverMap}.
     *
     * <p>The receiver writes into the edge's local conveyors, using the last queue of each
     * conveyor.
     *
     * @param edgeDef the edge
     * @param iArr    partition ids per destination processor
     * @param i       partition count passed to the composite collector
     */
    private void createIfAbsentReceiverTasklet(EdgeDef edgeDef, int[][] iArr, int i) {
        final ConcurrentConveyor<Object>[] conveyors = localConveyorMap.get(edgeDef.edgeId());
        final Map<Integer, ReceiverTasklet> receiversByOrdinal =
                receiverMap.computeIfAbsent(edgeDef.destVertex().vertexId(), vertexId -> new HashMap<>());
        receiversByOrdinal.computeIfAbsent(edgeDef.destOrdinal(), ordinal -> {
            final OutboundCollector[] collectors = new OutboundCollector[iArr.length];
            for (int p = 0; p < collectors.length; p++) {
                final ConcurrentConveyor<Object> conveyor = conveyors[p];
                collectors[p] = new ConveyorCollector(conveyor, conveyor.queueCount() - 1, iArr[p]);
            }
            final OutboundCollector composite = OutboundCollector.compositeCollector(collectors, edgeDef, i);
            return new ReceiverTasklet(
                    composite,
                    edgeDef.getConfig().getReceiveWindowMultiplier(),
                    getConfig().getInstanceConfig().getFlowControlPeriodMs(),
                    nodeEngine.getClusterService().getSize() - 1);
        });
    }

    /** Returns the configuration of the Jet instance registered on this plan's node engine. */
    private JetConfig getConfig() {
        return getJetInstance(nodeEngine).getConfig();
    }

    /**
     * Creates the inbound edge streams of one processor, one per inbound edge of its vertex.
     * Each stream reads from the edge's local conveyor at index {@code i}.
     *
     * @param vertexDef the vertex owning the processor
     * @param i         index of the processor within the vertex
     * @return the inbound streams, in inbound-edge order
     */
    private List<InboundEdgeStream> createInboundEdgeStreams(VertexDef vertexDef, int i) {
        final List<InboundEdgeStream> streams = new ArrayList<>();
        for (EdgeDef edge : vertexDef.inboundEdges()) {
            final ConcurrentConveyor<Object> conveyor = localConveyorMap.get(edge.edgeId())[i];
            streams.add(createInboundEdgeStream(edge.destOrdinal(), edge.priority(), conveyor));
        }
        return streams;
    }

    /**
     * Wraps a conveyor in a {@link ConcurrentInboundEdgeStream} with one
     * {@link ConveyorEmitter} per queue of the conveyor.
     *
     * @param i                  second constructor argument of the stream (callers pass the edge's destination ordinal)
     * @param i2                 third constructor argument of the stream (callers pass the edge's priority)
     * @param concurrentConveyor the conveyor to read from
     * @return the inbound edge stream
     */
    private static ConcurrentInboundEdgeStream createInboundEdgeStream(int i, int i2, ConcurrentConveyor<Object> concurrentConveyor) {
        final int queueCount = concurrentConveyor.queueCount();
        final InboundEmitter[] emitters = new InboundEmitter[queueCount];
        for (int queueIdx = 0; queueIdx < queueCount; queueIdx++) {
            emitters[queueIdx] = new ConveyorEmitter(concurrentConveyor, queueIdx);
        }
        return new ConcurrentInboundEdgeStream(emitters, i, i2);
    }

    // Static initialization of the two blank static finals declared at the top of the class.
    static {
        // True when assertions are NOT enabled for this class; read by memberToSenderConveyorMap().
        $assertionsDisabled = !ExecutionPlan.class.desiredAssertionStatus();
        LOGGER = Logger.getLogger(ExecutionPlan.class);
    }
}
