/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.test;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import net.jcip.annotations.GuardedBy;
import org.infinispan.Cache;
import org.infinispan.commands.DataCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.interceptors.AsyncInterceptor;
import org.infinispan.interceptors.DDAsyncInterceptor;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.TestException;
import org.infinispan.test.TestingUtil;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.AssertJUnit;

public class ReplListener {
    private static final Log log = LogFactory.getLog(ReplListener.class);
    private final Cache<?, ?> cache;
    private final Lock lock = new ReentrantLock();
    private final Condition newCommandCondition = this.lock.newCondition();
    @GuardedBy(value="lock")
    private final List<Predicate<VisitableCommand>> expectedCommands = new ArrayList<Predicate<VisitableCommand>>();
    @GuardedBy(value="lock")
    private final Queue<VisitableCommand> loggedCommands = new ArrayDeque<VisitableCommand>();
    @GuardedBy(value="lock")
    private boolean watchLocal;

    public ReplListener(Cache<?, ?> cache) {
        this(cache, false);
    }

    public ReplListener(Cache<?, ?> cache, boolean recordCommandsEagerly) {
        this(cache, recordCommandsEagerly, false);
    }

    public ReplListener(Cache<?, ?> cache, boolean recordCommandsEagerly, boolean watchLocal) {
        this.cache = cache;
        this.watchLocal = watchLocal;
        TestingUtil.extractInterceptorChain(cache).addInterceptor((AsyncInterceptor)new ReplListenerInterceptor(), 1);
    }

    public void expectAny() {
        this.expect((VisitableCommand c) -> true);
    }

    public void expectWithTx(Class<? extends VisitableCommand> ... commands) {
        ArrayList<Class<CommitCommand>> cmdsToExpect = new ArrayList<Class<CommitCommand>>();
        cmdsToExpect.add(PrepareCommand.class);
        if (commands != null) {
            cmdsToExpect.addAll(Arrays.asList(commands));
        }
        if (this.cache.getCacheConfiguration().clustering().cacheMode().isSynchronous()) {
            cmdsToExpect.add(CommitCommand.class);
        }
        this.expect(cmdsToExpect.toArray(new Class[cmdsToExpect.size()]));
    }

    public void expectAnyWithTx() {
        ArrayList<Class> cmdsToExpect = new ArrayList<Class>(2);
        cmdsToExpect.add(PrepareCommand.class);
        if (this.cache.getCacheConfiguration().clustering().cacheMode().isSynchronous()) {
            cmdsToExpect.add(CommitCommand.class);
        }
        this.expect(cmdsToExpect.toArray(new Class[cmdsToExpect.size()]));
    }

    public void expect(Class<? extends VisitableCommand> ... expectedCommands) {
        Function<Class, Predicate> predicateGenerator = clazz -> clazz::isInstance;
        this.expect(Stream.of(expectedCommands).map(predicateGenerator).collect(Collectors.toList()));
    }

    public void expect(Class<? extends VisitableCommand> expectedCommand) {
        this.expect(Collections.singleton(expectedCommand::isInstance));
    }

    public void expect(Predicate<VisitableCommand> predicate) {
        this.expect(Collections.singleton(predicate));
    }

    public void expect(Predicate<VisitableCommand> ... predicates) {
        this.expect(Arrays.asList(predicates));
    }

    public void expect(Collection<Predicate<VisitableCommand>> predicates) {
        this.lock.lock();
        try {
            this.expectedCommands.addAll(predicates);
        }
        finally {
            this.lock.unlock();
        }
    }

    private void debugf(String format, Object ... params) {
        log.debugf("[" + this.cache.getCacheManager().getAddress() + "] " + format, params);
    }

    public void waitForRpc() {
        this.waitForRpc(30L, TimeUnit.SECONDS);
    }

