/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.directconnectivity;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.BackoffRetryUtility;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.IAuthorizationTokenProvider;
import com.azure.cosmos.implementation.ISessionContainer;
import com.azure.cosmos.implementation.ISessionToken;
import com.azure.cosmos.implementation.NotFoundException;
import com.azure.cosmos.implementation.RequestChargeTracker;
import com.azure.cosmos.implementation.RequestTimeoutException;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.SessionTokenMismatchRetryPolicy;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.directconnectivity.AddressSelector;
import com.azure.cosmos.implementation.directconnectivity.GatewayServiceConfigurationReader;
import com.azure.cosmos.implementation.directconnectivity.QuorumReader;
import com.azure.cosmos.implementation.directconnectivity.ReadMode;
import com.azure.cosmos.implementation.directconnectivity.ReplicatedResourceClient;
import com.azure.cosmos.implementation.directconnectivity.RequestHelper;
import com.azure.cosmos.implementation.directconnectivity.StoreReader;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.StoreResult;
import com.azure.cosmos.implementation.directconnectivity.TimeoutHelper;
import com.azure.cosmos.implementation.directconnectivity.TransportClient;
import java.util.HashMap;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public class ConsistencyReader {
    private static final Logger logger = LoggerFactory.getLogger(ConsistencyReader.class);
    private final DiagnosticsClientContext diagnosticsClientContext;
    private final GatewayServiceConfigurationReader serviceConfigReader;
    private final StoreReader storeReader;
    private final QuorumReader quorumReader;
    private final Configs configs;

    public ConsistencyReader(DiagnosticsClientContext diagnosticsClientContext, Configs configs, AddressSelector addressSelector, ISessionContainer sessionContainer, TransportClient transportClient, GatewayServiceConfigurationReader serviceConfigReader, IAuthorizationTokenProvider authorizationTokenProvider) {
        this.diagnosticsClientContext = diagnosticsClientContext;
        this.configs = configs;
        this.serviceConfigReader = serviceConfigReader;
        this.storeReader = this.createStoreReader(transportClient, addressSelector, sessionContainer);
        this.quorumReader = this.createQuorumReader(transportClient, addressSelector, this.storeReader, serviceConfigReader, authorizationTokenProvider);
    }

    public Mono<StoreResponse> readAsync(RxDocumentServiceRequest entity, TimeoutHelper timeout, boolean isInRetry, boolean forceRefresh) {
        ReadMode desiredReadMode;
        if (!isInRetry) {
            if (timeout.isElapsed()) {
                return Mono.error((Throwable)((Object)new RequestTimeoutException()));
            }
        } else if (timeout.isElapsed()) {
            return Mono.error((Throwable)((Object)new GoneException()));
        }
        entity.requestContext.timeoutHelper = timeout;
        if (entity.requestContext.requestChargeTracker == null) {
            entity.requestContext.requestChargeTracker = new RequestChargeTracker();
        }
        if (entity.requestContext.cosmosDiagnostics == null) {
            entity.requestContext.cosmosDiagnostics = entity.createCosmosDiagnostics();
        }
        entity.requestContext.forceRefreshAddressCache = forceRefresh;
        Utils.ValueHolder<Object> targetConsistencyLevel = Utils.ValueHolder.initialize(null);
        Utils.ValueHolder<Object> useSessionToken = Utils.ValueHolder.initialize(null);
        try {
            desiredReadMode = this.deduceReadMode(entity, targetConsistencyLevel, useSessionToken);
        }
        catch (CosmosException e) {
            return Mono.error((Throwable)((Object)e));
        }
        int maxReplicaCount = this.getMaxReplicaSetSize(entity);
        int readQuorumValue = maxReplicaCount - maxReplicaCount / 2;
        switch (desiredReadMode) {
            case Primary: {
                return this.readPrimaryAsync(entity, (Boolean)useSessionToken.v);
            }
            case Strong: 
            case BoundedStaleness: {
                entity.requestContext.performLocalRefreshOnGoneException = true;
                return this.quorumReader.readStrongAsync(this.diagnosticsClientContext, entity, readQuorumValue, desiredReadMode);
            }
            case Any: {
                if (targetConsistencyLevel.v == ConsistencyLevel.SESSION) {
                    return BackoffRetryUtility.executeRetry(() -> this.readSessionAsync(entity, desiredReadMode), new SessionTokenMismatchRetryPolicy(BridgeInternal.getRetryContext(entity.requestContext.cosmosDiagnostics)));
                }
                return this.readAnyAsync(entity, desiredReadMode);
            }
        }
        throw new IllegalStateException("invalid operation " + (Object)((Object)desiredReadMode));
    }

    private Mono<StoreResponse> readPrimaryAsync(RxDocumentServiceRequest entity, boolean useSessionToken) {
        Mono<StoreResult> responseObs = this.storeReader.readPrimaryAsync(entity, false, useSessionToken);
        return responseObs.flatMap(response -> {
            try {
                return Mono.just((Object)response.toResponse());
            }
            catch (CosmosException e) {
                return Mono.error((Throwable)((Object)e));
            }
        });
    }

    private Mono<StoreResponse> readAnyAsync(RxDocumentServiceRequest entity, ReadMode readMode) {
        Mono<List<StoreResult>> responsesObs = this.storeReader.readMultipleReplicaAsync(entity, true, 1, false, false, readMode);
        return responsesObs.flatMap(responses -> {
            if (responses.size() == 0) {
                return Mono.error((Throwable)((Object)new GoneException("The requested resource is no longer available at the server.")));
            }
            try {
                return Mono.just((Object)((StoreResult)responses.get(0)).toResponse());
            }
            catch (CosmosException e) {
                return Mono.error((Throwable)((Object)e));
            }
        });
    }

    private Mono<StoreResponse> readSessionAsync(RxDocumentServiceRequest entity, ReadMode readMode) {
        if (entity.requestContext.timeoutHelper.isElapsed()) {
            return Mono.error((Throwable)((Object)new GoneException()));
        }
        Mono<List<StoreResult>> responsesObs = this.storeReader.readMultipleReplicaAsync(entity, true, 1, true, true, readMode, true, false);
        return responsesObs.flatMap(responses -> {
            if (responses.size() > 0) {
                try {
                    return Mono.just((Object)((StoreResult)responses.get(0)).toResponse(entity.requestContext.requestChargeTracker));
                }
                catch (NotFoundException notFoundException) {
                    try {
                        if (entity.requestContext.sessionToken != null && ((StoreResult)responses.get((int)0)).sessionToken != null && !entity.requestContext.sessionToken.isValid(((StoreResult)responses.get((int)0)).sessionToken)) {
                            logger.warn("Convert to session read exception, request {} SESSION Lsn {}, responseLSN {}", new Object[]{entity.getResourceAddress(), entity.requestContext.sessionToken.convertToString(), ((StoreResult)responses.get((int)0)).lsn});
                            notFoundException.getResponseHeaders().put("x-ms-substatus", Integer.toString(1002));
                        }
                        return Mono.error((Throwable)((Object)notFoundException));
                    }
                    catch (CosmosException e) {
                        return Mono.error((Throwable)((Object)e));
                    }
                }
                catch (CosmosException dce) {
                    return Mono.error((Throwable)((Object)dce));
                }
            }
            HashMap<String, String> responseHeaders = new HashMap<String, String>();
            responseHeaders.put("x-ms-substatus", Integer.toString(1002));
            ISessionToken requestSessionToken = entity.requestContext.sessionToken;
            logger.warn("Fail the session read {}, request session token {}", (Object)entity.getResourceAddress(), (Object)(requestSessionToken == null ? "<empty>" : requestSessionToken.convertToString()));
            return Mono.error((Throwable)((Object)new NotFoundException("The read session is not available for the input session token.", responseHeaders, null)));
        });
    }

    ReadMode deduceReadMode(RxDocumentServiceRequest request, Utils.ValueHolder<ConsistencyLevel> targetConsistencyLevel, Utils.ValueHolder<Boolean> useSessionToken) {
        targetConsistencyLevel.v = RequestHelper.getConsistencyLevelToUse(this.serviceConfigReader, request);
        useSessionToken.v = targetConsistencyLevel.v == ConsistencyLevel.SESSION;
        if (request.getDefaultReplicaIndex() != null) {
            useSessionToken.v = false;
            return ReadMode.Primary;
        }
        switch ((ConsistencyLevel)((Object)targetConsistencyLevel.v)) {
            case EVENTUAL: 
            case CONSISTENT_PREFIX: 
            case SESSION: {
                return ReadMode.Any;
            }
            case BOUNDED_STALENESS: {
                return ReadMode.BoundedStaleness;
            }
            case STRONG: {
                return ReadMode.Strong;
            }
        }
        throw new IllegalStateException("INVALID Consistency Level " + targetConsistencyLevel.v);
    }

    public int getMaxReplicaSetSize(RxDocumentServiceRequest entity) {
        boolean isMasterResource = ReplicatedResourceClient.isReadingFromMaster(entity.getResourceType(), entity.getOperationType());
        if (isMasterResource) {
            return this.serviceConfigReader.getSystemReplicationPolicy().getMaxReplicaSetSize();
        }
        return this.serviceConfigReader.getUserReplicationPolicy().getMaxReplicaSetSize();
    }

    public int getMinReplicaSetSize(RxDocumentServiceRequest entity) {
        boolean isMasterResource = ReplicatedResourceClient.isReadingFromMaster(entity.getResourceType(), entity.getOperationType());
        if (isMasterResource) {
            return this.serviceConfigReader.getSystemReplicationPolicy().getMinReplicaSetSize();
        }
        return this.serviceConfigReader.getUserReplicationPolicy().getMinReplicaSetSize();
    }

    public StoreReader createStoreReader(TransportClient transportClient, AddressSelector addressSelector, ISessionContainer sessionContainer) {
        return new StoreReader(transportClient, addressSelector, sessionContainer);
    }

    public QuorumReader createQuorumReader(TransportClient transportClient, AddressSelector addressSelector, StoreReader storeReader, GatewayServiceConfigurationReader serviceConfigurationReader, IAuthorizationTokenProvider authorizationTokenProvider) {
        return new QuorumReader(this.diagnosticsClientContext, transportClient, addressSelector, storeReader, serviceConfigurationReader, authorizationTokenProvider, this.configs);
    }
}

