package org.apache.flink.streaming.runtime.io;

import java.io.File;
import java.util.Arrays;
import java.util.Random;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.hamcrest.MockitoHamcrest;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.class */
public class BarrierBufferAlignmentLimitTest {
    /** Size of every memory segment allocated by {@code createBuffer}; also handed to the mock input gate. */
    private static final int PAGE_SIZE = 512;
    /** Source of the random payload bytes written into each test buffer. */
    private static final Random RND = new Random();
    /** I/O manager shared by all tests; created in {@code setup()} and shut down in {@code shutdownIOManager()}. */
    private static IOManager ioManager;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest$CheckpointMatcher.class */
    /**
     * Hamcrest matcher that accepts exactly those {@link CheckpointMetaData} objects whose
     * checkpoint id equals the id supplied at construction time.
     */
    private static class CheckpointMatcher extends BaseMatcher<CheckpointMetaData> {
        /** Checkpoint id a candidate must carry in order to match. */
        private final long checkpointId;

        /**
         * @param j the checkpoint id to match against
         */
        CheckpointMatcher(long j) {
            this.checkpointId = j;
        }

        /**
         * Returns {@code true} only for a non-null object whose runtime class is exactly
         * {@link CheckpointMetaData} and whose checkpoint id equals the expected one.
         */
        public boolean matches(Object obj) {
            // Exact class comparison: instances of subclasses are rejected as well.
            if (obj == null || obj.getClass() != CheckpointMetaData.class) {
                return false;
            }
            CheckpointMetaData candidate = (CheckpointMetaData) obj;
            return candidate.getCheckpointId() == checkpointId;
        }

        /** Appends a short description naming the expected checkpoint id. */
        public void describeTo(Description description) {
            description.appendText("CheckpointMetaData - id = " + checkpointId);
        }
    }

    /** Creates the {@link IOManagerAsync} instance that all tests in this class share. */
    @BeforeClass
    public static void setup() {
        ioManager = new IOManagerAsync();
    }

    /**
     * Shuts down the shared I/O manager after all tests in this class have run.
     *
     * <p>JUnit 4 invokes {@code @AfterClass} methods even when a {@code @BeforeClass} method
     * threw, in which case the manager was never created. The null guard keeps the real
     * setup failure from being accompanied by an unrelated {@link NullPointerException}.
     */
    @AfterClass
    public static void shutdownIOManager() {
        if (ioManager != null) {
            ioManager.shutdown();
        }
    }

