package com.couchbase.client.core;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.api.CoreCouchbaseOps;
import com.couchbase.client.core.api.kv.CoreKvBinaryOps;
import com.couchbase.client.core.api.kv.CoreKvOps;
import com.couchbase.client.core.api.manager.CoreBucketAndScope;
import com.couchbase.client.core.api.manager.search.ClassicCoreClusterSearchIndexManager;
import com.couchbase.client.core.api.manager.search.ClassicCoreScopeSearchIndexManager;
import com.couchbase.client.core.api.manager.search.CoreSearchIndexManager;
import com.couchbase.client.core.api.query.CoreQueryOps;
import com.couchbase.client.core.api.search.ClassicCoreSearchOps;
import com.couchbase.client.core.api.search.CoreSearchOps;
import com.couchbase.client.core.callbacks.BeforeSendRequestCallback;
import com.couchbase.client.core.classic.kv.ClassicCoreKvBinaryOps;
import com.couchbase.client.core.classic.kv.ClassicCoreKvOps;
import com.couchbase.client.core.classic.manager.ClassicCoreBucketManager;
import com.couchbase.client.core.classic.manager.ClassicCoreCollectionManagerOps;
import com.couchbase.client.core.classic.query.ClassicCoreQueryOps;
import com.couchbase.client.core.cnc.CbTracing;
import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.cnc.TracingIdentifiers;
import com.couchbase.client.core.cnc.ValueRecorder;
import com.couchbase.client.core.cnc.events.core.BucketClosedEvent;
import com.couchbase.client.core.cnc.events.core.BucketOpenFailedEvent;
import com.couchbase.client.core.cnc.events.core.BucketOpenInitiatedEvent;
import com.couchbase.client.core.cnc.events.core.BucketOpenedEvent;
import com.couchbase.client.core.cnc.events.core.CoreCreatedEvent;
import com.couchbase.client.core.cnc.events.core.InitGlobalConfigFailedEvent;
import com.couchbase.client.core.cnc.events.core.ReconfigurationCompletedEvent;
import com.couchbase.client.core.cnc.events.core.ReconfigurationErrorDetectedEvent;
import com.couchbase.client.core.cnc.events.core.ServiceReconfigurationFailedEvent;
import com.couchbase.client.core.cnc.events.core.ShutdownCompletedEvent;
import com.couchbase.client.core.cnc.events.core.ShutdownInitiatedEvent;
import com.couchbase.client.core.cnc.events.core.WatchdogInvalidStateIdentifiedEvent;
import com.couchbase.client.core.cnc.events.core.WatchdogRunFailedEvent;
import com.couchbase.client.core.cnc.events.transaction.TransactionsStartedEvent;
import com.couchbase.client.core.cnc.metrics.LoggingMeter;
import com.couchbase.client.core.config.AlternateAddress;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.ClusterConfig;
import com.couchbase.client.core.config.ConfigurationProvider;
import com.couchbase.client.core.config.DefaultConfigurationProvider;
import com.couchbase.client.core.config.GlobalConfig;
import com.couchbase.client.core.diagnostics.ClusterState;
import com.couchbase.client.core.diagnostics.EndpointDiagnostics;
import com.couchbase.client.core.diagnostics.InternalEndpointDiagnostics;
import com.couchbase.client.core.diagnostics.WaitUntilReadyHelper;
import com.couchbase.client.core.endpoint.http.CoreHttpClient;
import com.couchbase.client.core.env.Authenticator;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.env.SeedNode;
import com.couchbase.client.core.error.AlreadyShutdownException;
import com.couchbase.client.core.error.ConfigException;
import com.couchbase.client.core.error.GlobalConfigNotFoundException;
import com.couchbase.client.core.error.InvalidArgumentException;
import com.couchbase.client.core.error.RequestCanceledException;
import com.couchbase.client.core.error.UnsupportedConfigMechanismException;
import com.couchbase.client.core.manager.CoreBucketManagerOps;
import com.couchbase.client.core.manager.CoreCollectionManager;
import com.couchbase.client.core.msg.CancellationReason;
import com.couchbase.client.core.msg.Request;
import com.couchbase.client.core.msg.RequestTarget;
import com.couchbase.client.core.msg.Response;
import com.couchbase.client.core.msg.kv.KeyValueRequest;
import com.couchbase.client.core.msg.query.QueryRequest;
import com.couchbase.client.core.msg.search.ServerSearchRequest;
import com.couchbase.client.core.node.AnalyticsLocator;
import com.couchbase.client.core.node.KeyValueLocator;
import com.couchbase.client.core.node.Locator;
import com.couchbase.client.core.node.Node;
import com.couchbase.client.core.node.NodeIdentifier;
import com.couchbase.client.core.node.RoundRobinLocator;
import com.couchbase.client.core.node.ViewLocator;
import com.couchbase.client.core.service.ServiceScope;
import com.couchbase.client.core.service.ServiceState;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.transaction.cleanup.CoreTransactionsCleanup;
import com.couchbase.client.core.transaction.components.CoreTransactionRequest;
import com.couchbase.client.core.transaction.context.CoreTransactionsContext;
import com.couchbase.client.core.util.CbCollections;
import com.couchbase.client.core.util.ConnectionString;
import com.couchbase.client.core.util.ConnectionStringUtil;
import com.couchbase.client.core.util.CoreIdGenerator;
import com.couchbase.client.core.util.LatestStateSubscription;
import com.couchbase.client.core.util.NanoTimestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

