package com.hazelcast.jet.impl.util;

import com.hazelcast.client.map.helpers.AMapStore;
import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.nio.BufferObjectDataInput;
import com.hazelcast.internal.partition.InternalPartitionService;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.serialization.impl.HeapData;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.impl.execution.SnapshotContext;
import com.hazelcast.jet.impl.util.AsyncSnapshotWriterImpl;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.topic.impl.reliable.ReliableTopicDestroyTest;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import junit.framework.TestCase;
import org.hamcrest.core.StringContains;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mockito;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/impl/util/AsyncSnapshotWriterImplTest.class */
public class AsyncSnapshotWriterImplTest extends JetTestSupport {
    private static final String ALWAYS_FAILING_MAP = "alwaysFailingMap";

    @Rule
    public ExpectedException exception = ExpectedException.none();
    private NodeEngineImpl nodeEngine;
    private AsyncSnapshotWriterImpl writer;
    private IMap<AsyncSnapshotWriterImpl.SnapshotDataKey, byte[]> map;
    private InternalSerializationService serializationService;
    private InternalPartitionService partitionService;
    private SnapshotContext snapshotContext;

    /* loaded from: input_file:com/hazelcast/jet/impl/util/AsyncSnapshotWriterImplTest$AlwaysFailingMapStore.class */
    static class AlwaysFailingMapStore extends AMapStore implements Serializable {
        AlwaysFailingMapStore() {
        }

        @Override // com.hazelcast.client.map.helpers.AMapStore
        public void store(Object obj, Object obj2) {
            throw new RuntimeException("Always failing store");
        }
    }

    @Before
    public void before() {
        Config config = new Config();
        config.getMapConfig(ALWAYS_FAILING_MAP).getMapStoreConfig().setEnabled(true).setImplementation(new AlwaysFailingMapStore());
        config.getJetConfig().setEnabled(true);
        HazelcastInstance createHazelcastInstance = createHazelcastInstance(config);
        this.nodeEngine = Util.getNodeEngine(createHazelcastInstance);
        this.serializationService = Util.getSerializationService(createHazelcastInstance);
        this.partitionService = this.nodeEngine.getPartitionService();
        this.snapshotContext = (SnapshotContext) Mockito.mock(SnapshotContext.class);
        Mockito.when(this.snapshotContext.currentMapName()).thenReturn("map1");
        Mockito.when(Long.valueOf(this.snapshotContext.currentSnapshotId())).thenReturn(0L);
        this.writer = new AsyncSnapshotWriterImpl(128, this.nodeEngine, this.snapshotContext, "vertex", 0, 1, this.nodeEngine.getSerializationService());
        Mockito.when(Long.valueOf(this.snapshotContext.currentSnapshotId())).thenReturn(1L);
        this.map = createHazelcastInstance.getMap("map1");
        TestCase.assertTrue(this.writer.usableChunkCapacity > 0);
    }

    @After
    public void after() {
        TestCase.assertTrue(this.writer.flushAndResetMap());
        assertTrueEventually(() -> {
            Assert.assertFalse(((Boolean) Util.uncheckCall(() -> {
                return Boolean.valueOf(this.writer.hasPendingAsyncOps());
            })).booleanValue());
        });
        TestCase.assertTrue(this.writer.isEmpty());
    }

    @Test
    public void test_flushingAtEdgeCases() {
        for (int i = 64; i < 196; i++) {
            Mockito.when(this.snapshotContext.currentMapName()).thenReturn(randomMapName());
            this.writer = new AsyncSnapshotWriterImpl(128, this.nodeEngine, this.snapshotContext, "vertex", 0, 1, this.nodeEngine.getSerializationService());
            try {
                TestCase.assertTrue(this.writer.offer(Util.entry(serialize("k"), serialize(String.join("", Collections.nCopies(i, "a"))))));
                TestCase.assertTrue(this.writer.flushAndResetMap());
            } catch (Exception e) {
                throw new RuntimeException("error at i=" + i, e);
            }
        }
    }

    @Test
    public void when_writeOneKeyAndFlush_then_written() {
        Map.Entry<Data, Data> entry = Util.entry(serialize("k"), serialize("v"));
        TestCase.assertTrue(this.writer.offer(entry));
        assertTrueAllTheTime(() -> {
            TestCase.assertTrue(this.map.isEmpty());
        }, 1L);
        Assert.assertFalse(this.writer.isEmpty());
        TestCase.assertTrue(this.writer.flushAndResetMap());
        assertTargetMapEntry("k", 0, serializedLength(entry));
        TestCase.assertEquals(1, this.map.size());
    }

