package com.hazelcast.jet.impl.execution;

import com.hazelcast.internal.util.concurrent.ConcurrentConveyor;
import com.hazelcast.internal.util.concurrent.Pipe;
import com.hazelcast.internal.util.concurrent.QueuedPipe;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.util.function.Predicate;
import java.util.BitSet;
import java.util.function.Consumer;

/* loaded from: input_file:com/hazelcast/jet/impl/execution/ConcurrentInboundEdgeStream.class */
/**
 * An {@link InboundEdgeStream} that drains every queue of a
 * {@link ConcurrentConveyor} into a consumer.
 * <p>
 * Ordinary items are handed to the consumer unchanged. The special items
 * ({@link Watermark}, {@link SnapshotBarrier} and {@link DoneItem#DONE_ITEM})
 * are intercepted while draining and handled here:
 * <ul>
 *     <li>a watermark is reported to the {@link WatermarkCoalescer}, which
 *     decides whether a watermark should be emitted to the consumer;</li>
 *     <li>a snapshot barrier is recorded for the queue it arrived on, and a
 *     single barrier is emitted once all active queues have delivered it;</li>
 *     <li>a done item removes the queue from the conveyor.</li>
 * </ul>
 */
public class ConcurrentInboundEdgeStream implements InboundEdgeStream {
    private final int ordinal;
    private final int priority;

    /**
     * When {@code true}, a queue that has already delivered the pending
     * snapshot barrier is skipped by {@link #drainTo(long, Consumer)} until
     * the barrier has been forwarded and the per-queue flags are cleared.
     */
    private final boolean waitForSnapshot;

    private final ConcurrentConveyor<Object> conveyor;
    private final ProgressTracker tracker = new ProgressTracker();
    private final ItemDetector itemDetector = new ItemDetector();
    private final WatermarkCoalescer watermarkCoalescer;

    /** Bit {@code n} is set when queue {@code n} has delivered the pending snapshot barrier. */
    private final BitSet receivedBarriers;

    private final ILogger logger;

    /** Id of the snapshot barrier that is expected next and will be emitted next. */
    private long pendingSnapshotId;

    /** Number of queues that have not yet delivered {@link DoneItem#DONE_ITEM}. */
    private long numActiveQueues;

    /**
     * Predicate handed to {@link Pipe#drain}. It forwards ordinary items to
     * {@link #dest} and captures the first special item it sees in
     * {@link #item}, returning {@code false} for it.
     * <p>
     * NOTE(review): returning {@code false} presumably makes the pipe stop
     * draining; confirm against the {@code Pipe.drain} contract.
     */
    public static final class ItemDetector implements Predicate<Object> {
        /** Destination for ordinary items; set by {@link #reset(Consumer)}. */
        Consumer<Object> dest;

        /** The special item captured since the last {@link #reset(Consumer)}, or {@code null}. */
        BroadcastItem item;

        private ItemDetector() {
        }

        /**
         * Prepares the detector for a new drain pass.
         *
         * @param consumer destination for ordinary items
         */
        void reset(Consumer<Object> consumer) {
            this.dest = consumer;
            this.item = null;
        }

        /**
         * Handles one drained item.
         *
         * @param obj the item taken from the queue
         * @return {@code true} if {@code obj} was an ordinary item and was
         *         passed to {@link #dest}; {@code false} if it was a special
         *         item, which is then stored in {@link #item}
         */
        @Override // com.hazelcast.util.function.Predicate
        public boolean test(Object obj) {
            if (!(obj instanceof Watermark) && !(obj instanceof SnapshotBarrier) && obj != DoneItem.DONE_ITEM) {
                this.dest.accept(obj);
                return true;
            }
            // A second special item before reset() would overwrite the first one.
            assert this.item == null : "Received multiple special items without a call to reset(): " + this.item;
            this.item = (BroadcastItem) obj;
            return false;
        }
    }

