package com.netflix.eureka2.connection;

import com.netflix.eureka2.channel.ChannelFactory;
import com.netflix.eureka2.channel.ServiceChannel;
import com.netflix.eureka2.utils.rx.BreakerSwitchSubject;
import com.netflix.eureka2.utils.rx.NoOpSubscriber;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.AsyncSubject;
import rx.subjects.BehaviorSubject;
import rx.subjects.Subject;

/* loaded from: input_file:com/netflix/eureka2/connection/RetryableConnectionFactory.class */
public class RetryableConnectionFactory<CHANNEL extends ServiceChannel> {
    private static final Logger logger = LoggerFactory.getLogger(RetryableConnectionFactory.class);
    protected final ChannelFactory<CHANNEL> channelFactory;

    public RetryableConnectionFactory(ChannelFactory<CHANNEL> channelFactory) {
        this.channelFactory = channelFactory;
    }

    public RetryableConnection<CHANNEL> zeroOpConnection(final Func1<CHANNEL, Observable<Void>> func1) {
        return singleOpConnection(Observable.just(1), new Func2<CHANNEL, Integer, Observable<Void>>() { // from class: com.netflix.eureka2.connection.RetryableConnectionFactory.1
            public Observable<Void> call(CHANNEL channel, Integer num) {
                return (Observable) func1.call(channel);
            }
        });
    }

    public <OP> RetryableConnection<CHANNEL> singleOpConnection(Observable<OP> observable, final Func2<CHANNEL, OP, Observable<Void>> func2) {
        final AsyncSubject create = AsyncSubject.create();
        final BreakerSwitchSubject create2 = BreakerSwitchSubject.create(BehaviorSubject.create());
        final Observable refCount = observable.replay(1).refCount();
        final NoOpSubscriber noOpSubscriber = new NoOpSubscriber();
        Observable<CHANNEL> channelObservableWithCleanUp = channelObservableWithCleanUp(create2);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final AtomicBoolean atomicBoolean2 = new AtomicBoolean(true);
        return new RetryableConnection<>(create2.asObservable(), Observable.combineLatest(channelObservableWithCleanUp, refCount, new Func2<CHANNEL, OP, Observable<Void>>() { // from class: com.netflix.eureka2.connection.RetryableConnectionFactory.4
            public Observable<Void> call(final CHANNEL channel, OP op) {
                RetryableConnectionFactory.logger.debug("executing on channel {} op {}", channel.toString(), op.toString());
                ((Observable) func2.call(channel, op)).subscribe(new Subscriber<Void>() { // from class: com.netflix.eureka2.connection.RetryableConnectionFactory.4.1
                    public void onCompleted() {
                        if (atomicBoolean2.compareAndSet(true, false)) {
                            create.onCompleted();
                        }
                    }

                    public void onError(Throwable th) {
                        channel.close(th);
                    }

                    public void onNext(Void r2) {
                    }
                });
                return channel.asLifecycleObservable();
            }

            /* JADX WARN: Multi-variable type inference failed */
            public /* bridge */ /* synthetic */ Object call(Object obj, Object obj2) {
                return call((AnonymousClass4<OP>) obj, (ServiceChannel) obj2);
            }
        }).flatMap(new Func1<Observable<Void>, Observable<Void>>() { // from class: com.netflix.eureka2.connection.RetryableConnectionFactory.3
            public Observable<Void> call(Observable<Void> observable2) {
                return observable2;
            }
        }).doOnSubscribe(new Action0() { // from class: com.netflix.eureka2.connection.RetryableConnectionFactory.2
            public void call() {
                create2.onNext(RetryableConnectionFactory.this.channelFactory.newChannel());
                if (atomicBoolean.compareAndSet(false, true)) {
                    refCount.subscribe(noOpSubscriber);
                }
            }
        }).asObservable(), create.asObservable(), new Action0() { // from class: com.netflix.eureka2.connection.RetryableConnectionFactory.5
            public void call() {
                create2.doOnNext(new Action1<CHANNEL>() { // from class: com.netflix.eureka2.connection.RetryableConnectionFactory.5.1
                    public void call(CHANNEL channel) {
                        channel.close();
                    }
                }).subscribe();
                create2.close();
                noOpSubscriber.unsubscribe();
            }
        });
    }

    protected Observable<CHANNEL> channelObservableWithCleanUp(Subject<CHANNEL, CHANNEL> subject) {
        Observable<CHANNEL> scan = subject.asObservable().scan(new Func2<CHANNEL, CHANNEL, CHANNEL>() { // from class: com.netflix.eureka2.connection.RetryableConnectionFactory.6
            public CHANNEL call(CHANNEL channel, CHANNEL channel2) {
                if (channel != null) {
                    RetryableConnectionFactory.logger.info("Closing old channel {}", channel);
                    channel.close();
                }
                return channel2;
            }
        });
        scan.subscribe(new NoOpSubscriber());
        return scan;
    }
}
