/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpServer;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.bookkeeper.test.PortManager;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class PulsarFunctionE2ETest {
    // --- Fixture state, (re)created by setup() before each test method ---
    // Local ZooKeeper/BookKeeper ensemble backing the broker.
    LocalBookkeeperEnsemble bkEnsemble;
    // Broker configuration (wrapped in a Mockito spy by setup()).
    ServiceConfiguration config;
    // Functions worker configuration, populated by createPulsarFunctionWorker().
    WorkerConfig workerConfig;
    // TLS web-service URL of the broker; used as the cluster service URL.
    URL urlTls;
    PulsarService pulsar;
    PulsarAdmin admin;
    PulsarClient pulsarClient;
    BrokerStats brokerStatsClient;
    WorkerService functionsWorkerService;
    // Tenant under which all test namespaces are created.
    final String tenant = "external-repl-prop";
    // Namespace handed to the worker configuration as the Pulsar Functions namespace.
    String pulsarFunctionsNamespace = "external-repl-prop/pulsar-function-admin";
    String primaryHost;
    String workerId;
    // --- Ports: each one is obtained from PortManager.nextFreePort() at instance creation ---
    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
    private final int brokerWebServicePort = PortManager.nextFreePort();
    private final int brokerWebServiceTlsPort = PortManager.nextFreePort();
    private final int brokerServicePort = PortManager.nextFreePort();
    private final int brokerServiceTlsPort = PortManager.nextFreePort();
    private final int workerServicePort = PortManager.nextFreePort();
    // --- TLS material (paths are relative to the module's working directory) ---
    private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
    private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
    private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
    private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
    private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
    private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ETest.class);
    // --- Embedded HTTP file server used to serve function/connector archives over http:// ---
    // Thread that creates and starts fileServer (see setup()).
    private Thread fileServerThread;
    // Static, so the same port is shared by every instance of this test class.
    private static final int fileServerPort = PortManager.nextFreePort();
    // Assigned from fileServerThread, not from the thread that runs setup().
    private HttpServer fileServer;

    /**
     * TestNG data provider yielding two single-argument rows: {@code TRUE} first, then {@code FALSE}.
     *
     * @return a 2x1 matrix of boolean flags
     */
    @DataProvider(name="validRoleName")
    public Object[][] validRoleName() {
        Object[][] roleFlags = new Object[2][];
        roleFlags[0] = new Object[]{Boolean.TRUE};
        roleFlags[1] = new Object[]{Boolean.FALSE};
        return roleFlags;
    }

    /**
     * Builds the complete fixture for one test method.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>delete files in {@code java.io.tmpdir} whose name starts with {@code "function"};</li>
     *   <li>start a local BookKeeper/ZooKeeper ensemble;</li>
     *   <li>configure and start a TLS-authenticated broker with an embedded functions worker;</li>
     *   <li>create the admin client and the Pulsar client, then register the cluster and tenant;</li>
     *   <li>start an HTTP file server (on a separate thread) that serves the connector NAR and the
     *       examples JAR from the test classpath.</li>
     * </ol>
     *
     * @param method the test method about to run; only its name is used, for logging
     * @throws Exception if any component fails to start or an admin call fails
     */
    @BeforeMethod
    void setup(Method method) throws Exception {
        // File.listFiles returns null (not an empty array) when the directory cannot be listed.
        File tmpDir = new File(System.getProperty("java.io.tmpdir"));
        File[] staleFiles = tmpDir.listFiles((parent, name) -> name.startsWith("function"));
        if (staleFiles != null) {
            for (File stale : staleFiles) {
                stale.delete();
            }
        }
        log.info("--- Setting up method {} ---", method.getName());

        this.bkEnsemble = new LocalBookkeeperEnsemble(3, this.ZOOKEEPER_PORT, PortManager::nextFreePort);
        this.bkEnsemble.start();

        String brokerServiceUrl = "https://127.0.0.1:" + this.brokerWebServiceTlsPort;

        // --- Broker configuration ---
        this.config = Mockito.spy(new ServiceConfiguration());
        this.config.setClusterName("use");
        Set<String> superUsers = Sets.newHashSet("superUser");
        this.config.setSuperUserRoles(superUsers);
        this.config.setWebServicePort(Optional.ofNullable(this.brokerWebServicePort));
        this.config.setWebServicePortTls(Optional.ofNullable(this.brokerWebServiceTlsPort));
        this.config.setZookeeperServers("127.0.0.1:" + this.ZOOKEEPER_PORT);
        this.config.setBrokerServicePort(Optional.ofNullable(this.brokerServicePort));
        this.config.setBrokerServicePortTls(Optional.ofNullable(this.brokerServiceTlsPort));
        this.config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
        this.config.setTlsAllowInsecureConnection(true);
        this.config.setAdvertisedAddress("localhost");

        // --- Broker authentication / authorization (TLS client certificates) ---
        Set<String> providers = new HashSet<>();
        providers.add(AuthenticationProviderTls.class.getName());
        this.config.setAuthenticationEnabled(true);
        this.config.setAuthenticationProviders(providers);
        this.config.setAuthorizationEnabled(true);
        this.config.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
        this.config.setTlsCertificateFilePath(this.TLS_SERVER_CERT_FILE_PATH);
        this.config.setTlsKeyFilePath(this.TLS_SERVER_KEY_FILE_PATH);
        this.config.setTlsTrustCertsFilePath(this.TLS_TRUST_CERT_FILE_PATH);
        this.config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
        this.config.setBrokerClientAuthenticationParameters(
                "tlsCertFile:" + this.TLS_CLIENT_CERT_FILE_PATH + ",tlsKeyFile:" + this.TLS_CLIENT_KEY_FILE_PATH);
        this.config.setBrokerClientTrustCertsFilePath(this.TLS_TRUST_CERT_FILE_PATH);
        this.config.setBrokerClientTlsEnabled(true);
        this.config.setAllowAutoTopicCreationType("non-partitioned");

        // --- Broker with embedded functions worker ---
        this.functionsWorkerService = this.createPulsarFunctionWorker(this.config);
        this.urlTls = new URL(brokerServiceUrl);
        Optional<WorkerService> functionWorkerService = Optional.of(this.functionsWorkerService);
        this.pulsar = new PulsarService(this.config, functionWorkerService);
        this.pulsar.start();

        // --- Admin client (TLS-authenticated, spied) ---
        Map<String, String> authParams = new HashMap<>();
        authParams.put("tlsCertFile", this.TLS_CLIENT_CERT_FILE_PATH);
        authParams.put("tlsKeyFile", this.TLS_CLIENT_KEY_FILE_PATH);
        AuthenticationTls authTls = new AuthenticationTls();
        authTls.configure(authParams);
        this.admin = Mockito.spy(PulsarAdmin.builder()
                .serviceHttpUrl(brokerServiceUrl)
                .tlsTrustCertsFilePath(this.TLS_TRUST_CERT_FILE_PATH)
                .allowTlsInsecureConnection(true)
                .authentication(authTls)
                .build());
        this.brokerStatsClient = this.admin.brokerStats();
        this.primaryHost = String.format("http://%s:%d", "localhost", this.brokerWebServicePort);

        ClusterData clusterData = new ClusterData(this.urlTls.toString());
        this.admin.clusters().updateCluster(this.config.getClusterName(), clusterData);

        // --- Pulsar client, mirroring the worker's own client settings ---
        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
        if (StringUtils.isNotBlank(this.workerConfig.getClientAuthenticationPlugin())
                && StringUtils.isNotBlank(this.workerConfig.getClientAuthenticationParameters())) {
            clientBuilder.enableTls(this.workerConfig.isUseTls());
            clientBuilder.allowTlsInsecureConnection(this.workerConfig.isTlsAllowInsecureConnection());
            clientBuilder.authentication(this.workerConfig.getClientAuthenticationPlugin(),
                    this.workerConfig.getClientAuthenticationParameters());
        }
        this.pulsarClient = clientBuilder.build();

        // --- Tenant ---
        TenantInfo propAdmin = new TenantInfo();
        propAdmin.getAdminRoles().add("superUser");
        propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
        this.admin.tenants().updateTenant(this.tenant, propAdmin);

        // --- HTTP file server for the "create function/sink/source with URL" tests ---
        this.fileServerThread = new Thread(() -> {
            try {
                this.fileServer = HttpServer.create(new InetSocketAddress(fileServerPort), 0);
                // Both archives are served the same way; each is exposed under "/<resource name>".
                for (String resourceName : new String[]{"pulsar-io-data-generator.nar", "pulsar-functions-api-examples.jar"}) {
                    this.fileServer.createContext("/" + resourceName, he -> {
                        try {
                            Headers headers = he.getResponseHeaders();
                            headers.add("Content-Type", "application/octet-stream");
                            File file = new File(this.getClass().getClassLoader().getResource(resourceName).getFile());
                            byte[] bytes = new byte[(int) file.length()];
                            // A single read() may return fewer bytes than requested, so loop until
                            // the buffer is full; try-with-resources closes the file handle.
                            try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(file))) {
                                int offset = 0;
                                while (offset < bytes.length) {
                                    int read = in.read(bytes, offset, bytes.length - offset);
                                    if (read < 0) {
                                        throw new IOException("Unexpected end of file while reading " + file);
                                    }
                                    offset += read;
                                }
                            }
                            he.sendResponseHeaders(200, bytes.length);
                            try (OutputStream out = he.getResponseBody()) {
                                out.write(bytes, 0, bytes.length);
                            }
                        } catch (Exception e) {
                            log.error("Error when downloading: {}", e, e);
                        } finally {
                            // Always release the exchange so a failed download does not leave the client hanging.
                            he.close();
                        }
                    });
                }
                this.fileServer.setExecutor(null);
                log.info("Starting file server...");
                this.fileServer.start();
            } catch (Exception e) {
                log.error("Failed to start file server: ", e);
                // fileServer is still null when HttpServer.create itself failed.
                if (this.fileServer != null) {
                    this.fileServer.stop(0);
                }
            }
        });
        this.fileServerThread.start();
    }

    /**
     * Tears down the fixture created by {@code setup}: the file server first, then the Pulsar
     * client, admin client, functions worker, broker and finally the BookKeeper ensemble.
     *
     * @throws Exception if closing any component fails
     */
    @AfterMethod
    void shutdown() throws Exception {
        log.info("--- Shutting down ---");
        // fileServer is assigned on fileServerThread. A bounded join lets that short-lived thread
        // finish and makes its write visible here before the field is dereferenced.
        if (this.fileServerThread != null) {
            this.fileServerThread.join(5000L);
        }
        // Still null if HttpServer.create failed or the bootstrap thread never completed.
        if (this.fileServer != null) {
            this.fileServer.stop(0);
        }
        if (this.fileServerThread != null) {
            this.fileServerThread.interrupt();
        }
        this.pulsarClient.close();
        this.admin.close();
        this.functionsWorkerService.stop();
        this.pulsar.close();
        this.bkEnsemble.stop();
    }

    /**
     * Creates the functions worker that is embedded into the broker.
     *
     * <p>As side effects this sets the {@code pulsar.functions.java.instance.jar} system property
     * and populates the {@code workerConfig} and {@code workerId} fields.
     *
     * @param config broker configuration; its TLS ports, cluster name and advertised address are read
     * @return a new {@link WorkerService} built from the populated worker configuration
     */
    private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
        String instanceJarPath = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
        System.setProperty("pulsar.functions.java.instance.jar", instanceJarPath);

        WorkerConfig wc = new WorkerConfig();
        this.workerConfig = wc;

        // Scheduling / runtime
        wc.setPulsarFunctionsNamespace(this.pulsarFunctionsNamespace);
        wc.setSchedulerClassName(RoundRobinScheduler.class.getName());
        wc.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));

        // Broker endpoints (TLS ports)
        wc.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get());
        wc.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls().get());

        // Internal topics and check intervals
        wc.setFailureCheckFreqMs(100L);
        wc.setNumFunctionPackageReplicas(1);
        wc.setClusterCoordinationTopicName("coordinate");
        wc.setFunctionAssignmentTopicName("assignment");
        wc.setFunctionMetadataTopicName("metadata");
        wc.setInstanceLivenessCheckFreqMs(100L);
        wc.setWorkerPort(this.workerServicePort);
        wc.setPulsarFunctionsCluster(config.getClusterName());

        // Worker identity: c-<cluster>-fw-<hostname>-<port>
        String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
        this.workerId = String.format("c-%s-fw-%s-%s", config.getClusterName(), hostname, wc.getWorkerPort());
        wc.setWorkerHostname(hostname);
        wc.setWorkerId(this.workerId);

        // TLS client authentication towards the broker
        wc.setClientAuthenticationPlugin(AuthenticationTls.class.getName());
        wc.setClientAuthenticationParameters(String.format("tlsCertFile:%s,tlsKeyFile:%s", "./src/test/resources/authentication/tls/client-cert.pem", "./src/test/resources/authentication/tls/client-key.pem"));
        wc.setUseTls(true);
        wc.setTlsAllowInsecureConnection(true);
        wc.setTlsTrustCertsFilePath("./src/test/resources/authentication/tls/cacert.pem");
        wc.setAuthenticationEnabled(true);
        wc.setAuthorizationEnabled(true);

        return new WorkerService(wc);
    }

    /**
     * Builds the configuration of the Java {@code ExclamationFunction} example used by the tests.
     *
     * @param tenant           tenant owning the function
     * @param namespace        namespace (without the tenant prefix) owning the function
     * @param functionName     name of the function
     * @param sourceTopic      topic name or pattern; it is expanded to
     *                         {@code persistent://<tenant>/<namespace>/<sourceTopic>} and set as the topics pattern
     * @param sinkTopic        output topic of the function
     * @param subscriptionName subscription name the function uses on its input
     * @return a function config with parallelism 1, EFFECTIVELY_ONCE guarantees, auto-ack and
     *         subscription cleanup enabled
     */
    protected static FunctionConfig createFunctionConfig(String tenant, String namespace, String functionName, String sourceTopic, String sinkTopic, String subscriptionName) {
        FunctionConfig fnConfig = new FunctionConfig();

        // Identity
        fnConfig.setTenant(tenant);
        fnConfig.setNamespace(namespace);
        fnConfig.setName(functionName);

        // Execution
        fnConfig.setParallelism(1);
        fnConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
        fnConfig.setRuntime(FunctionConfig.Runtime.JAVA);
        fnConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");

        // Input / output
        fnConfig.setSubName(subscriptionName);
        fnConfig.setTopicsPattern("persistent://" + tenant + "/" + namespace + "/" + sourceTopic);
        fnConfig.setAutoAck(true);
        fnConfig.setOutput(sinkTopic);
        fnConfig.setCleanupSubscription(true);

        return fnConfig;
    }

    /**
     * Builds a source configuration with parallelism 1 and ATLEAST_ONCE guarantees.
     *
     * @param tenant       tenant owning the source
     * @param namespace    namespace owning the source
     * @param functionName name of the source
     * @param sinkTopic    topic the source writes to
     * @return the populated source config
     */
    private static SourceConfig createSourceConfig(String tenant, String namespace, String functionName, String sinkTopic) {
        SourceConfig srcConfig = new SourceConfig();

        // Identity
        srcConfig.setTenant(tenant);
        srcConfig.setNamespace(namespace);
        srcConfig.setName(functionName);

        // Execution and destination
        srcConfig.setParallelism(1);
        srcConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
        srcConfig.setTopicName(sinkTopic);

        return srcConfig;
    }

    /**
     * Builds a sink configuration with parallelism 1, ATLEAST_ONCE guarantees and subscription
     * cleanup enabled.
     *
     * @param tenant       tenant owning the sink
     * @param namespace    namespace owning the sink
     * @param functionName name of the sink
     * @param sourceTopic  the single input topic, registered with a default-built consumer config
     * @param subName      subscription name the sink uses on the input topic
     * @return the populated sink config
     */
    private static SinkConfig createSinkConfig(String tenant, String namespace, String functionName, String sourceTopic, String subName) {
        SinkConfig snkConfig = new SinkConfig();

        // Identity
        snkConfig.setTenant(tenant);
        snkConfig.setNamespace(namespace);
        snkConfig.setName(functionName);

        // Execution
        snkConfig.setParallelism(1);
        snkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);

        // Input
        ConsumerConfig defaultConsumerConfig = ConsumerConfig.builder().build();
        snkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, defaultConsumerConfig));
        snkConfig.setSourceSubscriptionName(subName);
        snkConfig.setCleanupSubscription(true);

        return snkConfig;
    }

    /**
     * End-to-end scenario shared by the file-based and URL-based function tests.
     *
     * <p>Flow: create the namespace, deploy the function from {@code jarFilePathUrl}, update it
     * (parallelism 2, new output topic), wait for two function producers on the new output topic,
     * publish messages to the input topic, verify a message with the expected property arrives on
     * the output topic, delete the function, and finally verify that the input subscription and the
     * temporary {@code function*} files are gone.
     *
     * @param jarFilePathUrl URL of the function archive passed to the admin API
     * @throws Exception if an admin or client call fails, or an assertion does not hold
     */
    private void testE2EPulsarFunction(String jarFilePathUrl) throws Exception {
        final String namespacePortion = "io";
        final String replNamespace = this.tenant + "/" + namespacePortion;
        final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
        final String sinkTopic = "persistent://" + replNamespace + "/output";
        final String sinkTopic2 = "persistent://" + replNamespace + "/output2";
        final String propertyKey = "key";
        final String propertyValue = "value";
        final String functionName = "PulsarFunction-test";
        final String subscriptionName = "test-sub";
        // Value the function's producers are expected to publish in their "id" metadata entry.
        final String expectedProducerId = String.format("%s/%s/%s", this.tenant, namespacePortion, functionName);

        this.admin.namespaces().createNamespace(replNamespace);
        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
        this.admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
        Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                .topic(sinkTopic2)
                .subscriptionName("sub")
                .subscribe();

        // Deploy, then update parallelism and output topic.
        FunctionConfig functionConfig = PulsarFunctionE2ETest.createFunctionConfig(
                this.tenant, namespacePortion, functionName, "my.*", sinkTopic, subscriptionName);
        functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
        this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
        functionConfig.setParallelism(2);
        functionConfig.setOutput(sinkTopic2);
        this.admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);

        // Wait for both function instances to attach a producer to the new output topic.
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                TopicStats stats = this.admin.topics().getStats(sinkTopic2);
                if (stats.publishers.size() != 2) {
                    return false;
                }
                PublisherStats firstPublisher = stats.publishers.get(0);
                return firstPublisher.metadata != null
                        && firstPublisher.metadata.containsKey("id")
                        && expectedProducerId.equals(firstPublisher.metadata.get("id"));
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);
        TopicStats topicStats = this.admin.topics().getStats(sinkTopic2);
        Assert.assertEquals(topicStats.publishers.size(), 2);
        Assert.assertNotNull(topicStats.publishers.get(0).metadata);
        Assert.assertTrue(topicStats.publishers.get(0).metadata.containsKey("id"));
        Assert.assertEquals(topicStats.publishers.get(0).metadata.get("id"), expectedProducerId);

        // Wait for the function's subscription on the input topic.
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                return this.admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 150L);
        Assert.assertEquals(this.admin.topics().getStats(sourceTopic).subscriptions.size(), 1);

        int totalMsgs = 5;
        for (int i = 0; i < totalMsgs; ++i) {
            String data = "my-message-" + i;
            producer.newMessage().property(propertyKey, propertyValue).value(data).send();
        }

        // Wait for the function to acknowledge everything it was sent.
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                SubscriptionStats subStats = this.admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
                // The subscription may not be listed yet; treat that as "not ready" rather than throwing.
                return subStats != null && subStats.unackedMessages == 0L;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 150L);

        Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
        // receive(timeout) returns null on timeout; fail with a clear message instead of an NPE.
        Assert.assertNotNull(msg, "No message received on " + sinkTopic2 + " within 5 seconds");
        String receivedPropertyValue = msg.getProperty(propertyKey);
        Assert.assertEquals(receivedPropertyValue, propertyValue);

        // Compare as Long on both sides: a boxed Long never equals a boxed Integer, which would make
        // this assertion pass unconditionally.
        long unackedMessages = this.admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages;
        Assert.assertNotEquals(Long.valueOf(unackedMessages), Long.valueOf(totalMsgs));

        // Deleting the function must clean up its subscription (cleanupSubscription is enabled).
        this.admin.functions().deleteFunction(this.tenant, namespacePortion, functionName);
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                return this.admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 150L);
        Assert.assertEquals(this.admin.topics().getStats(sourceTopic).subscriptions.size(), 0);

        // No temporary "function*" files may be left behind in the temp directory.
        File tmpDir = new File(System.getProperty("java.io.tmpdir"));
        File[] leftovers = tmpDir.listFiles((parent, name) -> name.startsWith("function"));
        Assert.assertNotNull(leftovers, "Unable to list files in " + tmpDir);
        Assert.assertEquals(leftovers.length, 0, "Temporary files left over: " + Arrays.asList(leftovers));
    }

    /**
     * Runs the end-to-end function scenario with the examples JAR referenced through a
     * {@code file:} URL that points at the classpath resource.
     */
    @Test(timeOut=20000L)
    public void testE2EPulsarFunctionWithFile() throws Exception {
        URL jarResource = this.getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar");
        this.testE2EPulsarFunction("file:" + jarResource.getFile());
    }

    /**
     * Runs the end-to-end function scenario with the examples JAR referenced through an
     * {@code http://} URL on 127.0.0.1 at {@code fileServerPort}.
     */
    @Test(timeOut=40000L)
    public void testE2EPulsarFunctionWithUrl() throws Exception {
        final String urlTemplate = "http://127.0.0.1:%d/pulsar-functions-api-examples.jar";
        this.testE2EPulsarFunction(String.format(urlTemplate, fileServerPort));
    }

    private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
        String namespacePortion = "io";
        String replNamespace = "external-repl-prop/io";
        String sourceTopic = "persistent://external-repl-prop/io/input";
        String sinkName = "PulsarSink-test";
        String propertyKey = "key";
        String propertyValue = "value";
        String subscriptionName = "test-sub";
        this.admin.namespaces().createNamespace("external-repl-prop/io");
        HashSet clusters = Sets.newHashSet((Iterable)Lists.newArrayList((Object[])new String[]{"use"}));
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", (Set)clusters);
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://external-repl-prop/io/input").create();
        SinkConfig sinkConfig = PulsarFunctionE2ETest.createSinkConfig("external-repl-prop", "io", "PulsarSink-test", "persistent://external-repl-prop/io/input", "test-sub");
        sinkConfig.setInputSpecs(Collections.singletonMap("persistent://external-repl-prop/io/input", ConsumerConfig.builder().receiverQueueSize(Integer.valueOf(1000)).build()));
        this.admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl);
        sinkConfig.setInputSpecs(Collections.singletonMap("persistent://external-repl-prop/io/input", ConsumerConfig.builder().receiverQueueSize(Integer.valueOf(523)).build()));
        this.admin.sink().updateSinkWithUrl(sinkConfig, jarFilePathUrl);
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                TopicStats topicStats = this.admin.topics().getStats("persistent://external-repl-prop/io/input");
                return topicStats.subscriptions.containsKey("test-sub") && ((SubscriptionStats)topicStats.subscriptions.get((Object)"test-sub")).consumers.size() == 1 && ((ConsumerStats)((SubscriptionStats)topicStats.subscriptions.get((Object)"test-sub")).consumers.get((int)0)).availablePermits == 523;
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);
        TopicStats topicStats = this.admin.topics().getStats("persistent://external-repl-prop/io/input");
        Assert.assertEquals((int)topicStats.subscriptions.size(), (int)1);
        Assert.assertTrue((boolean)topicStats.subscriptions.containsKey("test-sub"));
        Assert.assertEquals((int)((SubscriptionStats)topicStats.subscriptions.get((Object)"test-sub")).consumers.size(), (int)1);
        Assert.assertEquals((int)((ConsumerStats)((SubscriptionStats)topicStats.subscriptions.get((Object)"test-sub")).consumers.get((int)0)).availablePermits, (int)523);
        String prometheusMetrics = PulsarFunctionE2ETest.getPrometheusMetrics(this.brokerWebServicePort);
        log.info("prometheus metrics: {}", (Object)prometheusMetrics);
        Map<String, Metric> metrics = PulsarFunctionE2ETest.parseMetrics(prometheusMetrics);
        Metric m = metrics.get("pulsar_sink_received_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_sink_received_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_sink_written_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_sink_written_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_sink_sink_exceptions_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_sink_system_exceptions_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_sink_system_exceptions_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_sink_last_invocation");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        int totalMsgs = 10;
        for (int i = 0; i < totalMsgs; ++i) {
            String data = "my-message-" + i;
            producer.newMessage().property("key", "value").value((Object)data).send();
        }
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                SubscriptionStats subStats = (SubscriptionStats)this.admin.topics().getStats((String)"persistent://external-repl-prop/io/input").subscriptions.get("test-sub");
                return subStats.unackedMessages == 0L && subStats.msgThroughputOut == (double)totalMsgs;
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 200L);
        prometheusMetrics = PulsarFunctionE2ETest.getPrometheusMetrics(this.brokerWebServicePort);
        log.info("prometheusMetrics: {}", (Object)prometheusMetrics);
        metrics = PulsarFunctionE2ETest.parseMetrics(prometheusMetrics);
        m = metrics.get("pulsar_sink_received_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)totalMsgs);
        m = metrics.get("pulsar_sink_received_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)totalMsgs);
        m = metrics.get("pulsar_sink_written_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)totalMsgs);
        m = metrics.get("pulsar_sink_written_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)totalMsgs);
        m = metrics.get("pulsar_sink_sink_exceptions_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_sink_system_exceptions_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_sink_system_exceptions_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_sink_last_invocation");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertTrue((m.value > 0.0 ? 1 : 0) != 0);
        this.admin.sink().deleteSink("external-repl-prop", "io", "PulsarSink-test");
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                return this.admin.topics().getStats((String)"persistent://external-repl-prop/io/input").subscriptions.size() == 0;
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 150L);
        Assert.assertEquals((int)this.admin.topics().getStats((String)"persistent://external-repl-prop/io/input").subscriptions.size(), (int)0);
        File dir = new File(System.getProperty("java.io.tmpdir"));
        File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
        Assert.assertEquals((int)foundFiles.length, (int)0, (String)("Temporary files left over: " + Arrays.asList(foundFiles)));
    }

    /**
     * Runs the sink stats scenario with the data-generator archive resolved from the
     * test class loader and passed along as a {@code file:} URL.
     *
     * @throws Exception if the delegated scenario fails
     */
    @Test(timeOut=20000L)
    public void testPulsarSinkStatsWithFile() throws Exception {
        // Look the archive up on the classpath first, then build the file: URL from its path.
        URL narResource = this.getClass().getClassLoader().getResource("pulsar-io-data-generator.nar");
        this.testPulsarSinkStats("file:" + narResource.getFile());
    }

    /**
     * Runs the sink stats scenario with the data-generator archive addressed through an
     * {@code http://127.0.0.1} URL built from {@code fileServerPort}.
     *
     * @throws Exception if the delegated scenario fails
     */
    @Test(timeOut=40000L)
    public void testPulsarSinkStatsWithUrl() throws Exception {
        this.testPulsarSinkStats(
                String.format("http://127.0.0.1:%d/pulsar-io-data-generator.nar", fileServerPort));
    }

    /**
     * Creates a source from {@code jarFilePathUrl}, updates it to publish to a second topic,
     * and then checks the publisher metadata on that topic and the {@code pulsar_source_*}
     * Prometheus metrics exposed by the broker. Finally asserts that no file whose name
     * starts with {@code "function"} is left in the JVM temporary directory.
     *
     * @param jarFilePathUrl URL of the source archive handed to the admin create/update calls
     * @throws Exception if an admin call or the metrics fetch fails
     */
    private void testPulsarSourceStats(String jarFilePathUrl) throws Exception {
        final String tenant = "external-repl-prop";
        final String namespacePortion = "io";
        final String replNamespace = tenant + "/" + namespacePortion;
        final String sinkTopic = "persistent://" + replNamespace + "/output";
        final String sinkTopic2 = "persistent://" + replNamespace + "/output2";
        final String sourceName = "PulsarSource-test";
        final String expectedPublisherId = String.format("%s/%s/%s", tenant, namespacePortion, sourceName);

        this.admin.namespaces().createNamespace(replNamespace);
        Set<String> clusters = Sets.newHashSet("use");
        this.admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

        SourceConfig sourceConfig = PulsarFunctionE2ETest.createSourceConfig(tenant, namespacePortion, sourceName, sinkTopic);
        this.admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);

        // Poll until exactly one publisher is attached to the first output topic.
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                return this.admin.topics().getStats(sinkTopic).publishers.size() == 1;
            }
            catch (PulsarAdminException ignored) {
                // Admin errors while polling just mean "not ready yet".
                return false;
            }
        }, 10, 150L);

        // Point the source at a second topic and poll until its publisher shows up there
        // carrying the expected "id" metadata entry.
        sourceConfig.setTopicName(sinkTopic2);
        this.admin.source().updateSourceWithUrl(sourceConfig, jarFilePathUrl);
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                TopicStats polledStats = this.admin.topics().getStats(sinkTopic2);
                if (polledStats.publishers.size() != 1) {
                    return false;
                }
                PublisherStats polledPublisher = (PublisherStats) polledStats.publishers.get(0);
                return polledPublisher.metadata != null
                        && expectedPublisherId.equals(polledPublisher.metadata.get("id"));
            }
            catch (PulsarAdminException ignored) {
                // Admin errors while polling just mean "not ready yet".
                return false;
            }
        }, 50, 150L);

        TopicStats sourceStats = this.admin.topics().getStats(sinkTopic2);
        Assert.assertEquals(sourceStats.publishers.size(), 1);
        PublisherStats publisher = (PublisherStats) sourceStats.publishers.get(0);
        Assert.assertNotNull(publisher.metadata);
        Assert.assertTrue(publisher.metadata.containsKey("id"));
        Assert.assertEquals((String) publisher.metadata.get("id"), expectedPublisherId);

        // Poll until more than four entries have been written, so the counters are non-zero.
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                return this.admin.topics().getStats(sinkTopic2).publishers.size() == 1
                        && this.admin.topics().getInternalStats(sinkTopic2).numberOfEntries > 4L;
            }
            catch (PulsarAdminException ignored) {
                // Admin errors while polling just mean "not ready yet".
                return false;
            }
        }, 50, 150L);
        Assert.assertEquals(this.admin.topics().getStats(sinkTopic2).publishers.size(), 1);

        String prometheusMetrics = PulsarFunctionE2ETest.getPrometheusMetrics(this.brokerWebServicePort);
        log.info("prometheusMetrics: {}", prometheusMetrics);
        Map<String, Metric> metrics = PulsarFunctionE2ETest.parseMetrics(prometheusMetrics);

        // Throughput counters must have advanced.
        String[] positiveMetrics = {
            "pulsar_source_received_total",
            "pulsar_source_received_total_1min",
            "pulsar_source_written_total",
            "pulsar_source_written_total_1min",
        };
        for (String metricName : positiveMetrics) {
            Metric m = this.lookupSourceMetric(metrics, metricName, tenant, namespacePortion, sourceName);
            Assert.assertTrue(m.value > 0.0, metricName + " should be > 0 but was " + m.value);
        }

        // Exception counters must still be zero.
        String[] zeroMetrics = {
            "pulsar_source_source_exceptions_total",
            "pulsar_source_source_exceptions_total_1min",
            "pulsar_source_system_exceptions_total",
            "pulsar_source_system_exceptions_total_1min",
        };
        for (String metricName : zeroMetrics) {
            Metric m = this.lookupSourceMetric(metrics, metricName, tenant, namespacePortion, sourceName);
            Assert.assertEquals((Object) m.value, (Object) 0.0);
        }

        Metric lastInvocation = this.lookupSourceMetric(metrics, "pulsar_source_last_invocation", tenant, namespacePortion, sourceName);
        Assert.assertTrue(lastInvocation.value > 0.0,
                "pulsar_source_last_invocation should be > 0 but was " + lastInvocation.value);

        // File.listFiles returns null when the directory cannot be listed; fail with a
        // message instead of hitting a NullPointerException on foundFiles.length.
        File dir = new File(System.getProperty("java.io.tmpdir"));
        File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
        Assert.assertNotNull(foundFiles, "Unable to list temporary directory: " + dir);
        Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
    }

    /**
     * Fetches {@code metricName} from the parsed metrics, fails with a descriptive message
     * when it is absent, and asserts the five identifying tags shared by every source metric.
     *
     * @param metrics parsed Prometheus metrics keyed by metric name
     * @param metricName name of the metric to look up
     * @param tenant expected tenant component of the namespace and fqfn tags
     * @param namespace expected namespace component of the namespace and fqfn tags
     * @param name expected value of the {@code name} tag
     * @return the metric, never {@code null}
     */
    private Metric lookupSourceMetric(Map<String, Metric> metrics, String metricName, String tenant, String namespace, String name) {
        Metric metric = metrics.get(metricName);
        Assert.assertNotNull(metric, "Missing Prometheus metric: " + metricName);
        Assert.assertEquals((String) metric.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals((String) metric.tags.get("instance_id"), "0");
        Assert.assertEquals((String) metric.tags.get("name"), name);
        Assert.assertEquals((String) metric.tags.get("namespace"), String.format("%s/%s", tenant, namespace));
        Assert.assertEquals((String) metric.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespace, name));
        return metric;
    }

    /**
     * Runs the source stats scenario with the data-generator archive resolved from the
     * test class loader and passed along as a {@code file:} URL.
     *
     * @throws Exception if the delegated scenario fails
     */
    @Test(timeOut=20000L)
    public void testPulsarSourceStatsWithFile() throws Exception {
        // Look the archive up on the classpath first, then build the file: URL from its path.
        URL narResource = this.getClass().getClassLoader().getResource("pulsar-io-data-generator.nar");
        this.testPulsarSourceStats("file:" + narResource.getFile());
    }

    /**
     * Runs the source stats scenario with the data-generator archive addressed through an
     * {@code http://127.0.0.1} URL built from {@code fileServerPort}.
     *
     * @throws Exception if the delegated scenario fails
     */
    @Test(timeOut=40000L)
    public void testPulsarSourceStatsWithUrl() throws Exception {
        this.testPulsarSourceStats(
                String.format("http://127.0.0.1:%d/pulsar-io-data-generator.nar", fileServerPort));
    }

    @Test(timeOut=20000L)
    public void testPulsarFunctionStats() throws Exception {
        String namespacePortion = "io";
        String replNamespace = "external-repl-prop/io";
        String sourceTopic = "persistent://external-repl-prop/io/my-topic1";
        String sinkTopic = "persistent://external-repl-prop/io/output";
        String propertyKey = "key";
        String propertyValue = "value";
        String functionName = "PulsarSink-test";
        String subscriptionName = "test-sub";
        this.admin.namespaces().createNamespace("external-repl-prop/io");
        HashSet clusters = Sets.newHashSet((Iterable)Lists.newArrayList((Object[])new String[]{"use"}));
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", (Set)clusters);
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://external-repl-prop/io/my-topic1").create();
        String jarFilePathUrl = "file:" + this.getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
        FunctionConfig functionConfig = PulsarFunctionE2ETest.createFunctionConfig("external-repl-prop", "io", "PulsarSink-test", "my.*", "persistent://external-repl-prop/io/output", "test-sub");
        this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
        this.admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                return this.admin.topics().getStats((String)"persistent://external-repl-prop/io/my-topic1").subscriptions.size() == 1;
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 150L);
        Assert.assertEquals((int)this.admin.topics().getStats((String)"persistent://external-repl-prop/io/my-topic1").subscriptions.size(), (int)1);
        FunctionRuntimeManager functionRuntimeManager = this.functionsWorkerService.getFunctionRuntimeManager();
        FunctionStats functionStats = functionRuntimeManager.getFunctionStats("external-repl-prop", "io", "PulsarSink-test", null);
        FunctionStats functionStatsFromAdmin = this.admin.functions().getFunctionStats("external-repl-prop", "io", "PulsarSink-test");
        Assert.assertEquals((Object)functionStats, (Object)functionStatsFromAdmin);
        Assert.assertEquals((long)functionStats.getReceivedTotal(), (long)0L);
        Assert.assertEquals((long)functionStats.getProcessedSuccessfullyTotal(), (long)0L);
        Assert.assertEquals((long)functionStats.getSystemExceptionsTotal(), (long)0L);
        Assert.assertEquals((long)functionStats.getUserExceptionsTotal(), (long)0L);
        Assert.assertEquals((Object)functionStats.avgProcessLatency, null);
        Assert.assertEquals((long)functionStats.oneMin.getReceivedTotal(), (long)0L);
        Assert.assertEquals((long)functionStats.oneMin.getProcessedSuccessfullyTotal(), (long)0L);
        Assert.assertEquals((long)functionStats.oneMin.getSystemExceptionsTotal(), (long)0L);
        Assert.assertEquals((long)functionStats.oneMin.getUserExceptionsTotal(), (long)0L);
        Assert.assertEquals((Object)functionStats.oneMin.getAvgProcessLatency(), null);
        Assert.assertEquals((Object)functionStats.getAvgProcessLatency(), (Object)functionStats.oneMin.getAvgProcessLatency());
        Assert.assertEquals((Object)functionStats.getLastInvocation(), null);
        Assert.assertEquals((int)functionStats.instances.size(), (int)1);
        Assert.assertEquals((int)((FunctionStats.FunctionInstanceStats)functionStats.instances.get(0)).getInstanceId(), (int)0);
        Assert.assertEquals((long)((FunctionStats.FunctionInstanceStats)functionStats.instances.get(0)).getMetrics().getReceivedTotal(), (long)0L);
        Assert.assertEquals((long)((FunctionStats.FunctionInstanceStats)functionStats.instances.get(0)).getMetrics().getProcessedSuccessfullyTotal(), (long)0L);
        Assert.assertEquals((long)((FunctionStats.FunctionInstanceStats)functionStats.instances.get(0)).getMetrics().getSystemExceptionsTotal(), (long)0L);
        Assert.assertEquals((long)((FunctionStats.FunctionInstanceStats)functionStats.instances.get(0)).getMetrics().getUserExceptionsTotal(), (long)0L);
        Assert.assertEquals((Object)((FunctionStats.FunctionInstanceStats)functionStats.instances.get((int)0)).getMetrics().avgProcessLatency, null);
        Assert.assertEquals((long)((FunctionStats.FunctionInstanceStats)functionStats.instances.get((int)0)).getMetrics().oneMin.getReceivedTotal(), (long)0L);
        Assert.assertEquals((long)((FunctionStats.FunctionInstanceStats)functionStats.instances.get((int)0)).getMetrics().oneMin.getProcessedSuccessfullyTotal(), (long)0L);
        Assert.assertEquals((long)((FunctionStats.FunctionInstanceStats)functionStats.instances.get((int)0)).getMetrics().oneMin.getSystemExceptionsTotal(), (long)0L);
        Assert.assertEquals((long)((FunctionStats.FunctionInstanceStats)functionStats.instances.get((int)0)).getMetrics().oneMin.getUserExceptionsTotal(), (long)0L);
        Assert.assertEquals((Object)((FunctionStats.FunctionInstanceStats)functionStats.instances.get((int)0)).getMetrics().oneMin.getAvgProcessLatency(), null);
        Assert.assertEquals((Object)((FunctionStats.FunctionInstanceStats)functionStats.instances.get(0)).getMetrics().getAvgProcessLatency(), (Object)((FunctionStats.FunctionInstanceStats)functionStats.instances.get((int)0)).getMetrics().oneMin.getAvgProcessLatency());
        Assert.assertEquals((Object)((FunctionStats.FunctionInstanceStats)functionStats.instances.get(0)).getMetrics().getAvgProcessLatency(), (Object)functionStats.getAvgProcessLatency());
        String prometheusMetrics = PulsarFunctionE2ETest.getPrometheusMetrics(this.brokerWebServicePort);
        log.info("prometheus metrics: {}", (Object)prometheusMetrics);
        Map<String, Metric> metrics = PulsarFunctionE2ETest.parseMetrics(prometheusMetrics);
        Metric m = metrics.get("pulsar_function_received_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_function_received_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_function_user_exceptions_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_function_user_exceptions_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_function_process_latency_ms");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)Double.NaN);
        m = metrics.get("pulsar_function_process_latency_ms_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)Double.NaN);
        m = metrics.get("pulsar_function_system_exceptions_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_function_system_exceptions_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_function_last_invocation");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_function_processed_successfully_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_function_processed_successfully_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStats = functionRuntimeManager.getFunctionInstanceStats("external-repl-prop", "io", "PulsarSink-test", 0, null);
        FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsAdmin = this.admin.functions().getFunctionStats("external-repl-prop", "io", "PulsarSink-test", 0);
        Assert.assertEquals((Object)functionInstanceStats, (Object)functionInstanceStatsAdmin);
        Assert.assertEquals((Object)functionInstanceStats, (Object)((FunctionStats.FunctionInstanceStats)functionStats.instances.get(0)).getMetrics());
        int totalMsgs = 10;
        for (int i = 0; i < totalMsgs; ++i) {
            String data = "my-message-" + i;
            producer.newMessage().property("key", "value").value((Object)data).send();
        }
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                SubscriptionStats subStats = (SubscriptionStats)this.admin.topics().getStats((String)"persistent://external-repl-prop/io/my-topic1").subscriptions.get("test-sub");
                return subStats.unackedMessages == 0L && subStats.msgThroughputOut == (double)totalMsgs;
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 200L);
        functionStats = functionRuntimeManager.getFunctionStats("external-repl-prop", "io", "PulsarSink-test", null);
        functionStatsFromAdmin = this.admin.functions().getFunctionStats("external-repl-prop", "io", "PulsarSink-test");
        Assert.assertEquals((Object)functionStats, (Object)functionStatsFromAdmin);
        Assert.assertEquals((long)functionStats.getReceivedTotal(), (long)totalMsgs);
        Assert.assertEquals((long)functionStats.getProcessedSuccessfullyTotal(), (long)totalMsgs);
        Assert.assertEquals((long)functionStats.getSystemExceptionsTotal(), (long)0L);
        Assert.assertEquals((long)functionStats.getUserExceptionsTotal(), (long)0L);
        Assert.assertTrue((functionStats.avgProcessLatency > 0.0 ? 1 : 0) != 0);
        Assert.assertEquals((long)functionStats.oneMin.getReceivedTotal(), (long)totalMsgs);
        Assert.assertEquals((long)functionStats.oneMin.getProcessedSuccessfullyTotal(), (long)totalMsgs);
        Assert.assertEquals((long)functionStats.oneMin.getSystemExceptionsTotal(), (long)0L);
        Assert.assertEquals((long)functionStats.oneMin.getUserExceptionsTotal(), (long)0L);
        Assert.assertTrue((functionStats.oneMin.getAvgProcessLatency() > 0.0 ? 1 : 0) != 0);
        Assert.assertEquals((Object)functionStats.getAvgProcessLatency(), (Object)functionStats.oneMin.getAvgProcessLatency());
        Assert.assertTrue((functionStats.getLastInvocation() > 0L ? 1 : 0) != 0);
        Assert.assertEquals((int)functionStats.instances.size(), (int)1);
        Assert.assertEquals((int)((FunctionStats.FunctionInstanceStats)functionStats.instances.get(0)).getInstanceId(), (int)0);
        Assert.assertEquals((long)((FunctionStats.FunctionInstanceStats)functionStats.instances.get(0)).getMetrics().getReceivedTotal(), (long)totalMsgs);
        Assert.assertEquals((long)((FunctionStats.FunctionInstanceStats)functionStats.instances.get(0)).getMetrics().getProcessedSuccessfullyTotal(), (long)totalMsgs);
        Assert.assertEquals((long)((FunctionStats.FunctionInstanceStats)functionStats.instances.get(0)).getMetrics().getSystemExceptionsTotal(), (long)0L);
        Assert.assertEquals((long)((FunctionStats.FunctionInstanceStats)functionStats.instances.get(0)).getMetrics().getUserExceptionsTotal(), (long)0L);
        Assert.assertTrue((((FunctionStats.FunctionInstanceStats)functionStats.instances.get((int)0)).getMetrics().avgProcessLatency > 0.0 ? 1 : 0) != 0);
        Assert.assertEquals((long)((FunctionStats.FunctionInstanceStats)functionStats.instances.get((int)0)).getMetrics().oneMin.getReceivedTotal(), (long)totalMsgs);
        Assert.assertEquals((long)((FunctionStats.FunctionInstanceStats)functionStats.instances.get((int)0)).getMetrics().oneMin.getProcessedSuccessfullyTotal(), (long)totalMsgs);
        Assert.assertEquals((long)((FunctionStats.FunctionInstanceStats)functionStats.instances.get((int)0)).getMetrics().oneMin.getSystemExceptionsTotal(), (long)0L);
        Assert.assertEquals((long)((FunctionStats.FunctionInstanceStats)functionStats.instances.get((int)0)).getMetrics().oneMin.getUserExceptionsTotal(), (long)0L);
        Assert.assertTrue((((FunctionStats.FunctionInstanceStats)functionStats.instances.get((int)0)).getMetrics().oneMin.getAvgProcessLatency() > 0.0 ? 1 : 0) != 0);
        Assert.assertEquals((Object)((FunctionStats.FunctionInstanceStats)functionStats.instances.get(0)).getMetrics().getAvgProcessLatency(), (Object)((FunctionStats.FunctionInstanceStats)functionStats.instances.get((int)0)).getMetrics().oneMin.getAvgProcessLatency());
        Assert.assertEquals((Object)((FunctionStats.FunctionInstanceStats)functionStats.instances.get(0)).getMetrics().getAvgProcessLatency(), (Object)functionStats.getAvgProcessLatency());
        functionInstanceStats = functionRuntimeManager.getFunctionInstanceStats("external-repl-prop", "io", "PulsarSink-test", 0, null);
        functionInstanceStatsAdmin = this.admin.functions().getFunctionStats("external-repl-prop", "io", "PulsarSink-test", 0);
        Assert.assertEquals((Object)functionInstanceStats, (Object)functionInstanceStatsAdmin);
        Assert.assertEquals((Object)functionInstanceStats, (Object)((FunctionStats.FunctionInstanceStats)functionStats.instances.get(0)).getMetrics());
        prometheusMetrics = PulsarFunctionE2ETest.getPrometheusMetrics(this.brokerWebServicePort);
        log.info("prometheus metrics: {}", (Object)prometheusMetrics);
        metrics = PulsarFunctionE2ETest.parseMetrics(prometheusMetrics);
        m = metrics.get("pulsar_function_received_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)totalMsgs);
        m = metrics.get("pulsar_function_received_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)totalMsgs);
        m = metrics.get("pulsar_function_user_exceptions_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_function_user_exceptions_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_function_process_latency_ms");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertTrue((m.value > 0.0 ? 1 : 0) != 0);
        m = metrics.get("pulsar_function_process_latency_ms_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertTrue((m.value > 0.0 ? 1 : 0) != 0);
        m = metrics.get("pulsar_function_system_exceptions_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_function_system_exceptions_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)0.0);
        m = metrics.get("pulsar_function_last_invocation");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertTrue((m.value > 0.0 ? 1 : 0) != 0);
        m = metrics.get("pulsar_function_processed_successfully_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)totalMsgs);
        m = metrics.get("pulsar_function_processed_successfully_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((Object)m.value, (Object)totalMsgs);
        this.admin.functions().deleteFunction("external-repl-prop", "io", "PulsarSink-test");
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                return this.admin.topics().getStats((String)"persistent://external-repl-prop/io/my-topic1").subscriptions.size() == 0;
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 150L);
        Assert.assertEquals((int)this.admin.topics().getStats((String)"persistent://external-repl-prop/io/my-topic1").subscriptions.size(), (int)0);
        File dir = new File(System.getProperty("java.io.tmpdir"));
        File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
        Assert.assertEquals((int)foundFiles.length, (int)0, (String)("Temporary files left over: " + Arrays.asList(foundFiles)));
    }

    /**
     * Deploys a function from a jar URL, publishes a batch of messages to its input topic and
     * checks the status returned by {@code admin.functions().getFunctionStatus(...)}.
     *
     * <p>The test expects exactly one instance whose received and successfully-processed
     * counters equal the number of published messages and whose worker id equals
     * {@code this.workerId}. After the function is deleted the source topic is expected to have
     * no subscriptions left.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test(timeOut=20000L)
    public void testPulsarFunctionStatus() throws Exception {
        final String tenant = "external-repl-prop";
        final String namespacePortion = "io";
        final String replNamespace = "external-repl-prop/io";
        final String sourceTopic = "persistent://external-repl-prop/io/my-topic1";
        final String sinkTopic = "persistent://external-repl-prop/io/output";
        final String propertyKey = "key";
        final String propertyValue = "value";
        final String functionName = "PulsarSink-test";
        final String subscriptionName = "test-sub";
        final int totalMsgs = 10;

        this.admin.namespaces().createNamespace(replNamespace);
        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
        this.admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

        // try-with-resources: the producer is released even when an assertion below fails.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create()) {
            String jarFilePathUrl = "file:" + this.getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
            FunctionConfig functionConfig = PulsarFunctionE2ETest.createFunctionConfig(tenant, namespacePortion, functionName, "my.*", sinkTopic, subscriptionName);
            this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
            // An update with the identical config directly after creation must be accepted as well.
            this.admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);

            // Wait until the function has subscribed to the source topic.
            MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    return this.admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 150L);
            Assert.assertEquals(this.admin.topics().getStats(sourceTopic).subscriptions.size(), 1);

            for (int i = 0; i < totalMsgs; ++i) {
                String data = "my-message-" + i;
                producer.newMessage().property(propertyKey, propertyValue).value(data).send();
            }

            // Wait for the function to consume the batch. The null guard keeps the wait retrying
            // (instead of failing with a NullPointerException) while the subscription stats are
            // not available yet.
            // NOTE(review): comparing msgThroughputOut with the message count is kept as-is from
            // the original condition - confirm it is the intended metric.
            MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    SubscriptionStats subStats = this.admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
                    return subStats != null && subStats.unackedMessages == 0L && subStats.msgThroughputOut == (double)totalMsgs;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 200L);

            FunctionStatus functionStatus = this.admin.functions().getFunctionStatus(tenant, namespacePortion, functionName);
            Assert.assertEquals(functionStatus.getNumInstances(), 1);

            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData status = functionStatus.getInstances().get(0).getStatus();
            double count = status.getNumReceived();
            double success = status.getNumSuccessfullyProcessed();
            String ownerWorkerId = status.getWorkerId();
            Assert.assertEquals((int)count, totalMsgs);
            Assert.assertEquals((int)success, totalMsgs);
            Assert.assertEquals(ownerWorkerId, this.workerId);

            // Deleting the function is expected to remove its subscription from the source topic.
            this.admin.functions().deleteFunction(tenant, namespacePortion, functionName);
            MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    return this.admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 150L);
            Assert.assertEquals(this.admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
        }
    }

    /**
     * Registers either {@code "superUser"} or {@code "invalid"} as the tenant's admin role
     * (selected by {@code validRoleName}) and then tries to create a function.
     *
     * <p>Creation must succeed when {@code validRoleName} is {@code true} and must be rejected
     * with {@link PulsarAdminException.NotAuthorizedException} when it is {@code false}.
     *
     * @param validRoleName whether the role stored as tenant admin is the accepted one
     * @throws Exception if an admin call fails for a reason other than missing authorization
     */
    @Test(dataProvider="validRoleName")
    public void testAuthorization(boolean validRoleName) throws Exception {
        final String tenantName = "external-repl-prop";
        final String shortNamespace = "io";
        final String fullNamespace = "external-repl-prop/io";
        final String outputTopic = "persistent://external-repl-prop/io/output";
        final String fnName = "PulsarSink-test";
        final String subName = "test-sub";

        this.admin.namespaces().createNamespace(fullNamespace);
        Set<String> replicationClusters = Sets.newHashSet(Lists.newArrayList("use"));
        this.admin.namespaces().setNamespaceReplicationClusters(fullNamespace, replicationClusters);

        // Install the role under test as the only admin role of the tenant.
        String adminRole;
        if (validRoleName) {
            adminRole = "superUser";
        } else {
            adminRole = "invalid";
        }
        TenantInfo tenantInfo = new TenantInfo();
        tenantInfo.getAdminRoles().add(adminRole);
        Set<String> allowedClusters = Sets.newHashSet(Lists.newArrayList("use"));
        tenantInfo.setAllowedClusters(allowedClusters);
        this.admin.tenants().updateTenant(tenantName, tenantInfo);

        String jarUrl = "file:" + this.getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
        FunctionConfig config = PulsarFunctionE2ETest.createFunctionConfig(tenantName, shortNamespace, fnName, "my.*", outputTopic, subName);

        // Record whether the broker accepted the request, then compare with the expectation.
        boolean created;
        try {
            this.admin.functions().createFunctionWithUrl(config, jarUrl);
            created = true;
        }
        catch (PulsarAdminException.NotAuthorizedException notAuthorized) {
            created = false;
        }
        if (created) {
            Assert.assertTrue(validRoleName);
        } else {
            Assert.assertFalse(validRoleName);
        }
    }

    /**
     * Exercises the stop and restart admin calls of a deployed function by observing the number
     * of consumers attached to the function's subscription on the source topic.
     *
     * <p>Expected consumer counts: 1 after creation, 0 after {@code stopFunction}, 1 again after
     * {@code restartFunction}.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test(timeOut=20000L)
    public void testFunctionStopAndRestartApi() throws Exception {
        final String tenant = "external-repl-prop";
        final String namespacePortion = "io";
        final String replNamespace = "external-repl-prop/io";
        final String sourceTopicName = "restartFunction";
        final String sourceTopic = "persistent://external-repl-prop/io/restartFunction";
        final String sinkTopic = "persistent://external-repl-prop/io/output";
        final String functionName = "PulsarSink-test";
        final String subscriptionName = "test-sub";

        this.admin.namespaces().createNamespace(replNamespace);
        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
        this.admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

        // No message is published here; the producer is only created and closed.
        // NOTE(review): presumably it exists so the source topic is created up front - confirm.
        // try-with-resources closes it on assertion failures too.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create()) {
            String jarFilePathUrl = "file:" + this.getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
            FunctionConfig functionConfig = PulsarFunctionE2ETest.createFunctionConfig(tenant, namespacePortion, functionName, sourceTopicName, sinkTopic, subscriptionName);
            this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
            this.awaitAndAssertSubscriptionConsumers(sourceTopic, subscriptionName, 1);

            this.admin.functions().stopFunction(tenant, namespacePortion, functionName);
            this.awaitAndAssertSubscriptionConsumers(sourceTopic, subscriptionName, 0);

            this.admin.functions().restartFunction(tenant, namespacePortion, functionName);
            this.awaitAndAssertSubscriptionConsumers(sourceTopic, subscriptionName, 1);
        }
    }

    /**
     * Waits (5 attempts, 150 ms base sleep) until the given subscription reports the expected
     * number of consumers, then asserts that count.
     *
     * @param topic fully qualified topic name whose stats are queried
     * @param subscription name of the subscription to inspect
     * @param expectedConsumers number of consumers the subscription must report
     * @throws Exception if the stats cannot be fetched for the final assertion
     */
    private void awaitAndAssertSubscriptionConsumers(String topic, String subscription, int expectedConsumers) throws Exception {
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                SubscriptionStats polled = this.admin.topics().getStats(topic).subscriptions.get(subscription);
                return polled != null && polled.consumers.size() == expectedConsumers;
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 150L);
        SubscriptionStats subStats = this.admin.topics().getStats(topic).subscriptions.get(subscription);
        // Fail with a readable message rather than a NullPointerException when the subscription is missing.
        Assert.assertNotNull(subStats, "Subscription " + subscription + " not found on " + topic);
        Assert.assertEquals(subStats.consumers.size(), expectedConsumers);
    }

    /**
     * Checks how the {@code cleanupSubscription} setting of a function affects the subscription
     * on its input topic when the function is deleted.
     *
     * <p>Sequence performed by the test:
     * <ol>
     *   <li>create the function with {@code cleanupSubscription=false} and verify the stored
     *       config and that one subscription exists;</li>
     *   <li>update it to {@code cleanupSubscription=true}, publish messages, verify the function
     *       status counters, delete the function and expect zero subscriptions;</li>
     *   <li>re-create it with {@code cleanupSubscription=false}, update the parallelism to 2,
     *       verify the flag is still {@code false}, delete the function and expect the
     *       subscription to remain.</li>
     * </ol>
     *
     * @throws Exception if any admin or client call fails
     */
    @Test(timeOut=20000L)
    public void testFunctionAutomaticSubCleanup() throws Exception {
        final String tenant = "external-repl-prop";
        final String namespacePortion = "io";
        final String replNamespace = "external-repl-prop/io";
        final String sourceTopic = "persistent://external-repl-prop/io/my-topic1";
        final String sinkTopic = "persistent://external-repl-prop/io/output";
        final String propertyKey = "key";
        final String propertyValue = "value";
        final String functionName = "PulsarFunction-test";
        final int totalMsgs = 10;

        this.admin.namespaces().createNamespace(replNamespace);
        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
        this.admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

        // try-with-resources: the producer is released even when an assertion below fails.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create()) {
            String jarFilePathUrl = "file:" + this.getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
            FunctionConfig functionConfig = new FunctionConfig();
            functionConfig.setTenant(tenant);
            functionConfig.setNamespace(namespacePortion);
            functionConfig.setName(functionName);
            functionConfig.setParallelism(1);
            functionConfig.setInputs(Collections.singleton(sourceTopic));
            functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
            functionConfig.setOutput(sinkTopic);
            functionConfig.setCleanupSubscription(false);
            functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);

            // --- Phase 1: created with cleanupSubscription=false ---
            this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
            // Wait for the state asserted right below (flag == false). Waiting for "true" here
            // could never succeed and only burned all retries.
            MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    return Boolean.FALSE.equals(this.admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription());
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 150L);
            Assert.assertFalse(this.admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription());
            MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    return this.admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 150L);
            Assert.assertEquals(this.admin.topics().getStats(sourceTopic).subscriptions.size(), 1);

            // --- Phase 2: updated to cleanupSubscription=true ---
            functionConfig.setCleanupSubscription(true);
            this.admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
            MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    return Boolean.TRUE.equals(this.admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription());
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 150L);
            Assert.assertTrue(this.admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription());

            for (int i = 0; i < totalMsgs; ++i) {
                String data = "my-message-" + i;
                producer.newMessage().property(propertyKey, propertyValue).value(data).send();
            }

            // No explicit subscription name was configured, so the default one is looked up.
            final String defaultSubscription = InstanceUtils.getDefaultSubscriptionName(tenant, namespacePortion, functionName);
            // The null guard keeps the wait retrying (instead of failing with a
            // NullPointerException) while the subscription stats are not available yet.
            // NOTE(review): comparing msgThroughputOut with the message count is kept as-is from
            // the original condition - confirm it is the intended metric.
            MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    SubscriptionStats subStats = this.admin.topics().getStats(sourceTopic).subscriptions.get(defaultSubscription);
                    return subStats != null && subStats.unackedMessages == 0L && subStats.msgThroughputOut == (double)totalMsgs;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 200L);

            FunctionStatus functionStatus = this.admin.functions().getFunctionStatus(tenant, namespacePortion, functionName);
            Assert.assertEquals(functionStatus.getNumInstances(), 1);
            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData status = functionStatus.getInstances().get(0).getStatus();
            double count = status.getNumReceived();
            double success = status.getNumSuccessfullyProcessed();
            String ownerWorkerId = status.getWorkerId();
            Assert.assertEquals((int)count, totalMsgs);
            Assert.assertEquals((int)success, totalMsgs);
            Assert.assertEquals(ownerWorkerId, this.workerId);

            // With cleanupSubscription=true the delete is expected to drop the subscription.
            this.admin.functions().deleteFunction(tenant, namespacePortion, functionName);
            MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    return this.admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 150L);
            Assert.assertEquals(this.admin.topics().getStats(sourceTopic).subscriptions.size(), 0);

            // --- Phase 3: re-created with cleanupSubscription=false, then parallelism updated ---
            functionConfig.setCleanupSubscription(false);
            this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
            MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    return this.admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 150L);
            Assert.assertEquals(this.admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
            // Parallelism is still 1 at this point, so only the cleanup flag is awaited; the
            // former "parallelism == 2" condition could not become true before the update below.
            MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    FunctionConfig result = this.admin.functions().getFunction(tenant, namespacePortion, functionName);
                    return Boolean.FALSE.equals(result.getCleanupSubscription());
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 150L);
            Assert.assertFalse(this.admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription());

            functionConfig.setParallelism(2);
            this.admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
            MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    FunctionConfig result = this.admin.functions().getFunction(tenant, namespacePortion, functionName);
                    return Integer.valueOf(2).equals(result.getParallelism()) && Boolean.FALSE.equals(result.getCleanupSubscription());
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 150L);
            Assert.assertFalse(this.admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription());

            // With cleanupSubscription=false the subscription is expected to survive the delete.
            this.admin.functions().deleteFunction(tenant, namespacePortion, functionName);
            MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    return this.admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 150L);
            Assert.assertEquals(this.admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
        }
    }

    /**
     * Fetches the text served at {@code http://localhost:<metricsPort>/metrics} with an HTTP GET.
     *
     * <p>The body is read line by line and every line is re-terminated with
     * {@link System#lineSeparator()}, so a non-empty response always ends with a line separator
     * regardless of how the server terminated its last line.
     *
     * @param metricsPort port on {@code localhost} where the metrics endpoint listens
     * @return the response body, one line per metric/comment line
     * @throws IOException if the connection cannot be established or the body cannot be read
     */
    public static String getPrometheusMetrics(int metricsPort) throws IOException {
        URL url = new URL(String.format("http://%s:%s/metrics", "localhost", metricsPort));
        HttpURLConnection conn = (HttpURLConnection)url.openConnection();
        try {
            conn.setRequestMethod("GET");
            StringBuilder result = new StringBuilder();
            // Decode explicitly as UTF-8 (the Prometheus text exposition encoding) instead of
            // relying on the platform default charset; try-with-resources closes the reader
            // even when readLine() fails part-way through the body.
            try (BufferedReader rd = new BufferedReader(
                    new InputStreamReader(conn.getInputStream(), java.nio.charset.StandardCharsets.UTF_8))) {
                String line;
                while ((line = rd.readLine()) != null) {
                    result.append(line).append(System.lineSeparator());
                }
            }
            return result.toString();
        }
        finally {
            // Release the connection on both the success and the failure path.
            conn.disconnect();
        }
    }

    /**
     * Parses metrics text of the form {@code name{tag="value",...} value [timestamp]} into a map
     * keyed by metric name.
     *
     * <p>Empty lines and lines starting with {@code #} are skipped. Every other line must match
     * the expected shape. When the same metric name appears on several lines, the entry from the
     * last such line replaces the earlier ones.
     *
     * @param metrics the raw metrics text, lines separated by any line terminator
     * @return map from metric name to its parsed value and tags
     * @throws IllegalArgumentException if a non-comment line does not match the expected shape
     * @throws NumberFormatException if the matched value token is not a parseable double
     */
    private static Map<String, Metric> parseMetrics(String metrics) {
        Map<String, Metric> parsed = new HashMap<>();
        Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s(-?[\\d\\w\\.]+)(\\s(\\d+))?$");
        Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
        // Split on any line terminator: getPrometheusMetrics() joins lines with
        // System.lineSeparator(), so splitting on "\n" alone would leave a trailing '\r' on
        // CRLF platforms and make the anchored pattern below reject every line.
        for (String line : metrics.split("\\R")) {
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            Matcher matcher = pattern.matcher(line);
            Preconditions.checkArgument(matcher.matches(), "Unparseable metric line: %s", line);
            Metric m = new Metric();
            m.value = Double.parseDouble(matcher.group(3));
            Matcher tagsMatcher = tagsPattern.matcher(matcher.group(2));
            while (tagsMatcher.find()) {
                m.tags.put(tagsMatcher.group(1), tagsMatcher.group(2));
            }
            parsed.put(matcher.group(1), m);
        }
        return parsed;
    }

    /**
     * Mutable holder for one parsed metric sample: its numeric value and its tag name/value
     * pairs, kept sorted by tag name.
     */
    static class Metric {
        double value;
        final Map<String, String> tags = new TreeMap<String, String>();

        Metric() {
        }

        /** Renders the tags map and the value in a single-line, human-readable form. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("PulsarFunctionE2ETest.Metric(tags=");
            text.append(this.tags);
            text.append(", value=");
            text.append(this.value);
            return text.append(")").toString();
        }
    }
}

