/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.reactive.publisher.impl;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableConverter;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleSource;
import io.reactivex.rxjava3.functions.Predicate;
import io.reactivex.rxjava3.parallel.ParallelFlowable;
import io.reactivex.rxjava3.processors.FlowableProcessor;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.lang.invoke.MethodHandles;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.IntConsumer;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.CacheSet;
import org.infinispan.cache.impl.AbstractDelegatingCache;
import org.infinispan.cache.impl.InvocationHelper;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.read.EntrySetCommand;
import org.infinispan.commands.read.KeySetCommand;
import org.infinispan.commands.read.SizeCommand;
import org.infinispan.commons.util.EnumUtil;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.ProcessorInfo;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.configuration.cache.ClusteringConfiguration;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.ImmortalCacheEntry;
import org.infinispan.container.entries.MVCCEntry;
import org.infinispan.container.entries.NullCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.InvocationContextFactory;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.LocalPublisherManager;
import org.infinispan.reactive.publisher.impl.Notifications;
import org.infinispan.reactive.publisher.impl.SegmentAwarePublisherSupplier;
import org.infinispan.reactive.publisher.impl.SegmentPublisherSupplier;
import org.infinispan.reactive.publisher.impl.commands.reduction.PublisherResult;
import org.infinispan.reactive.publisher.impl.commands.reduction.SegmentPublisherResult;
import org.infinispan.stream.StreamMarshalling;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;

