package com.netflix.eureka2.client.interest;

import com.netflix.eureka2.interests.ChangeNotification;
import com.netflix.eureka2.interests.ChangeNotifications;
import com.netflix.eureka2.interests.Index;
import com.netflix.eureka2.interests.IndexRegistry;
import com.netflix.eureka2.interests.Interest;
import com.netflix.eureka2.interests.MultipleInterests;
import com.netflix.eureka2.interests.SourcedChangeNotification;
import com.netflix.eureka2.interests.SourcedModifyNotification;
import com.netflix.eureka2.registry.SourcedEurekaRegistry;
import java.util.concurrent.atomic.AtomicBoolean;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observables.ConnectableObservable;
import rx.subjects.PublishSubject;

/* loaded from: input_file:com/netflix/eureka2/client/interest/BatchAwareIndexRegistry.class */
/**
 * {@link IndexRegistry} decorator that forwards every call to a delegate registry and
 * interleaves {@code BufferSentinel} notifications into the returned interest streams.
 *
 * <p>For each subscription, the batching status of a per-subscription (local)
 * {@link BatchingRegistryImpl} fed from the delegate stream is combined with the status
 * reported by the shared (remote) {@link BatchingRegistry}. Whenever the combined status is
 * {@code false} a sentinel is emitted; presumably {@code false} means "no batch in
 * progress" (the semantics are defined by {@link BatchFunctions#combine}, not shown here).
 *
 * @param <T> type of the data carried by the change notifications
 */
public class BatchAwareIndexRegistry<T> implements IndexRegistry<T> {
    /** Single shared sentinel instance; it carries no data, so it is reused for every {@code T}. */
    private static final ChangeNotification<?> FINISH_BATCHING_NOTIFICATION = new ChangeNotification<>(ChangeNotification.Kind.BufferSentinel, null);
    private final IndexRegistry<T> delegateRegistry;
    private final BatchingRegistry<T> remoteBatchingRegistry;

    /**
     * @param indexRegistry    registry that produces the underlying interest streams
     * @param batchingRegistry registry whose batching status is merged into those streams
     */
    public BatchAwareIndexRegistry(IndexRegistry<T> indexRegistry, BatchingRegistry<T> batchingRegistry) {
        this.delegateRegistry = indexRegistry;
        this.remoteBatchingRegistry = batchingRegistry;
    }

    /**
     * Returns the delegate's stream for {@code interest}, augmented with buffer sentinels.
     */
    @Override
    public Observable<ChangeNotification<T>> forInterest(Interest<T> interest, Observable<ChangeNotification<T>> observable, Index.InitStateHolder<T> initStateHolder) {
        return mergeWithBatchRegistryHints(interest, this.delegateRegistry.forInterest(interest, observable, initStateHolder));
    }

    /**
     * Returns the delegate's stream for the composite interest, augmented with buffer sentinels.
     */
    @Override
    public Observable<ChangeNotification<T>> forCompositeInterest(MultipleInterests<T> multipleInterests, SourcedEurekaRegistry<T> sourcedEurekaRegistry) {
        return mergeWithBatchRegistryHints(multipleInterests, this.delegateRegistry.forCompositeInterest(multipleInterests, sourcedEurekaRegistry));
    }

    /**
     * Shuts down the remote batching registry first, then the delegate registry.
     *
     * @return the observable returned by the delegate's {@code shutdown()}
     */
    @Override
    public Observable<Void> shutdown() {
        this.remoteBatchingRegistry.shutdown();
        return this.delegateRegistry.shutdown();
    }

    /**
     * Shuts down the remote batching registry first, then the delegate registry with {@code th}.
     *
     * @param th cause handed to the delegate's {@code shutdown(Throwable)}
     * @return the observable returned by the delegate's {@code shutdown(Throwable)}
     */
    @Override
    public Observable<Void> shutdown(Throwable th) {
        this.remoteBatchingRegistry.shutdown();
        return this.delegateRegistry.shutdown(th);
    }

