package com.hazelcast.client.topic;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.impl.proxy.ClientReliableTopicProxy;
import com.hazelcast.client.properties.ClientProperty;
import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.test.bounce.BounceMemberRule;
import com.hazelcast.topic.ITopic;
import com.hazelcast.topic.Message;
import com.hazelcast.topic.impl.reliable.DurableSubscriptionTest;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import junit.framework.TestCase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

/**
 * Exercises reliable-topic message listeners registered through a client while the
 * only cluster member is stopped and replaced by a freshly started one.
 *
 * <p>Every test starts its members and clients through a shared
 * {@link TestHazelcastFactory}, which is terminated in {@link #tearDown()}.
 */
@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
public class ClientReliableTopicOnClusterRestartTest {
    /** Name of the reliable topic used by every test. */
    private static final String TOPIC_NAME = "topic";

    /**
     * Value written to {@link ClientProperty#INVOCATION_TIMEOUT_SECONDS}; the tests also
     * sleep for exactly this long so that the two durations cannot drift apart.
     */
    private static final int CLIENT_INVOCATION_TIMEOUT_SECONDS = 2;

    private final TestHazelcastFactory hazelcastFactory = new TestHazelcastFactory();

    /** Terminates every member and client created through the factory. */
    @After
    public void tearDown() {
        this.hazelcastFactory.terminateAll();
    }

    /**
     * A listener registered by one client must still receive a message published by a
     * second client after the original member was terminated and a new one was started.
     */
    @Test
    public void serverRestartWhenReliableTopicListenerRegistered() {
        HazelcastInstance member = this.hazelcastFactory.newHazelcastInstance();
        ClientConfig clientConfig = newClientConfig();
        HazelcastInstance listeningClient = this.hazelcastFactory.newHazelcastClient(clientConfig);
        HazelcastInstance publishingClient = this.hazelcastFactory.newHazelcastClient(clientConfig);
        ITopic<Integer> listeningTopic = listeningClient.getReliableTopic(TOPIC_NAME);
        ITopic<Integer> publishingTopic = publishingClient.getReliableTopic(TOPIC_NAME);

        CountDownLatch messageArrived = new CountDownLatch(1);
        listeningTopic.addMessageListener(message -> messageArrived.countDown());

        // Replace the only member: hard-terminate it, then bring up a new one.
        member.getLifecycleService().terminate();
        this.hazelcastFactory.newHazelcastInstance();

        publishingTopic.publish(5);
        HazelcastTestSupport.assertOpenEventually(messageArrived);
    }

    /**
     * With a loss-tolerant listener registered, shuts the only member down, waits for the
     * configured client invocation timeout, and only then starts a new member and publishes
     * through it. The listener must receive the message and must not be reported as cancelled.
     *
     * @throws InterruptedException if the sleep between shutdown and restart is interrupted
     */
    @Test
    public void shouldContinue_OnClusterRestart_afterInvocationTimeout() throws InterruptedException {
        HazelcastInstance member = this.hazelcastFactory.newHazelcastInstance();
        HazelcastInstance client = this.hazelcastFactory.newHazelcastClient(newClientConfigWithInvocationTimeout());
        final CountDownLatch messageArrived = new CountDownLatch(1);

        ITopic<String> topic = client.getReliableTopic(TOPIC_NAME);
        UUID registrationId = topic.addMessageListener(new DurableSubscriptionTest.DurableMessageListener<String>() {
            @Override
            public void onMessage(Message<String> message) {
                messageArrived.countDown();
            }

            @Override
            public boolean isLossTolerant() {
                return true;
            }
        });

        member.shutdown();
        // No member is running while the invocation timeout elapses.
        Thread.sleep(TimeUnit.SECONDS.toMillis(CLIENT_INVOCATION_TIMEOUT_SECONDS));

        this.hazelcastFactory.newHazelcastInstance().getReliableTopic(TOPIC_NAME).publish("message");
        HazelcastTestSupport.assertOpenEventually(messageArrived);
        Assert.assertFalse(isListenerCancelled(topic, registrationId));
    }

    /**
     * Publishes two messages before a loss-tolerant listener is registered, then replaces the
     * member, waits for the client invocation timeout and publishes one more message through
     * the new member. The listener must see exactly one message and must not be cancelled.
     *
     * @throws InterruptedException if the sleep after the restart is interrupted
     */
    @Test
    public void shouldContinue_OnClusterRestart_whenDataLoss_LossTolerant_afterInvocationTimeout() throws InterruptedException {
        HazelcastInstance member = this.hazelcastFactory.newHazelcastInstance();
        HazelcastInstance client = this.hazelcastFactory.newHazelcastClient(newClientConfigWithInvocationTimeout());
        final AtomicLong receivedCount = new AtomicLong();
        final CountDownLatch messageArrived = new CountDownLatch(1);

        // Published before the listener exists, on the member that is about to go away.
        member.getReliableTopic(TOPIC_NAME).publish("message");
        member.getReliableTopic(TOPIC_NAME).publish("message");

        ITopic<String> topic = client.getReliableTopic(TOPIC_NAME);
        UUID registrationId = topic.addMessageListener(new DurableSubscriptionTest.DurableMessageListener<String>() {
            @Override
            public void onMessage(Message<String> message) {
                receivedCount.incrementAndGet();
                messageArrived.countDown();
            }

            @Override
            public boolean isLossTolerant() {
                return true;
            }
        });

        member.shutdown();
        HazelcastInstance restartedMember = this.hazelcastFactory.newHazelcastInstance();
        Thread.sleep(TimeUnit.SECONDS.toMillis(CLIENT_INVOCATION_TIMEOUT_SECONDS));

        restartedMember.getReliableTopic(TOPIC_NAME).publish("message");
        HazelcastTestSupport.assertOpenEventually(messageArrived);
        Assert.assertFalse(isListenerCancelled(topic, registrationId));
        TestCase.assertEquals(1L, receivedCount.get());
    }

