package org.apache.hugegraph.backend.store.raft;

import com.alipay.sofa.jraft.NodeManager;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.option.ReadOnlyOption;
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.io.FileUtils;
import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.backend.cache.Cache;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.store.BackendAction;
import org.apache.hugegraph.backend.store.BackendMutation;
import org.apache.hugegraph.backend.store.BackendStore;
import org.apache.hugegraph.backend.store.BackendStoreProvider;
import org.apache.hugegraph.backend.store.raft.rpc.ListPeersProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests;
import org.apache.hugegraph.backend.store.raft.rpc.RpcForwarder;
import org.apache.hugegraph.backend.store.raft.rpc.SetLeaderProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.StoreCommandProcessor;
import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.event.EventHub;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.GraphMode;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Events;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hugegraph/backend/store/raft/RaftContext.class */
/**
 * Per-graph holder of the raft related state of a backend store.
 *
 * <p>An instance keeps the parsed peer configuration of the raft group, the
 * registered {@link RaftBackendStore}s (indexed by store type number), the
 * thread pools used for read-index callbacks, snapshots and backend work, and
 * - once {@link #initRaftNode(com.alipay.remoting.rpc.RpcServer)} has been
 * called - the raft RPC server wrapper, the local {@link RaftNode}, the
 * {@link RaftGroupManager} and the {@link RpcForwarder}.
 */
public final class RaftContext {

    private static final Logger LOG = Log.logger(RaftContext.class);

    // NOTE(review): the timing constants below look like milliseconds - confirm with callers
    public static final int NO_TIMEOUT = -1;
    public static final int POLL_INTERVAL = 5000;
    public static final int WAIT_RAFTLOG_TIMEOUT = 1800000;
    public static final int WAIT_LEADER_TIMEOUT = 600000;
    public static final int BUSY_MIN_SLEEP_FACTOR = 3000;
    public static final int BUSY_MAX_SLEEP_FACTOR = 5000;
    public static final int WAIT_RPC_TIMEOUT = 1800000;
    public static final int LOG_WARN_INTERVAL = 60000;
    public static final int BLOCK_SIZE = 8192;
    /** Capacity of the bounded work queue of every pool built by this class. */
    public static final int QUEUE_SIZE = CoreOptions.CPUS;
    /** Keep-alive, in seconds, of the threads of every pool built by this class. */
    public static final long KEEP_ALIVE_SECOND = 300;

    /** Initial capacity of the id list collected per type when invalidating caches. */
    private static final int CACHE_IDS_INITIAL_CAPACITY = 500;

    private final HugeGraphParams params;
    private final Configuration groupPeers;
    private final RaftBackendStore[] stores;
    private final ExecutorService readIndexExecutor;
    private final ExecutorService snapshotExecutor;
    private final ExecutorService backendExecutor;

    // The fields below stay null until initRaftNode() has been called
    private RpcServer raftRpcServer;
    private PeerId endpoint;
    private RaftNode raftNode;
    private RaftGroupManager raftGroupManager;
    private RpcForwarder rpcForwarder;

