package org.apache.activemq;

import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
import edu.emory.mathcs.backport.java.util.concurrent.Executors;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
import java.net.URI;
import java.util.ArrayList;
import java.util.Random;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.transport.http.BlockingQueueTransport;
import org.apache.derby.iapi.store.raw.RowLock;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

/* loaded from: input_file:org/apache/activemq/AMQDeadlockTest3.class */
public class AMQDeadlockTest3 extends TestCase {
    private static final String URL1 = "tcp://localhost:61616";
    private static final String URL2 = "tcp://localhost:61617";
    private static final String QUEUE1_NAME = "test.queue.1";
    private static final String QUEUE2_NAME = "test.queue.2";
    private static final int MAX_CONSUMERS = 1;
    private static final int MAX_PRODUCERS = 1;
    private static final int NUM_MESSAGE_TO_SEND = 10;
    private AtomicInteger messageCount = new AtomicInteger();
    private CountDownLatch doneLatch;

    /* loaded from: input_file:org/apache/activemq/AMQDeadlockTest3$NonPooledProducerTask.class */
    private class NonPooledProducerTask implements Runnable {
        private final String queueName;
        private final ConnectionFactory cf;
        private final AMQDeadlockTest3 this$0;

        public NonPooledProducerTask(AMQDeadlockTest3 aMQDeadlockTest3, ConnectionFactory connectionFactory, String str) {
            this.this$0 = aMQDeadlockTest3;
            this.cf = connectionFactory;
            this.queueName = str;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                JmsTemplate jmsTemplate = new JmsTemplate(this.cf);
                jmsTemplate.setDeliveryMode(1);
                jmsTemplate.setExplicitQosEnabled(true);
                jmsTemplate.setMessageIdEnabled(false);
                jmsTemplate.setMessageTimestampEnabled(false);
                jmsTemplate.afterPropertiesSet();
                byte[] bArr = new byte[2048];
                new Random().nextBytes(bArr);
                Thread.sleep(2000L);
                AtomicInteger atomicInteger = new AtomicInteger();
                for (int i = 0; i < 10; i++) {
                    jmsTemplate.send(this.queueName, new MessageCreator(this, bArr, atomicInteger) { // from class: org.apache.activemq.AMQDeadlockTest3.NonPooledProducerTask.1
                        private final byte[] val$bytes;
                        private final AtomicInteger val$count;
                        private final NonPooledProducerTask this$1;

                        {
                            this.this$1 = this;
                            this.val$bytes = bArr;
                            this.val$count = atomicInteger;
                        }

                        public Message createMessage(Session session) throws JMSException {
                            BytesMessage createBytesMessage = session.createBytesMessage();
                            createBytesMessage.writeBytes(this.val$bytes);
                            createBytesMessage.setIntProperty(RowLock.DIAG_COUNT, this.val$count.incrementAndGet());
                            createBytesMessage.setStringProperty("producer", "non-pooled");
                            return createBytesMessage;
                        }
                    });
                    System.out.println(new StringBuffer().append("Non-PooledProducer sent message: ").append(atomicInteger.get()).toString());
                }
            } catch (Throwable th) {
                System.err.println("Producer 1 is exiting.");
                th.printStackTrace();
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/AMQDeadlockTest3$PooledProducerTask.class */
    private class PooledProducerTask implements Runnable {
        private final String queueName;
        private final PooledConnectionFactory pcf;
        private final AMQDeadlockTest3 this$0;

        public PooledProducerTask(AMQDeadlockTest3 aMQDeadlockTest3, PooledConnectionFactory pooledConnectionFactory, String str) {
            this.this$0 = aMQDeadlockTest3;
            this.pcf = pooledConnectionFactory;
            this.queueName = str;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                JmsTemplate jmsTemplate = new JmsTemplate(this.pcf);
                jmsTemplate.setDeliveryMode(1);
                jmsTemplate.setExplicitQosEnabled(true);
                jmsTemplate.setMessageIdEnabled(false);
                jmsTemplate.setMessageTimestampEnabled(false);
                jmsTemplate.afterPropertiesSet();
                byte[] bArr = new byte[2048];
                new Random().nextBytes(bArr);
                Thread.sleep(2000L);
                AtomicInteger atomicInteger = new AtomicInteger();
                for (int i = 0; i < 10; i++) {
                    jmsTemplate.send(this.queueName, new MessageCreator(this, bArr, atomicInteger) { // from class: org.apache.activemq.AMQDeadlockTest3.PooledProducerTask.1
                        private final byte[] val$bytes;
                        private final AtomicInteger val$count;
                        private final PooledProducerTask this$1;

                        {
                            this.this$1 = this;
                            this.val$bytes = bArr;
                            this.val$count = atomicInteger;
                        }

                        public Message createMessage(Session session) throws JMSException {
                            BytesMessage createBytesMessage = session.createBytesMessage();
                            createBytesMessage.writeBytes(this.val$bytes);
                            createBytesMessage.setIntProperty(RowLock.DIAG_COUNT, this.val$count.incrementAndGet());
                            createBytesMessage.setStringProperty("producer", "pooled");
                            return createBytesMessage;
                        }
                    });
                    System.out.println(new StringBuffer().append("PooledProducer sent message: ").append(atomicInteger.get()).toString());
                }
            } catch (Throwable th) {
                System.err.println("Producer 1 is exiting.");
                th.printStackTrace();
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/AMQDeadlockTest3$TestMessageListener1.class */
    private class TestMessageListener1 implements MessageListener {
        private final long waitTime;
        private final AMQDeadlockTest3 this$0;

        public TestMessageListener1(AMQDeadlockTest3 aMQDeadlockTest3, long j) {
            this.this$0 = aMQDeadlockTest3;
            this.waitTime = j;
        }

        @Override // javax.jms.MessageListener
        public void onMessage(Message message) {
            try {
                System.out.println(new StringBuffer().append("Listener1 Consumed message ").append(message.getIntProperty(RowLock.DIAG_COUNT)).toString());
                this.this$0.messageCount.incrementAndGet();
                this.this$0.doneLatch.countDown();
                Thread.sleep(this.waitTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (JMSException e2) {
                e2.printStackTrace();
            }
        }
    }

    public void setUp() throws Exception {
    }

    public void tearDown() throws Exception {
    }

    public void testQueueLimitsWithOneBrokerSameConnection() throws Exception {
        BrokerService brokerService = null;
        DefaultMessageListenerContainer defaultMessageListenerContainer = null;
        try {
            brokerService = createBrokerService("broker1", "tcp://localhost:61616", null);
            brokerService.start();
            ActiveMQConnectionFactory createConnectionFactory = createConnectionFactory("tcp://localhost:61616");
            PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(createConnectionFactory);
            this.doneLatch = new CountDownLatch(10);
            defaultMessageListenerContainer = createDefaultMessageListenerContainer(createConnectionFactory, new TestMessageListener1(this, 500L), QUEUE1_NAME);
            defaultMessageListenerContainer.afterPropertiesSet();
            Thread.sleep(2000L);
            ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
            for (int i = 0; i < 1; i++) {
                newCachedThreadPool.submit(new PooledProducerTask(this, pooledConnectionFactory, QUEUE2_NAME));
                Thread.sleep(1000L);
                newCachedThreadPool.submit(new PooledProducerTask(this, pooledConnectionFactory, QUEUE1_NAME));
            }
            assertTrue(this.doneLatch.await(20L, TimeUnit.SECONDS));
            newCachedThreadPool.shutdownNow();
            Assert.assertEquals(10, this.messageCount.get());
            defaultMessageListenerContainer.stop();
            defaultMessageListenerContainer.destroy();
            brokerService.stop();
        } catch (Throwable th) {
            defaultMessageListenerContainer.stop();
            defaultMessageListenerContainer.destroy();
            brokerService.stop();
            throw th;
        }
    }

    public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithOneConnectionForProducing() throws Exception {
        BrokerService brokerService = null;
        BrokerService brokerService2 = null;
        DefaultMessageListenerContainer defaultMessageListenerContainer = null;
        try {
            brokerService = createBrokerService("broker1", "tcp://localhost:61616", URL2);
            brokerService.start();
            brokerService2 = createBrokerService("broker2", URL2, "tcp://localhost:61616");
            brokerService2.start();
            ActiveMQConnectionFactory createConnectionFactory = createConnectionFactory("tcp://localhost:61616");
            ActiveMQConnectionFactory createConnectionFactory2 = createConnectionFactory(URL2);
            PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(createConnectionFactory);
            Thread.sleep(1000L);
            this.doneLatch = new CountDownLatch(10);
            defaultMessageListenerContainer = createDefaultMessageListenerContainer(createConnectionFactory2, new TestMessageListener1(this, 500L), QUEUE1_NAME);
            defaultMessageListenerContainer.afterPropertiesSet();
            ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
            for (int i = 0; i < 1; i++) {
                newCachedThreadPool.submit(new PooledProducerTask(this, pooledConnectionFactory, QUEUE2_NAME));
                Thread.sleep(1000L);
                newCachedThreadPool.submit(new PooledProducerTask(this, pooledConnectionFactory, QUEUE1_NAME));
            }
            assertTrue(this.doneLatch.await(20L, TimeUnit.SECONDS));
            newCachedThreadPool.shutdownNow();
            Assert.assertEquals(10, this.messageCount.get());
            defaultMessageListenerContainer.stop();
            defaultMessageListenerContainer.destroy();
            brokerService.stop();
            brokerService2.stop();
        } catch (Throwable th) {
            defaultMessageListenerContainer.stop();
            defaultMessageListenerContainer.destroy();
            brokerService.stop();
            brokerService2.stop();
            throw th;
        }
    }

    public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithSeperateConnectionsForProducing() throws Exception {
        BrokerService brokerService = null;
        BrokerService brokerService2 = null;
        DefaultMessageListenerContainer defaultMessageListenerContainer = null;
        DefaultMessageListenerContainer defaultMessageListenerContainer2 = null;
        try {
            brokerService = createBrokerService("broker1", "tcp://localhost:61616", URL2);
            brokerService.start();
            brokerService2 = createBrokerService("broker2", URL2, "tcp://localhost:61616");
            brokerService2.start();
            ActiveMQConnectionFactory createConnectionFactory = createConnectionFactory("tcp://localhost:61616");
            ActiveMQConnectionFactory createConnectionFactory2 = createConnectionFactory(URL2);
            Thread.sleep(1000L);
            this.doneLatch = new CountDownLatch(10);
            defaultMessageListenerContainer = createDefaultMessageListenerContainer(createConnectionFactory2, new TestMessageListener1(this, 500L), QUEUE1_NAME);
            defaultMessageListenerContainer.afterPropertiesSet();
            defaultMessageListenerContainer2 = createDefaultMessageListenerContainer(createConnectionFactory2, new TestMessageListener1(this, BlockingQueueTransport.MAX_TIMEOUT), QUEUE2_NAME);
            defaultMessageListenerContainer2.afterPropertiesSet();
            ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
            for (int i = 0; i < 1; i++) {
                newCachedThreadPool.submit(new NonPooledProducerTask(this, createConnectionFactory, QUEUE2_NAME));
                Thread.sleep(1000L);
                newCachedThreadPool.submit(new NonPooledProducerTask(this, createConnectionFactory, QUEUE1_NAME));
            }
            assertTrue(this.doneLatch.await(20L, TimeUnit.SECONDS));
            newCachedThreadPool.shutdownNow();
            Assert.assertEquals(10, this.messageCount.get());
            defaultMessageListenerContainer.stop();
            defaultMessageListenerContainer.destroy();
            defaultMessageListenerContainer2.stop();
            defaultMessageListenerContainer2.destroy();
            brokerService.stop();
            brokerService2.stop();
        } catch (Throwable th) {
            defaultMessageListenerContainer.stop();
            defaultMessageListenerContainer.destroy();
            defaultMessageListenerContainer2.stop();
            defaultMessageListenerContainer2.destroy();
            brokerService.stop();
            brokerService2.stop();
            throw th;
        }
    }

    private BrokerService createBrokerService(String str, String str2, String str3) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setBrokerName(str);
        brokerService.setPersistent(false);
        brokerService.setUseJmx(true);
        UsageManager usageManager = new UsageManager();
        usageManager.setLimit(5000000L);
        brokerService.setMemoryManager(usageManager);
        ArrayList arrayList = new ArrayList();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setQueue(DestinationFilter.ANY_DESCENDENT);
        policyEntry.setMemoryLimit(1000L);
        arrayList.add(policyEntry);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(arrayList);
        brokerService.setDestinationPolicy(policyMap);
        TransportConnector transportConnector = new TransportConnector();
        transportConnector.setUri(new URI(str2));
        transportConnector.setBrokerName(str);
        transportConnector.setName(new StringBuffer().append(str).append(".transportConnector").toString());
        brokerService.addConnector(transportConnector);
        if (str3 != null) {
            DiscoveryNetworkConnector discoveryNetworkConnector = new DiscoveryNetworkConnector(new URI(new StringBuffer().append("static:").append(str3).toString()));
            discoveryNetworkConnector.setBridgeTempDestinations(true);
            discoveryNetworkConnector.setBrokerName(str);
            discoveryNetworkConnector.setName(new StringBuffer().append(str).append(".nc").toString());
            brokerService.addNetworkConnector(discoveryNetworkConnector);
        }
        return brokerService;
    }

    public DefaultMessageListenerContainer createDefaultMessageListenerContainer(ConnectionFactory connectionFactory, MessageListener messageListener, String str) {
        DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
        defaultMessageListenerContainer.setConnectionFactory(connectionFactory);
        defaultMessageListenerContainer.setDestinationName(str);
        defaultMessageListenerContainer.setMessageListener(messageListener);
        defaultMessageListenerContainer.setSessionTransacted(false);
        defaultMessageListenerContainer.setSessionAcknowledgeMode(1);
        defaultMessageListenerContainer.setConcurrentConsumers(1);
        return defaultMessageListenerContainer;
    }

    public ActiveMQConnectionFactory createConnectionFactory(String str) {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(str);
        activeMQConnectionFactory.setCopyMessageOnSend(false);
        activeMQConnectionFactory.setUseAsyncSend(false);
        activeMQConnectionFactory.setDispatchAsync(true);
        activeMQConnectionFactory.setUseCompression(false);
        activeMQConnectionFactory.setOptimizeAcknowledge(false);
        activeMQConnectionFactory.setOptimizedMessageDispatch(true);
        activeMQConnectionFactory.setUseSyncSend(true);
        return activeMQConnectionFactory;
    }
}
