/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.protonj2.test.driver.netty.netty4;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.ScheduledFuture;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.qpid.protonj2.test.driver.ProtonTestClientOptions;
import org.apache.qpid.protonj2.test.driver.netty.NettyClient;
import org.apache.qpid.protonj2.test.driver.netty.NettyEventLoop;
import org.apache.qpid.protonj2.test.driver.netty.netty4.Netty4EventLoop;
import org.apache.qpid.protonj2.test.driver.netty.netty4.SslSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class Netty4Client
implements NettyClient {
    /** Logger for this client; its name is the class resolved through {@code MethodHandles.lookup()}. */
    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    /** Sub-protocol name handed to the WebSocket handshaker factory. */
    private static final String AMQP_SUB_PROTOCOL = "amqp";
    /** Shutdown timeout value; {@code close()} hands the same value (50) to {@code shutdownGracefully} as milliseconds. */
    private static final int SHUTDOWN_TIMEOUT = 50;
    /** Wrapper around the channel's event loop, created when the channel is initialized. */
    private Netty4EventLoop eventLoop;
    /** Bootstrap built in {@code connect} and used to open the channel. */
    private Bootstrap bootstrap;
    /** Event loop group created in {@code connect} and shut down in {@code close}. */
    private EventLoopGroup group;
    /** Current channel; reassigned on initialization, registration, connect and failure callbacks. */
    private Channel channel;
    /** Remote host supplied to {@code connect}; {@code null} until then. */
    private String host;
    /** Remote port; {@code connect} substitutes 5671 (secure) or 5672 when the supplied port is not positive. */
    private int port;
    // The two flags below are recorded by ClientWSCompressionObserver when the
    // "permessage-deflate" extension appears in the handshake headers it observes;
    // isWSCompressionActive() reports true only when both are set.
    private boolean wsCompressionRequest;
    private boolean wsCompressionResponse;
    /** Most recent failure recorded by {@code handleTransportFailure}; thrown by {@code connect} when set. */
    protected volatile IOException failureCause;
    /** Client configuration supplied at construction time. */
    private final ProtonTestClientOptions options;
    /** SSL handler exposed through {@code getSslHandler()}. */
    private volatile SslHandler sslHandler;
    /** True once {@code handleConnected} has run and until a failure or {@code close}. */
    protected final AtomicBoolean connected = new AtomicBoolean();
    /** Set exactly once by {@code close}. */
    protected final AtomicBoolean closed = new AtomicBoolean();
    /** Released on connect, on transport failure and on close; {@code connect} blocks on it. */
    protected final CountDownLatch connectedLatch = new CountDownLatch(1);
    /** Receives a read-only copy of every inbound payload. */
    private final Consumer<ByteBuffer> inputConsumer;
    /** Invoked from {@code handleConnected} once the connection is established. */
    private final Runnable connectedRunnable;

    /**
     * Creates a client that is not yet connected.
     *
     * @param options client configuration, must not be null
     * @param connectedRunnable callback run once the connection is established, must not be null
     * @param inputConsumer sink for inbound data, must not be null
     * @throws NullPointerException if any argument is null
     */
    public Netty4Client(ProtonTestClientOptions options, Runnable connectedRunnable, Consumer<ByteBuffer> inputConsumer) {
        // Arguments are validated in the order: options, inputConsumer, connectedRunnable.
        this.options = Objects.requireNonNull(options);
        this.inputConsumer = Objects.requireNonNull(inputConsumer);
        this.connectedRunnable = Objects.requireNonNull(connectedRunnable);
    }

    /**
     * Closes the channel and shuts down the event loop group. Only the first call has any effect.
     * <p>
     * The close of the channel is awaited for up to ten seconds. The event loop group is shut
     * down with a {@code SHUTDOWN_TIMEOUT} millisecond timeout and its termination is awaited for
     * twice that long, unless this method is running on the client's own event loop.
     * <p>
     * If the calling thread is interrupted while waiting, the remaining shutdown steps still run
     * and the interrupt status is restored before this method returns.
     *
     * @throws Exception declared by the interface; not thrown for timeouts or interruption
     */
    @Override
    public void close() throws Exception {
        if (!closed.compareAndSet(false, true)) {
            return;
        }

        connected.set(false);
        // Release any thread still blocked in connect().
        connectedLatch.countDown();

        boolean interrupted = false;
        try {
            if (channel != null) {
                try {
                    if (!channel.close().await(10, TimeUnit.SECONDS)) {
                        LOG.info("Channel close timed out waiting for result");
                    }
                } catch (InterruptedException e) {
                    interrupted = true;
                    LOG.debug("Close of channel interrupted while awaiting result");
                }
            }

            if (group != null && !group.isShutdown()) {
                group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);

                // Waiting for termination from inside the event loop could never succeed.
                if (eventLoop != null && eventLoop.inEventLoop()) {
                    return;
                }

                try {
                    if (!group.awaitTermination(2 * SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS)) {
                        LOG.trace("Connection IO Event Loop shutdown failed to complete in allotted time");
                    }
                } catch (InterruptedException e) {
                    interrupted = true;
                    LOG.debug("Shutdown of netty event loop interrupted while awaiting result");
                }
            }
        } finally {
            if (interrupted) {
                // Restore the interrupt status so the caller can observe it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Connects to the given host and port and blocks until the connection is established,
     * fails, or the client is closed.
     * <p>
     * A non-positive port selects the default: 5671 when the options are secure, 5672 otherwise.
     * The resolved port is used both for the connection attempt and for {@link #getRemoteURI()}.
     *
     * @param host remote host, must not be null or empty
     * @param port remote port, or a non-positive value to use the default
     * @throws IllegalStateException if the client has already been closed
     * @throws IllegalArgumentException if the host is null or empty
     * @throws IOException if the connection attempt failed, or the wait ended without a connection
     */
    @Override
    public void connect(String host, int port) throws IOException {
        if (closed.get()) {
            throw new IllegalStateException("Netty client has already been closed");
        }
        if (host == null || host.isEmpty()) {
            throw new IllegalArgumentException("Transport host value cannot be null");
        }

        this.host = host;
        if (port > 0) {
            this.port = port;
        } else {
            this.port = options.isSecure() ? 5671 : 5672;
        }

        group = new NioEventLoopGroup(1);
        bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(group);
        bootstrap.handler(new ChannelInitializer<Channel>() {

            @Override
            public void initChannel(Channel transportChannel) throws Exception {
                Netty4Client.this.channel = transportChannel;
                Netty4Client.this.eventLoop = new Netty4EventLoop(transportChannel.eventLoop());
                configureChannel(transportChannel);
            }
        });
        configureNetty(bootstrap, options);

        // Connect with the resolved port so the default applies when the argument is not positive.
        bootstrap.connect(host, this.port).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);

        try {
            connectedLatch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt status; the connected check below reports the outcome.
            Thread.currentThread().interrupt();
        }

        if (!connected.get()) {
            if (failureCause != null) {
                throw failureCause;
            }
            throw new IOException("Netty client was closed before a connection was established.");
        }
    }

    /**
     * Returns the event loop wrapper of the active channel.
     *
     * @return the event loop wrapper created when the channel was initialized
     * @throws IllegalStateException if there is no channel or the channel is not active
     */
    @Override
    public NettyEventLoop eventLoop() {
        if (channel != null && channel.isActive()) {
            return eventLoop;
        }
        throw new IllegalStateException("Channel is not connected or has closed");
    }

    /**
     * Writes and flushes the given buffer on the active channel, wrapped as a read-only Netty buffer.
     *
     * @param buffer the bytes to send
     * @throws IllegalStateException if there is no channel or the channel is not active
     */
    @Override
    public void write(ByteBuffer buffer) {
        final boolean active = channel != null && channel.isActive();
        if (!active) {
            throw new IllegalStateException("Channel is not connected or has closed");
        }
        final ByteBuf wrapped = Unpooled.wrappedBuffer(buffer);
        channel.writeAndFlush(wrapped.asReadOnly());
    }

    /**
     * Reports the current value of the connected flag.
     *
     * @return true if the client is currently marked as connected
     */
    @Override
    public boolean isConnected() {
        return connected.get();
    }

    /**
     * Reports whether the client options request a secure connection.
     *
     * @return the secure setting of the client options
     */
    @Override
    public boolean isSecure() {
        return options.isSecure();
    }

    /**
     * Reports whether WebSocket compression was seen on both sides of the handshake.
     *
     * @return true only when both compression flags have been recorded
     * @throws IllegalStateException if there is no channel or the channel is not active
     */
    @Override
    public boolean isWSCompressionActive() {
        if (channel != null && channel.isActive()) {
            return wsCompressionRequest && wsCompressionResponse;
        }
        throw new IllegalStateException("Channel is not connected or has closed");
    }

    /**
     * Builds the URI of the remote peer from the host and port recorded by {@code connect}.
     * <p>
     * The scheme is {@code wss}/{@code ws} when WebSockets are enabled (with the configured
     * WebSocket path) and {@code ssl}/{@code tcp} otherwise, chosen by the secure option.
     *
     * @return the remote URI, or null if no host has been set or the URI could not be built
     */
    @Override
    public URI getRemoteURI() {
        if (host == null) {
            return null;
        }

        try {
            if (options.isUseWebSockets()) {
                final String scheme = options.isSecure() ? "wss" : "ws";
                return new URI(scheme, null, host, port, options.getWebSocketPath(), null, null);
            }
            final String scheme = options.isSecure() ? "ssl" : "tcp";
            return new URI(scheme, null, host, port, null, null, null);
        } catch (URISyntaxException e) {
            // Callers treat null as "no URI", so report the cause instead of dropping it silently.
            LOG.debug("Could not build remote URI for host {} and port {}", host, port, e);
            return null;
        }
    }

    /**
     * Creates the last inbound handler, which copies each received buffer and passes a
     * read-only view of the copy to the input consumer.
     * <p>
     * Any error raised while copying or consuming is logged and the channel is closed.
     *
     * @return a new inbound handler for {@link ByteBuf} messages
     */
    protected ChannelHandler getClientHandler() {
        return new SimpleChannelInboundHandler<ByteBuf>() {

            @Override
            protected void channelRead0(ChannelHandlerContext ctx, ByteBuf input) throws Exception {
                LOG.trace("AMQP Test Client Channel read: {}", input);
                try {
                    // Copy the readable bytes out so the consumer never sees the Netty buffer.
                    final ByteBuffer received = ByteBuffer.allocate(input.readableBytes());
                    input.readBytes(received);
                    received.flip();
                    inputConsumer.accept(received.asReadOnlyBuffer());
                } catch (Throwable error) {
                    LOG.error("Closed AMQP Test client channel due to error: ", error);
                    ctx.channel().close();
                }
            }
        };
    }

    /**
     * Returns the Netty event loop of the active channel.
     *
     * @return the channel's event loop
     * @throws IllegalStateException if there is no channel or the channel is not active
     */
    protected EventLoop getEventLoop() {
        if (channel != null && channel.isActive()) {
            return channel.eventLoop();
        }
        throw new IllegalStateException("Channel is not connected or has closed");
    }

    /**
     * Returns the SSL handler held by this client.
     *
     * @return the current value of the SSL handler field, possibly null
     */
    protected SslHandler getSslHandler() {
        return sslHandler;
    }

    /**
     * Installs the handlers on a newly created channel's pipeline.
     * <p>
     * In order: the SSL handler (secure only), a byte-tracing logger (when enabled), the HTTP
     * codec, aggregator and optional compression handlers (WebSockets only), then the outbound,
     * inbound and client handlers of this class.
     *
     * @param channel the channel being initialized
     * @throws IOException if the SSL handler cannot be created; the failure is also recorded
     *         through {@code handleTransportFailure}
     * @throws Exception declared for the remaining pipeline setup
     */
    private void configureChannel(Channel channel) throws Exception {
        if (isSecure()) {
            final SslHandler handler;
            try {
                handler = SslSupport.createClientSslHandler(getRemoteURI(), options);
            } catch (Exception ex) {
                LOG.warn("Error during initialization of channel from SSL Handler creation:", ex);
                handleTransportFailure(channel, ex);
                throw new IOException(ex);
            }
            // Record the handler so getSslHandler() can return it.
            this.sslHandler = handler;
            channel.pipeline().addLast("ssl", handler);
        }

        if (options.isTraceBytes()) {
            channel.pipeline().addLast("logger", new LoggingHandler(getClass()));
        }

        if (options.isUseWebSockets()) {
            channel.pipeline().addLast(new HttpClientCodec());
            channel.pipeline().addLast(new HttpObjectAggregator(8192));
            if (options.isWebSocketCompression()) {
                // The observer sits before the compression handler so it sees the handshake headers.
                channel.pipeline().addLast(new ClientWSCompressionObserver());
                channel.pipeline().addLast(WebSocketClientCompressionHandler.INSTANCE);
            }
        }

        channel.pipeline().addLast(new NettyClientOutboundHandler());
        channel.pipeline().addLast(new NettyClientInboundHandler());
        channel.pipeline().addLast(getClientHandler());
    }

    /**
     * Applies the socket-level settings from the options to the bootstrap.
     * <p>
     * Buffer sizes and traffic class are applied only when they differ from -1. A local bind
     * address is applied when a local address is set, or a non-zero local port is set.
     *
     * @param bootstrap the bootstrap to configure
     * @param options the source of the settings
     */
    private void configureNetty(Bootstrap bootstrap, ProtonTestClientOptions options) {
        bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
        bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
        bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());

        if (options.getSendBufferSize() != -1) {
            bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
        }
        if (options.getReceiveBufferSize() != -1) {
            bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
        }
        if (options.getTrafficClass() != -1) {
            bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
        }

        if (options.getLocalAddress() != null) {
            bootstrap.localAddress(options.getLocalAddress(), options.getLocalPort());
        } else if (options.getLocalPort() != 0) {
            bootstrap.localAddress(options.getLocalPort());
        }
    }

    /**
     * Marks the client as connected, releases any thread waiting in {@code connect} and then
     * runs the connected callback.
     *
     * @param connectedChannel the channel that became usable
     */
    protected void handleConnected(Channel connectedChannel) {
        LOG.trace("Channel has become active! Channel is {}", connectedChannel);

        channel = connectedChannel;
        // The flag is set before the latch is released so the waiting thread sees it as true.
        connected.set(true);
        connectedLatch.countDown();

        connectedRunnable.run();
    }

    /**
     * Records a transport failure unless the client has already been closed.
     * <p>
     * The cause is wrapped in a new {@link IOException}, the connected flag is cleared and any
     * thread waiting in {@code connect} is released. After close, the event is only traced.
     *
     * @param failedChannel the channel that reported the failure
     * @param cause the reported error
     */
    protected void handleTransportFailure(Channel failedChannel, Throwable cause) {
        if (closed.get()) {
            LOG.trace("Closed Channel signaled that the channel ended: {}", channel);
            return;
        }

        LOG.trace("Channel indicates connection failure! Channel is {}", failedChannel);
        failureCause = new IOException(cause);
        channel = failedChannel;
        connected.set(false);
        connectedLatch.countDown();
    }

    private class ClientWSCompressionObserver
    extends ChannelDuplexHandler {
        final String WS_EXTENSIONS_SECTION = "sec-websocket-extensions";
        final String WS_PERMESSAGE_DEFLATE = "permessage-deflate";
        final String WS_UPGRADE = "upgrade";

        private ClientWSCompressionObserver() {
        }

        public void channelRead(ChannelHandlerContext ctx, Object message) {
            FullHttpResponse request;
            HttpHeaders headers;
            if (message instanceof FullHttpResponse && (headers = (request = (FullHttpResponse)message).headers()).contains("upgrade") && headers.contains("sec-websocket-extensions")) {
                Netty4Client.this.wsCompressionRequest = headers.get("sec-websocket-extensions").contains("permessage-deflate");
            }
            ctx.fireChannelRead(message);
        }

        public void write(ChannelHandlerContext context, Object message, ChannelPromise promise) throws Exception {
            FullHttpRequest response;
            HttpHeaders headers;
            if (message instanceof FullHttpRequest && (headers = (response = (FullHttpRequest)message).headers()).contains("upgrade") && headers.contains("sec-websocket-extensions")) {
                Netty4Client.this.wsCompressionResponse = headers.get("sec-websocket-extensions").contains("permessage-deflate");
            }
            context.write(message, promise);
        }
    }

    private class NettyClientOutboundHandler
    extends ChannelOutboundHandlerAdapter {
        private NettyClientOutboundHandler() {
        }

        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            LOG.trace("NettyServerHandler: Channel write: {}", msg);
            if (Netty4Client.this.options.isUseWebSockets() && msg instanceof ByteBuf) {
                if (Netty4Client.this.options.isFragmentWrites()) {
                    ByteBuf orig = (ByteBuf)msg;
                    int origIndex = orig.readerIndex();
                    int split = orig.readableBytes() / 2;
                    ByteBuf part1 = orig.copy(origIndex, split);
                    LOG.trace("NettyClientOutboundHandler: Part1: {}", (Object)part1);
                    orig.readerIndex(origIndex + split);
                    LOG.trace("NettyClientOutboundHandler: Part2: {}", (Object)orig);
                    BinaryWebSocketFrame frame1 = new BinaryWebSocketFrame(false, 0, part1);
                    ctx.writeAndFlush((Object)frame1);
                    ContinuationWebSocketFrame frame2 = new ContinuationWebSocketFrame(true, 0, orig);
                    ctx.write((Object)frame2, promise);
                } else {
                    BinaryWebSocketFrame frame = new BinaryWebSocketFrame((ByteBuf)msg);
                    ctx.write((Object)frame, promise);
                }
            } else {
                ctx.write(msg, promise);
            }
        }
    }

    protected class NettyClientInboundHandler
    extends ChannelInboundHandlerAdapter {
        private final WebSocketClientHandshaker handshaker;
        private ScheduledFuture<?> handshakeTimeoutFuture;

        public NettyClientInboundHandler() {
            if (Netty4Client.this.options.isUseWebSockets()) {
                DefaultHttpHeaders headers = new DefaultHttpHeaders();
                Netty4Client.this.options.getHttpHeaders().forEach((key, value) -> headers.set(key, value));
                this.handshaker = WebSocketClientHandshakerFactory.newHandshaker((URI)Netty4Client.this.getRemoteURI(), (WebSocketVersion)WebSocketVersion.V13, (String)Netty4Client.AMQP_SUB_PROTOCOL, (boolean)true, (HttpHeaders)headers, (int)Netty4Client.this.options.getWebSocketMaxFrameSize());
            } else {
                this.handshaker = null;
            }
        }

        public final void channelRegistered(ChannelHandlerContext context) throws Exception {
            Netty4Client.this.channel = context.channel();
        }

        public void channelActive(ChannelHandlerContext context) throws Exception {
            if (Netty4Client.this.options.isUseWebSockets()) {
                this.handshaker.handshake(context.channel());
                this.handshakeTimeoutFuture = context.executor().schedule(() -> {
                    LOG.trace("WebSocket handshake timed out! Channel is {}", (Object)context.channel());
                    if (!this.handshaker.isHandshakeComplete()) {
                        Netty4Client.this.handleTransportFailure(Netty4Client.this.channel, new IOException("WebSocket handshake timed out"));
                    }
                }, (long)Netty4Client.this.options.getConnectTimeout(), TimeUnit.MILLISECONDS);
            }
            if (!Netty4Client.this.isSecure()) {
                if (!Netty4Client.this.options.isUseWebSockets()) {
                    Netty4Client.this.handleConnected(context.channel());
                    context.fireChannelActive();
                }
            } else {
                SslHandler sslHandler = (SslHandler)context.pipeline().get(SslHandler.class);
                sslHandler.handshakeFuture().addListener((GenericFutureListener)new GenericFutureListener<Future<Channel>>(){

                    public void operationComplete(Future<Channel> future) throws Exception {
                        if (future.isSuccess()) {
                            LOG.trace("SSL Handshake has completed: {}", (Object)Netty4Client.this.channel);
                            if (!Netty4Client.this.options.isUseWebSockets()) {
                                Netty4Client.this.handleConnected(Netty4Client.this.channel);
                            }
                        } else {
                            LOG.trace("SSL Handshake has failed: {}", (Object)Netty4Client.this.channel);
                            Netty4Client.this.handleTransportFailure(Netty4Client.this.channel, future.cause());
                        }
                    }
                });
            }
        }

        public void channelInactive(ChannelHandlerContext context) throws Exception {
            if (this.handshakeTimeoutFuture != null) {
                this.handshakeTimeoutFuture.cancel(false);
            }
            Netty4Client.this.handleTransportFailure(context.channel(), new IOException("Remote closed connection unexpectedly"));
        }

        public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
            Netty4Client.this.handleTransportFailure(context.channel(), cause);
        }

        public void channelRead(ChannelHandlerContext ctx, Object message) {
            if (Netty4Client.this.options.isUseWebSockets()) {
                LOG.trace("New data read: incoming: {}", message);
                Channel ch = ctx.channel();
                if (!this.handshaker.isHandshakeComplete()) {
                    this.handshaker.finishHandshake(ch, (FullHttpResponse)message);
                    LOG.trace("WebSocket Client connected! {}", (Object)ctx.channel());
                    if (this.handshakeTimeoutFuture.cancel(false)) {
                        Netty4Client.this.handleConnected(ch);
                    }
                    return;
                }
                if (message instanceof FullHttpResponse) {
                    FullHttpResponse response = (FullHttpResponse)message;
                    throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + String.valueOf(response.status()) + ", content=" + response.content().toString(StandardCharsets.UTF_8) + ")");
                }
                WebSocketFrame frame = (WebSocketFrame)message;
                if (frame instanceof TextWebSocketFrame) {
                    TextWebSocketFrame textFrame = (TextWebSocketFrame)frame;
                    LOG.warn("WebSocket Client received message: " + textFrame.text());
                    ctx.fireExceptionCaught((Throwable)new IOException("Received invalid frame over WebSocket."));
                } else if (frame instanceof BinaryWebSocketFrame) {
                    BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame)frame;
                    LOG.trace("WebSocket Client received data: {} bytes", (Object)binaryFrame.content().readableBytes());
                    ctx.fireChannelRead((Object)binaryFrame.content());
                } else if (frame instanceof ContinuationWebSocketFrame) {
                    ContinuationWebSocketFrame continuationFrame = (ContinuationWebSocketFrame)frame;
                    LOG.trace("WebSocket Client received data continuation: {} bytes", (Object)continuationFrame.content().readableBytes());
                    ctx.fireChannelRead((Object)continuationFrame.content());
                } else if (frame instanceof PingWebSocketFrame) {
                    LOG.trace("WebSocket Client received ping, response with pong");
                    ch.write((Object)new PongWebSocketFrame(frame.content()));
                } else if (frame instanceof CloseWebSocketFrame) {
                    LOG.trace("WebSocket Client received closing");
                    ch.close();
                }
            } else {
                ctx.fireChannelRead(message);
            }
        }
    }
}

