package com.couchbase.client.core;

import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.ClusterConfig;
import com.couchbase.client.core.config.NodeInfo;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.BootstrapMessage;
import com.couchbase.client.core.message.CouchbaseRequest;
import com.couchbase.client.core.message.config.ConfigRequest;
import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.internal.AddServiceRequest;
import com.couchbase.client.core.message.internal.RemoveServiceRequest;
import com.couchbase.client.core.message.internal.SignalFlush;
import com.couchbase.client.core.message.kv.BinaryRequest;
import com.couchbase.client.core.message.query.QueryRequest;
import com.couchbase.client.core.message.view.ViewRequest;
import com.couchbase.client.core.node.CouchbaseNode;
import com.couchbase.client.core.node.Node;
import com.couchbase.client.core.node.locate.ConfigLocator;
import com.couchbase.client.core.node.locate.DCPLocator;
import com.couchbase.client.core.node.locate.KeyValueLocator;
import com.couchbase.client.core.node.locate.Locator;
import com.couchbase.client.core.node.locate.QueryLocator;
import com.couchbase.client.core.node.locate.ViewLocator;
import com.couchbase.client.core.retry.RetryHelper;
import com.couchbase.client.core.service.Service;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.state.LifecycleState;
import com.couchbase.client.deps.com.lmax.disruptor.EventHandler;
import com.couchbase.client.deps.com.lmax.disruptor.RingBuffer;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;

/* loaded from: input_file:com/couchbase/client/core/RequestHandler.class */
public class RequestHandler implements EventHandler<RequestEvent> {
    /** Logger shared by all instances of this handler. */
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance((Class<?>) RequestHandler.class);
    /** NOTE(review): not referenced anywhere in this class as shown; possibly a leftover. */
    private static final int INITIAL_NODE_SIZE = 128;
    /** Locator selected for {@link BinaryRequest}s (see {@code locator}). */
    private final Locator binaryLocator;
    /** Locator selected for {@link ViewRequest}s. */
    private final Locator viewLocator;
    /** Locator selected for {@link QueryRequest}s. */
    private final Locator queryLocator;
    /** Locator selected for {@link ConfigRequest}s. */
    private final Locator configLocator;
    /** Locator selected for {@link DCPRequest}s. */
    private final Locator dcpLocator;
    /** The nodes currently registered with this handler; requests are dispatched to members of this set. */
    private final Set<Node> nodes;
    /** Environment consulted for the ssl/query/dcp settings and handed to newly created nodes. */
    private final CoreEnvironment environment;
    /** Most recent cluster configuration received from the config stream; holds {@code null} until the first one arrives. */
    private final AtomicReference<ClusterConfig> configuration;
    /** Response ring buffer handed to new nodes and to the retry helper. */
    private final RingBuffer<ResponseEvent> responseBuffer;

    /**
     * Creates a handler that starts without any registered node and keeps its nodes in a
     * {@link CopyOnWriteArraySet}.
     *
     * @param coreEnvironment the environment to use.
     * @param observable the stream of cluster configurations to react to.
     * @param ringBuffer the response buffer passed on to nodes and the retry helper.
     */
    public RequestHandler(CoreEnvironment coreEnvironment, Observable<ClusterConfig> observable, RingBuffer<ResponseEvent> ringBuffer) {
        // Explicit type argument instead of a raw type, so the call is free of unchecked warnings.
        this(new CopyOnWriteArraySet<Node>(), coreEnvironment, observable, ringBuffer);
    }

