/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.http;

import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.http.HttpClient;
import com.azure.cosmos.implementation.http.HttpClientConfig;
import com.azure.cosmos.implementation.http.HttpHeader;
import com.azure.cosmos.implementation.http.HttpHeaders;
import com.azure.cosmos.implementation.http.HttpRequest;
import com.azure.cosmos.implementation.http.HttpResponse;
import com.azure.cosmos.implementation.http.ReactorNettyRequestRecord;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOption;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.logging.LogLevel;
import io.netty.resolver.AddressResolverGroup;
import io.netty.resolver.DefaultAddressResolverGroup;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.invoke.WrongMethodTypeException;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.NettyOutbound;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;
import reactor.netty.http.client.HttpClientState;
import reactor.netty.http.client.HttpResponseDecoderSpec;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.transport.ProxyProvider;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

/**
 * {@link HttpClient} implementation that delegates to a Reactor Netty HTTP client.
 *
 * <p>Instances are obtained through {@link #create(HttpClientConfig)} or
 * {@link #createWithConnectionProvider(ConnectionProvider, HttpClientConfig)}. Both factories attach a
 * connection observer that stamps connection/request lifecycle times onto the
 * {@link ReactorNettyRequestRecord} carried in the Reactor context of each request.
 */
public class ReactorNettyClient implements HttpClient {
    /** Reactor context key under which the per-request {@link ReactorNettyRequestRecord} is stored. */
    private static final String REACTOR_NETTY_REQUEST_RECORD_KEY = "reactorNettyRequestRecordKey";

    /** Logger category consulted (and written to) for wire-level network logging. */
    private static final String REACTOR_NETWORK_LOG_CATEGORY = "com.azure.cosmos.netty-network";

    private static final Logger logger = LoggerFactory.getLogger(ReactorNettyClient.class.getSimpleName());

    /**
     * Handle to {@code reactor.netty.http.client.HttpClient#warmup()}, or {@code null} when that method
     * could not be looked up on the Reactor Netty version present on the classpath.
     */
    private static final MethodHandle HTTP_CLIENT_WARMUP = lookupWarmupHandle();

    private final HttpClientConfig httpClientConfig;

    /** Only set when the client was built from a caller-supplied provider; {@code null} otherwise. */
    private final ConnectionProvider connectionProvider;

    /** Reassigned whenever additional configuration (proxy, TLS, wiretap, ...) is layered on. */
    private reactor.netty.http.client.HttpClient httpClient;

    /**
     * Stores the configuration and derives the initial Reactor Netty client from {@code baseClient} by
     * attaching the timing observer and the default address resolver group.
     */
    private ReactorNettyClient(HttpClientConfig httpClientConfig,
                               ConnectionProvider connectionProvider,
                               reactor.netty.http.client.HttpClient baseClient) {
        this.httpClientConfig = httpClientConfig;
        this.connectionProvider = connectionProvider;
        this.httpClient = baseClient
            .observe(getConnectionObserver())
            .resolver(DefaultAddressResolverGroup.INSTANCE);
    }

    /**
     * Creates a client based on {@code HttpClient.newConnection()}, i.e. without a shared
     * {@link ConnectionProvider}.
     *
     * @param httpClientConfig configuration applied to the underlying Reactor Netty client
     * @return the configured (and, when supported, warmed-up) client
     */
    public static ReactorNettyClient create(HttpClientConfig httpClientConfig) {
        ReactorNettyClient reactorNettyClient = new ReactorNettyClient(
            httpClientConfig, null, reactor.netty.http.client.HttpClient.newConnection());
        return finishInitialization(reactorNettyClient);
    }

