/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Verifies that a {@link PulsarClient} can be built from a {@link ServiceUrlProvider} and that
 * the service URL it uses for lookups can be replaced after the client has been created.
 */
public class ServiceUrlProviderTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(ServiceUrlProviderTest.class);

    /** Topic shared by both tests. */
    private static final String TOPIC = "persistent://my-property/my-ns/my-topic";

    /** Subscription shared by both tests. */
    private static final String SUBSCRIPTION = "my-subscribe";

    /** Number of messages published before, and again after, the service URL update. */
    private static final int MESSAGES_PER_BATCH = 100;

    /** Upper bound for a single receive so a lost message fails the test instead of hanging it. */
    private static final int RECEIVE_TIMEOUT_SECONDS = 10;

    @Override
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterClass
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Builds a client from a {@link TestServiceUrlProvider}, publishes one batch of messages,
     * calls {@link PulsarClient#updateServiceUrl(String)} with the same broker URL, publishes a
     * second batch, and checks that every published message is received.
     *
     * @throws Exception if the client, producer or consumer cannot be created or used
     */
    @Test
    public void testCreateClientWithServiceUrlProvider() throws Exception {
        final int totalMessages = 2 * MESSAGES_PER_BATCH;
        ServiceUrlProvider provider = new TestServiceUrlProvider(this.pulsar.getSafeBrokerServiceUrl());

        // try-with-resources guarantees the client, producer and consumer are closed even when
        // an assertion or a send fails part-way through the test.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrlProvider(provider)
                .statsInterval(1L, TimeUnit.SECONDS)
                .build()) {
            Assert.assertTrue(((PulsarClientImpl) client).getConfiguration().getServiceUrlProvider()
                    instanceof TestServiceUrlProvider);

            try (Producer<String> producer = client.newProducer(Schema.STRING)
                         .topic(TOPIC)
                         .create();
                 Consumer<String> consumer = client.newConsumer(Schema.STRING)
                         .topic(TOPIC)
                         .subscriptionName(SUBSCRIPTION)
                         .subscribe()) {
                for (int i = 0; i < MESSAGES_PER_BATCH; i++) {
                    producer.send("Hello Pulsar[" + i + "]");
                }
                client.updateServiceUrl(this.pulsar.getSafeBrokerServiceUrl());
                for (int i = MESSAGES_PER_BATCH; i < totalMessages; i++) {
                    producer.send("Hello Pulsar[" + i + "]");
                }

                int received = 0;
                while (received < totalMessages) {
                    // Bounded receive: a null return means the timeout elapsed without a message.
                    Message<String> message = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                    Assert.assertNotNull(message,
                            "Timed out after receiving " + received + " of " + totalMessages + " messages");
                    log.info("Received message: {}", message.getValue());
                    received++;
                }
                // TestNG argument order is (actual, expected).
                Assert.assertEquals(received, totalMessages);
            }
        }
    }

    /**
     * Builds a client from an {@link AutoChangedServiceUrlProvider}, starts a second broker on
     * fresh ports, and checks that pushing the new broker URL through the provider changes the
     * lookup service URL seen by an already-created producer and consumer.
     *
     * @throws Exception if a broker, the client, the producer or the consumer cannot be started
     */
    @Test
    public void testCreateClientWithAutoChangedServiceUrlProvider() throws Exception {
        PulsarService pulsarService1 = this.pulsar;
        AutoChangedServiceUrlProvider serviceUrlProvider =
                new AutoChangedServiceUrlProvider(pulsarService1.getSafeBrokerServiceUrl());

        try (PulsarClient client = PulsarClient.builder()
                .serviceUrlProvider(serviceUrlProvider)
                .statsInterval(1L, TimeUnit.SECONDS)
                .build()) {
            Assert.assertTrue(((PulsarClientImpl) client).getConfiguration().getServiceUrlProvider()
                    instanceof AutoChangedServiceUrlProvider);

            try (ProducerImpl<String> producer = (ProducerImpl<String>) client.newProducer(Schema.STRING)
                         .topic(TOPIC)
                         .create();
                 ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
                         .topic(TOPIC)
                         .subscriptionName(SUBSCRIPTION)
                         .subscribe()) {
                // Start a second broker on new ports; startBroker() replaces this.pulsar.
                this.conf.setBrokerServicePort(Optional.of(PortManager.nextFreePort()));
                this.conf.setWebServicePort(Optional.of(PortManager.nextFreePort()));
                this.startBroker();
                PulsarService pulsarService2 = this.pulsar;

                String firstUrl = pulsarService1.getSafeBrokerServiceUrl();
                String secondUrl = pulsarService2.getSafeBrokerServiceUrl();
                log.info("Pulsar1 = {}, Pulsar2 = {}", firstUrl, secondUrl);
                Assert.assertNotEquals(firstUrl, secondUrl);

                log.info("Service url : producer = {}, consumer = {}",
                        producer.getClient().getLookup().getServiceUrl(),
                        consumer.getClient().getLookup().getServiceUrl());
                Assert.assertEquals(producer.getClient().getLookup().getServiceUrl(), firstUrl);
                Assert.assertEquals(consumer.getClient().getLookup().getServiceUrl(), firstUrl);

                log.info("Changing service url from {} to {}", firstUrl, secondUrl);
                serviceUrlProvider.onServiceUrlChanged(secondUrl);

                log.info("Service url changed : producer = {}, consumer = {}",
                        producer.getClient().getLookup().getServiceUrl(),
                        consumer.getClient().getLookup().getServiceUrl());
                Assert.assertEquals(producer.getClient().getLookup().getServiceUrl(), secondUrl);
                Assert.assertEquals(consumer.getClient().getLookup().getServiceUrl(), secondUrl);
            }
        } finally {
            // cleanup() only closes the broker currently held in this.pulsar, so the broker that
            // was replaced above has to be stopped here. The identity check skips this when
            // startBroker() was never reached or failed before replacing the field.
            if (pulsarService1 != this.pulsar) {
                pulsarService1.close();
            }
        }
    }

    /**
     * A {@link TestServiceUrlProvider} that can push a new service URL to the client it was
     * initialized with.
     */
    static class AutoChangedServiceUrlProvider extends TestServiceUrlProvider {
        public AutoChangedServiceUrlProvider(String serviceUrl) {
            super(serviceUrl);
        }

        /**
         * Forwards {@code newServiceUrl} to the client via
         * {@link PulsarClient#updateServiceUrl(String)}. The value returned by
         * {@link #getServiceUrl()} is not changed by this call.
         *
         * @param newServiceUrl the service URL to hand to the client
         * @throws PulsarClientException if the client rejects the update
         */
        public void onServiceUrlChanged(String newServiceUrl) throws PulsarClientException {
            this.getPulsarClient().updateServiceUrl(newServiceUrl);
        }
    }

    /**
     * Minimal {@link ServiceUrlProvider} that returns a fixed service URL and remembers the
     * client passed to {@link #initialize(PulsarClient)}.
     */
    static class TestServiceUrlProvider implements ServiceUrlProvider {
        /** Client handed over in {@link #initialize(PulsarClient)}; null until then. */
        private PulsarClient pulsarClient;
        private final String serviceUrl;

        public TestServiceUrlProvider(String serviceUrl) {
            this.serviceUrl = serviceUrl;
        }

        @Override
        public String getServiceUrl() {
            return this.serviceUrl;
        }

        @Override
        public void initialize(PulsarClient client) {
            this.pulsarClient = client;
        }

        /**
         * @return the client stored by {@link #initialize(PulsarClient)}, or {@code null} if
         *         that method has not been called yet
         */
        public PulsarClient getPulsarClient() {
            return this.pulsarClient;
        }
    }
}