    /**
     * Creates a handler backed by the given node set and subscribes to the configuration stream.
     *
     * <p>Every configuration that arrives is stored as the current one and then applied through
     * {@link #reconfigure(ClusterConfig)}. Failures while applying it are logged instead of being
     * printed to stderr or surfacing as an unhandled-error exception.</p>
     *
     * @param set the set that holds the registered nodes.
     * @param coreEnvironment the environment to use.
     * @param observable the stream of cluster configurations to react to.
     * @param ringBuffer the response buffer passed on to nodes and the retry helper.
     */
    RequestHandler(Set<Node> set, CoreEnvironment coreEnvironment, Observable<ClusterConfig> observable, RingBuffer<ResponseEvent> ringBuffer) {
        this.binaryLocator = new KeyValueLocator();
        this.viewLocator = new ViewLocator();
        this.queryLocator = new QueryLocator();
        this.configLocator = new ConfigLocator();
        this.dcpLocator = new DCPLocator();
        this.nodes = set;
        this.environment = coreEnvironment;
        this.responseBuffer = ringBuffer;
        this.configuration = new AtomicReference<>();
        observable.subscribe(new Action1<ClusterConfig>() {
            @Override
            public void call(ClusterConfig clusterConfig) {
                try {
                    RequestHandler.LOGGER.debug("Got notified of a new configuration arriving.");
                    RequestHandler.this.configuration.set(clusterConfig);
                    RequestHandler.this.reconfigure(clusterConfig).subscribe(
                        new Action1<ClusterConfig>() {
                            @Override
                            public void call(ClusterConfig applied) {
                                // Nothing to do, the reconfiguration is run for its side effects only.
                            }
                        },
                        new Action1<Throwable>() {
                            @Override
                            public void call(Throwable error) {
                                // Without an error callback a failed reconfiguration would only show
                                // up as an unhandled-error exception thrown out of subscribe().
                                RequestHandler.LOGGER.warn("Received error during reconfiguration.", error);
                            }
                        }
                    );
                } catch (Exception e) {
                    // Keep the config subscription alive, but report through the logger.
                    RequestHandler.LOGGER.warn("Error while applying a new configuration.", e);
                }
            }
        });
    }

    /**
     * Handles one request event from the ring buffer: dispatches the contained request and, when
     * the event is the last one of its batch, signals every registered node to flush.
     *
     * <p>The flush is sent at the end of a batch even if the last request of that batch was rejected
     * or not dispatched, so requests written earlier in the same batch are not left waiting. The
     * request reference is always cleared from the event, also when an exception is propagated.</p>
     *
     * @param requestEvent the event carrying the request.
     * @param j the sequence of the event (unused).
     * @param z true if this event is the last one of the current batch.
     * @throws Exception if dispatching or flushing fails unexpectedly.
     */
    @Override // com.couchbase.client.deps.com.lmax.disruptor.EventHandler
    public void onEvent(RequestEvent requestEvent, long j, boolean z) throws Exception {
        try {
            dispatchRequest(requestEvent.getRequest());
            if (z) {
                for (Node node : this.nodes) {
                    node.send(SignalFlush.INSTANCE);
                }
            }
        } finally {
            requestEvent.setRequest(null);
        }
    }

    /**
     * Validates a single request against the current configuration and sends it to the located nodes.
     *
     * @param request the request to dispatch.
     * @throws Exception declared only so that anything the collaborators throw passes through unchanged.
     */
    private void dispatchRequest(CouchbaseRequest request) throws Exception {
        ClusterConfig clusterConfig = this.configuration.get();
        if (!(request instanceof BootstrapMessage)) {
            // Non-bootstrap requests need a configuration and, if they name a bucket, that bucket must be open.
            if (clusterConfig == null || (request.bucket() != null && !clusterConfig.hasBucket(request.bucket()))) {
                request.observable().onError(new BucketClosedException(request.bucket() + " has been closed"));
                return;
            }
            try {
                checkFeaturesForRequest(request, clusterConfig.bucketConfig(request.bucket()));
            } catch (UnsupportedOperationException e) {
                // NOTE(review): checkFeaturesForRequest throws ServiceNotAvailableException; confirm it
                // is a subtype of UnsupportedOperationException, otherwise this handler never fires.
                request.observable().onError(e);
                return;
            }
        }
        Node[] located = locator(request).locate(request, this.nodes, clusterConfig);
        if (located == null) {
            // The locator signals that nothing should be dispatched for this request.
            return;
        }
        if (located.length == 0) {
            RetryHelper.retryOrCancel(this.environment, request, this.responseBuffer);
            return;
        }
        for (Node node : located) {
            try {
                node.send(request);
            } catch (Exception e) {
                request.observable().onError(e);
            }
        }
    }