@Stability.Volatile
/* loaded from: input_file:com/couchbase/client/core/Core.class */
public class Core implements CoreCouchbaseOps, AutoCloseable {
    private static final KeyValueLocator KEY_VALUE_LOCATOR = new KeyValueLocator();
    private static final RoundRobinLocator MANAGER_LOCATOR = new RoundRobinLocator(ServiceType.MANAGER);
    private static final RoundRobinLocator QUERY_LOCATOR = new RoundRobinLocator(ServiceType.QUERY);
    private static final RoundRobinLocator ANALYTICS_LOCATOR = new AnalyticsLocator();
    private static final RoundRobinLocator SEARCH_LOCATOR = new RoundRobinLocator(ServiceType.SEARCH);
    private static final RoundRobinLocator VIEWS_LOCATOR = new ViewLocator();
    private static final RoundRobinLocator EVENTING_LOCATOR = new RoundRobinLocator(ServiceType.EVENTING);
    private static final RoundRobinLocator BACKUP_LOCATOR = new RoundRobinLocator(ServiceType.BACKUP);
    private static final Duration INVALID_STATE_WATCHDOG_INTERVAL = Duration.ofSeconds(5);
    private final CoreContext coreContext;
    private final ConfigurationProvider configurationProvider;
    private volatile ClusterConfig currentConfig;
    private final CopyOnWriteArrayList<Node> nodes;
    private final LatestStateSubscription<ClusterConfig> configurationProcessor;
    private final EventBus eventBus;
    private final Timer timer;
    private final Set<SeedNode> seedNodes;
    private final List<BeforeSendRequestCallback> beforeSendRequestCallbacks;
    private final Disposable invalidStateWatchdog;
    private final CoreTransactionsCleanup transactionsCleanup;
    private final CoreTransactionsContext transactionsContext;
    private final ConnectionString connectionString;
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final Map<ResponseMetricIdentifier, ValueRecorder> responseMetrics = new ConcurrentHashMap();

    /* loaded from: input_file:com/couchbase/client/core/Core$InvalidStateWatchdog.class */
    class InvalidStateWatchdog implements Runnable {
        InvalidStateWatchdog() {
        }

        @Override // java.lang.Runnable
        public void run() {
            int size;
            int size2;
            try {
                if (Core.this.currentConfig != null && Core.this.currentConfig.hasClusterOrBucketConfig() && (size = Core.this.nodes.size()) != (size2 = Core.this.currentConfig.allNodeAddresses().size())) {
                    Core.this.eventBus.publish(new WatchdogInvalidStateIdentifiedEvent(Core.this.context(), "Number of managed nodes (" + size + ") differs from the current config (" + size2 + "), triggering reconfiguration."));
                    Core.this.configurationProvider.republishCurrentConfig();
                }
            } catch (Throwable th) {
                Core.this.eventBus.publish(new WatchdogRunFailedEvent(Core.this.context(), th));
            }
        }
    }

    @Stability.Internal
    /* loaded from: input_file:com/couchbase/client/core/Core$ResponseMetricIdentifier.class */
    public static class ResponseMetricIdentifier {
        private final String serviceType;
        private final String requestName;

        @Nullable
        private final String bucketName;

        @Nullable
        private final String scopeName;

        @Nullable
        private final String collectionName;

