package org.apache.flink.runtime.io.disk;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.core.memory.MemorySegment;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

/* loaded from: input_file:org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.class */
/**
 * Tests for {@link BatchShuffleReadBufferPool}: argument validation, buffer-count calculation,
 * request/recycle accounting, timestamp tracking, concurrent usage and destruction.
 */
public class BatchShuffleReadBufferPoolTest {

    /** Total pool size, in bytes, used by the default test pool (32 MiB). */
    private static final long DEFAULT_TOTAL_BYTES = 32L * 1024 * 1024;

    /** Size of a single buffer, in bytes, used by the default test pool (32 KiB). */
    private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;

    /** Guards every test against hanging forever, e.g. on a request that never returns. */
    @Rule
    public Timeout timeout = new Timeout(60, TimeUnit.SECONDS);

    /** A pool with zero total bytes must be rejected. */
    @Test(expected = IllegalArgumentException.class)
    public void testIllegalTotalBytes() {
        createBufferPool(0L, 1024);
    }

    /** A pool with a zero buffer size must be rejected. */
    @Test(expected = IllegalArgumentException.class)
    public void testIllegalBufferSize() {
        createBufferPool(DEFAULT_TOTAL_BYTES, 0);
    }

    /** With a huge total size the reported number of buffers is {@link Integer#MAX_VALUE}. */
    @Test
    public void testLargeTotalBytes() {
        BatchShuffleReadBufferPool bufferPool = createBufferPool(Long.MAX_VALUE, 1024);
        try {
            Assert.assertEquals(Integer.MAX_VALUE, bufferPool.getNumTotalBuffers());
        } finally {
            bufferPool.destroy();
        }
    }

    /** A pool whose total size cannot hold even a single buffer must be rejected. */
    @Test(expected = IllegalArgumentException.class)
    public void testTotalBytesSmallerThanBufferSize() {
        createBufferPool(4096L, 32768);
    }

    /**
     * Sweeps buffer sizes from 4 KiB up to the total size in 1 KiB steps and checks the derived
     * buffer counts for each.
     */
    @Test
    public void testBufferCalculation() {
        for (int bufferSize = 4 * 1024; bufferSize <= DEFAULT_TOTAL_BYTES; bufferSize += 1024) {
            BatchShuffleReadBufferPool bufferPool =
                    createBufferPool(DEFAULT_TOTAL_BYTES, bufferSize);
            try {
                Assert.assertEquals(DEFAULT_TOTAL_BYTES, bufferPool.getTotalBytes());
                Assert.assertEquals(
                        DEFAULT_TOTAL_BYTES / bufferSize, bufferPool.getNumTotalBuffers());
                Assert.assertTrue(
                        bufferPool.getNumBuffersPerRequest() <= bufferPool.getNumTotalBuffers());
                Assert.assertTrue(bufferPool.getNumBuffersPerRequest() > 0);
            } finally {
                bufferPool.destroy();
            }
        }
    }

    /** One request hands out exactly {@code getNumBuffersPerRequest()} buffers. */
    @Test
    public void testRequestBuffers() throws Exception {
        BatchShuffleReadBufferPool bufferPool = createBufferPool();
        List<MemorySegment> buffers = new ArrayList<>();
        try {
            buffers.addAll(bufferPool.requestBuffers());
            Assert.assertEquals(bufferPool.getNumBuffersPerRequest(), buffers.size());
        } finally {
            bufferPool.recycle(buffers);
            bufferPool.destroy();
        }
    }

    /** Recycling everything that was requested makes all buffers available again. */
    @Test
    public void testRecycle() throws Exception {
        BatchShuffleReadBufferPool bufferPool = createBufferPool();
        try {
            List<MemorySegment> buffers = bufferPool.requestBuffers();
            bufferPool.recycle(buffers);
            Assert.assertEquals(
                    bufferPool.getNumTotalBuffers(), bufferPool.getAvailableBuffers());
        } finally {
            bufferPool.destroy();
        }
    }

