package com.couchbase.client.core.dcp;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.annotations.InterfaceAudience;
import com.couchbase.client.core.annotations.InterfaceStability;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.endpoint.dcp.DCPConnection;
import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.GetFailoverLogRequest;
import com.couchbase.client.core.message.dcp.GetFailoverLogResponse;
import com.couchbase.client.core.message.dcp.OpenConnectionRequest;
import com.couchbase.client.core.message.dcp.OpenConnectionResponse;
import com.couchbase.client.core.message.dcp.StreamRequestRequest;
import com.couchbase.client.core.message.dcp.StreamRequestResponse;
import com.couchbase.client.core.message.kv.GetAllMutationTokensRequest;
import com.couchbase.client.core.message.kv.GetAllMutationTokensResponse;
import com.couchbase.client.core.message.kv.MutationToken;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.functions.Action2;
import rx.functions.Func0;
import rx.functions.Func1;

@InterfaceAudience.Public
@InterfaceStability.Experimental
/* loaded from: input_file:com/couchbase/client/core/dcp/BucketStreamAggregator.class */
public class BucketStreamAggregator {
    public static String DEFAULT_CONNECTION_NAME = "jvmCore";
    private final ClusterFacade core;
    private final String bucket;
    private final String name;
    private final AtomicReference<DCPConnection> connection;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.couchbase.client.core.dcp.BucketStreamAggregator$1, reason: invalid class name */
    /* loaded from: input_file:com/couchbase/client/core/dcp/BucketStreamAggregator$1.class */
    public class AnonymousClass1 implements Func1<DCPConnection, Observable<DCPRequest>> {
        final /* synthetic */ BucketStreamAggregatorState val$aggregatorState;

        AnonymousClass1(BucketStreamAggregatorState bucketStreamAggregatorState) {
            this.val$aggregatorState = bucketStreamAggregatorState;
        }