    @Test
    public void when_chunkSizeWouldExceedLimit_then_flushedAutomatically() {
        Map.Entry<Data, Data> entry = Util.entry(serialize("k"), serialize("v"));
        int length = (this.writer.usableChunkCapacity - this.writer.serializedByteArrayHeader.length) / serializedLength(entry);
        TestCase.assertTrue("entriesInChunk=" + length, length > 1 && length < 10);
        for (int i = 0; i < length; i++) {
            TestCase.assertTrue(this.writer.offer(entry));
        }
        assertTrueAllTheTime(() -> {
            TestCase.assertTrue((String) this.map.entrySet().stream().map((v0) -> {
                return v0.toString();
            }).collect(Collectors.joining(", ", "[", "]")), this.map.isEmpty());
        }, 1L);
        TestCase.assertTrue(this.writer.offer(entry));
        assertTargetMapEntry("k", 0, serializedLength(entry) * length);
        Assert.assertFalse(this.writer.isEmpty());
        for (int i2 = 1; i2 < length; i2++) {
            TestCase.assertTrue(this.writer.offer(entry));
        }
        assertTrueAllTheTime(() -> {
            TestCase.assertEquals(1, this.map.size());
        }, 1L);
        TestCase.assertTrue(this.writer.offer(entry));
        assertTargetMapEntry("k", 1, serializedLength(entry) * length);
    }

    @Test
    public void when_twoPartitions_then_twoEntries() {
        Map.Entry<Data, Data> entry = Util.entry(serialize("k"), serialize("v"));
        Map.Entry<Data, Data> entry2 = Util.entry(serialize("kk"), serialize("vv"));
        TestCase.assertTrue(this.writer.offer(entry));
        TestCase.assertTrue(this.writer.offer(entry2));
        TestCase.assertTrue(this.writer.flushAndResetMap());
        assertTargetMapEntry("k", 0, serializedLength(entry));
        assertTargetMapEntry("kk", 1, serializedLength(entry2));
    }

    @Test
    public void when_singleLargeEntry_then_flushedImmediatelyAndDeserializesCorrectly() throws IOException {
        String str = (String) Stream.generate(() -> {
            return "a";
        }).limit(128L).collect(Collectors.joining());
        Map.Entry<Data, Data> entry = Util.entry(serialize("k"), serialize(str));
        TestCase.assertTrue("entry not longer than usable chunk size", serializedLength(entry) > this.writer.usableChunkCapacity);
        TestCase.assertTrue(this.writer.offer(entry));
        assertTargetMapEntry("k", 0, serializedLength(entry));
        TestCase.assertEquals(1L, this.writer.getTotalChunks());
        TestCase.assertEquals(1L, this.writer.getTotalKeys());
        byte[] bArr = (byte[]) this.map.get(new AsyncSnapshotWriterImpl.SnapshotDataKey(this.writer.partitionKey(this.partitionService.getPartitionId("k")), 1L, "vertex", 0));
        TestCase.assertEquals(bArr.length + 4, this.writer.getTotalPayloadBytes());
        BufferObjectDataInput createObjectDataInput = this.serializationService.createObjectDataInput(bArr);
        TestCase.assertEquals("k", (String) this.serializationService.readObject(createObjectDataInput, true));
        TestCase.assertEquals(str, (String) this.serializationService.readObject(createObjectDataInput, true));
        TestCase.assertEquals(AsyncSnapshotWriterImpl.SnapshotDataValueTerminator.INSTANCE, this.serializationService.readObject(createObjectDataInput, true));
    }

    @Test
    public void when_cannotAutoFlush_then_offerReturnsFalse() {
        this.writer.numConcurrentAsyncOps.set(1000);
        Map.Entry<Data, Data> entry = Util.entry(serialize("k"), serialize("v"));
        int length = (this.writer.usableChunkCapacity - this.writer.serializedByteArrayHeader.length) / serializedLength(entry);
        TestCase.assertTrue("entriesInChunk=" + length, length > 1 && length < 10);
        for (int i = 0; i < length; i++) {
            TestCase.assertTrue(this.writer.offer(entry));
        }
        Assert.assertFalse("offer should not have succeeded, too many parallel operations", this.writer.offer(entry));
        this.writer.numConcurrentAsyncOps.set(0);
        TestCase.assertTrue("offer should have succeeded", this.writer.offer(entry));
        assertTargetMapEntry("k", 0, serializedLength(entry) * length);
    }

