package org.apache.activemq.store.jdbc;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicSubscriber;
import junit.framework.Test;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.MessagePriorityTest;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.Wait;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.class */
public class JDBCMessagePriorityTest extends MessagePriorityTest {
    private static final Logger LOG = LoggerFactory.getLogger(JDBCMessagePriorityTest.class);
    EmbeddedDataSource dataSource;

    @Override // org.apache.activemq.store.MessagePriorityTest
    protected PersistenceAdapter createPersistenceAdapter(boolean z) throws Exception {
        JDBCPersistenceAdapter jDBCPersistenceAdapter = new JDBCPersistenceAdapter();
        this.dataSource = new EmbeddedDataSource();
        this.dataSource.setDatabaseName("derbyDb");
        this.dataSource.setCreateDatabase("create");
        this.dataSource.setShutdownDatabase((String) null);
        jDBCPersistenceAdapter.setDataSource(this.dataSource);
        jDBCPersistenceAdapter.deleteAllMessages();
        jDBCPersistenceAdapter.setCleanupPeriod(2000);
        return jDBCPersistenceAdapter;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.store.MessagePriorityTest, org.apache.activemq.AutoFailTestSupport
    public void tearDown() throws Exception {
        super.tearDown();
        try {
            if (this.dataSource != null) {
                this.dataSource.setShutdownDatabase("shutdown");
                this.dataSource.getConnection();
            }
            this.dataSource.setShutdownDatabase((String) null);
        } catch (Exception e) {
            this.dataSource.setShutdownDatabase((String) null);
        } catch (Throwable th) {
            this.dataSource.setShutdownDatabase((String) null);
            throw th;
        }
    }

    public void testDurableSubsReconnectWithFourLevels() throws Exception {
        ActiveMQTopic createTopic = this.sess.createTopic("TEST");
        this.sess.createDurableSubscriber(createTopic, "priorityDisconnect").close();
        int i = this.LOW_PRI + 1;
        int i2 = this.HIGH_PRI - 1;
        MessagePriorityTest.ProducerThread producerThread = new MessagePriorityTest.ProducerThread(createTopic, this.MSG_NUM, this.LOW_PRI);
        MessagePriorityTest.ProducerThread producerThread2 = new MessagePriorityTest.ProducerThread(createTopic, this.MSG_NUM, i);
        MessagePriorityTest.ProducerThread producerThread3 = new MessagePriorityTest.ProducerThread(createTopic, this.MSG_NUM, i2);
        MessagePriorityTest.ProducerThread producerThread4 = new MessagePriorityTest.ProducerThread(createTopic, this.MSG_NUM, this.HIGH_PRI);
        producerThread.start();
        producerThread4.start();
        producerThread2.start();
        producerThread3.start();
        producerThread.join();
        producerThread4.join();
        producerThread2.join();
        producerThread3.join();
        int i3 = this.MSG_NUM;
        int[] iArr = {this.HIGH_PRI, i2, i, this.LOW_PRI};
        TopicSubscriber createDurableSubscriber = this.sess.createDurableSubscriber(createTopic, "priorityDisconnect");
        for (int i4 = 0; i4 < this.MSG_NUM * 4; i4++) {
            Message receive = createDurableSubscriber.receive(DurableSubProcessWithRestartTest.BROKER_RESTART);
            LOG.debug("received i=" + i4 + ", m=" + (receive != null ? receive.getJMSMessageID() + ", priority: " + receive.getJMSPriority() : null));
            assertNotNull("Message " + i4 + " was null", receive);
            assertEquals("Message " + i4 + " has wrong priority", iArr[i4 / this.MSG_NUM], receive.getJMSPriority());
            if (i4 > 0 && i4 % i3 == 0) {
                LOG.info("Closing durable sub.. on: " + i4);
                createDurableSubscriber.close();
                createDurableSubscriber = this.sess.createDurableSubscriber(createTopic, "priorityDisconnect");
            }
        }
        LOG.info("closing on done!");
        createDurableSubscriber.close();
    }

    public void initCombosForTestConcurrentDurableSubsReconnectWithXLevels() {
        addCombinationValues("prioritizeMessages", new Object[]{Boolean.TRUE, Boolean.FALSE});
    }

    public void testConcurrentDurableSubsReconnectWithXLevels() throws Exception {
        ActiveMQTopic createTopic = this.sess.createTopic("TEST");
        Connection createConnection = this.factory.createConnection();
        createConnection.setClientID("priorityDisconnect");
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        createSession.createDurableSubscriber(createTopic, "priorityDisconnect").close();
        AtomicInteger[] atomicIntegerArr = new AtomicInteger[5];
        Vector vector = new Vector();
        for (int i = 0; i < 5; i++) {
            vector.add(new MessagePriorityTest.ProducerThread(createTopic, this.MSG_NUM, i));
            atomicIntegerArr[i] = new AtomicInteger(0);
        }
        Iterator it = vector.iterator();
        while (it.hasNext()) {
            ((MessagePriorityTest.ProducerThread) it.next()).start();
        }
        int i2 = this.MSG_NUM / 2;
        HashMap hashMap = new HashMap();
        TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(createTopic, "priorityDisconnect");
        for (int i3 = 0; i3 < this.MSG_NUM * 5; i3++) {
            Message receive = createDurableSubscriber.receive(DurableSubProcessWithRestartTest.BROKER_RESTART);
            LOG.debug("received i=" + i3 + ", m=" + (receive != null ? receive.getJMSMessageID() + ", priority: " + receive.getJMSPriority() : null));
            assertNull("no duplicate message failed on : " + receive.getJMSMessageID(), hashMap.put(receive.getJMSMessageID(), "priorityDisconnect"));
            assertNotNull("Message " + i3 + " was null", receive);
            atomicIntegerArr[receive.getJMSPriority()].incrementAndGet();
            if (i3 > 0 && i3 % i2 == 0) {
                LOG.info("Closing durable sub.. on: " + i3 + ", counts: " + Arrays.toString(atomicIntegerArr));
                createDurableSubscriber.close();
                createDurableSubscriber = createSession.createDurableSubscriber(createTopic, "priorityDisconnect");
            }
        }
        LOG.info("closing on done!");
        createDurableSubscriber.close();
        createSession.close();
        createConnection.close();
        Iterator it2 = vector.iterator();
        while (it2.hasNext()) {
            ((MessagePriorityTest.ProducerThread) it2.next()).join();
        }
    }

    public void initCombosForTestConcurrentRate() {
        addCombinationValues("prefetchVal", new Object[]{new Integer(1), new Integer(500)});
    }

    public void testConcurrentRate() throws Exception {
        ActiveMQTopic createTopic = this.sess.createTopic("TEST");
        Connection createConnection = this.factory.createConnection();
        createConnection.setClientID("subName");
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        createSession.createDurableSubscriber(createTopic, "priorityConcurrent").close();
        final Vector vector = new Vector();
        final int[] iArr = new int[8000];
        double d = 0.0d;
        double d2 = 0.0d;
        MessageProducer createProducer = this.sess.createProducer(createTopic);
        TextMessage createTextMessage = this.sess.createTextMessage();
        for (int i = 0; i < 2000; i++) {
            int i2 = i % 10;
            createTextMessage.setText(i + "-" + i2);
            createTextMessage.setIntProperty("seq", i);
            createTextMessage.setJMSPriority(i2);
            if (i > 0 && i % 1000 == 0) {
                LOG.info("Max send time: " + d + ". Sending message: " + createTextMessage.getText());
            }
            long currentTimeMillis = System.currentTimeMillis();
            createProducer.send(createTextMessage, 2, createTextMessage.getJMSPriority(), 0L);
            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
            d = Math.max(d, currentTimeMillis2);
            if (currentTimeMillis2 == d) {
                LOG.info("new max: " + d + " on i=" + i + ", " + createTextMessage.getText());
            }
            d2 += currentTimeMillis2;
        }
        LOG.info("Sent: 2000, max send time: " + d);
        double d3 = (d2 * 100.0d) / 2000.0d;
        TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(createTopic, "priorityConcurrent");
        final AtomicInteger atomicInteger = new AtomicInteger();
        createDurableSubscriber.setMessageListener(new MessageListener() { // from class: org.apache.activemq.store.jdbc.JDBCMessagePriorityTest.1
            public void onMessage(Message message) {
                try {
                    atomicInteger.incrementAndGet();
                    if (atomicInteger.get() % 100 == 0) {
                        JDBCMessagePriorityTest.LOG.info("onMessage: count: " + atomicInteger.get() + ", " + ((TextMessage) message).getText() + ", seqNo " + message.getIntProperty("seq") + ", " + message.getJMSMessageID());
                    }
                    int intProperty = message.getIntProperty("seq");
                    if (iArr[intProperty] == 0) {
                        iArr[intProperty] = 1;
                    } else {
                        JDBCMessagePriorityTest.LOG.error("Duplicate: " + ((TextMessage) message).getText() + ", " + message.getJMSMessageID());
                        vector.add(message);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        LOG.info("Activated consumer");
        double d4 = 0.0d;
        double d5 = 0.0d;
        for (int i3 = 2000; i3 < 4000; i3++) {
            int i4 = i3 % 10;
            createTextMessage.setText(i3 + "-" + i4);
            createTextMessage.setIntProperty("seq", i3);
            createTextMessage.setJMSPriority(i4);
            if (i3 > 0 && i3 % 1000 == 0) {
                LOG.info("Max send time: " + d4 + ". Sending message: " + createTextMessage.getText());
            }
            long currentTimeMillis3 = System.currentTimeMillis();
            createProducer.send(createTextMessage, 2, createTextMessage.getJMSPriority(), 0L);
            long currentTimeMillis4 = System.currentTimeMillis() - currentTimeMillis3;
            d4 = Math.max(d4, currentTimeMillis4);
            if (currentTimeMillis4 == d4) {
                LOG.info("new max: " + d4 + " on i=" + i3 + ", " + createTextMessage.getText());
            }
            d5 += currentTimeMillis4;
        }
        LOG.info("Sent another: 2000, max send time: " + d4);
        double d6 = (d5 * 100.0d) / 2000.0d;
        assertTrue("max X times as slow with consumer:" + d6 + " , noConsumerMax:" + d3, d6 < d3 * 4.0d);
        Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.store.jdbc.JDBCMessagePriorityTest.2
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                JDBCMessagePriorityTest.LOG.info("count: " + atomicInteger.get());
                return 4000 == atomicInteger.get();
            }
        }, 60000L);
        assertTrue("No duplicates : " + vector, vector.isEmpty());
        assertEquals("got all messages", 4000, atomicInteger.get());
    }

    public static Test suite() {
        return suite(JDBCMessagePriorityTest.class);
    }
}