    public void waitForRpc(long time, TimeUnit unit) {
        AssertJUnit.assertFalse((String)"there are no replication expectations; please use ReplListener.expect() before calling this method", (boolean)this.expectedCommands.isEmpty());
        this.lock.lock();
        try {
            long remainingNanos = unit.toNanos(time);
            while (true) {
                this.debugf("Waiting for %d command(s)", this.expectedCommands.size());
                Iterator itCommand = this.loggedCommands.iterator();
                block6: while (itCommand.hasNext()) {
                    VisitableCommand command = (VisitableCommand)itCommand.next();
                    Iterator<Predicate<VisitableCommand>> itExpectation = this.expectedCommands.iterator();
                    while (itExpectation.hasNext()) {
                        Predicate<VisitableCommand> expectation = itExpectation.next();
                        if (!expectation.test(command)) continue;
                        this.debugf("Matched command %s", command);
                        itCommand.remove();
                        itExpectation.remove();
                        continue block6;
                    }
                }
                if (this.expectedCommands.isEmpty()) {
                    this.newCommandCondition.signalAll();
                }
                if (this.expectedCommands.isEmpty()) {
                    break;
                }
                remainingNanos = this.newCommandCondition.awaitNanos(remainingNanos);
                Address address = this.cache.getCacheManager().getAddress();
                AssertJUnit.assertTrue((String)("Waiting for more than " + time + " " + (Object)((Object)unit) + " and some commands did not replicate on cache [" + address + "]"), (remainingNanos > 0L ? 1 : 0) != 0);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new TestException("Interrupted", e);
        }
        finally {
            this.lock.unlock();
        }
    }

    public void assertNoRpc() {
        this.debugf("Expecting no commands", new Object[0]);
        for (VisitableCommand command : this.loggedCommands) {
            for (Predicate<VisitableCommand> expectation : this.expectedCommands) {
                AssertJUnit.assertFalse((String)("Shouldn't have matched command " + command), (boolean)expectation.test(command));
            }
        }
    }

    public Cache<?, ?> getCache() {
        return this.cache;
    }

    public void resetEager() {
        this.lock.lock();
        try {
            this.loggedCommands.clear();
        }
        finally {
            this.lock.unlock();
        }
    }

    public void reconfigureListener(boolean watchLocal) {
        this.lock.lock();
        try {
            this.watchLocal = watchLocal;
        }
        finally {
            this.lock.unlock();
        }
    }

    private boolean isPrimaryOwner(VisitableCommand cmd) {
        if (cmd instanceof DataCommand) {
            return this.cache.getAdvancedCache().getDistributionManager().getCacheTopology().getDistribution(((DataCommand)cmd).getKey()).isPrimary();
        }
        return true;
    }

    protected class ReplListenerInterceptor
    extends DDAsyncInterceptor {
        protected ReplListenerInterceptor() {
        }

        protected Object handleDefault(InvocationContext ctx, VisitableCommand cmd) throws Throwable {
            if (!ctx.isOriginLocal() || ReplListener.this.watchLocal && ReplListener.this.isPrimaryOwner(cmd)) {
                ReplListener.this.debugf("Delaying command %s", new Object[]{cmd});
                TestingUtil.sleepRandom(10);
            }
            return this.invokeNextAndFinally(ctx, cmd, (rCtx, rCommand, rv, throwable) -> {
                if (!ctx.isOriginLocal() || ReplListener.this.watchLocal && ReplListener.this.isPrimaryOwner(cmd)) {
                    this.logCommand(cmd);
                } else {
                    ReplListener.this.debugf("Not logging command (watchLocal=%b) %s", new Object[]{ReplListener.this.watchLocal, cmd});
                }
            });
        }

        public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand cmd) throws Throwable {
            return this.invokeNextAndFinally((InvocationContext)ctx, (VisitableCommand)cmd, (rCtx, rCommand, rv, throwable) -> {
                if (!ctx.isOriginLocal() || ReplListener.this.watchLocal) {
                    this.logCommand((VisitableCommand)cmd);
                    for (WriteCommand mod : cmd.getModifications()) {
                        this.logCommand((VisitableCommand)mod);
                    }
                }
            });
        }

        private void logCommand(VisitableCommand cmd) {
            ReplListener.this.lock.lock();
            try {
                ReplListener.this.debugf("ReplListener saw command %s", new Object[]{cmd});
                ReplListener.this.loggedCommands.add(cmd);
                ReplListener.this.newCommandCondition.signalAll();
            }
            finally {
                ReplListener.this.lock.unlock();
            }
        }
    }
}

