/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.eureka2.client.interest;

import com.netflix.eureka2.channel.ChannelFactory;
import com.netflix.eureka2.channel.InterestChannel;
import com.netflix.eureka2.client.interest.AbstractInterestClient;
import com.netflix.eureka2.client.interest.InterestTracker;
import com.netflix.eureka2.connection.RetryableConnection;
import com.netflix.eureka2.connection.RetryableConnectionFactory;
import com.netflix.eureka2.interests.ChangeNotification;
import com.netflix.eureka2.interests.EmptyRegistryInterest;
import com.netflix.eureka2.interests.Interest;
import com.netflix.eureka2.interests.MultipleInterests;
import com.netflix.eureka2.registry.SourcedEurekaRegistry;
import com.netflix.eureka2.registry.instance.InstanceInfo;
import javax.inject.Inject;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func2;

/**
 * Interest client that serves change notifications from a local registry while
 * pushing the set of currently subscribed interests to an {@link InterestChannel}.
 *
 * <p>Each subscription to {@link #forInterest(Interest)} appends its interest to an
 * {@link InterestTracker}; unsubscribing removes it again. The tracker's change stream
 * is handed to a {@link RetryableConnection}, which applies every emitted interest to
 * the channel through {@link InterestChannel#change(Interest)}.
 */
public class EurekaInterestClientImpl extends AbstractInterestClient {

    /** Retry wait, in milliseconds, passed to the superclass by the injectable constructor. */
    private static final int DEFAULT_RETRY_WAIT_MS = 1000;

    private final InterestTracker interestTracker = new InterestTracker();
    private final RetryableConnection<InterestChannel> retryableConnection;

    /**
     * Creates a client using the default retry wait of {@value #DEFAULT_RETRY_WAIT_MS} ms.
     *
     * @param registry       registry that notifications are served from
     * @param channelFactory factory handed to the {@link RetryableConnectionFactory}
     */
    @Inject
    public EurekaInterestClientImpl(SourcedEurekaRegistry<InstanceInfo> registry,
                                    ChannelFactory<InterestChannel> channelFactory) {
        this(registry, channelFactory, DEFAULT_RETRY_WAIT_MS);
    }

    /**
     * Creates a client with an explicit retry wait.
     *
     * @param registry        registry that notifications are served from
     * @param channelFactory  factory handed to the {@link RetryableConnectionFactory}
     * @param retryWaitMillis value forwarded unchanged to the superclass constructor
     */
    EurekaInterestClientImpl(SourcedEurekaRegistry<InstanceInfo> registry,
                             ChannelFactory<InterestChannel> channelFactory,
                             int retryWaitMillis) {
        super(registry, retryWaitMillis);

        RetryableConnectionFactory<InterestChannel> connectionFactory =
                new RetryableConnectionFactory<InterestChannel>(channelFactory);

        // Every interest emitted by the tracker is applied to the channel via change().
        Observable<Interest<InstanceInfo>> interestChanges = this.interestTracker.interestChangeStream();
        Func2<InterestChannel, Interest<InstanceInfo>, Observable<Void>> applyOnChannel =
                new Func2<InterestChannel, Interest<InstanceInfo>, Observable<Void>>() {
                    @Override
                    public Observable<Void> call(InterestChannel interestChannel, Interest<InstanceInfo> interest) {
                        return interestChannel.change(interest);
                    }
                };

        this.retryableConnection = connectionFactory.singleOpConnection(interestChanges, applyOnChannel);

        // Both hooks are defined outside this class; they are handed the freshly built connection.
        this.registryEvictionSubscribe(this.retryableConnection);
        this.lifecycleSubscribe(this.retryableConnection);
    }

    /**
     * Returns the registry's notification stream for {@code interest}, registering the
     * interest with the tracker for as long as the returned observable is subscribed.
     *
     * <p>The shutdown flag and the empty-interest checks are evaluated when this method
     * is called, not when the result is subscribed to.
     *
     * @param interest the interest to subscribe to
     * @return an observable that fails with {@link IllegalStateException} if the client is
     *         already shut down; an empty observable for an {@link EmptyRegistryInterest}
     *         or a {@link MultipleInterests} whose flattened set is empty; otherwise the
     *         registry notifications for {@code interest}
     */
    @Override
    public Observable<ChangeNotification<InstanceInfo>> forInterest(final Interest<InstanceInfo> interest) {
        if (this.isShutdown.get()) {
            return Observable.error(new IllegalStateException("InterestHandler has shutdown"));
        }
        if (interest instanceof EmptyRegistryInterest) {
            return Observable.empty();
        }
        // Only emptiness of the flattened set matters here, so the element type is irrelevant.
        if (interest instanceof MultipleInterests && ((MultipleInterests) interest).flatten().isEmpty()) {
            return Observable.empty();
        }

        // Emits no items: it records the interest on subscription and completes immediately.
        // Typing it with the notification type keeps the merge below fully type-checked.
        Observable<ChangeNotification<InstanceInfo>> registerInterest = Observable.create(
                new Observable.OnSubscribe<ChangeNotification<InstanceInfo>>() {
                    @Override
                    public void call(Subscriber<? super ChangeNotification<InstanceInfo>> subscriber) {
                        EurekaInterestClientImpl.this.interestTracker.appendInterest(interest);
                        subscriber.onCompleted();
                    }
                });

        return registerInterest
                .mergeWith(this.registry.forInterest(interest))
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        EurekaInterestClientImpl.this.interestTracker.removeInterest(interest);
                    }
                });
    }

    /**
     * @return the connection created in the constructor
     */
    @Override
    protected RetryableConnection<InterestChannel> getRetryableConnection() {
        return this.retryableConnection;
    }
}

