package rx.operators;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.concurrency.Schedulers;
import rx.concurrency.TestScheduler;
import rx.operators.ChunkedOperation;
import rx.subscriptions.Subscriptions;
import rx.util.Closing;
import rx.util.Closings;
import rx.util.Opening;
import rx.util.Openings;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func0;
import rx.util.functions.Func1;

/* loaded from: input_file:rx/operators/OperationWindow.class */
public final class OperationWindow extends ChunkedOperation {

    /* loaded from: input_file:rx/operators/OperationWindow$UnitTest.class */
    public static class UnitTest {
        private TestScheduler scheduler;

        @Before
        public void before() {
            this.scheduler = new TestScheduler();
        }

        private static <T> List<List<T>> toLists(Observable<Observable<T>> observable) {
            final ArrayList arrayList = new ArrayList();
            final ArrayList arrayList2 = new ArrayList();
            observable.subscribe(new Action1<Observable<T>>() { // from class: rx.operators.OperationWindow.UnitTest.1
                @Override // rx.util.functions.Action1
                public void call(Observable<T> observable2) {
                    observable2.subscribe(new Action1<T>() { // from class: rx.operators.OperationWindow.UnitTest.1.1
                        @Override // rx.util.functions.Action1
                        public void call(T t) {
                            arrayList.add(t);
                        }
                    });
                    arrayList2.add(new ArrayList(arrayList));
                    arrayList.clear();
                }
            });
            return arrayList2;
        }

        @Test
        public void testNonOverlappingWindows() {
            List lists = toLists(Observable.create(OperationWindow.window(Observable.from("one", "two", "three", "four", "five"), 3)));
            Assert.assertEquals(2L, lists.size());
            Assert.assertEquals(list("one", "two", "three"), lists.get(0));
            Assert.assertEquals(list("four", "five"), lists.get(1));
        }

        @Test
        public void testSkipAndCountGaplessEindows() {
            List lists = toLists(Observable.create(OperationWindow.window(Observable.from("one", "two", "three", "four", "five"), 3, 3)));
            Assert.assertEquals(2L, lists.size());
            Assert.assertEquals(list("one", "two", "three"), lists.get(0));
            Assert.assertEquals(list("four", "five"), lists.get(1));
        }

        @Test
        public void testOverlappingWindows() {
            List lists = toLists(Observable.create(OperationWindow.window(Observable.from("zero", "one", "two", "three", "four", "five"), 3, 1)));
            Assert.assertEquals(6L, lists.size());
            Assert.assertEquals(list("zero", "one", "two"), lists.get(0));
            Assert.assertEquals(list("one", "two", "three"), lists.get(1));
            Assert.assertEquals(list("two", "three", "four"), lists.get(2));
            Assert.assertEquals(list("three", "four", "five"), lists.get(3));
            Assert.assertEquals(list("four", "five"), lists.get(4));
            Assert.assertEquals(list("five"), lists.get(5));
        }

        @Test
        public void testSkipAndCountWindowsWithGaps() {
            List lists = toLists(Observable.create(OperationWindow.window(Observable.from("one", "two", "three", "four", "five"), 2, 3)));
            Assert.assertEquals(2L, lists.size());
            Assert.assertEquals(list("one", "two"), lists.get(0));
            Assert.assertEquals(list("four", "five"), lists.get(1));
        }

        @Test
        public void testTimedAndCount() {
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            Observable.create(OperationWindow.window(Observable.create(new Observable.OnSubscribeFunc<String>() { // from class: rx.operators.OperationWindow.UnitTest.2
                @Override // rx.Observable.OnSubscribeFunc
                public Subscription onSubscribe(Observer<? super String> observer) {
                    UnitTest.this.push(observer, "one", 10);
                    UnitTest.this.push(observer, "two", 90);
                    UnitTest.this.push(observer, "three", 110);
                    UnitTest.this.push(observer, "four", 190);
                    UnitTest.this.push(observer, "five", 210);
                    UnitTest.this.complete(observer, 250);
                    return Subscriptions.empty();
                }
            }), 100L, TimeUnit.MILLISECONDS, 2, this.scheduler)).subscribe(observeWindow(arrayList, arrayList2));
            this.scheduler.advanceTimeTo(100L, TimeUnit.MILLISECONDS);
            Assert.assertEquals(1L, arrayList2.size());
            Assert.assertEquals(arrayList2.get(0), list("one", "two"));
            this.scheduler.advanceTimeTo(200L, TimeUnit.MILLISECONDS);
            Assert.assertEquals(2L, arrayList2.size());
            Assert.assertEquals(arrayList2.get(1), list("three", "four"));
            this.scheduler.advanceTimeTo(300L, TimeUnit.MILLISECONDS);
            Assert.assertEquals(3L, arrayList2.size());
            Assert.assertEquals(arrayList2.get(2), list("five"));
        }

