package com.hazelcast.jet.impl.execution;

import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.internal.serialization.impl.DefaultSerializationServiceBuilder;
import com.hazelcast.internal.util.UuidUtil;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.TestUtil;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.core.test.TestProcessorContext;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.logging.ILogger;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.mockito.Mockito;

@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/impl/execution/ProcessorTaskletTest_Snapshots.class */
public class ProcessorTaskletTest_Snapshots {
    private static final int MOCK_INPUT_SIZE = 10;
    private static final int CALL_COUNT_LIMIT = 10;
    private List<Object> mockInput;
    private List<MockInboundStream> instreams;
    private List<OutboundEdgeStream> outstreams;
    private SnapshottableProcessor processor;
    private Processor.Context context;
    private SerializationService serializationService;
    private SnapshotContext snapshotContext;
    private MockOutboundCollector snapshotCollector;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/execution/ProcessorTaskletTest_Snapshots$SnapshottableProcessor.class */
    /**
     * Test {@link Processor} whose callbacks emit a configurable number of marker items
     * to the outbox and count how often each callback made progress.
     *
     * <p>The {@code itemsToEmitIn*} fields configure how many items the matching callback
     * must emit before it reports completion; the {@code *Count} fields record how many
     * it has emitted so far. Items forwarded by {@link #process} and {@link #complete}
     * are also queued and later written out by {@link #saveToSnapshot}.
     */
    public static class SnapshottableProcessor implements Processor {
        int itemsToEmitInTryProcess;
        int itemsToEmitInTryProcessWatermark;
        int itemsToEmitInComplete;
        int itemsToEmitInCompleteEdge;
        int itemsToEmitInSnapshotPrepareCommit;
        int itemsToEmitInOnSnapshotComplete;
        int tryProcessCount;
        int tryProcessWatermarkCount;
        int completedCount;
        int completedEdgeCount;
        int snapshotPrepareCommitCount;
        int onSnapshotCompletedCount;
        /** When {@code true}, {@link #complete} never returns {@code true}. */
        boolean isStreaming;
        private Outbox outbox;
        /** Entries waiting to be written to the snapshot by {@link #saveToSnapshot}. */
        private final Queue<Map.Entry<?, ?>> snapshotQueue = new ArrayDeque<>();

        private SnapshottableProcessor() {
        }

        /** Remembers the outbox; the context is not used. */
        public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) {
            this.outbox = outbox;
        }

        /**
         * Forwards inbox items to the outbox one by one. An item is removed from the inbox
         * (and queued for the snapshot under a fresh UUID key) only after the outbox
         * accepted it; processing stops at the first rejected item.
         */
        public void process(int i, @Nonnull Inbox inbox) {
            for (Object item = inbox.peek(); item != null; item = inbox.peek()) {
                if (!this.outbox.offer(item)) {
                    return;
                }
                this.snapshotQueue.offer(Util.entry(UuidUtil.newUnsecureUUID(), inbox.poll()));
            }
        }

