package com.hazelcast.jet.impl.execution;

import com.hazelcast.internal.serialization.impl.DefaultSerializationServiceBuilder;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.TestUtil;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.core.test.TestProcessorContext;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/impl/execution/ProcessorTaskletTest_Blocking.class */
public class ProcessorTaskletTest_Blocking {
    private static final int MOCK_INPUT_SIZE = 10;
    private static final int CALL_COUNT_LIMIT = 10;
    private Processor.Context context;
    private List<Object> mockInput;
    private List<MockInboundStream> instreams;
    private List<OutboundEdgeStream> outstreams;
    private PassThroughProcessor processor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/execution/ProcessorTaskletTest_Blocking$PassThroughProcessor.class */
    public static class PassThroughProcessor implements Processor {
        Outbox outbox;
        int nullaryProcessCallCount;
        int itemsToEmitInComplete;
        int itemCountToProcess = Integer.MAX_VALUE;

        private PassThroughProcessor() {
        }

        public boolean isCooperative() {
            return false;
        }

        public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) {
            this.outbox = outbox;
        }

        public void process(int i, @Nonnull Inbox inbox) {
            Object poll;
            while (this.itemCountToProcess > 0 && (poll = inbox.poll()) != null) {
                emit(poll);
                this.itemCountToProcess--;
            }
            this.itemCountToProcess = Integer.MAX_VALUE;
        }

