package org.apache.rocketmq.controller.impl;

import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.BrokerAddrInfo;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.controller.BrokerHeartbeatManager;
import org.apache.rocketmq.controller.BrokerLiveInfo;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;

/* loaded from: input_file:org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.class */
public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger("RocketmqController");
    private static final long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 10000;
    private final ControllerConfig controllerConfig;
    private final ScheduledExecutorService scheduledService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_scheduledService_"));
    private final ExecutorService executor = Executors.newFixedThreadPool(2, new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_executorService_"));
    private final Map<BrokerAddrInfo, BrokerLiveInfo> brokerLiveTable = new ConcurrentHashMap(256);
    private final List<BrokerHeartbeatManager.BrokerLifecycleListener> brokerLifecycleListeners = new ArrayList();

    public DefaultBrokerHeartbeatManager(ControllerConfig controllerConfig) {
        this.controllerConfig = controllerConfig;
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public void start() {
        this.scheduledService.scheduleAtFixedRate(this::scanNotActiveBroker, 2000L, this.controllerConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public void shutdown() {
        this.scheduledService.shutdown();
        this.executor.shutdown();
    }

    public void scanNotActiveBroker() {
        try {
            log.info("start scanNotActiveBroker");
            Iterator<Map.Entry<BrokerAddrInfo, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<BrokerAddrInfo, BrokerLiveInfo> next = it.next();
                long lastUpdateTimestamp = next.getValue().getLastUpdateTimestamp();
                long heartbeatTimeoutMillis = next.getValue().getHeartbeatTimeoutMillis();
                if (lastUpdateTimestamp + heartbeatTimeoutMillis < System.currentTimeMillis()) {
                    Channel channel = next.getValue().getChannel();
                    it.remove();
                    if (channel != null) {
                        RemotingUtil.closeChannel(channel);
                    }
                    this.executor.submit(() -> {
                        notifyBrokerInActive(((BrokerAddrInfo) next.getKey()).getClusterName(), ((BrokerLiveInfo) next.getValue()).getBrokerName(), ((BrokerAddrInfo) next.getKey()).getBrokerAddr(), Long.valueOf(((BrokerLiveInfo) next.getValue()).getBrokerId()));
                    });
                    log.warn("The broker channel {} expired, brokerInfo {}, expired {}ms", new Object[]{next.getValue().getChannel(), next.getKey(), Long.valueOf(heartbeatTimeoutMillis)});
                }
            }
        } catch (Exception e) {
            log.error("scanNotActiveBroker exception", e);
        }
    }

    private void notifyBrokerInActive(String str, String str2, String str3, Long l) {
        Iterator<BrokerHeartbeatManager.BrokerLifecycleListener> it = this.brokerLifecycleListeners.iterator();
        while (it.hasNext()) {
            it.next().onBrokerInactive(str, str2, str3, l.longValue());
        }
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public void addBrokerLifecycleListener(BrokerHeartbeatManager.BrokerLifecycleListener brokerLifecycleListener) {
        this.brokerLifecycleListeners.add(brokerLifecycleListener);
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public void registerBroker(String str, String str2, String str3, long j, Long l, Channel channel, Integer num, Long l2) {
        BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(str, str3);
        if (this.brokerLiveTable.put(brokerAddrInfo, new BrokerLiveInfo(str2, str3, j, System.currentTimeMillis(), l == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : l.longValue(), channel, num == null ? -1 : num.intValue(), l2 == null ? -1L : l2.longValue())) == null) {
            log.info("new broker registered, {}, brokerId:{}", brokerAddrInfo, Long.valueOf(j));
        }
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public void changeBrokerMetadata(String str, String str2, Long l) {
        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(new BrokerAddrInfo(str, str2));
        if (brokerLiveInfo != null) {
            brokerLiveInfo.setBrokerId(l.longValue());
            log.info("Change broker {}'s brokerId to {}", str2, l);
        }
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public void onBrokerHeartbeat(String str, String str2, Integer num, Long l, Long l2) {
        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(new BrokerAddrInfo(str, str2));
        if (null == brokerLiveInfo) {
            return;
        }
        int intValue = ((Integer) Optional.ofNullable(num).orElse(-1)).intValue();
        long longValue = ((Long) Optional.ofNullable(l).orElse(-1L)).longValue();
        long longValue2 = ((Long) Optional.ofNullable(l2).orElse(-1L)).longValue();
        brokerLiveInfo.setLastUpdateTimestamp(System.currentTimeMillis());
        if (intValue > brokerLiveInfo.getEpoch() || (intValue == brokerLiveInfo.getEpoch() && longValue > brokerLiveInfo.getMaxOffset())) {
            brokerLiveInfo.setEpoch(intValue);
            brokerLiveInfo.setMaxOffset(longValue);
            brokerLiveInfo.setConfirmOffset(longValue2);
        }
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public void onBrokerChannelClose(Channel channel) {
        BrokerAddrInfo brokerAddrInfo = null;
        Iterator<Map.Entry<BrokerAddrInfo, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Map.Entry<BrokerAddrInfo, BrokerLiveInfo> next = it.next();
            if (next.getValue().getChannel() == channel) {
                log.info("Channel {} inactive, broker {}, addr:{}, id:{}", new Object[]{next.getValue().getChannel(), next.getValue().getBrokerName(), next.getKey().getBrokerAddr(), Long.valueOf(next.getValue().getBrokerId())});
                brokerAddrInfo = next.getKey();
                this.executor.submit(() -> {
                    notifyBrokerInActive(((BrokerAddrInfo) next.getKey()).getClusterName(), ((BrokerLiveInfo) next.getValue()).getBrokerName(), ((BrokerAddrInfo) next.getKey()).getBrokerAddr(), Long.valueOf(((BrokerLiveInfo) next.getValue()).getBrokerId()));
                });
                break;
            }
        }
        if (brokerAddrInfo != null) {
            this.brokerLiveTable.remove(brokerAddrInfo);
        }
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public BrokerLiveInfo getBrokerLiveInfo(String str, String str2) {
        return this.brokerLiveTable.get(new BrokerAddrInfo(str, str2));
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public boolean isBrokerActive(String str, String str2) {
        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(new BrokerAddrInfo(str, str2));
        return brokerLiveInfo != null && brokerLiveInfo.getLastUpdateTimestamp() + brokerLiveInfo.getHeartbeatTimeoutMillis() >= System.currentTimeMillis();
    }
}
