package com.alipay.sofa.registry.server.session.scheduler.task;

import com.alipay.sofa.registry.common.model.metaserver.DataOperator;
import com.alipay.sofa.registry.common.model.metaserver.NotifyProvideDataChange;
import com.alipay.sofa.registry.common.model.metaserver.ProvideData;
import com.alipay.sofa.registry.common.model.store.DataInfo;
import com.alipay.sofa.registry.common.model.store.Subscriber;
import com.alipay.sofa.registry.common.model.store.URL;
import com.alipay.sofa.registry.common.model.store.Watcher;
import com.alipay.sofa.registry.core.model.ReceivedConfigData;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.net.NetUtil;
import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.remoting.Server;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig;
import com.alipay.sofa.registry.server.session.converter.ReceivedDataConverter;
import com.alipay.sofa.registry.server.session.node.service.MetaNodeService;
import com.alipay.sofa.registry.server.session.registry.Registry;
import com.alipay.sofa.registry.server.session.store.Interests;
import com.alipay.sofa.registry.server.session.store.ReSubscribers;
import com.alipay.sofa.registry.server.session.store.Watchers;
import com.alipay.sofa.registry.task.listener.TaskEvent;
import com.alipay.sofa.registry.task.listener.TaskListenerManager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/alipay/sofa/registry/server/session/scheduler/task/ProvideDataChangeFetchTask.class */
public class ProvideDataChangeFetchTask extends AbstractSessionTask {
    private static final Logger TASK_LOGGER = LoggerFactory.getLogger(ProvideDataChangeFetchTask.class, "[Task]");
    private static final Logger LOGGER = LoggerFactory.getLogger(ProvideDataChangeFetchTask.class);
    private final SessionServerConfig sessionServerConfig;
    private final TaskListenerManager taskListenerManager;
    private final MetaNodeService metaNodeService;
    private final Watchers sessionWatchers;
    private final Exchange boltExchange;
    private final Interests sessionInterests;
    private final Registry sessionRegistry;
    private NotifyProvideDataChange notifyProvideDataChange;

    public ProvideDataChangeFetchTask(SessionServerConfig sessionServerConfig, TaskListenerManager taskListenerManager, MetaNodeService metaNodeService, Watchers watchers, Exchange exchange, Interests interests, Registry registry) {
        this.sessionServerConfig = sessionServerConfig;
        this.taskListenerManager = taskListenerManager;
        this.metaNodeService = metaNodeService;
        this.sessionWatchers = watchers;
        this.boltExchange = exchange;
        this.sessionInterests = interests;
        this.sessionRegistry = registry;
    }

    public void setTaskEvent(TaskEvent taskEvent) {
        Object eventObj = taskEvent.getEventObj();
        if (!(eventObj instanceof NotifyProvideDataChange)) {
            throw new IllegalArgumentException("Input task event object error!");
        }
        this.notifyProvideDataChange = (NotifyProvideDataChange) eventObj;
    }

