package com.couchbase.client.core.config.refresher;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.ClusterConfig;
import com.couchbase.client.core.config.ConfigurationException;
import com.couchbase.client.core.config.NodeInfo;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.kv.GetBucketConfigRequest;
import com.couchbase.client.core.message.kv.GetBucketConfigResponse;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.utils.Buffers;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;

/* loaded from: input_file:com/couchbase/client/core/config/refresher/CarrierRefresher.class */
public class CarrierRefresher extends AbstractRefresher {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance((Class<?>) CarrierRefresher.class);
    private final Set<String> subscriptions;
    private final CoreEnvironment environment;

    public CarrierRefresher(CoreEnvironment coreEnvironment, ClusterFacade clusterFacade) {
        super(coreEnvironment, clusterFacade);
        this.subscriptions = Collections.newSetFromMap(new ConcurrentHashMap());
        this.environment = coreEnvironment;
        Observable.interval(10L, TimeUnit.SECONDS, coreEnvironment.scheduler()).subscribe(new Action1<Long>() { // from class: com.couchbase.client.core.config.refresher.CarrierRefresher.1
            public void call(Long l) {
                CarrierRefresher.this.provider().signalOutdated();
            }
        });
    }

    @Override // com.couchbase.client.core.config.refresher.Refresher
    public Observable<Boolean> shutdown() {
        return Observable.just(true);
    }

    @Override // com.couchbase.client.core.config.refresher.Refresher
    public void markTainted(BucketConfig bucketConfig) {
        final String name = bucketConfig.name();
        if (this.subscriptions.contains(name)) {
            return;
        }
        LOGGER.debug("Config for bucket \"" + name + "\" marked as tainted, starting polling.");
        this.subscriptions.add(name);
        Observable takeWhile = Observable.interval(1L, TimeUnit.SECONDS).takeWhile(new Func1<Long, Boolean>() { // from class: com.couchbase.client.core.config.refresher.CarrierRefresher.2
            public Boolean call(Long l) {
                return Boolean.valueOf(CarrierRefresher.this.subscriptions.contains(name));
            }
        });
        Observable observable = null;
        ArrayList<NodeInfo> arrayList = new ArrayList(bucketConfig.nodes());
        Collections.shuffle(arrayList);
        for (final NodeInfo nodeInfo : arrayList) {
            if (isValidCarrierNode(this.environment.sslEnabled(), nodeInfo)) {
                observable = observable == null ? takeWhile.flatMap(new Func1<Long, Observable<String>>() { // from class: com.couchbase.client.core.config.refresher.CarrierRefresher.3
                    public Observable<String> call(Long l) {
                        return CarrierRefresher.this.refreshAgainstNode(name, nodeInfo.hostname());
                    }
                }) : observable.onErrorResumeNext(refreshAgainstNode(name, nodeInfo.hostname()));
            }
        }
        if (observable == null) {
            LOGGER.debug("Cannot poll bucket, because node list contains no nodes.");
        } else {
            observable.subscribe(new Subscriber<String>() { // from class: com.couchbase.client.core.config.refresher.CarrierRefresher.4
                public void onCompleted() {
                    CarrierRefresher.LOGGER.debug("Completed polling for bucket \"{}\".", name);
                }

                public void onError(Throwable th) {
                    CarrierRefresher.LOGGER.debug("Error while polling bucket config, ignoring.", th);
                }

                public void onNext(String str) {
                    if (str.startsWith("{")) {
                        CarrierRefresher.this.provider().proposeBucketConfig(name, str);
                    }
                }
            });
        }
    }

    @Override // com.couchbase.client.core.config.refresher.Refresher
    public void markUntainted(BucketConfig bucketConfig) {
        if (this.subscriptions.contains(bucketConfig.name())) {
            LOGGER.debug("Config for bucket \"" + bucketConfig.name() + "\" marked as untainted, stopping polling.");
            this.subscriptions.remove(bucketConfig.name());
        }
    }

