package org.apache.flink.streaming.runtime.operators.sink.committables;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.IntegerSerializer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.class */
class CommittableCollectorSerializerTest {
    private static final SimpleVersionedSerializer<Integer> COMMITTABLE_SERIALIZER = new IntegerSerializer();
    private static final CommittableCollectorSerializer<Integer> SERIALIZER = new CommittableCollectorSerializer<>(COMMITTABLE_SERIALIZER, 1, 1);

    CommittableCollectorSerializerTest() {
    }

    @Test
    void testCommittableCollectorV1SerDe() throws IOException {
        List asList = Arrays.asList(1, 2, 3);
        DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(256);
        dataOutputSerializer.writeInt(-1189141204);
        SimpleVersionedSerialization.writeVersionAndSerializeList(COMMITTABLE_SERIALIZER, asList, dataOutputSerializer);
        CommittableCollector deserialize = SERIALIZER.deserialize(1, dataOutputSerializer.getCopyOfBuffer());
        Assertions.assertThat(deserialize.getNumberOfSubtasks()).isEqualTo(1);
        Assertions.assertThat(deserialize.isFinished()).isFalse();
        Assertions.assertThat(deserialize.getSubtaskId()).isEqualTo(0);
        Collection checkpointCommittables = deserialize.getCheckpointCommittables();
        Assertions.assertThat(checkpointCommittables).hasSize(1);
        Assertions.assertThat((List) ((CheckpointCommittableManagerImpl) checkpointCommittables.iterator().next()).getSubtaskCommittableManager(0).getPendingRequests().map((v0) -> {
            return v0.getCommittable();
        }).collect(Collectors.toList())).containsExactly(new Integer[]{1, 2, 3});
    }

    @Test
    void testCommittableCollectorV2SerDe() throws IOException {
        CommittableCollector committableCollector = new CommittableCollector(2, 3);
        committableCollector.addMessage(new CommittableSummary(2, 3, 1L, 1, 1, 0));
        committableCollector.addMessage(new CommittableSummary(2, 3, 2L, 1, 1, 0));
        committableCollector.addMessage(new CommittableWithLineage(1, 1L, 2));
        committableCollector.addMessage(new CommittableWithLineage(2, 2L, 2));
        CommittableCollector deserialize = SERIALIZER.deserialize(2, SERIALIZER.serialize(committableCollector));
        Assertions.assertThat(deserialize.getSubtaskId()).isEqualTo(1);
        Assertions.assertThat(deserialize.isFinished()).isFalse();
        Assertions.assertThat(deserialize.getNumberOfSubtasks()).isEqualTo(1);
        Collection checkpointCommittables = committableCollector.getCheckpointCommittables();
        Assertions.assertThat(checkpointCommittables).hasSize(2);
        Iterator it = checkpointCommittables.iterator();
        Assertions.assertThat((List) ((CheckpointCommittableManagerImpl) it.next()).getSubtaskCommittableManager(2).getPendingRequests().map((v0) -> {
            return v0.getCommittable();
        }).collect(Collectors.toList())).containsExactly(new Integer[]{1});
        Assertions.assertThat((List) ((CheckpointCommittableManagerImpl) it.next()).getSubtaskCommittableManager(2).getPendingRequests().map((v0) -> {
            return v0.getCommittable();
        }).collect(Collectors.toList())).containsExactly(new Integer[]{2});
    }
}
