package org.apache.rocketmq.controller;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
import org.apache.rocketmq.controller.helper.BrokerValidPredicate;
import org.apache.rocketmq.controller.impl.DLedgerController;
import org.apache.rocketmq.controller.impl.JRaftController;
import org.apache.rocketmq.controller.impl.heartbeat.RaftBrokerHeartBeatManager;
import org.apache.rocketmq.controller.metrics.ControllerMetricsManager;
import org.apache.rocketmq.controller.processor.ControllerRequestProcessor;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.Configuration;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.remoting.protocol.body.RoleChangeNotifyEntry;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import org.apache.rocketmq.remoting.protocol.header.NotifyBrokerRoleChangedRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;

/* loaded from: input_file:org/apache/rocketmq/controller/ControllerManager.class */
public class ControllerManager {
    private static final Logger log;
    private final ControllerConfig controllerConfig;
    private final NettyServerConfig nettyServerConfig;
    private final NettyClientConfig nettyClientConfig;
    private final BrokerHousekeepingService brokerHousekeepingService = new BrokerHousekeepingService(this);
    private final Configuration configuration;
    private final RemotingClient remotingClient;
    private Controller controller;
    private final BrokerHeartbeatManager heartbeatManager;
    private ExecutorService controllerRequestExecutor;
    private BlockingQueue<Runnable> controllerRequestThreadPoolQueue;
    private final NotifyService notifyService;
    private ControllerMetricsManager controllerMetricsManager;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/rocketmq/controller/ControllerManager$NotifyService.class */
    class NotifyService {
        private ExecutorService executorService;
        private Map<String, NotifyTask> currentNotifyFutures;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/rocketmq/controller/ControllerManager$NotifyService$NotifyTask.class */
        public class NotifyTask extends Pair<Integer, Future> {
            public NotifyTask(Integer num, Future future) {
                super(num, future);
            }

            public Integer getMasterEpoch() {
                return (Integer) super.getObject1();
            }

            public Future getFuture() {
                return (Future) super.getObject2();
            }

            public void setFuture(Future future) {
                super.setObject2(future);
            }

            public int hashCode() {
                return Objects.hashCode(super.getObject1());
            }

            public boolean equals(Object obj) {
                if (this == obj) {
                    return true;
                }
                if (obj instanceof NotifyTask) {
                    return ((Integer) super.getObject1()).equals(((NotifyTask) obj).getObject1());
                }
                return false;
            }
        }

        public NotifyService() {
        }

        public void initialize() {
            this.executorService = Executors.newFixedThreadPool(3, new ThreadFactoryImpl("ControllerManager_NotifyService_"));
            this.currentNotifyFutures = new ConcurrentHashMap();
        }

        public void notifyBroker(String str, RoleChangeNotifyEntry roleChangeNotifyEntry) {
            Future future;
            int masterEpoch = roleChangeNotifyEntry.getMasterEpoch();
            NotifyTask notifyTask = this.currentNotifyFutures.get(str);
            if (notifyTask != null && masterEpoch > notifyTask.getMasterEpoch().intValue() && (future = notifyTask.getFuture()) != null && !future.isDone()) {
                future.cancel(true);
            }
            NotifyTask notifyTask2 = new NotifyTask(Integer.valueOf(masterEpoch), null);
            Runnable runnable = () -> {
                ControllerManager.this.doNotifyBrokerRoleChanged(str, roleChangeNotifyEntry);
                this.currentNotifyFutures.remove(str, notifyTask2);
            };
            this.currentNotifyFutures.put(str, notifyTask2);
            notifyTask2.setFuture(this.executorService.submit(runnable));
        }

        public void shutdown() {
            if (this.executorService.isShutdown()) {
                return;
            }
            this.executorService.shutdownNow();
        }
    }

    public ControllerManager(ControllerConfig controllerConfig, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig) {
        this.controllerConfig = controllerConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.nettyClientConfig = nettyClientConfig;
        this.configuration = new Configuration(log, new Object[]{this.controllerConfig, this.nettyServerConfig});
        this.configuration.setStorePathFromConfig(this.controllerConfig, "configStorePath");
        this.remotingClient = new NettyRemotingClient(nettyClientConfig);
        this.heartbeatManager = BrokerHeartbeatManager.newBrokerHeartbeatManager(controllerConfig);
        this.notifyService = new NotifyService();
    }

