package org.apache.activemq.artemis.tests.integration.mqtt.imported;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.tests.integration.client.AutoCreateJmsDestinationTest;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/mqtt/imported/PahoMQTTTest.class */
public class PahoMQTTTest extends MQTTTestSupport {
    private static final Logger log = Logger.getLogger(PahoMQTTTest.class);
    public String protocol;

    @Parameterized.Parameters(name = "protocol={0}")
    public static Collection<Object[]> getParams() {
        return Arrays.asList(new Object[]{"tcp"}, new Object[]{"ws"});
    }

    public PahoMQTTTest(String str) {
        this.protocol = str;
    }

    /* JADX WARN: Type inference failed for: r0v31, types: [org.apache.activemq.artemis.tests.integration.mqtt.imported.PahoMQTTTest$2] */
    @Test(timeout = 300000)
    public void testLotsOfClients() throws Exception {
        final int intValue = Integer.getInteger("PahoMQTTTest.CLIENTS", 100).intValue();
        log.debug("Using: {} clients: " + intValue);
        final AtomicInteger atomicInteger = new AtomicInteger();
        MqttClient createPahoClient = createPahoClient("consumer");
        createPahoClient.setCallback(new MqttCallback() { // from class: org.apache.activemq.artemis.tests.integration.mqtt.imported.PahoMQTTTest.1
            public void connectionLost(Throwable th) {
            }

            public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
                atomicInteger.incrementAndGet();
            }

            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            }
        });
        createPahoClient.connect();
        createPahoClient.subscribe(AutoCreateJmsDestinationTest.QUEUE_NAME);
        final AtomicReference atomicReference = new AtomicReference();
        final CountDownLatch countDownLatch = new CountDownLatch(intValue);
        final CountDownLatch countDownLatch2 = new CountDownLatch(intValue);
        final CountDownLatch countDownLatch3 = new CountDownLatch(1);
        for (int i = 0; i < intValue; i++) {
            Thread.sleep(10L);
            new Thread(null, null, "client:" + i) { // from class: org.apache.activemq.artemis.tests.integration.mqtt.imported.PahoMQTTTest.2
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        MqttClient createPahoClient2 = PahoMQTTTest.this.createPahoClient(Thread.currentThread().getName());
                        createPahoClient2.connect();
                        countDownLatch.countDown();
                        countDownLatch3.await();
                        for (int i2 = 0; i2 < 10; i2++) {
                            Thread.sleep(1000L);
                            createPahoClient2.publish(AutoCreateJmsDestinationTest.QUEUE_NAME, "hello".getBytes(), 1, false);
                        }
                        createPahoClient2.disconnect();
                        createPahoClient2.close();
                    } catch (Throwable th) {
                        th.printStackTrace();
                        atomicReference.set(th);
                    } finally {
                        countDownLatch2.countDown();
                    }
                }
            }.start();
        }
        countDownLatch.await();
        assertNull("Async error: " + atomicReference.get(), atomicReference.get());
        countDownLatch3.countDown();
        log.debug("All clients connected... waiting to receive sent messages...");
        within(30, TimeUnit.SECONDS, new MQTTTestSupport.Task() { // from class: org.apache.activemq.artemis.tests.integration.mqtt.imported.PahoMQTTTest.3
            @Override // org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport.Task
            public void run() throws Exception {
                Assert.assertTrue(atomicInteger.get() == intValue * 10);
            }
        });
        log.debug("All messages received.");
        countDownLatch2.await();
        assertNull("Async error: " + atomicReference.get(), atomicReference.get());
    }

    @Test(timeout = 300000)
    public void testSendAndReceiveMQTT() throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        MqttClient createPahoClient = createPahoClient("consumerId");
        MqttClient createPahoClient2 = createPahoClient("producerId");
        createPahoClient.connect();
        createPahoClient.subscribe(AutoCreateJmsDestinationTest.QUEUE_NAME);
        createPahoClient.setCallback(new MqttCallback() { // from class: org.apache.activemq.artemis.tests.integration.mqtt.imported.PahoMQTTTest.4
            public void connectionLost(Throwable th) {
            }

            public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
                countDownLatch.countDown();
            }

            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            }
        });
        createPahoClient2.connect();
        createPahoClient2.publish(AutoCreateJmsDestinationTest.QUEUE_NAME, "hello".getBytes(), 1, false);
        waitForLatch(countDownLatch);
        createPahoClient2.disconnect();
        createPahoClient2.close();
    }

    @Test(timeout = 300000)
    public void testSessionPresentWithCleanSession() throws Exception {
        MqttClient createPahoClient = createPahoClient(RandomUtil.randomString());
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(true);
        assertFalse(createPahoClient.connectWithResult(mqttConnectOptions).getSessionPresent());
        createPahoClient.disconnect();
    }

    @Test(timeout = 300000)
    public void testSessionPresent() throws Exception {
        MqttClient createPahoClient = createPahoClient(RandomUtil.randomString());
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(false);
        assertFalse(createPahoClient.connectWithResult(mqttConnectOptions).getSessionPresent());
        createPahoClient.disconnect();
        assertTrue(createPahoClient.connectWithResult(mqttConnectOptions).getSessionPresent());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public MqttClient createPahoClient(String str) throws MqttException {
        return new MqttClient(this.protocol + "://localhost:" + getPort(), str, new MemoryPersistence());
    }
}
