package org.apache.rocketmq.controller.impl.heartbeat;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.parser.Feature;
import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.controller.BrokerHeartbeatManager;
import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
import org.apache.rocketmq.controller.impl.JRaftController;
import org.apache.rocketmq.controller.impl.task.BrokerCloseChannelRequest;
import org.apache.rocketmq.controller.impl.task.CheckNotActiveBrokerRequest;
import org.apache.rocketmq.controller.impl.task.GetBrokerLiveInfoRequest;
import org.apache.rocketmq.controller.impl.task.RaftBrokerHeartBeatEventRequest;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

/* loaded from: input_file:org/apache/rocketmq/controller/impl/heartbeat/RaftBrokerHeartBeatManager.class */
public class RaftBrokerHeartBeatManager implements BrokerHeartbeatManager {
    private static final Logger log = LoggerFactory.getLogger("RocketmqController");
    private JRaftController controller;
    private final ControllerConfig controllerConfig;
    private final List<BrokerLifecycleListener> brokerLifecycleListeners = new ArrayList();
    private final Map<Channel, BrokerIdentityInfo> brokerChannelIdentityInfoMap = new HashMap();
    private long firstReceivedHeartbeatTime = -1;
    private final ScheduledExecutorService scheduledService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RaftBrokerHeartbeatManager_scheduledService_"));
    private final ExecutorService executor = Executors.newFixedThreadPool(2, new ThreadFactoryImpl("RaftBrokerHeartbeatManager_executorService_"));

    public RaftBrokerHeartBeatManager(ControllerConfig controllerConfig) {
        this.controllerConfig = controllerConfig;
    }