    public boolean initialize() {
        this.controllerRequestThreadPoolQueue = new LinkedBlockingQueue(this.controllerConfig.getControllerRequestThreadPoolQueueCapacity());
        this.controllerRequestExecutor = ThreadUtils.newThreadPoolExecutor(this.controllerConfig.getControllerThreadPoolNums(), this.controllerConfig.getControllerThreadPoolNums(), 60000L, TimeUnit.MILLISECONDS, this.controllerRequestThreadPoolQueue, new ThreadFactoryImpl("ControllerRequestExecutorThread_"));
        this.notifyService.initialize();
        if (this.controllerConfig.getControllerType().equals("jRaft")) {
            if (StringUtils.isEmpty(this.controllerConfig.getJraftConfig().getjRaftInitConf())) {
                throw new IllegalArgumentException("Attribute value jRaftInitConf of ControllerConfig is null or empty");
            }
            if (StringUtils.isEmpty(this.controllerConfig.getJraftConfig().getjRaftServerId())) {
                throw new IllegalArgumentException("Attribute value jRaftServerId of ControllerConfig is null or empty");
            }
            try {
                this.controller = new JRaftController(this.controllerConfig, this.brokerHousekeepingService);
                ((RaftBrokerHeartBeatManager) this.heartbeatManager).setController((JRaftController) this.controller);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        } else {
            if (StringUtils.isEmpty(this.controllerConfig.getControllerDLegerPeers())) {
                throw new IllegalArgumentException("Attribute value controllerDLegerPeers of ControllerConfig is null or empty");
            }
            if (StringUtils.isEmpty(this.controllerConfig.getControllerDLegerSelfId())) {
                throw new IllegalArgumentException("Attribute value controllerDLegerSelfId of ControllerConfig is null or empty");
            }
            ControllerConfig controllerConfig = this.controllerConfig;
            BrokerHeartbeatManager brokerHeartbeatManager = this.heartbeatManager;
            brokerHeartbeatManager.getClass();
            BrokerValidPredicate brokerValidPredicate = brokerHeartbeatManager::isBrokerActive;
            NettyServerConfig nettyServerConfig = this.nettyServerConfig;
            NettyClientConfig nettyClientConfig = this.nettyClientConfig;
            BrokerHousekeepingService brokerHousekeepingService = this.brokerHousekeepingService;
            BrokerHeartbeatManager brokerHeartbeatManager2 = this.heartbeatManager;
            brokerHeartbeatManager2.getClass();
            BrokerValidPredicate brokerValidPredicate2 = brokerHeartbeatManager2::isBrokerActive;
            BrokerHeartbeatManager brokerHeartbeatManager3 = this.heartbeatManager;
            brokerHeartbeatManager3.getClass();
            this.controller = new DLedgerController(controllerConfig, brokerValidPredicate, nettyServerConfig, nettyClientConfig, brokerHousekeepingService, new DefaultElectPolicy(brokerValidPredicate2, brokerHeartbeatManager3::getBrokerLiveInfo));
        }
        this.heartbeatManager.initialize();
        this.heartbeatManager.registerBrokerLifecycleListener(this::onBrokerInactive);
        this.controller.registerBrokerLifecycleListener(this::onBrokerInactive);
        registerProcessor();
        this.controllerMetricsManager = ControllerMetricsManager.getInstance(this);
        return true;
    }

    private void onBrokerInactive(String str, String str2, Long l) {
        log.info("Controller Manager received broker inactive event, clusterName: {}, brokerName: {}, brokerId: {}", new Object[]{str, str2, l});
        if (!this.controller.isLeaderState()) {
            log.warn("The broker with brokerId: {} in broker-set: {} has been inactive", l, str2);
        } else if (l == null) {
            triggerElectMaster(str2);
        } else {
            this.controller.getReplicaInfo(new GetReplicaInfoRequestHeader(str2)).whenCompleteAsync((remotingCommand, th) -> {
                if (th != null || remotingCommand == null) {
                    log.error("Failed to get replica-info for broker-set: {} when OnBrokerInactive", str2, th);
                } else if (l.equals(remotingCommand.readCustomHeader().getMasterBrokerId())) {
                    triggerElectMaster(str2);
                } else {
                    log.warn("The broker with brokerId: {} in broker-set: {} has been inactive", l, str2);
                }
            });
        }
    }

    private CompletableFuture<Boolean> triggerElectMaster0(String str) {
        return this.controller.electMaster(ElectMasterRequestHeader.ofControllerTrigger(str)).handleAsync((remotingCommand, th) -> {
            if (th != null || remotingCommand == null || remotingCommand.getCode() != 0) {
                log.error("Failed to trigger elect-master in broker-set: {}", str, th);
                return false;
            }
            if (remotingCommand.getCode() != 0) {
                return false;
            }
            log.info("Elect a new master in broker-set: {} done, result: {}", str, remotingCommand);
            if (this.controllerConfig.isNotifyBrokerRoleChanged()) {
                notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(remotingCommand));
            }
            return true;
        });
    }

    private void triggerElectMaster(String str) {
        int electMasterMaxRetryCount = this.controllerConfig.getElectMasterMaxRetryCount();
        for (int i = 0; i < electMasterMaxRetryCount; i++) {
            try {
            } catch (Exception e) {
                log.warn("Failed to trigger elect-master in broker-set: {}, retryCount: {}", new Object[]{str, Integer.valueOf(i), e});
            }
            if (triggerElectMaster0(str).get(3L, TimeUnit.SECONDS).booleanValue()) {
                return;
            }
        }
    }

