package com.couchbase.client.core.service;

import com.couchbase.client.core.ResponseEvent;
import com.couchbase.client.core.endpoint.Endpoint;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.CouchbaseRequest;
import com.couchbase.client.core.message.internal.EndpointHealth;
import com.couchbase.client.core.message.internal.SignalFlush;
import com.couchbase.client.core.service.Service;
import com.couchbase.client.core.state.AbstractStateMachine;
import com.couchbase.client.core.state.LifecycleState;
import com.couchbase.client.deps.com.lmax.disruptor.RingBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;

/* loaded from: input_file:com/couchbase/client/core/service/AbstractDynamicService.class */
public abstract class AbstractDynamicService extends AbstractStateMachine<LifecycleState> implements Service {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance((Class<?>) Service.class);
    private final Endpoint[] endpoints;
    private final String hostname;
    private final String bucket;
    private final String username;
    private final String password;
    private final int port;
    private final CoreEnvironment env;
    private final RingBuffer<ResponseEvent> responseBuffer;
    private final int minEndpoints;
    private final Service.EndpointFactory endpointFactory;
    private final EndpointStateZipper endpointStates;
    private final LifecycleState initialState;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractDynamicService(String str, String str2, String str3, String str4, int i, CoreEnvironment coreEnvironment, int i2, RingBuffer<ResponseEvent> ringBuffer, Service.EndpointFactory endpointFactory) {
        super(i2 == 0 ? LifecycleState.IDLE : LifecycleState.DISCONNECTED);
        this.initialState = i2 == 0 ? LifecycleState.IDLE : LifecycleState.DISCONNECTED;
        this.hostname = str;
        this.bucket = str2;
        this.username = str3;
        this.password = str4;
        this.port = i;
        this.env = coreEnvironment;
        this.minEndpoints = i2;
        this.responseBuffer = ringBuffer;
        this.endpointFactory = endpointFactory;
        this.endpointStates = new EndpointStateZipper(this.initialState);
        this.endpoints = new Endpoint[i2];
        this.endpointStates.states().subscribe(new Action1<LifecycleState>() { // from class: com.couchbase.client.core.service.AbstractDynamicService.1
            public void call(LifecycleState lifecycleState) {
                AbstractDynamicService.this.transitionState(lifecycleState);
            }
        });
    }

    protected abstract void dispatch(CouchbaseRequest couchbaseRequest);

    @Override // com.couchbase.client.core.service.Service
    public Observable<LifecycleState> connect() {
        LOGGER.debug(logIdent(this.hostname, this) + "Got instructed to connect.");
        if (state() == LifecycleState.CONNECTED || state() == LifecycleState.CONNECTING) {
            LOGGER.debug(logIdent(this.hostname, this) + "Already connected or connecting, skipping connect.");
            return Observable.just(state());
        }
        for (int i = 0; i < this.minEndpoints; i++) {
            Endpoint createEndpoint = createEndpoint();
            this.endpoints[i] = createEndpoint;
            this.endpointStates.register(createEndpoint, createEndpoint);
        }
        return Observable.from(this.endpoints).flatMap(new Func1<Endpoint, Observable<LifecycleState>>() { // from class: com.couchbase.client.core.service.AbstractDynamicService.3
            public Observable<LifecycleState> call(Endpoint endpoint) {
                AbstractDynamicService.LOGGER.debug(AbstractDynamicService.logIdent(AbstractDynamicService.this.hostname, AbstractDynamicService.this) + "Initializing connect on Endpoint.");
                return endpoint.connect();
            }
        }).lastOrDefault(this.initialState).map(new Func1<LifecycleState, LifecycleState>() { // from class: com.couchbase.client.core.service.AbstractDynamicService.2
            public LifecycleState call(LifecycleState lifecycleState) {
                return AbstractDynamicService.this.state();
            }
        });
    }

    @Override // com.couchbase.client.core.service.Service
    public void send(CouchbaseRequest couchbaseRequest) {
        if (!(couchbaseRequest instanceof SignalFlush)) {
            dispatch(couchbaseRequest);
            return;
        }
        int length = this.endpoints.length;
        for (int i = 0; i < length; i++) {
            Endpoint endpoint = this.endpoints[i];
            if (endpoint != null) {
                endpoint.send(couchbaseRequest);
            }
        }
    }

    @Override // com.couchbase.client.core.service.Service
    public Observable<LifecycleState> disconnect() {
        LOGGER.debug(logIdent(this.hostname, this) + "Got instructed to disconnect.");
        if (state() != LifecycleState.DISCONNECTED && state() != LifecycleState.DISCONNECTING) {
            return Observable.from(this.endpoints).flatMap(new Func1<Endpoint, Observable<LifecycleState>>() { // from class: com.couchbase.client.core.service.AbstractDynamicService.5
                public Observable<LifecycleState> call(Endpoint endpoint) {
                    AbstractDynamicService.LOGGER.debug(AbstractDynamicService.logIdent(AbstractDynamicService.this.hostname, AbstractDynamicService.this) + "Initializing disconnect on Endpoint.");
                    return endpoint.disconnect();
                }
            }).lastOrDefault(this.initialState).map(new Func1<LifecycleState, LifecycleState>() { // from class: com.couchbase.client.core.service.AbstractDynamicService.4
                public LifecycleState call(LifecycleState lifecycleState) {
                    AbstractDynamicService.this.endpointStates.terminate();
                    return AbstractDynamicService.this.state();
                }
            });
        }
        LOGGER.debug(logIdent(this.hostname, this) + "Already disconnected or disconnecting, skipping disconnect.");
        return Observable.just(state());
    }

    @Override // com.couchbase.client.core.service.Service
    public BucketServiceMapping mapping() {
        return type().mapping();
    }

    @Override // com.couchbase.client.core.service.Service
    public Observable<EndpointHealth> healthCheck() {
        ArrayList arrayList = new ArrayList();
        Iterator<Endpoint> it = endpoints().iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().healthCheck(type()).toObservable());
        }
        return Observable.merge(arrayList);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Endpoint createEndpoint() {
        return this.endpointFactory.create(this.hostname, this.bucket, this.username, this.password, this.port, this.env, this.responseBuffer);
    }

    protected static String logIdent(String str, Service service) {
        return "[" + str + "][" + service.getClass().getSimpleName() + "]: ";
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<Endpoint> endpoints() {
        return Arrays.asList(this.endpoints);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public EndpointStateZipper endpointStates() {
        return this.endpointStates;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void whenState(Endpoint endpoint, final LifecycleState lifecycleState, Action1<LifecycleState> action1) {
        endpoint.states().filter(new Func1<LifecycleState, Boolean>() { // from class: com.couchbase.client.core.service.AbstractDynamicService.6
            public Boolean call(LifecycleState lifecycleState2) {
                return Boolean.valueOf(lifecycleState2 == LifecycleState.this);
            }
        }).take(1).subscribe(action1);
    }
}
