package org.apache.activemq.broker.region.cursors;

import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/activemq/broker/region/cursors/CursorDurableTest.class */
/**
 * Tests delivery to a durable topic subscriber through an embedded broker that
 * is started with a {@link StorePendingDurableSubscriberMessageStoragePolicy}
 * as its pending durable subscriber policy.
 *
 * <p>Each test first registers the durable subscription, disconnects, and then
 * checks that every message published to the topic reaches the subscriber in
 * the order it was sent.
 */
public class CursorDurableTest extends TestCase {
    protected static final Log log = LogFactory.getLog(CursorDurableTest.class);
    protected static final int MESSAGE_COUNT = 100;
    protected static final int PREFETCH_SIZE = 5;

    /** Upper bound for a single blocking receive, so a lost message fails the test instead of hanging it. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 30000L;
    /** Upper bound for the asynchronous listener to collect all messages. */
    private static final long LISTENER_TIMEOUT_MILLIS = 300000L;
    /** Artificial per-message delay used to make the listener a slow consumer. */
    private static final long CONSUMER_DELAY_MILLIS = 50L;
    /** Number of messages published before the asynchronous consumer is attached. */
    private static final int INITIAL_BATCH_SIZE = 10;

    protected BrokerService broker;
    protected String bindAddress = "tcp://localhost:60706";
    protected int topicCount = 0;

