/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.tubemq.corerpc.netty;

import com.google.protobuf.Message;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLEngine;
import org.apache.inlong.tubemq.corebase.protobuf.generated.RPCProtos;
import org.apache.inlong.tubemq.corebase.utils.AddressUtils;
import org.apache.inlong.tubemq.corerpc.RequestWrapper;
import org.apache.inlong.tubemq.corerpc.RpcConfig;
import org.apache.inlong.tubemq.corerpc.RpcConstants;
import org.apache.inlong.tubemq.corerpc.RpcDataPack;
import org.apache.inlong.tubemq.corerpc.codec.PbEnDecoder;
import org.apache.inlong.tubemq.corerpc.exception.ServerNotReadyException;
import org.apache.inlong.tubemq.corerpc.netty.ByteBufferInputStream;
import org.apache.inlong.tubemq.corerpc.netty.ByteBufferOutputStream;
import org.apache.inlong.tubemq.corerpc.netty.NettyProtocolDecoder;
import org.apache.inlong.tubemq.corerpc.netty.NettyProtocolEncoder;
import org.apache.inlong.tubemq.corerpc.netty.NettyRequestContext;
import org.apache.inlong.tubemq.corerpc.protocol.Protocol;
import org.apache.inlong.tubemq.corerpc.protocol.ProtocolFactory;
import org.apache.inlong.tubemq.corerpc.server.ServiceRpcServer;
import org.apache.inlong.tubemq.corerpc.utils.MixUtils;
import org.apache.inlong.tubemq.corerpc.utils.TSSLEngineUtil;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.DefaultChannelPipeline;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyRpcServer
implements ServiceRpcServer {
    private static final Logger logger = LoggerFactory.getLogger(NettyRpcServer.class);
    private static final ConcurrentHashMap<String, AtomicLong> errParseAddrMap = new ConcurrentHashMap();
    private static AtomicLong lastParseTime = new AtomicLong(0L);
    private final ConcurrentHashMap<Integer, Protocol> protocols = new ConcurrentHashMap();
    private ServerBootstrap bootstrap;
    private NioServerSocketChannelFactory channelFactory = null;
    private AtomicBoolean started = new AtomicBoolean(false);
    private int protocolType = 10;
    private boolean isOverTLS;
    private String keyStorePath = "";
    private String keyStorePassword = "";
    private boolean needTwoWayAuthentic = false;
    private String trustStorePath = "";
    private String trustStorePassword = "";

    public NettyRpcServer(RpcConfig conf) throws Exception {
        long nettyRecvBuf;
        long nettySendBuf;
        long nettyWriteLowMark;
        this.isOverTLS = conf.getBoolean("tcp.tls", false);
        if (this.isOverTLS) {
            this.protocolType = 11;
            this.keyStorePath = conf.getString("tls.keystore.path");
            this.keyStorePassword = conf.getString("tls.keystore.password");
            this.needTwoWayAuthentic = conf.getBoolean("tls.twoway.authentic", false);
            if (this.needTwoWayAuthentic) {
                this.trustStorePath = conf.getString("tls.truststore.path");
                this.trustStorePassword = conf.getString("tls.truststore.password");
            }
            if (this.keyStorePath == null || this.keyStorePassword == null) {
                throw new Exception(new StringBuilder(512).append("Required parameters: ").append("tls.keystore.path").append(" or ").append("tls.keystore.password").append(" for TLS!").toString());
            }
            if (this.needTwoWayAuthentic && (this.trustStorePath == null || this.trustStorePassword == null)) {
                throw new Exception(new StringBuilder(512).append("Required parameters: ").append("tls.truststore.path").append(" or ").append("tls.truststore.password").append(" for TLS!").toString());
            }
        }
        int bossCount = conf.getInt("rpc.netty.boss.count", 1);
        int workerCount = conf.getInt("rpc.netty.worker.count", RpcConstants.CFG_DEFAULT_SERVER_WORKER_COUNT);
        this.bootstrap = new ServerBootstrap((ChannelFactory)new NioServerSocketChannelFactory((Executor)Executors.newCachedThreadPool(), bossCount, (Executor)Executors.newCachedThreadPool(), workerCount));
        this.bootstrap.setOption("tcpNoDelay", (Object)conf.getBoolean("rpc.tcp.nodelay", true));
        this.bootstrap.setOption("reuseAddress", (Object)conf.getBoolean("rpc.tcp.reuseaddress", true));
        long nettyWriteHighMark = conf.getLong("rpc.netty.write.highmark", -1L);
        if (nettyWriteHighMark > 0L) {
            this.bootstrap.setOption("writeBufferHighWaterMark", (Object)nettyWriteHighMark);
        }
        if ((nettyWriteLowMark = conf.getLong("rpc.netty.write.lowmark", -1L)) > 0L) {
            this.bootstrap.setOption("writeBufferLowWaterMark", (Object)nettyWriteLowMark);
        }
        if ((nettySendBuf = conf.getLong("rpc.netty.send.buffer", -1L)) > 0L) {
            this.bootstrap.setOption("sendBufferSize", (Object)nettySendBuf);
        }
        if ((nettyRecvBuf = conf.getLong("rpc.netty.receive.buffer", -1L)) > 0L) {
            this.bootstrap.setOption("receiveBufferSize", (Object)nettyRecvBuf);
        }
    }

    @Override
    public void start(int listenPort) throws Exception {
        if (this.started.get()) {
            return;
        }
        this.bootstrap.setPipelineFactory(new ChannelPipelineFactory(){

            public ChannelPipeline getPipeline() throws Exception {
                DefaultChannelPipeline pipeline = new DefaultChannelPipeline();
                if (NettyRpcServer.this.isOverTLS) {
                    try {
                        SSLEngine sslEngine = TSSLEngineUtil.createSSLEngine(NettyRpcServer.this.keyStorePath, NettyRpcServer.this.trustStorePath, NettyRpcServer.this.keyStorePassword, NettyRpcServer.this.trustStorePassword, false, NettyRpcServer.this.needTwoWayAuthentic);
                        pipeline.addLast("ssl", (ChannelHandler)new SslHandler(sslEngine));
                    }
                    catch (Throwable t) {
                        logger.error("TLS NettyRpcServer init SSLEngine error, system auto exit!", t);
                        System.exit(1);
                    }
                }
                pipeline.addLast("protocolEncoder", (ChannelHandler)new NettyProtocolDecoder());
                pipeline.addLast("protocolDecoder", (ChannelHandler)new NettyProtocolEncoder());
                pipeline.addLast("serverHandler", (ChannelHandler)new NettyServerHandler(NettyRpcServer.this.protocolType));
                return pipeline;
            }
        });
        this.bootstrap.bind((SocketAddress)new InetSocketAddress(listenPort));
        this.started.set(true);
        if (this.isOverTLS) {
            logger.info(new StringBuilder(256).append("TLS RpcServer started, listen port: ").append(listenPort).toString());
        } else {
            logger.info(new StringBuilder(256).append("TCP RpcServer started, listen port: ").append(listenPort).toString());
        }
    }

    @Override
    public void publishService(String serviceName, Object serviceInstance, ExecutorService threadPool) throws Exception {
        Protocol protocol = this.protocols.get(this.protocolType);
        if (protocol == null) {
            if (ProtocolFactory.getProtocol(this.protocolType) == null) {
                throw new Exception(new StringBuilder(256).append("Invalid protocol type ").append(this.protocolType).append("! You have to register you new protocol before publish service.").toString());
            }
            protocol = ProtocolFactory.getProtocolInstance(this.protocolType);
            this.protocols.put(this.protocolType, protocol);
        }
        protocol.registerService(this.isOverTLS, serviceName, serviceInstance, threadPool);
    }

    @Override
    public void removeService(int protocolType, String serviceName) throws Exception {
        Protocol protocol = this.protocols.get(protocolType);
        if (protocol != null) {
            protocol.removeService(serviceName);
        }
    }

    @Override
    public void removeAllService(int protocolType) throws Exception {
        Protocol protocol = this.protocols.get(protocolType);
        if (protocol != null) {
            protocol.removeAllService();
        }
    }

    @Override
    public boolean isServiceStarted() {
        return this.started.get();
    }

    @Override
    public void stop() throws Exception {
        if (!this.started.get()) {
            return;
        }
        if (this.started.compareAndSet(true, false)) {
            logger.info("Stopping RpcServer...");
            this.bootstrap.releaseExternalResources();
            logger.info("RpcServer stop successfully.");
        }
    }

    private class NettyServerHandler
    extends SimpleChannelUpstreamHandler {
        private int protocolType = 10;

        public NettyServerHandler(int protocolType) {
            this.protocolType = protocolType;
        }

        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            if (!(e.getCause() instanceof IOException)) {
                logger.error("catch some exception not IOException", e.getCause());
            }
        }

        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            RPCProtos.RequestBody rpcRequestBody;
            RPCProtos.RequestHeader requestHeader;
            RPCProtos.RpcConnHeader connHeader;
            if (!(e.getMessage() instanceof RpcDataPack)) {
                return;
            }
            RpcDataPack dataPack = (RpcDataPack)e.getMessage();
            int rmtVersion = 3;
            Channel channel = ctx.getChannel();
            if (channel == null) {
                return;
            }
            String rmtaddrIp = AddressUtils.getRemoteAddressIP(channel);
            try {
                if (!NettyRpcServer.this.isServiceStarted()) {
                    throw new ServerNotReadyException("RpcServer is not running yet");
                }
                List<ByteBuffer> req = dataPack.getDataLst();
                ByteBufferInputStream dis = new ByteBufferInputStream(req);
                connHeader = RPCProtos.RpcConnHeader.parseDelimitedFrom(dis);
                requestHeader = RPCProtos.RequestHeader.parseDelimitedFrom(dis);
                rmtVersion = requestHeader.getProtocolVer();
                rpcRequestBody = RPCProtos.RequestBody.parseDelimitedFrom(dis);
            }
            catch (Throwable e1) {
                List<ByteBuffer> res;
                if (!(e1 instanceof ServerNotReadyException) && rmtaddrIp != null) {
                    AtomicLong count = (AtomicLong)errParseAddrMap.get(rmtaddrIp);
                    if (count == null) {
                        AtomicLong tmpCount = new AtomicLong(0L);
                        count = errParseAddrMap.putIfAbsent(rmtaddrIp, tmpCount);
                        if (count == null) {
                            count = tmpCount;
                        }
                    }
                    count.incrementAndGet();
                    long befTime = lastParseTime.get();
                    long curTime = System.currentTimeMillis();
                    if (curTime - befTime > 180000L && lastParseTime.compareAndSet(befTime, System.currentTimeMillis())) {
                        logger.warn(new StringBuilder(512).append("[Abnormal Visit] Abnormal Message Content visit list is :").append(errParseAddrMap).toString());
                        errParseAddrMap.clear();
                    }
                }
                if ((res = this.prepareResponse(null, rmtVersion, RPCProtos.ResponseHeader.Status.FATAL, e1.getClass().getName(), new StringBuilder(512).append("IPC server unable to read call parameters:").append(e1.getMessage()).toString())) != null) {
                    dataPack.setDataLst(res);
                    channel.write((Object)dataPack);
                }
                return;
            }
            try {
                RequestWrapper requestWrapper = new RequestWrapper(requestHeader.getServiceType(), this.protocolType, requestHeader.getProtocolVer(), connHeader.getFlag(), rpcRequestBody.getTimeout());
                requestWrapper.setMethodId(rpcRequestBody.getMethod());
                requestWrapper.setRequestData(PbEnDecoder.pbDecode(true, rpcRequestBody.getMethod(), rpcRequestBody.getRequest().toByteArray()));
                requestWrapper.setSerialNo(dataPack.getSerialNo());
                NettyRequestContext context = new NettyRequestContext(requestWrapper, ctx, System.currentTimeMillis());
                ((Protocol)NettyRpcServer.this.protocols.get(this.protocolType)).handleRequest(context, rmtaddrIp);
            }
            catch (Throwable ee) {
                List<ByteBuffer> res = this.prepareResponse(null, rmtVersion, RPCProtos.ResponseHeader.Status.FATAL, ee.getClass().getName(), new StringBuilder(512).append("IPC server handle request error :").append(ee.getMessage()).toString());
                if (res != null) {
                    dataPack.setDataLst(res);
                    ctx.getChannel().write((Object)dataPack);
                }
                return;
            }
        }

        protected List<ByteBuffer> prepareResponse(Object value, int rmtVersion, RPCProtos.ResponseHeader.Status status, String errorClass, String error) {
            ByteBufferOutputStream buf = new ByteBufferOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            errorClass = MixUtils.replaceClassNamePrefix(errorClass, true, rmtVersion);
            try {
                RPCProtos.RpcConnHeader.Builder connBuilder = RPCProtos.RpcConnHeader.newBuilder();
                connBuilder.setFlag(1);
                connBuilder.build().writeDelimitedTo(out);
                RPCProtos.ResponseHeader.Builder builder = RPCProtos.ResponseHeader.newBuilder();
                builder.setStatus(status);
                builder.build().writeDelimitedTo(out);
                if (error != null) {
                    RPCProtos.RspExceptionBody.Builder b = RPCProtos.RspExceptionBody.newBuilder();
                    b.setExceptionName(errorClass);
                    b.setStackTrace(error);
                    b.build().writeDelimitedTo(out);
                } else if (value != null) {
                    ((Message)value).writeDelimitedTo((OutputStream)out);
                }
            }
            catch (IOException e) {
                logger.warn(new StringBuilder(512).append("Exception while creating response ").append(e).toString());
            }
            return buf.getBufferList();
        }
    }
}

