package org.apache.drill.exec.rpc;

import com.google.protobuf.Internal;
import com.google.protobuf.Internal.EnumLite;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.GeneralRPCProtos;
import org.apache.drill.exec.rpc.RemoteConnection;
import org.apache.drill.exec.rpc.RpcBus;
import org.apache.drill.exec.rpc.RpcConnectionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/rpc/BasicClient.class */
public abstract class BasicClient<T extends Internal.EnumLite, R extends RemoteConnection, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite> extends RpcBus<T, R> {
    private static final double PERCENT_TIMEOUT_BEFORE_SENDING_PING = 0.5d;
    private final Bootstrap b;
    protected R connection;
    private final T handshakeType;
    private final Class<HANDSHAKE_RESPONSE> responseClass;
    private final Parser<HANDSHAKE_RESPONSE> handshakeParser;
    private final BasicClient<T, R, HANDSHAKE_SEND, HANDSHAKE_RESPONSE>.IdlePingHandler pingHandler;
    private static final Logger logger = LoggerFactory.getLogger(BasicClient.class);
    private static final OutboundRpcMessage PING_MESSAGE = new OutboundRpcMessage(GeneralRPCProtos.RpcMode.PING, 0, 0, (MessageLite) Acks.OK, new ByteBuf[0]);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/rpc/BasicClient$ClientHandshakeHandler.class */
    public class ClientHandshakeHandler extends AbstractHandshakeHandler<HANDSHAKE_RESPONSE> {
        public ClientHandshakeHandler() {
            super(BasicClient.this.handshakeType, BasicClient.this.handshakeParser);
        }

        @Override // org.apache.drill.exec.rpc.AbstractHandshakeHandler
        protected final void consumeHandshake(ChannelHandlerContext channelHandlerContext, HANDSHAKE_RESPONSE handshake_response) throws Exception {
            BasicClient.this.queue.getFuture(this.handshakeType.getNumber(), this.coordinationId, BasicClient.this.responseClass).set(handshake_response, null);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/rpc/BasicClient$ConnectionMultiListener.class */
    public class ConnectionMultiListener {
        private final RpcConnectionHandler<R> l;
        private final HANDSHAKE_SEND handshakeValue;
        public final BasicClient<T, R, HANDSHAKE_SEND, HANDSHAKE_RESPONSE>.ConnectionMultiListener.ConnectionHandler connectionHandler = new ConnectionHandler();
        public final BasicClient<T, R, HANDSHAKE_SEND, HANDSHAKE_RESPONSE>.ConnectionMultiListener.HandshakeSendHandler handshakeSendHandler = new HandshakeSendHandler();
        static final /* synthetic */ boolean $assertionsDisabled;

        /* loaded from: input_file:org/apache/drill/exec/rpc/BasicClient$ConnectionMultiListener$ConnectionHandler.class */
        private class ConnectionHandler implements GenericFutureListener<ChannelFuture> {
            private ConnectionHandler() {
            }

            /* JADX WARN: Multi-variable type inference failed */
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                boolean z = false;
                long j = 120000;
                long currentTimeMillis = System.currentTimeMillis();
                while (true) {
                    try {
                        channelFuture.get(j, TimeUnit.MILLISECONDS);
                        if (channelFuture.isSuccess()) {
                            BasicClient.this.setAddresses(channelFuture.channel().remoteAddress(), channelFuture.channel().localAddress());
                            BasicClient.this.send(ConnectionMultiListener.this.handshakeSendHandler, BasicClient.this.connection, BasicClient.this.handshakeType, ConnectionMultiListener.this.handshakeValue, BasicClient.this.responseClass, true, new ByteBuf[0]);
                        } else {
                            ConnectionMultiListener.this.l.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION, new RpcException("General connection failure."));
                        }
                    } catch (InterruptedException e) {
                        j -= System.currentTimeMillis() - currentTimeMillis;
                        currentTimeMillis = System.currentTimeMillis();
                        z = true;
                        if (j < 1) {
                            ConnectionMultiListener.this.l.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION, e);
                            break;
                        }
                    } catch (Exception e2) {
                        BasicClient.logger.error("Failed to establish connection", e2);
                        ConnectionMultiListener.this.l.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION, e2);
                    }
                }
                if (z) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/drill/exec/rpc/BasicClient$ConnectionMultiListener$HandshakeSendHandler.class */
        public class HandshakeSendHandler implements RpcOutcomeListener<HANDSHAKE_RESPONSE> {
            private HandshakeSendHandler() {
            }

            /* JADX WARN: Multi-variable type inference failed */
            @Override // org.apache.drill.exec.rpc.RpcOutcomeListener
            public void failed(RpcException rpcException) {
                BasicClient.logger.debug("Failure while initiating handshake", rpcException);
                ConnectionMultiListener.this.l.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_COMMUNICATION, rpcException);
            }

            @Override // org.apache.drill.exec.rpc.RpcOutcomeListener
            public void success(HANDSHAKE_RESPONSE handshake_response, ByteBuf byteBuf) {
                try {
                    BasicClient.this.validateHandshake(handshake_response);
                    BasicClient.this.finalizeConnection(handshake_response, BasicClient.this.connection);
                    ConnectionMultiListener.this.l.connectionSucceeded(BasicClient.this.connection);
                } catch (RpcException e) {
                    ConnectionMultiListener.this.l.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_VALIDATION, e);
                }
            }

