package org.apache.hugegraph.backend.store.raft;

import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.util.BytesUtil;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.store.BackendEntry;
import org.apache.hugegraph.backend.store.BackendFeatures;
import org.apache.hugegraph.backend.store.BackendMutation;
import org.apache.hugegraph.backend.store.BackendStore;
import org.apache.hugegraph.backend.store.BackendStoreProvider;
import org.apache.hugegraph.backend.store.SystemSchemaStore;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hugegraph/backend/store/raft/RaftBackendStore.class */
public class RaftBackendStore implements BackendStore {
    private static final Logger LOG;
    private final BackendStore store;
    private final RaftContext context;
    private final ThreadLocal<MutationBatch> mutationBatch = new ThreadLocal<>();
    private final boolean isSafeRead;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hugegraph/backend/store/raft/RaftBackendStore$IncrCounter.class */
    public static final class IncrCounter {
        private HugeType type;
        private long increment;

        public IncrCounter(HugeType hugeType, long j) {
            this.type = hugeType;
            this.increment = j;
        }

        public HugeType type() {
            return this.type;
        }

        public long increment() {
            return this.increment;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hugegraph/backend/store/raft/RaftBackendStore$MutationBatch.class */
    public static class MutationBatch {
        private final List<BackendMutation> mutations = new ArrayList(500);

        public void add(BackendMutation backendMutation) {
            this.mutations.add(backendMutation);
        }

        public void clear() {
            this.mutations.clear();
        }
    }

    public RaftBackendStore(BackendStore backendStore, RaftContext raftContext) {
        this.store = backendStore;
        this.context = raftContext;
        this.isSafeRead = this.context.safeRead();
    }

    public BackendStore originStore() {
        return this.store;
    }

    private RaftNode node() {
        RaftNode node = this.context.node();
        E.checkState(node != null, "The raft node should be initialized first", new Object[0]);
        return node;
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public String store() {
        return this.store.store();
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public String storedVersion() {
        return this.store.storedVersion();
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public String database() {
        return this.store.database();
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public BackendStoreProvider provider() {
        return this.store.provider();
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public SystemSchemaStore systemSchemaStore() {
        return this.store.systemSchemaStore();
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public boolean isSchemaStore() {
        return this.store.isSchemaStore();
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public synchronized void open(HugeConfig hugeConfig) {
        this.store.open(hugeConfig);
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public void close() {
        this.store.close();
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public boolean opened() {
        return this.store.opened();
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public void init() {
        this.store.init();
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public void clear(boolean z) {
        submitAndWait(RaftRequests.StoreAction.CLEAR, StoreCommand.wrap(z ? (byte) 1 : (byte) 0));
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public boolean initialized() {
        return this.store.initialized();
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public void truncate() {
        submitAndWait(RaftRequests.StoreAction.TRUNCATE, null);
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public void mutate(BackendMutation backendMutation) {
        if (backendMutation.isEmpty()) {
            return;
        }
        getOrNewBatch().add(backendMutation);
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public Iterator<BackendEntry> query(Query query) {
        return (Iterator) queryByRaft(query, obj -> {
            return this.store.query(query);
        });
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public Number queryNumber(Query query) {
        return (Number) queryByRaft(query, obj -> {
            return this.store.queryNumber(query);
        });
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public void beginTx() {
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public void commitTx() {
        MutationBatch orNewBatch = getOrNewBatch();
        try {
            submitAndWait(RaftRequests.StoreAction.COMMIT_TX, StoreSerializer.writeMutations(orNewBatch.mutations));
        } finally {
            orNewBatch.clear();
        }
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public void rollbackTx() {
        submitAndWait(RaftRequests.StoreAction.ROLLBACK_TX, null);
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public <R> R metadata(HugeType hugeType, String str, Object[] objArr) {
        return (R) this.store.metadata(hugeType, str, objArr);
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public BackendFeatures features() {
        return this.store.features();
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public void increaseCounter(HugeType hugeType, long j) {
        submitAndWait(RaftRequests.StoreAction.INCR_COUNTER, StoreSerializer.writeIncrCounter(new IncrCounter(hugeType, j)));
    }

    @Override // org.apache.hugegraph.backend.store.BackendStore
    public long getCounter(HugeType hugeType) {
        Object queryByRaft = queryByRaft(hugeType, true, obj -> {
            return Long.valueOf(this.store.getCounter(hugeType));
        });
        if ($assertionsDisabled || (queryByRaft instanceof Long)) {
            return ((Long) queryByRaft).longValue();
        }
        throw new AssertionError();
    }

    private Object submitAndWait(RaftRequests.StoreAction storeAction, byte[] bArr) {
        return submitAndWait(new StoreCommand(this.context.storeType(store()), storeAction, bArr));
    }

    private Object submitAndWait(StoreCommand storeCommand) {
        return node().submitAndWait(storeCommand, new RaftStoreClosure(storeCommand));
    }

    private Object queryByRaft(Object obj, Function<Object, Object> function) {
        return queryByRaft(obj, this.isSafeRead, function);
    }

    private Object queryByRaft(final Object obj, boolean z, final Function<Object, Object> function) {
        if (!z) {
            return function.apply(obj);
        }
        final RaftClosure raftClosure = new RaftClosure();
        node().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { // from class: org.apache.hugegraph.backend.store.raft.RaftBackendStore.1
            public void run(Status status, long j, byte[] bArr) {
                if (!status.isOk()) {
                    raftClosure.failure(status, new BackendException("Failed to do raft read-index: %s", status));
                    return;
                }
                RaftClosure raftClosure2 = raftClosure;
                Function function2 = function;
                Object obj2 = obj;
                raftClosure2.complete(status, () -> {
                    return function2.apply(obj2);
                });
            }
        });
        try {
            return raftClosure.waitFinished();
        } catch (Throwable th) {
            LOG.warn("Failed to execute query '{}': {}", new Object[]{obj, raftClosure.status(), th});
            throw new BackendException("Failed to execute query: %s", th, obj);
        }
    }

    private MutationBatch getOrNewBatch() {
        MutationBatch mutationBatch = this.mutationBatch.get();
        if (mutationBatch == null) {
            mutationBatch = new MutationBatch();
            this.mutationBatch.set(mutationBatch);
        }
        return mutationBatch;
    }

    static {
        $assertionsDisabled = !RaftBackendStore.class.desiredAssertionStatus();
        LOG = Log.logger(RaftBackendStore.class);
    }
}
