package org.apache.hadoop.tools.util;

import java.util.concurrent.TimeoutException;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;

/* JADX WARN: Classes with same name are omitted:
  input_file:test-classes/org/apache/hadoop/tools/util/TestProducerConsumer.class
 */
/* loaded from: input_file:hadoop-distcp-2.10.2-tests.jar:org/apache/hadoop/tools/util/TestProducerConsumer.class */
public class TestProducerConsumer {

    /* JADX WARN: Classes with same name are omitted:
      input_file:test-classes/org/apache/hadoop/tools/util/TestProducerConsumer$CopyProcessor.class
     */
    /* loaded from: input_file:hadoop-distcp-2.10.2-tests.jar:org/apache/hadoop/tools/util/TestProducerConsumer$CopyProcessor.class */
    public class CopyProcessor implements WorkRequestProcessor<Integer, Integer> {
        public CopyProcessor() {
        }

        @Override // org.apache.hadoop.tools.util.WorkRequestProcessor
        public WorkReport<Integer> processItem(WorkRequest<Integer> workRequest) {
            return new WorkReport<>(new Integer(workRequest.getItem().intValue()), 0, true);
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:test-classes/org/apache/hadoop/tools/util/TestProducerConsumer$ExceptionProcessor.class
     */
    /* loaded from: input_file:hadoop-distcp-2.10.2-tests.jar:org/apache/hadoop/tools/util/TestProducerConsumer$ExceptionProcessor.class */
    public class ExceptionProcessor implements WorkRequestProcessor<Integer, Integer> {
        public ExceptionProcessor() {
        }

        @Override // org.apache.hadoop.tools.util.WorkRequestProcessor
        public WorkReport<Integer> processItem(WorkRequest<Integer> workRequest) {
            try {
                Integer num = null;
                num.intValue();
                return new WorkReport<>(null, 0, true);
            } catch (Exception e) {
                return new WorkReport<>(new Integer(workRequest.getItem().intValue()), 1, false, e);
            }
        }
    }

    @Test
    public void testSimpleProducerConsumer() {
        ProducerConsumer producerConsumer = new ProducerConsumer(1);
        producerConsumer.addWorker(new CopyProcessor());
        producerConsumer.put(new WorkRequest(42));
        try {
            Assert.assertEquals(42L, ((Integer) producerConsumer.take().getItem()).intValue());
        } catch (InterruptedException e) {
            Assert.assertTrue(false);
        }
        producerConsumer.shutdown();
    }

    @Test
    public void testMultipleProducerConsumer() {
        ProducerConsumer producerConsumer = new ProducerConsumer(10);
        for (int i = 0; i < 10; i++) {
            producerConsumer.addWorker(new CopyProcessor());
        }
        int i2 = 0;
        for (int i3 = 0; i3 < 2000; i3++) {
            producerConsumer.put(new WorkRequest(Integer.valueOf(i3 + 42)));
            i2 += i3 + 42;
        }
        int i4 = 0;
        while (producerConsumer.getWorkCnt() > 0) {
            i2 -= ((Integer) producerConsumer.blockingTake().getItem()).intValue();
            i4++;
        }
        Assert.assertEquals(0L, i2);
        Assert.assertEquals(2000, i4);
        producerConsumer.shutdown();
    }

    @Test
    public void testExceptionProducerConsumer() {
        ProducerConsumer producerConsumer = new ProducerConsumer(1);
        producerConsumer.addWorker(new ExceptionProcessor());
        producerConsumer.put(new WorkRequest(42));
        try {
            WorkReport take = producerConsumer.take();
            Assert.assertEquals(42L, ((Integer) take.getItem()).intValue());
            Assert.assertFalse(take.getSuccess());
            Assert.assertNotNull(take.getException());
        } catch (InterruptedException e) {
            Assert.assertTrue(false);
        }
        producerConsumer.shutdown();
    }

    @Test
    public void testSimpleProducerConsumerShutdown() throws InterruptedException, TimeoutException {
        ProducerConsumer producerConsumer = new ProducerConsumer(1);
        producerConsumer.addWorker(new CopyProcessor());
        producerConsumer.shutdown();
        GenericTestUtils.waitForThreadTermination("pool-.*-thread.*", 100, 10000);
    }

    /* JADX WARN: Type inference failed for: r0v4, types: [org.apache.hadoop.tools.util.TestProducerConsumer$1SourceThread] */
    /* JADX WARN: Type inference failed for: r0v6, types: [org.apache.hadoop.tools.util.TestProducerConsumer$1SinkThread] */
    @Test(timeout = 10000)
    public void testMultipleProducerConsumerShutdown() throws InterruptedException, TimeoutException {
        final ProducerConsumer producerConsumer = new ProducerConsumer(10);
        for (int i = 0; i < 10; i++) {
            producerConsumer.addWorker(new CopyProcessor());
        }
        ?? r0 = new Thread() { // from class: org.apache.hadoop.tools.util.TestProducerConsumer.1SourceThread
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (true) {
                    try {
                        producerConsumer.put(new WorkRequest(42));
                        Thread.sleep(1L);
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        };
        r0.start();
        ?? r02 = new Thread() { // from class: org.apache.hadoop.tools.util.TestProducerConsumer.1SinkThread
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (true) {
                    try {
                        Assert.assertEquals(42L, ((Integer) producerConsumer.take().getItem()).intValue());
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        };
        r02.start();
        Thread.sleep(1000L);
        r0.interrupt();
        while (producerConsumer.hasWork()) {
            Thread.sleep(1L);
        }
        producerConsumer.shutdown();
        GenericTestUtils.waitForThreadTermination("pool-.*-thread.*", 100, 10000);
        r02.interrupt();
        r0.join();
        r02.join();
    }
}
