package com.hazelcast.jet.impl.execution;

import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.serialization.SerializationService;
import com.hazelcast.util.Preconditions;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/execution/OutboxImpl.class */
/**
 * {@link OutboxInternal} implementation that forwards offered items to an
 * array of {@link OutboundCollector}s.
 * <p>
 * When the outbox is created with a snapshot collector, that collector is
 * the last element of the array; it is reachable through
 * {@link #offerToSnapshot} and {@link #offerToEdgesAndSnapshot} and is
 * excluded from {@link #bucketCount()}.
 * <p>
 * The number of offers accepted between two calls to {@link #reset()} is
 * limited to the configured batch size. Once an offer returns {@code false},
 * the rejected item is remembered and, with assertions enabled, the next
 * offer is checked to be a retry of the same item to the same ordinals.
 */
public class OutboxImpl implements OutboxInternal {

    private final OutboundCollector[] outstreams;
    private final ProgressTracker progTracker;
    private final SerializationService serializationService;
    private final int batchSize;
    private final AtomicLongArray counters;

    /** Reusable one-element array used by {@link #offer(int, Object)}. */
    private final int[] singleEdge = {0};
    /** Ordinals of all collectors except the snapshot collector. */
    private final int[] allEdges;
    /** Ordinals of all collectors, the snapshot collector included. */
    private final int[] allEdgesAndSnapshot;
    /** One-element array with the snapshot ordinal, or {@code null} if there is no snapshot collector. */
    private final int[] snapshotEdge;
    /**
     * Bit {@code i} is set when the collector at position {@code i} of the
     * ordinals array currently being offered to has already accepted the
     * item, so that a retry does not offer the item to it again.
     */
    private final BitSet broadcastTracker;
    private final AtomicLong lastForwardedWm = new AtomicLong(Long.MIN_VALUE);

    /** Serialized snapshot entry kept across retries so it is serialized only once. */
    private Map.Entry<Data, Data> pendingSnapshotEntry;
    /** Offers left in the current batch; {@code -1} means an offer was rejected and {@link #reset()} is required. */
    private int numRemainingInBatch;
    private Object unfinishedItem;
    /** Populated only when assertions are enabled. */
    private int[] unfinishedItemOrdinals;
    private Object unfinishedSnapshotKey;
    private Object unfinishedSnapshotValue;
    private boolean blocked;

    /**
     * @param outstreams           collectors to forward items to; if {@code hasSnapshot}
     *                             is true, the last one is the snapshot collector
     * @param hasSnapshot          whether the last collector is the snapshot collector
     * @param progTracker          notified whenever an offer makes progress
     * @param serializationService used to serialize snapshot keys and values
     * @param batchSize            maximum number of offers accepted between two
     *                             {@link #reset()} calls, must be positive
     * @param counters             per-ordinal counters incremented for each accepted
     *                             item that is not a {@code BroadcastItem}
     */
    public OutboxImpl(OutboundCollector[] outstreams, boolean hasSnapshot, ProgressTracker progTracker,
                      SerializationService serializationService, int batchSize, AtomicLongArray counters) {
        Preconditions.checkPositive(batchSize, "batchSize must be positive");
        this.outstreams = outstreams;
        this.progTracker = progTracker;
        this.serializationService = serializationService;
        this.batchSize = batchSize;
        this.counters = counters;

        int collectorCount = outstreams.length;
        this.allEdges = IntStream.range(0, collectorCount - (hasSnapshot ? 1 : 0)).toArray();
        this.allEdgesAndSnapshot = IntStream.range(0, collectorCount).toArray();
        this.snapshotEdge = hasSnapshot ? new int[]{collectorCount - 1} : null;
        this.broadcastTracker = new BitSet(collectorCount);
    }

    /** Returns the number of collectors, not counting the snapshot collector. */
    @Override
    public final int bucketCount() {
        return allEdges.length;
    }

