/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.UnAckedMessageTracker;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class PerMessageUnAcknowledgedRedeliveryTest
extends BrokerTestBase {
    private static final long testTimeout = 90000L;
    private static final Logger log = LoggerFactory.getLogger(PerMessageUnAcknowledgedRedeliveryTest.class);
    private final long ackTimeOutMillis = TimeUnit.SECONDS.toMillis(2L);

    @Override
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
    }

    @Override
    @AfterMethod
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test(timeOut=90000L)
    public void testSharedAckedNormalTopic() throws Exception {
        String key = "testSharedAckedNormalTopic";
        String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        String subscriptionName = "my-ex-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 15;
        Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).receiverQueueSize(50).ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS).subscriptionType(SubscriptionType.Shared).subscribe();
        for (int i = 0; i < 5; ++i) {
            String message = messagePredicate + i;
            log.info("Producer produced: " + message);
            producer.send((Object)message.getBytes());
        }
        Message message = consumer.receive();
        while (message != null) {
            String data = new String(message.getData());
            log.info("Consumer received : " + data);
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        long size = ((ConsumerImpl)consumer).getUnAckedMessageTracker().size();
        log.info(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)5L);
        for (int i = 0; i < 5; ++i) {
            String m = messagePredicate + i;
            log.info("Producer produced: " + m);
            producer.send((Object)m.getBytes());
        }
        message = consumer.receive();
        int received = 0;
        while (message != null) {
            ++received;
            String data = new String(message.getData());
            log.info("Consumer received : " + data);
            consumer.acknowledge(message);
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        size = ((ConsumerImpl)consumer).getUnAckedMessageTracker().size();
        log.info(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)5L);
        Assert.assertEquals((int)received, (int)5);
        Thread.sleep(this.ackTimeOutMillis);
        for (int i = 0; i < 5; ++i) {
            String m = messagePredicate + i;
            log.info("Producer produced: " + m);
            producer.send((Object)m.getBytes());
        }
        message = consumer.receive();
        while (message != null) {
            String data = new String(message.getData());
            log.info("Consumer received : " + data);
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        size = ((ConsumerImpl)consumer).getUnAckedMessageTracker().size();
        log.info(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)10L);
        Thread.sleep(this.ackTimeOutMillis);
        message = consumer.receive();
        int redelivered = 0;
        while (message != null) {
            ++redelivered;
            String data = new String(message.getData());
            log.info("Consumer received : " + data);
            consumer.acknowledge(message);
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        Assert.assertEquals((int)redelivered, (int)5);
        size = ((ConsumerImpl)consumer).getUnAckedMessageTracker().size();
        log.info(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)5L);
    }

    @Test(timeOut=90000L)
    public void testUnAckedMessageTrackerSize() throws Exception {
        String key = "testUnAckedMessageTrackerSize";
        String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        String subscriptionName = "my-ex-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 15;
        Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).receiverQueueSize(50).subscriptionType(SubscriptionType.Shared).subscribe();
        for (int i = 0; i < 5; ++i) {
            String message = messagePredicate + i;
            log.info("Producer produced: " + message);
            producer.send((Object)message.getBytes());
        }
        Message message = consumer.receive();
        while (message != null) {
            String data = new String(message.getData());
            log.info("Consumer received : " + data);
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        UnAckedMessageTracker unAckedMessageTracker = ((ConsumerImpl)consumer).getUnAckedMessageTracker();
        long size = unAckedMessageTracker.size();
        log.info(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)0L);
        Assert.assertTrue((boolean)unAckedMessageTracker.add(null));
        Assert.assertTrue((boolean)unAckedMessageTracker.remove(null));
        Assert.assertEquals((int)unAckedMessageTracker.removeMessagesTill(null), (int)0);
    }

    @Test(timeOut=90000L)
    public void testExclusiveAckedNormalTopic() throws Exception {
        String key = "testExclusiveAckedNormalTopic";
        String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        String subscriptionName = "my-ex-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 15;
        Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).receiverQueueSize(50).ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS).subscriptionType(SubscriptionType.Exclusive).subscribe();
        for (int i = 0; i < 5; ++i) {
            String message = messagePredicate + i;
            log.info("Producer produced: " + message);
            producer.send((Object)message.getBytes());
        }
        Message message = consumer.receive();
        while (message != null) {
            String data = new String(message.getData());
            log.info("Consumer received : " + data);
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        long size = ((ConsumerImpl)consumer).getUnAckedMessageTracker().size();
        log.info(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)5L);
        for (int i = 0; i < 5; ++i) {
            String m = messagePredicate + i;
            log.info("Producer produced: " + m);
            producer.send((Object)m.getBytes());
        }
        message = consumer.receive();
        int received = 0;
        while (message != null) {
            ++received;
            String data = new String(message.getData());
            log.info("Consumer received : " + data);
            consumer.acknowledge(message);
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        size = ((ConsumerImpl)consumer).getUnAckedMessageTracker().size();
        log.info(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)5L);
        Assert.assertEquals((int)received, (int)5);
        Thread.sleep(this.ackTimeOutMillis);
        for (int i = 0; i < 5; ++i) {
            String m = messagePredicate + i;
            log.info("Producer produced: " + m);
            producer.send((Object)m.getBytes());
        }
        message = consumer.receive();
        while (message != null) {
            String data = new String(message.getData());
            log.info("Consumer received : " + data);
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        size = ((ConsumerImpl)consumer).getUnAckedMessageTracker().size();
        log.info(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)10L);
        Thread.sleep(this.ackTimeOutMillis);
        message = consumer.receive();
        int redelivered = 0;
        while (message != null) {
            ++redelivered;
            String data = new String(message.getData());
            log.info("Consumer received : " + data);
            consumer.acknowledge(message);
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        Assert.assertEquals((int)redelivered, (int)10);
        size = ((ConsumerImpl)consumer).getUnAckedMessageTracker().size();
        log.info(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)0L);
    }

    @Test(timeOut=90000L)
    public void testFailoverAckedNormalTopic() throws Exception {
        String key = "testFailoverAckedNormalTopic";
        String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        String subscriptionName = "my-ex-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 15;
        Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).receiverQueueSize(50).ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS).subscriptionType(SubscriptionType.Failover).subscribe();
        for (int i = 0; i < 5; ++i) {
            String message = messagePredicate + i;
            log.info("Producer produced: " + message);
            producer.send((Object)message.getBytes());
        }
        Message message = consumer.receive();
        while (message != null) {
            String data = new String(message.getData());
            log.info("Consumer received : " + data);
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        long size = ((ConsumerImpl)consumer).getUnAckedMessageTracker().size();
        log.info(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)5L);
        for (int i = 0; i < 5; ++i) {
            String m = messagePredicate + i;
            log.info("Producer produced: " + m);
            producer.send((Object)m.getBytes());
        }
        message = consumer.receive();
        int received = 0;
        while (message != null) {
            ++received;
            String data = new String(message.getData());
            log.info("Consumer received : " + data);
            consumer.acknowledge(message);
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        size = ((ConsumerImpl)consumer).getUnAckedMessageTracker().size();
        log.info(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)5L);
        Assert.assertEquals((int)received, (int)5);
        Thread.sleep(this.ackTimeOutMillis);
        for (int i = 0; i < 5; ++i) {
            String m = messagePredicate + i;
            log.info("Producer produced: " + m);
            producer.send((Object)m.getBytes());
        }
        message = consumer.receive();
        while (message != null) {
            String data = new String(message.getData());
            log.info("Consumer received : " + data);
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        size = ((ConsumerImpl)consumer).getUnAckedMessageTracker().size();
        log.info(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)10L);
        Thread.sleep(this.ackTimeOutMillis);
        message = consumer.receive();
        int redelivered = 0;
        while (message != null) {
            ++redelivered;
            String data = new String(message.getData());
            log.info("Consumer received : " + data);
            consumer.acknowledge(message);
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        Assert.assertEquals((int)redelivered, (int)10);
        size = ((ConsumerImpl)consumer).getUnAckedMessageTracker().size();
        log.info(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)0L);
    }

    private static long getUnackedMessagesCountInPartitionedConsumer(Consumer<byte[]> c) {
        MultiTopicsConsumerImpl pc = (MultiTopicsConsumerImpl)c;
        return pc.getUnAckedMessageTracker().size() + pc.getConsumers().stream().mapToLong(consumer -> consumer.getUnAckedMessageTracker().size()).sum();
    }

    @Test(timeOut=90000L)
    public void testSharedAckedPartitionedTopic() throws Exception {
        String key = "testSharedAckedPartitionedTopic";
        String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        String subscriptionName = "my-ex-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 15;
        int numberOfPartitions = 3;
        this.admin.tenants().createTenant("prop", new TenantInfo());
        this.admin.topics().createPartitionedTopic(topicName, 3);
        Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).receiverQueueSize(50).ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS).subscriptionType(SubscriptionType.Shared).subscribe();
        for (int i = 0; i < 5; ++i) {
            String message = messagePredicate + i;
            log.info("Producer produced: " + message);
            producer.send((Object)message.getBytes());
        }
        Message message = consumer.receive();
        while (message != null) {
            String data = new String(message.getData());
            log.info("Consumer received : " + data);
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        long size = PerMessageUnAcknowledgedRedeliveryTest.getUnackedMessagesCountInPartitionedConsumer((Consumer<byte[]>)consumer);
        log.info(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)5L);
        for (int i = 0; i < 5; ++i) {
            String m = messagePredicate + i;
            log.info("Producer produced: " + m);
            producer.send((Object)m.getBytes());
        }
        message = consumer.receive();
        int received = 0;
        while (message != null) {
            ++received;
            String data = new String(message.getData());
            log.info("Consumer received : " + data);
            consumer.acknowledge(message);
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        size = PerMessageUnAcknowledgedRedeliveryTest.getUnackedMessagesCountInPartitionedConsumer((Consumer<byte[]>)consumer);
        log.info(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)5L);
        Assert.assertEquals((int)received, (int)5);
        Thread.sleep(this.ackTimeOutMillis);
        for (int i = 0; i < 5; ++i) {
            String m = messagePredicate + i;
            log.info("Producer produced: " + m);
            producer.send((Object)m.getBytes());
        }
        message = consumer.receive();
        while (message != null) {
            String data = new String(message.getData());
            log.info("Consumer received : " + data);
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        size = PerMessageUnAcknowledgedRedeliveryTest.getUnackedMessagesCountInPartitionedConsumer((Consumer<byte[]>)consumer);
        log.info(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)10L);
        Thread.sleep(this.ackTimeOutMillis);
        message = consumer.receive();
        int redelivered = 0;
        while (message != null) {
            ++redelivered;
            String data = new String(message.getData());
            log.info("Consumer received : " + data);
            consumer.acknowledge(message);
            message = consumer.receive(100, TimeUnit.MILLISECONDS);
        }
        Assert.assertEquals((int)redelivered, (int)5);
        size = PerMessageUnAcknowledgedRedeliveryTest.getUnackedMessagesCountInPartitionedConsumer((Consumer<byte[]>)consumer);
        log.info(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)5L);
    }
}