        @Nullable
        private final String exceptionSimpleName;

        ResponseMetricIdentifier(Request<?> request, @Nullable String str) {
            this.exceptionSimpleName = str;
            if (request.serviceType() != null) {
                this.serviceType = CbTracing.getTracingId(request.serviceType());
            } else if (request instanceof CoreTransactionRequest) {
                this.serviceType = TracingIdentifiers.SERVICE_TRANSACTIONS;
            } else {
                this.serviceType = TracingIdentifiers.SERVICE_UNKNOWN;
            }
            this.requestName = request.name();
            if (request instanceof KeyValueRequest) {
                KeyValueRequest keyValueRequest = (KeyValueRequest) request;
                this.bucketName = request.bucket();
                this.scopeName = keyValueRequest.collectionIdentifier().scope().orElse("_default");
                this.collectionName = keyValueRequest.collectionIdentifier().collection().orElse("_default");
                return;
            }
            if (request instanceof QueryRequest) {
                this.bucketName = request.bucket();
                this.scopeName = ((QueryRequest) request).scope();
                this.collectionName = null;
            } else {
                if (!(request instanceof ServerSearchRequest)) {
                    this.bucketName = null;
                    this.scopeName = null;
                    this.collectionName = null;
                    return;
                }
                ServerSearchRequest serverSearchRequest = (ServerSearchRequest) request;
                if (serverSearchRequest.scope() != null) {
                    this.bucketName = serverSearchRequest.scope().bucketName();
                    this.scopeName = serverSearchRequest.scope().scopeName();
                } else {
                    this.bucketName = null;
                    this.scopeName = null;
                }
                this.collectionName = null;
            }
        }

        public ResponseMetricIdentifier(String str, String str2) {
            this.serviceType = str;
            this.requestName = str2;
            this.bucketName = null;
            this.scopeName = null;
            this.collectionName = null;
            this.exceptionSimpleName = null;
        }

        public String serviceType() {
            return this.serviceType;
        }

        public String requestName() {
            return this.requestName;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            ResponseMetricIdentifier responseMetricIdentifier = (ResponseMetricIdentifier) obj;
            return this.serviceType.equals(responseMetricIdentifier.serviceType) && Objects.equals(this.requestName, responseMetricIdentifier.requestName) && Objects.equals(this.bucketName, responseMetricIdentifier.bucketName) && Objects.equals(this.scopeName, responseMetricIdentifier.scopeName) && Objects.equals(this.collectionName, responseMetricIdentifier.collectionName) && Objects.equals(this.exceptionSimpleName, responseMetricIdentifier.exceptionSimpleName);
        }

        public int hashCode() {
            return Objects.hash(this.serviceType, this.requestName, this.bucketName, this.scopeName, this.collectionName, this.exceptionSimpleName);
        }
    }

    @Deprecated
    public static Core create(CoreEnvironment coreEnvironment, Authenticator authenticator, Set<SeedNode> set) {
        return create(coreEnvironment, authenticator, ConnectionStringUtil.asConnectionString(set));
    }

    public static Core create(CoreEnvironment coreEnvironment, Authenticator authenticator, ConnectionString connectionString) {
        return new Core(coreEnvironment, authenticator, connectionString);
    }

