/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SpillableSubpartition;
import org.apache.flink.runtime.io.network.partition.SubpartitionTestBase;
import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Tests for {@link SpillableSubpartition}.
 *
 * <p>The class shares one executor and one {@link IOManagerAsync} across all test
 * methods; both are released in {@link #shutdown()}.
 */
public class SpillableSubpartitionTest
extends SubpartitionTestBase {
    /** Executor for background tasks started by the tests; shut down in {@link #shutdown()}. */
    private static final ExecutorService executorService = Executors.newCachedThreadPool();
    /** I/O manager handed to the partitions under test; shut down in {@link #shutdown()}. */
    private static final IOManager ioManager = new IOManagerAsync();

    /** Releases the shared executor and I/O manager once all tests of this class have run. */
    @AfterClass
    public static void shutdown() {
        executorService.shutdownNow();
        ioManager.shutdown();
    }

    /**
     * Creates a fresh {@link SpillableSubpartition} (index 0, mocked parent partition,
     * synchronous I/O mode) backed by the shared I/O manager.
     *
     * <p>NOTE(review): presumably consumed by the test cases inherited from
     * {@link SubpartitionTestBase} - confirm against the base class.
     */
    @Override
    ResultSubpartition createSubpartition() {
        return new SpillableSubpartition(0, Mockito.mock(ResultPartition.class), ioManager, IOManager.IOMode.SYNC);
    }

    /**
     * Verifies that {@code releaseMemory()} can complete while a concurrent
     * {@code finish()} call is blocked inside the spill writer's {@code close()}.
     *
     * <p>The spill writer is a mock whose {@code close()} signals {@code blockLatch}
     * and then waits on {@code doneLatch}. If {@code releaseMemory()} needed something
     * that the blocked {@code finish()} call holds, the test would hang.
     *
     * @throws Exception if the partition operations fail or the background
     *         {@code finish()} call terminates exceptionally
     */
    @Test
    public void testConcurrentFinishAndReleaseMemory() throws Exception {
        // doneLatch keeps close() blocked; blockLatch reports that close() has been entered.
        final CountDownLatch doneLatch = new CountDownLatch(1);
        final CountDownLatch blockLatch = new CountDownLatch(1);

        // Spill writer whose close() blocks until the test releases it.
        AsynchronousBufferFileWriter spillWriter = Mockito.mock(AsynchronousBufferFileWriter.class);
        Mockito.doAnswer(new Answer<Void>(){

            @Override
            public Void answer(InvocationOnMock invocation) throws Throwable {
                blockLatch.countDown();
                doneLatch.await();
                return null;
            }
        }).when(spillWriter).close();

        // Mocked I/O manager that hands out the blocking writer. Named differently
        // from the static field so the shared instance is not shadowed.
        IOManager mockIoManager = Mockito.mock(IOManager.class);
        Mockito.when(mockIoManager.createBufferFileWriter(Matchers.any(FileIOChannel.ID.class))).thenReturn(spillWriter);

        final SpillableSubpartition partition = new SpillableSubpartition(0, Mockito.mock(ResultPartition.class), mockIoManager, IOManager.IOMode.SYNC);

        // Initial release; expected to make the partition obtain the spill writer
        // from the mocked I/O manager before finish() is called.
        partition.releaseMemory();

        // Run finish() on the shared executor (shut down in @AfterClass) rather than a
        // per-test executor that would never be shut down and would leak its thread.
        Future<Void> blockingFinish = executorService.submit(new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                partition.finish();
                return null;
            }
        });

        try {
            // Wait until the background finish() has reached the blocking close().
            blockLatch.await();
            // Must return even though finish() is still blocked.
            partition.releaseMemory();
        } finally {
            // Always unblock close(), so the worker thread is not left parked on the
            // latch when one of the calls above fails.
            doneLatch.countDown();
        }

        // Surface any failure of the background finish() call.
        blockingFinish.get();
    }

    /**
     * Verifies that a read view returns {@code null} from {@code getNextBuffer()}
     * once its partition has been released, even though the view's own
     * {@code releaseAllResources()} is stubbed to do nothing.
     *
     * @throws Exception if any partition or view operation fails
     */
    @Test
    public void testReleasePartitionAndGetNext() throws Exception {
        SpillableSubpartition partition = new SpillableSubpartition(0, Mockito.mock(ResultPartition.class), ioManager, IOManager.IOMode.SYNC);
        partition.finish();

        // Spy on the view so that releasing its resources becomes a no-op.
        ResultSubpartitionView readView = Mockito.spy(partition.createReadView(new TestInfiniteBufferProvider()));
        Mockito.doNothing().when(readView).releaseAllResources();

        // Release the partition; with the stub above the view itself is not released.
        partition.release();

        Assert.assertNull(readView.getNextBuffer());
    }
}

