/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.core.state;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import org.neo4j.causalclustering.SessionTracker;
import org.neo4j.causalclustering.core.consensus.RaftMachine;
import org.neo4j.causalclustering.core.consensus.log.RaftLog;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
import org.neo4j.causalclustering.core.consensus.log.monitoring.RaftLogCommitIndexMonitor;
import org.neo4j.causalclustering.core.consensus.log.segmented.InFlightMap;
import org.neo4j.causalclustering.core.replication.DistributedOperation;
import org.neo4j.causalclustering.core.replication.ProgressTracker;
import org.neo4j.causalclustering.core.replication.session.GlobalSessionTrackerState;
import org.neo4j.causalclustering.core.state.CommandDispatcher;
import org.neo4j.causalclustering.core.state.CoreStateApplier;
import org.neo4j.causalclustering.core.state.InFlightLogEntryReader;
import org.neo4j.causalclustering.core.state.Result;
import org.neo4j.causalclustering.core.state.machines.CoreStateMachines;
import org.neo4j.causalclustering.core.state.machines.tx.CoreReplicatedContent;
import org.neo4j.causalclustering.core.state.snapshot.CoreSnapshot;
import org.neo4j.causalclustering.core.state.snapshot.CoreStateType;
import org.neo4j.causalclustering.core.state.snapshot.RaftCoreState;
import org.neo4j.causalclustering.core.state.storage.StateStorage;
import org.neo4j.causalclustering.helper.StatUtil;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