            @Override // org.apache.drill.exec.rpc.RpcOutcomeListener
            public void interrupted(InterruptedException interruptedException) {
                BasicClient.logger.warn("Interrupted while waiting for handshake response", interruptedException);
                ConnectionMultiListener.this.l.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_COMMUNICATION, interruptedException);
            }
        }

        public ConnectionMultiListener(RpcConnectionHandler<R> rpcConnectionHandler, HANDSHAKE_SEND handshake_send) {
            if (!$assertionsDisabled && rpcConnectionHandler == null) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && handshake_send == null) {
                throw new AssertionError();
            }
            this.l = rpcConnectionHandler;
            this.handshakeValue = handshake_send;
        }

        static {
            $assertionsDisabled = !BasicClient.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/rpc/BasicClient$IdlePingHandler.class */
    public class IdlePingHandler extends IdleStateHandler {
        private GenericFutureListener<Future<? super Void>> pingFailedHandler;

        public IdlePingHandler(long j) {
            super(0L, j, 0L, TimeUnit.MILLISECONDS);
            this.pingFailedHandler = new GenericFutureListener<Future<? super Void>>() { // from class: org.apache.drill.exec.rpc.BasicClient.IdlePingHandler.1
                public void operationComplete(Future<? super Void> future) throws Exception {
                    if (future.isSuccess()) {
                        return;
                    }
                    BasicClient.logger.error("Unable to maintain connection {}.  Closing connection.", BasicClient.this.connection.getName());
                    BasicClient.this.connection.close();
                }
            };
        }

        protected void channelIdle(ChannelHandlerContext channelHandlerContext, IdleStateEvent idleStateEvent) throws Exception {
            if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
                channelHandlerContext.writeAndFlush(BasicClient.PING_MESSAGE).addListener(this.pingFailedHandler);
            }
        }
    }

    public BasicClient(RpcConfig rpcConfig, ByteBufAllocator byteBufAllocator, EventLoopGroup eventLoopGroup, T t, Class<HANDSHAKE_RESPONSE> cls, Parser<HANDSHAKE_RESPONSE> parser) {
        super(rpcConfig);
        this.responseClass = cls;
        this.handshakeType = t;
        this.handshakeParser = parser;
        this.pingHandler = rpcConfig.hasTimeout() ? new IdlePingHandler(rpcConfig.hasTimeout() ? (long) (rpcConfig.getTimeout() * 1000.0d * PERCENT_TIMEOUT_BEFORE_SENDING_PING) : -1L) : null;
        this.b = new Bootstrap().group(eventLoopGroup).channel(TransportCheck.getClientSocketChannel()).option(ChannelOption.ALLOCATOR, byteBufAllocator).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_RCVBUF, 131072).option(ChannelOption.SO_SNDBUF, 131072).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() { // from class: org.apache.drill.exec.rpc.BasicClient.1
            /* JADX INFO: Access modifiers changed from: protected */
            public void initChannel(SocketChannel socketChannel) throws Exception {
                BasicClient.this.connection = (R) BasicClient.this.initRemoteConnection(socketChannel);
                socketChannel.closeFuture().addListener(BasicClient.this.getCloseHandler(socketChannel, BasicClient.this.connection));
                ChannelPipeline pipeline = socketChannel.pipeline();
                pipeline.addLast("protocol-decoder", BasicClient.this.getDecoder(BasicClient.this.connection.getAllocator()));
                pipeline.addLast("message-decoder", new RpcDecoder("c-" + BasicClient.this.rpcConfig.getName()));
                pipeline.addLast("protocol-encoder", new RpcEncoder("c-" + BasicClient.this.rpcConfig.getName()));
                pipeline.addLast("handshake-handler", new ClientHandshakeHandler());
                if (BasicClient.this.pingHandler != null) {
                    pipeline.addLast("idle-state-handler", BasicClient.this.pingHandler);
                }
                pipeline.addLast("message-handler", new RpcBus.InboundHandler(BasicClient.this, BasicClient.this.connection));
                pipeline.addLast("exception-handler", new RpcExceptionHandler(BasicClient.this.connection));
            }
        });
    }

    @Override // org.apache.drill.exec.rpc.RpcBus
    public R initRemoteConnection(SocketChannel socketChannel) {
        this.local = socketChannel.localAddress();
        this.remote = socketChannel.remoteAddress();
        return null;
    }

    public abstract ProtobufLengthDecoder getDecoder(BufferAllocator bufferAllocator);

    public boolean isActive() {
        return (this.connection == null || this.connection.getChannel() == null || !this.connection.getChannel().isActive()) ? false : true;
    }

    protected abstract void validateHandshake(HANDSHAKE_RESPONSE handshake_response) throws RpcException;

    protected abstract void finalizeConnection(HANDSHAKE_RESPONSE handshake_response, R r);

    public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> rpcOutcomeListener, T t, SEND send, Class<RECEIVE> cls, ByteBuf... byteBufArr) {
        super.send(rpcOutcomeListener, this.connection, t, send, cls, byteBufArr);
    }

    public <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(T t, SEND send, Class<RECEIVE> cls, ByteBuf... byteBufArr) {
        return super.send((BasicClient<T, R, HANDSHAKE_SEND, HANDSHAKE_RESPONSE>) this.connection, (R) t, (T) send, (Class) cls, byteBufArr);
    }

    @Override // org.apache.drill.exec.rpc.RpcBus
    public boolean isClient() {
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void connectAsClient(RpcConnectionHandler<R> rpcConnectionHandler, HANDSHAKE_SEND handshake_send, String str, int i) {
        this.b.connect(str, i).addListener(new ConnectionMultiListener(rpcConnectionHandler, handshake_send).connectionHandler);
    }

    public void setAutoRead(boolean z) {
        this.connection.setAutoRead(z);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        logger.debug("Closing client");
        try {
            this.connection.getChannel().close().get();
        } catch (InterruptedException | ExecutionException e) {
            logger.warn("Failure while shutting {}", getClass().getName(), e);
            Thread.currentThread().interrupt();
        }
    }
}
