package com.hazelcast.jet.impl.execution;

import com.hazelcast.internal.util.concurrent.ConcurrentConveyor;
import com.hazelcast.internal.util.concurrent.Pipe;
import com.hazelcast.internal.util.concurrent.QueuedPipe;
import com.hazelcast.jet.Watermark;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.jet.impl.util.SkewReductionPolicy;
import com.hazelcast.util.function.Predicate;
import java.util.Collection;

/* loaded from: input_file:com/hazelcast/jet/impl/execution/ConcurrentInboundEdgeStream.class */
public class ConcurrentInboundEdgeStream implements InboundEdgeStream {
    private final int ordinal;
    private final int priority;
    private final ConcurrentConveyor<Object> conveyor;
    private final ProgressTracker tracker = new ProgressTracker();
    private final WatermarkDetector wmDetector = new WatermarkDetector();
    private long lastEmittedWm = Long.MIN_VALUE;
    private final SkewReductionPolicy skewReductionPolicy;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/execution/ConcurrentInboundEdgeStream$WatermarkDetector.class */
    public static final class WatermarkDetector implements Predicate<Object> {
        Collection<Object> dest;
        Watermark wm;
        boolean isDone;
        static final /* synthetic */ boolean $assertionsDisabled;

        private WatermarkDetector() {
        }

        void reset(Collection<Object> collection) {
            this.dest = collection;
            this.wm = null;
            this.isDone = false;
        }

        @Override // com.hazelcast.util.function.Predicate
        public boolean test(Object obj) {
            if (obj instanceof Watermark) {
                if (!$assertionsDisabled && this.wm != null) {
                    throw new AssertionError("Received multiple Watermarks without a call to reset()");
                }
                this.wm = (Watermark) obj;
                return false;
            }
            if (obj == DoneItem.DONE_ITEM) {
                this.isDone = true;
                return false;
            }
            this.dest.add(obj);
            return true;
        }

        static {
            $assertionsDisabled = !ConcurrentInboundEdgeStream.class.desiredAssertionStatus();
        }
    }

    public ConcurrentInboundEdgeStream(ConcurrentConveyor<Object> concurrentConveyor, int i, int i2) {
        this.conveyor = concurrentConveyor;
        this.ordinal = i;
        this.priority = i2;
        this.skewReductionPolicy = new SkewReductionPolicy(concurrentConveyor.queueCount());
    }

    @Override // com.hazelcast.jet.impl.execution.InboundEdgeStream
    public int ordinal() {
        return this.ordinal;
    }

    @Override // com.hazelcast.jet.impl.execution.InboundEdgeStream
    public int priority() {
        return this.priority;
    }

    @Override // com.hazelcast.jet.impl.execution.InboundEdgeStream
    public ProgressState drainTo(Collection<Object> collection) {
        this.tracker.reset();
        for (int i = 0; i < this.conveyor.queueCount(); i++) {
            int queueIndex = this.skewReductionPolicy.toQueueIndex(i);
            QueuedPipe<Object> queue = this.conveyor.queue(queueIndex);
            if (queue != null) {
                Watermark drainUpToWm = drainUpToWm(queue, collection);
                if (this.wmDetector.isDone) {
                    this.conveyor.removeQueue(queueIndex);
                } else if (drainUpToWm != null && this.skewReductionPolicy.observeWm(queueIndex, drainUpToWm.timestamp())) {
                    break;
                }
            }
        }
        long bottomObservedWm = this.skewReductionPolicy.bottomObservedWm();
        if (bottomObservedWm > this.lastEmittedWm) {
            collection.add(new Watermark(bottomObservedWm));
            this.lastEmittedWm = bottomObservedWm;
        }
        return this.tracker.toProgressState();
    }

    private Watermark drainUpToWm(Pipe<Object> pipe, Collection<Object> collection) {
        this.wmDetector.reset(collection);
        this.tracker.mergeWith(ProgressState.valueOf(pipe.drain(this.wmDetector) > 0, this.wmDetector.isDone));
        this.wmDetector.dest = null;
        return this.wmDetector.wm;
    }
}
