/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.buffer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.shaded.com.google.common.collect.Lists;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.powermock.api.mockito.PowerMockito;

/**
 * Tests for {@link LocalBufferPool}.
 *
 * <p>Every test runs against a fresh {@link NetworkBufferPool} holding {@link #numBuffers}
 * memory segments and a {@link LocalBufferPool} created on top of it. After each test the
 * local pool is destroyed (if the test has not done so already) and the fixture verifies that
 * all segments are available in the network pool again, so each test must recycle every buffer
 * it requested.
 */
public class LocalBufferPoolTest {
    /** Total number of memory segments in the network buffer pool used by each test. */
    private static final int numBuffers = 1024;

    /** Size argument passed to the network buffer pool for each memory segment. */
    private static final int memorySegmentSize = 128;

    private NetworkBufferPool networkBufferPool;

    private BufferPool localBufferPool;

    /** Shared executor for the concurrent tests; shut down once after all tests have run. */
    private static final ExecutorService executor = Executors.newCachedThreadPool();

    /**
     * Creates a fresh network pool and local pool, and checks that the new local pool reports
     * no available memory segments.
     */
    @Before
    public void setupLocalBufferPool() {
        networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize, MemoryType.HEAP);
        localBufferPool = new LocalBufferPool(networkBufferPool, 1);