    /**
     * Creates the stream.
     *
     * @param conveyor        the conveyor whose queues are drained
     * @param ordinal         value returned by {@link #ordinal()}
     * @param priority        value returned by {@link #priority()}
     * @param lastSnapshotId  the first expected snapshot barrier id is
     *                        {@code lastSnapshotId + 1}
     * @param waitForSnapshot whether a queue is skipped after it delivered the
     *                        pending barrier, until the barrier is forwarded
     * @param coalescerArg    passed as the first argument to
     *                        {@code WatermarkCoalescer.create}; its meaning is
     *                        defined there (TODO confirm and rename)
     * @param debugName       suffix appended to the logger name
     */
    public ConcurrentInboundEdgeStream(ConcurrentConveyor<Object> conveyor, int ordinal, int priority,
                                       long lastSnapshotId, boolean waitForSnapshot, int coalescerArg,
                                       String debugName) {
        this.conveyor = conveyor;
        this.ordinal = ordinal;
        this.priority = priority;
        this.waitForSnapshot = waitForSnapshot;
        this.watermarkCoalescer = WatermarkCoalescer.create(coalescerArg, conveyor.queueCount());
        this.numActiveQueues = conveyor.queueCount();
        this.receivedBarriers = new BitSet(conveyor.queueCount());
        this.pendingSnapshotId = lastSnapshotId + 1;
        this.logger = Logger.getLogger(ConcurrentInboundEdgeStream.class.getName() + "." + debugName);
    }

    @Override // com.hazelcast.jet.impl.execution.InboundEdgeStream
    public int ordinal() {
        return this.ordinal;
    }

    @Override // com.hazelcast.jet.impl.execution.InboundEdgeStream
    public int priority() {
        return this.priority;
    }

    /**
     * Drains the queues using the time reported by the watermark coalescer.
     *
     * @param consumer receives the drained items
     * @return the outcome of {@link #drainTo(long, Consumer)}
     */
    @Override // com.hazelcast.jet.impl.execution.InboundEdgeStream
    public ProgressState drainTo(Consumer<Object> consumer) {
        return drainTo(this.watermarkCoalescer.getTime(), consumer);
    }

    /**
     * Makes one pass over the conveyor's queues in index order, draining each
     * eligible queue into {@code consumer}.
     * <p>
     * The method returns as soon as it has emitted a watermark or a snapshot
     * barrier, so at most one such item is passed to {@code consumer} per
     * call; the remaining queues are left for a later call.
     *
     * @param now      time value forwarded to the watermark coalescer
     * @param consumer receives ordinary items and any emitted watermark or
     *                 snapshot barrier
     * @return the progress made by this call
     * @throws JetException if a snapshot barrier with an id other than the
     *                      pending one is received
     */
    ProgressState drainTo(long now, Consumer<Object> consumer) {
        this.tracker.reset();
        for (int queueIndex = 0; queueIndex < this.conveyor.queueCount(); queueIndex++) {
            QueuedPipe<Object> queue = this.conveyor.queue(queueIndex);
            if (queue == null) {
                // the conveyor has no queue at this index
                continue;
            }
            if (this.waitForSnapshot && this.receivedBarriers.get(queueIndex)) {
                // this queue already delivered the pending barrier; hold it back
                continue;
            }

            ProgressState result = drainQueue(queue, consumer);
            this.tracker.mergeWith(result);

            if (this.itemDetector.item == DoneItem.DONE_ITEM) {
                this.conveyor.removeQueue(queueIndex);
                this.receivedBarriers.clear(queueIndex);
                this.numActiveQueues--;
                if (maybeEmitWm(this.watermarkCoalescer.queueDone(queueIndex), consumer)) {
                    return this.numActiveQueues == 0 ? ProgressState.DONE : ProgressState.MADE_PROGRESS;
                }
            } else if (this.itemDetector.item instanceof Watermark) {
                long wmTimestamp = ((Watermark) this.itemDetector.item).timestamp();
                boolean forwarded = maybeEmitWm(this.watermarkCoalescer.observeWm(now, queueIndex, wmTimestamp), consumer);
                if (this.logger.isFinestEnabled()) {
                    this.logger.finest("Received " + this.itemDetector.item + " from queue " + queueIndex + '/' + this.conveyor.queueCount() + (forwarded ? ", forwarded" : ", not forwarded"));
                }
                if (forwarded) {
                    return ProgressState.MADE_PROGRESS;
                }
            } else if (this.itemDetector.item instanceof SnapshotBarrier) {
                observeBarrier(queueIndex, ((SnapshotBarrier) this.itemDetector.item).snapshotId());
            } else if (result.isMadeProgress()) {
                // only ordinary items were drained from this queue
                this.watermarkCoalescer.observeEvent(queueIndex);
            }

            if (this.numActiveQueues == 0) {
                return this.tracker.toProgressState();
            }

            // Only a special item can change the barrier/active-queue balance,
            // so the check is skipped when none was seen in this drain.
            if (this.itemDetector.item != null && this.receivedBarriers.cardinality() == this.numActiveQueues) {
                consumer.accept(new SnapshotBarrier(this.pendingSnapshotId));
                this.pendingSnapshotId++;
                this.receivedBarriers.clear();
                return ProgressState.MADE_PROGRESS;
            }
        }

        // No special item was emitted in this pass; let the coalescer decide
        // whether a watermark should be emitted based on its history.
        if (maybeEmitWm(this.watermarkCoalescer.checkWmHistory(now), consumer)) {
            return ProgressState.MADE_PROGRESS;
        }
        if (this.numActiveQueues > 0) {
            this.tracker.notDone();
        }
        return this.tracker.toProgressState();
    }

