package org.infinispan.reactive.publisher.impl;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableConverter;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.functions.Predicate;
import io.reactivex.rxjava3.parallel.ParallelFlowable;
import io.reactivex.rxjava3.processors.FlowableProcessor;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.lang.invoke.MethodHandles;
import java.util.Objects;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.IntConsumer;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.CacheSet;
import org.infinispan.cache.impl.AbstractDelegatingCache;
import org.infinispan.cache.impl.InvocationHelper;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.read.EntrySetCommand;
import org.infinispan.commands.read.KeySetCommand;
import org.infinispan.commands.read.SizeCommand;
import org.infinispan.commons.util.EnumUtil;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.ProcessorInfo;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.ImmortalCacheEntry;
import org.infinispan.container.entries.MVCCEntry;
import org.infinispan.container.entries.NullCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContextFactory;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.reactive.publisher.impl.Notifications;
import org.infinispan.reactive.publisher.impl.SegmentAwarePublisherSupplier;
import org.infinispan.reactive.publisher.impl.SegmentPublisherSupplier;
import org.infinispan.reactive.publisher.impl.commands.reduction.PublisherResult;
import org.infinispan.reactive.publisher.impl.commands.reduction.SegmentPublisherResult;
import org.infinispan.stream.StreamMarshalling;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;

@Scope(Scopes.NAMED_CACHE)
/* loaded from: input_file:org/infinispan/reactive/publisher/impl/LocalPublisherManagerImpl.class */
public class LocalPublisherManagerImpl<K, V> implements LocalPublisherManager<K, V> {
    // Number of elements grouped into one batch by the non-segmented parallel exactly-once path.
    static final int PARALLEL_BATCH_SIZE = 1024;

    // Reference to the cache component; resolved via running() in start().
    @Inject
    ComponentRef<Cache<K, V>> cacheComponentRef;

    // Supplies the cache topology used to verify segment read ownership.
    @Inject
    DistributionManager distributionManager;

    // Store change notifications from this manager drive updateStrategy(boolean).
    @Inject
    PersistenceManager persistenceManager;

    // Cache configuration; its persistence settings are read when choosing a strategy.
    @Inject
    Configuration configuration;

    // Maps a key to its segment when publishing specific keys.
    @Inject
    KeyPartitioner keyPartitioner;

    // Used by sizePublisher to invoke the size command.
    @Inject
    ComponentRef<InvocationHelper> invocationHelper;

    // Builds the size command in sizePublisher.
    @Inject
    CommandsFactory commandsFactory;

    // Creates the invocation context passed to the invocation helper.
    @Inject
    InvocationContextFactory invocationContextFactory;
    // Unwrapped advanced cache; assigned in start().
    protected AdvancedCache<K, V> remoteCache;
    // remoteCache decorated with Flag.CACHE_MODE_LOCAL; assigned in start().
    protected AdvancedCache<K, V> cache;
    // RxJava scheduler wrapping the injected non-blocking executor; assigned in inject(ExecutorService).
    protected Scheduler nonBlockingScheduler;
    // Number of segments from the hash configuration; assigned in start().
    protected int maxSegment;
    // Currently selected strategy; replaced by updateStrategy(boolean).
    private volatile LocalPublisherManagerImpl<K, V>.LocalEntryPublisherStrategy localPublisherStrategy;
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    // Wraps a value in a SegmentPublisherResult that carries an immutable empty segment set.
    // NOTE(review): never reassigned in the visible code, so this could probably be final - confirm against the rest of the file.
    private static Function<Object, PublisherResult<Object>> ignoreSegmentsFunction = obj -> {
        return new SegmentPublisherResult(IntSets.immutableEmptySet(), obj);
    };
    // Processor count captured once at construction; used as the parallelism level.
    protected final int cpuCount = ProcessorInfo.availableProcessors();
    // Listeners that segmentsLost(IntSet) feeds every lost segment id to.
    protected final Set<IntConsumer> changeListener = ConcurrentHashMap.newKeySet();
    private final LocalPublisherManagerImpl<K, V>.LocalEntryPublisherStrategy nonSegmentedPublisher = new NonSegmentedEntryPublisherStrategy();
    private final LocalPublisherManagerImpl<K, V>.LocalEntryPublisherStrategy segmentedPublisher = new SegmentedLocalPublisherStrategyLocal();
    // Re-selects the strategy whenever the persistence manager reports a new store status.
    private final PersistenceManager.StoreChangeListener storeChangeListener = persistenceStatus -> {
        updateStrategy(persistenceStatus.usingSegmentedStore());
    };

    /* loaded from: input_file:org/infinispan/reactive/publisher/impl/LocalPublisherManagerImpl$BaseSegmentAwarePublisherSupplier.class */
    /**
     * Common base for the segment aware suppliers in this class. Subclasses only provide
     * {@link #flowableWithNotifications(boolean)}; both notification publishers are derived from it.
     *
     * @param <R> type of the published values
     */
    private static abstract class BaseSegmentAwarePublisherSupplier<R> implements SegmentAwarePublisherSupplier<R> {
        private BaseSegmentAwarePublisherSupplier() {
        }

        /**
         * Publishes value and segment notifications with every lost-segment notification removed.
         * The underlying flowable is always requested with {@code false}.
         */
        @Override // org.infinispan.reactive.publisher.impl.SegmentPublisherSupplier
        public Publisher<SegmentPublisherSupplier.Notification<R>> publisherWithSegments() {
            Flowable<SegmentAwarePublisherSupplier.NotificationWithLost<R>> withoutLost =
                    flowableWithNotifications(false).filter(notification -> !notification.isLostSegment());
            // The identity map only widens the element type from NotificationWithLost to Notification.
            return withoutLost.map(notification -> notification);
        }

        /**
         * Publishes all notifications, lost segments included, by handing {@code z} straight to
         * {@link #flowableWithNotifications(boolean)}.
         */
        @Override // org.infinispan.reactive.publisher.impl.SegmentAwarePublisherSupplier
        public Publisher<SegmentAwarePublisherSupplier.NotificationWithLost<R>> publisherWithLostSegments(boolean z) {
            Flowable<SegmentAwarePublisherSupplier.NotificationWithLost<R>> all = flowableWithNotifications(z);
            return all;
        }

        /**
         * Builds the notification stream backing both publishers above.
         *
         * @param z flag forwarded unchanged from {@link #publisherWithLostSegments(boolean)}
         */
        abstract Flowable<SegmentAwarePublisherSupplier.NotificationWithLost<R>> flowableWithNotifications(boolean z);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/infinispan/reactive/publisher/impl/LocalPublisherManagerImpl$LocalEntryPublisherStrategy.class */
    /**
     * Strategy for the exactly-once reduction paths. Two implementations are visible in this file,
     * {@code NonSegmentedEntryPublisherStrategy} and {@code SegmentedLocalPublisherStrategyLocal};
     * {@code updateStrategy(boolean)} selects between them.
     */
    public abstract class LocalEntryPublisherStrategy {
        LocalEntryPublisherStrategy() {
        }

        /**
         * Reduces the local contents of {@code cacheSet} using several threads.
         * The parameter descriptions below reflect how the visible implementations use them.
         *
         * @param cacheSet        source of the local publisher
         * @param set             keys to skip; {@code null} disables the filter
         * @param function        maps an element to the key that is looked up in {@code set}
         * @param intSet          segments to publish
         * @param function2       turns a publisher of elements into a stage holding the reduced value
         * @param segmentListener listener whose lost segments are consulted by the segmented implementation
         * @param intSet2         set the segmented implementation removes finished segments from
         * @return flowable of the reduced values
         */
        abstract <I, R> Flowable<R> exactlyOnceParallel(CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, IntSet intSet, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener, IntSet intSet2);