    protected Core(CoreEnvironment coreEnvironment, Authenticator authenticator, ConnectionString connectionString) {
        if (coreEnvironment.securityConfig().tlsEnabled() && !authenticator.supportsTls()) {
            throw new InvalidArgumentException("TLS enabled but the Authenticator does not support TLS!", null, null);
        }
        if (!coreEnvironment.securityConfig().tlsEnabled() && !authenticator.supportsNonTls()) {
            throw new InvalidArgumentException("TLS not enabled but the Authenticator does only support TLS!", null, null);
        }
        CoreCouchbaseOps.checkConnectionStringScheme(connectionString, ConnectionString.Scheme.COUCHBASE, ConnectionString.Scheme.COUCHBASES);
        CoreLimiter.incrementAndVerifyNumInstances(coreEnvironment.eventBus());
        this.connectionString = (ConnectionString) Objects.requireNonNull(connectionString);
        this.seedNodes = seedNodesFromConnectionString(connectionString, coreEnvironment);
        this.coreContext = new CoreContext(this, CoreIdGenerator.nextId(), coreEnvironment, authenticator);
        this.configurationProvider = createConfigurationProvider();
        this.nodes = new CopyOnWriteArrayList<>();
        this.eventBus = coreEnvironment.eventBus();
        this.timer = coreEnvironment.timer();
        this.currentConfig = this.configurationProvider.config();
        this.configurationProcessor = new LatestStateSubscription<>(this.configurationProvider.configs().concatWith(Mono.just(new ClusterConfig())), coreEnvironment.scheduler(), (clusterConfig, runnable) -> {
            this.currentConfig = clusterConfig;
            reconfigure(runnable);
        });
        this.beforeSendRequestCallbacks = (List) coreEnvironment.requestCallbacks().stream().filter(requestCallback -> {
            return requestCallback instanceof BeforeSendRequestCallback;
        }).map(requestCallback2 -> {
            return (BeforeSendRequestCallback) requestCallback2;
        }).collect(Collectors.toList());
        this.eventBus.publish(new CoreCreatedEvent(this.coreContext, coreEnvironment, this.seedNodes, CoreLimiter.numInstances(), connectionString));
        long seconds = INVALID_STATE_WATCHDOG_INTERVAL.getSeconds();
        if (seconds <= 1) {
            throw InvalidArgumentException.fromMessage("The Watchdog Interval cannot be smaller than 1 second!");
        }
        this.invalidStateWatchdog = coreEnvironment.scheduler().schedulePeriodically(new InvalidStateWatchdog(), seconds, seconds, TimeUnit.SECONDS);
        this.transactionsCleanup = new CoreTransactionsCleanup(this, coreEnvironment.transactionsConfig());
        this.transactionsContext = new CoreTransactionsContext(coreEnvironment.meter());
        context().environment().eventBus().publish(new TransactionsStartedEvent(coreEnvironment.transactionsConfig().cleanupConfig().runLostAttemptsCleanupThread(), coreEnvironment.transactionsConfig().cleanupConfig().runRegularAttemptsCleanupThread()));
    }

    private static Set<SeedNode> seedNodesFromConnectionString(ConnectionString connectionString, CoreEnvironment coreEnvironment) {
        return ConnectionStringUtil.seedNodesFromConnectionString(connectionString, coreEnvironment.ioConfig().dnsSrvEnabled(), coreEnvironment.securityConfig().tlsEnabled(), coreEnvironment.eventBus());
    }

    ConfigurationProvider createConfigurationProvider() {
        return new DefaultConfigurationProvider(this, this.seedNodes, this.connectionString);
    }

    @Stability.Internal
    public ConfigurationProvider configurationProvider() {
        return this.configurationProvider;
    }

    public <R extends Response> void send(Request<R> request) {
        send(request, true);
    }

    @Stability.Internal
    public <R extends Response> void send(Request<R> request, boolean z) {
        if (this.shutdown.get()) {
            request.cancel(CancellationReason.SHUTDOWN);
            return;
        }
        if (z) {
            this.timer.register(request);
            Iterator<BeforeSendRequestCallback> it = this.beforeSendRequestCallbacks.iterator();
            while (it.hasNext()) {
                it.next().beforeSend(request);
            }
        }
        locator(request.serviceType()).dispatch(request, this.nodes, this.currentConfig, context());
    }

    public CoreContext context() {
        return this.coreContext;
    }

    @Stability.Internal
    public CoreHttpClient httpClient(RequestTarget requestTarget) {
        return new CoreHttpClient(this, requestTarget);
    }

    @Stability.Internal
    public Stream<EndpointDiagnostics> diagnostics() {
        return this.nodes.stream().flatMap((v0) -> {
            return v0.diagnostics();
        });
    }

    @Stability.Internal
    public Stream<InternalEndpointDiagnostics> internalDiagnostics() {
        return this.nodes.stream().flatMap((v0) -> {
            return v0.internalDiagnostics();
        });
    }

    @Stability.Internal
    public Optional<Flux<ServiceState>> serviceState(NodeIdentifier nodeIdentifier, ServiceType serviceType, Optional<String> optional) {
        Iterator<Node> it = this.nodes.iterator();
        while (it.hasNext()) {
            Node next = it.next();
            if (next.identifier().equals(nodeIdentifier)) {
                return next.serviceState(serviceType, optional);
            }
        }
        return Optional.empty();
    }