    /**
     * Creates the context: parses the {@code raft.group_peers} option and
     * builds the executors. The raft node itself is not created here, see
     * {@link #initRaftNode(com.alipay.remoting.rpc.RpcServer)}.
     *
     * @param params the graph params providing the configuration, name, mode
     *               and event hubs
     * @throws HugeException if {@code raft.group_peers} can't be parsed
     */
    public RaftContext(HugeGraphParams params) {
        this.params = params;
        HugeConfig config = params.configuration();

        String peers = config.getString("raft.group_peers");
        E.checkArgument(peers != null,
                        "Please ensure config `raft.group_peers` in raft mode");
        this.groupPeers = new Configuration();
        if (!this.groupPeers.parse(peers)) {
            throw new HugeException("Failed to parse raft.group_peers: '%s'",
                                    peers);
        }

        this.stores = new RaftBackendStore[RaftRequests.StoreType.ALL.getNumber()];

        // The read-index pool is only created when safe read is enabled
        if (config.get(CoreOptions.RAFT_SAFE_READ)) {
            int readIndexThreads = config.get(CoreOptions.RAFT_READ_INDEX_THREADS);
            this.readIndexExecutor = this.createReadIndexExecutor(readIndexThreads);
        } else {
            this.readIndexExecutor = null;
        }

        int snapshotThreads = config.get(CoreOptions.RAFT_SNAPSHOT_THREADS);
        this.snapshotExecutor = this.createSnapshotExecutor(snapshotThreads);

        int backendThreads = config.get(CoreOptions.RAFT_BACKEND_THREADS);
        this.backendExecutor = this.createBackendExecutor(backendThreads);

        this.raftRpcServer = null;
        this.endpoint = null;
        this.raftNode = null;
        this.raftGroupManager = null;
        this.rpcForwarder = null;
    }

    /**
     * Wraps the given Bolt RPC server for raft usage, registers the store
     * request processors on it and creates the raft node, the RPC forwarder
     * and the group manager.
     *
     * @param rpcServer the Bolt RPC server whose ip/port become the endpoint
     *                  of this peer
     */
    public void initRaftNode(com.alipay.remoting.rpc.RpcServer rpcServer) {
        this.raftRpcServer = this.wrapRpcServer(rpcServer);
        this.endpoint = new PeerId(rpcServer.ip(), rpcServer.port());
        this.registerRpcRequestProcessors();
        LOG.info("Start raft server successfully: {}", this.endpoint());

        this.raftNode = new RaftNode(this);
        this.rpcForwarder = new RpcForwarder(this.raftNode.node());
        this.raftGroupManager = new RaftGroupManagerImpl(this);
    }

    /**
     * Waits for a leader to be elected (bounded by
     * {@link #WAIT_LEADER_TIMEOUT}) and then for the raft log to be synced
     * (called with {@link #NO_TIMEOUT}).
     */
    public void waitRaftNodeStarted() {
        RaftNode node = this.node();
        node.waitLeaderElected(WAIT_LEADER_TIMEOUT);
        node.waitRaftLogSynced(NO_TIMEOUT);
    }

    /**
     * Shuts down the raft node (if any) and the raft RPC server (if any).
     * Safe to call even if {@link #initRaftNode} was never called.
     */
    public void close() {
        LOG.info("Stop raft server: {}", this.endpoint());

        RaftNode node = this.node();
        if (node != null) {
            node.shutdown();
        }
        this.shutdownRpcServer();
    }

    /**
     * @return the local raft node, or null before {@link #initRaftNode}
     */
    public RaftNode node() {
        return this.raftNode;
    }

    /**
     * @return the raft RPC server wrapper, or null before {@link #initRaftNode}
     */
    public RpcServer rpcServer() {
        return this.raftRpcServer;
    }

    /**
     * @return the RPC forwarder, or null before {@link #initRaftNode}
     */
    public RpcForwarder rpcForwarder() {
        return this.rpcForwarder;
    }

    /**
     * @return the raft group manager, or null before {@link #initRaftNode}
     */
    public RaftGroupManager raftNodeManager() {
        return this.raftGroupManager;
    }

    /**
     * @return the raft group name, which is the name reported by the graph params
     */
    public String group() {
        return this.params.name();
    }

    /**
     * Registers a raft store under the slot of its store type.
     *
     * @param type  the store type, its number is used as array index
     * @param store the raft store to register
     */
    public void addStore(RaftRequests.StoreType type, RaftBackendStore store) {
        this.stores[type.getNumber()] = store;
    }