        Assert.assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
    }

    /**
     * Destroys the local pool if the test has not already done so and verifies that every
     * memory segment is available in the network pool again.
     */
    @After
    public void destroyAndVerifyAllBuffersReturned() throws IOException {
        if (!localBufferPool.isDestroyed()) {
            localBufferPool.lazyDestroy();
        }

        String msg = "Did not return all buffers to memory segment pool after test.";
        Assert.assertEquals(msg, numBuffers, networkBufferPool.getNumberOfAvailableMemorySegments());
    }

    /** Stops the shared executor after the last test. */
    @AfterClass
    public static void shutdownExecutor() {
        executor.shutdownNow();
    }

    /**
     * Requests {@link #numBuffers} buffers one by one, checking that each request takes one
     * more segment from the network pool, then verifies that one further non-blocking request
     * returns {@code null} without taking another segment.
     */
    @Test
    public void testRequestMoreThanAvailable() throws IOException {
        localBufferPool.setNumBuffers(numBuffers);

        List<Buffer> requests = new ArrayList<Buffer>(numBuffers);

        for (int i = 1; i <= numBuffers; i++) {
            Buffer buffer = localBufferPool.requestBuffer();

            Assert.assertEquals(i, getNumRequestedFromMemorySegmentPool());
            Assert.assertNotNull(buffer);

            requests.add(buffer);
        }

        // The pool is exhausted now: one more request must not yield a buffer.
        Buffer excess = localBufferPool.requestBuffer();
        Assert.assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
        Assert.assertNull(excess);

        // Hand everything back so the @After check passes.
        for (Buffer buffer : requests) {
            buffer.recycle();
        }
    }

    /** A buffer request on a destroyed pool must fail with an {@link IllegalStateException}. */
    @Test
    public void testRequestAfterDestroy() throws IOException {
        localBufferPool.lazyDestroy();

        try {
            localBufferPool.requestBuffer();
            Assert.fail("Call should have failed with an IllegalStateException");
        }
        catch (IllegalStateException expected) {
            // This is exactly the failure the test is looking for.
        }
    }

    /**
     * Destroys the pool while all of its buffers are still handed out, checks that the
     * segments are still counted as taken from the network pool, and then recycles the
     * buffers. The {@code @After} fixture verifies that recycling returned them all.
     */
    @Test
    public void testRecycleAfterDestroy() throws IOException {
        localBufferPool.setNumBuffers(numBuffers);

        List<Buffer> requests = new ArrayList<Buffer>(numBuffers);

        for (int i = 0; i < numBuffers; i++) {
            requests.add(localBufferPool.requestBuffer());
        }

        localBufferPool.lazyDestroy();

        // Outstanding buffers are not reclaimed by the destroy call itself.
        Assert.assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());

        for (Buffer buffer : requests) {
            buffer.recycle();
        }
    }

    /**
     * Halves the pool size while all buffers are handed out and verifies that the number of
     * segments taken from the network pool only drops as buffers are recycled, by one segment
     * per recycled buffer.
     */
    @Test
    public void testRecycleExcessBuffersAfterRecycling() throws Exception {
        localBufferPool.setNumBuffers(numBuffers);

        List<Buffer> requests = new ArrayList<Buffer>(numBuffers);

        // Request all buffers.
        for (int i = 1; i <= numBuffers; i++) {
            requests.add(localBufferPool.requestBuffer());
        }

        Assert.assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());

        // Shrink the local pool.
        localBufferPool.setNumBuffers(numBuffers / 2);

        // Nothing has been recycled yet, so nothing can have been handed back.
        Assert.assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());

        for (int i = 1; i < numBuffers / 2; i++) {
            requests.remove(0).recycle();
            Assert.assertEquals(numBuffers - i, getNumRequestedFromMemorySegmentPool());
        }

        // Recycle the remaining buffers so the @After check passes.
        for (Buffer buffer : requests) {
            buffer.recycle();
        }
    }

    /**
     * Recycles all buffers back into the local pool first and then halves the pool size,
     * verifying that the number of available segments in the local pool drops accordingly.
     */
    @Test
    public void testRecycleExcessBuffersAfterChangingNumBuffers() throws Exception {
        localBufferPool.setNumBuffers(numBuffers);

        List<Buffer> requests = new ArrayList<Buffer>(numBuffers);

        // Request all buffers.
        for (int i = 1; i <= numBuffers; i++) {
            requests.add(localBufferPool.requestBuffer());
        }

        // Recycle all of them.
        for (Buffer buffer : requests) {
            buffer.recycle();
        }

        Assert.assertEquals(numBuffers, localBufferPool.getNumberOfAvailableMemorySegments());

        localBufferPool.setNumBuffers(numBuffers / 2);

        Assert.assertEquals(numBuffers / 2, localBufferPool.getNumberOfAvailableMemorySegments());
    }

    /**
     * Setting the pool size to 0 must be rejected with an {@link IllegalArgumentException}
     * (the pool under test was constructed with 1 as its second argument, presumably the
     * number of required segments).
     */
    @Test(expected = IllegalArgumentException.class)
    public void testSetLessThanRequiredNumBuffers() throws IOException {
        localBufferPool.setNumBuffers(1);

        localBufferPool.setNumBuffers(0);
    }

    /**
     * Registers a listener while the single buffer of the pool is handed out and verifies
     * that recycling that buffer invokes the listener exactly once with a buffer.
     */
    @Test
    public void testPendingRequestWithListenerAfterRecycle() throws Exception {
        EventListener<Buffer> listener = PowerMockito.spy(new EventListener<Buffer>() {
            @Override
            public void onEvent(Buffer buffer) {
                // Recycle right away so the buffer finds its way back for the @After check.
                buffer.recycle();
            }
        });

        localBufferPool.setNumBuffers(1);

        Buffer available = localBufferPool.requestBuffer();
        Buffer unavailable = localBufferPool.requestBuffer();

        Assert.assertNull(unavailable);

        Assert.assertTrue(localBufferPool.addListener(listener));

        available.recycle();

        Mockito.verify(listener, Mockito.times(1)).onEvent(Matchers.any(Buffer.class));
    }

    /**
     * Registers a listener while the single buffer of the pool is handed out, destroys the
     * pool, recycles the buffer, and verifies that the listener was invoked exactly once
     * with {@code null}.
     */
    @Test
    public void testCancelPendingRequestsAfterDestroy() throws IOException {
        // Mockito can only hand out a raw mock for a generic interface.
        @SuppressWarnings("unchecked")
        EventListener<Buffer> listener = Mockito.mock(EventListener.class);

        localBufferPool.setNumBuffers(1);

        Buffer available = localBufferPool.requestBuffer();
        Buffer unavailable = localBufferPool.requestBuffer();

        Assert.assertNull(unavailable);

        localBufferPool.addListener(listener);

        localBufferPool.lazyDestroy();

        available.recycle();

        Mockito.verify(listener, Mockito.times(1)).onEvent(null);
    }

    /**
     * Runs 128 {@link BufferRequesterTask}s concurrently against a pool of 128 buffers, each
     * requesting and immediately recycling 1024 buffers, and verifies that all of them finish
     * successfully. A failure inside a task surfaces as an {@link ExecutionException}.
     */
    @Test
    public void testConcurrentRequestRecycle() throws ExecutionException, InterruptedException, IOException {
        int numConcurrentTasks = 128;
        int numBuffersToRequestPerTask = 1024;

        localBufferPool.setNumBuffers(numConcurrentTasks);

        List<Future<Boolean>> taskResults = new ArrayList<Future<Boolean>>(numConcurrentTasks);
        for (int i = 0; i < numConcurrentTasks; i++) {
            taskResults.add(executor.submit(
                    new BufferRequesterTask(localBufferPool, numBuffersToRequestPerTask)));
        }

        for (Future<Boolean> taskResult : taskResults) {
            Assert.assertTrue(taskResult.get());
        }
    }

    /**
     * Destroys the pool around the time a second thread issues a blocking request on the
     * exhausted pool and verifies that this request fails with an
     * {@link IllegalStateException} instead of blocking.
     */
    @Test
    public void testDestroyDuringBlockingRequest() throws Exception {
        // Config
        final int numberOfBuffers = 1;

        localBufferPool.setNumBuffers(numberOfBuffers);

        final CountDownLatch sync = new CountDownLatch(1);

        Callable<List<Buffer>> requester = new Callable<List<Buffer>>() {

            @Override
            public List<Buffer> call() throws Exception {
                List<Buffer> requested = new ArrayList<Buffer>(numberOfBuffers);

                // Drain the pool.
                for (int i = 0; i < numberOfBuffers; i++) {
                    requested.add(localBufferPool.requestBufferBlocking());
                }

                // Tell the test thread that the pool is exhausted.
                sync.countDown();

                // The pool is destroyed either shortly before or during this request.
                try {
                    localBufferPool.requestBufferBlocking();
                    Assert.fail("Call should have failed with an IllegalStateException");
                }
                catch (IllegalStateException expected) {
                    // This is exactly the failure the test is looking for.
                }

                return requested;
            }
        };

        Future<List<Buffer>> f = executor.submit(requester);

        sync.await();

        localBufferPool.lazyDestroy();

        // Give the requester thread some time to get into (and out of) the blocking call.
        Thread.sleep(50L);

        // Bounded wait so a requester stuck in the blocking call fails the test instead of hanging it.
        List<Buffer> requestedBuffers = f.get(60L, TimeUnit.SECONDS);

        for (Buffer buffer : requestedBuffers) {
            buffer.recycle();
        }
    }

    /** Returns how many memory segments are currently taken out of the network buffer pool. */
    private int getNumRequestedFromMemorySegmentPool() {
        return networkBufferPool.getTotalNumberOfMemorySegments()
                - networkBufferPool.getNumberOfAvailableMemorySegments();
    }

    /**
     * Task that requests a fixed number of buffers in a blocking manner and recycles each one
     * immediately.
     */
    private static class BufferRequesterTask implements Callable<Boolean> {
        private final BufferProvider bufferProvider;

        private final int numBuffersToRequest;

        private BufferRequesterTask(BufferProvider bufferProvider, int numBuffersToRequest) {
            this.bufferProvider = bufferProvider;
            this.numBuffersToRequest = numBuffersToRequest;
        }

        /**
         * Requests and recycles {@code numBuffersToRequest} buffers.
         *
         * @return {@code true} once all requests have completed
         * @throws Exception if a request fails; the exception is propagated (rather than
         *     being turned into a {@code false} result) so that the caller sees the actual
         *     cause through {@link Future#get()}
         */
        @Override
        public Boolean call() throws Exception {
            for (int i = 0; i < numBuffersToRequest; i++) {
                Buffer buffer = bufferProvider.requestBufferBlocking();
                buffer.recycle();
            }
            return true;
        }
    }
}

