package com.alipay.sofa.registry.server.data.remoting.dataserver.handler;

import com.alipay.remoting.Connection;
import com.alipay.sofa.registry.common.model.CommonResponse;
import com.alipay.sofa.registry.common.model.GenericResponse;
import com.alipay.sofa.registry.common.model.Node;
import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.dataserver.GetDataRequest;
import com.alipay.sofa.registry.common.model.dataserver.NotifyFetchDatumRequest;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.remoting.ChannelHandler;
import com.alipay.sofa.registry.remoting.Server;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.cache.DataServerCache;
import com.alipay.sofa.registry.server.data.cache.DatumCache;
import com.alipay.sofa.registry.server.data.change.DataChangeTypeEnum;
import com.alipay.sofa.registry.server.data.change.DataSourceTypeEnum;
import com.alipay.sofa.registry.server.data.change.event.DataChangeEventCenter;
import com.alipay.sofa.registry.server.data.executor.ExecutorFactory;
import com.alipay.sofa.registry.server.data.remoting.dataserver.DataServerConnectionFactory;
import com.alipay.sofa.registry.server.data.remoting.handler.AbstractServerHandler;
import com.alipay.sofa.registry.server.data.util.TimeUtil;
import com.alipay.sofa.registry.util.ParaCheckUtil;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:com/alipay/sofa/registry/server/data/remoting/dataserver/handler/NotifyFetchDatumHandler.class */
public class NotifyFetchDatumHandler extends AbstractServerHandler<NotifyFetchDatumRequest> {
    private static final Logger LOGGER = LoggerFactory.getLogger(NotifyFetchDatumHandler.class);

    @Autowired
    private DataServerCache dataServerCache;

    @Autowired
    private DataServerConnectionFactory dataServerConnectionFactory;

    @Autowired
    private DataChangeEventCenter dataChangeEventCenter;

    @Autowired
    private DataServerConfig dataServerConfig;

    @Autowired
    private Exchange boltExchange;

    @Autowired
    private DataServerConfig dataServerBootstrapConfig;

    @Override // com.alipay.sofa.registry.server.data.remoting.handler.AbstractServerHandler
    public void checkParam(NotifyFetchDatumRequest notifyFetchDatumRequest) throws RuntimeException {
        ParaCheckUtil.checkNotBlank(notifyFetchDatumRequest.getIp(), "ip");
    }

    @Override // com.alipay.sofa.registry.server.data.remoting.handler.AbstractServerHandler
    public Object doHandle(Channel channel, NotifyFetchDatumRequest notifyFetchDatumRequest) {
        ParaCheckUtil.checkNotBlank(notifyFetchDatumRequest.getIp(), "ip");
        Map dataVersionMap = notifyFetchDatumRequest.getDataVersionMap();
        long changeVersion = notifyFetchDatumRequest.getChangeVersion();
        String ip = notifyFetchDatumRequest.getIp();
        if (changeVersion < this.dataServerCache.getCurVersion().longValue()) {
            LOGGER.info("[NotifyFetchDatumHandler] ignore notify because changeVersion {} is less than {},ip is {}", new Object[]{Long.valueOf(changeVersion), this.dataServerCache.getCurVersion(), ip});
        } else if (dataVersionMap.isEmpty()) {
            LOGGER.info("[NotifyFetchDatumHandler] get changeVersion map is empty,change version is {},current version is {},ip is {}", new Object[]{Long.valueOf(changeVersion), this.dataServerCache.getCurVersion(), ip});
            this.dataServerCache.synced(changeVersion, ip);
        } else {
            ExecutorFactory.getCommonExecutor().execute(() -> {
                for (Map.Entry entry : dataVersionMap.entrySet()) {
                    String str = (String) entry.getKey();
                    for (Map.Entry entry2 : ((Map) entry.getValue()).entrySet()) {
                        String str2 = (String) entry2.getKey();
                        Datum datum = DatumCache.get(str, str2);
                        if (datum != null) {
                            long longValue = ((Long) entry2.getValue()).longValue();
                            long version = datum.getVersion();
                            if (version > longValue) {
                                LOGGER.info("[NotifyFetchDatumHandler] ignore fetch because changeVersion {} is less than {},dataInfoId={},dataCenter={}", new Object[]{Long.valueOf(longValue), Long.valueOf(version), str2, str});
                            } else if (datum.getVersion() == ((Long) entry2.getValue()).longValue() && !datum.getPubMap().isEmpty()) {
                            }
                        }
                        fetchDatum(ip, str, str2);
                    }
                }
                this.dataServerCache.synced(changeVersion, ip);
            });
        }
        return CommonResponse.buildSuccessResponse();
    }

    private void fetchDatum(String str, String str2, String str3) {
        while (this.dataServerCache.getDataServers(this.dataServerConfig.getLocalDataCenter()).keySet().contains(str)) {
            Connection connection = this.dataServerConnectionFactory.getConnection(str);
            if (connection == null || !connection.isFine()) {
                throw new RuntimeException(String.format("connection of %s is not available", str));
            }
            try {
                Server server = this.boltExchange.getServer(Integer.valueOf(this.dataServerBootstrapConfig.getSyncDataPort()));
                GenericResponse genericResponse = (GenericResponse) server.sendSync(server.getChannel(connection.getRemoteAddress()), new GetDataRequest(str3, str2), this.dataServerBootstrapConfig.getRpcTimeout());
                if (!genericResponse.isSuccess()) {
                    throw new RuntimeException(genericResponse.getMessage());
                }
                Datum datum = (Datum) ((Map) genericResponse.getData()).get(str2);
                if (datum != null) {
                    this.dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, DataSourceTypeEnum.BACKUP, datum);
                    LOGGER.info("[NotifyFetchDatumHandler] fetch datum success,dataInfoId={},dataCenter={},targetIp={}", new Object[]{datum.getDataInfoId(), datum.getDataCenter(), str});
                }
                return;
            } catch (Exception e) {
                LOGGER.error("[NotifyFetchDatumHandler] fetch datum error", e);
                TimeUtil.randomDelay(500);
            }
        }
    }

    @Override // com.alipay.sofa.registry.server.data.remoting.handler.AbstractServerHandler
    public CommonResponse buildFailedResponse(String str) {
        return CommonResponse.buildFailedResponse(str);
    }

    public ChannelHandler.HandlerType getType() {
        return ChannelHandler.HandlerType.PROCESSER;
    }

    @Override // com.alipay.sofa.registry.server.data.remoting.handler.AbstractServerHandler
    public Class interest() {
        return NotifyFetchDatumRequest.class;
    }

    @Override // com.alipay.sofa.registry.server.data.remoting.handler.AbstractServerHandler
    protected Node.NodeType getConnectNodeType() {
        return Node.NodeType.DATA;
    }
}