        /**
         * Sequential counterpart of {@code exactlyOnceParallel}; the parameters have the same meaning.
         */
        abstract <I, R> Flowable<R> exactlyOnceSequential(CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, IntSet intSet, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener, IntSet intSet2);

        /**
         * Converts the final reduced value into a {@link PublisherResult}, taking the segments
         * recorded as lost by {@code segmentListener} into account.
         *
         * @param completionStage stage holding the final reduced value
         * @param segmentListener listener that tracked lost segments during the operation
         */
        abstract <R> CompletionStage<PublisherResult<R>> exactlyOnceHandleLostSegments(CompletionStage<R> completionStage, LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener);
    }

    /* loaded from: input_file:org/infinispan/reactive/publisher/impl/LocalPublisherManagerImpl$NonSegmentedEntryPublisherStrategy.class */
    /**
     * Exactly-once strategy that reads all requested segments through a single local publisher
     * ({@code cacheSet.localPublisher(intSet)}) instead of one publisher per segment.
     * The {@code segmentListener} and {@code intSet2} arguments of the two publishing methods are not used here.
     */
    class NonSegmentedEntryPublisherStrategy extends LocalPublisherManagerImpl<K, V>.LocalEntryPublisherStrategy {
        NonSegmentedEntryPublisherStrategy() {
            super();
        }

        /**
         * Splits the local elements into batches of {@code PARALLEL_BATCH_SIZE}, reduces each batch with
         * {@code function2} on the non-blocking scheduler and emits one reduced value per batch.
         *
         * @param set      keys to skip; {@code null} disables the filter
         * @param function maps an element to the key that is looked up in {@code set}
         */
        @Override // org.infinispan.reactive.publisher.impl.LocalPublisherManagerImpl.LocalEntryPublisherStrategy
        <I, R> Flowable<R> exactlyOnceParallel(CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, IntSet intSet, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener, IntSet intSet2) {
            Flowable<I> elements = excludeKeys(Flowable.fromPublisher(cacheSet.localPublisher(intSet)), set, function);
            ParallelFlowable<R> batchResults = elements
                    .buffer(LocalPublisherManagerImpl.PARALLEL_BATCH_SIZE)
                    .parallel(LocalPublisherManagerImpl.this.cpuCount)
                    .runOn(LocalPublisherManagerImpl.this.nonBlockingScheduler)
                    .flatMap(batch -> Flowable.<R>fromCompletionStage(function2.apply(Flowable.fromIterable(batch))),
                            false, LocalPublisherManagerImpl.this.cpuCount);
            return batchResults.sequential();
        }

        /**
         * Reduces all local elements with a single {@code function2} invocation and emits its result.
         *
         * @param set      keys to skip; {@code null} disables the filter
         * @param function maps an element to the key that is looked up in {@code set}
         */
        @Override // org.infinispan.reactive.publisher.impl.LocalPublisherManagerImpl.LocalEntryPublisherStrategy
        <I, R> Flowable<R> exactlyOnceSequential(CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, IntSet intSet, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener, IntSet intSet2) {
            Flowable<I> elements = excludeKeys(Flowable.fromPublisher(cacheSet.localPublisher(intSet)), set, function);
            return Flowable.<R>fromCompletionStage(function2.apply(elements));
        }

        /**
         * Wraps the final value when no segment was lost. Otherwise the value is discarded and the result
         * carries the listener's whole segment set with a {@code null} value. The listener is removed
         * from {@code changeListener} once the stage completes, normally or exceptionally.
         */
        @Override // org.infinispan.reactive.publisher.impl.LocalPublisherManagerImpl.LocalEntryPublisherStrategy
        <R> CompletionStage<PublisherResult<R>> exactlyOnceHandleLostSegments(CompletionStage<R> completionStage, LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener) {
            return completionStage.<PublisherResult<R>>thenApply(value -> {
                if (segmentListener.segmentsLost.isEmpty()) {
                    return LocalPublisherManagerImpl.<R>ignoreSegmentsFunction().apply(value);
                }
                return new SegmentPublisherResult<R>(segmentListener.segments, null);
            }).whenComplete((result, throwable) -> LocalPublisherManagerImpl.this.changeListener.remove(segmentListener));
        }

        /** Drops every element whose key is contained in {@code keysToExclude}; a {@code null} set keeps everything. */
        private <I> Flowable<I> excludeKeys(Flowable<I> source, Set<K> keysToExclude, Function<I, K> toKey) {
            if (keysToExclude == null) {
                return source;
            }
            return source.filter(element -> !keysToExclude.contains(toKey.apply(element)));
        }
    }

    /* loaded from: input_file:org/infinispan/reactive/publisher/impl/LocalPublisherManagerImpl$RemoveSegmentListener.class */
    /**
     * Listener that removes a segment from its backing set as soon as the segment is reported lost.
     */
    class RemoveSegmentListener implements IntConsumer {
        private final IntSet segments;

        RemoveSegmentListener(IntSet intSet) {
            this.segments = intSet;
        }

        /** Removes segment {@code i} from the tracked set and traces the loss if it was present. */
        @Override // java.util.function.IntConsumer
        public void accept(int i) {
            boolean wasTracked = this.segments.remove(i);
            if (!wasTracked) {
                return;
            }
            if (LocalPublisherManagerImpl.log.isTraceEnabled()) {
                LocalPublisherManagerImpl.log.tracef("Listener %s lost segment %d", this, Integer.valueOf(i));
            }
        }

        /** Removes every tracked segment for which the given topology does not report read ownership. */
        void verifyTopology(LocalizedCacheTopology localizedCacheTopology) {
            for (PrimitiveIterator.OfInt iterator = this.segments.iterator(); iterator.hasNext(); ) {
                int segment = iterator.nextInt();
                if (localizedCacheTopology.isSegmentReadOwner(segment)) {
                    continue;
                }
                if (LocalPublisherManagerImpl.log.isTraceEnabled()) {
                    LocalPublisherManagerImpl.log.tracef("Listener %s lost segment %d before invocation", this, Integer.valueOf(segment));
                }
                iterator.remove();
            }
        }
    }

    /* loaded from: input_file:org/infinispan/reactive/publisher/impl/LocalPublisherManagerImpl$SegmentAwarePublisherSupplierImpl.class */
    /**
     * Supplier that publishes the local contents of a {@link CacheSet} one segment at a time,
     * optionally emitting segment completion and segment lost notifications.
     *
     * @param <I> element type of the cache set
     * @param <R> type produced by the transformer
     */
    private class SegmentAwarePublisherSupplierImpl<I, R> extends BaseSegmentAwarePublisherSupplier<R> {
        private final IntSet segments;
        private final CacheSet<I> cacheSet;
        // null when no keys have to be excluded
        private final Predicate<? super I> predicate;
        private final DeliveryGuarantee deliveryGuarantee;
        private final Function<? super Publisher<I>, ? extends Publisher<R>> transformer;

        /**
         * @param intSet            segments to publish
         * @param cacheSet          source of the per segment publishers
         * @param function          maps an element to the key that is looked up in {@code set}
         * @param set               keys to skip; {@code null} disables the filter
         * @param deliveryGuarantee selects how {@link #flowableWithNotifications(boolean)} tracks segments
         * @param function2         transformer applied to each segment's publisher
         */
        private SegmentAwarePublisherSupplierImpl(IntSet intSet, CacheSet<I> cacheSet, Function<? super I, K> function, Set<K> set, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<I>, ? extends Publisher<R>> function2) {
            this.segments = intSet;
            this.cacheSet = cacheSet;
            if (set != null) {
                this.predicate = element -> !set.contains(function.apply(element));
            } else {
                this.predicate = null;
            }
            this.deliveryGuarantee = deliveryGuarantee;
            this.transformer = function2;
        }

