/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.xsite.statetransfer.failures;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.topology.CacheTopology;
import org.infinispan.util.BlockingLocalTopologyManager;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.xsite.BackupReceiver;
import org.infinispan.xsite.BackupReceiverDelegator;
import org.infinispan.xsite.statetransfer.XSiteState;
import org.infinispan.xsite.statetransfer.XSiteStatePushCommand;
import org.infinispan.xsite.statetransfer.failures.AbstractTopologyChangeTest;
import org.testng.annotations.Test;

/**
 * Tests cross-site state transfer towards site {@code NYC-2} while the topology of that site
 * changes (a node joins, a node leaves, or the site master leaves).
 *
 * <p>Two orderings are exercised:
 * <ul>
 *   <li>{@link #doTopologyChangeDuringXSiteST}: the cross-site state transfer is already in
 *       progress when the topology change is triggered;</li>
 *   <li>{@link #doXSiteStateTransferDuringTopologyChange}: the topology change is already in
 *       progress (held back phase by phase) when the cross-site state transfer starts.</li>
 * </ul>
 *
 * <p>Cache creation, topology triggering and the final assertions are provided by
 * {@link AbstractTopologyChangeTest}.
 */
@Test(groups = {"xsite", "unstable"}, testName = "xsite.statetransfer.failures.SiteConsumerTopologyChangeTest")
public class SiteConsumerTopologyChangeTest extends AbstractTopologyChangeTest {

    /** Name of the site whose {@link BackupReceiver} is intercepted and whose topology changes. */
    private static final String NYC_SITE = "NYC-2";

    /** Name of the other site taking part in the state transfer. */
    private static final String LON_SITE = "LON-1";

    /** Maximum time, in seconds, to wait on any {@link CheckPoint} event. */
    private static final long CHECKPOINT_TIMEOUT_SECONDS = 30L;

    @Test(enabled = false, description = "Will be fixed by ISPN-6228")
    public void testJoinDuringXSiteST() throws InterruptedException, ExecutionException, TimeoutException {
        doTopologyChangeDuringXSiteST(AbstractTopologyChangeTest.TopologyEvent.JOIN);
    }

    public void testLeaveDuringXSiteST() throws InterruptedException, ExecutionException, TimeoutException {
        doTopologyChangeDuringXSiteST(AbstractTopologyChangeTest.TopologyEvent.LEAVE);
    }

    public void testSiteMasterLeaveDuringXSiteST() throws InterruptedException, ExecutionException, TimeoutException {
        doTopologyChangeDuringXSiteST(AbstractTopologyChangeTest.TopologyEvent.SITE_MASTER_LEAVE);
    }

    public void testXSiteDuringJoin() throws InterruptedException, ExecutionException, TimeoutException {
        doXSiteStateTransferDuringTopologyChange(AbstractTopologyChangeTest.TopologyEvent.JOIN);
    }

    public void testXSiteSTDuringLeave() throws InterruptedException, ExecutionException, TimeoutException {
        doXSiteStateTransferDuringTopologyChange(AbstractTopologyChangeTest.TopologyEvent.LEAVE);
    }

    /**
     * Starts a cross-site state transfer, pauses it inside the receiver, triggers the topology
     * change, then lets the state transfer continue and verifies the outcome.
     *
     * @param event the kind of topology change to trigger while the state transfer is paused
     * @throws TimeoutException     if a checkpoint event is not observed in time
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws ExecutionException   if the asynchronous topology change fails
     */
    private void doTopologyChangeDuringXSiteST(AbstractTopologyChangeTest.TopologyEvent event) throws TimeoutException, InterruptedException, ExecutionException {
        log.debugf("Start topology change during x-site state transfer with %s", event);
        initBeforeTest();
        AbstractTopologyChangeTest.TestCaches testCaches = createTestCache(event, NYC_SITE);
        printTestCaches(testCaches);

        CheckPoint checkPoint = new CheckPoint();
        // While true, the wrapped receiver tracks incoming chunks and may block; see BlockingBackupReceiver.
        AtomicBoolean discard = new AtomicBoolean(true);
        TestingUtil.wrapComponent(cache(NYC_SITE, 0), BackupReceiver.class,
                current -> new BlockingBackupReceiver((BackupReceiver) current, discard, checkPoint));

        log.debug("Start x-site state transfer");
        startStateTransfer(testCaches.coordinator, NYC_SITE);
        assertOnline(LON_SITE, NYC_SITE);

        // Wait until the receiver reports that it is about to block.
        checkPoint.awaitStrict("before-block", CHECKPOINT_TIMEOUT_SECONDS, TimeUnit.SECONDS);

        Future<Void> topologyChangeFuture = triggerTopologyChange(NYC_SITE, testCaches.removeIndex);

        // Stop intercepting new chunks and release the one that is currently blocked.
        discard.set(false);
        checkPoint.triggerForever("blocked");

        topologyChangeFuture.get();
        awaitXSiteStateSent(LON_SITE);
        awaitLocalStateTransfer(NYC_SITE);
        assertEventuallyNoStateTransferInReceivingSite(null);
        assertData();
    }

