package org.infinispan.scattered.impl;

import io.reactivex.rxjava3.core.Flowable;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.RemoteMetadata;
import org.infinispan.container.impl.InternalEntryFactory;
import org.infinispan.container.versioning.SimpleClusteredVersion;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.metadata.Metadata;
import org.infinispan.reactive.publisher.impl.Notifications;
import org.infinispan.reactive.publisher.impl.SegmentPublisherSupplier;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.impl.MapResponseCollector;
import org.infinispan.scattered.ScatteredStateProvider;
import org.infinispan.scattered.ScatteredVersionManager;
import org.infinispan.statetransfer.OutboundTransferTask;
import org.infinispan.statetransfer.StateChunk;
import org.infinispan.statetransfer.StateProviderImpl;
import org.infinispan.topology.CacheTopology;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/* loaded from: input_file:org/infinispan/scattered/impl/ScatteredStateProviderImpl.class */
public class ScatteredStateProviderImpl extends StateProviderImpl implements ScatteredStateProvider {
    private static final Log log = LogFactory.getLog(ScatteredStateProviderImpl.class);

    @Inject
    protected ScatteredVersionManager svm;

    @Override // org.infinispan.statetransfer.StateProviderImpl, org.infinispan.statetransfer.StateProvider
    public void start() {
        super.start();
    }

    @Override // org.infinispan.statetransfer.StateProviderImpl, org.infinispan.statetransfer.StateProvider
    public CompletableFuture<Void> onTopologyUpdate(CacheTopology cacheTopology, boolean z) {
        return z ? replicateAndInvalidate(cacheTopology) : CompletableFutures.completedNull();
    }

    private CompletableFuture<Void> replicateAndInvalidate(CacheTopology cacheTopology) {
        Address nextMember = getNextMember(cacheTopology);
        if (nextMember == null) {
            return CompletableFutures.completedNull();
        }
        HashSet hashSet = new HashSet(cacheTopology.getActualMembers());
        Address address = this.rpcManager.getAddress();
        hashSet.remove(address);
        hashSet.remove(nextMember);
        if (!cacheTopology.getCurrentCH().getMembers().contains(address)) {
            log.trace("Local address is not a member of currentCH, returning");
            return CompletableFutures.completedNull();
        }
        IntSet from = IntSets.from(cacheTopology.getCurrentCH().getSegmentsForOwner(address));
        from.retainAll(cacheTopology.getPendingCH().getSegmentsForOwner(address));
        log.trace("Segments to replicate and invalidate: " + String.valueOf(from));
        if (from.isEmpty()) {
            return CompletableFutures.completedNull();
        }
        AtomicInteger atomicInteger = new AtomicInteger(1);
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        OutboundTransferTask outboundTransferTask = new OutboundTransferTask(nextMember, from, cacheTopology.getCurrentCH().getNumSegments(), this.chunkSize, cacheTopology.getTopologyId(), collection -> {
            invalidateChunks(collection, hashSet, atomicInteger, completableFuture, cacheTopology);
        }, this.rpcManager, this.commandsFactory, this.timeout, this.cacheName, true, true);
        outboundTransferTask.execute(readEntries(from)).whenComplete((r8, th) -> {
            if (th != null) {
                logError(outboundTransferTask, th);
            }
            if (atomicInteger.decrementAndGet() == 0) {
                completableFuture.complete(null);
            }
        });
        return completableFuture;
    }

