/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Broker-side tests for resetting a subscription cursor through
 * {@code Consumer.seek}, both by message id and by publish timestamp, on
 * non-partitioned and partitioned topics.
 *
 * <p>Each test publishes {@link #MESSAGE_COUNT} messages and then checks the
 * backlog reported by the broker's {@link PersistentSubscription} after every
 * seek. Producers and consumers are closed at the end of each test so they do
 * not linger on the shared client between tests.
 */
@Test
public class SubscriptionSeekTest extends BrokerTestBase {
    /** Subscription name shared by every test; each test uses its own topic. */
    private static final String SUBSCRIPTION_NAME = "my-subscription";

    /** Number of messages published by each test before seeking. */
    private static final int MESSAGE_COUNT = 10;

    /** Relative time used to seek back to a point before any message was published. */
    private static final String RESET_TIME = "100s";

    @Override
    @BeforeClass
    protected void setup() throws Exception {
        super.baseSetup();
    }

    @Override
    @AfterClass
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Seeks a subscription on a non-partitioned topic to {@code latest},
     * {@code earliest} and to the id of the sixth published message, asserting
     * a backlog of 0, 10 and 5 entries respectively.
     */
    @Test
    public void testSeek() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/testSeek";

        // NOTE(review): receiverQueueSize(0) presumably disables client prefetch so the
        // broker-side backlog is not affected by the consumer - confirm.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
             Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(topicName)
                     .subscriptionName(SUBSCRIPTION_NAME)
                     .receiverQueueSize(0)
                     .subscribe()) {
            PersistentTopic topicRef =
                    (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
            Assert.assertNotNull(topicRef);
            Assert.assertEquals(topicRef.getProducers().size(), 1L);
            Assert.assertEquals(topicRef.getSubscriptions().size(), 1L);

            List<MessageId> messageIds = publishMessages(producer, MESSAGE_COUNT);

            PersistentSubscription sub = topicRef.getSubscription(SUBSCRIPTION_NAME);
            Assert.assertEquals(sub.getNumberOfEntriesInBacklog(), 10L);

            consumer.seek(MessageId.latest);
            Assert.assertEquals(sub.getNumberOfEntriesInBacklog(), 0L);

            // NOTE(review): the pauses between seeks presumably give the consumer time to
            // settle after the previous seek - confirm before shortening or removing.
            Thread.sleep(500L);
            consumer.seek(MessageId.earliest);
            Assert.assertEquals(sub.getNumberOfEntriesInBacklog(), 10L);

            Thread.sleep(500L);
            consumer.seek(messageIds.get(5));
            Assert.assertEquals(sub.getNumberOfEntriesInBacklog(), 5L);
        }
    }

    /**
     * Verifies that seeking by message id on a consumer of a partitioned topic
     * is rejected with a {@link PulsarClientException}.
     */
    @Test
    public void testSeekOnPartitionedTopic() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/testSeekPartitions";

        this.admin.topics().createPartitionedTopic(topicName, 2);
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(SUBSCRIPTION_NAME)
                .subscribe()) {
            try {
                consumer.seek(MessageId.latest);
                Assert.fail("Should not have succeeded");
            } catch (PulsarClientException expected) {
                // Expected: this test requires the seek above to be rejected.
            }
        }
    }

    /**
     * Seeks a subscription on a non-partitioned topic by timestamp: first to
     * "now" (the test expects one entry to remain in the backlog), then to a
     * point {@link #RESET_TIME} in the past (the full backlog of 10 is expected).
     */
    @Test
    public void testSeekTime() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/testSeekTime";
        final long resetTimeInMillis =
                TimeUnit.SECONDS.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(RESET_TIME));

        // NOTE(review): receiverQueueSize(0) presumably disables client prefetch so the
        // broker-side backlog is not affected by the consumer - confirm.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
             Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(topicName)
                     .subscriptionName(SUBSCRIPTION_NAME)
                     .receiverQueueSize(0)
                     .subscribe()) {
            PersistentTopic topicRef =
                    (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
            Assert.assertNotNull(topicRef);
            Assert.assertEquals(topicRef.getProducers().size(), 1L);
            Assert.assertEquals(topicRef.getSubscriptions().size(), 1L);
            PersistentSubscription sub = topicRef.getSubscription(SUBSCRIPTION_NAME);

            publishMessages(producer, MESSAGE_COUNT);
            Assert.assertEquals(sub.getNumberOfEntriesInBacklog(), 10L);

            long currentTimestamp = System.currentTimeMillis();
            consumer.seek(currentTimestamp);
            Assert.assertEquals(sub.getNumberOfEntriesInBacklog(), 1L);

            // NOTE(review): pause presumably lets the consumer settle after the previous
            // seek - confirm before shortening or removing.
            Thread.sleep(1000L);
            consumer.seek(currentTimestamp - resetTimeInMillis);
            Assert.assertEquals(sub.getNumberOfEntriesInBacklog(), 10L);
        }
    }

    /**
     * Same scenario as {@link #testSeekTime()} on a two-partition topic. The
     * backlog is summed over the subscriptions of all partitions: 10 after
     * publishing, 2 after seeking to "now", and 10 again after seeking
     * {@link #RESET_TIME} into the past.
     */
    @Test
    public void testSeekTimeOnPartitionedTopic() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/testSeekTimePartitions";
        final int partitions = 2;
        final long resetTimeInMillis =
                TimeUnit.SECONDS.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(RESET_TIME));

        this.admin.topics().createPartitionedTopic(topicName, partitions);
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
             Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(topicName)
                     .subscriptionName(SUBSCRIPTION_NAME)
                     .subscribe()) {
            // Collect the broker-side subscription of every partition.
            List<PersistentSubscription> subs = new ArrayList<>(partitions);
            for (int i = 0; i < partitions; ++i) {
                PersistentTopic topicRef = (PersistentTopic) this.pulsar.getBrokerService()
                        .getTopicReference(topicName + "-partition-" + i).get();
                Assert.assertNotNull(topicRef);
                Assert.assertEquals(topicRef.getProducers().size(), 1L);
                Assert.assertEquals(topicRef.getSubscriptions().size(), 1L);
                PersistentSubscription sub = topicRef.getSubscription(SUBSCRIPTION_NAME);
                Assert.assertNotNull(sub);
                subs.add(sub);
            }

            publishMessages(producer, MESSAGE_COUNT);
            Assert.assertEquals(totalBacklog(subs), 10L);

            long currentTimestamp = System.currentTimeMillis();
            consumer.seek(currentTimestamp);
            Assert.assertEquals(totalBacklog(subs), 2L);

            // NOTE(review): pause presumably lets the consumer settle after the previous
            // seek - confirm before shortening or removing.
            Thread.sleep(1000L);
            consumer.seek(currentTimestamp - resetTimeInMillis);
            Assert.assertEquals(totalBacklog(subs), 10L);
        }
    }

    /**
     * Publishes {@code count} messages with payloads {@code "my-message-0"}
     * through {@code "my-message-(count-1)"} and returns their ids in publish order.
     */
    private static List<MessageId> publishMessages(Producer<byte[]> producer, int count)
            throws PulsarClientException {
        List<MessageId> messageIds = new ArrayList<>(count);
        for (int i = 0; i < count; ++i) {
            String message = "my-message-" + i;
            messageIds.add(producer.send(message.getBytes(StandardCharsets.UTF_8)));
        }
        return messageIds;
    }

    /** Returns the sum of the backlog entry counts reported by the given subscriptions. */
    private static long totalBacklog(List<PersistentSubscription> subs) {
        long backlog = 0L;
        for (PersistentSubscription sub : subs) {
            backlog += sub.getNumberOfEntriesInBacklog();
        }
        return backlog;
    }
}