        /**
         * Emits at most one {@code "tryProcessWatermark-N"} marker per call, then, once all
         * configured markers are out, tries to forward the watermark itself.
         *
         * @return {@code true} only when the watermark was accepted by the outbox
         */
        public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
            boolean markerPending = this.tryProcessWatermarkCount < this.itemsToEmitInTryProcessWatermark;
            if (markerPending && this.outbox.offer("tryProcessWatermark-" + this.tryProcessWatermarkCount)) {
                this.tryProcessWatermarkCount++;
            }
            if (this.tryProcessWatermarkCount != this.itemsToEmitInTryProcessWatermark) {
                return false;
            }
            return this.outbox.offer(watermark);
        }

        /**
         * Emits at most one integer (the current {@code completedCount}) per call and queues
         * it for the snapshot.
         *
         * @return {@code true} when all configured items were emitted and the processor is
         *         not flagged as streaming
         */
        public boolean complete() {
            if (this.completedCount < this.itemsToEmitInComplete && this.outbox.offer(this.completedCount)) {
                this.snapshotQueue.add(Util.entry(UuidUtil.newUnsecureUUID(), this.completedCount));
                this.completedCount++;
            }
            return this.completedCount == this.itemsToEmitInComplete && !this.isStreaming;
        }

        /**
         * Emits at most one {@code "completeEdge-N"} marker per call.
         *
         * @return {@code true} once all configured markers were emitted
         */
        public boolean completeEdge(int i) {
            if (this.completedEdgeCount < this.itemsToEmitInCompleteEdge
                    && this.outbox.offer("completeEdge-" + this.completedEdgeCount)) {
                this.completedEdgeCount++;
            }
            return this.completedEdgeCount == this.itemsToEmitInCompleteEdge;
        }

        /**
         * Emits at most one {@code "tryProcess-N"} marker per call.
         *
         * @return {@code true} once all configured markers were emitted
         */
        public boolean tryProcess() {
            if (this.tryProcessCount < this.itemsToEmitInTryProcess
                    && this.outbox.offer("tryProcess-" + this.tryProcessCount)) {
                this.tryProcessCount++;
            }
            return this.tryProcessCount == this.itemsToEmitInTryProcess;
        }

        /**
         * Drains the queued entries into the snapshot. An entry is removed from the queue
         * only after the outbox accepted it.
         *
         * @return {@code true} when the queue is empty, {@code false} if the outbox refused
         *         an entry
         */
        public boolean saveToSnapshot() {
            for (Map.Entry<?, ?> head = this.snapshotQueue.peek(); head != null; head = this.snapshotQueue.peek()) {
                if (!this.outbox.offerToSnapshot(head.getKey(), head.getValue())) {
                    return false;
                }
                this.snapshotQueue.remove();
            }
            return true;
        }

        /**
         * Emits at most one {@code "spc-N"} marker per call.
         *
         * @return {@code true} once all configured markers were emitted
         */
        public boolean snapshotCommitPrepare() {
            if (this.snapshotPrepareCommitCount < this.itemsToEmitInSnapshotPrepareCommit
                    && this.outbox.offer("spc-" + this.snapshotPrepareCommitCount)) {
                this.snapshotPrepareCommitCount++;
            }
            return this.snapshotPrepareCommitCount == this.itemsToEmitInSnapshotPrepareCommit;
        }

        /**
         * Emits at most one {@code "osc-N"} marker per call; the flag argument is ignored.
         *
         * @return {@code true} once all configured markers were emitted
         */
        public boolean snapshotCommitFinish(boolean z) {
            if (this.onSnapshotCompletedCount < this.itemsToEmitInOnSnapshotComplete
                    && this.outbox.offer("osc-" + this.onSnapshotCompletedCount)) {
                this.onSnapshotCompletedCount++;
            }
            return this.onSnapshotCompletedCount == this.itemsToEmitInOnSnapshotComplete;
        }

        /** Moves every restored entry from the inbox into the snapshot queue. */
        public void restoreFromSnapshot(@Nonnull Inbox inbox) {
            for (Object restored = inbox.poll(); restored != null; restored = inbox.poll()) {
                this.snapshotQueue.add((Map.Entry<?, ?>) restored);
            }
        }

        /**
         * Tries to emit the {@code "finishRestore"} marker.
         *
         * @return whether the outbox accepted the marker
         */
        public boolean finishSnapshotRestore() {
            return this.outbox.offer("finishRestore");
        }
    }

    /**
     * Builds fresh fixtures before each test: the integers {@code 0 .. MOCK_INPUT_SIZE-1}
     * as mock input, a new processor, a serialization service, a processor context set to
     * exactly-once, empty stream lists and a snapshot collector.
     */
    @Before
    public void setUp() {
        // Use the class constant rather than repeating its value as a literal.
        this.mockInput = IntStream.range(0, MOCK_INPUT_SIZE).boxed().collect(Collectors.toList());
        this.processor = new SnapshottableProcessor();
        this.serializationService = new DefaultSerializationServiceBuilder().build();
        this.context = new TestProcessorContext().setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);
        this.instreams = new ArrayList<>();
        this.outstreams = new ArrayList<>();
        this.snapshotCollector = new MockOutboundCollector(1024);
    }

    /** A tasklet built around the test processor reports itself as cooperative. */
    @Test
    public void when_isCooperative_then_true() {
        ProcessorTasklet tasklet = createTasklet(ProcessingGuarantee.AT_LEAST_ONCE);
        boolean cooperative = tasklet.isCooperative();
        Assert.assertTrue(cooperative);
    }

    /**
     * With a single inbound stream carrying four items followed by a barrier, everything
     * ends up, in order, both in the outbound buffer and in the snapshot buffer.
     */
    @Test
    public void when_singleInbound_then_savesAllToSnapshotAndOutbound() {
        // Given: items 0..3 followed by a non-terminal barrier
        List<Object> input = new ArrayList<>(this.mockInput.subList(0, 4));
        input.add(barrier0(false));
        MockInboundStream instream = new MockInboundStream(0, input, input.size());
        MockOutboundStream outstream = new MockOutboundStream(0);
        this.instreams.add(instream);
        this.outstreams.add(outstream);

        // When
        callUntil(createTasklet(ProcessingGuarantee.AT_LEAST_ONCE), ProgressState.NO_PROGRESS);

        // Then
        Assert.assertEquals(input, outstream.getBuffer());
        Assert.assertEquals(input, getSnapshotBufferValues());
    }

    /**
     * With two inbound streams under exactly-once, items behind the barrier on the first
     * stream are held back until the second stream delivers its barrier too.
     */
    @Test
    public void when_multipleInbound_then_waitForBarrier() {
        // Given: stream 1 has 0..3, a barrier, then 4..7; stream 2 starts empty
        List<Object> firstInput = new ArrayList<>(this.mockInput.subList(0, 4));
        firstInput.add(barrier0(false));
        firstInput.addAll(this.mockInput.subList(4, 8));
        List<Object> secondInput = new ArrayList<>();
        MockInboundStream firstInstream = new MockInboundStream(0, firstInput, 1024);
        MockInboundStream secondInstream = new MockInboundStream(0, secondInput, 1024);
        MockOutboundStream outstream = new MockOutboundStream(0);
        this.instreams.add(firstInstream);
        this.instreams.add(secondInstream);
        this.outstreams.add(outstream);
        ProcessorTasklet tasklet = createTasklet(ProcessingGuarantee.EXACTLY_ONCE);

        // When: only the first stream has reached the barrier
        callUntil(tasklet, ProgressState.NO_PROGRESS);

        // Then: only the pre-barrier items were forwarded, nothing was snapshotted
        Assert.assertEquals(Arrays.asList(0, 1, 2, 3), outstream.getBuffer());
        Assert.assertEquals(Collections.emptyList(), getSnapshotBufferValues());

        // When: the second stream delivers the barrier as well
        secondInstream.push(barrier0(false));
        callUntil(tasklet, ProgressState.NO_PROGRESS);

        // Then: the barrier and the remaining items follow
        Assert.assertEquals(Arrays.asList(0, 1, 2, 3, barrier0(false), 4, 5, 6, 7), outstream.getBuffer());
        Assert.assertEquals(Arrays.asList(0, 1, 2, 3, barrier0(false)), getSnapshotBufferValues());
    }

    /**
     * For a processor without inbound streams, starting snapshot phase 1 leads to: the
     * queued state being saved, the prepare-commit marker being emitted, and only then the
     * barrier being forwarded downstream.
     */
    @Test
    public void when_snapshotTriggered_then_saveSnapshot_prepare_emitBarrier() {
        // Given: an outbound stream created with second argument 2
        // (presumably its capacity - the assertions below see two items per round)
        MockOutboundStream outbound = new MockOutboundStream(0, 2);
        this.outstreams.add(outbound);
        ProcessorTasklet tasklet = createTasklet(ProcessingGuarantee.EXACTLY_ONCE);
        this.processor.itemsToEmitInComplete = 4;
        this.processor.itemsToEmitInSnapshotPrepareCommit = 1;

        // Round 1: no snapshot requested yet
        callUntil(tasklet, ProgressState.NO_PROGRESS);
        Assert.assertEquals(Arrays.asList(0, 1), outbound.getBuffer());
        Assert.assertEquals(Collections.emptyList(), getSnapshotBufferValues());

        // Round 2: snapshot 0 is started and the outbound buffer is emptied
        this.snapshotContext.startNewSnapshotPhase1(0L, "map", 0);
        outbound.flush();
        callUntil(tasklet, ProgressState.NO_PROGRESS);
        Assert.assertEquals(Arrays.asList(2, "spc-0"), outbound.getBuffer());
        Assert.assertEquals(Arrays.asList(0, 1, 2, barrier0(false)), getSnapshotBufferValues());

        // Round 3: the barrier goes downstream, then normal output resumes
        outbound.flush();
        callUntil(tasklet, ProgressState.NO_PROGRESS);
        Assert.assertEquals(Arrays.asList(barrier0(false), 3), outbound.getBuffer());
    }

    /**
     * When the snapshot is started with flags from {@code SnapshotFlags.create(false, true)},
     * neither the {@code "spc-"} nor the {@code "osc-"} marker of the processor shows up in
     * the output, although the processor is configured to emit one of each.
     */
    @Test
    public void when_exportOnly_then_commitMethodsNotCalled() {
        // Given
        MockOutboundStream outbound = new MockOutboundStream(0, 128);
        this.outstreams.add(outbound);
        ProcessorTasklet tasklet = createTasklet(ProcessingGuarantee.EXACTLY_ONCE);
        this.processor.itemsToEmitInSnapshotPrepareCommit = 1;
        this.processor.itemsToEmitInOnSnapshotComplete = 1;
        this.processor.isStreaming = true;

        // When: phase 1 runs
        this.snapshotContext.startNewSnapshotPhase1(0L, "map", SnapshotFlags.create(false, true));
        callUntil(tasklet, ProgressState.NO_PROGRESS);

        // Then: only the barrier was emitted
        Assert.assertEquals(Collections.singletonList(barrier0(false)), outbound.getBuffer());

        // When: phase 1 is reported done and phase 2 runs
        this.snapshotContext.phase1DoneForTasklet(0L, 0L, 0L);
        outbound.flush();
        this.snapshotContext.startNewSnapshotPhase2(0L, true);
        callUntil(tasklet, ProgressState.NO_PROGRESS);

        // Then: nothing was emitted
        Assert.assertEquals(Collections.emptyList(), outbound.getBuffer());
    }

    /**
     * An inbound stream created with priority {@code Integer.MIN_VALUE} carries two entries;
     * after the tasklet runs to completion the processor has emitted {@code "finishRestore"}
     * and its {@code tryProcessCount} is still zero.
     */
    @Test
    public void when_snapshotRestoreInput_then_restoreMethodsCalled() {
        // Given
        MockInboundStream restoreStream = new MockInboundStream(Integer.MIN_VALUE, Arrays.asList(Util.entry("k1", "v1"), Util.entry("k2", "v2"), DoneItem.DONE_ITEM), 1024);
        MockInboundStream regularStream = new MockInboundStream(0, Collections.singletonList(DoneItem.DONE_ITEM), 1024);
        MockOutboundStream outbound = new MockOutboundStream(0);
        this.instreams.add(restoreStream);
        this.instreams.add(regularStream);
        this.outstreams.add(outbound);
        ProcessorTasklet tasklet = createTasklet(ProcessingGuarantee.EXACTLY_ONCE);
        this.processor.itemsToEmitInTryProcess = 1;

        // When
        callUntil(tasklet, ProgressState.DONE);

        // Then
        Assert.assertEquals(Arrays.asList("finishRestore", DoneItem.DONE_ITEM), outbound.getBuffer());
        Assert.assertEquals(Collections.singletonList(DoneItem.DONE_ITEM), getSnapshotBufferValues());
        Assert.assertEquals(0L, this.processor.tryProcessCount);
    }

    /**
     * After phase 1 has completed, starting phase 2 makes the processor emit its
     * {@code "osc-0"} marker, writes nothing to the snapshot, and completes the phase-2
     * future.
     */
    @Test
    public void test_phase2() {
        // Given
        MockInboundStream instream = new MockInboundStream(0, Arrays.asList("item", barrier0(false)), 1024);
        MockOutboundStream outstream = new MockOutboundStream(0);
        this.instreams.add(instream);
        this.outstreams.add(outstream);
        ProcessorTasklet tasklet = createTasklet(ProcessingGuarantee.EXACTLY_ONCE);
        this.processor.itemsToEmitInOnSnapshotComplete = 1;

        // Phase 1: the item and the barrier reach both buffers
        callUntil(tasklet, ProgressState.NO_PROGRESS);
        Assert.assertEquals(Arrays.asList("item", barrier0(false)), getSnapshotBufferValues());
        Assert.assertEquals(Arrays.asList("item", barrier0(false)), outstream.getBuffer());
        this.snapshotCollector.getBuffer().clear();
        outstream.flush();

        // Phase 2
        phase1StartAndDone(false);
        CompletableFuture<?> phase2Future = this.snapshotContext.startNewSnapshotPhase2(0L, true);
        callUntil(tasklet, ProgressState.NO_PROGRESS);
        Assert.assertEquals(Collections.singletonList("osc-0"), outstream.getBuffer());
        Assert.assertEquals(Collections.emptyList(), getSnapshotBufferValues());
        Assert.assertTrue("future not done", phase2Future.isDone());
    }

    /**
     * When the input is exhausted right after the barrier, the tasklet does not finish
     * until phase 2 of the snapshot has run: it emits nothing in between, and only after
     * phase 2 starts does it emit {@code "osc-0"} followed by the done item.
     */
    @Test
    public void when_processorCompletesAfterPhase1_then_doneAfterPhase2() {
        // Given
        MockInboundStream instream = new MockInboundStream(0, Arrays.asList("item", barrier0(false), DoneItem.DONE_ITEM), 1024);
        MockOutboundStream outstream = new MockOutboundStream(0);
        this.instreams.add(instream);
        this.outstreams.add(outstream);
        ProcessorTasklet tasklet = createTasklet(ProcessingGuarantee.EXACTLY_ONCE);
        this.processor.itemsToEmitInOnSnapshotComplete = 1;

        // Phase 1
        CompletableFuture<?> phase1Future = this.snapshotContext.startNewSnapshotPhase1(0L, "map", 0);
        callUntil(tasklet, ProgressState.NO_PROGRESS);
        Assert.assertEquals(Arrays.asList("item", barrier0(false)), getSnapshotBufferValues());
        Assert.assertEquals(Arrays.asList("item", barrier0(false)), outstream.getBuffer());
        this.snapshotContext.phase1DoneForTasklet(1L, 1L, 1L);
        Assert.assertTrue("future1 not done", phase1Future.isDone());
        this.snapshotCollector.getBuffer().clear();
        outstream.flush();

        // Between the phases: a newly pushed item produces no output
        instream.push("item2");
        callUntil(tasklet, ProgressState.NO_PROGRESS);
        Assert.assertEquals(Collections.emptyList(), outstream.getBuffer());

        // Phase 2: the tasklet can now run to completion
        CompletableFuture<?> phase2Future = this.snapshotContext.startNewSnapshotPhase2(0L, true);
        callUntil(tasklet, ProgressState.DONE);
        Assert.assertEquals(Arrays.asList("osc-0", DoneItem.DONE_ITEM), outstream.getBuffer());
        Assert.assertEquals(Collections.singletonList(DoneItem.DONE_ITEM), getSnapshotBufferValues());
        Assert.assertTrue("future2 not done", phase2Future.isDone());
    }

    /**
     * With the processor configured to emit two {@code "osc-"} markers and an outbound
     * stream that shows one item per round, the second marker appears only after the
     * outbound buffer was flushed and the tasklet was called again.
     */
    @Test
    public void when_onSnapshotCompletedReturnsFalse_then_calledAgain() {
        // Given
        MockInboundStream inbound = new MockInboundStream(0, Collections.singletonList(barrier0(false)), 1024);
        MockOutboundStream outbound = new MockOutboundStream(0, 1);
        this.instreams.add(inbound);
        this.outstreams.add(outbound);
        ProcessorTasklet tasklet = createTasklet(ProcessingGuarantee.EXACTLY_ONCE);
        this.processor.itemsToEmitInOnSnapshotComplete = 2;

        // Phase 1: only the barrier travels through
        callUntil(tasklet, ProgressState.NO_PROGRESS);
        Assert.assertEquals(Collections.singletonList(barrier0(false)), getSnapshotBufferValues());
        Assert.assertEquals(Collections.singletonList(barrier0(false)), outbound.getBuffer());
        this.snapshotCollector.getBuffer().clear();
        outbound.flush();

        // Phase 2, first round: first marker
        phase1StartAndDone(false);
        this.snapshotContext.startNewSnapshotPhase2(0L, true);
        callUntil(tasklet, ProgressState.NO_PROGRESS);
        Assert.assertEquals(Collections.singletonList("osc-0"), outbound.getBuffer());

        // Phase 2, second round: second marker after the buffer was emptied
        outbound.flush();
        callUntil(tasklet, ProgressState.NO_PROGRESS);
        Assert.assertEquals(Collections.singletonList("osc-1"), outbound.getBuffer());
    }

    /**
     * While {@code tryProcessWatermark} has not yet completed, a started phase 2 does not
     * complete; it completes once the outbound buffer is flushed and the tasklet is
     * called again.
     */
    @Test
    public void test_tryProcessWatermark_notInterruptedByPhase2() {
        // Given: a single watermark on input and an outbound stream created with limit 1
        MockInboundStream instream = new MockInboundStream(0, Collections.singletonList(JetTestSupport.wm(0L)), 1024);
        MockOutboundStream outstream = new MockOutboundStream(0, 1);
        this.instreams.add(instream);
        this.outstreams.add(outstream);
        ProcessorTasklet tasklet = createTasklet(ProcessingGuarantee.EXACTLY_ONCE);
        this.processor.itemsToEmitInTryProcessWatermark = 1;
        callUntil(tasklet, ProgressState.NO_PROGRESS);

        // When: phase 2 is started while the watermark is still being processed
        phase1StartAndDone(false);
        CompletableFuture<?> phase2Future = this.snapshotContext.startNewSnapshotPhase2(0L, true);
        callUntil(tasklet, ProgressState.NO_PROGRESS);

        // Then
        Assert.assertFalse("future should not have been done", phase2Future.isDone());

        // When: the outbound buffer is emptied
        outstream.flush();
        callUntil(tasklet, ProgressState.NO_PROGRESS);

        // Then
        Assert.assertTrue("future should have been done", phase2Future.isDone());
    }

    /**
     * While the no-argument {@code tryProcess} has not yet completed, a started phase 2
     * does not complete; it completes once the outbound buffer is flushed and the tasklet
     * is called again.
     */
    @Test
    public void test_nullaryTryProcess_notInterruptedByPhase2() {
        // Given: no input, two markers to emit, and an outbound stream created with limit 1
        MockInboundStream instream = new MockInboundStream(0, Collections.emptyList(), 1024);
        MockOutboundStream outstream = new MockOutboundStream(0, 1);
        this.instreams.add(instream);
        this.outstreams.add(outstream);
        ProcessorTasklet tasklet = createTasklet(ProcessingGuarantee.EXACTLY_ONCE);
        this.processor.itemsToEmitInTryProcess = 2;
        callUntil(tasklet, ProgressState.NO_PROGRESS);

        // When: phase 2 is started while tryProcess() still has a marker to emit
        phase1StartAndDone(false);
        CompletableFuture<?> phase2Future = this.snapshotContext.startNewSnapshotPhase2(0L, true);
        callUntil(tasklet, ProgressState.NO_PROGRESS);

        // Then
        Assert.assertFalse("future should not have been done", phase2Future.isDone());

        // When: the outbound buffer is emptied
        outstream.flush();
        callUntil(tasklet, ProgressState.NO_PROGRESS);

        // Then
        Assert.assertTrue("future should have been done", phase2Future.isDone());
    }

    /**
     * A barrier created with the flag set to {@code true} arrives on the inbound stream;
     * after phase 2 the tasklet is done and has emitted that barrier followed by the
     * done item.
     */
    @Test
    public void test_terminalSnapshot_receivedInBarrier() {
        // Given
        MockInboundStream instream = new MockInboundStream(0, Collections.singletonList(barrier0(true)), 1024);
        MockOutboundStream outstream = new MockOutboundStream(0, 128);
        this.instreams.add(instream);
        this.outstreams.add(outstream);
        ProcessorTasklet tasklet = createTasklet(ProcessingGuarantee.EXACTLY_ONCE);
        callUntil(tasklet, ProgressState.NO_PROGRESS);

        // When
        phase1StartAndDone(false);
        CompletableFuture<?> phase2Future = this.snapshotContext.startNewSnapshotPhase2(0L, true);
        callUntil(tasklet, ProgressState.DONE);

        // Then
        Assert.assertTrue("future should have been done", phase2Future.isDone());
        // Expected value goes first so a failure message reports expected/actual correctly.
        Assert.assertEquals(Arrays.asList(barrier0(true), DoneItem.DONE_ITEM), outstream.getBuffer());
    }

    /**
     * A streaming processor without inbound streams: phase 1 is started through
     * {@code phase1StartAndDone(true)}; after phase 2 the tasklet is done and has emitted
     * the matching barrier followed by the done item.
     */
    @Test
    public void test_terminalSnapshot_source() {
        // Given
        MockOutboundStream outstream = new MockOutboundStream(0, 128);
        this.outstreams.add(outstream);
        this.processor.isStreaming = true;
        ProcessorTasklet tasklet = createTasklet(ProcessingGuarantee.EXACTLY_ONCE);

        // When
        phase1StartAndDone(true);
        callUntil(tasklet, ProgressState.NO_PROGRESS);
        CompletableFuture<?> phase2Future = this.snapshotContext.startNewSnapshotPhase2(0L, true);
        callUntil(tasklet, ProgressState.DONE);

        // Then
        Assert.assertTrue("future should have been done", phase2Future.isDone());
        // Expected value goes first so a failure message reports expected/actual correctly.
        Assert.assertEquals(Arrays.asList(barrier0(true), DoneItem.DONE_ITEM), outstream.getBuffer());
    }

    /**
     * Numbers the registered inbound streams by their list position, creates a new
     * {@link SnapshotContext} for the given guarantee (stored in {@code snapshotContext}),
     * and returns an initialized tasklet wired to the test fixtures.
     *
     * @param processingGuarantee guarantee passed to the snapshot context
     * @return the tasklet, after {@code init()} was called on it
     */
    private ProcessorTasklet createTasklet(ProcessingGuarantee processingGuarantee) {
        int ordinal = 0;
        for (MockInboundStream instream : this.instreams) {
            instream.setOrdinal(ordinal);
            ordinal++;
        }

        ILogger logger = Mockito.mock(ILogger.class);
        this.snapshotContext = new SnapshotContext(logger, "test job", -1L, processingGuarantee);
        this.snapshotContext.initTaskletCount(1, 1, 0);

        ProcessorTasklet tasklet = new ProcessorTasklet(
                this.context,
                TestUtil.DIRECT_EXECUTOR,
                this.serializationService,
                this.processor,
                this.instreams,
                this.outstreams,
                this.snapshotContext,
                this.snapshotCollector,
                false);
        tasklet.init();
        return tasklet;
    }

    /**
     * Returns the contents of the snapshot collector's buffer in order, with every
     * {@link Map.Entry} replaced by its deserialized value; other items are kept as they are.
     */
    private List<Object> getSnapshotBufferValues() {
        List<Object> values = new ArrayList<>();
        for (Object item : this.snapshotCollector.getBuffer()) {
            if (item instanceof Map.Entry) {
                values.add(deserializeEntryValue((Map.Entry) item));
            } else {
                values.add(item);
            }
        }
        return values;
    }

    /**
     * Deserializes the value of the given entry with the test's serialization service.
     *
     * @param entry entry whose value is passed to {@code toObject}; the key is ignored
     * @return the object produced by the serialization service
     */
    private Object deserializeEntryValue(Map.Entry<?, ?> entry) {
        Object serializedValue = entry.getValue();
        return this.serializationService.toObject(serializedValue);
    }

    /**
     * Calls the tasklet repeatedly until it returns the expected state.
     *
     * <p>Every call that returns anything else must return {@code MADE_PROGRESS}, and the
     * number of such calls must stay below {@code CALL_COUNT_LIMIT}; otherwise the test
     * fails with an assertion error.
     *
     * @param processorTasklet the tasklet to drive
     * @param progressState the state at which to stop calling
     */
    private static void callUntil(ProcessorTasklet processorTasklet, ProgressState progressState) {
        int iterations = 0;
        for (ProgressState state = processorTasklet.call();
             state != progressState;
             state = processorTasklet.call()) {
            Assert.assertEquals("Failed to make progress after " + iterations + " iterations",
                    ProgressState.MADE_PROGRESS, state);
            iterations++;
            // Use the shared limit constant instead of repeating its value as a literal.
            Assert.assertTrue(
                    String.format("tasklet.call() invoked %d times without reaching %s. Last state was %s",
                            CALL_COUNT_LIMIT, progressState, state),
                    iterations < CALL_COUNT_LIMIT);
        }
    }

    /**
     * Creates a {@link SnapshotBarrier} with id {@code 0} and the given flag.
     *
     * @param z passed as the second constructor argument (presumably "terminal" - the
     *          terminal-snapshot tests pass {@code true}; confirm against SnapshotBarrier)
     */
    private SnapshotBarrier barrier0(boolean z) {
        final long snapshotId = 0L;
        return new SnapshotBarrier(snapshotId, z);
    }

    /**
     * Starts phase 1 of snapshot {@code 0} on the current snapshot context and immediately
     * reports it as done for the tasklet.
     *
     * @param z first argument to {@code SnapshotFlags.create}; the second is always
     *          {@code false}
     */
    private void phase1StartAndDone(boolean z) {
        int flags = SnapshotFlags.create(z, false);
        this.snapshotContext.startNewSnapshotPhase1(0L, "map", flags);
        this.snapshotContext.phase1DoneForTasklet(0L, 0L, 0L);
    }
}