    /**
     * Same scenario as the loss-tolerant variant, but the listener reports
     * {@code isLossTolerant() == false}. The listener is expected to end up cancelled
     * without {@code onMessage} ever having been invoked.
     *
     * @throws InterruptedException if the sleep after the restart is interrupted
     */
    @Test
    public void shouldFail_OnClusterRestart_whenDataLoss_notLossTolerant() throws InterruptedException {
        HazelcastInstance member = this.hazelcastFactory.newHazelcastInstance();
        HazelcastInstance client = this.hazelcastFactory.newHazelcastClient(newClientConfigWithInvocationTimeout());
        final AtomicLong receivedCount = new AtomicLong();

        // Published before the listener exists, on the member that is about to go away.
        member.getReliableTopic(TOPIC_NAME).publish("message");
        member.getReliableTopic(TOPIC_NAME).publish("message");

        ITopic<String> topic = client.getReliableTopic(TOPIC_NAME);
        UUID registrationId = topic.addMessageListener(new DurableSubscriptionTest.DurableMessageListener<String>() {
            @Override
            public void onMessage(Message<String> message) {
                receivedCount.incrementAndGet();
            }

            @Override
            public boolean isLossTolerant() {
                return false;
            }
        });

        member.shutdown();
        HazelcastInstance restartedMember = this.hazelcastFactory.newHazelcastInstance();
        Thread.sleep(TimeUnit.SECONDS.toMillis(CLIENT_INVOCATION_TIMEOUT_SECONDS));

        restartedMember.getReliableTopic(TOPIC_NAME).publish("message");
        // NOTE(review): 10L is passed as the timeout argument of assertTrueEventually;
        // presumably seconds - confirm against HazelcastTestSupport.
        HazelcastTestSupport.assertTrueEventually(() -> {
            TestCase.assertTrue(isListenerCancelled(topic, registrationId));
        }, 10L);
        TestCase.assertEquals(0L, receivedCount.get());
    }

    /**
     * Builds the client configuration shared by all tests: the cluster connect timeout is
     * set to {@link BounceMemberRule#STALENESS_DETECTOR_DISABLED}.
     *
     * <p>NOTE(review): that constant is reused here purely for its numeric value, presumably
     * an effectively unbounded timeout so the client keeps reconnecting while no member is
     * up - confirm the value and consider a dedicated constant.
     *
     * @return a new, independent {@link ClientConfig}
     */
    private static ClientConfig newClientConfig() {
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.getConnectionStrategyConfig()
                .getConnectionRetryConfig()
                .setClusterConnectTimeoutMillis(BounceMemberRule.STALENESS_DETECTOR_DISABLED);
        return clientConfig;
    }

    /**
     * Builds the shared client configuration and additionally sets the client invocation
     * timeout property to {@link #CLIENT_INVOCATION_TIMEOUT_SECONDS}.
     *
     * @return a new, independent {@link ClientConfig}
     */
    private static ClientConfig newClientConfigWithInvocationTimeout() {
        ClientConfig clientConfig = newClientConfig();
        clientConfig.setProperty(
                ClientProperty.INVOCATION_TIMEOUT_SECONDS.getName(),
                String.valueOf(CLIENT_INVOCATION_TIMEOUT_SECONDS));
        return clientConfig;
    }

    /**
     * Asks the client-side proxy behind {@code topic} whether the given listener registration
     * has been cancelled.
     *
     * <p>{@code isListenerCancelled} is not part of {@link ITopic}, so the topic is downcast
     * to {@link ClientReliableTopicProxy} here, in one place, instead of at each call site.
     *
     * @param topic          a reliable topic obtained from a client instance
     * @param registrationId the id returned by {@code addMessageListener}
     * @return the proxy's answer for that registration
     * @throws ClassCastException if {@code topic} is not a {@link ClientReliableTopicProxy}
     */
    @SuppressWarnings("rawtypes") // the element type is irrelevant for this lookup
    private static boolean isListenerCancelled(ITopic<?> topic, UUID registrationId) {
        return ((ClientReliableTopicProxy) topic).isListenerCancelled(registrationId);
    }
}