    /**
     * Maps a store name to its raft store type.
     *
     * @param store the store name
     * @return SCHEMA or GRAPH for the matching provider store names, SYSTEM
     *         otherwise (asserting that the name is the system store name)
     */
    public RaftRequests.StoreType storeType(String store) {
        if (BackendStoreProvider.SCHEMA_STORE.equals(store)) {
            return RaftRequests.StoreType.SCHEMA;
        }
        if (BackendStoreProvider.GRAPH_STORE.equals(store)) {
            return RaftRequests.StoreType.GRAPH;
        }
        assert BackendStoreProvider.SYSTEM_STORE.equals(store);
        return RaftRequests.StoreType.SYSTEM;
    }

    /**
     * @return the internal store array itself (not a copy); slots of types
     *         that were never registered are null
     */
    public RaftBackendStore[] stores() {
        return this.stores;
    }

    /**
     * Returns the origin store wrapped by the raft store of the given type.
     *
     * @param type the store type
     * @return the origin backend store
     * @throws IllegalStateException presumably raised by {@code E.checkState}
     *         when no store of that type was registered - confirm in E
     */
    public BackendStore originStore(RaftRequests.StoreType type) {
        RaftBackendStore raftStore = this.stores[type.getNumber()];
        E.checkState(raftStore != null,
                     "The raft store of type %s shouldn't be null", type);
        return raftStore.originStore();
    }

    /**
     * Builds the jraft node options from the graph configuration and makes
     * sure the log, meta and snapshot directories exist under the raft path.
     *
     * @return the populated node options
     * @throws IOException if one of the directories can't be created
     * @throws ArithmeticException if a timeout configured in seconds doesn't
     *         fit into an int once converted to milliseconds
     */
    public NodeOptions nodeOptions() throws IOException {
        HugeConfig config = this.config();

        NodeOptions nodeOptions = new NodeOptions();
        nodeOptions.setEnableMetrics(false);
        nodeOptions.setRpcProcessorThreadPoolSize(
                    config.get(CoreOptions.RAFT_RPC_THREADS));
        nodeOptions.setRpcConnectTimeoutMs(
                    config.get(CoreOptions.RAFT_RPC_CONNECT_TIMEOUT));
        nodeOptions.setRpcDefaultTimeout(
                    secondsToMillis(config.get(CoreOptions.RAFT_RPC_TIMEOUT)));
        nodeOptions.setRpcInstallSnapshotTimeout(
                    secondsToMillis(config.get(CoreOptions.RAFT_INSTALL_SNAPSHOT_TIMEOUT)));
        nodeOptions.setElectionTimeoutMs(
                    config.get(CoreOptions.RAFT_ELECTION_TIMEOUT));
        nodeOptions.setDisableCli(false);
        nodeOptions.setSnapshotIntervalSecs(
                    config.get(CoreOptions.RAFT_SNAPSHOT_INTERVAL));
        nodeOptions.setInitialConf(this.groupPeers);

        String raftPath = config.get(CoreOptions.RAFT_PATH);
        nodeOptions.setLogUri(ensureDir(raftPath, "log"));
        nodeOptions.setRaftMetaUri(ensureDir(raftPath, "meta"));
        nodeOptions.setSnapshotUri(ensureDir(raftPath,
                                             StoreSnapshotFile.SNAPSHOT_DIR));

        RaftOptions raftOptions = nodeOptions.getRaftOptions();
        raftOptions.setApplyBatch(config.get(CoreOptions.RAFT_APPLY_BATCH));
        raftOptions.setDisruptorBufferSize(
                    config.get(CoreOptions.RAFT_QUEUE_SIZE));
        raftOptions.setDisruptorPublishEventWaitTimeoutSecs(
                    config.get(CoreOptions.RAFT_QUEUE_PUBLISH_TIMEOUT));
        raftOptions.setReplicatorPipeline(
                    config.get(CoreOptions.RAFT_REPLICATOR_PIPELINE));
        raftOptions.setOpenStatistics(false);
        String readStrategy = config.get(CoreOptions.RAFT_READ_STRATEGY);
        raftOptions.setReadOnlyOptions(ReadOnlyOption.valueOf(readStrategy));

        return nodeOptions;
    }