        @Test
        public void testTimed() {
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            Observable.create(OperationWindow.window(Observable.create(new Observable.OnSubscribeFunc<String>() { // from class: rx.operators.OperationWindow.UnitTest.3
                @Override // rx.Observable.OnSubscribeFunc
                public Subscription onSubscribe(Observer<? super String> observer) {
                    UnitTest.this.push(observer, "one", 98);
                    UnitTest.this.push(observer, "two", 99);
                    UnitTest.this.push(observer, "three", 100);
                    UnitTest.this.push(observer, "four", 101);
                    UnitTest.this.push(observer, "five", 102);
                    UnitTest.this.complete(observer, 150);
                    return Subscriptions.empty();
                }
            }), 100L, TimeUnit.MILLISECONDS, this.scheduler)).subscribe(observeWindow(arrayList, arrayList2));
            this.scheduler.advanceTimeTo(101L, TimeUnit.MILLISECONDS);
            Assert.assertEquals(1L, arrayList2.size());
            Assert.assertEquals(arrayList2.get(0), list("one", "two", "three"));
            this.scheduler.advanceTimeTo(201L, TimeUnit.MILLISECONDS);
            Assert.assertEquals(2L, arrayList2.size());
            Assert.assertEquals(arrayList2.get(1), list("four", "five"));
        }

        @Test
        public void testObservableBasedOpenerAndCloser() {
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            Observable.create(OperationWindow.window(Observable.create(new Observable.OnSubscribeFunc<String>() { // from class: rx.operators.OperationWindow.UnitTest.4
                @Override // rx.Observable.OnSubscribeFunc
                public Subscription onSubscribe(Observer<? super String> observer) {
                    UnitTest.this.push(observer, "one", 10);
                    UnitTest.this.push(observer, "two", 60);
                    UnitTest.this.push(observer, "three", 110);
                    UnitTest.this.push(observer, "four", 160);
                    UnitTest.this.push(observer, "five", 210);
                    UnitTest.this.complete(observer, 500);
                    return Subscriptions.empty();
                }
            }), (Observable<? extends Opening>) Observable.create(new Observable.OnSubscribeFunc<Opening>() { // from class: rx.operators.OperationWindow.UnitTest.5
                @Override // rx.Observable.OnSubscribeFunc
                public Subscription onSubscribe(Observer<? super Opening> observer) {
                    UnitTest.this.push(observer, Openings.create(), 50);
                    UnitTest.this.push(observer, Openings.create(), 200);
                    UnitTest.this.complete(observer, 250);
                    return Subscriptions.empty();
                }
            }), new Func1<Opening, Observable<Closing>>() { // from class: rx.operators.OperationWindow.UnitTest.6
                @Override // rx.util.functions.Func1
                public Observable<Closing> call(Opening opening) {
                    return Observable.create(new Observable.OnSubscribeFunc<Closing>() { // from class: rx.operators.OperationWindow.UnitTest.6.1
                        @Override // rx.Observable.OnSubscribeFunc
                        public Subscription onSubscribe(Observer<? super Closing> observer) {
                            UnitTest.this.push(observer, Closings.create(), 100);
                            UnitTest.this.complete(observer, 101);
                            return Subscriptions.empty();
                        }
                    });
                }
            })).subscribe(observeWindow(arrayList, arrayList2));
            this.scheduler.advanceTimeTo(500L, TimeUnit.MILLISECONDS);
            Assert.assertEquals(2L, arrayList2.size());
            Assert.assertEquals(arrayList2.get(0), list("two", "three"));
            Assert.assertEquals(arrayList2.get(1), list("five"));
        }

