/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.changefeed.epkversion;

import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.changefeed.Bootstrapper;
import com.azure.cosmos.implementation.changefeed.LeaseStore;
import com.azure.cosmos.implementation.changefeed.LeaseStoreManager;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
import com.azure.cosmos.implementation.changefeed.common.LeaseVersion;
import com.azure.cosmos.implementation.changefeed.epkversion.PartitionSynchronizer;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

class BootstrapperImpl
implements Bootstrapper {
    private final Logger logger = LoggerFactory.getLogger(BootstrapperImpl.class);
    private final PartitionSynchronizer synchronizer;
    private final LeaseStore leaseStore;
    private final LeaseStoreManager epkRangeVersionLeaseStoreManager;
    private final ChangeFeedMode changeFeedModeToStart;
    private final Duration lockTime;
    private final Duration sleepTime;
    private volatile boolean isInitialized;
    private volatile boolean isLockAcquired;

    public BootstrapperImpl(PartitionSynchronizer synchronizer, LeaseStore leaseStore, Duration lockTime, Duration sleepTime, LeaseStoreManager epkRangeVersionLeaseStoreManager, ChangeFeedMode changeFeedModeToStart) {
        Preconditions.checkNotNull(synchronizer, "Argument 'synchronizer' can not be null");
        Preconditions.checkNotNull(leaseStore, "Argument 'leaseStore' can not be null");
        Preconditions.checkArgument(lockTime != null && this.isPositive(lockTime), "lockTime should be non-null and positive");
        Preconditions.checkArgument(sleepTime != null && this.isPositive(sleepTime), "sleepTime should be non-null and positive");
        this.synchronizer = synchronizer;
        this.leaseStore = leaseStore;
        this.epkRangeVersionLeaseStoreManager = epkRangeVersionLeaseStoreManager;
        this.changeFeedModeToStart = changeFeedModeToStart;
        this.lockTime = lockTime;
        this.sleepTime = sleepTime;
        this.isInitialized = false;
    }

    private boolean isPositive(Duration duration) {
        return !duration.isNegative() && !duration.isZero();
    }

    @Override
    public Mono<Void> initialize() {
        this.isInitialized = false;
        return Mono.just((Object)this).flatMap(value -> this.leaseStore.isInitialized()).flatMap(initialized -> {
            this.isInitialized = initialized;
            if (initialized.booleanValue()) {
                return this.validateLeaseCFModeInteroperabilityForEpkRangeBasedLease();
            }
            this.logger.info("Acquire initialization lock");
            return this.leaseStore.acquireInitializationLock(this.lockTime).flatMap(lockAcquired -> {
                this.isLockAcquired = lockAcquired;
                if (!this.isLockAcquired) {
                    this.logger.info("Another instance is initializing the store");
                    return Mono.just((Object)this.isLockAcquired).delayElement(this.sleepTime, CosmosSchedulers.COSMOS_PARALLEL);
                }
                return this.synchronizer.createMissingLeases().then(this.leaseStore.markInitialized());
            }).onErrorResume(throwable -> {
                this.logger.warn("Unexpected exception caught while initializing the lock", throwable);
                return Mono.just((Object)this.isLockAcquired);
            }).flatMap(lockAcquired -> {
                if (this.isLockAcquired) {
                    return this.leaseStore.releaseInitializationLock();
                }
                return Mono.just((Object)lockAcquired);
            });
        }).repeat(() -> !this.isInitialized).then();
    }

    private Mono<Void> validateLeaseCFModeInteroperabilityForEpkRangeBasedLease() {
        return this.epkRangeVersionLeaseStoreManager.getTopLeases(1).next().flatMap(lease -> {
            ChangeFeedState changeFeedState;
            if (lease.getVersion() == LeaseVersion.EPK_RANGE_BASED_LEASE && !Strings.isNullOrEmpty(lease.getId()) && !Strings.isNullOrEmpty(lease.getContinuationToken()) && (changeFeedState = ChangeFeedState.fromString(lease.getContinuationToken())).getMode() != this.changeFeedModeToStart) {
                return Mono.error((Throwable)new IllegalStateException("Change feed mode in the pre-existing lease is : " + (Object)((Object)changeFeedState.getMode()) + " while the expected change feed mode is : " + (Object)((Object)this.changeFeedModeToStart)));
            }
            return Mono.empty();
        });
    }
}

