package com.microsoft.azure.cosmosdb.rx.internal.query;

import com.microsoft.azure.cosmosdb.BridgeInternal;
import com.microsoft.azure.cosmosdb.DocumentClientException;
import com.microsoft.azure.cosmosdb.FeedOptions;
import com.microsoft.azure.cosmosdb.FeedResponse;
import com.microsoft.azure.cosmosdb.PartitionKeyRange;
import com.microsoft.azure.cosmosdb.Resource;
import com.microsoft.azure.cosmosdb.internal.query.metrics.ClientSideMetrics;
import com.microsoft.azure.cosmosdb.internal.query.metrics.FetchExecutionRangeAccumulator;
import com.microsoft.azure.cosmosdb.internal.query.metrics.SchedulingStopwatch;
import com.microsoft.azure.cosmosdb.internal.routing.Range;
import com.microsoft.azure.cosmosdb.rx.internal.Exceptions;
import com.microsoft.azure.cosmosdb.rx.internal.IDocumentClientRetryPolicy;
import com.microsoft.azure.cosmosdb.rx.internal.ObservableHelper;
import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentServiceRequest;
import com.microsoft.azure.cosmosdb.rx.internal.Utils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Single;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func3;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/microsoft/azure/cosmosdb/rx/internal/query/DocumentProducer.class */
/**
 * Produces pages of query results for a single partition key range.
 *
 * <p>Pages are fetched through {@code Paginator} using the request factory and request
 * executor supplied at construction time. If fetching fails with an error that
 * {@code Exceptions.isPartitionSplit} recognises, the producer looks up the ranges that
 * now overlap its target range and continues through one child producer per replacement
 * range, each starting from the last continuation token this producer observed.
 *
 * @param <T> resource type of the documents contained in each page
 */
public class DocumentProducer<T extends Resource> {
    private static final Logger logger = LoggerFactory.getLogger(DocumentProducer.class);

    /** Response header that carries the delimited query-metrics string. */
    private static final String QUERY_METRICS_HEADER = "x-ms-documentdb-query-metrics";

    /** Key under which the request charge is appended to the delimited metrics string. */
    private static final String REQUEST_CHARGE_METRIC = "requestCharge";

    /** Number of re-executions of the most recent request; reset to -1 before every request. */
    private int retries;
    protected final IDocumentQueryClient client;
    protected final String collectionRid;
    protected final FeedOptions feedOptions;
    protected final Class<T> resourceType;
    protected final PartitionKeyRange targetRange;
    protected final String collectionLink;
    protected final Func3<PartitionKeyRange, String, Integer, RxDocumentServiceRequest> createRequestFunc;
    protected final Func1<RxDocumentServiceRequest, Observable<FeedResponse<T>>> executeRequestFuncWithRetries;
    protected final Func0<IDocumentClientRetryPolicy> createRetryPolicyFunc;
    protected final int pageSize;
    protected final UUID correlatedActivityId;
    public int top;
    /** Continuation token of the last page received; handed to child producers on a split. */
    private volatile String lastResponseContinuationToken;
    private final SchedulingStopwatch fetchSchedulingMetrics = new SchedulingStopwatch();
    // Never assigned or read anywhere in this class.
    private SchedulingStopwatch moveNextSchedulingMetrics;
    private final FetchExecutionRangeAccumulator fetchExecutionRangeAccumulator;

    /**
     * A page of results paired with the partition key range it is attributed to.
     *
     * <p>Constructing an instance also records per-partition query metrics on the wrapped
     * page when the response carries the query-metrics header.
     */
    public class DocumentProducerFeedResponse {
        FeedResponse<T> pageResult;
        PartitionKeyRange sourcePartitionKeyRange;

        /**
         * Wraps a page and attributes it to the enclosing producer's target range.
         *
         * @param feedResponse the page returned by the service
         */
        DocumentProducerFeedResponse(FeedResponse<T> feedResponse) {
            this.pageResult = feedResponse;
            this.sourcePartitionKeyRange = DocumentProducer.this.targetRange;
            populatePartitionedQueryMetrics();
        }

