package com.hazelcast.topic;

import com.hazelcast.config.Config;
import com.hazelcast.config.ListenerConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Member;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.map.EntryProcessorOffloadableBouncingNodesTest;
import com.hazelcast.monitor.impl.LocalTopicStatsImpl;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.DataSerializable;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.NightlyTest;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.test.annotation.Repeat;
import com.hazelcast.topic.impl.TopicService;
import com.hazelcast.util.UuidUtil;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelTest.class})
/* loaded from: input_file:com/hazelcast/topic/TopicTest.class */
public class TopicTest extends HazelcastTestSupport {

    /* loaded from: input_file:com/hazelcast/topic/TopicTest$SerializationCounting.class */
    public static class SerializationCounting implements DataSerializable {
        private AtomicInteger counter = new AtomicInteger();

        public void writeData(ObjectDataOutput objectDataOutput) throws IOException {
            this.counter.incrementAndGet();
        }

        public void readData(ObjectDataInput objectDataInput) throws IOException {
        }

        public int getSerializationCount() {
            return this.counter.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/topic/TopicTest$TestMessage.class */
    public static class TestMessage implements DataSerializable {
        Member publisher;
        String data;

        TestMessage() {
        }

        TestMessage(Member member, String str) {
            this.publisher = member;
            this.data = str;
        }

        public void writeData(ObjectDataOutput objectDataOutput) throws IOException {
            this.publisher.writeData(objectDataOutput);
            objectDataOutput.writeUTF(this.data);
        }

        public void readData(ObjectDataInput objectDataInput) throws IOException {
            this.publisher = new MemberImpl();
            this.publisher.readData(objectDataInput);
            this.data = objectDataInput.readUTF();
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            TestMessage testMessage = (TestMessage) obj;
            if (this.data != null) {
                if (!this.data.equals(testMessage.data)) {
                    return false;
                }
            } else if (testMessage.data != null) {
                return false;
            }
            return this.publisher != null ? this.publisher.equals(testMessage.publisher) : testMessage.publisher == null;
        }

        public int hashCode() {
            return (31 * (this.publisher != null ? this.publisher.hashCode() : 0)) + (this.data != null ? this.data.hashCode() : 0);
        }

        public String toString() {
            return "TestMessage{publisher=" + this.publisher + ", data='" + this.data + "'}";
        }
    }

    @Test
    public void testDestroyTopicRemovesStatistics() {
        String randomString = randomString();
        HazelcastInstance createHazelcastInstance = createHazelcastInstance();
        final ITopic topic = createHazelcastInstance.getTopic(randomString);
        topic.publish("foobar");
        sleepSeconds(1);
        topic.destroy();
        final TopicService topicService = (TopicService) getNode(createHazelcastInstance).nodeEngine.getService("hz:impl:topicService");
        assertTrueEventually(new AssertTask() { // from class: com.hazelcast.topic.TopicTest.1
            @Override // com.hazelcast.test.AssertTask
            public void run() {
                Assert.assertFalse(topicService.getStatsMap().containsKey(topic.getName()));
            }
        });
    }

    @Test
    public void testTopicPublishingMember() {
        String str = "testTopicPublishingMember" + generateRandomString(5);
        HazelcastInstance[] newInstances = createHazelcastInstanceFactory(3).newInstances();
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final AtomicInteger atomicInteger2 = new AtomicInteger(0);
        final AtomicInteger atomicInteger3 = new AtomicInteger(0);
        for (int i = 0; i < 3; i++) {
            final HazelcastInstance hazelcastInstance = newInstances[i];
            hazelcastInstance.getTopic(str).addMessageListener(new MessageListener<Member>() { // from class: com.hazelcast.topic.TopicTest.2
                public void onMessage(Message<Member> message) {
                    Member publishingMember = message.getPublishingMember();
                    if (publishingMember.equals(hazelcastInstance.getCluster().getLocalMember())) {
                        atomicInteger.incrementAndGet();
                    }
                    if (publishingMember.equals((Member) message.getMessageObject())) {
                        atomicInteger2.incrementAndGet();
                    }
                    if (publishingMember.localMember()) {
                        atomicInteger3.incrementAndGet();
                    }
                }
            });
        }
        for (int i2 = 0; i2 < 3; i2++) {
            HazelcastInstance hazelcastInstance2 = newInstances[i2];
            hazelcastInstance2.getTopic(str).publish(hazelcastInstance2.getCluster().getLocalMember());
        }
        assertTrueEventually(new AssertTask() { // from class: com.hazelcast.topic.TopicTest.3
            @Override // com.hazelcast.test.AssertTask
            public void run() {
                Assert.assertEquals(3L, atomicInteger.get());
                Assert.assertEquals(9L, atomicInteger2.get());
                Assert.assertEquals(3L, atomicInteger3.get());
            }
        });
    }

    @Test
    public void testTopicLocalOrder() throws Exception {
        final String randomString = randomString();
        Config config = new Config();
        config.getTopicConfig(randomString).setGlobalOrderingEnabled(false);
        final HazelcastInstance[] newInstances = createHazelcastInstanceFactory(5).newInstances(config);
        final List[] listArr = new List[5];
        for (int i = 0; i < 5; i++) {
            listArr[i] = new CopyOnWriteArrayList();
        }
        final CountDownLatch countDownLatch = new CountDownLatch(5);
        final CountDownLatch countDownLatch2 = new CountDownLatch(25000);
        final CountDownLatch countDownLatch3 = new CountDownLatch(5000);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5);
        for (int i2 = 0; i2 < 5; i2++) {
            final int i3 = i2;
            newFixedThreadPool.execute(new Runnable() { // from class: com.hazelcast.topic.TopicTest.4
                @Override // java.lang.Runnable
                public void run() {
                    final List list = listArr[i3];
                    HazelcastInstance hazelcastInstance = newInstances[i3];
                    ITopic topic = hazelcastInstance.getTopic(randomString);
                    topic.addMessageListener(new MessageListener<TestMessage>() { // from class: com.hazelcast.topic.TopicTest.4.1
                        public void onMessage(Message<TestMessage> message) {
                            list.add(message.getMessageObject());
                            countDownLatch2.countDown();
                        }
                    });
                    countDownLatch.countDown();
                    try {
                        countDownLatch.await(1L, TimeUnit.MINUTES);
                        Member localMember = hazelcastInstance.getCluster().getLocalMember();
                        for (int i4 = 0; i4 < 1000; i4++) {
                            topic.publish(new TestMessage(localMember, UuidUtil.newUnsecureUuidString()));
                            countDownLatch3.countDown();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        try {
            Assert.assertTrue(countDownLatch3.await(2L, TimeUnit.MINUTES));
            Assert.assertTrue(countDownLatch2.await(5L, TimeUnit.MINUTES));
            TestMessage[] testMessageArr = new TestMessage[listArr[0].size()];
            listArr[0].toArray(testMessageArr);
            Comparator<TestMessage> comparator = new Comparator<TestMessage>() { // from class: com.hazelcast.topic.TopicTest.5
                @Override // java.util.Comparator
                public int compare(TestMessage testMessage, TestMessage testMessage2) {
                    return testMessage.publisher.getUuid().compareTo(testMessage2.publisher.getUuid());
                }
            };
            Arrays.sort(testMessageArr, comparator);
            for (int i4 = 1; i4 < 5; i4++) {
                TestMessage[] testMessageArr2 = new TestMessage[listArr[i4].size()];
                listArr[i4].toArray(testMessageArr2);
                Arrays.sort(testMessageArr2, comparator);
                Assert.assertArrayEquals(testMessageArr, testMessageArr2);
            }
        } finally {
            newFixedThreadPool.shutdownNow();
        }
    }

    @Test
    public void testTopicGlobalOrder() throws Exception {
        String randomString = randomString();
        Config config = new Config();
        config.getTopicConfig(randomString).setGlobalOrderingEnabled(true);
        HazelcastInstance[] newInstances = createHazelcastInstanceFactory(5).newInstances(config);
        final List[] listArr = new List[5];
        for (int i = 0; i < 5; i++) {
            listArr[i] = new CopyOnWriteArrayList();
        }
        final CountDownLatch countDownLatch = new CountDownLatch(5000);
        for (int i2 = 0; i2 < newInstances.length; i2++) {
            final int i3 = i2;
            newInstances[i2].getTopic(randomString).addMessageListener(new MessageListener<TestMessage>() { // from class: com.hazelcast.topic.TopicTest.6
                public void onMessage(Message<TestMessage> message) {
                    listArr[i3].add(message.getMessageObject());
                    countDownLatch.countDown();
                }
            });
        }
        for (HazelcastInstance hazelcastInstance : newInstances) {
            Member localMember = hazelcastInstance.getCluster().getLocalMember();
            for (int i4 = 0; i4 < 1000; i4++) {
                hazelcastInstance.getTopic(randomString).publish(new TestMessage(localMember, UUID.randomUUID().toString()));
            }
        }
        assertTrueEventually(new AssertTask() { // from class: com.hazelcast.topic.TopicTest.7
            @Override // com.hazelcast.test.AssertTask
            public void run() throws Exception {
                int i5 = 0;
                do {
                    List list = listArr[i5];
                    int i6 = i5;
                    i5++;
                    Assert.assertEquals(list, listArr[i6]);
                } while (i5 < 5);
            }
        });
    }

    @Test
    public void testName() {
        String randomString = randomString();
        Assert.assertEquals(randomString, createHazelcastInstance().getTopic(randomString).getName());
    }

    @Test
    public void addMessageListener() throws InterruptedException {
        ITopic topic = createHazelcastInstance().getTopic("addMessageListener" + generateRandomString(5));
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        topic.addMessageListener(new MessageListener<String>() { // from class: com.hazelcast.topic.TopicTest.8
            public void onMessage(Message<String> message) {
                if (((String) message.getMessageObject()).equals("Hazelcast Rocks!")) {
                    countDownLatch.countDown();
                }
            }
        });
        topic.publish("Hazelcast Rocks!");
        Assert.assertTrue(countDownLatch.await(10000L, TimeUnit.MILLISECONDS));
    }

    @Test
    public void testConfigListenerRegistration() throws InterruptedException {
        Config config = new Config();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        config.getTopicConfig("default").addMessageListenerConfig(new ListenerConfig().setImplementation(new MessageListener() { // from class: com.hazelcast.topic.TopicTest.9
            public void onMessage(Message message) {
                countDownLatch.countDown();
            }
        }));
        createHazelcastInstance(config).getTopic("default").publish(1);
        Assert.assertTrue(countDownLatch.await(10L, TimeUnit.SECONDS));
    }

    @Test
    public void addTwoMessageListener() throws InterruptedException {
        ITopic topic = createHazelcastInstance().getTopic("addTwoMessageListener" + generateRandomString(5));
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        topic.addMessageListener(new MessageListener<String>() { // from class: com.hazelcast.topic.TopicTest.10
            public void onMessage(Message<String> message) {
                if (((String) message.getMessageObject()).equals("Hazelcast Rocks!")) {
                    countDownLatch.countDown();
                }
            }
        });
        topic.addMessageListener(new MessageListener<String>() { // from class: com.hazelcast.topic.TopicTest.11
            public void onMessage(Message<String> message) {
                if (((String) message.getMessageObject()).equals("Hazelcast Rocks!")) {
                    countDownLatch.countDown();
                }
            }
        });
        topic.publish("Hazelcast Rocks!");
        Assert.assertTrue(countDownLatch.await(10000L, TimeUnit.MILLISECONDS));
    }

    @Test
    @Repeat(10)
    public void removeMessageListener() throws InterruptedException {
        try {
            ITopic topic = createHazelcastInstance().getTopic("removeMessageListener" + generateRandomString(5));
            final AtomicInteger atomicInteger = new AtomicInteger(0);
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            MessageListener<String> messageListener = new MessageListener<String>() { // from class: com.hazelcast.topic.TopicTest.12
                public void onMessage(Message<String> message) {
                    atomicInteger.incrementAndGet();
                    countDownLatch.countDown();
                }
            };
            String str = "message_" + messageListener.hashCode() + "_";
            String addMessageListener = topic.addMessageListener(messageListener);
            topic.publish(str + "1");
            countDownLatch.await();
            Assert.assertTrue(topic.removeMessageListener(addMessageListener));
            topic.publish(str + "2");
            assertTrueEventually(new AssertTask() { // from class: com.hazelcast.topic.TopicTest.13
                @Override // com.hazelcast.test.AssertTask
                public void run() {
                    Assert.assertEquals(1L, atomicInteger.get());
                }
            });
            shutdownNodeFactory();
        } catch (Throwable th) {
            shutdownNodeFactory();
            throw th;
        }
    }

    @Test
    public void testPerformance() throws InterruptedException {
        String randomString = randomString();
        HazelcastInstance createHazelcastInstance = createHazelcastInstance();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
        final ITopic topic = createHazelcastInstance.getTopic(randomString);
        final CountDownLatch countDownLatch = new CountDownLatch(10000);
        for (int i = 0; i < 10000; i++) {
            newFixedThreadPool.submit(new Runnable() { // from class: com.hazelcast.topic.TopicTest.14
                @Override // java.lang.Runnable
                public void run() {
                    topic.publish("my object");
                    countDownLatch.countDown();
                }
            });
        }
        Assert.assertTrue(countDownLatch.await(20L, TimeUnit.SECONDS));
    }

    @Test
    public void addTwoListenerAndRemoveOne() throws InterruptedException {
        ITopic topic = createHazelcastInstance().getTopic("addTwoListenerAndRemoveOne" + generateRandomString(5));
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        final CountDownLatch countDownLatch2 = new CountDownLatch(2);
        final AtomicInteger atomicInteger = new AtomicInteger();
        MessageListener<String> messageListener = new MessageListener<String>() { // from class: com.hazelcast.topic.TopicTest.15
            public void onMessage(Message<String> message) {
                atomicInteger.incrementAndGet();
                countDownLatch.countDown();
                countDownLatch2.countDown();
            }
        };
        MessageListener<String> messageListener2 = new MessageListener<String>() { // from class: com.hazelcast.topic.TopicTest.16
            public void onMessage(Message<String> message) {
                atomicInteger.incrementAndGet();
                countDownLatch.countDown();
                countDownLatch2.countDown();
            }
        };
        String addMessageListener = topic.addMessageListener(messageListener);
        topic.addMessageListener(messageListener2);
        topic.publish("Hazelcast Rocks!");
        assertOpenEventually(countDownLatch2);
        topic.removeMessageListener(addMessageListener);
        topic.publish("Hazelcast Rocks!");
        assertOpenEventually(countDownLatch);
        Assert.assertEquals(3L, atomicInteger.get());
    }

    @Test
    public void testTopicCluster() throws InterruptedException {
        String str = "TestMessages" + generateRandomString(5);
        HazelcastInstance[] newInstances = createHazelcastInstanceFactory(2).newInstances(new Config());
        HazelcastInstance hazelcastInstance = newInstances[0];
        HazelcastInstance hazelcastInstance2 = newInstances[1];
        ITopic topic = hazelcastInstance.getTopic(str);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final String str2 = "Test" + randomString();
        topic.addMessageListener(new MessageListener<String>() { // from class: com.hazelcast.topic.TopicTest.17
            public void onMessage(Message message) {
                Assert.assertEquals(str2, message.getMessageObject());
                countDownLatch.countDown();
            }
        });
        ITopic topic2 = hazelcastInstance2.getTopic(str);
        final CountDownLatch countDownLatch2 = new CountDownLatch(2);
        topic2.addMessageListener(new MessageListener<String>() { // from class: com.hazelcast.topic.TopicTest.18
            public void onMessage(Message message) {
                Assert.assertEquals(str2, message.getMessageObject());
                countDownLatch2.countDown();
            }
        });
        topic.publish(str2);
        assertOpenEventually(countDownLatch);
        hazelcastInstance.shutdown();
        topic2.publish(str2);
        assertOpenEventually(countDownLatch2);
    }

    @Test
    public void testTopicStats() throws InterruptedException {
        ITopic topic = createHazelcastInstance().getTopic("testTopicStats" + generateRandomString(5));
        final CountDownLatch countDownLatch = new CountDownLatch(EntryProcessorOffloadableBouncingNodesTest.COUNT_ENTRIES);
        topic.addMessageListener(new MessageListener<String>() { // from class: com.hazelcast.topic.TopicTest.19
            public void onMessage(Message message) {
                countDownLatch.countDown();
            }
        });
        final CountDownLatch countDownLatch2 = new CountDownLatch(EntryProcessorOffloadableBouncingNodesTest.COUNT_ENTRIES);
        topic.addMessageListener(new MessageListener<String>() { // from class: com.hazelcast.topic.TopicTest.20
            public void onMessage(Message message) {
                countDownLatch2.countDown();
            }
        });
        for (int i = 0; i < 1000; i++) {
            topic.publish("sancar");
        }
        Assert.assertTrue(countDownLatch.await(1L, TimeUnit.MINUTES));
        Assert.assertTrue(countDownLatch2.await(1L, TimeUnit.MINUTES));
        LocalTopicStatsImpl localTopicStats = topic.getLocalTopicStats();
        Assert.assertEquals(1000L, localTopicStats.getPublishOperationCount());
        Assert.assertEquals(2000L, localTopicStats.getReceiveOperationCount());
    }

    @Test
    @Category({NightlyTest.class})
    public void testTopicMultiThreading() throws Exception {
        final String randomString = randomString();
        Config config = new Config();
        config.getTopicConfig(randomString).setGlobalOrderingEnabled(false);
        config.getTopicConfig(randomString).setMultiThreadingEnabled(true);
        final HazelcastInstance[] newInstances = createHazelcastInstanceFactory(5).newInstances(config);
        final Set[] setArr = new Set[5];
        for (int i = 0; i < 5; i++) {
            setArr[i] = new HashSet();
        }
        final CountDownLatch countDownLatch = new CountDownLatch(5);
        final CountDownLatch countDownLatch2 = new CountDownLatch(25000);
        final CountDownLatch countDownLatch3 = new CountDownLatch(5000);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5);
        for (int i2 = 0; i2 < 5; i2++) {
            final int i3 = i2;
            newFixedThreadPool.execute(new Runnable() { // from class: com.hazelcast.topic.TopicTest.21
                @Override // java.lang.Runnable
                public void run() {
                    final Set set = setArr[i3];
                    HazelcastInstance hazelcastInstance = newInstances[i3];
                    ITopic topic = hazelcastInstance.getTopic(randomString);
                    topic.addMessageListener(new MessageListener<TestMessage>() { // from class: com.hazelcast.topic.TopicTest.21.1
                        public void onMessage(Message<TestMessage> message) {
                            set.add(Thread.currentThread().getName());
                            countDownLatch2.countDown();
                        }
                    });
                    countDownLatch.countDown();
                    try {
                        countDownLatch.await(1L, TimeUnit.MINUTES);
                        Member localMember = hazelcastInstance.getCluster().getLocalMember();
                        for (int i4 = 0; i4 < 1000; i4++) {
                            topic.publish(new TestMessage(localMember, UuidUtil.newUnsecureUuidString()));
                            countDownLatch3.countDown();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        try {
            Assert.assertTrue(countDownLatch3.await(2L, TimeUnit.MINUTES));
            Assert.assertTrue(countDownLatch2.await(5L, TimeUnit.MINUTES));
            boolean z = false;
            for (int i4 = 0; i4 < 5; i4++) {
                if (setArr[i4].size() > 1) {
                    z = true;
                }
            }
            Assert.assertTrue("All listeners received messages in single thread. Expecting more threads involved", z);
            newFixedThreadPool.shutdownNow();
        } catch (Throwable th) {
            newFixedThreadPool.shutdownNow();
            throw th;
        }
    }

    @Test
    public void givenTopicHasNoSubscriber_whenMessageIsPublished_thenNoSerialializationIsInvoked() {
        ITopic topic = createHazelcastInstanceFactory(2).newInstances()[0].getTopic(randomString());
        SerializationCounting serializationCounting = new SerializationCounting();
        topic.publish(serializationCounting);
        assertNoSerializationInvoked(serializationCounting);
    }

    private void assertNoSerializationInvoked(SerializationCounting serializationCounting) {
        Assert.assertEquals(0L, serializationCounting.getSerializationCount());
    }
}
