package org.apache.activemq.artemis.tests.integration.balancing;

import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.management.BrokerBalancerControl;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.apache.activemq.artemis.utils.Wait;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/balancing/MQTTRedirectTest.class */
public class MQTTRedirectTest extends BalancingTestBase {
    private final boolean discovery = true;

    @Test
    public void testSimpleRedirect() throws Exception {
        setupLiveServerWithDiscovery(0, GROUP_ADDRESS, GROUP_PORT, true, true, false);
        setupLiveServerWithDiscovery(1, GROUP_ADDRESS, GROUP_PORT, true, true, false);
        setupBalancerServerWithDiscovery(0, TargetKey.USER_NAME, "FIRST_ELEMENT", null, false, null, 1);
        startServers(0, 1);
        getServer(0).createQueue(new QueueConfiguration("RedirectTestTopic").setRoutingType(RoutingType.ANYCAST));
        getServer(1).createQueue(new QueueConfiguration("RedirectTestTopic").setRoutingType(RoutingType.ANYCAST));
        QueueControl queueControl = (QueueControl) getServer(0).getManagementService().getResource("queue.RedirectTestTopic");
        QueueControl queueControl2 = (QueueControl) getServer(1).getManagementService().getResource("queue.RedirectTestTopic");
        Assert.assertEquals(0L, queueControl.countMessages());
        Assert.assertEquals(0L, queueControl2.countMessages());
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setUserName("admin");
        mqttConnectOptions.setPassword("admin".toCharArray());
        MqttClient mqttClient = new MqttClient("tcp://localhost:61616", "TEST", new MemoryPersistence());
        try {
            mqttClient.connect(mqttConnectOptions);
            Assert.fail();
        } catch (MqttException e) {
            Assert.assertEquals(MqttConnectReturnCode.CONNECTION_REFUSED_USE_ANOTHER_SERVER, MqttConnectReturnCode.valueOf((byte) e.getReasonCode()));
        }
        mqttClient.close();
        TabularData tabularData = (TabularData) ((CompositeData) ((BrokerBalancerControl) getServer(0).getManagementService().getResource("brokerbalancer.bb1")).getTarget("admin").get("connector")).get("params");
        CompositeData compositeData = tabularData.get(new Object[]{"host"});
        CompositeData compositeData2 = tabularData.get(new Object[]{"port"});
        String str = compositeData != null ? (String) compositeData.get("value") : "localhost";
        int intValue = compositeData2 != null ? Integer.valueOf((String) compositeData2.get("value")).intValue() : 61616;
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ArrayList arrayList = new ArrayList();
        MqttClient mqttClient2 = new MqttClient("tcp://" + str + ":" + intValue, "TEST", new MemoryPersistence());
        mqttClient2.connect(mqttConnectOptions);
        Assert.assertEquals(0L, queueControl.countMessages());
        Assert.assertEquals(0L, queueControl2.countMessages());
        mqttClient2.subscribe("RedirectTestTopic", (str2, mqttMessage) -> {
            arrayList.add(mqttMessage);
            countDownLatch.countDown();
        });
        mqttClient2.publish("RedirectTestTopic", new MqttMessage("TEST".getBytes()));
        Assert.assertTrue(countDownLatch.await(3000L, TimeUnit.MILLISECONDS));
        Assert.assertEquals("TEST", new String(((MqttMessage) arrayList.get(0)).getPayload()));
        mqttClient2.disconnect();
        mqttClient2.close();
        Assert.assertEquals(0L, queueControl.countMessages());
        Wait.assertEquals(0L, () -> {
            return queueControl2.countMessages();
        });
    }
}
