package com.alipay.sofa.registry.server.data.change;

import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.cache.DatumCache;
import com.alipay.sofa.registry.server.data.cache.MergeResult;
import com.alipay.sofa.registry.server.data.change.event.DataChangeEventCenter;
import com.alipay.sofa.registry.server.data.change.event.DataChangeEventQueue;
import com.alipay.sofa.registry.server.data.change.notify.IDataChangeNotifier;
import com.alipay.sofa.registry.server.data.executor.ExecutorFactory;
import java.util.List;
import java.util.concurrent.Executor;
import javax.annotation.Resource;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:com/alipay/sofa/registry/server/data/change/DataChangeHandler.class */
/**
 * Consumes change events from the queues exposed by {@link DataChangeEventCenter}.
 *
 * <p>For every {@link ChangeData} taken from a queue, a {@link ChangeNotifier} task is submitted
 * to a worker pool. The task updates {@link DatumCache} (or cleans it) and then invokes every
 * registered {@link IDataChangeNotifier} whose suitable sources contain the event's source type.
 */
public class DataChangeHandler implements InitializingBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataChangeHandler.class);

    /**
     * Last-version value for which {@link ChangeNotifier#run()} logs a "first put unPub datum"
     * error and skips notification.
     * NOTE(review): presumably mirrors an error sentinel defined by the datum cache - confirm.
     */
    private static final long ERROR_DATUM_VERSION = -2L;

    @Autowired
    private DataServerConfig dataServerBootstrapConfig;

    @Autowired
    private DataChangeEventCenter dataChangeEventCenter;

    @Resource
    private List<IDataChangeNotifier> dataChangeNotifiers;

    /**
     * Task that applies a single {@link ChangeData} to the cache and fans the change out to the
     * notifiers of the enclosing handler.
     */
    private class ChangeNotifier implements Runnable {
        private final ChangeData changeData;
        private final String name;

        /**
         * @param changeData the change event to process
         * @param str        name of the queue the event was taken from; used only in log output
         */
        public ChangeNotifier(ChangeData changeData, String str) {
            this.changeData = changeData;
            this.name = str;
        }

        /**
         * Processes the change according to its source type:
         * <ul>
         *   <li>{@code CLEAN}: removes the datum from the cache and logs when something was
         *       removed;</li>
         *   <li>{@code PUB_TEMP}: notifies without touching the cache;</li>
         *   <li>anything else: stores the datum in the cache and notifies only when the cache
         *       reports a change and the version differs from the previous one.</li>
         * </ul>
         * Any exception raised while doing so is logged and not rethrown.
         */
        @Override // java.lang.Runnable
        public void run() {
            Datum datum = this.changeData.getDatum();
            String dataCenter = datum.getDataCenter();
            String dataInfoId = datum.getDataInfoId();
            long version = datum.getVersion();
            DataSourceTypeEnum sourceType = this.changeData.getSourceType();
            DataChangeTypeEnum changeType = this.changeData.getChangeType();
            try {
                if (sourceType == DataSourceTypeEnum.CLEAN) {
                    if (DatumCache.cleanDatum(dataCenter, dataInfoId)) {
                        DataChangeHandler.LOGGER.info("[DataChangeHandler][{}] clean datum, dataCenter={}, dataInfoId={}, version={},sourceType={}, changeType={}", new Object[]{this.name, dataCenter, dataInfoId, Long.valueOf(version), sourceType, changeType});
                    }
                    return;
                }
                if (sourceType == DataSourceTypeEnum.PUB_TEMP) {
                    notifyTempPub(datum, sourceType, changeType);
                    return;
                }

                MergeResult mergeResult = DatumCache.putDatum(changeType, datum);
                Long lastVersion = mergeResult.getLastVersion();
                if (lastVersion != null && lastVersion.longValue() == ERROR_DATUM_VERSION) {
                    DataChangeHandler.LOGGER.error("[DataChangeHandler][{}] first put unPub datum into cache error, dataCenter={}, dataInfoId={}, version={}, sourceType={},isContainsUnPub={}", new Object[]{this.name, dataCenter, dataInfoId, Long.valueOf(version), sourceType, Boolean.valueOf(datum.isContainsUnPub())});
                    return;
                }

                boolean changed = mergeResult.isChangeFlag();
                DataChangeHandler.LOGGER.info("[DataChangeHandler][{}] datum handle,datum={},dataCenter={}, dataInfoId={}, version={}, lastVersion={}, sourceType={}, changeType={},changeFlag={},isContainsUnPub={}", new Object[]{this.name, Integer.valueOf(datum.hashCode()), dataCenter, dataInfoId, Long.valueOf(version), lastVersion, sourceType, changeType, Boolean.valueOf(changed), Boolean.valueOf(datum.isContainsUnPub())});

                // Notify only when the cache reports a change and the version actually moved.
                boolean versionMoved = lastVersion == null || version != lastVersion.longValue();
                if (versionMoved && changed) {
                    notifySuitable(datum, sourceType, lastVersion);
                }
            } catch (Exception e) {
                DataChangeHandler.LOGGER.error("[DataChangeHandler][{}] put datum into cache error, dataCenter={}, dataInfoId={}, version={}, sourceType={},isContainsUnPub={}", new Object[]{this.name, dataCenter, dataInfoId, Long.valueOf(version), sourceType, Boolean.valueOf(datum.isContainsUnPub()), e});
            }
        }

        /**
         * Logs a temporary publication and notifies the suitable notifiers with a {@code null}
         * last version; the cache is not modified.
         */
        private void notifyTempPub(Datum datum, DataSourceTypeEnum dataSourceTypeEnum, DataChangeTypeEnum dataChangeTypeEnum) {
            DataChangeHandler.LOGGER.info("[DataChangeHandler][{}] datum handle temp pub,datum={},dataCenter={}, dataInfoId={}, version={}, sourceType={}, changeType={},isContainsUnPub={}", new Object[]{this.name, Integer.valueOf(datum.hashCode()), datum.getDataCenter(), datum.getDataInfoId(), Long.valueOf(datum.getVersion()), dataSourceTypeEnum, dataChangeTypeEnum, Boolean.valueOf(datum.isContainsUnPub())});
            notifySuitable(datum, dataSourceTypeEnum, null);
        }

        /**
         * Invokes every notifier of the enclosing handler whose suitable sources contain
         * {@code sourceType}.
         *
         * @param lastVersion version handed to the notifiers as the previous one; may be
         *                    {@code null}
         */
        private void notifySuitable(Datum datum, DataSourceTypeEnum sourceType, Long lastVersion) {
            for (IDataChangeNotifier notifier : DataChangeHandler.this.dataChangeNotifiers) {
                if (notifier.getSuitableSource().contains(sourceType)) {
                    notifier.notify(datum, lastVersion);
                }
            }
        }
    }

    /** Initialises the event center with the server configuration, then starts dispatching. */
    public void afterPropertiesSet() {
        this.dataChangeEventCenter.init(this.dataServerBootstrapConfig);
        start();
    }

    /**
     * Starts one dispatcher task per event queue. Each dispatcher repeatedly takes a change from
     * its queue and submits a {@link ChangeNotifier} for it to a shared worker pool sized at five
     * times the configured queue count.
     *
     * <p>A dispatcher keeps running after ordinary failures (they are logged), but stops when it
     * is interrupted so that the thread can be shut down.
     */
    public void start() {
        DataChangeEventQueue[] queues = this.dataChangeEventCenter.getQueues();
        Executor dispatchExecutor = ExecutorFactory.newFixedThreadPool(queues.length, DataChangeHandler.class.getSimpleName());
        Executor notifyExecutor = ExecutorFactory.newFixedThreadPool(this.dataServerBootstrapConfig.getQueueCount() * 5, getClass().getSimpleName());
        for (DataChangeEventQueue queue : queues) {
            String name = queue.getName();
            LOGGER.info("[DataChangeHandler] begin to notify datum in queue:{}", name);
            dispatchExecutor.execute(() -> {
                while (true) {
                    try {
                        notifyExecutor.execute(new ChangeNotifier(queue.take(), name));
                    } catch (Throwable th) {
                        // The declaration of take() is not visible here, so interruption is
                        // detected by type rather than with a dedicated catch clause.
                        if (th instanceof InterruptedException) {
                            // Restore the flag and leave the loop instead of swallowing the
                            // interrupt and blocking on the queue again.
                            Thread.currentThread().interrupt();
                            LOGGER.error("[DataChangeHandler][{}] notify scheduler interrupted, stop taking from queue", name, th);
                            return;
                        }
                        LOGGER.error("[DataChangeHandler][{}] notify scheduler error", name, th);
                    }
                }
            });
            LOGGER.info("[DataChangeHandler] notify datum in queue:{} success", name);
        }
    }
}
