package reactor.core.action;

import reactor.core.Observable;
import reactor.event.Event;
import reactor.function.Function;
import reactor.function.Supplier;
import reactor.tuple.Tuple2;

/* loaded from: input_file:reactor/core/action/ReduceAction.class */
/**
 * Action that folds the data of incoming events into one accumulated value.
 *
 * <p>For every event, the reduction function is applied to a tuple of
 * (event data, current accumulator) and its result becomes the new accumulator.
 * The accumulated value is published through {@code notifyValue} when the
 * action is flushed.
 *
 * <p>A {@code null} accumulator is treated as "nothing accumulated yet": it
 * triggers re-seeding from the supplier on the next event and suppresses
 * emission on flush.
 *
 * <p>NOTE(review): {@link #flush()} acquires the inherited {@code lock};
 * whether {@code doNext}/{@code doFlush} are invoked under that same lock is
 * decided by {@code BatchAction} and is not visible here — confirm before
 * relying on any thread-safety guarantee.
 *
 * @param <T> type of the data carried by incoming events
 * @param <A> type of the accumulated value
 */
public class ReduceAction<T, A> extends BatchAction<T> implements Flushable<T> {
    /** Optional source of the initial accumulator value; may be {@code null}. */
    private final Supplier<A> accumulators;
    /** Reduction step: maps (event data, current accumulator) to the next accumulator. */
    private final Function<Tuple2<T, A>, A> fn;
    /** Running accumulator; {@code null} until seeded, and again after {@code doFlush}. */
    private volatile A acc;

    /**
     * Creates a reducing action.
     *
     * @param i          forwarded unchanged to the {@code BatchAction} constructor
     *                   (presumably the batch size — TODO confirm)
     * @param supplier   supplies the initial accumulator; when {@code null}, the
     *                   reduction function receives a {@code null} accumulator
     * @param function   reduction function applied to each (data, accumulator) tuple
     * @param observable forwarded unchanged to the {@code BatchAction} constructor
     * @param obj        forwarded unchanged to the {@code BatchAction} constructor
     * @param obj2       forwarded unchanged to the {@code BatchAction} constructor
     */
    public ReduceAction(int i, Supplier<A> supplier, Function<Tuple2<T, A>, A> function, Observable observable, Object obj, Object obj2) {
        super(i, observable, obj, obj2);
        this.accumulators = supplier;
        this.fn = function;
    }

    /**
     * Folds the data of {@code event} into the accumulator.
     *
     * <p>If the accumulator is currently {@code null} it is first seeded from the
     * supplier (or left {@code null} when no supplier was given).
     *
     * @param event the event whose data is reduced into the accumulator
     */
    @Override
    protected void doNext(Event<T> event) {
        if (acc == null) {
            // Seed lazily; without a supplier the seed stays null.
            A seed = null;
            if (accumulators != null) {
                seed = accumulators.get();
            }
            acc = seed;
        }
        Tuple2<T, A> step = Tuple2.of(event.getData(), acc);
        acc = fn.apply(step);
    }

    /**
     * Publishes the accumulated value as a copy of {@code event} and then resets
     * the accumulator. Does nothing when the accumulator is {@code null}.
     *
     * @param event the event that is copied to carry the accumulated value
     */
    @Override
    protected void doFlush(Event<T> event) {
        if (acc == null) {
            return;
        }
        notifyValue(event.copy(acc));
        acc = null;
    }

    /**
     * Publishes the current accumulator, wrapped in a new event, while holding
     * the inherited lock. Does nothing when the accumulator is {@code null}.
     *
     * <p>NOTE(review): unlike {@code doFlush}, this method does not reset the
     * accumulator afterwards — confirm that the asymmetry is intentional.
     *
     * @return this action
     */
    @Override
    public Flushable<T> flush() {
        lock.lock();
        try {
            if (acc != null) {
                notifyValue(Event.wrap(acc));
            }
        } finally {
            lock.unlock();
        }
        return this;
    }
}
