package org.apache.flink.runtime.rest;

import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.net.SSLEngineFactory;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rest.util.RestConstants;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestEncoder;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.MemoryAttribute;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/rest/RestClient.class */
public class RestClient {
    private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);
    private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
    private final Executor executor;
    private final Bootstrap bootstrap;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rest/RestClient$ClientHandler.class */
    public static class ClientHandler extends SimpleChannelInboundHandler<Object> {
        private final CompletableFuture<JsonResponse> jsonFuture;

        private ClientHandler() {
            this.jsonFuture = new CompletableFuture<>();
        }

        CompletableFuture<JsonResponse> getJsonFuture() {
            return this.jsonFuture;
        }

        protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object obj) {
            if (obj instanceof FullHttpResponse) {
                readRawResponse((FullHttpResponse) obj);
            } else {
                RestClient.LOG.error("Implementation error: Received a response that wasn't a FullHttpResponse.");
                if (obj instanceof HttpResponse) {
                    this.jsonFuture.completeExceptionally(new RestClientException("Implementation error: Received a response that wasn't a FullHttpResponse.", ((HttpResponse) obj).getStatus()));
                } else {
                    this.jsonFuture.completeExceptionally(new RestClientException("Implementation error: Received a response that wasn't a FullHttpResponse.", HttpResponseStatus.INTERNAL_SERVER_ERROR));
                }
            }
            channelHandlerContext.close();
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
            if (th instanceof TooLongFrameException) {
                this.jsonFuture.completeExceptionally(new TooLongFrameException(String.format(th.getMessage() + " Try to raise [%s]", RestOptions.CLIENT_MAX_CONTENT_LENGTH.key())));
            } else {
                this.jsonFuture.completeExceptionally(th);
            }
            channelHandlerContext.close();
        }

        private void readRawResponse(FullHttpResponse fullHttpResponse) {
            ByteBuf content = fullHttpResponse.content();
            try {
                ByteBufInputStream byteBufInputStream = new ByteBufInputStream(content);
                Throwable th = null;
                try {
                    try {
                        JsonNode readTree = RestClient.objectMapper.readTree(byteBufInputStream);
                        RestClient.LOG.debug("Received response {}.", readTree);
                        if (byteBufInputStream != null) {
                            if (0 != 0) {
                                try {
                                    byteBufInputStream.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                byteBufInputStream.close();
                            }
                        }
                        this.jsonFuture.complete(new JsonResponse(readTree, fullHttpResponse.getStatus()));
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (byteBufInputStream != null) {
                        if (th != null) {
                            try {
                                byteBufInputStream.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            byteBufInputStream.close();
                        }
                    }
                    throw th4;
                }
            } catch (IOException e) {
                RestClient.LOG.error("Response could not be read.", e);
                this.jsonFuture.completeExceptionally(new RestClientException("Response could not be read.", e, fullHttpResponse.getStatus()));
            } catch (JsonProcessingException e2) {
                RestClient.LOG.error("Response was not valid JSON.", e2);
                content.readerIndex(0);
                try {
                    ByteBufInputStream byteBufInputStream2 = new ByteBufInputStream(content);
                    Throwable th6 = null;
                    try {
                        try {
                            byte[] bArr = new byte[byteBufInputStream2.available()];
                            byteBufInputStream2.readFully(bArr);
                            String str = new String(bArr);
                            RestClient.LOG.error("Unexpected plain-text response: {}", str);
                            this.jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON, but plain-text: " + str, e2, fullHttpResponse.getStatus()));
                            if (byteBufInputStream2 != null) {
                                if (0 != 0) {
                                    try {
                                        byteBufInputStream2.close();
                                    } catch (Throwable th7) {
                                        th6.addSuppressed(th7);
                                    }
                                } else {
                                    byteBufInputStream2.close();
                                }
                            }
                        } catch (Throwable th8) {
                            th6 = th8;
                            throw th8;
                        }
                    } finally {
                    }
                } catch (IOException e3) {
                    this.jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON, nor plain-text.", e2, fullHttpResponse.getStatus()));
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rest/RestClient$JsonResponse.class */
    public static final class JsonResponse {
        private final JsonNode json;
        private final HttpResponseStatus httpResponseStatus;

        private JsonResponse(JsonNode jsonNode, HttpResponseStatus httpResponseStatus) {
            this.json = (JsonNode) Preconditions.checkNotNull(jsonNode);
            this.httpResponseStatus = (HttpResponseStatus) Preconditions.checkNotNull(httpResponseStatus);
        }

        public JsonNode getJson() {
            return this.json;
        }

        public HttpResponseStatus getHttpResponseStatus() {
            return this.httpResponseStatus;
        }

        public String toString() {
            return "JsonResponse{json=" + this.json + ", httpResponseStatus=" + this.httpResponseStatus + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rest/RestClient$MultipartRequest.class */
    public static final class MultipartRequest implements Request {
        private final HttpRequest httpRequest;
        private final HttpPostRequestEncoder bodyRequestEncoder;

        MultipartRequest(HttpRequest httpRequest, HttpPostRequestEncoder httpPostRequestEncoder) {
            this.httpRequest = httpRequest;
            this.bodyRequestEncoder = httpPostRequestEncoder;
        }

        @Override // org.apache.flink.runtime.rest.RestClient.Request
        public void writeTo(Channel channel) {
            ChannelFuture writeAndFlush = channel.writeAndFlush(this.httpRequest);
            if (this.bodyRequestEncoder.isChunked()) {
                writeAndFlush = channel.writeAndFlush(this.bodyRequestEncoder);
            }
            writeAndFlush.addListener(future -> {
                this.bodyRequestEncoder.cleanFiles();
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rest/RestClient$Request.class */
    public interface Request {
        void writeTo(Channel channel) throws IOException;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rest/RestClient$SimpleRequest.class */
    public static final class SimpleRequest implements Request {
        private final HttpRequest httpRequest;

        SimpleRequest(HttpRequest httpRequest) {
            this.httpRequest = httpRequest;
        }

        @Override // org.apache.flink.runtime.rest.RestClient.Request
        public void writeTo(Channel channel) {
            channel.writeAndFlush(this.httpRequest);
        }
    }

    public RestClient(final RestClientConfiguration restClientConfiguration, Executor executor) {
        Preconditions.checkNotNull(restClientConfiguration);
        this.executor = (Executor) Preconditions.checkNotNull(executor);
        final SSLEngineFactory sslEngineFactory = restClientConfiguration.getSslEngineFactory();
        ChannelInitializer<SocketChannel> channelInitializer = new ChannelInitializer<SocketChannel>() { // from class: org.apache.flink.runtime.rest.RestClient.1
            /* JADX INFO: Access modifiers changed from: protected */
            public void initChannel(SocketChannel socketChannel) {
                if (sslEngineFactory != null) {
                    socketChannel.pipeline().addLast("ssl", new SslHandler(sslEngineFactory.createSSLEngine()));
                }
                socketChannel.pipeline().addLast(new ChannelHandler[]{new HttpClientCodec()}).addLast(new ChannelHandler[]{new HttpObjectAggregator(restClientConfiguration.getMaxContentLength())}).addLast(new ChannelHandler[]{new ChunkedWriteHandler()}).addLast(new ChannelHandler[]{new ClientHandler()});
            }
        };
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("flink-rest-client-netty"));
        this.bootstrap = new Bootstrap();
        this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(Math.toIntExact(restClientConfiguration.getConnectionTimeout()))).group(nioEventLoopGroup).channel(NioSocketChannel.class).handler(channelInitializer);
        LOG.info("Rest client endpoint started.");
    }

    public void shutdown(Time time) {
        LOG.info("Shutting down rest endpoint.");
        CompletableFuture completableFuture = new CompletableFuture();
        if (this.bootstrap != null && this.bootstrap.group() != null) {
            this.bootstrap.group().shutdownGracefully(0L, time.toMilliseconds(), TimeUnit.MILLISECONDS).addListener(future -> {
                if (future.isSuccess()) {
                    completableFuture.complete(null);
                } else {
                    completableFuture.completeExceptionally(future.cause());
                }
            });
        }
        try {
            completableFuture.get(time.toMilliseconds(), TimeUnit.MILLISECONDS);
            LOG.info("Rest endpoint shutdown complete.");
        } catch (Exception e) {
            LOG.warn("Rest endpoint shutdown failed.", e);
        }
    }

    public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String str, int i, M m, U u, R r) throws IOException {
        return sendRequest(str, i, m, u, r, Collections.emptyList());
    }

    public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String str, int i, M m, U u, R r, Collection<FileUpload> collection) throws IOException {
        Preconditions.checkNotNull(str);
        Preconditions.checkArgument(0 <= i && i < 65536, "The target port " + i + " is not in the range (0, 65536].");
        Preconditions.checkNotNull(m);
        Preconditions.checkNotNull(r);
        Preconditions.checkNotNull(u);
        Preconditions.checkNotNull(collection);
        Preconditions.checkState(u.isResolved(), "Message parameters were not resolved.");
        String resolveUrl = MessageParameters.resolveUrl(m.getTargetRestEndpointURL(), u);
        LOG.debug("Sending request of class {} to {}:{}{}", new Object[]{r.getClass(), str, Integer.valueOf(i), resolveUrl});
        StringWriter stringWriter = new StringWriter();
        objectMapper.writeValue(stringWriter, r);
        Request createRequest = createRequest(str + ':' + i, resolveUrl, m.getHttpMethod().getNettyHttpMethod(), Unpooled.wrappedBuffer(stringWriter.toString().getBytes(ConfigConstants.DEFAULT_CHARSET)), collection);
        Collection<Class<?>> responseTypeParameters = m.getResponseTypeParameters();
        return submitRequest(str, i, createRequest, responseTypeParameters.isEmpty() ? objectMapper.constructType(m.getResponseClass()) : objectMapper.getTypeFactory().constructParametricType(m.getResponseClass(), (Class[]) responseTypeParameters.toArray(new Class[responseTypeParameters.size()])));
    }

    private static Request createRequest(String str, String str2, HttpMethod httpMethod, ByteBuf byteBuf, Collection<FileUpload> collection) throws IOException {
        if (collection.isEmpty()) {
            DefaultFullHttpRequest defaultFullHttpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, httpMethod, str2, byteBuf);
            defaultFullHttpRequest.headers().set("Host", str).set("Connection", "close").add("Content-Length", Integer.valueOf(byteBuf.capacity())).add("Content-Type", RestConstants.REST_CONTENT_TYPE);
            return new SimpleRequest(defaultFullHttpRequest);
        }
        DefaultFullHttpRequest defaultFullHttpRequest2 = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, httpMethod, str2);
        defaultFullHttpRequest2.headers().set("Host", str).set("Connection", "close");
        try {
            HttpPostRequestEncoder httpPostRequestEncoder = new HttpPostRequestEncoder(new DefaultHttpDataFactory(true), defaultFullHttpRequest2, true);
            MemoryAttribute memoryAttribute = new MemoryAttribute(FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
            memoryAttribute.setContent(byteBuf);
            httpPostRequestEncoder.addBodyHttpData(memoryAttribute);
            int i = 0;
            for (FileUpload fileUpload : collection) {
                Path file = fileUpload.getFile();
                if (Files.isDirectory(file, new LinkOption[0])) {
                    throw new IllegalArgumentException("Upload of directories is not supported. Dir=" + file);
                }
                File file2 = file.toFile();
                LOG.trace("Adding file {} to request.", file2);
                httpPostRequestEncoder.addBodyFileUpload("file_" + i, file2, fileUpload.getContentType(), false);
                i++;
            }
            try {
                return new MultipartRequest(httpPostRequestEncoder.finalizeRequest(), httpPostRequestEncoder);
            } catch (HttpPostRequestEncoder.ErrorDataEncoderException e) {
                throw new IOException("Could not finalize request.", e);
            }
        } catch (HttpPostRequestEncoder.ErrorDataEncoderException e2) {
            throw new IOException("Could not encode request.", e2);
        }
    }

    private <P extends ResponseBody> CompletableFuture<P> submitRequest(String str, int i, Request request, JavaType javaType) {
        ChannelFuture connect = this.bootstrap.connect(str, i);
        CompletableFuture completableFuture = new CompletableFuture();
        connect.addListener(channelFuture -> {
            if (channelFuture.isSuccess()) {
                completableFuture.complete(channelFuture.channel());
            } else {
                completableFuture.completeExceptionally(channelFuture.cause());
            }
        });
        return completableFuture.thenComposeAsync(channel -> {
            CompletableFuture<JsonResponse> jsonFuture = channel.pipeline().get(ClientHandler.class).getJsonFuture();
            try {
                request.writeTo(channel);
                return jsonFuture;
            } catch (IOException e) {
                return FutureUtils.completedExceptionally(new FlinkException("Could not write request.", e));
            }
        }, this.executor).thenComposeAsync(jsonResponse -> {
            return parseResponse(jsonResponse, javaType);
        }, this.executor);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonResponse jsonResponse, JavaType javaType) {
        FutureUtils.WaitingConjunctFuture waitingConjunctFuture = (CompletableFuture<P>) new CompletableFuture();
        try {
            waitingConjunctFuture.complete((ResponseBody) objectMapper.readValue(objectMapper.treeAsTokens(jsonResponse.json), javaType));
        } catch (IOException e) {
            try {
                waitingConjunctFuture.completeExceptionally(new RestClientException(((ErrorResponseBody) objectMapper.treeToValue(jsonResponse.getJson(), ErrorResponseBody.class)).errors.toString(), jsonResponse.getHttpResponseStatus()));
            } catch (JsonProcessingException e2) {
                LOG.error("Received response was neither of the expected type ({}) nor an error. Response={}", new Object[]{javaType, jsonResponse, e2});
                waitingConjunctFuture.completeExceptionally(new RestClientException("Response was neither of the expected type(" + javaType + ") nor an error.", e, jsonResponse.getHttpResponseStatus()));
            }
        }
        return waitingConjunctFuture;
    }
}
