/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.topology;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.CollectionFactory;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.jmx.annotations.DataType;
import org.infinispan.jmx.annotations.MBean;
import org.infinispan.jmx.annotations.ManagedAttribute;
import org.infinispan.partionhandling.AvailabilityMode;
import org.infinispan.partionhandling.impl.PartitionHandlingManager;
import org.infinispan.remoting.responses.ExceptionResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.topology.CacheJoinInfo;
import org.infinispan.topology.CacheStatusResponse;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.CacheTopologyControlCommand;
import org.infinispan.topology.CacheTopologyHandler;
import org.infinispan.topology.LocalCacheStatus;
import org.infinispan.topology.LocalTopologyManager;
import org.infinispan.util.TimeService;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

@MBean(objectName="LocalTopologyManager", description="Controls the cache membership and state transfer")
public class LocalTopologyManagerImpl
implements LocalTopologyManager {
    private static Log log = LogFactory.getLog(LocalTopologyManagerImpl.class);
    private static final boolean trace = log.isTraceEnabled();
    private Transport transport;
    private ExecutorService asyncTransportExecutor;
    private GlobalComponentRegistry gcr;
    private TimeService timeService;
    private final ConcurrentMap<String, LocalCacheStatus> runningCaches = CollectionFactory.makeConcurrentMap();
    private volatile boolean running;

    @Inject
    public void inject(Transport transport, @ComponentName(value="org.infinispan.executors.transport") ExecutorService asyncTransportExecutor, GlobalComponentRegistry gcr, TimeService timeService) {
        this.transport = transport;
        this.asyncTransportExecutor = asyncTransportExecutor;
        this.gcr = gcr;
        this.timeService = timeService;
    }

    @Start(priority=100)
    public void start() {
        if (trace) {
            log.tracef("Starting LocalTopologyManager on %s", (Object)this.transport.getAddress());
        }
        this.running = true;
    }

    @Stop(priority=9)
    public void stop() {
        if (trace) {
            log.tracef("Stopping LocalTopologyManager on %s", (Object)this.transport.getAddress());
        }
        this.running = false;
    }

    @Override
    public CacheTopology join(String cacheName, CacheJoinInfo joinInfo, CacheTopologyHandler stm, PartitionHandlingManager phm) throws Exception {
        log.debugf("Node %s joining cache %s", (Object)this.transport.getAddress(), (Object)cacheName);
        LocalCacheStatus cacheStatus = new LocalCacheStatus(joinInfo, stm, phm);
        this.runningCaches.put(cacheName, cacheStatus);
        int viewId = this.transport.getViewId();
        CacheTopologyControlCommand command = new CacheTopologyControlCommand(cacheName, CacheTopologyControlCommand.Type.JOIN, this.transport.getAddress(), joinInfo, viewId);
        long timeout = joinInfo.getTimeout();
        long endTime = this.timeService.expectedEndTime(timeout, TimeUnit.MILLISECONDS);
        LocalCacheStatus localCacheStatus = cacheStatus;
        synchronized (localCacheStatus) {
            while (true) {
                try {
                    CacheStatusResponse initialStatus;
                    while ((initialStatus = (CacheStatusResponse)this.executeOnCoordinator(command, timeout)) == null) {
                    }
                    this.handleTopologyUpdate(cacheName, initialStatus.getCacheTopology(), initialStatus.getAvailabilityMode(), viewId);
                    this.handleStableTopologyUpdate(cacheName, initialStatus.getStableTopology(), viewId);
                    return initialStatus.getCacheTopology();
                }
                catch (Exception e) {
                    log.debugf((Throwable)e, "Error sending join request for cache %s to coordinator", (Object)cacheName);
                    if (this.timeService.isTimeExpired(endTime)) {
                        throw e;
                    }
                    Thread.sleep(1000L);
                    continue;
                }
                break;
            }
        }
    }

    @Override
    public void leave(String cacheName) {
        log.debugf("Node %s leaving cache %s", (Object)this.transport.getAddress(), (Object)cacheName);
        LocalCacheStatus cacheStatus = (LocalCacheStatus)this.runningCaches.remove(cacheName);
        CacheTopologyControlCommand command = new CacheTopologyControlCommand(cacheName, CacheTopologyControlCommand.Type.LEAVE, this.transport.getAddress(), this.transport.getViewId());
        try {
            this.executeOnCoordinator(command, cacheStatus.getJoinInfo().getTimeout());
        }
        catch (Exception e) {
            log.debugf((Throwable)e, "Error sending the leave request for cache %s to coordinator", (Object)cacheName);
        }
    }

    @Override
    public void confirmRebalance(String cacheName, int topologyId, int rebalanceId, Throwable throwable) {
        CacheTopologyControlCommand command = new CacheTopologyControlCommand(cacheName, CacheTopologyControlCommand.Type.REBALANCE_CONFIRM, this.transport.getAddress(), topologyId, rebalanceId, throwable, this.transport.getViewId());
        try {
            this.executeOnCoordinatorAsync(command);
        }
        catch (Exception e) {
            log.debugf((Throwable)e, "Error sending the rebalance completed notification for cache %s to the coordinator", (Object)cacheName);
        }
    }

    @Override
    public Map<String, CacheStatusResponse> handleStatusRequest(int viewId) {
        HashMap<String, CacheStatusResponse> response = new HashMap<String, CacheStatusResponse>();
        for (Map.Entry e : this.runningCaches.entrySet()) {
            String cacheName = (String)e.getKey();
            LocalCacheStatus cacheStatus = (LocalCacheStatus)this.runningCaches.get(cacheName);
            AvailabilityMode availabilityMode = cacheStatus.getPartitionHandlingManager() != null ? cacheStatus.getPartitionHandlingManager().getAvailabilityMode() : null;
            response.put((String)e.getKey(), new CacheStatusResponse(cacheStatus.getJoinInfo(), cacheStatus.getCurrentTopology(), cacheStatus.getStableTopology(), availabilityMode));
        }
        return response;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void handleTopologyUpdate(String cacheName, CacheTopology cacheTopology, AvailabilityMode availabilityMode, int viewId) throws InterruptedException {
        if (!this.running) {
            log.tracef("Ignoring consistent hash update %s for cache %s, the local cache manager is not running", (Object)cacheTopology.getTopologyId(), (Object)cacheName);
            return;
        }
        this.waitForView(viewId);
        LocalCacheStatus cacheStatus = (LocalCacheStatus)this.runningCaches.get(cacheName);
        if (cacheStatus == null) {
            log.tracef("Ignoring consistent hash update %s for cache %s that doesn't exist locally", (Object)cacheTopology.getTopologyId(), (Object)cacheName);
            return;
        }
        LocalCacheStatus localCacheStatus = cacheStatus;
        synchronized (localCacheStatus) {
            CacheTopology existingTopology = cacheStatus.getCurrentTopology();
            if (existingTopology != null && cacheTopology.getTopologyId() <= existingTopology.getTopologyId()) {
                log.debugf("Ignoring late consistent hash update for cache %s, current topology is %s: %s", (Object)cacheName, (Object)existingTopology.getTopologyId(), (Object)cacheTopology);
                return;
            }
            CacheTopologyHandler handler = cacheStatus.getHandler();
            this.resetLocalTopologyBeforeRebalance(cacheName, cacheTopology, existingTopology, handler);
            log.debugf("Updating local consistent hash(es) for cache %s: new topology = %s", (Object)cacheName, (Object)cacheTopology);
            cacheStatus.setCurrentTopology(cacheTopology);
            ConsistentHash unionCH = null;
            if (cacheTopology.getPendingCH() != null) {
                unionCH = cacheStatus.getJoinInfo().getConsistentHashFactory().union(cacheTopology.getCurrentCH(), cacheTopology.getPendingCH());
            }
            CacheTopology unionTopology = new CacheTopology(cacheTopology.getTopologyId(), cacheTopology.getRebalanceId(), cacheTopology.getCurrentCH(), cacheTopology.getPendingCH(), unionCH);
            unionTopology.logRoutingTableInformation();
            if ((existingTopology == null || existingTopology.getRebalanceId() != cacheTopology.getRebalanceId()) && unionCH != null) {
                log.tracef("This topology update has a pending CH, starting the rebalance now", new Object[0]);
                handler.rebalance(unionTopology);
            } else {
                handler.updateConsistentHash(unionTopology);
            }
            if (cacheStatus.getPartitionHandlingManager() != null && availabilityMode != null) {
                cacheStatus.getPartitionHandlingManager().setAvailabilityMode(availabilityMode);
            }
        }
    }

    private void resetLocalTopologyBeforeRebalance(String cacheName, CacheTopology newCacheTopology, CacheTopology oldCacheTopology, CacheTopologyHandler handler) throws InterruptedException {
        boolean newRebalance;
        boolean bl = newRebalance = newCacheTopology.getPendingCH() != null;
        if (newRebalance) {
            if (oldCacheTopology == null) {
                return;
            }
            if (newCacheTopology.getTopologyId() == oldCacheTopology.getTopologyId() + 1) {
                return;
            }
            if (newCacheTopology.getRebalanceId() != oldCacheTopology.getRebalanceId()) {
                CacheTopology resetTopology = new CacheTopology(newCacheTopology.getTopologyId() - 1, newCacheTopology.getRebalanceId() - 1, newCacheTopology.getCurrentCH(), null, null);
                log.debugf("Installing fake cache topology %s for cache %s", (Object)resetTopology, (Object)cacheName);
                handler.updateConsistentHash(resetTopology);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void handleStableTopologyUpdate(String cacheName, CacheTopology newStableTopology, int viewId) {
        LocalCacheStatus cacheStatus = (LocalCacheStatus)this.runningCaches.get(cacheName);
        if (cacheStatus != null) {
            LocalCacheStatus localCacheStatus = cacheStatus;
            synchronized (localCacheStatus) {
                CacheTopology stableTopology = cacheStatus.getStableTopology();
                if (stableTopology == null || stableTopology.getTopologyId() < newStableTopology.getTopologyId()) {
                    log.tracef("Updating stable topology for cache %s: %s", (Object)cacheName, (Object)newStableTopology);
                    cacheStatus.setStableTopology(newStableTopology);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void handleRebalance(String cacheName, CacheTopology cacheTopology, int viewId) throws InterruptedException {
        if (!this.running) {
            log.debugf("Ignoring rebalance request %s for cache %s, the local cache manager is not running", (Object)cacheTopology.getTopologyId(), (Object)cacheName);
            return;
        }
        this.waitForView(viewId);
        LocalCacheStatus cacheStatus = (LocalCacheStatus)this.runningCaches.get(cacheName);
        if (cacheStatus == null) {
            log.tracef("Ignoring rebalance %s for cache %s that doesn't exist locally", (Object)cacheTopology.getTopologyId(), (Object)cacheName);
            return;
        }
        LocalCacheStatus localCacheStatus = cacheStatus;
        synchronized (localCacheStatus) {
            CacheTopology existingTopology = cacheStatus.getCurrentTopology();
            if (existingTopology != null && cacheTopology.getTopologyId() <= existingTopology.getTopologyId()) {
                log.debugf("Ignoring old rebalance for cache %s, current topology is %s: %s", (Object)cacheName, (Object)existingTopology.getTopologyId(), (Object)cacheTopology);
                return;
            }
            CacheTopologyHandler handler = cacheStatus.getHandler();
            this.resetLocalTopologyBeforeRebalance(cacheName, cacheTopology, existingTopology, handler);
            log.debugf("Starting local rebalance for cache %s, topology = %s", (Object)cacheName, (Object)cacheTopology);
            cacheTopology.logRoutingTableInformation();
            cacheStatus.setCurrentTopology(cacheTopology);
            ConsistentHash unionCH = cacheStatus.getJoinInfo().getConsistentHashFactory().union(cacheTopology.getCurrentCH(), cacheTopology.getPendingCH());
            CacheTopology newTopology = new CacheTopology(cacheTopology.getTopologyId(), cacheTopology.getRebalanceId(), cacheTopology.getCurrentCH(), cacheTopology.getPendingCH(), unionCH);
            handler.rebalance(newTopology);
        }
    }

    @Override
    public CacheTopology getCacheTopology(String cacheName) {
        LocalCacheStatus cacheStatus = (LocalCacheStatus)this.runningCaches.get(cacheName);
        return cacheStatus.getCurrentTopology();
    }

    @Override
    public CacheTopology getStableCacheTopology(String cacheName) {
        LocalCacheStatus cacheStatus = (LocalCacheStatus)this.runningCaches.get(cacheName);
        return cacheStatus.getCurrentTopology();
    }

    @Override
    public boolean isTotalOrderCache(String cacheName) {
        if (!this.running) {
            log.tracef("isTotalOrderCache(%s) returning false because the local cache manager is not running", (Object)cacheName);
            return false;
        }
        LocalCacheStatus cacheStatus = (LocalCacheStatus)this.runningCaches.get(cacheName);
        if (cacheStatus == null) {
            log.tracef("isTotalOrderCache(%s) returning false because the cache doesn't exist locally", (Object)cacheName);
            return false;
        }
        boolean totalOrder = cacheStatus.getJoinInfo().isTotalOrder();
        log.tracef("isTotalOrderCache(%s) returning %s", (Object)cacheName, (Object)totalOrder);
        return totalOrder;
    }

    private void waitForView(int viewId) throws InterruptedException {
        if (this.transport.getViewId() < viewId) {
            log.tracef("Received a cache topology command with a higher view id: %s, our view id is %s", (Object)viewId, (Object)this.transport.getViewId());
        }
        while (this.transport.getViewId() < viewId) {
            Thread.sleep(100L);
        }
    }

    @ManagedAttribute(description="Rebalancing enabled", displayName="Rebalancing enabled", dataType=DataType.TRAIT, writable=true)
    public boolean isRebalancingEnabled() throws Exception {
        CacheTopologyControlCommand command = new CacheTopologyControlCommand(null, CacheTopologyControlCommand.Type.POLICY_GET_STATUS, this.transport.getAddress(), this.transport.getViewId());
        return (Boolean)this.executeOnCoordinator(command, this.getGlobalTimeout());
    }

    @Override
    public void setRebalancingEnabled(boolean enabled) throws Exception {
        CacheTopologyControlCommand.Type type = enabled ? CacheTopologyControlCommand.Type.POLICY_ENABLE : CacheTopologyControlCommand.Type.POLICY_DISABLE;
        CacheTopologyControlCommand command = new CacheTopologyControlCommand(null, type, this.transport.getAddress(), this.transport.getViewId());
        this.executeOnClusterSync(command, this.getGlobalTimeout(), false, false);
    }

    @ManagedAttribute(description="Cluster availability", displayName="Cluster availability", dataType=DataType.TRAIT, writable=false)
    public String getClusterAvailability() {
        AvailabilityMode clusterAvailability = AvailabilityMode.AVAILABLE;
        for (LocalCacheStatus cacheStatus : this.runningCaches.values()) {
            AvailabilityMode availabilityMode = cacheStatus.getPartitionHandlingManager() != null ? cacheStatus.getPartitionHandlingManager().getAvailabilityMode() : null;
            clusterAvailability = availabilityMode != null ? clusterAvailability.min(availabilityMode) : clusterAvailability;
        }
        return clusterAvailability.toString();
    }

    @Override
    public AvailabilityMode getCacheAvailability(String cacheName) {
        LocalCacheStatus cacheStatus = (LocalCacheStatus)this.runningCaches.get(cacheName);
        AvailabilityMode availabilityMode = cacheStatus.getPartitionHandlingManager() != null ? cacheStatus.getPartitionHandlingManager().getAvailabilityMode() : AvailabilityMode.AVAILABLE;
        return availabilityMode;
    }

    @Override
    public void setCacheAvailability(String cacheName, AvailabilityMode availabilityMode) throws Exception {
        CacheTopologyControlCommand.Type type = CacheTopologyControlCommand.Type.AVAILABILITY_MODE_CHANGE;
        CacheTopologyControlCommand command = new CacheTopologyControlCommand(cacheName, type, this.transport.getAddress(), availabilityMode, this.transport.getViewId());
        this.executeOnCoordinator(command, this.getGlobalTimeout());
    }

    private Object executeOnCoordinator(ReplicableCommand command, long timeout) throws Exception {
        Response response;
        if (this.transport.isCoordinator()) {
            try {
                if (log.isTraceEnabled()) {
                    log.tracef("Attempting to execute command on self: %s", (Object)command);
                }
                this.gcr.wireDependencies(command);
                response = (Response)command.perform(null);
            }
            catch (Throwable t) {
                throw new CacheException("Error handling join request", t);
            }
        } else {
            Address coordinator = this.transport.getCoordinator();
            Map<Address, Response> responseMap = this.transport.invokeRemotely(Collections.singleton(coordinator), command, ResponseMode.SYNCHRONOUS, timeout, true, null, false, false);
            response = responseMap.get(coordinator);
        }
        if (response == null || !response.isSuccessful()) {
            Exception exception = response instanceof ExceptionResponse ? ((ExceptionResponse)response).getException() : null;
            throw new CacheException("Bad response received from coordinator: " + response, exception);
        }
        return ((SuccessfulResponse)response).getResponseValue();
    }

    private void executeOnCoordinatorAsync(final ReplicableCommand command) throws Exception {
        if (this.transport.isCoordinator()) {
            this.asyncTransportExecutor.submit(new Callable<Object>(){

                @Override
                public Object call() throws Exception {
                    if (log.isTraceEnabled()) {
                        log.tracef("Attempting to execute command on self: %s", (Object)command);
                    }
                    LocalTopologyManagerImpl.this.gcr.wireDependencies(command);
                    try {
                        return command.perform(null);
                    }
                    catch (Throwable t) {
                        log.errorf(t, "Failed to execute ReplicableCommand %s on coordinator async: %s", (Object)command, (Object)t.getMessage());
                        throw new Exception(t);
                    }
                }
            });
        } else {
            Address coordinator = this.transport.getCoordinator();
            this.transport.invokeRemotely(Collections.singleton(coordinator), command, ResponseMode.ASYNCHRONOUS_WITH_SYNC_MARSHALLING, 0L, true, null, false, false);
        }
    }

    private Map<Address, Object> executeOnClusterSync(final ReplicableCommand command, final int timeout, boolean totalOrder, boolean distributed) throws Exception {
        Response localResponse;
        if (totalOrder) {
            Map<Address, Response> responseMap = this.transport.invokeRemotely(null, command, ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, timeout, false, null, totalOrder, distributed);
            HashMap<Address, Object> responseValues = new HashMap<Address, Object>(this.transport.getMembers().size());
            for (Map.Entry<Address, Response> entry : responseMap.entrySet()) {
                Address address = entry.getKey();
                Response response = entry.getValue();
                if (!response.isSuccessful()) {
                    Exception cause = response instanceof ExceptionResponse ? ((ExceptionResponse)response).getException() : null;
                    throw new CacheException("Unsuccessful response received from node " + address + ": " + response, cause);
                }
                responseValues.put(address, ((SuccessfulResponse)response).getResponseValue());
            }
            return responseValues;
        }
        Future<Map<Address, Response>> remoteFuture = this.asyncTransportExecutor.submit(new Callable<Map<Address, Response>>(){

            @Override
            public Map<Address, Response> call() throws Exception {
                return LocalTopologyManagerImpl.this.transport.invokeRemotely(null, command, ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, timeout, true, null, false, false);
            }
        });
        this.gcr.wireDependencies(command);
        try {
            if (log.isTraceEnabled()) {
                log.tracef("Attempting to execute command on self: %s", (Object)command);
            }
            localResponse = (Response)command.perform(null);
        }
        catch (Throwable throwable) {
            throw new Exception(throwable);
        }
        if (!localResponse.isSuccessful()) {
            throw new CacheException("Unsuccessful local response");
        }
        Map<Address, Response> responseMap = remoteFuture.get(timeout, TimeUnit.MILLISECONDS);
        HashMap<Address, Object> responseValues = new HashMap<Address, Object>(this.transport.getMembers().size());
        for (Map.Entry<Address, Response> entry : responseMap.entrySet()) {
            Address address = entry.getKey();
            Response response = entry.getValue();
            if (!response.isSuccessful()) {
                Exception cause = response instanceof ExceptionResponse ? ((ExceptionResponse)response).getException() : null;
                throw new CacheException("Unsuccessful response received from node " + address + ": " + response, cause);
            }
            responseValues.put(address, ((SuccessfulResponse)response).getResponseValue());
        }
        responseValues.put(this.transport.getAddress(), ((SuccessfulResponse)localResponse).getResponseValue());
        return responseValues;
    }

    private int getGlobalTimeout() {
        return (int)this.gcr.getGlobalConfiguration().transport().distributedSyncTimeout();
    }
}