    /**
     * Sends a "clear" cache event once with a schema type (VERTEX_LABEL) and
     * once with a graph type (VERTEX), so that both event hubs are notified.
     */
    public void clearCache() {
        this.notifyCache("clear", HugeType.VERTEX_LABEL, null);
        this.notifyCache("clear", HugeType.VERTEX, null);
    }

    /**
     * Invalidates the cache entries touched by a mutation.
     *
     * <p>Nothing is done unless the graph mode is {@link GraphMode#NONE}, and
     * nothing is done when this node is the leader and the request was not
     * forwarded to it.
     *
     * @param mutation  the applied mutation
     * @param forwarded whether the request was forwarded to this node
     */
    public void updateCacheIfNeeded(BackendMutation mutation,
                                    boolean forwarded) {
        if (this.graphMode() != GraphMode.NONE) {
            return;
        }
        if (!forwarded && this.node().selfIsLeader()) {
            return;
        }
        for (HugeType type : mutation.types()) {
            if (!type.isSchema() && !type.isGraph()) {
                continue;
            }
            // Allocate the id list only for types that are actually notified
            List<Id> ids = new ArrayList<>(CACHE_IDS_INITIAL_CAPACITY);
            Iterator<BackendAction> actions = mutation.mutation(type);
            while (actions.hasNext()) {
                ids.add(actions.next().entry().originId());
            }
            this.notifyCache(Cache.ACTION_INVALID, type, ids);
        }
    }

    /**
     * Calls the cache event on the graph or schema event hub, depending on
     * the type; types that are neither graph nor schema are ignored.
     *
     * @param action the cache action passed along with the event
     * @param type   the type whose cache is concerned
     * @param ids    the concerned ids; null sends no id at all, a single id
     *               is sent as is, otherwise the ids are sent as an array
     */
    protected void notifyCache(String action, HugeType type, List<Id> ids) {
        EventHub eventHub;
        if (type.isGraph()) {
            eventHub = this.params.graphEventHub();
        } else if (type.isSchema()) {
            eventHub = this.params.schemaEventHub();
        } else {
            return;
        }

        try {
            if (ids == null) {
                eventHub.call(Events.CACHE, action, type);
            } else if (ids.size() == 1) {
                eventHub.call(Events.CACHE, action, type, ids.get(0));
            } else {
                eventHub.call(Events.CACHE, action, type, ids.toArray());
            }
        } catch (RejectedExecutionException e) {
            // Best effort: a rejected notification is only logged
            LOG.warn("Can't update cache due to EventHub is too busy");
        }
    }

    /**
     * @return the endpoint of this peer, or null before {@link #initRaftNode}
     */
    public PeerId endpoint() {
        return this.endpoint;
    }

    /**
     * @return the current value of the safe-read option
     */
    public boolean safeRead() {
        return this.config().get(CoreOptions.RAFT_SAFE_READ);
    }

    public ExecutorService snapshotExecutor() {
        return this.snapshotExecutor;
    }

    public ExecutorService backendExecutor() {
        return this.backendExecutor;
    }

    /**
     * @return the read-index executor, or null if safe read was disabled
     *         when this context was constructed
     */
    public ExecutorService readIndexExecutor() {
        return this.readIndexExecutor;
    }

    public GraphMode graphMode() {
        return this.params.mode();
    }

    private HugeConfig config() {
        return this.params.configuration();
    }

    /**
     * Creates and starts a dedicated raft RPC server on the endpoint of this
     * peer. Not called from within this class.
     */
    private RpcServer initAndStartRpcServer() {
        this.applyBoltWaterMarks();

        PeerId self = this.endpoint();
        NodeManager.getInstance().addAddress(self.getEndpoint());
        RpcServer server = RaftRpcServerFactory.createAndStartRaftRpcServer(
                                                self.getEndpoint());
        LOG.info("Raft-RPC server is started successfully");
        return server;
    }

