/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.google.common.collect.Sets;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Future;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class ReaderTest
extends MockedPulsarServiceBaseTest {
    private static final String subscription = "reader-sub";

    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("test", new ClusterData("http://127.0.0.1:" + this.BROKER_WEBSERVICE_PORT));
        this.admin.tenants().createTenant("my-property", new TenantInfo((Set)Sets.newHashSet((Object[])new String[]{"appid1", "appid2"}), (Set)Sets.newHashSet((Object[])new String[]{"test"})));
        this.admin.namespaces().createNamespace("my-property/my-ns", (Set)Sets.newHashSet((Object[])new String[]{"test"}));
    }

    @Override
    @AfterMethod
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    private Set<String> publishMessages(String topic, int count, boolean enableBatch) throws Exception {
        HashSet<String> keys = new HashSet<String>();
        ProducerBuilder builder = this.pulsarClient.newProducer();
        builder.messageRoutingMode(MessageRoutingMode.SinglePartition);
        builder.maxPendingMessages(count);
        builder.topic(topic);
        if (enableBatch) {
            builder.enableBatching(true);
            builder.batchingMaxMessages(count);
        } else {
            builder.enableBatching(false);
        }
        try (Producer producer = builder.create();){
            Future lastFuture = null;
            for (int i = 0; i < count; ++i) {
                String key = "key" + i;
                byte[] data = ("my-message-" + i).getBytes();
                lastFuture = producer.newMessage().key(key).value((Object)data).sendAsync();
                keys.add(key);
            }
            lastFuture.get();
        }
        return keys;
    }

    @Test
    public void testReadMessageWithoutBatching() throws Exception {
        String topic = "persistent://my-property/my-ns/my-reader-topic";
        this.testReadMessages(topic, false);
    }

    @Test
    public void testReadMessageWithBatching() throws Exception {
        String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching";
        this.testReadMessages(topic, true);
    }

    private void testReadMessages(String topic, boolean enableBatch) throws Exception {
        int numKeys = 10;
        Set<String> keys = this.publishMessages(topic, numKeys, enableBatch);
        Reader reader = this.pulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest).readerName(subscription).create();
        while (reader.hasMessageAvailable()) {
            Message message = reader.readNext();
            Assert.assertTrue((boolean)keys.remove(message.getKey()));
        }
        Assert.assertTrue((boolean)keys.isEmpty());
    }

    @Test
    public void testReadFromPartition() throws Exception {
        String topic = "persistent://my-property/my-ns/testReadFromPartition";
        String partition0 = topic + "-partition-0";
        this.admin.topics().createPartitionedTopic(topic, 4);
        int numKeys = 10;
        Set<String> keys = this.publishMessages(partition0, numKeys, false);
        Reader reader = this.pulsarClient.newReader().topic(partition0).startMessageId(MessageId.earliest).create();
        while (reader.hasMessageAvailable()) {
            Message message = reader.readNext();
            Assert.assertTrue((boolean)keys.remove(message.getKey()));
        }
        Assert.assertTrue((boolean)keys.isEmpty());
    }
}

