package org.apache.activemq.artemis.tests.integration.replication;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.DistributedPrimitiveManagerConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.quorum.DistributedLock;
import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
import org.apache.activemq.artemis.quorum.MutableLong;
import org.apache.activemq.artemis.quorum.file.FileBasedPrimitiveManager;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

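/**
 * Runs replication scenarios on top of {@link SharedNothingReplicationTest} using the
 * {@link ReplicationPrimaryPolicyConfiguration} / {@link ReplicationBackupPolicyConfiguration}
 * HA policies, coordinated through a {@link FileBasedPrimitiveManager} whose lock files live in
 * a temporary folder.
 *
 * <p>Decompiled from org/apache/activemq/artemis/tests/integration/replication/PluggableQuorumReplicationTest.class.
 */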
public class PluggableQuorumReplicationTest extends SharedNothingReplicationTest {
    private DistributedPrimitiveManagerConfiguration managerConfiguration;

    /**
     * Prepares the distributed-manager configuration used by both HA policies of this test:
     * a {@link FileBasedPrimitiveManager} whose {@code locks-folder} property points at a
     * freshly created "manager" directory under the test's temporary folder.
     *
     * @throws IOException if the "manager" folder cannot be created
     */
    @Before
    public void init() throws IOException {
        final String managerClassName = FileBasedPrimitiveManager.class.getName();
        final String locksFolder = this.temporaryFolder.newFolder("manager").toString();
        this.managerConfiguration =
            new DistributedPrimitiveManagerConfiguration(managerClassName, Collections.singletonMap("locks-folder", locksFolder));
    }

    /**
     * Supplies the HA policy for the live broker: the default primary replication policy wired
     * to the distributed-manager configuration prepared in {@link #init()}.
     *
     * @return the primary policy configuration
     */
    @Override
    protected HAPolicyConfiguration createReplicationLiveConfiguration() {
        final ReplicationPrimaryPolicyConfiguration primaryPolicy = ReplicationPrimaryPolicyConfiguration.withDefault();
        primaryPolicy.setDistributedManagerConfiguration(this.managerConfiguration);
        return primaryPolicy;
    }

    /**
     * Supplies the HA policy for the backup broker: the default backup replication policy wired
     * to the distributed-manager configuration prepared in {@link #init()}, with its cluster
     * name set to {@code "cluster"}.
     *
     * @return the backup policy configuration
     */
    @Override
    protected HAPolicyConfiguration createReplicationBackupConfiguration() {
        final ReplicationBackupPolicyConfiguration backupPolicy = ReplicationBackupPolicyConfiguration.withDefault();
        backupPolicy.setDistributedManagerConfiguration(this.managerConfiguration);
        backupPolicy.setClusterName("cluster");
        return backupPolicy;
    }

    /**
     * Walks a live/backup pair through a sequence of stops and restarts, creating a queue at each
     * step and checking the node activation sequence reported by whichever broker is expected to
     * be serving:
     * <ol>
     *   <li>live alone, then the pair forms (topology 1 live / 1 backup);</li>
     *   <li>live stops, backup is expected to reach activation sequence 2;</li>
     *   <li>backup stops, the restarted live must not become active and is stopped again;</li>
     *   <li>backup restarts alone and is expected to report activation sequence 3;</li>
     *   <li>live rejoins, syncs as replica and reports 3, then reports 4 once the backup stops.</li>
     * </ol>
     * The locator is always closed, even when an assertion fails.
     *
     * @throws Exception if a broker or client operation fails
     */
    @Test
    public void testUnReplicatedOrderedTransition() throws Exception {
        final ActiveMQServer liveServer = addServer(ActiveMQServers.newActiveMQServer(createLiveConfiguration()));
        liveServer.setIdentity("LIVE");
        liveServer.start();
        assertTrue(Wait.waitFor(liveServer::isStarted));

        final ServerLocator locator = ServerLocatorImpl.newLocator("(tcp://localhost:61616,tcp://localhost:61617)?ha=true");
        try {
            locator.setCallTimeout(60000L);
            locator.setConnectionTTL(60000L);
            createAnycastQueue(locator, "slow");

            final ActiveMQServer backupServer = addServer(ActiveMQServers.newActiveMQServer(createBackupConfiguration()));
            backupServer.setIdentity("BACKUP");
            backupServer.start();
            assertTrue(Wait.waitFor(backupServer::isStarted));
            waitForTopology(liveServer, 1, 1, 30000L);
            waitForTopology(backupServer, 1, 1, 30000L);

            // Live goes away: the next queue is created while only the backup is left.
            liveServer.stop();
            createAnycastQueue(locator, "slow_un_replicated");
            waitForTopology(backupServer, 1, 0, 30000L);
            assertTrue(Wait.waitFor(() -> 2 == backupServer.getNodeManager().getNodeActivationSequence()));
            backupServer.stop(false);

            // The restarted live is expected to stay inactive.
            liveServer.start();
            Wait.assertFalse(liveServer::isActive);
            liveServer.stop();

            backupServer.start();
            assertTrue(Wait.waitFor(backupServer::isStarted));
            assertEquals(3L, backupServer.getNodeManager().getNodeActivationSequence());
            createAnycastQueue(locator, "backup_as_un_replicated");

            // Former live rejoins and is expected to sync as a replica.
            liveServer.start();
            createAnycastQueue(locator, "backup_as_replicated");
            assertTrue(Wait.waitFor(liveServer::isReplicaSync));
            assertTrue(Wait.waitFor(() -> 3 == liveServer.getNodeManager().getNodeActivationSequence()));

            backupServer.stop(true);
            waitForTopology(liveServer, 1, 0, 30000L);
            assertTrue(Wait.waitFor(() -> 4 == liveServer.getNodeManager().getNodeActivationSequence()));
            liveServer.stop(true);
        } finally {
            locator.close();
        }
    }

    /**
     * Opens a session through a new session factory of {@code locator}, creates an ANYCAST queue
     * named {@code queueName} and closes the session again, also when queue creation fails.
     */
    private static void createAnycastQueue(ServerLocator locator, String queueName) throws Exception {
        final ClientSession session = locator.createSessionFactory().createSession();
        try {
            session.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
        } finally {
            session.close();
        }
    }

    /**
     * Starts a primary and a backup that allows fail-back, then checks the node activation
     * sequence through a fail-over and a fail-back:
     * both brokers report 1 while in sync, the backup reports 2 after the primary stops, both
     * report 3 after the primary is restarted and the pair is back in sync, and the primary
     * reports 4 after the backup stops.
     *
     * @throws Exception if a broker operation fails
     */
    @Test
    public void testBackupFailoverAndPrimaryFailback() throws Exception {
        final int timeout = (int) TimeUnit.SECONDS.toMillis(30L);

        final ActiveMQServer primaryServer = addServer(ActiveMQServers.newActiveMQServer(createLiveConfiguration()));
        primaryServer.setIdentity("PRIMARY");
        primaryServer.start();
        Assert.assertEquals(1L, primaryServer.getNodeManager().getNodeActivationSequence());

        final Configuration backupConfiguration = createBackupConfiguration();
        // setAllowFailBack is declared on the backup policy class, not on HAPolicyConfiguration.
        ((ReplicationBackupPolicyConfiguration) backupConfiguration.getHAPolicyConfiguration()).setAllowFailBack(true);
        final ActiveMQServer backupServer = addServer(ActiveMQServers.newActiveMQServer(backupConfiguration));
        backupServer.setIdentity("BACKUP");
        backupServer.start();
        assertTrue(Wait.waitFor(backupServer::isStarted));
        org.apache.activemq.artemis.utils.Wait.assertTrue(() -> backupServer.isReplicaSync(), timeout);
        Assert.assertEquals(1L, primaryServer.getNodeManager().getNodeActivationSequence());
        Assert.assertEquals(1L, backupServer.getNodeManager().getNodeActivationSequence());

        // Fail-over: primary stops, backup is expected to bump the sequence.
        primaryServer.stop();
        org.apache.activemq.artemis.utils.Wait.assertTrue(
            () -> 2 == backupServer.getNodeManager().getNodeActivationSequence(), timeout);
        TimeUnit.MILLISECONDS.sleep(100L);

        // Fail-back: primary returns.
        primaryServer.start();
        org.apache.activemq.artemis.utils.Wait.assertTrue(() -> backupServer.isReplicaSync(), timeout);
        org.apache.activemq.artemis.utils.Wait.assertTrue(() -> {
            try {
                return 3 == primaryServer.getNodeManager().getNodeActivationSequence();
            } catch (NullPointerException ignored) {
                // NOTE(review): presumably the node manager is not available yet while the broker
                // is restarting; treated as "not there yet" and retried - confirm.
                return false;
            }
        }, timeout);
        org.apache.activemq.artemis.utils.Wait.assertTrue(() -> primaryServer.isReplicaSync(), timeout);
        org.apache.activemq.artemis.utils.Wait.assertTrue(() -> backupServer.isReplicaSync(), timeout);
        org.apache.activemq.artemis.utils.Wait.assertTrue(
            () -> 3 == backupServer.getNodeManager().getNodeActivationSequence(), timeout);
        TimeUnit.MILLISECONDS.sleep(100L);

        backupServer.stop();
        TimeUnit.MILLISECONDS.sleep(100L);
        org.apache.activemq.artemis.utils.Wait.assertTrue(() -> {
            try {
                return 4 == primaryServer.getNodeManager().getNodeActivationSequence();
            } catch (NullPointerException ignored) {
                // Same tolerance as above: retry until the node manager can be queried.
                return false;
            }
        }, timeout);
        primaryServer.stop();
    }

    /**
     * Checks that the primary's node activation sequence goes from 1 to 2 once its in-sync
     * backup is stopped.
     *
     * @throws Exception if a broker operation fails
     */
    @Test
    public void testPrimaryIncrementActivationSequenceOnUnReplicated() throws Exception {
        final int timeout = (int) TimeUnit.SECONDS.toMillis(30L);

        final ActiveMQServer primary = addServer(ActiveMQServers.newActiveMQServer(createLiveConfiguration()));
        primary.setIdentity("PRIMARY");
        primary.start();
        Assert.assertEquals(1L, primary.getNodeManager().getNodeActivationSequence());

        final ActiveMQServer backup = addServer(ActiveMQServers.newActiveMQServer(createBackupConfiguration()));
        backup.setIdentity("BACKUP");
        backup.start();
        Wait.waitFor(backup::isStarted);
        org.apache.activemq.artemis.utils.Wait.assertTrue(() -> backup.isReplicaSync(), timeout);

        // While the pair is in sync both sides still report the initial sequence.
        Assert.assertEquals(1L, primary.getNodeManager().getNodeActivationSequence());
        Assert.assertEquals(1L, backup.getNodeManager().getNodeActivationSequence());

        backup.stop();
        org.apache.activemq.artemis.utils.Wait.assertTrue(
            () -> 2 == primary.getNodeManager().getNodeActivationSequence(), timeout);
        primary.stop();
    }

    /**
     * Starts the backup before the live broker and checks that both end up started and
     * reporting replica sync.
     *
     * @throws Exception if a broker operation fails
     */
    @Test
    public void testBackupStartsFirst() throws Exception {
        final ActiveMQServer backup = addServer(ActiveMQServers.newActiveMQServer(createBackupConfiguration()));
        backup.setIdentity("BACKUP");
        backup.start();

        final ActiveMQServer live = addServer(ActiveMQServers.newActiveMQServer(createLiveConfiguration()));
        live.setIdentity("LIVE");
        live.start();

        Wait.waitFor(live::isStarted);
        assertTrue(Wait.waitFor(backup::isStarted));
        assertTrue(Wait.waitFor(backup::isReplicaSync));
        assertTrue(live.isReplicaSync());
    }

    /**
     * Forms a backup-first pair, stops the backup one second before the live broker, then
     * restarts the backup first again and checks that the pair gets back in sync.
     *
     * @throws Exception if a broker operation fails
     */
    @Test
    public void testBackupOutOfSequenceReleasesLock() throws Exception {
        final ActiveMQServer backup = addServer(ActiveMQServers.newActiveMQServer(createBackupConfiguration()));
        backup.setIdentity("BACKUP");
        backup.start();

        final ActiveMQServer live = addServer(ActiveMQServers.newActiveMQServer(createLiveConfiguration()));
        live.setIdentity("LIVE");
        live.start();

        // First round: the pair forms and syncs.
        Wait.waitFor(live::isStarted);
        assertTrue(Wait.waitFor(backup::isStarted));
        assertTrue(Wait.waitFor(backup::isReplicaSync));
        assertTrue(live.isReplicaSync());

        // Backup leaves first, live follows a second later.
        backup.stop();
        TimeUnit.SECONDS.sleep(1L);
        live.stop();

        // Second round: backup is started ahead of the live broker again.
        backup.start();
        live.start();
        Wait.waitFor(live::isStarted);
        assertTrue(Wait.waitFor(backup::isStarted));
        assertTrue(Wait.waitFor(backup::isReplicaSync));
        assertTrue(live.isReplicaSync());
    }

    /**
     * Forms a backup-first pair, stops both brokers (backup first), then takes the distributed
     * lock named after the live broker's node id from a separate manager instance before
     * restarting the backup. The backup must reach its {@code preActivate} callback within one
     * second; afterwards the separate manager is stopped, the live broker is restarted and the
     * pair is expected to get back in sync.
     *
     * <p>The separate manager is stopped in a {@code finally} so it is released even when one of
     * the assertions made while it is running fails.
     *
     * @throws Exception if a broker or manager operation fails
     */
    @Test
    public void testBackupOutOfSequenceCheckActivationSequence() throws Exception {
        final ActiveMQServer backupServer = addServer(ActiveMQServers.newActiveMQServer(createBackupConfiguration()));
        backupServer.setIdentity("BACKUP");
        backupServer.start();
        final ActiveMQServer liveServer = addServer(ActiveMQServers.newActiveMQServer(createLiveConfiguration()));
        liveServer.setIdentity("LIVE");
        liveServer.start();
        assertTrue(Wait.waitFor(liveServer::isStarted));
        assertTrue(Wait.waitFor(backupServer::isStarted));
        assertTrue(Wait.waitFor(backupServer::isReplicaSync));
        assertTrue(liveServer.isReplicaSync());

        backupServer.stop();
        TimeUnit.SECONDS.sleep(1L);
        final String coordinatedId = liveServer.getNodeID().toString();
        liveServer.stop();

        final DistributedPrimitiveManager manager =
            DistributedPrimitiveManager.newInstanceOf(this.managerConfiguration.getClassName(), this.managerConfiguration.getProperties());
        manager.start();
        try {
            // Hold the lock from outside the brokers while the backup is restarted.
            assertTrue(manager.getDistributedLock(coordinatedId).tryLock());
            final CountDownLatch preActivateLatch = new CountDownLatch(1);
            backupServer.registerActivateCallback(new ActivateCallback() {
                @Override
                public void preActivate() {
                    // ActivateCallback is an interface; its default method needs a qualified super call.
                    ActivateCallback.super.preActivate();
                    preActivateLatch.countDown();
                }
            });
            backupServer.start();
            assertTrue(preActivateLatch.await(1L, TimeUnit.SECONDS));
        } finally {
            manager.stop();
        }

        liveServer.start();
        assertTrue(Wait.waitFor(liveServer::isStarted));
        assertTrue(Wait.waitFor(backupServer::isReplicaSync));
        assertTrue(liveServer.isReplicaSync());
    }

    /**
     * Restarts a lone live broker (activation sequence 2), stops it, and then, from a separate
     * manager instance and while holding the distributed lock named after the node id, swaps the
     * coordinated long from 2 to -2. After restarting, the broker and the coordinated long are
     * both expected to read 3, and a backup started afterwards is expected to report 3 as well.
     *
     * <p>NOTE(review): the negative value presumably marks an activation that was claimed but
     * not completed - confirm against the activation-sequence implementation.
     *
     * <p>The lock is scoped with try-with-resources and the separate manager is stopped in a
     * {@code finally}, so neither is left open (or closed twice) when an assertion fails.
     *
     * @throws Exception if a broker or manager operation fails
     */
    @Test
    public void testSelfRepairPrimary() throws Exception {
        final ActiveMQServer liveServer = addServer(ActiveMQServers.newActiveMQServer(createLiveConfiguration()));
        liveServer.setIdentity("LIVE");
        liveServer.start();
        final String coordinatedId = liveServer.getNodeID().toString();
        assertTrue(Wait.waitFor(liveServer::isStarted));
        liveServer.stop();

        liveServer.start();
        assertTrue(Wait.waitFor(liveServer::isStarted));
        Assert.assertEquals(2L, liveServer.getNodeManager().getNodeActivationSequence());
        liveServer.stop();

        final DistributedPrimitiveManager manager =
            DistributedPrimitiveManager.newInstanceOf(this.managerConfiguration.getClassName(), this.managerConfiguration.getProperties());
        manager.start();
        try {
            try (DistributedLock lock = manager.getDistributedLock(coordinatedId)) {
                assertTrue(lock.tryLock());
                manager.getMutableLong(coordinatedId).compareAndSet(2L, -2L);
            }

            liveServer.start();
            assertTrue(Wait.waitFor(liveServer::isStarted));
            Assert.assertEquals(3L, liveServer.getNodeManager().getNodeActivationSequence());
            Assert.assertEquals(3L, manager.getMutableLong(coordinatedId).get());
        } finally {
            manager.stop();
        }

        final ActiveMQServer backupServer = addServer(ActiveMQServers.newActiveMQServer(createBackupConfiguration()));
        backupServer.setIdentity("BACKUP");
        backupServer.start();
        assertTrue(Wait.waitFor(backupServer::isReplicaSync));
        Assert.assertEquals(3L, backupServer.getNodeManager().getNodeActivationSequence());
        backupServer.stop();
    }

    /**
     * Runs two brokers that both use the primary replication policy and share the coordination
     * id {@code "some-shared-id-001"}, and hands a message from one step to the next through the
     * same sequence of stops and restarts as {@code testUnReplicatedOrderedTransition}: each step
     * receives the message sent in the previous step and sends a new one to a new queue, while
     * the expected node activation sequence goes 2, 3, 3, 4.
     *
     * <p>The locator is closed in a {@code finally}, so it is released even when an assertion
     * fails.
     *
     * @throws Exception if a broker or client operation fails
     */
    @Test
    public void testPrimaryPeers() throws Exception {
        final String peerCoordinationId = "some-shared-id-001";

        final Configuration liveConfiguration = createLiveConfiguration();
        // setCoordinationId is declared on the primary policy class, not on HAPolicyConfiguration.
        ((ReplicationPrimaryPolicyConfiguration) liveConfiguration.getHAPolicyConfiguration()).setCoordinationId(peerCoordinationId);
        final ActiveMQServer liveServer = addServer(ActiveMQServers.newActiveMQServer(liveConfiguration));
        liveServer.setIdentity("LIVE");
        liveServer.start();
        assertTrue(Wait.waitFor(liveServer::isStarted));

        final ServerLocator locator = ServerLocatorImpl.newLocator("(tcp://localhost:61616,tcp://localhost:61617)?ha=true");
        try {
            locator.setCallTimeout(60000L);
            locator.setConnectionTTL(60000L);
            // Opened first and closed last; it is not used for any traffic in between.
            // NOTE(review): presumably kept only to hold the locator's topology alive - confirm.
            final ClientSessionFactory keepLocatorAlive = locator.createSessionFactory();

            ClientSessionFactory sessionFactory = locator.createSessionFactory();
            sendTo(sessionFactory, "live_un_replicated");
            sessionFactory.close();

            // The peer reuses the backup broker configuration but runs the primary policy.
            final Configuration peerConfiguration = createBackupConfiguration();
            peerConfiguration.setHAPolicyConfiguration(createReplicationLiveConfiguration());
            ((ReplicationPrimaryPolicyConfiguration) peerConfiguration.getHAPolicyConfiguration()).setCoordinationId(peerCoordinationId);
            peerConfiguration.setName("localhost::live-peer");
            final ActiveMQServer peerServer = addServer(ActiveMQServers.newActiveMQServer(peerConfiguration));
            peerServer.setIdentity("LIVE-PEER");
            peerServer.start();
            assertTrue(Wait.waitFor(peerServer::isStarted));
            waitForTopology(liveServer, 1, 1, 30000L);
            waitForTopology(peerServer, 1, 1, 30000L);

            liveServer.stop();
            sessionFactory = locator.createSessionFactory();
            receiveFrom(sessionFactory, "live_un_replicated");
            sendTo(sessionFactory, "peer_un_replicated");
            sessionFactory.close();
            waitForTopology(peerServer, 1, 0, 30000L);
            assertTrue(Wait.waitFor(() -> 2 == peerServer.getNodeManager().getNodeActivationSequence()));
            peerServer.stop(false);

            // The restarted first broker is expected to stay inactive.
            liveServer.start();
            Wait.assertTrue(() -> !liveServer.isActive());

            peerServer.start();
            assertTrue(Wait.waitFor(peerServer::isStarted));
            assertEquals(3L, peerServer.getNodeManager().getNodeActivationSequence());
            sessionFactory = locator.createSessionFactory();
            receiveFrom(sessionFactory, "peer_un_replicated");
            sendTo(sessionFactory, "backup_as_un_replicated");
            sessionFactory.close();

            liveServer.start();
            sessionFactory = locator.createSessionFactory();
            receiveFrom(sessionFactory, "backup_as_un_replicated");
            sendTo(sessionFactory, "backup_as_replicated");
            sessionFactory.close();
            assertTrue(Wait.waitFor(liveServer::isReplicaSync));
            assertTrue(Wait.waitFor(() -> 3 == liveServer.getNodeManager().getNodeActivationSequence()));
            waitForTopology(liveServer, 1, 1, 30000L);
            waitForTopology(peerServer, 1, 1, 30000L);

            peerServer.stop(true);
            assertTrue(Wait.waitFor(() -> 4 == liveServer.getNodeManager().getNodeActivationSequence()));
            sessionFactory = locator.createSessionFactory();
            receiveFrom(sessionFactory, "backup_as_replicated");
            sessionFactory.close();
            waitForTopology(liveServer, 1, 0, 30000L);
            liveServer.stop(true);
            keepLocatorAlive.close();
        } finally {
            locator.close();
        }
    }

    /**
     * Forms a backup-first pair, stops both brokers (backup first), swaps the coordinated long
     * named after the live node id from 2 to -2 through a separate manager instance, and then
     * restarts only the live broker. The broker and the coordinated long are both expected to
     * read 3 afterwards.
     *
     * <p>The separate manager is stopped in a {@code finally}; the original never stopped it.
     *
     * @throws Exception if a broker or manager operation fails
     */
    @Test
    public void testUnavailableSelfHeal() throws Exception {
        final ActiveMQServer backupServer = addServer(ActiveMQServers.newActiveMQServer(createBackupConfiguration()));
        backupServer.setIdentity("BACKUP");
        backupServer.start();
        final ActiveMQServer liveServer = addServer(ActiveMQServers.newActiveMQServer(createLiveConfiguration()));
        liveServer.setIdentity("LIVE");
        liveServer.start();
        assertTrue(Wait.waitFor(liveServer::isStarted));
        assertTrue(Wait.waitFor(backupServer::isStarted));
        assertTrue(Wait.waitFor(backupServer::isReplicaSync));
        assertTrue(liveServer.isReplicaSync());

        final String coordinatedId = liveServer.getNodeID().toString();
        backupServer.stop();
        TimeUnit.MILLISECONDS.sleep(500L);
        liveServer.stop();

        final DistributedPrimitiveManager manager =
            DistributedPrimitiveManager.newInstanceOf(this.managerConfiguration.getClassName(), this.managerConfiguration.getProperties());
        manager.start();
        try {
            final MutableLong activationSequence = manager.getMutableLong(coordinatedId);
            Assert.assertTrue(activationSequence.compareAndSet(2L, -2L));

            liveServer.start();
            assertTrue(Wait.waitFor(liveServer::isStarted));
            Assert.assertEquals(3L, liveServer.getNodeManager().getNodeActivationSequence());
            Assert.assertEquals(3L, activationSequence.get());
        } finally {
            manager.stop();
        }
    }

    /**
     * Forms a backup-first pair, stops both brokers (backup first), then makes the stored state
     * inconsistent in two places: the coordinated long named after the live node id is swapped
     * from 2 to -2, and the live broker's locally stored activation sequence is rewritten from
     * 2 to 1. The backup is then restarted, the live broker is started from another thread, and
     * the pair is expected to end up in sync with both brokers reporting activation sequence 3.
     *
     * <p>The separate manager and the temporary {@link FileLockNodeManager} are stopped in
     * {@code finally} blocks, so they are released even when an assertion fails; the original
     * never stopped the manager.
     *
     * @throws Exception if a broker or manager operation fails
     */
    @Test
    public void testUnavailableAutoRepair() throws Exception {
        final ActiveMQServer backupServer = addServer(ActiveMQServers.newActiveMQServer(createBackupConfiguration()));
        backupServer.setIdentity("BACKUP");
        backupServer.start();
        final Configuration liveConfiguration = createLiveConfiguration();
        final ActiveMQServer liveServer = addServer(ActiveMQServers.newActiveMQServer(liveConfiguration));
        liveServer.setIdentity("LIVE");
        liveServer.start();
        assertTrue(Wait.waitFor(liveServer::isStarted));
        assertTrue(Wait.waitFor(backupServer::isStarted));
        assertTrue(Wait.waitFor(backupServer::isReplicaSync));
        assertTrue(liveServer.isReplicaSync());

        final String coordinatedId = liveServer.getNodeID().toString();
        backupServer.stop();
        TimeUnit.MILLISECONDS.sleep(500L);
        liveServer.stop();

        final DistributedPrimitiveManager manager =
            DistributedPrimitiveManager.newInstanceOf(this.managerConfiguration.getClassName(), this.managerConfiguration.getProperties());
        manager.start();
        try {
            Assert.assertTrue(manager.getMutableLong(coordinatedId).compareAndSet(2L, -2L));

            // Roll the live broker's locally stored sequence back from 2 to 1.
            final FileLockNodeManager localNodeManager =
                new FileLockNodeManager(liveConfiguration.getNodeManagerLockLocation().getAbsoluteFile(), true);
            localNodeManager.start();
            try {
                assertEquals(2L, localNodeManager.readNodeActivationSequence());
                localNodeManager.writeNodeActivationSequence(1L);
            } finally {
                localNodeManager.stop();
            }

            backupServer.start();
            // The live broker is started from another thread so this thread can keep polling.
            final CountDownLatch liveStartRequested = new CountDownLatch(1);
            new Thread(() -> {
                liveStartRequested.countDown();
                try {
                    liveServer.start();
                } catch (Throwable failure) {
                    // No logger is in scope here; a failed start also surfaces through the assertions below.
                    failure.printStackTrace();
                }
            }).start();
            liveStartRequested.await();
            TimeUnit.MILLISECONDS.sleep(500L);

            assertTrue(Wait.waitFor(() -> liveServer.isStarted() || backupServer.isStarted()));
            assertTrue(Wait.waitFor(backupServer::isReplicaSync));
            assertTrue(liveServer.isReplicaSync());
            assertEquals(3L, backupServer.getNodeManager().getNodeActivationSequence());
            assertEquals(3L, liveServer.getNodeManager().getNodeActivationSequence());
        } finally {
            manager.stop();
        }
    }

    /**
     * Creates a durable ANYCAST queue named {@code str} and sends one durable message to it.
     * The message carries the string property {@code "K"} set to {@code str} and the long
     * property {@code "delay"} set to 0.
     *
     * <p>The producer and the session are closed even when a step in between throws.
     *
     * @param clientSessionFactory factory used to open the session
     * @param str                  queue name, also used as the address and as the value of {@code "K"}
     * @throws Exception if the session, queue, producer or send fails
     */
    private void sendTo(ClientSessionFactory clientSessionFactory, String str) throws Exception {
        final ClientSession session = clientSessionFactory.createSession(true, true);
        try {
            session.createQueue(new QueueConfiguration(str).setRoutingType(RoutingType.ANYCAST).setDurable(true));
            final ClientProducer producer = session.createProducer(str);
            try {
                final ClientMessage message = session.createMessage(true);
                message.putStringProperty("K", str);
                message.putLongProperty("delay", 0L);
                producer.send(message);
            } finally {
                producer.close();
            }
        } finally {
            session.close();
        }
    }

    /**
     * Receives one message from the queue {@code str}, waiting up to 4000 ms, and checks that
     * its string property {@code "K"} equals {@code str}.
     *
     * <p>The consumer and the session are closed even when an assertion fails. The property is
     * compared with {@code assertEquals}, so a mismatch reports both values and a missing
     * property fails the assertion instead of raising a {@link NullPointerException}.
     *
     * @param clientSessionFactory factory used to open the session
     * @param str                  queue to consume from, also the expected value of {@code "K"}
     * @throws Exception if the session or consumer fails
     */
    private void receiveFrom(ClientSessionFactory clientSessionFactory, String str) throws Exception {
        final ClientSession session = clientSessionFactory.createSession(true, true);
        try {
            session.start();
            final ClientConsumer consumer = session.createConsumer(str);
            try {
                final ClientMessage message = consumer.receive(4000L);
                assertNotNull(message);
                Assert.assertEquals(str, message.getStringProperty("K"));
            } finally {
                consumer.close();
            }
        } finally {
            session.close();
        }
    }
}
