package org.apache.activemq;

import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Random;
import javax.jms.Destination;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/activemq/LargeStreamletTest.class */
public final class LargeStreamletTest extends TestCase {
    private static final Log log;
    private static final String BROKER_URL = "vm://localhost?broker.persistent=false";
    private static final int BUFFER_SIZE = 1024;
    private static final int MESSAGE_COUNT = 1048576;
    private AtomicInteger totalRead = new AtomicInteger();
    private AtomicInteger totalWritten = new AtomicInteger();
    private AtomicBoolean stopThreads = new AtomicBoolean(false);
    protected Exception writerException;
    protected Exception readerException;
    static Class class$org$apache$activemq$LargeStreamletTest;

    public void testStreamlets() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        ActiveMQConnection activeMQConnection = (ActiveMQConnection) activeMQConnectionFactory.createConnection();
        activeMQConnection.start();
        ActiveMQConnection activeMQConnection2 = (ActiveMQConnection) activeMQConnectionFactory.createConnection();
        activeMQConnection2.start();
        try {
            ActiveMQQueue activeMQQueue = new ActiveMQQueue("streamtest");
            Thread thread = new Thread(new Runnable(this, activeMQConnection, activeMQQueue) { // from class: org.apache.activemq.LargeStreamletTest.1
                private final ActiveMQConnection val$connection;
                private final Destination val$destination;
                private final LargeStreamletTest this$0;

                {
                    this.this$0 = this;
                    this.val$connection = activeMQConnection;
                    this.val$destination = activeMQQueue;
                }

                /* JADX WARN: Finally extract failed */
                @Override // java.lang.Runnable
                public void run() {
                    int read;
                    this.this$0.totalRead.set(0);
                    try {
                        try {
                            InputStream createInputStream = this.val$connection.createInputStream(this.val$destination);
                            try {
                                byte[] bArr = new byte[1024];
                                while (!this.this$0.stopThreads.get() && (read = createInputStream.read(bArr)) != -1) {
                                    this.this$0.totalRead.addAndGet(read);
                                }
                                createInputStream.close();
                                LargeStreamletTest.log.info(new StringBuffer().append(this.this$0.totalRead).append(" total bytes read.").toString());
                            } catch (Throwable th) {
                                createInputStream.close();
                                throw th;
                            }
                        } catch (Exception e) {
                            this.this$0.readerException = e;
                            e.printStackTrace();
                            LargeStreamletTest.log.info(new StringBuffer().append(this.this$0.totalRead).append(" total bytes read.").toString());
                        }
                    } catch (Throwable th2) {
                        LargeStreamletTest.log.info(new StringBuffer().append(this.this$0.totalRead).append(" total bytes read.").toString());
                        throw th2;
                    }
                }
            });
            Thread thread2 = new Thread(new Runnable(this, activeMQConnection2, activeMQQueue) { // from class: org.apache.activemq.LargeStreamletTest.2
                private final ActiveMQConnection val$connection2;
                private final Destination val$destination;
                private final LargeStreamletTest this$0;

                {
                    this.this$0 = this;
                    this.val$connection2 = activeMQConnection2;
                    this.val$destination = activeMQQueue;
                }

                /* JADX WARN: Finally extract failed */
                @Override // java.lang.Runnable
                public void run() {
                    this.this$0.totalWritten.set(0);
                    try {
                        try {
                            OutputStream createOutputStream = this.val$connection2.createOutputStream(this.val$destination);
                            try {
                                byte[] bArr = new byte[1024];
                                new Random().nextBytes(bArr);
                                for (int i = LargeStreamletTest.MESSAGE_COUNT; i > 0; i--) {
                                    if (this.this$0.stopThreads.get()) {
                                        break;
                                    }
                                    createOutputStream.write(bArr);
                                    this.this$0.totalWritten.addAndGet(bArr.length);
                                }
                                createOutputStream.close();
                                LargeStreamletTest.log.info(new StringBuffer().append(this.this$0.totalWritten).append(" total bytes written.").toString());
                            } catch (Throwable th) {
                                createOutputStream.close();
                                throw th;
                            }
                        } catch (Exception e) {
                            this.this$0.writerException = e;
                            e.printStackTrace();
                            LargeStreamletTest.log.info(new StringBuffer().append(this.this$0.totalWritten).append(" total bytes written.").toString());
                        }
                    } catch (Throwable th2) {
                        LargeStreamletTest.log.info(new StringBuffer().append(this.this$0.totalWritten).append(" total bytes written.").toString());
                        throw th2;
                    }
                }
            });
            thread.start();
            thread2.start();
            Thread.sleep(1000L);
            int i = this.totalRead.get();
            while (thread.isAlive()) {
                thread.join(1000L);
                if (i == this.totalRead.get()) {
                    break;
                } else {
                    i = this.totalRead.get();
                }
            }
            this.stopThreads.set(true);
            assertTrue("Should not have received a reader exception", this.readerException == null);
            assertTrue("Should not have received a writer exception", this.writerException == null);
            Assert.assertEquals("Not all messages accounted for", this.totalWritten.get(), this.totalRead.get());
            activeMQConnection.close();
            activeMQConnection2.close();
        } catch (Throwable th) {
            activeMQConnection.close();
            activeMQConnection2.close();
            throw th;
        }
    }

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError().initCause(e);
        }
    }

    static {
        Class cls;
        if (class$org$apache$activemq$LargeStreamletTest == null) {
            cls = class$("org.apache.activemq.LargeStreamletTest");
            class$org$apache$activemq$LargeStreamletTest = cls;
        } else {
            cls = class$org$apache$activemq$LargeStreamletTest;
        }
        log = LogFactory.getLog(cls);
    }
}