public class CommandApplicationProcess
extends LifecycleAdapter {
    private static final long NOTHING = -1L;
    private final RaftLog raftLog;
    private final StateStorage<Long> lastFlushedStorage;
    private final int flushEvery;
    private final ProgressTracker progressTracker;
    private final SessionTracker sessionTracker;
    private final Supplier<DatabaseHealth> dbHealth;
    private final InFlightMap<RaftLogEntry> inFlightMap;
    private final Log log;
    private final CoreStateApplier applier;
    private final RaftLogCommitIndexMonitor commitIndexMonitor;
    private final OperationBatcher batcher;
    private StatUtil.StatContext batchStat;
    private CoreStateMachines coreStateMachines;
    private boolean started;
    private long lastApplied = -1L;
    private volatile long lastSeenCommitIndex = -1L;
    private long lastFlushed = -1L;

    public CommandApplicationProcess(CoreStateMachines coreStateMachines, RaftLog raftLog, int maxBatchSize, int flushEvery, Supplier<DatabaseHealth> dbHealth, LogProvider logProvider, ProgressTracker progressTracker, StateStorage<Long> lastFlushedStorage, SessionTracker sessionTracker, CoreStateApplier applier, InFlightMap<RaftLogEntry> inFlightMap, Monitors monitors) {
        this.coreStateMachines = coreStateMachines;
        this.raftLog = raftLog;
        this.lastFlushedStorage = lastFlushedStorage;
        this.flushEvery = flushEvery;
        this.progressTracker = progressTracker;
        this.sessionTracker = sessionTracker;
        this.applier = applier;
        this.log = logProvider.getLog(((Object)((Object)this)).getClass());
        this.dbHealth = dbHealth;
        this.inFlightMap = inFlightMap;
        this.commitIndexMonitor = (RaftLogCommitIndexMonitor)monitors.newMonitor(RaftLogCommitIndexMonitor.class, ((Object)((Object)this)).getClass(), new String[0]);
        this.batcher = new OperationBatcher(maxBatchSize);
        this.batchStat = StatUtil.create("BatchSize", this.log, 4096L, true);
    }

    synchronized void notifyCommitted(long commitIndex) {
        assert (this.lastSeenCommitIndex <= commitIndex);
        if (this.lastSeenCommitIndex < commitIndex) {
            this.lastSeenCommitIndex = commitIndex;
            if (this.started) {
                this.submitApplyJob(commitIndex);
                this.commitIndexMonitor.commitIndex(commitIndex);
            }
        }
    }

    private void submitApplyJob(long lastToApply) {
        long snapshotLastSeenCommitIndex = this.lastSeenCommitIndex;
        boolean success = this.applier.submit(status -> () -> {
            try (InFlightLogEntryReader logEntrySupplier = new InFlightLogEntryReader(this.raftLog, this.inFlightMap, true);){
                for (long logIndex = this.lastApplied + 1L; !status.isCancelled() && logIndex <= snapshotLastSeenCommitIndex; ++logIndex) {
                    RaftLogEntry entry = logEntrySupplier.get(logIndex);
                    if (entry == null) {
                        throw new IllegalStateException(String.format("Committed log %d entry must exist.", logIndex));
                    }
                    if (entry.content() instanceof DistributedOperation) {
                        DistributedOperation distributedOperation = (DistributedOperation)entry.content();
                        this.progressTracker.trackReplication(distributedOperation);
                        this.batcher.add(logIndex, distributedOperation);
                        continue;
                    }
                    this.batcher.flush();
                    this.lastApplied = logIndex;
                }
                this.batcher.flush();
            }
            catch (Throwable e) {
                this.log.error("Failed to apply up to index " + lastToApply, e);
                this.dbHealth.get().panic(e);
                this.applier.panic();
            }
        });
        if (!success) {
            this.log.error("Applier has entered a state of panic, no more jobs can be submitted.");
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    synchronized long lastApplied() {
        return this.lastApplied;
    }

    public synchronized void sync() throws InterruptedException {
        this.applier.sync(true);
    }

    public void prune() throws IOException {
        this.raftLog.prune(this.lastFlushed);
    }

    private long handleOperations(long commandIndex, List<DistributedOperation> operations) {
        try (CommandDispatcher dispatcher = this.coreStateMachines.commandDispatcher();){
            for (DistributedOperation operation : operations) {
                if (!this.sessionTracker.validateOperation(operation.globalSession(), operation.operationId())) {
                    this.sessionTracker.validateOperation(operation.globalSession(), operation.operationId());
                    ++commandIndex;
                    continue;
                }
                CoreReplicatedContent command = (CoreReplicatedContent)operation.content();
                command.dispatch(dispatcher, commandIndex, result -> this.progressTracker.trackResult(operation, (Result)result));
                this.sessionTracker.update(operation.globalSession(), operation.operationId(), commandIndex);
                ++commandIndex;
            }
        }
        return commandIndex - 1L;
    }

    private void maybeFlush() throws IOException {
        if (this.lastApplied - this.lastFlushed > (long)this.flushEvery) {
            this.flush();
        }
    }

    private void flush() throws IOException {
        this.coreStateMachines.flush();
        this.sessionTracker.flush();
        this.lastFlushedStorage.persistStoreData(this.lastApplied);
        this.lastFlushed = this.lastApplied;
    }

    public synchronized void start() throws IOException, InterruptedException {
        if (this.lastFlushed == -1L) {
            this.lastFlushed = this.lastFlushedStorage.getInitialState();
        }
        this.lastApplied = this.lastFlushed;
        this.log.info(String.format("Restoring last applied index to %d", this.lastApplied));
        this.sessionTracker.start();
        long lastPossiblyApplying = Math.max(this.coreStateMachines.getLastAppliedIndex(), this.sessionTracker.getLastAppliedIndex());
        lastPossiblyApplying = Math.max(lastPossiblyApplying, this.lastSeenCommitIndex);
        if (lastPossiblyApplying > this.lastApplied) {
            this.log.info("Applying up to: " + lastPossiblyApplying);
            this.submitApplyJob(lastPossiblyApplying);
            this.applier.sync(false);
        }
        this.started = true;
    }

    public synchronized void stop() throws InterruptedException, IOException {
        this.started = false;
        this.applier.sync(true);
        this.flush();
    }

    public synchronized CoreSnapshot snapshot(RaftMachine raft) throws IOException, InterruptedException {
        this.applier.sync(false);
        long prevIndex = this.lastApplied;
        long prevTerm = this.raftLog.readEntryTerm(prevIndex);
        CoreSnapshot coreSnapshot = new CoreSnapshot(prevIndex, prevTerm);
        this.coreStateMachines.addSnapshots(coreSnapshot);
        coreSnapshot.add(CoreStateType.SESSION_TRACKER, this.sessionTracker.snapshot());
        coreSnapshot.add(CoreStateType.RAFT_CORE_STATE, raft.coreState());
        return coreSnapshot;
    }

    synchronized void installSnapshot(CoreSnapshot coreSnapshot, RaftMachine raft) throws IOException {
        this.coreStateMachines.installSnapshots(coreSnapshot);
        long snapshotPrevIndex = coreSnapshot.prevIndex();
        try {
            this.raftLog.skip(snapshotPrevIndex, coreSnapshot.prevTerm());
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        this.lastApplied = this.lastFlushed = snapshotPrevIndex;
        this.log.info(String.format("Skipping lastApplied index forward to %d", snapshotPrevIndex));
        raft.installCoreState((RaftCoreState)coreSnapshot.get(CoreStateType.RAFT_CORE_STATE));
        this.sessionTracker.installSnapshot((GlobalSessionTrackerState)coreSnapshot.get(CoreStateType.SESSION_TRACKER));
        this.flush();
    }

    private class OperationBatcher {
        private List<DistributedOperation> batch;
        private int maxBatchSize;
        private long lastIndex;

        OperationBatcher(int maxBatchSize) {
            this.batch = new ArrayList<DistributedOperation>(maxBatchSize);
            this.maxBatchSize = maxBatchSize;
        }

        private void add(long index, DistributedOperation operation) throws Exception {
            if (this.batch.size() > 0) assert (index == this.lastIndex + 1L);
            this.batch.add(operation);
            this.lastIndex = index;
            if (this.batch.size() == this.maxBatchSize) {
                this.flush();
            }
        }

        private void flush() throws Exception {
            if (this.batch.size() == 0) {
                return;
            }
            CommandApplicationProcess.this.batchStat.collect(this.batch.size());
            long startIndex = this.lastIndex - (long)this.batch.size() + 1L;
            long lastHandledIndex = CommandApplicationProcess.this.handleOperations(startIndex, this.batch);
            assert (lastHandledIndex == this.lastIndex);
            CommandApplicationProcess.this.lastApplied = this.lastIndex;
            this.batch.clear();
            CommandApplicationProcess.this.maybeFlush();
        }
    }
}

