/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class MaxMessageSizeTest {
    private static int BROKER_SERVICE_PORT = PortManager.nextFreePort();
    PulsarService pulsar;
    ServiceConfiguration configuration;
    PulsarAdmin admin;
    LocalBookkeeperEnsemble bkEnsemble;
    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
    private final int BROKER_WEBSERVER_PORT = PortManager.nextFreePort();

    @BeforeMethod
    void setup() {
        try {
            this.bkEnsemble = new LocalBookkeeperEnsemble(3, this.ZOOKEEPER_PORT, PortManager::nextFreePort);
            ServerConfiguration conf = new ServerConfiguration();
            conf.setNettyMaxFrameSizeBytes(0xA00000);
            this.bkEnsemble.startStandalone(conf, false);
            this.configuration = new ServiceConfiguration();
            this.configuration.setZookeeperServers("127.0.0.1:" + this.ZOOKEEPER_PORT);
            this.configuration.setAdvertisedAddress("localhost");
            this.configuration.setWebServicePort(Optional.of(this.BROKER_WEBSERVER_PORT));
            this.configuration.setClusterName("max_message_test");
            this.configuration.setBrokerServicePort(Optional.of(BROKER_SERVICE_PORT));
            this.configuration.setAuthorizationEnabled(false);
            this.configuration.setAuthenticationEnabled(false);
            this.configuration.setManagedLedgerMaxEntriesPerLedger(5);
            this.configuration.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
            this.configuration.setMaxMessageSize(0xA00000);
            this.pulsar = new PulsarService(this.configuration);
            this.pulsar.start();
            String url = "http://127.0.0.1:" + this.BROKER_WEBSERVER_PORT;
            this.admin = PulsarAdmin.builder().serviceHttpUrl(url).build();
            this.admin.clusters().createCluster("max_message_test", new ClusterData(url));
            this.admin.tenants().createTenant("test", new TenantInfo((Set)Sets.newHashSet((Object[])new String[]{"appid1"}), (Set)Sets.newHashSet((Object[])new String[]{"max_message_test"})));
            this.admin.namespaces().createNamespace("test/message", (Set)Sets.newHashSet((Object[])new String[]{"max_message_test"}));
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    @AfterMethod
    void shutdown() {
        try {
            this.pulsar.close();
            this.bkEnsemble.stop();
        }
        catch (Throwable t) {
            t.printStackTrace();
        }
    }

    @Test
    public void testMaxMessageSetting() throws PulsarClientException {
        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:" + BROKER_SERVICE_PORT).build();
        String topicName = "persistent://test/message/topic1";
        Producer producer = client.newProducer().topic(topicName).sendTimeout(60, TimeUnit.SECONDS).create();
        Consumer consumer = client.newConsumer().topic(new String[]{topicName}).subscriptionName("test1").subscribe();
        byte[] normalMsg = new byte[0x200000];
        try {
            producer.send((Object)normalMsg);
        }
        catch (PulsarClientException e) {
            Assert.fail((String)"Shouldn't have exception at here", (Throwable)e);
        }
        byte[] consumerNormalMsg = consumer.receive().getData();
        Assert.assertEquals((byte[])normalMsg, (byte[])consumerNormalMsg);
        byte[] limitMsg = new byte[0x500000];
        try {
            producer.send((Object)limitMsg);
        }
        catch (PulsarClientException e) {
            Assert.fail((String)"Shouldn't have exception at here", (Throwable)e);
        }
        byte[] consumerLimitMsg = consumer.receive().getData();
        Assert.assertEquals((byte[])limitMsg, (byte[])consumerLimitMsg);
        byte[] newNormalMsg = new byte[0x800000];
        try {
            producer.send((Object)newNormalMsg);
        }
        catch (PulsarClientException e) {
            Assert.fail((String)"Shouldn't have exception at here", (Throwable)e);
        }
        byte[] consumerNewNormalMsg = consumer.receive().getData();
        Assert.assertEquals((byte[])newNormalMsg, (byte[])consumerNewNormalMsg);
        byte[] newLimitMsg = new byte[0xA00000];
        try {
            producer.send((Object)newLimitMsg);
            Assert.fail((String)"Shouldn't send out this message");
        }
        catch (PulsarClientException pulsarClientException) {
            // empty catch block
        }
        consumer.unsubscribe();
        consumer.close();
        producer.close();
        client.close();
    }
}