    /**
     * Wraps {@code observable} so that each subscriber receives its data notifications
     * (converted to their base form) merged with buffer sentinels derived from the combined
     * local/remote batching status.
     *
     * <p>The source is published and connected once per subscription. Both the merged
     * subscription and the connection are registered with the subscriber, so unsubscribing
     * downstream also releases the upstream streams.
     */
    private Observable<ChangeNotification<T>> mergeWithBatchRegistryHints(final Interest<T> interest, final Observable<ChangeNotification<T>> observable) {
        return Observable.create(new Observable.OnSubscribe<ChangeNotification<T>>() {
            @Override
            public void call(Subscriber<? super ChangeNotification<T>> subscriber) {
                // publish() lets the local batching registry and the data pipeline share one
                // upstream subscription; nothing flows until connect() at the end.
                ConnectableObservable<ChangeNotification<T>> notifications = observable.publish();

                final BatchingRegistryImpl<T> localBatchingRegistry = new BatchingRegistryImpl<>();
                localBatchingRegistry.connectTo(notifications);

                final Action0 shutdownLocalRegistry = new Action0() {
                    @Override
                    public void call() {
                        localBatchingRegistry.shutdown();
                    }
                };

                // Latest combined batching status; read by the data pipeline below.
                // Starts as false, so data seen before any status update is followed by a sentinel.
                final AtomicBoolean batchingStatus = new AtomicBoolean();

                Observable<Boolean> combinedStatus = BatchFunctions.combine(
                        localBatchingRegistry.forInterest(interest),
                        remoteBatchingRegistry.forInterest(interest).doOnTerminate(shutdownLocalRegistry));

                Observable<ChangeNotification<T>> batchingUpdates = combinedStatus
                        .doOnNext(new Action1<Boolean>() {
                            @Override
                            public void call(Boolean status) {
                                batchingStatus.set(status);
                            }
                        })
                        // Only a false status produces a sentinel.
                        .filter(new Func1<Boolean, Boolean>() {
                            @Override
                            public Boolean call(Boolean status) {
                                return !status;
                            }
                        })
                        .map(new Func1<Boolean, ChangeNotification<T>>() {
                            @Override
                            public ChangeNotification<T> call(Boolean status) {
                                return finishBatchingNotification();
                            }
                        })
                        .doOnTerminate(shutdownLocalRegistry);

                Observable<ChangeNotification<T>> dataNotifications = notifications
                        .filter(ChangeNotifications.dataOnlyFilter())
                        .flatMap(new Func1<ChangeNotification<T>, Observable<ChangeNotification<T>>>() {
                            @Override
                            public Observable<ChangeNotification<T>> call(ChangeNotification<T> notification) {
                                ChangeNotification<T> base = toBaseNotification(notification);
                                if (batchingStatus.get()) {
                                    return Observable.just(base);
                                }
                                // Status is false: close the item with a sentinel right away.
                                return Observable.just(base, finishBatchingNotification());
                            }
                        });

                PublishSubject<ChangeNotification<T>> resultSubject = PublishSubject.create();
                resultSubject.subscribe(subscriber);
                // Register both subscriptions with the subscriber; otherwise the upstream
                // streams would stay subscribed after the downstream unsubscribes.
                subscriber.add(batchingUpdates.mergeWith(dataNotifications).subscribe(resultSubject));
                subscriber.add(notifications.connect());
            }
        });
    }

    /** Returns the shared sentinel typed for this registry's {@code T}. */
    @SuppressWarnings("unchecked") // safe: the sentinel carries no data (null payload)
    private ChangeNotification<T> finishBatchingNotification() {
        return (ChangeNotification<T>) FINISH_BATCHING_NOTIFICATION;
    }

    /**
     * Converts sourced notifications to their base form via {@code toBaseNotification()};
     * any other notification is returned unchanged.
     */
    private ChangeNotification<T> toBaseNotification(ChangeNotification<T> notification) {
        ChangeNotification<T> base = notification;
        if (notification instanceof SourcedChangeNotification) {
            base = ((SourcedChangeNotification<T>) notification).toBaseNotification();
        }
        // Checked independently of the branch above, mirroring the original evaluation order.
        if (notification instanceof SourcedModifyNotification) {
            base = ((SourcedModifyNotification<T>) notification).toBaseNotification();
        }
        return base;
    }
}
