package rx.operators;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subjects.ReplaySubject;
import rx.subscriptions.BooleanSubscription;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

/* loaded from: input_file:rx/operators/OperationCache.class */
public class OperationCache {

    /* loaded from: input_file:rx/operators/OperationCache$UnitTest.class */
    public static class UnitTest {
        @Test
        public void testCache() throws InterruptedException {
            final AtomicInteger atomicInteger = new AtomicInteger();
            Observable create = Observable.create(OperationCache.cache(Observable.create((Func1) new Func1<Observer<String>, Subscription>() { // from class: rx.operators.OperationCache.UnitTest.1
                @Override // rx.util.functions.Func1
                public Subscription call(final Observer<String> observer) {
                    BooleanSubscription booleanSubscription = new BooleanSubscription();
                    new Thread(new Runnable() { // from class: rx.operators.OperationCache.UnitTest.1.1
                        @Override // java.lang.Runnable
                        public void run() {
                            atomicInteger.incrementAndGet();
                            System.out.println("published observable being executed");
                            observer.onNext("one");
                            observer.onCompleted();
                        }
                    }).start();
                    return booleanSubscription;
                }
            })));
            final CountDownLatch countDownLatch = new CountDownLatch(2);
            create.subscribe((Action1) new Action1<String>() { // from class: rx.operators.OperationCache.UnitTest.2
                @Override // rx.util.functions.Action1
                public void call(String str) {
                    Assert.assertEquals("one", str);
                    System.out.println("v: " + str);
                    countDownLatch.countDown();
                }
            });
            create.subscribe((Action1) new Action1<String>() { // from class: rx.operators.OperationCache.UnitTest.3
                @Override // rx.util.functions.Action1
                public void call(String str) {
                    Assert.assertEquals("one", str);
                    System.out.println("v: " + str);
                    countDownLatch.countDown();
                }
            });
            if (!countDownLatch.await(1000L, TimeUnit.MILLISECONDS)) {
                Assert.fail("subscriptions did not receive values");
            }
            Assert.assertEquals(1L, atomicInteger.get());
        }
    }

    public static <T> Func1<Observer<T>, Subscription> cache(final Observable<T> observable) {
        return new Func1<Observer<T>, Subscription>() { // from class: rx.operators.OperationCache.1
            final AtomicBoolean subscribed = new AtomicBoolean(false);
            private final ReplaySubject<T> cache = ReplaySubject.create();

            @Override // rx.util.functions.Func1
            public Subscription call(Observer<T> observer) {
                if (this.subscribed.compareAndSet(false, true)) {
                    Observable.this.subscribe((Observer) this.cache);
                }
                return this.cache.subscribe((Observer) observer);
            }
        };
    }
}
