/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.xsite;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commons.IllegalLifecycleStateException;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.remoting.LocalInvocation;
import org.infinispan.remoting.RpcException;
import org.infinispan.remoting.responses.CacheNotFoundResponse;
import org.infinispan.remoting.responses.ExceptionResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.ValidResponse;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.remoting.transport.ResponseCollectors;
import org.infinispan.remoting.transport.impl.MapResponseCollector;
import org.infinispan.util.concurrent.AggregateCompletionStage;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.concurrent.TimeoutException;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.xsite.BaseBackupReceiver;
import org.infinispan.xsite.statetransfer.XSiteState;
import org.infinispan.xsite.statetransfer.XSiteStatePushCommand;
import org.infinispan.xsite.statetransfer.XSiteStateTransferControlCommand;

/**
 * {@link BaseBackupReceiver} used when the local cache is clustered.
 *
 * <p>State transfer control commands are executed on every member of the local
 * site (remote members plus the local node). Incoming state chunks are split by
 * the primary owner of each key and every sub-chunk is pushed to its primary
 * owner, with retries handled by {@link StatePushTask}.
 */
public class ClusteredCacheBackupReceiver extends BaseBackupReceiver {
    private static final Log log = LogFactory.getLog(ClusteredCacheBackupReceiver.class);
    // Guards the tracef(...) calls below, so it must reflect the TRACE level (not DEBUG).
    private static final boolean trace = log.isTraceEnabled();

    ClusteredCacheBackupReceiver(Cache<Object, Object> cache) {
        super(cache);
    }

    /**
     * Executes a state transfer control command on all members of the local site.
     *
     * <p>If the command targets a different cache name than this receiver's, a copy
     * bound to this receiver's cache name is used. The site name of the invoked
     * command is set to the origin site of the received command.
     *
     * @param command the control command received from the remote site
     * @return a stage completed when both the remote and the local invocation finish
     */
    @Override
    public CompletionStage<Void> handleStateTransferControl(XSiteStateTransferControlCommand command) {
        XSiteStateTransferControlCommand invokeCommand = command;
        if (!command.getCacheName().equals(this.cacheName)) {
            invokeCommand = command.copyForCache(this.cacheName);
        }
        invokeCommand.setSiteName(command.getOriginSite());
        return this.invokeRemotelyInLocalSite(invokeCommand);
    }

    /**
     * Applies a chunk of cross-site state by forwarding each entry to the primary
     * owner of its key.
     *
     * <p>The chunk is grouped by primary owner; one {@link StatePushTask} is started
     * per remote owner and one for the local node (if it owns any of the keys).
     * The deadline for all tasks is derived from {@code cmd.getTimeout()} in
     * milliseconds.
     *
     * @param cmd the state push command carrying the chunk and the timeout
     * @return the stage returned by {@code checkInvocationAllowedFuture()} when it is
     *         non-null; otherwise a stage that completes once every task completed
     *         and {@code assertAllowInvocationFunction} has been applied
     */
    @Override
    public CompletionStage<Void> handleStateTransferState(XSiteStatePushCommand cmd) {
        CompletableFuture<Void> allowInvocation = this.checkInvocationAllowedFuture();
        if (allowInvocation != null) {
            return allowInvocation;
        }
        long endTime = this.timeService.expectedEndTime(cmd.getTimeout(), TimeUnit.MILLISECONDS);
        ClusteringDependentLogic clusteringDependentLogic = this.cache.getComponentRegistry().getComponent(ClusteringDependentLogic.class);
        Map<Address, List<XSiteState>> primaryOwnersChunks = new HashMap<>();
        Address localAddress = clusteringDependentLogic.getAddress();
        if (trace) {
            log.tracef("Received X-Site state transfer '%s'. Splitting by primary owner.", cmd);
        }
        for (XSiteState state : cmd.getChunk()) {
            Address primaryOwner = clusteringDependentLogic.getCacheTopology().getDistribution(state.key()).primary();
            primaryOwnersChunks.computeIfAbsent(primaryOwner, k -> new LinkedList<>()).add(state);
        }
        // The local node's share is taken out so that only remote owners remain in the map.
        List<XSiteState> localChunks = primaryOwnersChunks.remove(localAddress);
        AggregateCompletionStage<Void> cf = CompletionStages.aggregateCompletionStage();
        for (Map.Entry<Address, List<XSiteState>> entry : primaryOwnersChunks.entrySet()) {
            List<XSiteState> ownerChunk = entry.getValue();
            if (ownerChunk == null || ownerChunk.isEmpty()) {
                continue;
            }
            if (trace) {
                log.tracef("Node '%s' will apply %s", entry.getKey(), ownerChunk);
            }
            StatePushTask task = new StatePushTask(ownerChunk, entry.getKey(), this.cache, endTime);
            task.executeRemote();
            cf.dependsOn(task);
        }
        if (trace) {
            log.tracef("Local node '%s' will apply %s", localAddress, localChunks);
        }
        if (localChunks != null) {
            StatePushTask task = new StatePushTask(localChunks, localAddress, this.cache, endTime);
            task.executeLocal();
            cf.dependsOn(task);
        }
        return cf.freeze().thenApply(this::assertAllowInvocationFunction);
    }

