package com.alipay.sofa.registry.server.data.change.event;

import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.store.Publisher;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.cache.DatumCache;
import com.alipay.sofa.registry.server.data.cache.UnPublisher;
import com.alipay.sofa.registry.server.data.change.ChangeData;
import com.alipay.sofa.registry.server.data.change.DataChangeTypeEnum;
import com.alipay.sofa.registry.server.data.change.DataSourceTypeEnum;
import com.alipay.sofa.registry.server.data.executor.ExecutorFactory;
import com.alipay.sofa.registry.server.data.remoting.dataserver.DataServerNodeFactory;
import com.google.common.collect.Interners;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.locks.ReentrantLock;

/* loaded from: input_file:com/alipay/sofa/registry/server/data/change/event/DataChangeEventQueue.class */
public class DataChangeEventQueue {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataChangeEventQueue.class);
    private final String name;
    private final BlockingQueue<IDataChangeEvent> eventQueue;
    private final int notifyIntervalMs;
    private DataServerConfig dataServerConfig;
    private final Map<String, Map<String, ChangeData>> CHANGE_DATA_MAP = new ConcurrentHashMap();
    private final DelayQueue<ChangeData> CHANGE_QUEUE = new DelayQueue<>();
    private final ReentrantLock lock = new ReentrantLock();

    public DataChangeEventQueue(int i, DataServerConfig dataServerConfig) {
        this.name = String.format("%s_%s", DataChangeEventQueue.class.getSimpleName(), Integer.valueOf(i));
        this.dataServerConfig = dataServerConfig;
        int queueSize = dataServerConfig.getQueueSize();
        if (queueSize <= 0) {
            this.eventQueue = new LinkedBlockingDeque();
        } else {
            this.eventQueue = new LinkedBlockingDeque(queueSize);
        }
        this.notifyIntervalMs = dataServerConfig.getNotifyIntervalMs();
    }

    public void onChange(IDataChangeEvent iDataChangeEvent) {
        this.eventQueue.add(iDataChangeEvent);
    }

    public String getName() {
        return this.name;
    }

    public ChangeData take() throws InterruptedException {
        ChangeData take = this.CHANGE_QUEUE.take();
        this.lock.lock();
        try {
            Datum datum = take.getDatum();
            this.CHANGE_DATA_MAP.get(datum.getDataCenter()).remove(datum.getDataInfoId());
            this.lock.unlock();
            return take;
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    private ChangeData getChangeData(String str, String str2, DataSourceTypeEnum dataSourceTypeEnum, DataChangeTypeEnum dataChangeTypeEnum) {
        Map<String, ChangeData> map = this.CHANGE_DATA_MAP.get(str);
        if (map == null) {
            ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
            map = this.CHANGE_DATA_MAP.putIfAbsent(str, concurrentHashMap);
            if (map == null) {
                map = concurrentHashMap;
            }
        }
        ChangeData changeData = map.get(str2);
        if (changeData == null) {
            ChangeData changeData2 = new ChangeData(null, this.notifyIntervalMs, dataSourceTypeEnum, dataChangeTypeEnum);
            changeData = map.putIfAbsent(str2, changeData2);
            if (changeData == null) {
                changeData = changeData2;
            }
            this.CHANGE_QUEUE.put((DelayQueue<ChangeData>) changeData);
        }
        return changeData;
    }

    public void start() {
        LOGGER.info("[{}] begin start DataChangeEventQueue", getName());
        ExecutorFactory.newSingleThreadExecutor(String.format("%s_%s", DataChangeEventQueue.class.getSimpleName(), getName())).execute(() -> {
            while (true) {
                try {
                    IDataChangeEvent take = this.eventQueue.take();
                    DataChangeScopeEnum scope = take.getScope();
                    if (scope == DataChangeScopeEnum.DATUM) {
                        DataChangeEvent dataChangeEvent = (DataChangeEvent) take;
                        handleDatum(dataChangeEvent.getChangeType(), dataChangeEvent.getSourceType(), dataChangeEvent.getDatum());
                    } else if (scope == DataChangeScopeEnum.CLIENT) {
                        handleHost((ClientChangeEvent) take);
                    }
                } catch (Throwable th) {
                    LOGGER.error("[{}] handle change event failed", getName(), th);
                }
            }
        });
        LOGGER.info("[{}] start DataChangeEventQueue success", getName());
    }

    private void handleHost(ClientChangeEvent clientChangeEvent) {
        String host = clientChangeEvent.getHost();
        synchronized (Interners.newWeakInterner().intern(host)) {
            Map<String, Publisher> byHost = DatumCache.getByHost(host);
            if (byHost == null || byHost.isEmpty()) {
                LOGGER.info("[{}] no datum to handle, host={}", getName(), host);
            } else {
                int i = 0;
                for (Publisher publisher : byHost.values()) {
                    if (DataServerConfig.IP.equals(DataServerNodeFactory.computeDataServerNode(this.dataServerConfig.getLocalDataCenter(), publisher.getDataInfoId()).getIp())) {
                        Datum datum = new Datum(new UnPublisher(publisher.getDataInfoId(), publisher.getRegisterId(), clientChangeEvent.getOccurredTimestamp()), clientChangeEvent.getDataCenter(), clientChangeEvent.getVersion());
                        datum.setContainsUnPub(true);
                        handleDatum(DataChangeTypeEnum.MERGE, DataSourceTypeEnum.PUB, datum);
                        i++;
                    }
                }
                LOGGER.info("[{}] client off handle, host={}, occurTimestamp={},version={},handle pub size={}", new Object[]{getName(), host, Long.valueOf(clientChangeEvent.getOccurredTimestamp()), Long.valueOf(clientChangeEvent.getVersion()), Integer.valueOf(i)});
            }
        }
    }

    private void handleDatum(DataChangeTypeEnum dataChangeTypeEnum, DataSourceTypeEnum dataSourceTypeEnum, Datum datum) {
        this.lock.lock();
        try {
            ChangeData changeData = getChangeData(datum.getDataCenter(), datum.getDataInfoId(), dataSourceTypeEnum, dataChangeTypeEnum);
            Datum datum2 = changeData.getDatum();
            if (dataChangeTypeEnum == DataChangeTypeEnum.COVER || datum2 == null) {
                changeData.setDatum(datum);
            } else {
                Map pubMap = datum.getPubMap();
                Map pubMap2 = datum2.getPubMap();
                for (Publisher publisher : pubMap.values()) {
                    String registerId = publisher.getRegisterId();
                    Publisher publisher2 = (Publisher) pubMap2.get(registerId);
                    if (publisher2 == null || (publisher.getRegisterTimestamp() >= publisher2.getRegisterTimestamp() && ((publisher instanceof UnPublisher) || (publisher2 instanceof UnPublisher) || !publisher.getSourceAddress().equals(publisher2.getSourceAddress()) || publisher2.getVersion().longValue() < publisher.getVersion().longValue()))) {
                        pubMap2.put(registerId, publisher);
                        datum2.setVersion(datum.getVersion());
                    }
                }
            }
        } finally {
            this.lock.unlock();
        }
    }
}
