/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.aggregation;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.fn.Consumer;
import reactor.fn.Supplier;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.broadcast.BehaviorBroadcaster;
import reactor.rx.broadcast.Broadcaster;

public class WindowShiftWhenAction<T>
extends Action<T, Stream<T>> {
    private final List<Broadcaster<T>> currentWindows = new LinkedList<Broadcaster<T>>();
    private final Supplier<? extends Publisher<?>> bucketClosing;
    private final Publisher<?> bucketOpening;
    private final Environment environment;
    private final Dispatcher dispatcher;

    public WindowShiftWhenAction(Environment environment, Dispatcher dispatcher, Publisher<?> bucketOpenings, Supplier<? extends Publisher<?>> boundarySupplier) {
        this.dispatcher = dispatcher;
        this.bucketClosing = boundarySupplier;
        this.bucketOpening = bucketOpenings;
        this.environment = environment;
    }

    @Override
    protected void doSubscribe(Subscription subscription) {
        super.doSubscribe(subscription);
        this.bucketOpening.subscribe((Subscriber)new Subscriber<Object>(){
            Subscription s;

            public void onSubscribe(Subscription s) {
                this.s = s;
                s.request(1L);
            }

            public void onNext(Object o) {
                WindowShiftWhenAction.this.dispatcher.dispatch(null, new Consumer<Void>(){

                    @Override
                    public void accept(Void aVoid) {
                        Broadcaster<Object> newBucket = WindowShiftWhenAction.this.createWindowStream(null);
                        ((Publisher)WindowShiftWhenAction.this.bucketClosing.get()).subscribe((Subscriber)new BucketConsumer(newBucket));
                    }
                }, null);
                if (this.s != null) {
                    this.s.request(1L);
                }
            }

            public void onError(Throwable t) {
                if (this.s != null) {
                    this.s.cancel();
                }
                WindowShiftWhenAction.this.onError(t);
            }

            public void onComplete() {
                if (this.s != null) {
                    this.s.cancel();
                }
                WindowShiftWhenAction.this.onComplete();
            }
        });
    }

    @Override
    protected void doError(Throwable ev) {
        for (Broadcaster<T> bucket : this.currentWindows) {
            bucket.onError(ev);
        }
        this.currentWindows.clear();
        super.doError(ev);
    }

    @Override
    protected void doComplete() {
        for (Broadcaster<T> bucket : this.currentWindows) {
            bucket.onComplete();
        }
        this.currentWindows.clear();
        this.broadcastComplete();
    }

    @Override
    protected void doNext(T value) {
        if (!this.currentWindows.isEmpty()) {
            for (Broadcaster<T> bucket : this.currentWindows) {
                bucket.onNext(value);
            }
        }
    }

    @Override
    public final Environment getEnvironment() {
        return this.environment;
    }

    @Override
    public final Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    protected Broadcaster<T> createWindowStream(T first) {
        Broadcaster<T> action = BehaviorBroadcaster.first(first, this.environment, this.dispatcher);
        this.currentWindows.add(action);
        this.broadcastNext(action);
        return action;
    }

    private class BucketConsumer
    implements Subscriber<Object> {
        final Broadcaster<T> bucket;
        Subscription s;

        public BucketConsumer(Broadcaster<T> bucket) {
            this.bucket = bucket;
        }

        public void onSubscribe(Subscription s) {
            this.s = s;
            s.request(Long.MAX_VALUE);
        }

        public void onNext(Object o) {
            this.onComplete();
        }

        public void onError(Throwable t) {
            if (this.s != null) {
                this.s.cancel();
            }
            WindowShiftWhenAction.this.onError(t);
        }

        public void onComplete() {
            if (this.s != null) {
                this.s.cancel();
            }
            WindowShiftWhenAction.this.dispatcher.dispatch(null, new Consumer<Void>(){

                @Override
                public void accept(Void aVoid) {
                    Iterator iterator = WindowShiftWhenAction.this.currentWindows.iterator();
                    while (iterator.hasNext()) {
                        Broadcaster itBucket = (Broadcaster)iterator.next();
                        if (itBucket != BucketConsumer.this.bucket) continue;
                        iterator.remove();
                        BucketConsumer.this.bucket.onComplete();
                        break;
                    }
                }
            }, null);
        }
    }
}

