/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.google.common.collect.Sets;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class MessageRedeliveryTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(MessageRedeliveryTest.class);

    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterMethod
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @DataProvider(name="useOpenRangeSet")
    public static Object[][] useOpenRangeSet() {
        return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}};
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="useOpenRangeSet", timeOut=30000L)
    public void testRedelivery(boolean useOpenRangeSet) throws Exception {
        this.conf.setManagedLedgerMaxEntriesPerLedger(5);
        this.conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
        this.conf.setManagedLedgerUnackedRangesOpenCacheSetEnabled(useOpenRangeSet);
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(20, (ThreadFactory)new DefaultThreadFactory("pulsar"));
        try {
            String ns1 = "my-property/brok-ns1";
            String subName = "my-subscriber-name";
            int numMessages = 50;
            this.admin.namespaces().createNamespace("my-property/brok-ns1", (Set)Sets.newHashSet((Object[])new String[]{"test"}));
            String topic1 = "persistent://my-property/brok-ns1/my-topic";
            ConsumerImpl consumer1 = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/brok-ns1/my-topic"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(10).acknowledgmentGroupTime(0L, TimeUnit.MILLISECONDS).subscribe();
            ConsumerImpl consumer2 = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/brok-ns1/my-topic"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(10).acknowledgmentGroupTime(0L, TimeUnit.MILLISECONDS).subscribe();
            ProducerImpl producer = (ProducerImpl)this.pulsarClient.newProducer().topic("persistent://my-property/brok-ns1/my-topic").create();
            for (int i = 0; i < 50; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
            }
            CountDownLatch latch = new CountDownLatch(50);
            AtomicBoolean consume1 = new AtomicBoolean(true);
            AtomicBoolean consume2 = new AtomicBoolean(true);
            Set ackedMessages = Sets.newConcurrentHashSet();
            AtomicInteger counter = new AtomicInteger(0);
            executor.submit(() -> {
                while (true) {
                    block5: {
                        try {
                            Message msg = consumer1.receive(1000, TimeUnit.MILLISECONDS);
                            if (msg == null) break;
                            if (counter.getAndIncrement() % 2 != 0) break block5;
                            try {
                                consumer1.acknowledge(msg);
                                ackedMessages.add(new String(msg.getData()));
                            }
                            catch (PulsarClientException e1) {
                                log.warn("Failed to ack message {}", (Object)e1.getMessage());
                            }
                        }
                        catch (PulsarClientException e2) {
                            break;
                        }
                    }
                    latch.countDown();
                }
            });
            executor.submit(() -> {
                while (consume2.get()) {
                    try {
                        Message msg = consumer2.receive(1000, TimeUnit.MILLISECONDS);
                        if (msg == null) break;
                        consumer2.acknowledge(msg);
                        ackedMessages.add(new String(msg.getData()));
                    }
                    catch (PulsarClientException e2) {
                        break;
                    }
                    latch.countDown();
                }
            });
            latch.await(10000L, TimeUnit.MILLISECONDS);
            consume1.set(false);
            Thread.sleep(1000L);
            Assert.assertNotEquals((Object)ackedMessages.size(), (Object)50);
            PersistentTopic pTopic = (PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopicIfExists("persistent://my-property/brok-ns1/my-topic").get()).get();
            ManagedLedgerImpl ml = (ManagedLedgerImpl)pTopic.getManagedLedger();
            ManagedCursorImpl cursor = (ManagedCursorImpl)ml.getCursors().iterator().next();
            consumer1.close();
            CountDownLatch latch2 = new CountDownLatch(1);
            executor.submit(() -> {
                while (true) {
                    try {
                        Message msg = consumer2.receive(1000, TimeUnit.MILLISECONDS);
                        if (msg == null) break;
                        consumer2.acknowledge(msg);
                        ackedMessages.add(new String(msg.getData()));
                    }
                    catch (PulsarClientException e2) {
                        break;
                    }
                    if (ackedMessages.size() != 50) continue;
                    latch2.countDown();
                }
            });
            latch2.await(20000L, TimeUnit.MILLISECONDS);
            consumer2.close();
            Assert.assertEquals((int)ackedMessages.size(), (int)50);
            Assert.assertEquals((int)cursor.getIndividuallyDeletedMessagesSet().size(), (int)0);
            Assert.assertEquals((Object)cursor.getReadPosition(), (Object)cursor.getMarkDeletedPosition().getNext());
            producer.close();
            consumer2.close();
        }
        finally {
            executor.shutdown();
        }
    }

    @Test
    public void testDoNotRedeliveryMarkDeleteMessages() throws PulsarClientException, PulsarAdminException {
        String topic = "testDoNotRedeliveryMarkDeleteMessages";
        String subName = "my-sub";
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"testDoNotRedeliveryMarkDeleteMessages"}).subscriptionName("my-sub").subscriptionType(SubscriptionType.Key_Shared).ackTimeout(1L, TimeUnit.SECONDS).subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("testDoNotRedeliveryMarkDeleteMessages").enableBatching(false).create();
        producer.send((Object)"Pulsar".getBytes());
        for (int i = 0; i < 2; ++i) {
            Message message = consumer.receive();
            Assert.assertNotNull((Object)message);
        }
        this.admin.topics().skipAllMessages("testDoNotRedeliveryMarkDeleteMessages", "my-sub");
        Message message = null;
        try {
            message = consumer.receive(2, TimeUnit.SECONDS);
        }
        catch (Exception exception) {
            // empty catch block
        }
        Assert.assertNull(message);
    }
}

