package com.hazelcast.jet.impl.execution;

import com.hazelcast.function.ComparatorEx;
import com.hazelcast.internal.util.concurrent.ConcurrentConveyor;
import com.hazelcast.internal.util.concurrent.OneToOneConcurrentArrayQueue;
import com.hazelcast.internal.util.concurrent.QueuedPipe;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

/**
 * Tests {@code InboundEdgeStream.drainTo} on a stream obtained from
 * {@code ConcurrentInboundEdgeStream.create} with a natural-order comparator.
 *
 * <p>The stream is backed by a conveyor over two queues, {@code q1} and
 * {@code q2}. Each test enqueues items (and optionally
 * {@code DoneItem.DONE_ITEM}) into the queues, drains the stream into a list,
 * and checks both the returned {@link ProgressState} and the exact sequence
 * of drained items.
 */
@RunWith(HazelcastSerialClassRunner.class)
@Category({ParallelJVMTest.class})
public class ConcurrentInboundEdgeStream_OrderedDrainTest {
    // Passed as the first argument to ConcurrentConveyor.concurrentConveyor.
    // NOTE(review): presumably the sentinel marking a departed submitter - confirm.
    private static final Object SENDER_GONE = new Object();

    private OneToOneConcurrentArrayQueue<Object> q1;
    private OneToOneConcurrentArrayQueue<Object> q2;
    private InboundEdgeStream stream;

    /** Creates two fresh 128-slot queues and the stream under test on top of them. */
    @Before
    public void setUp() {
        q1 = new OneToOneConcurrentArrayQueue<>(128);
        q2 = new OneToOneConcurrentArrayQueue<>(128);
        // NOTE(review): the positional arguments (0, 0, false, "cies") are presumably
        // ordinal, priority, a barrier-related flag and a debug name - confirm against
        // the signature of ConcurrentInboundEdgeStream.create.
        stream = ConcurrentInboundEdgeStream.create(
                ConcurrentConveyor.concurrentConveyor(SENDER_GONE, new QueuedPipe[]{q1, q2}),
                0, 0, false, "cies", ComparatorEx.naturalOrder());
    }

    @Test
    public void when_twoEmittersOneDoneFirst_then_madeProgress() {
        add(q1, 1, 2, DoneItem.DONE_ITEM);
        add(q2, 6);
        drainAndAssert(ProgressState.MADE_PROGRESS, 1, 2, 6);

        // Only q2 is still open; finishing it completes the stream.
        add(q2, 7, DoneItem.DONE_ITEM);
        drainAndAssert(ProgressState.DONE, 7);
    }

    @Test
    public void when_twoEmittersDrainedAtOnce_then_firstCallDone() {
        add(q1, 1, 2, DoneItem.DONE_ITEM);
        add(q2, 6, DoneItem.DONE_ITEM);
        drainAndAssert(ProgressState.DONE, 1, 2, 6);
    }

    @Test
    public void when_allEmittersInitiallyDone_then_firstCallDone() {
        q1.add(DoneItem.DONE_ITEM);
        q2.add(DoneItem.DONE_ITEM);
        drainAndAssert(ProgressState.DONE);
    }

    @Test
    public void when_oneEmitterWithNoProgress_then_noProgress() {
        add(q2, 1, DoneItem.DONE_ITEM);
        // q1 is empty and not done: nothing is expected to be emitted, repeatedly.
        drainAndAssert(ProgressState.NO_PROGRESS);
        drainAndAssert(ProgressState.NO_PROGRESS);

        // Once q1 reports done, the item held back from q2 is expected to come out.
        q1.add(DoneItem.DONE_ITEM);
        drainAndAssert(ProgressState.DONE, 1);
    }

    @Test
    public void when_receivingWatermarks_then_fail() {
        add(q1, JetTestSupport.wm(1L));
        // The progress state passed here is irrelevant: the drain is expected to throw.
        Assertions.assertThatThrownBy(() -> drainAndAssert(ProgressState.MADE_PROGRESS))
                .hasMessageContaining("Unexpected item observed: Watermark");
    }

    @Test
    public void when_receivingBarriers_then_fail() {
        add(q1, barrier(1L));
        // The progress state passed here is irrelevant: the drain is expected to throw.
        Assertions.assertThatThrownBy(() -> drainAndAssert(ProgressState.NO_PROGRESS))
                .hasMessageContaining("Unexpected item observed: SnapshotBarrier");
    }

    @Test
    public void when_oneQueueDone_then_theOtherWorks() {
        add(q1, DoneItem.DONE_ITEM);
        drainAndAssert(ProgressState.MADE_PROGRESS);

        add(q2, 1);
        drainAndAssert(ProgressState.MADE_PROGRESS, 1);

        add(q2, 2);
        drainAndAssert(ProgressState.MADE_PROGRESS, 2);
    }

    @Test
    public void when_disorder_then_throw() {
        // q1 holds 2 before 1, i.e. its items are not in natural order.
        add(q1, 2, 1);
        add(q2, 3);
        // The progress state passed here is irrelevant: the drain is expected to throw.
        Assertions.assertThatThrownBy(() -> drainAndAssert(ProgressState.DONE))
                .hasMessageContaining("Disorder on a monotonicOrder edge");
    }

    /**
     * Drains the stream once into a fresh list and asserts on the outcome.
     *
     * @param progressState the state {@code drainTo} is expected to return
     * @param objArr        the items expected to be drained, in order; omit for none
     */
    private void drainAndAssert(ProgressState progressState, Object... objArr) {
        List<Object> drained = new ArrayList<>();
        Assert.assertEquals("progressState", progressState, stream.drainTo(drained::add));
        Assert.assertEquals(Arrays.asList(objArr), drained);
    }

    /**
     * Appends the given items to the queue in the order supplied.
     *
     * @param oneToOneConcurrentArrayQueue the queue to append to
     * @param objArr                       the items to enqueue
     */
    private void add(OneToOneConcurrentArrayQueue<Object> oneToOneConcurrentArrayQueue, Object... objArr) {
        oneToOneConcurrentArrayQueue.addAll(Arrays.asList(objArr));
    }

    /**
     * Builds a {@code SnapshotBarrier} with the given first constructor argument
     * and {@code false} as the second.
     * NOTE(review): {@code j} is presumably the snapshot id - confirm.
     */
    private SnapshotBarrier barrier(long j) {
        return new SnapshotBarrier(j, false);
    }
}
