/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.tubemq.corerpc.netty;

import com.google.protobuf.ByteString;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.io.EOFException;
import java.io.IOException;
import java.nio.channels.UnresolvedAddressException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.tubemq.corebase.cluster.NodeAddrInfo;
import org.apache.inlong.tubemq.corebase.protobuf.generated.RPCProtos;
import org.apache.inlong.tubemq.corerpc.RequestWrapper;
import org.apache.inlong.tubemq.corerpc.ResponseWrapper;
import org.apache.inlong.tubemq.corerpc.RpcDataPack;
import org.apache.inlong.tubemq.corerpc.client.CallFuture;
import org.apache.inlong.tubemq.corerpc.client.Callback;
import org.apache.inlong.tubemq.corerpc.client.Client;
import org.apache.inlong.tubemq.corerpc.client.ClientFactory;
import org.apache.inlong.tubemq.corerpc.codec.PbEnDecoder;
import org.apache.inlong.tubemq.corerpc.exception.ClientClosedException;
import org.apache.inlong.tubemq.corerpc.exception.NetworkException;
import org.apache.inlong.tubemq.corerpc.netty.ByteBufferInputStream;
import org.apache.inlong.tubemq.corerpc.netty.ByteBufferOutputStream;
import org.apache.inlong.tubemq.corerpc.utils.MixUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyClient
implements Client {
    private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
    private static final AtomicLong init = new AtomicLong(0L);
    private static Timer timer;
    private final ConcurrentHashMap<Integer, Callback<ResponseWrapper>> requests = new ConcurrentHashMap();
    private final ConcurrentHashMap<Integer, Timeout> timeouts = new ConcurrentHashMap();
    private final AtomicInteger serialNoGenerator = new AtomicInteger(0);
    private final AtomicBoolean released = new AtomicBoolean(false);
    private NodeAddrInfo addressInfo;
    private final ClientFactory clientFactory;
    private Channel channel;
    private final long connectTimeout;
    private final AtomicBoolean closed = new AtomicBoolean(true);

    public NettyClient(ClientFactory clientFactory, long connectTimeout) {
        this.clientFactory = clientFactory;
        this.connectTimeout = connectTimeout;
        if (init.incrementAndGet() == 1L) {
            timer = new HashedWheelTimer();
        }
    }

    public Channel getChannel() {
        return this.channel;
    }

    public void setChannel(Channel channel, NodeAddrInfo addressInfo) {
        this.channel = channel;
        this.addressInfo = addressInfo;
        this.closed.set(false);
    }

    @Override
    public ResponseWrapper call(RequestWrapper request, Callback callback, long timeout, TimeUnit timeUnit) throws Exception {
        block12: {
            if (this.closed.get()) {
                throw new ClientClosedException("Netty client has bean closed!");
            }
            request.setSerialNo(this.serialNoGenerator.incrementAndGet());
            RPCProtos.RpcConnHeader.Builder builder = RPCProtos.RpcConnHeader.newBuilder();
            builder.setFlag(request.getFlagId());
            RPCProtos.RpcConnHeader connectionHeader = builder.build();
            RPCProtos.RequestHeader.Builder headerBuilder = RPCProtos.RequestHeader.newBuilder();
            headerBuilder.setServiceType(request.getServiceType());
            headerBuilder.setProtocolVer(request.getProtocolVersion());
            RPCProtos.RequestHeader rpcHeader = headerBuilder.build();
            RPCProtos.RequestBody.Builder rpcBodyBuilder = RPCProtos.RequestBody.newBuilder();
            rpcBodyBuilder.setMethod(request.getMethodId());
            rpcBodyBuilder.setTimeout(request.getTimeout());
            rpcBodyBuilder.setRequest(ByteString.copyFrom((byte[])PbEnDecoder.pbEncode(request.getRequestData())));
            RPCProtos.RequestBody rpcBodyRequest = rpcBodyBuilder.build();
            ByteBufferOutputStream bbo = new ByteBufferOutputStream();
            connectionHeader.writeDelimitedTo(bbo);
            rpcHeader.writeDelimitedTo(bbo);
            rpcBodyRequest.writeDelimitedTo(bbo);
            RpcDataPack pack = new RpcDataPack(request.getSerialNo(), bbo.getBufferList());
            CallFuture future = new CallFuture(callback);
            this.requests.put(request.getSerialNo(), future);
            if (callback == null) {
                try {
                    this.getChannel().writeAndFlush((Object)pack);
                    return (ResponseWrapper)future.get(timeout, timeUnit);
                }
                catch (Throwable e) {
                    Callback<ResponseWrapper> callback1 = this.requests.remove(request.getSerialNo());
                    if (callback1 != null) {
                        if (this.closed.get()) {
                            throw new ClientClosedException("Netty client has bean closed!");
                        }
                        if (this.getChannel() == null) {
                            throw new ClientClosedException("Send failure for channel is null!");
                        }
                        throw e;
                    }
                    break block12;
                }
            }
            boolean inserted = false;
            try {
                this.timeouts.put(request.getSerialNo(), timer.newTimeout((TimerTask)new TimeoutTask(request.getSerialNo()), timeout, timeUnit));
                inserted = true;
                this.getChannel().writeAndFlush((Object)pack);
            }
            catch (Throwable e) {
                Timeout timeout1;
                Callback<ResponseWrapper> callback1 = this.requests.remove(request.getSerialNo());
                if (callback1 == null) break block12;
                if (inserted && (timeout1 = this.timeouts.remove(request.getSerialNo())) != null) {
                    timeout1.cancel();
                }
                if (this.closed.get()) {
                    throw new ClientClosedException("Netty client has bean closed!");
                }
                if (this.getChannel() == null) {
                    throw new ClientClosedException("Channel is null!");
                }
                throw e;
            }
        }
        return null;
    }

    @Override
    public NodeAddrInfo getServerAddressInfo() {
        return this.addressInfo;
    }

    @Override
    public long getConnectTimeout() {
        return this.connectTimeout;
    }

    @Override
    public boolean isReady() {
        return !this.closed.get() && this.channel != null && this.channel.isOpen() && this.channel.isWritable() && this.channel.isActive();
    }

    @Override
    public boolean isWritable() {
        return !this.closed.get() && this.channel != null && this.channel.isWritable();
    }

    @Override
    public void close() {
        this.close(true);
    }

    @Override
    public void close(boolean removeParent) {
        if (this.released.compareAndSet(false, true) && init.decrementAndGet() == 0L) {
            timer.stop();
        }
        if (this.closed.compareAndSet(false, true)) {
            String clientStr = this.channel != null ? this.channel.toString() : this.addressInfo.getHostPortStr();
            if (removeParent) {
                this.clientFactory.removeClient(this.getServerAddressInfo());
            }
            if (!this.requests.isEmpty()) {
                ClientClosedException exception = new ClientClosedException("Client has bean closed.");
                for (Integer serial : this.requests.keySet()) {
                    Callback<ResponseWrapper> callback;
                    if (serial == null || (callback = this.requests.remove(serial)) == null) continue;
                    callback.handleError(exception);
                }
            }
            if (this.channel != null) {
                this.channel.close();
                this.channel = null;
            }
            logger.info(new StringBuilder(256).append("Client(").append(clientStr).append(") closed").toString());
        }
    }

    @Override
    public ClientFactory getClientFactory() {
        return this.clientFactory;
    }

    public class TimeoutTask
    implements TimerTask {
        private final int serialNo;

        public TimeoutTask(int serialNo) {
            this.serialNo = serialNo;
        }

        public void run(Timeout timeout) throws Exception {
            Callback callback;
            Timeout timeout1 = (Timeout)NettyClient.this.timeouts.remove(this.serialNo);
            if (timeout1 != null) {
                timeout1.cancel();
            }
            if ((callback = (Callback)NettyClient.this.requests.remove(this.serialNo)) != null) {
                NettyClient.this.channel.eventLoop().execute(() -> callback.handleError(new TimeoutException("Request is timeout!")));
            }
        }
    }

    public class NettyClientHandler
    extends ChannelInboundHandlerAdapter {
        public void channelRead(ChannelHandlerContext ctx, Object e) {
            logger.debug("client message receive!");
            if (e instanceof RpcDataPack) {
                logger.debug("RpcDataPack client message receive!");
                RpcDataPack dataPack = (RpcDataPack)e;
                Callback callback = (Callback)NettyClient.this.requests.remove(dataPack.getSerialNo());
                if (callback != null) {
                    Timeout timeout = (Timeout)NettyClient.this.timeouts.remove(dataPack.getSerialNo());
                    if (timeout != null) {
                        timeout.cancel();
                    }
                    try {
                        Throwable remote;
                        ResponseWrapper responseWrapper;
                        ByteBufferInputStream in = new ByteBufferInputStream(dataPack.getDataLst());
                        RPCProtos.RpcConnHeader connHeader = RPCProtos.RpcConnHeader.parseDelimitedFrom(in);
                        if (connHeader == null) {
                            throw new EOFException();
                        }
                        RPCProtos.ResponseHeader rpcResponse = RPCProtos.ResponseHeader.parseDelimitedFrom(in);
                        if (rpcResponse == null) {
                            throw new EOFException();
                        }
                        RPCProtos.ResponseHeader.Status status = rpcResponse.getStatus();
                        if (status == RPCProtos.ResponseHeader.Status.SUCCESS) {
                            RPCProtos.RspResponseBody pbRpcResponse = RPCProtos.RspResponseBody.parseDelimitedFrom(in);
                            if (pbRpcResponse == null) {
                                throw new NetworkException("Not found PBRpcResponse data!");
                            }
                            Object responseResult = PbEnDecoder.pbDecode(false, pbRpcResponse.getMethod(), pbRpcResponse.getData().toByteArray());
                            responseWrapper = new ResponseWrapper(connHeader.getFlag(), dataPack.getSerialNo(), rpcResponse.getServiceType(), rpcResponse.getProtocolVer(), pbRpcResponse.getMethod(), responseResult);
                        } else {
                            RPCProtos.RspExceptionBody exceptionResponse = RPCProtos.RspExceptionBody.parseDelimitedFrom(in);
                            if (exceptionResponse == null) {
                                throw new NetworkException("Not found RpcException data!");
                            }
                            String exceptionName = exceptionResponse.getExceptionName();
                            exceptionName = MixUtils.replaceClassNamePrefix(exceptionName, false, rpcResponse.getProtocolVer());
                            responseWrapper = new ResponseWrapper(connHeader.getFlag(), dataPack.getSerialNo(), rpcResponse.getServiceType(), rpcResponse.getProtocolVer(), exceptionName, exceptionResponse.getStackTrace());
                        }
                        if (!responseWrapper.isSuccess() && IOException.class.isAssignableFrom((remote = MixUtils.unwrapException(new StringBuilder(512).append(responseWrapper.getErrMsg()).append("#").append(responseWrapper.getStackTrace()).toString())).getClass())) {
                            NettyClient.this.close();
                        }
                        callback.handleResult(responseWrapper);
                    }
                    catch (Throwable ee) {
                        ResponseWrapper responseWrapper = new ResponseWrapper(-2, dataPack.getSerialNo(), -2, -2, -2, ee);
                        if (ee instanceof EOFException) {
                            NettyClient.this.close();
                        }
                        callback.handleResult(responseWrapper);
                    }
                } else if (logger.isDebugEnabled()) {
                    logger.debug("Missing previous call info, maybe it has been timeout.");
                }
            }
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Exception {
            Throwable t = e.getCause();
            if (t instanceof IOException || t instanceof ReadTimeoutException || t instanceof UnresolvedAddressException) {
                if (t instanceof ReadTimeoutException) {
                    logger.info("Close client {} due to idle.", (Object)ctx.channel());
                }
                if (t instanceof UnresolvedAddressException) {
                    logger.info("UnresolvedAddressException for connect {} closed.", (Object)NettyClient.this.addressInfo.getHostPortStr());
                }
                NettyClient.this.close();
            } else {
                logger.error("catch some exception not IOException", e.getCause());
            }
        }

        public void channelInactive(ChannelHandlerContext ctx) {
            NettyClient.this.close();
        }
    }
}