@Scope(value=Scopes.NAMED_CACHE)
public class LocalPublisherManagerImpl<K, V>
implements LocalPublisherManager<K, V> {
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    static final int PARALLEL_BATCH_SIZE = 1024;
    @Inject
    ComponentRef<Cache<K, V>> cacheComponentRef;
    @Inject
    DistributionManager distributionManager;
    @Inject
    PersistenceManager persistenceManager;
    @Inject
    Configuration configuration;
    @Inject
    KeyPartitioner keyPartitioner;
    @Inject
    ComponentRef<InvocationHelper> invocationHelper;
    @Inject
    CommandsFactory commandsFactory;
    @Inject
    InvocationContextFactory invocationContextFactory;
    protected AdvancedCache<K, V> remoteCache;
    protected AdvancedCache<K, V> cache;
    protected Scheduler nonBlockingScheduler;
    protected int maxSegment;
    protected final int cpuCount = ProcessorInfo.availableProcessors();
    protected final Set<IntConsumer> changeListener = ConcurrentHashMap.newKeySet();
    private final LocalEntryPublisherStrategy nonSegmentedPublisher = new NonSegmentedEntryPublisherStrategy();
    private final LocalEntryPublisherStrategy segmentedPublisher = new SegmentedLocalPublisherStrategyLocal();
    private volatile LocalEntryPublisherStrategy localPublisherStrategy;
    private final PersistenceManager.StoreChangeListener storeChangeListener = pm -> this.updateStrategy(pm.usingSegmentedStore());
    private static final Function<Object, PublisherResult<Object>> ignoreSegmentsFunction = value -> new SegmentPublisherResult<Object>(IntSets.immutableEmptySet(), value);

    @Inject
    public void inject(@ComponentName(value="org.infinispan.executors.non-blocking") ExecutorService nonBlockingExecutor) {
        this.nonBlockingScheduler = Schedulers.from((Executor)nonBlockingExecutor);
    }

    @Start
    public void start() {
        this.remoteCache = AbstractDelegatingCache.unwrapCache(this.cacheComponentRef.running()).getAdvancedCache();
        this.cache = this.remoteCache.withFlags(Flag.CACHE_MODE_LOCAL);
        ClusteringConfiguration clusteringConfiguration = this.cache.getCacheConfiguration().clustering();
        this.maxSegment = clusteringConfiguration.hash().numSegments();
        this.updateStrategy(this.configuration.persistence().usingSegmentedStore());
        this.persistenceManager.addStoreListener(this.storeChangeListener);
    }

    @Stop
    public void stop() {
        this.persistenceManager.removeStoreListener(this.storeChangeListener);
    }

    private void updateStrategy(boolean usingSegmentedStored) {
        this.localPublisherStrategy = this.configuration.persistence().usingStores() && !usingSegmentedStored ? this.nonSegmentedPublisher : this.segmentedPublisher;
    }

    @Override
    public <R> CompletionStage<PublisherResult<R>> keyReduction(boolean parallelPublisher, IntSet segments, Set<K> keysToInclude, Set<K> keysToExclude, long explicitFlags, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<K>, ? extends CompletionStage<R>> collator, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        if (keysToInclude != null) {
            return this.handleSpecificKeys(parallelPublisher, keysToInclude, keysToExclude, explicitFlags, deliveryGuarantee, collator, finalizer);
        }
        CacheSet<K> keySet = this.getKeySet(explicitFlags);
        Function toKeyFunction = Function.identity();
        switch (deliveryGuarantee) {
            case AT_MOST_ONCE: {
                CompletionStage<R> stage = this.atMostOnce(parallelPublisher, keySet, keysToExclude, toKeyFunction, segments, collator, finalizer);
                return stage.thenApply(LocalPublisherManagerImpl.ignoreSegmentsFunction());
            }
            case AT_LEAST_ONCE: {
                return this.atLeastOnce(parallelPublisher, keySet, keysToExclude, toKeyFunction, segments, collator, finalizer);
            }
            case EXACTLY_ONCE: {
                return this.exactlyOnce(parallelPublisher, keySet, keysToExclude, toKeyFunction, segments, collator, finalizer);
            }
        }
        throw new UnsupportedOperationException("Unsupported delivery guarantee: " + String.valueOf((Object)deliveryGuarantee));
    }

    @Override
    public <R> CompletionStage<PublisherResult<R>> entryReduction(boolean parallelPublisher, IntSet segments, Set<K> keysToInclude, Set<K> keysToExclude, long explicitFlags, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<CacheEntry<K, V>>, ? extends CompletionStage<R>> collator, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        if (keysToInclude != null) {
            return this.handleSpecificEntries(parallelPublisher, keysToInclude, keysToExclude, explicitFlags, deliveryGuarantee, collator, finalizer);
        }
        CacheSet<CacheEntry<K, V>> entrySet = this.getEntrySet(explicitFlags);
        Function toKeyFunction = StreamMarshalling.entryToKeyFunction();
        switch (deliveryGuarantee) {
            case AT_MOST_ONCE: {
                CompletionStage<R> stage = this.atMostOnce(parallelPublisher, entrySet, keysToExclude, toKeyFunction, segments, collator, finalizer);
                return stage.thenApply(LocalPublisherManagerImpl.ignoreSegmentsFunction());
            }
            case AT_LEAST_ONCE: {
                return this.atLeastOnce(parallelPublisher, entrySet, keysToExclude, toKeyFunction, segments, collator, finalizer);
            }
            case EXACTLY_ONCE: {
                return this.exactlyOnce(parallelPublisher, entrySet, keysToExclude, toKeyFunction, segments, collator, finalizer);
            }
        }
        throw new UnsupportedOperationException("Unsupported delivery guarantee: " + String.valueOf((Object)deliveryGuarantee));
    }

    @Override
    public <R> SegmentAwarePublisherSupplier<R> keyPublisher(IntSet segments, Set<K> keysToInclude, Set<K> keysToExclude, long explicitFlags, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<K>, ? extends Publisher<R>> transformer) {
        if (keysToInclude != null) {
            AdvancedCache cache = this.getCache(deliveryGuarantee, explicitFlags);
            return this.specificKeyPublisher(segments, keysToInclude, keyFlowable -> keyFlowable.filter(arg_0 -> cache.containsKey(arg_0)), transformer);
        }
        return new SegmentAwarePublisherSupplierImpl(segments, this.getKeySet(explicitFlags), Function.identity(), keysToExclude, deliveryGuarantee, transformer);
    }

    private Flowable<CacheEntry<K, V>> filterEntries(AdvancedCache<K, V> cacheToUse, Flowable<K> entryFlowable) {
        return entryFlowable.concatMapMaybe(k -> {
            CompletionStage future = cacheToUse.getCacheEntryAsync(k);
            future = future.thenApply(entry -> {
                if (entry == null) {
                    return NullCacheEntry.getInstance();
                }
                if (entry instanceof MVCCEntry) {
                    entry = new ImmortalCacheEntry(entry.getKey(), entry.getValue());
                }
                return entry;
            });
            return Maybe.fromCompletionStage(future);
        }).filter(e -> e != NullCacheEntry.getInstance());
    }

    @Override
    public <R> SegmentAwarePublisherSupplier<R> entryPublisher(IntSet segments, Set<K> keysToInclude, Set<K> keysToExclude, long explicitFlags, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<CacheEntry<K, V>>, ? extends Publisher<R>> transformer) {
        if (keysToInclude != null) {
            AdvancedCache<K, V> cacheToUse = this.getCache(deliveryGuarantee, explicitFlags);
            return this.specificKeyPublisher(segments, keysToInclude, entryFlowable -> this.filterEntries(cacheToUse, entryFlowable), transformer);
        }
        return new SegmentAwarePublisherSupplierImpl(segments, this.getEntrySet(explicitFlags), StreamMarshalling.entryToKeyFunction(), keysToExclude, deliveryGuarantee, transformer);
    }

    private <I, R> SegmentAwarePublisherSupplier<R> specificKeyPublisher(final IntSet segments, final Set<K> keysToInclude, final FlowableConverter<K, Flowable<I>> conversionFunction, final Function<? super Publisher<I>, ? extends Publisher<R>> transformer) {
        return new BaseSegmentAwarePublisherSupplier<R>(){

            @Override
            public Publisher<R> publisherWithoutSegments() {
                return (Publisher)((Flowable)Flowable.fromIterable((Iterable)keysToInclude).to(conversionFunction)).to(transformer::apply);
            }

            @Override
            Flowable<SegmentAwarePublisherSupplier.NotificationWithLost<R>> flowableWithNotifications(boolean reuseNotifications) {
                return Flowable.fromIterable((Iterable)keysToInclude).groupBy(LocalPublisherManagerImpl.this.keyPartitioner::getSegment).concatMapEager(group -> {
                    int segment = (Integer)group.getKey();
                    if (!segments.remove(segment)) {
                        throw new IllegalArgumentException("Key: " + String.valueOf(LocalPublisherManagerImpl.blockingFirst(group)) + " maps to segment: " + segment + ", which was not included in segments provided: " + String.valueOf(segments));
                    }
                    return Flowable.fromPublisher((Publisher)((Publisher)((Flowable)conversionFunction.apply((Flowable)group)).to(transformer::apply))).map(r -> Notifications.value(r, segment)).concatWith((SingleSource)Single.just(Notifications.segmentComplete(segment)));
                }, segments.size(), Math.min(keysToInclude.size(), Flowable.bufferSize())).concatWith((Publisher)Flowable.fromIterable((Iterable)segments).map(Notifications::segmentComplete));
            }
        };
    }

    static Object blockingFirst(Flowable<?> flowable) {
        return flowable.blockingFirst();
    }

    @Override
    public void segmentsLost(IntSet lostSegments) {
        if (log.isTraceEnabled()) {
            log.tracef("Notifying listeners of lost segments %s", lostSegments);
        }
        this.changeListener.forEach(arg_0 -> ((IntSet)lostSegments).forEach(arg_0));
    }

    @Override
    public CompletionStage<Long> sizePublisher(IntSet segments, long flags) {
        SizeCommand command = this.commandsFactory.buildSizeCommand(segments, EnumUtil.mergeBitSets((long)flags, (long)FlagBitSets.CACHE_MODE_LOCAL));
        InvocationContext ctx = this.invocationContextFactory.createInvocationContext(false, -1);
        return CompletableFuture.completedFuture((Long)this.invocationHelper.running().invoke(ctx, command));
    }

    static <R> Function<R, PublisherResult<R>> ignoreSegmentsFunction() {
        return ignoreSegmentsFunction;
    }

    private <I, R> void handleParallelSegment(PrimitiveIterator.OfInt segmentIter, int initialSegment, CacheSet<I> set, Set<K> keysToExclude, Function<I, K> toKeyFunction, Function<? super Publisher<I>, ? extends CompletionStage<R>> collator, FlowableProcessor<R> processor, IntSet concurrentSegments, SegmentListener listener) {
        try {
            block6: {
                CompletionStage<R> stage;
                int nextSegment;
                while (true) {
                    R result;
                    if (initialSegment != -1) {
                        nextSegment = initialSegment;
                        initialSegment = -1;
                    } else {
                        nextSegment = this.getNextSegment(segmentIter);
                        if (nextSegment == -1) break block6;
                    }
                    Flowable innerFlowable = Flowable.fromPublisher(set.localPublisher(nextSegment));
                    if (keysToExclude != null) {
                        innerFlowable = innerFlowable.filter(i -> !keysToExclude.contains(toKeyFunction.apply(i)));
                    }
                    if (!CompletionStages.isCompletedSuccessfully(stage = collator.apply((Publisher<I>)innerFlowable))) break;
                    concurrentSegments.remove(nextSegment);
                    if (listener.segmentsLost.contains(nextSegment) || (result = CompletionStages.join(stage)) == null) continue;
                    processor.onNext(result);
                }
                FlowableProcessor processorToUse = processor;
                stage.whenComplete((value, t) -> {
                    if (t != null) {
                        processorToUse.onError(t);
                    } else {
                        concurrentSegments.remove(nextSegment);
                        if (!listener.segmentsLost.contains(nextSegment) && value != null) {
                            processor.onNext(value);
                        }
                        this.handleParallelSegment(segmentIter, -1, set, keysToExclude, toKeyFunction, collator, processor, concurrentSegments, listener);
                    }
                });
                return;
            }
            processor.onComplete();
        }
        catch (Throwable t2) {
            processor.onError(t2);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int getNextSegment(PrimitiveIterator.OfInt segmentIter) {
        PrimitiveIterator.OfInt ofInt = segmentIter;
        synchronized (ofInt) {
            if (segmentIter.hasNext()) {
                return segmentIter.nextInt();
            }
            return -1;
        }
    }

    private <I, R> CompletionStage<PublisherResult<R>> exactlyOnce(boolean parallelPublisher, CacheSet<I> set, Set<K> keysToExclude, Function<I, K> toKeyFunction, IntSet segments, Function<? super Publisher<I>, ? extends CompletionStage<R>> collator, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        IntSet concurrentSegments = IntSets.concurrentCopyFrom((IntSet)segments, (int)this.maxSegment);
        SegmentListener listener = new SegmentListener(concurrentSegments);
        this.changeListener.add(listener);
        listener.verifyTopology(this.distributionManager.getCacheTopology());
        Flowable<R> resultFlowable = parallelPublisher ? this.exactlyOnceParallel(set, keysToExclude, toKeyFunction, segments, collator, listener, concurrentSegments) : this.exactlyOnceSequential(set, keysToExclude, toKeyFunction, segments, collator, listener, concurrentSegments);
        return this.exactlyOnceHandleLostSegments(finalizer.apply((Publisher<R>)resultFlowable), listener);
    }

    protected <R> CompletionStage<PublisherResult<R>> exactlyOnceHandleLostSegments(CompletionStage<R> finalValue, SegmentListener listener) {
        return this.localPublisherStrategy.exactlyOnceHandleLostSegments(finalValue, listener);
    }

    protected <I, R> Flowable<R> exactlyOnceParallel(CacheSet<I> set, Set<K> keysToExclude, Function<I, K> toKeyFunction, IntSet segments, Function<? super Publisher<I>, ? extends CompletionStage<R>> collator, SegmentListener listener, IntSet concurrentSegments) {
        return this.localPublisherStrategy.exactlyOnceParallel(set, keysToExclude, toKeyFunction, segments, collator, listener, concurrentSegments);
    }

    protected <I, R> Flowable<R> exactlyOnceSequential(CacheSet<I> set, Set<K> keysToExclude, Function<I, K> toKeyFunction, IntSet segments, Function<? super Publisher<I>, ? extends CompletionStage<R>> collator, SegmentListener listener, IntSet concurrentSegments) {
        return this.localPublisherStrategy.exactlyOnceSequential(set, keysToExclude, toKeyFunction, segments, collator, listener, concurrentSegments);
    }

    private AdvancedCache<K, V> getCache(DeliveryGuarantee deliveryGuarantee, long explicitFlags) {
        AdvancedCache<K, V> cache;
        AdvancedCache<K, V> advancedCache = cache = deliveryGuarantee == DeliveryGuarantee.AT_MOST_ONCE ? this.cache : this.remoteCache;
        if (explicitFlags != 0L) {
            return cache.withFlags(EnumUtil.enumSetOf((long)explicitFlags, Flag.class));
        }
        return cache;
    }

    private <R> CompletionStage<PublisherResult<R>> handleSpecificKeys(boolean parallelPublisher, Set<K> keysToInclude, Set<K> keysToExclude, long explicitFlags, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<K>, ? extends CompletionStage<R>> collator, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        AdvancedCache cache = this.getCache(deliveryGuarantee, explicitFlags);
        return this.handleSpecificObjects(parallelPublisher, keysToInclude, keysToExclude, keyFlowable -> keyFlowable.concatMapMaybe(key -> Maybe.fromCompletionStage((CompletionStage)cache.containsKeyAsync(key).thenApply(contains -> contains != false ? key : null))), collator, finalizer);
    }

    private <R> CompletionStage<PublisherResult<R>> handleSpecificEntries(boolean parallelPublisher, Set<K> keysToInclude, Set<K> keysToExclude, long explicitFlags, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<CacheEntry<K, V>>, ? extends CompletionStage<R>> collator, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        AdvancedCache cache = this.getCache(deliveryGuarantee, explicitFlags);
        return this.handleSpecificObjects(parallelPublisher, keysToInclude, keysToExclude, keyFlowable -> keyFlowable.concatMapMaybe(k -> {
            CompletionStage future = cache.getCacheEntryAsync(k);
            future = future.thenApply(entry -> {
                if (entry instanceof MVCCEntry) {
                    entry = new ImmortalCacheEntry(entry.getKey(), entry.getValue());
                }
                return entry;
            });
            return Maybe.fromCompletionStage(future);
        }).filter(e -> e != NullCacheEntry.getInstance()), collator, finalizer);
    }

    private <I, R> CompletionStage<PublisherResult<R>> handleSpecificObjects(boolean parallelPublisher, Set<K> keysToInclude, Set<K> keysToExclude, Function<? super Flowable<K>, ? extends Flowable<I>> keyTransformer, Function<? super Publisher<I>, ? extends CompletionStage<R>> collator, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        Flowable keyFlowable = Flowable.fromIterable(keysToInclude);
        if (keysToExclude != null) {
            keyFlowable = keyFlowable.filter(k -> !keysToExclude.contains(k));
        }
        if (parallelPublisher) {
            Flowable stageFlowable = keyFlowable.window(16L).flatMapMaybe(keys -> {
                CompletionStage stage = (CompletionStage)((Flowable)keyTransformer.apply(keys.observeOn(this.nonBlockingScheduler))).to(collator::apply);
                return Maybe.fromCompletionStage((CompletionStage)stage);
            });
            return finalizer.apply((Publisher<R>)stageFlowable).thenApply(LocalPublisherManagerImpl.ignoreSegmentsFunction());
        }
        return ((CompletionStage)keyTransformer.apply(keyFlowable).to(collator::apply)).thenApply(LocalPublisherManagerImpl.ignoreSegmentsFunction());
    }

    private <I, R> CompletionStage<R> parallelAtMostOnce(CacheSet<I> set, Set<K> keysToExclude, Function<I, K> toKeyFunction, IntSet segments, Function<? super Publisher<I>, ? extends CompletionStage<R>> collator, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        Flowable stageFlowable = Flowable.fromIterable((Iterable)segments).parallel(this.cpuCount).runOn(this.nonBlockingScheduler).concatMap(segment -> {
            Flowable innerFlowable = Flowable.fromPublisher(set.localPublisher((int)segment));
            if (keysToExclude != null) {
                innerFlowable = innerFlowable.filter(i -> !keysToExclude.contains(toKeyFunction.apply(i)));
            }
            CompletionStage stage = (CompletionStage)collator.apply((Object)innerFlowable);
            return Maybe.fromCompletionStage((CompletionStage)stage).toFlowable();
        }).sequential();
        return finalizer.apply((Publisher<R>)stageFlowable);
    }

    private <I, R> CompletionStage<R> atMostOnce(boolean parallel, CacheSet<I> set, Set<K> keysToExclude, Function<I, K> toKeyFunction, IntSet segments, Function<? super Publisher<I>, ? extends CompletionStage<R>> collator, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        if (parallel) {
            return this.parallelAtMostOnce(set, keysToExclude, toKeyFunction, segments, collator, finalizer);
        }
        Flowable flowable = Flowable.fromPublisher(set.localPublisher(segments));
        if (keysToExclude != null) {
            flowable = flowable.filter(i -> !keysToExclude.contains(toKeyFunction.apply(i)));
        }
        return collator.apply((Publisher<I>)flowable);
    }

    private <I, R> CompletionStage<PublisherResult<R>> atLeastOnce(boolean parallel, CacheSet<I> set, Set<K> keysToExclude, Function<I, K> toKeyFunction, IntSet segments, Function<? super Publisher<I>, ? extends CompletionStage<R>> collator, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        SegmentListener listener = new SegmentListener(segments);
        this.changeListener.add(listener);
        listener.verifyTopology(this.distributionManager.getCacheTopology());
        CompletionStage<R> stage = this.atMostOnce(parallel, set, keysToExclude, toKeyFunction, segments, collator, finalizer);
        return this.handleLostSegments(stage, listener);
    }

    protected <R> CompletionStage<PublisherResult<R>> handleLostSegments(CompletionStage<R> stage, SegmentListener segmentListener) {
        return stage.thenApply(value -> {
            IntSet lostSegments = segmentListener.segmentsLost;
            if (lostSegments.isEmpty()) {
                return LocalPublisherManagerImpl.ignoreSegmentsFunction().apply(value);
            }
            return new SegmentPublisherResult<Object>(lostSegments, value);
        }).whenComplete((u, t) -> this.changeListener.remove(segmentListener));
    }

    private CacheSet<K> getKeySet(long explicitFlags) {
        KeySetCommand command = this.commandsFactory.buildKeySetCommand(explicitFlags);
        InvocationContext ctx = this.invocationContextFactory.createInvocationContext(false, -1);
        return (CacheSet)this.invocationHelper.running().invoke(ctx, command);
    }

    private CacheSet<CacheEntry<K, V>> getEntrySet(long explicitFlags) {
        EntrySetCommand command = this.commandsFactory.buildEntrySetCommand(explicitFlags);
        InvocationContext ctx = this.invocationContextFactory.createInvocationContext(false, -1);
        return (CacheSet)this.invocationHelper.running().invoke(ctx, command);
    }

    class NonSegmentedEntryPublisherStrategy
    extends LocalEntryPublisherStrategy {
        NonSegmentedEntryPublisherStrategy() {
        }

        @Override
        <I, R> Flowable<R> exactlyOnceParallel(CacheSet<I> set, Set<K> keysToExclude, Function<I, K> toKeyFunction, IntSet segments, Function<? super Publisher<I>, ? extends CompletionStage<R>> transformer, SegmentListener listener, IntSet concurrentSegments) {
            Flowable flowable = Flowable.fromPublisher(set.localPublisher(segments));
            if (keysToExclude != null) {
                flowable = flowable.filter(i -> !keysToExclude.contains(toKeyFunction.apply(i)));
            }
            return flowable.buffer(1024).parallel(LocalPublisherManagerImpl.this.cpuCount).runOn(LocalPublisherManagerImpl.this.nonBlockingScheduler).flatMap(buffer -> Flowable.fromCompletionStage((CompletionStage)((CompletionStage)transformer.apply((Object)Flowable.fromIterable((Iterable)buffer)))), false, LocalPublisherManagerImpl.this.cpuCount).sequential();
        }

        @Override
        <I, R> Flowable<R> exactlyOnceSequential(CacheSet<I> set, Set<K> keysToExclude, Function<I, K> toKeyFunction, IntSet segments, Function<? super Publisher<I>, ? extends CompletionStage<R>> transformer, SegmentListener listener, IntSet concurrentSegments) {
            Flowable flowable = Flowable.fromPublisher(set.localPublisher(segments));
            if (keysToExclude != null) {
                flowable = flowable.filter(i -> !keysToExclude.contains(toKeyFunction.apply(i)));
            }
            return Flowable.fromCompletionStage(transformer.apply((Publisher<I>)flowable));
        }

        @Override
        <R> CompletionStage<PublisherResult<R>> exactlyOnceHandleLostSegments(CompletionStage<R> finalValue, SegmentListener listener) {
            return finalValue.thenApply(value -> {
                IntSet lostSegments = listener.segmentsLost;
                if (lostSegments.isEmpty()) {
                    return LocalPublisherManagerImpl.ignoreSegmentsFunction().apply(value);
                }
                return new SegmentPublisherResult<Object>(listener.segments, null);
            }).whenComplete((u, t) -> LocalPublisherManagerImpl.this.changeListener.remove(listener));
        }
    }

    abstract class LocalEntryPublisherStrategy {
        LocalEntryPublisherStrategy() {
        }

        abstract <I, R> Flowable<R> exactlyOnceParallel(CacheSet<I> var1, Set<K> var2, Function<I, K> var3, IntSet var4, Function<? super Publisher<I>, ? extends CompletionStage<R>> var5, SegmentListener var6, IntSet var7);

        abstract <I, R> Flowable<R> exactlyOnceSequential(CacheSet<I> var1, Set<K> var2, Function<I, K> var3, IntSet var4, Function<? super Publisher<I>, ? extends CompletionStage<R>> var5, SegmentListener var6, IntSet var7);

        abstract <R> CompletionStage<PublisherResult<R>> exactlyOnceHandleLostSegments(CompletionStage<R> var1, SegmentListener var2);
    }

    class SegmentedLocalPublisherStrategyLocal
    extends LocalEntryPublisherStrategy {
        SegmentedLocalPublisherStrategyLocal() {
        }

        @Override
        public <I, R> Flowable<R> exactlyOnceParallel(CacheSet<I> set, Set<K> keysToExclude, Function<I, K> toKeyFunction, IntSet segments, Function<? super Publisher<I>, ? extends CompletionStage<R>> collator, SegmentListener listener, IntSet concurrentSegments) {
            int extraThreadCount = LocalPublisherManagerImpl.this.cpuCount - 1;
            Flowable[] processors = new Flowable[extraThreadCount + 1];
            PrimitiveIterator.OfInt segmentIter = segments.iterator();
            for (int i = 0; i < extraThreadCount; ++i) {
                int initialSegment = LocalPublisherManagerImpl.this.getNextSegment(segmentIter);
                if (initialSegment == -1) {
                    processors[i] = Flowable.empty();
                    continue;
                }
                UnicastProcessor processor = UnicastProcessor.create();
                processors[i] = processor;
                LocalPublisherManagerImpl.this.nonBlockingScheduler.scheduleDirect(() -> this.lambda$exactlyOnceParallel$0(segmentIter, initialSegment, set, keysToExclude, toKeyFunction, collator, (FlowableProcessor)processor, concurrentSegments, listener));
            }
            int initialSegment = LocalPublisherManagerImpl.this.getNextSegment(segmentIter);
            if (initialSegment != -1) {
                UnicastProcessor processor = UnicastProcessor.create();
                processors[extraThreadCount] = processor;
                LocalPublisherManagerImpl.this.handleParallelSegment(segmentIter, initialSegment, set, keysToExclude, toKeyFunction, collator, processor, concurrentSegments, listener);
            } else {
                processors[extraThreadCount] = Flowable.empty();
            }
            return ParallelFlowable.fromArray((Publisher[])processors).sequential();
        }

        @Override
        public <I, R> Flowable<R> exactlyOnceSequential(CacheSet<I> set, Set<K> keysToExclude, Function<I, K> toKeyFunction, IntSet segments, Function<? super Publisher<I>, ? extends CompletionStage<R>> collator, SegmentListener listener, IntSet concurrentSegments) {
            return Flowable.fromIterable((Iterable)segments).concatMapMaybe(segment -> {
                CompletionStage stage;
                Flowable innerFlowable = Flowable.fromPublisher(set.localPublisher((int)segment)).doOnComplete(() -> concurrentSegments.remove(segment));
                if (keysToExclude != null) {
                    innerFlowable = innerFlowable.filter(i -> !keysToExclude.contains(toKeyFunction.apply(i)));
                }
                if (CompletionStages.isCompletedSuccessfully(stage = (CompletionStage)collator.apply((Object)innerFlowable))) {
                    if (listener.segmentsLost.contains(segment)) {
                        return Maybe.empty();
                    }
                    return Maybe.fromCompletionStage((CompletionStage)stage);
                }
                return Maybe.fromCompletionStage(stage.thenCompose(value -> {
                    if (listener.segmentsLost.contains(segment)) {
                        return CompletableFutures.completedNull();
                    }
                    return CompletableFuture.completedFuture(value);
                }));
            });
        }

        @Override
        public <R> CompletionStage<PublisherResult<R>> exactlyOnceHandleLostSegments(CompletionStage<R> finalValue, SegmentListener listener) {
            return LocalPublisherManagerImpl.this.handleLostSegments(finalValue, listener);
        }

        private /* synthetic */ void lambda$exactlyOnceParallel$0(PrimitiveIterator.OfInt segmentIter, int initialSegment, CacheSet set, Set keysToExclude, Function toKeyFunction, Function collator, FlowableProcessor processor, IntSet concurrentSegments, SegmentListener listener) {
            LocalPublisherManagerImpl.this.handleParallelSegment(segmentIter, initialSegment, set, keysToExclude, toKeyFunction, collator, processor, concurrentSegments, listener);
        }
    }

    private class SegmentAwarePublisherSupplierImpl<I, R>
    extends BaseSegmentAwarePublisherSupplier<R> {
        private final IntSet segments;
        private final CacheSet<I> cacheSet;
        private final Predicate<? super I> predicate;
        private final DeliveryGuarantee deliveryGuarantee;
        private final Function<? super Publisher<I>, ? extends Publisher<R>> transformer;

        private SegmentAwarePublisherSupplierImpl(IntSet segments, CacheSet<I> cacheSet, Function<? super I, K> toKeyFunction, Set<K> keysToExclude, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<I>, ? extends Publisher<R>> transformer) {
            this.segments = segments;
            this.cacheSet = cacheSet;
            this.predicate = keysToExclude != null ? v -> !keysToExclude.contains(toKeyFunction.apply((Object)v)) : null;
            this.deliveryGuarantee = deliveryGuarantee;
            this.transformer = transformer;
        }

        @Override
        public Publisher<R> publisherWithoutSegments() {
            Flowable publisher = this.cacheSet.localPublisher(this.segments);
            if (this.predicate != null) {
                publisher = Flowable.fromPublisher(publisher).filter(this.predicate);
            }
            return this.transformer.apply((Publisher<I>)publisher);
        }

        @Override
        Flowable<SegmentAwarePublisherSupplier.NotificationWithLost<R>> flowableWithNotifications(boolean reuseNotifications) {
            switch (this.deliveryGuarantee) {
                case AT_MOST_ONCE: {
                    Notifications.NotificationBuilder atMostBuilder = reuseNotifications ? Notifications.reuseBuilder() : Notifications.newBuilder();
                    return Flowable.fromIterable((Iterable)this.segments).concatMap(segment -> {
                        Flowable flowable = Flowable.fromPublisher(this.cacheSet.localPublisher((int)segment));
                        if (this.predicate != null) {
                            flowable = flowable.filter(this.predicate);
                        }
                        return flowable.compose(this.transformer::apply).map(r -> atMostBuilder.value(r, (int)segment)).concatWith((SingleSource)Single.fromSupplier(() -> atMostBuilder.segmentComplete((int)segment)));
                    });
                }
                case AT_LEAST_ONCE: 
                case EXACTLY_ONCE: {
                    return Flowable.defer(() -> {
                        Notifications.NotificationBuilder builder = reuseNotifications ? Notifications.reuseBuilder() : Notifications.newBuilder();
                        IntSet concurrentSet = IntSets.concurrentCopyFrom((IntSet)this.segments, (int)LocalPublisherManagerImpl.this.maxSegment);
                        RemoveSegmentListener listener = new RemoveSegmentListener(concurrentSet);
                        LocalPublisherManagerImpl.this.changeListener.add(listener);
                        listener.verifyTopology(LocalPublisherManagerImpl.this.distributionManager.getCacheTopology());
                        return Flowable.fromIterable((Iterable)this.segments).concatMap(segment -> {
                            if (!concurrentSet.contains(segment)) {
                                return Flowable.just(builder.segmentLost((int)segment));
                            }
                            Flowable flowable = Flowable.fromPublisher(this.cacheSet.localPublisher((int)segment));
                            if (this.predicate != null) {
                                flowable = flowable.filter(this.predicate);
                            }
                            return flowable.compose(this.transformer::apply).map(r -> builder.value(r, (int)segment)).concatWith((SingleSource)Single.fromSupplier(() -> concurrentSet.remove(segment) ? builder.segmentComplete((int)segment) : builder.segmentLost((int)segment)));
                        }).doFinally(() -> LocalPublisherManagerImpl.this.changeListener.remove(listener));
                    });
                }
            }
            throw new UnsupportedOperationException("Unsupported delivery guarantee: " + String.valueOf((Object)this.deliveryGuarantee));
        }
    }

    class SegmentListener
    implements IntConsumer {
        protected final IntSet segments;
        protected final IntSet segmentsLost;

        SegmentListener(IntSet segments) {
            this.segments = segments;
            this.segmentsLost = IntSets.concurrentSet((int)LocalPublisherManagerImpl.this.maxSegment);
        }

        @Override
        public void accept(int segment) {
            if (this.segments.contains(segment)) {
                if (log.isTraceEnabled()) {
                    log.tracef("Listener %s lost segment %d", this, segment);
                }
                this.segmentsLost.set(segment);
            }
        }

        void verifyTopology(LocalizedCacheTopology localizedCacheTopology) {
            PrimitiveIterator.OfInt segmentIterator = this.segments.iterator();
            while (segmentIterator.hasNext()) {
                int segment = segmentIterator.nextInt();
                if (localizedCacheTopology.isSegmentReadOwner(segment)) continue;
                this.segmentsLost.set(segment);
            }
        }
    }

    class RemoveSegmentListener
    implements IntConsumer {
        private final IntSet segments;

        RemoveSegmentListener(IntSet segments) {
            this.segments = segments;
        }

        @Override
        public void accept(int segment) {
            if (this.segments.remove(segment) && log.isTraceEnabled()) {
                log.tracef("Listener %s lost segment %d", this, segment);
            }
        }

        void verifyTopology(LocalizedCacheTopology localizedCacheTopology) {
            PrimitiveIterator.OfInt segmentIterator = this.segments.iterator();
            while (segmentIterator.hasNext()) {
                int segment = segmentIterator.nextInt();
                if (localizedCacheTopology.isSegmentReadOwner(segment)) continue;
                if (log.isTraceEnabled()) {
                    log.tracef("Listener %s lost segment %d before invocation", this, segment);
                }
                segmentIterator.remove();
            }
        }
    }

    private static abstract class BaseSegmentAwarePublisherSupplier<R>
    implements SegmentAwarePublisherSupplier<R> {
        private BaseSegmentAwarePublisherSupplier() {
        }

        @Override
        public Publisher<SegmentPublisherSupplier.Notification<R>> publisherWithSegments() {
            return this.flowableWithNotifications(false).filter(notification -> !notification.isLostSegment()).map(n -> n);
        }

        @Override
        public Publisher<SegmentAwarePublisherSupplier.NotificationWithLost<R>> publisherWithLostSegments(boolean reuseNotifications) {
            return this.flowableWithNotifications(reuseNotifications);
        }

        abstract Flowable<SegmentAwarePublisherSupplier.NotificationWithLost<R>> flowableWithNotifications(boolean var1);
    }
}

