package org.apache.hugegraph.backend.store.raft;

import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.RaftGroupService;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.core.Replicator;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.util.BytesUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.util.LZ4Util;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hugegraph/backend/store/raft/RaftNode.class */
public final class RaftNode {
    private static final Logger LOG;
    private final RaftContext context;
    private RaftGroupService raftGroupService;
    private final Node node;
    private final StoreStateMachine stateMachine;
    private final AtomicReference<LeaderInfo> leaderInfo;
    private final AtomicBoolean started;
    private final AtomicInteger busyCounter;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hugegraph/backend/store/raft/RaftNode$LeaderInfo.class */
    public static class LeaderInfo {
        private static final LeaderInfo NO_LEADER = new LeaderInfo(null, false);
        private final PeerId leaderId;
        private final boolean selfIsLeader;

        public LeaderInfo(PeerId peerId, boolean z) {
            this.leaderId = peerId;
            this.selfIsLeader = z;
        }
    }

    /* loaded from: input_file:org/apache/hugegraph/backend/store/raft/RaftNode$RaftStateListener.class */
    protected final class RaftStateListener implements Replicator.ReplicatorStateListener {
        private volatile long lastPrintTime = 0;

        public RaftStateListener() {
        }

        public void onCreated(PeerId peerId) {
            RaftNode.LOG.info("The node {} replicator has created", peerId);
        }

        public void onDestroyed(PeerId peerId) {
            RaftNode.LOG.warn("Replicator '{}' is ready to go offline", peerId);
        }

        public void onError(PeerId peerId, Status status) {
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis - this.lastPrintTime >= 60000) {
                RaftNode.LOG.warn("Replicator meet error: {}", status);
                this.lastPrintTime = currentTimeMillis;
            }
            if (isWriteBufferOverflow(status)) {
                RaftNode.LOG.info("Increase busy counter due to overflow: [{}]", Integer.valueOf(RaftNode.this.busyCounter.incrementAndGet()));
            }
        }

        public void onBusy(PeerId peerId, Status status) {
            RaftNode.LOG.info("Increase busy counter: [{}]", Integer.valueOf(RaftNode.this.busyCounter.incrementAndGet()));
        }

        private boolean isWriteBufferOverflow(Status status) {
            return RaftError.EINTERNAL == status.getRaftError() && status.getErrorMsg() != null && status.getErrorMsg().contains("maybe write overflow");
        }