    /**
     * Runs a fixed sequence of buffers and checkpoint-7 barriers through a {@link BarrierBuffer}
     * constructed with an alignment limit argument of 1000 and verifies that:
     * <ul>
     *   <li>elements are handed out in the expected order,</li>
     *   <li>checkpoint 7 is aborted exactly once with an {@link AlignmentLimitExceededException},</li>
     *   <li>no checkpoint is ever triggered, and</li>
     *   <li>no spill files remain after cleanup.</li>
     * </ul>
     */
    @Test
    public void testBreakCheckpointAtAlignmentLimit() throws Exception {
        // The index comments are referenced by the expectations below.
        BufferOrEvent[] sequence = {
            /*  0 */ createBuffer(1, 100),
            /*  1 */ createBuffer(2, 70),
            /*  2 */ createBuffer(0, 42),
            /*  3 */ createBuffer(2, 111),
            /*  4 */ createBarrier(7L, 1),
            /*  5 */ createBuffer(1, 100),
            /*  6 */ createBuffer(2, 200),
            /*  7 */ createBuffer(1, 300),
            /*  8 */ createBuffer(0, 50),
            /*  9 */ createBarrier(7L, 0),
            /* 10 */ createBuffer(2, 100),
            /* 11 */ createBuffer(0, 100),
            /* 12 */ createBuffer(1, 200),
            /* 13 */ createBuffer(0, 200),
            /* 14 */ createBuffer(0, 101),
            /* 15 */ createBuffer(0, 100),
            /* 16 */ createBuffer(1, 100),
            /* 17 */ createBuffer(2, 100),
            /* 18 */ createBarrier(7L, 2),
            /* 19 */ createBuffer(0, 100),
            /* 20 */ createBuffer(1, 100),
            /* 21 */ createBuffer(2, 100)
        };

        MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
        BufferSpiller spiller = new BufferSpiller(ioManager, gate.getPageSize());
        BarrierBuffer buffer = new BarrierBuffer(gate, spiller, 1000L);

        AbstractInvokable toNotify = Mockito.mock(AbstractInvokable.class);
        buffer.registerCheckpointEventHandler(toNotify);

        // Everything in front of the first barrier is passed through unchanged.
        check(sequence[0], buffer.getNextNonBlocked());
        check(sequence[1], buffer.getNextNonBlocked());
        check(sequence[2], buffer.getNextNonBlocked());
        check(sequence[3], buffer.getNextNonBlocked());

        // Elements from channels whose barrier has not arrived yet keep flowing ...
        long alignmentStart = System.nanoTime();
        check(sequence[6], buffer.getNextNonBlocked());
        check(sequence[8], buffer.getNextNonBlocked());
        check(sequence[10], buffer.getNextNonBlocked());

        // ... until element 5, which followed the barrier on channel 1, is handed out:
        // the alignment has been given up at this point.
        check(sequence[5], buffer.getNextNonBlocked());
        validateAlignmentTime(alignmentStart, buffer);
        Mockito.verify(toNotify, Mockito.times(1))
                .abortCheckpointOnBarrier(Mockito.eq(7L), Mockito.any(AlignmentLimitExceededException.class));

        // The remaining elements follow; the barriers themselves are never handed out.
        check(sequence[7], buffer.getNextNonBlocked());
        check(sequence[11], buffer.getNextNonBlocked());
        check(sequence[12], buffer.getNextNonBlocked());
        check(sequence[13], buffer.getNextNonBlocked());
        check(sequence[14], buffer.getNextNonBlocked());
        check(sequence[15], buffer.getNextNonBlocked());
        check(sequence[16], buffer.getNextNonBlocked());
        check(sequence[17], buffer.getNextNonBlocked());
        check(sequence[19], buffer.getNextNonBlocked());
        check(sequence[20], buffer.getNextNonBlocked());
        check(sequence[21], buffer.getNextNonBlocked());

        // The late barrier on channel 2 must not trigger any checkpoint.
        Mockito.verify(toNotify, Mockito.never())
                .triggerCheckpointOnBarrier(
                        Mockito.any(CheckpointMetaData.class),
                        Mockito.any(CheckpointOptions.class),
                        Mockito.any(CheckpointMetrics.class));

        // The input is exhausted; repeated polling keeps returning null.
        Assert.assertNull(buffer.getNextNonBlocked());
        Assert.assertNull(buffer.getNextNonBlocked());

        buffer.cleanup();
        checkNoTempFilesRemain();
    }