        public Observable<DCPRequest> call(DCPConnection dCPConnection) {
            return Observable.from(this.val$aggregatorState).flatMap(new Func1<BucketStreamState, Observable<StreamRequestResponse>>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.1.2
                public Observable<StreamRequestResponse> call(final BucketStreamState bucketStreamState) {
                    return BucketStreamAggregator.this.core.send(new StreamRequestRequest(bucketStreamState.partition(), bucketStreamState.vbucketUUID(), bucketStreamState.startSequenceNumber(), bucketStreamState.endSequenceNumber(), bucketStreamState.snapshotStartSequenceNumber(), bucketStreamState.snapshotEndSequenceNumber(), BucketStreamAggregator.this.bucket)).flatMap(new Func1<StreamRequestResponse, Observable<StreamRequestResponse>>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.1.2.1
                        public Observable<StreamRequestResponse> call(StreamRequestResponse streamRequestResponse) {
                            long rollbackToSequenceNumber;
                            switch (AnonymousClass7.$SwitchMap$com$couchbase$client$core$message$ResponseStatus[streamRequestResponse.status().ordinal()]) {
                                case 1:
                                    rollbackToSequenceNumber = 0;
                                    break;
                                case 2:
                                    rollbackToSequenceNumber = streamRequestResponse.rollbackToSequenceNumber();
                                    break;
                                default:
                                    return Observable.just(streamRequestResponse);
                            }
                            return BucketStreamAggregator.this.core.send(new StreamRequestRequest(bucketStreamState.partition(), bucketStreamState.vbucketUUID(), rollbackToSequenceNumber, bucketStreamState.endSequenceNumber(), rollbackToSequenceNumber, bucketStreamState.snapshotEndSequenceNumber(), BucketStreamAggregator.this.bucket));
                        }
                    });
                }
            }).toList().flatMap(new Func1<List<StreamRequestResponse>, Observable<DCPRequest>>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.1.1
                public Observable<DCPRequest> call(List<StreamRequestResponse> list) {
                    return ((DCPConnection) BucketStreamAggregator.this.connection.get()).subject();
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.couchbase.client.core.dcp.BucketStreamAggregator$7, reason: invalid class name */
    /* loaded from: input_file:com/couchbase/client/core/dcp/BucketStreamAggregator$7.class */
    public static /* synthetic */ class AnonymousClass7 {
        static final /* synthetic */ int[] $SwitchMap$com$couchbase$client$core$message$ResponseStatus = new int[ResponseStatus.values().length];

        static {
            try {
                $SwitchMap$com$couchbase$client$core$message$ResponseStatus[ResponseStatus.RANGE_ERROR.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$couchbase$client$core$message$ResponseStatus[ResponseStatus.ROLLBACK.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public BucketStreamAggregator(ClusterFacade clusterFacade, String str) {
        this(DEFAULT_CONNECTION_NAME, clusterFacade, str);
    }

    public BucketStreamAggregator(String str, ClusterFacade clusterFacade, String str2) {
        this.connection = new AtomicReference<>();
        this.core = clusterFacade;
        this.bucket = str2;
        this.name = str;
    }

    public String name() {
        return this.name;
    }

    public Observable<DCPRequest> feed() {
        BucketStreamAggregatorState bucketStreamAggregatorState = new BucketStreamAggregatorState();
        int intValue = ((Integer) partitionSize().toBlocking().first()).intValue();
        short s = 0;
        while (true) {
            short s2 = s;
            if (s2 >= intValue) {
                return feed(bucketStreamAggregatorState);
            }
            bucketStreamAggregatorState.put(new BucketStreamState(s2, 0L, 0L, -1L, 0L, -1L));
            s = (short) (s2 + 1);
        }
    }

    public Observable<DCPRequest> feed(BucketStreamAggregatorState bucketStreamAggregatorState) {
        return open().flatMap(new AnonymousClass1(bucketStreamAggregatorState));
    }

    public Observable<BucketStreamAggregatorState> getCurrentState() {
        return open().flatMap(new Func1<DCPConnection, Observable<GetAllMutationTokensResponse>>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.4
            public Observable<GetAllMutationTokensResponse> call(DCPConnection dCPConnection) {
                return BucketStreamAggregator.this.core.send(new GetAllMutationTokensRequest(BucketStreamAggregator.this.bucket));
            }
        }).flatMap(new Func1<GetAllMutationTokensResponse, Observable<BucketStreamAggregatorState>>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.3
            public Observable<BucketStreamAggregatorState> call(GetAllMutationTokensResponse getAllMutationTokensResponse) {
                BucketStreamAggregatorState bucketStreamAggregatorState = new BucketStreamAggregatorState();
                for (MutationToken mutationToken : getAllMutationTokensResponse.mutationTokens()) {
                    bucketStreamAggregatorState.put(new BucketStreamState((short) mutationToken.vbucketID(), mutationToken.vbucketUUID(), mutationToken.sequenceNumber()));
                }
                return Observable.just(bucketStreamAggregatorState);
            }
        }).flatMap(new Func1<BucketStreamAggregatorState, Observable<BucketStreamAggregatorState>>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.2
            public Observable<BucketStreamAggregatorState> call(final BucketStreamAggregatorState bucketStreamAggregatorState) {
                return Observable.from(bucketStreamAggregatorState).flatMap(new Func1<BucketStreamState, Observable<GetFailoverLogResponse>>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.2.3
                    public Observable<GetFailoverLogResponse> call(BucketStreamState bucketStreamState) {
                        return BucketStreamAggregator.this.core.send(new GetFailoverLogRequest(bucketStreamState.partition(), BucketStreamAggregator.this.bucket));
                    }
                }).collect(new Func0<BucketStreamAggregatorState>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.2.1
                    /* renamed from: call, reason: merged with bridge method [inline-methods] */
                    public BucketStreamAggregatorState m16call() {
                        return bucketStreamAggregatorState;
                    }
                }, new Action2<BucketStreamAggregatorState, GetFailoverLogResponse>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.2.2
                    public void call(BucketStreamAggregatorState bucketStreamAggregatorState2, GetFailoverLogResponse getFailoverLogResponse) {
                        bucketStreamAggregatorState2.put(new BucketStreamState(getFailoverLogResponse.partition(), getFailoverLogResponse.failoverLog().get(0).vbucketUUID(), bucketStreamAggregatorState2.get(getFailoverLogResponse.partition()).startSequenceNumber()));
                    }
                });
            }
        });
    }

    private Observable<DCPConnection> open() {
        return this.connection.get() == null ? this.core.send(new OpenConnectionRequest(this.name, this.bucket)).flatMap(new Func1<OpenConnectionResponse, Observable<DCPConnection>>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.5
            public Observable<DCPConnection> call(OpenConnectionResponse openConnectionResponse) {
                BucketStreamAggregator.this.connection.compareAndSet(null, openConnectionResponse.connection());
                return Observable.just(BucketStreamAggregator.this.connection.get());
            }
        }) : Observable.just(this.connection.get());
    }

    private Observable<Integer> partitionSize() {
        return this.core.send(new GetClusterConfigRequest()).map(new Func1<GetClusterConfigResponse, Integer>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.6
            public Integer call(GetClusterConfigResponse getClusterConfigResponse) {
                return Integer.valueOf(((CouchbaseBucketConfig) getClusterConfigResponse.config().bucketConfig(BucketStreamAggregator.this.bucket)).numberOfPartitions());
            }
        });
    }
}
