package org.apache.activemq.bugs;

import java.io.Serializable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/activemq/bugs/QueueWorkerPrefetchTest.class */
public class QueueWorkerPrefetchTest extends TestCase implements MessageListener {
    private static final Log LOG = LogFactory.getLog(QueueWorkerPrefetchTest.class);
    private static final int BATCH_SIZE = 10;
    private static final long WAIT_TIMEOUT = 10000;
    private static final String CONNECTION_URL = "tcp://localhost:61616";
    private static final int QUEUE_PREFETCH_SIZE = 1;
    private static final int NUM_WORKERS = 2;
    private BrokerService broker;
    private MessageProducer workItemProducer;
    private MessageConsumer masterItemConsumer;
    private AtomicLong acksReceived = new AtomicLong(0);
    private AtomicReference<CountDownLatch> latch = new AtomicReference<>();

    /* loaded from: input_file:org/apache/activemq/bugs/QueueWorkerPrefetchTest$WorkMessage.class */
    private static class WorkMessage implements Serializable {
        private final int id;

        public WorkMessage(int i) {
            this.id = i;
        }

        public String toString() {
            return "Work: " + this.id;
        }
    }

    /* loaded from: input_file:org/apache/activemq/bugs/QueueWorkerPrefetchTest$Worker.class */
    private static class Worker implements MessageListener {
        private static AtomicInteger counter = new AtomicInteger(0);
        private Session session;
        private MessageProducer masterItemProducer;
        private MessageProducer workItemProducer;

        public Worker(Session session) throws JMSException {
            this.session = session;
            this.masterItemProducer = session.createProducer(session.createQueue("master-item"));
            Queue createQueue = session.createQueue("work-item");
            this.workItemProducer = session.createProducer(createQueue);
            session.createConsumer(createQueue).setMessageListener(this);
        }

        public void onMessage(Message message) {
            try {
                WorkMessage workMessage = (WorkMessage) ((ObjectMessage) message).getObject();
                if (counter.incrementAndGet() % 10 != 0) {
                    this.workItemProducer.send(this.session.createObjectMessage(new WorkMessage(workMessage.id + 1)));
                }
                this.masterItemProducer.send(this.session.createObjectMessage(workMessage));
            } catch (JMSException e) {
                throw new IllegalStateException("Something has gone wrong", e);
            }
        }

        public void close() throws JMSException {
            this.masterItemProducer.close();
            this.workItemProducer.close();
            this.session.close();
        }
    }

    public void onMessage(Message message) {
        long incrementAndGet = this.acksReceived.incrementAndGet();
        this.latch.get().countDown();
        if (incrementAndGet % 1 == 0) {
            LOG.info("Master now has ack count of: " + this.acksReceived);
        }
    }

    protected void setUp() throws Exception {
        super.setUp();
        this.broker = new BrokerService();
        this.broker.setPersistent(false);
        this.broker.setUseJmx(true);
        this.broker.addConnector(CONNECTION_URL);
        this.broker.start();
    }

    protected void tearDown() throws Exception {
        this.broker.deleteAllMessages();
        this.broker.stop();
        super.tearDown();
    }

    public void testActiveMQ() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(CONNECTION_URL);
        ActiveMQPrefetchPolicy activeMQPrefetchPolicy = new ActiveMQPrefetchPolicy();
        activeMQPrefetchPolicy.setQueuePrefetch(1);
        activeMQConnectionFactory.setPrefetchPolicy(activeMQPrefetchPolicy);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        this.workItemProducer = createSession.createProducer(createSession.createQueue("work-item"));
        this.masterItemConsumer = createSession.createConsumer(createSession.createQueue("master-item"));
        this.masterItemConsumer.setMessageListener(this);
        Worker[] workerArr = new Worker[2];
        for (int i = 0; i < 2; i++) {
            workerArr[i] = new Worker(createConnection.createSession(false, 1));
        }
        this.acksReceived.set(0L);
        this.latch.set(new CountDownLatch(10));
        this.workItemProducer.send(createSession.createObjectMessage(new WorkMessage(1)));
        if (!this.latch.get().await(10000L, TimeUnit.MILLISECONDS)) {
            fail("First batch only received " + this.acksReceived + " messages");
        }
        LOG.info("First batch received");
        this.acksReceived.set(0L);
        this.latch.set(new CountDownLatch(10));
        this.workItemProducer.send(createSession.createObjectMessage(new WorkMessage(1)));
        if (!this.latch.get().await(10000L, TimeUnit.MILLISECONDS)) {
            fail("Second batch only received " + this.acksReceived + " messages");
        }
        LOG.info("Second batch received");
        for (int i2 = 0; i2 < 2; i2++) {
            workerArr[i2].close();
        }
        createSession.close();
        createConnection.close();
    }
}