    /**
     * The last-operation timestamp advances on a successful request and on a recycle, and stays
     * unchanged when a request returns no buffers.
     */
    @Test
    public void testBufferOperationTimestampUpdated() throws Exception {
        // A pool holding exactly one buffer, so the second request below comes back empty.
        BatchShuffleReadBufferPool bufferPool = createBufferPool(1024L, 1024);
        try {
            long oldTimestamp = bufferPool.getLastBufferOperationTimestamp();
            // Sleep so that a later timestamp is strictly greater than the previous one.
            Thread.sleep(100L);
            List<MemorySegment> buffers = bufferPool.requestBuffers();
            Assert.assertEquals(1, buffers.size());
            Assert.assertTrue(bufferPool.getLastBufferOperationTimestamp() > oldTimestamp);

            oldTimestamp = bufferPool.getLastBufferOperationTimestamp();
            Thread.sleep(100L);
            bufferPool.recycle(buffers);
            Assert.assertTrue(bufferPool.getLastBufferOperationTimestamp() > oldTimestamp);

            buffers = bufferPool.requestBuffers();
            oldTimestamp = bufferPool.getLastBufferOperationTimestamp();
            Thread.sleep(100L);
            // The only buffer is taken: the request yields nothing and must not touch the
            // timestamp.
            Assert.assertEquals(0, bufferPool.requestBuffers().size());
            Assert.assertEquals(oldTimestamp, bufferPool.getLastBufferOperationTimestamp());
            bufferPool.recycle(buffers);
        } finally {
            bufferPool.destroy();
        }
    }

    /**
     * Drains the pool, starts threads that keep requesting until they get buffers, then recycles
     * two allocations and verifies that each waiting thread ends up with one of them.
     */
    @Test
    public void testBufferFulfilledByRecycledBuffers() throws Exception {
        final int numRequests = 8;
        final int numRequestThreads = 2;
        AtomicReference<Throwable> exception = new AtomicReference<>();
        BatchShuffleReadBufferPool bufferPool = createBufferPool();
        // Allocation owner -> buffers it currently holds; shared with the worker threads.
        ConcurrentHashMap<Object, List<MemorySegment>> buffers = new ConcurrentHashMap<>();
        try {
            Object[] owners = new Object[numRequests];
            for (int i = 0; i < numRequests; i++) {
                owners[i] = new Object();
                buffers.put(owners[i], bufferPool.requestBuffers());
            }
            // The requests above are expected to have exhausted the pool.
            Assert.assertEquals(0, bufferPool.getAvailableBuffers());

            Thread[] requestThreads = new Thread[numRequestThreads];
            for (int i = 0; i < numRequestThreads; i++) {
                requestThreads[i] =
                        new Thread(
                                () -> {
                                    try {
                                        Object owner = new Object();
                                        List<MemorySegment> allocated = null;
                                        // Retry until a request actually yields buffers.
                                        while (allocated == null || allocated.isEmpty()) {
                                            allocated = bufferPool.requestBuffers();
                                        }
                                        buffers.put(owner, allocated);
                                    } catch (Throwable t) {
                                        exception.set(t);
                                    }
                                });
                requestThreads[i].start();
            }

            // Give back one allocation buffer by buffer and a second one in bulk, exercising
            // both recycle overloads.
            for (MemorySegment segment : buffers.remove(owners[0])) {
                bufferPool.recycle(segment);
            }
            bufferPool.recycle(buffers.remove(owners[1]));

            for (Thread requestThread : requestThreads) {
                requestThread.join();
            }

            Assert.assertNull("A request thread failed: " + exception.get(), exception.get());
            Assert.assertEquals(0, bufferPool.getAvailableBuffers());
            Assert.assertEquals(numRequests, buffers.size());

            recycleAll(bufferPool, buffers);
            Assert.assertEquals(
                    bufferPool.getNumTotalBuffers(), bufferPool.getAvailableBuffers());
        } finally {
            // Cleanup only: no assertions here, so an earlier failure is never masked.
            recycleAll(bufferPool, buffers);
            bufferPool.destroy();
        }
    }

    /**
     * Several threads repeatedly request and recycle buffers, alternating between the bulk and
     * the per-segment recycle overloads; afterwards every buffer must be available again.
     */
    @Test
    public void testMultipleThreadRequestAndRecycle() throws Exception {
        final int numRequestThreads = 10;
        final int numIterations = 100;
        AtomicReference<Throwable> exception = new AtomicReference<>();
        BatchShuffleReadBufferPool bufferPool = createBufferPool();
        try {
            Thread[] requestThreads = new Thread[numRequestThreads];
            for (int i = 0; i < numRequestThreads; i++) {
                requestThreads[i] =
                        new Thread(
                                () -> {
                                    try {
                                        for (int j = 0; j < numIterations; j++) {
                                            List<MemorySegment> buffers =
                                                    bufferPool.requestBuffers();
                                            Thread.sleep(10L);
                                            if (j % 2 == 0) {
                                                bufferPool.recycle(buffers);
                                            } else {
                                                for (MemorySegment segment : buffers) {
                                                    bufferPool.recycle(segment);
                                                }
                                            }
                                        }
                                    } catch (Throwable t) {
                                        // Record the failure and stop this worker.
                                        exception.set(t);
                                    }
                                });
                requestThreads[i].start();
            }

            for (Thread requestThread : requestThreads) {
                requestThread.join();
            }

            Assert.assertNull("A request thread failed: " + exception.get(), exception.get());
            Assert.assertEquals(
                    bufferPool.getNumTotalBuffers(), bufferPool.getAvailableBuffers());
        } finally {
            bufferPool.destroy();
        }
    }

