/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.catchup.tx;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import org.neo4j.causalclustering.catchup.CatchUpClient;
import org.neo4j.causalclustering.catchup.CatchUpClientException;
import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor;
import org.neo4j.causalclustering.catchup.CatchupResult;
import org.neo4j.causalclustering.catchup.storecopy.CopiedStoreRecovery;
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.causalclustering.catchup.storecopy.StoreFetcher;
import org.neo4j.causalclustering.catchup.storecopy.StreamingTransactionsFailedException;
import org.neo4j.causalclustering.catchup.tx.BatchingTxApplier;
import org.neo4j.causalclustering.catchup.tx.PullRequestMonitor;
import org.neo4j.causalclustering.catchup.tx.TxPullRequest;
import org.neo4j.causalclustering.catchup.tx.TxPullResponse;
import org.neo4j.causalclustering.catchup.tx.TxStreamFinishedResponse;
import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.messaging.routing.CoreMemberSelectionException;
import org.neo4j.causalclustering.messaging.routing.CoreMemberSelectionStrategy;
import org.neo4j.causalclustering.readreplica.CopyStoreSafely;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

public class CatchupPollingProcess
extends LifecycleAdapter {
    private final FileSystemAbstraction fs;
    private final LocalDatabase localDatabase;
    private final Log log;
    private final Lifecycle startStopOnStoreCopy;
    private final StoreFetcher storeFetcher;
    private final CopiedStoreRecovery copiedStoreRecovery;
    private final Supplier<DatabaseHealth> databaseHealthSupplier;
    private final CatchUpClient catchUpClient;
    private final CoreMemberSelectionStrategy connectionStrategy;
    private final RenewableTimeoutService timeoutService;
    private final long txPullIntervalMillis;
    private final BatchingTxApplier applier;
    private final PullRequestMonitor pullRequestMonitor;
    private RenewableTimeoutService.RenewableTimeout timeout;
    private State state = State.TX_PULLING;
    private DatabaseHealth dbHealth;
    private CompletableFuture<Boolean> upToDateFuture;

    public CatchupPollingProcess(LogProvider logProvider, FileSystemAbstraction fs, LocalDatabase localDatabase, Lifecycle startStopOnStoreCopy, StoreFetcher storeFetcher, CatchUpClient catchUpClient, CoreMemberSelectionStrategy connectionStrategy, RenewableTimeoutService timeoutService, long txPullIntervalMillis, BatchingTxApplier applier, Monitors monitors, CopiedStoreRecovery copiedStoreRecovery, Supplier<DatabaseHealth> databaseHealthSupplier) {
        this.fs = fs;
        this.localDatabase = localDatabase;
        this.log = logProvider.getLog(((Object)((Object)this)).getClass());
        this.startStopOnStoreCopy = startStopOnStoreCopy;
        this.storeFetcher = storeFetcher;
        this.catchUpClient = catchUpClient;
        this.connectionStrategy = connectionStrategy;
        this.timeoutService = timeoutService;
        this.txPullIntervalMillis = txPullIntervalMillis;
        this.applier = applier;
        this.pullRequestMonitor = (PullRequestMonitor)monitors.newMonitor(PullRequestMonitor.class, new String[0]);
        this.copiedStoreRecovery = copiedStoreRecovery;
        this.databaseHealthSupplier = databaseHealthSupplier;
    }

    public synchronized void start() throws Throwable {
        this.timeout = this.timeoutService.create(Timeouts.TX_PULLER_TIMEOUT, this.txPullIntervalMillis, 0L, timeout -> this.onTimeout());
        this.dbHealth = this.databaseHealthSupplier.get();
        this.upToDateFuture = new CompletableFuture();
    }

    public Future<Boolean> upToDateFuture() throws InterruptedException {
        return this.upToDateFuture;
    }

    public void stop() throws Throwable {
        this.timeout.cancel();
    }

    public State state() {
        return this.state;
    }

    private void onTimeout() {
        try {
            switch (this.state) {
                case TX_PULLING: {
                    this.pullTransactions();
                    break;
                }
                case STORE_COPYING: {
                    this.copyStore();
                    break;
                }
                default: {
                    throw new IllegalStateException("Tried to execute catchup but was in state " + (Object)((Object)this.state));
                }
            }
        }
        catch (Throwable e) {
            this.panic(e);
        }
        if (this.state != State.PANIC) {
            this.timeout.renew();
        }
    }

    private synchronized void panic(Throwable e) {
        this.log.error("Unexpected issue in catchup process. No more catchup requests will be scheduled.", e);
        this.dbHealth.panic(e);
        this.upToDateFuture.completeExceptionally(e);
        this.state = State.PANIC;
    }

    private void pullTransactions() {
        MemberId core;
        try {
            core = this.connectionStrategy.coreMember();
        }
        catch (CoreMemberSelectionException e) {
            this.log.warn("Could not find core member to pull from", (Throwable)e);
            return;
        }
        StoreId localStoreId = this.localDatabase.storeId();
        boolean moreToPull = true;
        int batchCount = 1;
        while (moreToPull) {
            moreToPull = this.pullAndApplyBatchOfTransactions(core, localStoreId, batchCount);
            ++batchCount;
        }
    }

    private synchronized void handleTransaction(CommittedTransactionRepresentation tx) {
        if (this.state == State.PANIC) {
            return;
        }
        try {
            this.applier.queue(tx);
        }
        catch (Throwable e) {
            this.panic(e);
        }
    }

    private synchronized void streamComplete() {
        if (this.state == State.PANIC) {
            return;
        }
        try {
            this.applier.applyBatch();
        }
        catch (Throwable e) {
            this.panic(e);
        }
    }

    private boolean pullAndApplyBatchOfTransactions(MemberId core, StoreId localStoreId, int batchCount) {
        CatchupResult catchupResult;
        long lastQueuedTxId = this.applier.lastQueuedTxId();
        this.pullRequestMonitor.txPullRequest(lastQueuedTxId);
        TxPullRequest txPullRequest = new TxPullRequest(lastQueuedTxId, localStoreId);
        this.log.debug("Pull transactions where tx id > %d [batch #%d]", new Object[]{lastQueuedTxId, batchCount});
        try {
            catchupResult = this.catchUpClient.makeBlockingRequest(core, txPullRequest, new CatchUpResponseAdaptor<CatchupResult>(){

                @Override
                public void onTxPullResponse(CompletableFuture<CatchupResult> signal, TxPullResponse response) {
                    CatchupPollingProcess.this.handleTransaction(response.tx());
                }

                @Override
                public void onTxStreamFinishedResponse(CompletableFuture<CatchupResult> signal, TxStreamFinishedResponse response) {
                    CatchupPollingProcess.this.streamComplete();
                    signal.complete(response.status());
                }
            });
        }
        catch (CatchUpClientException e) {
            this.streamComplete();
            return false;
        }
        switch (catchupResult) {
            case SUCCESS_END_OF_BATCH: {
                return true;
            }
            case SUCCESS_END_OF_STREAM: {
                this.log.debug("Successfully pulled transactions from %d", new Object[]{lastQueuedTxId});
                this.upToDateFuture.complete(true);
                return false;
            }
            case E_TRANSACTION_PRUNED: {
                this.log.info("Tx pull unable to get transactions starting from %d since transactions have been pruned. Attempting a store copy.", new Object[]{lastQueuedTxId});
                this.state = State.STORE_COPYING;
                return false;
            }
        }
        this.log.info("Tx pull request unable to get transactions > %d " + lastQueuedTxId);
        return false;
    }

    private void copyStore() {
        MemberId core;
        try {
            core = this.connectionStrategy.coreMember();
        }
        catch (CoreMemberSelectionException e) {
            this.log.warn("Could not find core member from which to copy store", (Throwable)e);
            return;
        }
        StoreId localStoreId = this.localDatabase.storeId();
        this.downloadDatabase(core, localStoreId);
    }

    private void downloadDatabase(MemberId core, StoreId localStoreId) {
        try {
            this.localDatabase.stop();
            this.startStopOnStoreCopy.stop();
        }
        catch (Throwable throwable) {
            throw new RuntimeException(throwable);
        }
        try {
            new CopyStoreSafely(this.fs, this.localDatabase, this.copiedStoreRecovery, this.log).copyWholeStoreFrom(core, localStoreId, this.storeFetcher);
        }
        catch (IOException | StoreCopyFailedException | StreamingTransactionsFailedException e) {
            this.log.warn(String.format("Error copying store from: %s. Will retry shortly.", core));
            return;
        }
        try {
            this.localDatabase.start();
            this.startStopOnStoreCopy.start();
        }
        catch (Throwable throwable) {
            throw new RuntimeException(throwable);
        }
        this.state = State.TX_PULLING;
        this.applier.refreshFromNewStore();
    }

    static enum State {
        TX_PULLING,
        STORE_COPYING,
        PANIC;

    }

    static enum Timeouts implements RenewableTimeoutService.TimeoutName
    {
        TX_PULLER_TIMEOUT;

    }
}

