/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.ReplicatorTestBase;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * TestNG tests for the replicator dispatch-rate limit configured per namespace
 * through {@code admin1.namespaces().setReplicatorDispatchRate(...)}.
 *
 * <p>Every test creates a namespace replicated to clusters {@code r1} and {@code r2},
 * inspects the first replicator of a topic on {@code pulsar1}, and (for the traffic
 * tests) produces through {@code url1} while consuming through {@code url2}.
 */
public class ReplicatorRateLimiterTest extends ReplicatorTestBase {
    private static final Logger log = LoggerFactory.getLogger(ReplicatorRateLimiterTest.class);

    /** Number of times a condition is checked before giving up. */
    private static final int POLL_ATTEMPTS = 5;
    /** Pause between two consecutive condition checks, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 100L;
    /** Size in bytes of every message payload sent by the traffic tests. */
    private static final int PAYLOAD_SIZE = 80;
    /** Third argument passed to every {@link DispatchRate} built by these tests. */
    private static final int RATE_PERIOD = 360;

    /** Name of the test method currently running; set by {@link #beforeMethod(Method)}. */
    protected String methodName;

    /**
     * Records the name of the test method about to run so it can be logged.
     *
     * @param m the test method TestNG is about to invoke
     */
    @BeforeMethod
    public void beforeMethod(Method m) throws Exception {
        this.methodName = m.getName();
    }

    /** Delegates to the base-class setup once per class. */
    @Override
    @BeforeClass(timeOut=300000L)
    void setup() throws Exception {
        super.setup();
    }

    /** Delegates to the base-class shutdown once per class. */
    @Override
    @AfterClass(timeOut=300000L)
    void shutdown() throws Exception {
        super.shutdown();
    }

    /**
     * Supplies both {@link DispatchRateType} values, one per invocation.
     *
     * @return one single-element row per dispatch-rate type
     */
    @DataProvider(name="dispatchRateType")
    public Object[][] dispatchRateProvider() {
        return new Object[][]{{DispatchRateType.messageRate}, {DispatchRateType.byteRate}};
    }

    /**
     * Verifies that a replicator starts without a rate limiter, gains one after a
     * message-rate policy is set on the namespace, and then reflects a subsequent
     * byte-rate policy.
     */
    @Test
    public void testReplicatorRateLimiterDynamicallyChange() throws Exception {
        log.info("--- Starting ReplicatorTest::{} --- ", this.methodName);
        final String namespace = "pulsar/replicatorchange";
        final String topicName = "persistent://" + namespace + "/ratechange";
        createReplicatedNamespace(namespace);

        PulsarClient client1 = newClient(this.url1.toString());
        try {
            // Create and immediately close a producer on the topic before it is looked up.
            Producer<byte[]> producer = newProducer(client1, topicName);
            producer.close();

            PersistentTopic topic = lookupTopic(topicName);
            // No policy has been set yet, so no limiter is expected.
            Assert.assertFalse(firstReplicator(topic).getRateLimiter().isPresent());

            // 1. Set a message-rate policy and wait for the replicator to pick it up.
            final int msgRate = 100;
            this.admin1.namespaces().setReplicatorDispatchRate(
                    namespace, new DispatchRate(msgRate, -1L, RATE_PERIOD));
            Assert.assertTrue(awaitRateLimiter(topic));
            Assert.assertEquals(firstRateLimiter(topic).getDispatchRateOnMsg(), (long) msgRate);

            // 2. Replace it with a byte-rate policy and wait for the new value.
            final long byteRate = 500L;
            DispatchRate dispatchRateByte = new DispatchRate(-1, byteRate, RATE_PERIOD);
            this.admin1.namespaces().setReplicatorDispatchRate(namespace, dispatchRateByte);
            Assert.assertTrue(
                    pollUntil(() -> firstRateLimiter(topic).getDispatchRateOnByte() == byteRate));
            Assert.assertEquals(
                    this.admin1.namespaces().getReplicatorDispatchRate(namespace), dispatchRateByte);
        } finally {
            client1.close();
        }
    }