    /**
     * Runs a fixed sequence containing barriers of checkpoints 3 and 4 through a
     * {@link BarrierBuffer} constructed with an alignment limit argument of 500 and verifies that:
     * <ul>
     *   <li>elements are handed out in the expected order,</li>
     *   <li>checkpoint 3 is aborted exactly once with an {@link AlignmentLimitExceededException}
     *       and is never triggered,</li>
     *   <li>checkpoint 4 is triggered exactly once, and</li>
     *   <li>no spill files remain after cleanup.</li>
     * </ul>
     */
    @Test
    public void testAlignmentLimitWithQueuedAlignments() throws Exception {
        // The index comments are referenced by the expectations below.
        BufferOrEvent[] sequence = {
            /*  0 */ createBuffer(1, 100),
            /*  1 */ createBuffer(2, 70),
            /*  2 */ createBarrier(3L, 2),
            /*  3 */ createBuffer(1, 100),
            /*  4 */ createBuffer(2, 100),
            /*  5 */ createBarrier(3L, 0),
            /*  6 */ createBuffer(0, 100),
            /*  7 */ createBuffer(1, 100),
            /*  8 */ createBarrier(4L, 0),
            /*  9 */ createBuffer(0, 100),
            /* 10 */ createBuffer(0, 120),
            /* 11 */ createBuffer(1, 100),
            /* 12 */ createBuffer(2, 100),
            /* 13 */ createBarrier(3L, 1),
            /* 14 */ createBarrier(4L, 1),
            /* 15 */ createBuffer(0, 100),
            /* 16 */ createBuffer(1, 100),
            /* 17 */ createBuffer(2, 100),
            /* 18 */ createBarrier(4L, 2),
            /* 19 */ createBuffer(0, 100),
            /* 20 */ createBuffer(1, 100),
            /* 21 */ createBuffer(2, 100)
        };

        MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
        BufferSpiller spiller = new BufferSpiller(ioManager, gate.getPageSize());
        BarrierBuffer buffer = new BarrierBuffer(gate, spiller, 500L);

        AbstractInvokable toNotify = Mockito.mock(AbstractInvokable.class);
        buffer.registerCheckpointEventHandler(toNotify);

        // Everything in front of the first barrier is passed through unchanged.
        check(sequence[0], buffer.getNextNonBlocked());
        check(sequence[1], buffer.getNextNonBlocked());

        // First alignment (checkpoint 3): only channel 1 has not delivered its barrier yet.
        long firstAlignmentStart = System.nanoTime();
        check(sequence[3], buffer.getNextNonBlocked());
        check(sequence[7], buffer.getNextNonBlocked());
        check(sequence[11], buffer.getNextNonBlocked());

        // Element 4 followed the checkpoint-3 barrier on channel 2; receiving it here
        // shows that this alignment has been given up.
        check(sequence[4], buffer.getNextNonBlocked());
        validateAlignmentTime(firstAlignmentStart, buffer);
        Mockito.verify(toNotify, Mockito.times(1))
                .abortCheckpointOnBarrier(Mockito.eq(3L), Mockito.any(AlignmentLimitExceededException.class));

        check(sequence[6], buffer.getNextNonBlocked());

        // Second alignment (checkpoint 4) runs to completion.
        long secondAlignmentStart = System.nanoTime();
        check(sequence[12], buffer.getNextNonBlocked());
        check(sequence[17], buffer.getNextNonBlocked());
        check(sequence[9], buffer.getNextNonBlocked());
        validateAlignmentTime(secondAlignmentStart, buffer);
        Mockito.verify(toNotify, Mockito.times(1))
                .triggerCheckpointOnBarrier(
                        MockitoHamcrest.argThat(new CheckpointMatcher(4L)),
                        Mockito.any(CheckpointOptions.class),
                        Mockito.any(CheckpointMetrics.class));

        // The remaining elements follow.
        check(sequence[10], buffer.getNextNonBlocked());
        check(sequence[15], buffer.getNextNonBlocked());
        check(sequence[16], buffer.getNextNonBlocked());
        check(sequence[19], buffer.getNextNonBlocked());
        check(sequence[20], buffer.getNextNonBlocked());
        check(sequence[21], buffer.getNextNonBlocked());

        // The aborted checkpoint 3 must never have been triggered.
        Mockito.verify(toNotify, Mockito.never())
                .triggerCheckpointOnBarrier(
                        MockitoHamcrest.argThat(new CheckpointMatcher(3L)),
                        Mockito.any(CheckpointOptions.class),
                        Mockito.any(CheckpointMetrics.class));

        // The input is exhausted; repeated polling keeps returning null.
        Assert.assertNull(buffer.getNextNonBlocked());
        Assert.assertNull(buffer.getNextNonBlocked());

        buffer.cleanup();
        checkNoTempFilesRemain();
    }

    /**
     * Builds a buffer element filled with random bytes.
     *
     * @param i  value passed as the second {@link BufferOrEvent} constructor argument
     *           (presumably the input channel index; the tests use 0 to 2)
     * @param i2 number of random payload bytes, also set as the buffer's size
     * @return a {@link BufferOrEvent} wrapping a {@link NetworkBuffer} backed by a fresh
     *         unpooled segment of {@code PAGE_SIZE} bytes
     */
    private static BufferOrEvent createBuffer(int i, int i2) {
        byte[] payload = new byte[i2];
        RND.nextBytes(payload);

        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
        segment.put(0, payload);

        NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE);
        buffer.setSize(i2);
        // NOTE(review): presumably the extra reference keeps the buffer readable for the
        // later comparison in check() after the code under test recycles it; confirm.
        buffer.retainBuffer();

