/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class ResendRequestTest
extends BrokerTestBase {
    private static final long testTimeout = 60000L;
    private static final Logger log = LoggerFactory.getLogger(ResendRequestTest.class);

    @Override
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
    }

    @Override
    @AfterMethod
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test(timeOut=60000L)
    public void testExclusiveSingleAckedNormalTopic() throws Exception {
        int i;
        String key = "testExclusiveSingleAckedNormalTopic";
        String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        String subscriptionName = "my-ex-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 10;
        HashSet<MessageId> messageIdHashSet = new HashSet<MessageId>();
        HashSet<String> messageDataHashSet = new HashSet<String>();
        Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull((Object)topicRef);
        Assert.assertEquals((long)topicRef.getProducers().size(), (long)1L);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).receiverQueueSize(7).subscribe();
        for (int i2 = 0; i2 < 10; ++i2) {
            String message = messagePredicate + i2;
            producer.send((Object)message.getBytes());
        }
        Message message = consumer.receive();
        log.info("Message received " + new String(message.getData()));
        for (i = 1; i < 10; ++i) {
            Message msg = consumer.receive();
            log.info("Message received " + new String(msg.getData()));
            messageDataHashSet.add(new String(msg.getData()));
        }
        this.printIncomingMessageQueue((Consumer<byte[]>)consumer);
        consumer.acknowledge(message);
        log.info("Message acked " + new String(message.getData()));
        messageIdHashSet.add(message.getMessageId());
        messageDataHashSet.add(new String(message.getData()));
        consumer.redeliverUnacknowledgedMessages();
        log.info("Resend Messages Request sent");
        for (i = 0; i < 9; ++i) {
            message = consumer.receive();
            log.info("Message received " + new String(message.getData()));
            if (i < 2) {
                messageIdHashSet.add(message.getMessageId());
                consumer.acknowledge(message);
            }
            log.info("Message acked " + new String(message.getData()));
            Assert.assertTrue((boolean)messageDataHashSet.contains(new String(message.getData())));
        }
        Assert.assertEquals((int)messageIdHashSet.size(), (int)3);
        Assert.assertEquals((int)messageDataHashSet.size(), (int)10);
        this.printIncomingMessageQueue((Consumer<byte[]>)consumer);
        consumer.redeliverUnacknowledgedMessages();
        log.info("Resend Messages Request sent");
        message = consumer.receive(2000, TimeUnit.MILLISECONDS);
        while (message != null) {
            log.info("Message received " + new String(message.getData()));
            consumer.acknowledge(message);
            log.info("Message acked " + new String(message.getData()));
            messageIdHashSet.add(message.getMessageId());
            messageDataHashSet.add(new String(message.getData()));
            message = consumer.receive(5000, TimeUnit.MILLISECONDS);
        }
        Assert.assertEquals((int)messageIdHashSet.size(), (int)10);
        Assert.assertEquals((int)messageDataHashSet.size(), (int)10);
        this.printIncomingMessageQueue((Consumer<byte[]>)consumer);
        consumer.redeliverUnacknowledgedMessages();
        Assert.assertNull((Object)consumer.receive(2000, TimeUnit.MILLISECONDS));
        for (i = 0; i < 10; ++i) {
            Assert.assertTrue((boolean)messageDataHashSet.contains(messagePredicate + i));
        }
    }

    @Test(timeOut=60000L)
    public void testSharedSingleAckedNormalTopic() throws Exception {
        String key = "testSharedSingleAckedNormalTopic";
        String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        String subscriptionName = "my-shared-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 10;
        Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull((Object)topicRef);
        Assert.assertEquals((long)topicRef.getProducers().size(), (long)1L);
        Consumer consumer1 = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).receiverQueueSize(5).subscriptionType(SubscriptionType.Shared).subscribe();
        PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer consumer2 = newPulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).receiverQueueSize(5).subscriptionType(SubscriptionType.Shared).subscribe();
        for (int i = 0; i < 10; ++i) {
            String message = messagePredicate + i;
            producer.send((Object)message.getBytes());
            log.info("Producer produced " + message);
        }
        int receivedConsumer1 = 0;
        int receivedConsumer2 = 0;
        Message message1 = consumer1.receive();
        Message message2 = consumer2.receive();
        do {
            if (message1 != null) {
                log.info("Consumer 1 Received: " + new String(message1.getData()));
                ++receivedConsumer1;
            }
            if (message2 != null) {
                log.info("Consumer 2 Received: " + new String(message2.getData()));
                ++receivedConsumer2;
            }
            message1 = consumer1.receive(100, TimeUnit.MILLISECONDS);
            message2 = consumer2.receive(100, TimeUnit.MILLISECONDS);
        } while (message1 != null || message2 != null);
        log.info("Consumer 1 receives = " + receivedConsumer1);
        log.info("Consumer 2 receives = " + receivedConsumer2);
        log.info("Total receives = " + (receivedConsumer2 + receivedConsumer1));
        Assert.assertEquals((int)(receivedConsumer2 + receivedConsumer1), (int)10);
        log.info("Consumer 1 sent a resend request");
        consumer1.redeliverUnacknowledgedMessages();
        int receivedMessagesAfterRedelivery = 0;
        receivedConsumer1 = 0;
        message1 = consumer1.receive(100, TimeUnit.MILLISECONDS);
        message2 = consumer2.receive(100, TimeUnit.MILLISECONDS);
        do {
            if (message1 != null) {
                log.info("Consumer 1 Received: " + new String(message1.getData()));
                ++receivedConsumer1;
                ++receivedMessagesAfterRedelivery;
            }
            if (message2 != null) {
                log.info("Consumer 2 Received: " + new String(message2.getData()));
                ++receivedConsumer2;
                ++receivedMessagesAfterRedelivery;
            }
            message1 = consumer1.receive(200, TimeUnit.MILLISECONDS);
            message2 = consumer2.receive(200, TimeUnit.MILLISECONDS);
        } while (message1 != null || message2 != null);
        log.info("Additional received = " + receivedMessagesAfterRedelivery);
        newPulsarClient.close();
        Assert.assertTrue((receivedMessagesAfterRedelivery > 0 ? 1 : 0) != 0);
        Assert.assertEquals((int)(receivedConsumer1 + receivedConsumer2), (int)10);
    }

    @Test(timeOut=60000L)
    public void testFailoverSingleAckedNormalTopic() throws Exception {
        Message message2;
        Message message1;
        String key = "testFailoverSingleAckedNormalTopic";
        String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        String subscriptionName = "my-failover-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 10;
        Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull((Object)topicRef);
        Assert.assertEquals((long)topicRef.getProducers().size(), (long)1L);
        ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).receiverQueueSize(10).subscriptionType(SubscriptionType.Failover).acknowledgmentGroupTime(0L, TimeUnit.SECONDS);
        Consumer consumer1 = consumerBuilder.clone().consumerName("consumer-1").subscribe();
        Consumer consumer2 = consumerBuilder.clone().consumerName("consumer-2").subscribe();
        for (int i = 0; i < 10; ++i) {
            String message = messagePredicate + i;
            producer.send((Object)message.getBytes());
            log.info("Producer produced " + message);
        }
        int receivedConsumer1 = 0;
        int receivedConsumer2 = 0;
        do {
            message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
            message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
            if (message1 != null) {
                log.info("Consumer 1 Received: " + new String(message1.getData()));
                ++receivedConsumer1;
            }
            if (message2 == null) continue;
            log.info("Consumer 2 Received: " + new String(message2.getData()));
            ++receivedConsumer2;
        } while (message1 != null || message2 != null);
        log.info("Consumer 1 receives = " + receivedConsumer1);
        log.info("Consumer 2 receives = " + receivedConsumer2);
        log.info("Total receives = " + (receivedConsumer2 + receivedConsumer1));
        Assert.assertEquals((int)(receivedConsumer2 + receivedConsumer1), (int)10);
        Assert.assertEquals((int)receivedConsumer2, (int)0);
        consumer1.redeliverUnacknowledgedMessages();
        Thread.sleep(1000L);
        receivedConsumer2 = 0;
        receivedConsumer1 = 0;
        for (int i = 0; i < 5; ++i) {
            message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
            message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
            if (message1 != null) {
                log.info("Consumer 1 Received: " + new String(message1.getData()));
                ++receivedConsumer1;
                log.info("Consumer 1 Acknowledged: " + new String(message1.getData()));
                consumer1.acknowledge(message1);
            }
            if (message2 == null) continue;
            log.info("Consumer 2 Received: " + new String(message2.getData()));
            ++receivedConsumer2;
        }
        Assert.assertEquals((int)(receivedConsumer2 + receivedConsumer1), (int)5);
        Assert.assertEquals((int)receivedConsumer2, (int)0);
        consumer1.close();
        message2 = consumer2.receive();
        int acknowledgedMessages = 0;
        int unAcknowledgedMessages = 0;
        boolean flag = true;
        do {
            if (flag) {
                consumer2.acknowledge(message2);
                ++acknowledgedMessages;
                log.info("Consumer 2 Acknowledged: " + new String(message2.getData()));
            } else {
                ++unAcknowledgedMessages;
            }
            flag = !flag;
            log.info("Consumer 2 Received: " + new String(message2.getData()));
        } while ((message2 = consumer2.receive(500, TimeUnit.MILLISECONDS)) != null);
        log.info("Consumer 2 receives = " + (unAcknowledgedMessages + acknowledgedMessages));
        log.info("Consumer 2 acknowledges = " + acknowledgedMessages);
        Assert.assertEquals((int)(unAcknowledgedMessages + acknowledgedMessages), (int)(10 - receivedConsumer1));
        consumer2.redeliverUnacknowledgedMessages();
        Thread.sleep(1000L);
        message2 = consumer2.receive();
        receivedConsumer2 = 0;
        do {
            ++receivedConsumer2;
        } while ((message2 = consumer2.receive(500, TimeUnit.MILLISECONDS)) != null);
        log.info("Consumer 2 receives = " + receivedConsumer2);
        Assert.assertEquals((int)unAcknowledgedMessages, (int)receivedConsumer2);
    }

    @Test(timeOut=60000L)
    public void testExclusiveCumulativeAckedNormalTopic() throws Exception {
        String key = "testExclusiveCumulativeAckedNormalTopic";
        String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        String subscriptionName = "my-ex-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 10;
        Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull((Object)topicRef);
        Assert.assertEquals((long)topicRef.getProducers().size(), (long)1L);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).receiverQueueSize(7).subscribe();
        for (int i = 0; i < 10; ++i) {
            String message = messagePredicate + i;
            producer.send((Object)message.getBytes());
        }
        Message message = consumer.receive();
        log.info("Message received " + new String(message.getData()));
        for (int i = 0; i < 7; ++i) {
            this.printIncomingMessageQueue((Consumer<byte[]>)consumer);
            message = consumer.receive();
            log.info("Message received " + new String(message.getData()));
        }
        consumer.redeliverUnacknowledgedMessages();
        Thread.sleep(1000L);
        consumer.acknowledgeCumulative(message);
        while ((message = consumer.receive(1000, TimeUnit.MILLISECONDS)) != null) {
        }
        log.info("Consumer Requests Messages");
        consumer.redeliverUnacknowledgedMessages();
        int numOfReceives = 0;
        message = consumer.receive();
        do {
            ++numOfReceives;
            log.info("Message received " + new String(message.getData()));
        } while ((message = consumer.receive(1000, TimeUnit.MILLISECONDS)) != null);
        Assert.assertEquals((int)numOfReceives, (int)2);
    }

    @Test(timeOut=60000L)
    public void testExclusiveSingleAckedPartitionedTopic() throws Exception {
        String key = "testExclusiveSingleAckedPartitionedTopic";
        String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        String subscriptionName = "my-ex-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 10;
        int numberOfPartitions = 4;
        this.admin.tenants().createTenant("prop", new TenantInfo());
        this.admin.topics().createPartitionedTopic(topicName, 4);
        Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).receiverQueueSize(7).subscribe();
        for (int i = 0; i < 10; ++i) {
            String message = messagePredicate + i;
            log.info("Message produced: " + message);
            producer.send((Object)message.getBytes());
        }
        Message message = consumer.receive();
        int messageCount = 0;
        log.info("Message received " + new String(message.getData()));
        do {
            ++messageCount;
            log.info("Message received " + new String(message.getData()));
        } while ((message = consumer.receive(500, TimeUnit.MILLISECONDS)) != null);
        Assert.assertEquals((int)messageCount, (int)10);
        consumer.redeliverUnacknowledgedMessages();
        message = consumer.receive();
        messageCount = 0;
        log.info("Message received " + new String(message.getData()));
        do {
            ++messageCount;
            log.info("Message received " + new String(message.getData()));
        } while ((message = consumer.receive(500, TimeUnit.MILLISECONDS)) != null);
        Assert.assertEquals((int)messageCount, (int)10);
    }

    @Test(timeOut=60000L)
    public void testSharedSingleAckedPartitionedTopic() throws Exception {
        String key = "testSharedSingleAckedPartitionedTopic";
        String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        String subscriptionName = "my-shared-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 10;
        int numberOfPartitions = 3;
        this.admin.tenants().createTenant("prop", new TenantInfo());
        this.admin.topics().createPartitionedTopic(topicName, 3);
        Random rn = new Random();
        Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
        Consumer consumer1 = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).subscribe();
        PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer consumer2 = newPulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).subscribe();
        for (int i = 0; i < 10; ++i) {
            String message = messagePredicate + i;
            log.info("Message produced: " + message);
            producer.send((Object)message.getBytes());
        }
        Message message1 = consumer1.receive();
        Message message2 = consumer2.receive();
        int messageCount1 = 0;
        int messageCount2 = 0;
        int ackCount1 = 0;
        int ackCount2 = 0;
        do {
            if (message1 != null) {
                log.info("Consumer1 received " + new String(message1.getData()));
                ++messageCount1;
                if (rn.nextInt() % 3 == 0) {
                    consumer1.acknowledge(message1);
                    log.info("Consumer1 acked " + new String(message1.getData()));
                    ++ackCount1;
                }
            }
            if (message2 != null) {
                log.info("Consumer2 received " + new String(message2.getData()));
                ++messageCount2;
                if (rn.nextInt() % 3 == 0) {
                    consumer2.acknowledge(message2);
                    log.info("Consumer2 acked " + new String(message2.getData()));
                    ++ackCount2;
                }
            }
            message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
            message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
        } while (message1 != null || message2 != null);
        log.info(key + " messageCount1 = " + messageCount1);
        log.info(key + " messageCount2 = " + messageCount2);
        log.info(key + " ackCount1 = " + ackCount1);
        log.info(key + " ackCount2 = " + ackCount2);
        Assert.assertEquals((int)(messageCount1 + messageCount2), (int)10);
        log.info(key + ": Sent a Redeliver Message Request");
        consumer1.redeliverUnacknowledgedMessages();
        if (ackCount1 + ackCount2 == 10) {
            return;
        }
        message1 = consumer1.receive(5000, TimeUnit.MILLISECONDS);
        message2 = consumer2.receive(5000, TimeUnit.MILLISECONDS);
        messageCount1 = 0;
        do {
            if (message1 != null) {
                log.info("Consumer1 received " + new String(message1.getData()));
                ++messageCount1;
            }
            if (message2 != null) {
                log.info("Consumer2 received " + new String(message2.getData()));
                ++messageCount2;
            }
            message1 = consumer1.receive(1000, TimeUnit.MILLISECONDS);
            message2 = consumer2.receive(1000, TimeUnit.MILLISECONDS);
        } while (message1 != null || message2 != null);
        log.info(key + " messageCount1 = " + messageCount1);
        log.info(key + " messageCount2 = " + messageCount2);
        log.info(key + " ackCount1 = " + ackCount1);
        log.info(key + " ackCount2 = " + ackCount2);
        newPulsarClient.close();
        Assert.assertEquals((int)(messageCount1 + messageCount2 + ackCount1), (int)10);
    }

    @Test(timeOut=60000L)
    public void testFailoverSingleAckedPartitionedTopic() throws Exception {
        String key = "testFailoverSingleAckedPartitionedTopic";
        String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        String subscriptionName = "my-failover-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 10;
        int numberOfPartitions = 3;
        this.admin.tenants().createTenant("prop", new TenantInfo());
        this.admin.topics().createPartitionedTopic(topicName, 3);
        Random rn = new Random();
        Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
        ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).receiverQueueSize(7).subscriptionType(SubscriptionType.Failover);
        Consumer consumer1 = consumerBuilder.clone().consumerName("Consumer-1").subscribe();
        Consumer consumer2 = consumerBuilder.clone().consumerName("Consumer-2").subscribe();
        Thread.sleep(1000L);
        for (int i = 0; i < 10; ++i) {
            String message = messagePredicate + i;
            log.info("Message produced: " + message);
            producer.send((Object)message.getBytes());
        }
        Message message1 = consumer1.receive();
        Message message2 = consumer2.receive();
        int messageCount1 = 0;
        int messageCount2 = 0;
        int ackCount1 = 0;
        int ackCount2 = 0;
        do {
            if (message1 != null) {
                log.info("Consumer1 received " + new String(message1.getData()));
                ++messageCount1;
                if (rn.nextInt() % 3 == 0) {
                    consumer1.acknowledge(message1);
                    ++ackCount1;
                }
            }
            if (message2 != null) {
                log.info("Consumer2 received " + new String(message2.getData()));
                ++messageCount2;
                if (rn.nextInt() % 3 == 0) {
                    consumer2.acknowledge(message2);
                    ++ackCount2;
                }
            }
            message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
            message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
        } while (message1 != null || message2 != null);
        log.info(key + " messageCount1 = " + messageCount1);
        log.info(key + " messageCount2 = " + messageCount2);
        log.info(key + " ackCount1 = " + ackCount1);
        log.info(key + " ackCount2 = " + ackCount2);
        Assert.assertEquals((int)(messageCount1 + messageCount2), (int)10);
        if (ackCount1 + ackCount2 == 10) {
            return;
        }
        log.info(key + ": Sent a Redeliver Message Request");
        consumer1.redeliverUnacknowledgedMessages();
        consumer1.close();
        message2 = consumer2.receive();
        messageCount1 = 0;
        do {
            if (message2 != null) {
                log.info("Consumer2 received " + new String(message2.getData()));
                ++messageCount2;
            }
            message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
        } while (message1 != null || message2 != null);
        log.info(key + " messageCount1 = " + messageCount1);
        log.info(key + " messageCount2 = " + messageCount2);
        log.info(key + " ackCount1 = " + ackCount1);
        log.info(key + " ackCount2 = " + ackCount2);
        Assert.assertEquals((int)(messageCount2 + ackCount1), (int)10);
    }

    @Test(timeOut=60000L)
    public void testFailoverInactiveConsumer() throws Exception {
        Message message1;
        String key = "testFailoverInactiveConsumer";
        String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        String subscriptionName = "my-failover-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 10;
        Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull((Object)topicRef);
        Assert.assertEquals((long)topicRef.getProducers().size(), (long)1L);
        ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).receiverQueueSize(10).subscriptionType(SubscriptionType.Failover);
        Consumer consumer1 = consumerBuilder.clone().consumerName("Consumer-1").subscribe();
        Consumer consumer2 = consumerBuilder.clone().consumerName("Consumer-2").subscribe();
        for (int i = 0; i < 10; ++i) {
            String message = messagePredicate + i;
            producer.send((Object)message.getBytes());
            log.info("Producer produced " + message);
        }
        int receivedConsumer1 = 0;
        int receivedConsumer2 = 0;
        do {
            if ((message1 = consumer1.receive(500, TimeUnit.MILLISECONDS)) == null) continue;
            log.info("Consumer 1 Received: " + new String(message1.getData()));
            ++receivedConsumer1;
        } while (message1 != null);
        log.info("Consumer 1 receives = " + receivedConsumer1);
        log.info("Consumer 2 receives = " + receivedConsumer2);
        log.info("Total receives = " + (receivedConsumer2 + receivedConsumer1));
        Assert.assertEquals((int)(receivedConsumer2 + receivedConsumer1), (int)10);
        Assert.assertEquals((int)receivedConsumer2, (int)0);
        log.info("Consumer 2 asks for resend");
        consumer2.redeliverUnacknowledgedMessages();
        Thread.sleep(1000L);
        message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
        Message message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
        Assert.assertNull((Object)message1);
        Assert.assertNull((Object)message2);
    }

    private BlockingQueue<Message<byte[]>> printIncomingMessageQueue(Consumer<byte[]> consumer) throws Exception {
        BlockingQueue imq = null;
        ConsumerBase c = (ConsumerBase)consumer;
        Field field = ConsumerBase.class.getDeclaredField("incomingMessages");
        field.setAccessible(true);
        imq = (BlockingQueue)field.get(c);
        log.info("Incoming MEssage Queue: {}", (Object)imq);
        return imq;
    }
}

