package org.apache.rocketmq.controller.impl;

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Iterator;
import com.alipay.sofa.jraft.StateMachine;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.LeaderChangeContext;
import com.alipay.sofa.jraft.entity.NodeId;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.alipay.sofa.jraft.util.Utils;
import io.opentelemetry.api.common.AttributesBuilder;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import org.apache.commons.io.FileUtils;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
import org.apache.rocketmq.controller.helper.BrokerValidPredicate;
import org.apache.rocketmq.controller.impl.closure.ControllerClosure;
import org.apache.rocketmq.controller.impl.event.ControllerResult;
import org.apache.rocketmq.controller.impl.event.EventMessage;
import org.apache.rocketmq.controller.impl.manager.RaftReplicasInfoManager;
import org.apache.rocketmq.controller.impl.task.BrokerCloseChannelRequest;
import org.apache.rocketmq.controller.impl.task.CheckNotActiveBrokerRequest;
import org.apache.rocketmq.controller.impl.task.GetBrokerLiveInfoRequest;
import org.apache.rocketmq.controller.impl.task.GetSyncStateDataRequest;
import org.apache.rocketmq.controller.impl.task.RaftBrokerHeartBeatEventRequest;
import org.apache.rocketmq.controller.metrics.ControllerMetricsConstant;
import org.apache.rocketmq.controller.metrics.ControllerMetricsManager;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;

/* loaded from: input_file:org/apache/rocketmq/controller/impl/JRaftControllerStateMachine.class */
/**
 * JRaft {@link StateMachine} used by the RocketMQ controller.
 *
 * <p>Every applied log entry is turned into a {@link RemotingCommand}, dispatched by its request
 * code to the {@link RaftReplicasInfoManager}, and the events carried by the resulting
 * {@link ControllerResult} are applied to that same manager. When the entry has a
 * {@link ControllerClosure} attached, the closure is completed with the outcome.
 *
 * <p>The class also saves/loads the manager's serialized state as a snapshot file and fans out
 * leader start/stop notifications to registered callbacks.
 */
public class JRaftControllerStateMachine implements StateMachine {
    private static final Logger log = LoggerFactory.getLogger("RocketmqController");

    /** Name of the single file, inside the snapshot directory, that holds the serialized state. */
    private static final String SNAPSHOT_FILE_NAME = "data";

    private final List<Consumer<Long>> onLeaderStartCallbacks = new ArrayList<>();
    private final List<Consumer<Status>> onLeaderStopCallbacks = new ArrayList<>();
    private final RaftReplicasInfoManager replicasInfoManager;
    private final NodeId nodeId;

    /**
     * Creates a state machine backed by a fresh {@link RaftReplicasInfoManager}.
     *
     * @param controllerConfig configuration handed to the replicas info manager
     * @param nodeId           identity of the raft node, used in log messages only
     */
    public JRaftControllerStateMachine(ControllerConfig controllerConfig, NodeId nodeId) {
        this.replicasInfoManager = new RaftReplicasInfoManager(controllerConfig);
        this.nodeId = nodeId;
    }

    /**
     * Applies every entry exposed by the iterator, in order.
     *
     * @param iterator iterator over the log entries to apply
     */
    public void onApply(Iterator iterator) {
        while (iterator.hasNext()) {
            // done() is null when no closure is attached to the current entry.
            final ControllerClosure closure = (ControllerClosure) iterator.done();
            final byte[] data = iterator.getData().array();
            processEvent(closure, data, iterator.getTerm(), iterator.getIndex());
            iterator.next();
        }
    }

    /**
     * Handles one log entry: obtains the request, dispatches it, applies the resulting events and
     * completes the closure (if any).
     *
     * <p>NOTE(review): only {@link RemotingCommandException} is handled here. Any unchecked
     * exception raised while decoding or handling the request propagates out of
     * {@link #onApply(Iterator)} without the closure being run — confirm this is intended.
     *
     * @param closure closure attached to the entry, or {@code null}
     * @param data    raw payload of the log entry
     * @param term    raft term of the entry (logging only)
     * @param index   raft index of the entry (logging only)
     */
    private void processEvent(ControllerClosure closure, byte[] data, long term, long index) {
        try {
            // Prefer the request already held by the closure; otherwise decode it from the payload.
            // The first 4 bytes of the payload are skipped — presumably a length prefix written by
            // the code that submits the task; verify against the serializing side.
            final RemotingCommand request = closure != null
                ? closure.getRequestEvent()
                : RemotingCommand.decode(Arrays.copyOfRange(data, 4, data.length));
            log.info("process event: term {}, index {}, request code {}", term, index, request.getCode());

            // Handlers return ControllerResult with different header types, hence the wildcard.
            final ControllerResult<?> result = dispatch(request);

            final List<EventMessage> events = result.getEvents();
            events.forEach(this.replicasInfoManager::applyEvent);

            log.info("process event: term {}, index {}, request code {} success with result {}",
                term, index, request.getCode(), result.toString());
            if (closure != null) {
                closure.setControllerResult(result);
                closure.run(Status.OK());
            }
        } catch (RemotingCommandException e) {
            log.error("Fail to process event", e);
            if (closure != null) {
                closure.run(new Status(RaftError.EINTERNAL, e.getMessage(), new Object[0]));
            }
        }
    }

