/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp.highlevel;

import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.highlevel.DatabaseChangeListener;
import com.couchbase.client.dcp.highlevel.StreamOffset;
import com.couchbase.client.dcp.highlevel.internal.DatabaseChangeEvent;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class Rollback
implements DatabaseChangeEvent {
    private static final Logger LOGGER = LoggerFactory.getLogger(Rollback.class);
    private final Client client;
    private final int vbucket;
    private final long seqno;
    private final Consumer<Throwable> errorHandler;
    private final StreamOffset failedStartOffset;

    public Rollback(Client client, int vbucket, long seqno, Consumer<Throwable> errorHandler) {
        this.client = Objects.requireNonNull(client);
        this.vbucket = vbucket;
        this.seqno = seqno;
        this.errorHandler = Objects.requireNonNull(errorHandler);
        this.failedStartOffset = Optional.ofNullable(client.sessionState().get(vbucket).getMostRecentOpenStreamOffset()).orElse(StreamOffset.ZERO);
    }

    @Override
    public int getVbucket() {
        return this.vbucket;
    }

    public long getSeqno() {
        return this.seqno;
    }

    public StreamOffset getFailedStartOffset() {
        return this.failedStartOffset;
    }

    public void resume() {
        this.client.rollbackAndRestartStream(this.vbucket, this.seqno).retryWhen((Retry)Retry.backoff((long)Long.MAX_VALUE, (Duration)Duration.ofMillis(10L)).maxBackoff(Duration.ofSeconds(5L)).doAfterRetry(retrySignal -> LOGGER.info("Retrying rollbackAndRestartStream for vbucket {}", (Object)this.vbucket))).onErrorResume(t -> {
            this.errorHandler.accept((Throwable)t);
            return Mono.empty();
        }).doOnSuccess(ignore -> LOGGER.info("Rollback for partition {} complete!", (Object)this.vbucket)).subscribe();
    }

    @Override
    public void dispatch(DatabaseChangeListener listener) {
        listener.onRollback(this);
    }
}