    @Override // com.couchbase.client.core.config.refresher.Refresher
    public void refresh(ClusterConfig clusterConfig) {
        Observable.from(clusterConfig.bucketConfigs().values()).observeOn(this.environment.scheduler()).filter(new Func1<BucketConfig, Boolean>() { // from class: com.couchbase.client.core.config.refresher.CarrierRefresher.6
            public Boolean call(BucketConfig bucketConfig) {
                return Boolean.valueOf(CarrierRefresher.this.registrations().containsKey(bucketConfig.name()));
            }
        }).subscribe(new Action1<BucketConfig>() { // from class: com.couchbase.client.core.config.refresher.CarrierRefresher.5
            public void call(final BucketConfig bucketConfig) {
                final String name = bucketConfig.name();
                Observable observable = null;
                ArrayList<NodeInfo> arrayList = new ArrayList(bucketConfig.nodes());
                Collections.shuffle(arrayList);
                for (NodeInfo nodeInfo : arrayList) {
                    if (CarrierRefresher.isValidCarrierNode(CarrierRefresher.this.environment.sslEnabled(), nodeInfo)) {
                        observable = observable == null ? CarrierRefresher.this.refreshAgainstNode(name, nodeInfo.hostname()) : observable.onErrorResumeNext(CarrierRefresher.this.refreshAgainstNode(name, nodeInfo.hostname()));
                    }
                }
                if (observable == null) {
                    CarrierRefresher.LOGGER.debug("No node registered in the current configuration, skipping to refresh.");
                } else {
                    observable.subscribe(new Subscriber<String>() { // from class: com.couchbase.client.core.config.refresher.CarrierRefresher.5.1
                        public void onCompleted() {
                            CarrierRefresher.LOGGER.debug("Completed refreshing config for bucket \"{}\"", name);
                        }

                        public void onError(Throwable th) {
                            CarrierRefresher.LOGGER.debug("Error while refreshing bucket config, ignoring.", th);
                        }

                        public void onNext(String str) {
                            if (str.startsWith("{")) {
                                CarrierRefresher.this.provider().proposeBucketConfig(bucketConfig.name(), str);
                            }
                        }
                    });
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static boolean isValidCarrierNode(boolean z, NodeInfo nodeInfo) {
        return (z && nodeInfo.sslServices().containsKey(ServiceType.BINARY)) || nodeInfo.services().containsKey(ServiceType.BINARY);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Observable<String> refreshAgainstNode(final String str, final InetAddress inetAddress) {
        return Buffers.wrapColdWithAutoRelease(Observable.defer(new Func0<Observable<GetBucketConfigResponse>>() { // from class: com.couchbase.client.core.config.refresher.CarrierRefresher.10
            /* renamed from: call, reason: merged with bridge method [inline-methods] */
            public Observable<GetBucketConfigResponse> m14call() {
                return CarrierRefresher.this.cluster().send(new GetBucketConfigRequest(str, inetAddress));
            }
        })).doOnNext(new Action1<GetBucketConfigResponse>() { // from class: com.couchbase.client.core.config.refresher.CarrierRefresher.9
            public void call(GetBucketConfigResponse getBucketConfigResponse) {
                if (getBucketConfigResponse.status().isSuccess()) {
                    return;
                }
                if (getBucketConfigResponse.content() != null && getBucketConfigResponse.content().refCnt() > 0) {
                    getBucketConfigResponse.content().release();
                }
                throw new ConfigurationException("Could not fetch config from node: " + getBucketConfigResponse);
            }
        }).map(new Func1<GetBucketConfigResponse, String>() { // from class: com.couchbase.client.core.config.refresher.CarrierRefresher.8
            public String call(GetBucketConfigResponse getBucketConfigResponse) {
                String trim = getBucketConfigResponse.content().toString(CharsetUtil.UTF_8).trim();
                if (getBucketConfigResponse.content().refCnt() > 0) {
                    getBucketConfigResponse.content().release();
                }
                return trim.replace("$HOST", getBucketConfigResponse.hostname().getHostName());
            }
        }).doOnError(new Action1<Throwable>() { // from class: com.couchbase.client.core.config.refresher.CarrierRefresher.7
            public void call(Throwable th) {
                CarrierRefresher.LOGGER.debug("Could not fetch config from bucket \"" + str + "\" against \"" + inetAddress + "\".", th);
            }
        });
    }
}
