/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.aggregation;

import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscription;
import reactor.bus.registry.Registration;
import reactor.core.Dispatcher;
import reactor.core.dispatch.InsufficientCapacityException;
import reactor.fn.Consumer;
import reactor.fn.timer.Timer;
import reactor.rx.action.Action;
import reactor.rx.subscription.BatchSubscription;
import reactor.rx.subscription.PushSubscription;

public abstract class BatchAction<T, V>
extends Action<T, V> {
    protected final boolean next;
    protected final boolean flush;
    protected final boolean first;
    protected final int batchSize;
    protected final Consumer<Long> timeoutTask;
    protected final Registration<? extends Consumer<Long>> timespanRegistration;
    protected final Dispatcher dispatcher;
    protected final Consumer<T> flushConsumer = new FlushConsumer();
    protected int index = 0;

    public BatchAction(Dispatcher dispatcher, int batchSize, boolean next, boolean first, boolean flush) {
        this(dispatcher, batchSize, next, first, flush, -1L, null, null);
    }

    public BatchAction(final Dispatcher dispatcher, int batchSize, boolean next, boolean first, boolean flush, long timespan, TimeUnit unit, Timer timer) {
        super(batchSize);
        this.dispatcher = dispatcher;
        if (timespan > 0L) {
            this.timeoutTask = new Consumer<Long>(){

                @Override
                public void accept(Long aLong) {
                    if (BatchAction.this.index > 0) {
                        try {
                            dispatcher.tryDispatch(null, BatchAction.this.flushConsumer, null);
                        }
                        catch (InsufficientCapacityException insufficientCapacityException) {
                            // empty catch block
                        }
                    }
                }
            };
            TimeUnit targetUnit = unit != null ? unit : TimeUnit.SECONDS;
            this.timespanRegistration = timer.schedule(this.timeoutTask, timespan, targetUnit, TimeUnit.MILLISECONDS.convert(timespan, targetUnit));
            this.timespanRegistration.pause();
        } else {
            this.timeoutTask = null;
            this.timespanRegistration = null;
        }
        this.first = first;
        this.flush = flush;
        this.next = next;
        this.batchSize = batchSize;
    }

    @Override
    protected PushSubscription<T> createTrackingSubscription(Subscription subscription) {
        return new BatchSubscription(subscription, this, this.batchSize);
    }

    @Override
    protected void doSubscribe(Subscription subscription) {
        super.doSubscribe(subscription);
        if (this.timespanRegistration != null) {
            this.timespanRegistration.resume();
        }
    }

    protected void nextCallback(T event) {
    }

    protected void flushCallback(T event) {
    }

    protected void firstCallback(T event) {
    }

    @Override
    protected void doNext(T value) {
        ++this.index;
        if (this.first && this.index == 1) {
            this.firstCallback(value);
        }
        if (this.next) {
            this.nextCallback(value);
        }
        if (this.index % this.batchSize == 0) {
            this.index = 0;
            if (this.flush) {
                this.flushConsumer.accept(value);
            }
        }
    }

    @Override
    protected void doComplete() {
        this.flushConsumer.accept(null);
        super.doComplete();
    }

    @Override
    public void cancel() {
        if (this.timespanRegistration != null) {
            this.timespanRegistration.cancel();
        }
        super.cancel();
    }

    @Override
    public String toString() {
        return super.toString() + "{" + (this.timespanRegistration != null ? "timed!" : "") + " batchSize=" + this.index + "/" + this.batchSize + " [" + (int)((float)this.index / (float)this.batchSize * 100.0f) + "%]";
    }

    @Override
    public final Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    private final class FlushConsumer
    implements Consumer<T> {
        private FlushConsumer() {
        }

        @Override
        public void accept(T n) {
            BatchAction.this.flushCallback(n);
            BatchAction.this.index = 0;
        }
    }
}

