package reactor.core.composable.spec;

import reactor.core.Environment;
import reactor.core.Observable;
import reactor.core.action.BufferAction;
import reactor.core.composable.Deferred;
import reactor.core.composable.Stream;
import reactor.event.Event;
import reactor.event.selector.Selector;
import reactor.tuple.Tuple2;

/* loaded from: input_file:reactor/core/composable/spec/DeferredStreamSpec.class */
public final class DeferredStreamSpec<T> extends ComposableSpec<DeferredStreamSpec<T>, Deferred<T, Stream<T>>> {
    private int batchSize = -1;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/core/composable/spec/DeferredStreamSpec$BatchStreamDeferred.class */
    public static class BatchStreamDeferred<T> extends Deferred<T, Stream<T>> {
        private final BufferAction<T> consumer;

        public BatchStreamDeferred(Stream<T> stream, int i) {
            super(stream);
            this.consumer = batcher(i);
        }

        @Override // reactor.core.composable.Deferred
        public void acceptEvent(Event<T> event) {
            this.consumer.accept((Event) event);
        }
    }

    public DeferredStreamSpec<T> batchSize(int i) {
        this.batchSize = i;
        return this;
    }

    @Override // reactor.core.composable.spec.ComposableSpec
    protected Deferred<T, Stream<T>> createComposable(Environment environment, Observable observable, Tuple2<Selector, Object> tuple2) {
        Stream stream = new Stream(observable, this.batchSize, null, tuple2, environment);
        return this.batchSize > 1 ? new BatchStreamDeferred(stream, this.batchSize) : new Deferred<>(stream);
    }

    @Override // reactor.core.composable.spec.ComposableSpec
    protected /* bridge */ /* synthetic */ Object createComposable(Environment environment, Observable observable, Tuple2 tuple2) {
        return createComposable(environment, observable, (Tuple2<Selector, Object>) tuple2);
    }
}