    public void execute() {
        ProvideData provideData = null;
        String dataInfoId = this.notifyProvideDataChange.getDataInfoId();
        if (this.notifyProvideDataChange.getDataOperator() != DataOperator.REMOVE) {
            provideData = this.metaNodeService.fetchData(dataInfoId);
            if ("session.stop.push.data.switch#@#9600#@#CONFIG".equals(dataInfoId)) {
                if (provideData == null) {
                    LOGGER.info("Fetch session stop push switch data null,config not change!");
                    return;
                }
                if (provideData.getProvideData() == null || provideData.getProvideData().getObject() == null) {
                    LOGGER.info("Fetch session stop push switch no data existed,config not change!");
                    return;
                }
                String str = (String) provideData.getProvideData().getObject();
                LOGGER.info("Fetch session stop push data switch {} success!", str);
                if (str == null) {
                    LOGGER.error("Fetch session stop push data switch is null!");
                    return;
                }
                boolean booleanValue = Boolean.valueOf(str).booleanValue();
                boolean z = this.sessionServerConfig.isStopPushSwitch() != booleanValue;
                this.sessionServerConfig.setStopPushSwitch(booleanValue);
                if (booleanValue) {
                    this.sessionServerConfig.setBeginDataFetchTask(false);
                    return;
                } else {
                    if (z) {
                        fireReSubscriber();
                        return;
                    }
                    return;
                }
            }
            if (provideData == null) {
                LOGGER.warn("Notify provider data Change request {} fetch no provider data!", this.notifyProvideDataChange);
                return;
            }
        }
        DataInfo valueOf = DataInfo.valueOf(dataInfoId);
        Server server = this.boltExchange.getServer(Integer.valueOf(this.sessionServerConfig.getServerPort()));
        if (server != null) {
            for (Channel channel : server.getChannels()) {
                Map<String, Watcher> cache = getCache(NetUtil.toAddressString(channel.getRemoteAddress()));
                ArrayList arrayList = new ArrayList();
                cache.forEach((str2, watcher) -> {
                    if (watcher == null || !watcher.getDataInfoId().equals(dataInfoId)) {
                        return;
                    }
                    arrayList.add(str2);
                });
                if (!arrayList.isEmpty()) {
                    ReceivedConfigData receivedConfigData = this.notifyProvideDataChange.getDataOperator() == DataOperator.REMOVE ? ReceivedDataConverter.getReceivedConfigData(null, valueOf, this.notifyProvideDataChange.getVersion()) : ReceivedDataConverter.getReceivedConfigData(provideData.getProvideData(), valueOf, provideData.getVersion());
                    receivedConfigData.setConfiguratorRegistIds(arrayList);
                    firePushTask(receivedConfigData, new URL(channel.getRemoteAddress()));
                }
            }
        }
    }

    private void fireReSubscriber() {
        ReSubscribers reSubscribers;
        Map<String, Map<String, Subscriber>> reSubscribers2;
        try {
            this.sessionRegistry.fetchChangDataProcess();
        } catch (Throwable th) {
            LOGGER.error("Open push switch first fetch task execute error", th);
        }
        try {
            TimeUnit.MINUTES.sleep(1L);
        } catch (InterruptedException e) {
            LOGGER.error("Wait for dataFetch Task Interrupted!");
        }
        this.sessionServerConfig.setBeginDataFetchTask(true);
        if (!(this.sessionInterests instanceof ReSubscribers) || (reSubscribers2 = (reSubscribers = (ReSubscribers) this.sessionInterests).getReSubscribers()) == null || reSubscribers2.isEmpty()) {
            return;
        }
        reSubscribers2.forEach((str, map) -> {
            fireSubscriberMultiFetchTask(str, map.values());
        });
        reSubscribers.clearReSubscribers();
    }

    private void fireSubscriberMultiFetchTask(String str, Collection<Subscriber> collection) {
        if (CollectionUtils.isEmpty(collection)) {
            return;
        }
        TaskEvent taskEvent = new TaskEvent(str, TaskEvent.TaskType.SUBSCRIBER_MULTI_FETCH_TASK);
        taskEvent.setAttribute(Constant.PUSH_CLIENT_SUBSCRIBERS, collection);
        TASK_LOGGER.info("send " + taskEvent.getTaskType() + " subscribersSize:{},dataInfoId:{}", Integer.valueOf(collection.size()), str);
        this.taskListenerManager.sendTaskEvent(taskEvent);
    }

    private void firePushTask(ReceivedConfigData receivedConfigData, URL url) {
        HashMap hashMap = new HashMap();
        hashMap.put(receivedConfigData, url);
        TaskEvent taskEvent = new TaskEvent(hashMap, TaskEvent.TaskType.RECEIVED_DATA_CONFIG_PUSH_TASK);
        TASK_LOGGER.info("send " + taskEvent.getTaskType() + " taskEvent:{}", taskEvent);
        this.taskListenerManager.sendTaskEvent(taskEvent);
    }

    private Map<String, Watcher> getCache(String str) {
        Map<String, Watcher> queryByConnectId = this.sessionWatchers.queryByConnectId(str);
        return queryByConnectId == null ? new ConcurrentHashMap() : queryByConnectId;
    }

    public boolean checkRetryTimes() {
        return checkRetryTimes(this.sessionServerConfig.getSubscriberRegisterFetchRetryTimes());
    }

    public String toString() {
        return "PROVIDE_DATA_CHANGE_FETCH_TASK{taskId='" + getTaskId() + "', notifyProvideDataChange=" + this.notifyProvideDataChange + ", retryTimes='" + this.sessionServerConfig.getSubscriberRegisterFetchRetryTimes() + "'}";
    }
}