    /**
     * Creates a client whose connections are managed by the supplied {@link ConnectionProvider}.
     * The provider is disposed by {@link #shutdown()}.
     *
     * @param connectionProvider provider handed to {@code HttpClient.create(ConnectionProvider)}
     * @param httpClientConfig configuration applied to the underlying Reactor Netty client
     * @return the configured (and, when supported, warmed-up) client
     */
    public static ReactorNettyClient createWithConnectionProvider(ConnectionProvider connectionProvider,
                                                                  HttpClientConfig httpClientConfig) {
        ReactorNettyClient reactorNettyClient = new ReactorNettyClient(
            httpClientConfig, connectionProvider, reactor.netty.http.client.HttpClient.create(connectionProvider));
        return finishInitialization(reactorNettyClient);
    }

    /** Applies the channel/pipeline configuration and then attempts the optional warmup. */
    private static ReactorNettyClient finishInitialization(ReactorNettyClient reactorNettyClient) {
        reactorNettyClient.configureChannelPipelineHandlers();
        attemptToWarmupHttpClient(reactorNettyClient);
        return reactorNettyClient;
    }

    /**
     * Looks up {@code HttpClient.warmup()} reflectively so the class still loads when the method is
     * missing or inaccessible.
     *
     * @return the method handle, or {@code null} if the lookup failed
     */
    private static MethodHandle lookupWarmupHandle() {
        try {
            return MethodHandles.publicLookup().findVirtual(
                reactor.netty.http.client.HttpClient.class, "warmup", MethodType.methodType(Mono.class));
        } catch (IllegalAccessException | NoSuchMethodException ignored) {
            // Deliberately ignored: without a usable warmup() method the warmup step is simply skipped.
            return null;
        }
    }

    /**
     * Invokes {@code warmup()} on the underlying client and blocks until it completes, when the method
     * handle is available.
     *
     * @throws RuntimeException wrapping any failure other than a handle type mismatch
     */
    private static void attemptToWarmupHttpClient(ReactorNettyClient reactorNettyClient) {
        if (HTTP_CLIENT_WARMUP == null) {
            return;
        }
        try {
            // MethodHandle.invoke is signature-polymorphic; the cast fixes the call-site return type.
            ((Mono<?>) HTTP_CLIENT_WARMUP.invoke(reactorNettyClient.httpClient)).block();
        } catch (ClassCastException | WrongMethodTypeException throwable) {
            // The handle could not be applied to this client; treat the warmup as unavailable.
            logger.debug("Invoking HttpClient.warmup failed.", throwable);
        } catch (Throwable throwable) {
            throw new RuntimeException(throwable);
        }
    }

