/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.client.topic;

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.config.ClientReliableTopicConfig;
import com.hazelcast.config.Config;
import com.hazelcast.config.RingbufferConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.TestThread;
import com.hazelcast.test.annotation.NightlyTest;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(value=HazelcastSerialClassRunner.class)
@Category(value={NightlyTest.class})
public class ClientReliableTopicStressTest
extends HazelcastTestSupport {
    private ILogger logger;
    private final AtomicBoolean stop = new AtomicBoolean();
    private ITopic<Long> topic;

    @Before
    public void setup() {
        this.logger = Logger.getLogger(((Object)((Object)this)).getClass());
        Config config = new Config();
        RingbufferConfig ringbufferConfig = new RingbufferConfig("foobar");
        ringbufferConfig.setCapacity(10000000);
        ringbufferConfig.setTimeToLiveSeconds(5);
        config.addRingBufferConfig(ringbufferConfig);
        ClientConfig clientConfig = new ClientConfig();
        ClientReliableTopicConfig topicConfig = new ClientReliableTopicConfig("foobar");
        clientConfig.addReliableTopicConfig(topicConfig);
        HazelcastInstance hz = Hazelcast.newHazelcastInstance((Config)config);
        HazelcastInstance client = HazelcastClient.newHazelcastClient((ClientConfig)clientConfig);
        this.topic = client.getReliableTopic(topicConfig.getName());
    }

    @After
    public void teardown() {
        HazelcastClient.shutdownAll();
        Hazelcast.shutdownAll();
    }

    @Test
    public void test() throws InterruptedException {
        final StressMessageListener listener1 = new StressMessageListener(1);
        this.topic.addMessageListener((MessageListener)listener1);
        final StressMessageListener listener2 = new StressMessageListener(2);
        this.topic.addMessageListener((MessageListener)listener2);
        ClientReliableTopicStressTest.sleepSeconds((int)5);
        final ProduceThread produceThread = new ProduceThread();
        produceThread.start();
        this.logger.info("Starting test");
        ClientReliableTopicStressTest.sleepAndStop((AtomicBoolean)this.stop, (long)TimeUnit.SECONDS.toSeconds(30L));
        this.logger.info("Completed");
        produceThread.assertSucceedsEventually();
        this.logger.info("Number of items produced: " + produceThread.send);
        ClientReliableTopicStressTest.assertTrueEventually((AssertTask)new AssertTask(){

            public void run() throws Exception {
                Assert.assertEquals((long)produceThread.send, (long)listener1.received);
                Assert.assertEquals((long)produceThread.send, (long)listener2.received);
                Assert.assertEquals((long)0L, (long)listener1.failures);
                Assert.assertEquals((long)0L, (long)listener2.failures);
            }
        });
        this.logger.info("Done");
    }

    public class StressMessageListener
    implements MessageListener<Long> {
        private final int id;
        private long received = 0L;
        private long failures = 0L;

        public StressMessageListener(int id) {
            this.id = id;
        }

        public void onMessage(Message<Long> message) {
            if (!((Long)message.getMessageObject()).equals(this.received)) {
                ++this.failures;
            }
            if (this.received % 10000L == 0L) {
                ClientReliableTopicStressTest.this.logger.info(this.toString() + " is at: " + this.received);
            }
            ++this.received;
        }

        public String toString() {
            return "StressMessageListener-" + this.id;
        }
    }

    public class ProduceThread
    extends TestThread {
        private volatile long send = 0L;

        public void onError(Throwable t) {
            ClientReliableTopicStressTest.this.stop.set(true);
        }

        public void doRun() throws Throwable {
            while (!ClientReliableTopicStressTest.this.stop.get()) {
                ClientReliableTopicStressTest.this.topic.publish((Object)this.send);
                ++this.send;
                if (this.send % 10000L != 0L) continue;
                ClientReliableTopicStressTest.this.logger.info("Publishing: " + this.send);
            }
        }
    }
}

