/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.api;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.MagicKey;
import org.infinispan.interceptors.AsyncInterceptor;
import org.infinispan.interceptors.DDAsyncInterceptor;
import org.infinispan.interceptors.distribution.VersionedDistributionInterceptor;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestDataSCI;
import org.infinispan.test.TestingUtil;
import org.infinispan.transaction.LockingMode;
import org.infinispan.util.concurrent.IsolationLevel;
import org.infinispan.util.concurrent.ReclosableLatch;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

/**
 * Functional test that races a conditional write ({@code putIfAbsent}, conditional
 * {@code replace} or conditional {@code remove}) executed in a transaction on {@code cache(2)}
 * against a plain {@code put} executed in a transaction on {@code cache(1)}.
 *
 * <p>The cluster has {@link #NODES_NUM} nodes in {@link CacheMode#DIST_SYNC} mode. The cache is
 * configured with {@link #lockingMode} and, when {@link #writeSkewCheck} is set, with
 * {@link IsolationLevel#REPEATABLE_READ}. A {@link CommandInterceptorController} installed on
 * {@code cache(1)} is used to hold the first transaction's commit on that node and to hold
 * remote gets arriving there. Every scenario expects all caches to end up with the value
 * written by the first transaction ({@code "tx1"}).
 */
@Test(groups={"functional"}, testName="api.ConditionalOperationsConcurrentWriteSkewTest")
public class ConditionalOperationsConcurrentWriteSkewTest
extends MultipleCacheManagersTest {
    /** Number of cache managers started for the test cluster. */
    private static final int NODES_NUM = 3;
    private final CacheMode mode = CacheMode.DIST_SYNC;
    protected LockingMode lockingMode = LockingMode.OPTIMISTIC;
    /** When {@code true}, the cache is configured with REPEATABLE_READ isolation. */
    protected boolean writeSkewCheck = true;
    // Not read anywhere in this class; kept because it is a protected member that code outside
    // this file may reference.
    protected boolean transactional = true;

    /**
     * Builds the clustered configuration from the fields above and starts a cluster of
     * {@link #NODES_NUM} nodes, waiting until it has formed.
     */
    @Override
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder dcc = getDefaultClusteredCacheConfig(this.mode, true);
        dcc.transaction().lockingMode(this.lockingMode);
        if (this.writeSkewCheck) {
            dcc.transaction().locking().isolationLevel(IsolationLevel.REPEATABLE_READ);
        }
        this.createCluster(TestDataSCI.INSTANCE, dcc, NODES_NUM);
        this.waitForClusterToForm();
    }

    public void testSimpleConcurrentReplace() throws Exception {
        this.doSimpleConcurrentTest(Operation.REPLACE);
    }

    public void testSimpleConcurrentPut() throws Exception {
        this.doSimpleConcurrentTest(Operation.PUT);
    }

    public void testSimpleConcurrentRemove() throws Exception {
        this.doSimpleConcurrentTest(Operation.REMOVE);
    }

    /**
     * Runs one race between a transactional {@code put} on {@code cache(1)} (tx1) and the given
     * conditional operation executed transactionally on {@code cache(2)} (tx2).
     *
     * <p>The latches held by the controller are always released and the interceptor is always
     * removed from {@code cache(1)}, even when an assertion fails, so that no forked or remote
     * thread stays parked on a latch after the test method returns.
     *
     * @param operation the conditional operation that tx2 performs
     * @throws Exception if a transaction, a forked task or a timed wait fails
     */
    private void doSimpleConcurrentTest(Operation operation) throws Exception {
        AssertJUnit.assertEquals("Wrong number of owner. Please change the configuration", 2,
                this.cache(0).getCacheConfiguration().clustering().hash().numOwners());
        // NOTE(review): presumably a key owned by cache(0) and cache(1) - confirm against MagicKey.
        MagicKey key = new MagicKey(this.cache(0), this.cache(1));
        CommandInterceptorController controller = null;
        try {
            controller = this.injectController(this.cache(1));
            if (operation == Operation.REMOVE || operation == Operation.REPLACE) {
                // These two operations are conditional on the current value being "v1".
                this.cache(0).put(key, "v1");
            }

            controller.awaitCommit.close();
            controller.blockCommit.close();
            Future<Boolean> tx1 = this.fork(() -> {
                this.tm(1).begin();
                this.cache(1).put(key, "tx1");
                this.tm(1).commit();
                return Boolean.TRUE;
            });

            // Wait until tx1's commit has been seen by the controller on cache(1); from then on
            // the committing thread is parked on blockCommit. Fail fast if that never happens
            // instead of carrying on with an unsynchronised scenario.
            AssertJUnit.assertTrue("Tx1 has not reached the commit on cache(1)",
                    controller.awaitCommit.await(30L, TimeUnit.SECONDS));

            // From now on gets arriving at cache(1) from other nodes are held by the controller.
            controller.blockRemoteGet.close();
            Future<Boolean> tx2 = this.fork(() -> {
                this.tm(2).begin();
                switch (operation) {
                    case REMOVE: {
                        this.cache(2).remove(key, "v1");
                        break;
                    }
                    case REPLACE: {
                        this.cache(2).replace(key, "v1", "tx2");
                        break;
                    }
                    case PUT: {
                        this.cache(2).putIfAbsent(key, "tx2");
                        break;
                    }
                    default: {
                        throw new IllegalStateException("Unsupported operation: " + operation);
                    }
                }
                this.tm(2).commit();
                return Boolean.TRUE;
            });
            AssertJUnit.assertTrue("Tx2 has not finished", tx2.get(20L, TimeUnit.SECONDS));

            // Release every latch so that tx1 can finish its commit on cache(1).
            controller.reset();
            AssertJUnit.assertTrue("Tx1 has not finished", tx1.get(20L, TimeUnit.SECONDS));

            this.assertNoTransactions();
            for (Cache<?, ?> cache : this.caches()) {
                AssertJUnit.assertEquals("Wrong value for cache " + this.address(cache), "tx1", cache.get(key));
            }
        }
        finally {
            if (controller != null) {
                // No-op after a successful run; on failure it unparks anything still waiting.
                controller.reset();
            }
            this.removeController(this.cache(1));
        }
    }

    /**
     * Creates a new controller and inserts it into the interceptor chain of {@code cache},
     * before the {@link VersionedDistributionInterceptor}.
     *
     * @param cache the cache whose interceptor chain is modified
     * @return the controller that was inserted
     */
    private CommandInterceptorController injectController(Cache<?, ?> cache) {
        CommandInterceptorController commandInterceptorController = new CommandInterceptorController();
        TestingUtil.extractInterceptorChain(cache).addInterceptorBefore(commandInterceptorController, VersionedDistributionInterceptor.class);
        return commandInterceptorController;
    }

    /** Removes the {@link CommandInterceptorController} from the interceptor chain of {@code cache}. */
    private void removeController(Cache<?, ?> cache) {
        TestingUtil.extractInterceptorChain(cache).removeInterceptor(CommandInterceptorController.class);
    }

    /**
     * Interceptor that lets the test observe and hold selected commands by means of four
     * latches. All latches start open; the test closes the ones it needs and calls
     * {@link #reset()} to open them all again. Every timed wait is limited to 30 seconds.
     *
     * <p>This is deliberately an inner (non-static) class: it logs through the {@code log}
     * member of the enclosing test instance.
     */
    class CommandInterceptorController
    extends DDAsyncInterceptor {
        /** While closed, gets that did not originate on this node wait in the callback. */
        private final ReclosableLatch blockRemoteGet = new ReclosableLatch(true);
        /** While closed, locally originated commits wait in the callback. */
        private final ReclosableLatch blockCommit = new ReclosableLatch(true);
        /** Opened whenever a prepare command passes through this interceptor. */
        private final ReclosableLatch awaitPrepare = new ReclosableLatch(true);
        /** Opened whenever a locally originated commit passes through this interceptor. */
        private final ReclosableLatch awaitCommit = new ReclosableLatch(true);

        public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
            return this.blockOnRemoteGet(ctx, command, "visit GetKeyValueCommand");
        }

        public Object visitGetCacheEntryCommand(InvocationContext ctx, GetCacheEntryCommand command) throws Throwable {
            return this.blockOnRemoteGet(ctx, command, "visit GetCacheEntryCommand");
        }

        public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
            return this.invokeNextAndFinally(ctx, command, (rCtx, rCommand, rv, throwable) -> {
                log.debug("visit Prepare");
                log.debug("Prepare Received... unblocking");
                this.awaitPrepare.open();
            });
        }

        public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
            return this.invokeNextAndFinally(ctx, command, (rCtx, rCommand, rv, throwable) -> {
                if (!ctx.isOriginLocal()) {
                    return;
                }
                log.debug("visit Commit");
                // Signal the test first, then wait for permission to continue.
                log.debug("Commit Received... unblocking...");
                this.awaitCommit.open();
                log.debug("Commit Received... blocking...");
                this.blockCommit.await(30L, TimeUnit.SECONDS);
            });
        }

        /** Opens all four latches, releasing every thread currently waiting on one of them. */
        public void reset() {
            this.blockCommit.open();
            this.blockRemoteGet.open();
            this.awaitPrepare.open();
            this.awaitCommit.open();
        }

        /**
         * Shared handling for both kinds of get command: passes the command down the chain and,
         * in the callback, waits on {@link #blockRemoteGet} when the command did not originate
         * on this node.
         *
         * @param ctx          the invocation context of the command
         * @param command      the get command being visited
         * @param visitMessage the debug message logged when the callback runs
         * @return whatever {@code invokeNextAndFinally} returns
         */
        private Object blockOnRemoteGet(InvocationContext ctx, VisitableCommand command, String visitMessage) throws Throwable {
            return this.invokeNextAndFinally(ctx, command, (rCtx, rCommand, rv, throwable) -> {
                log.debug(visitMessage);
                if (!ctx.isOriginLocal()) {
                    log.debug("Remote Get Received... blocking...");
                    this.blockRemoteGet.await(30L, TimeUnit.SECONDS);
                }
            });
        }
    }

    /** The conditional operation executed by the second transaction. */
    private enum Operation {
        /** {@code putIfAbsent(key, "tx2")}. */
        PUT,
        /** {@code replace(key, "v1", "tx2")}. */
        REPLACE,
        /** {@code remove(key, "v1")}. */
        REMOVE
    }
}