        /** Concatenates the transformed publishers of all segments without emitting any notification. */
        @Override // org.infinispan.reactive.publisher.impl.SegmentAwarePublisherSupplier, org.infinispan.reactive.publisher.impl.SegmentPublisherSupplier
        public Publisher<R> publisherWithoutSegments() {
            return Flowable.fromIterable(this.segments).concatMap(segment -> {
                Publisher<I> segmentPublisher = this.cacheSet.localPublisher(segment.intValue());
                if (this.predicate != null) {
                    segmentPublisher = Flowable.fromPublisher(segmentPublisher).filter(this.predicate);
                }
                return this.transformer.apply(segmentPublisher);
            });
        }

        /**
         * Emits the transformed values of each segment followed by one notification for that segment.
         * With {@code AT_MOST_ONCE} every segment ends with a completion notification. With the other
         * guarantees a {@link RemoveSegmentListener} is registered per subscription and a segment that
         * was removed from the tracked copy is reported as lost instead.
         *
         * @param z {@code true} selects {@code Notifications.reuseBuilder()}, {@code false} {@code Notifications.newBuilder()}
         * @throws UnsupportedOperationException for any other delivery guarantee
         */
        @Override // org.infinispan.reactive.publisher.impl.LocalPublisherManagerImpl.BaseSegmentAwarePublisherSupplier
        Flowable<SegmentAwarePublisherSupplier.NotificationWithLost<R>> flowableWithNotifications(boolean z) {
            switch (this.deliveryGuarantee) {
                case AT_MOST_ONCE: {
                    // Unlike the branch below, this builder is created once per call and not per subscription.
                    Notifications.NotificationBuilder<R> builder = createBuilder(z);
                    return Flowable.fromIterable(this.segments).concatMap(segment -> {
                        int segmentId = segment.intValue();
                        Flowable<SegmentAwarePublisherSupplier.NotificationWithLost<R>> values =
                                transformedSegment(segmentId).map(value -> builder.value(value, segmentId));
                        return values.concatWith(Single.fromSupplier(() -> builder.segmentComplete(segmentId)));
                    });
                }
                case AT_LEAST_ONCE:
                case EXACTLY_ONCE:
                    return Flowable.defer(() -> {
                        Notifications.NotificationBuilder<R> builder = createBuilder(z);
                        IntSet remaining = IntSets.concurrentCopyFrom(this.segments, LocalPublisherManagerImpl.this.maxSegment);
                        RemoveSegmentListener listener = new RemoveSegmentListener(remaining);
                        // Register before verifying the topology, so a loss between the two steps is still observed.
                        LocalPublisherManagerImpl.this.changeListener.add(listener);
                        listener.verifyTopology(LocalPublisherManagerImpl.this.distributionManager.getCacheTopology());
                        Flowable<SegmentAwarePublisherSupplier.NotificationWithLost<R>> notifications =
                                Flowable.fromIterable(this.segments).concatMap(segment -> {
                                    int segmentId = segment.intValue();
                                    if (!remaining.contains(segment)) {
                                        // Already lost before we started reading it: skip the data entirely.
                                        return Flowable.just(builder.segmentLost(segmentId));
                                    }
                                    Flowable<SegmentAwarePublisherSupplier.NotificationWithLost<R>> values =
                                            transformedSegment(segmentId).map(value -> builder.value(value, segmentId));
                                    // Evaluated after the values: the segment is complete only if it is still tracked.
                                    return values.concatWith(Single.fromSupplier(() -> remaining.remove(segment)
                                            ? builder.segmentComplete(segmentId)
                                            : builder.segmentLost(segmentId)));
                                });
                        return notifications.doFinally(() -> LocalPublisherManagerImpl.this.changeListener.remove(listener));
                    });
                default:
                    throw new UnsupportedOperationException("Unsupported delivery guarantee: " + this.deliveryGuarantee);
            }
        }

        /** Picks the notification builder implementation requested by the caller. */
        private Notifications.NotificationBuilder<R> createBuilder(boolean reuse) {
            if (reuse) {
                return Notifications.reuseBuilder();
            }
            return Notifications.newBuilder();
        }