    /**
     * Emits a {@link Watermark} with the given timestamp unless the timestamp
     * is {@link Long#MIN_VALUE}, which is treated as "nothing to emit".
     *
     * @param timestamp the value returned by the watermark coalescer
     * @param consumer  receives the watermark
     * @return {@code true} if a watermark was passed to {@code consumer}
     */
    private boolean maybeEmitWm(long timestamp, Consumer<Object> consumer) {
        if (timestamp == Long.MIN_VALUE) {
            return false;
        }
        consumer.accept(new Watermark(timestamp));
        return true;
    }

    /**
     * @return {@code true} when no active queues remain
     */
    @Override // com.hazelcast.jet.impl.execution.InboundEdgeStream
    public boolean isDone() {
        return this.numActiveQueues == 0;
    }

    /**
     * Drains a single queue through {@link #itemDetector}. After the call,
     * {@code itemDetector.item} holds the special item that was captured, or
     * {@code null} if none was.
     *
     * @param pipe     the queue to drain
     * @param consumer receives the ordinary items
     * @return a state built from whether the drain count was positive and
     *         whether {@link DoneItem#DONE_ITEM} was captured
     */
    private ProgressState drainQueue(Pipe<Object> pipe, Consumer<Object> consumer) {
        this.itemDetector.reset(consumer);
        int drainedCount = pipe.drain(this.itemDetector);
        // drop the reference so the detector does not keep the consumer between calls
        this.itemDetector.dest = null;
        return ProgressState.valueOf(drainedCount > 0, this.itemDetector.item == DoneItem.DONE_ITEM);
    }

    /**
     * Records that the given queue delivered the pending snapshot barrier.
     *
     * @param queueIndex index of the queue the barrier arrived on
     * @param snapshotId id carried by the received barrier
     * @throws JetException if {@code snapshotId} is not the pending snapshot id
     */
    private void observeBarrier(int queueIndex, long snapshotId) {
        if (snapshotId != this.pendingSnapshotId) {
            throw new JetException("Unexpected snapshot barrier " + snapshotId + ", expected " + this.pendingSnapshotId);
        }
        this.receivedBarriers.set(queueIndex);
    }
}
