/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.tubemq.corerpc;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.tubemq.corebase.Shutdownable;
import org.apache.inlong.tubemq.corebase.cluster.BrokerInfo;
import org.apache.inlong.tubemq.corebase.cluster.MasterInfo;
import org.apache.inlong.tubemq.corebase.cluster.NodeAddrInfo;
import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
import org.apache.inlong.tubemq.corerpc.AbstractServiceInvoker;
import org.apache.inlong.tubemq.corerpc.RemoteConErrStats;
import org.apache.inlong.tubemq.corerpc.RpcConfig;
import org.apache.inlong.tubemq.corerpc.RpcServiceFailoverInvoker;
import org.apache.inlong.tubemq.corerpc.RpcServiceInvoker;
import org.apache.inlong.tubemq.corerpc.client.ClientFactory;
import org.apache.inlong.tubemq.corerpc.exception.LocalConnException;
import org.apache.inlong.tubemq.corerpc.netty.NettyRpcServer;
import org.apache.inlong.tubemq.corerpc.server.ServiceRpcServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RpcServiceFactory {
    private static final Logger logger = LoggerFactory.getLogger(RpcServiceFactory.class);
    private static final int DEFAULT_IDLE_TIME = 600000;
    private static final AtomicInteger threadIdGen = new AtomicInteger(0);
    private final ClientFactory clientFactory;
    private final ConcurrentHashMap<Integer, ServiceRpcServer> servers = new ConcurrentHashMap();
    private final ConcurrentHashMap<String, ServiceHolder> servicesCache = new ConcurrentHashMap();
    private final ConcurrentHashMap<String, RemoteConErrStats> remoteAddrMap = new ConcurrentHashMap();
    private final ConcurrentHashMap<String, Long> forbiddenAddrMap = new ConcurrentHashMap();
    private final ConnectionManager connectionManager;
    private final ConcurrentHashMap<String, ConnectionNode> brokerQueue = new ConcurrentHashMap();
    private final ConcurrentHashMap<String, Long> updateTime = new ConcurrentHashMap();
    private final ConcurrentHashMap<Integer, Long> brokerUnavailableMap = new ConcurrentHashMap();
    private long unAvailableFbdDurationMs = 50000L;
    private final AtomicLong lastLogPrintTime = new AtomicLong(0L);
    private final AtomicLong lastCheckTime = new AtomicLong(0L);
    private long linkStatsDurationMs = 60000L;
    private long linkStatsForbiddenDurMs = 1800000L;
    private int linkStatsMaxAllowedFailCount = 5;
    private double linkStatsMaxAllowedForbiddenRate = 0.3;

    public RpcServiceFactory() {
        this.clientFactory = null;
        this.connectionManager = null;
    }

    public RpcServiceFactory(ClientFactory clientFactory) {
        this.clientFactory = clientFactory;
        this.connectionManager = null;
    }

    public RpcServiceFactory(ClientFactory clientFactory, RpcConfig config) {
        this.clientFactory = clientFactory;
        this.linkStatsDurationMs = config.getLong("rpc.link.quality.stats.duration", 60000L);
        this.linkStatsForbiddenDurMs = config.getLong("rpc.link.quality.forbidden.duration", 1800000L);
        this.linkStatsMaxAllowedFailCount = config.getInt("rpc.link.quality.max.allowed.fail.count", 5);
        this.linkStatsMaxAllowedForbiddenRate = config.getDouble("rpc.link.quality.max.fail.forbidden.rate", 0.3);
        this.unAvailableFbdDurationMs = config.getLong("rpc.unavailable.service.forbidden.duration", 50000L);
        this.connectionManager = new ConnectionManager();
        this.connectionManager.setName(new StringBuilder(256).append("rpcFactory-Thread-").append(threadIdGen.getAndIncrement()).toString());
        this.connectionManager.start();
    }

    public boolean isRemoteAddrForbidden(String remoteAddr) {
        Long forbiddenTime = this.forbiddenAddrMap.get(remoteAddr);
        if (forbiddenTime == null) {
            return false;
        }
        if (System.currentTimeMillis() - forbiddenTime <= this.linkStatsForbiddenDurMs) {
            return true;
        }
        this.forbiddenAddrMap.remove(remoteAddr);
        return false;
    }

    public ConcurrentHashMap<String, Long> getForbiddenAddrMap() {
        return this.forbiddenAddrMap;
    }

    public ConcurrentHashMap<Integer, Long> getUnavailableBrokerMap() {
        return this.brokerUnavailableMap;
    }

    public void resetRmtAddrErrCount(String remoteAddr) {
        RemoteConErrStats newErrStatistic;
        this.forbiddenAddrMap.remove(remoteAddr);
        RemoteConErrStats rmtConErrStats = this.remoteAddrMap.get(remoteAddr);
        if (rmtConErrStats == null && (rmtConErrStats = this.remoteAddrMap.putIfAbsent(remoteAddr, newErrStatistic = new RemoteConErrStats(this.linkStatsDurationMs, this.linkStatsMaxAllowedFailCount))) == null) {
            rmtConErrStats = newErrStatistic;
        }
        rmtConErrStats.resetErrCount();
    }

    public void addRmtAddrErrCount(String remoteAddr) {
        RemoteConErrStats newErrStatistic;
        RemoteConErrStats rmtConErrStats = this.remoteAddrMap.get(remoteAddr);
        if (rmtConErrStats == null && (rmtConErrStats = this.remoteAddrMap.putIfAbsent(remoteAddr, newErrStatistic = new RemoteConErrStats(this.linkStatsDurationMs, this.linkStatsMaxAllowedFailCount))) == null) {
            rmtConErrStats = newErrStatistic;
        }
        if (rmtConErrStats.increErrCount()) {
            boolean isAdded = false;
            Long beforeTime = this.forbiddenAddrMap.get(remoteAddr);
            if (beforeTime == null) {
                int totalCount = 0;
                Long curTime = System.currentTimeMillis();
                HashSet<String> expiredAddrs = new HashSet<String>();
                for (Map.Entry<String, Long> entry : this.forbiddenAddrMap.entrySet()) {
                    if (entry.getKey() == null || entry.getValue() == null) continue;
                    if (curTime - entry.getValue() > this.linkStatsForbiddenDurMs) {
                        expiredAddrs.add(entry.getKey());
                        continue;
                    }
                    ++totalCount;
                }
                if (!expiredAddrs.isEmpty()) {
                    for (String tmpAddr : expiredAddrs) {
                        Long sotreTime = this.forbiddenAddrMap.get(tmpAddr);
                        if (sotreTime == null || curTime - sotreTime <= this.linkStatsForbiddenDurMs) continue;
                        this.forbiddenAddrMap.remove(tmpAddr);
                    }
                }
                int needForbiddenCount = (int)Math.rint((double)this.remoteAddrMap.size() * this.linkStatsMaxAllowedForbiddenRate);
                if ((needForbiddenCount = Math.min(needForbiddenCount, 30)) > totalCount) {
                    this.forbiddenAddrMap.put(remoteAddr, System.currentTimeMillis());
                    isAdded = true;
                }
            } else {
                this.forbiddenAddrMap.put(remoteAddr, System.currentTimeMillis());
                isAdded = true;
            }
            long curLastPrintTime = this.lastLogPrintTime.get();
            if (isAdded && System.currentTimeMillis() - curLastPrintTime > 120000L && this.lastLogPrintTime.compareAndSet(curLastPrintTime, System.currentTimeMillis())) {
                logger.info(new StringBuilder(512).append("[Remote Address] forbidden list : ").append(this.forbiddenAddrMap.toString()).toString());
            }
        }
    }

    public void rmvAllExpiredRecords() {
        long curTime = System.currentTimeMillis();
        HashSet<String> expiredAddrs = new HashSet<String>();
        for (Map.Entry<String, RemoteConErrStats> entry : this.remoteAddrMap.entrySet()) {
            if (entry.getKey() == null || entry.getValue() == null || !entry.getValue().isExpiredRecord(curTime)) continue;
            expiredAddrs.add(entry.getKey());
        }
        if (!expiredAddrs.isEmpty()) {
            for (String string : expiredAddrs) {
                RemoteConErrStats rmtConErrStats = this.remoteAddrMap.get(string);
                if (rmtConErrStats == null || !rmtConErrStats.isExpiredRecord(curTime)) continue;
                this.remoteAddrMap.remove(string);
            }
        }
        expiredAddrs.clear();
        curTime = System.currentTimeMillis();
        for (Map.Entry<String, Object> entry : this.forbiddenAddrMap.entrySet()) {
            if (entry.getKey() == null || entry.getValue() == null || curTime - (Long)entry.getValue() <= this.linkStatsForbiddenDurMs + 60000L) continue;
            expiredAddrs.add(entry.getKey());
        }
        if (!expiredAddrs.isEmpty()) {
            for (String string : expiredAddrs) {
                Long recordTime = this.forbiddenAddrMap.get(string);
                if (recordTime == null || curTime - recordTime <= this.linkStatsForbiddenDurMs + 60000L) continue;
                this.forbiddenAddrMap.remove(string);
            }
        }
    }

    public void addUnavailableBroker(int brokerId) {
        this.brokerUnavailableMap.put(brokerId, System.currentTimeMillis());
    }

    public void rmvExpiredUnavailableBrokers() {
        long curTime = System.currentTimeMillis();
        HashSet<Integer> expiredBrokers = new HashSet<Integer>();
        for (Map.Entry<Integer, Long> entry : this.brokerUnavailableMap.entrySet()) {
            if (entry.getKey() == null || entry.getValue() == null || curTime - entry.getValue() <= this.unAvailableFbdDurationMs) continue;
            expiredBrokers.add(entry.getKey());
        }
        if (!expiredBrokers.isEmpty()) {
            for (Integer brokerId : expiredBrokers) {
                Long lastAddTime = this.brokerUnavailableMap.get(brokerId);
                if (lastAddTime == null || curTime - lastAddTime <= this.unAvailableFbdDurationMs) continue;
                this.brokerUnavailableMap.remove(brokerId, lastAddTime);
            }
        }
    }

    public synchronized <T> T getService(Class<T> clazz, BrokerInfo brokerInfo, RpcConfig config) {
        String serviceKey = this.getServiceKey(brokerInfo.getBrokerAddr(), clazz.getName());
        ServiceHolder h = this.servicesCache.get(serviceKey);
        if (h != null) {
            this.updateTime.put(serviceKey, System.currentTimeMillis());
            return h.getService();
        }
        RpcServiceInvoker invoker = new RpcServiceInvoker(this.clientFactory, clazz, config, new NodeAddrInfo(brokerInfo.getHost(), brokerInfo.getPort(), brokerInfo.getBrokerAddr()));
        Object service = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, (InvocationHandler)invoker);
        this.servicesCache.put(serviceKey, new ServiceHolder<Object>(service, invoker));
        this.updateTime.put(serviceKey, System.currentTimeMillis());
        return (T)service;
    }

    public boolean isServiceEmpty() {
        return this.servicesCache.isEmpty();
    }

    public <T> T getOrCreateService(Class<T> clazz, BrokerInfo brokerInfo, RpcConfig config) {
        ConnectionNode curNode;
        String serviceKey = this.getServiceKey(brokerInfo.getBrokerAddr(), clazz.getName());
        ServiceHolder h = this.servicesCache.get(serviceKey);
        if (h != null) {
            return h.getService();
        }
        if (!this.isRemoteAddrForbidden(brokerInfo.getBrokerAddr()) && (curNode = this.brokerQueue.get(serviceKey)) == null) {
            this.brokerQueue.putIfAbsent(serviceKey, new ConnectionNode(clazz, new NodeAddrInfo(brokerInfo.getHost(), brokerInfo.getPort(), brokerInfo.getBrokerAddr()), config));
        }
        return null;
    }

    public synchronized <T> T getFailoverService(Class<T> clazz, MasterInfo masterInfo, RpcConfig config) {
        String serviceKey = this.getFailoverServiceKey(masterInfo, clazz.getName());
        ServiceHolder h = this.servicesCache.get(serviceKey);
        if (h != null) {
            return h.getService();
        }
        RpcServiceFailoverInvoker invoker = new RpcServiceFailoverInvoker(this.clientFactory, clazz, config, masterInfo);
        Object service = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, (InvocationHandler)invoker);
        this.servicesCache.put(serviceKey, new ServiceHolder<Object>(service, invoker));
        return (T)service;
    }

    public synchronized void destroyAllService() {
        for (String serviceKey : this.servicesCache.keySet()) {
            ServiceHolder h;
            if (serviceKey == null || (h = this.servicesCache.remove(serviceKey)) == null) continue;
            h.shutdown();
        }
    }

    public synchronized void publishService(Class clazz, Object serviceInstance, int listenPort, RpcConfig config) throws Exception {
        this.publishService(clazz, serviceInstance, listenPort, null, config);
    }

    public synchronized void publishService(Class clazz, Object serviceInstance, int listenPort, ExecutorService threadPool, RpcConfig config) throws Exception {
        ServiceRpcServer server = this.servers.get(listenPort);
        if (server == null) {
            server = new NettyRpcServer(config);
            server.start(listenPort);
            this.servers.put(listenPort, server);
        }
        try {
            server.publishService(clazz.getName(), serviceInstance, threadPool);
        }
        catch (Exception e) {
            logger.error("Publish service failed!", (Throwable)e);
            throw e;
        }
    }

    public synchronized void destroyAllPublishedService() throws Exception {
        for (Integer serverId : this.servers.keySet()) {
            ServiceRpcServer serviceServer;
            if (serverId == null || (serviceServer = this.servers.remove(serverId)) == null) continue;
            serviceServer.removeAllService(10);
            serviceServer.removeAllService(11);
            serviceServer.stop();
        }
    }

    public void destroy() throws Exception {
        if (this.connectionManager != null) {
            this.connectionManager.shutdown();
        }
        this.destroyAllService();
        this.destroyAllPublishedService();
    }

    private String getServiceKey(String targetAddress, String serviceName) {
        return new StringBuilder(256).append(serviceName).append("@").append(targetAddress).toString();
    }

    private String getFailoverServiceKey(MasterInfo masterInfo, String serviceName) {
        return new StringBuilder(256).append(serviceName).append("@").append(masterInfo.getMasterClusterStr()).toString();
    }

    private class ConnectionManager
    extends Thread {
        boolean isRunning = true;

        private ConnectionManager() {
        }

        public void shutdown() {
            logger.info("[SHUTDOWN_TUBE] Shutting down connectionManager.");
            this.isRunning = false;
        }

        @Override
        public void run() {
            while (this.isRunning) {
                try {
                    while (!RpcServiceFactory.this.brokerQueue.isEmpty()) {
                        for (String serviceKey : RpcServiceFactory.this.brokerQueue.keySet()) {
                            ConnectionNode node = (ConnectionNode)RpcServiceFactory.this.brokerQueue.get(serviceKey);
                            ServiceHolder h = (ServiceHolder)RpcServiceFactory.this.servicesCache.get(serviceKey);
                            if (h != null) {
                                RpcServiceFactory.this.brokerQueue.remove(serviceKey);
                                continue;
                            }
                            RpcServiceInvoker invoker = new RpcServiceInvoker(RpcServiceFactory.this.clientFactory, node.clazzType, node.getConfig(), node.getAddressInfo());
                            Object service = Proxy.newProxyInstance(node.clazzType.getClassLoader(), new Class[]{node.clazzType}, (InvocationHandler)invoker);
                            try {
                                invoker.getClientOnce();
                                RpcServiceFactory.this.resetRmtAddrErrCount(node.getHostAndPortStr());
                            }
                            catch (Throwable e) {
                                if (e instanceof LocalConnException) {
                                    RpcServiceFactory.this.addRmtAddrErrCount(node.getHostAndPortStr());
                                }
                                RpcServiceFactory.this.brokerQueue.remove(serviceKey);
                                continue;
                            }
                            RpcServiceFactory.this.servicesCache.putIfAbsent(serviceKey, new ServiceHolder<Object>(service, invoker));
                            RpcServiceFactory.this.updateTime.put(serviceKey, System.currentTimeMillis());
                            RpcServiceFactory.this.brokerQueue.remove(serviceKey);
                        }
                    }
                    long cur = System.currentTimeMillis();
                    if (cur - RpcServiceFactory.this.lastCheckTime.get() >= 30000L) {
                        ArrayList<String> tmpKeyList = new ArrayList<String>();
                        for (Map.Entry entry : RpcServiceFactory.this.updateTime.entrySet()) {
                            if (entry.getKey() == null || entry.getValue() == null || cur - (Long)entry.getValue() <= 600000L) continue;
                            tmpKeyList.add((String)entry.getKey());
                        }
                        for (String serviceKey : tmpKeyList) {
                            RpcServiceFactory.this.servicesCache.remove(serviceKey);
                            RpcServiceFactory.this.updateTime.remove(serviceKey);
                        }
                        RpcServiceFactory.this.rmvAllExpiredRecords();
                        RpcServiceFactory.this.rmvExpiredUnavailableBrokers();
                        RpcServiceFactory.this.lastCheckTime.set(System.currentTimeMillis());
                    }
                }
                catch (Throwable e2) {
                    logger.warn("[connectionManager]: runner found throw error info ", e2);
                }
                ThreadUtils.sleep(80L);
            }
        }
    }

    private static class ServiceHolder<T>
    implements Shutdownable {
        private T service;
        private AbstractServiceInvoker invoker;

        ServiceHolder(T service, AbstractServiceInvoker invoker) {
            this.service = service;
            this.invoker = invoker;
        }

        public T getService() {
            return this.service;
        }

        @Override
        public void shutdown() {
            this.invoker.destroy();
        }
    }

    private static class ConnectionNode {
        private Class clazzType;
        private NodeAddrInfo addressInfo;
        private RpcConfig config;

        public ConnectionNode(Class clazzType, NodeAddrInfo nodeAddrInfo, RpcConfig config) {
            this.clazzType = clazzType;
            this.addressInfo = nodeAddrInfo;
            this.config = config;
        }

        public Class getClazzType() {
            return this.clazzType;
        }

        public NodeAddrInfo getAddressInfo() {
            return this.addressInfo;
        }

        public String getHostAndPortStr() {
            return this.addressInfo.getHostPortStr();
        }

        public RpcConfig getConfig() {
            return this.config;
        }
    }
}