        /**
         * Wraps a page and attributes it to an explicitly supplied range.
         *
         * <p>The metrics recorded on the page are still keyed by the enclosing producer's
         * target range id, not by {@code partitionKeyRange}.
         *
         * @param feedResponse      the page returned by the service
         * @param partitionKeyRange the range to report as the source of the page
         */
        public DocumentProducerFeedResponse(FeedResponse<T> feedResponse, PartitionKeyRange partitionKeyRange) {
            this.pageResult = feedResponse;
            this.sourcePartitionKeyRange = partitionKeyRange;
            populatePartitionedQueryMetrics();
        }

        /**
         * Builds query metrics from the response's metrics header plus client-side
         * measurements (retry count, request charge, fetch execution ranges, scheduling
         * time) and stores them on the page under the producer's target range id.
         * Does nothing when the header is absent or empty.
         */
        void populatePartitionedQueryMetrics() {
            String delimitedMetrics = (String) this.pageResult.getResponseHeaders().get(QUERY_METRICS_HEADER);
            if (StringUtils.isEmpty(delimitedMetrics)) {
                return;
            }

            String rangeId = DocumentProducer.this.targetRange.getId();
            double requestCharge = this.pageResult.getRequestCharge();

            // Locale.ROOT pins the decimal separator to '.'; the default locale could emit
            // ',' and make the appended field inconsistent with the rest of the string.
            String metricsWithCharge = delimitedMetrics
                    + String.format(Locale.ROOT, ";%s=%.2f", REQUEST_CHARGE_METRIC, requestCharge);

            ClientSideMetrics clientSideMetrics = new ClientSideMetrics(
                    DocumentProducer.this.retries,
                    requestCharge,
                    DocumentProducer.this.fetchExecutionRangeAccumulator.getExecutionRanges(),
                    Arrays.asList(new ImmutablePair<>(rangeId, DocumentProducer.this.fetchSchedulingMetrics.getElapsedTime())));

            BridgeInternal.putQueryMetricsIntoMap(
                    this.pageResult,
                    rangeId,
                    BridgeInternal.createQueryMetricsFromDelimitedStringAndClientSideMetrics(
                            metricsWithCharge, clientSideMetrics, this.pageResult.getActivityId()));
        }
    }

