/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class DeadLetterTopicTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(DeadLetterTopicTest.class);

    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterMethod
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testDeadLetterTopic() throws Exception {
        String topic = "persistent://my-property/my-ns/dead-letter-topic";
        int maxRedeliveryCount = 2;
        int sendMessages = 100;
        Consumer consumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/dead-letter-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).ackTimeout(3L, TimeUnit.SECONDS).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).build()).receiverQueueSize(100).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ"}).subscriptionName("my-subscription").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Producer producer = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic").create();
        for (int i = 0; i < 100; ++i) {
            producer.send((Object)String.format("Hello Pulsar [%d]", i).getBytes());
        }
        producer.close();
        int totalReceived = 0;
        do {
            Message message = consumer.receive();
            log.info("consumer received message : {} {}", (Object)message.getMessageId(), (Object)new String(message.getData()));
        } while (++totalReceived < 300);
        int totalInDeadLetter = 0;
        do {
            Message message = deadLetterConsumer.receive();
            log.info("dead letter consumer received message : {} {}", (Object)message.getMessageId(), (Object)new String(message.getData()));
            deadLetterConsumer.acknowledge(message);
        } while (++totalInDeadLetter < 100);
        deadLetterConsumer.close();
        consumer.close();
        Consumer checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/dead-letter-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
        if (checkMessage != null) {
            log.info("check consumer received message : {} {}", (Object)checkMessage.getMessageId(), (Object)new String(checkMessage.getData()));
        }
        Assert.assertNull((Object)checkMessage);
        checkConsumer.close();
        newPulsarClient.close();
    }

    @Test(enabled=false)
    public void testDeadLetterTopicWithMultiTopic() throws Exception {
        String topic1 = "persistent://my-property/my-ns/dead-letter-topic-1";
        String topic2 = "persistent://my-property/my-ns/dead-letter-topic-2";
        int maxRedeliveryCount = 2;
        int sendMessages = 100;
        Consumer consumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/dead-letter-topic-1", "persistent://my-property/my-ns/dead-letter-topic-2"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).ackTimeout(3L, TimeUnit.SECONDS).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).build()).receiverQueueSize(100).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Consumer deadLetterConsumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/dead-letter-topic-1-my-subscription-DLQ", "persistent://my-property/my-ns/dead-letter-topic-2-my-subscription-DLQ"}).subscriptionName("my-subscription").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Producer producer1 = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic-1").create();
        Producer producer2 = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic-2").create();
        for (int i = 0; i < sendMessages; ++i) {
            producer1.send((Object)String.format("Hello Pulsar [%d]", i).getBytes());
            producer2.send((Object)String.format("Hello Pulsar [%d]", i).getBytes());
        }
        sendMessages *= 2;
        producer1.close();
        producer2.close();
        int totalReceived = 0;
        do {
            Message message = consumer.receive();
            log.info("consumer received message : {} {} - total = {}", new Object[]{message.getMessageId(), new String(message.getData()), ++totalReceived});
        } while (totalReceived < sendMessages * 3);
        int totalInDeadLetter = 0;
        do {
            Message message = deadLetterConsumer.receive();
            log.info("dead letter consumer received message : {} {}", (Object)message.getMessageId(), (Object)new String(message.getData()));
            deadLetterConsumer.acknowledge(message);
        } while (++totalInDeadLetter < sendMessages);
        deadLetterConsumer.close();
        consumer.close();
        Consumer checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/dead-letter-topic-1", "persistent://my-property/my-ns/dead-letter-topic-2"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
        if (checkMessage != null) {
            log.info("check consumer received message : {} {}", (Object)checkMessage.getMessageId(), (Object)new String(checkMessage.getData()));
        }
        Assert.assertNull((Object)checkMessage);
        checkConsumer.close();
    }

    @Test
    public void testDeadLetterTopicByCustomTopicName() throws Exception {
        String topic = "persistent://my-property/my-ns/dead-letter-topic";
        int maxRedeliveryCount = 2;
        int sendMessages = 100;
        Consumer consumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/dead-letter-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).ackTimeout(3L, TimeUnit.SECONDS).receiverQueueSize(100).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).deadLetterTopic("persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ").build()).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ"}).subscriptionName("my-subscription").subscribe();
        Producer producer = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic").create();
        for (int i = 0; i < 100; ++i) {
            producer.send((Object)String.format("Hello Pulsar [%d]", i).getBytes());
        }
        producer.close();
        int totalReceived = 0;
        do {
            Message message = consumer.receive();
            log.info("consumer received message : {} {}", (Object)message.getMessageId(), (Object)new String(message.getData()));
        } while (++totalReceived < 300);
        int totalInDeadLetter = 0;
        do {
            Message message = deadLetterConsumer.receive();
            log.info("dead letter consumer received message : {} {}", (Object)message.getMessageId(), (Object)new String(message.getData()));
            deadLetterConsumer.acknowledge(message);
        } while (++totalInDeadLetter < 100);
        deadLetterConsumer.close();
        consumer.close();
        PulsarClient newPulsarClient1 = this.newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer checkConsumer = newPulsarClient1.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/dead-letter-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
        if (checkMessage != null) {
            log.info("check consumer received message : {} {}", (Object)checkMessage.getMessageId(), (Object)new String(checkMessage.getData()));
        }
        Assert.assertNull((Object)checkMessage);
        newPulsarClient.close();
        newPulsarClient1.close();
        checkConsumer.close();
    }

    @Test(timeOut=200000L)
    public void testDeadLetterWithoutConsumerReceiveImmediately() throws PulsarClientException, InterruptedException {
        String topic = "persistent://my-property/my-ns/dead-letter-topic-without-consumer-receive-immediately";
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/dead-letter-topic-without-consumer-receive-immediately"}).subscriptionType(SubscriptionType.Shared).subscriptionName("my-subscription").ackTimeout(1L, TimeUnit.SECONDS).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build()).subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/dead-letter-topic-without-consumer-receive-immediately").create();
        producer.send((Object)"a message".getBytes());
        Thread.sleep(5000L);
        Message msg = consumer.receive(1, TimeUnit.SECONDS);
        Assert.assertNotNull((Object)msg);
    }
}