    /**
     * Verifies that the service needed by the given request is available.
     *
     * <p>KeyValue and View requests depend on the bucket configuration alone. Query and DCP requests
     * are also accepted when the corresponding feature is switched on in the environment.</p>
     *
     * @param couchbaseRequest the request to check.
     * @param bucketConfig the configuration of the bucket the request targets.
     * @throws ServiceNotAvailableException if the required service is not available.
     */
    protected void checkFeaturesForRequest(CouchbaseRequest couchbaseRequest, BucketConfig bucketConfig) {
        if (couchbaseRequest instanceof BinaryRequest) {
            if (!bucketConfig.serviceEnabled(ServiceType.BINARY)) {
                throw new ServiceNotAvailableException("The KeyValue service is not enabled or no node in the cluster supports it.");
            }
        }
        if (couchbaseRequest instanceof ViewRequest) {
            if (!bucketConfig.serviceEnabled(ServiceType.VIEW)) {
                throw new ServiceNotAvailableException("The View service is not enabled or no node in the cluster supports it.");
            }
        }
        if (couchbaseRequest instanceof QueryRequest) {
            // The environment setting is consulted first; the bucket config only if that is off.
            boolean queryAvailable = this.environment.queryEnabled() || bucketConfig.serviceEnabled(ServiceType.QUERY);
            if (!queryAvailable) {
                throw new ServiceNotAvailableException("The Query service is not enabled or no node in the cluster supports it.");
            }
        }
        if (couchbaseRequest instanceof DCPRequest) {
            boolean dcpAvailable = this.environment.dcpEnabled() || bucketConfig.serviceEnabled(ServiceType.DCP);
            if (!dcpAvailable) {
                throw new ServiceNotAvailableException("The DCP service is not enabled or no node in the cluster supports it.");
            }
        }
    }

    /**
     * Registers a node for the given address unless one is already registered.
     *
     * @param inetAddress the address of the node.
     * @return the state of the already registered node, or the result of connecting a new
     *         {@link CouchbaseNode}.
     */
    public Observable<LifecycleState> addNode(InetAddress inetAddress) {
        Node existing = nodeBy(inetAddress);
        if (existing != null) {
            LOGGER.debug("Node {} already registered, skipping.", inetAddress);
            return Observable.just(existing.state());
        }
        Node created = new CouchbaseNode(inetAddress, this.environment, this.responseBuffer);
        return addNode(created);
    }

    /**
     * Connects the given node and registers it once the connect attempt has emitted a state.
     *
     * <p>A node that is already contained in the node set is not connected again; its current state
     * is returned instead.</p>
     *
     * @param node the node to add.
     * @return the state emitted by the connect attempt, or the current state if already registered.
     */
    Observable<LifecycleState> addNode(final Node node) {
        LOGGER.debug("Got instructed to add Node {}", node.hostname());
        if (this.nodes.contains(node)) {
            LOGGER.debug("Node {} already registered, skipping.", node.hostname());
            return Observable.just(node.state());
        }
        // Parameterized like the other log statements, so no string is built when debug is off.
        LOGGER.debug("Connecting Node {}", node.hostname());
        return node.connect().map(new Func1<LifecycleState, LifecycleState>() {
            @Override
            public LifecycleState call(LifecycleState lifecycleState) {
                RequestHandler.LOGGER.debug("Connect finished, registering for use.");
                RequestHandler.this.nodes.add(node);
                return lifecycleState;
            }
        });
    }

    /**
     * Removes and disconnects the node registered for the given address.
     *
     * <p>If no node is registered for the address (or the address is {@code null}) there is nothing
     * to disconnect; in that case {@link LifecycleState#DISCONNECTED} is emitted instead of failing
     * with a {@link NullPointerException}.</p>
     *
     * @param inetAddress the address of the node to remove.
     * @return the result of disconnecting the node, or {@code DISCONNECTED} if it was not registered.
     */
    public Observable<LifecycleState> removeNode(InetAddress inetAddress) {
        Node registered = nodeBy(inetAddress);
        if (registered == null) {
            LOGGER.debug("Node {} is not registered, nothing to remove.", inetAddress);
            return Observable.just(LifecycleState.DISCONNECTED);
        }
        return removeNode(registered);
    }

    /**
     * Takes the given node out of the node set and asks it to disconnect.
     *
     * @param node the node to remove.
     * @return the observable returned by the node's {@code disconnect()}.
     */
    Observable<LifecycleState> removeNode(Node node) {
        LOGGER.debug("Got instructed to remove Node {}", node.hostname());
        this.nodes.remove(node);
        Observable<LifecycleState> disconnecting = node.disconnect();
        return disconnecting;
    }

    /**
     * Adds a service to the node named in the request.
     *
     * <p>Fails with a {@link NullPointerException} if no registered node matches the request's
     * hostname, because the lookup then yields {@code null}.</p>
     *
     * @param addServiceRequest the request describing the service and its target node.
     * @return the observable returned by the node's {@code addService}.
     */
    public Observable<Service> addService(AddServiceRequest addServiceRequest) {
        LOGGER.debug("Got instructed to add Service {}, to Node {}", addServiceRequest.type(), addServiceRequest.hostname());
        Node target = nodeBy(addServiceRequest.hostname());
        return target.addService(addServiceRequest);
    }

