package com.netflix.eureka2.utils;

import com.netflix.eureka2.interests.ChangeNotification;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func1;

/* loaded from: input_file:com/netflix/eureka2/utils/StreamedDataCollector.class */
public abstract class StreamedDataCollector<R> {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/netflix/eureka2/utils/StreamedDataCollector$ChangeNotificationCollector.class */
    /**
     * Collector that keeps a running set of values derived from a stream of
     * {@link ChangeNotification}s.
     *
     * <p>Each notification's data item is converted with the supplied function; the converted
     * value is added to the set on {@code Add}/{@code Modify} and removed on {@code Delete}.
     * Values are held in a {@link ConcurrentSkipListSet} created without a comparator, so
     * {@code R} values must be mutually {@link Comparable} at runtime and must not be
     * {@code null}.
     *
     * @param <E> type of the data carried by the change notifications
     * @param <R> type of the collected values produced by the converter function
     */
    public static class ChangeNotificationCollector<E, R> extends StreamedDataCollector<R> {
        private static final Logger logger = LoggerFactory.getLogger(ChangeNotificationCollector.class);

        /** Current set of converted values, updated from the subscriber callbacks. */
        private final ConcurrentSkipListSet<R> servers = new ConcurrentSkipListSet<>();

        /** Handle to the notification stream; its state decides whether snapshots are still served. */
        private final Subscription subscription;

        /**
         * Subscribes to the notification stream immediately and starts accumulating values.
         *
         * @param observable stream of change notifications to collect from
         * @param func1 converter applied to each notification's data before it is stored or removed
         */
        ChangeNotificationCollector(Observable<ChangeNotification<E>> observable, final Func1<E, R> func1) {
            this.subscription = observable.subscribe(new Subscriber<ChangeNotification<E>>() {
                @Override
                public void onCompleted() {
                    // Nothing to clean up; the accumulated values are left untouched.
                }

                @Override
                public void onError(Throwable th) {
                    ChangeNotificationCollector.logger.error("Change notification stream terminated with error", th);
                }

                @Override
                public void onNext(ChangeNotification<E> changeNotification) {
                    // Typed as R (not Object) so the value can be stored in the Set<R>.
                    R converted = func1.call(changeNotification.getData());
                    switch (changeNotification.getKind()) {
                        case Add:
                        case Modify:
                            ChangeNotificationCollector.this.servers.add(converted);
                            break;
                        case Delete:
                            ChangeNotificationCollector.this.servers.remove(converted);
                            break;
                        default:
                            // Other notification kinds do not affect the collected set.
                            break;
                    }
                }
            });
        }

        /**
         * Returns a copy of the values collected so far, in the set's iteration order.
         *
         * @return a new, independent list holding the current values
         * @throws IllegalStateException if the subscription is no longer active, e.g. after
         *         {@link #close()} (NOTE(review): presumably also after the stream terminates,
         *         depending on how the Rx subscription reports termination; confirm)
         */
        @Override // com.netflix.eureka2.utils.StreamedDataCollector
        public List<R> latestSnapshot() {
            if (this.subscription.isUnsubscribed()) {
                throw new IllegalStateException("change notification stream is closed");
            }
            return new ArrayList<>(this.servers);
        }

        /** Unsubscribes from the notification stream; later snapshot requests will fail. */
        @Override // com.netflix.eureka2.utils.StreamedDataCollector
        public void close() {
            this.subscription.unsubscribe();
        }
    }

    /**
     * Returns the values currently held by this collector.
     *
     * <p>Of the implementations in this file, the collection-backed one returns a fixed
     * unmodifiable list, while the stream-backed one returns a fresh copy of its accumulated
     * values and throws {@link IllegalStateException} once its subscription is closed.
     *
     * @return list of the collected values
     */
    public abstract List<R> latestSnapshot();

    /**
     * Releases whatever this collector holds on to.
     *
     * <p>Of the implementations in this file, the stream-backed one unsubscribes from its
     * notification stream, while the collection-backed one does nothing.
     */
    public abstract void close();

    /**
     * Creates a collector over a fixed set of values.
     *
     * <p>The collection is copied once, at call time; later changes to {@code collection} are
     * not reflected. Every call to {@code latestSnapshot()} returns the same unmodifiable list,
     * and {@code close()} is a no-op.
     *
     * @param collection values to expose; must not be {@code null}
     * @param <R> type of the collected values
     * @return a collector that always reports the copied values
     */
    public static <R> StreamedDataCollector<R> from(Collection<R> collection) {
        // Defensive copy wrapped as read-only, typed as List<R> to avoid raw-type conversions.
        final List<R> fixedValues = Collections.unmodifiableList(new ArrayList<>(collection));
        return new StreamedDataCollector<R>() {
            @Override // com.netflix.eureka2.utils.StreamedDataCollector
            public List<R> latestSnapshot() {
                return fixedValues;
            }

            @Override // com.netflix.eureka2.utils.StreamedDataCollector
            public void close() {
                // Nothing to release for a fixed collection.
            }
        };
    }

    /**
     * Creates a collector that tracks values derived from a stream of change notifications.
     *
     * <p>The returned collector subscribes to {@code observable} immediately; call
     * {@code close()} on it to unsubscribe.
     *
     * @param observable stream of change notifications to collect from
     * @param func1 converter applied to each notification's data to obtain the collected value
     * @param <E> type of the data carried by the change notifications
     * @param <R> type of the collected values
     * @return a collector backed by the notification stream
     */
    public static <E, R> StreamedDataCollector<R> from(Observable<ChangeNotification<E>> observable, Func1<E, R> func1) {
        // Diamond instead of a raw type so E and R are checked by the compiler.
        return new ChangeNotificationCollector<>(observable, func1);
    }
}
