package org.apache.hadoop.metrics2.impl;

import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.impl.SinkQueue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* JADX WARN: Classes with same name are omitted:
  input_file:hadoop-common-2.7.3-tests.jar:org/apache/hadoop/metrics2/impl/TestSinkQueue.class
  input_file:hadoop-common-2.7.3/share/hadoop/common/hadoop-common-2.7.3-tests.jar:org/apache/hadoop/metrics2/impl/TestSinkQueue.class
 */
/* loaded from: input_file:test-classes/org/apache/hadoop/metrics2/impl/TestSinkQueue.class */
/**
 * Unit tests for {@link SinkQueue}: basic enqueue/dequeue/consume behaviour,
 * consuming from a queue that is filled by another thread, dropping on a full
 * queue, consumer failures, and rejection of concurrent consumers.
 */
public class TestSinkQueue {
    private static final Log LOG = LogFactory.getLog(TestSinkQueue.class);

    /**
     * A block of code that may throw; the unit of work handed to
     * {@link #shouldThrowCME(Fun)}.
     */
    public interface Fun {
        void run() throws Exception;
    }

    /**
     * Exercises front/back/dequeue/consume on a small queue and checks that
     * the queue reports empty (size 0, null front and back) afterwards.
     */
    @Test
    public void testCommon() throws Exception {
        final SinkQueue<Integer> q = new SinkQueue<>(2);
        q.enqueue(1);
        Assert.assertEquals("queue front", 1, q.front().intValue());
        Assert.assertEquals("queue back", 1, q.back().intValue());
        Assert.assertEquals("element", 1, q.dequeue().intValue());

        Assert.assertTrue("should enqueue", q.enqueue(2));
        q.consume(new SinkQueue.Consumer<Integer>() {
            @Override
            public void consume(Integer e) {
                Assert.assertEquals("element", 2, e.intValue());
            }
        });

        Assert.assertTrue("should enqueue", q.enqueue(3));
        Assert.assertEquals("element", 3, q.dequeue().intValue());
        Assert.assertEquals("queue size", 0, q.size());
        Assert.assertNull("queue front", q.front());
        Assert.assertNull("queue back", q.back());
    }

    /**
     * Runs the producer/consumer scenario twice: once with the producer
     * enqueueing immediately and once after a 100 ms delay.
     */
    @Test
    public void testEmptyBlocking() throws Exception {
        testEmptyBlocking(0);
        testEmptyBlocking(100);
    }

    /**
     * Starts a consumer thread that dequeues one element and consumes a
     * second, then enqueues 1 and 2 from the calling thread after an optional
     * delay and verifies the consumer callback ran exactly once.
     *
     * @param delayMillis time to sleep before enqueueing; no sleep when
     *                    {@code <= 0}
     * @throws Exception if the calling thread is interrupted
     */
    private void testEmptyBlocking(int delayMillis) throws Exception {
        final SinkQueue<Integer> q = new SinkQueue<>(2);
        final Runnable trigger = Mockito.mock(Runnable.class);
        // Single-slot holder for anything the worker throws. It is written only
        // by the worker and read only after join(), which orders the accesses.
        final Throwable[] workerFailure = new Throwable[1];

        Thread worker = new Thread() {
            @Override
            public void run() {
                try {
                    Assert.assertEquals("element", 1, q.dequeue().intValue());
                    q.consume(new SinkQueue.Consumer<Integer>() {
                        @Override
                        public void consume(Integer e) {
                            Assert.assertEquals("element", 2, e.intValue());
                            trigger.run();
                        }
                    });
                } catch (InterruptedException e) {
                    LOG.warn("Interrupted", e);
                    Thread.currentThread().interrupt();
                    workerFailure[0] = e;
                } catch (Throwable t) {
                    // Includes AssertionError: without this, a failed assertion
                    // here would only kill the worker and never fail the test
                    // with its real cause.
                    workerFailure[0] = t;
                }
            }
        };
        worker.start();

        if (delayMillis > 0) {
            Thread.sleep(delayMillis);
        }
        q.enqueue(1);
        q.enqueue(2);
        worker.join();

        if (workerFailure[0] != null) {
            throw new AssertionError(
                    "consumer thread failed: " + workerFailure[0], workerFailure[0]);
        }
        Mockito.verify(trigger).run();
    }

    /**
     * Checks that a queue of capacity 1 rejects a second element and accepts
     * a new one again once the first has been removed.
     */
    @Test
    public void testFull() throws Exception {
        final SinkQueue<Integer> q = new SinkQueue<>(1);
        q.enqueue(1);
        Assert.assertFalse("should drop", q.enqueue(2));
        Assert.assertEquals("element", 1, q.dequeue().intValue());

        q.enqueue(3);
        q.consume(new SinkQueue.Consumer<Integer>() {
            @Override
            public void consume(Integer e) {
                Assert.assertEquals("element", 3, e.intValue());
            }
        });
        Assert.assertEquals("queue size", 0, q.size());
    }

    /**
     * Fills a queue to capacity, checks that one more element is rejected,
     * and verifies that {@code consumeAll} delivers every element in
     * insertion order, exactly once each.
     */
    @Test
    public void testConsumeAll() throws Exception {
        final int capacity = 64;
        final SinkQueue<Integer> q = new SinkQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            Assert.assertTrue("should enqueue", q.enqueue(i));
        }
        Assert.assertFalse("should not enqueue", q.enqueue(capacity));