    /**
     * Removes a service from the node named in the request.
     *
     * <p>Fails with a {@link NullPointerException} if no registered node matches the request's
     * hostname, because the lookup then yields {@code null}.</p>
     *
     * @param removeServiceRequest the request describing the service and its target node.
     * @return the observable returned by the node's {@code removeService}.
     */
    public Observable<Service> removeService(RemoveServiceRequest removeServiceRequest) {
        LOGGER.debug("Got instructed to remove Service {}, from Node {}", removeServiceRequest.type(), removeServiceRequest.hostname());
        Node target = nodeBy(removeServiceRequest.hostname());
        return target.removeService(removeServiceRequest);
    }

    /**
     * Looks up the registered node whose hostname equals the given address.
     *
     * @param inetAddress the address to look for; may be {@code null}.
     * @return the first matching node, or {@code null} if the address is {@code null} or no node matches.
     */
    public Node nodeBy(InetAddress inetAddress) {
        Node match = null;
        if (inetAddress != null) {
            Iterator<Node> candidates = this.nodes.iterator();
            while (match == null && candidates.hasNext()) {
                Node candidate = candidates.next();
                if (candidate.hostname().equals(inetAddress)) {
                    match = candidate;
                }
            }
        }
        return match;
    }

    /**
     * Picks the locator responsible for the type of the given request.
     *
     * <p>The types are tested in the order binary, view, query, config, DCP; the first match wins.</p>
     *
     * @param couchbaseRequest the request to find a locator for.
     * @return the locator for that request type.
     * @throws IllegalArgumentException if the request is of none of the known types.
     */
    protected Locator locator(CouchbaseRequest couchbaseRequest) {
        final Locator chosen;
        if (couchbaseRequest instanceof BinaryRequest) {
            chosen = this.binaryLocator;
        } else if (couchbaseRequest instanceof ViewRequest) {
            chosen = this.viewLocator;
        } else if (couchbaseRequest instanceof QueryRequest) {
            chosen = this.queryLocator;
        } else if (couchbaseRequest instanceof ConfigRequest) {
            chosen = this.configLocator;
        } else if (couchbaseRequest instanceof DCPRequest) {
            chosen = this.dcpLocator;
        } else {
            throw new IllegalArgumentException("Unknown Request Type: " + couchbaseRequest);
        }
        return chosen;
    }

    /**
     * Brings the registered nodes in line with the given cluster configuration.
     *
     * <p>If the configuration contains no bucket, every registered node is removed and disconnected.
     * Otherwise each bucket is reconfigured through {@code reconfigureBucket} and, once that has
     * finished, nodes that no bucket configuration lists any more are removed and disconnected.</p>
     *
     * <p>Nodes are always removed while walking a snapshot of the node set, so the removal works
     * for any {@link Set} implementation and not only for copy-on-write sets.</p>
     *
     * @param clusterConfig the configuration to apply.
     * @return an observable emitting the applied configuration once the reconfiguration is done.
     */
    public Observable<ClusterConfig> reconfigure(final ClusterConfig clusterConfig) {
        LOGGER.debug("Starting reconfiguration.");
        if (clusterConfig.bucketConfigs().values().isEmpty()) {
            LOGGER.debug("No node found in config, disconnecting all nodes.");
            // One snapshot serves both the emptiness check and the iteration, so they cannot disagree.
            final List<Node> registered = new ArrayList<Node>(this.nodes);
            if (registered.isEmpty()) {
                return Observable.just(clusterConfig);
            }
            return Observable.from(registered).doOnNext(new Action1<Node>() {
                @Override
                public void call(Node node) {
                    // removeNode already returns the disconnect observable, so subscribe to that one
                    // instead of asking the node to disconnect a second time.
                    RequestHandler.this.removeNode(node).subscribe();
                }
            }).last().map(new Func1<Node, ClusterConfig>() {
                @Override
                public ClusterConfig call(Node node) {
                    return clusterConfig;
                }
            });
        }
        return Observable.just(clusterConfig).flatMap(new Func1<ClusterConfig, Observable<BucketConfig>>() {
            @Override
            public Observable<BucketConfig> call(ClusterConfig config) {
                return Observable.from(config.bucketConfigs().values());
            }
        }).flatMap(new Func1<BucketConfig, Observable<Boolean>>() {
            @Override
            public Observable<Boolean> call(BucketConfig bucketConfig) {
                return RequestHandler.this.reconfigureBucket(bucketConfig);
            }
        }).last().doOnNext(new Action1<Boolean>() {
            @Override
            public void call(Boolean ignored) {
                RequestHandler.this.removeNodesMissingFrom(clusterConfig);
            }
        }).map(new Func1<Boolean, ClusterConfig>() {
            @Override
            public ClusterConfig call(Boolean ignored) {
                return clusterConfig;
            }
        });
    }

