/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class UnAcknowledgedMessagesTimeoutTest
extends BrokerTestBase {
    private static final long testTimeout = 90000L;
    private static final Logger log = LoggerFactory.getLogger(UnAcknowledgedMessagesTimeoutTest.class);
    private final long ackTimeOutMillis = TimeUnit.SECONDS.toMillis(2L);

    @Override
    @BeforeMethod
    public void setup() throws Exception {
        super.baseSetup();
    }

    @Override
    @AfterMethod
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test(timeOut=90000L)
    public void testExclusiveSingleAckedNormalTopic() throws Exception {
        String key = "testExclusiveSingleAckedNormalTopic";
        String topicName = "persistent://prop/ns-abc/topic-" + key;
        String subscriptionName = "my-ex-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 10;
        Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).receiverQueueSize(7).ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS).subscribe();
        for (int i = 0; i < 5; ++i) {
            String message = messagePredicate + i;
            log.info("Producer produced: " + message);
            producer.send((Object)message.getBytes());
        }
        Message message = consumer.receive();
        while (message != null) {
            log.info("Consumer received : " + new String(message.getData()));
            message = consumer.receive(500, TimeUnit.MILLISECONDS);
        }
        long size = ((ConsumerImpl)consumer).getUnAckedMessageTracker().size();
        log.info(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)5L);
        message = consumer.receive();
        log.info("Consumer received : " + new String(message.getData()));
        HashSet<String> hSet = new HashSet<String>();
        for (int i = 5; i < 10; ++i) {
            String messageString = messagePredicate + i;
            producer.send((Object)messageString.getBytes());
        }
        do {
            hSet.add(new String(message.getData()));
            consumer.acknowledge(message);
            log.info("Consumer acknowledged : " + new String(message.getData()));
        } while ((message = consumer.receive(500, TimeUnit.MILLISECONDS)) != null);
        size = ((ConsumerImpl)consumer).getUnAckedMessageTracker().size();
        log.info(key + " Unacked Message Tracker size is " + size);
        Assert.assertEquals((long)size, (long)0L);
        Assert.assertEquals((int)hSet.size(), (int)10);
    }

    @Test(timeOut=90000L)
    public void testExclusiveCumulativeAckedNormalTopic() throws Exception {
        Object message;
        String key = "testExclusiveCumulativeAckedNormalTopic";
        String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        String subscriptionName = "my-ex-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 10;
        Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).receiverQueueSize(7).ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS).subscribe();
        for (int i = 0; i < 10; ++i) {
            message = messagePredicate + i;
            producer.send((Object)((String)message).getBytes());
        }
        HashSet<String> hSet = new HashSet<String>();
        message = consumer.receive();
        Object lastMessage = message;
        while (message != null) {
            lastMessage = message;
            hSet.add(new String(message.getData()));
            log.info("Consumer received " + new String(message.getData()));
            log.info("Message ID details " + message.getMessageId().toString());
            message = consumer.receive(500, TimeUnit.MILLISECONDS);
        }
        long size = ((ConsumerImpl)consumer).getUnAckedMessageTracker().size();
        Assert.assertEquals((long)size, (long)10L);
        log.info("Comulative Ack sent for " + new String(lastMessage.getData()));
        log.info("Message ID details " + lastMessage.getMessageId().toString());
        consumer.acknowledgeCumulative(lastMessage);
        size = ((ConsumerImpl)consumer).getUnAckedMessageTracker().size();
        Assert.assertEquals((long)size, (long)0L);
        message = consumer.receive((int)(2L * this.ackTimeOutMillis), TimeUnit.MILLISECONDS);
        Assert.assertNull((Object)message);
    }

    @Test(timeOut=90000L)
    public void testSharedSingleAckedPartitionedTopic() throws Exception {
        String key = "testSharedSingleAckedPartitionedTopic";
        String topicName = "persistent://prop/ns-abc/topic-" + key;
        String subscriptionName = "my-shared-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 20;
        int numberOfPartitions = 3;
        this.admin.topics().createPartitionedTopic(topicName, 3);
        Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
        Consumer consumer1 = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).receiverQueueSize(100).subscriptionType(SubscriptionType.Shared).ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS).consumerName("Consumer-1").subscribe();
        Consumer consumer2 = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).receiverQueueSize(100).subscriptionType(SubscriptionType.Shared).ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS).consumerName("Consumer-2").subscribe();
        for (int i = 0; i < 20; ++i) {
            String message = messagePredicate + i;
            MessageId msgId = producer.send((Object)message.getBytes());
            log.info("Message produced: {} -- msgId: {}", (Object)message, (Object)msgId);
        }
        int messageCount1 = UnAcknowledgedMessagesTimeoutTest.receiveAllMessage(consumer1, false);
        int messageCount2 = UnAcknowledgedMessagesTimeoutTest.receiveAllMessage(consumer2, true);
        int ackCount1 = 0;
        int ackCount2 = messageCount2;
        log.info(key + " messageCount1 = " + messageCount1);
        log.info(key + " messageCount2 = " + messageCount2);
        log.info(key + " ackCount1 = " + ackCount1);
        log.info(key + " ackCount2 = " + ackCount2);
        Assert.assertEquals((int)(messageCount1 + messageCount2), (int)20);
        Thread.sleep((int)((double)this.ackTimeOutMillis * 1.1));
        log.info(key + " Timeout should be triggered now");
        messageCount1 = UnAcknowledgedMessagesTimeoutTest.receiveAllMessage(consumer1, true);
        ackCount1 = messageCount1;
        log.info(key + " messageCount1 = " + messageCount1);
        log.info(key + " messageCount2 = " + (messageCount2 += UnAcknowledgedMessagesTimeoutTest.receiveAllMessage(consumer2, false)));
        log.info(key + " ackCount1 = " + ackCount1);
        log.info(key + " ackCount2 = " + ackCount2);
        Assert.assertEquals((int)(messageCount1 + messageCount2), (int)20);
        Assert.assertEquals((int)(ackCount1 + messageCount2), (int)20);
        Thread.sleep((int)((double)this.ackTimeOutMillis * 1.1));
        log.info(key + " Timeout should be triggered again");
        log.info(key + " ackCount1 = " + (ackCount1 += UnAcknowledgedMessagesTimeoutTest.receiveAllMessage(consumer1, true)));
        log.info(key + " ackCount2 = " + (ackCount2 += UnAcknowledgedMessagesTimeoutTest.receiveAllMessage(consumer2, true)));
        Assert.assertEquals((int)(ackCount1 + ackCount2), (int)20);
    }

    private static int receiveAllMessage(Consumer<?> consumer, boolean ackMessages) throws Exception {
        int messagesReceived = 0;
        Message msg = consumer.receive(1, TimeUnit.SECONDS);
        while (msg != null) {
            ++messagesReceived;
            log.info("Consumer received {}", (Object)new String(msg.getData()));
            if (ackMessages) {
                consumer.acknowledge(msg);
            }
            msg = consumer.receive(1, TimeUnit.SECONDS);
        }
        return messagesReceived;
    }

    @Test(timeOut=90000L)
    public void testFailoverSingleAckedPartitionedTopic() throws Exception {
        String key = "testFailoverSingleAckedPartitionedTopic";
        String topicName = "persistent://prop/ns-abc/topic-" + key;
        String subscriptionName = "my-failover-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 10;
        int numberOfPartitions = 3;
        this.admin.topics().createPartitionedTopic(topicName, 3);
        Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
        Consumer consumer1 = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS).consumerName("Consumer-1").subscribe();
        Consumer consumer2 = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS).consumerName("Consumer-2").subscribe();
        for (int i = 0; i < 10; ++i) {
            String message = messagePredicate + i;
            log.info("Message produced: " + message);
            producer.send((Object)message.getBytes());
        }
        Message message1 = consumer1.receive();
        Message message2 = consumer2.receive();
        int messageCount1 = 0;
        int messageCount2 = 0;
        int ackCount1 = 0;
        int ackCount2 = 0;
        do {
            if (message1 != null) {
                log.info("Consumer1 received " + new String(message1.getData()));
                ++messageCount1;
            }
            if (message2 != null) {
                log.info("Consumer2 received " + new String(message2.getData()));
                ++messageCount2;
                consumer2.acknowledge(message2);
                ++ackCount2;
            }
            message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
            message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
        } while (message1 != null || message2 != null);
        log.info(key + " messageCount1 = " + messageCount1);
        log.info(key + " messageCount2 = " + messageCount2);
        log.info(key + " ackCount1 = " + ackCount1);
        log.info(key + " ackCount2 = " + ackCount2);
        Assert.assertEquals((int)(messageCount1 + messageCount2), (int)10);
        log.info(key + " Timeout should be triggered now");
        message1 = consumer1.receive();
        messageCount1 = 0;
        do {
            if (message1 != null) {
                log.info("Consumer1 received " + new String(message1.getData()));
                ++messageCount1;
                consumer1.acknowledge(message1);
                ++ackCount1;
            }
            if (message2 != null) {
                log.info("Consumer2 received " + new String(message2.getData()));
                ++messageCount2;
            }
            message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
            message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
        } while (message1 != null || message2 != null);
        log.info(key + " messageCount1 = " + messageCount1);
        log.info(key + " messageCount2 = " + messageCount2);
        log.info(key + " ackCount1 = " + ackCount1);
        log.info(key + " ackCount2 = " + ackCount2);
        Assert.assertEquals((int)(ackCount1 + messageCount2), (int)10);
    }

    @Test
    public void testAckTimeoutMinValue() throws PulsarClientException {
        try {
            this.pulsarClient.newConsumer().ackTimeout(999L, TimeUnit.MILLISECONDS);
            Assert.fail((String)"Exception should have been thrown since the set timeout is less than min timeout.");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test(timeOut=90000L)
    public void testCheckUnAcknowledgedMessageTimer() throws PulsarClientException, InterruptedException {
        int i;
        String key = "testCheckUnAcknowledgedMessageTimer";
        String topicName = "persistent://prop/ns-abc/topic-" + key;
        String subscriptionName = "my-ex-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 3;
        Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        ConsumerImpl consumer = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS).subscribe();
        for (i = 0; i < 3; ++i) {
            String message = messagePredicate + i;
            log.info("Producer produced: " + message);
            producer.send((Object)message.getBytes());
        }
        Thread.sleep((long)((double)this.ackTimeOutMillis * 1.1));
        for (i = 0; i < 3; ++i) {
            Message msg = consumer.receive();
            if (i == 2) continue;
            consumer.acknowledge(msg);
        }
        Assert.assertEquals((long)consumer.getUnAckedMessageTracker().size(), (long)1L);
        Message msg = consumer.receive();
        consumer.acknowledge(msg);
        Assert.assertEquals((long)consumer.getUnAckedMessageTracker().size(), (long)0L);
        Thread.sleep((long)((double)this.ackTimeOutMillis * 1.1));
        Assert.assertEquals((long)consumer.getUnAckedMessageTracker().size(), (long)0L);
    }

    @Test
    public void testSingleMessageBatch() throws Exception {
        String topicName = "prop/ns-abc/topic-estSingleMessageBatch";
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(true).batchingMaxPublishDelay(10L, TimeUnit.SECONDS).create();
        Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{topicName}).subscriptionName("subscription").ackTimeout(1L, TimeUnit.HOURS).subscribe();
        producer.sendAsync((Object)"hello");
        producer.flush();
        Message message = consumer.receive();
        Assert.assertFalse((boolean)((ConsumerImpl)consumer).getUnAckedMessageTracker().isEmpty());
        consumer.acknowledge(message);
        Assert.assertTrue((boolean)((ConsumerImpl)consumer).getUnAckedMessageTracker().isEmpty());
    }
}