    /**
     * Publishes {@link #MESSAGE_COUNT} messages while the durable subscriber is
     * offline, then reconnects the subscriber and verifies that it receives
     * exactly the sent messages, in order.
     *
     * @throws Exception if the broker or any JMS operation fails
     */
    public void testSendFirstThenConsume() throws Exception {
        ConnectionFactory factory = createConnectionFactory();
        registerDurableSubscription(factory);

        List<Message> sent = new ArrayList<Message>(MESSAGE_COUNT);
        Connection producerConnection = factory.createConnection();
        try {
            producerConnection.start();
            Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(getTopic(session));
            sendMessages(session, producer, 0, MESSAGE_COUNT, sent);
        } finally {
            producerConnection.close();
        }

        Connection consumerConnection = getConsumerConnection(factory);
        try {
            MessageConsumer consumer = getConsumer(consumerConnection);
            List<Message> received = new ArrayList<Message>(MESSAGE_COUNT);
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                // Bounded receive: an unbounded receive() would block forever on a lost message.
                Message message = consumer.receive(RECEIVE_TIMEOUT_MILLIS);
                assertNotNull("Message " + i + " was not received within "
                        + RECEIVE_TIMEOUT_MILLIS + " ms", message);
                received.add(message);
            }
            assertEquals(sent, received);
        } finally {
            consumerConnection.close();
        }
    }

    /**
     * Publishes an initial batch while the durable subscriber is offline, then
     * attaches a deliberately slow asynchronous listener and keeps publishing
     * until {@link #MESSAGE_COUNT} messages have been sent. Verifies that the
     * listener ends up with exactly the sent messages, in order.
     *
     * @throws Exception if the broker or any JMS operation fails
     */
    public void testSendWhilstConsume() throws Exception {
        ConnectionFactory factory = createConnectionFactory();
        registerDurableSubscription(factory);

        Connection producerConnection = factory.createConnection();
        try {
            producerConnection.start();
            Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(getTopic(session));
            List<Message> sent = new ArrayList<Message>(MESSAGE_COUNT);
            sendMessages(session, producer, 0, INITIAL_BATCH_SIZE, sent);

            Connection consumerConnection = getConsumerConnection(factory);
            try {
                MessageConsumer consumer = getConsumer(consumerConnection);
                final List<Message> received = new ArrayList<Message>(MESSAGE_COUNT);
                final CountDownLatch allReceived = new CountDownLatch(1);
                consumer.setMessageListener(new MessageListener() {
                    public void onMessage(Message message) {
                        try {
                            // Slow the consumer down so the producer runs ahead of it.
                            Thread.sleep(CONSUMER_DELAY_MILLIS);
                        } catch (InterruptedException e) {
                            log.warn("Interrupted while delaying message consumption", e);
                            Thread.currentThread().interrupt();
                        }
                        received.add(message);
                        if (received.size() == MESSAGE_COUNT) {
                            allReceived.countDown();
                        }
                    }
                });

                sendMessages(session, producer, INITIAL_BATCH_SIZE, MESSAGE_COUNT, sent);

                allReceived.await(LISTENER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                assertEquals("Still dipatching - count down latch not sprung", 0L, allReceived.getCount());
                assertEquals("cosumerList - expected: " + MESSAGE_COUNT + " but was: " + received.size(),
                        sent.size(), received.size());
                assertEquals(sent, received);
            } finally {
                consumerConnection.close();
            }
        } finally {
            producerConnection.close();
        }
    }

    /**
     * Connects as the durable subscriber just long enough to create the
     * subscription, then disconnects again.
     */
    private void registerDurableSubscription(ConnectionFactory factory) throws Exception {
        Connection connection = getConsumerConnection(factory);
        try {
            getConsumer(connection);
        } finally {
            connection.close();
        }
    }

    /**
     * Sends text messages {@code "test" + i} for {@code from <= i < to} and
     * appends each sent message to {@code sent}.
     */
    private void sendMessages(Session session, MessageProducer producer, int from, int to, List<Message> sent)
            throws JMSException {
        for (int i = from; i < to; i++) {
            TextMessage message = session.createTextMessage("test" + i);
            sent.add(message);
            producer.send(message);
        }
    }

    /**
     * Returns the topic used by the tests, named after the runtime class of this test.
     *
     * @param session session used to create the topic
     * @return the test topic
     * @throws JMSException if the topic cannot be created
     */
    protected Topic getTopic(Session session) throws JMSException {
        return session.createTopic(getClass().getName());
    }

    /**
     * Creates and starts a connection with the client id {@code "testConsumer"}.
     *
     * @param connectionFactory factory used to create the connection
     * @return a started connection; the caller is responsible for closing it
     * @throws JMSException if the connection cannot be created or started
     */
    protected Connection getConsumerConnection(ConnectionFactory connectionFactory) throws JMSException {
        Connection connection = connectionFactory.createConnection();
        connection.setClientID("testConsumer");
        connection.start();
        return connection;
    }

    /**
     * Creates a durable subscriber named {@code "testConsumer"} on the test
     * topic, using a new auto-acknowledge session on the given connection.
     *
     * @param connection connection that owns the subscriber
     * @return the durable subscriber
     * @throws Exception if the session or subscriber cannot be created
     */
    protected MessageConsumer getConsumer(Connection connection) throws Exception {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        return session.createDurableSubscriber(getTopic(session), "testConsumer");
    }

    /** Starts the embedded broker if one is not already assigned. */
    protected void setUp() throws Exception {
        if (this.broker == null) {
            this.broker = createBroker();
        }
        super.setUp();
    }

    /** Stops the embedded broker, even if the superclass tear-down fails. */
    protected void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            if (this.broker != null) {
                this.broker.stop();
                // Drop the reference so a later setUp() creates a fresh broker.
                this.broker = null;
            }
        }
    }

    /**
     * Creates a connection factory for {@link #bindAddress} with both durable
     * topic prefetch properties set to {@link #PREFETCH_SIZE}.
     *
     * @return the configured connection factory
     * @throws Exception if the factory cannot be configured
     */
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.bindAddress);
        String prefetch = String.valueOf(PREFETCH_SIZE);
        Properties properties = new Properties();
        properties.setProperty("prefetchPolicy.durableTopicPrefetch", prefetch);
        properties.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch", prefetch);
        factory.setProperties(properties);
        return factory;
    }

    /**
     * Creates, configures and starts the broker used by the tests.
     *
     * @return the started broker
     * @throws Exception if the broker cannot be configured or started
     */
    protected BrokerService createBroker() throws Exception {
        BrokerService answer = new BrokerService();
        configureBroker(answer);
        // Set here as well so the store is wiped even if a subclass overrides configureBroker().
        answer.setDeleteAllMessagesOnStartup(true);
        answer.setPendingDurableSubscriberPolicy(new StorePendingDurableSubscriberMessageStoragePolicy());
        answer.start();
        return answer;
    }

    /**
     * Hook for configuring the broker before it is started; adds a connector on
     * {@link #bindAddress} and requests deletion of all messages on startup.
     *
     * @param brokerService the broker to configure
     * @throws Exception if the connector cannot be added
     */
    protected void configureBroker(BrokerService brokerService) throws Exception {
        brokerService.addConnector(this.bindAddress);
        brokerService.setDeleteAllMessagesOnStartup(true);
    }
}
