/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.protonj2.test.driver.netty.netty4;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.lang.invoke.MethodHandles;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLPeerUnverifiedException;
import org.apache.qpid.protonj2.test.driver.ProtonTestServerOptions;
import org.apache.qpid.protonj2.test.driver.netty.NettyEventLoop;
import org.apache.qpid.protonj2.test.driver.netty.NettyServer;
import org.apache.qpid.protonj2.test.driver.netty.netty4.Netty4EventLoop;
import org.apache.qpid.protonj2.test.driver.netty.netty4.SslSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class Netty4Server
implements NettyServer {
    // Class logger, resolved from the lookup class of this declaration.
    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    // Integer value of the "port" system property, falling back to 5672 when the property is not set.
    static final int PORT = Integer.parseInt(System.getProperty("port", "5672"));
    // Same literal as the initial value of the webSocketPath field below.
    static final String WEBSOCKET_PATH = "/";
    // Same literal as the initial value of the maxFrameSize field below.
    static final int DEFAULT_MAX_FRAME_SIZE = 65535;
    // Wrapper around the client channel's event loop; assigned when a client channel is initialized in start().
    private Netty4EventLoop eventLoop;
    // Event loop groups handed to the ServerBootstrap in start() and shut down in stop()/stopAsync().
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    // Listening channel produced by the bind in start().
    private Channel serverChannel;
    // The single connected client channel; set in the child initializer and reset to null when the
    // channel goes inactive or disconnectClient() is called.
    private Channel clientChannel;
    // Server configuration; also updated with the actual bound port after the bind in start().
    private final ProtonTestServerOptions options;
    // Maximum frame size passed to the WebSocketServerProtocolHandler when web sockets are enabled.
    private int maxFrameSize = 65535;
    // Path passed to the WebSocketServerProtocolHandler and used when building the connection URI.
    private String webSocketPath = "/";
    // Set by ServerWSCompressionObserver when the upgrade request / response carries "permessage-deflate".
    private boolean wsCompressionRequest;
    private boolean wsCompressionResponse;
    // SSL handler installed on the client channel when the server is secure; null otherwise.
    private volatile SslHandler sslHandler;
    // Handshake event captured by NettyServerInboundHandler.userEventTriggered.
    private volatile WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete;
    // Counted down once, when the HandshakeComplete event above is captured.
    private final CountDownLatch handshakeCompletion = new CountDownLatch(1);
    // Flipped by start() and stop()/stopAsync() via compareAndSet so each runs its body at most once per cycle.
    private final AtomicBoolean started = new AtomicBoolean();
    // Receives a read-only copy of every inbound ByteBuf that reaches the server handler.
    private final Consumer<ByteBuffer> inputConsumer;
    // Run when the server handler sees the client channel become active / inactive.
    private final Runnable connectedRunnable;
    private final Runnable disconnectedRunnable;

    /**
     * Creates a new, not yet started, server instance.
     *
     * @param options              the server configuration
     * @param connectedRunnable    run when the client channel becomes active
     * @param disconnectedRunnable run when the client channel becomes inactive
     * @param inputConsumer        receives a read-only copy of each inbound buffer
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    public Netty4Server(ProtonTestServerOptions options, Runnable connectedRunnable, Runnable disconnectedRunnable, Consumer<ByteBuffer> inputConsumer) {
        // Null checks run in the same order as before: options, consumer, connected, disconnected.
        this.options = Objects.requireNonNull(options);
        this.inputConsumer = Objects.requireNonNull(inputConsumer);
        this.connectedRunnable = Objects.requireNonNull(connectedRunnable);
        this.disconnectedRunnable = Objects.requireNonNull(disconnectedRunnable);
    }

    /**
     * @return the value of {@code isSecure()} from the server options
     */
    @Override
    public boolean isSecureServer() {
        final boolean secure = options.isSecure();
        return secure;
    }

    /**
     * @return {@code true} when a server channel exists and reports itself open
     */
    @Override
    public boolean isAcceptingConnections() {
        if (serverChannel == null) {
            return false;
        }
        return serverChannel.isOpen();
    }

    /**
     * @return {@code true} when an SSL handler has been created for a client channel
     */
    @Override
    public boolean hasSecureConnection() {
        return !(sslHandler == null);
    }

    /**
     * @return {@code true} when a client channel exists and reports itself open
     */
    @Override
    public boolean hasClientConnection() {
        if (clientChannel == null) {
            return false;
        }
        return clientChannel.isOpen();
    }

    /**
     * @return the port of the connected client channel's remote address
     *
     * @throws NullPointerException if no client channel is currently set
     */
    @Override
    public int getClientPort() {
        Objects.requireNonNull(clientChannel);
        final InetSocketAddress remote = (InetSocketAddress) clientChannel.remoteAddress();
        return remote.getPort();
    }

    /**
     * @return {@code true} when "permessage-deflate" was observed on both the upgrade request and response
     *
     * @throws NullPointerException if no client channel is currently set
     */
    @Override
    public boolean isWSCompressionActive() {
        Objects.requireNonNull(clientChannel);
        if (!wsCompressionRequest) {
            return false;
        }
        return wsCompressionResponse;
    }

    /**
     * @return {@code true} when a secure connection exists and its SSL session exposes a peer principal;
     *         {@code false} when there is no secure connection or the peer is unverified
     */
    @Override
    public boolean isPeerVerified() {
        if (!hasSecureConnection()) {
            return false;
        }

        try {
            return sslHandler.engine().getSession().getPeerPrincipal() != null;
        } catch (SSLPeerUnverifiedException unverified) {
            // The session has no verified peer identity.
            return false;
        }
    }

    /**
     * @return the SSL engine of the current SSL handler, or {@code null} when there is no secure connection
     */
    @Override
    public SSLEngine getConnectionSSLEngine() {
        return hasSecureConnection() ? sslHandler.engine() : null;
    }

    /**
     * @return the value of {@code isUseWebSockets()} from the server options
     */
    @Override
    public boolean isWebSocketServer() {
        final boolean webSockets = options.isUseWebSockets();
        return webSockets;
    }

    /**
     * @return the currently configured web socket path
     */
    @Override
    public String getWebSocketPath() {
        return webSocketPath;
    }

    /**
     * Stores the web socket path; it is read when a client channel is initialized and when the
     * connection URI is built.
     *
     * @param webSocketPath the path to store
     */
    @Override
    public void setWebSocketPath(String webSocketPath) {
        final String path = webSocketPath;
        this.webSocketPath = path;
    }

    /**
     * @return the currently configured maximum frame size
     */
    @Override
    public int getMaxFrameSize() {
        return maxFrameSize;
    }

    /**
     * Stores the maximum frame size; it is read when a web socket client channel is initialized.
     *
     * @param maxFrameSize the frame size to store
     */
    @Override
    public void setMaxFrameSize(int maxFrameSize) {
        final int size = maxFrameSize;
        this.maxFrameSize = size;
    }

    /**
     * Waits for the web socket handshake completion latch to be released.
     *
     * @param delayMs the maximum time to wait, in milliseconds
     *
     * @return {@code true} if the latch was released before the timeout elapsed
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public boolean awaitHandshakeCompletion(long delayMs) throws InterruptedException {
        final boolean completed = handshakeCompletion.await(delayMs, TimeUnit.MILLISECONDS);
        return completed;
    }

    /**
     * @return the captured web socket handshake event, or {@code null} if none has been captured
     */
    public WebSocketServerProtocolHandler.HandshakeComplete getHandshakeComplete() {
        return handshakeComplete;
    }

    /**
     * Builds a URI for connecting to this server on {@code localhost}.
     * <p>
     * The scheme is {@code amqp}, {@code amqps}, {@code amqpws} or {@code amqpwss} depending on whether
     * the server is secure and/or uses web sockets. The web socket path is only included for web
     * socket servers.
     *
     * @param queryString optional query string; a single leading {@code ?} is stripped
     *
     * @return the connection URI
     *
     * @throws IllegalStateException if the server has not been started
     * @throws Exception if the URI cannot be constructed
     */
    @Override
    public URI getConnectionURI(String queryString) throws Exception {
        if (!started.get()) {
            throw new IllegalStateException("Cannot get URI of non-started server");
        }

        final int port = getServerPort();

        final String scheme;
        if (isWebSocketServer()) {
            scheme = isSecureServer() ? "amqpwss" : "amqpws";
        } else {
            scheme = isSecureServer() ? "amqps" : "amqp";
        }

        String path = null;
        if (isWebSocketServer()) {
            path = getWebSocketPath();
        }

        String query = queryString;
        if (query != null && query.startsWith("?")) {
            query = query.substring(1);
        }

        return new URI(scheme, null, "localhost", port, path, query, null);
    }

    /**
     * Starts the server: creates the event loop groups, configures the bootstrap and binds to the
     * port from the server options. After a successful bind the options are updated with the port
     * the server channel is actually bound to.
     * <p>
     * Calling this on an already started server does nothing. If start-up fails, the event loop
     * groups created here are shut down and the started flag is reset before the failure is
     * rethrown, so no threads are left running and {@link #stop()} stays safe to call.
     *
     * @throws Exception if the bind fails or the calling thread is interrupted while waiting for it
     */
    @Override
    public void start() throws Exception {
        if (!started.compareAndSet(false, true)) {
            return;
        }

        try {
            bossGroup = new NioEventLoopGroup(1);
            workerGroup = new NioEventLoopGroup();

            final ServerBootstrap server = new ServerBootstrap();
            server.group(bossGroup, workerGroup);
            server.channel(NioServerSocketChannel.class);
            server.option(ChannelOption.SO_BACKLOG, 100);
            server.handler(new LoggingHandler(LogLevel.INFO));
            server.childHandler(new ChannelInitializer<Channel>() {

                @Override
                public void initChannel(Channel ch) throws Exception {
                    // Only one client at a time is supported; reject any additional connection.
                    if (Netty4Server.this.clientChannel != null) {
                        throw new UnsupportedOperationException("Server cannot have more than one connected client at a time");
                    }

                    Netty4Server.this.clientChannel = ch;
                    Netty4Server.this.eventLoop = new Netty4EventLoop(ch.eventLoop());

                    if (Netty4Server.this.isSecureServer()) {
                        Netty4Server.this.sslHandler = SslSupport.createServerSslHandler(null, Netty4Server.this.options);
                        ch.pipeline().addLast(Netty4Server.this.sslHandler);
                    }

                    if (Netty4Server.this.options.isTraceBytes()) {
                        // The logging handler is named after this anonymous initializer class.
                        ch.pipeline().addLast(new LoggingHandler(getClass()));
                    }

                    if (Netty4Server.this.options.isUseWebSockets()) {
                        ch.pipeline().addLast(new HttpServerCodec());
                        ch.pipeline().addLast(new HttpObjectAggregator(65536));
                        if (Netty4Server.this.options.isWebSocketCompression()) {
                            // The observer sits before the compression handler to record the negotiated headers.
                            ch.pipeline().addLast(new ServerWSCompressionObserver());
                            ch.pipeline().addLast(new WebSocketServerCompressionHandler());
                        }
                        ch.pipeline().addLast(new WebSocketServerProtocolHandler(Netty4Server.this.getWebSocketPath(), "amqp", true, Netty4Server.this.maxFrameSize));
                    }

                    ch.pipeline().addLast(new NettyServerOutboundHandler());
                    ch.pipeline().addLast(new NettyServerInboundHandler());
                    ch.pipeline().addLast(Netty4Server.this.getServerHandler());
                }
            });

            serverChannel = server.bind(options.getServerPort()).sync().channel();
            // Record the real port, which matters when the configured port let the OS choose one.
            options.setServerPort(((InetSocketAddress) serverChannel.localAddress()).getPort());
        } catch (Throwable error) {
            releaseAfterFailedStart();
            throw error;
        }
    }

    /** Releases whatever a failed {@link #start()} managed to create and resets the started flag. */
    private void releaseAfterFailedStart() {
        if (serverChannel != null) {
            serverChannel.close();
            serverChannel = null;
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully(0L, 100L, TimeUnit.MILLISECONDS);
        }
        if (bossGroup != null) {
            bossGroup.shutdownGracefully(0L, 100L, TimeUnit.MILLISECONDS);
        }
        started.set(false);
    }

    /**
     * Creates the final inbound handler of the client pipeline. It runs the connected and
     * disconnected callbacks on channel activation and deactivation, and hands a read-only copy of
     * every inbound buffer to the input consumer.
     *
     * @return a new handler instance
     */
    protected ChannelHandler getServerHandler() {
        return new SimpleChannelInboundHandler<ByteBuf>() {

            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                connectedRunnable.run();
                ctx.fireChannelActive();
            }

            @Override
            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                disconnectedRunnable.run();
                ctx.fireChannelInactive();
            }

            @Override
            protected void channelRead0(ChannelHandlerContext ctx, ByteBuf input) throws Exception {
                LOG.trace("AMQP Test Server Channel read: {}", input);

                try {
                    // Copy the readable bytes out so the consumer does not depend on the Netty buffer.
                    final ByteBuffer snapshot = ByteBuffer.allocate(input.readableBytes());
                    input.readBytes(snapshot);
                    snapshot.flip();
                    inputConsumer.accept(snapshot.asReadOnlyBuffer());
                } catch (Throwable e) {
                    // Any failure while handling input closes the client channel.
                    LOG.error("Closed AMQP Test server channel due to error: ", e);
                    ctx.channel().close();
                }
            }
        };
    }

    /**
     * Wraps the given buffer and writes and flushes it to the client channel using the channel's
     * void promise.
     *
     * @param frame the bytes to send
     *
     * @throws IllegalStateException if there is no client channel or it is not active
     */
    @Override
    public void write(ByteBuffer frame) {
        final boolean connected = clientChannel != null && clientChannel.isActive();
        if (!connected) {
            throw new IllegalStateException("Channel is not connected or has closed");
        }

        clientChannel.writeAndFlush(Unpooled.wrappedBuffer(frame), clientChannel.voidPromise());
    }

    /**
     * @return the event loop wrapper created for the connected client channel
     *
     * @throws IllegalStateException if there is no client channel or it is not active
     */
    @Override
    public NettyEventLoop eventLoop() {
        final boolean connected = clientChannel != null && clientChannel.isActive();
        if (connected) {
            return eventLoop;
        }

        throw new IllegalStateException("Channel is not connected or has closed");
    }

    /**
     * Stops a started server without waiting for the client channel close or for the event loop
     * groups to finish shutting down. The server channel close is still awaited.
     * <p>
     * Does nothing if the server is not currently started.
     *
     * @throws InterruptedException if interrupted while waiting for the server channel to close
     */
    @Override
    public void stopAsync() throws InterruptedException {
        if (!started.compareAndSet(true, false)) {
            return;
        }

        LOG.info("Closing channel asynchronously");
        serverChannel.close().sync();

        if (clientChannel != null) {
            clientChannel.close();
        }

        final long timeoutMillis = 100L;

        LOG.trace("Shutting down boss group asynchronously");
        bossGroup.shutdownGracefully(0L, timeoutMillis, TimeUnit.MILLISECONDS);

        LOG.trace("Shutting down worker group asynchronously");
        workerGroup.shutdownGracefully(0L, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Closes the currently connected client channel, if any, waiting up to ten seconds for the close
     * to complete, and then forgets the channel so a new client can connect.
     * <p>
     * If the calling thread is interrupted while waiting, the wait is abandoned and the thread's
     * interrupt status is restored so callers can still observe the interruption.
     *
     * @throws IllegalStateException if the server is not started or its server channel is not open
     * @throws Exception declared by the interface; not thrown by the visible code paths other than above
     */
    @Override
    public void disconnectClient() throws Exception {
        // A missing server channel is reported as "not active" instead of failing with an NPE.
        if (!started.get() || serverChannel == null || !serverChannel.isOpen()) {
            throw new IllegalStateException("Server must be currently active in order to reset");
        }

        // Work on a snapshot: the inbound handler also resets the field when the channel goes inactive.
        final Channel client = clientChannel;
        if (client == null) {
            return;
        }

        try {
            if (!client.close().await(10L, TimeUnit.SECONDS)) {
                LOG.info("Connected Client channel close timed out waiting for result");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status; Thread.interrupted() would only clear it.
            Thread.currentThread().interrupt();
            LOG.debug("Close of connected client channel interrupted while awaiting result");
        } finally {
            clientChannel = null;
        }
    }

    /**
     * Stops a started server: closes the server channel, closes the client channel (waiting up to
     * ten seconds), then shuts down both event loop groups, waiting briefly for each.
     * <p>
     * Does nothing if the server is not currently started. If the calling thread is interrupted
     * while waiting for the client channel to close, shutdown still runs to completion and the
     * thread's interrupt status is restored before returning.
     *
     * @throws InterruptedException declared by the interface; interruption is reported through the
     *         thread's interrupt status rather than by throwing
     */
    @Override
    public void stop() throws InterruptedException {
        if (!started.compareAndSet(true, false)) {
            return;
        }

        boolean interrupted = false;
        try {
            LOG.info("Syncing channel close");
            // The server channel is null if start() failed before the bind completed.
            if (serverChannel != null) {
                serverChannel.close().syncUninterruptibly();
            }

            final Channel client = clientChannel;
            if (client != null) {
                try {
                    if (!client.close().await(10L, TimeUnit.SECONDS)) {
                        LOG.info("Connected Client channel close timed out waiting for result");
                    }
                } catch (InterruptedException e) {
                    // Remember the interruption and re-assert it once shutdown has finished.
                    interrupted = true;
                    LOG.debug("Close of connected client channel interrupted while awaiting result");
                }
            }

            final long timeout = 100L;

            LOG.trace("Shutting down boss group");
            if (bossGroup != null) {
                bossGroup.shutdownGracefully(0L, timeout, TimeUnit.MILLISECONDS).awaitUninterruptibly(timeout);
            }
            LOG.trace("Boss group shut down");

            LOG.trace("Shutting down worker group");
            if (workerGroup != null) {
                workerGroup.shutdownGracefully(0L, timeout, TimeUnit.MILLISECONDS).awaitUninterruptibly(timeout);
            }
            LOG.trace("Worker group shut down");
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Closes the server by delegating to {@link #stop()}.
     *
     * @throws InterruptedException if {@link #stop()} throws it
     */
    @Override
    public void close() throws InterruptedException {
        stop();
    }

    /**
     * @return the server port held by the server options
     *
     * @throws IllegalStateException if the server has not been started
     */
    @Override
    public int getServerPort() {
        if (started.get()) {
            return options.getServerPort();
        }

        throw new IllegalStateException("Cannot get server port of non-started server");
    }

    /**
     * Writes the given HTTP response to the channel. For any status other than 200 the status text
     * is appended to the response body and the content length header is set. The channel is closed
     * after the write when the request is not keep-alive or the status is not 200.
     *
     * @param ctx      the context whose channel the response is written to
     * @param request  the request being answered; only consulted for keep-alive
     * @param response the response to send
     */
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse response) {
        final boolean errorStatus = response.status().code() != 200;

        if (errorStatus) {
            final ByteBuf statusText = Unpooled.copiedBuffer(response.status().toString(), StandardCharsets.UTF_8);
            response.content().writeBytes(statusText);
            statusText.release();
            HttpUtil.setContentLength(response, response.content().readableBytes());
        }

        final ChannelFuture writeFuture = ctx.channel().writeAndFlush(response);

        final boolean closeAfterWrite = !HttpUtil.isKeepAlive(request) || errorStatus;
        if (closeAfterWrite) {
            writeFuture.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * @return the SSL handler created for the client channel, or {@code null} if none was created
     */
    protected SslHandler getSslHandler() {
        return sslHandler;
    }

    private class ServerWSCompressionObserver
    extends ChannelDuplexHandler {
        final String WS_EXTENSIONS_SECTION = "sec-websocket-extensions";
        final String WS_PERMESSAGE_DEFLATE = "permessage-deflate";
        final String WS_UPGRADE = "upgrade";

        private ServerWSCompressionObserver() {
        }

        public void channelRead(ChannelHandlerContext ctx, Object message) {
            FullHttpRequest request;
            HttpHeaders headers;
            if (message instanceof FullHttpRequest && (headers = (request = (FullHttpRequest)message).headers()).contains("upgrade") && headers.contains("sec-websocket-extensions")) {
                Netty4Server.this.wsCompressionRequest = headers.get("sec-websocket-extensions").contains("permessage-deflate");
            }
            ctx.fireChannelRead(message);
        }

        public void write(ChannelHandlerContext context, Object message, ChannelPromise promise) throws Exception {
            FullHttpResponse response;
            HttpHeaders headers;
            if (message instanceof FullHttpResponse && (headers = (response = (FullHttpResponse)message).headers()).contains("upgrade") && headers.contains("sec-websocket-extensions")) {
                Netty4Server.this.wsCompressionResponse = headers.get("sec-websocket-extensions").contains("permessage-deflate");
            }
            context.write(message, promise);
        }
    }

    private class NettyServerInboundHandler
    extends ChannelInboundHandlerAdapter {
        private NettyServerInboundHandler() {
        }

        public void userEventTriggered(ChannelHandlerContext context, Object payload) {
            if (payload instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
                Netty4Server.this.handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete)payload;
                Netty4Server.this.handshakeCompletion.countDown();
            }
        }

        public void channelActive(final ChannelHandlerContext ctx) {
            LOG.info("NettyServerHandler -> New active channel: {}", (Object)ctx.channel());
            SslHandler handler = (SslHandler)ctx.pipeline().get(SslHandler.class);
            if (handler != null) {
                handler.handshakeFuture().addListener((GenericFutureListener)new GenericFutureListener<Future<Channel>>(){

                    public void operationComplete(Future<Channel> future) throws Exception {
                        LOG.info("Server -> SSL handshake completed. Succeeded: {}", (Object)future.isSuccess());
                        if (!future.isSuccess()) {
                            ctx.close();
                        }
                    }
                });
            }
            ctx.fireChannelActive();
        }

        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            LOG.info("NettyServerHandler: channel has gone inactive: {}", (Object)ctx.channel());
            ctx.close();
            ctx.fireChannelInactive();
            Netty4Server.this.clientChannel = null;
        }

        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            LOG.trace("NettyServerHandler: Channel read: {}", msg);
            if (msg instanceof WebSocketFrame) {
                WebSocketFrame frame = (WebSocketFrame)msg;
                ctx.fireChannelRead((Object)frame.content());
            } else if (msg instanceof FullHttpRequest) {
                FullHttpRequest request = (FullHttpRequest)msg;
                Netty4Server.sendHttpResponse(ctx, request, (FullHttpResponse)new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            } else {
                ctx.fireChannelRead(msg);
            }
        }

        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            LOG.info("NettyServerHandler: NettyServerHandlerException caught on channel: {}", (Object)ctx.channel());
            cause.printStackTrace();
            ctx.close();
        }
    }

    private class NettyServerOutboundHandler
    extends ChannelOutboundHandlerAdapter {
        private NettyServerOutboundHandler() {
        }

        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            LOG.trace("NettyServerHandler: Channel write: {}", msg);
            if (Netty4Server.this.isWebSocketServer() && msg instanceof ByteBuf) {
                if (Netty4Server.this.options.isFragmentWrites()) {
                    ByteBuf orig = (ByteBuf)msg;
                    int origIndex = orig.readerIndex();
                    int split = orig.readableBytes() / 2;
                    ByteBuf part1 = orig.copy(origIndex, split);
                    LOG.trace("NettyServerHandler: Part1: {}", (Object)part1);
                    orig.readerIndex(origIndex + split);
                    LOG.trace("NettyServerHandler: Part2: {}", (Object)orig);
                    BinaryWebSocketFrame frame1 = new BinaryWebSocketFrame(false, 0, part1);
                    ctx.writeAndFlush((Object)frame1);
                    ContinuationWebSocketFrame frame2 = new ContinuationWebSocketFrame(true, 0, orig);
                    ctx.write((Object)frame2, promise);
                } else {
                    BinaryWebSocketFrame frame = new BinaryWebSocketFrame((ByteBuf)msg);
                    ctx.write((Object)frame, promise);
                }
            } else {
                ctx.write(msg, promise);
            }
        }
    }
}

