/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.fescar.core.rpc.netty;

import com.alibaba.fescar.common.exception.FrameworkErrorCode;
import com.alibaba.fescar.common.exception.FrameworkException;
import com.alibaba.fescar.common.thread.NamedThreadFactory;
import com.alibaba.fescar.common.util.NetUtil;
import com.alibaba.fescar.core.protocol.AbstractMessage;
import com.alibaba.fescar.core.protocol.HeartbeatMessage;
import com.alibaba.fescar.core.protocol.MergeResultMessage;
import com.alibaba.fescar.core.protocol.MergedWarpMessage;
import com.alibaba.fescar.core.protocol.MessageFuture;
import com.alibaba.fescar.core.protocol.RpcMessage;
import com.alibaba.fescar.core.rpc.ClientMessageListener;
import com.alibaba.fescar.core.rpc.ClientMessageSender;
import com.alibaba.fescar.core.rpc.RemotingService;
import com.alibaba.fescar.core.rpc.netty.AbstractRpcRemoting;
import com.alibaba.fescar.core.rpc.netty.DefaultChannelPoolHandler;
import com.alibaba.fescar.core.rpc.netty.MessageCodecHandler;
import com.alibaba.fescar.core.rpc.netty.NettyClientConfig;
import com.alibaba.fescar.core.rpc.netty.NettyPoolKey;
import com.alibaba.fescar.core.rpc.netty.NettyPoolableFactory;
import com.alibaba.fescar.core.rpc.netty.RegisterMsgListener;
import com.alibaba.fescar.core.rpc.netty.RpcClientHandler;
import com.alibaba.fescar.discovery.registry.RegistryFactory;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollMode;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.AbstractChannelPoolMap;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.PlatformDependent;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractRpcRemotingClient
extends AbstractRpcRemoting
implements RemotingService,
RegisterMsgListener,
ClientMessageSender {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRpcRemotingClient.class);
    private final NettyClientConfig nettyClientConfig;
    private final Bootstrap bootstrap = new Bootstrap();
    private final EventLoopGroup eventLoopGroupWorker;
    private EventExecutorGroup defaultEventExecutorGroup;
    private AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool> clientChannelPool;
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private static final String MSG_ID_PREFIX = "msgId:";
    private static final String FUTURES_PREFIX = "futures:";
    private static final String SINGLE_LOG_POSTFIX = ";";
    private static final int MAX_MERGE_SEND_MILLS = 1;
    private static final String THREAD_PREFIX_SPLIT_CHAR = "_";
    protected GenericKeyedObjectPool<NettyPoolKey, Channel> nettyClientKeyPool;
    protected ClientMessageListener clientMessageListener;

    public AbstractRpcRemotingClient(NettyClientConfig nettyClientConfig) {
        this(nettyClientConfig, null, null);
    }

    public AbstractRpcRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup, ThreadPoolExecutor messageExecutor) {
        super(messageExecutor);
        if (null == nettyClientConfig) {
            nettyClientConfig = new NettyClientConfig();
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("use default netty client config.");
            }
        }
        this.nettyClientConfig = nettyClientConfig;
        int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize();
        this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize, (ThreadFactory)new NamedThreadFactory(this.getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()), selectorThreadSizeThreadSize));
        this.defaultEventExecutorGroup = eventExecutorGroup;
    }

    @Override
    public void init() {
        NettyPoolableFactory keyPoolableFactory = new NettyPoolableFactory(this);
        this.nettyClientKeyPool = new GenericKeyedObjectPool((KeyedPoolableObjectFactory)keyPoolableFactory);
        this.nettyClientKeyPool.setConfig(this.getNettyPoolConfig());
        super.init();
    }

    @Override
    public void start() {
        if (this.defaultEventExecutorGroup == null) {
            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(this.nettyClientConfig.getClientWorkerThreads(), (ThreadFactory)new NamedThreadFactory(this.getThreadPrefix(this.nettyClientConfig.getClientWorkerThreadPrefix()), this.nettyClientConfig.getClientWorkerThreads()));
        }
        ((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)this.bootstrap.group(this.eventLoopGroupWorker)).channel(this.nettyClientConfig.getClientChannelClazz())).option(ChannelOption.TCP_NODELAY, (Object)true)).option(ChannelOption.SO_KEEPALIVE, (Object)true)).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)this.nettyClientConfig.getConnectTimeoutMillis())).option(ChannelOption.SO_SNDBUF, (Object)this.nettyClientConfig.getClientSocketSndBufSize())).option(ChannelOption.SO_RCVBUF, (Object)this.nettyClientConfig.getClientSocketRcvBufSize());
        if (this.nettyClientConfig.enableNative()) {
            if (PlatformDependent.isOsx()) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("client run on macOS");
                }
            } else {
                ((Bootstrap)this.bootstrap.option(EpollChannelOption.EPOLL_MODE, (Object)EpollMode.EDGE_TRIGGERED)).option(EpollChannelOption.TCP_QUICKACK, (Object)true);
            }
        }
        if (this.nettyClientConfig.isUseConnPool()) {
            this.clientChannelPool = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>(){

                protected FixedChannelPool newPool(InetSocketAddress key) {
                    FixedChannelPool fixedClientChannelPool = new FixedChannelPool(AbstractRpcRemotingClient.this.bootstrap.remoteAddress((SocketAddress)key), (ChannelPoolHandler)new DefaultChannelPoolHandler(){

                        @Override
                        public void channelCreated(Channel ch) throws Exception {
                            super.channelCreated(ch);
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(AbstractRpcRemotingClient.this.defaultEventExecutorGroup, new ChannelHandler[]{new IdleStateHandler(AbstractRpcRemotingClient.this.nettyClientConfig.getChannelMaxReadIdleSeconds(), AbstractRpcRemotingClient.this.nettyClientConfig.getChannelMaxWriteIdleSeconds(), AbstractRpcRemotingClient.this.nettyClientConfig.getChannelMaxAllIdleSeconds())});
                            pipeline.addLast(AbstractRpcRemotingClient.this.defaultEventExecutorGroup, new ChannelHandler[]{new RpcClientHandler()});
                        }
                    }, ChannelHealthChecker.ACTIVE, FixedChannelPool.AcquireTimeoutAction.FAIL, AbstractRpcRemotingClient.this.nettyClientConfig.getMaxAcquireConnMills(), AbstractRpcRemotingClient.this.nettyClientConfig.getPerHostMaxConn(), AbstractRpcRemotingClient.this.nettyClientConfig.getPendingConnSize(), false);
                    return fixedClientChannelPool;
                }
            };
        } else {
            this.bootstrap.handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

                public void initChannel(SocketChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new ChannelHandler[]{new IdleStateHandler(AbstractRpcRemotingClient.this.nettyClientConfig.getChannelMaxReadIdleSeconds(), AbstractRpcRemotingClient.this.nettyClientConfig.getChannelMaxWriteIdleSeconds(), AbstractRpcRemotingClient.this.nettyClientConfig.getChannelMaxAllIdleSeconds())}).addLast(new ChannelHandler[]{new MessageCodecHandler()});
                    if (null != AbstractRpcRemotingClient.this.channelHandlers) {
                        AbstractRpcRemotingClient.this.addChannelPipelineLast((Channel)ch, AbstractRpcRemotingClient.this.channelHandlers);
                    }
                }
            });
        }
        if (this.initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
            LOGGER.info("AbstractRpcRemotingClient has started");
        }
    }

    protected Channel getNewChannel(InetSocketAddress address) {
        Channel channel = null;
        ChannelFuture f = this.bootstrap.connect((SocketAddress)address);
        try {
            f.await((long)this.nettyClientConfig.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS);
            if (f.isCancelled()) {
                throw new FrameworkException(f.cause(), "connect cancelled, can not connect to fescar-server.");
            }
            if (!f.isSuccess()) {
                throw new FrameworkException(f.cause(), "connect failed, can not connect to fescar-server.");
            }
            channel = f.channel();
        }
        catch (Exception e) {
            throw new FrameworkException((Throwable)e, "can not connect to fescar-server.");
        }
        return channel;
    }

    @Override
    public void shutdown() {
        try {
            if (null != this.clientChannelPool) {
                this.clientChannelPool.close();
            }
            this.eventLoopGroupWorker.shutdownGracefully();
            if (this.defaultEventExecutorGroup != null) {
                this.defaultEventExecutorGroup.shutdownGracefully();
            }
            super.destroy();
        }
        catch (Exception exx) {
            LOGGER.error("Failed to shutdown: {}", (Object)exx.getMessage());
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcMessage rpcMessage;
        if (msg instanceof RpcMessage && (rpcMessage = (RpcMessage)msg).getBody() == HeartbeatMessage.PONG) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("received PONG from {}", (Object)ctx.channel().remoteAddress());
            }
            return;
        }
        if (((RpcMessage)msg).getBody() instanceof MergeResultMessage) {
            MergeResultMessage results = (MergeResultMessage)((RpcMessage)msg).getBody();
            MergedWarpMessage mergeMessage = (MergedWarpMessage)this.mergeMsgMap.remove(((RpcMessage)msg).getId());
            int num = mergeMessage.msgs.size();
            for (int i = 0; i < num; ++i) {
                long msgId = mergeMessage.msgIds.get(i);
                MessageFuture future = (MessageFuture)this.futures.remove(msgId);
                if (future == null) {
                    if (!LOGGER.isInfoEnabled()) continue;
                    LOGGER.info("msg: {} is not found in futures.", (Object)msgId);
                    continue;
                }
                future.setResultMessage(results.getMsgs()[i]);
            }
            return;
        }
        super.channelRead(ctx, msg);
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (this.messageExecutor.isShutdown()) {
            return;
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("channel inactive: {}", (Object)ctx.channel());
        }
        this.releaseChannel(ctx.channel(), NetUtil.toStringAddress((SocketAddress)ctx.channel().remoteAddress()));
        super.channelInactive(ctx);
    }

    public ClientMessageListener getClientMessageListener() {
        return this.clientMessageListener;
    }

    public void setClientMessageListener(ClientMessageListener clientMessageListener) {
        this.clientMessageListener = clientMessageListener;
    }

    @Override
    public void dispatch(long msgId, ChannelHandlerContext ctx, Object msg) {
        if (this.clientMessageListener != null) {
            String remoteAddress = NetUtil.toStringAddress((SocketAddress)ctx.channel().remoteAddress());
            this.clientMessageListener.onMessage(msgId, remoteAddress, msg, this);
        }
    }

    protected void reconnect(String transactionServiceGroup) {
        List<String> availList = null;
        try {
            availList = this.getAvailServerList(transactionServiceGroup);
        }
        catch (Exception exx) {
            LOGGER.error("Failed to get available servers: {}" + exx.getMessage());
        }
        if (CollectionUtils.isEmpty(availList)) {
            LOGGER.error("no available server to connect.");
            return;
        }
        for (String serverAddress : availList) {
            try {
                this.connect(serverAddress);
            }
            catch (Exception e) {
                LOGGER.error(FrameworkErrorCode.NetConnect.errCode, (Object)("can not connect to " + serverAddress + " cause:" + e.getMessage()), (Object)e);
            }
        }
    }

    protected List<String> getAvailServerList(String transactionServiceGroup) throws Exception {
        ArrayList<String> availList = new ArrayList<String>();
        List availInetSocketAddressList = RegistryFactory.getInstance().lookup(transactionServiceGroup);
        if (!CollectionUtils.isEmpty((Collection)availInetSocketAddressList)) {
            for (InetSocketAddress address : availInetSocketAddressList) {
                availList.add(NetUtil.toStringAddress((InetSocketAddress)address));
            }
        }
        return availList;
    }

    protected String getThreadPrefix(String threadPrefix) {
        return threadPrefix + THREAD_PREFIX_SPLIT_CHAR + this.getTransactionRole().name();
    }

    protected abstract Channel connect(String var1);

    protected abstract void releaseChannel(Channel var1, String var2);

    protected abstract GenericKeyedObjectPool.Config getNettyPoolConfig();

    protected abstract NettyPoolKey.TransactionRole getTransactionRole();

    public class MergedSendRunnable
    implements Runnable {
        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while (true) {
                Iterator iterator = AbstractRpcRemotingClient.this.mergeLock;
                synchronized (iterator) {
                    try {
                        AbstractRpcRemotingClient.this.mergeLock.wait(1L);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                }
                AbstractRpcRemotingClient.this.isSending = true;
                for (String address : AbstractRpcRemotingClient.this.basketMap.keySet()) {
                    BlockingQueue basket = (BlockingQueue)AbstractRpcRemotingClient.this.basketMap.get(address);
                    if (basket.isEmpty()) continue;
                    MergedWarpMessage mergeMessage = new MergedWarpMessage();
                    while (!basket.isEmpty()) {
                        RpcMessage msg = (RpcMessage)basket.poll();
                        mergeMessage.msgs.add((AbstractMessage)msg.getBody());
                        mergeMessage.msgIds.add(msg.getId());
                    }
                    if (mergeMessage.msgIds.size() > 1) {
                        this.printMergeMessageLog(mergeMessage);
                    }
                    Channel sendChannel = AbstractRpcRemotingClient.this.connect(address);
                    try {
                        AbstractRpcRemotingClient.this.sendRequest(sendChannel, mergeMessage);
                    }
                    catch (FrameworkException e) {
                        if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && address != null) {
                            AbstractRpcRemotingClient.this.destroyChannel(address, sendChannel);
                        }
                        LOGGER.error("", (Object)"client merge call failed", (Object)e);
                    }
                }
                AbstractRpcRemotingClient.this.isSending = false;
            }
        }

        private void printMergeMessageLog(MergedWarpMessage mergeMessage) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("merge msg size:" + mergeMessage.msgIds.size());
                for (AbstractMessage cm : mergeMessage.msgs) {
                    LOGGER.debug(cm.toString());
                }
                StringBuffer sb = new StringBuffer();
                for (long l : mergeMessage.msgIds) {
                    sb.append(AbstractRpcRemotingClient.MSG_ID_PREFIX).append(l).append(AbstractRpcRemotingClient.SINGLE_LOG_POSTFIX);
                }
                sb.append("\n");
                for (long l : AbstractRpcRemotingClient.this.futures.keySet()) {
                    sb.append(AbstractRpcRemotingClient.FUTURES_PREFIX).append(l).append(AbstractRpcRemotingClient.SINGLE_LOG_POSTFIX);
                }
                LOGGER.debug(sb.toString());
            }
        }
    }
}