    public void setController(JRaftController jRaftController) {
        this.controller = jRaftController;
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public void initialize() {
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public void start() {
        this.scheduledService.scheduleAtFixedRate(this::scanNotActiveBroker, 2000L, this.controllerConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public void shutdown() {
        this.scheduledService.shutdown();
        this.executor.shutdown();
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public void registerBrokerLifecycleListener(BrokerLifecycleListener brokerLifecycleListener) {
        this.brokerLifecycleListeners.add(brokerLifecycleListener);
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public void onBrokerHeartbeat(String str, String str2, String str3, Long l, Long l2, Channel channel, Integer num, Long l3, Long l4, Integer num2) {
        RemotingCommand remotingCommand;
        if (this.firstReceivedHeartbeatTime == -1) {
            this.firstReceivedHeartbeatTime = System.currentTimeMillis();
        }
        BrokerIdentityInfo brokerIdentityInfo = new BrokerIdentityInfo(str, str2, l);
        int intValue = ((Integer) Optional.ofNullable(num).orElse(-1)).intValue();
        BrokerLiveInfo brokerLiveInfo = new BrokerLiveInfo(str2, str3, ((Long) Optional.ofNullable(l).orElse(-1L)).longValue(), System.currentTimeMillis(), ((Long) Optional.ofNullable(l2).orElse(Long.valueOf(BrokerHeartbeatManager.DEFAULT_BROKER_CHANNEL_EXPIRED_TIME))).longValue(), null, intValue, ((Long) Optional.ofNullable(l3).orElse(-1L)).longValue(), Integer.valueOf(((Integer) Optional.ofNullable(num2).orElse(Integer.MAX_VALUE)).intValue()), ((Long) Optional.ofNullable(l4).orElse(-1L)).longValue());
        log.info("broker {} heart beat", brokerIdentityInfo);
        try {
            remotingCommand = this.controller.onBrokerHeartBeat(new RaftBrokerHeartBeatEventRequest(brokerIdentityInfo, brokerLiveInfo)).get(5L, TimeUnit.SECONDS);
        } catch (InterruptedException | RuntimeException | ExecutionException | TimeoutException e) {
            log.error("on broker heartbeat through raft failed", e);
        }
        if (remotingCommand.getCode() != 0 && remotingCommand.getCode() != 2007) {
            throw new RuntimeException("on broker heartbeat return invalid code, code: " + remotingCommand.getCode());
        }
        this.brokerChannelIdentityInfoMap.put(channel, brokerIdentityInfo);
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public void onBrokerChannelClose(Channel channel) {
        BrokerIdentityInfo brokerIdentityInfo = this.brokerChannelIdentityInfoMap.get(channel);
        log.info("Channel {} inactive, broker identity info: {}", channel, brokerIdentityInfo);
        if (brokerIdentityInfo != null) {
            try {
                RemotingCommand remotingCommand = this.controller.onBrokerCloseChannel(new BrokerCloseChannelRequest(brokerIdentityInfo)).get(5L, TimeUnit.SECONDS);
                if (remotingCommand.getCode() != 0) {
                    throw new RuntimeException("on broker close channel return invalid code, code: " + remotingCommand.getCode());
                }
                this.executor.submit(() -> {
                    notifyBrokerInActive(brokerIdentityInfo.getClusterName(), brokerIdentityInfo.getBrokerName(), brokerIdentityInfo.getBrokerId());
                });
                this.brokerChannelIdentityInfoMap.remove(channel);
            } catch (InterruptedException | RuntimeException | ExecutionException | TimeoutException e) {
                log.error("on broker close channel through raft failed", e);
            }
        }
    }

    /* JADX WARN: Type inference failed for: r1v6, types: [org.apache.rocketmq.controller.impl.heartbeat.RaftBrokerHeartBeatManager$1] */
    private Map<BrokerIdentityInfo, BrokerLiveInfo> getBrokerLiveInfo(BrokerIdentityInfo brokerIdentityInfo) {
        try {
            RemotingCommand remotingCommand = this.controller.getBrokerLiveInfo(brokerIdentityInfo == null ? new GetBrokerLiveInfoRequest() : new GetBrokerLiveInfoRequest(brokerIdentityInfo)).get(5L, TimeUnit.SECONDS);
            if (remotingCommand.getCode() != 0) {
                throw new RuntimeException("get broker live info return invalid code, code: " + remotingCommand.getCode());
            }
            return (Map) JSON.parseObject(remotingCommand.getBody(), new TypeReference<Map<BrokerIdentityInfo, BrokerLiveInfo>>() { // from class: org.apache.rocketmq.controller.impl.heartbeat.RaftBrokerHeartBeatManager.1
            }.getType(), new Feature[0]);
        } catch (Throwable th) {
            log.error("get broker live info through raft failed", th);
            return new HashMap();
        }
    }

    /* JADX WARN: Type inference failed for: r1v11, types: [org.apache.rocketmq.controller.impl.heartbeat.RaftBrokerHeartBeatManager$2] */
    private void scanNotActiveBroker() {
        if (!this.controller.isLeaderState()) {
            log.info("current node is not leader, skip scan not active broker");
            return;
        }
        if (this.firstReceivedHeartbeatTime + this.controllerConfig.getJraftConfig().getjRaftScanWaitTimeoutMs() < System.currentTimeMillis()) {
            log.info("has not received any heartbeat from broker, skip scan not active broker");
            return;
        }
        log.info("start scan not active broker");
        try {
            RemotingCommand remotingCommand = this.controller.checkNotActiveBroker(new CheckNotActiveBrokerRequest()).get(5L, TimeUnit.SECONDS);
            if (remotingCommand.getCode() != 0) {
                throw new RuntimeException("check not active broker return invalid code, code: " + remotingCommand.getCode());
            }
            List list = (List) JSON.parseObject(remotingCommand.getBody(), new TypeReference<List<BrokerIdentityInfo>>() { // from class: org.apache.rocketmq.controller.impl.heartbeat.RaftBrokerHeartBeatManager.2
            }.getType(), new Feature[0]);
            if (list != null && !list.isEmpty()) {
                list.forEach(brokerIdentityInfo -> {
                    Iterator<Map.Entry<Channel, BrokerIdentityInfo>> it = this.brokerChannelIdentityInfoMap.entrySet().iterator();
                    Channel channel = null;
                    while (true) {
                        if (!it.hasNext()) {
                            break;
                        }
                        Map.Entry<Channel, BrokerIdentityInfo> next = it.next();
                        if (next.getValue().getBrokerId() != null && next.getValue().equals(brokerIdentityInfo)) {
                            channel = next.getKey();
                            RemotingHelper.closeChannel(next.getKey());
                            it.remove();
                            break;
                        }
                    }
                    this.executor.submit(() -> {
                        notifyBrokerInActive(brokerIdentityInfo.getClusterName(), brokerIdentityInfo.getBrokerName(), brokerIdentityInfo.getBrokerId());
                    });
                    log.warn("The broker channel {} expired, brokerInfo {}", channel, brokerIdentityInfo);
                });
            }
        } catch (Throwable th) {
            log.error("check not active broker through raft failed", th);
        }
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public BrokerLiveInfo getBrokerLiveInfo(String str, String str2, Long l) {
        log.info("get broker live info, clusterName: {}, brokerName: {}, brokerId: {}", new Object[]{str, str2, l});
        BrokerIdentityInfo brokerIdentityInfo = new BrokerIdentityInfo(str, str2, l);
        return getBrokerLiveInfo(brokerIdentityInfo).get(brokerIdentityInfo);
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public boolean isBrokerActive(String str, String str2, Long l) {
        try {
            BrokerLiveInfo brokerLiveInfo = getBrokerLiveInfo(str, str2, l);
            return brokerLiveInfo != null && brokerLiveInfo.getLastUpdateTimestamp() + brokerLiveInfo.getHeartbeatTimeoutMillis() >= System.currentTimeMillis();
        } catch (RuntimeException e) {
            log.error("get broker live info failed", e);
            return false;
        }
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public Map<String, Map<String, Integer>> getActiveBrokersNum() {
        HashMap hashMap = new HashMap();
        getBrokerLiveInfo(null).keySet().stream().filter(brokerIdentityInfo -> {
            return isBrokerActive(brokerIdentityInfo.getClusterName(), brokerIdentityInfo.getBrokerName(), brokerIdentityInfo.getBrokerId());
        }).forEach(brokerIdentityInfo2 -> {
            hashMap.computeIfAbsent(brokerIdentityInfo2.getClusterName(), str -> {
                return new HashMap();
            });
            ((Map) hashMap.get(brokerIdentityInfo2.getClusterName())).compute(brokerIdentityInfo2.getBrokerName(), (str2, num) -> {
                return Integer.valueOf(num == null ? 0 : num.intValue() + 1);
            });
        });
        return hashMap;
    }

    private void notifyBrokerInActive(String str, String str2, Long l) {
        log.info("Broker {}-{}-{} inactive", new Object[]{str, str2, l});
        Iterator<BrokerLifecycleListener> it = this.brokerLifecycleListeners.iterator();
        while (it.hasNext()) {
            it.next().onBrokerInactive(str, str2, l);
        }
    }
}