    /**
     * Triggers the topology change first, holds it back with a {@link BlockingLocalTopologyManager},
     * starts the cross-site state transfer once the first topology phase(s) have been confirmed,
     * and confirms the remaining phases only after the receiver has seen a state chunk.
     *
     * @param event the kind of topology change that is in progress when the state transfer starts
     * @throws TimeoutException     if a checkpoint event is not observed in time
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws ExecutionException   if the asynchronous topology change fails
     */
    private void doXSiteStateTransferDuringTopologyChange(AbstractTopologyChangeTest.TopologyEvent event) throws TimeoutException, InterruptedException, ExecutionException {
        log.debugf("Start x-site state transfer during topology change with %s", event);
        initBeforeTest();
        AbstractTopologyChangeTest.TestCaches testCaches = createTestCache(event, NYC_SITE);
        printTestCaches(testCaches);

        BlockingLocalTopologyManager topologyManager =
                BlockingLocalTopologyManager.replaceTopologyManagerDefaultCache(testCaches.controllerCache.getCacheManager());
        CheckPoint checkPoint = new CheckPoint();
        TestingUtil.wrapComponent(cache(NYC_SITE, 0), BackupReceiver.class,
                current -> new NotifierBackupReceiver((BackupReceiver) current, checkPoint));

        Future<Void> topologyEventFuture = triggerTopologyChange(NYC_SITE, testCaches.removeIndex);

        // A LEAVE needs one extra NO_REBALANCE confirmation before the rebalance phases.
        // NOTE(review): presumably the topology update that removes the leaver - confirm.
        if (event == AbstractTopologyChangeTest.TopologyEvent.LEAVE) {
            topologyManager.confirmTopologyUpdate(CacheTopology.Phase.NO_REBALANCE);
        }
        topologyManager.confirmTopologyUpdate(CacheTopology.Phase.READ_OLD_WRITE_ALL);

        log.debug("Start x-site state transfer");
        startStateTransfer(testCaches.coordinator, NYC_SITE);
        assertOnline(LON_SITE, NYC_SITE);

        // Only let the topology change proceed after the receiver has been handed a chunk.
        checkPoint.awaitStrict("before-chunk", CHECKPOINT_TIMEOUT_SECONDS, TimeUnit.SECONDS);

        topologyManager.confirmTopologyUpdate(CacheTopology.Phase.READ_ALL_WRITE_ALL);
        topologyManager.confirmTopologyUpdate(CacheTopology.Phase.READ_NEW_WRITE_ALL);
        topologyManager.confirmTopologyUpdate(CacheTopology.Phase.NO_REBALANCE);

        topologyEventFuture.get();
        awaitXSiteStateSent(LON_SITE);
        awaitLocalStateTransfer(NYC_SITE);
        assertEventuallyNoStateTransferInReceivingSite(null);
        assertData();
    }

    /**
     * {@link BackupReceiver} wrapper that, while {@code discard} is {@code true}, records the
     * primary owner of every key it sees in incoming state chunks and, once three distinct owners
     * have been recorded, blocks the next chunk until the {@code "blocked"} checkpoint event is
     * triggered.
     *
     * <p>Every chunk is eventually forwarded to the delegate unless waiting for the checkpoint
     * fails.
     */
    @Scope(Scopes.NAMED_CACHE)
    static class BlockingBackupReceiver extends BackupReceiverDelegator {
        /** Primary owners of the keys seen so far; also used as the monitor guarding itself. */
        private final Set<Address> addressSet = new HashSet<>();
        private final AtomicBoolean discard;
        private final CheckPoint checkPoint;
        @Inject
        DistributionManager manager;

        BlockingBackupReceiver(BackupReceiver delegate, AtomicBoolean discard, CheckPoint checkPoint) {
            super(delegate);
            this.discard = discard;
            this.checkPoint = checkPoint;
        }

        /**
         * Forwards the chunk to the delegate, optionally blocking first.
         *
         * @param cmd the state chunk being applied
         * @return the delegate's completion stage, or an exceptionally completed stage if waiting
         *         for the {@code "blocked"} event timed out or was interrupted
         */
        @Override
        public CompletionStage<Void> handleStateTransferState(XSiteStatePushCommand cmd) {
            if (!discard.get()) {
                return delegate.handleStateTransferState(cmd);
            }
            synchronized (addressSet) {
                if (addressSet.size() == 3) {
                    checkPoint.trigger("before-block");
                    // The wait happens while holding the monitor, so other chunks arriving in
                    // discard mode queue up behind this one until the test releases "blocked".
                    try {
                        checkPoint.awaitStrict("blocked", CHECKPOINT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        // Restore the interrupt status so the calling thread can still observe it.
                        Thread.currentThread().interrupt();
                        return CompletableFutures.completedExceptionFuture(e);
                    } catch (TimeoutException e) {
                        return CompletableFutures.completedExceptionFuture(e);
                    }
                    return delegate.handleStateTransferState(cmd);
                }
                for (XSiteState state : cmd.getChunk()) {
                    addressSet.add(manager.getCacheTopology().getDistribution(state.key()).primary());
                }
            }
            return delegate.handleStateTransferState(cmd);
        }
    }

    /**
     * {@link BackupReceiver} wrapper that triggers the {@code "before-chunk"} checkpoint event
     * every time a state chunk arrives and then forwards the chunk unchanged to the delegate.
     */
    private static class NotifierBackupReceiver extends BackupReceiverDelegator {
        private final CheckPoint checkPoint;

        NotifierBackupReceiver(BackupReceiver delegate, CheckPoint checkPoint) {
            super(delegate);
            this.checkPoint = checkPoint;
        }

        @Override
        public CompletionStage<Void> handleStateTransferState(XSiteStatePushCommand cmd) {
            checkPoint.trigger("before-chunk");
            return delegate.handleStateTransferState(cmd);
        }
    }
}