    public void notifyBrokerRoleChanged(RoleChangeNotifyEntry roleChangeNotifyEntry) {
        BrokerMemberGroup brokerMemberGroup = roleChangeNotifyEntry.getBrokerMemberGroup();
        if (brokerMemberGroup != null) {
            Long masterBrokerId = roleChangeNotifyEntry.getMasterBrokerId();
            String cluster = brokerMemberGroup.getCluster();
            String brokerName = brokerMemberGroup.getBrokerName();
            if (masterBrokerId == null) {
                log.warn("Notify broker role change failed, because member group is not null but the new master brokerId is empty, entry:{}", roleChangeNotifyEntry);
            } else {
                brokerMemberGroup.getBrokerAddrs().entrySet().stream().filter(entry -> {
                    return this.heartbeatManager.isBrokerActive(cluster, brokerName, (Long) entry.getKey());
                }).forEach(entry2 -> {
                    this.notifyService.notifyBroker((String) entry2.getValue(), roleChangeNotifyEntry);
                });
            }
        }
    }

    public void doNotifyBrokerRoleChanged(String str, RoleChangeNotifyEntry roleChangeNotifyEntry) {
        if (StringUtils.isNoneEmpty(new CharSequence[]{str})) {
            log.info("Try notify broker {} that role changed, RoleChangeNotifyEntry:{}", str, roleChangeNotifyEntry);
            RemotingCommand createRequestCommand = RemotingCommand.createRequestCommand(1008, new NotifyBrokerRoleChangedRequestHeader(roleChangeNotifyEntry.getMasterAddress(), roleChangeNotifyEntry.getMasterBrokerId(), Integer.valueOf(roleChangeNotifyEntry.getMasterEpoch()), Integer.valueOf(roleChangeNotifyEntry.getSyncStateSetEpoch())));
            createRequestCommand.setBody(new SyncStateSet(roleChangeNotifyEntry.getSyncStateSet(), roleChangeNotifyEntry.getSyncStateSetEpoch()).encode());
            try {
                this.remotingClient.invokeOneway(str, createRequestCommand, 3000L);
            } catch (Exception e) {
                log.error("Failed to notify broker {} that role changed", str, e);
            }
        }
    }

    public void registerProcessor() {
        ControllerRequestProcessor controllerRequestProcessor = new ControllerRequestProcessor(this);
        RemotingServer remotingServer = this.controller.getRemotingServer();
        if (!$assertionsDisabled && remotingServer == null) {
            throw new AssertionError();
        }
        remotingServer.registerProcessor(1001, controllerRequestProcessor, this.controllerRequestExecutor);
        remotingServer.registerProcessor(1002, controllerRequestProcessor, this.controllerRequestExecutor);
        remotingServer.registerProcessor(1003, controllerRequestProcessor, this.controllerRequestExecutor);
        remotingServer.registerProcessor(1004, controllerRequestProcessor, this.controllerRequestExecutor);
        remotingServer.registerProcessor(1005, controllerRequestProcessor, this.controllerRequestExecutor);
        remotingServer.registerProcessor(1006, controllerRequestProcessor, this.controllerRequestExecutor);
        remotingServer.registerProcessor(904, controllerRequestProcessor, this.controllerRequestExecutor);
        remotingServer.registerProcessor(1009, controllerRequestProcessor, this.controllerRequestExecutor);
        remotingServer.registerProcessor(1010, controllerRequestProcessor, this.controllerRequestExecutor);
        remotingServer.registerProcessor(1011, controllerRequestProcessor, this.controllerRequestExecutor);
        remotingServer.registerProcessor(1012, controllerRequestProcessor, this.controllerRequestExecutor);
        remotingServer.registerProcessor(1013, controllerRequestProcessor, this.controllerRequestExecutor);
    }

    public void start() {
        this.controller.startup();
        this.heartbeatManager.start();
        this.remotingClient.start();
    }

    public void shutdown() {
        this.heartbeatManager.shutdown();
        this.controllerRequestExecutor.shutdown();
        this.notifyService.shutdown();
        this.controller.shutdown();
        this.remotingClient.shutdown();
    }

    public BrokerHeartbeatManager getHeartbeatManager() {
        return this.heartbeatManager;
    }

    public ControllerConfig getControllerConfig() {
        return this.controllerConfig;
    }

    public Controller getController() {
        return this.controller;
    }

    public NettyServerConfig getNettyServerConfig() {
        return this.nettyServerConfig;
    }

    public NettyClientConfig getNettyClientConfig() {
        return this.nettyClientConfig;
    }

    public BrokerHousekeepingService getBrokerHousekeepingService() {
        return this.brokerHousekeepingService;
    }

    public Configuration getConfiguration() {
        return this.configuration;
    }

    static {
        $assertionsDisabled = !ControllerManager.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger("RocketmqController");
    }
}