        @Test
        public void testObservableBasedCloser() {
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            Observable.create(OperationWindow.window(Observable.create(new Observable.OnSubscribeFunc<String>() { // from class: rx.operators.OperationWindow.UnitTest.7
                @Override // rx.Observable.OnSubscribeFunc
                public Subscription onSubscribe(Observer<? super String> observer) {
                    UnitTest.this.push(observer, "one", 10);
                    UnitTest.this.push(observer, "two", 60);
                    UnitTest.this.push(observer, "three", 110);
                    UnitTest.this.push(observer, "four", 160);
                    UnitTest.this.push(observer, "five", 210);
                    UnitTest.this.complete(observer, 250);
                    return Subscriptions.empty();
                }
            }), new Func0<Observable<Closing>>() { // from class: rx.operators.OperationWindow.UnitTest.8
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // rx.util.functions.Func0
                public Observable<Closing> call() {
                    return Observable.create(new Observable.OnSubscribeFunc<Closing>() { // from class: rx.operators.OperationWindow.UnitTest.8.1
                        @Override // rx.Observable.OnSubscribeFunc
                        public Subscription onSubscribe(Observer<? super Closing> observer) {
                            UnitTest.this.push(observer, Closings.create(), 100);
                            UnitTest.this.complete(observer, 101);
                            return Subscriptions.empty();
                        }
                    });
                }
            })).subscribe(observeWindow(arrayList, arrayList2));
            this.scheduler.advanceTimeTo(500L, TimeUnit.MILLISECONDS);
            Assert.assertEquals(3L, arrayList2.size());
            Assert.assertEquals(arrayList2.get(0), list("one", "two"));
            Assert.assertEquals(arrayList2.get(1), list("three", "four"));
            Assert.assertEquals(arrayList2.get(2), list("five"));
        }

