/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.util.Timeout;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MultiMessageIdImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class TopicsConsumerImplTest
extends ProducerConsumerBase {
    private static final long testTimeout = 90000L;
    private static final Logger log = LoggerFactory.getLogger(TopicsConsumerImplTest.class);
    private final long ackTimeOutMillis = TimeUnit.SECONDS.toMillis(2L);

    /**
     * Per-test fixture hook: runs before every test method and delegates to the
     * base class's {@code internalSetup()}.
     *
     * @throws Exception if the base-class setup fails
     */
    @Override
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
    }

    /**
     * Per-test teardown hook: runs after every test method and delegates to the
     * base class's {@code internalCleanup()}.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override
    @AfterMethod
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Subscribing a single consumer to topics that belong to three different
     * namespaces must be rejected with an {@link IllegalArgumentException}.
     *
     * @throws Exception on any unexpected admin/client failure
     */
    @Test(timeOut = testTimeout)
    public void testDifferentTopicsNameSubscribe() throws Exception {
        final String key = "TopicsFromDifferentNamespace";
        final String subscriptionName = "my-ex-subscription-" + key;

        // Each topic lives in its own namespace (ns-abc1 / ns-abc2 / ns-abc3).
        final String topicName1 = "persistent://prop/use/ns-abc1/topic-1-" + key;
        final String topicName2 = "persistent://prop/use/ns-abc2/topic-2-" + key;
        final String topicName3 = "persistent://prop/use/ns-abc3/topic-3-" + key;
        final List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);

        admin.tenants().createTenant("prop", new TenantInfo());
        admin.topics().createPartitionedTopic(topicName2, 2);
        admin.topics().createPartitionedTopic(topicName3, 3);

        try {
            pulsarClient.newConsumer()
                .topics(topicNames)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .subscribe();
            Assert.fail("subscribe for topics from different namespace should fail.");
        } catch (IllegalArgumentException expected) {
            // Expected path: the builder refuses topics spread over several namespaces.
        }
    }

    /**
     * Subscribes to one non-partitioned topic, one 2-partition topic and one
     * 3-partition topic, then checks that the multi-topics consumer exposes
     * six per-partition consumers whose topic names line up, index by index,
     * with {@code getPartitionedTopics()}, and three logical topics.
     *
     * @throws Exception on any unexpected admin/client failure
     */
    @Test(timeOut = testTimeout)
    public void testGetConsumersAndGetTopics() throws Exception {
        String key = "TopicsConsumerGet";
        String subscriptionName = "my-ex-subscription-" + key;
        String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
        String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
        String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
        List<String> topicNames = Lists.newArrayList(topicName1, topicName2);

        admin.tenants().createTenant("prop", new TenantInfo());
        admin.topics().createPartitionedTopic(topicName2, 2);
        admin.topics().createPartitionedTopic(topicName3, 3);

        // topicName3 is added through topic(...) on top of the list given to topics(...).
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topics(topicNames)
            .topic(topicName3)
            .subscriptionName(subscriptionName)
            .subscriptionType(SubscriptionType.Shared)
            .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
            .receiverQueueSize(4)
            .subscribe();
        Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);
        Assert.assertTrue(consumer.getTopic().startsWith("MultiTopicsConsumer-"));

        // Typed views: with raw List types the lambdas below cannot call getTopic()/get(index).
        MultiTopicsConsumerImpl<byte[]> topicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;
        List<String> topics = topicsConsumer.getPartitionedTopics();
        List<ConsumerImpl<byte[]>> consumers = topicsConsumer.getConsumers();

        topics.forEach(topic -> log.info("topic: {}", topic));
        consumers.forEach(c -> log.info("consumer: {}", c.getTopic()));

        // 1 (non-partitioned) + 2 + 3 partitions = 6 underlying consumers.
        IntStream.range(0, 6).forEach(index ->
            Assert.assertEquals(consumers.get(index).getTopic(), topics.get(index)));

        Assert.assertEquals(topicsConsumer.getTopics().size(), 3);

        consumer.unsubscribe();
        consumer.close();
    }

    /**
     * Publishes 10 messages synchronously to each of three topics and checks
     * that a single multi-topics consumer receives and acknowledges all 30.
     *
     * @throws Exception on any unexpected admin/client failure
     */
    @Test(timeOut = testTimeout)
    public void testSyncProducerAndConsumer() throws Exception {
        final String key = "TopicsConsumerSyncTest";
        final String subscriptionName = "my-ex-subscription-" + key;
        final String messagePredicate = "my-message-" + key + "-";
        final int totalMessages = 30;

        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
        final List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);

        admin.tenants().createTenant("prop", new TenantInfo());
        admin.topics().createPartitionedTopic(topicName2, 2);
        admin.topics().createPartitionedTopic(topicName3, 3);

        // One producer per topic, batching disabled.
        Producer<byte[]> producer1 = pulsarClient.newProducer()
            .topic(topicName1)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();
        Producer<byte[]> producer2 = pulsarClient.newProducer()
            .topic(topicName2)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
            .create();
        Producer<byte[]> producer3 = pulsarClient.newProducer()
            .topic(topicName3)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
            .create();

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topics(topicNames)
            .subscriptionName(subscriptionName)
            .subscriptionType(SubscriptionType.Shared)
            .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
            .receiverQueueSize(4)
            .subscribe();
        Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);

        // A third of the total goes to each producer.
        for (int n = 0; n < totalMessages / 3; n++) {
            producer1.send((messagePredicate + "producer1-" + n).getBytes());
            producer2.send((messagePredicate + "producer2-" + n).getBytes());
            producer3.send((messagePredicate + "producer3-" + n).getBytes());
        }

        // Drain: block for the first message, then poll until 500 ms pass with nothing new.
        int receivedCount = 0;
        Message<byte[]> msg = consumer.receive();
        do {
            Assert.assertTrue(msg instanceof TopicMessageImpl);
            receivedCount++;
            consumer.acknowledge(msg);
            log.debug("Consumer acknowledged : " + new String(msg.getData()));
            msg = consumer.receive(500, TimeUnit.MILLISECONDS);
        } while (msg != null);
        Assert.assertEquals(receivedCount, totalMessages);

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
    }

    /**
     * Publishes 30 messages asynchronously across three topics, then issues 30
     * {@code receiveAsync()} calls from a helper thread and waits until every
     * one of them has been received and acknowledged.
     *
     * <p>The helper executor is shut down in a {@code finally} block so that its
     * worker thread does not outlive the test.
     *
     * @throws Exception on any unexpected admin/client failure
     */
    @Test(timeOut = testTimeout)
    public void testAsyncConsumer() throws Exception {
        String key = "TopicsConsumerAsyncTest";
        final String subscriptionName = "my-ex-subscription-" + key;
        final String messagePredicate = "my-message-" + key + "-";
        final int totalMessages = 30;

        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);

        admin.tenants().createTenant("prop", new TenantInfo());
        admin.topics().createPartitionedTopic(topicName2, 2);
        admin.topics().createPartitionedTopic(topicName3, 3);

        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();
        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
            .create();
        Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
            .create();

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topics(topicNames)
            .subscriptionName(subscriptionName)
            .subscriptionType(SubscriptionType.Shared)
            .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
            .receiverQueueSize(4)
            .subscribe();
        Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);

        // Asynchronously produce messages
        List<Future<MessageId>> futures = Lists.newArrayList();
        for (int i = 0; i < totalMessages / 3; i++) {
            futures.add(producer1.sendAsync((messagePredicate + "producer1-" + i).getBytes()));
            futures.add(producer2.sendAsync((messagePredicate + "producer2-" + i).getBytes()));
            futures.add(producer3.sendAsync((messagePredicate + "producer3-" + i).getBytes()));
        }

        log.info("Waiting for async publish to complete : {}", futures.size());
        for (Future<MessageId> future : futures) {
            future.get();
        }

        log.info("start async consume");
        CountDownLatch latch = new CountDownLatch(totalMessages);
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            executor.execute(() -> IntStream.range(0, totalMessages).forEach(index ->
                consumer.receiveAsync()
                    .thenAccept(msg -> {
                        Assert.assertTrue(msg instanceof TopicMessageImpl);
                        try {
                            consumer.acknowledge(msg);
                        } catch (PulsarClientException e1) {
                            Assert.fail("message acknowledge failed", e1);
                        }
                        latch.countDown();
                        log.info("receive index: {}, latch countDown: {}", index, latch.getCount());
                    })
                    .exceptionally(ex -> {
                        // Trailing throwable argument: the logger records the stack trace
                        // instead of dumping it to stderr.
                        log.warn("receive index: {}, failed receive message {}", index, ex.getMessage(), ex);
                        return null;
                    })));

            // A failed callback never counts down; the test then fails through its timeOut.
            latch.await();
            log.info("success latch wait");
        } finally {
            executor.shutdown();
        }

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
    }

    /**
     * Exercises the unacked-message tracker of a multi-topics consumer over
     * several rounds of 30 messages (10 per producer):
     * <ol>
     *   <li>receive everything without acknowledging, expect 30 tracked entries;</li>
     *   <li>receive again and acknowledge, expect 30 distinct payloads and an empty tracker;</li>
     *   <li>publish a second round, acknowledge each message on receipt, expect an empty tracker;</li>
     *   <li>publish a third round, receive without acknowledging (30 tracked), wait one
     *       ack-timeout period, then receive and acknowledge 30 messages again.</li>
     * </ol>
     * The test is timing-sensitive: it relies on the consumer's ack timeout
     * ({@code ackTimeOutMillis}) and on explicit sleeps of the same length.
     *
     * @throws Exception on any unexpected admin/client failure
     */
    @Test(timeOut=90000L)
    public void testConsumerUnackedRedelivery() throws Exception {
        String key = "TopicsConsumerRedeliveryTest";
        String subscriptionName = "my-ex-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 30;
        String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
        String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
        String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
        ArrayList topicNames = Lists.newArrayList((Object[])new String[]{topicName1, topicName2, topicName3});
        this.admin.tenants().createTenant("prop", new TenantInfo());
        this.admin.topics().createPartitionedTopic(topicName2, 2);
        this.admin.topics().createPartitionedTopic(topicName3, 3);
        // One producer per topic, batching disabled.
        Producer producer1 = this.pulsarClient.newProducer().topic(topicName1).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        Producer producer2 = this.pulsarClient.newProducer().topic(topicName2).enableBatching(false).messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
        Producer producer3 = this.pulsarClient.newProducer().topic(topicName3).enableBatching(false).messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
        Consumer consumer = this.pulsarClient.newConsumer().topics((List)topicNames).subscriptionName(subscriptionName).subscriptionType(SubscriptionType.Shared).ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS).receiverQueueSize(4).subscribe();
        Assert.assertTrue((boolean)(consumer instanceof MultiTopicsConsumerImpl));
        // Round 1: publish 10 messages per producer (30 in total).
        for (int i = 0; i < 10; ++i) {
            producer1.send((Object)(messagePredicate + "producer1-" + i).getBytes());
            producer2.send((Object)(messagePredicate + "producer2-" + i).getBytes());
            producer3.send((Object)(messagePredicate + "producer3-" + i).getBytes());
        }
        // Receive everything but deliberately do NOT acknowledge.
        Message message = consumer.receive();
        while (message != null) {
            Assert.assertTrue((boolean)(message instanceof TopicMessageImpl));
            log.debug("Consumer received : " + new String(message.getData()));
            message = consumer.receive(500, TimeUnit.MILLISECONDS);
        }
        // All 30 received-but-unacked messages must be tracked.
        long size = ((MultiTopicsConsumerImpl)consumer).getUnAckedMessageTracker().size();
        log.debug(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)30L);
        // Blocking receive: presumably returns once the unacked messages are redelivered
        // after the ack timeout - NOTE(review): confirm against the consumer implementation.
        message = consumer.receive();
        // Distinct payloads seen during this pass; this time every message is acknowledged.
        HashSet<String> hSet = new HashSet<String>();
        do {
            Assert.assertTrue((boolean)(message instanceof TopicMessageImpl));
            hSet.add(new String(message.getData()));
            consumer.acknowledge(message);
            log.debug("Consumer acknowledged : " + new String(message.getData()));
        } while ((message = consumer.receive(500, TimeUnit.MILLISECONDS)) != null);
        // After acknowledging, the tracker is empty and 30 distinct payloads were seen.
        size = ((MultiTopicsConsumerImpl)consumer).getUnAckedMessageTracker().size();
        log.debug(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)0L);
        Assert.assertEquals((int)hSet.size(), (int)30);
        // Round 2: publish another 30 and acknowledge each one as it is received.
        for (int i = 0; i < 10; ++i) {
            producer1.send((Object)(messagePredicate + "producer1-round2" + i).getBytes());
            producer2.send((Object)(messagePredicate + "producer2-round2" + i).getBytes());
            producer3.send((Object)(messagePredicate + "producer3-round2" + i).getBytes());
        }
        message = consumer.receive();
        int received = 0;
        while (message != null) {
            Assert.assertTrue((boolean)(message instanceof TopicMessageImpl));
            ++received;
            String data = new String(message.getData());
            log.debug("Consumer received : " + data);
            consumer.acknowledge(message);
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        size = ((MultiTopicsConsumerImpl)consumer).getUnAckedMessageTracker().size();
        log.debug(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)0L);
        Assert.assertEquals((int)received, (int)30);
        // Wait one ack-timeout period before starting the next round.
        Thread.sleep(this.ackTimeOutMillis);
        // Round 3: publish 30 more and receive them WITHOUT acknowledging.
        for (int i = 0; i < 10; ++i) {
            producer1.send((Object)(messagePredicate + "producer1-round3" + i).getBytes());
            producer2.send((Object)(messagePredicate + "producer2-round3" + i).getBytes());
            producer3.send((Object)(messagePredicate + "producer3-round3" + i).getBytes());
        }
        message = consumer.receive();
        while (message != null) {
            String data = new String(message.getData());
            log.debug("Consumer received : " + data);
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        // The 30 unacked round-3 messages must be tracked.
        size = ((MultiTopicsConsumerImpl)consumer).getUnAckedMessageTracker().size();
        log.debug(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)30L);
        // Wait one ack-timeout period, then receive and acknowledge the messages again.
        Thread.sleep(this.ackTimeOutMillis);
        message = consumer.receive();
        int redelivered = 0;
        while (message != null) {
            Assert.assertTrue((boolean)(message instanceof TopicMessageImpl));
            ++redelivered;
            String data = new String(message.getData());
            log.debug("Consumer received : " + data);
            consumer.acknowledge(message);
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        // Exactly 30 messages came back and nothing is left unacknowledged.
        Assert.assertEquals((int)redelivered, (int)30);
        size = ((MultiTopicsConsumerImpl)consumer).getUnAckedMessageTracker().size();
        log.info(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)0L);
        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
    }

    /**
     * Verifies that a single topic can be removed from and re-added to a live
     * multi-topics consumer:
     * <ol>
     *   <li>with all three topics subscribed, 30 published messages are received;</li>
     *   <li>after {@code unsubscribeAsync(topicName3)}, only the 20 messages of the two
     *       remaining topics arrive, and the consumer reports 3 partitions / 2 topics;</li>
     *   <li>after {@code subscribeAsync(topicName3)}, 30 messages arrive again and the
     *       consumer reports 6 partitions / 3 topics.</li>
     * </ol>
     * Runs under the same {@code testTimeout} as the other tests, so a blocked
     * {@code receive()} cannot hang the suite.
     *
     * @throws Exception on any unexpected admin/client failure
     */
    @Test(timeOut = testTimeout)
    public void testSubscribeUnsubscribeSingleTopic() throws Exception {
        String key = "TopicsConsumerSubscribeUnsubscribeSingleTopicTest";
        final String subscriptionName = "my-ex-subscription-" + key;
        final String messagePredicate = "my-message-" + key + "-";
        final int totalMessages = 30;

        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);

        admin.tenants().createTenant("prop", new TenantInfo());
        admin.topics().createPartitionedTopic(topicName2, 2);
        admin.topics().createPartitionedTopic(topicName3, 3);

        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();
        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
            .create();
        Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
            .create();

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topics(topicNames)
            .subscriptionName(subscriptionName)
            .subscriptionType(SubscriptionType.Shared)
            .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
            .receiverQueueSize(4)
            .subscribe();
        Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);
        MultiTopicsConsumerImpl<byte[]> topicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;

        // Round 1: all three topics subscribed.
        for (int i = 0; i < totalMessages / 3; i++) {
            producer1.send((messagePredicate + "producer1-" + i).getBytes());
            producer2.send((messagePredicate + "producer2-" + i).getBytes());
            producer3.send((messagePredicate + "producer3-" + i).getBytes());
        }

        int messageSet = 0;
        Message<byte[]> message = consumer.receive();
        do {
            Assert.assertTrue(message instanceof TopicMessageImpl);
            messageSet++;
            consumer.acknowledge(message);
            log.debug("Consumer acknowledged : " + new String(message.getData()));
            message = consumer.receive(500, TimeUnit.MILLISECONDS);
        } while (message != null);
        Assert.assertEquals(messageSet, totalMessages);

        // Round 2: drop topic 3; its producer still publishes, but nothing from it may arrive.
        topicsConsumer.unsubscribeAsync(topicName3).get();

        for (int i = 0; i < totalMessages / 3; i++) {
            producer1.send((messagePredicate + "producer1-round2" + i).getBytes());
            producer2.send((messagePredicate + "producer2-round2" + i).getBytes());
            producer3.send((messagePredicate + "producer3-round2" + i).getBytes());
        }

        messageSet = 0;
        message = consumer.receive();
        do {
            Assert.assertTrue(message instanceof TopicMessageImpl);
            messageSet++;
            consumer.acknowledge(message);
            log.debug("Consumer acknowledged : " + new String(message.getData()));
            message = consumer.receive(500, TimeUnit.MILLISECONDS);
        } while (message != null);
        Assert.assertEquals(messageSet, totalMessages * 2 / 3);

        // Topic 1 (1 partition) + topic 2 (2 partitions) remain.
        List<String> topics = topicsConsumer.getPartitionedTopics();
        List<ConsumerImpl<byte[]>> consumers = topicsConsumer.getConsumers();
        Assert.assertEquals(topics.size(), 3);
        Assert.assertEquals(consumers.size(), 3);
        Assert.assertEquals(topicsConsumer.getTopics().size(), 2);

        // Round 3: re-add topic 3 and expect the full set again.
        topicsConsumer.subscribeAsync(topicName3).get();

        for (int i = 0; i < totalMessages / 3; i++) {
            producer1.send((messagePredicate + "producer1-round3" + i).getBytes());
            producer2.send((messagePredicate + "producer2-round3" + i).getBytes());
            producer3.send((messagePredicate + "producer3-round3" + i).getBytes());
        }

        messageSet = 0;
        message = consumer.receive();
        do {
            Assert.assertTrue(message instanceof TopicMessageImpl);
            messageSet++;
            consumer.acknowledge(message);
            log.debug("Consumer acknowledged : " + new String(message.getData()));
            message = consumer.receive(500, TimeUnit.MILLISECONDS);
        } while (message != null);
        Assert.assertEquals(messageSet, totalMessages);

        topics = topicsConsumer.getPartitionedTopics();
        consumers = topicsConsumer.getConsumers();
        Assert.assertEquals(topics.size(), 6);
        Assert.assertEquals(consumers.size(), 6);
        Assert.assertEquals(topicsConsumer.getTopics().size(), 3);

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
    }

    /**
     * Checks that building a consumer without any usable topic name fails in
     * each of four ways: no topic call at all, an empty varargs array, a
     * {@code null} topic list, and an empty topic list.
     *
     * @throws Exception on any unexpected admin/client failure
     */
    @Test(timeOut = testTimeout)
    public void testTopicsNameSubscribeWithBuilderFail() throws Exception {
        final String key = "TopicsNameSubscribeWithBuilder";
        final String subscriptionName = "my-ex-subscription-" + key;
        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;

        admin.tenants().createTenant("prop", new TenantInfo());
        admin.topics().createPartitionedTopic(topicName2, 2);
        admin.topics().createPartitionedTopic(topicName3, 3);

        // Case 1: neither topic(...) nor topics(...) is called.
        try {
            pulsarClient.newConsumer()
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .subscribe();
            Assert.fail("subscribe1 with no topicName should fail.");
        } catch (PulsarClientException expected) {
            // Expected path.
        }

        // Case 2: topic(...) with an empty array.
        try {
            pulsarClient.newConsumer()
                .topic(new String[0])
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .subscribe();
            Assert.fail("subscribe2 with no topicName should fail.");
        } catch (IllegalArgumentException expected) {
            // Expected path.
        }

        // Case 3: topics(...) with null.
        try {
            pulsarClient.newConsumer()
                .topics(null)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .subscribe();
            Assert.fail("subscribe3 with no topicName should fail.");
        } catch (IllegalArgumentException expected) {
            // Expected path.
        }

        // Case 4: topics(...) with an empty list.
        try {
            pulsarClient.newConsumer()
                .topics(Lists.newArrayList())
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .subscribe();
            Assert.fail("subscribe4 with no topicName should fail.");
        } catch (IllegalArgumentException expected) {
            // Expected path.
        }
    }

    /**
     * Registers a message listener on a multi-topics consumer over one
     * 2-partition topic, publishes 6 messages and waits until the listener has
     * been invoked 18 times.
     *
     * <p>The listener never acknowledges and the ack timeout is one second, so
     * each message is presumably redelivered until the latch reaches zero -
     * NOTE(review): confirm that redelivery is what yields the 3x factor.
     *
     * @throws Exception on any unexpected admin/client failure
     */
    @Test(timeOut = 30000)
    public void testMultiTopicsMessageListener() throws Exception {
        String key = "MultiTopicsMessageListenerTest";
        final String subscriptionName = "my-ex-subscription-" + key;
        final String messagePredicate = "my-message-" + key + "-";
        final int totalMessages = 6;

        // Three listener invocations are awaited per published message.
        CountDownLatch latch = new CountDownLatch(totalMessages * 3);

        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
        List<String> topicNames = Lists.newArrayList(topicName1);

        admin.tenants().createTenant("prop", new TenantInfo());
        admin.topics().createPartitionedTopic(topicName1, 2);

        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topics(topicNames)
            .subscriptionName(subscriptionName)
            .subscriptionType(SubscriptionType.Shared)
            .ackTimeout(1000, TimeUnit.MILLISECONDS)
            .receiverQueueSize(100)
            .messageListener((c1, msg) -> {
                Assert.assertNotNull(msg, "Message cannot be null");
                String receivedMessage = new String(msg.getData());
                latch.countDown();
                log.info("Received message [{}] in the listener, latch: {}",
                    receivedMessage, latch.getCount());
                // The message is intentionally left unacknowledged.
            })
            .subscribe();
        Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);

        for (int i = 0; i < totalMessages; i++) {
            producer1.send((messagePredicate + "producer1-" + i).getBytes());
        }

        // Bounded by the @Test timeOut if the listener is not called often enough.
        latch.await();

        consumer.close();
        producer1.close();
    }

    /**
     * Verifies that a multi-topics consumer created with
     * {@code autoUpdatePartitions(true)} picks up a newly added partition:
     * <ol>
     *   <li>two topics are created with 2 partitions each and subscribed;</li>
     *   <li>messages published straight to {@code <topic>-partition-2} are not delivered;</li>
     *   <li>both topics are grown to 3 partitions and the consumer's partition
     *       auto-update timer task is run by hand;</li>
     *   <li>messages published to {@code -partition-2} afterwards are received (12 in total).</li>
     * </ol>
     *
     * @throws Exception on any unexpected admin/client failure
     */
    @Test(timeOut = 30000)
    public void testTopicAutoUpdatePartitions() throws Exception {
        String key = "TestTopicAutoUpdatePartitions";
        final String subscriptionName = "my-ex-subscription-" + key;
        final String messagePredicate = "my-message-" + key + "-";
        final int totalMessages = 6;

        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
        List<String> topicNames = Lists.newArrayList(topicName1, topicName2);

        admin.tenants().createTenant("prop", new TenantInfo());
        admin.topics().createPartitionedTopic(topicName1, 2);
        admin.topics().createPartitionedTopic(topicName2, 2);

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topics(topicNames)
            .subscriptionName(subscriptionName)
            .subscriptionType(SubscriptionType.Shared)
            .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
            .receiverQueueSize(4)
            .autoUpdatePartitions(true)
            .subscribe();
        Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);
        MultiTopicsConsumerImpl<byte[]> topicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;

        // Producers that write directly to the (not yet subscribed) third partition.
        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1 + "-partition-2")
            .enableBatching(false)
            .create();
        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2 + "-partition-2")
            .enableBatching(false)
            .create();

        for (int i = 0; i < totalMessages; i++) {
            producer1.send((messagePredicate + "topic1-partition-2 index:" + i).getBytes());
            producer2.send((messagePredicate + "topic2-partition-2 index:" + i).getBytes());
            log.info("produce message to partition-2. message index: {}", i);
        }

        // Nothing should be delivered before the partition update.
        Message<byte[]> message = consumer.receive(200, TimeUnit.MILLISECONDS);
        Assert.assertNull(message);

        admin.topics().updatePartitionedTopic(topicName1, 3);
        admin.topics().updatePartitionedTopic(topicName2, 3);

        // Run the auto-update timer task immediately instead of waiting for its schedule.
        log.info("trigger partitionsAutoUpdateTimerTask");
        Timeout timeout = topicsConsumer.getPartitionsAutoUpdateTimeout();
        timeout.task().run(timeout);
        Thread.sleep(200L);

        for (int i = 0; i < totalMessages; i++) {
            producer1.send((messagePredicate + "topic1-partition-2 index:" + i).getBytes());
            producer2.send((messagePredicate + "topic2-partition-2 index:" + i).getBytes());
            log.info("produce message to partition-2 again. messageindex: {}", i);
        }

        int messageSet = 0;
        message = consumer.receive();
        do {
            messageSet++;
            consumer.acknowledge(message);
            log.info("4 Consumer acknowledged : " + new String(message.getData()));
            message = consumer.receive(200, TimeUnit.MILLISECONDS);
        } while (message != null);
        Assert.assertEquals(messageSet, 2 * totalMessages);

        consumer.close();
        // Release the producers as the other tests in this class do.
        producer1.close();
        producer2.close();
    }

    /**
     * Sets a default TTL of one second on the broker configuration, creates a
     * subscription (by opening and closing a consumer), publishes 10 messages,
     * waits past the TTL, triggers the topic's message-expiry check and expects
     * the subscription backlog to drop to zero.
     *
     * @throws Exception on any unexpected admin/client/broker failure
     */
    @Test(timeOut = testTimeout)
    public void testDefaultBacklogTTL() throws Exception {
        final int defaultTTLSec = 1;
        final int totalMessages = 10;
        this.conf.setTtlDurationDefaultInSeconds(defaultTTLSec);

        final String namespace = "prop/use/expiry";
        final String topicName = "persistent://" + namespace + "/expiry";
        final String subName = "expiredSub";

        admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString()));
        admin.tenants().createTenant("prop", new TenantInfo(null, Sets.newHashSet("use")));
        admin.namespaces().createNamespace(namespace);

        // Subscribing and closing straight away leaves the subscription in place with no consumer.
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subName)
            .subscriptionType(SubscriptionType.Shared)
            .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
            .subscribe();
        consumer.close();

        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
            .enableBatching(false)
            .create();
        for (int i = 0; i < totalMessages; i++) {
            producer.send(("" + i).getBytes());
        }

        Optional<Topic> topic = pulsar.getBrokerService().getTopic(topicName, false).get();
        Assert.assertTrue(topic.isPresent());
        PersistentSubscription subscription = (PersistentSubscription) topic.get().getSubscription(subName);

        // Wait well past the TTL, then run the expiry check explicitly.
        Thread.sleep((defaultTTLSec + 5) * 1000L);
        topic.get().checkMessageExpiry();

        // Allow a few retries for the backlog to be cleared before the final assertion.
        retryStrategically(test -> subscription.getNumberOfEntriesInBacklog() == 0L, 5, 200L);
        Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(), 0L);

        producer.close();
    }

    /**
     * Publishes 30 messages per producer, twice, and after each round checks
     * {@code getLastMessageId()} of the multi-topics consumer: it must be a
     * {@link MultiMessageIdImpl} with one entry per partition (6), whose entry
     * ids match the number of messages each partition received.
     *
     * <p>Expected last entry ids per round: topic 1 (single partition) 29 / 59,
     * topic 2 (2 partitions, round-robin) 14 / 29, topic 3 (3 partitions,
     * round-robin) 9 / 19.
     *
     * @throws Exception on any unexpected admin/client failure
     */
    @Test(timeOut = testTimeout)
    public void testGetLastMessageId() throws Exception {
        String key = "TopicGetLastMessageId";
        final String subscriptionName = "my-ex-subscription-" + key;
        final String messagePredicate = "my-message-" + key + "-";
        final int totalMessages = 30;

        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);

        admin.tenants().createTenant("prop", new TenantInfo());
        admin.topics().createPartitionedTopic(topicName2, 2);
        admin.topics().createPartitionedTopic(topicName3, 3);

        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();
        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
            .create();
        Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
            .create();

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topics(topicNames)
            .subscriptionName(subscriptionName)
            .subscriptionType(SubscriptionType.Shared)
            .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
            .receiverQueueSize(4)
            .subscribe();
        Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);

        // Round 1: 30 messages per producer.
        for (int i = 0; i < totalMessages; i++) {
            producer1.send((messagePredicate + "producer1-" + i).getBytes());
            producer2.send((messagePredicate + "producer2-" + i).getBytes());
            producer3.send((messagePredicate + "producer3-" + i).getBytes());
        }

        MessageId messageId = consumer.getLastMessageId();
        Assert.assertTrue(messageId instanceof MultiMessageIdImpl);
        Map<String, MessageId> map = ((MultiMessageIdImpl) messageId).getMap();
        Assert.assertEquals(map.size(), 6);
        assertLastEntryIds(map, topicName1, topicName2, 29L, 14L, 9L);

        // Round 2: another 30 messages per producer doubles every partition's count.
        for (int i = 0; i < totalMessages; i++) {
            producer1.send((messagePredicate + "producer1-" + i).getBytes());
            producer2.send((messagePredicate + "producer2-" + i).getBytes());
            producer3.send((messagePredicate + "producer3-" + i).getBytes());
        }

        messageId = consumer.getLastMessageId();
        Assert.assertTrue(messageId instanceof MultiMessageIdImpl);
        Map<String, MessageId> map2 = ((MultiMessageIdImpl) messageId).getMap();
        Assert.assertEquals(map2.size(), 6);
        assertLastEntryIds(map2, topicName1, topicName2, 59L, 29L, 19L);

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
    }

    /**
     * Asserts the entry id of every per-partition last message id in {@code lastIds}.
     *
     * @param lastIds        partition topic name to last message id, as returned by
     *                       {@code MultiMessageIdImpl.getMap()}
     * @param topicName1     keys containing this name are checked against {@code expectedTopic1}
     * @param topicName2     keys containing this name are checked against {@code expectedTopic2}
     * @param expectedTopic1 expected entry id for partitions of {@code topicName1}
     * @param expectedTopic2 expected entry id for partitions of {@code topicName2}
     * @param expectedOther  expected entry id for every remaining partition
     */
    private static void assertLastEntryIds(Map<String, MessageId> lastIds, String topicName1, String topicName2,
                                           long expectedTopic1, long expectedTopic2, long expectedOther) {
        lastIds.forEach((k, v) -> {
            log.info("topic: {}, messageId:{} ", k, v.toString());
            Assert.assertTrue(v instanceof MessageIdImpl);
            MessageIdImpl messageId1 = (MessageIdImpl) v;
            if (k.contains(topicName1)) {
                Assert.assertEquals(messageId1.entryId, expectedTopic1);
            } else if (k.contains(topicName2)) {
                Assert.assertEquals(messageId1.entryId, expectedTopic2);
            } else {
                Assert.assertEquals(messageId1.entryId, expectedOther);
            }
        });
    }
}