    /**
     * Creates a producer for one partition key range.
     *
     * <p>Note: when {@code feedOptions} is non-null that same instance is kept (not copied)
     * and its request continuation is overwritten with {@code initialContinuationToken}.
     *
     * @param client                   query client; used here to reach the partition key range cache
     * @param collectionRid            resource id of the collection, used for range lookups
     * @param feedOptions              feed options for the query; a fresh instance is created when null
     * @param createRequestFunc        builds a request from (target range, continuation token, page size)
     * @param executeRequestFunc       executes a request and emits the resulting page(s)
     * @param targetRange              partition key range this producer reads from; must not be null
     * @param collectionLink           link of the collection; stored and forwarded to child producers
     * @param createRetryPolicyFunc    factory for a per-request retry policy; may be null, in which
     *                                 case a null policy is handed to {@code ObservableHelper}
     * @param resourceType             class of the documents in each page
     * @param correlatedActivityId     activity id stored and forwarded to child producers
     * @param pageSize                 page size passed to the paginator
     * @param initialContinuationToken continuation token to start from; may be null
     * @param top                      value passed to the paginator as its {@code top} argument
     */
    public DocumentProducer(IDocumentQueryClient client, String collectionRid, FeedOptions feedOptions, Func3<PartitionKeyRange, String, Integer, RxDocumentServiceRequest> createRequestFunc, Func1<RxDocumentServiceRequest, Observable<FeedResponse<T>>> executeRequestFunc, PartitionKeyRange targetRange, String collectionLink, Func0<IDocumentClientRetryPolicy> createRetryPolicyFunc, Class<T> resourceType, UUID correlatedActivityId, int pageSize, String initialContinuationToken, int top) {
        this.client = client;
        this.collectionRid = collectionRid;
        this.createRequestFunc = createRequestFunc;
        this.fetchSchedulingMetrics.ready();
        this.fetchExecutionRangeAccumulator = new FetchExecutionRangeAccumulator(targetRange.getId());

        // Wrap the raw executor so every request resets the retry counter, starts the
        // fetch timers and, when a policy factory was supplied, runs under a fresh policy.
        this.executeRequestFuncWithRetries = request -> {
            // Starts at -1 so that the first execution attempt leaves the counter at 0.
            this.retries = -1;
            this.fetchSchedulingMetrics.start();
            this.fetchExecutionRangeAccumulator.beginFetchRange();
            IDocumentClientRetryPolicy retryPolicy = null;
            if (createRetryPolicyFunc != null) {
                retryPolicy = createRetryPolicyFunc.call();
                retryPolicy.onBeforeSendRequest(request);
            }
            return ObservableHelper.inlineIfPossibleAsObs(() -> {
                this.retries++;
                return executeRequestFunc.call(request);
            }, retryPolicy);
        };

        this.correlatedActivityId = correlatedActivityId;
        this.feedOptions = feedOptions != null ? feedOptions : new FeedOptions();
        this.feedOptions.setRequestContinuation(initialContinuationToken);
        this.lastResponseContinuationToken = initialContinuationToken;
        this.resourceType = resourceType;
        this.targetRange = targetRange;
        this.collectionLink = collectionLink;
        this.createRetryPolicyFunc = createRetryPolicyFunc;
        this.pageSize = pageSize;
        this.top = top;
    }

    /**
     * Returns an observable of pages for the target range, with partition-split handling.
     *
     * <p>For each page received, the continuation token is remembered, the fetch range is
     * closed on the accumulator and the fetch stopwatch is stopped before the page is
     * wrapped in a {@link DocumentProducerFeedResponse}.
     *
     * @return observable emitting one wrapped response per page
     */
    public Observable<DocumentProducer<T>.DocumentProducerFeedResponse> produceAsync() {
        return splitProof(
                Paginator.getPaginatedQueryResultAsObservable(
                        this.feedOptions,
                        (continuationToken, requestedPageSize) ->
                                this.createRequestFunc.call(this.targetRange, continuationToken, requestedPageSize),
                        this.executeRequestFuncWithRetries,
                        this.resourceType,
                        this.top,
                        this.pageSize)
                .map(page -> {
                    this.lastResponseContinuationToken = page.getResponseContinuation();
                    this.fetchExecutionRangeAccumulator.endFetchRange(
                            page.getActivityId(), page.getResults().size(), this.retries);
                    this.fetchSchedulingMetrics.stop();
                    return page;
                })
                .map(page -> new DocumentProducerFeedResponse(page)));
    }

    /**
     * Adds partition-split recovery to a page stream.
     *
     * <p>Errors that are not a {@link DocumentClientException} recognised as a split are
     * logged and propagated unchanged. On a split, the replacement ranges are resolved and
     * the stream continues with the pages of the child producers created for them.
     *
     * @param sourceFeedResponseObservable page stream of this producer
     * @return the same stream, resumed through child producers after a split
     */
    private Observable<DocumentProducer<T>.DocumentProducerFeedResponse> splitProof(Observable<DocumentProducer<T>.DocumentProducerFeedResponse> sourceFeedResponseObservable) {
        return sourceFeedResponseObservable.onErrorResumeNext(error -> {
            DocumentClientException clientException = (DocumentClientException) Utils.as(error, DocumentClientException.class);
            if (clientException == null || !isSplit(clientException)) {
                logger.error("Unexpected failure", error);
                return Observable.error(error);
            }

            logger.info("DocumentProducer handling a partition split in [{}], detail:[{}]", this.targetRange, clientException);

            Observable<DocumentProducer<T>> replacementProducers = getReplacementRanges(this.targetRange.toRange())
                    .toObservable()
                    .flatMap(replacementRanges -> {
                        if (logger.isDebugEnabled()) {
                            // Logged at debug to match the guard; the JSON rendering is only
                            // worth computing when debug output is actually wanted.
                            String replacementRangesJson = replacementRanges.stream()
                                    .map(range -> range.toJson())
                                    .collect(Collectors.joining(", "));
                            logger.debug("Cross Partition Query Execution detected partition [{}] split into [{}] partitions, last continuation token is [{}].",
                                    this.targetRange.toJson(), replacementRangesJson, this.lastResponseContinuationToken);
                        }
                        return Observable.from(createReplacingDocumentProducersOnSplit(replacementRanges));
                    });

            return produceOnSplit(replacementProducers);
        });
    }

