/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.aggregation;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import reactor.Environment;
import reactor.bus.registry.Registration;
import reactor.core.Dispatcher;
import reactor.fn.Consumer;
import reactor.fn.timer.Timer;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.subscription.ReactiveSubscription;

public class WindowShiftAction<T>
extends Action<T, Stream<T>> {
    private final Consumer<Long> timeshiftTask;
    private final List<ReactiveSubscription<T>> currentWindows = new LinkedList<ReactiveSubscription<T>>();
    private final Registration<? extends Consumer<Long>> timeshiftRegistration;
    private final int skip;
    private final int batchSize;
    private final Environment environment;
    private final Dispatcher dispatcher;
    private int index;

    public WindowShiftAction(Environment environment, Dispatcher dispatcher, int size, int skip) {
        this(environment, dispatcher, size, skip, -1L, -1L, null, null);
    }

    public WindowShiftAction(Environment environment, final Dispatcher dispatcher, int size, int skip, final long timespan, long timeshift, TimeUnit unit, final Timer timer) {
        this.dispatcher = dispatcher;
        this.skip = skip;
        this.environment = environment;
        this.batchSize = size;
        if (timespan > 0L && timeshift > 0L) {
            final TimeUnit targetUnit = unit != null ? unit : TimeUnit.SECONDS;
            final Consumer flushTimerTask = new Consumer<ReactiveSubscription<T>>(){

                @Override
                public void accept(ReactiveSubscription<T> bucket) {
                    Iterator it = WindowShiftAction.this.currentWindows.iterator();
                    while (it.hasNext()) {
                        ReactiveSubscription itBucket = (ReactiveSubscription)it.next();
                        if (bucket != itBucket) continue;
                        it.remove();
                        bucket.onComplete();
                        break;
                    }
                }
            };
            this.timeshiftTask = new Consumer<Long>(){

                @Override
                public void accept(Long aLong) {
                    dispatcher.dispatch(null, new Consumer<Void>(){

                        @Override
                        public void accept(Void aVoid) {
                            final ReactiveSubscription bucket = WindowShiftAction.this.createWindowStream();
                            timer.submit(new Consumer<Long>(){

                                @Override
                                public void accept(Long aLong) {
                                    dispatcher.dispatch(bucket, flushTimerTask, null);
                                }
                            }, timespan, targetUnit);
                        }
                    }, null);
                }
            };
            this.timeshiftRegistration = timer.schedule(this.timeshiftTask, timeshift, targetUnit);
            this.timeshiftRegistration.pause();
        } else {
            this.timeshiftRegistration = null;
            this.timeshiftTask = null;
        }
    }

    @Override
    public void requestMore(long n) {
        if (this.timeshiftRegistration != null && this.timeshiftRegistration.isPaused()) {
            this.timeshiftRegistration.resume();
        }
        super.requestMore(n);
    }

    @Override
    protected void doNext(T value) {
        if (this.timeshiftRegistration == null && this.index++ % this.skip == 0) {
            this.createWindowStream();
        }
        this.flushCallback(value);
    }

    @Override
    protected void doComplete() {
        for (ReactiveSubscription<T> bucket : this.currentWindows) {
            bucket.onComplete();
        }
        this.currentWindows.clear();
        this.broadcastComplete();
    }

    private void flushCallback(T event) {
        Iterator<ReactiveSubscription<T>> it = this.currentWindows.iterator();
        while (it.hasNext()) {
            ReactiveSubscription<T> bucket = it.next();
            bucket.onNext(event);
            if (bucket.currentNextSignals() != (long)this.batchSize) continue;
            it.remove();
            bucket.onComplete();
        }
    }

    @Override
    public void cancel() {
        if (this.timeshiftRegistration != null) {
            this.timeshiftRegistration.cancel();
        }
        super.cancel();
    }

    protected ReactiveSubscription<T> createWindowStream() {
        Broadcaster action = Broadcaster.create(this.environment, this.dispatcher);
        ReactiveSubscription _currentWindow = new ReactiveSubscription(null, action);
        this.currentWindows.add(_currentWindow);
        action.onSubscribe(_currentWindow);
        this.broadcastNext(action);
        return _currentWindow;
    }

    @Override
    public final Environment getEnvironment() {
        return this.environment;
    }

    @Override
    public final Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    @Override
    protected void doError(Throwable ev) {
        super.doError(ev);
        for (ReactiveSubscription<T> bucket : this.currentWindows) {
            bucket.onError(ev);
        }
        this.currentWindows.clear();
    }
}