        public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
            return this.outbox.offer(watermark);
        }

        public boolean tryProcess() {
            int i = this.nullaryProcessCallCount + 1;
            this.nullaryProcessCallCount = i;
            return i > 0;
        }

        public boolean complete() {
            if (this.itemsToEmitInComplete == 0) {
                return true;
            }
            if (this.outbox.offer("completing")) {
                this.itemsToEmitInComplete--;
            }
            return this.itemsToEmitInComplete == 0;
        }

        private void emit(Object obj) {
            if (!this.outbox.offer(obj)) {
                throw new AssertionError("Blocking outbox refused an item: " + obj);
            }
        }
    }

    @Before
    public void setUp() {
        this.processor = new PassThroughProcessor();
        this.context = new TestProcessorContext();
        this.mockInput = (List) IntStream.range(0, 10).boxed().collect(Collectors.toList());
        this.instreams = new ArrayList();
        this.outstreams = new ArrayList();
    }

    @Test
    public void when_isCooperative_then_false() {
        Assert.assertFalse(createTasklet().isCooperative());
    }

    @Test
    public void when_singleInstreamAndOutstream_then_outstreamGetsAll() {
        this.mockInput.add(DoneItem.DONE_ITEM);
        MockInboundStream mockInboundStream = new MockInboundStream(0, this.mockInput, this.mockInput.size());
        MockOutboundStream outstream = outstream(0);
        this.instreams.add(mockInboundStream);
        this.outstreams.add(outstream);
        callUntil(createTasklet(), ProgressState.DONE);
        Assert.assertEquals(this.mockInput, outstream.getBuffer());
    }

    @Test
    public void when_oneInstreamAndTwoOutstreams_then_allOutstreamsGetAllItems() {
        this.mockInput.add(DoneItem.DONE_ITEM);
        MockInboundStream mockInboundStream = new MockInboundStream(0, this.mockInput, this.mockInput.size());
        MockOutboundStream outstream = outstream(0);
        MockOutboundStream outstream2 = outstream(1);
        this.instreams.add(mockInboundStream);
        this.outstreams.add(outstream);
        this.outstreams.add(outstream2);
        callUntil(createTasklet(), ProgressState.DONE);
        Assert.assertEquals(this.mockInput, outstream.getBuffer());
        Assert.assertEquals(this.mockInput, outstream2.getBuffer());
    }

    @Test
    public void when_instreamChunked_then_processAllEventually() {
        this.mockInput.add(DoneItem.DONE_ITEM);
        MockInboundStream mockInboundStream = new MockInboundStream(0, this.mockInput, 4);
        MockOutboundStream outstream = outstream(0);
        this.instreams.add(mockInboundStream);
        this.outstreams.add(outstream);
        callUntil(createTasklet(), ProgressState.DONE);
        Assert.assertEquals(this.mockInput, outstream.getBuffer());
    }

    @Test
    public void when_3instreams_then_pushAllIntoOutstream() {
        MockInboundStream mockInboundStream = new MockInboundStream(0, this.mockInput.subList(0, 4), 4);
        MockInboundStream mockInboundStream2 = new MockInboundStream(0, this.mockInput.subList(4, 8), 4);
        MockInboundStream mockInboundStream3 = new MockInboundStream(0, this.mockInput.subList(8, 10), 4);
        mockInboundStream.push(DoneItem.DONE_ITEM);
        mockInboundStream2.push(DoneItem.DONE_ITEM);
        mockInboundStream3.push(DoneItem.DONE_ITEM);
        this.instreams.addAll(Arrays.asList(mockInboundStream, mockInboundStream2, mockInboundStream3));
        MockOutboundStream mockOutboundStream = new MockOutboundStream(0, 20);
        this.outstreams.add(mockOutboundStream);
        callUntil(createTasklet(), ProgressState.DONE);
        this.mockInput.add(DoneItem.DONE_ITEM);
        Assert.assertEquals(new HashSet(this.mockInput), new HashSet(mockOutboundStream.getBuffer()));
    }

    @Test
    public void when_inboxEmpty_then_nullaryProcessCalled() {
        MockInboundStream mockInboundStream = new MockInboundStream(0, Collections.emptyList(), 1);
        MockOutboundStream outstream = outstream(0);
        this.instreams.add(mockInboundStream);
        this.outstreams.add(outstream);
        ProcessorTasklet createTasklet = createTasklet();
        callUntil(createTasklet, ProgressState.NO_PROGRESS);
        callUntil(createTasklet, ProgressState.NO_PROGRESS);
        Assert.assertTrue(this.processor.nullaryProcessCallCount > 0);
    }

    @Test
    public void when_inboxNotEmpty_then_notDone() {
        MockInboundStream mockInboundStream = new MockInboundStream(0, this.mockInput, this.mockInput.size());
        MockOutboundStream mockOutboundStream = new MockOutboundStream(0, 1024);
        this.instreams.add(mockInboundStream);
        this.outstreams.add(mockOutboundStream);
        ProcessorTasklet createTasklet = createTasklet();
        this.processor.itemCountToProcess = 1;
        callUntil(createTasklet, ProgressState.NO_PROGRESS);
        mockInboundStream.push(DoneItem.DONE_ITEM);
        callUntil(createTasklet, ProgressState.DONE);
        this.mockInput.add(DoneItem.DONE_ITEM);
        Assert.assertEquals(this.mockInput, mockOutboundStream.getBuffer());
    }

    @Test
    public void when_completeReturnsFalse_then_retried() {
        MockInboundStream mockInboundStream = new MockInboundStream(0, Collections.singletonList(DoneItem.DONE_ITEM), 1);
        MockOutboundStream outstream = outstream(0);
        this.instreams.add(mockInboundStream);
        this.outstreams.add(outstream);
        ProcessorTasklet createTasklet = createTasklet();
        this.processor.itemsToEmitInComplete = 2;
        Assert.assertEquals(ProgressState.MADE_PROGRESS, createTasklet.call());
        Assert.assertEquals(ProgressState.MADE_PROGRESS, createTasklet.call());
        Assert.assertEquals(ProgressState.MADE_PROGRESS, createTasklet.call());
        Assert.assertEquals(ProgressState.MADE_PROGRESS, createTasklet.call());
        Assert.assertEquals(ProgressState.DONE, createTasklet.call());
        Assert.assertTrue(this.processor.itemsToEmitInComplete == 0);
    }

    @Test
    public void when_outboxBucketCount_then_equalsOutstreamCount() {
        this.outstreams.add(outstream(0));
        this.outstreams.add(outstream(1));
        createTasklet();
        Assert.assertEquals(2L, this.processor.outbox.bucketCount());
    }

    @Test
    public void when_emitToOneOrdinal_then_onlyOneBucketFilled() {
        MockOutboundStream outstream = outstream(0);
        MockOutboundStream outstream2 = outstream(1);
        this.outstreams.add(outstream);
        this.outstreams.add(outstream2);
        this.processor = new PassThroughProcessor() { // from class: com.hazelcast.jet.impl.execution.ProcessorTaskletTest_Blocking.1
            @Override // com.hazelcast.jet.impl.execution.ProcessorTaskletTest_Blocking.PassThroughProcessor
            public boolean complete() {
                Assert.assertTrue(this.outbox.offer(1, "completing"));
                return true;
            }
        };
        ProcessorTasklet createTasklet = createTasklet();
        Assert.assertEquals(ProgressState.MADE_PROGRESS, createTasklet.call());
        Assert.assertEquals(ProgressState.DONE, createTasklet.call());
        Assert.assertTrue("Ordinal 0 received an item", outstream.getBuffer().size() == 1);
        Assert.assertTrue("Ordinal 1 didn't receive an item", outstream2.getBuffer().size() == 2);
    }

    @Test
    public void when_emitToSpecificOrdinals_then_onlyThoseBucketsFilled() {
        MockOutboundStream outstream = outstream(0);
        MockOutboundStream outstream2 = outstream(1);
        MockOutboundStream outstream3 = outstream(2);
        this.outstreams.add(outstream);
        this.outstreams.add(outstream2);
        this.outstreams.add(outstream3);
        this.processor = new PassThroughProcessor() { // from class: com.hazelcast.jet.impl.execution.ProcessorTaskletTest_Blocking.2
            @Override // com.hazelcast.jet.impl.execution.ProcessorTaskletTest_Blocking.PassThroughProcessor
            public boolean complete() {
                Assert.assertTrue(this.outbox.offer(new int[]{1, 2}, "completing"));
                return true;
            }
        };
        ProcessorTasklet createTasklet = createTasklet();
        Assert.assertEquals(ProgressState.MADE_PROGRESS, createTasklet.call());
        Assert.assertEquals(ProgressState.DONE, createTasklet.call());
        Assert.assertTrue("Ordinal 0 received an item", outstream.getBuffer().size() == 1);
        Assert.assertTrue("Ordinal 1 didn't receive an item", outstream2.getBuffer().size() == 2);
        Assert.assertTrue("Ordinal 2 didn't receive an item", outstream3.getBuffer().size() == 2);
    }

    private ProcessorTasklet createTasklet() {
        for (int i = 0; i < this.instreams.size(); i++) {
            this.instreams.get(i).setOrdinal(i);
        }
        ProcessorTasklet processorTasklet = new ProcessorTasklet(this.context, TestUtil.DIRECT_EXECUTOR, new DefaultSerializationServiceBuilder().build(), this.processor, this.instreams, this.outstreams, new MockSnapshotContext(), new MockOutboundCollector(10), false);
        processorTasklet.init();
        return processorTasklet;
    }

    private static MockOutboundStream outstream(int i) {
        return new MockOutboundStream(i, 1024);
    }

    private static void callUntil(Tasklet tasklet, ProgressState progressState) {
        System.out.println("================= call tasklet");
        int i = 0;
        while (true) {
            ProgressState call = tasklet.call();
            if (call == progressState) {
                return;
            }
            Assert.assertEquals("Failed to make progress", ProgressState.MADE_PROGRESS, call);
            i++;
            Assert.assertTrue(String.format("tasklet.call() invoked %d times without reaching %s. Last state was %s", 10, progressState, call), i < 10);
            System.out.println("================= call tasklet");
        }
    }
}