        return new BufferOrEvent(buffer, i);
    }

    /**
     * Builds a checkpoint barrier element.
     *
     * @param j checkpoint id carried by the barrier
     * @param i value passed as the second {@link BufferOrEvent} constructor argument
     *          (presumably the input channel index; the tests use 0 to 2)
     * @return a {@link BufferOrEvent} wrapping a {@link CheckpointBarrier} stamped with the
     *         current wall-clock time and the default-location checkpoint options
     */
    private static BufferOrEvent createBarrier(long j, int i) {
        long timestamp = System.currentTimeMillis();
        CheckpointOptions options = CheckpointOptions.forCheckpointWithDefaultLocation();
        CheckpointBarrier barrier = new CheckpointBarrier(j, timestamp, options);
        return new BufferOrEvent(barrier, i);
    }

    /**
     * Asserts that two elements are equivalent: both non-null and of the same kind. Events
     * are compared with {@code equals}; buffers are compared by maximum capacity, size and
     * the first {@code PAGE_SIZE} bytes of their memory segments.
     *
     * @param bufferOrEvent  the expected element
     * @param bufferOrEvent2 the element that was actually received
     */
    private static void check(BufferOrEvent bufferOrEvent, BufferOrEvent bufferOrEvent2) {
        Assert.assertNotNull(bufferOrEvent);
        Assert.assertNotNull(bufferOrEvent2);
        Assert.assertEquals(Boolean.valueOf(bufferOrEvent.isBuffer()), Boolean.valueOf(bufferOrEvent2.isBuffer()));

        if (bufferOrEvent.isBuffer()) {
            Assert.assertEquals(
                    bufferOrEvent.getBuffer().getMaxCapacity(),
                    bufferOrEvent2.getBuffer().getMaxCapacity());
            Assert.assertEquals(
                    bufferOrEvent.getBuffer().getSize(),
                    bufferOrEvent2.getBuffer().getSize());

            MemorySegment expectedMemory = bufferOrEvent.getBuffer().getMemorySegment();
            MemorySegment actualMemory = bufferOrEvent2.getBuffer().getMemorySegment();
            // The whole page is compared, not only the first getSize() bytes.
            int comparison = expectedMemory.compare(actualMemory, 0, 0, PAGE_SIZE);
            Assert.assertTrue("memory contents differs", comparison == 0);
        } else {
            Assert.assertEquals(bufferOrEvent.getEvent(), bufferOrEvent2.getEvent());
        }
    }

    /**
     * Asserts that the alignment duration reported by the barrier buffer does not exceed the
     * time that has elapsed since the given start timestamp.
     *
     * @param j             start timestamp obtained from {@link System#nanoTime()}
     * @param barrierBuffer the buffer whose reported alignment duration is checked
     */
    private static void validateAlignmentTime(long j, BarrierBuffer barrierBuffer) {
        long reportedNanos = barrierBuffer.getAlignmentDurationNanos();
        long elapsedNanos = System.nanoTime() - j;
        Assert.assertTrue("wrong alignment time", reportedNanos <= elapsedNanos);
    }

    /**
     * Fails the current test if any of the I/O manager's spilling directories still contains
     * an entry other than {@code "."} or {@code ".."}.
     *
     * <p>A spilling directory that cannot be listed is skipped instead of causing a
     * {@link NullPointerException}.
     */
    private static void checkNoTempFilesRemain() {
        for (File dir : ioManager.getSpillingDirectories()) {
            String[] entries = dir.list();
            if (entries == null) {
                // File.list() returns null when the path is not a directory or an I/O
                // error occurs; there is nothing to inspect in that case.
                continue;
            }
            for (String entry : entries) {
                if (entry != null && !entry.equals(".") && !entry.equals("..")) {
                    Assert.fail("barrier buffer did not clean up temp files. remaining file: " + entry);
                }
            }
        }
    }
}
