package netflix.ocelli.loadbalancer;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import netflix.ocelli.ClientConnector;
import netflix.ocelli.FailureDetectorFactory;
import netflix.ocelli.LoadBalancer;
import netflix.ocelli.LoadBalancers;
import netflix.ocelli.MembershipEvent;
import netflix.ocelli.PartitionedLoadBalancer;
import netflix.ocelli.SelectionStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.subjects.PublishSubject;
import rx.subscriptions.CompositeSubscription;

/* loaded from: input_file:netflix/ocelli/loadbalancer/DefaultPartitioningLoadBalancer.class */
/**
 * {@link PartitionedLoadBalancer} that fans a single stream of membership events out to
 * per-key partitions.
 *
 * <p>For every incoming {@link MembershipEvent} the {@code partitioner} is asked for the keys the
 * event's client belongs to, and the event is forwarded to the host stream of each of those
 * partitions. A partition (its host stream plus a {@link LoadBalancer} built on top of it) is
 * created lazily the first time its key is seen, either by an incoming event or by {@link #get}.
 *
 * @param <C> client type carried by the membership events
 * @param <K> partition key type
 */
public class DefaultPartitioningLoadBalancer<C, K> implements PartitionedLoadBalancer<C, K> {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultPartitioningLoadBalancer.class);

    private final Func1<C, Observable<K>> partitioner;
    private final Observable<MembershipEvent<C>> hostSource;
    private final FailureDetectorFactory<C> failureDetector;
    private final ClientConnector<C> clientConnector;
    private final Func1<Integer, Integer> connectedHostCountStrategy;
    private final Func1<Integer, Long> quarantineDelayStrategy;
    private final SelectionStrategy<C> selectionStrategy;
    private final String name;

    /** Holds the subscription to {@code hostSource}; released by {@link #shutdown()}. */
    private final CompositeSubscription cs = new CompositeSubscription();

    /** Partitions created so far, keyed by partition key. */
    private final ConcurrentMap<K, Holder> partitions = new ConcurrentHashMap<K, Holder>();

    /**
     * Pairs a partition's load balancer with the subject used to feed it membership events.
     *
     * <p>Decompiler note: this type was originally private; treat it as internal.
     */
    public final class Holder {
        final PublishSubject<MembershipEvent<C>> hostStream;
        final LoadBalancer<C> loadBalancer;

        public Holder(LoadBalancer<C> loadBalancer, PublishSubject<MembershipEvent<C>> publishSubject) {
            this.loadBalancer = loadBalancer;
            this.hostStream = publishSubject;
        }
    }

    /**
     * Creates the partitioned load balancer and immediately subscribes to {@code hostSource}.
     *
     * @param name                       base name; each partition is named {@code name + "_" + key}
     * @param hostSource                 stream of membership events to distribute across partitions
     * @param clientConnector            connector handed to every partition's load balancer
     * @param failureDetector            failure detector factory handed to every partition
     * @param selectionStrategy          selection strategy handed to every partition
     * @param quarantineDelayStrategy    quarantine strategy handed to every partition
     * @param connectedHostCountStrategy active client count strategy handed to every partition
     * @param partitioner                maps a client to the keys of the partitions it belongs to
     */
    public DefaultPartitioningLoadBalancer(
            String name,
            Observable<MembershipEvent<C>> hostSource,
            ClientConnector<C> clientConnector,
            FailureDetectorFactory<C> failureDetector,
            SelectionStrategy<C> selectionStrategy,
            Func1<Integer, Long> quarantineDelayStrategy,
            Func1<Integer, Integer> connectedHostCountStrategy,
            Func1<C, Observable<K>> partitioner) {
        this.partitioner = partitioner;
        this.hostSource = hostSource;
        this.failureDetector = failureDetector;
        this.clientConnector = clientConnector;
        this.selectionStrategy = selectionStrategy;
        this.quarantineDelayStrategy = quarantineDelayStrategy;
        this.name = name;
        this.connectedHostCountStrategy = connectedHostCountStrategy;
        initialize();
    }

    /**
     * Subscribes to the membership source and routes each event to the partitions named by the
     * partitioner.
     *
     * <p>The partitioner's observable is merged into the main pipeline with {@code flatMap}
     * instead of being subscribed to separately, so the one subscription stored in {@code cs}
     * also covers partition lookups that are still in flight when {@link #shutdown()} is called.
     */
    private void initialize() {
        Observable<K> routedKeys = this.hostSource.flatMap(
                new Func1<MembershipEvent<C>, Observable<K>>() {
                    @Override
                    public Observable<K> call(final MembershipEvent<C> membershipEvent) {
                        return partitioner.call(membershipEvent.getClient()).doOnNext(new Action1<K>() {
                            @Override
                            public void call(K key) {
                                // Forward the event as soon as the partitioner emits the key.
                                getOrCreateHolder(key).hostStream.onNext(membershipEvent);
                            }
                        });
                    }
                });

        this.cs.add(routedKeys.subscribe(
                new Action1<K>() {
                    @Override
                    public void call(K key) {
                        // Nothing to do: the event was already forwarded in doOnNext above.
                    }
                },
                new Action1<Throwable>() {
                    @Override
                    public void call(Throwable error) {
                        // Without this handler RxJava raises OnErrorNotImplementedException.
                        LOG.error("Membership event routing terminated for load balancer '" + name + "'", error);
                    }
                }));
    }

    /**
     * Cancels the membership routing subscription.
     *
     * <p>Entries already present in the partition map are left in place; this method does not
     * touch the per-partition load balancers.
     */
    @Override // netflix.ocelli.PartitionedLoadBalancer
    public void shutdown() {
        this.cs.unsubscribe();
    }

    /**
     * Returns the partition registered for {@code k}, creating and registering one if absent.
     *
     * <p>Decompiler note: this method was originally private; treat it as internal.
     *
     * <p>NOTE(review): when two threads race on the same new key, both build a partition and the
     * loser's instance is discarded after {@code putIfAbsent}. Whether a discarded load balancer
     * holds resources that need releasing cannot be determined from this class; confirm against
     * {@code LoadBalancers.newBuilder(...).build()}.
     *
     * @param k partition key
     * @return the holder stored in the partition map for {@code k}
     */
    public Holder getOrCreateHolder(K k) {
        Holder existing = this.partitions.get(k);
        if (existing != null) {
            return existing;
        }
        PublishSubject<MembershipEvent<C>> hostStream = PublishSubject.create();
        Holder candidate = new Holder(createPartition(k, hostStream), hostStream);
        Holder winner = this.partitions.putIfAbsent(k, candidate);
        // putIfAbsent returns null when our candidate was the one stored.
        return winner != null ? winner : candidate;
    }

    /**
     * Returns the load balancer for partition {@code k}, creating the partition on first access.
     *
     * @param k partition key
     * @return the load balancer serving that partition
     */
    @Override // netflix.ocelli.PartitionedLoadBalancer
    public LoadBalancer<C> get(K k) {
        return getOrCreateHolder(k).loadBalancer;
    }

    /**
     * Returns an observable over the keys of the partitions created so far.
     *
     * @return observable backed by the partition map's key set
     */
    @Override // netflix.ocelli.PartitionedLoadBalancer
    public Observable<K> listKeys() {
        return Observable.from(this.partitions.keySet());
    }

    /**
     * Builds the load balancer for one partition, configured with this instance's strategies and
     * named {@code name + "_" + k}.
     *
     * @param k          partition key, used for logging and for the load balancer's name
     * @param observable membership events destined for this partition
     * @return the newly built load balancer
     */
    private LoadBalancer<C> createPartition(K k, Observable<MembershipEvent<C>> observable) {
        LOG.info("Creating partition : {}", k);
        return LoadBalancers.newBuilder(observable)
                .withName(getName() + "_" + k)
                .withQuarantineStrategy(this.quarantineDelayStrategy)
                .withSelectionStrategy(this.selectionStrategy)
                .withActiveClientCountStrategy(this.connectedHostCountStrategy)
                .withClientConnector(this.clientConnector)
                .withFailureDetector(this.failureDetector)
                .build();
    }

    /** Returns the base name given at construction. */
    private String getName() {
        return this.name;
    }
}
