package org.apache.flink.core.fs;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.util.AbstractAutoCloseableRegistry;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/core/fs/AbstractAutoCloseableRegistryTest.class */
public abstract class AbstractAutoCloseableRegistryTest<C extends Closeable, E extends C, T> {
    private static final int TEST_TIMEOUT_SECONDS = 10;
    protected ProducerThread[] streamOpenThreads;
    protected AbstractAutoCloseableRegistry<C, E, T, IOException> closeableRegistry;
    protected AtomicInteger unclosedCounter;

    /* loaded from: input_file:org/apache/flink/core/fs/AbstractAutoCloseableRegistryTest$BlockingTestCloseable.class */
    /**
     * A {@link Closeable} whose {@link #close()} signals that it has been entered and then blocks
     * until {@link #unblockClose()} is called (or the closing thread is interrupted).
     */
    private static class BlockingTestCloseable implements Closeable {
        /** Counted down as the very first action of {@link #close()}. */
        private final CountDownLatch closeCalledLatch = new CountDownLatch(1);

        /** Keeps {@link #close()} parked until {@link #unblockClose()} counts it down. */
        private final CountDownLatch blockCloseLatch = new CountDownLatch(1);

        private BlockingTestCloseable() {
        }

        /**
         * Announces that close was entered, then waits for {@link #unblockClose()}.
         *
         * <p>If the waiting thread is interrupted, the interrupt flag is restored and the method
         * returns normally.
         */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.closeCalledLatch.countDown();
            try {
                this.blockCloseLatch.await();
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        /** Releases a thread that is currently blocked inside {@link #close()}. */
        public void unblockClose() {
            this.blockCloseLatch.countDown();
        }

        /**
         * Waits until {@link #close()} has been entered and fails the test if that does not
         * happen within the given timeout.
         *
         * @param j maximum time to wait
         * @param timeUnit unit of {@code j}
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void awaitClose(long j, TimeUnit timeUnit) throws InterruptedException {
            final boolean closeEntered = this.closeCalledLatch.await(j, timeUnit);
            Assert.assertTrue(closeEntered);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/core/fs/AbstractAutoCloseableRegistryTest$ProducerThread.class */
    /**
     * Worker thread that keeps calling {@link #createAndRegisterStream()} against the registry
     * under test.
     *
     * <p>The loop ends when {@code maxStreams} streams were produced, when
     * {@link #createAndRegisterStream()} throws, or when the thread is interrupted. If
     * {@code maxStreams} is {@link Integer#MAX_VALUE}, the stream counter is never advanced, so
     * only an exception or an interrupt ends the loop.
     */
    public abstract static class ProducerThread<C extends Closeable, E extends C, T> extends Thread {
        /** Registry the produced streams are registered with. */
        protected final AbstractAutoCloseableRegistry<C, E, T, IOException> registry;

        /** Counter handed in by the test; available to subclasses when creating streams. */
        protected final AtomicInteger refCount;

        /** Upper bound for {@link #numStreams}; {@link Integer#MAX_VALUE} disables counting. */
        protected final int maxStreams;

        /** Number of streams produced so far (only advanced for a bounded {@code maxStreams}). */
        protected int numStreams = 0;

        /**
         * @param abstractAutoCloseableRegistry registry to register streams with
         * @param atomicInteger counter made available to subclasses as {@link #refCount}
         * @param i maximum number of streams to produce
         */
        public ProducerThread(AbstractAutoCloseableRegistry<C, E, T, IOException> abstractAutoCloseableRegistry, AtomicInteger atomicInteger, int i) {
            this.registry = abstractAutoCloseableRegistry;
            this.refCount = atomicInteger;
            this.maxStreams = i;
        }

        /**
         * Creates one stream and registers it with {@link #registry}.
         *
         * @throws IOException if the stream cannot be created or registered
         */
        protected abstract void createAndRegisterStream() throws IOException;

        /**
         * Produces streams until the limit is reached, production fails, or the thread is
         * interrupted.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                while (this.numStreams < this.maxStreams) {
                    createAndRegisterStream();
                    Thread.sleep(0L);
                    if (this.maxStreams != Integer.MAX_VALUE) {
                        this.numStreams++;
                    }
                }
            } catch (InterruptedException interrupted) {
                // Do not swallow the interrupt: restore the flag and stop producing so that
                // whoever interrupted this thread can actually shut it down.
                Thread.currentThread().interrupt();
            } catch (Exception ignored) {
                // Any failure to create/register a stream ends this producer quietly; the tests
                // stop the unbounded producers by closing the registry.
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/core/fs/AbstractAutoCloseableRegistryTest$TestCloseable.class */
    /**
     * A {@link Closeable} that remembers whether it has been closed and fails the test when it
     * is closed more than once.
     */
    private static class TestCloseable implements Closeable {
        /** Flipped to {@code true} by the first call to {@link #close()}. */
        private final AtomicBoolean closed = new AtomicBoolean();

        private TestCloseable() {
        }

        /** Marks this instance as closed; a second call triggers an assertion failure. */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            final boolean firstClose = this.closed.compareAndSet(false, true);
            Assert.assertTrue("TestCloseable was already closed", firstClose);
        }

        /** @return {@code true} once {@link #close()} has been called */
        public boolean isClosed() {
            return this.closed.get();
        }
    }

    /* loaded from: input_file:org/apache/flink/core/fs/AbstractAutoCloseableRegistryTest$TestStream.class */
    protected static final class TestStream extends FSDataInputStream {
        protected AtomicInteger refCount;

        public TestStream(AtomicInteger atomicInteger) {
            this.refCount = atomicInteger;
            atomicInteger.incrementAndGet();
        }

        public void seek(long j) throws IOException {
        }

        public long getPos() throws IOException {
            return 0L;
        }

        public int read() throws IOException {
            return 0;
        }

        public synchronized void close() throws IOException {
            this.refCount.decrementAndGet();
        }
    }

    /**
     * Registers the given closeable with the registry under test.
     *
     * <p>The tests in this class expect this call to throw an {@link IOException} and the
     * closeable to end up closed once the registry has been (or is being) closed.
     *
     * @param closeable the closeable to register
     * @throws IOException if the registry does not accept the closeable
     */
    protected abstract void registerCloseable(Closeable closeable) throws IOException;

    /**
     * Creates the registry instance under test; called once per {@link #setup(int)}.
     *
     * @return a new registry
     */
    protected abstract AbstractAutoCloseableRegistry<C, E, T, IOException> createRegistry();

    /**
     * Creates one producer thread for the given registry; called by {@link #setup(int)} once per
     * producer. The returned thread is not started here.
     *
     * @param abstractAutoCloseableRegistry registry the producer registers its streams with
     * @param atomicInteger counter shared by all producers of one test run
     * @param i maximum number of streams the producer creates
     * @return the new, not yet started producer thread
     */
    protected abstract ProducerThread<C, E, T> createProducerThread(AbstractAutoCloseableRegistry<C, E, T, IOException> abstractAutoCloseableRegistry, AtomicInteger atomicInteger, int i);

    /**
     * Prepares a test run: checks that no safety-net reaper thread is running, creates a fresh
     * registry and unclosed-stream counter, and creates (but does not start) the producer
     * threads.
     *
     * @param maxStreams maximum number of streams each producer thread creates
     */
    public void setup(int maxStreams) {
        // Number of concurrent producers. Kept as its own constant instead of reusing
        // TEST_TIMEOUT_SECONDS, which only coincidentally has the same value: tuning the
        // timeout must not change how many threads the tests spawn.
        final int producerThreadCount = 10;

        Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
        this.closeableRegistry = createRegistry();
        this.unclosedCounter = new AtomicInteger(0);
        this.streamOpenThreads = new ProducerThread[producerThreadCount];
        for (int idx = 0; idx < producerThreadCount; idx++) {
            this.streamOpenThreads[idx] = createProducerThread(this.closeableRegistry, this.unclosedCounter, maxStreams);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** Starts every producer thread created by {@link #setup(int)}, in array order. */
    public void startThreads() {
        final Thread[] producers = this.streamOpenThreads;
        for (int idx = 0; idx < producers.length; idx++) {
            producers[idx].start();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Waits, without a timeout, for every producer thread to terminate, in array order.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void joinThreads() throws InterruptedException {
        final Thread[] producers = this.streamOpenThreads;
        for (int idx = 0; idx < producers.length; idx++) {
            producers[idx].join();
        }
    }

    /**
     * Closes the registry while unbounded producer threads are still registering streams and
     * checks that afterwards nothing is left open or registered, and that a closeable registered
     * after the close is rejected with an {@link IOException} and closed.
     */
    @Test
    public void testClose() throws Exception {
        setup(Integer.MAX_VALUE);
        startThreads();

        // Give the producers some time to register streams, requesting a GC in each round
        // (presumably to exercise GC-driven cleanup in registries that support it).
        int round = 0;
        while (round < 5) {
            System.gc();
            Thread.sleep(40L);
            round++;
        }

        this.closeableRegistry.close();
        joinThreads();
        assertNothingLeftOpen();

        final TestCloseable lateCloseable = new TestCloseable();
        try {
            registerCloseable(lateCloseable);
            Assert.fail("Closed registry should not accept closeables!");
        } catch (IOException expected) {
            // the closed registry must refuse the registration
        }

        Assert.assertTrue(lateCloseable.isClosed());
        assertNothingLeftOpen();
    }

    /** Asserts that the unclosed counter is zero and the registry holds no closeables. */
    private void assertNothingLeftOpen() {
        Assert.assertEquals(0L, this.unclosedCounter.get());
        Assert.assertEquals(0L, this.closeableRegistry.getNumberOfRegisteredCloseables());
    }

    /**
     * Checks that a registry whose {@code close()} is still blocked inside one of its closeables
     * already rejects new registrations with an {@link IOException} and closes the rejected
     * closeable, and that no closeables remain registered once the close has finished.
     */
    @Test
    public void testNonBlockingClose() throws Exception {
        setup(Integer.MAX_VALUE);

        final BlockingTestCloseable blockingCloseable = new BlockingTestCloseable();
        registerCloseable(blockingCloseable);
        Assert.assertEquals(1L, this.closeableRegistry.getNumberOfRegisteredCloseables());

        // Close on a separate thread: close() parks inside blockingCloseable until unblocked.
        final Thread closer = new Thread(() -> {
            try {
                this.closeableRegistry.close();
            } catch (IOException ignored) {
                // The outcome of the close is verified through the assertions below.
            }
        });
        closer.start();

        final TestCloseable lateCloseable = new TestCloseable();
        try {
            blockingCloseable.awaitClose(TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            try {
                registerCloseable(lateCloseable);
                Assert.fail("Closed registry should not accept closeables!");
            } catch (IOException expected) {
                // a registry that is being closed must refuse the registration
            }
        } finally {
            // Always release the closer thread, even when an assertion above failed; otherwise
            // it would stay parked in BlockingTestCloseable.close() and leak a non-daemon thread.
            blockingCloseable.unblockClose();
            closer.join();
        }

        Assert.assertTrue(lateCloseable.isClosed());
        Assert.assertEquals(0L, this.closeableRegistry.getNumberOfRegisteredCloseables());
    }
}
