package com.alipay.sofa.registry.server.session.scheduler.task;

import com.alipay.sofa.registry.common.model.Node;
import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.store.BaseInfo;
import com.alipay.sofa.registry.common.model.store.Subscriber;
import com.alipay.sofa.registry.core.model.ReceivedData;
import com.alipay.sofa.registry.core.model.ScopeEnum;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig;
import com.alipay.sofa.registry.server.session.cache.CacheService;
import com.alipay.sofa.registry.server.session.cache.DatumKey;
import com.alipay.sofa.registry.server.session.cache.Key;
import com.alipay.sofa.registry.server.session.cache.Value;
import com.alipay.sofa.registry.server.session.converter.ReceivedDataConverter;
import com.alipay.sofa.registry.server.session.node.NodeManagerFactory;
import com.alipay.sofa.registry.server.session.scheduler.ExecutorManager;
import com.alipay.sofa.registry.server.session.store.Interests;
import com.alipay.sofa.registry.server.session.store.ReSubscribers;
import com.alipay.sofa.registry.task.batcher.TaskProcessor;
import com.alipay.sofa.registry.task.listener.TaskEvent;
import com.alipay.sofa.registry.task.listener.TaskListenerManager;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/* loaded from: input_file:com/alipay/sofa/registry/server/session/scheduler/task/DataChangeFetchCloudTask.class */
/**
 * Session task that pushes the cached datums of one dataInfoId to its subscribers.
 *
 * <p>On {@link #execute()} the task reads the cached {@link Datum} of the dataInfoId for every
 * data center reported by the META node manager, walks the subscriber index of every
 * {@link ScopeEnum}, and fires one push {@link TaskEvent} per subscriber group. All push events
 * share a single {@link PushTaskClosure}; when it reports success the interest versions of the
 * dataInfoId are updated through {@link Interests#checkAndUpdateInterestVersions}.
 */
