/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.websocket.proxy;

import java.net.URI;
import java.security.GeneralSecurityException;
import java.security.cert.Certificate;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.client.api.TlsProducerConsumerBase;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.proxy.SimpleConsumerSocket;
import org.apache.pulsar.websocket.proxy.SimpleProducerSocket;
import org.apache.pulsar.websocket.service.ProxyServer;
import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
import org.apache.pulsar.websocket.service.WebSocketServiceStarter;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class ProxyPublishConsumeTlsTest
extends TlsProducerConsumerBase {
    protected String methodName;
    private int port;
    private int tlsPort;
    private ProxyServer proxyServer;
    private WebSocketService service;
    private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTlsTest.class);

    @Override
    @BeforeMethod
    public void setup() throws Exception {
        super.setup();
        super.internalSetUpForNamespace();
        this.port = PortManager.nextFreePort();
        this.tlsPort = PortManager.nextFreePort();
        WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
        config.setWebServicePort(Optional.of(this.port));
        config.setWebServicePortTls(Optional.of(this.tlsPort));
        config.setBrokerClientTlsEnabled(true);
        config.setTlsKeyFilePath("./src/test/resources/authentication/tls/broker-key.pem");
        config.setTlsCertificateFilePath("./src/test/resources/authentication/tls/broker-cert.pem");
        config.setTlsTrustCertsFilePath("./src/test/resources/authentication/tls/cacert.pem");
        config.setBrokerClientTrustCertsFilePath("./src/test/resources/authentication/tls/cacert.pem");
        config.setClusterName("use");
        config.setConfigurationStoreServers("dummy-zk-servers");
        config.setBrokerClientAuthenticationParameters("tlsCertFile:./src/test/resources/authentication/tls/client-cert.pem,tlsKeyFile:./src/test/resources/authentication/tls/client-key.pem");
        config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
        String lookupUrl = new URI("pulsar://localhost:" + this.BROKER_PORT_TLS).toString();
        this.service = (WebSocketService)Mockito.spy((Object)new WebSocketService(config));
        ((WebSocketService)Mockito.doReturn((Object)this.mockZooKeeperClientFactory).when((Object)this.service)).getZooKeeperClientFactory();
        this.proxyServer = new ProxyServer(config);
        WebSocketServiceStarter.start((ProxyServer)this.proxyServer, (WebSocketService)this.service);
        log.info("Proxy Server Started");
    }

    @Override
    @AfterMethod
    protected void cleanup() throws Exception {
        super.cleanup();
        this.service.close();
        this.proxyServer.stop();
        log.info("Finished Cleaning Up Test setup");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=30000L)
    public void socketTest() throws InterruptedException, GeneralSecurityException {
        String consumerUri = "wss://localhost:" + this.tlsPort + "/ws/consumer/persistent/my-property/use/my-ns/my-topic/my-sub";
        String producerUri = "wss://localhost:" + this.tlsPort + "/ws/producer/persistent/my-property/use/my-ns/my-topic/";
        URI consumeUri = URI.create(consumerUri);
        URI produceUri = URI.create(producerUri);
        SslContextFactory sslContextFactory = new SslContextFactory();
        sslContextFactory.setSslContext(SecurityUtility.createSslContext((boolean)false, (Certificate[])SecurityUtility.loadCertificatesFromPemFile((String)"./src/test/resources/authentication/tls/cacert.pem")));
        WebSocketClient consumeClient = new WebSocketClient(sslContextFactory);
        SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
        WebSocketClient produceClient = new WebSocketClient(sslContextFactory);
        try {
            consumeClient.start();
            ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
            Future consumerFuture = consumeClient.connect((Object)consumeSocket, consumeUri, consumeRequest);
            log.info("Connecting to : {}", (Object)consumeUri);
            Assert.assertTrue((boolean)((Session)consumerFuture.get()).isOpen());
            SimpleProducerSocket produceSocket = new SimpleProducerSocket();
            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
            produceClient.start();
            Future producerFuture = produceClient.connect((Object)produceSocket, produceUri, produceRequest);
            Assert.assertTrue((boolean)((Session)producerFuture.get()).isOpen());
            consumeSocket.awaitClose(1, TimeUnit.SECONDS);
            produceSocket.awaitClose(1, TimeUnit.SECONDS);
            Assert.assertTrue((produceSocket.getBuffer().size() > 0 ? 1 : 0) != 0);
            Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer());
        }
        catch (Throwable t) {
            log.error(t.getMessage());
            Assert.fail((String)t.getMessage());
        }
        finally {
            ExecutorService executor = Executors.newFixedThreadPool(1);
            try {
                executor.submit(() -> {
                    try {
                        consumeClient.stop();
                        produceClient.stop();
                        log.info("proxy clients are stopped successfully");
                    }
                    catch (Exception e) {
                        log.error(e.getMessage());
                    }
                }).get(2L, TimeUnit.SECONDS);
            }
            catch (Exception e) {
                log.error("failed to close clients ", (Throwable)e);
            }
            executor.shutdownNow();
        }
    }
}