    /**
     * Sends five times the configured rate in one burst and asserts that, right after
     * the last send returns, the remote consumer has received fewer than twice the rate.
     *
     * @param dispatchRateType whether the limit is expressed in messages or in bytes
     */
    @Test(dataProvider="dispatchRateType", timeOut=5000L)
    public void testReplicatorRateLimiterMessageNotReceivedAllMessages(DispatchRateType dispatchRateType) throws Exception {
        log.info("--- Starting ReplicatorTest::{} --- ", this.methodName);
        final String namespace = "pulsar/replicatorbyteandmsg" + dispatchRateType.toString();
        final String topicName = "persistent://" + namespace + "/notReceivedAll";
        createReplicatedNamespace(namespace);

        final int messageRate = 100;
        final boolean limitByMessages = DispatchRateType.messageRate.equals(dispatchRateType);
        DispatchRate dispatchRate = limitByMessages
                ? new DispatchRate(messageRate, -1L, RATE_PERIOD)
                : new DispatchRate(-1, (long) messageRate, RATE_PERIOD);
        this.admin1.namespaces().setReplicatorDispatchRate(namespace, dispatchRate);

        PulsarClient client1 = newClient(this.url1.toString());
        try {
            Producer<byte[]> producer = newProducer(client1, topicName);
            PersistentTopic topic = lookupTopic(topicName);

            Assert.assertTrue(awaitRateLimiter(topic));
            DispatchRateLimiter limiter = firstRateLimiter(topic);
            long configuredRate = limitByMessages
                    ? limiter.getDispatchRateOnMsg()
                    : limiter.getDispatchRateOnByte();
            Assert.assertEquals(configuredRate, (long) messageRate);

            PulsarClient client2 = newClient(this.url2.toString());
            try {
                AtomicInteger totalReceived = new AtomicInteger(0);
                Consumer<byte[]> consumer = subscribeCounting(client2, topicName, totalReceived);

                sendMessages(producer, 500);

                // Checked immediately, without waiting: the burst must not have gone through.
                log.info("Received message number: [{}]", totalReceived.get());
                Assert.assertTrue(totalReceived.get() < messageRate * 2);

                consumer.close();
                producer.close();
            } finally {
                client2.close();
            }
        } finally {
            client1.close();
        }
    }

    /**
     * With a limit of 100 messages, first sends 50 messages and expects all 50 to be
     * received after one second; then sends 200 more and expects the running total to
     * be exactly 100 after another second.
     */
    @Test(timeOut=5000L)
    public void testReplicatorRateLimiterMessageReceivedAllMessages() throws Exception {
        log.info("--- Starting ReplicatorTest::{} --- ", this.methodName);
        final String namespace = "pulsar/replicatormsg";
        final String topicName = "persistent://" + namespace + "/notReceivedAll";
        createReplicatedNamespace(namespace);

        final int messageRate = 100;
        this.admin1.namespaces().setReplicatorDispatchRate(
                namespace, new DispatchRate(messageRate, -1L, RATE_PERIOD));

        PulsarClient client1 = newClient(this.url1.toString());
        try {
            Producer<byte[]> producer = newProducer(client1, topicName);
            PersistentTopic topic = lookupTopic(topicName);

            Assert.assertTrue(awaitRateLimiter(topic));
            Assert.assertEquals(firstRateLimiter(topic).getDispatchRateOnMsg(), (long) messageRate);

            PulsarClient client2 = newClient(this.url2.toString());
            try {
                AtomicInteger totalReceived = new AtomicInteger(0);
                Consumer<byte[]> consumer = subscribeCounting(client2, topicName, totalReceived);

                // Below the limit: everything should arrive.
                final int belowLimit = 50;
                sendMessages(producer, belowLimit);
                Thread.sleep(1000L);
                log.info("Received message number: [{}]", totalReceived.get());
                Assert.assertEquals(totalReceived.get(), belowLimit);

                // Above the limit: the running total is expected to stop at the rate.
                sendMessages(producer, 200);
                Thread.sleep(1000L);
                log.info("Received message number: [{}]", totalReceived.get());
                Assert.assertEquals(totalReceived.get(), messageRate);

                consumer.close();
                producer.close();
            } finally {
                client2.close();
            }
        } finally {
            client1.close();
        }
    }