    /** Wraps an existing Bolt server and adds the raft request processors to it. */
    private RpcServer wrapRpcServer(com.alipay.remoting.rpc.RpcServer rpcServer) {
        this.applyBoltWaterMarks();

        BoltRpcServer wrapped = new BoltRpcServer(rpcServer);
        RaftRpcServerFactory.addRaftRequestProcessors(wrapped);
        return wrapped;
    }

    /** Publishes the configured write-buffer water marks as Bolt system properties. */
    private void applyBoltWaterMarks() {
        Integer lowWaterMark = this.config().get(
                               CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK);
        System.setProperty("bolt.channel_write_buf_low_water_mark",
                           String.valueOf(lowWaterMark));
        Integer highWaterMark = this.config().get(
                                CoreOptions.RAFT_RPC_BUF_HIGH_WATER_MARK);
        System.setProperty("bolt.channel_write_buf_high_water_mark",
                           String.valueOf(highWaterMark));
    }

    /**
     * Shuts down the raft RPC server and removes the endpoint of this peer
     * from the node manager; both steps are skipped when the corresponding
     * field was never initialized.
     */
    private void shutdownRpcServer() {
        if (this.raftRpcServer != null) {
            this.raftRpcServer.shutdown();
        }
        PeerId self = this.endpoint();
        if (self != null) {
            NodeManager.getInstance().removeAddress(self.getEndpoint());
        }
    }

    private void registerRpcRequestProcessors() {
        this.raftRpcServer.registerProcessor(new StoreCommandProcessor(this));
        this.raftRpcServer.registerProcessor(new SetLeaderProcessor(this));
        this.raftRpcServer.registerProcessor(new ListPeersProcessor(this));
    }

    /** Pool of up to 4x core threads that aborts (throws) when saturated. */
    private ExecutorService createReadIndexExecutor(int coreThreads) {
        return newPool(coreThreads, coreThreads << 2,
                       "store-read-index-callback",
                       new ThreadPoolExecutor.AbortPolicy());
    }

    /** Pool of up to 4x core threads that runs the task in the caller when saturated. */
    private ExecutorService createSnapshotExecutor(int coreThreads) {
        return newPool(coreThreads, coreThreads << 2,
                       "store-snapshot-executor",
                       new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /** Fixed-size pool that runs the task in the caller when saturated. */
    private ExecutorService createBackendExecutor(int threads) {
        return newPool(threads, threads, "store-backend-executor",
                       new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Builds a pool of daemon threads with a bounded queue of
     * {@link #QUEUE_SIZE} entries and a keep-alive of
     * {@link #KEEP_ALIVE_SECOND} seconds.
     */
    private static ExecutorService newPool(int coreThreads, int maxThreads,
                                           String name,
                                           RejectedExecutionHandler handler) {
        return ThreadPoolUtil.newBuilder()
                             .poolName(name)
                             .enableMetric(false)
                             .coreThreads(coreThreads)
                             .maximumThreads(maxThreads)
                             .keepAliveSeconds(KEEP_ALIVE_SECOND)
                             .workQueue(new ArrayBlockingQueue<>(QUEUE_SIZE))
                             .threadFactory(new NamedThreadFactory(name, true))
                             .rejectedHandler(handler)
                             .build();
    }

    /** Creates {@code root/child} if needed and returns its path as a string. */
    private static String ensureDir(String root, String child)
                                    throws IOException {
        String dir = Paths.get(root, child).toString();
        FileUtils.forceMkdir(new File(dir));
        return dir;
    }

    /** Converts seconds to milliseconds, failing fast instead of overflowing. */
    private static int secondsToMillis(int seconds) {
        return Math.multiplyExact(seconds, 1000);
    }
}