    @Test
    public void when_cannotFlushRemaining_then_returnsFalse() {
        this.writer.numConcurrentAsyncOps.set(1000);
        Map.Entry<Data, Data> entry = Util.entry(serialize("k"), serialize("v"));
        Map.Entry<Data, Data> entry2 = Util.entry(serialize("kk"), serialize("vv"));
        TestCase.assertTrue(this.writer.offer(entry));
        TestCase.assertTrue(this.writer.offer(entry2));
        assertTrueAllTheTime(() -> {
            Assert.assertFalse(this.writer.flushAndResetMap());
            TestCase.assertTrue(this.map.isEmpty());
        }, 3L);
        this.writer.numConcurrentAsyncOps.decrementAndGet();
        assertTrueEventually(() -> {
            TestCase.assertTrue(this.writer.flushAndResetMap());
        });
        assertTargetMapEntry("k", 0, serializedLength(entry));
        assertTargetMapEntry("kk", 1, serializedLength(entry2));
    }

    @Test
    public void when_error_then_reported() {
        Mockito.when(this.snapshotContext.currentMapName()).thenReturn(ALWAYS_FAILING_MAP);
        TestCase.assertTrue(this.writer.offer(Util.entry(serialize("k"), serialize("v"))));
        TestCase.assertTrue(this.writer.flushAndResetMap());
        assertTrueEventually(() -> {
            Assert.assertThat(String.valueOf(this.writer.getError()), StringContains.containsString("Always failing store"));
        });
    }

    @Test
    public void test_serializeAndDeserialize() throws Exception {
        Data data = this.serializationService.toData(ReliableTopicDestroyTest.RELIABLE_TOPIC_NAME);
        Data data2 = this.serializationService.toData("bar");
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Stream.of((Object[]) new Data[]{data, data2}).forEach(data3 -> {
            Assert.assertTrue("unexpected class: " + data3.getClass(), data3 instanceof HeapData);
            byte[] byteArray = data3.toByteArray();
            byteArrayOutputStream.write(byteArray, 4, byteArray.length - 4);
        });
        BufferObjectDataInput createObjectDataInput = this.serializationService.createObjectDataInput(byteArrayOutputStream.toByteArray());
        Assert.assertEquals(ReliableTopicDestroyTest.RELIABLE_TOPIC_NAME, this.serializationService.readObject(createObjectDataInput, true));
        Assert.assertEquals("bar", this.serializationService.readObject(createObjectDataInput, true));
    }

    @Test
    public void when_noItemsAndNoCurrentMap_then_flushAndResetReturnsFalse() {
        Mockito.when(this.snapshotContext.currentMapName()).thenReturn((Object) null);
        Assert.assertFalse(this.writer.flushAndResetMap());
        Mockito.when(this.snapshotContext.currentMapName()).thenReturn("map1");
    }

    private void assertTargetMapEntry(String str, int i, int i2) {
        AsyncSnapshotWriterImpl.SnapshotDataKey snapshotDataKey = new AsyncSnapshotWriterImpl.SnapshotDataKey(this.writer.partitionKey(this.partitionService.getPartitionId(str)), 1L, "vertex", i);
        int length = i2 + this.writer.valueTerminator.length;
        assertTrueEventually(() -> {
            TestCase.assertEquals(length, ((byte[]) this.map.get(snapshotDataKey)).length);
        }, 3L);
    }

    private int serializedLength(Map.Entry<Data, Data> entry) {
        return (entry.getKey().totalSize() + entry.getValue().totalSize()) - 8;
    }

    private Data serialize(String str) {
        return this.serializationService.toData(str);
    }

    @Test
    public void when_bufferExceeded_then_thrown() {
        AsyncSnapshotWriterImpl.CustomByteArrayOutputStream customByteArrayOutputStream = new AsyncSnapshotWriterImpl.CustomByteArrayOutputStream(4);
        customByteArrayOutputStream.write(1);
        customByteArrayOutputStream.write(1);
        customByteArrayOutputStream.write(1);
        customByteArrayOutputStream.write(1);
        this.exception.expect(RuntimeException.class);
        customByteArrayOutputStream.write(1);
    }
}
