package org.infinispan.statetransfer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheLoaderManager;
import org.infinispan.loaders.CacheStore;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.InfinispanCollections;
import org.infinispan.util.ReadOnlyDataContainerBackedKeySet;
import org.infinispan.util.concurrent.AggregatingNotifyingFutureBuilder;
import org.infinispan.util.concurrent.ConcurrentMapFactory;
import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/* loaded from: input_file:org/infinispan/statetransfer/OutboundTransferTask.class */
public class OutboundTransferTask implements Runnable {
    private static final Log log = LogFactory.getLog(OutboundTransferTask.class);
    private StateProviderImpl stateProvider;
    private final int topologyId;
    private final Address destination;
    private final int stateTransferChunkSize;
    private final Configuration configuration;
    private final ConsistentHash readCh;
    private final DataContainer dataContainer;
    private final CacheLoaderManager cacheLoaderManager;
    private final RpcManager rpcManager;
    private final CommandsFactory commandsFactory;
    private final long timeout;
    private int accumulatedEntries;
    private FutureTask runnableFuture;
    private final boolean trace = log.isTraceEnabled();
    private final Set<Integer> segments = new CopyOnWriteArraySet();
    private final Map<Integer, List<InternalCacheEntry>> entriesBySegment = ConcurrentMapFactory.makeConcurrentMap();
    private final NotifyingNotifiableFuture<Object> sendFuture = new AggregatingNotifyingFutureBuilder(null);

    public OutboundTransferTask(Address address, Set<Integer> set, int i, int i2, ConsistentHash consistentHash, StateProviderImpl stateProviderImpl, DataContainer dataContainer, CacheLoaderManager cacheLoaderManager, RpcManager rpcManager, Configuration configuration, CommandsFactory commandsFactory, long j) {
        if (set == null || set.isEmpty()) {
            throw new IllegalArgumentException("Segments must not be null or empty");
        }
        if (address == null) {
            throw new IllegalArgumentException("Destination address cannot be null");
        }
        if (i <= 0) {
            throw new IllegalArgumentException("stateTransferChunkSize must be greater than 0");
        }
        this.stateProvider = stateProviderImpl;
        this.destination = address;
        this.segments.addAll(set);
        this.stateTransferChunkSize = i;
        this.topologyId = i2;
        this.readCh = consistentHash;
        this.dataContainer = dataContainer;
        this.cacheLoaderManager = cacheLoaderManager;
        this.rpcManager = rpcManager;
        this.configuration = configuration;
        this.commandsFactory = commandsFactory;
        this.timeout = j;
    }

    public void execute(ExecutorService executorService) {
        if (this.runnableFuture != null) {
            throw new IllegalStateException("This task was already submitted");
        }
        this.runnableFuture = new FutureTask<Void>(this, null) { // from class: org.infinispan.statetransfer.OutboundTransferTask.1
            @Override // java.util.concurrent.FutureTask
            protected void done() {
                OutboundTransferTask.this.stateProvider.onTaskCompletion(OutboundTransferTask.this);
            }
        };
        executorService.submit(this.runnableFuture);
    }

    public Address getDestination() {
        return this.destination;
    }

    public Set<Integer> getSegments() {
        return this.segments;
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            for (InternalCacheEntry internalCacheEntry : this.dataContainer) {
                int segment = this.readCh.getSegment(internalCacheEntry.getKey());
                if (this.segments.contains(Integer.valueOf(segment))) {
                    sendEntry(internalCacheEntry, segment);
                }
            }
            CacheStore cacheStore = getCacheStore();
            if (cacheStore != null) {
                try {
                    for (Object obj : cacheStore.loadAllKeys(new ReadOnlyDataContainerBackedKeySet(this.dataContainer))) {
                        int segment2 = this.readCh.getSegment(obj);
                        if (this.segments.contains(Integer.valueOf(segment2))) {
                            try {
                                InternalCacheEntry load = cacheStore.load(obj);
                                if (load != null) {
                                    sendEntry(load, segment2);
                                }
                            } catch (CacheLoaderException e) {
                                log.failedLoadingValueFromCacheStore(obj, e);
                            }
                        }
                    }
                } catch (CacheLoaderException e2) {
                    log.failedLoadingKeysFromCacheStore(e2);
                }
            } else if (this.trace) {
                log.tracef("No cache store or the cache store is shared, no need to send any stored cache entries for segments: %s", this.segments);
            }
            sendEntries(true);
        } catch (Throwable th) {
            if (!this.runnableFuture.isCancelled()) {
                log.error("Failed to execute outbound transfer", th);
            }
        }
        if (this.trace) {
            log.tracef("Outbound transfer of segments %s to %s is complete", this.segments, this.destination);
        }
    }

    private CacheStore getCacheStore() {
        if (this.cacheLoaderManager == null || !this.cacheLoaderManager.isEnabled() || this.cacheLoaderManager.isShared() || !this.cacheLoaderManager.isFetchPersistentState()) {
            return null;
        }
        return this.cacheLoaderManager.getCacheStore();
    }

    private void sendEntry(InternalCacheEntry internalCacheEntry, int i) {
        if (this.accumulatedEntries >= this.stateTransferChunkSize) {
            sendEntries(false);
            this.entriesBySegment.clear();
            this.accumulatedEntries = 0;
        }
        List<InternalCacheEntry> list = this.entriesBySegment.get(Integer.valueOf(i));
        if (list == null) {
            list = new ArrayList();
            this.entriesBySegment.put(Integer.valueOf(i), list);
        }
        list.add(internalCacheEntry);
        this.accumulatedEntries++;
    }

    private void sendEntries(boolean z) {
        ArrayList arrayList = new ArrayList();
        if (z) {
            Iterator<Integer> it = this.segments.iterator();
            while (it.hasNext()) {
                int intValue = it.next().intValue();
                List<InternalCacheEntry> list = this.entriesBySegment.get(Integer.valueOf(intValue));
                if (list == null) {
                    list = InfinispanCollections.emptyList();
                }
                arrayList.add(new StateChunk(intValue, list, z));
            }
        } else {
            for (Map.Entry<Integer, List<InternalCacheEntry>> entry : this.entriesBySegment.entrySet()) {
                List<InternalCacheEntry> value = entry.getValue();
                if (!value.isEmpty()) {
                    arrayList.add(new StateChunk(entry.getKey().intValue(), value, z));
                }
            }
        }
        if (!arrayList.isEmpty() || z) {
            if (this.trace) {
                log.tracef("Sending %d cache entries from segments %s to node %s", Integer.valueOf(this.accumulatedEntries), this.entriesBySegment.keySet(), this.destination);
            }
            this.rpcManager.invokeRemotelyInFuture(Collections.singleton(this.destination), this.commandsFactory.buildStateResponseCommand(this.rpcManager.getAddress(), this.topologyId, arrayList), false, this.sendFuture, this.timeout);
        }
    }

    public void cancelSegments(Set<Integer> set) {
        if (this.trace) {
            log.tracef("Cancelling outbound transfer of segments %s to %s", set, this.destination);
        }
        if (this.segments.removeAll(set)) {
            this.entriesBySegment.keySet().removeAll(set);
            if (this.segments.isEmpty()) {
                cancel();
            }
        }
    }

    public void cancel() {
        if (this.runnableFuture == null || this.runnableFuture.isCancelled()) {
            return;
        }
        this.runnableFuture.cancel(true);
        this.sendFuture.cancel(true);
    }

    public boolean isCancelled() {
        return this.runnableFuture != null && this.runnableFuture.isCancelled();
    }
}