        /** Local publisher of one segment with the exclusion filter and the transformer applied. */
        private Flowable<R> transformedSegment(int segment) {
            Flowable<I> elements = Flowable.fromPublisher(this.cacheSet.localPublisher(segment));
            if (this.predicate != null) {
                elements = elements.filter(this.predicate);
            }
            return elements.compose(this.transformer::apply);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/infinispan/reactive/publisher/impl/LocalPublisherManagerImpl$SegmentListener.class */
    /**
     * Listener that records lost segments in a separate set instead of removing them from the
     * segments it was created with.
     */
    public class SegmentListener implements IntConsumer {
        protected final IntSet segments;
        protected final IntSet segmentsLost;

        SegmentListener(IntSet intSet) {
            this.segments = intSet;
            this.segmentsLost = IntSets.concurrentSet(LocalPublisherManagerImpl.this.maxSegment);
        }

        /** Marks segment {@code i} as lost when it is one of the tracked segments; other segments are ignored. */
        @Override // java.util.function.IntConsumer
        public void accept(int i) {
            if (!this.segments.contains(i)) {
                return;
            }
            if (LocalPublisherManagerImpl.log.isTraceEnabled()) {
                LocalPublisherManagerImpl.log.tracef("Listener %s lost segment %d", this, Integer.valueOf(i));
            }
            this.segmentsLost.set(i);
        }

        /** Marks as lost every tracked segment for which the given topology does not report read ownership. */
        void verifyTopology(LocalizedCacheTopology localizedCacheTopology) {
            for (PrimitiveIterator.OfInt iterator = this.segments.iterator(); iterator.hasNext(); ) {
                int segment = iterator.nextInt();
                boolean readOwner = localizedCacheTopology.isSegmentReadOwner(segment);
                if (!readOwner) {
                    this.segmentsLost.set(segment);
                }
            }
        }
    }

    /* loaded from: input_file:org/infinispan/reactive/publisher/impl/LocalPublisherManagerImpl$SegmentedLocalPublisherStrategyLocal.class */
    /**
     * Exactly-once strategy that reads one segment at a time through
     * {@code cacheSet.localPublisher(int)} and produces one reduced value per segment.
     */
    class SegmentedLocalPublisherStrategyLocal extends LocalPublisherManagerImpl<K, V>.LocalEntryPublisherStrategy {
        SegmentedLocalPublisherStrategyLocal() {
            super();
        }

        /**
         * Distributes the segments over {@code cpuCount} workers: {@code cpuCount - 1} are scheduled on
         * the non-blocking scheduler and the last one runs on the calling thread. Every worker that
         * obtained a segment publishes its results through its own processor; the processors are
         * merged into the returned flowable.
         */
        @Override // org.infinispan.reactive.publisher.impl.LocalPublisherManagerImpl.LocalEntryPublisherStrategy
        public <I, R> Flowable<R> exactlyOnceParallel(CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, IntSet intSet, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener, IntSet intSet2) {
            int extraThreadCount = LocalPublisherManagerImpl.this.cpuCount - 1;
            // Generic array creation is not allowed; the array only ever holds Flowable<R> instances.
            @SuppressWarnings("unchecked")
            Flowable<R>[] processors = new Flowable[extraThreadCount + 1];
            // One iterator is shared by all workers; each pulls its next segment through getNextSegment.
            // NOTE(review): getNextSegment is defined outside this block - confirm it guards concurrent access.
            PrimitiveIterator.OfInt segmentIterator = intSet.iterator();
            for (int index = 0; index < extraThreadCount; index++) {
                int initialSegment = LocalPublisherManagerImpl.this.getNextSegment(segmentIterator);
                if (initialSegment == -1) {
                    // Nothing left to hand out, so this slot contributes no values.
                    processors[index] = Flowable.empty();
                    continue;
                }
                FlowableProcessor<R> processor = UnicastProcessor.create();
                processors[index] = processor;
                LocalPublisherManagerImpl.this.nonBlockingScheduler.scheduleDirect(() ->
                        LocalPublisherManagerImpl.this.handleParallelSegment(segmentIterator, initialSegment, cacheSet, set, function, function2, processor, intSet2, segmentListener));
            }
            // After the other workers were submitted, process segments on the invoking thread as well.
            int initialSegment = LocalPublisherManagerImpl.this.getNextSegment(segmentIterator);
            if (initialSegment != -1) {
                FlowableProcessor<R> processor = UnicastProcessor.create();
                processors[extraThreadCount] = processor;
                LocalPublisherManagerImpl.this.handleParallelSegment(segmentIterator, initialSegment, cacheSet, set, function, function2, processor, intSet2, segmentListener);
            } else {
                processors[extraThreadCount] = Flowable.empty();
            }
            return ParallelFlowable.fromArray(processors).sequential();
        }

        /**
         * Reduces the segments one after another. A segment is removed from {@code intSet2} when its
         * publisher completes, and its reduced value is dropped when {@code segmentListener} has
         * recorded the segment as lost by the time the value is available.
         */
        @Override // org.infinispan.reactive.publisher.impl.LocalPublisherManagerImpl.LocalEntryPublisherStrategy
        public <I, R> Flowable<R> exactlyOnceSequential(CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, IntSet intSet, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener, IntSet intSet2) {
            return Flowable.fromIterable(intSet).concatMapMaybe(segment -> {
                Flowable<I> elements = Flowable.fromPublisher(cacheSet.localPublisher(segment.intValue()))
                        .doOnComplete(() -> intSet2.remove(segment));
                if (set != null) {
                    elements = elements.filter(element -> !set.contains(function.apply(element)));
                }
                CompletionStage<R> reduced = function2.apply(elements);
                if (CompletionStages.isCompletedSuccessfully(reduced)) {
                    // Already done: decide right away without chaining another stage.
                    if (segmentListener.segmentsLost.contains(segment)) {
                        return Maybe.empty();
                    }
                    return Maybe.fromCompletionStage(reduced);
                }
                // Not done yet: re-check the lost segments once the value arrives and swap it for null if lost.
                CompletionStage<R> guarded = reduced.thenCompose(value -> {
                    if (segmentListener.segmentsLost.contains(segment)) {
                        return CompletableFutures.completedNull();
                    }
                    return CompletableFuture.completedFuture(value);
                });
                return Maybe.fromCompletionStage(guarded);
            });
        }

        /** Delegates to the enclosing manager's {@code handleLostSegments}. */
        @Override // org.infinispan.reactive.publisher.impl.LocalPublisherManagerImpl.LocalEntryPublisherStrategy
        public <R> CompletionStage<PublisherResult<R>> exactlyOnceHandleLostSegments(CompletionStage<R> completionStage, LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener) {
            return LocalPublisherManagerImpl.this.handleLostSegments(completionStage, segmentListener);
        }
    }

    /**
     * Wraps the injected non-blocking executor in an RxJava scheduler and stores it in
     * {@code nonBlockingScheduler}.
     */
    @Inject
    public void inject(@ComponentName("org.infinispan.executors.non-blocking") ExecutorService executorService) {
        Scheduler scheduler = Schedulers.from(executorService);
        this.nonBlockingScheduler = scheduler;
    }

    /**
     * Resolves the caches, reads the segment count, selects the initial publisher strategy and
     * registers the store change listener.
     */
    @Start
    public void start() {
        Cache<K, V> runningCache = this.cacheComponentRef.running();
        this.remoteCache = AbstractDelegatingCache.unwrapCache(runningCache).getAdvancedCache();

        AdvancedCache<K, V> localOnly = this.remoteCache.withFlags(Flag.CACHE_MODE_LOCAL);
        this.cache = localOnly;
        this.maxSegment = localOnly.getCacheConfiguration().clustering().hash().numSegments();

        boolean segmentedStore = this.configuration.persistence().usingSegmentedStore();
        updateStrategy(segmentedStore);
        this.persistenceManager.addStoreListener(this.storeChangeListener);
    }

    /** Unregisters the store change listener that {@link #start()} added. */
    @Stop
    public void stop() {
        PersistenceManager.StoreChangeListener registered = this.storeChangeListener;
        this.persistenceManager.removeStoreListener(registered);
    }

    /**
     * Selects the publisher strategy. The non-segmented strategy is used only when stores are
     * configured and {@code z} is {@code false}; in every other case the segmented one is used.
     *
     * @param z whether the store is segmented, as reported by the callers visible in this file
     */
    private void updateStrategy(boolean z) {
        boolean storesConfigured = this.configuration.persistence().usingStores();
        boolean needsNonSegmented = storesConfigured && !z;
        this.localPublisherStrategy = needsNonSegmented ? this.nonSegmentedPublisher : this.segmentedPublisher;
    }

    /**
     * Reduces local keys to a single result.
     * When {@code set} is given only those keys are handled, through {@code handleSpecificKeys}.
     * Otherwise the whole key set is reduced according to {@code deliveryGuarantee}.
     *
     * @param z                 passed through as the first argument of the reduction helpers
     * @param intSet            segments to reduce
     * @param set               specific keys to reduce, or {@code null} for all keys
     * @param set2              keys to exclude, passed through to the reduction helpers
     * @param j                 flag bit set used to obtain the key set
     * @param deliveryGuarantee guarantee that selects the reduction helper
     * @param function          turns a publisher of keys into a stage holding an intermediate value
     * @param function2         combines intermediate values into the final value
     * @return stage holding the final result
     * @throws UnsupportedOperationException for an unknown delivery guarantee
     */
    @Override // org.infinispan.reactive.publisher.impl.LocalPublisherManager
    public <R> CompletionStage<PublisherResult<R>> keyReduction(boolean z, IntSet intSet, Set<K> set, Set<K> set2, long j, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<K>, ? extends CompletionStage<R>> function, Function<? super Publisher<R>, ? extends CompletionStage<R>> function2) {
        if (set != null) {
            return handleSpecificKeys(z, set, set2, j, deliveryGuarantee, function, function2);
        }
        CacheSet<K> keySet = getKeySet(j);
        // The elements already are the keys, so no conversion is needed.
        Function<K, K> toKeyFunction = Function.identity();
        switch (deliveryGuarantee) {
            case AT_MOST_ONCE: {
                CompletionStage<R> stage = atMostOnce(z, keySet, set2, toKeyFunction, intSet, function, function2);
                // This guarantee does not track segments, so the value is wrapped with an empty segment set.
                return stage.thenApply(LocalPublisherManagerImpl.<R>ignoreSegmentsFunction());
            }
            case AT_LEAST_ONCE:
                return atLeastOnce(z, keySet, set2, toKeyFunction, intSet, function, function2);
            case EXACTLY_ONCE:
                return exactlyOnce(z, keySet, set2, toKeyFunction, intSet, function, function2);
            default:
                throw new UnsupportedOperationException("Unsupported delivery guarantee: " + deliveryGuarantee);
        }
    }

    /**
     * Reduces local entries to a single result.
     * When {@code set} is given only those keys are handled, through {@code handleSpecificEntries}.
     * Otherwise the whole entry set is reduced according to {@code deliveryGuarantee}.
     *
     * @param z                 passed through as the first argument of the reduction helpers
     * @param intSet            segments to reduce
     * @param set               specific keys to reduce, or {@code null} for all entries
     * @param set2              keys to exclude, passed through to the reduction helpers
     * @param j                 flag bit set used to obtain the entry set
     * @param deliveryGuarantee guarantee that selects the reduction helper
     * @param function          turns a publisher of entries into a stage holding an intermediate value
     * @param function2         combines intermediate values into the final value
     * @return stage holding the final result
     * @throws UnsupportedOperationException for an unknown delivery guarantee
     */
    @Override // org.infinispan.reactive.publisher.impl.LocalPublisherManager
    public <R> CompletionStage<PublisherResult<R>> entryReduction(boolean z, IntSet intSet, Set<K> set, Set<K> set2, long j, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<CacheEntry<K, V>>, ? extends CompletionStage<R>> function, Function<? super Publisher<R>, ? extends CompletionStage<R>> function2) {
        if (set != null) {
            return handleSpecificEntries(z, set, set2, j, deliveryGuarantee, function, function2);
        }
        CacheSet<CacheEntry<K, V>> entrySet = getEntrySet(j);
        // Raw cast: the helper's declared element type presumably differs from CacheEntry<K, V>
        // (e.g. Map.Entry<K, V>), which a parameterized cast could not bridge - confirm against StreamMarshalling.
        @SuppressWarnings({"unchecked", "rawtypes"})
        Function<CacheEntry<K, V>, K> toKeyFunction = (Function) StreamMarshalling.entryToKeyFunction();
        switch (deliveryGuarantee) {
            case AT_MOST_ONCE: {
                CompletionStage<R> stage = atMostOnce(z, entrySet, set2, toKeyFunction, intSet, function, function2);
                // This guarantee does not track segments, so the value is wrapped with an empty segment set.
                return stage.thenApply(LocalPublisherManagerImpl.<R>ignoreSegmentsFunction());
            }
            case AT_LEAST_ONCE:
                return atLeastOnce(z, entrySet, set2, toKeyFunction, intSet, function, function2);
            case EXACTLY_ONCE:
                return exactlyOnce(z, entrySet, set2, toKeyFunction, intSet, function, function2);
            default:
                throw new UnsupportedOperationException("Unsupported delivery guarantee: " + deliveryGuarantee);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Creates a supplier publishing local keys.
     * With specific keys ({@code set} not {@code null}) only those contained in the cache returned by
     * {@code getCache} are published; otherwise the whole key set is published per segment.
     *
     * @param intSet            segments to publish
     * @param set               specific keys to publish, or {@code null} for all keys
     * @param set2              keys to exclude; only used when all keys are published
     * @param j                 flag bit set used to obtain the key set or the cache
     * @param deliveryGuarantee guarantee handed to the created supplier or to {@code getCache}
     * @param function          transformer applied to the key publisher
     */
    @Override // org.infinispan.reactive.publisher.impl.LocalPublisherManager
    public <R> SegmentAwarePublisherSupplier<R> keyPublisher(IntSet intSet, Set<K> set, Set<K> set2, long j, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<K>, ? extends Publisher<R>> function) {
        if (set != null) {
            AdvancedCache<K, V> cacheToUse = getCache(deliveryGuarantee, j);
            return specificKeyPublisher(intSet, set, keys -> keys.filter(cacheToUse::containsKey), function);
        }
        return new SegmentAwarePublisherSupplierImpl(intSet, getKeySet(j), Function.identity(), set2, deliveryGuarantee, function);
    }

    /**
     * Looks up the entry of every key, one key at a time, and drops the keys without an entry.
     * A missing entry is represented by the {@link NullCacheEntry} singleton until the final filter
     * removes it; an {@link MVCCEntry} is replaced by an {@link ImmortalCacheEntry} holding the same
     * key and value.
     *
     * @param advancedCache cache used for the asynchronous lookups
     * @param flowable      keys to look up
     * @return the entries that were found, in key order
     */
    private Flowable<CacheEntry<K, V>> filterEntries(AdvancedCache<K, V> advancedCache, Flowable<K> flowable) {
        Flowable<CacheEntry<K, V>> lookedUp = flowable.concatMapMaybe(key -> {
            CompletionStage<CacheEntry<K, V>> lookup = advancedCache.getCacheEntryAsync(key).thenApply(entry -> {
                if (entry == null) {
                    return NullCacheEntry.getInstance();
                }
                if (entry instanceof MVCCEntry) {
                    return new ImmortalCacheEntry(entry.getKey(), entry.getValue());
                }
                return entry;
            });
            return Maybe.fromCompletionStage(lookup);
        });
        return lookedUp.filter(entry -> entry != NullCacheEntry.getInstance());
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Creates a supplier publishing local entries.
     * With specific keys ({@code set} not {@code null}) the entries are looked up through
     * {@code filterEntries}; otherwise the whole entry set is published per segment.
     *
     * @param intSet            segments to publish
     * @param set               specific keys to publish, or {@code null} for all entries
     * @param set2              keys to exclude; only used when all entries are published
     * @param j                 flag bit set used to obtain the entry set or the cache
     * @param deliveryGuarantee guarantee handed to the created supplier or to {@code getCache}
     * @param function          transformer applied to the entry publisher
     */
    @Override // org.infinispan.reactive.publisher.impl.LocalPublisherManager
    public <R> SegmentAwarePublisherSupplier<R> entryPublisher(IntSet intSet, Set<K> set, Set<K> set2, long j, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<CacheEntry<K, V>>, ? extends Publisher<R>> function) {
        if (set != null) {
            AdvancedCache<K, V> cacheToUse = getCache(deliveryGuarantee, j);
            return specificKeyPublisher(intSet, set, keys -> filterEntries(cacheToUse, keys), function);
        }
        return new SegmentAwarePublisherSupplierImpl(intSet, getEntrySet(j), StreamMarshalling.entryToKeyFunction(), set2, deliveryGuarantee, function);
    }

    /**
     * Creates a supplier that publishes a fixed set of keys instead of whole segments.
     *
     * @param intSet            segments the keys are expected to map to; segments are removed from this
     *                          set as their keys are processed
     * @param set               keys to publish
     * @param flowableConverter converts the key flowable into the element flowable (e.g. filters or loads entries)
     * @param function          transformer applied to the element publisher
     */
    private <I, R> SegmentAwarePublisherSupplier<R> specificKeyPublisher(final IntSet intSet, final Set<K> set, final FlowableConverter<K, Flowable<I>> flowableConverter, final Function<? super Publisher<I>, ? extends Publisher<R>> function) {
        return new BaseSegmentAwarePublisherSupplier<R>() { // from class: org.infinispan.reactive.publisher.impl.LocalPublisherManagerImpl.1
            /** Converts and transforms all keys as one stream, without looking at segments. */
            @Override // org.infinispan.reactive.publisher.impl.SegmentAwarePublisherSupplier, org.infinispan.reactive.publisher.impl.SegmentPublisherSupplier
            public Publisher<R> publisherWithoutSegments() {
                return Flowable.fromIterable(set)
                        .to(flowableConverter)
                        .to(function::apply);
            }

            /**
             * Groups the keys by segment and emits each group's transformed values followed by a
             * completion notification for that segment. Segments left in {@code intSet} afterwards,
             * i.e. those no key mapped to, are reported complete at the end. The {@code z} flag is not used.
             *
             * @throws IllegalArgumentException (raised inside the stream) when a key maps to a segment
             *         that is not, or no longer, contained in {@code intSet}
             */
            @Override // org.infinispan.reactive.publisher.impl.LocalPublisherManagerImpl.BaseSegmentAwarePublisherSupplier
            Flowable<SegmentAwarePublisherSupplier.NotificationWithLost<R>> flowableWithNotifications(boolean z) {
                return Flowable.fromIterable(set)
                        .groupBy(LocalPublisherManagerImpl.this.keyPartitioner::getSegment)
                        .concatMapEager(group -> {
                            int segment = group.getKey().intValue();
                            // Removing the segment marks it as handled, so the trailing concatWith skips it.
                            if (!intSet.remove(segment)) {
                                throw new IllegalArgumentException("Key: " + LocalPublisherManagerImpl.blockingFirst(group) + " maps to segment: " + segment + ", which was not included in segments provided: " + intSet);
                            }
                            Flowable<R> transformed = Flowable.fromPublisher(flowableConverter.apply(group).to(function::apply));
                            Flowable<SegmentAwarePublisherSupplier.NotificationWithLost<R>> values =
                                    transformed.map(value -> Notifications.value(value, segment));
                            return values.concatWith(Single.just(Notifications.segmentComplete(segment)));
                        }, intSet.size(), Math.min(set.size(), Flowable.bufferSize()))
                        // Iterated only after all groups finished, so it sees the segments no key mapped to.
                        .concatWith(Flowable.fromIterable(intSet).map(Notifications::segmentComplete));
            }
        };
    }

    /**
     * Returns the first element of the flowable via {@code Flowable.blockingFirst()}.
     * In this file it is only used to name the offending key in an error message.
     */
    static Object blockingFirst(Flowable<?> flowable) {
        Object first = flowable.blockingFirst();
        return first;
    }

    /**
     * Feeds every segment in {@code intSet} to every registered change listener.
     *
     * @param intSet segments that were lost; must not be {@code null}
     */
    @Override // org.infinispan.reactive.publisher.impl.LocalPublisherManager
    public void segmentsLost(IntSet intSet) {
        boolean trace = log.isTraceEnabled();
        if (trace) {
            log.tracef("Notifying listeners of lost segments %s", intSet);
        }
        // Each listener is an IntConsumer, so it can be handed directly to IntSet.forEach.
        this.changeListener.forEach(intSet::forEach);
    }

    /**
     * Invokes a size command with {@code CACHE_MODE_LOCAL} merged into the given flags.
     * The command is invoked on the calling thread and its result is returned in an already
     * completed future.
     *
     * @param intSet segments handed to the size command
     * @param j      flag bit set merged with {@code FlagBitSets.CACHE_MODE_LOCAL}
     */
    @Override // org.infinispan.reactive.publisher.impl.LocalPublisherManager
    public CompletionStage<Long> sizePublisher(IntSet intSet, long j) {
        long localFlags = EnumUtil.mergeBitSets(j, FlagBitSets.CACHE_MODE_LOCAL);
        SizeCommand sizeCommand = this.commandsFactory.buildSizeCommand(intSet, localFlags);
        InvocationHelper helper = this.invocationHelper.running();
        Long size = (Long) helper.invoke(this.invocationContextFactory.createInvocationContext(false, -1), sizeCommand);
        return CompletableFuture.completedFuture(size);
    }

    /**
     * Returns the shared {@code ignoreSegmentsFunction} instance, cast to the
     * caller's result type.
     *
     * <p>Decompiler note: the access modifier was originally package-private.
     *
     * @param <R> the result type carried by the {@link PublisherResult}
     * @return the shared function viewed as {@code Function<R, PublisherResult<R>>}
     */
    public static <R> Function<R, PublisherResult<R>> ignoreSegmentsFunction() {
        Function<R, PublisherResult<R>> shared = (Function<R, PublisherResult<R>>) ignoreSegmentsFunction;
        return shared;
    }

    /**
     * Processes segments one at a time, publishing each segment's collated result
     * to {@code flowableProcessor}.
     *
     * <p>The first iteration uses segment {@code i} unless it is {@code -1}; after
     * that, segments are pulled from {@code ofInt} via {@link #getNextSegment}.
     * When the iterator is exhausted the processor is completed. If a segment's
     * stage is not yet successfully completed, this method registers a callback
     * that resumes processing (with {@code -1} as the initial segment) and returns
     * immediately.
     *
     * @param ofInt             shared iterator supplying the remaining segments
     * @param i                 first segment to process, or {@code -1} to pull from {@code ofInt}
     * @param cacheSet          source of the per-segment publishers
     * @param set               keys to exclude, or {@code null} for no exclusion
     * @param function          maps a published item to the key checked against {@code set}
     * @param function2         collates one segment's publisher into a single result stage
     * @param flowableProcessor receives non-null per-segment results, errors and completion
     * @param intSet            segments still being tracked; each finished segment is removed from it
     * @param segmentListener   supplies the set of segments reported lost so far
     */
    private <I, R> void handleParallelSegment(PrimitiveIterator.OfInt ofInt, int i, CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, FlowableProcessor<R> flowableProcessor, IntSet intSet, LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener) {
        int nextSegment;
        while (true) {
            if (i != -1) {
                // Use the explicitly supplied segment once, then fall back to the iterator.
                nextSegment = i;
                i = -1;
            } else {
                nextSegment = getNextSegment(ofInt);
                if (nextSegment == -1) {
                    flowableProcessor.onComplete();
                    return;
                }
            }
            try {
                Flowable<I> segmentFlowable = Flowable.fromPublisher(cacheSet.localPublisher(nextSegment));
                if (set != null) {
                    segmentFlowable = segmentFlowable.filter(item -> !set.contains(function.apply(item)));
                }
                CompletionStage<R> stage = function2.apply(segmentFlowable);
                if (!CompletionStages.isCompletedSuccessfully(stage)) {
                    // Not done yet: continue with the next segment from the completion callback.
                    final int pendingSegment = nextSegment;
                    stage.whenComplete((value, failure) -> {
                        if (failure != null) {
                            flowableProcessor.onError(failure);
                            return;
                        }
                        // Stop tracking the segment before checking whether it was lost.
                        intSet.remove(pendingSegment);
                        if (!segmentListener.segmentsLost.contains(pendingSegment) && value != null) {
                            flowableProcessor.onNext(value);
                        }
                        handleParallelSegment(ofInt, -1, cacheSet, set, function, function2, flowableProcessor, intSet, segmentListener);
                    });
                    return;
                } else {
                    // Already completed: stop tracking the segment, then publish unless it was lost.
                    intSet.remove(nextSegment);
                    if (!segmentListener.segmentsLost.contains(nextSegment)) {
                        R result = CompletionStages.join(stage);
                        if (result != null) {
                            flowableProcessor.onNext(result);
                        }
                    }
                }
            } catch (Throwable failure) {
                flowableProcessor.onError(failure);
                return;
            }
        }
    }

    /**
     * Takes the next segment from the iterator while holding the iterator's monitor.
     *
     * @param ofInt the segment iterator, also used as the lock object
     * @return the next segment, or {@code -1} when the iterator has no more elements
     */
    private int getNextSegment(PrimitiveIterator.OfInt ofInt) {
        synchronized (ofInt) {
            return ofInt.hasNext() ? ofInt.nextInt() : -1;
        }
    }

    /**
     * Runs an exactly-once publish over the given segments.
     *
     * <p>A concurrent copy of {@code intSet} is handed to a new {@code SegmentListener},
     * which is registered as a change listener and checked against the current cache
     * topology before the per-segment flowable is built. The flowable is finalized with
     * {@code function3} and the resulting stage is passed to
     * {@link #exactlyOnceHandleLostSegments}.
     *
     * @param z         {@code true} for the parallel variant, {@code false} for sequential
     * @param cacheSet  source of the per-segment publishers
     * @param set       keys to exclude, or {@code null}
     * @param function  maps a published item to its key
     * @param intSet    the requested segments
     * @param function2 collates a segment's publisher into a result stage
     * @param function3 finalizes the flowable of per-segment results
     * @return the stage produced by {@link #exactlyOnceHandleLostSegments}
     */
    private <I, R> CompletionStage<PublisherResult<R>> exactlyOnce(boolean z, CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, IntSet intSet, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, Function<? super Publisher<R>, ? extends CompletionStage<R>> function3) {
        IntSet trackedSegments = IntSets.concurrentCopyFrom(intSet, this.maxSegment);
        LocalPublisherManagerImpl<K, V>.SegmentListener listener = new SegmentListener(trackedSegments);
        this.changeListener.add(listener);
        listener.verifyTopology(this.distributionManager.getCacheTopology());

        Flowable<R> segmentResults;
        if (z) {
            segmentResults = exactlyOnceParallel(cacheSet, set, function, intSet, function2, listener, trackedSegments);
        } else {
            segmentResults = exactlyOnceSequential(cacheSet, set, function, intSet, function2, listener, trackedSegments);
        }
        CompletionStage<R> finalized = function3.apply(segmentResults);
        return exactlyOnceHandleLostSegments(finalized, listener);
    }

    /**
     * Delegates lost-segment handling for an exactly-once publish to the
     * configured {@code localPublisherStrategy}.
     *
     * @param completionStage the finalized result stage
     * @param segmentListener the listener tracking this invocation's segments
     * @return whatever the strategy returns for these arguments
     */
    protected <R> CompletionStage<PublisherResult<R>> exactlyOnceHandleLostSegments(CompletionStage<R> completionStage, LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener) {
        CompletionStage<PublisherResult<R>> handled = this.localPublisherStrategy.exactlyOnceHandleLostSegments(completionStage, segmentListener);
        return handled;
    }

    /**
     * Delegates construction of the parallel exactly-once flowable to the
     * configured {@code localPublisherStrategy}, forwarding all arguments unchanged.
     *
     * @return the flowable returned by the strategy
     */
    protected <I, R> Flowable<R> exactlyOnceParallel(CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, IntSet intSet, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener, IntSet intSet2) {
        Flowable<R> parallelResults = this.localPublisherStrategy.exactlyOnceParallel(cacheSet, set, function, intSet, function2, segmentListener, intSet2);
        return parallelResults;
    }

    /**
     * Delegates construction of the sequential exactly-once flowable to the
     * configured {@code localPublisherStrategy}, forwarding all arguments unchanged.
     *
     * @return the flowable returned by the strategy
     */
    protected <I, R> Flowable<R> exactlyOnceSequential(CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, IntSet intSet, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener, IntSet intSet2) {
        Flowable<R> sequentialResults = this.localPublisherStrategy.exactlyOnceSequential(cacheSet, set, function, intSet, function2, segmentListener, intSet2);
        return sequentialResults;
    }

    /**
     * Picks the cache to read from and applies the requested flags.
     *
     * @param deliveryGuarantee {@code AT_MOST_ONCE} selects {@code cache}; any other
     *                          value selects {@code remoteCache}
     * @param j                 flag bit set; {@code 0} means no flags are applied
     * @return the selected cache, decorated with the flags when {@code j} is non-zero
     */
    private AdvancedCache<K, V> getCache(DeliveryGuarantee deliveryGuarantee, long j) {
        AdvancedCache<K, V> selected;
        if (deliveryGuarantee == DeliveryGuarantee.AT_MOST_ONCE) {
            selected = this.cache;
        } else {
            selected = this.remoteCache;
        }
        if (j == 0) {
            return selected;
        }
        return selected.withFlags(EnumUtil.enumSetOf(j, Flag.class));
    }

    /**
     * Publishes the subset of the requested keys that the cache reports as present.
     *
     * <p>Each key is checked with {@code containsKeyAsync}; a key that is not present
     * maps to a {@code null} stage result and is not passed on. The surviving keys
     * are processed by {@link #handleSpecificObjects}.
     *
     * @param z                 whether to process in parallel
     * @param set               the keys to include
     * @param set2              keys to exclude, or {@code null}
     * @param j                 flag bit set used when selecting the cache
     * @param deliveryGuarantee guarantee used when selecting the cache
     * @param function          collates the published keys into a result stage
     * @param function2         finalizes the intermediate results
     * @return the stage produced by {@link #handleSpecificObjects}
     */
    private <R> CompletionStage<PublisherResult<R>> handleSpecificKeys(boolean z, Set<K> set, Set<K> set2, long j, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<K>, ? extends CompletionStage<R>> function, Function<? super Publisher<R>, ? extends CompletionStage<R>> function2) {
        AdvancedCache<K, V> cache = getCache(deliveryGuarantee, j);
        Function<Flowable<K>, Flowable<K>> presentKeysOnly = keys -> keys.concatMapMaybe(key -> {
            CompletionStage<K> keyIfPresent = cache.containsKeyAsync(key).thenApply(present -> present.booleanValue() ? key : null);
            return Maybe.fromCompletionStage(keyIfPresent);
        });
        return handleSpecificObjects(z, set, set2, presentKeysOnly, function, function2);
    }

    /**
     * Publishes the cache entries for the requested keys.
     *
     * <p>Each key is looked up with {@code getCacheEntryAsync}. An entry that is an
     * {@link MVCCEntry} is replaced by an {@link ImmortalCacheEntry} holding the same
     * key and value, and the {@link NullCacheEntry} singleton is filtered out before
     * the entries are processed by {@link #handleSpecificObjects}.
     *
     * @param z                 whether to process in parallel
     * @param set               the keys to include
     * @param set2              keys to exclude, or {@code null}
     * @param j                 flag bit set used when selecting the cache
     * @param deliveryGuarantee guarantee used when selecting the cache
     * @param function          collates the published entries into a result stage
     * @param function2         finalizes the intermediate results
     * @return the stage produced by {@link #handleSpecificObjects}
     */
    private <R> CompletionStage<PublisherResult<R>> handleSpecificEntries(boolean z, Set<K> set, Set<K> set2, long j, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<CacheEntry<K, V>>, ? extends CompletionStage<R>> function, Function<? super Publisher<R>, ? extends CompletionStage<R>> function2) {
        AdvancedCache<K, V> cache = getCache(deliveryGuarantee, j);
        Function<Flowable<K>, Flowable<CacheEntry<K, V>>> loadEntries = keys -> keys.concatMapMaybe(key -> {
            CompletionStage<CacheEntry<K, V>> entryStage = cache.getCacheEntryAsync(key).thenApply(entry -> {
                CacheEntry<K, V> published = entry;
                if (entry instanceof MVCCEntry) {
                    published = new ImmortalCacheEntry(entry.getKey(), entry.getValue());
                }
                return published;
            });
            return Maybe.fromCompletionStage(entryStage);
        }).filter(entry -> entry != NullCacheEntry.getInstance());
        return handleSpecificObjects(z, set, set2, loadEntries, function, function2);
    }

    /**
     * Publishes values derived from an explicit set of keys.
     *
     * <p>Keys in {@code set2} (when non-null) are skipped. In sequential mode the
     * remaining keys are transformed by {@code function} and collated by
     * {@code function2}. In parallel mode the keys are split into windows of 16;
     * each window is observed on {@code nonBlockingScheduler}, transformed and
     * collated on its own, and the per-window results are combined by
     * {@code function3}. Either way the final value is wrapped with
     * {@link #ignoreSegmentsFunction()}.
     *
     * @param z         {@code true} to process windows of keys in parallel
     * @param set       the keys to include
     * @param set2      keys to exclude, or {@code null}
     * @param function  turns the flowable of keys into the flowable of items to publish
     * @param function2 collates published items into a result stage; must not be {@code null}
     * @param function3 finalizes the per-window results (parallel mode only)
     * @return a stage holding the wrapped result
     */
    private <I, R> CompletionStage<PublisherResult<R>> handleSpecificObjects(boolean z, Set<K> set, Set<K> set2, Function<? super Flowable<K>, ? extends Flowable<I>> function, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, Function<? super Publisher<R>, ? extends CompletionStage<R>> function3) {
        Flowable<K> keys = Flowable.fromIterable(set);
        if (set2 != null) {
            keys = keys.filter(key -> !set2.contains(key));
        }
        if (z) {
            Flowable<R> windowResults = keys.window(16L).flatMapMaybe(window -> {
                Flowable<I> windowItems = function.apply(window.observeOn(this.nonBlockingScheduler));
                // The method reference fails fast with an NPE if function2 is null.
                CompletionStage<R> windowStage = windowItems.to(function2::apply);
                return Maybe.fromCompletionStage(windowStage);
            });
            return function3.apply(windowResults).thenApply(ignoreSegmentsFunction());
        }
        Flowable<I> items = function.apply(keys);
        // The method reference fails fast with an NPE if function2 is null.
        CompletionStage<R> collated = items.to(function2::apply);
        return collated.thenApply(ignoreSegmentsFunction());
    }

    /**
     * Publishes the given segments in parallel without tracking lost segments.
     *
     * <p>The segments are split across {@code cpuCount} rails running on
     * {@code nonBlockingScheduler}. Each segment's publisher is filtered against
     * {@code set} (when non-null) and collated by {@code function2}; the
     * per-segment results are merged back into one flowable and finalized by
     * {@code function3}.
     *
     * @param cacheSet  source of the per-segment publishers
     * @param set       keys to exclude, or {@code null}
     * @param function  maps a published item to the key checked against {@code set}
     * @param intSet    the segments to publish
     * @param function2 collates one segment's publisher into a result stage
     * @param function3 finalizes the flowable of per-segment results
     * @return the stage returned by {@code function3}
     */
    private <I, R> CompletionStage<R> parallelAtMostOnce(CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, IntSet intSet, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, Function<? super Publisher<R>, ? extends CompletionStage<R>> function3) {
        Flowable<R> perSegmentResults = Flowable.fromIterable(intSet)
                .parallel(this.cpuCount)
                .runOn(this.nonBlockingScheduler)
                .concatMap(segment -> {
                    Flowable<I> segmentFlowable = Flowable.fromPublisher(cacheSet.localPublisher(segment.intValue()));
                    if (set != null) {
                        segmentFlowable = segmentFlowable.filter(item -> !set.contains(function.apply(item)));
                    }
                    CompletionStage<R> segmentStage = function2.apply(segmentFlowable);
                    return Maybe.fromCompletionStage(segmentStage).toFlowable();
                })
                .sequential();
        return function3.apply(perSegmentResults);
    }

    /**
     * Publishes the given segments without tracking lost segments.
     *
     * <p>In parallel mode this defers to {@link #parallelAtMostOnce}. In sequential
     * mode a single publisher covering all segments is filtered against {@code set}
     * (when non-null) and handed directly to {@code function2}; {@code function3}
     * is not applied on that path.
     *
     * @param z         {@code true} for parallel processing
     * @param cacheSet  source of the publisher(s)
     * @param set       keys to exclude, or {@code null}
     * @param function  maps a published item to the key checked against {@code set}
     * @param intSet    the segments to publish
     * @param function2 collates a publisher into a result stage
     * @param function3 finalizes per-segment results (parallel mode only)
     * @return the resulting stage
     */
    private <I, R> CompletionStage<R> atMostOnce(boolean z, CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, IntSet intSet, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, Function<? super Publisher<R>, ? extends CompletionStage<R>> function3) {
        if (z) {
            return parallelAtMostOnce(cacheSet, set, function, intSet, function2, function3);
        }
        Flowable<I> items = Flowable.fromPublisher(cacheSet.localPublisher(intSet));
        if (set != null) {
            items = items.filter(item -> !set.contains(function.apply(item)));
        }
        return function2.apply(items);
    }

    /**
     * Runs an at-least-once publish over the given segments.
     *
     * <p>A new {@code SegmentListener} for {@code intSet} is registered as a change
     * listener and checked against the current cache topology. The publish itself
     * is performed by {@link #atMostOnce}, and its stage is passed to
     * {@link #handleLostSegments} together with the listener.
     *
     * @return the stage produced by {@link #handleLostSegments}
     */
    private <I, R> CompletionStage<PublisherResult<R>> atLeastOnce(boolean z, CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, IntSet intSet, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, Function<? super Publisher<R>, ? extends CompletionStage<R>> function3) {
        LocalPublisherManagerImpl<K, V>.SegmentListener listener = new SegmentListener(intSet);
        this.changeListener.add(listener);
        listener.verifyTopology(this.distributionManager.getCacheTopology());

        CompletionStage<R> published = atMostOnce(z, cacheSet, set, function, intSet, function2, function3);
        return handleLostSegments(published, listener);
    }

    /**
     * Wraps a publish result, recording any segments the listener saw being lost.
     *
     * <p>If the listener's {@code segmentsLost} set is empty when the stage completes,
     * the value is wrapped via {@link #ignoreSegmentsFunction()}; otherwise it is
     * wrapped in a {@code SegmentPublisherResult} carrying the lost segments. The
     * listener is removed from {@code changeListener} once the returned stage
     * completes, whether normally or exceptionally.
     *
     * @param completionStage the stage producing the raw publish result
     * @param segmentListener the listener registered for this invocation
     * @return a stage holding the wrapped result
     */
    protected <R> CompletionStage<PublisherResult<R>> handleLostSegments(CompletionStage<R> completionStage, LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener) {
        // The explicit type witness pins the lambda's result to PublisherResult<R>.
        return completionStage.<PublisherResult<R>>thenApply(value -> {
            IntSet lostSegments = segmentListener.segmentsLost;
            if (lostSegments.isEmpty()) {
                return LocalPublisherManagerImpl.<R>ignoreSegmentsFunction().apply(value);
            }
            return new SegmentPublisherResult<R>(lostSegments, value);
        }).whenComplete((result, failure) -> {
            this.changeListener.remove(segmentListener);
        });
    }

    /**
     * Builds a key-set command with the given flags and invokes it in a freshly
     * created invocation context.
     *
     * @param j flag bit set passed to the command
     * @return the key set returned by the command
     */
    private CacheSet<K> getKeySet(long j) {
        KeySetCommand keySetCommand = this.commandsFactory.buildKeySetCommand(j);
        CacheSet<K> keys = (CacheSet) this.invocationHelper.running().invoke(this.invocationContextFactory.createInvocationContext(false, -1), keySetCommand);
        return keys;
    }

    /**
     * Builds an entry-set command with the given flags and invokes it in a freshly
     * created invocation context.
     *
     * @param j flag bit set passed to the command
     * @return the entry set returned by the command
     */
    private CacheSet<CacheEntry<K, V>> getEntrySet(long j) {
        EntrySetCommand entrySetCommand = this.commandsFactory.buildEntrySetCommand(j);
        CacheSet<CacheEntry<K, V>> entries = (CacheSet) this.invocationHelper.running().invoke(this.invocationContextFactory.createInvocationContext(false, -1), entrySetCommand);
        return entries;
    }
}