        private boolean isRpcTimeout(Status status) {
            return RaftError.EINTERNAL == status.getRaftError() && status.getErrorMsg() != null && status.getErrorMsg().contains("Invoke timeout");
        }
    }

    public RaftNode(RaftContext raftContext) {
        this.context = raftContext;
        this.stateMachine = new StoreStateMachine(raftContext);
        try {
            this.node = initRaftNode();
            LOG.info("Start raft node: {}", this);
            this.node.addReplicatorStateListener(new RaftStateListener());
            this.leaderInfo = new AtomicReference<>(LeaderInfo.NO_LEADER);
            this.started = new AtomicBoolean(false);
            this.busyCounter = new AtomicInteger();
        } catch (IOException e) {
            throw new BackendException("Failed to init raft node", e);
        }
    }

    protected RaftContext context() {
        return this.context;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Node node() {
        if ($assertionsDisabled || this.node != null) {
            return this.node;
        }
        throw new AssertionError();
    }

    public PeerId nodeId() {
        return this.node.getNodeId().getPeerId();
    }

    public PeerId leaderId() {
        return this.leaderInfo.get().leaderId;
    }

    public boolean selfIsLeader() {
        return this.leaderInfo.get().selfIsLeader;
    }

    public void onLeaderInfoChange(PeerId peerId, boolean z) {
        this.leaderInfo.set(new LeaderInfo(peerId != null ? peerId.copy() : null, z));
    }

    public void shutdown() {
        LOG.info("Shutdown raft node: {}", this);
        this.node.shutdown();
        if (this.raftGroupService != null) {
            this.raftGroupService.shutdown();
            try {
                this.raftGroupService.join();
            } catch (InterruptedException e) {
                throw new RaftException("Interrupted while shutdown raftGroupService");
            }
        }
    }

    public RaftClosure<?> snapshot() {
        RaftClosure<?> raftClosure = new RaftClosure<>();
        try {
            node().snapshot(raftClosure);
            return raftClosure;
        } catch (Throwable th) {
            throw new BackendException("Failed to create snapshot", th);
        }
    }

    public void readIndex(byte[] bArr, ReadIndexClosure readIndexClosure) {
        this.node.readIndex(bArr, readIndexClosure);
    }

    public <T> T submitAndWait(StoreCommand storeCommand, RaftStoreClosure raftStoreClosure) {
        submitCommand(storeCommand, raftStoreClosure);
        try {
            return (T) raftStoreClosure.waitFinished();
        } catch (Throwable th) {
            throw new BackendException("Failed to wait store command %s", th, storeCommand);
        }
    }

    private void submitCommand(StoreCommand storeCommand, RaftStoreClosure raftStoreClosure) {
        LeaderInfo waitLeaderElected = waitLeaderElected(RaftContext.WAIT_LEADER_TIMEOUT);
        if (!waitLeaderElected.selfIsLeader) {
            this.context.rpcForwarder().forwardToLeader(waitLeaderElected.leaderId, storeCommand, raftStoreClosure);
            return;
        }
        waitIfBusy();
        Task task = new Task();
        ByteBuffer asByteBuffer = LZ4Util.compress(storeCommand.data(), RaftContext.BLOCK_SIZE).forReadWritten().asByteBuffer();
        LOG.debug("Submit to raft node '{}', the compressed bytes of command {} is {}", new Object[]{this.node, storeCommand.action(), Integer.valueOf(asByteBuffer.limit())});
        task.setData(asByteBuffer);
        task.setDone(raftStoreClosure);
        this.node.apply(task);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public LeaderInfo waitLeaderElected(int i) {
        String group = this.context.group();
        LeaderInfo leaderInfo = this.leaderInfo.get();
        if (leaderInfo.leaderId != null) {
            return leaderInfo;
        }
        LOG.info("Waiting for raft group '{}' leader elected", group);
        long currentTimeMillis = System.currentTimeMillis();
        while (leaderInfo.leaderId == null) {
            try {
                Thread.sleep(5000L);
                long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                if (i > 0 && currentTimeMillis2 >= i) {
                    throw new BackendException("Waiting for raft group '%s' election timeout(%sms)", group, Long.valueOf(currentTimeMillis2));
                }
                leaderInfo = this.leaderInfo.get();
                if (!$assertionsDisabled && leaderInfo == null) {
                    throw new AssertionError();
                }
            } catch (InterruptedException e) {
                throw new BackendException("Interrupted while waiting for raft group '%s' election", group, e);
            }
        }
        LOG.info("Waited for raft group '{}' leader elected successfully", group);
        return leaderInfo;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitRaftLogSynced(int i) {
        String group = this.context.group();
        LOG.info("Waiting for raft group '{}' log synced", group);
        long currentTimeMillis = System.currentTimeMillis();
        while (!this.started.get()) {
            this.node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { // from class: org.apache.hugegraph.backend.store.raft.RaftNode.1
                public void run(Status status, long j, byte[] bArr) {
                    RaftNode.this.started.set(status.isOk());
                }
            });
            try {
                Thread.sleep(5000L);
                long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                if (i > 0 && currentTimeMillis2 >= i) {
                    throw new BackendException("Waiting for raft group '%s' log synced timeout(%sms)", group, Long.valueOf(currentTimeMillis2));
                }
            } catch (InterruptedException e) {
                throw new BackendException("Interrupted while waiting for raft group '%s' log synced", group, e);
            }
        }
        LOG.info("Waited for raft group '{}' log synced successfully", group);
    }

    private void waitIfBusy() {
        if (this.busyCounter.get() <= 0) {
            return;
        }
        try {
            try {
                Thread.sleep(r0 * (new Random().nextInt(2000) + RaftContext.BUSY_MIN_SLEEP_FACTOR));
                if (this.busyCounter.get() > 0) {
                    synchronized (this) {
                        if (this.busyCounter.get() > 0) {
                            LOG.info("Decrease busy counter: [{}]", Integer.valueOf(this.busyCounter.decrementAndGet()));
                        }
                    }
                }
            } catch (InterruptedException e) {
                throw new BackendException("The raft backend store is busy", e);
            }
        } catch (Throwable th) {
            if (this.busyCounter.get() > 0) {
                synchronized (this) {
                    if (this.busyCounter.get() > 0) {
                        LOG.info("Decrease busy counter: [{}]", Integer.valueOf(this.busyCounter.decrementAndGet()));
                    }
                }
            }
            throw th;
        }
    }

    private Node initRaftNode() throws IOException {
        NodeOptions nodeOptions = this.context.nodeOptions();
        nodeOptions.setFsm(this.stateMachine);
        String group = this.context.group();
        PeerId endpoint = this.context.endpoint();
        RpcServer rpcServer = this.context.rpcServer();
        LOG.debug("Start raft node with endpoint '{}', initial conf [{}]", endpoint, nodeOptions.getInitialConf());
        this.raftGroupService = new RaftGroupService(group, endpoint, nodeOptions, rpcServer, true);
        return this.raftGroupService.start(false);
    }

    public String toString() {
        return String.format("[%s-%s]", this.context.group(), nodeId());
    }

    static {
        $assertionsDisabled = !RaftNode.class.desiredAssertionStatus();
        LOG = Log.logger(RaftNode.class);
    }
}
