/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import io.reactivex.rxjava3.core.Flowable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.infinispan.Cache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.statetransfer.StateTransferCancelCommand;
import org.infinispan.commands.statetransfer.StateTransferGetTransactionsCommand;
import org.infinispan.commands.statetransfer.StateTransferStartCommand;
import org.infinispan.commons.util.IntSet;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.conflict.impl.InternalConflictManager;
import org.infinispan.container.entries.ImmortalCacheEntry;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.context.InvocationContextFactory;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.TestAddress;
import org.infinispan.distribution.TriangleOrderManager;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.distribution.ch.impl.DefaultConsistentHash;
import org.infinispan.distribution.ch.impl.DefaultConsistentHashFactory;
import org.infinispan.distribution.ch.impl.HashFunctionPartitioner;
import org.infinispan.interceptors.AsyncInterceptorChain;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.reactive.publisher.impl.LocalPublisherManager;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.PerCacheInboundInvocationHandler;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.statetransfer.CommitManager;
import org.infinispan.statetransfer.StateChunk;
import org.infinispan.statetransfer.StateConsumerImpl;
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.statetransfer.TestKey;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.LocalTopologyManager;
import org.infinispan.topology.PersistentUUID;
import org.infinispan.topology.PersistentUUIDManager;
import org.infinispan.topology.PersistentUUIDManagerImpl;
import org.infinispan.transaction.impl.TransactionTable;
import org.infinispan.util.ByteString;
import org.infinispan.util.concurrent.CommandAckCollector;
import org.infinispan.util.concurrent.IsolationLevel;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.xsite.statetransfer.XSiteStateTransferManager;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="statetransfer.StateConsumerTest")
public class StateConsumerTest
extends AbstractInfinispanTest {
    private static final Log log = LogFactory.getLog(StateConsumerTest.class);
    private ExecutorService pooledExecutorService;

    @AfterMethod
    public void tearDown() {
        if (this.pooledExecutorService != null) {
            this.pooledExecutorService.shutdownNow();
        }
    }

    public void test1() throws Exception {
        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.clustering().invocationBatching().enable().clustering().cacheMode(CacheMode.DIST_SYNC).clustering().stateTransfer().timeout(30000L).locking().lockAcquisitionTimeout(TestingUtil.shortTimeoutMillis()).locking().isolationLevel(IsolationLevel.REPEATABLE_READ);
        Configuration configuration = cb.build();
        PersistentUUIDManagerImpl persistentUUIDManager = new PersistentUUIDManagerImpl();
        Address[] addresses = new Address[4];
        for (int i = 0; i < 4; ++i) {
            addresses[i] = new TestAddress(i);
            persistentUUIDManager.addPersistentAddressMapping(addresses[i], PersistentUUID.randomUUID());
        }
        List<Address> members1 = Arrays.asList(addresses[0], addresses[1], addresses[2], addresses[3]);
        List<Address> members2 = Arrays.asList(addresses[0], addresses[1], addresses[2]);
        DefaultConsistentHashFactory chf = new DefaultConsistentHashFactory();
        HashFunctionPartitioner keyPartitioner = new HashFunctionPartitioner(40);
        DefaultConsistentHash ch1 = chf.create(2, 40, members1, null);
        DefaultConsistentHash ch2 = chf.updateMembers(ch1, members2, null);
        DefaultConsistentHash ch3 = chf.rebalance(ch2);
        DefaultConsistentHash ch23 = chf.union(ch2, ch3);
        log.debug((Object)ch1);
        log.debug((Object)ch2);
        Cache cache = (Cache)Mockito.mock(Cache.class);
        Mockito.when((Object)cache.getName()).thenReturn((Object)"testCache");
        Mockito.when((Object)cache.getStatus()).thenReturn((Object)ComponentStatus.RUNNING);
        this.pooledExecutorService = new ThreadPoolExecutor(0, 20, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), this.getTestThreadFactory("Worker"), new ThreadPoolExecutor.CallerRunsPolicy());
        LocalTopologyManager localTopologyManager = (LocalTopologyManager)Mockito.mock(LocalTopologyManager.class);
        CacheNotifier cacheNotifier = (CacheNotifier)Mockito.mock(CacheNotifier.class);
        RpcManager rpcManager = (RpcManager)Mockito.mock(RpcManager.class);
        Transport transport = (Transport)Mockito.mock(Transport.class);
        CommandsFactory commandsFactory = (CommandsFactory)Mockito.mock(CommandsFactory.class);
        PersistenceManager persistenceManager = (PersistenceManager)Mockito.mock(PersistenceManager.class);
        InternalDataContainer dataContainer = (InternalDataContainer)Mockito.mock(InternalDataContainer.class);
        TransactionTable transactionTable = (TransactionTable)Mockito.mock(TransactionTable.class);
        StateTransferLock stateTransferLock = (StateTransferLock)Mockito.mock(StateTransferLock.class);
        AsyncInterceptorChain interceptorChain = (AsyncInterceptorChain)Mockito.mock(AsyncInterceptorChain.class);
        InvocationContextFactory icf = (InvocationContextFactory)Mockito.mock(InvocationContextFactory.class);
        InternalConflictManager conflictManager = (InternalConflictManager)Mockito.mock(InternalConflictManager.class);
        DistributionManager distributionManager = (DistributionManager)Mockito.mock(DistributionManager.class);
        LocalPublisherManager localPublisherManager = (LocalPublisherManager)Mockito.mock(LocalPublisherManager.class);
        PerCacheInboundInvocationHandler invocationHandler = (PerCacheInboundInvocationHandler)Mockito.mock(PerCacheInboundInvocationHandler.class);
        XSiteStateTransferManager xSiteStateTransferManager = (XSiteStateTransferManager)Mockito.mock(XSiteStateTransferManager.class);
        Mockito.when((Object)persistenceManager.removeSegments((IntSet)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(false));
        Mockito.when((Object)persistenceManager.addSegments((IntSet)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(false));
        Mockito.when((Object)persistenceManager.publishKeys((Predicate)ArgumentMatchers.any(), (Predicate)ArgumentMatchers.any())).thenReturn((Object)Flowable.empty());
        Mockito.when((Object)commandsFactory.buildStateTransferStartCommand(ArgumentMatchers.anyInt(), (IntSet)ArgumentMatchers.any(IntSet.class))).thenAnswer(invocation -> new StateTransferStartCommand(ByteString.fromString((String)"cache1"), ((Integer)invocation.getArguments()[0]).intValue(), (IntSet)invocation.getArguments()[1]));
        Mockito.when((Object)commandsFactory.buildStateTransferGetTransactionsCommand(ArgumentMatchers.anyInt(), (IntSet)ArgumentMatchers.any(IntSet.class))).thenAnswer(invocation -> new StateTransferGetTransactionsCommand(ByteString.fromString((String)"cache1"), ((Integer)invocation.getArguments()[0]).intValue(), (IntSet)invocation.getArguments()[1]));
        Mockito.when((Object)commandsFactory.buildStateTransferCancelCommand(ArgumentMatchers.anyInt(), (IntSet)ArgumentMatchers.any(IntSet.class))).thenAnswer(invocation -> new StateTransferCancelCommand(ByteString.fromString((String)"cache1"), ((Integer)invocation.getArguments()[0]).intValue(), (IntSet)invocation.getArguments()[1]));
        Mockito.when((Object)transport.getViewId()).thenReturn((Object)1);
        Mockito.when((Object)rpcManager.getAddress()).thenReturn((Object)addresses[0]);
        Mockito.when((Object)rpcManager.getTransport()).thenReturn((Object)transport);
        ConcurrentHashMap requestedSegments = new ConcurrentHashMap();
        ConcurrentSkipListSet flatRequestedSegments = new ConcurrentSkipListSet();
        Mockito.when((Object)rpcManager.invokeCommand((Address)ArgumentMatchers.any(Address.class), (ReplicableCommand)ArgumentMatchers.any(StateTransferGetTransactionsCommand.class), (ResponseCollector)ArgumentMatchers.any(ResponseCollector.class), (RpcOptions)ArgumentMatchers.any(RpcOptions.class))).thenAnswer(invocation -> {
            Address recipient = (Address)invocation.getArgument(0);
            StateTransferGetTransactionsCommand cmd = (StateTransferGetTransactionsCommand)invocation.getArgument(1);
            IntSet segments = cmd.getSegments();
            requestedSegments.put(recipient, segments);
            flatRequestedSegments.addAll(segments);
            return CompletableFuture.completedFuture(SuccessfulResponse.create(new ArrayList()));
        });
        Answer successfulResponse = invocation -> CompletableFuture.completedFuture(SuccessfulResponse.SUCCESSFUL_EMPTY_RESPONSE);
        Mockito.when((Object)rpcManager.invokeCommand((Address)ArgumentMatchers.any(Address.class), (ReplicableCommand)ArgumentMatchers.any(StateTransferStartCommand.class), (ResponseCollector)ArgumentMatchers.any(ResponseCollector.class), (RpcOptions)ArgumentMatchers.any(RpcOptions.class))).thenAnswer(successfulResponse);
        Mockito.when((Object)rpcManager.invokeCommand((Address)ArgumentMatchers.any(Address.class), (ReplicableCommand)ArgumentMatchers.any(StateTransferCancelCommand.class), (ResponseCollector)ArgumentMatchers.any(ResponseCollector.class), (RpcOptions)ArgumentMatchers.any(RpcOptions.class))).thenAnswer(successfulResponse);
        Mockito.when((Object)rpcManager.getSyncRpcOptions()).thenReturn((Object)new RpcOptions(DeliverOrder.NONE, 10000L, TimeUnit.MILLISECONDS));
        Mockito.when((Object)rpcManager.blocking((CompletionStage)ArgumentMatchers.any(CompletionStage.class))).thenAnswer(invocation -> ((CompletionStage)invocation.getArgument(0)).toCompletableFuture().join());
        ((XSiteStateTransferManager)Mockito.doNothing().when((Object)xSiteStateTransferManager)).onTopologyUpdated((CacheTopology)ArgumentMatchers.any(CacheTopology.class), ArgumentMatchers.anyBoolean());
        StateConsumerImpl stateConsumer = new StateConsumerImpl();
        TestingUtil.inject(stateConsumer, cache, TestingUtil.named("org.infinispan.executors.non-blocking", this.pooledExecutorService), localTopologyManager, interceptorChain, icf, configuration, rpcManager, commandsFactory, persistenceManager, dataContainer, transactionTable, stateTransferLock, cacheNotifier, new CommitManager(), new CommandAckCollector(), new TriangleOrderManager(0), new HashFunctionPartitioner(), conflictManager, distributionManager, localPublisherManager, invocationHandler, xSiteStateTransferManager);
        stateConsumer.start();
        ArrayList<ImmortalCacheEntry> cacheEntries = new ArrayList<ImmortalCacheEntry>();
        TestKey key1 = new TestKey("key1", 0, (KeyPartitioner)keyPartitioner);
        TestKey key2 = new TestKey("key2", 0, (KeyPartitioner)keyPartitioner);
        cacheEntries.add(new ImmortalCacheEntry((Object)key1, (Object)"value1"));
        cacheEntries.add(new ImmortalCacheEntry((Object)key2, (Object)"value2"));
        Mockito.when((Object)dataContainer.iterator()).thenAnswer(invocation -> cacheEntries.iterator());
        Mockito.when((Object)transactionTable.getLocalTransactions()).thenReturn(Collections.emptyList());
        Mockito.when((Object)transactionTable.getRemoteTransactions()).thenReturn(Collections.emptyList());
        AssertJUnit.assertFalse((boolean)stateConsumer.hasActiveTransfers());
        stateConsumer.onTopologyUpdate(new CacheTopology(1, 1, (ConsistentHash)ch2, null, CacheTopology.Phase.NO_REBALANCE, ch2.getMembers(), persistentUUIDManager.mapAddresses(ch2.getMembers())), false);
        AssertJUnit.assertFalse((boolean)stateConsumer.hasActiveTransfers());
        stateConsumer.onTopologyUpdate(new CacheTopology(2, 2, (ConsistentHash)ch2, (ConsistentHash)ch3, (ConsistentHash)ch23, CacheTopology.Phase.READ_OLD_WRITE_ALL, ch23.getMembers(), persistentUUIDManager.mapAddresses(ch23.getMembers())), true);
        AssertJUnit.assertTrue((boolean)stateConsumer.hasActiveTransfers());
        Set oldSegments = ch2.getSegmentsForOwner(addresses[0]);
        Set newSegments = ch3.getSegmentsForOwner(addresses[0]);
        newSegments.removeAll(oldSegments);
        log.debugf("Rebalancing. Added segments=%s, old segments=%s", (Object)newSegments, (Object)oldSegments);
        Assert.assertEquals(flatRequestedSegments, (Set)newSegments);
        Future<Object> future = this.fork(() -> StateConsumerTest.lambda$test1$7(stateConsumer, ch2, (PersistentUUIDManager)persistentUUIDManager));
        stateConsumer.onTopologyUpdate(new CacheTopology(3, 2, (ConsistentHash)ch2, null, CacheTopology.Phase.NO_REBALANCE, ch2.getMembers(), persistentUUIDManager.mapAddresses(ch2.getMembers())), false);
        future.get();
        AssertJUnit.assertFalse((boolean)stateConsumer.hasActiveTransfers());
        requestedSegments.clear();
        stateConsumer.onTopologyUpdate(new CacheTopology(4, 4, (ConsistentHash)ch2, (ConsistentHash)ch3, (ConsistentHash)ch23, CacheTopology.Phase.READ_OLD_WRITE_ALL, ch23.getMembers(), persistentUUIDManager.mapAddresses(ch23.getMembers())), true);
        AssertJUnit.assertTrue((boolean)stateConsumer.hasActiveTransfers());
        Assert.assertEquals(flatRequestedSegments, (Set)newSegments);
        ArrayList<StateChunk> stateChunks = new ArrayList<StateChunk>();
        for (Integer segment : newSegments) {
            stateChunks.add(new StateChunk(segment.intValue(), Collections.emptyList(), true));
        }
        stateConsumer.applyState(addresses[1], 2, false, stateChunks);
        stateConsumer.stop();
        AssertJUnit.assertFalse((boolean)stateConsumer.hasActiveTransfers());
    }

    private static /* synthetic */ Object lambda$test1$7(StateConsumerImpl stateConsumer, DefaultConsistentHash ch2, PersistentUUIDManager persistentUUIDManager) throws Exception {
        stateConsumer.onTopologyUpdate(new CacheTopology(3, 2, (ConsistentHash)ch2, null, CacheTopology.Phase.NO_REBALANCE, ch2.getMembers(), persistentUUIDManager.mapAddresses(ch2.getMembers())), false);
        return null;
    }
}

