package com.alipay.sofa.registry.server.data.correction;

import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.metaserver.DataNode;
import com.alipay.sofa.registry.consistency.hash.ConsistentHash;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.cache.BackupTriad;
import com.alipay.sofa.registry.server.data.cache.DataServerCache;
import com.alipay.sofa.registry.server.data.cache.DatumCache;
import com.alipay.sofa.registry.server.data.change.DataSourceTypeEnum;
import com.alipay.sofa.registry.server.data.change.event.DataChangeEventCenter;
import com.alipay.sofa.registry.server.data.executor.ExecutorFactory;
import com.alipay.sofa.registry.server.data.util.DelayItem;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Periodically cleans datums out of the local {@link DatumCache} when this data server is not
 * part of the datum's {@link BackupTriad}.
 *
 * <p>A single worker thread, started in the constructor, takes {@link LocalCleanTask}s from a
 * {@link DelayQueue} and runs them one at a time. Every task that actually runs schedules its
 * successor when it exits, delayed by {@code DataServerConfig#getLocalDataServerCleanDelay()}.
 * Nothing is scheduled until {@link #reset()} is called for the first time.
 */
public class LocalDataServerCleanHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(LocalDataServerCleanHandler.class);

    @Autowired
    private DataServerConfig dataServerBootstrapConfig;

    @Autowired
    private DataServerCache dataServerCache;

    @Autowired
    private DataChangeEventCenter dataChangeEventCenter;

    /**
     * Task most recently taken from the queue by the worker thread. Declared volatile because it
     * is written by the worker thread and read by whichever thread calls {@link #reset()}.
     */
    private volatile LocalCleanTask task;

    /** Pending clean tasks; each becomes available to the worker once its delay has elapsed. */
    private final DelayQueue<DelayItem<LocalCleanTask>> EVENT_QUEUE = new DelayQueue<>();

    /**
     * One pass over the local datum cache. A task instance is single-use: {@link #run()} does
     * nothing if the task has already been started.
     */
    private class LocalCleanTask {
        /** Set to true when the task starts; set back to false by {@link #stop()} to cancel it. */
        private final AtomicBoolean running = new AtomicBoolean(false);

        private LocalCleanTask() {
        }

        /**
         * Runs the clean pass once. Any error is logged rather than propagated, and a successor
         * task is scheduled exactly once on every exit path (normal completion, cancellation,
         * empty data server list, or failure).
         */
        public void run() {
            if (!this.running.compareAndSet(false, true)) {
                return;
            }
            try {
                cleanDatumsNotOwnedBySelf();
            } catch (Throwable th) {
                LOGGER.error("[LocalDataServerCleanHandler] clean local datum task error!", th);
            } finally {
                // Single re-scheduling point: keeps the periodic chain alive without duplicates.
                scheduleNextTask();
            }
        }

        /**
         * Builds a consistent hash over the data servers of the local data center and cleans
         * every cached datum whose backup triad does not contain this server. Returns early if
         * the data server list is empty or the task has been stopped.
         */
        private void cleanDatumsNotOwnedBySelf() {
            Map<String, DataNode> dataServers = dataServerCache
                    .getDataServers(dataServerBootstrapConfig.getLocalDataCenter());
            if (dataServers == null || dataServers.isEmpty()) {
                LOGGER.warn("Calculate Old BackupTriad,old dataServer list is empty!");
                return;
            }
            ConsistentHash consistentHash = new ConsistentHash(
                    dataServerBootstrapConfig.getNumberOfReplicas(), dataServers.values());
            for (Map.Entry<String, Map<String, Datum>> dataCenterEntry : DatumCache.getAll().entrySet()) {
                String dataCenter = dataCenterEntry.getKey();
                for (Map.Entry<String, Datum> datumEntry : dataCenterEntry.getValue().entrySet()) {
                    String dataInfoId = datumEntry.getKey();
                    Datum datum = datumEntry.getValue();
                    if (!this.running.get()) {
                        // stop() was called while iterating: abandon the rest of this pass.
                        LOGGER.info("[LocalDataServerCleanHandler] task cancel, dataInfoId={}", dataInfoId);
                        return;
                    }
                    if (datum == null) {
                        continue;
                    }
                    BackupTriad backupTriad = new BackupTriad(dataInfoId,
                            consistentHash.getNUniqueNodesFor(dataInfoId, dataServerBootstrapConfig.getStoreNodes()));
                    if (!backupTriad.containsSelf()) {
                        int pubSize = datum.getPubMap() != null ? datum.getPubMap().size() : 0;
                        dataChangeEventCenter.clean(datum, DataSourceTypeEnum.CLEAN);
                        LOGGER.info("[LocalDataServerCleanHandler] clean handle, dataCenter={},dataInfoId={},pub size={}",
                                new Object[]{dataCenter, dataInfoId, pubSize});
                    }
                }
            }
        }

        /** Requests cancellation; a running pass notices it before handling the next datum. */
        public void stop() {
            this.running.set(false);
        }
    }

    /**
     * Starts the single worker thread that consumes clean tasks. The worker blocks on the empty
     * queue until {@link #reset()} schedules the first task.
     */
    public LocalDataServerCleanHandler() {
        LOGGER.info("[LocalDataServerCleanHandler] begin start LocalDataServerCleanHandler");
        ExecutorFactory.newSingleThreadExecutor(LocalDataServerCleanHandler.class.getSimpleName())
                .execute(this::consumeTasks);
        LOGGER.info("[LocalDataServerCleanHandler] start LocalDataServerCleanHandler success");
    }

    /**
     * (Re)starts the delay before the next clean pass, keeping a single chain of tasks.
     *
     * <ul>
     *   <li>If nothing is pending and a task has already been handed to the worker, that task is
     *       stopped; it schedules its own successor when it exits, so nothing is added here.</li>
     *   <li>Otherwise any pending task is discarded and a fresh one is scheduled with the full
     *       configured delay.</li>
     * </ul>
     */
    public void reset() {
        synchronized (LocalDataServerCleanHandler.class) {
            LocalCleanTask current = this.task;
            if (this.EVENT_QUEUE.isEmpty() && current != null) {
                current.stop();
            } else {
                this.EVENT_QUEUE.clear();
                scheduleNextTask();
            }
        }
    }

    /** Worker loop: takes the next due task and runs it, until the thread is interrupted. */
    private void consumeTasks() {
        while (true) {
            try {
                this.task = this.EVENT_QUEUE.take().getItem();
                this.task.run();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and exit so the executor thread can be shut down.
                Thread.currentThread().interrupt();
                LOGGER.warn("[LocalDataServerCleanHandler] clean task worker interrupted, exit");
                return;
            } catch (Throwable th) {
                LOGGER.error("[LocalDataServerCleanHandler] handle clean task failed", th);
            }
        }
    }

    /** Enqueues a new clean task that becomes due after the configured clean delay. */
    private void scheduleNextTask() {
        this.EVENT_QUEUE.add(new DelayItem<>(new LocalCleanTask(),
                this.dataServerBootstrapConfig.getLocalDataServerCleanDelay()));
    }
}
