package com.hazelcast.jet.impl;

import com.hazelcast.jet.impl.execution.ExecutionContext;
import com.hazelcast.jet.impl.execution.ReceiverTasklet;
import com.hazelcast.jet.impl.execution.SenderTasklet;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.BufferObjectDataInput;
import com.hazelcast.nio.BufferObjectDataOutput;
import com.hazelcast.nio.Packet;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/hazelcast/jet/impl/Networking.class */
public class Networking {
    private static final byte[] EMPTY_BYTES = new byte[0];
    private final NodeEngineImpl nodeEngine;
    private final ILogger logger;
    private final ScheduledFuture<?> flowControlSender;
    private final ConcurrentHashMap<Long, ExecutionContext> executionContexts;

    /* JADX INFO: Access modifiers changed from: package-private */
    public Networking(NodeEngine nodeEngine, ConcurrentHashMap<Long, ExecutionContext> concurrentHashMap, int i) {
        this.nodeEngine = (NodeEngineImpl) nodeEngine;
        this.executionContexts = concurrentHashMap;
        this.logger = nodeEngine.getLogger(getClass());
        this.flowControlSender = nodeEngine.getExecutionService().scheduleWithRepetition(this::broadcastFlowControlPacket, 0L, i, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void destroy() {
        this.flowControlSender.cancel(false);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handle(Packet packet) throws IOException {
        if (packet.isFlagRaised(2)) {
            handleFlowControlPacket(packet.getConn().getEndPoint(), packet.toByteArray());
        } else {
            handleStreamPacket(packet);
        }
    }

    private void handleStreamPacket(Packet packet) throws IOException {
        BufferObjectDataInput createObjectDataInput = Util.createObjectDataInput(this.nodeEngine, packet.toByteArray());
        long readLong = createObjectDataInput.readLong();
        this.executionContexts.get(Long.valueOf(readLong)).handlePacket(createObjectDataInput.readInt(), createObjectDataInput.readInt(), packet.getConn().getEndPoint(), createObjectDataInput);
    }

    public static byte[] createStreamPacketHeader(NodeEngine nodeEngine, long j, int i, int i2) {
        BufferObjectDataOutput createObjectDataOutput = Util.createObjectDataOutput(nodeEngine);
        try {
            createObjectDataOutput.writeLong(j);
            createObjectDataOutput.writeInt(i);
            createObjectDataOutput.writeInt(i2);
            return createObjectDataOutput.toByteArray();
        } catch (IOException e) {
            throw ExceptionUtil.sneakyThrow(e);
        }
    }

    private void broadcastFlowControlPacket() {
        try {
            Util.getRemoteMembers(this.nodeEngine).forEach(address -> {
                Util.uncheckRun(() -> {
                    byte[] createFlowControlPacket = createFlowControlPacket(address);
                    if (createFlowControlPacket.length == 0) {
                        return;
                    }
                    Util.getMemberConnection(this.nodeEngine, address).write(new Packet(createFlowControlPacket).setPacketType(Packet.Type.JET).raiseFlags(18));
                });
            });
        } catch (Throwable th) {
            this.logger.severe("Flow-control packet broadcast failed", th);
        }
    }

    private byte[] createFlowControlPacket(Address address) throws IOException {
        BufferObjectDataOutput createObjectDataOutput = Util.createObjectDataOutput(this.nodeEngine);
        boolean[] zArr = {false};
        createObjectDataOutput.writeInt(this.executionContexts.size());
        this.executionContexts.forEach((l, executionContext) -> {
            Util.uncheckRun(() -> {
                if (executionContext.isParticipating(address)) {
                    createObjectDataOutput.writeLong(l.longValue());
                    createObjectDataOutput.writeInt(executionContext.receiverMap().values().stream().mapToInt((v0) -> {
                        return v0.size();
                    }).sum());
                    executionContext.receiverMap().forEach((num, map) -> {
                        map.forEach((num, map) -> {
                            Util.uncheckRun(() -> {
                                createObjectDataOutput.writeInt(num.intValue());
                                createObjectDataOutput.writeInt(num.intValue());
                                createObjectDataOutput.writeInt(((ReceiverTasklet) map.get(address)).updateAndGetSendSeqLimitCompressed());
                                zArr[0] = true;
                            });
                        });
                    });
                }
            });
        });
        return zArr[0] ? createObjectDataOutput.toByteArray() : EMPTY_BYTES;
    }

    private void handleFlowControlPacket(Address address, byte[] bArr) throws IOException {
        BufferObjectDataInput createObjectDataInput = Util.createObjectDataInput(this.nodeEngine, bArr);
        int readInt = createObjectDataInput.readInt();
        for (int i = 0; i < readInt; i++) {
            long readLong = createObjectDataInput.readLong();
            Map map = (Map) Optional.ofNullable(this.executionContexts).map(concurrentHashMap -> {
                return (ExecutionContext) concurrentHashMap.get(Long.valueOf(readLong));
            }).map((v0) -> {
                return v0.senderMap();
            }).orElse(null);
            if (map == null) {
                logMissingExeCtx(readLong);
            } else {
                int readInt2 = createObjectDataInput.readInt();
                for (int i2 = 0; i2 < readInt2; i2++) {
                    int readInt3 = createObjectDataInput.readInt();
                    int readInt4 = createObjectDataInput.readInt();
                    int readInt5 = createObjectDataInput.readInt();
                    SenderTasklet senderTasklet = (SenderTasklet) Optional.ofNullable(map.get(Integer.valueOf(readInt3))).map(map2 -> {
                        return (Map) map2.get(Integer.valueOf(readInt4));
                    }).map(map3 -> {
                        return (SenderTasklet) map3.get(address);
                    }).orElse(null);
                    if (senderTasklet == null) {
                        logMissingSenderTasklet(readInt3, readInt4);
                        return;
                    }
                    senderTasklet.setSendSeqLimitCompressed(readInt5);
                }
            }
        }
    }

    private void logMissingExeCtx(long j) {
        if (this.logger.isFinestEnabled()) {
            this.logger.finest("Ignoring flow control message applying to non-existent execution context " + j);
        }
    }

    private void logMissingSenderTasklet(int i, int i2) {
        if (this.logger.isFinestEnabled()) {
            this.logger.finest(String.format("Ignoring flow control message applying to non-existent sender tasklet (%d, %d)", Integer.valueOf(i), Integer.valueOf(i2)));
        }
    }
}
