/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.EncryptionKeyInfo;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Tests for the Pulsar {@link Reader} API: reading from the earliest, latest and
 * explicit start positions, batched and encrypted topics, end-of-topic detection
 * via {@link Reader#hasMessageAvailable()}, and seeking by message id or timestamp.
 *
 * <p>Readers and producers are opened with try-with-resources so that a failing
 * assertion does not leave clients attached to the shared test broker.
 */
public class TopicReaderTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(TopicReaderTest.class);

    /** Starts the test broker/client and the producer base fixtures once per class. */
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    /** Tears down everything created by {@link #setup()}. */
    @Override
    @AfterClass
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Parameter rows for {@link #testReaderStartMessageIdAtExpectedPos}:
     * {@code {batching, startInclusive, numOfMessages}}.
     */
    @DataProvider
    public static Object[][] variationsForExpectedPos() {
        return new Object[][]{
            {true, true, 10},
            {true, false, 10},
            {false, true, 10},
            {false, false, 10},
            {true, true, 100},
            {true, false, 100},
            {false, true, 100},
            {false, false, 100},
        };
    }

    /**
     * Parameter rows for {@link #testReaderOnLatestMessage}:
     * {@code {startInclusive, numOfMessages}}.
     */
    @DataProvider
    public static Object[][] variationsForResetOnLatestMsg() {
        return new Object[][]{
            {true, 20},
            {false, 20},
        };
    }

    /**
     * Synchronously publishes one message per index in {@code [from, to)}.
     *
     * @param producer producer to publish with
     * @param format   {@link String#format} pattern taking the index as its only argument
     * @param from     first index, inclusive
     * @param to       last index, exclusive
     */
    private static void publish(Producer<byte[]> producer, String format, int from, int to) throws Exception {
        for (int i = from; i < to; i++) {
            producer.send(String.format(format, i).getBytes());
        }
    }

    /**
     * Checks one received message against the expected payload.
     *
     * <p>Fails with a descriptive message when {@code msg} is {@code null} (a timed
     * {@code readNext} returned nothing) instead of letting the caller hit a bare
     * {@link NullPointerException}.
     *
     * @param seen     payloads received so far; passed to {@code testMessageOrderAndDuplicates}
     * @param msg      message returned by the reader, possibly {@code null}
     * @param expected payload the message must carry
     */
    private void verifyReceived(Set<String> seen, Message<byte[]> msg, String expected) {
        Assert.assertNotNull(msg, "No message received while expecting '" + expected + "'");
        String received = new String(msg.getData());
        log.debug("Received message: [{}]", received);
        this.testMessageOrderAndDuplicates(seen, received, expected);
    }

    /** A reader created before anything is published receives all 10 messages in order. */
    @Test
    public void testSimpleReader() throws Exception {
        String topic = "persistent://my-property/my-ns/testSimpleReader";
        try (Reader<byte[]> reader = this.pulsarClient.newReader().topic(topic)
                     .startMessageId(MessageId.earliest).create();
             Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create()) {
            publish(producer, "my-message-%d", 0, 10);

            Set<String> seen = Sets.newHashSet();
            for (int i = 0; i < 10; i++) {
                verifyReceived(seen, reader.readNext(1, TimeUnit.SECONDS), "my-message-" + i);
            }
        }
    }

    /** A reader created after publishing, starting at earliest, still sees all 10 messages. */
    @Test
    public void testReaderAfterMessagesWerePublished() throws Exception {
        String topic = "persistent://my-property/my-ns/testReaderAfterMessagesWerePublished";
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create()) {
            publish(producer, "my-message-%d", 0, 10);

            try (Reader<byte[]> reader = this.pulsarClient.newReader().topic(topic)
                    .startMessageId(MessageId.earliest).create()) {
                Set<String> seen = Sets.newHashSet();
                for (int i = 0; i < 10; i++) {
                    verifyReceived(seen, reader.readNext(1, TimeUnit.SECONDS), "my-message-" + i);
                }
            }
        }
    }

    /** Two readers on the same topic each receive the full message sequence. */
    @Test
    public void testMultipleReaders() throws Exception {
        String topic = "persistent://my-property/my-ns/testMultipleReaders";
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create()) {
            publish(producer, "my-message-%d", 0, 10);

            try (Reader<byte[]> reader1 = this.pulsarClient.newReader().topic(topic)
                         .startMessageId(MessageId.earliest).create();
                 Reader<byte[]> reader2 = this.pulsarClient.newReader().topic(topic)
                         .startMessageId(MessageId.earliest).create()) {
                Set<String> seenByFirst = Sets.newHashSet();
                for (int i = 0; i < 10; i++) {
                    verifyReceived(seenByFirst, reader1.readNext(1, TimeUnit.SECONDS), "my-message-" + i);
                }

                Set<String> seenBySecond = Sets.newHashSet();
                for (int i = 0; i < 10; i++) {
                    verifyReceived(seenBySecond, reader2.readNext(1, TimeUnit.SECONDS), "my-message-" + i);
                }
            }
        }
    }

    /**
     * The number of subscriptions reported by topic stats tracks the number of open
     * readers: 2, then 1 after closing the first, then 0 after closing the second.
     */
    @Test
    public void testTopicStats() throws Exception {
        String topicName = "persistent://my-property/my-ns/testTopicStats";

        // Closed explicitly (not via try-with-resources) because the assertions
        // must run between the individual close() calls.
        Reader<byte[]> reader1 = this.pulsarClient.newReader().topic(topicName)
                .startMessageId(MessageId.earliest).create();
        Reader<byte[]> reader2 = this.pulsarClient.newReader().topic(topicName)
                .startMessageId(MessageId.earliest).create();

        TopicStats stats = this.admin.topics().getStats(topicName);
        Assert.assertEquals(stats.subscriptions.size(), 2);

        reader1.close();
        stats = this.admin.topics().getStats(topicName);
        Assert.assertEquals(stats.subscriptions.size(), 1);

        reader2.close();
        stats = this.admin.topics().getStats(topicName);
        Assert.assertEquals(stats.subscriptions.size(), 0);
    }

    /**
     * A reader started at {@link MessageId#latest} receives only the second half of
     * the messages, i.e. those published after the reader was created.
     *
     * @param startInclusive whether {@code startMessageIdInclusive()} is set on the builder
     * @param numOfMessages  total messages published; half before, half after reader creation
     */
    @Test(dataProvider="variationsForResetOnLatestMsg")
    public void testReaderOnLatestMessage(boolean startInclusive, int numOfMessages) throws Exception {
        String topicName = "persistent://my-property/my-ns/ReaderOnLatestMessage";
        int halfOfMsgs = numOfMessages / 2;

        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create()) {
            publish(producer, "my-message-%d", 0, halfOfMsgs);

            ReaderBuilder<byte[]> readerBuilder = this.pulsarClient.newReader().topic(topicName)
                    .startMessageId(MessageId.latest);
            if (startInclusive) {
                readerBuilder.startMessageIdInclusive();
            }

            try (Reader<byte[]> reader = readerBuilder.create()) {
                publish(producer, "my-message-%d", halfOfMsgs, numOfMessages);

                Set<String> seen = Sets.newHashSet();
                for (int i = halfOfMsgs; i < numOfMessages; i++) {
                    verifyReceived(seen, reader.readNext(), String.format("my-message-%d", i));
                }

                Assert.assertTrue(reader.isConnected());
                Assert.assertEquals(((ReaderImpl<byte[]>) reader).getConsumer().numMessagesInQueue(), 0);
                Assert.assertEquals(seen.size(), halfOfMsgs);
            }
        }
    }

    /** A reader started at the id of message 4 receives messages 5 through 9. */
    @Test
    public void testReaderOnSpecificMessage() throws Exception {
        String topic = "persistent://my-property/my-ns/testReaderOnSpecificMessage";
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create()) {
            List<MessageId> messageIds = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                String message = "my-message-" + i;
                messageIds.add(producer.send(message.getBytes()));
            }

            try (Reader<byte[]> reader = this.pulsarClient.newReader().topic(topic)
                    .startMessageId(messageIds.get(4)).create()) {
                Set<String> seen = Sets.newHashSet();
                for (int i = 5; i < 10; i++) {
                    verifyReceived(seen, reader.readNext(1, TimeUnit.SECONDS), "my-message-" + i);
                }
            }
        }
    }

    /**
     * With batching enabled, a reader started at the id of the fifth message read
     * (a {@link BatchMessageIdImpl}) resumes at message 5 and reads through message 10.
     */
    @Test
    public void testReaderOnSpecificMessageWithBatches() throws Exception {
        String topic = "persistent://my-property/my-ns/testReaderOnSpecificMessageWithBatches";
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic)
                .enableBatching(true)
                .batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS)
                .create()) {
            for (int i = 0; i < 10; i++) {
                String message = "my-message-" + i;
                producer.sendAsync(message.getBytes());
            }
            // Final message is sent synchronously, so this call returns only once it is published.
            producer.send("my-message-10".getBytes());

            MessageId lastMessageId = null;
            try (Reader<byte[]> reader1 = this.pulsarClient.newReader().topic(topic)
                    .startMessageId(MessageId.earliest).create()) {
                for (int i = 0; i < 5; i++) {
                    Message<byte[]> msg = reader1.readNext();
                    lastMessageId = msg.getMessageId();
                }
            }

            Assert.assertNotNull(lastMessageId);
            Assert.assertEquals(lastMessageId.getClass(), BatchMessageIdImpl.class);
            log.info("CREATING READER ON MSG ID: {}", lastMessageId);

            try (Reader<byte[]> reader2 = this.pulsarClient.newReader().topic(topic)
                    .startMessageId(lastMessageId).create()) {
                for (int i = 5; i < 11; i++) {
                    String expectedMessage = "my-message-" + i;
                    Message<byte[]> msg = reader2.readNext(1, TimeUnit.SECONDS);
                    Assert.assertNotNull(msg, "No message received while expecting '" + expectedMessage + "'");
                    String receivedMessage = new String(msg.getData());
                    log.debug("Received message: [{}]", receivedMessage);
                    Assert.assertEquals(receivedMessage, expectedMessage);
                }
            }
        }
    }

    /**
     * Messages published with the {@code client-ecdsa.pem} encryption key can be read
     * back by a reader configured with a {@link CryptoKeyReader} that loads keys from
     * {@code ./src/test/resources/certificate/}.
     */
    @Test(groups={"encryption"})
    public void testECDSAEncryption() throws Exception {
        log.info("-- Starting {} test --", this.methodName);

        /** Loads public/private keys from the test resources directory by key name. */
        class EncKeyReader implements CryptoKeyReader {
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            @Override
            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                return loadKey("./src/test/resources/certificate/public-key." + keyName);
            }

            @Override
            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
                return loadKey("./src/test/resources/certificate/private-key." + keyName);
            }

            /** Reads the key file into {@code keyInfo}; fails the test if it cannot be read. */
            private EncryptionKeyInfo loadKey(String certFilePath) {
                if (!Files.isReadable(Paths.get(certFilePath))) {
                    Assert.fail("Certificate file " + certFilePath + " is not present or not readable.");
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
                    return this.keyInfo;
                } catch (IOException e) {
                    // Keep the cause so the underlying I/O problem shows up in the report.
                    Assert.fail("Failed to read certificate from " + certFilePath, e);
                }
                return null;
            }
        }

        String topic = "persistent://my-property/my-ns/test-reader-myecdsa-topic1";
        int totalMsg = 10;
        Set<String> seen = Sets.newHashSet();

        // The reader starts at MessageId.latest, so it is created before anything is published.
        try (Reader<byte[]> reader = this.pulsarClient.newReader().topic(topic)
                     .startMessageId(MessageId.latest)
                     .cryptoKeyReader(new EncKeyReader())
                     .create();
             Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic)
                     .addEncryptionKey("client-ecdsa.pem")
                     .cryptoKeyReader(new EncKeyReader())
                     .create()) {
            publish(producer, "my-message-%d", 0, totalMsg);

            for (int i = 0; i < totalMsg; i++) {
                verifyReceived(seen, reader.readNext(5, TimeUnit.SECONDS), "my-message-" + i);
            }
        }
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * {@code hasMessageAvailable()} is false on an empty topic, stays true until all
     * published messages are consumed, and turns true again when more are published.
     * After each drain a timed {@code readNext} returns {@code null}.
     */
    @Test
    public void testSimpleReaderReachEndOfTopic() throws Exception {
        String topic = "persistent://my-property/my-ns/testSimpleReaderReachEndOfTopic";
        try (Reader<byte[]> reader = this.pulsarClient.newReader().topic(topic)
                     .startMessageId(MessageId.earliest).create();
             Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create()) {
            // No data written yet.
            Assert.assertFalse(reader.hasMessageAvailable());

            publish(producer, "my-message-%d", 0, 100);

            Set<String> seen = Sets.newHashSet();
            int index = 0;
            while (reader.hasMessageAvailable()) {
                verifyReceived(seen, reader.readNext(1, TimeUnit.SECONDS), "my-message-" + index++);
            }
            Assert.assertEquals(index, 100);
            // Topic drained: a timed read must come back empty.
            Assert.assertNull(reader.readNext(1, TimeUnit.SECONDS));

            publish(producer, "my-message-%d", 100, 200);

            while (reader.hasMessageAvailable()) {
                verifyReceived(seen, reader.readNext(1, TimeUnit.SECONDS), "my-message-" + index++);
            }
            Assert.assertEquals(index, 200);
            Assert.assertNull(reader.readNext(1, TimeUnit.SECONDS));
        }
    }

    /**
     * Same end-of-topic check with batching enabled: after 100 async sends plus one
     * synchronous send, exactly 101 messages are read and then
     * {@code hasMessageAvailable()} is false.
     */
    @Test
    public void testReaderReachEndOfTopicOnMessageWithBatches() throws Exception {
        String topic = "persistent://my-property/my-ns/testReaderReachEndOfTopicOnMessageWithBatches";
        try (Reader<byte[]> reader = this.pulsarClient.newReader().topic(topic)
                     .startMessageId(MessageId.earliest).create();
             Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic)
                     .enableBatching(true)
                     .batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS)
                     .create()) {
            // No data written yet.
            Assert.assertFalse(reader.hasMessageAvailable());

            for (int i = 0; i < 100; i++) {
                String message = "my-message-" + i;
                producer.sendAsync(message.getBytes());
            }
            // Synchronous send of the 101st message; its payload is never inspected below.
            producer.send("my-message-10".getBytes());

            MessageId lastMessageId = null;
            int index = 0;
            Assert.assertTrue(reader.hasMessageAvailable());

            if (reader.hasMessageAvailable()) {
                Message<byte[]> msg = reader.readNext();
                lastMessageId = msg.getMessageId();
                Assert.assertEquals(lastMessageId.getClass(), BatchMessageIdImpl.class);

                // Count messages until a timed read comes back empty.
                while (msg != null) {
                    ++index;
                    msg = reader.readNext(100, TimeUnit.MILLISECONDS);
                }
                Assert.assertEquals(index, 101);
            }

            Assert.assertFalse(reader.hasMessageAvailable());
        }
    }

    /**
     * {@code hasMessageAvailable()} reports correctly for fresh readers before and
     * after a publish, and again after the broker-side topic has been closed.
     */
    @Test
    public void testMessageAvailableAfterRestart() throws Exception {
        String topic = "persistent://my-property/use/my-ns/testMessageAvailableAfterRestart";
        String content = "my-message-1";

        // Create and immediately close a subscription named "sub1".
        // NOTE(review): presumably this keeps the published data retained - confirm.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();

        try (Reader<byte[]> reader = this.pulsarClient.newReader().topic(topic)
                .startMessageId(MessageId.earliest).create()) {
            Assert.assertFalse(reader.hasMessageAvailable());
        }

        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create()) {
            producer.send(content.getBytes());
        }

        try (Reader<byte[]> reader = this.pulsarClient.newReader().topic(topic)
                .startMessageId(MessageId.earliest).create()) {
            Assert.assertTrue(reader.hasMessageAvailable());
        }

        // Close the topic on the broker side; the next reader has to open it again.
        ((Topic)this.pulsar.getBrokerService().getTopicReference(topic).get()).close().get();

        try (Reader<byte[]> reader = this.pulsarClient.newReader().topic(topic)
                .startMessageId(MessageId.earliest).create()) {
            Assert.assertTrue(reader.hasMessageAvailable());

            String readOut = new String(reader.readNext().getData());
            Assert.assertEquals(content, readOut);
            Assert.assertFalse(reader.hasMessageAvailable());
        }
    }

    /** After seeking one minute back, a reader still reports messages available. */
    @Test
    public void testReaderNonDurableIsAbleToSeekRelativeTime() throws Exception {
        int numOfMessage = 10;
        String topicName = "persistent://my-property/my-ns/ReaderNonDurableIsAbleToSeekRelativeTime";

        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create()) {
            publish(producer, "msg num %d", 0, numOfMessage);

            try (Reader<byte[]> reader = this.pulsarClient.newReader().topic(topicName)
                    .startMessageId(MessageId.earliest).create()) {
                Assert.assertTrue(reader.hasMessageAvailable());

                reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));

                Assert.assertTrue(reader.hasMessageAvailable());
            }
        }
    }

    /**
     * After reading the whole topic, seeking with the relative time {@code "-1m"}
     * makes the reader deliver all 10 messages again from the first one.
     */
    @Test
    public void testReaderIsAbleToSeekWithTimeOnBeginningOfTopic() throws Exception {
        String topicName = "persistent://my-property/my-ns/ReaderSeekWithTimeOnBeginningOfTopic";
        int numOfMessage = 10;

        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create()) {
            publish(producer, "msg num %d", 0, numOfMessage);

            try (Reader<byte[]> reader = this.pulsarClient.newReader().topic(topicName)
                    .startMessageId(MessageId.earliest).create()) {
                Assert.assertTrue(reader.hasMessageAvailable());

                // First pass: read everything.
                Set<String> firstPass = Sets.newHashSet();
                for (int i = 0; i < numOfMessage; i++) {
                    verifyReceived(firstPass, reader.readNext(), String.format("msg num %d", i));
                }
                Assert.assertFalse(reader.hasMessageAvailable());

                // Second pass: rewind, then read everything again.
                reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));

                Set<String> secondPass = Sets.newHashSet();
                for (int i = 0; i < numOfMessage; i++) {
                    verifyReceived(secondPass, reader.readNext(), String.format("msg num %d", i));
                }

                Assert.assertTrue(reader.isConnected());
                Assert.assertFalse(reader.hasMessageAvailable());
                Assert.assertEquals(((ReaderImpl<byte[]>) reader).getConsumer().numMessagesInQueue(), 0);
            }
        }
    }

    /**
     * After reading 100 messages, seeking to the id of message 50 makes the reader
     * resume with message 51 and deliver through message 99.
     */
    @Test
    public void testReaderIsAbleToSeekWithMessageIdOnMiddleOfTopic() throws Exception {
        String topicName = "persistent://my-property/my-ns/ReaderSeekWithMessageIdOnMiddleOfTopic";
        int numOfMessage = 100;
        int halfMessages = numOfMessage / 2;

        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create()) {
            publish(producer, "msg num %d", 0, numOfMessage);

            try (Reader<byte[]> reader = this.pulsarClient.newReader().topic(topicName)
                    .startMessageId(MessageId.earliest).create()) {
                Assert.assertTrue(reader.hasMessageAvailable());

                // First pass: read everything and remember the id of the middle message.
                MessageId midMessageToSeek = null;
                Set<String> firstPass = Sets.newHashSet();
                for (int i = 0; i < numOfMessage; i++) {
                    Message<byte[]> message = reader.readNext();
                    verifyReceived(firstPass, message, String.format("msg num %d", i));
                    if (i == halfMessages) {
                        midMessageToSeek = message.getMessageId();
                    }
                }
                Assert.assertFalse(reader.hasMessageAvailable());

                // Second pass: reading resumes with the message after the seek target.
                reader.seek(midMessageToSeek);

                Set<String> secondPass = Sets.newHashSet();
                for (int i = halfMessages + 1; i < numOfMessage; i++) {
                    verifyReceived(secondPass, reader.readNext(), String.format("msg num %d", i));
                }

                Assert.assertTrue(reader.isConnected());
                Assert.assertFalse(reader.hasMessageAvailable());
                Assert.assertEquals(((ReaderImpl<byte[]>) reader).getConsumer().numMessagesInQueue(), 0);
            }
        }
    }

    /**
     * Publishes 10 messages roughly 100 ms apart, seeks to 600 ms after the first
     * publish started, and expects messages 5 through 9.
     *
     * <p>NOTE(review): the expected start index depends on wall-clock timing of the
     * sends; this looks sensitive to slow environments - confirm.
     */
    @Test
    public void testReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws Exception {
        String topicName = "persistent://my-property/my-ns/ReaderIsAbleToSeekWithTimeOnMiddleOfTopic";
        int numOfMessage = 10;
        int halfMessages = numOfMessage / 2;

        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create()) {
            long publishStart = System.currentTimeMillis();
            for (int i = 0; i < numOfMessage; i++) {
                producer.send(String.format("msg num %d", i).getBytes());
                // Space the publishes out so that a timestamp can pick out the middle.
                Thread.sleep(100L);
            }

            try (Reader<byte[]> reader = this.pulsarClient.newReader().topic(topicName)
                    .startMessageId(MessageId.earliest).create()) {
                int plusTime = (halfMessages + 1) * 100;
                reader.seek(publishStart + (long) plusTime);

                Set<String> seen = Sets.newHashSet();
                for (int i = halfMessages; i < numOfMessage; i++) {
                    verifyReceived(seen, reader.readNext(), String.format("msg num %d", i));
                }
            }
        }
    }

    /**
     * A reader started at the id of a randomly chosen message begins at that message
     * when {@code startInclusive} is set, and at the following one otherwise.
     *
     * @param batching       whether the producer has batching enabled
     * @param startInclusive whether {@code startMessageIdInclusive()} is set on the builder
     * @param numOfMessages  number of messages published
     */
    @Test(dataProvider="variationsForExpectedPos")
    public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages) throws Exception {
        String topicName = "persistent://my-property/my-ns/ReaderStartMessageIdAtExpectedPos";
        int resetIndex = new Random().nextInt(numOfMessages);
        int firstMessage = startInclusive ? resetIndex : resetIndex + 1;

        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName)
                .enableBatching(batching).create()) {
            MessageId resetPos = null;
            for (int i = 0; i < numOfMessages; i++) {
                MessageId msgId = producer.send(String.format("msg num %d", i).getBytes());
                if (resetIndex == i) {
                    resetPos = msgId;
                }
            }

            ReaderBuilder<byte[]> readerBuilder = this.pulsarClient.newReader().topic(topicName)
                    .startMessageId(resetPos);
            if (startInclusive) {
                readerBuilder.startMessageIdInclusive();
            }

            try (Reader<byte[]> reader = readerBuilder.create()) {
                Set<String> seen = Sets.newHashSet();
                for (int i = firstMessage; i < numOfMessages; i++) {
                    verifyReceived(seen, reader.readNext(), String.format("msg num %d", i));
                }

                Assert.assertTrue(reader.isConnected());
                Assert.assertEquals(((ReaderImpl<byte[]>) reader).getConsumer().numMessagesInQueue(), 0);
                Assert.assertEquals(seen.size(), numOfMessages - firstMessage);
            }
        }
    }
}