    private void invalidateChunks(Collection<StateChunk> collection, Set<Address> set, AtomicInteger atomicInteger, CompletableFuture<Void> completableFuture, CacheTopology cacheTopology) {
        int sum = collection.stream().mapToInt(stateChunk -> {
            return stateChunk.getCacheEntries().size();
        }).sum();
        if (sum == 0) {
            log.tracef("Nothing to invalidate", new Object[0]);
            return;
        }
        Object[] objArr = new Object[sum];
        int[] iArr = new int[sum];
        long[] jArr = new long[sum];
        int i = 0;
        Iterator<StateChunk> it = collection.iterator();
        while (it.hasNext()) {
            for (InternalCacheEntry<?, ?> internalCacheEntry : it.next().getCacheEntries()) {
                if (internalCacheEntry.getMetadata() != null && internalCacheEntry.getMetadata().version() != null) {
                    objArr[i] = internalCacheEntry.getKey();
                    SimpleClusteredVersion simpleClusteredVersion = (SimpleClusteredVersion) internalCacheEntry.getMetadata().version();
                    iArr[i] = simpleClusteredVersion.getTopologyId();
                    jArr[i] = simpleClusteredVersion.getVersion();
                    i++;
                }
            }
        }
        if (log.isTraceEnabled()) {
            log.tracef("Invalidating %d entries from segments %s", sum, collection.stream().map((v0) -> {
                return v0.getSegmentId();
            }).collect(Collectors.toList()));
        }
        atomicInteger.incrementAndGet();
        this.rpcManager.invokeCommand(set, this.commandsFactory.buildInvalidateVersionsCommand(cacheTopology.getTopologyId(), objArr, iArr, jArr, true), MapResponseCollector.ignoreLeavers(set.size()), this.rpcManager.getSyncRpcOptions()).whenComplete((map, th) -> {
            if (th != null) {
                try {
                    log.failedInvalidatingRemoteCache(th);
                } finally {
                    if (atomicInteger.decrementAndGet() == 0) {
                        completableFuture.complete(false);
                    }
                }
            }
        });
    }

    private Address getNextMember(CacheTopology cacheTopology) {
        Address address = this.rpcManager.getAddress();
        List<Address> actualMembers = cacheTopology.getActualMembers();
        if (actualMembers.size() == 1) {
            return null;
        }
        Iterator<Address> it = actualMembers.iterator();
        while (it.hasNext()) {
            if (it.next().equals(address)) {
                return it.hasNext() ? it.next() : actualMembers.get(0);
            }
        }
        throw new IllegalStateException();
    }

    @Override // org.infinispan.scattered.ScatteredStateProvider
    public void startKeysTransfer(IntSet intSet, Address address) {
        LocalizedCacheTopology cacheTopology = this.distributionManager.getCacheTopology();
        OutboundTransferTask outboundTransferTask = new OutboundTransferTask(address, intSet, cacheTopology.getCurrentCH().getNumSegments(), this.chunkSize, cacheTopology.getTopologyId(), collection -> {
        }, this.rpcManager, this.commandsFactory, this.timeout, this.cacheName, true, false);
        addTransfer(outboundTransferTask);
        outboundTransferTask.execute(readKeys(intSet)).whenComplete((r6, th) -> {
            if (th != null) {
                logError(outboundTransferTask, th);
            }
            onTaskCompletion(outboundTransferTask);
        });
    }

    private Flowable<SegmentPublisherSupplier.Notification<InternalCacheEntry<?, ?>>> readKeys(IntSet intSet) {
        Address address = this.rpcManager.getAddress();
        return super.readEntries(intSet).filter(notification -> {
            if (!notification.isValue()) {
                return true;
            }
            InternalCacheEntry internalCacheEntry = (InternalCacheEntry) notification.value();
            return (internalCacheEntry.getMetadata() == null || internalCacheEntry.getMetadata().version() == null) ? false : true;
        }).map(notification2 -> {
            if (!notification2.isValue()) {
                return notification2;
            }
            InternalCacheEntry internalCacheEntry = (InternalCacheEntry) notification2.value();
            return Notifications.value(this.entryFactory.create((InternalEntryFactory) internalCacheEntry.getKey(), (K) null, (Metadata) new RemoteMetadata(address, internalCacheEntry.getMetadata().version())), notification2.valueSegment());
        });
    }

    @Override // org.infinispan.scattered.ScatteredStateProvider
    public CompletionStage<Void> confirmRevokedSegments(int i) {
        return this.stateTransferLock.topologyFuture(i);
    }
}