public class DataChangeFetchCloudTask extends AbstractSessionTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataChangeFetchCloudTask.class);
    private static final Logger TASK_LOGGER = LoggerFactory.getLogger(DataChangeFetchCloudTask.class, "[Task]");

    private final SessionServerConfig sessionServerConfig;
    private final Interests sessionInterests;
    private final TaskListenerManager taskListenerManager;
    private final ExecutorManager executorManager;
    private final CacheService sessionCacheService;

    /** The dataInfoId this task works on; assigned by {@link #setTaskEvent(TaskEvent)}. */
    private String fetchDataInfoId;

    public DataChangeFetchCloudTask(SessionServerConfig sessionServerConfig, TaskListenerManager taskListenerManager, Interests interests, ExecutorManager executorManager, CacheService cacheService) {
        this.sessionServerConfig = sessionServerConfig;
        this.taskListenerManager = taskListenerManager;
        this.sessionInterests = interests;
        this.executorManager = executorManager;
        this.sessionCacheService = cacheService;
    }

    /**
     * @return always {@code -1}
     */
    @Override // com.alipay.sofa.registry.server.session.scheduler.task.AbstractSessionTask
    public long getExpiryTime() {
        return -1L;
    }

    /**
     * Binds this task to the dataInfoId carried by the event.
     *
     * @param taskEvent event whose event object must be the dataInfoId as a {@link String}
     * @throws IllegalArgumentException if the event object is not a {@link String}
     */
    public void setTaskEvent(TaskEvent taskEvent) {
        Object eventObj = taskEvent.getEventObj();
        if (!(eventObj instanceof String)) {
            throw new IllegalArgumentException("Input task event object error!");
        }
        this.fetchDataInfoId = (String) eventObj;
    }

    /**
     * Loads the cached datums and fires one push task per non-empty subscriber group of every
     * scope, then starts the shared closure. Logs an error and returns without pushing when no
     * datum is cached for the dataInfoId.
     */
    public void execute() {
        Map<String, Datum> datums = getDatumsCache();
        if (datums == null || datums.isEmpty()) {
            LOGGER.error("Get publisher data error,which dataInfoId:{}", this.fetchDataInfoId);
            return;
        }
        PushTaskClosure pushTaskClosure = getTaskClosure(datums);
        for (ScopeEnum scopeEnum : ScopeEnum.values()) {
            Map<InetSocketAddress, Map<String, Subscriber>> subscriberIndex = getCache(this.fetchDataInfoId, scopeEnum);
            if (subscriberIndex == null || subscriberIndex.isEmpty()) {
                continue;
            }
            // Only the per-address subscriber groups are needed; the address key itself is unused.
            for (Map<String, Subscriber> subscribers : subscriberIndex.values()) {
                if (subscribers == null || subscribers.isEmpty()) {
                    continue;
                }
                List<String> subscriberKeys = new ArrayList<>(subscribers.keySet());
                // The first subscriber of the group supplies client version, scope and address.
                Subscriber firstSubscriber = subscribers.values().iterator().next();
                evictReSubscribers(subscribers.values());
                fireReceivedDataMultiPushTask(datums, subscriberKeys, scopeEnum, firstSubscriber, subscribers, pushTaskClosure);
            }
        }
        pushTaskClosure.start();
    }

    /**
     * Builds the closure shared by all push events of this task. On a successful result, and
     * when the stop-push switch is off, it updates the interest version of the dataInfoId for
     * every data center in {@code datumMap}.
     *
     * @param datumMap datums keyed by data center
     * @return the closure, not yet started
     */
    public PushTaskClosure getTaskClosure(Map<String, Datum> datumMap) {
        PushTaskClosure pushTaskClosure = new PushTaskClosure(this.executorManager.getPushTaskClosureExecutor());
        pushTaskClosure.setTaskClosure((processingResult, task) -> {
            if (processingResult != TaskProcessor.ProcessingResult.Success) {
                LOGGER.warn("Push tasks found error,subscribers version can not be update!dataInfoId={}", this.fetchDataInfoId);
                return;
            }
            if (this.sessionServerConfig.isStopPushSwitch()) {
                LOGGER.info("Stop Push switch on,dataInfoId {} version can not be update!", this.fetchDataInfoId);
                return;
            }
            datumMap.forEach((dataCenter, datum) -> {
                String dataInfoId = this.fetchDataInfoId;
                Long version = Long.valueOf(datum.getVersion());
                if (this.sessionInterests.checkAndUpdateInterestVersions(dataCenter, dataInfoId, version)) {
                    LOGGER.info("Push all tasks success,dataCenter:{} dataInfoId:{} version:{} update!", dataCenter, dataInfoId, version);
                } else {
                    LOGGER.info("Push all tasks success,but dataCenter:{} dataInfoId:{} version:{} need not update!", dataCenter, dataInfoId, version);
                }
            });
        });
        return pushTaskClosure;
    }

    /**
     * Removes the given subscribers from the re-subscriber store, when the interests store
     * implements {@link ReSubscribers}; otherwise does nothing.
     */
    private void evictReSubscribers(Collection<Subscriber> subscribers) {
        if (this.sessionInterests instanceof ReSubscribers) {
            ReSubscribers reSubscribers = (ReSubscribers) this.sessionInterests;
            subscribers.forEach(reSubscribers::deleteReSubscriber);
        }
    }

    /** Queries the subscriber index of the interests store for a dataInfoId and scope. */
    private Map<InetSocketAddress, Map<String, Subscriber>> getCache(String dataInfoId, ScopeEnum scopeEnum) {
        return this.sessionInterests.querySubscriberIndex(dataInfoId, scopeEnum);
    }

    /**
     * Reads the cached datum of {@link #fetchDataInfoId} for every data center reported by the
     * META node manager.
     *
     * @return datums keyed by data center; entries with a null value or payload are skipped,
     *     and the map is empty (never null) when nothing is available
     */
    private Map<String, Datum> getDatumsCache() {
        Map<String, Datum> datums = new HashMap<>();
        Collection<String> dataCenters = NodeManagerFactory.getNodeManager(Node.NodeType.META).getDataCenters();
        if (dataCenters == null) {
            return datums;
        }
        List<Key> keys = dataCenters.stream()
                .map(dataCenter -> new Key(Key.KeyType.OBJ, DatumKey.class.getName(), new DatumKey(this.fetchDataInfoId, dataCenter)))
                .collect(Collectors.toList());
        Map<Key, Value> values = this.sessionCacheService.getValues(keys);
        if (values != null) {
            values.forEach((key, value) -> {
                if (value == null || value.getPayload() == null) {
                    return;
                }
                datums.put(((DatumKey) key.getEntityType()).getDataCenter(), (Datum) value.getPayload());
            });
        }
        return datums;
    }

    /**
     * Picks the push flavour for one subscriber group. Clients whose version equals
     * {@code StoreData} get a received-data multi push; all other clients get a user-data
     * element push of the merged datum, single for zone scope and multi otherwise.
     */
    private void fireReceivedDataMultiPushTask(Map<String, Datum> datums, List<String> subscriberKeys, ScopeEnum scopeEnum, Subscriber subscriber, Map<String, Subscriber> subscribers, PushTaskClosure pushTaskClosure) {
        if (BaseInfo.ClientVersion.StoreData.equals(subscriber.getClientVersion())) {
            fireReceiveDataPushTask(datums, subscriberKeys, scopeEnum, subscriber, subscribers, pushTaskClosure);
        } else if (subscriber.getScope() == ScopeEnum.zone) {
            fireUserDataElementPushTask(ReceivedDataConverter.getMergeDatum(datums), subscriber, subscribers, pushTaskClosure);
        } else {
            fireUserDataElementMultiPushTask(ReceivedDataConverter.getMergeDatum(datums), subscriber, subscribers, pushTaskClosure);
        }
    }

    /** Sends a {@code RECEIVED_DATA_MULTI_PUSH_TASK} event for one subscriber group. */
    private void fireReceiveDataPushTask(Map<String, Datum> datums, List<String> subscriberKeys, ScopeEnum scopeEnum, Subscriber subscriber, Map<String, Subscriber> subscribers, PushTaskClosure pushTaskClosure) {
        List<Subscriber> subscriberList = new ArrayList<>(subscribers.values());
        ReceivedData receivedData = ReceivedDataConverter.getReceivedDataMulti(datums, scopeEnum, subscriberKeys, subscriber);
        // Event payload: the received data mapped to the source address it is pushed to.
        Map<ReceivedData, Object> payload = new HashMap<>();
        payload.put(receivedData, subscriber.getSourceAddress());
        TaskEvent taskEvent = new TaskEvent(payload, TaskEvent.TaskType.RECEIVED_DATA_MULTI_PUSH_TASK);
        taskEvent.setTaskClosure(pushTaskClosure);
        taskEvent.setAttribute(Constant.PUSH_CLIENT_SUBSCRIBERS, subscriberList);
        TASK_LOGGER.info("send {} taskURL:{},taskScope:{}", taskEvent.getTaskType(), subscriber.getSourceAddress(), scopeEnum);
        this.taskListenerManager.sendTaskEvent(taskEvent);
    }

    /** Sends a {@code USER_DATA_ELEMENT_PUSH_TASK} event for one subscriber group. */
    private void fireUserDataElementPushTask(Datum datum, Subscriber subscriber, Map<String, Subscriber> subscribers, PushTaskClosure pushTaskClosure) {
        fireUserDataPushTask(TaskEvent.TaskType.USER_DATA_ELEMENT_PUSH_TASK, datum, subscriber, subscribers, pushTaskClosure);
    }

    /** Sends a {@code USER_DATA_ELEMENT_MULTI_PUSH_TASK} event for one subscriber group. */
    private void fireUserDataElementMultiPushTask(Datum datum, Subscriber subscriber, Map<String, Subscriber> subscribers, PushTaskClosure pushTaskClosure) {
        fireUserDataPushTask(TaskEvent.TaskType.USER_DATA_ELEMENT_MULTI_PUSH_TASK, datum, subscriber, subscribers, pushTaskClosure);
    }

    /**
     * Shared body of the two user-data element pushes: builds the event of the given type,
     * attaches closure, subscribers, datum and target address, logs it and sends it.
     */
    private void fireUserDataPushTask(TaskEvent.TaskType taskType, Datum datum, Subscriber subscriber, Map<String, Subscriber> subscribers, PushTaskClosure pushTaskClosure) {
        List<Subscriber> subscriberList = new ArrayList<>(subscribers.values());
        TaskEvent taskEvent = new TaskEvent(taskType);
        taskEvent.setTaskClosure(pushTaskClosure);
        taskEvent.setAttribute(Constant.PUSH_CLIENT_SUBSCRIBERS, subscriberList);
        taskEvent.setAttribute(Constant.PUSH_CLIENT_DATUM, datum);
        taskEvent.setAttribute(Constant.PUSH_CLIENT_URL, subscriber.getSourceAddress());
        // Resolve the log fields null-safely so a missing datum cannot fail the log statement.
        String dataInfoId = datum == null ? null : datum.getDataInfoId();
        String dataCenter = datum == null ? null : datum.getDataCenter();
        int pubSize = (datum == null || datum.getPubMap() == null) ? 0 : datum.getPubMap().size();
        TASK_LOGGER.info("send {} taskURL:{},dataInfoId={},dataCenter={},pubSize={},subSize={}", taskEvent.getTaskType(), subscriber.getSourceAddress(), dataInfoId, dataCenter, pubSize, subscriberList.size());
        this.taskListenerManager.sendTaskEvent(taskEvent);
    }

    /**
     * @return result of the inherited retry check against the configured data-change-fetch
     *     retry limit
     */
    public boolean checkRetryTimes() {
        return checkRetryTimes(this.sessionServerConfig.getDataChangeFetchTaskRetryTimes());
    }

    @Override
    public String toString() {
        return "DATA_CHANGE_FETCH_CLOUD_TASK{taskId='" + getTaskId() + "', fetchDataInfoId=" + this.fetchDataInfoId + ", expiryTime='" + getExpiryTime() + "'}";
    }
}
