package com.hazelcast.jet.impl.execution;

import com.hazelcast.internal.serialization.impl.HeapData;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.impl.operation.SnapshotPhase1Operation;
import com.hazelcast.jet.impl.util.MockAsyncSnapshotWriter;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.logging.Logger;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/impl/execution/StoreSnapshotTaskletTest.class */
public class StoreSnapshotTaskletTest extends JetTestSupport {

    @Rule
    public ExpectedException exception = ExpectedException.none();
    private SnapshotContext ssContext;
    private MockInboundStream input;
    private StoreSnapshotTasklet sst;
    private MockAsyncSnapshotWriter mockSsWriter;

    private void init(List<Object> list) {
        this.ssContext = new SnapshotContext(Logger.getLogger(SnapshotContext.class), "test job", 1L, ProcessingGuarantee.EXACTLY_ONCE);
        this.ssContext.initTaskletCount(1, 1, 0);
        ArrayList arrayList = new ArrayList(list);
        for (int i = 0; i < arrayList.size(); i++) {
            if (arrayList.get(i) instanceof Map.Entry) {
                Map.Entry entry = (Map.Entry) arrayList.get(i);
                arrayList.set(i, Util.entry(serialize((String) entry.getKey()), serialize((String) entry.getValue())));
            }
        }
        this.input = new MockInboundStream(0, arrayList, 128);
        this.mockSsWriter = new MockAsyncSnapshotWriter();
        this.sst = new StoreSnapshotTasklet(this.ssContext, this.input, this.mockSsWriter, Logger.getLogger(this.mockSsWriter.getClass()), "myVertex", false);
    }

    @Test
    public void when_doneItemOnInput_then_eventuallyDone() {
        init(Collections.singletonList(DoneItem.DONE_ITEM));
        Assert.assertEquals(ProgressState.DONE, this.sst.call());
    }

    @Test
    public void when_item_then_offeredToSsWriter() {
        init(Collections.singletonList(Util.entry("k", "v")));
        Assert.assertEquals(ProgressState.MADE_PROGRESS, this.sst.call());
        Assert.assertEquals(Util.entry(serialize("k"), serialize("v")), this.mockSsWriter.poll());
        Assert.assertNull(this.mockSsWriter.poll());
    }

    @Test
    public void when_notAbleToOffer_then_offeredLater() {
        init(Collections.singletonList(Util.entry("k", "v")));
        this.mockSsWriter.ableToOffer = false;
        Assert.assertEquals(ProgressState.MADE_PROGRESS, this.sst.call());
        Assert.assertEquals(0L, this.input.remainingItems().size());
        Assert.assertEquals(ProgressState.NO_PROGRESS, this.sst.call());
        Assert.assertNull(this.mockSsWriter.poll());
        this.mockSsWriter.ableToOffer = true;
        Assert.assertEquals(ProgressState.MADE_PROGRESS, this.sst.call());
        Assert.assertEquals(Util.entry(serialize("k"), serialize("v")), this.mockSsWriter.poll());
        Assert.assertNull(this.mockSsWriter.poll());
    }

    @Test
    public void when_barrier_then_snapshotDone() {
        init(Collections.singletonList(new SnapshotBarrier(2L, false)));
        this.ssContext.startNewSnapshotPhase1(2L, "map", 0);
        Assert.assertEquals(ProgressState.MADE_PROGRESS, this.sst.call());
        Assert.assertEquals(ProgressState.MADE_PROGRESS, this.sst.call());
        Assert.assertEquals(3L, this.sst.pendingSnapshotId);
    }

    @Test
    public void when_itemAndBarrier_then_snapshotDone() {
        init(Arrays.asList(Util.entry("k", "v"), new SnapshotBarrier(2L, false)));
        this.ssContext.startNewSnapshotPhase1(2L, "map", 0);
        Assert.assertEquals(2L, this.sst.pendingSnapshotId);
        Assert.assertEquals(ProgressState.MADE_PROGRESS, this.sst.call());
        Assert.assertEquals(ProgressState.MADE_PROGRESS, this.sst.call());
        this.mockSsWriter.hasPendingFlushes = false;
        Assert.assertEquals(ProgressState.MADE_PROGRESS, this.sst.call());
        Assert.assertEquals(3L, this.sst.pendingSnapshotId);
        Assert.assertEquals(Util.entry(serialize("k"), serialize("v")), this.mockSsWriter.poll());
    }

    @Test
    public void when_notAbleToFlush_then_tryAgain() {
        init(Collections.singletonList(new SnapshotBarrier(2L, false)));
        this.ssContext.startNewSnapshotPhase1(2L, "map", 0);
        this.mockSsWriter.ableToFlushRemaining = false;
        Assert.assertEquals(ProgressState.MADE_PROGRESS, this.sst.call());
        Assert.assertEquals(ProgressState.NO_PROGRESS, this.sst.call());
        Assert.assertEquals(ProgressState.NO_PROGRESS, this.sst.call());
        this.mockSsWriter.ableToFlushRemaining = true;
        Assert.assertEquals(ProgressState.MADE_PROGRESS, this.sst.call());
        Assert.assertEquals(ProgressState.MADE_PROGRESS, this.sst.call());
        Assert.assertEquals(ProgressState.NO_PROGRESS, this.sst.call());
        Assert.assertEquals(3L, this.sst.pendingSnapshotId);
    }

    @Test
    public void test_waitingForFlushesToComplete() {
        init(Arrays.asList(Util.entry("k", "v"), new SnapshotBarrier(2L, false)));
        this.ssContext.startNewSnapshotPhase1(2L, "map", 0);
        Assert.assertEquals(ProgressState.MADE_PROGRESS, this.sst.call());
        Assert.assertEquals(ProgressState.MADE_PROGRESS, this.sst.call());
        Assert.assertEquals(ProgressState.NO_PROGRESS, this.sst.call());
        Assert.assertTrue(this.mockSsWriter.hasPendingFlushes);
        this.mockSsWriter.hasPendingFlushes = false;
        Assert.assertEquals(ProgressState.MADE_PROGRESS, this.sst.call());
        Assert.assertEquals(ProgressState.NO_PROGRESS, this.sst.call());
        Assert.assertEquals(3L, this.sst.pendingSnapshotId);
    }

    @Test
    public void when_snapshotFails_then_reportedToContext() throws Exception {
        init(Collections.singletonList(new SnapshotBarrier(2L, false)));
        RuntimeException runtimeException = new RuntimeException("mock failure");
        this.mockSsWriter.failure = runtimeException;
        CompletableFuture startNewSnapshotPhase1 = this.ssContext.startNewSnapshotPhase1(2L, "map", 0);
        Assert.assertEquals(ProgressState.MADE_PROGRESS, this.sst.call());
        Assert.assertFalse(startNewSnapshotPhase1.isDone());
        Assert.assertEquals(ProgressState.MADE_PROGRESS, this.sst.call());
        Assert.assertTrue(startNewSnapshotPhase1.isDone());
        Assert.assertEquals(runtimeException.toString(), ((SnapshotPhase1Operation.SnapshotPhase1Result) startNewSnapshotPhase1.get()).getError());
        Assert.assertEquals(3L, this.sst.pendingSnapshotId);
    }

    private HeapData serialize(String str) {
        return new HeapData(("abcd" + str).getBytes(StandardCharsets.UTF_16));
    }
}
