package com.hazelcast.jet.impl.execution;

import com.hazelcast.internal.util.concurrent.ConcurrentConveyor;
import com.hazelcast.internal.util.concurrent.Pipe;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.util.function.Predicate;
import java.util.Arrays;
import java.util.BitSet;
import java.util.function.Consumer;

/* loaded from: input_file:com/hazelcast/jet/impl/execution/ConcurrentInboundEdgeStream.class */
public class ConcurrentInboundEdgeStream implements InboundEdgeStream {
    // Ordinal of this inbound edge; returned by ordinal().
    private final int ordinal;
    // Priority of this inbound edge; returned by priority().
    private final int priority;
    // Conveyor supplied to the constructor; its queueCount() sizes queueWms and receivedBarriers.
    private final ConcurrentConveyor<Object> conveyor;
    // Set from the constructor. Not read by any code visible in this decompiled file —
    // presumably consulted by drainTo(), whose body was not recovered (TODO confirm).
    private final boolean waitForSnapshot;
    // Last watermark value observed on each queue (indexed by queue); every slot starts at Long.MIN_VALUE.
    private final long[] queueWms;
    // One bit per queue index; observeBarrier() sets the bit when the expected barrier arrives on that queue.
    private final BitSet receivedBarriers;
    // Snapshot id observeBarrier() expects next; initialised to the constructor's snapshot-id argument + 1.
    private long pendingSnapshotId;
    // Starts at conveyor.queueCount(); isDone() reports true once this reaches zero.
    private long numActiveQueues;
    // drainQueue() merges the outcome of each queue drain into this tracker.
    private final ProgressTracker tracker = new ProgressTracker();
    // Reusable predicate handed to Pipe.drain() by drainQueue().
    private final ItemDetector itemDetector = new ItemDetector();
    // Initialised to Long.MIN_VALUE; not referenced by any code visible in this decompiled file.
    private long lastEmittedWm = Long.MIN_VALUE;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/execution/ConcurrentInboundEdgeStream$ItemDetector.class */
    /**
     * Predicate handed to {@code Pipe.drain()} by {@code drainQueue()}.
     *
     * <p>Ordinary items are forwarded to {@link #dest} and accepted ({@code true}). The first
     * special item — a {@link Watermark}, a {@link SnapshotBarrier} or
     * {@code DoneItem.DONE_ITEM} — is stored in {@link #item} instead of being forwarded, and
     * {@code false} is returned (presumably this makes the pipe stop draining — confirm against
     * {@code Pipe.drain()}).
     *
     * <p>{@link #reset(Consumer)} must be called before each drain so that {@link #item} is
     * cleared; with assertions enabled, seeing a second special item without a reset fails.
     */
    public static final class ItemDetector implements Predicate<Object> {
        /** Destination for ordinary items; set by {@link #reset(Consumer)}. */
        Consumer<Object> dest;
        /** The special item captured since the last reset, or {@code null} if none was seen. */
        BroadcastItem item;

        private ItemDetector() {
        }

        /**
         * Prepares the detector for a new drain.
         *
         * @param consumer destination that ordinary items will be forwarded to
         */
        void reset(Consumer<Object> consumer) {
            dest = consumer;
            item = null;
        }

        /**
         * Handles a single drained item.
         *
         * @param obj the item taken from the pipe
         * @return {@code true} if {@code obj} was an ordinary item and was forwarded to
         *         {@link #dest}; {@code false} if it was a special item and was captured in
         *         {@link #item}
         */
        @Override // com.hazelcast.util.function.Predicate
        public boolean test(Object obj) {
            if (obj instanceof Watermark || obj instanceof SnapshotBarrier || obj == DoneItem.DONE_ITEM) {
                // A plain assert compiles to the same check the decompiler spelled out by hand
                // with a synthetic $assertionsDisabled flag.
                assert item == null : "Received multiple special items without a call to reset(): " + item;
                item = (BroadcastItem) obj;
                return false;
            }
            dest.accept(obj);
            return true;
        }
    }

    public ConcurrentInboundEdgeStream(ConcurrentConveyor<Object> concurrentConveyor, int i, int i2, long j, boolean z) {
        this.conveyor = concurrentConveyor;
        this.ordinal = i;
        this.priority = i2;
        this.waitForSnapshot = z;
        this.queueWms = new long[concurrentConveyor.queueCount()];
        Arrays.fill(this.queueWms, Long.MIN_VALUE);
        this.numActiveQueues = concurrentConveyor.queueCount();
        this.receivedBarriers = new BitSet(concurrentConveyor.queueCount());
        this.pendingSnapshotId = j + 1;
    }