    /**
     * Removes and disconnects every registered node that no bucket of the given configuration lists.
     *
     * @param clusterConfig the configuration whose buckets define the nodes to keep.
     */
    private void removeNodesMissingFrom(ClusterConfig clusterConfig) {
        Set<InetAddress> configured = new HashSet<InetAddress>();
        for (BucketConfig bucket : clusterConfig.bucketConfigs().values()) {
            for (NodeInfo info : bucket.nodes()) {
                configured.add(info.hostname());
            }
        }
        // Iterate over a copy: removeNode mutates the node set.
        for (Node node : new ArrayList<Node>(this.nodes)) {
            if (!configured.contains(node.hostname())) {
                LOGGER.debug("Removing and disconnecting node {}.", node.hostname());
                removeNode(node).subscribe();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Makes sure every node listed in the bucket configuration is registered and has its services added.
     *
     * <p>For each node the (ssl or plain, depending on the environment) service map is taken from
     * the node info. A QUERY entry with the environment's query port is added if query is enabled
     * and not listed; a DCP entry reusing the BINARY port is added if DCP is enabled and not listed.
     * The DCP entry is skipped when the node has no BINARY port, since a {@code null} port would
     * fail later when the service request is built.</p>
     *
     * <p>NOTE(review): the map returned by the node info is modified in place, as before; confirm
     * that this is intended and that the map is mutable.</p>
     *
     * @param bucketConfig the bucket configuration to apply.
     * @return an observable emitting {@code true} once the services of all nodes have been added.
     */
    public Observable<Boolean> reconfigureBucket(final BucketConfig bucketConfig) {
        LOGGER.debug("Starting reconfiguration for bucket {}", bucketConfig.name());
        List<Observable<Boolean>> perNode = new ArrayList<Observable<Boolean>>();
        for (final NodeInfo nodeInfo : bucketConfig.nodes()) {
            perNode.add(addNode(nodeInfo.hostname()).flatMap(new Func1<LifecycleState, Observable<Map<ServiceType, Integer>>>() {
                @Override
                public Observable<Map<ServiceType, Integer>> call(LifecycleState lifecycleState) {
                    Map<ServiceType, Integer> services = RequestHandler.this.environment.sslEnabled() ? nodeInfo.sslServices() : nodeInfo.services();
                    if (!services.containsKey(ServiceType.QUERY) && RequestHandler.this.environment.queryEnabled()) {
                        services.put(ServiceType.QUERY, Integer.valueOf(RequestHandler.this.environment.queryPort()));
                    }
                    if (!services.containsKey(ServiceType.DCP) && RequestHandler.this.environment.dcpEnabled()) {
                        Integer binaryPort = services.get(ServiceType.BINARY);
                        if (binaryPort != null) {
                            services.put(ServiceType.DCP, binaryPort);
                        }
                    }
                    return Observable.just(services);
                }
            }).flatMap(new Func1<Map<ServiceType, Integer>, Observable<AddServiceRequest>>() {
                @Override
                public Observable<AddServiceRequest> call(Map<ServiceType, Integer> services) {
                    List<AddServiceRequest> requests = new ArrayList<AddServiceRequest>(services.size());
                    for (Map.Entry<ServiceType, Integer> entry : services.entrySet()) {
                        requests.add(new AddServiceRequest(entry.getKey(), bucketConfig.name(), bucketConfig.password(), entry.getValue().intValue(), nodeInfo.hostname()));
                    }
                    return Observable.from(requests);
                }
            }).flatMap(new Func1<AddServiceRequest, Observable<Service>>() {
                @Override
                public Observable<Service> call(AddServiceRequest addServiceRequest) {
                    return RequestHandler.this.addService(addServiceRequest);
                }
            }).last().map(new Func1<Service, Boolean>() {
                @Override
                public Boolean call(Service service) {
                    return true;
                }
            }));
        }
        return Observable.merge(perNode).last();
    }
}
