package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.class */
class SinkWriterOperatorTest {

    /**
     * Minimal operator that appends the value of every incoming record to a versioned list state
     * registered under {@link #DUMMY_SINK_STATE_NAME}.
     */
    private static class DummySinkOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
        /** Name under which the raw byte-array list state is registered. */
        static final String DUMMY_SINK_STATE_NAME = "dummy_sink_state";
        /** Descriptor of the raw (byte array) list state backing {@link #sinkState}. */
        static final ListStateDescriptor<byte[]> SINK_STATE_DESC = new ListStateDescriptor<>(DUMMY_SINK_STATE_NAME, BytePrimitiveArraySerializer.INSTANCE);
        /** String view over the raw list state; assigned in {@code initializeState}. */
        ListState<String> sinkState;

        private DummySinkOperator() {
        }

        /**
         * Wraps the raw operator list state in a {@link SimpleVersionedListState} that uses the
         * string committable serializer of {@link TestSink}.
         */
        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            final ListState<byte[]> rawState = context.getOperatorStateStore().getListState(SINK_STATE_DESC);
            this.sinkState = new SimpleVersionedListState<>(rawState, TestSink.StringCommittableSerializer.INSTANCE);
        }

        /** Stores the record's value in {@link #sinkState}; nothing is emitted. */
        public void processElement(StreamRecord<String> element) throws Exception {
            final String value = element.getValue();
            this.sinkState.add(value);
        }
    }

    /**
     * Writer that remembers the id passed to the latest {@link #snapshotState(long)} call, hands
     * out its buffered elements as state, and only releases them on a flushing
     * {@link #prepareCommit(boolean)}.
     *
     * <p>NOTE(review): the decompiler reported that this class was originally declared private.
     */
    public static class SnapshottingBufferingSinkWriter extends TestSink.DefaultSinkWriter<Integer> {
        /** Value of {@link #lastCheckpointId} as long as no snapshot has been taken. */
        public static final int NOT_SNAPSHOTTED = -1;
        /** Checkpoint id of the most recent snapshot, or {@link #NOT_SNAPSHOTTED}. */
        long lastCheckpointId;

        private SnapshottingBufferingSinkWriter() {
            this.lastCheckpointId = NOT_SNAPSHOTTED;
        }

        /**
         * Records the checkpoint id and returns the currently buffered elements.
         *
         * @param j id of the checkpoint being taken
         * @return the live element buffer (not a copy)
         */
        @Override
        public List<String> snapshotState(long j) {
            this.lastCheckpointId = j;
            return this.elements;
        }

        /** Replaces the element buffer with a mutable copy of the restored state. */
        @Override
        void restoredFrom(List<String> list) {
            this.elements = new ArrayList<>(list);
        }

        /**
         * Hands over all buffered elements when flushing and starts a fresh buffer; without a
         * flush nothing is released.
         *
         * @param z whether the buffered elements should be flushed
         * @return the drained buffer when {@code z} is true, otherwise an empty list
         */
        @Override
        public List<String> prepareCommit(boolean z) {
            if (z) {
                final List<String> drained = this.elements;
                this.elements = new ArrayList<>();
                return drained;
            }
            return Collections.emptyList();
        }
    }

    /**
     * Operator that collects incoming strings in memory and, on every snapshot, appends the whole
     * collected list as a single entry to a versioned list state named
     * {@code streaming_committer_raw_states}.
     */
    private static class TestCommitterOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
        private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC = new ListStateDescriptor<>("streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);
        /** List-of-strings view over the raw state; assigned in {@code initializeState}. */
        private ListState<List<String>> committerState;
        /** Values seen so far; never cleared by this operator. */
        private final List<String> buffer = new ArrayList<>();

        private TestCommitterOperator() {
        }

        /**
         * Wraps the raw operator list state in a {@link SimpleVersionedListState} that serializes
         * entries with a {@link TestingCommittableSerializer}.
         */
        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            final ListState<byte[]> rawState = context.getOperatorStateStore().getListState(STREAMING_COMMITTER_RAW_STATES_DESC);
            final TestingCommittableSerializer serializer = new TestingCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE);
            this.committerState = new SimpleVersionedListState<>(rawState, serializer);
        }

        /** Remembers the record's value until the next snapshot. */
        public void processElement(StreamRecord<String> element) throws Exception {
            final String value = element.getValue();
            this.buffer.add(value);
        }

        /** Adds the complete buffer as one state entry; the buffer itself is left untouched. */
        public void snapshotState(StateSnapshotContext context) throws Exception {
            super.snapshotState(context);
            this.committerState.add(this.buffer);
        }
    }

    /**
     * Serializer for lists of string committables that writes a fixed header int followed by the
     * versioned, serialized list.
     */
    private static class TestingCommittableSerializer extends SinkV1WriterCommittableSerializer<String> {
        /** Serializer applied to each individual committable. */
        private final SimpleVersionedSerializer<String> committableSerializer;

        public TestingCommittableSerializer(SimpleVersionedSerializer<String> elementSerializer) {
            super(elementSerializer);
            this.committableSerializer = elementSerializer;
        }

        /**
         * Serializes the given committables.
         *
         * @param committables the committables to write
         * @return a copy of the bytes written: header int first, then the versioned list
         * @throws IOException if writing to the in-memory buffer fails
         */
        public byte[] serialize(List<String> committables) throws IOException {
            final DataOutputSerializer out = new DataOutputSerializer(256);
            // Header value (0xB91F252C as a signed int).
            // NOTE(review): presumably the magic number the matching deserializer expects - confirm.
            out.writeInt(-1189141204);
            SimpleVersionedSerialization.writeVersionAndSerializeList(this.committableSerializer, committables, out);
            return out.getCopyOfBuffer();
        }
    }

    /**
     * Writer that holds written elements in a private cache and only moves them into the
     * committable element list when its processing-time timer fires.
     */
    private static class TimeBasedBufferingSinkWriter extends TestSink.DefaultSinkWriter<Integer> implements Sink.ProcessingTimeService.ProcessingTimeCallback {
        /** Offset added to the firing time when the timer re-registers itself. */
        private static final long TIMER_INTERVAL = 1000L;

        /** Elements written since the timer last fired. */
        private final List<String> cachedCommittables = new ArrayList<>();

        private TimeBasedBufferingSinkWriter() {
        }

        /** Caches the element as the string form of (element, timestamp, current watermark). */
        @Override
        public void write(Integer element, SinkWriter.Context context) {
            final String formatted = Tuple3.of(element, context.timestamp(), context.currentWatermark()).toString();
            this.cachedCommittables.add(formatted);
        }

        /** Stores the timer service and registers the first timer at time 1000. */
        @Override
        public void setProcessingTimerService(Sink.ProcessingTimeService timerService) {
            super.setProcessingTimerService(timerService);
            this.processingTimerService.registerProcessingTimer(TIMER_INTERVAL, this);
        }

        /**
         * Moves all cached entries into the element list and schedules the next timer.
         *
         * @param j the time passed to this callback; the next timer is registered 1000 after it
         */
        public void onProcessingTime(long j) {
            this.elements.addAll(this.cachedCommittables);
            this.cachedCommittables.clear();
            this.processingTimerService.registerProcessingTimer(j + TIMER_INTERVAL, this);
        }
    }

    /** Package-private no-argument constructor; it performs no initialization. */
    SinkWriterOperatorTest() {
    }

    /**
     * With a sink built without a committer, the writer receives the element but the operator
     * emits nothing, neither after the element nor after the pre-barrier snapshot step.
     */
    @Test
    void testNotEmitCommittablesWithoutCommitter() throws Exception {
        final String writtenElement = "(1,1,-9223372036854775808)";
        TestSink.DefaultSinkWriter sinkWriter = new TestSink.DefaultSinkWriter();
        SinkWriterOperatorFactory writerOperatorFactory = new SinkWriterOperatorFactory(TestSink.newBuilder().setWriter(sinkWriter).build().asV2());
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory) writerOperatorFactory);
        testHarness.open();

        testHarness.processElement(1, 1L);
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
        Assertions.assertThat(sinkWriter.elements).containsOnly(writtenElement);

        // The pre-barrier step drains the writer's buffer, yet still nothing is emitted.
        testHarness.prepareSnapshotPreBarrier(1L);
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
        Assertions.assertThat(sinkWriter.elements).isEmpty();

        testHarness.close();
    }

    /**
     * Watermarks fed into the operator must appear, in order, both in the operator output and in
     * the list of watermarks recorded by the writer.
     */
    @Test
    void testWatermarkPropagatedToSinkWriter() throws Exception {
        TestSink.DefaultSinkWriter sinkWriter = new TestSink.DefaultSinkWriter();
        SinkWriterOperatorFactory writerOperatorFactory = new SinkWriterOperatorFactory(TestSink.newBuilder().setWriter(sinkWriter).build().asV2());
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory) writerOperatorFactory);
        testHarness.open();

        testHarness.processWatermark(0L);
        testHarness.processWatermark(1L);

        // Streaming-level watermarks on the output ...
        Assertions.assertThat(testHarness.getOutput()).containsExactly(new Watermark(0L), new Watermark(1L));
        // ... and event-time API watermarks as seen by the writer.
        Assertions.assertThat(sinkWriter.watermarks).containsExactly(
                new org.apache.flink.api.common.eventtime.Watermark(0L),
                new org.apache.flink.api.common.eventtime.Watermark(1L));

        testHarness.close();
    }

    /**
     * Uses {@link TimeBasedBufferingSinkWriter}: elements written before its timer fires are not
     * part of the first checkpoint's committables, but show up once processing time has advanced.
     */
    @Test
    public void testTimeBasedBufferingSinkWriter() throws Exception {
        SinkWriterOperatorFactory writerOperatorFactory = new SinkWriterOperatorFactory(
                TestSink.newBuilder().setDefaultCommitter().setWriter(new TimeBasedBufferingSinkWriter()).build().asV2());
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory) writerOperatorFactory);
        testHarness.open();
        testHarness.setProcessingTime(0L);

        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);
        testHarness.prepareSnapshotPreBarrier(1L);

        // Only a summary with zero committables is expected for checkpoint 1.
        assertBasicOutput(testHarness.getOutput(), 0, 1L);
        // Drop that summary so the next assertion sees only the output of checkpoint 2.
        testHarness.getOutput().poll();

        testHarness.getProcessingTimeService().setCurrentTime(2001L);
        testHarness.prepareSnapshotPreBarrier(2L);

        assertBasicOutput(testHarness.getOutput(), 2, 2L);
        testHarness.close();
    }

    /**
     * With a default committer configured, the pre-barrier snapshot step emits a summary followed
     * by one committable per processed element.
     */
    @Test
    void testEmitOnFlushWithCommitter() throws Exception {
        SinkWriterOperatorFactory writerOperatorFactory = new SinkWriterOperatorFactory(TestSink.newBuilder().setDefaultCommitter().build().asV2());
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory) writerOperatorFactory);
        testHarness.open();
        // Nothing is emitted just by opening the operator.
        Assertions.assertThat(testHarness.getOutput()).isEmpty();

        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);
        testHarness.prepareSnapshotPreBarrier(1L);

        assertBasicOutput(testHarness.getOutput(), 2, 1L);
        testHarness.close();
    }

    /**
     * Ending the input without any checkpoint must emit a summary plus the single committable,
     * both tagged with {@link Long#MAX_VALUE} as checkpoint id.
     */
    @Test
    void testEmitOnEndOfInputInBatchMode() throws Exception {
        SinkWriterOperatorFactory writerOperatorFactory = new SinkWriterOperatorFactory(TestSink.newBuilder().setDefaultCommitter().build().asV2());
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory) writerOperatorFactory);
        testHarness.open();
        Assertions.assertThat(testHarness.getOutput()).isEmpty();

        testHarness.processElement(1, 1L);
        testHarness.endInput();

        assertBasicOutput(testHarness.getOutput(), 1, Long.MAX_VALUE);
        // Release the harness like the other tests in this class do; it was previously left open.
        testHarness.close();
    }

    /**
     * Snapshots a harness backed by a {@link SnapshottingBufferingSinkWriter} and restores a
     * second harness from that snapshot.
     *
     * @param z whether the sink is built with writer state enabled
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testStateRestore(boolean z) throws Exception {
        final long checkpointId = 1L;
        SnapshottingBufferingSinkWriter writer = new SnapshottingBufferingSinkWriter();
        OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = createTestHarnessWithBufferingSinkWriter(writer, z);
        testHarness.open();

        testHarness.processWatermark(0L);
        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);
        testHarness.prepareSnapshotPreBarrier(checkpointId);
        OperatorSubtaskState snapshot = testHarness.snapshot(checkpointId, 1L);

        // Two outputs so far, one of them the forwarded watermark.
        Assertions.assertThat(testHarness.getOutput()).hasSize(2).contains(new Watermark(0L));
        // The writer is only asked for a snapshot when writer state is enabled.
        final long expectedLastCheckpointId = z ? checkpointId : SnapshottingBufferingSinkWriter.NOT_SNAPSHOTTED;
        Assertions.assertThat(writer.lastCheckpointId).isEqualTo(expectedLastCheckpointId);
        testHarness.close();

        OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> restoredHarness = createTestHarnessWithBufferingSinkWriter(new SnapshottingBufferingSinkWriter(), z);
        restoredHarness.initializeState(snapshot);
        restoredHarness.open();
        restoredHarness.endInput();
        restoredHarness.prepareSnapshotPreBarrier(2L);

        if (!z) {
            // Without writer state nothing is restored: the first output is an empty summary.
            Object firstOutput = SinkTestUtil.fromOutput(restoredHarness.getOutput()).get(0).asRecord().getValue();
            Assertions.assertThat(firstOutput).isInstanceOf(CommittableSummary.class).satisfies(
                    summary -> SinkV2Assertions.assertThat((CommittableSummary<?>) summary)
                            .hasOverallCommittables(0)
                            .hasPendingCommittables(0)
                            .hasFailedCommittables(0));
        } else {
            // Both buffered elements come back from state and are emitted at end of input.
            assertBasicOutput(restoredHarness.getOutput(), 2, Long.MAX_VALUE);
        }
        restoredHarness.close();
    }

    /**
     * Builds state with a {@link DummySinkOperator}, loads it into a sink writer operator that is
     * configured with a compatible state name, and then restores once more from that operator's
     * own snapshot.
     *
     * @param z whether the sink is built with writer state enabled
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testLoadPreviousSinkState(boolean z) throws Exception {
        List previousSinkInputs = Arrays.asList("bit", "mention", "thick", "stick", "stir", "easy", "sleep", "forth", "cost", "prompt");
        OneInputStreamOperatorTestHarness previousSink = new OneInputStreamOperatorTestHarness((OneInputStreamOperator) new DummySinkOperator(), (TypeSerializer) StringSerializer.INSTANCE);
        OperatorSubtaskState previousSinkState = TestHarnessUtil.buildSubtaskState(previousSink, previousSinkInputs);

        // First run: the previous sink's entries are only expected when writer state is enabled.
        OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> compatibleWriterOperator = createCompatibleStateTestHarness(z);
        ArrayList expectedOutput1 = new ArrayList();
        if (z) {
            expectedOutput1.addAll(previousSinkInputs);
        }
        expectedOutput1.add(Tuple3.of(1, 1, Long.MIN_VALUE).toString());

        compatibleWriterOperator.initializeState(previousSinkState);
        compatibleWriterOperator.open();
        compatibleWriterOperator.processElement(1, 1L);
        compatibleWriterOperator.endInput();
        compatibleWriterOperator.prepareSnapshotPreBarrier(1L);
        OperatorSubtaskState operatorStateWithoutPreviousState = compatibleWriterOperator.snapshot(1L, 1L);
        compatibleWriterOperator.close();
        assertEmitted(expectedOutput1, compatibleWriterOperator.getOutput());

        // Second run: restored from the first run's snapshot, only the new elements are expected.
        OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> restoredSinkOperator = createCompatibleStateTestHarness(z);
        List expectedOutput2 = Arrays.asList(
                Tuple3.of(2, 2, Long.MIN_VALUE).toString(),
                Tuple3.of(3, 3, Long.MIN_VALUE).toString());

        restoredSinkOperator.initializeState(operatorStateWithoutPreviousState);
        restoredSinkOperator.open();
        restoredSinkOperator.processElement(2, 2L);
        restoredSinkOperator.processElement(3, 3L);
        restoredSinkOperator.endInput();
        restoredSinkOperator.prepareSnapshotPreBarrier(2L);
        assertEmitted(expectedOutput2, restoredSinkOperator.getOutput());
        restoredSinkOperator.close();
    }

    /**
     * State written by a {@link TestCommitterOperator} must be re-emitted by the sink writer
     * operator: a summary for checkpoint 1 with the restored committables, then the (empty)
     * summary of the checkpoint triggered in this test.
     */
    @Test
    void testRestoreCommitterState() throws Exception {
        final List<String> committables = Arrays.asList("state1", "state2");
        OneInputStreamOperatorTestHarness committer = new OneInputStreamOperatorTestHarness((OneInputStreamOperator) new TestCommitterOperator(), (TypeSerializer) StringSerializer.INSTANCE);
        OperatorSubtaskState committerState = TestHarnessUtil.buildSubtaskState(committer, committables);

        SinkWriterOperatorFactory writerOperatorFactory = new SinkWriterOperatorFactory(
                TestSink.newBuilder().setDefaultCommitter().setWriter(new TestSink.DefaultSinkWriter()).build().asV2());
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory) writerOperatorFactory);
        testHarness.initializeState(committerState);
        testHarness.open();
        testHarness.prepareSnapshotPreBarrier(2L);

        List<StreamElement> output = SinkTestUtil.fromOutput(testHarness.getOutput());
        Assertions.assertThat(output).hasSize(4);

        // Restored committables are announced under checkpoint id 1.
        Assertions.assertThat(output.get(0).asRecord().getValue()).isInstanceOf(CommittableSummary.class).satisfies(
                summary -> SinkV2Assertions.assertThat((CommittableSummary<?>) summary)
                        .hasPendingCommittables(committables.size())
                        .hasCheckpointId(1L)
                        .hasOverallCommittables(committables.size())
                        .hasFailedCommittables(0));
        assertRestoredCommitterCommittable(output.get(1).asRecord().getValue(), committables.get(0));
        assertRestoredCommitterCommittable(output.get(2).asRecord().getValue(), committables.get(1));

        // Nothing was written in this run, so the summary of checkpoint 2 is empty.
        Assertions.assertThat(output.get(3).asRecord().getValue()).isInstanceOf(CommittableSummary.class).satisfies(
                summary -> SinkV2Assertions.assertThat((CommittableSummary<?>) summary)
                        .hasPendingCommittables(0)
                        .hasCheckpointId(2L)
                        .hasOverallCommittables(0)
                        .hasFailedCommittables(0));

        // Release the harness like the other tests in this class do; it was previously left open.
        testHarness.close();
    }

    /**
     * After end of input the written element must be emitted as a committable and the writer's
     * buffer must be empty, whether or not a pre-barrier snapshot step follows.
     *
     * @param z whether {@code prepareSnapshotPreBarrier} is invoked after end of input
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testHandleEndInputInStreamingMode(boolean z) throws Exception {
        final String writtenElement = "(1,1,-9223372036854775808)";
        TestSink.DefaultSinkWriter sinkWriter = new TestSink.DefaultSinkWriter();
        SinkWriterOperatorFactory writerOperatorFactory = new SinkWriterOperatorFactory(
                TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build().asV2());
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory) writerOperatorFactory);
        testHarness.open();

        testHarness.processElement(1, 1L);
        // Before end of input the element sits in the writer and nothing is emitted.
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
        Assertions.assertThat(sinkWriter.elements).containsOnly(writtenElement);

        testHarness.endInput();
        if (z) {
            testHarness.prepareSnapshotPreBarrier(1L);
        }

        assertEmitted(Collections.singletonList(writtenElement), testHarness.getOutput());
        Assertions.assertThat(sinkWriter.elements).isEmpty();
        testHarness.close();
    }

    /**
     * Asserts that the given output value is a {@link CommittableWithLineage} carrying the
     * expected committable with checkpoint id 1 and subtask id 0.
     *
     * @param obj the emitted value to inspect
     * @param str the committable the value is expected to carry
     */
    private static void assertRestoredCommitterCommittable(Object obj, String str) {
        Assertions.assertThat(obj)
                .isInstanceOf(CommittableWithLineage.class)
                .satisfies(
                        candidate -> SinkV2Assertions.assertThat((CommittableWithLineage<?>) candidate)
                                .hasCommittable(str)
                                .hasCheckpointId(1L)
                                .hasSubtaskId(0));
    }

    /**
     * Asserts that the output consists of one {@link CommittableSummary} followed by exactly the
     * expected committables, in any order.
     *
     * @param list the committables expected after the summary
     * @param queue the raw harness output
     */
    private static void assertEmitted(List<String> list, Queue<Object> queue) {
        final int expectedCount = list.size();
        final List<StreamElement> emitted = SinkTestUtil.fromOutput(queue);
        Assertions.assertThat(emitted).hasSize(expectedCount + 1);

        // The first element summarizes the committables that follow it.
        final Object summary = emitted.get(0).asRecord().getValue();
        Assertions.assertThat(summary).isInstanceOf(CommittableSummary.class).satisfies(
                candidate -> SinkV2Assertions.assertThat((CommittableSummary<?>) candidate)
                        .hasPendingCommittables(expectedCount)
                        .hasOverallCommittables(expectedCount)
                        .hasFailedCommittables(0));

        // Everything after the summary must be a committable; collect the payloads for comparison.
        final List<Object> committables = new ArrayList<>();
        for (StreamElement element : emitted.subList(1, expectedCount + 1)) {
            final Object value = element.asRecord().getValue();
            Assertions.assertThat(value).isInstanceOf(CommittableWithLineage.class);
            committables.add(((CommittableWithLineage<?>) value).getCommittable());
        }
        Assertions.assertThat(committables).containsExactlyInAnyOrderElementsOf(list);
    }

    /**
     * Creates a harness around a sink writer operator that uses the given writer together with the
     * default committer.
     *
     * @param snapshottingBufferingSinkWriter the writer to install in the sink
     * @param z whether the sink is built with writer state enabled
     * @return a new, not yet opened test harness
     */
    private static OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> createTestHarnessWithBufferingSinkWriter(SnapshottingBufferingSinkWriter snapshottingBufferingSinkWriter, boolean z) throws Exception {
        // The element type is Integer; the previously declared type variable did not exist in this scope.
        final TestSink.Builder<Integer> builder = TestSink.newBuilder().setDefaultCommitter().setWriter(snapshottingBufferingSinkWriter);
        if (z) {
            builder.withWriterState();
        }
        return new OneInputStreamOperatorTestHarness<>((OneInputStreamOperatorFactory) new SinkWriterOperatorFactory(builder.build().asV2()));
    }

    /**
     * Creates a harness around a sink writer operator whose sink declares
     * {@code "dummy_sink_state"} as a compatible state name and uses a fresh
     * {@link SnapshottingBufferingSinkWriter} together with the default committer.
     *
     * @param z whether the sink is built with writer state enabled
     * @return a new, not yet opened test harness
     */
    private static OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> createCompatibleStateTestHarness(boolean z) throws Exception {
        // The literal below equals DummySinkOperator.DUMMY_SINK_STATE_NAME; keep the two in sync.
        // The element type is Integer; the previously declared type variable did not exist in this scope.
        final TestSink.Builder<Integer> builder = TestSink.newBuilder()
                .setDefaultCommitter()
                .setCompatibleStateNames("dummy_sink_state")
                .setWriter(new SnapshottingBufferingSinkWriter());
        if (z) {
            builder.withWriterState();
        }
        return new OneInputStreamOperatorTestHarness<>((OneInputStreamOperatorFactory) new SinkWriterOperatorFactory(builder.build().asV2()));
    }

    /**
     * Asserts that the output is one {@link CommittableSummary} announcing {@code i} committables
     * followed by exactly {@code i} {@link CommittableWithLineage} elements for subtask 0.
     *
     * @param collection the raw harness output
     * @param i the expected number of committables after the summary
     * @param l the checkpoint id every committable is expected to carry
     */
    private static void assertBasicOutput(Collection<Object> collection, int i, @Nullable Long l) {
        final List<StreamElement> elements = SinkTestUtil.fromOutput(collection);
        Assertions.assertThat(elements).hasSize(i + 1);

        final Object summary = elements.get(0).asRecord().getValue();
        Assertions.assertThat(summary).isInstanceOf(CommittableSummary.class).satisfies(
                candidate -> SinkV2Assertions.assertThat((CommittableSummary<?>) candidate)
                        .hasOverallCommittables(i)
                        .hasPendingCommittables(i)
                        .hasFailedCommittables(0));

        for (int index = 1; index <= i; index++) {
            final Object committable = elements.get(index).asRecord().getValue();
            Assertions.assertThat(committable).isInstanceOf(CommittableWithLineage.class).satisfies(
                    candidate -> SinkV2Assertions.assertThat((CommittableWithLineage<?>) candidate)
                            .hasCheckpointId(l)
                            .hasSubtaskId(0));
        }
    }
}
