/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class ExposeMessageRedeliveryCountTest
extends ProducerConsumerBase {
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterMethod
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test(timeOut=30000L)
    public void testRedeliveryCount() throws PulsarClientException {
        Message message;
        int redeliveryCount;
        String topic = "persistent://my-property/my-ns/redeliveryCount";
        Consumer consumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/redeliveryCount"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).ackTimeout(3L, TimeUnit.SECONDS).receiverQueueSize(100).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Producer producer = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/redeliveryCount").create();
        producer.send((Object)"Hello Pulsar".getBytes());
        do {
            message = consumer.receive();
            message.getProperties();
        } while ((redeliveryCount = message.getRedeliveryCount()) <= 2);
        consumer.acknowledge(message);
        Assert.assertEquals((int)3, (int)redeliveryCount);
        producer.close();
        consumer.close();
    }

    @Test(timeOut=30000L)
    public void testRedeliveryCountWithPartitionedTopic() throws PulsarClientException, PulsarAdminException {
        Message message;
        int redeliveryCount;
        String topic = "persistent://my-property/my-ns/redeliveryCount.partitioned";
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/redeliveryCount.partitioned", 3);
        Consumer consumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/redeliveryCount.partitioned"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).ackTimeout(3L, TimeUnit.SECONDS).receiverQueueSize(100).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Producer producer = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/redeliveryCount.partitioned").create();
        producer.send((Object)"Hello Pulsar".getBytes());
        do {
            message = consumer.receive();
            message.getProperties();
        } while ((redeliveryCount = message.getRedeliveryCount()) <= 2);
        consumer.acknowledge(message);
        Assert.assertEquals((int)3, (int)redeliveryCount);
        producer.close();
        consumer.close();
        this.admin.topics().deletePartitionedTopic("persistent://my-property/my-ns/redeliveryCount.partitioned");
    }
}