    @Stability.Internal
    public void initGlobalConfig() {
        NanoTimestamp now = NanoTimestamp.now();
        this.configurationProvider.loadAndRefreshGlobalConfig().subscribe(r1 -> {
        }, th -> {
            InitGlobalConfigFailedEvent.Reason reason = InitGlobalConfigFailedEvent.Reason.UNKNOWN;
            if (th instanceof UnsupportedConfigMechanismException) {
                reason = InitGlobalConfigFailedEvent.Reason.UNSUPPORTED;
            } else if (th instanceof GlobalConfigNotFoundException) {
                reason = InitGlobalConfigFailedEvent.Reason.NO_CONFIG_FOUND;
            } else if (th instanceof ConfigException) {
                if (th.getCause() instanceof RequestCanceledException) {
                    if (((RequestCanceledException) th.getCause()).context().requestContext().request().cancellationReason() == CancellationReason.SHUTDOWN) {
                        reason = InitGlobalConfigFailedEvent.Reason.SHUTDOWN;
                    }
                } else if (th.getMessage().contains("NO_ACCESS")) {
                    reason = InitGlobalConfigFailedEvent.Reason.NO_ACCESS;
                }
            } else if (th instanceof AlreadyShutdownException) {
                reason = InitGlobalConfigFailedEvent.Reason.SHUTDOWN;
            }
            this.eventBus.publish(new InitGlobalConfigFailedEvent(reason.severity(), now.elapsed(), context(), reason, th));
        });
    }

    @Stability.Internal
    public void openBucket(String str) {
        this.eventBus.publish(new BucketOpenInitiatedEvent(this.coreContext, str));
        NanoTimestamp now = NanoTimestamp.now();
        this.configurationProvider.openBucket(str).subscribe(r1 -> {
        }, th -> {
            this.eventBus.publish(new BucketOpenFailedEvent(str, th instanceof AlreadyShutdownException ? Event.Severity.DEBUG : Event.Severity.WARN, now.elapsed(), this.coreContext, th));
        }, () -> {
            this.eventBus.publish(new BucketOpenedEvent(now.elapsed(), this.coreContext, str));
        });
    }

    @Stability.Internal
    public ClusterConfig clusterConfig() {
        return this.configurationProvider.config();
    }

    private Mono<Void> closeBucket(String str) {
        return Mono.defer(() -> {
            NanoTimestamp now = NanoTimestamp.now();
            return this.configurationProvider.closeBucket(str, !this.shutdown.get()).doOnSuccess(r10 -> {
                this.eventBus.publish(new BucketClosedEvent(now.elapsed(), this.coreContext, str));
            });
        });
    }

    @Stability.Internal
    public Mono<Void> ensureServiceAt(NodeIdentifier nodeIdentifier, ServiceType serviceType, int i, Optional<String> optional, Optional<String> optional2) {
        return this.shutdown.get() ? Mono.empty() : Flux.fromIterable(this.nodes).filter(node -> {
            return node.identifier().equals(nodeIdentifier);
        }).switchIfEmpty(Mono.defer(() -> {
            Node createNode = createNode(nodeIdentifier, optional2);
            this.nodes.add(createNode);
            return Mono.just(createNode);
        })).flatMap(node2 -> {
            return node2.addService(serviceType, i, optional);
        }).then();
    }

    @Stability.Internal
    public ValueRecorder responseMetric(Request<?> request, @Nullable Throwable th) {
        boolean z = this.coreContext.environment().meter() instanceof LoggingMeter;
        String str = null;
        if (!z) {
            if (th instanceof CompletionException) {
                str = th.getCause().getClass().getSimpleName().replace("Exception", "");
            } else if (th != null) {
                str = th.getClass().getSimpleName().replace("Exception", "");
            }
        }
        String str2 = str;
        return this.responseMetrics.computeIfAbsent(new ResponseMetricIdentifier(request, str), responseMetricIdentifier -> {
            HashMap hashMap = new HashMap(7);
            if (responseMetricIdentifier.serviceType != null) {
                hashMap.put(TracingIdentifiers.ATTR_SERVICE, responseMetricIdentifier.serviceType);
            } else if (request instanceof CoreTransactionRequest) {
                hashMap.put(TracingIdentifiers.ATTR_SERVICE, TracingIdentifiers.SERVICE_TRANSACTIONS);
            }
            hashMap.put(TracingIdentifiers.ATTR_OPERATION, responseMetricIdentifier.requestName);
            if (!z) {
                if (responseMetricIdentifier.bucketName != null) {
                    hashMap.put(TracingIdentifiers.ATTR_NAME, responseMetricIdentifier.bucketName);
                }
                if (responseMetricIdentifier.scopeName != null) {
                    hashMap.put(TracingIdentifiers.ATTR_SCOPE, responseMetricIdentifier.scopeName);
                }
                if (responseMetricIdentifier.collectionName != null) {
                    hashMap.put(TracingIdentifiers.ATTR_COLLECTION, responseMetricIdentifier.collectionName);
                }
                if (str2 != null) {
                    hashMap.put(TracingIdentifiers.ATTR_OUTCOME, str2);
                } else {
                    hashMap.put(TracingIdentifiers.ATTR_OUTCOME, "Success");
                }
            }
            return this.coreContext.environment().meter().valueRecorder(TracingIdentifiers.METER_OPERATIONS, hashMap);
        });
    }

