package com.alipay.sofa.registry.server.data.event.handler;

import com.alipay.sofa.registry.common.model.CommonResponse;
import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.dataserver.NotifyFetchDatumRequest;
import com.alipay.sofa.registry.common.model.dataserver.NotifyOnlineRequest;
import com.alipay.sofa.registry.common.model.metaserver.DataNode;
import com.alipay.sofa.registry.common.model.store.URL;
import com.alipay.sofa.registry.consistency.hash.ConsistentHash;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.remoting.exchange.message.Request;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.cache.BackupTriad;
import com.alipay.sofa.registry.server.data.cache.DataServerCache;
import com.alipay.sofa.registry.server.data.cache.DatumCache;
import com.alipay.sofa.registry.server.data.correction.LocalDataServerCleanHandler;
import com.alipay.sofa.registry.server.data.event.LocalDataServerChangeEvent;
import com.alipay.sofa.registry.server.data.executor.ExecutorFactory;
import com.alipay.sofa.registry.server.data.node.DataNodeStatus;
import com.alipay.sofa.registry.server.data.node.DataServerNode;
import com.alipay.sofa.registry.server.data.remoting.DataNodeExchanger;
import com.alipay.sofa.registry.server.data.remoting.dataserver.DataServerNodeFactory;
import com.alipay.sofa.registry.server.data.util.LocalServerStatusEnum;
import com.alipay.sofa.registry.server.data.util.TimeUtil;
import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:com/alipay/sofa/registry/server/data/event/handler/LocalDataServerChangeEventHandler.class */
public class LocalDataServerChangeEventHandler extends AbstractEventHandler<LocalDataServerChangeEvent> {
    private static final Logger LOGGER = LoggerFactory.getLogger(LocalDataServerChangeEventHandler.class);

    /** Supplies the local data center name, the replica count and the store-node count used below. */
    @Autowired
    private DataServerConfig dataServerBootstrapConfig;

    /** Reset every time a new change event arrives (see {@link #doHandle}). */
    @Autowired
    private LocalDataServerCleanHandler localDataServerCleanHandler;

    /** Node-list cache that is updated once a change event has been fully processed. */
    @Autowired
    private DataServerCache dataServerCache;

    /** Used to send notify requests to the other data servers. */
    @Autowired
    private DataNodeExchanger dataNodeExchanger;

    /** Status holder of the local data node (INITIAL / WORKING / ...). */
    @Autowired
    private DataNodeStatus dataNodeStatus;

    /** Change events waiting to be processed by the single syncer thread started in {@link #start()}. */
    private final BlockingQueue<LocalDataServerChangeEvent> events = new LinkedBlockingDeque<>();

    /**
     * Set to {@code true} by {@link #doHandle} when a newer event arrives; the syncer polls it to
     * abandon work that belongs to an outdated event.
     */
    private final AtomicBoolean isChanged = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alipay/sofa/registry/server/data/event/handler/LocalDataServerChangeEventHandler$LocalClusterDataSyncer.class */
    public class LocalClusterDataSyncer implements Runnable {
        /**
         * Creates the syncer. No state is initialised here; the only instantiation visible in this
         * file is in {@link LocalDataServerChangeEventHandler#start()}.
         */
        private LocalClusterDataSyncer() {
        }

        /**
         * Worker loop: blocks on the event queue and processes local data server change events.
         *
         * <p>Only the most recent event is acted upon: if further events are already queued when one
         * is taken, the taken event is skipped (apart from the INITIAL status reset below). For the
         * event that is processed, a WORKING node notifies peers to fetch data, while a node in any
         * other status announces itself online and then updates the cached node list.
         *
         * <p>Failures while handling one event are logged and the loop continues. An interruption
         * while waiting on the queue restores the thread's interrupt flag and ends the loop, so the
         * worker can be shut down.
         */
        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    LocalDataServerChangeEvent event = events.take();

                    // If this server itself is listed as newly joined, fall back to INITIAL status.
                    if (event.getNewJoined().contains(DataServerConfig.IP)
                            && dataNodeStatus.getStatus() != LocalServerStatusEnum.INITIAL) {
                        dataNodeStatus.setStatus(LocalServerStatusEnum.INITIAL);
                    }

