package rx.operators;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import rx.Observable;
import rx.concurrency.Schedulers;
import rx.subjects.PublishSubject;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

/* loaded from: input_file:rx/operators/OperationParallelMergeTest.class */
/**
 * Tests for {@link OperationParallelMerge#parallelMerge}: checks how many inner
 * observables it produces and how many distinct threads deliver values once the
 * result is consumed.
 */
public class OperationParallelMergeTest {

    /**
     * Feeds four subjects through {@code parallelMerge} with a parallelism of 2 and 3
     * and asserts that the resulting outer observable carries exactly that many
     * inner observables, while the untouched source still carries four.
     */
    @Test
    public void testParallelMerge() {
        Observable<Observable<String>> fourStreams = Observable.<Observable<String>>from(
                PublishSubject.<String>create(),
                PublishSubject.<String>create(),
                PublishSubject.<String>create(),
                PublishSubject.<String>create());

        Observable<Observable<String>> twoStreams = OperationParallelMerge.parallelMerge(fourStreams, 2);
        Observable<Observable<String>> threeStreams = OperationParallelMerge.parallelMerge(fourStreams, 3);

        List<Observable<String>> fourList = fourStreams.toList().toBlockingObservable().last();
        List<Observable<String>> threeList = threeStreams.toList().toBlockingObservable().last();
        // Keep the two-way result in a local so it can be both printed and asserted on.
        List<Observable<String>> twoList = twoStreams.toList().toBlockingObservable().last();

        System.out.println("two list: " + twoList);
        System.out.println("three list: " + threeList);
        System.out.println("four list: " + fourList);

        Assert.assertEquals(4L, fourList.size());
        Assert.assertEquals(3L, threeList.size());
        Assert.assertEquals(2L, twoList.size());
    }

    /**
     * Compares the number of delivering threads for a plain {@code merge} against a
     * three-way {@code parallelMerge} whose inner observables are each observed on
     * {@code Schedulers.newThread()}.
     */
    @Test
    public void testNumberOfThreads() {
        final ConcurrentHashMap<String, String> threadNames = new ConcurrentHashMap<String, String>();

        // getStreams() builds its sources with Observable.interval and no explicit scheduler;
        // the assertion below expects one delivering thread per available processor.
        // NOTE(review): that expectation depends on the default interval scheduler's pool
        // size and on timing - confirm it holds on the target environment.
        Observable.merge(getStreams())
                .toBlockingObservable()
                .forEach(recordThreadNames(threadNames));
        Assert.assertEquals(Runtime.getRuntime().availableProcessors(), threadNames.size());

        threadNames.clear();

        // Reduce to three inner observables and move each onto its own new thread;
        // exactly three distinct thread names are expected.
        OperationParallelMerge.parallelMerge(getStreams(), 3)
                .flatMap(new Func1<Observable<String>, Observable<String>>() {
                    @Override
                    public Observable<String> call(Observable<String> stream) {
                        return stream.observeOn(Schedulers.newThread());
                    }
                })
                .toBlockingObservable()
                .forEach(recordThreadNames(threadNames));
        Assert.assertEquals(3L, threadNames.size());
    }

    /**
     * Uses the {@code parallelMerge} overload that takes a scheduler and asserts that
     * merging its three inner observables delivers values on exactly three threads.
     */
    @Test
    public void testNumberOfThreadsOnScheduledMerge() {
        final ConcurrentHashMap<String, String> threadNames = new ConcurrentHashMap<String, String>();

        Observable.merge(OperationParallelMerge.parallelMerge(getStreams(), 3, Schedulers.newThread()))
                .toBlockingObservable()
                .forEach(recordThreadNames(threadNames));

        Assert.assertEquals(3L, threadNames.size());
    }

    /**
     * Builds an action that prints each received value together with the current
     * thread and records that thread's name in {@code threadNames}.
     *
     * @param threadNames map collecting the names of every thread the action ran on
     * @return the recording action
     */
    private static Action1<String> recordThreadNames(final ConcurrentHashMap<String, String> threadNames) {
        return new Action1<String>() {
            @Override
            public void call(String value) {
                System.out.println("o: " + value + " Thread: " + Thread.currentThread());
                String name = Thread.currentThread().getName();
                threadNames.put(name, name);
            }
        };
    }

    /**
     * Creates ten inner observables, each taking the first five ticks of a
     * 10 ms interval and mapping every tick to a string that names the stream
     * index and the tick value.
     *
     * @return an observable emitting the ten inner string streams
     */
    private static Observable<Observable<String>> getStreams() {
        return Observable.range(0, 10).map(new Func1<Integer, Observable<String>>() {
            @Override
            public Observable<String> call(final Integer streamIndex) {
                return Observable.interval(10L, TimeUnit.MILLISECONDS)
                        .map(new Func1<Long, String>() {
                            @Override
                            public String call(Long tick) {
                                return "Stream " + streamIndex + "  Value: " + tick;
                            }
                        })
                        .take(5);
            }
        });
    }
}
