/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.internal.util.concurrent.MPSCQueue;
import com.hazelcast.jet.impl.execution.OutboundCollector;
import com.hazelcast.jet.impl.execution.Tasklet;
import com.hazelcast.jet.impl.util.DoneItem;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.ObjectWithPartitionId;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.nio.BufferObjectDataInput;
import com.hazelcast.util.concurrent.IdleStrategy;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongArray;
import javax.annotation.Nonnull;

/**
 * {@link Tasklet} that takes packets handed over through
 * {@link #receiveStreamPacket}, deserializes the items they contain and offers
 * them to an {@link OutboundCollector}.
 * <p>
 * For every sender it also tracks the acknowledged sequence value and a
 * receive window; together they form the send-sequence limit returned by
 * {@link #updateAndGetSendSeqLimitCompressed(int)}.
 */
public class ReceiverTasklet implements Tasklet {
    /** Number of low-order bits dropped from a sequence value by {@link #compressSeq(long)}. */
    static final int COMPRESSED_SEQ_UNIT_LOG2 = 16;
    /** Receive window, in compressed sequence units, every sender starts with. */
    static final int INITIAL_RECEIVE_WINDOW_COMPRESSED = 800;

    private final int rwinMultiplier;
    private final double flowControlPeriodNs;
    /** Packets added by {@link #receiveStreamPacket}, polled by {@link #tryFillInbox()}. */
    private final Queue<PacketWithSender> incoming = new MPSCQueue<>((IdleStrategy) null);
    private final ProgressTracker tracker = new ProgressTracker();
    /** Deserialized items that have not yet been accepted by the collector. */
    private final ArrayDeque<ObjPtionAndSenderId> inbox = new ArrayDeque<>();
    private final OutboundCollector collector;
    /** Number of senders whose {@code DONE_ITEM} has not been consumed yet. */
    private int remainingSenders;

    // Per-sender flow-control state; every array is indexed by sender id.
    private final AtomicLongArray ackedSeq;
    private final int[] receiveWindowCompressed;
    private final int[] prevAckedSeqCompressed;
    private final long[] prevTimestamp;

    /**
     * @param collector           destination for the received items
     * @param rwinMultiplier      factor applied to the measured acknowledgement rate
     *                            to obtain the target receive window
     * @param flowControlPeriodMs flow-control period in milliseconds
     * @param senderCount         number of senders; valid sender ids are
     *                            {@code 0 .. senderCount - 1}
     */
    public ReceiverTasklet(OutboundCollector collector, int rwinMultiplier, int flowControlPeriodMs, int senderCount) {
        this.collector = collector;
        this.rwinMultiplier = rwinMultiplier;
        this.flowControlPeriodNs = TimeUnit.MILLISECONDS.toNanos(flowControlPeriodMs);
        this.remainingSenders = senderCount;
        this.ackedSeq = new AtomicLongArray(senderCount);
        this.receiveWindowCompressed = new int[senderCount];
        Arrays.fill(this.receiveWindowCompressed, INITIAL_RECEIVE_WINDOW_COMPRESSED);
        this.prevAckedSeqCompressed = new int[senderCount];
        this.prevTimestamp = new long[senderCount];
    }

    /**
     * Moves all pending packets into the inbox, then offers the inbox items to
     * the collector in order until the inbox is empty or the collector reports
     * an offer as not done. Every item that leaves the inbox is acknowledged
     * via {@link #ackItem}.
     *
     * @return the progress state accumulated in the tracker during this call
     */
    @Override
    @Nonnull
    public ProgressState call() {
        tracker.reset();
        tryFillInbox();
        ObjPtionAndSenderId o;
        while ((o = inbox.peek()) != null) {
            Object item = o.getItem();
            if (item == DoneItem.DONE_ITEM) {
                // The done marker is consumed here and never forwarded to the collector.
                --remainingSenders;
            } else {
                ProgressState outcome = collector.offer(item, o.getPartitionId());
                if (!outcome.isDone()) {
                    // Leave the item at the head of the inbox so that it is offered again on the next call.
                    tracker.madeProgress(outcome.isMadeProgress());
                    break;
                }
            }
            tracker.madeProgress();
            inbox.remove();
            ackItem(o.senderId, o.estimatedMemoryFootprint);
        }
        if (remainingSenders == 0) {
            tracker.mergeWith(collector.close());
        } else {
            tracker.notDone();
        }
        return tracker.toProgressState();
    }

    /**
     * Queues a packet for processing by a later {@link #call()}.
     *
     * @param packetInput input positioned at the packet payload: an item count
     *                    followed by that many (object, partition id) pairs
     * @param senderId    id of the sender the packet came from
     */
    public void receiveStreamPacket(BufferObjectDataInput packetInput, int senderId) {
        incoming.add(new PacketWithSender(senderId, packetInput));
    }

    /**
     * Same as {@link #updateAndGetSendSeqLimitCompressed(int, long)} with
     * {@link System#nanoTime()} as the timestamp.
     */
    public int updateAndGetSendSeqLimitCompressed(int senderId) {
        return updateAndGetSendSeqLimitCompressed(senderId, System.nanoTime());
    }

