package com.alipay.sofa.registry.server.data.datasync.sync;

import com.alipay.remoting.Connection;
import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.dataserver.NotifyDataSyncRequest;
import com.alipay.sofa.registry.common.model.dataserver.SyncData;
import com.alipay.sofa.registry.common.model.dataserver.SyncDataRequest;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.remoting.Server;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.datasync.AcceptorStore;
import com.alipay.sofa.registry.server.data.datasync.Operator;
import com.alipay.sofa.registry.server.data.remoting.dataserver.DataServerConnectionFactory;
import com.alipay.sofa.registry.server.data.remoting.metaserver.IMetaServerService;
import com.alipay.sofa.registry.server.data.util.DelayItem;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:com/alipay/sofa/registry/server/data/datasync/sync/AbstractAcceptorStore.class */
public abstract class AbstractAcceptorStore implements AcceptorStore {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAcceptorStore.class, "[SyncDataService]");
    private static final int DEFAULT_MAX_BUFFER_SIZE = 30;
    private static final int DEFAULT_DELAY_TIMEOUT = 3000;
    private static final int NOTIFY_RETRY = 3;

    @Autowired
    protected IMetaServerService metaServerService;

    @Autowired
    private Exchange boltExchange;

    @Autowired
    private DataServerConfig dataServerBootstrapConfig;

    @Autowired
    private DataServerConnectionFactory dataServerConnectionFactory;
    private Map<String, Map<String, Acceptor>> acceptors = new ConcurrentHashMap();
    private Map<String, Map<String, Acceptor>> notifyAcceptorsCache = new ConcurrentHashMap();
    private DelayQueue<DelayItem<Acceptor>> delayQueue = new DelayQueue<>();

    @Override // com.alipay.sofa.registry.server.data.datasync.AcceptorStore
    public void checkAcceptorsChangAndExpired() {
        this.acceptors.forEach((str, map) -> {
            if (map == null || map.isEmpty()) {
                return;
            }
            map.forEach((str, acceptor) -> {
                acceptor.checkExpired(0);
            });
        });
    }

    private String getLogByClass(String str) {
        StringBuilder sb = new StringBuilder();
        sb.append(" [").append(getClass().getSimpleName()).append("] ").append(str);
        return sb.toString();
    }

    @Override // com.alipay.sofa.registry.server.data.datasync.AcceptorStore
    public void addOperator(Operator operator) {
        Datum datum = operator.getDatum();
        String dataCenter = datum.getDataCenter();
        String dataInfoId = datum.getDataInfoId();
        try {
            Map<String, Acceptor> map = this.acceptors.get(dataCenter);
            if (map == null) {
                ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
                map = this.acceptors.putIfAbsent(dataCenter, concurrentHashMap);
                if (map == null) {
                    map = concurrentHashMap;
                }
            }
            Acceptor acceptor = map.get(dataInfoId);
            if (acceptor == null) {
                Acceptor acceptor2 = new Acceptor(DEFAULT_MAX_BUFFER_SIZE, dataInfoId, dataCenter);
                acceptor = map.putIfAbsent(dataInfoId, acceptor2);
                if (acceptor == null) {
                    acceptor = acceptor2;
                }
            }
            acceptor.appendOperator(operator);
            putCache(acceptor);
        } catch (Exception e) {
            LOGGER.error(getLogByClass("Append Operator error!"), e);
            throw new RuntimeException("Append Operator error!", e);
        }
    }

    private void putCache(Acceptor acceptor) {
        String dataCenter = acceptor.getDataCenter();
        String dataInfoId = acceptor.getDataInfoId();
        try {
            Map<String, Acceptor> map = this.notifyAcceptorsCache.get(dataCenter);
            if (map == null) {
                ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
                map = this.notifyAcceptorsCache.putIfAbsent(dataCenter, concurrentHashMap);
                if (map == null) {
                    map = concurrentHashMap;
                }
            }
            if (map.putIfAbsent(dataInfoId, acceptor) == null) {
                addQueue(acceptor);
            }
        } catch (Exception e) {
            LOGGER.error(getLogByClass("Operator push to delay cache error!"), e);
            throw new RuntimeException("Operator push to delay cache error!", e);
        }
    }

    private void removeCache(Acceptor acceptor) {
        String dataCenter = acceptor.getDataCenter();
        String dataInfoId = acceptor.getDataInfoId();
        try {
            Map<String, Acceptor> map = this.notifyAcceptorsCache.get(dataCenter);
            if (map != null && map.remove(dataInfoId, acceptor)) {
                notifyChange(acceptor);
            }
        } catch (Exception e) {
            LOGGER.error(getLogByClass("Operator remove from delay cache error!"), e);
            throw new RuntimeException("Operator remove from delay cache error!", e);
        }
    }

    private void addQueue(Acceptor acceptor) {
        this.delayQueue.put((DelayQueue<DelayItem<Acceptor>>) new DelayItem<>(acceptor, 3000L));
    }

    private void notifyChange(Acceptor acceptor) {
        Long lastVersion = acceptor.getLastVersion();
        if (lastVersion == null) {
            LOGGER.warn(getLogByClass("There is not data in acceptor queue!maybe has been expired!"));
            lastVersion = 0L;
        }
        if (LOGGER.isDebugEnabled()) {
            acceptor.printInfo();
        }
        NotifyDataSyncRequest notifyDataSyncRequest = new NotifyDataSyncRequest(acceptor.getDataInfoId(), acceptor.getDataCenter(), lastVersion.longValue(), getType());
        for (String str : getTargetDataIp(acceptor.getDataInfoId())) {
            if (!DataServerConfig.IP.equals(str)) {
                Connection connection = this.dataServerConnectionFactory.getConnection(str);
                if (connection == null) {
                    LOGGER.error(getLogByClass(String.format("Can not get notify data server connection!ip: %s", str)));
                } else {
                    LOGGER.info(getLogByClass("Notify data server {} change data {} to sync"), connection.getRemoteIP(), notifyDataSyncRequest);
                    for (int i = 0; i < NOTIFY_RETRY; i++) {
                        try {
                            Server server = this.boltExchange.getServer(Integer.valueOf(this.dataServerBootstrapConfig.getSyncDataPort()));
                            server.sendSync(server.getChannel(connection.getRemoteAddress()), notifyDataSyncRequest, 1000);
                            break;
                        } catch (Exception e) {
                            LOGGER.error(getLogByClass(String.format("Notify data server %s failed, NotifyDataSyncRequest:%s", str, notifyDataSyncRequest)), e);
                        }
                    }
                }
            }
        }
    }

    public abstract List<String> getTargetDataIp(String str);

    @Override // com.alipay.sofa.registry.server.data.datasync.AcceptorStore
    public void changeDataCheck() {
        while (true) {
            try {
                removeCache(this.delayQueue.take().getItem());
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    @Override // com.alipay.sofa.registry.server.data.datasync.AcceptorStore
    public SyncData getSyncData(SyncDataRequest syncDataRequest) {
        String dataCenter = syncDataRequest.getDataCenter();
        String dataInfoId = syncDataRequest.getDataInfoId();
        Long version = syncDataRequest.getVersion();
        try {
            Map<String, Acceptor> map = this.acceptors.get(dataCenter);
            if (map == null) {
                LOGGER.error(getLogByClass("Can not find Sync Data acceptor instance,dataCenter:{}"), dataCenter);
                throw new RuntimeException("Can not find Sync Data acceptor instance!");
            }
            Acceptor acceptor = map.get(dataInfoId);
            if (acceptor != null) {
                return acceptor.process(version);
            }
            LOGGER.error(getLogByClass("Can not find Sync Data acceptor instance,dataInfoId:{}"), dataInfoId);
            throw new RuntimeException("Can not find Sync Data acceptor instance!");
        } catch (Exception e) {
            LOGGER.error(getLogByClass("Get change SyncData error!"), e);
            throw new RuntimeException("Get change SyncData error!", e);
        }
    }

    public DataServerConfig getDataServerConfig() {
        return this.dataServerBootstrapConfig;
    }
}
