package org.infinispan.reactive.publisher.impl;

import io.reactivex.rxjava3.core.Flowable;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.infinispan.CacheSet;
import org.infinispan.commons.util.IntSet;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.reactive.publisher.impl.commands.reduction.PublisherResult;
import org.infinispan.reactive.publisher.impl.commands.reduction.SegmentPublisherResult;
import org.reactivestreams.Publisher;

/**
 * {@link LocalPublisherManagerImpl} variant whose "exactly once" operations read all requested
 * segments through a single {@code CacheSet.localPublisher(IntSet)} call instead of handling
 * each segment on its own.
 *
 * <p>Because results are not tracked per segment, {@link #exactlyOnceHandleLostSegments} answers
 * with the listener's complete {@code segments} set (and a {@code null} value) as soon as the
 * listener reports any lost segment.
 *
 * @param <K> key type of the cache
 * @param <V> value type of the cache
 */
@Scope(Scopes.NAMED_CACHE)
public class NonSegmentedLocalPublisherManagerImpl<K, V> extends LocalPublisherManagerImpl<K, V> {
    /** Number of elements collected into each batch handed to the transformer by {@link #exactlyOnceParallel}. */
    static final int PARALLEL_BATCH_SIZE = 1024;

    /**
     * Applies the transformer to batches of up to {@link #PARALLEL_BATCH_SIZE} elements on the
     * parallel rails of the non-blocking scheduler and merges the resulting stages through
     * {@code combineStages(..., true)}.
     *
     * @param cacheSet source whose local publisher supplies the elements
     * @param set keys to leave out of the result; {@code null} disables filtering
     * @param function maps an element to the key that is looked up in {@code set}
     * @param intSet segments passed to {@code cacheSet.localPublisher}
     * @param function2 transformer turning a publisher of elements into a stage holding a result
     * @param segmentListener not used by this implementation
     * @param intSet2 not used by this implementation
     * @return the flowable produced by {@code combineStages} for the per-batch stages
     */
    @Override
    protected <I, R> Flowable<R> exactlyOnceParallel(CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, IntSet intSet, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener, IntSet intSet2) {
        Flowable<I> source = filteredLocalFlowable(cacheSet, set, function, intSet);
        // The explicit type witness pins the element type to CompletionStage<R>; without it the
        // wildcard capture of the transformer's return type would leak into the flowable's type.
        Flowable<CompletionStage<R>> stages = source.buffer(PARALLEL_BATCH_SIZE)
                .parallel()
                .runOn(this.nonBlockingScheduler)
                .<CompletionStage<R>>map(batch -> function2.apply(Flowable.fromIterable(batch)))
                .sequential();
        return combineStages(stages, true);
    }

    /**
     * Applies the transformer once to the whole (optionally filtered) local publisher and exposes
     * the resulting stage as a flowable.
     *
     * <p>NOTE(review): RxJava documents {@code Flowable.fromCompletionStage} as signalling a
     * {@code NullPointerException} when the stage completes with {@code null} - confirm that the
     * transformers used here never complete with {@code null}.
     *
     * @param cacheSet source whose local publisher supplies the elements
     * @param set keys to leave out of the result; {@code null} disables filtering
     * @param function maps an element to the key that is looked up in {@code set}
     * @param intSet segments passed to {@code cacheSet.localPublisher}
     * @param function2 transformer turning a publisher of elements into a stage holding a result
     * @param segmentListener not used by this implementation
     * @param intSet2 not used by this implementation
     * @return a flowable backed by the transformer's completion stage
     */
    @Override
    protected <I, R> Flowable<R> exactlyOnceSequential(CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, IntSet intSet, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener, IntSet intSet2) {
        Flowable<I> source = filteredLocalFlowable(cacheSet, set, function, intSet);
        return Flowable.fromCompletionStage(function2.apply(source));
    }

    /**
     * Converts the final value into a {@link PublisherResult} and unregisters the listener.
     *
     * <p>When the listener's {@code segmentsLost} set is empty the value is wrapped with
     * {@code LocalPublisherManagerImpl.ignoreSegmentsFunction()}. Otherwise the value is dropped
     * and a {@link SegmentPublisherResult} is created from the listener's {@code segments} and a
     * {@code null} value. The listener is removed from {@code changeListener} once the returned
     * stage completes, whether normally or exceptionally.
     *
     * @param completionStage stage holding the final value of the operation
     * @param segmentListener listener whose {@code segmentsLost} and {@code segments} are consulted
     * @return stage holding the resulting {@link PublisherResult}
     */
    @Override
    protected <R> CompletionStage<PublisherResult<R>> exactlyOnceHandleLostSegments(CompletionStage<R> completionStage, LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener) {
        return completionStage.<PublisherResult<R>>thenApply(value -> {
            if (segmentListener.segmentsLost.isEmpty()) {
                Function<R, PublisherResult<R>> ignoreSegments = LocalPublisherManagerImpl.ignoreSegmentsFunction();
                return ignoreSegments.apply(value);
            }
            // At least one segment was lost: hand back the listener's whole segment set with no value.
            return new SegmentPublisherResult<R>(segmentListener.segments, null);
        }).whenComplete((result, throwable) -> this.changeListener.remove(segmentListener));
    }

    /**
     * Wraps {@code cacheSet.localPublisher(intSet)} in a {@link Flowable} and, when {@code set}
     * is not {@code null}, drops every element whose key is contained in it.
     */
    private <I> Flowable<I> filteredLocalFlowable(CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, IntSet intSet) {
        Flowable<I> source = Flowable.fromPublisher(cacheSet.localPublisher(intSet));
        if (set == null) {
            return source;
        }
        return source.filter(element -> !set.contains(function.apply(element)));
    }
}
