/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.EncryptionKeyInfo;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageCrypto;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class SimpleProducerConsumerTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class);

    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterMethod
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPublishTimestampBatchDisabled() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        final AtomicLong ticker = new AtomicLong(0L);
        Clock clock = new Clock(){

            @Override
            public ZoneId getZone() {
                return ZoneId.systemDefault();
            }

            @Override
            public Clock withZone(ZoneId zone) {
                return this;
            }

            @Override
            public Instant instant() {
                return Instant.ofEpochMilli(this.millis());
            }

            @Override
            public long millis() {
                return ticker.incrementAndGet();
            }
        };
        PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(this.lookupUrl.toString()).clock(clock).build();
        try {
            String topic = "persistent://my-property/my-ns/test-publish-timestamp";
            Consumer consumer = newPulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/test-publish-timestamp"}).subscriptionName("my-sub").subscribe();
            try {
                Producer producer = newPulsarClient.newProducer().topic("persistent://my-property/my-ns/test-publish-timestamp").enableBatching(false).create();
                try {
                    int i;
                    int numMessages = 5;
                    for (i = 0; i < 5; ++i) {
                        producer.newMessage().value((Object)("value-" + i).getBytes(StandardCharsets.UTF_8)).eventTime((long)(i + 1) * 100L).sendAsync();
                    }
                    producer.flush();
                    for (i = 0; i < 5; ++i) {
                        Message msg = consumer.receive();
                        log.info("Received message '{}'.", (Object)new String((byte[])msg.getValue(), StandardCharsets.UTF_8));
                        Assert.assertEquals((long)(1L + (long)i), (long)msg.getPublishTime());
                        Assert.assertEquals((long)(100L * (long)(i + 1)), (long)msg.getEventTime());
                    }
                }
                finally {
                    if (Collections.singletonList(producer).get(0) != null) {
                        producer.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(newPulsarClient).get(0) != null) {
                newPulsarClient.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPublishTimestampBatchEnabled() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        final AtomicLong ticker = new AtomicLong(0L);
        Clock clock = new Clock(){

            @Override
            public ZoneId getZone() {
                return ZoneId.systemDefault();
            }

            @Override
            public Clock withZone(ZoneId zone) {
                return this;
            }

            @Override
            public Instant instant() {
                return Instant.ofEpochMilli(this.millis());
            }

            @Override
            public long millis() {
                return ticker.incrementAndGet();
            }
        };
        PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(this.lookupUrl.toString()).clock(clock).build();
        try {
            String topic = "persistent://my-property/my-ns/test-publish-timestamp";
            Consumer consumer = newPulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/test-publish-timestamp"}).subscriptionName("my-sub").subscribe();
            try {
                int numMessages = 5;
                Producer producer = newPulsarClient.newProducer().topic("persistent://my-property/my-ns/test-publish-timestamp").enableBatching(true).batchingMaxMessages(50).create();
                try {
                    int i;
                    for (i = 0; i < 5; ++i) {
                        producer.newMessage().value((Object)("value-" + i).getBytes(StandardCharsets.UTF_8)).eventTime((long)(i + 1) * 100L).sendAsync();
                    }
                    producer.flush();
                    for (i = 0; i < 5; ++i) {
                        Message msg = consumer.receive();
                        log.info("Received message '{}'.", (Object)new String((byte[])msg.getValue(), StandardCharsets.UTF_8));
                        Assert.assertEquals((long)1L, (long)msg.getPublishTime());
                        Assert.assertEquals((long)(100L * (long)(i + 1)), (long)msg.getEventTime());
                    }
                }
                finally {
                    if (Collections.singletonList(producer).get(0) != null) {
                        producer.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(newPulsarClient).get(0) != null) {
                newPulsarClient.close();
            }
        }
    }

    @DataProvider(name="batch")
    public Object[][] codecProvider() {
        return new Object[][]{{0}, {1000}};
    }

    @Test(dataProvider="batch")
    public void testSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscribe();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1");
        if (batchMessageDelayMs != 0) {
            producerBuilder.enableBatching(true);
            producerBuilder.batchingMaxPublishDelay((long)batchMessageDelayMs, TimeUnit.MILLISECONDS);
            producerBuilder.batchingMaxMessages(5);
        }
        Producer producer = producerBuilder.create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        consumer.acknowledgeCumulative(msg);
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(dataProvider="batch")
    public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscribe();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic2");
        if (batchMessageDelayMs != 0) {
            producerBuilder.enableBatching(true);
            producerBuilder.batchingMaxPublishDelay((long)batchMessageDelayMs, TimeUnit.MILLISECONDS);
            producerBuilder.batchingMaxMessages(5);
        }
        Producer producer = producerBuilder.create();
        ArrayList futures = Lists.newArrayList();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            CompletableFuture future = producer.sendAsync((Object)message.getBytes());
            futures.add(future);
        }
        log.info("Waiting for async publish to complete");
        for (Future future : futures) {
            future.get();
        }
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.info("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        CompletableFuture ackFuture = consumer.acknowledgeCumulativeAsync(msg);
        log.info("Waiting for async ack to complete");
        ackFuture.get();
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(dataProvider="batch", timeOut=100000L)
    public void testMessageListener(int batchMessageDelayMs) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int numMessages = 100;
        CountDownLatch latch = new CountDownLatch(numMessages);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic3"}).subscriptionName("my-subscriber-name").messageListener((MessageListener & Serializable)(c1, msg) -> {
            Assert.assertNotNull((Object)msg, (String)"Message cannot be null");
            String receivedMessage = new String(msg.getData());
            log.debug("Received message [{}] in the listener", (Object)receivedMessage);
            c1.acknowledgeAsync(msg);
            latch.countDown();
        }).subscribe();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic3");
        if (batchMessageDelayMs != 0) {
            producerBuilder.enableBatching(true);
            producerBuilder.batchingMaxPublishDelay((long)batchMessageDelayMs, TimeUnit.MILLISECONDS);
            producerBuilder.batchingMaxMessages(5);
        }
        Producer producer = producerBuilder.create();
        ArrayList futures = Lists.newArrayList();
        for (int i = 0; i < numMessages; ++i) {
            String message = "my-message-" + i;
            CompletableFuture future = producer.sendAsync((Object)message.getBytes());
            futures.add(future);
        }
        log.info("Waiting for async publish to complete");
        for (Future future : futures) {
            future.get();
        }
        log.info("Waiting for message listener to ack all messages");
        Assert.assertTrue((boolean)latch.await(numMessages, TimeUnit.SECONDS), (String)"Timed out waiting for message listener acks");
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(timeOut=100000L)
    public void testPauseAndResume() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int receiverQueueSize = 20;
        AtomicReference<CountDownLatch> latch = new AtomicReference<CountDownLatch>(new CountDownLatch(receiverQueueSize));
        AtomicInteger received = new AtomicInteger();
        Consumer consumer = this.pulsarClient.newConsumer().receiverQueueSize(receiverQueueSize).topic(new String[]{"persistent://my-property/my-ns/my-topic-pr"}).subscriptionName("my-subscriber-name").messageListener((MessageListener & Serializable)(c1, msg) -> {
            Assert.assertNotNull((Object)msg, (String)"Message cannot be null");
            String receivedMessage = new String(msg.getData());
            log.debug("Received message [{}] in the listener", (Object)receivedMessage);
            c1.acknowledgeAsync(msg);
            received.incrementAndGet();
            ((CountDownLatch)latch.get()).countDown();
        }).subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic-pr").create();
        consumer.pause();
        for (int i = 0; i < receiverQueueSize * 2; ++i) {
            producer.send((Object)("my-message-" + i).getBytes());
        }
        log.info("Waiting for message listener to ack " + receiverQueueSize + " messages");
        Assert.assertTrue((boolean)latch.get().await(receiverQueueSize, TimeUnit.SECONDS), (String)"Timed out waiting for message listener acks");
        log.info("Giving message listener an opportunity to receive messages while paused");
        Thread.sleep(2000L);
        Assert.assertEquals((int)received.intValue(), (int)receiverQueueSize, (String)"Consumer received messages while paused");
        latch.set(new CountDownLatch(receiverQueueSize));
        consumer.resume();
        log.info("Waiting for message listener to ack all messages");
        Assert.assertTrue((boolean)latch.get().await(receiverQueueSize, TimeUnit.SECONDS), (String)"Timed out waiting for message listener acks");
        consumer.close();
        producer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(dataProvider="batch")
    public void testBackoffAndReconnect(int batchMessageDelayMs) throws Exception {
        int i;
        log.info("-- Starting {} test --", (Object)this.methodName);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic4"}).subscriptionName("my-subscriber-name").subscribe();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic4");
        if (batchMessageDelayMs != 0) {
            producerBuilder.enableBatching(true);
            producerBuilder.batchingMaxPublishDelay((long)batchMessageDelayMs, TimeUnit.MILLISECONDS);
            producerBuilder.batchingMaxMessages(5);
        } else {
            producerBuilder.enableBatching(false);
        }
        Producer producer = producerBuilder.create();
        for (int i2 = 0; i2 < 10; ++i2) {
            producer.sendAsync((Object)("my-message-" + i2).getBytes()).thenApply(msgId -> {
                log.info("Published message id: {}", msgId);
                return msgId;
            });
        }
        producer.flush();
        Message msg = null;
        for (i = 0; i < 10; ++i) {
            msg = consumer.receive();
            log.info("Received: [{}]", (Object)new String(msg.getData()));
        }
        log.info("-- Restarting broker --");
        this.restartBroker();
        msg = null;
        log.info("Receiving duplicate messages..");
        for (i = 0; i < 10; ++i) {
            msg = consumer.receive();
            log.info("Received: [{}]", (Object)new String(msg.getData()));
            Assert.assertNotNull((Object)msg, (String)"Message cannot be null");
        }
        consumer.acknowledgeCumulative(msg);
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(dataProvider="batch")
    public void testSendTimeout(int batchMessageDelayMs) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic5"}).subscriptionName("my-subscriber-name").subscribe();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic5").sendTimeout(1, TimeUnit.SECONDS);
        if (batchMessageDelayMs != 0) {
            producerBuilder.enableBatching(true);
            producerBuilder.batchingMaxPublishDelay((long)batchMessageDelayMs, TimeUnit.MILLISECONDS);
            producerBuilder.batchingMaxMessages(5);
        }
        Producer producer = producerBuilder.create();
        String message = "my-message";
        this.stopBroker();
        CompletableFuture future = producer.sendAsync((Object)"my-message".getBytes());
        try {
            future.get();
            Assert.fail((String)"Send operation should have failed");
        }
        catch (ExecutionException executionException) {
            // empty catch block
        }
        this.startBroker();
        Message msg = consumer.receive(3, TimeUnit.SECONDS);
        Assert.assertNull((Object)msg);
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test
    public void testInvalidSequence() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        PulsarClient client1 = PulsarClient.builder().serviceUrl("http://127.0.0.1:" + this.BROKER_WEBSERVICE_PORT).build();
        client1.close();
        try {
            client1.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic6"}).subscriptionName("my-subscriber-name").subscribe();
            Assert.fail((String)"Should fail");
        }
        catch (PulsarClientException e) {
            Assert.assertTrue((boolean)(e instanceof PulsarClientException.AlreadyClosedException));
        }
        try {
            client1.newProducer().topic("persistent://my-property/my-ns/my-topic6").create();
            Assert.fail((String)"Should fail");
        }
        catch (PulsarClientException e) {
            Assert.assertTrue((boolean)(e instanceof PulsarClientException.AlreadyClosedException));
        }
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic6").create();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic6"}).subscriptionName("my-subscriber-name").subscribe();
        try {
            TypedMessageBuilder builder = producer.newMessage().value((Object)"InvalidMessage".getBytes());
            Message msg = ((TypedMessageBuilderImpl)builder).getMessage();
            consumer.acknowledge(msg);
        }
        catch (PulsarClientException.InvalidMessageException invalidMessageException) {
            // empty catch block
        }
        consumer.close();
        try {
            consumer.receive();
            Assert.fail((String)"Should fail");
        }
        catch (PulsarClientException.AlreadyClosedException alreadyClosedException) {
            // empty catch block
        }
        try {
            consumer.unsubscribe();
            Assert.fail((String)"Should fail");
        }
        catch (PulsarClientException.AlreadyClosedException alreadyClosedException) {
            // empty catch block
        }
        producer.close();
        try {
            producer.send((Object)"message".getBytes());
            Assert.fail((String)"Should fail");
        }
        catch (PulsarClientException.AlreadyClosedException alreadyClosedException) {
            // empty catch block
        }
    }

    @Test
    public void testSillyUser() {
        try {
            PulsarClient.builder().serviceUrl("invalid://url").build();
            Assert.fail((String)"should fail");
        }
        catch (PulsarClientException e) {
            Assert.assertTrue((boolean)(e instanceof PulsarClientException.InvalidServiceURL));
        }
        try {
            this.pulsarClient.newProducer().sendTimeout(-1, TimeUnit.SECONDS);
            Assert.fail((String)"should fail");
        }
        catch (IllegalArgumentException e) {
            // empty catch block
        }
        try {
            this.pulsarClient.newProducer().maxPendingMessages(0);
            Assert.fail((String)"should fail");
        }
        catch (IllegalArgumentException e) {
            // empty catch block
        }
        try {
            this.pulsarClient.newProducer().topic("invalid://topic").create();
            Assert.fail((String)"should fail");
        }
        catch (PulsarClientException e) {
            Assert.assertTrue((boolean)(e instanceof PulsarClientException.InvalidTopicNameException));
        }
        try {
            this.pulsarClient.newConsumer().messageListener(null);
            Assert.fail((String)"should fail");
        }
        catch (NullPointerException e) {
            // empty catch block
        }
        try {
            this.pulsarClient.newConsumer().subscriptionType(null);
            Assert.fail((String)"should fail");
        }
        catch (NullPointerException e) {
            // empty catch block
        }
        try {
            this.pulsarClient.newConsumer().receiverQueueSize(-1);
            Assert.fail((String)"should fail");
        }
        catch (IllegalArgumentException e) {
            // empty catch block
        }
        try {
            this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic7"}).subscriptionName(null).subscribe();
            Assert.fail((String)"Should fail");
        }
        catch (IllegalArgumentException | PulsarClientException e) {
            Assert.assertEquals(e.getClass(), IllegalArgumentException.class);
        }
        try {
            this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic7"}).subscriptionName("").subscribe();
            Assert.fail((String)"Should fail");
        }
        catch (IllegalArgumentException | PulsarClientException e) {
            Assert.assertTrue((boolean)(e instanceof IllegalArgumentException));
        }
        try {
            this.pulsarClient.newConsumer().topic(new String[]{"invalid://topic7"}).subscriptionName("my-subscriber-name").subscribe();
            Assert.fail((String)"Should fail");
        }
        catch (PulsarClientException e) {
            Assert.assertTrue((boolean)(e instanceof PulsarClientException.InvalidTopicNameException));
        }
    }

    @Test(dataProvider="batch")
    public void testConcurrentConsumerReceiveWhileReconnect(int batchMessageDelayMs) throws Exception {
        Message msg;
        int recvQueueSize = 100;
        int numConsumersThreads = 10;
        String subName = UUID.randomUUID().toString();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic7"}).subscriptionName(subName).receiverQueueSize(100).subscribe();
        ExecutorService executor = Executors.newCachedThreadPool();
        CyclicBarrier barrier = new CyclicBarrier(11);
        for (int i = 0; i < 10; ++i) {
            executor.submit(() -> {
                barrier.await();
                consumer.receive();
                return null;
            });
        }
        barrier.await();
        Thread.sleep(100L);
        this.restartBroker();
        Thread.sleep(2000L);
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic7");
        if (batchMessageDelayMs != 0) {
            producerBuilder.batchingMaxPublishDelay((long)batchMessageDelayMs, TimeUnit.MILLISECONDS);
            producerBuilder.batchingMaxMessages(5);
            producerBuilder.enableBatching(true);
        }
        Producer producer = producerBuilder.create();
        for (int i = 0; i < 100; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Thread.sleep(500L);
        ConsumerImpl consumerImpl = (ConsumerImpl)consumer;
        Assert.assertEquals((int)consumerImpl.getAvailablePermits(), (int)10);
        Assert.assertEquals((int)consumerImpl.numMessagesInQueue(), (int)90);
        barrier.reset();
        for (int i = 0; i < 10; ++i) {
            executor.submit(() -> {
                barrier.await();
                consumer.receive();
                return null;
            });
        }
        barrier.await();
        Thread.sleep(100L);
        Assert.assertEquals((int)consumerImpl.getAvailablePermits(), (int)20);
        Assert.assertEquals((int)consumerImpl.numMessagesInQueue(), (int)80);
        while ((msg = consumer.receive(1, TimeUnit.SECONDS)) != null) {
        }
        Assert.assertEquals((int)consumerImpl.getAvailablePermits(), (int)0);
        Assert.assertEquals((int)consumerImpl.numMessagesInQueue(), (int)0);
        barrier.reset();
        for (int i = 0; i < 10; ++i) {
            executor.submit(() -> {
                barrier.await();
                consumer.receive();
                return null;
            });
        }
        barrier.await();
        Thread.sleep(100L);
        this.restartBroker();
        Thread.sleep(2000L);
        Assert.assertEquals((int)consumerImpl.getAvailablePermits(), (int)10);
        Assert.assertEquals((int)consumerImpl.numMessagesInQueue(), (int)90);
        consumer.close();
    }

    @Test
    public void testSendBigMessageSize() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        String topic = "persistent://my-property/my-ns/bigMsg";
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/bigMsg").create();
        producer.newMessage().value((Object)new byte[0x500000]);
        try {
            producer.send((Object)new byte[0x500001]);
            Assert.fail((String)"Should have thrown exception");
        }
        catch (PulsarClientException.InvalidMessageException invalidMessageException) {
            // empty catch block
        }
    }

    @Test
    public void testSendBigMessageSizeButCompressed() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        String topic = "persistent://my-property/my-ns/bigMsg";
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/bigMsg").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).compressionType(CompressionType.LZ4).create();
        producer.send((Object)new byte[0x500001]);
        producer.close();
        producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/bigMsg").enableBatching(true).messageRoutingMode(MessageRoutingMode.SinglePartition).compressionType(CompressionType.LZ4).create();
        producer.send((Object)new byte[0x500001]);
        producer.close();
        producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/bigMsg").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).compressionType(CompressionType.NONE).create();
        try {
            producer.send((Object)new byte[0x500001]);
            Assert.fail((String)"Should have thrown exception");
        }
        catch (PulsarClientException.InvalidMessageException invalidMessageException) {
            // empty catch block
        }
        producer.close();
        producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/bigMsg").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).compressionType(CompressionType.LZ4).create();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/bigMsg"}).subscriptionName("sub1").subscribe();
        byte[] content = new byte[0x50000A];
        producer.send((Object)content);
        Assert.assertEquals((byte[])consumer.receive().getData(), (byte[])content);
        producer.close();
        producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/bigMsg").enableBatching(true).messageRoutingMode(MessageRoutingMode.SinglePartition).compressionType(CompressionType.NONE).create();
        try {
            producer.send((Object)new byte[0x500001]);
            Assert.fail((String)"Should have thrown exception");
        }
        catch (PulsarClientException.InvalidMessageException invalidMessageException) {
            // empty catch block
        }
        producer.close();
        consumer.close();
    }

    @Test
    public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception {
        int i;
        int i2;
        log.info("-- Starting {} test --", (Object)this.methodName);
        long batchMessageDelayMs = 100L;
        int receiverSize = 10;
        String topicName = "cache-topic";
        String sub1 = "faster-sub1";
        String sub2 = "slower-sub2";
        Consumer subscriber1 = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/cache-topic"}).subscriptionName("faster-sub1").subscriptionType(SubscriptionType.Shared).receiverQueueSize(10).subscribe();
        String topic = "persistent://my-property/my-ns/cache-topic";
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/cache-topic");
        producerBuilder.batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS);
        producerBuilder.batchingMaxMessages(5);
        producerBuilder.enableBatching(true);
        Producer producer = producerBuilder.create();
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference("persistent://my-property/my-ns/cache-topic").get();
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)topicRef.getManagedLedger();
        Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache");
        cacheField.setAccessible(true);
        Field modifiersField = Field.class.getDeclaredField("modifiers");
        modifiersField.setAccessible(true);
        modifiersField.setInt(cacheField, cacheField.getModifiers() & 0xFFFFFFEF);
        EntryCacheImpl entryCache = (EntryCacheImpl)Mockito.spy((Object)((EntryCacheImpl)cacheField.get(ledger)));
        cacheField.set(ledger, entryCache);
        Message msg = null;
        for (i2 = 0; i2 < 30; ++i2) {
            String message = "my-message-" + i2;
            producer.send((Object)message.getBytes());
        }
        for (i2 = 0; i2 < 30; ++i2) {
            msg = subscriber1.receive(5, TimeUnit.SECONDS);
            subscriber1.acknowledge(msg);
        }
        ((EntryCacheImpl)Mockito.verify((Object)entryCache, (VerificationMode)Mockito.atLeastOnce())).invalidateEntries((PositionImpl)Mockito.any());
        Thread.sleep(1000L);
        producer.send((Object)"message".getBytes());
        msg = subscriber1.receive(5, TimeUnit.SECONDS);
        Consumer subscriber2 = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/cache-topic"}).subscriptionName("slower-sub2").subscribe();
        int moreMessages = 10;
        for (i = 0; i < 20; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        for (i = 0; i < 20; ++i) {
            msg = subscriber1.receive(5, TimeUnit.SECONDS);
            subscriber1.acknowledge(msg);
        }
        Thread.sleep(1000L);
        producer.send((Object)"message".getBytes());
        msg = subscriber1.receive(5, TimeUnit.SECONDS);
        Assert.assertTrue((entryCache.getSize() != 0L ? 1 : 0) != 0);
        subscriber2.close();
        SimpleProducerConsumerTest.retryStrategically(test -> entryCache.getSize() == 0L, 5, 100L);
        Assert.assertEquals((long)entryCache.getSize(), (long)0L);
        subscriber1.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test
    public void testDeactivatingBacklogConsumer() throws Exception {
        int i;
        log.info("-- Starting {} test --", (Object)this.methodName);
        long batchMessageDelayMs = 100L;
        int receiverSize = 10;
        String topicName = "cache-topic";
        String topic = "persistent://my-property/my-ns/cache-topic";
        String sub1 = "faster-sub1";
        String sub2 = "slower-sub2";
        Consumer subscriber1 = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/cache-topic"}).subscriptionName("faster-sub1").subscriptionType(SubscriptionType.Shared).receiverQueueSize(10).subscribe();
        Consumer subscriber2 = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/cache-topic"}).subscriptionName("slower-sub2").subscriptionType(SubscriptionType.Shared).receiverQueueSize(10).subscribe();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/cache-topic");
        producerBuilder.enableBatching(true).batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS).batchingMaxMessages(5);
        Producer producer = producerBuilder.create();
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference("persistent://my-property/my-ns/cache-topic").get();
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)topicRef.getManagedLedger();
        long maxMessageCacheRetentionTimeMillis = this.conf.getManagedLedgerCacheEvictionTimeThresholdMillis();
        long maxActiveCursorBacklogEntries = this.conf.getManagedLedgerCursorBackloggedThreshold();
        Message msg = null;
        int totalMsgs = (int)maxActiveCursorBacklogEntries + 10 + 1;
        for (i = 0; i < totalMsgs; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        for (i = 0; i < totalMsgs; ++i) {
            msg = subscriber1.receive(100, TimeUnit.MILLISECONDS);
            subscriber1.acknowledge(msg);
        }
        Thread.sleep(maxMessageCacheRetentionTimeMillis);
        ledger.checkBackloggedCursors();
        Thread.sleep(100L);
        HashSet activeSubscriber = Sets.newHashSet();
        ledger.getActiveCursors().forEach(c -> activeSubscriber.add(c.getName()));
        Assert.assertTrue((boolean)activeSubscriber.contains("faster-sub1"));
        Assert.assertFalse((boolean)activeSubscriber.contains("slower-sub2"));
        for (int i2 = 0; i2 < totalMsgs; ++i2) {
            msg = subscriber2.receive(100, TimeUnit.MILLISECONDS);
            subscriber2.acknowledge(msg);
        }
        ledger.checkBackloggedCursors();
        activeSubscriber.clear();
        ledger.getActiveCursors().forEach(c -> activeSubscriber.add(c.getName()));
        Assert.assertTrue((boolean)activeSubscriber.contains("faster-sub1"));
        Assert.assertTrue((boolean)activeSubscriber.contains("slower-sub2"));
    }

    @Test(timeOut=2000L)
    public void testAsyncProducerAndConsumer() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int totalMsg = 100;
        HashSet produceMsgs = Sets.newHashSet();
        HashSet consumeMsgs = Sets.newHashSet();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1").create();
        for (int i = 0; i < 100; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
            produceMsgs.add(message);
        }
        log.info(" start receiving messages :");
        CountDownLatch latch = new CountDownLatch(100);
        ExecutorService executor = Executors.newFixedThreadPool(1);
        this.receiveAsync((Consumer<byte[]>)consumer, 100, 0, latch, consumeMsgs, executor);
        latch.await();
        Assert.assertEquals((int)produceMsgs.size(), (int)100);
        produceMsgs.removeAll(consumeMsgs);
        Assert.assertTrue((boolean)produceMsgs.isEmpty());
        producer.close();
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(timeOut=2000L)
    public void testAsyncProducerAndConsumerWithZeroQueueSize() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int totalMsg = 100;
        HashSet produceMsgs = Sets.newHashSet();
        HashSet consumeMsgs = Sets.newHashSet();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1").create();
        for (int i = 0; i < 100; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
            produceMsgs.add(message);
        }
        log.info(" start receiving messages :");
        CountDownLatch latch = new CountDownLatch(100);
        ExecutorService executor = Executors.newFixedThreadPool(1);
        this.receiveAsync((Consumer<byte[]>)consumer, 100, 0, latch, consumeMsgs, executor);
        latch.await();
        Assert.assertEquals((int)produceMsgs.size(), (int)100);
        produceMsgs.removeAll(consumeMsgs);
        Assert.assertTrue((boolean)produceMsgs.isEmpty());
        producer.close();
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test
    public void testSendCallBack() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int totalMsg = 100;
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        for (int i = 0; i < 100; ++i) {
            String message = "my-message-" + i;
            int len = message.getBytes().length;
            AtomicInteger msgLength = new AtomicInteger();
            CompletionStage future = producer.sendAsync((Object)message.getBytes()).handle((r, ex) -> {
                if (ex != null) {
                    log.error("Message send failed:", ex);
                } else {
                    msgLength.set(len);
                }
                return null;
            });
            ((CompletableFuture)future).get();
            Assert.assertEquals((int)message.getBytes().length, (int)msgLength.get());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=30000L)
    public void testSharedConsumerAckDifferentConsumer() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").receiverQueueSize(1).subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0L, TimeUnit.SECONDS);
        Consumer consumer1 = consumerBuilder.subscribe();
        Consumer consumer2 = consumerBuilder.subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1").create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Message msg = null;
        HashSet consumerMsgSet1 = Sets.newHashSet();
        HashSet consumerMsgSet2 = Sets.newHashSet();
        for (int i = 0; i < 5; ++i) {
            msg = consumer1.receive();
            consumerMsgSet1.add(msg);
            msg = consumer2.receive();
            consumerMsgSet2.add(msg);
        }
        consumerMsgSet1.stream().forEach(m -> {
            try {
                consumer2.acknowledge(m);
            }
            catch (PulsarClientException e) {
                Assert.fail();
            }
        });
        consumerMsgSet2.stream().forEach(m -> {
            try {
                consumer1.acknowledge(m);
            }
            catch (PulsarClientException e) {
                Assert.fail();
            }
        });
        consumer1.redeliverUnacknowledgedMessages();
        consumer2.redeliverUnacknowledgedMessages();
        try {
            if (consumer1.receive(100, TimeUnit.MILLISECONDS) != null || consumer2.receive(100, TimeUnit.MILLISECONDS) != null) {
                Assert.fail();
            }
        }
        finally {
            consumer1.close();
            consumer2.close();
        }
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    private void receiveAsync(Consumer<byte[]> consumer, int totalMessage, int currentMessage, CountDownLatch latch, Set<String> consumeMsg, ExecutorService executor) throws PulsarClientException {
        if (currentMessage < totalMessage) {
            CompletableFuture future = consumer.receiveAsync();
            future.handle((msg, exception) -> {
                if (exception == null) {
                    consumeMsg.add(new String(msg.getData()));
                    try {
                        consumer.acknowledge(msg);
                    }
                    catch (PulsarClientException e1) {
                        Assert.fail((String)"message acknowledge failed", (Throwable)e1);
                    }
                    executor.execute(() -> {
                        try {
                            this.receiveAsync(consumer, totalMessage, currentMessage + 1, latch, consumeMsg, executor);
                        }
                        catch (PulsarClientException e) {
                            Assert.fail((String)"message receive failed", (Throwable)e);
                        }
                    });
                    latch.countDown();
                }
                return null;
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConsumerBlockingWithUnAckedMessages() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            int unAckedMessagesBufferSize = 500;
            int receiverQueueSize = 10;
            int totalProducedMsgs = 600;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(500);
            Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
            Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic").create();
            for (int i = 0; i < 600; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
            }
            Message msg = null;
            ArrayList messages = Lists.newArrayList();
            for (int i = 0; i < 600 && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                messages.add(msg);
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)messages.size(), (int)500);
            messages.forEach(m -> {
                try {
                    consumer.acknowledge(m);
                }
                catch (PulsarClientException e) {
                    Assert.fail((String)"ack failed", (Throwable)e);
                }
            });
            int remainingMessages = 600 - messages.size();
            for (int i = 0; i < remainingMessages; ++i) {
                msg = consumer.receive(1, TimeUnit.SECONDS);
                if (msg == null) continue;
                messages.add(msg);
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)600, (int)messages.size());
            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            Assert.fail();
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConsumerBlockingWithUnAckedMessagesMultipleIteration() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            int unAckedMessagesBufferSize = 500;
            int receiverQueueSize = 10;
            int totalProducedMsgs = 1500;
            int totalReceiveIteration = 3;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(500);
            Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
            Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic").create();
            for (int i = 0; i < 1500; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
            }
            int totalReceivedMessages = 0;
            for (int j = 0; j < 3; ++j) {
                Message msg = null;
                ArrayList messages = Lists.newArrayList();
                for (int i = 0; i < 1500 && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                    messages.add(msg);
                    log.info("Received message: " + new String(msg.getData()));
                }
                Assert.assertEquals((int)messages.size(), (int)500);
                messages.forEach(m -> {
                    try {
                        consumer.acknowledge(m);
                    }
                    catch (PulsarClientException e) {
                        Assert.fail((String)"ack failed", (Throwable)e);
                    }
                });
                totalReceivedMessages += messages.size();
            }
            Assert.assertEquals((int)totalReceivedMessages, (int)1500);
            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            Assert.fail();
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMutlipleSharedConsumerBlockingWithUnAckedMessages() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            int i;
            int maxUnackedMessages = 20;
            int receiverQueueSize = 10;
            int totalProducedMsgs = 100;
            int totalReceiveMessages = 0;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(20);
            Consumer consumer1 = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
            PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
            Consumer consumer2 = newPulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
            Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic").create();
            for (int i2 = 0; i2 < 100; ++i2) {
                String message = "my-message-" + i2;
                producer.send((Object)message.getBytes());
            }
            Message msg = null;
            ArrayList messages = Lists.newArrayList();
            for (i = 0; i < 100 && (msg = consumer1.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                messages.add(msg);
                ++totalReceiveMessages;
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)messages.size(), (int)20);
            messages.clear();
            for (i = 0; i < 80 && (msg = consumer2.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                messages.add(msg);
                ++totalReceiveMessages;
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)messages.size(), (int)20);
            messages.forEach(m -> {
                try {
                    consumer2.acknowledge(m);
                }
                catch (PulsarClientException e) {
                    Assert.fail((String)"shouldn't have failed ", (Throwable)e);
                }
            });
            messages.clear();
            for (i = 0; i < 60 && (msg = consumer2.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                messages.add(msg);
                ++totalReceiveMessages;
                consumer2.acknowledge(msg);
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)100, (int)totalReceiveMessages);
            producer.close();
            consumer1.close();
            consumer2.close();
            newPulsarClient.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            Assert.fail();
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testShouldNotBlockConsumerIfRedeliverBeforeReceive() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        int totalReceiveMsg = 0;
        try {
            Message msg;
            int i;
            int receiverQueueSize = 20;
            int totalProducedMsgs = 100;
            ConsumerImpl consumer = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(20).ackTimeout(1L, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
            Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            for (i = 0; i < 100; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
            }
            Thread.sleep(1000L);
            Assert.assertEquals((int)consumer.numMessagesInQueue(), (int)20);
            Thread.sleep(2000L);
            Assert.assertEquals((int)consumer.numMessagesInQueue(), (int)20);
            for (i = 0; i < 100 && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                consumer.acknowledge(msg);
                ++totalReceiveMsg;
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)100, (int)totalReceiveMsg);
            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            Assert.fail();
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testUnackBlockRedeliverMessages() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        int totalReceiveMsg = 0;
        try {
            int unAckedMessagesBufferSize = 20;
            int receiverQueueSize = 10;
            int totalProducedMsgs = 100;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(20);
            ConsumerImpl consumer = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
            Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic").create();
            for (int i = 0; i < 100; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
            }
            Message msg = null;
            ArrayList messages = Lists.newArrayList();
            for (int i = 0; i < 100 && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                messages.add(msg);
                ++totalReceiveMsg;
                log.info("Received message: " + new String(msg.getData()));
            }
            consumer.redeliverUnacknowledgedMessages();
            Thread.sleep(1000L);
            int alreadyConsumedMessages = messages.size();
            messages.clear();
            for (int i = 0; i < 100 && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                consumer.acknowledge(msg);
                ++totalReceiveMsg;
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)(100 + alreadyConsumedMessages), (int)totalReceiveMsg);
            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            Assert.fail();
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="batch")
    public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            int i;
            int maxUnackedMessages = 20;
            int receiverQueueSize = 10;
            int totalProducedMsgs = 100;
            int totalReceiveMessages = 0;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(20);
            Consumer consumer1 = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
            ProducerBuilder producerBuidler = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic");
            if (batchMessageDelayMs != 0) {
                producerBuidler.enableBatching(true);
                producerBuidler.batchingMaxPublishDelay((long)batchMessageDelayMs, TimeUnit.MILLISECONDS);
                producerBuidler.batchingMaxMessages(5);
            } else {
                producerBuidler.enableBatching(false);
            }
            Producer producer = producerBuidler.create();
            ArrayList futures = Lists.newArrayList();
            for (int i2 = 0; i2 < 100; ++i2) {
                String message = "my-message-" + i2;
                futures.add(producer.sendAsync((Object)message.getBytes()));
            }
            FutureUtil.waitForAll((List)futures).get();
            Message msg = null;
            ArrayList messages = Lists.newArrayList();
            for (i = 0; i < 100 && (msg = consumer1.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                messages.add(msg);
                ++totalReceiveMessages;
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertNotEquals((Object)messages.size(), (Object)100);
            messages.forEach(m -> {
                try {
                    consumer1.acknowledge(m);
                }
                catch (PulsarClientException e) {
                    Assert.fail((String)"shouldn't have failed ", (Throwable)e);
                }
            });
            messages.clear();
            for (i = 0; i < 100 && (msg = consumer1.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                messages.add(msg);
                ++totalReceiveMessages;
                consumer1.acknowledge(msg);
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)100, (int)totalReceiveMessages);
            producer.close();
            consumer1.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            Assert.fail();
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBlockUnackConsumerAckByDifferentConsumer() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            int i;
            int maxUnackedMessages = 20;
            int receiverQueueSize = 10;
            int totalProducedMsgs = 100;
            int totalReceiveMessages = 0;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(20);
            ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared);
            Consumer consumer1 = consumerBuilder.subscribe();
            Consumer consumer2 = consumerBuilder.subscribe();
            Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic").create();
            for (int i2 = 0; i2 < 100; ++i2) {
                String message = "my-message-" + i2;
                producer.send((Object)message.getBytes());
            }
            Message msg = null;
            ArrayList messages = Lists.newArrayList();
            for (i = 0; i < 100 && (msg = consumer1.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                messages.add(msg);
                ++totalReceiveMessages;
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)messages.size(), (int)20);
            messages.forEach(m -> {
                try {
                    consumer2.acknowledge(m);
                }
                catch (PulsarClientException e) {
                    Assert.fail((String)"shouldn't have failed ", (Throwable)e);
                }
            });
            for (i = 0; i < 100 && (msg = consumer1.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                ++totalReceiveMessages;
                consumer2.acknowledge(msg);
                log.info("Received message: " + new String(msg.getData()));
            }
            for (i = 0; i < 100 && (msg = consumer2.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                ++totalReceiveMessages;
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)100, (int)totalReceiveMessages);
            producer.close();
            consumer1.close();
            consumer2.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            Assert.fail();
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    @Test
    public void testEnabledChecksumClient() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int totalMsg = 10;
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscribe();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1");
        int batchMessageDelayMs = 300;
        producerBuilder.enableBatching(true).batchingMaxPublishDelay(300L, TimeUnit.MILLISECONDS).batchingMaxMessages(5);
        Producer producer = producerBuilder.create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        consumer.acknowledgeCumulative(msg);
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            int unAckedMessagesBufferSize = 10;
            int receiverQueueSize = 20;
            int totalProducedMsgs = 20;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(10);
            ConsumerImpl consumer = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(20).subscriptionType(SubscriptionType.Shared).subscribe();
            Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic").create();
            for (int i = 0; i < 20; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
                Thread.sleep(10L);
            }
            Message msg = null;
            ArrayList messages1 = Lists.newArrayList();
            for (int i = 0; i < 20 && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                messages1.add(msg);
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)messages1.size(), (int)10);
            Set redeliveryMessages = messages1.stream().map(m -> (MessageIdImpl)m.getMessageId()).collect(Collectors.toSet());
            consumer.redeliverUnacknowledgedMessages((Set)Sets.newHashSet(redeliveryMessages));
            Thread.sleep(1000L);
            HashSet messages2 = Sets.newHashSet();
            for (int i = 0; i < 20 && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                messages2.add((MessageIdImpl)msg.getMessageId());
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)messages1.size(), (int)messages2.size());
            messages2.removeAll(redeliveryMessages);
            Assert.assertEquals((int)messages2.size(), (int)0);
            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            Assert.fail();
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhileProduce() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            int unAckedMessagesBufferSize = 10;
            int receiverQueueSize = 20;
            int totalProducedMsgs = 50;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(10);
            ConsumerImpl consumer = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(20).subscriptionType(SubscriptionType.Shared).subscribe();
            consumer.close();
            Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic").create();
            for (int i = 0; i < 50; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
                Thread.sleep(10L);
            }
            consumer = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(20).subscriptionType(SubscriptionType.Shared).subscribe();
            Message msg = null;
            ArrayList messages1 = Lists.newArrayList();
            for (int i = 0; i < 50 && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                messages1.add(msg);
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)messages1.size(), (int)20);
            Set redeliveryMessages = messages1.stream().map(m -> (MessageIdImpl)m.getMessageId()).collect(Collectors.toSet());
            consumer.redeliverUnacknowledgedMessages((Set)Sets.newHashSet(redeliveryMessages));
            Thread.sleep(1000L);
            HashSet messages2 = Sets.newHashSet();
            for (int i = 0; i < 50 && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                messages2.add((MessageIdImpl)msg.getMessageId());
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)messages1.size(), (int)messages2.size());
            messages2.removeAll(redeliveryMessages);
            Assert.assertEquals((int)messages2.size(), (int)0);
            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            Assert.fail();
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    @Test
    public void testPriorityConsumer() throws Exception {
        int i;
        CompletableFuture future;
        String message;
        log.info("-- Starting {} test --", (Object)this.methodName);
        Consumer consumer1 = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).priorityLevel(1).subscribe();
        PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer consumer2 = newPulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).priorityLevel(1).subscribe();
        PulsarClient newPulsarClient1 = this.newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer consumer3 = newPulsarClient1.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).priorityLevel(1).subscribe();
        PulsarClient newPulsarClient2 = this.newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer consumer4 = newPulsarClient2.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).priorityLevel(2).subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic2").create();
        ArrayList futures = Lists.newArrayList();
        for (int i2 = 0; i2 < 15; ++i2) {
            message = "my-message-" + i2;
            future = producer.sendAsync((Object)message.getBytes());
            futures.add(future);
        }
        log.info("Waiting for async publish to complete");
        for (Future future2 : futures) {
            future2.get();
        }
        for (i = 0; i < 20; ++i) {
            consumer1.receive(100, TimeUnit.MILLISECONDS);
            consumer2.receive(100, TimeUnit.MILLISECONDS);
        }
        for (i = 0; i < 5; ++i) {
            message = "my-message-" + i;
            future = producer.sendAsync((Object)message.getBytes());
            futures.add(future);
        }
        Assert.assertNull((Object)consumer4.receive(100, TimeUnit.MILLISECONDS));
        producer.close();
        consumer1.close();
        consumer2.close();
        consumer3.close();
        consumer4.close();
        newPulsarClient.close();
        newPulsarClient1.close();
        newPulsarClient2.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(timeOut=5000L)
    public void testSharedSamePriorityConsumer() throws Exception {
        Message msg;
        int i;
        Message msg2;
        int i2;
        log.info("-- Starting {} test --", (Object)this.methodName);
        int queueSize = 5;
        int maxUnAckMsgs = this.pulsar.getConfiguration().getMaxConcurrentLookupRequest();
        this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(5);
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic2").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer c1 = newPulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
        PulsarClient newPulsarClient1 = this.newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer c2 = newPulsarClient1.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
        ArrayList futures = Lists.newArrayList();
        int totalPublishMessages = 500;
        for (int i3 = 0; i3 < 500; ++i3) {
            String message = "my-message-" + i3;
            CompletableFuture future = producer.sendAsync((Object)message.getBytes());
            futures.add(future);
        }
        log.info("Waiting for async publish to complete");
        for (Future future : futures) {
            future.get();
        }
        ArrayList messages = Lists.newArrayList();
        for (i2 = 0; i2 < 500 && (msg2 = c1.receive(500, TimeUnit.MILLISECONDS)) != null; ++i2) {
            messages.add(msg2);
        }
        for (i2 = 0; i2 < 500 && (msg2 = c2.receive(500, TimeUnit.MILLISECONDS)) != null; ++i2) {
            messages.add(msg2);
        }
        Assert.assertEquals((int)10, (int)messages.size());
        PulsarClient newPulsarClient2 = this.newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer c3 = newPulsarClient2.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
        PulsarClient newPulsarClient3 = this.newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer c4 = newPulsarClient3.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
        PulsarClient newPulsarClient4 = this.newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer c5 = newPulsarClient4.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
        for (i = 0; i < 500 && (msg = c4.receive(500, TimeUnit.MILLISECONDS)) != null; ++i) {
            messages.add(msg);
        }
        for (i = 0; i < 500 && (msg = c5.receive(500, TimeUnit.MILLISECONDS)) != null; ++i) {
            messages.add(msg);
        }
        for (i = 0; i < 500 && (msg = c3.receive(500, TimeUnit.MILLISECONDS)) != null; ++i) {
            messages.add(msg);
            c3.acknowledge(msg);
        }
        Assert.assertEquals((int)messages.size(), (int)500);
        producer.close();
        c1.close();
        c2.close();
        c3.close();
        c4.close();
        c5.close();
        newPulsarClient.close();
        newPulsarClient1.close();
        newPulsarClient2.close();
        newPulsarClient3.close();
        newPulsarClient4.close();
        this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnAckMsgs);
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test
    public void testRedeliveryFailOverConsumer() throws Exception {
        int i;
        log.info("-- Starting {} test --", (Object)this.methodName);
        int receiverQueueSize = 10;
        ConsumerImpl consumer = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Failover).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic").create();
        int consumeMsgInParts = 4;
        for (int i2 = 0; i2 < 10; ++i2) {
            String message = "my-message-" + i2;
            producer.send((Object)message.getBytes());
            Thread.sleep(10L);
        }
        Message msg = null;
        ArrayList messages1 = Lists.newArrayList();
        for (i = 0; i < consumeMsgInParts && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i) {
            messages1.add(msg);
            consumer.acknowledge(msg);
            log.info("Received message: " + new String(msg.getData()));
        }
        Assert.assertEquals((int)messages1.size(), (int)consumeMsgInParts);
        consumer.redeliverUnacknowledgedMessages();
        messages1.clear();
        for (i = 0; i < consumeMsgInParts && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i) {
            messages1.add(msg);
            consumer.acknowledge(msg);
            log.info("Received message: " + new String(msg.getData()));
        }
        Assert.assertEquals((int)messages1.size(), (int)consumeMsgInParts);
        consumer.redeliverUnacknowledgedMessages();
        for (i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
            Thread.sleep(100L);
        }
        int remainingMsgs = 20 - 2 * consumeMsgInParts;
        messages1.clear();
        for (int i3 = 0; i3 < remainingMsgs && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i3) {
            messages1.add(msg);
            consumer.acknowledge(msg);
            log.info("Received message: " + new String(msg.getData()));
        }
        Assert.assertEquals((int)messages1.size(), (int)remainingMsgs);
        producer.close();
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(timeOut=5000L)
    public void testFailReceiveAsyncOnConsumerClose() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/failAsyncReceive-1"}).subscriptionName("my-subscriber-name").subscribe();
        consumer.close();
        try {
            consumer.receiveAsync().get(1L, TimeUnit.SECONDS);
            Assert.fail((String)"it should have failed because consumer is already closed");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof PulsarClientException.AlreadyClosedException));
        }
        int numPartitions = 4;
        TopicName topicName = TopicName.get((String)"persistent://my-property/my-ns/failAsyncReceive-2");
        this.admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);
        Consumer partitionedConsumer = this.pulsarClient.newConsumer().topic(new String[]{topicName.toString()}).subscriptionName("my-partitioned-subscriber").subscribe();
        partitionedConsumer.close();
        try {
            partitionedConsumer.receiveAsync().get(1L, TimeUnit.SECONDS);
            Assert.fail((String)"it should have failed because consumer is already closed");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof PulsarClientException.AlreadyClosedException));
        }
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(groups={"encryption"})
    public void testECDSAEncryption() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int totalMsg = 10;
        HashSet messageSet = Sets.newHashSet();
        class EncKeyReader
        implements CryptoKeyReader {
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            EncKeyReader() {
            }

            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
                if (Files.isReadable(Paths.get(CERT_FILE_PATH, new String[0]))) {
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH, new String[0])));
                        return this.keyInfo;
                    }
                    catch (IOException e) {
                        Assert.fail((String)("Failed to read certificate from " + CERT_FILE_PATH));
                    }
                } else {
                    Assert.fail((String)("Certificate file " + CERT_FILE_PATH + " is not present or not readable."));
                }
                return null;
            }

            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
                if (Files.isReadable(Paths.get(CERT_FILE_PATH, new String[0]))) {
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH, new String[0])));
                        return this.keyInfo;
                    }
                    catch (IOException e) {
                        Assert.fail((String)("Failed to read certificate from " + CERT_FILE_PATH));
                    }
                } else {
                    Assert.fail((String)("Certificate file " + CERT_FILE_PATH + " is not present or not readable."));
                }
                return null;
            }
        }
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/myecdsa-topic1"}).subscriptionName("my-subscriber-name").cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/myecdsa-topic1").addEncryptionKey("client-ecdsa.pem").cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Message msg = null;
        for (int i = 0; i < 10; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        consumer.acknowledgeCumulative(msg);
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(groups={"encryption"})
    public void testRSAEncryption() throws Exception {
        String message;
        int i;
        log.info("-- Starting {} test --", (Object)this.methodName);
        int totalMsg = 10;
        HashSet messageSet = Sets.newHashSet();
        class EncKeyReader
        implements CryptoKeyReader {
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            EncKeyReader() {
            }

            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
                if (Files.isReadable(Paths.get(CERT_FILE_PATH, new String[0]))) {
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH, new String[0])));
                        return this.keyInfo;
                    }
                    catch (IOException e) {
                        Assert.fail((String)("Failed to read certificate from " + CERT_FILE_PATH));
                    }
                } else {
                    Assert.fail((String)("Certificate file " + CERT_FILE_PATH + " is not present or not readable."));
                }
                return null;
            }

            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
                if (Files.isReadable(Paths.get(CERT_FILE_PATH, new String[0]))) {
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH, new String[0])));
                        return this.keyInfo;
                    }
                    catch (IOException e) {
                        Assert.fail((String)("Failed to read certificate from " + CERT_FILE_PATH));
                    }
                } else {
                    Assert.fail((String)("Certificate file " + CERT_FILE_PATH + " is not present or not readable."));
                }
                return null;
            }
        }
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/myrsa-topic1"}).subscriptionName("my-subscriber-name").cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1").addEncryptionKey("client-rsa.pem").cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).create();
        Producer producer2 = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1").addEncryptionKey("client-rsa.pem").cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).create();
        for (i = 0; i < 10; ++i) {
            message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        for (i = 10; i < 20; ++i) {
            message = "my-message-" + i;
            producer2.send((Object)message.getBytes());
        }
        MessageImpl msg = null;
        for (int i2 = 0; i2 < 20; ++i2) {
            msg = (MessageImpl)consumer.receive(5, TimeUnit.SECONDS);
            msg.getEncryptionCtx().orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i2;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        consumer.acknowledgeCumulative(msg);
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(groups={"encryption"})
    public void testRedeliveryOfFailedMessages() throws Exception {
        int i;
        log.info("-- Starting {} test --", (Object)this.methodName);
        String encryptionKeyName = "client-rsa.pem";
        String encryptionKeyVersion = "1.0";
        final HashMap metadata = Maps.newHashMap();
        metadata.put("version", "1.0");
        String topicName = "persistent://my-property/my-ns/myrsa-topic1";
        class EncKeyReader
        implements CryptoKeyReader {
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            EncKeyReader() {
            }

            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
                if (Files.isReadable(Paths.get(CERT_FILE_PATH, new String[0]))) {
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH, new String[0])));
                        this.keyInfo.setMetadata(metadata);
                        return this.keyInfo;
                    }
                    catch (IOException e) {
                        Assert.fail((String)("Failed to read certificate from " + CERT_FILE_PATH));
                    }
                } else {
                    Assert.fail((String)("Certificate file " + CERT_FILE_PATH + " is not present or not readable."));
                }
                return null;
            }

            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
                if (Files.isReadable(Paths.get(CERT_FILE_PATH, new String[0]))) {
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH, new String[0])));
                        this.keyInfo.setMetadata(metadata);
                        return this.keyInfo;
                    }
                    catch (IOException e) {
                        Assert.fail((String)("Failed to read certificate from " + CERT_FILE_PATH));
                    }
                } else {
                    Assert.fail((String)("Certificate file " + CERT_FILE_PATH + " is not present or not readable."));
                }
                return null;
            }
        }
        Producer producer = this.pulsarClient.newProducer().topic(topicName).addEncryptionKey("client-rsa.pem").compressionType(CompressionType.LZ4).cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).create();
        PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer consumer1 = newPulsarClient.newConsumer().topicsPattern(topicName).subscriptionName("my-subscriber-name").cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).subscribe();
        PulsarClient newPulsarClient1 = this.newPulsarClient(this.lookupUrl.toString(), 0);
        class InvalidKeyReader
        implements CryptoKeyReader {
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            InvalidKeyReader() {
            }

            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                return null;
            }

            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
                return null;
            }
        }
        Consumer consumer2 = newPulsarClient1.newConsumer().topicsPattern(topicName).subscriptionName("my-subscriber-name").cryptoKeyReader((CryptoKeyReader)new InvalidKeyReader()).subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).subscribe();
        PulsarClient newPulsarClient2 = this.newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer consumer3 = newPulsarClient2.newConsumer().topicsPattern(topicName).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).subscribe();
        int numberOfMessages = 100;
        String message = "my-message";
        HashSet<String> messages = new HashSet<String>();
        for (int i2 = 0; i2 < numberOfMessages; ++i2) {
            producer.send((Object)(message + i2).getBytes());
        }
        Message m = consumer2.receive(3, TimeUnit.SECONDS);
        Assert.assertNull((Object)m);
        m = consumer3.receive(3, TimeUnit.SECONDS);
        Assert.assertNull((Object)m);
        for (i = 0; i < numberOfMessages; ++i) {
            m = consumer1.receive();
            messages.add(new String(m.getData()));
            consumer1.acknowledge(m);
        }
        m = consumer2.receive(3, TimeUnit.SECONDS);
        Assert.assertNull((Object)m);
        m = consumer3.receive(3, TimeUnit.SECONDS);
        Assert.assertNull((Object)m);
        for (i = 0; i < numberOfMessages; ++i) {
            Assert.assertTrue((boolean)messages.contains(message + i));
        }
        consumer1.close();
        consumer2.close();
        consumer3.close();
        newPulsarClient.close();
        newPulsarClient1.close();
        newPulsarClient2.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(groups={"encryption"})
    public void testEncryptionFailure() throws Exception {
        class EncKeyReader
        implements CryptoKeyReader {
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            EncKeyReader() {
            }

            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
                if (Files.isReadable(Paths.get(CERT_FILE_PATH, new String[0]))) {
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH, new String[0])));
                        return this.keyInfo;
                    }
                    catch (IOException e) {
                        log.error("Failed to read certificate from {}", (Object)CERT_FILE_PATH);
                    }
                }
                return null;
            }

            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
                if (Files.isReadable(Paths.get(CERT_FILE_PATH, new String[0]))) {
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH, new String[0])));
                        return this.keyInfo;
                    }
                    catch (IOException e) {
                        log.error("Failed to read certificate from {}", (Object)CERT_FILE_PATH);
                    }
                }
                return null;
            }
        }
        log.info("-- Starting {} test --", (Object)this.methodName);
        int totalMsg = 10;
        MessageImpl msg = null;
        HashSet messageSet = Sets.newHashSet();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/myenc-ns/myenc-topic1"}).subscriptionName("my-subscriber-name").acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
        try {
            this.pulsarClient.newProducer().topic("persistent://my-property/use/myenc-ns/myenc-topic1").addEncryptionKey("client-non-existant-rsa.pem").cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).create();
            Assert.fail((String)"Producer creation should not suceed if failing to read key");
        }
        catch (Exception exception) {
            // empty catch block
        }
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/myenc-ns/myenc-topic1").addEncryptionKey("client-rsa.pem").cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        msg = (MessageImpl)consumer.receive(5, TimeUnit.SECONDS);
        Assert.assertNull((Object)msg, (String)"Receive should have failed with no keyreader");
        consumer.close();
        consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/myenc-ns/myenc-topic1"}).subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
        int msgNum = 0;
        try {
            msg = (MessageImpl)consumer.receive(5, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            String expectedMessage = "my-message-" + msgNum++;
            Assert.assertNotEquals((Object)receivedMessage, (Object)expectedMessage, (String)("Received encrypted message " + receivedMessage + " should not match the expected message " + expectedMessage));
            consumer.acknowledgeCumulative((Message)msg);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)"Failed to receive message even aftet ConsumerCryptoFailureAction.CONSUME is set.");
        }
        consumer.close();
        consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/myenc-ns/myenc-topic1"}).subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.FAIL).cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
        for (int i = msgNum; i < 9; ++i) {
            msg = (MessageImpl)consumer.receive(5, TimeUnit.SECONDS);
            msg.getEncryptionCtx().orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        consumer.acknowledgeCumulative((Message)msg);
        consumer.close();
        consumer.close();
        consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/myenc-ns/myenc-topic1"}).subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
        msg = (MessageImpl)consumer.receive(5, TimeUnit.SECONDS);
        Assert.assertNull((Object)msg, (String)"Message received even aftet ConsumerCryptoFailureAction.DISCARD is set.");
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(groups={"encryption"})
    public void testEncryptionConsumerWithoutCryptoReader() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        String encryptionKeyName = "client-rsa.pem";
        String encryptionKeyVersion = "1.0";
        final HashMap metadata = Maps.newHashMap();
        metadata.put("version", "1.0");
        class EncKeyReader
        implements CryptoKeyReader {
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            EncKeyReader() {
            }

            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
                if (Files.isReadable(Paths.get(CERT_FILE_PATH, new String[0]))) {
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH, new String[0])));
                        this.keyInfo.setMetadata(metadata);
                        return this.keyInfo;
                    }
                    catch (IOException e) {
                        Assert.fail((String)("Failed to read certificate from " + CERT_FILE_PATH));
                    }
                } else {
                    Assert.fail((String)("Certificate file " + CERT_FILE_PATH + " is not present or not readable."));
                }
                return null;
            }

            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
                if (Files.isReadable(Paths.get(CERT_FILE_PATH, new String[0]))) {
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH, new String[0])));
                        this.keyInfo.setMetadata(metadata);
                        return this.keyInfo;
                    }
                    catch (IOException e) {
                        Assert.fail((String)("Failed to read certificate from " + CERT_FILE_PATH));
                    }
                } else {
                    Assert.fail((String)("Certificate file " + CERT_FILE_PATH + " is not present or not readable."));
                }
                return null;
            }
        }
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1").addEncryptionKey("client-rsa.pem").compressionType(CompressionType.LZ4).cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).create();
        Consumer consumer = this.pulsarClient.newConsumer().topicsPattern("persistent://my-property/my-ns/myrsa-topic1").subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME).subscribe();
        String message = "my-message";
        producer.send((Object)message.getBytes());
        TopicMessageImpl msg = (TopicMessageImpl)consumer.receive(5, TimeUnit.SECONDS);
        String receivedMessage = this.decryptMessage((TopicMessageImpl<byte[]>)msg, "client-rsa.pem", new EncKeyReader());
        Assert.assertEquals((String)message, (String)receivedMessage);
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    private String decryptMessage(TopicMessageImpl<byte[]> msg, String encryptionKeyName, CryptoKeyReader reader) throws Exception {
        Optional ctx = msg.getEncryptionCtx();
        Assert.assertTrue((boolean)ctx.isPresent());
        EncryptionContext encryptionCtx = (EncryptionContext)ctx.orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
        Map keys = encryptionCtx.getKeys();
        Assert.assertEquals((int)keys.size(), (int)1);
        EncryptionContext.EncryptionKey encryptionKey = (EncryptionContext.EncryptionKey)keys.get(encryptionKeyName);
        byte[] dataKey = encryptionKey.getKeyValue();
        Map metadata = encryptionKey.getMetadata();
        String version = (String)metadata.get("version");
        Assert.assertEquals((String)version, (String)"1.0");
        CompressionType compressionType = encryptionCtx.getCompressionType();
        int uncompressedSize = encryptionCtx.getUncompressedMessageSize();
        byte[] encrParam = encryptionCtx.getParam();
        String encAlgo = encryptionCtx.getAlgorithm();
        int batchSize = encryptionCtx.getBatchSize().orElse(0);
        ByteBuf payloadBuf = Unpooled.wrappedBuffer((byte[])msg.getData());
        MessageCrypto crypto = new MessageCrypto("test", false);
        PulsarApi.MessageMetadata.Builder metadataBuilder = PulsarApi.MessageMetadata.newBuilder();
        PulsarApi.EncryptionKeys.Builder encKeyBuilder = PulsarApi.EncryptionKeys.newBuilder();
        encKeyBuilder.setKey(encryptionKeyName);
        ByteString keyValue = ByteString.copyFrom((byte[])dataKey);
        encKeyBuilder.setValue(keyValue);
        PulsarApi.EncryptionKeys encKey = encKeyBuilder.build();
        metadataBuilder.setEncryptionParam(ByteString.copyFrom((byte[])encrParam));
        metadataBuilder.setEncryptionAlgo(encAlgo);
        metadataBuilder.setProducerName("test");
        metadataBuilder.setSequenceId(123L);
        metadataBuilder.setPublishTime(12333453454L);
        metadataBuilder.addEncryptionKeys(encKey);
        metadataBuilder.setCompression(CompressionCodecProvider.convertToWireProtocol((CompressionType)compressionType));
        metadataBuilder.setUncompressedSize(uncompressedSize);
        ByteBuf decryptedPayload = crypto.decrypt(metadataBuilder.build(), payloadBuf, reader);
        CompressionCodec codec = CompressionCodecProvider.getCompressionCodec((CompressionType)compressionType);
        ByteBuf uncompressedPayload = codec.decode(decryptedPayload, uncompressedSize);
        if (batchSize > 0) {
            PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata.newBuilder();
            uncompressedPayload = Commands.deSerializeSingleMessageInBatch((ByteBuf)uncompressedPayload, (PulsarApi.SingleMessageMetadata.Builder)singleMessageMetadataBuilder, (int)0, (int)batchSize);
        }
        byte[] data = new byte[uncompressedPayload.readableBytes()];
        uncompressedPayload.readBytes(data);
        uncompressedPayload.release();
        return new String(data);
    }

    @Test
    public void testConsumerSubscriptionInitialize() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        String topicName = "persistent://my-property/my-ns/test-subscription-initialize-topic";
        Producer producer = this.pulsarClient.newProducer().topic(topicName).create();
        for (int i = 0; i < 5; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Consumer defaultConsumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscriptionName("test-subscription-default").subscribe();
        Consumer latestConsumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscriptionName("test-subscription-latest").subscriptionInitialPosition(SubscriptionInitialPosition.Latest).subscribe();
        Consumer earliestConsumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscriptionName("test-subscription-earliest").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        for (int i = 5; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Assert.assertEquals((byte[])defaultConsumer.receive().getData(), (byte[])"my-message-5".getBytes());
        Assert.assertEquals((byte[])latestConsumer.receive().getData(), (byte[])"my-message-5".getBytes());
        Assert.assertEquals((byte[])earliestConsumer.receive().getData(), (byte[])"my-message-0".getBytes());
        defaultConsumer.close();
        latestConsumer.close();
        earliestConsumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test
    public void testFlushBatchEnabled() throws Exception {
        int i;
        log.info("-- Starting {} test --", (Object)this.methodName);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/test-flush-enabled"}).subscriptionName("my-subscriber-name").subscribe();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/test-flush-enabled").enableBatching(true).batchingMaxPublishDelay(1L, TimeUnit.HOURS).batchingMaxMessages(10000);
        try (Producer producer = producerBuilder.create();){
            for (i = 0; i < 10; ++i) {
                String message = "my-message-" + i;
                producer.sendAsync((Object)message.getBytes());
            }
            producer.flush();
        }
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        for (i = 0; i < 10; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        consumer.acknowledgeCumulative(msg);
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test
    public void testFlushBatchDisabled() throws Exception {
        int i;
        log.info("-- Starting {} test --", (Object)this.methodName);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/test-flush-disabled"}).subscriptionName("my-subscriber-name").subscribe();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/test-flush-disabled").enableBatching(false);
        try (Producer producer = producerBuilder.create();){
            for (i = 0; i < 10; ++i) {
                String message = "my-message-" + i;
                producer.sendAsync((Object)message.getBytes());
            }
            producer.flush();
        }
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        for (i = 0; i < 10; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        consumer.acknowledgeCumulative(msg);
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test
    public void testReachedEndOfTopic() throws Exception {
        String topicName = "persistent://my-property/my-ns/testReachedEndOfTopic";
        Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).create();
        producer.close();
        this.admin.topics().terminateTopicAsync(topicName).get();
        final CountDownLatch latch = new CountDownLatch(2);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("my-subscriber-name").messageListener(new MessageListener(){

            public void reachedEndOfTopic(Consumer consumer) {
                log.info("called reachedEndOfTopic  {}", (Object)SimpleProducerConsumerTest.this.methodName);
                latch.countDown();
            }

            public void received(Consumer consumer, Message message) {
            }
        }).subscribe();
        Assert.assertFalse((boolean)latch.await(1L, TimeUnit.SECONDS));
        Assert.assertEquals((long)latch.getCount(), (long)1L);
        consumer.close();
    }

    @Test
    public void testFailOverConsumerPriority() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        String topicName = "persistent://my-property/my-ns/priority-topic";
        String subscriptionName = "my-sub";
        int noOfPartitions = 9;
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/priority-topic", 9);
        Consumer consumer1 = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/priority-topic"}).subscriptionName("my-sub").consumerName("aaa").subscriptionType(SubscriptionType.Failover).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).priorityLevel(1).subscribe();
        ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/priority-topic"}).subscriptionName("my-sub").consumerName("bbb1").subscriptionType(SubscriptionType.Failover).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).priorityLevel(1);
        Consumer consumer2 = consumerBuilder.subscribe();
        AtomicInteger consumer1Count = new AtomicInteger(0);
        this.admin.topics().getPartitionedStats((String)"persistent://my-property/my-ns/priority-topic", (boolean)true).partitions.forEach((p, stats) -> {
            String activeConsumerName = ((SubscriptionStats)stats.subscriptions.entrySet().iterator().next().getValue()).activeConsumerName;
            if (activeConsumerName.equals("aaa")) {
                consumer1Count.incrementAndGet();
            }
        });
        Assert.assertNotEquals((Object)consumer1Count, (Object)9);
        consumer2.close();
        consumer2 = consumerBuilder.priorityLevel(0).subscribe();
        Consumer consumer3 = consumerBuilder.consumerName("bbb2").priorityLevel(0).subscribe();
        Consumer consumer4 = consumerBuilder.consumerName("bbb3").priorityLevel(0).subscribe();
        Consumer consumer5 = consumerBuilder.consumerName("bbb4").priorityLevel(1).subscribe();
        Integer evenDistributionCount = 3;
        SimpleProducerConsumerTest.retryStrategically(test -> {
            try {
                HashMap subsCount = Maps.newHashMap();
                this.admin.topics().getPartitionedStats((String)"persistent://my-property/my-ns/priority-topic", (boolean)true).partitions.forEach((p, stats) -> {
                    String activeConsumerName = ((SubscriptionStats)stats.subscriptions.entrySet().iterator().next().getValue()).activeConsumerName;
                    subsCount.compute(activeConsumerName, (k, v) -> v != null ? v + 1 : 1);
                });
                return subsCount.size() == 3 && subsCount.get("bbb1") == evenDistributionCount && subsCount.get("bbb2") == evenDistributionCount && subsCount.get("bbb3") == evenDistributionCount;
            }
            catch (PulsarAdminException pulsarAdminException) {
                return false;
            }
        }, 5, 100L);
        HashMap subsCount = Maps.newHashMap();
        this.admin.topics().getPartitionedStats((String)"persistent://my-property/my-ns/priority-topic", (boolean)true).partitions.forEach((p, stats) -> {
            String activeConsumerName = ((SubscriptionStats)stats.subscriptions.entrySet().iterator().next().getValue()).activeConsumerName;
            subsCount.compute(activeConsumerName, (k, v) -> v != null ? v + 1 : 1);
        });
        Assert.assertEquals((int)subsCount.size(), (int)3);
        Assert.assertEquals(subsCount.get("bbb1"), (Object)evenDistributionCount);
        Assert.assertEquals(subsCount.get("bbb2"), (Object)evenDistributionCount);
        Assert.assertEquals(subsCount.get("bbb3"), (Object)evenDistributionCount);
        consumer1.close();
        consumer2.close();
        consumer3.close();
        consumer4.close();
        consumer5.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPartitionedTopicWithOnePartition() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        String topicName = "persistent://my-property/my-ns/one-partitioned-topic";
        String subscriptionName = "my-sub-";
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/one-partitioned-topic", 1);
        Assert.assertEquals((int)this.admin.topics().getPartitionedTopicMetadata((String)"persistent://my-property/my-ns/one-partitioned-topic").partitions, (int)1);
        Consumer consumer1 = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/one-partitioned-topic"}).subscriptionName("my-sub-1").consumerName("aaa").subscribe();
        try {
            log.info("Consumer1 created. topic: {}", (Object)consumer1.getTopic());
            Consumer consumer2 = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/one-partitioned-topic-partition-0"}).subscriptionName("my-sub-2").consumerName("bbb").subscribe();
            try {
                log.info("Consumer2 created. topic: {}", (Object)consumer2.getTopic());
                Producer producer1 = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/one-partitioned-topic").enableBatching(false).create();
                try {
                    log.info("Producer1 created. topic: {}", (Object)producer1.getTopic());
                    Producer producer2 = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/one-partitioned-topic-partition-0").enableBatching(false).create();
                    try {
                        int i;
                        log.info("Producer2 created. topic: {}", (Object)producer2.getTopic());
                        int numMessages = 10;
                        for (i = 0; i < 10; ++i) {
                            producer1.newMessage().value((Object)("one-partitioned-topic-value-producer1-" + i).getBytes(StandardCharsets.UTF_8)).send();
                            producer2.newMessage().value((Object)("one-partitioned-topic-value-producer2-" + i).getBytes(StandardCharsets.UTF_8)).send();
                        }
                        for (i = 0; i < 20; ++i) {
                            Message msg = consumer1.receive(200, TimeUnit.MILLISECONDS);
                            Assert.assertNotNull((Object)msg);
                            log.info("Consumer1 Received message '{}'.", (Object)new String((byte[])msg.getValue(), StandardCharsets.UTF_8));
                            msg = consumer2.receive(200, TimeUnit.MILLISECONDS);
                            Assert.assertNotNull((Object)msg);
                            log.info("Consumer2 Received message '{}'.", (Object)new String((byte[])msg.getValue(), StandardCharsets.UTF_8));
                        }
                        Assert.assertNull((Object)consumer1.receive(200, TimeUnit.MILLISECONDS));
                        Assert.assertNull((Object)consumer2.receive(200, TimeUnit.MILLISECONDS));
                        log.info("-- Exiting {} test --", (Object)this.methodName);
                    }
                    finally {
                        if (Collections.singletonList(producer2).get(0) != null) {
                            producer2.close();
                        }
                    }
                }
                finally {
                    if (Collections.singletonList(producer1).get(0) != null) {
                        producer1.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(consumer2).get(0) != null) {
                    consumer2.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(consumer1).get(0) != null) {
                consumer1.close();
            }
        }
    }
}