    /**
     * Routes a request to its handler according to the request code.
     *
     * @param request decoded request
     * @return the handler's result
     * @throws RemotingCommandException if a header cannot be decoded or the request code has no
     *                                  handler in this state machine
     */
    private ControllerResult<?> dispatch(RemotingCommand request) throws RemotingCommandException {
        switch (request.getCode()) {
            case 1001:
                return alterSyncStateSet(
                    (AlterSyncStateSetRequestHeader) request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class),
                    (SyncStateSet) RemotingSerializable.decode(request.getBody(), SyncStateSet.class));
            case 1002:
                return electMaster(
                    (ElectMasterRequestHeader) request.decodeCommandCustomHeader(ElectMasterRequestHeader.class));
            case 1003:
                return registerBroker(
                    (RegisterBrokerToControllerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class));
            case 1004:
                return getReplicaInfo(
                    (GetReplicaInfoRequestHeader) request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class));
            case 1006: {
                // The body is decoded as an untyped List; the handler treats it as a list of strings.
                @SuppressWarnings("unchecked")
                final List<String> brokerNames = (List<String>) RemotingSerializable.decode(request.getBody(), List.class);
                final GetSyncStateDataRequest header =
                    (GetSyncStateDataRequest) request.decodeCommandCustomHeader(GetSyncStateDataRequest.class);
                return getSyncStateData(brokerNames, header.getInvokeTime().longValue());
            }
            case 1011:
                return cleanBrokerData(
                    (CleanControllerBrokerDataRequestHeader) request.decodeCommandCustomHeader(CleanControllerBrokerDataRequestHeader.class));
            case 1012:
                return getNextBrokerId(
                    (GetNextBrokerIdRequestHeader) request.decodeCommandCustomHeader(GetNextBrokerIdRequestHeader.class));
            case 1013:
                return applyBrokerId(
                    (ApplyBrokerIdRequestHeader) request.decodeCommandCustomHeader(ApplyBrokerIdRequestHeader.class));
            case 1014:
                return this.replicasInfoManager.onBrokerCloseChannel(
                    (BrokerCloseChannelRequest) request.decodeCommandCustomHeader(BrokerCloseChannelRequest.class));
            case 1015:
                return this.replicasInfoManager.checkNotActiveBroker(
                    (CheckNotActiveBrokerRequest) request.decodeCommandCustomHeader(CheckNotActiveBrokerRequest.class));
            case 1016:
                return this.replicasInfoManager.getBrokerLiveInfo(
                    (GetBrokerLiveInfoRequest) request.decodeCommandCustomHeader(GetBrokerLiveInfoRequest.class));
            case 1018:
                return this.replicasInfoManager.onBrokerHeartBeat(
                    (RaftBrokerHeartBeatEventRequest) request.decodeCommandCustomHeader(RaftBrokerHeartBeatEventRequest.class));
            default:
                // Covers every other code, including 1005, 1007-1010 and 1017.
                throw new RemotingCommandException("Unknown request code: " + request.getCode());
        }
    }

    /** Delegates to the manager, validating brokers against the invoke time carried by the header. */
    private ControllerResult<AlterSyncStateSetResponseHeader> alterSyncStateSet(AlterSyncStateSetRequestHeader alterSyncStateSetRequestHeader, SyncStateSet syncStateSet) {
        return this.replicasInfoManager.alterSyncStateSet(
            alterSyncStateSetRequestHeader,
            syncStateSet,
            new RaftReplicasInfoManager.BrokerValidPredicateWithInvokeTime(alterSyncStateSetRequestHeader.getInvokeTime(), this.replicasInfoManager));
    }

    /**
     * Runs a master election through the manager, logs the outcome and records it in the
     * {@code electionTotal} metric for the response codes that map to an election result.
     */
    private ControllerResult<ElectMasterResponseHeader> electMaster(ElectMasterRequestHeader electMasterRequestHeader) {
        // Broker liveness is judged relative to the invoke time carried by the request.
        final BrokerValidPredicate brokerValidPredicate = (clusterName, brokerName, brokerId) ->
            this.replicasInfoManager.isBrokerActive(clusterName, brokerName, brokerId, electMasterRequestHeader.getInvokeTime().longValue());
        final ControllerResult<ElectMasterResponseHeader> result = this.replicasInfoManager.electMaster(
            electMasterRequestHeader,
            new DefaultElectPolicy(brokerValidPredicate, this.replicasInfoManager::getBrokerLiveInfo));
        log.info("elect master, request :{}, result: {}", electMasterRequestHeader.toString(), result.toString());

        final AttributesBuilder attributes = ControllerMetricsManager.newAttributesBuilder()
            .put(ControllerMetricsConstant.LABEL_CLUSTER_NAME, electMasterRequestHeader.getClusterName())
            .put(ControllerMetricsConstant.LABEL_BROKER_SET, electMasterRequestHeader.getBrokerName());

        // Map the response code to an election outcome; other codes are not counted.
        final ControllerMetricsConstant.ElectionResult outcome;
        switch (result.getResponseCode()) {
            case 0:
                outcome = ControllerMetricsConstant.ElectionResult.NEW_MASTER_ELECTED;
                break;
            case 2004:
            case 2012:
                outcome = ControllerMetricsConstant.ElectionResult.NO_MASTER_ELECTED;
                break;
            case 2011:
                outcome = ControllerMetricsConstant.ElectionResult.KEEP_CURRENT_MASTER;
                break;
            default:
                outcome = null;
                break;
        }
        if (outcome != null) {
            ControllerMetricsManager.electionTotal.add(1L,
                attributes.put(ControllerMetricsConstant.LABEL_ELECTION_RESULT, outcome.getLowerCaseName()).build());
        }
        return result;
    }

    /** Delegates to {@link RaftReplicasInfoManager#getNextBrokerId}. */
    private ControllerResult<GetNextBrokerIdResponseHeader> getNextBrokerId(GetNextBrokerIdRequestHeader getNextBrokerIdRequestHeader) {
        return this.replicasInfoManager.getNextBrokerId(getNextBrokerIdRequestHeader);
    }

    /** Delegates to {@link RaftReplicasInfoManager#applyBrokerId}. */
    private ControllerResult<ApplyBrokerIdResponseHeader> applyBrokerId(ApplyBrokerIdRequestHeader applyBrokerIdRequestHeader) {
        return this.replicasInfoManager.applyBrokerId(applyBrokerIdRequestHeader);
    }

    /** Delegates to the manager, validating brokers against the invoke time carried by the header. */
    private ControllerResult<?> registerBroker(RegisterBrokerToControllerRequestHeader registerBrokerToControllerRequestHeader) {
        return this.replicasInfoManager.registerBroker(
            registerBrokerToControllerRequestHeader,
            new RaftReplicasInfoManager.BrokerValidPredicateWithInvokeTime(registerBrokerToControllerRequestHeader.getInvokeTime(), this.replicasInfoManager));
    }

    /** Delegates to {@link RaftReplicasInfoManager#getReplicaInfo}. */
    private ControllerResult<GetReplicaInfoResponseHeader> getReplicaInfo(GetReplicaInfoRequestHeader getReplicaInfoRequestHeader) {
        return this.replicasInfoManager.getReplicaInfo(getReplicaInfoRequestHeader);
    }

    /**
     * Delegates to the manager, validating brokers against the supplied invoke time.
     *
     * @param list decoded request body
     * @param j    invoke time taken from the request header
     */
    private ControllerResult<Void> getSyncStateData(List<String> list, long j) {
        return this.replicasInfoManager.getSyncStateData(
            list,
            new RaftReplicasInfoManager.BrokerValidPredicateWithInvokeTime(j, this.replicasInfoManager));
    }

    /** Delegates to the manager, validating brokers against the invoke time carried by the header. */
    private ControllerResult<Void> cleanBrokerData(CleanControllerBrokerDataRequestHeader cleanControllerBrokerDataRequestHeader) {
        return this.replicasInfoManager.cleanBrokerData(
            cleanControllerBrokerDataRequestHeader,
            new RaftReplicasInfoManager.BrokerValidPredicateWithInvokeTime(cleanControllerBrokerDataRequestHeader.getInvokeTime(), this.replicasInfoManager));
    }

    /** Logs the shutdown of this state machine. */
    public void onShutdown() {
        log.info("StateMachine {} node {} onShutdown", getClass().getName(), this.nodeId.toString());
    }

    /**
     * Serializes the manager state on the calling thread and hands the file write to
     * {@link Utils#runInThread}. The closure is run with {@code OK} once the file has been written
     * and registered with the writer, or with {@code EIO} on failure.
     *
     * @param snapshotWriter writer describing the snapshot directory
     * @param closure        completion callback
     */
    public void onSnapshotSave(SnapshotWriter snapshotWriter, Closure closure) {
        try {
            final byte[] snapshotBytes = this.replicasInfoManager.serialize();
            Utils.runInThread(() -> {
                try {
                    final File target = new File(snapshotWriter.getPath() + File.separator + SNAPSHOT_FILE_NAME);
                    FileUtils.writeByteArrayToFile(target, snapshotBytes);
                    if (!snapshotWriter.addFile(SNAPSHOT_FILE_NAME)) {
                        throw new IOException("Fail to add file to writer");
                    }
                    log.info("Save snapshot, path={}", snapshotWriter.getPath());
                    closure.run(Status.OK());
                } catch (IOException e) {
                    log.error("Fail to save snapshot", e);
                    closure.run(new Status(RaftError.EIO, "Fail to save snapshot", new Object[0]));
                }
            });
        } catch (Throwable th) {
            // Keep the cause in the log; the Status handed to the closure only carries a message.
            log.error("Fail to serialize replicasInfoManager state machine data", th);
            closure.run(new Status(RaftError.EIO, "Fail to serialize replicasInfoManager state machine data", new Object[0]));
        }
    }

    /**
     * Restores the manager state from the snapshot's data file.
     *
     * @param snapshotReader reader describing the snapshot directory
     * @return {@code true} when the state was loaded; {@code false} when the data file is not
     *         listed in the snapshot or reading/deserializing it failed
     */
    public boolean onSnapshotLoad(SnapshotReader snapshotReader) {
        if (snapshotReader.getFileMeta(SNAPSHOT_FILE_NAME) == null) {
            log.error("Fail to find data file in {}", snapshotReader.getPath());
            return false;
        }
        try {
            final File source = new File(snapshotReader.getPath() + File.separator + SNAPSHOT_FILE_NAME);
            this.replicasInfoManager.deserializeFrom(FileUtils.readFileToByteArray(source));
            log.info("Load snapshot from {}", snapshotReader.getPath());
            return true;
        } catch (Throwable th) {
            log.error("Fail to load snapshot from {}", snapshotReader.getPath(), th);
            return false;
        }
    }

    /**
     * Invokes every registered leader-start callback with the new term, then logs the transition.
     *
     * @param j term in which this node became leader
     */
    public void onLeaderStart(long j) {
        for (Consumer<Long> callback : this.onLeaderStartCallbacks) {
            callback.accept(j);
        }
        log.info("node {} Start Leader, term={}", this.nodeId.toString(), j);
    }

    /**
     * Invokes every registered leader-stop callback with the given status, then logs the transition.
     *
     * @param status status passed through to the callbacks
     */
    public void onLeaderStop(Status status) {
        for (Consumer<Status> callback : this.onLeaderStopCallbacks) {
            callback.accept(status);
        }
        log.info("node {} Stop Leader, status={}", this.nodeId.toString(), status);
    }

    /**
     * Registers a callback to be invoked from {@link #onLeaderStart(long)}.
     *
     * @param consumer callback receiving the leader term
     */
    public void registerOnLeaderStart(Consumer<Long> consumer) {
        this.onLeaderStartCallbacks.add(consumer);
    }

    /**
     * Registers a callback to be invoked from {@link #onLeaderStop(Status)}.
     *
     * @param consumer callback receiving the stop status
     */
    public void registerOnLeaderStop(Consumer<Status> consumer) {
        this.onLeaderStopCallbacks.add(consumer);
    }

    /** Logs the raft error together with its stack trace; takes no other action. */
    public void onError(RaftException raftException) {
        log.error("Encountered an error={} on StateMachine {}, node {}, raft may stop working since some error occurs, you should figure out the cause and repair or remove this node.",
            raftException.getStatus(), getClass().getName(), this.nodeId.toString(), raftException);
    }

    /** Logs the committed configuration. */
    public void onConfigurationCommitted(Configuration configuration) {
        log.info("Configuration committed, conf={}", configuration);
    }

    /** Logs that this node stopped following a leader. */
    public void onStopFollowing(LeaderChangeContext leaderChangeContext) {
        log.info("Stop following, ctx={}", leaderChangeContext);
    }

    /** Logs that this node started following a leader. */
    public void onStartFollowing(LeaderChangeContext leaderChangeContext) {
        log.info("Start following, ctx={}", leaderChangeContext);
    }
}