    /** Destroying the pool flips {@code isDestroyed()} and leaves no buffers available. */
    @Test
    public void testDestroy() throws Exception {
        BatchShuffleReadBufferPool bufferPool = createBufferPool();
        bufferPool.recycle(bufferPool.requestBuffers());

        Assert.assertFalse(bufferPool.isDestroyed());
        Assert.assertEquals(bufferPool.getNumTotalBuffers(), bufferPool.getAvailableBuffers());

        // Take buffers first, then read the available count, so the two values are comparable.
        List<MemorySegment> buffers = bufferPool.requestBuffers();
        Assert.assertEquals(
                bufferPool.getNumTotalBuffers() - buffers.size(),
                bufferPool.getAvailableBuffers());

        bufferPool.destroy();
        Assert.assertTrue(bufferPool.isDestroyed());
        Assert.assertEquals(0, bufferPool.getAvailableBuffers());
    }

    /** Requesting from a destroyed pool must fail with {@link IllegalStateException}. */
    @Test(expected = IllegalStateException.class)
    public void testRequestBuffersAfterDestroyed() throws Exception {
        BatchShuffleReadBufferPool bufferPool = createBufferPool();
        bufferPool.requestBuffers();
        bufferPool.destroy();
        bufferPool.requestBuffers();
    }

    /** Buffers recycled after destruction must not become available again. */
    @Test
    public void testRecycleAfterDestroyed() throws Exception {
        BatchShuffleReadBufferPool bufferPool = createBufferPool();
        List<MemorySegment> buffers = bufferPool.requestBuffers();
        bufferPool.destroy();

        bufferPool.recycle(buffers);
        Assert.assertEquals(0, bufferPool.getAvailableBuffers());
    }

    /**
     * A thread that keeps requesting buffers without ever recycling them must terminate with an
     * {@link IllegalStateException} once the pool is destroyed.
     */
    @Test
    public void testDestroyWhileBlockingRequest() throws Exception {
        AtomicReference<Throwable> exception = new AtomicReference<>();
        BatchShuffleReadBufferPool bufferPool = createBufferPool();

        Thread requestThread =
                new Thread(
                        () -> {
                            try {
                                // Nothing is ever recycled, so this only ends when a request
                                // throws.
                                while (true) {
                                    bufferPool.requestBuffers();
                                }
                            } catch (Throwable t) {
                                exception.set(t);
                            }
                        });
        requestThread.start();

        // Give the request thread time to use up the pool before destroying it.
        Thread.sleep(1000L);
        bufferPool.destroy();
        requestThread.join();

        Assert.assertTrue(
                "Expected an IllegalStateException but got: " + exception.get(),
                exception.get() instanceof IllegalStateException);
    }

    /**
     * Removes every entry from {@code buffers} and recycles its segments into {@code bufferPool}.
     * Calling it again on the emptied map does nothing, which makes it safe in {@code finally}.
     */
    private static void recycleAll(
            BatchShuffleReadBufferPool bufferPool,
            ConcurrentHashMap<Object, List<MemorySegment>> buffers) {
        for (Object owner : buffers.keySet()) {
            List<MemorySegment> segments = buffers.remove(owner);
            if (segments != null) {
                bufferPool.recycle(segments);
            }
        }
    }

    /** Creates a pool with the given total size and buffer size, both in bytes. */
    private BatchShuffleReadBufferPool createBufferPool(long j, int i) {
        return new BatchShuffleReadBufferPool(j, i);
    }

    /** Creates the default test pool: 32 MiB in total, split into 32 KiB buffers. */
    private BatchShuffleReadBufferPool createBufferPool() {
        return createBufferPool(DEFAULT_TOTAL_BYTES, DEFAULT_BUFFER_SIZE);
    }
}