    /**
     * Invokes the command on all other members through the {@link RpcManager}
     * (accepting valid responses only) and on the local node.
     *
     * @param command the command to execute
     * @return a stage completed when both invocations have completed
     */
    private CompletionStage<Void> invokeRemotelyInLocalSite(CacheRpcCommand command) {
        RpcManager rpcManager = this.cache.getRpcManager();
        CompletionStage<Map<Address, Response>> remote = rpcManager.invokeCommandOnAll(command, MapResponseCollector.validOnly(), rpcManager.getSyncRpcOptions());
        CompletableFuture<Response> local = LocalInvocation.newInstanceFromCache(this.cache, command).callAsync();
        return CompletableFuture.allOf(remote.toCompletableFuture(), local);
    }

    /**
     * A self-retrying push of one chunk to one node.
     *
     * <p>The task is its own future (completed when the chunk is applied or when it
     * gives up), its own response collector for the remote invocation, and its own
     * completion callback for both remote and local invocations.
     */
    private class StatePushTask
    extends CompletableFuture<Void>
    implements ResponseCollector<Response>,
    BiFunction<Response, Throwable, Void> {
        private final List<XSiteState> chunk;
        private final Address address;
        private final AdvancedCache<?, ?> cache;
        private final long endTime;

        private StatePushTask(List<XSiteState> chunk, Address address, AdvancedCache<?, ?> cache, long endTime) {
            this.chunk = chunk;
            this.address = address;
            this.cache = cache;
            this.endTime = endTime;
        }

        /**
         * Completion callback for an invocation attempt.
         *
         * <p>Any exception thrown while processing the outcome (for example while
         * re-issuing the command) fails this task. Otherwise it would only be
         * captured by the discarded stage returned from {@code handle(...)} and the
         * task would never complete.
         *
         * @param response  the response of the attempt, if it did not fail
         * @param throwable the failure of the attempt, or {@code null}
         * @return always {@code null}
         */
        @Override
        public Void apply(Response response, Throwable throwable) {
            try {
                this.processOutcome(response, throwable);
            } catch (Throwable t) {
                this.completeExceptionally(t);
            }
            return null;
        }

        /**
         * Decides whether to complete, retry remotely or retry locally.
         *
         * <ul>
         * <li>Failure: retry on the same node if it is still a member and is not the
         * local node; otherwise retry locally.</li>
         * <li>{@link CacheNotFoundResponse}: retry locally.</li>
         * <li>Anything else: the task completes successfully.</li>
         * </ul>
         * Before any retry, {@link #isShouldGiveUp()} may fail the task instead.
         */
        private void processOutcome(Response response, Throwable throwable) {
            if (throwable != null) {
                if (this.isShouldGiveUp()) {
                    return;
                }
                RpcManager rpcManager = this.cache.getRpcManager();
                if (rpcManager.getMembers().contains(this.address) && !rpcManager.getAddress().equals(this.address)) {
                    if (trace) {
                        log.tracef(throwable, "An exception was sent by %s. Retrying!", this.address);
                    }
                    this.executeRemote();
                } else {
                    if (trace) {
                        log.tracef(throwable, "An exception was sent by %s. Retrying locally!", this.address);
                    }
                    this.executeLocal();
                }
            } else if (response == CacheNotFoundResponse.INSTANCE) {
                if (trace) {
                    log.tracef("Cache not found in node '%s'. Retrying locally!", this.address);
                }
                if (this.isShouldGiveUp()) {
                    return;
                }
                this.executeLocal();
            } else {
                this.complete(null);
            }
        }

        /**
         * Accepts valid and cache-not-found responses as the result; any other
         * response is turned into an exception wrapped with the sender's address.
         */
        @Override
        public Response addResponse(Address sender, Response response) {
            if (response instanceof ValidResponse || response instanceof CacheNotFoundResponse) {
                return response;
            }
            if (response instanceof ExceptionResponse) {
                throw ResponseCollectors.wrapRemoteException(sender, ((ExceptionResponse) response).getException());
            }
            throw ResponseCollectors.wrapRemoteException(sender, new RpcException("Unknown response type: " + response));
        }

        /** Returns {@code null}; results are produced by {@link #addResponse}. */
        @Override
        public Response finish() {
            return null;
        }

        /** Sends a new state push command with this chunk to {@code address}; the outcome is delivered to {@link #apply}. */
        void executeRemote() {
            RpcManager rpcManager = this.cache.getRpcManager();
            RpcOptions rpcOptions = rpcManager.getSyncRpcOptions();
            rpcManager.invokeCommand(this.address, BaseBackupReceiver.newStatePushCommand(this.cache, this.chunk), this, rpcOptions).handle(this);
        }

        /** Executes a new state push command with this chunk on the local node; the outcome is delivered to {@link #apply}. */
        void executeLocal() {
            LocalInvocation.newInstanceFromCache(this.cache, BaseBackupReceiver.newStatePushCommand(this.cache, this.chunk)).callAsync().handle(this);
        }

        /**
         * Fails this task when retrying is pointless.
         *
         * @return {@code true} if the task was completed exceptionally because the
         *         cache no longer allows invocations or the deadline has expired
         */
        private boolean isShouldGiveUp() {
            ComponentStatus status = this.cache.getStatus();
            if (!status.allowInvocations()) {
                this.completeExceptionally(new IllegalLifecycleStateException("Cache is stopping or terminated: " + status));
                return true;
            }
            if (ClusteredCacheBackupReceiver.this.timeService.isTimeExpired(this.endTime)) {
                this.completeExceptionally(new TimeoutException("Unable to apply state in the time limit."));
                return true;
            }
            return false;
        }
    }
}

