/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.eureka2.interests;

import com.netflix.eureka2.interests.ChangeNotification;
import com.netflix.eureka2.interests.Interest;
import com.netflix.eureka2.utils.rx.PauseableSubject;
import java.util.Iterator;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

/**
 * A {@link Subject} of {@link ChangeNotification}s associated with a single {@link Interest}.
 *
 * <p>Notifications pushed into this index ({@link #onNext}, {@link #onError},
 * {@link #onCompleted}) are forwarded to the {@link PauseableSubject} owned by the
 * {@link InitStateHolder}. That subject is in turn subscribed to by both the
 * {@link InitStateHolder} and the real-time source subject passed to the constructor.
 *
 * <p>Each subscriber of this index receives the merge of two streams: the notifications
 * obtained by iterating the {@link InitStateHolder}, and a per-subscriber
 * {@link PauseableSubject} fed from the real-time source. The per-subscriber subject is
 * paused before it is attached and resumed when the initial-state stream completes.
 * NOTE(review): this presumably delays real-time notifications until the initial state has
 * been delivered; confirm against the buffering semantics of {@code PauseableSubject}.
 *
 * @param <T> type of the data carried by the change notifications
 */
public class Index<T> extends Subject<ChangeNotification<T>, ChangeNotification<T>> {
    private final Interest<T> interest;
    private final PauseableSubject<ChangeNotification<T>> notificationsSubject;

    /**
     * Creates an index and wires the holder's notification subject to both the
     * {@code initStateHolder} and the {@code realTimeSource}.
     *
     * @param interest        the interest this index is associated with
     * @param initStateHolder holder that is iterated to produce the initial notifications
     *                        for every new subscriber; also supplies the notification subject
     * @param realTimeSource  subject through which notifications are relayed to subscribers
     */
    protected Index(Interest<T> interest, final InitStateHolder<T> initStateHolder, final Subject<ChangeNotification<T>, ChangeNotification<T>> realTimeSource) {
        super(new Observable.OnSubscribe<ChangeNotification<T>>() {

            public void call(Subscriber<? super ChangeNotification<T>> subscriber) {
                final PauseableSubject<ChangeNotification<T>> realTimeSubject = PauseableSubject.create();
                // Pause before attaching to the real-time source; resumed once the
                // initial-state stream below has completed.
                realTimeSubject.pause();
                // Tie the bridge subscription to the downstream subscriber so that it is
                // released when the subscriber unsubscribes instead of staying registered
                // on realTimeSource.
                subscriber.add(realTimeSource.subscribe(realTimeSubject));
                // The explicit upcast keeps the Iterable overload of Observable.from selected.
                Observable<ChangeNotification<T>> initialState =
                        Observable.from((Iterable<ChangeNotification<T>>) initStateHolder)
                                .doOnCompleted(new Action0() {

                                    public void call() {
                                        realTimeSubject.resume();
                                    }
                                });
                realTimeSubject.mergeWith(initialState).subscribe(subscriber);
            }
        });
        this.interest = interest;
        this.notificationsSubject = initStateHolder.getNotificationSubject();
        // The holder is subscribed before the real-time source, as in the original wiring.
        this.notificationsSubject.subscribe(initStateHolder);
        // Explicit upcast keeps the Observer overload of subscribe selected.
        this.notificationsSubject.subscribe((Observer<ChangeNotification<T>>) realTimeSource);
    }

    /**
     * Returns the interest this index was created for.
     *
     * @return the interest passed to the constructor
     */
    public Interest<T> getInterest() {
        return this.interest;
    }

    /** Forwards the completion signal to the notification subject. */
    public void onCompleted() {
        this.notificationsSubject.onCompleted();
    }

    /**
     * Forwards the error to the notification subject.
     *
     * @param e the error to forward
     */
    public void onError(Throwable e) {
        this.notificationsSubject.onError(e);
    }

    /**
     * Forwards the notification to the notification subject.
     *
     * @param notification the notification to forward
     */
    public void onNext(ChangeNotification<T> notification) {
        this.notificationsSubject.onNext(notification);
    }

