package com.netflix.eureka2.client.interest;

import com.netflix.eureka2.channel.ChannelFactory;
import com.netflix.eureka2.channel.InterestChannel;
import com.netflix.eureka2.connection.RetryableConnection;
import com.netflix.eureka2.connection.RetryableConnectionFactory;
import com.netflix.eureka2.interests.ChangeNotification;
import com.netflix.eureka2.interests.EmptyRegistryInterest;
import com.netflix.eureka2.interests.Interest;
import com.netflix.eureka2.interests.MultipleInterests;
import com.netflix.eureka2.registry.SourcedEurekaRegistry;
import com.netflix.eureka2.registry.instance.InstanceInfo;
import javax.inject.Inject;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func2;

/* loaded from: input_file:com/netflix/eureka2/client/interest/EurekaInterestClientImpl.class */
/**
 * {@link AbstractInterestClient} implementation that records every requested interest in an
 * {@link InterestTracker} and serves change notifications from the registry supplied at
 * construction time.
 *
 * <p>The tracker's interest change stream is handed to
 * {@code RetryableConnectionFactory#singleOpConnection} together with a function that calls
 * {@link InterestChannel#change} on the channel, and the resulting connection is passed to the
 * superclass' {@code registryEvictionSubscribe} and {@code lifecycleSubscribe} hooks.
 */
public class EurekaInterestClientImpl extends AbstractInterestClient {
    private final InterestTracker interestTracker;
    private final RetryableConnection<InterestChannel> retryableConnection;

    /**
     * Creates a client that uses {@link AbstractInterestClient#DEFAULT_RETRY_WAIT_MILLIS} as the
     * retry wait value.
     *
     * @param sourcedEurekaRegistry registry forwarded to the superclass
     * @param channelFactory factory given to the {@link RetryableConnectionFactory}
     */
    @Inject
    public EurekaInterestClientImpl(SourcedEurekaRegistry<InstanceInfo> sourcedEurekaRegistry, ChannelFactory<InterestChannel> channelFactory) {
        this(sourcedEurekaRegistry, channelFactory, AbstractInterestClient.DEFAULT_RETRY_WAIT_MILLIS);
    }

    /**
     * Package-private constructor that lets the caller choose the retry wait value.
     *
     * @param sourcedEurekaRegistry registry forwarded to the superclass
     * @param channelFactory factory given to the {@link RetryableConnectionFactory}
     * @param retryWaitMillis value forwarded to the superclass constructor
     */
    EurekaInterestClientImpl(SourcedEurekaRegistry<InstanceInfo> sourcedEurekaRegistry, ChannelFactory<InterestChannel> channelFactory, int retryWaitMillis) {
        super(sourcedEurekaRegistry, retryWaitMillis);
        interestTracker = new InterestTracker();

        // Kept as a raw type, exactly as in the original expression.
        RetryableConnectionFactory connectionFactory = new RetryableConnectionFactory(channelFactory);

        // Operation run for each interest emitted by the tracker: ask the channel to change to it.
        Func2<InterestChannel, Interest<InstanceInfo>, Observable<Void>> changeInterestOnChannel =
                new Func2<InterestChannel, Interest<InstanceInfo>, Observable<Void>>() {
                    @Override
                    public Observable<Void> call(InterestChannel channel, Interest<InstanceInfo> newInterest) {
                        return channel.change(newInterest);
                    }
                };

        retryableConnection = connectionFactory.singleOpConnection(interestTracker.interestChangeStream(), changeInterestOnChannel);

        registryEvictionSubscribe(retryableConnection);
        lifecycleSubscribe(retryableConnection);
    }

    /**
     * Returns the stream of change notifications for the given interest.
     *
     * <p>The shutdown and empty-interest checks are evaluated when this method is called, not
     * when the returned observable is subscribed to.
     *
     * @param interest the interest to subscribe to
     * @return an observable that errors with {@link IllegalStateException} if the client has been
     *         shut down; an empty observable for an {@link EmptyRegistryInterest} or for a
     *         {@link MultipleInterests} whose {@code flatten()} is empty; otherwise the
     *         registry's stream for the interest, which appends the interest to the tracker on
     *         subscription and removes it again on unsubscribe
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Observable<ChangeNotification<InstanceInfo>> forInterest(final Interest<InstanceInfo> interest) {
        if (isShutdown.get()) {
            return Observable.error(new IllegalStateException("InterestHandler has shutdown"));
        }
        if (interest instanceof EmptyRegistryInterest) {
            return Observable.empty();
        }
        if (interest instanceof MultipleInterests && ((MultipleInterests) interest).flatten().isEmpty()) {
            return Observable.empty();
        }

        // Emits no items: on subscription it records the interest in the tracker and completes.
        Observable<Void> registerInterest = Observable.create(new Observable.OnSubscribe<Void>() {
            @Override
            public void call(Subscriber<? super Void> subscriber) {
                interestTracker.appendInterest(interest);
                subscriber.onCompleted();
            }
        });

        // Runs on unsubscribe to drop the interest from the tracker again.
        Action0 unregisterInterest = new Action0() {
            @Override
            public void call() {
                interestTracker.removeInterest(interest);
            }
        };

        // cast(...) only aligns the element type so the Void stream can be merged with the
        // registry's notifications; the raw local bridges the erased ChangeNotification type.
        Observable notifications = registerInterest
                .cast(ChangeNotification.class)
                .mergeWith(registry.forInterest(interest))
                .doOnUnsubscribe(unregisterInterest);
        return notifications;
    }

    /**
     * Returns the retryable connection created in the constructor.
     *
     * @return the connection held by this client
     */
    @Override
    protected RetryableConnection<InterestChannel> getRetryableConnection() {
        return retryableConnection;
    }
}