    protected Node createNode(NodeIdentifier nodeIdentifier, Optional<String> optional) {
        return Node.create(this.coreContext, nodeIdentifier, optional);
    }

    private Mono<Void> maybeRemoveNode(Node node, ClusterConfig clusterConfig) {
        return Mono.defer(() -> {
            return ((clusterConfig.bucketConfigs().values().stream().flatMap(bucketConfig -> {
                return bucketConfig.nodes().stream();
            }).anyMatch(nodeInfo -> {
                return nodeInfo.identifier().equals(node.identifier());
            }) || (clusterConfig.globalConfig() != null ? clusterConfig.globalConfig().portInfos().stream().anyMatch(portInfo -> {
                return portInfo.identifier().equals(node.identifier());
            }) : false)) && node.hasServicesEnabled()) ? Mono.empty() : node.disconnect().doOnTerminate(() -> {
                this.nodes.remove(node);
            });
        });
    }

    private Mono<Void> removeServiceFrom(NodeIdentifier nodeIdentifier, ServiceType serviceType, Optional<String> optional) {
        return Flux.fromIterable(new ArrayList(this.nodes)).filter(node -> {
            return node.identifier().equals(nodeIdentifier);
        }).filter(node2 -> {
            return node2.serviceEnabled(serviceType);
        }).flatMap(node3 -> {
            return node3.removeService(serviceType, optional);
        }).then();
    }

    @Stability.Internal
    public Mono<Void> shutdown() {
        return shutdown(this.coreContext.environment().timeoutConfig().disconnectTimeout());
    }

    @Override // com.couchbase.client.core.api.CoreCouchbaseOps
    @Stability.Internal
    public Mono<Void> shutdown(Duration duration) {
        return this.transactionsCleanup.shutdown(duration).then(Mono.defer(() -> {
            NanoTimestamp now = NanoTimestamp.now();
            if (!this.shutdown.compareAndSet(false, true)) {
                return Mono.empty();
            }
            this.eventBus.publish(new ShutdownInitiatedEvent(this.coreContext));
            this.invalidStateWatchdog.dispose();
            return Flux.fromIterable(this.currentConfig.bucketConfigs().keySet()).flatMap(this::closeBucket).then(this.configurationProvider.shutdown()).then(this.configurationProcessor.awaitTermination()).doOnTerminate(() -> {
                CoreLimiter.decrement();
                this.eventBus.publish(new ShutdownCompletedEvent(now.elapsed(), this.coreContext));
            }).then();
        })).timeout(duration, this.coreContext.environment().scheduler());
    }

    private void reconfigure(Runnable runnable) {
        ClusterConfig clusterConfig = this.currentConfig;
        if (clusterConfig.bucketConfigs().isEmpty() && clusterConfig.globalConfig() == null) {
            reconfigureDisconnectAll(runnable);
        } else {
            NanoTimestamp now = NanoTimestamp.now();
            reconfigureBuckets(Flux.just(clusterConfig).flatMap(clusterConfig2 -> {
                return Flux.fromIterable(clusterConfig2.bucketConfigs().values());
            })).then(reconfigureGlobal(clusterConfig.globalConfig())).then(Mono.defer(() -> {
                return Flux.fromIterable(new ArrayList(this.nodes)).flatMap(node -> {
                    return maybeRemoveNode(node, clusterConfig);
                }).then();
            })).subscribe(r1 -> {
            }, th -> {
                runnable.run();
                this.eventBus.publish(new ReconfigurationErrorDetectedEvent(context(), th));
            }, () -> {
                runnable.run();
                this.eventBus.publish(new ReconfigurationCompletedEvent(now.elapsed(), this.coreContext));
            });
        }
    }