    /**
     * Creates an index for {@code interest} and subscribes it to {@code dataSource}, passing
     * through only the notifications whose data matches the interest.
     *
     * <p>The subscription to {@code dataSource} is not returned to the caller.
     *
     * @param interest        the interest used to filter {@code dataSource}
     * @param dataSource      stream of all change notifications
     * @param initStateHolder holder backing the new index
     * @param <T>             type of the data carried by the change notifications
     * @return the newly created index, already subscribed to the filtered data source
     */
    public static <T> Index<T> forInterest(final Interest<T> interest, Observable<ChangeNotification<T>> dataSource, InitStateHolder<T> initStateHolder) {
        PublishSubject<ChangeNotification<T>> realTimeSource = PublishSubject.create();
        Index<T> toReturn = new Index<T>(interest, initStateHolder, realTimeSource);
        dataSource.filter(new Func1<ChangeNotification<T>, Boolean>() {

            public Boolean call(ChangeNotification<T> notification) {
                return interest.matches(notification.getData());
            }
        }).subscribe(toReturn);
        return toReturn;
    }

    /**
     * Reports whether the notification subject currently has observers.
     *
     * @return the value of {@code hasObservers()} on the notification subject
     */
    public boolean hasObservers() {
        return this.notificationsSubject.hasObservers();
    }

    /**
     * Holds the notifications that are replayed to new subscribers of an {@link Index}.
     *
     * <p>As a {@link Subscriber} it hands every received notification to
     * {@link #addNotification}; on completion or error it marks itself done and calls
     * {@link #clearAllNotifications}. As an {@link Iterable} it exposes the held
     * notifications through {@link #_newIterator}.
     *
     * @param <T> type of the data carried by the change notifications
     */
    public static abstract class InitStateHolder<T>
    extends Subscriber<ChangeNotification<T>>
    implements Iterable<ChangeNotification<T>> {
        /** Iterator with no elements, returned by {@link #iterator()} once the holder is done. */
        protected final Iterator<ChangeNotification<T>> EMPTY_ITERATOR = new Iterator<ChangeNotification<T>>() {

            @Override
            public boolean hasNext() {
                return false;
            }

            @Override
            public ChangeNotification<T> next() {
                // Iterator contract: an exhausted iterator throws rather than returning null.
                throw new java.util.NoSuchElementException();
            }

            @Override
            public void remove() {
                // No element has ever been returned by next(), so there is nothing to remove.
                throw new IllegalStateException();
            }
        };
        private volatile boolean done;
        private final PauseableSubject<ChangeNotification<T>> notificationSubject;

        /**
         * @param notificationSubject subject that is paused while a new iterator is being
         *                            created and that {@link Index} subscribes this holder to
         */
        protected InitStateHolder(PauseableSubject<ChangeNotification<T>> notificationSubject) {
            this.notificationSubject = notificationSubject;
        }

        /**
         * Returns an iterator over the held notifications.
         *
         * <p>If the holder is done, {@link #EMPTY_ITERATOR} is returned. Otherwise the
         * notification subject is paused while {@link #_newIterator()} runs and resumed
         * afterwards, including when {@link #_newIterator()} throws.
         *
         * @return an iterator over the held notifications, or an empty one once done
         */
        @Override
        public Iterator<ChangeNotification<T>> iterator() {
            if (this.isDone()) {
                return this.EMPTY_ITERATOR;
            }
            // Pause outside the try block so that resume() only runs after a successful pause().
            this.notificationSubject.pause();
            try {
                return this._newIterator();
            }
            finally {
                this.notificationSubject.resume();
            }
        }

        /** Marks the holder as done and clears all held notifications. */
        public final void onCompleted() {
            this.done = true;
            this.clearAllNotifications();
        }

        /**
         * Marks the holder as done and clears all held notifications. The error itself is
         * not recorded or rethrown here.
         *
         * @param e the error that terminated the stream (unused)
         */
        public final void onError(Throwable e) {
            this.done = true;
            this.clearAllNotifications();
        }

        /**
         * Hands the notification to {@link #addNotification}.
         *
         * @param notification the received notification
         */
        public final void onNext(ChangeNotification<T> notification) {
            this.addNotification(notification);
        }

        /**
         * @return the notification subject passed to the constructor
         */
        protected PauseableSubject<ChangeNotification<T>> getNotificationSubject() {
            return this.notificationSubject;
        }

        /**
         * @return {@code true} once {@link #onCompleted()} or {@link #onError(Throwable)} has been called
         */
        protected boolean isDone() {
            return this.done;
        }

        /**
         * Records a received notification.
         *
         * @param var1 the notification to record
         */
        protected abstract void addNotification(ChangeNotification<T> var1);

        /** Discards all recorded notifications; called when the stream terminates. */
        protected abstract void clearAllNotifications();

        /**
         * Creates an iterator over the recorded notifications. Called by {@link #iterator()}
         * while the notification subject is paused.
         *
         * @return a new iterator over the recorded notifications
         */
        protected abstract Iterator<ChangeNotification<T>> _newIterator();
    }
}

