/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import avro.shaded.com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.raw.MessageParser;
import org.apache.pulsar.common.api.raw.RawMessage;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class MessageParserTest
extends MockedPulsarServiceBaseTest {
    @Override
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("test", new ClusterData("http://127.0.0.1:" + this.BROKER_WEBSERVICE_PORT));
        this.admin.tenants().createTenant("my-tenant", new TenantInfo((Set)Sets.newHashSet((Object[])new String[]{"appid1", "appid2"}), (Set)Sets.newHashSet((Object[])new String[]{"test"})));
        this.admin.namespaces().createNamespace("my-tenant/my-ns", (Set)Sets.newHashSet((Object[])new String[]{"test"}));
    }

    @Override
    @AfterMethod
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testWithoutBatches() throws Exception {
        String topic = "persistent://my-tenant/my-ns/my-topic";
        TopicName topicName = TopicName.get((String)topic);
        int n = 10;
        try (Producer producer = this.pulsarClient.newProducer(Schema.STRING).enableBatching(false).topic(topic).create();){
            for (int i = 0; i < n; ++i) {
                producer.send((Object)("hello-" + i));
            }
        }
        ManagedCursor cursor = ((PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topic).get()).getManagedLedger().newNonDurableCursor((Position)PositionImpl.earliest);
        for (int i = 0; i < n; ++i) {
            Entry entry = (Entry)cursor.readEntriesOrWait(1).get(0);
            ArrayList messages = Lists.newArrayList();
            try {
                MessageParser.parseMessage((TopicName)topicName, (long)entry.getLedgerId(), (long)entry.getEntryId(), (ByteBuf)entry.getDataBuffer(), message -> messages.add(message), (int)0x500000);
            }
            finally {
                entry.release();
            }
            Assert.assertEquals((int)messages.size(), (int)1);
            Assert.assertEquals((Object)((RawMessage)messages.get(0)).getData(), (Object)Unpooled.wrappedBuffer((byte[])("hello-" + i).getBytes()));
            messages.forEach(RawMessage::release);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testWithBatches() throws Exception {
        String topic = "persistent://my-tenant/my-ns/my-topic-with-batch";
        TopicName topicName = TopicName.get((String)topic);
        int n = 10;
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).enableBatching(true).batchingMaxPublishDelay(10L, TimeUnit.SECONDS).topic(topic).create();
        ManagedCursor cursor = ((PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topic).get()).getManagedLedger().newNonDurableCursor((Position)PositionImpl.earliest);
        for (int i = 0; i < n - 1; ++i) {
            producer.sendAsync((Object)("hello-" + i));
        }
        producer.send((Object)("hello-" + (n - 1)));
        Assert.assertEquals((long)cursor.getNumberOfEntriesInBacklog(), (long)1L);
        Entry entry = (Entry)cursor.readEntriesOrWait(1).get(0);
        ArrayList messages = Lists.newArrayList();
        try {
            MessageParser.parseMessage((TopicName)topicName, (long)entry.getLedgerId(), (long)entry.getEntryId(), (ByteBuf)entry.getDataBuffer(), message -> messages.add(message), (int)0x500000);
        }
        finally {
            entry.release();
        }
        Assert.assertEquals((int)messages.size(), (int)10);
        for (int i = 0; i < n; ++i) {
            Assert.assertEquals((Object)((RawMessage)messages.get(i)).getData(), (Object)Unpooled.wrappedBuffer((byte[])("hello-" + i).getBytes()));
        }
        messages.forEach(RawMessage::release);
        producer.close();
    }
}