        private List<String> list(String... strArr) {
            ArrayList arrayList = new ArrayList();
            for (String str : strArr) {
                arrayList.add(str);
            }
            return arrayList;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public <T> void push(final Observer<T> observer, final T t, int i) {
            this.scheduler.schedule(new Action0() { // from class: rx.operators.OperationWindow.UnitTest.9
                @Override // rx.util.functions.Action0
                public void call() {
                    observer.onNext(t);
                }
            }, i, TimeUnit.MILLISECONDS);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void complete(final Observer<?> observer, int i) {
            this.scheduler.schedule(new Action0() { // from class: rx.operators.OperationWindow.UnitTest.10
                @Override // rx.util.functions.Action0
                public void call() {
                    observer.onCompleted();
                }
            }, i, TimeUnit.MILLISECONDS);
        }

        private Action1<Observable<String>> observeWindow(final List<String> list, final List<List<String>> list2) {
            return new Action1<Observable<String>>() { // from class: rx.operators.OperationWindow.UnitTest.11
                @Override // rx.util.functions.Action1
                public void call(Observable<String> observable) {
                    observable.subscribe(new Observer<String>() { // from class: rx.operators.OperationWindow.UnitTest.11.1
                        @Override // rx.Observer
                        public void onCompleted() {
                            list2.add(new ArrayList(list));
                            list.clear();
                        }

                        @Override // rx.Observer
                        public void onError(Throwable th) {
                            Assert.fail(th.getMessage());
                        }

                        @Override // rx.Observer
                        public void onNext(String str) {
                            list.add(str);
                        }
                    });
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:rx/operators/OperationWindow$Window.class */
    public static class Window<T> extends ChunkedOperation.Chunk<T, Observable<T>> {
        protected Window() {
        }

        @Override // rx.operators.ChunkedOperation.Chunk
        public Observable<T> getContents() {
            return Observable.from((Iterable) this.contents);
        }
    }

    public static <T> Func0<Window<T>> windowMaker() {
        return new Func0<Window<T>>() { // from class: rx.operators.OperationWindow.1
            @Override // rx.util.functions.Func0
            public Window<T> call() {
                return new Window<>();
            }
        };
    }

    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> observable, final Func0<? extends Observable<? extends Closing>> func0) {
        return new Observable.OnSubscribeFunc<Observable<T>>() { // from class: rx.operators.OperationWindow.2
            @Override // rx.Observable.OnSubscribeFunc
            public Subscription onSubscribe(Observer<? super Observable<T>> observer) {
                ChunkedOperation.NonOverlappingChunks nonOverlappingChunks = new ChunkedOperation.NonOverlappingChunks(observer, OperationWindow.windowMaker());
                return observable.subscribe(new ChunkedOperation.ChunkObserver(nonOverlappingChunks, observer, new ChunkedOperation.ObservableBasedSingleChunkCreator(nonOverlappingChunks, Func0.this)));
            }
        };
    }

    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> observable, final Observable<? extends Opening> observable2, final Func1<Opening, ? extends Observable<? extends Closing>> func1) {
        return new Observable.OnSubscribeFunc<Observable<T>>() { // from class: rx.operators.OperationWindow.3
            @Override // rx.Observable.OnSubscribeFunc
            public Subscription onSubscribe(Observer<? super Observable<T>> observer) {
                ChunkedOperation.OverlappingChunks overlappingChunks = new ChunkedOperation.OverlappingChunks(observer, OperationWindow.windowMaker());
                return observable.subscribe(new ChunkedOperation.ChunkObserver(overlappingChunks, observer, new ChunkedOperation.ObservableBasedMultiChunkCreator(overlappingChunks, Observable.this, func1)));
            }
        };
    }

    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(Observable<? extends T> observable, int i) {
        return window(observable, i, i);
    }

    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> observable, final int i, final int i2) {
        return new Observable.OnSubscribeFunc<Observable<T>>() { // from class: rx.operators.OperationWindow.4
            @Override // rx.Observable.OnSubscribeFunc
            public Subscription onSubscribe(Observer<? super Observable<T>> observer) {
                ChunkedOperation.SizeBasedChunks sizeBasedChunks = new ChunkedOperation.SizeBasedChunks(observer, OperationWindow.windowMaker(), i);
                return observable.subscribe(new ChunkedOperation.ChunkObserver(sizeBasedChunks, observer, new ChunkedOperation.SkippingChunkCreator(sizeBasedChunks, i2)));
            }
        };
    }

    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(Observable<? extends T> observable, long j, TimeUnit timeUnit) {
        return window(observable, j, timeUnit, Schedulers.threadPoolForComputation());
    }

    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> observable, final long j, final TimeUnit timeUnit, final Scheduler scheduler) {
        return new Observable.OnSubscribeFunc<Observable<T>>() { // from class: rx.operators.OperationWindow.5
            @Override // rx.Observable.OnSubscribeFunc
            public Subscription onSubscribe(Observer<? super Observable<T>> observer) {
                ChunkedOperation.NonOverlappingChunks nonOverlappingChunks = new ChunkedOperation.NonOverlappingChunks(observer, OperationWindow.windowMaker());
                return observable.subscribe(new ChunkedOperation.ChunkObserver(nonOverlappingChunks, observer, new ChunkedOperation.TimeBasedChunkCreator(nonOverlappingChunks, j, timeUnit, scheduler)));
            }
        };
    }

    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(Observable<? extends T> observable, long j, TimeUnit timeUnit, int i) {
        return window(observable, j, timeUnit, i, Schedulers.threadPoolForComputation());
    }

    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> observable, final long j, final TimeUnit timeUnit, final int i, final Scheduler scheduler) {
        return new Observable.OnSubscribeFunc<Observable<T>>() { // from class: rx.operators.OperationWindow.6
            @Override // rx.Observable.OnSubscribeFunc
            public Subscription onSubscribe(Observer<? super Observable<T>> observer) {
                ChunkedOperation.TimeAndSizeBasedChunks timeAndSizeBasedChunks = new ChunkedOperation.TimeAndSizeBasedChunks(observer, OperationWindow.windowMaker(), i, j, timeUnit, scheduler);
                return observable.subscribe(new ChunkedOperation.ChunkObserver(timeAndSizeBasedChunks, observer, new ChunkedOperation.SingleChunkCreator(timeAndSizeBasedChunks)));
            }
        };
    }

    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(Observable<? extends T> observable, long j, long j2, TimeUnit timeUnit) {
        return window(observable, j, j2, timeUnit, Schedulers.threadPoolForComputation());
    }

    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> observable, final long j, final long j2, final TimeUnit timeUnit, final Scheduler scheduler) {
        return new Observable.OnSubscribeFunc<Observable<T>>() { // from class: rx.operators.OperationWindow.7
            @Override // rx.Observable.OnSubscribeFunc
            public Subscription onSubscribe(Observer<? super Observable<T>> observer) {
                ChunkedOperation.TimeBasedChunks timeBasedChunks = new ChunkedOperation.TimeBasedChunks(observer, OperationWindow.windowMaker(), j, timeUnit, scheduler);
                return observable.subscribe(new ChunkedOperation.ChunkObserver(timeBasedChunks, observer, new ChunkedOperation.TimeBasedChunkCreator(timeBasedChunks, j2, timeUnit, scheduler)));
            }
        };
    }
}