    /**
     * Offers the item to a single ordinal, or to all non-snapshot ordinals
     * when {@code ordinal == -1}.
     *
     * @return {@code true} if the item was accepted, {@code false} if the
     *         same offer has to be retried later
     * @throws IllegalArgumentException if {@code ordinal} is neither -1 nor
     *         in the range {@code [0, bucketCount())}
     */
    @Override
    public final boolean offer(int ordinal, @Nonnull Object item) {
        if (ordinal == -1) {
            return offerInternal(allEdges, item);
        }
        // Reject every invalid ordinal up front. The ordinal equal to
        // bucketCount() would address the snapshot collector (if any), which
        // must not be reachable through this method; anything else outside
        // the range would otherwise fail with an index error only after the
        // batch counter had already been modified.
        if (ordinal < 0 || ordinal >= bucketCount()) {
            throw new IllegalArgumentException("Illegal edge ordinal: " + ordinal);
        }
        singleEdge[0] = ordinal;
        return offerInternal(singleEdge, item);
    }

    /**
     * Offers the item to all the given ordinals. With assertions enabled,
     * the snapshot ordinal is checked not to be among them.
     *
     * @return {@code true} if all the collectors accepted the item
     */
    @Override
    public final boolean offer(@Nonnull int[] ordinals, @Nonnull Object item) {
        assert snapshotEdge == null || Util.arrayIndexOf(snapshotEdge[0], ordinals) < 0
                : "Ordinal " + snapshotEdge[0] + " is out of range";
        return offerInternal(ordinals, item);
    }

    /**
     * Core offer routine shared by all the public offer variants.
     * <p>
     * Returns {@code false} without touching any state while the outbox
     * {@linkplain #shouldBlock() should block}. Otherwise it consumes one
     * slot of the current batch and, if the batch is not exhausted, offers
     * the item to every listed collector that has not accepted it yet.
     *
     * @param ordinals indexes into the collector array
     * @param item     the item to forward
     * @return {@code true} if every listed collector accepted the item
     */
    private boolean offerInternal(@Nonnull int[] ordinals, @Nonnull Object item) {
        if (shouldBlock()) {
            return false;
        }
        assert unfinishedItem == null || item.equals(unfinishedItem)
                : "Different item offered after previous call returned false: expected=" + unfinishedItem
                        + ", got=" + item;
        assert unfinishedItemOrdinals == null || Arrays.equals(unfinishedItemOrdinals, ordinals)
                : "Offered to different ordinals after previous call returned false: expected="
                        + Arrays.toString(unfinishedItemOrdinals) + ", got=" + Arrays.toString(ordinals);
        assert numRemainingInBatch != -1
                : "Outbox.offer() called again after it returned false, without a call to reset(). "
                        + "You probably didn't return from Processor method after Outbox.offer() "
                        + "or AbstractProcessor.tryEmit() returned false";

        numRemainingInBatch--;
        boolean done = true;
        if (numRemainingInBatch == -1) {
            // The batch is exhausted: reject without offering to any collector.
            done = false;
        } else {
            if (ordinals.length == 0) {
                // An offer to zero ordinals trivially succeeds; report it as progress.
                progTracker.madeProgress();
            }
            for (int i = 0; i < ordinals.length; i++) {
                if (broadcastTracker.get(i)) {
                    // This collector accepted the item in an earlier attempt.
                    continue;
                }
                ProgressState result = doOffer(outstreams[ordinals[i]], item);
                if (result.isMadeProgress()) {
                    progTracker.madeProgress();
                }
                if (result.isDone()) {
                    broadcastTracker.set(i);
                    if (!(item instanceof BroadcastItem)) {
                        Util.lazyIncrement(counters, ordinals[i]);
                    }
                } else {
                    done = false;
                }
            }
        }

        if (done) {
            broadcastTracker.clear();
            unfinishedItem = null;
            unfinishedItemOrdinals = null;
            if (item instanceof Watermark) {
                long wmTimestamp = ((Watermark) item).timestamp();
                if (wmTimestamp != WatermarkCoalescer.IDLE_MESSAGE.timestamp()) {
                    // An equal timestamp is tolerated; only a decrease is an error.
                    assert lastForwardedWm.get() <= wmTimestamp
                            : "current=" + lastForwardedWm.get() + ", new=" + wmTimestamp;
                    lastForwardedWm.lazySet(wmTimestamp);
                }
            }
        } else {
            // Mark the batch as finished; further offers need a reset() first.
            numRemainingInBatch = -1;
            unfinishedItem = item;
            // Defensive copy of the caller's array. It is deliberately made
            // only when assertions are enabled because nothing but the
            // assertions above reads it.
            //noinspection ConstantConditions,AssertWithSideEffects
            assert (unfinishedItemOrdinals = Arrays.copyOf(ordinals, ordinals.length)) != null;
        }
        return done;
    }

