package org.apache.rocketmq.broker;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.schedule.DelayOffsetSerializeWrapper;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.protocol.BrokerSyncInfo;
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.remoting.protocol.body.ConsumerOffsetSerializeWrapper;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.ha.HAConnectionState;
import org.apache.rocketmq.store.ha.HAConnectionStateNotificationRequest;
import org.apache.rocketmq.store.timer.TimerCheckpoint;

/* loaded from: input_file:org/apache/rocketmq/broker/BrokerPreOnlineService.class */
public class BrokerPreOnlineService extends ServiceThread {
    private static final Logger LOGGER = LoggerFactory.getLogger("RocketmqBroker");
    private final BrokerController brokerController;
    private int waitBrokerIndex = 0;

    public BrokerPreOnlineService(BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    public String getServiceName() {
        return (this.brokerController == null || !this.brokerController.getBrokerConfig().isInBrokerContainer()) ? BrokerPreOnlineService.class.getSimpleName() : this.brokerController.getBrokerIdentity().getIdentifier() + BrokerPreOnlineService.class.getSimpleName();
    }

    public void run() {
        LOGGER.info(getServiceName() + " service started");
        while (true) {
            if (!isStopped()) {
                if (!this.brokerController.isIsolated()) {
                    LOGGER.info("broker {} is online", this.brokerController.getBrokerConfig().getCanonicalName());
                    break;
                }
                try {
                } catch (Exception e) {
                    LOGGER.error("Broker preOnline error, ", e);
                }
                if (prepareForBrokerOnline()) {
                    break;
                } else {
                    waitForRunning(1000L);
                }
            } else {
                break;
            }
        }
        LOGGER.info(getServiceName() + " service end");
    }

    CompletableFuture<Boolean> waitForHaHandshakeComplete(String str) {
        LOGGER.info("wait for handshake completion with {}", str);
        HAConnectionStateNotificationRequest hAConnectionStateNotificationRequest = new HAConnectionStateNotificationRequest(HAConnectionState.TRANSFER, RemotingHelper.parseHostFromAddress(str), true);
        if (this.brokerController.getMessageStore().getHaService() != null) {
            this.brokerController.getMessageStore().getHaService().putGroupConnectionStateRequest(hAConnectionStateNotificationRequest);
        } else {
            LOGGER.error("HAService is null, maybe broker config is wrong. For example, duplicationEnable is true");
            hAConnectionStateNotificationRequest.getRequestFuture().complete(false);
        }
        return hAConnectionStateNotificationRequest.getRequestFuture();
    }

    private boolean futureWaitAction(boolean z, BrokerMemberGroup brokerMemberGroup) {
        if (!z) {
            LOGGER.error("wait for handshake completion failed, HA connection lost");
            return false;
        }
        if (this.brokerController.getBrokerConfig().getBrokerId() == 0) {
            return true;
        }
        LOGGER.info("slave preOnline complete, start service");
        long minBrokerId = getMinBrokerId(brokerMemberGroup.getBrokerAddrs());
        this.brokerController.startService(minBrokerId, (String) brokerMemberGroup.getBrokerAddrs().get(Long.valueOf(minBrokerId)));
        return true;
    }

    private boolean prepareForMasterOnline(BrokerMemberGroup brokerMemberGroup) {
        ArrayList arrayList = new ArrayList(brokerMemberGroup.getBrokerAddrs().keySet());
        Collections.sort(arrayList);
        while (this.waitBrokerIndex < arrayList.size()) {
            String str = (String) brokerMemberGroup.getBrokerAddrs().get(arrayList.get(this.waitBrokerIndex));
            try {
                this.brokerController.getBrokerOuterAPI().sendBrokerHaInfo(str, this.brokerController.getHAServerAddr(), this.brokerController.getMessageStore().getBrokerInitMaxOffset(), this.brokerController.getBrokerAddr());
                try {
                    if (!((Boolean) waitForHaHandshakeComplete(str).thenApply(bool -> {
                        return Boolean.valueOf(futureWaitAction(bool.booleanValue(), brokerMemberGroup));
                    }).get()).booleanValue() || !syncMetadataReverse(str)) {
                        return false;
                    }
                    this.waitBrokerIndex++;
                } catch (Exception e) {
                    LOGGER.error("Wait handshake completion exception, {}", e);
                    return false;
                }
            } catch (Exception e2) {
                LOGGER.error("send ha address to {} exception, {}", str, e2);
                return false;
            }
        }
        LOGGER.info("master preOnline complete, start service");
        this.brokerController.startService(0L, this.brokerController.getBrokerAddr());
        return true;
    }

    private boolean syncMetadataReverse(String str) {
        try {
            LOGGER.info("Get metadata reverse from {}", str);
            String allDelayOffset = this.brokerController.getBrokerOuterAPI().getAllDelayOffset(str);
            DelayOffsetSerializeWrapper delayOffsetSerializeWrapper = (DelayOffsetSerializeWrapper) DelayOffsetSerializeWrapper.fromJson(allDelayOffset, DelayOffsetSerializeWrapper.class);
            ConsumerOffsetSerializeWrapper allConsumerOffset = this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(str);
            TimerCheckpoint timerCheckPoint = this.brokerController.getBrokerOuterAPI().getTimerCheckPoint(str);
            if (null != allConsumerOffset && this.brokerController.getConsumerOffsetManager().getDataVersion().compare(allConsumerOffset.getDataVersion()) <= 0) {
                LOGGER.info("{}'s consumerOffset data version is larger than master broker, {}'s consumerOffset will be used.", str, str);
                this.brokerController.getConsumerOffsetManager().getOffsetTable().putAll(allConsumerOffset.getOffsetTable());
                this.brokerController.getConsumerOffsetManager().getDataVersion().assignNewOne(allConsumerOffset.getDataVersion());
                this.brokerController.getConsumerOffsetManager().persist();
            }
            if (null != allDelayOffset && this.brokerController.getScheduleMessageService().getDataVersion().compare(delayOffsetSerializeWrapper.getDataVersion()) <= 0) {
                LOGGER.info("{}'s scheduleMessageService data version is larger than master broker, {}'s delayOffset will be used.", str, str);
                String delayOffsetStorePath = StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
                try {
                    MixAll.string2File(allDelayOffset, delayOffsetStorePath);
                    this.brokerController.getScheduleMessageService().load();
                } catch (IOException e) {
                    LOGGER.error("Persist file Exception, {}", delayOffsetStorePath, e);
                }
            }
            if (null != this.brokerController.getTimerCheckpoint() && this.brokerController.getTimerCheckpoint().getDataVersion().compare(timerCheckPoint.getDataVersion()) <= 0) {
                LOGGER.info("{}'s timerCheckpoint data version is larger than master broker, {}'s timerCheckpoint will be used.", str, str);
                this.brokerController.getTimerCheckpoint().setLastReadTimeMs(timerCheckPoint.getLastReadTimeMs());
                this.brokerController.getTimerCheckpoint().setMasterTimerQueueOffset(timerCheckPoint.getMasterTimerQueueOffset());
                this.brokerController.getTimerCheckpoint().getDataVersion().assignNewOne(timerCheckPoint.getDataVersion());
                this.brokerController.getTimerCheckpoint().flush();
            }
            for (BrokerAttachedPlugin brokerAttachedPlugin : this.brokerController.getBrokerAttachedPlugins()) {
                if (brokerAttachedPlugin != null) {
                    brokerAttachedPlugin.syncMetadataReverse(str);
                }
            }
            return true;
        } catch (Exception e2) {
            LOGGER.error("GetMetadataReverse Failed", e2);
            return false;
        }
    }

    private boolean prepareForSlaveOnline(BrokerMemberGroup brokerMemberGroup) {
        try {
            BrokerSyncInfo retrieveBrokerHaInfo = this.brokerController.getBrokerOuterAPI().retrieveBrokerHaInfo((String) brokerMemberGroup.getBrokerAddrs().get(0L));
            if (this.brokerController.getMessageStore().getMasterFlushedOffset() == 0 && this.brokerController.getMessageStoreConfig().isSyncMasterFlushOffsetWhenStartup()) {
                LOGGER.info("Set master flush offset in slave to {}", Long.valueOf(retrieveBrokerHaInfo.getMasterFlushOffset()));
                this.brokerController.getMessageStore().setMasterFlushedOffset(retrieveBrokerHaInfo.getMasterFlushOffset());
            }
            if (retrieveBrokerHaInfo.getMasterHaAddress() == null) {
                LOGGER.info("fetch master ha address return null, start service directly");
                long minBrokerId = getMinBrokerId(brokerMemberGroup.getBrokerAddrs());
                this.brokerController.startService(minBrokerId, (String) brokerMemberGroup.getBrokerAddrs().get(Long.valueOf(minBrokerId)));
                return true;
            }
            this.brokerController.getMessageStore().updateHaMasterAddress(retrieveBrokerHaInfo.getMasterHaAddress());
            this.brokerController.getMessageStore().updateMasterAddress(retrieveBrokerHaInfo.getMasterAddress());
            try {
                return ((Boolean) waitForHaHandshakeComplete(retrieveBrokerHaInfo.getMasterHaAddress()).thenApply(bool -> {
                    return Boolean.valueOf(futureWaitAction(bool.booleanValue(), brokerMemberGroup));
                }).get()).booleanValue();
            } catch (Exception e) {
                LOGGER.error("Wait handshake completion exception, {}", e);
                return false;
            }
        } catch (Exception e2) {
            LOGGER.error("retrieve master ha info exception, {}", e2);
            return false;
        }
    }

    private boolean prepareForBrokerOnline() {
        try {
            BrokerMemberGroup syncBrokerMemberGroup = this.brokerController.getBrokerOuterAPI().syncBrokerMemberGroup(this.brokerController.getBrokerConfig().getBrokerClusterName(), this.brokerController.getBrokerConfig().getBrokerName(), this.brokerController.getBrokerConfig().isCompatibleWithOldNameSrv());
            if (syncBrokerMemberGroup == null || syncBrokerMemberGroup.getBrokerAddrs().isEmpty()) {
                LOGGER.info("no other broker online, will start service directly");
                this.brokerController.startService(this.brokerController.getBrokerConfig().getBrokerId(), this.brokerController.getBrokerAddr());
                return true;
            }
            long minBrokerId = getMinBrokerId(syncBrokerMemberGroup.getBrokerAddrs());
            if (this.brokerController.getBrokerConfig().getBrokerId() == 0) {
                return prepareForMasterOnline(syncBrokerMemberGroup);
            }
            if (minBrokerId == 0) {
                return prepareForSlaveOnline(syncBrokerMemberGroup);
            }
            LOGGER.info("no master online, start service directly");
            this.brokerController.startService(minBrokerId, (String) syncBrokerMemberGroup.getBrokerAddrs().get(Long.valueOf(minBrokerId)));
            return true;
        } catch (Exception e) {
            LOGGER.error("syncBrokerMemberGroup from namesrv error, start service failed, will try later, ", e);
            return false;
        }
    }

    private long getMinBrokerId(Map<Long, String> map) {
        HashMap hashMap = new HashMap(map);
        hashMap.remove(Long.valueOf(this.brokerController.getBrokerConfig().getBrokerId()));
        return !hashMap.isEmpty() ? ((Long) Collections.min(hashMap.keySet())).longValue() : this.brokerController.getBrokerConfig().getBrokerId();
    }
}
