/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.reactive.publisher.impl;

import io.reactivex.rxjava3.core.Flowable;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.infinispan.CacheSet;
import org.infinispan.commons.util.IntSet;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.reactive.publisher.impl.LocalPublisherManagerImpl;
import org.infinispan.reactive.publisher.impl.commands.reduction.PublisherResult;
import org.infinispan.reactive.publisher.impl.commands.reduction.SegmentPublisherResult;
import org.reactivestreams.Publisher;

/**
 * {@link LocalPublisherManagerImpl} specialization whose "exactly once" overrides read the
 * requested segments from {@link CacheSet#localPublisher(IntSet)} as a single stream.
 *
 * <p>None of the publishing overrides in this class consult the supplied listener while data is
 * flowing; {@code listener.segmentsLost} is only inspected in
 * {@link #exactlyOnceHandleLostSegments(CompletionStage, LocalPublisherManagerImpl.SegmentListener)}
 * once the final value is available.
 *
 * @param <K> cache key type
 * @param <V> cache value type
 */
@Scope(value=Scopes.NAMED_CACHE)
public class NonSegmentedLocalPublisherManagerImpl<K, V>
extends LocalPublisherManagerImpl<K, V> {
    /** Number of elements collected into each batch handed to the transformer in the parallel path. */
    static final int PARALLEL_BATCH_SIZE = 1024;

    /**
     * Publishes the local contents of {@code set} for {@code segments}, applies {@code transformer}
     * to batches of at most {@link #PARALLEL_BATCH_SIZE} elements on {@code nonBlockingScheduler},
     * and merges the resulting stages through {@code combineStages}.
     *
     * @param set source of the elements
     * @param keysToExclude keys whose elements are dropped before batching; {@code null} disables filtering
     * @param toKeyFunction maps an element to the key checked against {@code keysToExclude}
     * @param segments segments passed to {@link CacheSet#localPublisher(IntSet)}
     * @param transformer reduces one batch of elements to a stage producing a partial result
     * @param listener not used by this implementation
     * @param concurrentSegments not used by this implementation
     * @return the flowable produced by {@code combineStages} for the per-batch stages
     */
    @Override
    protected <I, R> Flowable<R> exactlyOnceParallel(CacheSet<I> set, Set<K> keysToExclude, Function<I, K> toKeyFunction, IntSet segments, Function<? super Publisher<I>, ? extends CompletionStage<R>> transformer, LocalPublisherManagerImpl.SegmentListener listener, IntSet concurrentSegments) {
        Flowable<I> flowable = Flowable.fromPublisher(set.localPublisher(segments));
        if (keysToExclude != null) {
            flowable = flowable.filter(i -> !keysToExclude.contains(toKeyFunction.apply(i)));
        }
        // NOTE(review): the meaning of the `true` flag is defined by combineStages in the parent
        // class, which is not visible here - confirm before changing it.
        return combineStages(flowable.buffer(PARALLEL_BATCH_SIZE)
                .parallel()
                .runOn(this.nonBlockingScheduler)
                .map(buffer -> transformer.apply(Flowable.fromIterable(buffer)))
                .sequential(), true);
    }

    /**
     * Publishes the local contents of {@code set} for {@code segments} and applies
     * {@code transformer} once to the whole (optionally filtered) stream.
     *
     * @param set source of the elements
     * @param keysToExclude keys whose elements are dropped; {@code null} disables filtering
     * @param toKeyFunction maps an element to the key checked against {@code keysToExclude}
     * @param segments segments passed to {@link CacheSet#localPublisher(IntSet)}
     * @param transformer reduces the stream to a stage producing the result
     * @param listener not used by this implementation
     * @param concurrentSegments not used by this implementation
     * @return a flowable wrapping the stage returned by {@code transformer}
     */
    @Override
    protected <I, R> Flowable<R> exactlyOnceSequential(CacheSet<I> set, Set<K> keysToExclude, Function<I, K> toKeyFunction, IntSet segments, Function<? super Publisher<I>, ? extends CompletionStage<R>> transformer, LocalPublisherManagerImpl.SegmentListener listener, IntSet concurrentSegments) {
        Flowable<I> flowable = Flowable.fromPublisher(set.localPublisher(segments));
        if (keysToExclude != null) {
            flowable = flowable.filter(i -> !keysToExclude.contains(toKeyFunction.apply(i)));
        }
        return Flowable.fromCompletionStage(transformer.apply(flowable));
    }

    /**
     * Converts the final reduced value into a {@link PublisherResult} and removes
     * {@code listener} from {@code changeListener} when the stage completes, whether normally or
     * exceptionally.
     *
     * <p>If {@code listener.segmentsLost} is empty the value is wrapped via
     * {@code ignoreSegmentsFunction()}. Otherwise the computed value is discarded and a
     * {@link SegmentPublisherResult} is returned that carries {@code listener.segments} and a
     * {@code null} result.
     *
     * @param finalValue stage producing the reduced value
     * @param listener listener whose {@code segmentsLost} and {@code segments} fields are read
     * @return stage producing the wrapped result
     */
    @Override
    protected <R> CompletionStage<PublisherResult<R>> exactlyOnceHandleLostSegments(CompletionStage<R> finalValue, LocalPublisherManagerImpl.SegmentListener listener) {
        return finalValue.<PublisherResult<R>>thenApply(value -> {
            IntSet lostSegments = listener.segmentsLost;
            if (lostSegments.isEmpty()) {
                return LocalPublisherManagerImpl.<R>ignoreSegmentsFunction().apply(value);
            }
            // NOTE(review): listener.segments (not lostSegments) is reported here, which presumably
            // marks every segment held by the listener as suspect - confirm against
            // SegmentPublisherResult's constructor contract.
            return new SegmentPublisherResult<R>(listener.segments, null);
        }).whenComplete((u, t) -> this.changeListener.remove(listener));
    }
}