    /**
     * Offers the item to all non-snapshot ordinals.
     *
     * @return {@code true} if all the collectors accepted the item
     */
    @Override
    public final boolean offer(@Nonnull Object item) {
        return offerInternal(allEdges, item);
    }

    /**
     * Serializes the key and the value and offers the resulting entry to the
     * snapshot collector. The serialized entry is retained while the offer
     * is unfinished, so a retry does not serialize again.
     *
     * @return {@code true} if the entry was accepted, {@code false} if the
     *         same offer has to be retried later
     * @throws IllegalStateException if the outbox has no snapshot collector
     */
    @Override
    public final boolean offerToSnapshot(@Nonnull Object key, @Nonnull Object value) {
        if (snapshotEdge == null) {
            throw new IllegalStateException("Outbox does not have snapshot queue");
        }
        if (shouldBlock()) {
            return false;
        }
        assert unfinishedSnapshotKey == null || unfinishedSnapshotKey.equals(key)
                : "Different key offered after previous call returned false: expected="
                        + unfinishedSnapshotKey + ", got=" + key;
        assert unfinishedSnapshotValue == null || unfinishedSnapshotValue.equals(value)
                : "Different value offered after previous call returned false: expected="
                        + unfinishedSnapshotValue + ", got=" + value;

        if (pendingSnapshotEntry == null) {
            pendingSnapshotEntry = com.hazelcast.jet.Util.entry(
                    serializationService.toData(key), serializationService.toData(value));
        }
        boolean accepted = offerInternal(snapshotEdge, pendingSnapshotEntry);
        if (accepted) {
            pendingSnapshotEntry = null;
            unfinishedSnapshotKey = null;
            unfinishedSnapshotValue = null;
        } else {
            unfinishedSnapshotKey = key;
            unfinishedSnapshotValue = value;
        }
        return accepted;
    }

    /** Returns whether a previously rejected item or snapshot entry is still waiting to be re-offered. */
    @Override
    public boolean hasUnfinishedItem() {
        return unfinishedItem != null || unfinishedSnapshotKey != null;
    }

    /** Makes the outbox reject new items; an already unfinished item can still be completed. */
    @Override
    public void block() {
        blocked = true;
    }

    /** Reverts the effect of {@link #block()}. */
    @Override
    public void unblock() {
        blocked = false;
    }

    /** Returns {@code true} if the outbox is blocked and there is no unfinished item to complete. */
    private boolean shouldBlock() {
        return blocked && !hasUnfinishedItem();
    }

    /** Starts a new batch by restoring the number of accepted offers to the batch size. */
    @Override
    public void reset() {
        numRemainingInBatch = batchSize;
    }

    /** Offers the item to the collector using the method matching the item's kind. */
    private ProgressState doOffer(OutboundCollector collector, Object item) {
        if (item instanceof BroadcastItem) {
            return collector.offerBroadcast((BroadcastItem) item);
        }
        return collector.offer(item);
    }

    /**
     * Offers the item to all ordinals including the snapshot collector.
     *
     * @return {@code true} if all the collectors accepted the item
     */
    public final boolean offerToEdgesAndSnapshot(Object item) {
        return offerInternal(allEdgesAndSnapshot, item);
    }

    /**
     * Returns the timestamp of the last non-idle watermark that was fully
     * accepted, or {@code Long.MIN_VALUE} if there was none yet.
     */
    @Override
    public long lastForwardedWm() {
        return lastForwardedWm.get();
    }
}