    /**
     * @return the ordinal this stream was constructed with
     */
    @Override // com.hazelcast.jet.impl.execution.InboundEdgeStream
    public int ordinal() {
        return ordinal;
    }

    /**
     * @return the priority this stream was constructed with
     */
    @Override // com.hazelcast.jet.impl.execution.InboundEdgeStream
    public int priority() {
        return priority;
    }

    /*
     * WARNING: the decompiler (JADX) could not reconstruct this method. The body below is a
     * placeholder that unconditionally throws UnsupportedOperationException; the real
     * implementation (330 bytecode instructions according to the decompiler) has to be restored
     * from the class file or the original source before this class is usable.
     *
     * Fragments the decompiler did recover (they appear to be the tail of the method — confirm
     * against the bytecode):
     *
     *   block B:26:0x0138    if (r6.numActiveQueues <= 0) goto L38;
     *   block B:27:0x013b    r6.tracker.notDone();
     *   block B:29:0x0149    return r6.tracker.toProgressState();
     *
     * i.e. it looks as if the tracker is marked "not done" while active queues remain, and the
     * method's result is tracker.toProgressState().
     *
     * To see the decompiler's partially-correct output, re-run it with '--show-bad-code'; to see
     * the instruction dump, use '--comments-level debug'.
     */
    @Override // com.hazelcast.jet.impl.execution.InboundEdgeStream
    public com.hazelcast.jet.impl.util.ProgressState drainTo(java.util.function.Consumer<java.lang.Object> r7) {
        // Placeholder emitted by the decompiler; not the original logic.
        throw new UnsupportedOperationException("Method not decompiled: com.hazelcast.jet.impl.execution.ConcurrentInboundEdgeStream.drainTo(java.util.function.Consumer):com.hazelcast.jet.impl.util.ProgressState");
    }

    /**
     * @return {@code true} once the count of active queues has dropped to zero
     */
    @Override // com.hazelcast.jet.impl.execution.InboundEdgeStream
    public boolean isDone() {
        return numActiveQueues == 0;
    }

    /**
     * Drains one pipe through the shared {@code itemDetector} and merges the outcome into
     * {@code tracker}.
     *
     * @param pipe     the queue to drain
     * @param consumer destination the detector forwards ordinary items to
     */
    private void drainQueue(Pipe<Object> pipe, Consumer<Object> consumer) {
        itemDetector.reset(consumer);

        // Order matters: the drain must run before itemDetector.item is inspected.
        final boolean madeProgress = pipe.drain(itemDetector) > 0;
        final boolean sawDoneItem = itemDetector.item == DoneItem.DONE_ITEM;
        tracker.mergeWith(ProgressState.valueOf(madeProgress, sawDoneItem));

        // Drop the reference to the consumer once the drain is over.
        itemDetector.dest = null;
    }

    /**
     * Records that the expected snapshot barrier has arrived on the given queue.
     *
     * @param queueIndex index of the queue the barrier was received on
     * @param snapshotId snapshot id carried by the barrier
     * @throws JetException if {@code snapshotId} differs from {@code pendingSnapshotId}
     */
    private void observeBarrier(int queueIndex, long snapshotId) {
        if (snapshotId == pendingSnapshotId) {
            receivedBarriers.set(queueIndex);
            return;
        }
        throw new JetException("Unexpected snapshot barrier " + snapshotId + ", expected " + pendingSnapshotId);
    }

    /**
     * Stores a newly received watermark for the given queue.
     *
     * @param queueIndex index of the queue the watermark was received on
     * @param wmValue    the new watermark value
     * @throws JetException if {@code wmValue} is not strictly greater than the queue's
     *                      previously stored watermark
     */
    private void observeWm(int queueIndex, long wmValue) {
        final long previousWm = queueWms[queueIndex];
        if (previousWm >= wmValue) {
            throw new JetException("Watermarks not monotonically increasing on queue: last one=" + previousWm + ", new one=" + wmValue);
        }
        queueWms[queueIndex] = wmValue;
    }

    /**
     * @return the smallest value currently stored in {@code queueWms}
     */
    private long bottomObservedWm() {
        // Seed with the first slot; revisiting it in the loop below is harmless.
        long lowest = queueWms[0];
        for (long queueWm : queueWms) {
            lowest = Math.min(lowest, queueWm);
        }
        return lowest;
    }
}