    /** Creates {@code namespace} via {@code admin1} and replicates it to clusters r1 and r2. */
    private void createReplicatedNamespace(String namespace) throws Exception {
        this.admin1.namespaces().createNamespace(namespace);
        this.admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
    }

    /** Builds a client for {@code serviceUrl} with a stats interval of zero seconds. */
    private static PulsarClient newClient(String serviceUrl) throws Exception {
        return PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .statsInterval(0L, TimeUnit.SECONDS)
                .build();
    }

    /** Creates a non-batching, single-partition-routing producer on {@code topicName}. */
    private static Producer<byte[]> newProducer(PulsarClient client, String topicName) throws Exception {
        return client.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
    }

    /** Gets or creates {@code topicName} on {@code pulsar1} and returns it as a persistent topic. */
    private PersistentTopic lookupTopic(String topicName) throws Exception {
        return (PersistentTopic) this.pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
    }

    /**
     * Subscribes on {@code topicName} as {@code sub2-in-cluster2} with a listener that
     * asserts each message is non-null, logs its payload at debug level and increments
     * {@code counter}.
     */
    private static Consumer<byte[]> subscribeCounting(
            PulsarClient client, String topicName, AtomicInteger counter) throws Exception {
        MessageListener<byte[]> listener = (c1, msg) -> {
            Assert.assertNotNull(msg, "Message cannot be null");
            String receivedMessage = new String(msg.getData());
            log.debug("Received message [{}] in the listener", receivedMessage);
            counter.incrementAndGet();
        };
        return client.newConsumer()
                .topic(topicName)
                .subscriptionName("sub2-in-cluster2")
                .messageListener(listener)
                .subscribe();
    }

    /** Synchronously sends {@code count} payloads of {@link #PAYLOAD_SIZE} zero bytes each. */
    private static void sendMessages(Producer<byte[]> producer, int count) throws Exception {
        for (int sent = 0; sent < count; sent++) {
            producer.send(new byte[PAYLOAD_SIZE]);
        }
    }

    /**
     * Returns the first replicator registered on {@code topic}.
     * The cast is kept because the element type of the replicator collection is not
     * visible from this file.
     */
    private static Replicator firstReplicator(PersistentTopic topic) {
        return (Replicator) topic.getReplicators().values().get(0);
    }

    /**
     * Returns the rate limiter of the first replicator on {@code topic}.
     * Callers must only use this once the limiter is known to be present, since the
     * underlying optional is unwrapped with {@code get()}.
     */
    private static DispatchRateLimiter firstRateLimiter(PersistentTopic topic) {
        return (DispatchRateLimiter) firstReplicator(topic).getRateLimiter().get();
    }

    /** Polls until the first replicator on {@code topic} exposes a rate limiter. */
    private static boolean awaitRateLimiter(PersistentTopic topic) throws InterruptedException {
        return pollUntil(() -> firstReplicator(topic).getRateLimiter().isPresent());
    }

    /**
     * Evaluates {@code condition} up to {@link #POLL_ATTEMPTS} times, sleeping
     * {@link #POLL_INTERVAL_MS} between attempts (but not after the last one).
     *
     * @return {@code true} as soon as the condition holds, {@code false} if it never did
     */
    private static boolean pollUntil(BooleanSupplier condition) throws InterruptedException {
        for (int attempt = 1; attempt <= POLL_ATTEMPTS; attempt++) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (attempt < POLL_ATTEMPTS) {
                Thread.sleep(POLL_INTERVAL_MS);
            }
        }
        return false;
    }

    /** Unit in which the replicator dispatch limit of a parameterized test is expressed. */
    enum DispatchRateType {
        messageRate,
        byteRate;
    }
}

