package com.couchbase.client.core.config.refresher;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.CoreContext;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.cnc.events.config.BucketConfigRefreshFailedEvent;
import com.couchbase.client.core.config.ConfigurationProvider;
import com.couchbase.client.core.config.ProposedBucketConfigContext;
import com.couchbase.client.core.error.ConfigException;
import com.couchbase.client.core.msg.manager.BucketConfigStreamingRequest;
import com.couchbase.client.core.retry.BestEffortRetryStrategy;
import com.couchbase.client.core.retry.reactor.Retry;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/couchbase/client/core/config/refresher/ClusterManagerBucketRefresher.class */
public class ClusterManagerBucketRefresher implements BucketRefresher {
    private final ConfigurationProvider provider;
    private final Core core;
    private final Map<String, Disposable> registrations = new ConcurrentHashMap();
    private final EventBus eventBus;

    public ClusterManagerBucketRefresher(ConfigurationProvider configurationProvider, Core core) {
        this.provider = configurationProvider;
        this.core = core;
        this.eventBus = core.context().environment().eventBus();
    }

    @Override // com.couchbase.client.core.config.refresher.BucketRefresher
    public synchronized Mono<Void> register(String str) {
        return this.registrations.containsKey(str) ? Mono.empty() : Mono.defer(() -> {
            this.registrations.put(str, registerStream(this.core.context(), str));
            return Mono.empty();
        });
    }

    private Disposable registerStream(CoreContext coreContext, String str) {
        Flux retryWhen = Mono.defer(() -> {
            BucketConfigStreamingRequest bucketConfigStreamingRequest = new BucketConfigStreamingRequest(coreContext.environment().timeoutConfig().managementTimeout(), coreContext, BestEffortRetryStrategy.INSTANCE, str, coreContext.authenticator());
            this.core.send(bucketConfigStreamingRequest);
            return Reactor.wrap(bucketConfigStreamingRequest, bucketConfigStreamingRequest.response(), true);
        }).flux().flatMap(bucketConfigStreamingResponse -> {
            if (bucketConfigStreamingResponse.status().success()) {
                return bucketConfigStreamingResponse.configs().map(str2 -> {
                    return new ProposedBucketConfigContext(str, str2, bucketConfigStreamingResponse.address());
                });
            }
            this.eventBus.publish(new BucketConfigRefreshFailedEvent(this.core.context(), BucketConfigRefreshFailedEvent.RefresherType.MANAGER, BucketConfigRefreshFailedEvent.Reason.INDIVIDUAL_REQUEST_FAILED, Optional.of(bucketConfigStreamingResponse)));
            return Flux.error(new ConfigException());
        }).doOnError(th -> {
            this.eventBus.publish(new BucketConfigRefreshFailedEvent(this.core.context(), BucketConfigRefreshFailedEvent.RefresherType.MANAGER, BucketConfigRefreshFailedEvent.Reason.STREAM_FAILED, Optional.of(th)));
        }).doOnComplete(() -> {
            this.eventBus.publish(new BucketConfigRefreshFailedEvent(this.core.context(), BucketConfigRefreshFailedEvent.RefresherType.MANAGER, BucketConfigRefreshFailedEvent.Reason.STREAM_CLOSED, Optional.empty()));
            throw new ConfigException();
        }).retryWhen(Retry.any().exponentialBackoff(Duration.ofMillis(32L), Duration.ofMillis(4096L)).toReactorRetry());
        ConfigurationProvider configurationProvider = this.provider;
        configurationProvider.getClass();
        return retryWhen.subscribe(configurationProvider::proposeBucketConfig);
    }

    @Override // com.couchbase.client.core.config.refresher.BucketRefresher
    public synchronized Mono<Void> deregister(String str) {
        return Mono.defer(() -> {
            Disposable disposable = this.registrations.get(str);
            if (disposable != null && !disposable.isDisposed()) {
                disposable.dispose();
            }
            return Mono.empty();
        });
    }

    @Override // com.couchbase.client.core.config.refresher.BucketRefresher
    public synchronized Mono<Void> shutdown() {
        return Mono.defer(() -> {
            for (Disposable disposable : this.registrations.values()) {
                if (!disposable.isDisposed()) {
                    disposable.dispose();
                }
            }
            this.registrations.clear();
            return Mono.empty();
        });
    }

    @Override // com.couchbase.client.core.config.refresher.BucketRefresher
    public void markTainted(String str) {
    }

    @Override // com.couchbase.client.core.config.refresher.BucketRefresher
    public void markUntainted(String str) {
    }

    @Override // com.couchbase.client.core.config.refresher.BucketRefresher
    public Set<String> registered() {
        return this.registrations.keySet();
    }
}
