package com.hazelcast.jet.impl.execution;

import com.hazelcast.cluster.Address;
import com.hazelcast.config.WanBatchPublisherConfig;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.metrics.ProbeUnit;
import com.hazelcast.internal.nio.BufferObjectDataOutput;
import com.hazelcast.internal.nio.Connection;
import com.hazelcast.internal.nio.Packet;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.util.counters.Counter;
import com.hazelcast.internal.util.counters.SwCounter;
import com.hazelcast.jet.RestartableException;
import com.hazelcast.jet.core.metrics.MetricNames;
import com.hazelcast.jet.core.metrics.MetricTags;
import com.hazelcast.jet.impl.Networking;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.ImdgUtil;
import com.hazelcast.jet.impl.util.ObjectWithPartitionId;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.spi.impl.NodeEngine;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Predicate;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/execution/SenderTasklet.class */
public class SenderTasklet implements Tasklet {
    private static final int BUFFER_SIZE = 32768;
    private final Connection connection;
    private final InboundEdgeStream inboundEdgeStream;
    private final BufferObjectDataOutput outputBuffer;
    private final int bufPosPastHeader;
    private final int packetSizeLimit;
    private final String destinationAddressString;
    private final String sourceOrdinalString;
    private final String sourceVertexName;
    private boolean instreamExhausted;
    private long sentSeq;
    private volatile int sendSeqLimitCompressed;
    private Predicate<Object> addToInboxFunction;
    private final Queue<Object> inbox = new ArrayDeque();
    private final ProgressTracker progTracker = new ProgressTracker();

    @Probe(name = MetricNames.DISTRIBUTED_ITEMS_OUT)
    private final Counter itemsOutCounter = SwCounter.newSwCounter();

    @Probe(name = MetricNames.DISTRIBUTED_BYTES_OUT, unit = ProbeUnit.BYTES)
    private final Counter bytesOutCounter = SwCounter.newSwCounter();

    public SenderTasklet(InboundEdgeStream inboundEdgeStream, NodeEngine nodeEngine, Address address, int i, int i2, long j, String str, int i3, InternalSerializationService internalSerializationService) {
        Queue<Object> queue = this.inbox;
        queue.getClass();
        this.addToInboxFunction = queue::add;
        this.inboundEdgeStream = inboundEdgeStream;
        this.destinationAddressString = address.toString();
        this.sourceVertexName = str;
        this.sourceOrdinalString = WanBatchPublisherConfig.DEFAULT_TARGET_ENDPOINTS + i3;
        this.packetSizeLimit = i2;
        this.connection = ImdgUtil.getMemberConnection(nodeEngine, address);
        this.outputBuffer = internalSerializationService.createObjectDataOutput(32768);
        Util.uncheckRun(() -> {
            this.outputBuffer.write(Networking.createStreamPacketHeader(nodeEngine, j, i, inboundEdgeStream.ordinal()));
        });
        this.bufPosPastHeader = this.outputBuffer.position();
    }

    @Override // com.hazelcast.jet.impl.execution.Tasklet
    @Nonnull
    public ProgressState call() {
        this.progTracker.reset();
        tryFillInbox();
        if (this.progTracker.isDone()) {
            return this.progTracker.toProgressState();
        }
        if (tryFillOutputBuffer()) {
            this.progTracker.madeProgress();
            if (!this.connection.write(new Packet(this.outputBuffer.toByteArray()).setPacketType(Packet.Type.JET))) {
                throw new RestartableException("Connection write failed in " + toString());
            }
        }
        return this.progTracker.toProgressState();
    }

    private void tryFillInbox() {
        if (!this.inbox.isEmpty()) {
            this.progTracker.notDone();
            return;
        }
        if (this.instreamExhausted) {
            return;
        }
        this.progTracker.notDone();
        ProgressState drainTo = this.inboundEdgeStream.drainTo(this.addToInboxFunction);
        this.progTracker.madeProgress(drainTo.isMadeProgress());
        this.instreamExhausted = drainTo.isDone();
        if (this.instreamExhausted) {
            this.inbox.add(new ObjectWithPartitionId(DoneItem.DONE_ITEM, -1));
        }
    }

    private boolean tryFillOutputBuffer() {
        Object poll;
        try {
            this.outputBuffer.position(this.bufPosPastHeader + 4);
            int i = 0;
            while (this.outputBuffer.position() < this.packetSizeLimit && isWithinLimit(this.sentSeq, this.sendSeqLimitCompressed) && (poll = this.inbox.poll()) != null) {
                ObjectWithPartitionId objectWithPartitionId = poll instanceof ObjectWithPartitionId ? (ObjectWithPartitionId) poll : new ObjectWithPartitionId(poll, -1);
                int position = this.outputBuffer.position();
                this.outputBuffer.writeObject(objectWithPartitionId.getItem());
                this.sentSeq += ReceiverTasklet.estimatedMemoryFootprint(this.outputBuffer.position() - position);
                this.outputBuffer.writeInt(objectWithPartitionId.getPartitionId());
                i++;
            }
            this.outputBuffer.writeInt(this.bufPosPastHeader, i);
            this.bytesOutCounter.inc(this.outputBuffer.position());
            this.itemsOutCounter.inc(i);
            return i > 0;
        } catch (IOException e) {
            throw ExceptionUtil.rethrow(e);
        }
    }

    public void setSendSeqLimitCompressed(int i) {
        this.sendSeqLimitCompressed = i;
    }

    public String toString() {
        return "SenderTasklet{ordinal=" + this.inboundEdgeStream.ordinal() + ", destinationAddress=" + this.destinationAddressString + ", sourceVertexName='" + this.sourceVertexName + "'}";
    }

    static boolean isWithinLimit(long j, int i) {
        return ReceiverTasklet.compressSeq(j) - i <= 0;
    }

    @Override // com.hazelcast.jet.impl.execution.Tasklet, com.hazelcast.internal.metrics.DynamicMetricsProvider
    public void provideDynamicMetrics(MetricDescriptor metricDescriptor, MetricsCollectionContext metricsCollectionContext) {
        metricsCollectionContext.collect(metricDescriptor.withTag(MetricTags.VERTEX, this.sourceVertexName).withTag(MetricTags.ORDINAL, this.sourceOrdinalString).withTag(MetricTags.DESTINATION_ADDRESS, this.destinationAddressString), this);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 907117697:
                if (implMethodName.equals("lambda$new$10cb850$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/function/RunnableEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("runEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/execution/SenderTasklet") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/spi/impl/NodeEngine;JILcom/hazelcast/jet/impl/execution/InboundEdgeStream;)V")) {
                    SenderTasklet senderTasklet = (SenderTasklet) serializedLambda.getCapturedArg(0);
                    NodeEngine nodeEngine = (NodeEngine) serializedLambda.getCapturedArg(1);
                    long longValue = ((Long) serializedLambda.getCapturedArg(2)).longValue();
                    int intValue = ((Integer) serializedLambda.getCapturedArg(3)).intValue();
                    InboundEdgeStream inboundEdgeStream = (InboundEdgeStream) serializedLambda.getCapturedArg(4);
                    return () -> {
                        this.outputBuffer.write(Networking.createStreamPacketHeader(nodeEngine, longValue, intValue, inboundEdgeStream.ordinal()));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
