/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.zookeeper;

import com.google.common.util.concurrent.AtomicDouble;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.PulsarClusterMetadataSetup;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class ZooKeeperClientAspectJTest {
    // Embedded ZooKeeper server; created and started in setup(), closed in teardown().
    private ZookeeperServerTest localZkS;
    // ZooKeeper client handle assigned by each test and closed in that test's finally block.
    private ZooKeeper localZkc;
    // Port obtained from PortManager when this test instance is constructed; the embedded
    // server listens on it and every test client connects to it.
    private final int LOCAL_ZOOKEEPER_PORT = PortManager.nextFreePort();
    // ZooKeeper client timeout in milliseconds; equal to the 2000 ms value the tests apply
    // when creating a client and when waiting for its future.
    private final long ZOOKEEPER_SESSION_TIMEOUT_MILLIS = 2000L;
    // ACL list handed to the asynchronous create() calls issued by the tests.
    private final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;

    /**
     * Checks that a client obtained from {@link ZookeeperBkClientFactoryImpl} in
     * {@code ReadWrite} mode ends up connected to the local server and is not in the
     * {@code CONNECTEDREADONLY} state.
     *
     * @throws Exception if the client is not available within the timeout or closing it fails
     */
    @Test
    public void testZkConnected() throws Exception {
        OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder().build();
        try {
            String serverAddress = "127.0.0.1:" + LOCAL_ZOOKEEPER_PORT;
            ZookeeperBkClientFactoryImpl clientFactory = new ZookeeperBkClientFactoryImpl(scheduler);
            CompletableFuture<ZooKeeper> pendingClient = clientFactory.create(
                    serverAddress,
                    ZooKeeperClientFactory.SessionType.ReadWrite,
                    (int) ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
            localZkc = pendingClient.get(ZOOKEEPER_SESSION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);

            Assert.assertTrue(localZkc.getState().isConnected());
            Assert.assertNotEquals(localZkc.getState(), ZooKeeper.States.CONNECTEDREADONLY);
        } finally {
            // Release the client first, then the scheduler that backs the factory.
            if (localZkc != null) {
                localZkc.close();
            }
            scheduler.shutdown();
        }
    }

    /**
     * Checks that {@link PulsarClusterMetadataSetup#initZk(String, int)} can connect with a
     * chroot-style connection string ({@code host:port/prefix}) and that, afterwards, the
     * {@code /prefix} node is visible through a client connected without the chroot suffix.
     *
     * @throws Exception if a client cannot be created within the timeout or a ZooKeeper call fails
     */
    @Test
    public void testInitZk() throws Exception {
        try {
            ZookeeperClientFactoryImpl zkfactory = new ZookeeperClientFactoryImpl();
            CompletableFuture<ZooKeeper> zkFuture = zkfactory.create(
                    "127.0.0.1:" + this.LOCAL_ZOOKEEPER_PORT,
                    ZooKeeperClientFactory.SessionType.ReadWrite,
                    (int) this.ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
            this.localZkc = zkFuture.get(this.ZOOKEEPER_SESSION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            Assert.assertTrue(this.localZkc.getState().isConnected());
            Assert.assertNotEquals(this.localZkc.getState(), ZooKeeper.States.CONNECTEDREADONLY);

            String connection = "127.0.0.1:" + this.LOCAL_ZOOKEEPER_PORT + "/prefix";
            ZooKeeper chrootZkc = PulsarClusterMetadataSetup.initZk(
                    connection, (int) this.ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
            try {
                Assert.assertTrue(chrootZkc.getState().isConnected());
                Assert.assertNotEquals(chrootZkc.getState(), ZooKeeper.States.CONNECTEDREADONLY);
            } finally {
                // Close the chroot client even when an assertion above fails, so a failing
                // run does not leave a live session behind.
                chrootZkc.close();
            }

            // The chroot path must exist when looked up from the non-chroot client.
            Assert.assertNotNull(this.localZkc.exists("/prefix", false));
        } finally {
            if (this.localZkc != null) {
                this.localZkc.close();
            }
        }
    }

    /**
     * Creates a new embedded ZooKeeper server on {@code LOCAL_ZOOKEEPER_PORT} and starts it
     * before each test method.
     *
     * @throws Exception if the server's data directory cannot be created or startup fails
     */
    @BeforeMethod
    void setup() throws Exception {
        localZkS = new ZookeeperServerTest(LOCAL_ZOOKEEPER_PORT);
        localZkS.start();
    }

    /**
     * Closes the embedded ZooKeeper server after each test method.
     *
     * @throws Exception if closing the server fails
     */
    @AfterMethod
    void teardown() throws Exception {
        localZkS.close();
    }

    /**
     * Registers a {@link ClientCnxnAspect.EventListner}, issues four asynchronous ZooKeeper
     * operations (create, delete, exists, getData) and expects the listener to have recorded
     * exactly two {@code write} events and two {@code read} events.
     *
     * <p>The test is currently disabled ({@code enabled = false}).
     *
     * <p>NOTE(review): presumably create/delete are the writes and exists/getData the reads;
     * that classification is done by the aspect, not by this file — confirm.
     *
     * @throws Exception if the client cannot be created within the timeout, a latch wait is
     *     interrupted, or closing the client fails
     */
    @Test(enabled = false, timeOut = 7000L)
    void testZkClientAspectJTrigger() throws Exception {
        OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().build();
        // Set only after the listener has actually been registered, so the finally block
        // removes exactly what this test added.
        ClientCnxnAspect.EventListner registeredListener = null;
        // Forget any handle left behind by an earlier test so cleanup only touches the
        // client opened here.
        this.localZkc = null;
        try {
            ZookeeperBkClientFactoryImpl zkf = new ZookeeperBkClientFactoryImpl(executor);
            CompletableFuture<ZooKeeper> zkFuture = zkf.create(
                    "127.0.0.1:" + this.LOCAL_ZOOKEEPER_PORT,
                    ZooKeeperClientFactory.SessionType.ReadWrite,
                    (int) this.ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
            this.localZkc = zkFuture.get(this.ZOOKEEPER_SESSION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            Assert.assertTrue(this.localZkc.getState().isConnected());
            Assert.assertNotEquals(this.localZkc.getState(), ZooKeeper.States.CONNECTEDREADONLY);

            final AtomicInteger writeCount = new AtomicInteger(0);
            final AtomicInteger readCount = new AtomicInteger(0);
            ClientCnxnAspect.EventListner listener = new ClientCnxnAspect.EventListner() {
                @Override
                public void recordLatency(ClientCnxnAspect.EventType eventType, long latencyMiliSecond) {
                    if (eventType.equals(ClientCnxnAspect.EventType.write)) {
                        writeCount.incrementAndGet();
                    } else if (eventType.equals(ClientCnxnAspect.EventType.read)) {
                        readCount.incrementAndGet();
                    }
                }
            };
            ClientCnxnAspect.addListener(listener);
            registeredListener = listener;

            CountDownLatch createLatch = new CountDownLatch(1);
            CountDownLatch deleteLatch = new CountDownLatch(1);
            CountDownLatch readLatch = new CountDownLatch(1);
            CountDownLatch existLatch = new CountDownLatch(1);
            // Each callback only signals completion; result codes are not inspected. The
            // delete targets "/deleteTest", which this test never creates.
            this.localZkc.create("/createTest", "data".getBytes(), this.Acl, CreateMode.EPHEMERAL,
                    (rc, path, ctx, name) -> createLatch.countDown(), "create");
            this.localZkc.delete("/deleteTest", -1, (rc, path, ctx) -> deleteLatch.countDown(), "delete");
            this.localZkc.exists("/createTest", null, (rc, path, ctx, stat) -> existLatch.countDown(), null);
            this.localZkc.getData("/createTest", null, (rc, path, ctx, data, stat) -> readLatch.countDown(), null);
            createLatch.await();
            deleteLatch.await();
            existLatch.await();
            readLatch.await();
            // Grace period before reading the counters, kept from the original test.
            Thread.sleep(500L);
            Assert.assertEquals(writeCount.get(), 2);
            Assert.assertEquals(readCount.get(), 2);
        } finally {
            // The listener is registered through a static method; remove it even on failure
            // so it does not keep receiving events from other tests.
            if (registeredListener != null) {
                ClientCnxnAspect.removeListener(registeredListener);
            }
            if (this.localZkc != null) {
                this.localZkc.close();
            }
            executor.shutdown();
        }
    }

    /**
     * Starts a {@link MockPulsar} whose ZooKeeper client factory hands out the local client,
     * creates a producer, and checks that the broker's topic metrics contain the
     * {@code zk_write_latency} and {@code zk_read_latency} entries with the expected keys.
     * It then issues four asynchronous ZooKeeper operations and expects both the read and
     * the write rate reported by the broker to be positive.
     *
     * <p>The test is currently disabled ({@code enabled = false}).
     *
     * @throws Exception if the client or the mock broker cannot be set up, a latch wait is
     *     interrupted, or cleanup fails
     */
    @Test(enabled = false, timeOut = 7000L)
    public void testZkOpStatsMetrics() throws Exception {
        OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().build();
        // Forget any handle left behind by an earlier test so cleanup only touches the
        // client opened here.
        this.localZkc = null;
        try {
            ZookeeperBkClientFactoryImpl zkf = new ZookeeperBkClientFactoryImpl(executor);
            CompletableFuture<ZooKeeper> zkFuture = zkf.create(
                    "127.0.0.1:" + this.LOCAL_ZOOKEEPER_PORT,
                    ZooKeeperClientFactory.SessionType.ReadWrite,
                    (int) this.ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
            this.localZkc = zkFuture.get(this.ZOOKEEPER_SESSION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);

            MockPulsar mockPulsar = new MockPulsar(this.localZkc);
            mockPulsar.setup();
            try {
                PulsarClient pulsarClient = mockPulsar.getClient();
                PulsarService pulsar = mockPulsar.getPulsar();
                pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();

                this.assertZkLatencyMetricKeys(pulsar, "zk_write_latency", "brk_zk_write");
                this.assertZkLatencyMetricKeys(pulsar, "zk_read_latency", "brk_zk_read");

                CountDownLatch createLatch = new CountDownLatch(1);
                CountDownLatch deleteLatch = new CountDownLatch(1);
                CountDownLatch readLatch = new CountDownLatch(1);
                CountDownLatch existLatch = new CountDownLatch(1);
                // Each callback only signals completion; result codes are not inspected.
                this.localZkc.create("/createTest", "data".getBytes(), this.Acl, CreateMode.EPHEMERAL,
                        (rc, path, ctx, name) -> createLatch.countDown(), "create");
                this.localZkc.delete("/deleteTest", -1, (rc, path, ctx) -> deleteLatch.countDown(), "delete");
                this.localZkc.exists("/createTest", null, (rc, path, ctx, stat) -> existLatch.countDown(), null);
                this.localZkc.getData("/createTest", null, (rc, path, ctx, data, stat) -> readLatch.countDown(), null);
                createLatch.await();
                deleteLatch.await();
                existLatch.await();
                readLatch.await();
                // Short pause before sampling the rates, kept from the original test.
                Thread.sleep(10L);

                BrokerService brokerService = pulsar.getBrokerService();
                brokerService.updateRates();
                List<Metrics> metrics = brokerService.getTopicMetrics();
                AtomicDouble writeRate = new AtomicDouble();
                AtomicDouble readRate = new AtomicDouble();
                // NOTE(review): the keys read here ("brk_zk_*_latency_rate_s") differ from
                // the "brk_zk_*_rate_s" keys asserted above; if the broker does not publish
                // the "_latency_rate_s" form, get() returns null and unboxing throws.
                // Confirm the key names against the broker's metrics.
                metrics.forEach(m -> {
                    if ("zk_write_latency".equalsIgnoreCase(m.getDimension("metric"))) {
                        writeRate.set(((Double) m.getMetrics().get("brk_zk_write_latency_rate_s")).doubleValue());
                    } else if ("zk_read_latency".equalsIgnoreCase(m.getDimension("metric"))) {
                        readRate.set(((Double) m.getMetrics().get("brk_zk_read_latency_rate_s")).doubleValue());
                    }
                });
                Assert.assertTrue(readRate.get() > 0.0);
                Assert.assertTrue(writeRate.get() > 0.0);
            } finally {
                // Runs only after a successful setup(), as in the original flow.
                mockPulsar.cleanup();
            }
        } finally {
            // Always release the client and the executor, even if creation, setup or
            // cleanup above threw.
            if (this.localZkc != null) {
                this.localZkc.close();
            }
            executor.shutdown();
        }
    }

    /**
     * Asserts that the broker reports a metric for {@code dimension} and that it contains
     * the rate, percentile, mean and median keys built from {@code keyPrefix}.
     *
     * @param pulsar broker to read the metrics from
     * @param dimension value of the {@code metric} dimension to look up, e.g. {@code zk_write_latency}
     * @param keyPrefix common prefix of the expected keys, e.g. {@code brk_zk_write}
     */
    private void assertZkLatencyMetricKeys(PulsarService pulsar, String dimension, String keyPrefix) {
        Metrics zkOpMetric = this.getMetric(pulsar, dimension);
        Assert.assertNotNull(zkOpMetric);
        String[] expectedSuffixes = {
            "_rate_s",
            "_time_95percentile_ms",
            "_time_99_99_percentile_ms",
            "_time_99_9_percentile_ms",
            "_time_99_percentile_ms",
            "_time_mean_ms",
            "_time_median_ms",
        };
        for (String suffix : expectedSuffixes) {
            String key = keyPrefix + suffix;
            Assert.assertTrue(zkOpMetric.getMetrics().containsKey(key), "missing metric key " + key);
        }
    }

    /**
     * Refreshes the broker's rates and returns the first topic metric whose {@code metric}
     * dimension equals {@code dimension}, ignoring case.
     *
     * @param pulsar broker whose {@link BrokerService} supplies the metrics
     * @param dimension expected value of the {@code metric} dimension
     * @return the first matching metric, or {@code null} when none matches
     */
    private Metrics getMetric(PulsarService pulsar, String dimension) {
        BrokerService service = pulsar.getBrokerService();
        // Rates are refreshed before the metrics are read, as in every lookup of this helper.
        service.updateRates();
        return service.getTopicMetrics().stream()
                .filter(candidate -> dimension.equalsIgnoreCase(candidate.getDimension("metric")))
                .findFirst()
                .orElse(null);
    }

    /**
     * {@link BrokerTestBase} variant whose broker is stubbed so that
     * {@code getZooKeeperClientFactory()} returns a factory that always hands out the
     * ZooKeeper client given to the constructor.
     */
    class MockPulsar extends BrokerTestBase {
        /** Client returned by the stubbed factory for every {@code create} call. */
        private final ZooKeeper zk;

        /**
         * @param zk ZooKeeper client the stubbed factory will return
         */
        public MockPulsar(ZooKeeper zk) {
            this.zk = zk;
        }

        /**
         * Runs the base setup and then stubs the broker's ZooKeeper client factory.
         *
         * @throws Exception if the base setup fails
         */
        @Override
        protected void setup() throws Exception {
            super.baseSetup();
            // The factory ignores all of its arguments and completes immediately with the
            // client supplied at construction time.
            ZooKeeperClientFactory fixedClientFactory = new ZooKeeperClientFactory() {
                @Override
                public CompletableFuture<ZooKeeper> create(String serverList,
                        ZooKeeperClientFactory.SessionType sessionType, int zkSessionTimeoutMillis) {
                    return CompletableFuture.completedFuture(MockPulsar.this.zk);
                }
            };
            Mockito.doReturn(fixedClientFactory).when(this.pulsar).getZooKeeperClientFactory();
        }

        /**
         * Delegates to the base class's internal cleanup.
         *
         * @throws Exception if the base cleanup fails
         */
        @Override
        protected void cleanup() throws Exception {
            super.internalCleanup();
        }

        /** @return the broker instance held by the base class */
        @Override
        public PulsarService getPulsar() {
            return this.pulsar;
        }

        /** @return the Pulsar client held by the base class */
        public PulsarClient getClient() {
            return this.pulsarClient;
        }
    }

    /**
     * Minimal embedded ZooKeeper server for tests: it keeps its data in a temporary
     * directory and listens on the port given to the constructor.
     */
    class ZookeeperServerTest implements Closeable {
        /** Temporary directory used for both arguments of the {@link ZooKeeperServer} constructor. */
        private final File zkTmpDir;
        private ZooKeeperServer zks;
        private NIOServerCnxnFactory serverFactory;
        private final int zkPort;
        /** {@code 127.0.0.1:<zkPort>}, used for the readiness check and log messages. */
        private final String hostPort;
        private final Logger log = LoggerFactory.getLogger(ZookeeperServerTest.class);

        /**
         * Prepares the temporary data directory; the server itself is started by {@link #start()}.
         *
         * @param zkPort port the server will listen on
         * @throws IOException if the temporary directory cannot be created
         */
        public ZookeeperServerTest(int zkPort) throws IOException {
            this.zkPort = zkPort;
            this.hostPort = "127.0.0.1:" + zkPort;
            this.zkTmpDir = File.createTempFile("zookeeper", "test");
            this.log.info("**** Start GZK on {} ****", this.zkTmpDir);
            // createTempFile produces a regular file; replace it with a directory of the same name.
            if (!this.zkTmpDir.delete() || !this.zkTmpDir.mkdir()) {
                throw new IOException("Couldn't create zk directory " + this.zkTmpDir);
            }
        }

        /**
         * Starts the server and waits (up to 30 seconds) for it to come up.
         *
         * @throws IOException if the server cannot be instantiated or started; failures of
         *     other types are wrapped so the cause is preserved
         */
        public void start() throws IOException {
            try {
                this.zks = new ZooKeeperServer(this.zkTmpDir, this.zkTmpDir, 3000);
                this.zks.setMaxSessionTimeout(20000);
                this.serverFactory = new NIOServerCnxnFactory();
                this.serverFactory.configure(new InetSocketAddress(this.zkPort), 1000);
                this.serverFactory.startup(this.zks);
            } catch (Exception e) {
                this.log.error("Exception while instantiating ZooKeeper", e);
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt for callers further up the stack.
                    Thread.currentThread().interrupt();
                }
                // Fail fast instead of waiting for a server that never started.
                if (e instanceof IOException) {
                    throw (IOException) e;
                }
                throw new IOException("Failed to start ZooKeeper at " + this.hostPort, e);
            }
            LocalBookkeeperEnsemble.waitForServerUp(this.hostPort, 30000L);
            this.log.info("ZooKeeper started at {}", this.hostPort);
        }

        /**
         * Shuts the server down but keeps the temporary data directory.
         *
         * @throws IOException declared for interface compatibility with existing callers
         */
        public void stop() throws IOException {
            this.shutdownServer();
            this.log.info("Stoppend ZK server at {}", this.hostPort);
        }

        /**
         * Shuts the server down and removes the temporary data directory with its contents.
         *
         * @throws IOException declared by {@link Closeable}
         */
        @Override
        public void close() throws IOException {
            this.shutdownServer();
            this.deleteRecursively(this.zkTmpDir);
        }

        /** Shuts down whichever server components were created; safe after a failed or skipped start. */
        private void shutdownServer() {
            if (this.zks != null) {
                this.zks.shutdown();
            }
            if (this.serverFactory != null) {
                this.serverFactory.shutdown();
            }
        }

        /** Deletes {@code file}; for a directory, its entries are deleted first because File.delete() needs it empty. */
        private void deleteRecursively(File file) {
            File[] children = file.listFiles();
            if (children != null) {
                for (File child : children) {
                    this.deleteRecursively(child);
                }
            }
            if (!file.delete() && file.exists()) {
                this.log.warn("Could not delete {}", file);
            }
        }
    }
}

