/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.eureka2.connection;

import com.netflix.eureka2.channel.ChannelFactory;
import com.netflix.eureka2.channel.ServiceChannel;
import com.netflix.eureka2.connection.RetryableConnection;
import com.netflix.eureka2.utils.rx.BreakerSwitchSubject;
import com.netflix.eureka2.utils.rx.NoOpSubscriber;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.AsyncSubject;
import rx.subjects.BehaviorSubject;
import rx.subjects.Subject;

/**
 * Factory that builds {@link RetryableConnection} instances on top of channels
 * obtained from a {@link ChannelFactory}.
 *
 * <p>Each subscription to the lifecycle observable of a produced connection
 * requests a fresh channel from the factory; the previously emitted channel is
 * closed when its replacement arrives (see
 * {@link #channelObservableWithCleanUp(Subject)}).
 *
 * @param <CHANNEL> the concrete channel type handled by this factory
 */
public class RetryableConnectionFactory<CHANNEL extends ServiceChannel> {
    private static final Logger logger = LoggerFactory.getLogger(RetryableConnectionFactory.class);

    /** Source of new channels; consulted once per lifecycle subscription. */
    protected final ChannelFactory<CHANNEL> channelFactory;

    /**
     * @param channelFactory factory used to create a new channel on every
     *                       (re)subscription to a connection's lifecycle
     */
    public RetryableConnectionFactory(ChannelFactory<CHANNEL> channelFactory) {
        this.channelFactory = channelFactory;
    }

    /**
     * Creates a connection whose channel operation takes no input.
     *
     * <p>Implemented by delegating to {@link #singleOpConnection(Observable, Func2)}
     * with a single placeholder item as the op stream; the placeholder is
     * ignored when {@code executeOnChannel} is invoked.
     *
     * @param executeOnChannel operation to run against each channel
     * @return the retryable connection wrapping the operation
     */
    public RetryableConnection<CHANNEL> zeroOpConnection(final Func1<CHANNEL, Observable<Void>> executeOnChannel) {
        // Single dummy element so that combineLatest fires once per channel.
        Observable<Integer> opStream = Observable.just(1);
        Func2<CHANNEL, Integer, Observable<Void>> adaptedExecute = new Func2<CHANNEL, Integer, Observable<Void>>() {
            @Override
            public Observable<Void> call(CHANNEL channel, Integer ignored) {
                return executeOnChannel.call(channel);
            }
        };
        return this.singleOpConnection(opStream, adaptedExecute);
    }

    /**
     * Creates a connection that executes the latest operation from
     * {@code opStream} against the latest channel.
     *
     * <p>The returned connection is assembled from:
     * <ul>
     *   <li>the channel observable, fed with a new channel from
     *       {@link #channelFactory} each time the lifecycle is subscribed to;</li>
     *   <li>the lifecycle observable, which for every (channel, op) combination
     *       invokes {@code executeOnChannel} and then follows the channel's own
     *       lifecycle observable;</li>
     *   <li>the init observable, which completes the first time an execution
     *       on a channel completes;</li>
     *   <li>a shutdown action that closes the channel(s) delivered by the
     *       channel subject, closes the subject and unsubscribes from the op
     *       stream.</li>
     * </ul>
     *
     * <p>If an execution fails, the channel it ran on is closed with that error.
     *
     * @param opStream         stream of operations; the most recent one is replayed
     * @param executeOnChannel function applying an operation to a channel
     * @param <OP>             operation type
     * @return the retryable connection
     */
    public <OP> RetryableConnection<CHANNEL> singleOpConnection(Observable<OP> opStream, final Func2<CHANNEL, OP, Observable<Void>> executeOnChannel) {
        // Completed once, on the first successful execution.
        final AsyncSubject<Void> initSubject = AsyncSubject.create();
        final BreakerSwitchSubject<CHANNEL> channelSubject = BreakerSwitchSubject.create(BehaviorSubject.<CHANNEL>create());
        // replay(1) lets a newly created channel be combined with the latest op.
        final Observable<OP> opObservable = opStream.replay(1).refCount();
        final NoOpSubscriber<OP> opSubscriber = new NoOpSubscriber<OP>();
        Observable<CHANNEL> channelObservable = this.channelObservableWithCleanUp(channelSubject);
        final AtomicBoolean opStreamConnected = new AtomicBoolean(false);
        final AtomicBoolean initialConnect = new AtomicBoolean(true);

        Observable<Void> lifecycle = Observable.combineLatest(channelObservable, opObservable, new Func2<CHANNEL, OP, Observable<Void>>() {
            @Override
            public Observable<Void> call(final CHANNEL channel, OP op) {
                // Arguments are passed as-is so that toString() is only invoked
                // when debug logging is enabled (and a null op does not throw here).
                logger.debug("executing on channel {} op {}", channel, op);
                executeOnChannel.call(channel, op).subscribe(new Subscriber<Void>() {
                    @Override
                    public void onCompleted() {
                        // Only the very first completion signals initialisation.
                        if (initialConnect.compareAndSet(true, false)) {
                            initSubject.onCompleted();
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        channel.close(e);
                    }

                    @Override
                    public void onNext(Void aVoid) {
                        // Nothing to do: the execution result carries no value.
                    }
                });
                return channel.asLifecycleObservable();
            }
        }).flatMap(new Func1<Observable<Void>, Observable<Void>>() {
            @Override
            public Observable<Void> call(Observable<Void> channelLifecycle) {
                // Flatten Observable<Observable<Void>> into Observable<Void>.
                return channelLifecycle;
            }
        }).doOnSubscribe(new Action0() {
            @Override
            public void call() {
                // Every subscription gets a brand new channel.
                channelSubject.onNext(RetryableConnectionFactory.this.channelFactory.newChannel());
                // The op stream is subscribed to only once, on the first lifecycle subscription.
                if (opStreamConnected.compareAndSet(false, true)) {
                    opObservable.subscribe(opSubscriber);
                }
            }
        });

        Action0 shutdownHook = new Action0() {
            @Override
            public void call() {
                // Close whatever channel the subject delivers, before closing the subject itself.
                channelSubject.doOnNext(new Action1<CHANNEL>() {
                    @Override
                    public void call(CHANNEL channel) {
                        channel.close();
                    }
                }).subscribe();
                channelSubject.close();
                opSubscriber.unsubscribe();
            }
        };

        return new RetryableConnection<CHANNEL>(channelSubject.asObservable(), lifecycle.asObservable(), initSubject.asObservable(), shutdownHook);
    }

    /**
     * Wraps the channel subject so that, whenever a new channel is emitted, the
     * previously emitted channel is closed.
     *
     * <p>A no-op subscriber is attached immediately, before the observable is
     * returned to the caller.
     *
     * @param channelSubject subject onto which new channels are pushed
     * @return observable emitting the channels pushed to {@code channelSubject}
     */
    protected Observable<CHANNEL> channelObservableWithCleanUp(Subject<CHANNEL, CHANNEL> channelSubject) {
        Observable<CHANNEL> channelObservable = channelSubject.asObservable().scan(new Func2<CHANNEL, CHANNEL, CHANNEL>() {
            @Override
            public CHANNEL call(CHANNEL prev, CHANNEL curr) {
                // Defensive null check before releasing the superseded channel.
                if (prev != null) {
                    logger.info("Closing old channel {}", prev);
                    prev.close();
                }
                return curr;
            }
        });
        channelObservable.subscribe(new NoOpSubscriber<CHANNEL>());
        return channelObservable;
    }
}

