/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.eureka2.client.registration;

import com.netflix.eureka2.channel.ChannelFactory;
import com.netflix.eureka2.channel.RegistrationChannel;
import com.netflix.eureka2.client.EurekaRegistrationClient;
import com.netflix.eureka2.client.registration.RegistrationObservable;
import com.netflix.eureka2.connection.RetryableConnection;
import com.netflix.eureka2.connection.RetryableConnectionFactory;
import com.netflix.eureka2.registry.instance.InstanceInfo;
import com.netflix.eureka2.utils.rx.NoOpSubscriber;
import com.netflix.eureka2.utils.rx.RetryStrategyFunc;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.AsyncSubject;

public class EurekaRegistrationClientImpl
implements EurekaRegistrationClient {
    private static final Logger logger = LoggerFactory.getLogger(EurekaRegistrationClientImpl.class);
    private static final int DEFAULT_RETRY_WAIT_MILLIS = 1000;
    private final RetryableConnectionFactory<RegistrationChannel> retryableConnectionFactory;
    private final int retryWaitMillis;
    private final AsyncSubject<Void> shutdownSubject = AsyncSubject.create();

    @Inject
    public EurekaRegistrationClientImpl(ChannelFactory<RegistrationChannel> channelFactory) {
        this(channelFactory, 1000);
    }

    EurekaRegistrationClientImpl(ChannelFactory<RegistrationChannel> channelFactory, int retryWaitMillis) {
        this.retryableConnectionFactory = new RetryableConnectionFactory<RegistrationChannel>(channelFactory);
        this.retryWaitMillis = retryWaitMillis;
    }

    @Override
    public RegistrationObservable register(Observable<InstanceInfo> instanceInfoStream) {
        Observable opStream = instanceInfoStream.distinctUntilChanged();
        Func2<RegistrationChannel, InstanceInfo, Observable<Void>> executeOnChannel = new Func2<RegistrationChannel, InstanceInfo, Observable<Void>>(){

            public Observable<Void> call(RegistrationChannel channel, InstanceInfo instanceInfo) {
                return channel.register(instanceInfo);
            }
        };
        final RetryableConnection<RegistrationChannel> retryableConnection = this.retryableConnectionFactory.singleOpConnection(opStream, executeOnChannel);
        Observable<Void> initObservable = retryableConnection.getInitObservable();
        Observable lifecycle = retryableConnection.getRetryableLifecycle().retryWhen((Func1)new RetryStrategyFunc(this.retryWaitMillis)).takeUntil(this.shutdownSubject).doOnUnsubscribe(new Action0(){

            public void call() {
                retryableConnection.getChannelObservable().flatMap((Func1)new Func1<RegistrationChannel, Observable<Void>>(){

                    public Observable<Void> call(RegistrationChannel channel) {
                        return channel.unregister().finallyDo(new Action0(){

                            public void call() {
                                retryableConnection.close();
                            }
                        });
                    }
                }).subscribe(new NoOpSubscriber());
            }
        }).share();
        return RegistrationObservable.from((Observable<Void>)lifecycle, initObservable);
    }

    @Override
    public void shutdown() {
        logger.info("Shutting down RegistrationClient");
        this.shutdownSubject.onCompleted();
    }
}