    /**
     * Updates the receive window of the given sender from the amount
     * acknowledged since the previous invocation and returns the resulting
     * send-sequence limit.
     * <p>
     * The first invocation for a sender only records the statistics and leaves
     * the window at its initial size. Later invocations move the window halfway
     * towards {@code rwinMultiplier * ceil(acked units per flow-control period)}.
     * <p>
     * NOTE(review): when {@code timestampNow} equals the previously recorded
     * timestamp, the rate computation divides by zero in floating point and
     * yields NaN or infinity; callers are presumably expected to pass strictly
     * increasing timestamps - confirm.
     *
     * @param senderId     id of the sender
     * @param timestampNow current time in nanoseconds
     * @return compressed acknowledged sequence plus the compressed receive window
     */
    public int updateAndGetSendSeqLimitCompressed(int senderId, long timestampNow) {
        // Both recorded values are still zero only before the first invocation for this sender
        // (or, in the degenerate case, if that invocation itself recorded zeros).
        boolean hadPrevStats = prevTimestamp[senderId] != 0L || prevAckedSeqCompressed[senderId] != 0;

        long ackTimeDelta = timestampNow - prevTimestamp[senderId];
        prevTimestamp[senderId] = timestampNow;

        int ackedSeqCompressed = compressSeq(ackedSeq.get(senderId));
        int ackedSeqCompressedDelta = ackedSeqCompressed - prevAckedSeqCompressed[senderId];
        prevAckedSeqCompressed[senderId] = ackedSeqCompressed;

        if (hadPrevStats) {
            double ackedSeqsPerAckPeriod =
                    flowControlPeriodNs * (double) ackedSeqCompressedDelta / (double) ackTimeDelta;
            int targetRwin = rwinMultiplier * (int) Math.ceil(ackedSeqsPerAckPeriod);
            int rwinDiff = targetRwin - receiveWindowCompressed[senderId];
            // Close only half of the gap per invocation to dampen swings of the window.
            receiveWindowCompressed[senderId] += rwinDiff / 2;
        }
        return ackedSeqCompressed + receiveWindowCompressed[senderId];
    }

    /**
     * Adds {@code itemWeight} to the acknowledged sequence of the given sender.
     * The update is a plain read followed by {@code lazySet}, not an atomic
     * read-modify-write. NOTE(review): this presumably relies on a single
     * writer thread - confirm.
     *
     * @return the new acknowledged sequence value
     */
    long ackItem(int senderId, long itemWeight) {
        long seqNow = ackedSeq.get(senderId);
        long seqToBe = seqNow + itemWeight;
        ackedSeq.lazySet(senderId, seqToBe);
        return seqToBe;
    }

    /**
     * Converts a sequence value to its compressed form by dropping the
     * {@link #COMPRESSED_SEQ_UNIT_LOG2} low-order bits and narrowing to {@code int}.
     */
    static int compressSeq(long seq) {
        return (int) (seq >> COMPRESSED_SEQ_UNIT_LOG2);
    }

    /**
     * Estimates the memory footprint of one inbox entry: a fixed bookkeeping
     * overhead (56 in total, itemised below) plus the serialized size of the item.
     *
     * @param itemBlobSize size of the item's serialized form, as measured while reading it
     * @return overhead plus {@code itemBlobSize}, computed in {@code long} arithmetic
     */
    static long estimatedMemoryFootprint(int itemBlobSize) {
        final int inboxSlot = 4;
        final int objPtionAndSenderIdHeader = 16;
        final int itemField = 4;
        final int itemObjHeader = 16;
        final int partitionIdField = 4;
        final int senderIdField = 4;
        final int estimatedMemoryFootprintField = 8;
        final long overhead = inboxSlot + objPtionAndSenderIdHeader + itemField + itemObjHeader
                + partitionIdField + senderIdField + estimatedMemoryFootprintField;
        // long addition: cannot overflow even for a blob size close to Integer.MAX_VALUE
        return overhead + itemBlobSize;
    }

    /**
     * Polls every queued packet, deserializes its items and appends them to
     * the inbox. Reports progress on the tracker once per packet.
     */
    private void tryFillInbox() {
        try {
            PacketWithSender received;
            while ((received = incoming.poll()) != null) {
                BufferObjectDataInput in = received.payload;
                int itemCount = in.readInt();
                for (int i = 0; i < itemCount; ++i) {
                    // The difference of the stream positions gives the serialized size of the item.
                    int mark = in.position();
                    Object item = in.readObject();
                    int itemSize = in.position() - mark;
                    // The partition id follows the object and is not counted in itemSize.
                    inbox.add(new ObjPtionAndSenderId(item, in.readInt(), received.senderId, itemSize));
                }
                tracker.madeProgress();
            }
        } catch (IOException e) {
            throw ExceptionUtil.rethrow(e);
        }
    }

    /** Inbox entry: an item with its partition id, sender id and estimated memory footprint. */
    private static class ObjPtionAndSenderId extends ObjectWithPartitionId {
        final int senderId;
        final long estimatedMemoryFootprint;

        ObjPtionAndSenderId(Object item, int partitionId, int senderId, int itemBlobSize) {
            super(item, partitionId);
            this.senderId = senderId;
            this.estimatedMemoryFootprint = ReceiverTasklet.estimatedMemoryFootprint(itemBlobSize);
        }
    }

    /** A received packet payload paired with the id of the sender it came from. */
    private static class PacketWithSender {
        final int senderId;
        final BufferObjectDataInput payload;

        PacketWithSender(int senderId, BufferObjectDataInput payload) {
            this.senderId = senderId;
            this.payload = payload;
        }
    }
}