    /**
     * Layers proxy, optional wiretap, TLS, connect timeout and HTTP response decoder limits onto the
     * underlying client, all taken from {@link #httpClientConfig}.
     */
    private void configureChannelPipelineHandlers() {
        Configs configs = this.httpClientConfig.getConfigs();

        if (this.httpClientConfig.getProxy() != null) {
            this.httpClient = this.httpClient.proxy(typeSpec -> typeSpec.type(ProxyProvider.Proxy.HTTP)
                .address(this.httpClientConfig.getProxy().getAddress())
                .username(this.httpClientConfig.getProxy().getUsername())
                .password(userName -> this.httpClientConfig.getProxy().getPassword()));
        }

        // Wiretap is switched on only when TRACE is enabled for the category, but writes at INFO.
        if (LoggerFactory.getLogger(REACTOR_NETWORK_LOG_CATEGORY).isTraceEnabled()) {
            this.httpClient = this.httpClient.wiretap(REACTOR_NETWORK_LOG_CATEGORY, LogLevel.INFO);
        }

        this.httpClient = this.httpClient
            .secure(sslContextSpec -> sslContextSpec.sslContext(configs.getSslContext()))
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) configs.getConnectionAcquireTimeout().toMillis())
            .httpResponseDecoder(httpResponseDecoderSpec -> httpResponseDecoderSpec
                .maxInitialLineLength(configs.getMaxHttpInitialLineLength())
                .maxHeaderSize(configs.getMaxHttpHeaderSize())
                .maxChunkSize(configs.getMaxHttpChunkSize())
                .validateHeaders(true));
    }

    /**
     * Sends the request using the response timeout from
     * {@link Configs#getHttpResponseTimeoutInSeconds()}.
     *
     * @param request the request to send
     * @return a {@link Mono} emitting the response
     */
    @Override
    public Mono<HttpResponse> send(HttpRequest request) {
        return this.send(request, Duration.ofSeconds(Configs.getHttpResponseTimeoutInSeconds()));
    }

    /**
     * Sends the request with an explicit response timeout.
     *
     * <p>If the request carries no {@link ReactorNettyRequestRecord} yet, one is created and stamped
     * with the current time. The record is published in the Reactor context so the connection
     * observer can add further timestamps. When the returned {@link Mono} is cancelled or fails after
     * a response object was created but before its body was subscribed to, the body is released.
     *
     * @param request the request to send; its method and URI must be non-null
     * @param responseTimeout response timeout handed to the underlying Reactor Netty client
     * @return a {@link Mono} emitting exactly one response
     * @throws NullPointerException if the request method, the request URI or the client config is null
     */
    @Override
    public Mono<HttpResponse> send(HttpRequest request, Duration responseTimeout) {
        Objects.requireNonNull(request.httpMethod());
        Objects.requireNonNull(request.uri());
        Objects.requireNonNull(this.httpClientConfig);

        if (request.reactorNettyRequestRecord() == null) {
            ReactorNettyRequestRecord reactorNettyRequestRecord = new ReactorNettyRequestRecord();
            reactorNettyRequestRecord.setTimeCreated(Instant.now());
            request.withReactorNettyRequestRecord(reactorNettyRequestRecord);
        }

        // Remembers the response so the cancel/error callbacks below can release an unread body.
        final AtomicReference<ReactorNettyHttpResponse> responseReference = new AtomicReference<>();

        return this.httpClient
            .keepAlive(this.httpClientConfig.isConnectionKeepAlive())
            .port(request.port())
            .responseTimeout(responseTimeout)
            .request(HttpMethod.valueOf(request.httpMethod().toString()))
            .uri(request.uri().toASCIIString())
            .send(bodySendDelegate(request))
            .responseConnection((reactorNettyResponse, reactorNettyConnection) -> {
                HttpResponse httpResponse =
                    new ReactorNettyHttpResponse(reactorNettyResponse, reactorNettyConnection).withRequest(request);
                responseReference.set((ReactorNettyHttpResponse) httpResponse);
                return Mono.just(httpResponse);
            })
            .contextWrite(Context.of(REACTOR_NETTY_REQUEST_RECORD_KEY, request.reactorNettyRequestRecord()))
            .doOnCancel(() -> releaseUnsubscribedResponse(responseReference, ReactorNettyResponseState.CANCELLED))
            .onErrorMap(throwable -> {
                // Used for its side effect only; the error itself is passed through unchanged.
                releaseUnsubscribedResponse(responseReference, ReactorNettyResponseState.ERROR);
                return throwable;
            })
            .single();
    }

    /**
     * Releases the body of the captured response, if a response was captured and its body has not
     * been subscribed to, moving it into {@code terminalState}.
     */
    private static void releaseUnsubscribedResponse(AtomicReference<ReactorNettyHttpResponse> responseReference,
                                                    ReactorNettyResponseState terminalState) {
        ReactorNettyHttpResponse reactorNettyHttpResponse = responseReference.get();
        if (reactorNettyHttpResponse != null) {
            reactorNettyHttpResponse.releaseOnNotSubscribedResponse(terminalState);
        }
    }

    /**
     * Builds the callback that copies the request headers onto the Reactor Netty request and, when a
     * body is present, sends it as byte arrays.
     *
     * @param restRequest the request whose headers and body are written
     * @return the send delegate passed to the Reactor Netty request sender
     */
    private static BiFunction<HttpClientRequest, NettyOutbound, Publisher<Void>> bodySendDelegate(
        final HttpRequest restRequest) {
        return (reactorNettyRequest, reactorNettyOutbound) -> {
            for (HttpHeader header : restRequest.headers()) {
                reactorNettyRequest.header(header.name(), header.value());
            }
            if (restRequest.body() != null) {
                return reactorNettyOutbound.sendByteArray(restRequest.body());
            }
            return reactorNettyOutbound;
        };
    }

    /** Disposes the caller-supplied {@link ConnectionProvider}, if this client was created with one. */
    @Override
    public void shutdown() {
        if (this.connectionProvider != null) {
            this.connectionProvider.dispose();
        }
    }

    /**
     * Creates the observer that records the time of each observed state change on the request's
     * {@link ReactorNettyRequestRecord}.
     *
     * <p>For {@code CONNECTED} and {@code ACQUIRED} the record is read from the connection's observer
     * context (only when the connection is itself a {@link ConnectionObserver}); for
     * {@code CONFIGURED}, {@code REQUEST_SENT} and {@code RESPONSE_RECEIVED} it is read from the
     * request's context view (only when the connection is an {@link HttpClientRequest}). Other states
     * and other connection types are ignored.
     */
    private static ConnectionObserver getConnectionObserver() {
        return (conn, state) -> {
            Instant time = Instant.now();

            if (state.equals(HttpClientState.CONNECTED)) {
                if (conn instanceof ConnectionObserver) {
                    requireRequestRecord(((ConnectionObserver) conn).currentContext()).setTimeConnected(time);
                }
            } else if (state.equals(HttpClientState.ACQUIRED)) {
                if (conn instanceof ConnectionObserver) {
                    requireRequestRecord(((ConnectionObserver) conn).currentContext()).setTimeAcquired(time);
                }
            } else if (state.equals(HttpClientState.CONFIGURED)) {
                if (conn instanceof HttpClientRequest) {
                    requireRequestRecord(((HttpClientRequest) conn).currentContextView()).setTimeConfigured(time);
                }
            } else if (state.equals(HttpClientState.REQUEST_SENT)) {
                if (conn instanceof HttpClientRequest) {
                    requireRequestRecord(((HttpClientRequest) conn).currentContextView()).setTimeSent(time);
                }
            } else if (state.equals(HttpClientState.RESPONSE_RECEIVED)) {
                if (conn instanceof HttpClientRequest) {
                    requireRequestRecord(((HttpClientRequest) conn).currentContextView()).setTimeReceived(time);
                }
            }
        };
    }

    /**
     * Reads the request record from the given Reactor context.
     *
     * @throws IllegalStateException if the context holds no record under the expected key
     */
    private static ReactorNettyRequestRecord requireRequestRecord(ContextView context) {
        ReactorNettyRequestRecord requestRecord = context.getOrDefault(REACTOR_NETTY_REQUEST_RECORD_KEY, null);
        if (requestRecord == null) {
            throw new IllegalStateException("ReactorNettyRequestRecord not found in context");
        }
        return requestRecord;
    }

    /**
     * Enables wiretap logging on the network log category at the most verbose level currently enabled
     * for that category's logger. Does nothing when no level is enabled.
     */
    public void enableNetworkLogging() {
        Logger networkLogger = LoggerFactory.getLogger(REACTOR_NETWORK_LOG_CATEGORY);

        final LogLevel logLevel;
        if (networkLogger.isTraceEnabled()) {
            logLevel = LogLevel.TRACE;
        } else if (networkLogger.isDebugEnabled()) {
            logLevel = LogLevel.DEBUG;
        } else if (networkLogger.isInfoEnabled()) {
            logLevel = LogLevel.INFO;
        } else if (networkLogger.isWarnEnabled()) {
            logLevel = LogLevel.WARN;
        } else if (networkLogger.isErrorEnabled()) {
            logLevel = LogLevel.ERROR;
        } else {
            return;
        }

        this.httpClient = this.httpClient.wiretap(REACTOR_NETWORK_LOG_CATEGORY, logLevel);
    }

    /**
     * {@link HttpResponse} view over a Reactor Netty response and its connection, tracking whether
     * the body has been subscribed to so an unread body can be released exactly once.
     */
    private static class ReactorNettyHttpResponse extends HttpResponse {
        private final AtomicReference<ReactorNettyResponseState> state =
            new AtomicReference<>(ReactorNettyResponseState.NOT_SUBSCRIBED);
        private final HttpClientResponse reactorNettyResponse;
        private final Connection reactorNettyConnection;

        ReactorNettyHttpResponse(HttpClientResponse reactorNettyResponse, Connection reactorNettyConnection) {
            this.reactorNettyResponse = reactorNettyResponse;
            this.reactorNettyConnection = reactorNettyConnection;
        }

        @Override
        public int statusCode() {
            return this.reactorNettyResponse.status().code();
        }

        @Override
        public String headerValue(String name) {
            return this.reactorNettyResponse.responseHeaders().get(name);
        }

        /** Copies the Netty response headers into a new {@link HttpHeaders} instance. */
        @Override
        public HttpHeaders headers() {
            HttpHeaders headers = new HttpHeaders(this.reactorNettyResponse.responseHeaders().size());
            this.reactorNettyResponse.responseHeaders().forEach(e -> headers.set(e.getKey(), e.getValue()));
            return headers;
        }

        /** Returns the aggregated body; subscribing marks this response as subscribed. */
        @Override
        public Mono<ByteBuf> body() {
            return this.bodyIntern().aggregate().doOnSubscribe(this::updateSubscriptionState);
        }

        /** Returns the aggregated body decoded as a string; subscribing marks this response as subscribed. */
        @Override
        public Mono<String> bodyAsString() {
            return this.bodyIntern().aggregate().asString().doOnSubscribe(this::updateSubscriptionState);
        }

        private ByteBufFlux bodyIntern() {
            return this.reactorNettyConnection.inbound().receive();
        }

        @Override
        Connection internConnection() {
            return this.reactorNettyConnection;
        }

        /**
         * Moves the state from {@code NOT_SUBSCRIBED} to {@code SUBSCRIBED} on the first subscription.
         * A subscription arriving after the body was released due to cancellation is rejected; any
         * other prior state is left untouched and not rejected here.
         *
         * @throws IllegalStateException if the current state is {@code CANCELLED}
         */
        private void updateSubscriptionState(Subscription subscription) {
            if (this.state.compareAndSet(ReactorNettyResponseState.NOT_SUBSCRIBED,
                ReactorNettyResponseState.SUBSCRIBED)) {
                return;
            }
            if (this.state.get() == ReactorNettyResponseState.CANCELLED) {
                throw new IllegalStateException(
                    "The client response body has been released already due to cancellation.");
            }
        }

        /**
         * If the body has never been subscribed to, switches to {@code reactorNettyResponseState} and
         * subscribes to the inbound body with no-op handlers, discarding whatever arrives. The
         * compare-and-set guarantees this happens at most once per response.
         */
        private void releaseOnNotSubscribedResponse(ReactorNettyResponseState reactorNettyResponseState) {
            if (this.state.compareAndSet(ReactorNettyResponseState.NOT_SUBSCRIBED, reactorNettyResponseState)) {
                logger.debug("Releasing body, not yet subscribed");
                this.bodyIntern()
                    .doOnNext(byteBuf -> { })
                    .subscribe(byteBuf -> { }, ex -> { });
            }
        }
    }

    /** Subscription life cycle of a {@link ReactorNettyHttpResponse} body. */
    private enum ReactorNettyResponseState {
        /** Initial state: nobody has subscribed to the body yet. */
        NOT_SUBSCRIBED,
        /** The body has been subscribed to by a consumer. */
        SUBSCRIBED,
        /** The body was released because the request was cancelled before any subscription. */
        CANCELLED,
        /** The body was released because the request failed before any subscription. */
        ERROR
    }
}