        final Runnable trigger = Mockito.mock(Runnable.class);
        q.consumeAll(new SinkQueue.Consumer<Integer>() {
            private int expected = 0;

            @Override
            public void consume(Integer e) {
                Assert.assertEquals("element", expected++, e.intValue());
                trigger.run();
            }
        });
        Mockito.verify(trigger, Mockito.times(capacity)).run();
    }

    /**
     * Checks that an exception thrown by a consumer propagates to the caller
     * of {@code consume} unchanged and that the element stays in the queue.
     */
    @Test
    public void testConsumerException() throws Exception {
        final SinkQueue<Integer> q = new SinkQueue<>(1);
        final RuntimeException ex = new RuntimeException("expected");
        q.enqueue(1);

        try {
            q.consume(new SinkQueue.Consumer<Integer>() {
                @Override
                public void consume(Integer e) {
                    throw ex;
                }
            });
            // fail() throws AssertionError, which the catch below does not
            // intercept, so a swallowed consumer exception fails the test.
            Assert.fail("consumer exception should have propagated");
        } catch (Exception expected) {
            Assert.assertSame("consumer exception", ex, expected);
        }

        Assert.assertEquals("queue size", 1, q.size());
        Assert.assertEquals("element", 1, q.dequeue().intValue());
    }

    /**
     * Over-fills a queue (capacity plus 97 enqueue attempts), checks that its
     * size equals its capacity, and checks that {@code clear} empties it.
     */
    @Test
    public void testClear() {
        final SinkQueue<Integer> q = new SinkQueue<>(128);
        for (int i = 0; i < q.capacity() + 97; i++) {
            q.enqueue(i);
        }
        Assert.assertEquals("queue size", q.capacity(), q.size());
        q.clear();
        Assert.assertEquals("queue size", 0, q.size());
    }

    /**
     * With a consumer stuck on the front element of a full queue, checks that
     * new elements are dropped and the queue contents stay untouched.
     */
    @Test
    public void testHangingConsumer() throws Exception {
        final SinkQueue<Integer> q = newSleepingConsumerQueue(2, 1, 2);
        Assert.assertEquals("queue back", 2, q.back().intValue());
        Assert.assertFalse("should drop", q.enqueue(3));
        Assert.assertEquals("queue size", 2, q.size());
        Assert.assertEquals("queue head", 1, q.front().intValue());
        Assert.assertEquals("queue back", 2, q.back().intValue());
    }

    /**
     * While one consumer is in progress, checks that {@code clear},
     * {@code consume}, {@code consumeAll} and {@code dequeue} each throw
     * {@link ConcurrentModificationException} and leave the queue unchanged.
     */
    @Test
    public void testConcurrentConsumers() throws Exception {
        final SinkQueue<Integer> q = newSleepingConsumerQueue(2, 1);
        Assert.assertTrue("should enqueue", q.enqueue(2));
        Assert.assertEquals("queue back", 2, q.back().intValue());
        Assert.assertFalse("should drop", q.enqueue(3));

        shouldThrowCME(new Fun() {
            @Override
            public void run() {
                q.clear();
            }
        });
        shouldThrowCME(new Fun() {
            @Override
            public void run() throws Exception {
                q.consume(null);
            }
        });
        shouldThrowCME(new Fun() {
            @Override
            public void run() throws Exception {
                q.consumeAll(null);
            }
        });
        shouldThrowCME(new Fun() {
            @Override
            public void run() throws Exception {
                q.dequeue();
            }
        });

        Assert.assertEquals("queue size", 2, q.size());
        Assert.assertEquals("queue front", 1, q.front().intValue());
        Assert.assertEquals("queue back", 2, q.back().intValue());
    }

    /**
     * Runs {@code fun} and fails the test unless it throws
     * {@link ConcurrentModificationException}.
     *
     * @param fun the operation expected to be rejected
     * @throws Exception any other exception thrown by {@code fun}
     */
    private void shouldThrowCME(Fun fun) throws Exception {
        try {
            fun.run();
            LOG.error("should've thrown CME");
            Assert.fail("should've thrown CME");
        } catch (ConcurrentModificationException e) {
            LOG.info(e);
        }
    }

    /**
     * Builds a queue holding {@code iArr} and starts a daemon thread whose
     * consumer sleeps for a day inside the {@code consume} callback. Returns
     * only after that callback has been entered.
     *
     * <p>NOTE(review): presumably at least one value must be supplied;
     * otherwise the callback is never entered and the latch wait below would
     * not return - confirm against {@code SinkQueue.consume}.
     *
     * @param i    queue capacity
     * @param iArr values to enqueue before the consumer starts
     * @return the queue, with a consumer in progress
     * @throws Exception if interrupted while waiting for the consumer to start
     */
    private SinkQueue<Integer> newSleepingConsumerQueue(int i, int... iArr) throws Exception {
        final SinkQueue<Integer> q = new SinkQueue<>(i);
        for (int value : iArr) {
            q.enqueue(value);
        }

        final CountDownLatch consumerEntered = new CountDownLatch(1);
        Thread sleeper = new Thread() {
            @Override
            public void run() {
                try {
                    // Small delay; the caller is synchronised by the latch,
                    // not by this timing.
                    Thread.sleep(10L);
                    q.consume(new SinkQueue.Consumer<Integer>() {
                        @Override
                        public void consume(Integer e) throws InterruptedException {
                            LOG.info("sleeping");
                            consumerEntered.countDown();
                            // Effectively "forever" for the duration of a test.
                            Thread.sleep(DateUtils.MILLIS_PER_DAY);
                        }
                    });
                } catch (InterruptedException e) {
                    LOG.warn("Interrupted", e);
                    Thread.currentThread().interrupt();
                }
            }
        };
        sleeper.setName("Sleeping consumer");
        // Daemon so the day-long sleep never keeps the JVM alive.
        sleeper.setDaemon(true);
        sleeper.start();

        consumerEntered.await();
        LOG.debug("Returning new sleeping consumer queue");
        return q;
    }
}