    private void reconfigureDisconnectAll(Runnable runnable) {
        NanoTimestamp now = NanoTimestamp.now();
        Flux flatMap = Flux.fromIterable(new ArrayList(this.nodes)).flatMap((v0) -> {
            return v0.disconnect();
        });
        CopyOnWriteArrayList<Node> copyOnWriteArrayList = this.nodes;
        copyOnWriteArrayList.getClass();
        flatMap.doOnComplete(copyOnWriteArrayList::clear).subscribe(r1 -> {
        }, th -> {
            runnable.run();
            this.eventBus.publish(new ReconfigurationErrorDetectedEvent(context(), th));
        }, () -> {
            runnable.run();
            this.eventBus.publish(new ReconfigurationCompletedEvent(now.elapsed(), this.coreContext));
        });
    }

    private Mono<Void> reconfigureGlobal(GlobalConfig globalConfig) {
        return Mono.defer(() -> {
            return globalConfig == null ? Mono.empty() : Flux.fromIterable(globalConfig.portInfos()).flatMap(portInfo -> {
                boolean tlsEnabled = this.coreContext.environment().securityConfig().tlsEnabled();
                Map<ServiceType, Integer> map = null;
                Optional<String> alternateAddress = this.coreContext.alternateAddress();
                String str = null;
                if (alternateAddress.isPresent()) {
                    AlternateAddress alternateAddress2 = portInfo.alternateAddresses().get(alternateAddress.get());
                    str = alternateAddress2.hostname();
                    map = tlsEnabled ? alternateAddress2.sslServices() : alternateAddress2.services();
                }
                if (map == null || map.isEmpty()) {
                    map = tlsEnabled ? portInfo.sslPorts() : portInfo.ports();
                }
                String str2 = str;
                Map<ServiceType, Integer> map2 = map;
                return Flux.merge(new Publisher[]{Flux.fromIterable(map2.entrySet()).flatMap(entry -> {
                    return ensureServiceAt(portInfo.identifier(), (ServiceType) entry.getKey(), ((Integer) entry.getValue()).intValue(), Optional.empty(), Optional.ofNullable(str2)).onErrorResume(th -> {
                        this.eventBus.publish(new ServiceReconfigurationFailedEvent(this.coreContext, portInfo.hostname(), (ServiceType) entry.getKey(), th));
                        return Mono.empty();
                    });
                }), Flux.fromArray(ServiceType.values()).filter(serviceType -> {
                    return !map2.containsKey(serviceType);
                }).flatMap(serviceType2 -> {
                    return removeServiceFrom(portInfo.identifier(), serviceType2, Optional.empty()).onErrorResume(th -> {
                        this.eventBus.publish(new ServiceReconfigurationFailedEvent(this.coreContext, portInfo.hostname(), serviceType2, th));
                        return Mono.empty();
                    });
                })});
            }).then();
        });
    }

    private Mono<Void> reconfigureBuckets(Flux<BucketConfig> flux) {
        return flux.flatMap(bucketConfig -> {
            return Flux.fromIterable(bucketConfig.nodes()).flatMap(nodeInfo -> {
                boolean tlsEnabled = this.coreContext.environment().securityConfig().tlsEnabled();
                Map<ServiceType, Integer> map = null;
                Optional<String> alternateAddress = this.coreContext.alternateAddress();
                String str = null;
                if (alternateAddress.isPresent()) {
                    AlternateAddress alternateAddress2 = nodeInfo.alternateAddresses().get(alternateAddress.get());
                    str = alternateAddress2.hostname();
                    map = tlsEnabled ? alternateAddress2.sslServices() : alternateAddress2.services();
                }
                if (CbCollections.isNullOrEmpty(map)) {
                    map = tlsEnabled ? nodeInfo.sslServices() : nodeInfo.services();
                }
                String str2 = str;
                Map<ServiceType, Integer> map2 = map;
                return Flux.merge(new Publisher[]{Flux.fromIterable(map2.entrySet()).flatMap(entry -> {
                    return ensureServiceAt(nodeInfo.identifier(), (ServiceType) entry.getKey(), ((Integer) entry.getValue()).intValue(), ((ServiceType) entry.getKey()).scope() == ServiceScope.BUCKET ? Optional.of(bucketConfig.name()) : Optional.empty(), Optional.ofNullable(str2)).onErrorResume(th -> {
                        this.eventBus.publish(new ServiceReconfigurationFailedEvent(this.coreContext, nodeInfo.hostname(), (ServiceType) entry.getKey(), th));
                        return Mono.empty();
                    });
                }), Flux.fromArray(ServiceType.values()).filter(serviceType -> {
                    return !map2.containsKey(serviceType);
                }).flatMap(serviceType2 -> {
                    return removeServiceFrom(nodeInfo.identifier(), serviceType2, serviceType2.scope() == ServiceScope.BUCKET ? Optional.of(bucketConfig.name()) : Optional.empty()).onErrorResume(th -> {
                        this.eventBus.publish(new ServiceReconfigurationFailedEvent(this.coreContext, nodeInfo.hostname(), serviceType2, th));
                        return Mono.empty();
                    });
                })});
            });
        }).then();
    }