    /**
     * Flattens the page streams of the replacement producers into one stream.
     *
     * <p>{@code flatMap} is called with a max-concurrency of 1, so at most one child
     * producer is subscribed to at a time.
     *
     * @param replacingDocumentProducers producers created for the replacement ranges
     * @return pages emitted by the child producers
     */
    protected Observable<DocumentProducer<T>.DocumentProducerFeedResponse> produceOnSplit(Observable<DocumentProducer<T>> replacingDocumentProducers) {
        return replacingDocumentProducers.flatMap(childProducer -> childProducer.produceAsync(), 1);
    }

    /**
     * Creates one child producer per replacement range, each starting from the last
     * continuation token this producer received.
     *
     * @param partitionKeyRanges ranges that replace this producer's target range
     * @return child producers in the same order as {@code partitionKeyRanges}
     */
    private List<DocumentProducer<T>> createReplacingDocumentProducersOnSplit(List<PartitionKeyRange> partitionKeyRanges) {
        List<DocumentProducer<T>> replacingProducers = new ArrayList<>(partitionKeyRanges.size());
        for (PartitionKeyRange replacementRange : partitionKeyRanges) {
            replacingProducers.add(createChildDocumentProducerOnSplit(replacementRange, this.lastResponseContinuationToken));
        }
        return replacingProducers;
    }

    /**
     * Creates the producer that takes over one replacement range after a split.
     *
     * <p>The child is given this producer's already-wrapped
     * {@link #executeRequestFuncWithRetries} as its executor and {@code null} as its retry
     * policy factory, so requests issued by the child run through this producer's wrapper
     * (and therefore also update this producer's retry counter and fetch metrics). The
     * child shares this producer's {@link #feedOptions} instance.
     *
     * @param replacementRange         range the child reads from
     * @param initialContinuationToken continuation token the child starts from
     * @return a new producer for {@code replacementRange}
     */
    protected DocumentProducer<T> createChildDocumentProducerOnSplit(PartitionKeyRange replacementRange, String initialContinuationToken) {
        return new DocumentProducer<>(this.client, this.collectionRid, this.feedOptions, this.createRequestFunc, this.executeRequestFuncWithRetries, replacementRange, this.collectionLink, null, this.resourceType, this.correlatedActivityId, this.pageSize, initialContinuationToken, this.top);
    }

    /**
     * Looks up the partition key ranges that currently overlap the given range.
     *
     * @param range range of the partition that was split
     * @return the overlapping ranges reported by the partition key range cache
     */
    private Single<List<PartitionKeyRange>> getReplacementRanges(Range<String> range) {
        // NOTE(review): the boolean argument is presumably "force refresh" - confirm
        // against the partition key range cache API.
        return this.client.getPartitionKeyRangeCache().tryGetOverlappingRangesAsync(this.collectionRid, range, true, this.feedOptions.getProperties());
    }

    /**
     * Tells whether the given exception signals a partition split.
     *
     * @param documentClientException exception raised while fetching a page
     * @return {@code true} when {@code Exceptions.isPartitionSplit} classifies it as a split
     */
    private boolean isSplit(DocumentClientException documentClientException) {
        return Exceptions.isPartitionSplit(documentClientException);
    }
}