                    // A newer event is already waiting: skip this one and handle the latest only.
                    if (!events.isEmpty()) {
                        continue;
                    }

                    long version = event.getVersion();
                    LOGGER.info("begin handle dataserver change, version={},localDataServer={}", Long.valueOf(version), event.getLocalDataServerMap().keySet());
                    // Clear the flag before working; doHandle() raises it again when a new event arrives.
                    isChanged.set(false);

                    if (LocalServerStatusEnum.WORKING == dataNodeStatus.getStatus()) {
                        notifyToFetch(event, version);
                    } else {
                        dataServerCache.checkAndUpdateStatus(version);
                        notifyOnline(version);
                        dataServerCache.updateItem(event.getLocalDataServerMap(), Long.valueOf(event.getLocalDataCenterversion()), dataServerBootstrapConfig.getLocalDataCenter());
                    }
                } catch (InterruptedException e) {
                    // Do not swallow the interruption: restore the flag and stop the worker.
                    Thread.currentThread().interrupt();
                    LOGGER.error("sync local data interrupted, syncer exits", e);
                    return;
                } catch (Throwable th) {
                    LOGGER.error("sync local data error", th);
                }
            }
        }

        /**
         * Notifies the other data servers of the node-list change so that they can fetch data from
         * this server, then updates the cached node list.
         *
         * <p>Nodes that appear in the to-be-synced map are sent the versions of the locally cached
         * datums they should hold; every remaining node of the event's server map is sent an empty
         * version map. The method returns early, without updating the cache, when a newer change
         * event has arrived in the meantime ({@code isChanged} is set).
         *
         * @param localDataServerChangeEvent the event being processed
         * @param j                          the node-list version carried in the notify requests
         */
        private void notifyToFetch(LocalDataServerChangeEvent localDataServerChangeEvent, long j) {
            Map<String, DataNode> localDataServerMap = localDataServerChangeEvent.getLocalDataServerMap();
            ConsistentHash<DataNode> consistentHash = new ConsistentHash<>(dataServerBootstrapConfig.getNumberOfReplicas(), Lists.newArrayList(localDataServerMap.values()));
            // Nodes still to be notified with an empty map; entries are removed once a node
            // has been handed a non-empty version map below.
            Map<String, DataNode> remainingNodes = new ConcurrentHashMap<>(localDataServerMap);
            Map<String, Map<String, Map<String, BackupTriad>>> toBeSyncMap = getToBeSyncMap(consistentHash);
            if (isChanged.get()) {
                return;
            }

            for (Map.Entry<String, Map<String, Map<String, BackupTriad>>> nodeEntry : toBeSyncMap.entrySet()) {
                String ip = nodeEntry.getKey();
                // first-level key of the datum cache -> dataInfoId -> local datum version
                Map<String, Map<String, Long>> allVersionMap = new HashMap<>();
                for (Map.Entry<String, Map<String, BackupTriad>> groupEntry : nodeEntry.getValue().entrySet()) {
                    String groupKey = groupEntry.getKey();
                    Map<String, Long> versionMap = new HashMap<>();
                    for (String dataInfoId : groupEntry.getValue().keySet()) {
                        Datum datum = DatumCache.get(groupKey, dataInfoId);
                        if (datum != null) {
                            versionMap.put(dataInfoId, Long.valueOf(datum.getVersion()));
                        }
                    }
                    if (!versionMap.isEmpty()) {
                        allVersionMap.put(groupKey, versionMap);
                    }
                }
                if (!allVersionMap.isEmpty()) {
                    remainingNodes.remove(ip);
                    if (doNotify(ip, allVersionMap, j)) {
                        dataServerCache.removeNotifyNewStatusNode(ip);
                    }
                }
            }

            // Nodes without any datum to sync are still notified, with an empty version map.
            for (String ip : remainingNodes.keySet()) {
                Map<String, Map<String, Long>> emptyVersionMap = new HashMap<>();
                if (doNotify(ip, emptyVersionMap, j)) {
                    dataServerCache.removeNotifyNewStatusNode(ip);
                }
            }

            if (isChanged.get()) {
                return;
            }
            dataServerCache.updateItem(localDataServerMap, Long.valueOf(localDataServerChangeEvent.getLocalDataCenterversion()), dataServerBootstrapConfig.getLocalDataCenter());
        }

        /**
         * Works out, for every datum in the local cache, which nodes newly joined its backup set
         * under the given hash ring.
         *
         * @param consistentHash hash ring built from the new node list
         * @return a map of node ip -> first-level key of the datum cache -> dataInfoId -> the old
         *         backup triad of that dataInfoId; an empty map when a newer change event arrives
         *         while computing
         */
        private Map<String, Map<String, Map<String, BackupTriad>>> getToBeSyncMap(ConsistentHash<DataNode> consistentHash) {
            Map<String, Map<String, Map<String, BackupTriad>>> toBeSyncMap = new HashMap<>();
            // dataInfoId -> nodes computed from the new ring, so each id is hashed only once
            Map<String, List<DataNode>> backupNodesCache = new HashMap<>();

            for (Map.Entry<String, Map<String, Datum>> cacheEntry : DatumCache.getAll().entrySet()) {
                String groupKey = cacheEntry.getKey();
                for (String dataInfoId : cacheEntry.getValue().keySet()) {
                    // A newer event makes this computation obsolete.
                    if (isChanged.get()) {
                        return new HashMap<>();
                    }

                    List<DataNode> backupNodes;
                    if (backupNodesCache.containsKey(dataInfoId)) {
                        backupNodes = backupNodesCache.get(dataInfoId);
                    } else {
                        backupNodes = consistentHash.getNUniqueNodesFor(dataInfoId, dataServerBootstrapConfig.getStoreNodes());
                        backupNodesCache.put(dataInfoId, backupNodes);
                    }

                    BackupTriad oldTriad = dataServerCache.calculateOldBackupTriad(dataInfoId, dataServerBootstrapConfig.getLocalDataCenter(), dataServerBootstrapConfig);
                    if (oldTriad == null) {
                        continue;
                    }

                    List<DataNode> newJoined = oldTriad.getNewJoined(backupNodes, dataServerCache.getNotWorking());
                    LOGGER.info("DataInfoId {} has got newJoinedNodes={}  for backupNodes={},now backupTriad is {}", new Object[]{dataInfoId, newJoined, backupNodes, oldTriad});

                    for (DataNode node : newJoined) {
                        String ip = node.getIp();
                        Map<String, Map<String, BackupTriad>> byGroup = toBeSyncMap.get(ip);
                        if (byGroup == null) {
                            byGroup = new HashMap<>();
                            toBeSyncMap.put(ip, byGroup);
                        }
                        Map<String, BackupTriad> byDataInfoId = byGroup.get(groupKey);
                        if (byDataInfoId == null) {
                            byDataInfoId = new HashMap<>();
                            byGroup.put(groupKey, byDataInfoId);
                        }
                        byDataInfoId.put(dataInfoId, oldTriad);
                    }
                }
            }
            LOGGER.info("Get to Be SyncMap {}", toBeSyncMap);
            return toBeSyncMap;
        }

        /**
         * Sends a {@link NotifyFetchDatumRequest} to one data server, retrying after a random delay
         * on every failure until it succeeds or a newer change event arrives.
         *
         * @param str target node key, looked up in the local data center via {@link DataServerNodeFactory}
         * @param map version map placed in the request body
         * @param j   node-list version placed in the request body
         * @return {@code true} when the target acknowledged the request; {@code false} when the
         *         target has no node/connection or a newer event interrupted the retries
         */
        private boolean doNotify(String str, final Map<String, Map<String, Long>> map, final long j) {
            for (;;) {
                // Give up as soon as a newer change event has been received.
                if (isChanged.get()) {
                    return false;
                }

                final DataServerNode target = DataServerNodeFactory.getDataServerNode(dataServerBootstrapConfig.getLocalDataCenter(), str);
                boolean connected = target != null && target.getConnection() != null;
                if (!connected) {
                    LOGGER.info("notify version change to sync has not connect,targetNode={}, map={}", str, map);
                    return false;
                }

                try {
                    Request fetchRequest = new Request() {
                        public Object getRequestBody() {
                            return new NotifyFetchDatumRequest(map, DataServerConfig.IP, j);
                        }

                        public URL getRequestUrl() {
                            return new URL(target.getConnection().getRemoteIP(), target.getConnection().getRemotePort());
                        }
                    };
                    CommonResponse response = (CommonResponse) dataNodeExchanger.request(fetchRequest).getResult();
                    if (response.isSuccess()) {
                        LOGGER.info("notify {} version change to sync,current node list version={}, map={}", new Object[]{target.getIp(), Long.valueOf(j), map});
                        return true;
                    }
                    // An unsuccessful response is handled like any other failure: log, wait, retry.
                    throw new RuntimeException(response.getMessage());
                } catch (Throwable th) {
                    LOGGER.error("notify {} to fetch datum error", str, th);
                    TimeUtil.randomDelay(500);
                }
            }
        }

        /**
         * Sends a {@link NotifyOnlineRequest} to every data server node of the local data center,
         * one after another, and does not move on to the next node until the current one has
         * acknowledged.
         *
         * <p>For each node: while its connection is missing or not fine, wait a random delay and
         * check again; if the request fails or is answered unsuccessfully, log the failure, wait a
         * random delay and retry. Entries whose node is {@code null} are skipped.
         *
         * @param j node-list version placed in the request body
         */
        private void notifyOnline(final long j) {
            Map<String, DataServerNode> nodes = DataServerNodeFactory.getDataServerNodes(dataServerBootstrapConfig.getLocalDataCenter());
            for (Map.Entry<String, DataServerNode> entry : nodes.entrySet()) {
                while (true) {
                    // Re-read the entry on every attempt, as the original loop does.
                    String nodeKey = entry.getKey();
                    final DataServerNode node = entry.getValue();
                    if (node == null) {
                        break;
                    }
                    try {
                        if (node.getConnection() != null && node.getConnection().isFine()) {
                            CommonResponse response = (CommonResponse) dataNodeExchanger.request(new Request() {
                                public Object getRequestBody() {
                                    return new NotifyOnlineRequest(DataServerConfig.IP, j);
                                }

                                public URL getRequestUrl() {
                                    return new URL(node.getConnection().getRemoteIP(), node.getConnection().getRemotePort());
                                }
                            }).getResult();
                            if (response.isSuccess()) {
                                LOGGER.info("notify {} that i am newer success,version={}", nodeKey, Long.valueOf(j));
                                break;
                            }
                            // Route an unsuccessful response into the retry path below.
                            throw new RuntimeException(response.getMessage());
                        }
                        // Not connected (yet): wait before checking the connection again.
                        TimeUtil.randomDelay(1000);
                    } catch (Exception e) {
                        LOGGER.info("notify {} that i am newer failed", nodeKey);
                        LOGGER.error("notify {} that i am newer error", nodeKey, e);
                        TimeUtil.randomDelay(500);
                    }
                }
            }
        }
    }

    /**
     * Declares the event type this handler processes.
     *
     * @return {@code LocalDataServerChangeEvent.class}
     */
    @Override // com.alipay.sofa.registry.server.data.event.handler.AbstractEventHandler
    public Class interest() {
        return LocalDataServerChangeEvent.class;
    }

    /**
     * Accepts a new change event and hands it to the syncer thread.
     *
     * @param localDataServerChangeEvent the event to enqueue
     */
    @Override // com.alipay.sofa.registry.server.data.event.handler.AbstractEventHandler
    public void doHandle(LocalDataServerChangeEvent localDataServerChangeEvent) {
        // Raise the flag first so work in progress for an older event (which polls isChanged) stops.
        this.isChanged.set(true);
        this.localDataServerCleanHandler.reset();
        // The queue is created without a capacity bound, so offer() is not expected to reject.
        this.events.offer(localDataServerChangeEvent);
    }

    /**
     * Runs the superclass initialisation and then starts the syncer thread via {@link #start()}.
     *
     * @throws Exception if the superclass initialisation fails
     */
    @Override // com.alipay.sofa.registry.server.data.event.handler.AbstractEventHandler
    public void afterPropertiesSet() throws Exception {
        super.afterPropertiesSet();
        start();
    }

    /**
     * Submits a new {@link LocalClusterDataSyncer} to a single-thread executor obtained from
     * {@link ExecutorFactory}, passing this class's simple name to the factory.
     */
    public void start() {
        String handlerName = LocalDataServerChangeEventHandler.class.getSimpleName();
        ExecutorFactory.newSingleThreadExecutor(handlerName).execute(new LocalClusterDataSyncer());
    }
}