    private static Locator locator(ServiceType serviceType) {
        switch (serviceType) {
            case KV:
                return KEY_VALUE_LOCATOR;
            case MANAGER:
                return MANAGER_LOCATOR;
            case QUERY:
                return QUERY_LOCATOR;
            case ANALYTICS:
                return ANALYTICS_LOCATOR;
            case SEARCH:
                return SEARCH_LOCATOR;
            case VIEWS:
                return VIEWS_LOCATOR;
            case EVENTING:
                return EVENTING_LOCATOR;
            case BACKUP:
                return BACKUP_LOCATOR;
            default:
                throw new IllegalStateException("Unsupported ServiceType: " + serviceType);
        }
    }

    @Stability.Internal
    public CoreTransactionsCleanup transactionsCleanup() {
        return this.transactionsCleanup;
    }

    @Stability.Internal
    public CoreTransactionsContext transactionsContext() {
        return this.transactionsContext;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        shutdown().block();
    }

    @Override // com.couchbase.client.core.api.CoreCouchbaseOps
    @Stability.Internal
    public CoreKvOps kvOps(CoreKeyspace coreKeyspace) {
        return new ClassicCoreKvOps(this, coreKeyspace);
    }

    @Override // com.couchbase.client.core.api.CoreCouchbaseOps
    @Stability.Internal
    public CoreQueryOps queryOps() {
        return new ClassicCoreQueryOps(this);
    }

    @Override // com.couchbase.client.core.api.CoreCouchbaseOps
    @Stability.Internal
    public CoreSearchOps searchOps(@Nullable CoreBucketAndScope coreBucketAndScope) {
        return new ClassicCoreSearchOps(this, coreBucketAndScope);
    }

    @Override // com.couchbase.client.core.api.CoreCouchbaseOps
    @Stability.Internal
    public CoreKvBinaryOps kvBinaryOps(CoreKeyspace coreKeyspace) {
        return new ClassicCoreKvBinaryOps(this, coreKeyspace);
    }

    @Override // com.couchbase.client.core.api.CoreCouchbaseOps
    @Stability.Internal
    public CoreBucketManagerOps bucketManager() {
        return new ClassicCoreBucketManager(this);
    }

    @Override // com.couchbase.client.core.api.CoreCouchbaseOps
    @Stability.Internal
    public CoreCollectionManager collectionManager(String str) {
        return new ClassicCoreCollectionManagerOps(this, str);
    }

    @Override // com.couchbase.client.core.api.CoreCouchbaseOps
    public CoreSearchIndexManager clusterSearchIndexManager() {
        return new ClassicCoreClusterSearchIndexManager(this);
    }

    @Override // com.couchbase.client.core.api.CoreCouchbaseOps
    public CoreSearchIndexManager scopeSearchIndexManager(CoreBucketAndScope coreBucketAndScope) {
        return new ClassicCoreScopeSearchIndexManager(this, coreBucketAndScope);
    }

    @Override // com.couchbase.client.core.api.CoreCouchbaseOps
    public CoreEnvironment environment() {
        return context().environment();
    }

    @Override // com.couchbase.client.core.api.CoreCouchbaseOps
    public CompletableFuture<Void> waitUntilReady(Set<ServiceType> set, Duration duration, ClusterState clusterState, @Nullable String str) {
        return WaitUntilReadyHelper.waitUntilReady(this, set, duration, clusterState, Optional.ofNullable(str));
    }
}
