/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.interceptors.impl;

import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.InvalidTransactionException;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.SegmentSpecificCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.functional.FunctionalCommand;
import org.infinispan.commands.functional.ReadWriteKeyCommand;
import org.infinispan.commands.functional.ReadWriteKeyValueCommand;
import org.infinispan.commands.functional.ReadWriteManyCommand;
import org.infinispan.commands.functional.ReadWriteManyEntriesCommand;
import org.infinispan.commands.functional.WriteOnlyKeyCommand;
import org.infinispan.commands.functional.WriteOnlyKeyValueCommand;
import org.infinispan.commands.functional.WriteOnlyManyCommand;
import org.infinispan.commands.functional.WriteOnlyManyEntriesCommand;
import org.infinispan.commands.tx.AbstractTransactionBoundaryCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.ComputeCommand;
import org.infinispan.commands.write.ComputeIfAbsentCommand;
import org.infinispan.commands.write.DataWriteCommand;
import org.infinispan.commands.write.IracPutKeyValueCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.configuration.cache.StoreConfiguration;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.container.impl.InternalEntryFactory;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.functional.Param;
import org.infinispan.interceptors.InvocationStage;
import org.infinispan.interceptors.InvocationSuccessFunction;
import org.infinispan.interceptors.impl.JmxStatsCommandInterceptor;
import org.infinispan.jmx.annotations.MBean;
import org.infinispan.jmx.annotations.ManagedAttribute;
import org.infinispan.jmx.annotations.MeasurementType;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.spi.MarshallableEntry;
import org.infinispan.persistence.spi.MarshallableEntryFactory;
import org.infinispan.transaction.impl.AbstractCacheTransaction;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.concurrent.AggregateCompletionStage;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/**
 * Interceptor that pushes in-memory modifications to the configured cache stores through the
 * {@link PersistenceManager}.
 * <p>
 * Writes performed outside a transaction scope are persisted once the rest of the interceptor chain
 * has returned successfully. Writes performed inside a transaction scope are skipped by the individual
 * {@code visitXxx} methods and are instead persisted as a batch after a commit (or a one-phase prepare),
 * unless a transactional store is in use, in which case this interceptor only delegates to the next one.
 * <p>
 * Commands carrying {@link FlagBitSets#SKIP_CACHE_STORE} are never persisted by this interceptor.
 */
@MBean(objectName="CacheStore", description="Component that handles storing of entries to a CacheStore from memory.")
public class CacheWriterInterceptor
extends JmxStatsCommandInterceptor {
    private static final Log log = LogFactory.getLog(CacheWriterInterceptor.class);
    /** Entry point to all configured stores. */
    @Inject
    protected PersistenceManager persistenceManager;
    /** Used to obtain the value to persist for a key from the invocation context. */
    @Inject
    InternalEntryFactory entryFactory;
    /** JTA transaction manager; every use is null-checked, so it may be absent. */
    @Inject
    TransactionManager transactionManager;
    /** Maps keys to segments for commands that do not carry a segment themselves. */
    @Inject
    KeyPartitioner keyPartitioner;
    /** Wraps a key and its internal value into the form handed to the stores. */
    @Inject
    MarshallableEntryFactory<?, ?> marshalledEntryFactory;
    /** Number of store writes recorded by this interceptor while statistics are enabled. */
    final AtomicLong cacheStores = new AtomicLong(0L);
    /** Whether a transactional store is in use; updated by the listener registered in {@link #start()}. */
    private volatile boolean usingTransactionalStores;
    /** Cached method reference so {@link #visitPutMapCommand} does not allocate a callback per invocation. */
    protected final InvocationSuccessFunction<PutMapCommand> handlePutMapCommandReturn = this::handlePutMapCommandReturn;
    /** Cached method reference used for both commit and one-phase prepare. */
    private final InvocationSuccessFunction<AbstractTransactionBoundaryCommand> afterCommit = this::afterCommit;

    /**
     * Returns the logger used for this interceptor's trace messages.
     *
     * @return the logger; subclasses may override to log under their own category
     */
    protected Log getLog() {
        return log;
    }

    /**
     * Initializes statistics from the cache configuration and, for transactional caches, starts tracking
     * whether a transactional store is in use.
     */
    @Start(priority=15)
    protected void start() {
        this.setStatisticsEnabled(this.cacheConfiguration.statistics().enabled());
        if (this.cacheConfiguration.transaction().transactionMode().isTransactional()) {
            this.persistenceManager.addStoreListener(persistenceStatus -> {
                this.usingTransactionalStores = persistenceStatus.usingTransactionalStore();
            });
            this.usingTransactionalStores = this.persistenceManager.hasStore(StoreConfiguration::transactional);
        }
    }

    /**
     * Persists the transaction's modifications after the commit has passed through the rest of the chain,
     * unless a transactional store is in use.
     *
     * @param ctx     the transactional invocation context
     * @param command the commit command
     * @return the result of the next interceptor, delayed until the store write has completed when one is issued
     */
    @Override
    public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
        if (this.usingTransactionalStores) {
            return this.invokeNext(ctx, command);
        }
        return this.invokeNextThenApply(ctx, command, this.afterCommit);
    }

    /**
     * Persists the transaction's modifications after a one-phase prepare; two-phase prepares and caches
     * using a transactional store are simply passed down the chain.
     *
     * @param ctx     the transactional invocation context
     * @param command the prepare command
     * @return the result of the next interceptor, delayed until the store write has completed when one is issued
     */
    @Override
    public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
        if (!this.usingTransactionalStores && command.isOnePhaseCommit()) {
            return this.invokeNextThenApply(ctx, command, this.afterCommit);
        }
        return this.invokeNext(ctx, command);
    }

    /**
     * Writes all modifications of the transaction in {@code ctx} to the stores.
     * <p>
     * The JTA transaction associated with the calling thread (if any) is suspended while the write is
     * issued and resumed as soon as {@link #store(TxInvocationContext)} returns, which may be before the
     * returned stage has completed.
     *
     * @param ctx the transactional invocation context holding the modifications
     * @return the stage tracking the store write, or {@code null} when the transaction has no modifications
     * @throws Throwable if suspending/resuming the transaction or issuing the write fails
     */
    protected InvocationStage commitModifications(TxInvocationContext<AbstractCacheTransaction> ctx) throws Throwable {
        List<WriteCommand> allModifications = ctx.getCacheTransaction().getAllModifications();
        if (allModifications.isEmpty()) {
            return null;
        }
        if (log.isTraceEnabled()) {
            GlobalTransaction tx = ctx.getGlobalTransaction();
            this.getLog().tracef("Persisting transaction %s modifications: %s", tx, allModifications);
        }
        Transaction xaTx = null;
        try {
            xaTx = this.suspendRunningTx(ctx);
            return this.store(ctx);
        }
        finally {
            // xaTx is still null if suspendRunningTx threw, making this a no-op in that case.
            this.resumeRunningTx(xaTx);
        }
    }

    /**
     * Success callback for commit / one-phase prepare: persists the modifications and then yields the
     * original return value.
     */
    private Object afterCommit(InvocationContext context, VisitableCommand command, Object rv) throws Throwable {
        InvocationStage stage = this.commitModifications((TxInvocationContext)context);
        return stage == null ? rv : stage.thenReturn(context, command, rv);
    }

    /**
     * Re-associates {@code xaTx} with the calling thread; does nothing when there is no transaction
     * manager or no transaction was suspended.
     */
    private void resumeRunningTx(Transaction xaTx) throws InvalidTransactionException, SystemException {
        if (this.transactionManager != null && xaTx != null) {
            this.transactionManager.resume(xaTx);
        }
    }

    /**
     * Suspends the JTA transaction associated with the calling thread, if any.
     *
     * @param ctx the transactional invocation context
     * @return the suspended transaction, or {@code null} when there is no transaction manager or no running transaction
     * @throws IllegalStateException if a transaction was running although the context is not origin-local
     */
    private Transaction suspendRunningTx(TxInvocationContext<?> ctx) throws SystemException {
        if (this.transactionManager == null) {
            return null;
        }
        Transaction xaTx = this.transactionManager.suspend();
        if (xaTx != null && !ctx.isOriginLocal()) {
            // NOTE(review): the transaction suspended above is not resumed on this path, and the caller's
            // finally block only sees null - confirm this is intended.
            throw new IllegalStateException("It is only possible to be in the context of a JTA transaction in the local node.");
        }
        return xaTx;
    }

    /**
     * Tells whether the outcome of {@code removeCommand} should be propagated to the stores.
     *
     * @return the result of {@code removeCommand.shouldReplicate(ctx, true)}
     */
    boolean shouldReplicateRemove(InvocationContext ctx, RemoveCommand removeCommand) {
        return removeCommand.shouldReplicate(ctx, true);
    }

    /**
     * After the removal has been applied in memory, deletes the key from all stores, unless the store is
     * skipped, the call is in a transaction scope, the command should not be replicated or this node is
     * not the proper writer.
     */
    @Override
    public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
        return this.invokeNextThenApply(ctx, command, (rCtx, removeCommand, rv) -> {
            if (!this.isStoreEnabled(removeCommand) || rCtx.isInTxScope() || !this.shouldReplicateRemove(ctx, command) || !this.isProperWriter(rCtx, removeCommand, removeCommand.getKey())) {
                return rv;
            }
            CompletionStage<?> stage = this.removeFromStores(removeCommand.getKey(), removeCommand.getSegment());
            return CacheWriterInterceptor.delayedValue(stage, rv);
        });
    }

    /**
     * Clears all stores before invoking the rest of the chain, unless the store is skipped or the call is
     * in a transaction scope. Origin-local calls clear with {@code BOTH} access mode, others with {@code PRIVATE}.
     */
    @Override
    public Object visitClearCommand(InvocationContext ctx, ClearCommand command) {
        if (!this.isStoreEnabled(command) || ctx.isInTxScope()) {
            return this.invokeNext(ctx, command);
        }
        PersistenceManager.AccessMode mode = ctx.isOriginLocal() ? PersistenceManager.AccessMode.BOTH : PersistenceManager.AccessMode.PRIVATE;
        return this.asyncInvokeNext(ctx, (VisitableCommand)command, this.persistenceManager.clearAllStores(mode));
    }

    /** Persists the entry written by a put once the in-memory write has completed. */
    @Override
    public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
        return this.visitDataWriteCommandToStore(ctx, command);
    }

    /** Persists the entry written by a replace once the in-memory write has completed. */
    @Override
    public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
        return this.visitDataWriteCommandToStore(ctx, command);
    }

    /** Persists the entry written by an IRAC put once the in-memory write has completed. */
    @Override
    public Object visitIracPutKeyValueCommand(InvocationContext ctx, IracPutKeyValueCommand command) {
        return this.visitDataWriteCommandToStore(ctx, command);
    }

    /**
     * After the compute has been applied in memory, deletes the key from the stores when the command
     * returned {@code null}, otherwise stores the entry. The usual skip conditions (skip flag, transaction
     * scope, not replicable, not the proper writer) leave the stores untouched.
     */
    @Override
    public Object visitComputeCommand(InvocationContext ctx, ComputeCommand command) throws Throwable {
        return this.invokeNextThenApply(ctx, command, (rCtx, computeCommand, rv) -> {
            if (!this.isStoreEnabled(computeCommand) || rCtx.isInTxScope() || !computeCommand.shouldReplicate(ctx, true) || !this.isProperWriter(rCtx, computeCommand, computeCommand.getKey())) {
                return rv;
            }
            Object key = computeCommand.getKey();
            CompletionStage<?> resultStage = rv == null
                    ? this.removeFromStores(key, computeCommand.getSegment())
                    : this.storeEntry(rCtx, key, computeCommand);
            return CacheWriterInterceptor.delayedValue(resultStage, rv);
        });
    }

    /**
     * After the compute-if-absent has been applied in memory, stores the entry when the command returned a
     * non-null value; a {@code null} result leaves the stores untouched.
     */
    @Override
    public Object visitComputeIfAbsentCommand(InvocationContext ctx, ComputeIfAbsentCommand command) throws Throwable {
        return this.invokeNextThenApply(ctx, command, (rCtx, computeIfAbsentCommand, rv) -> {
            if (!this.isStoreEnabled(computeIfAbsentCommand) || rCtx.isInTxScope() || !computeIfAbsentCommand.shouldReplicate(ctx, true)) {
                return rv;
            }
            if (!this.isProperWriter(rCtx, computeIfAbsentCommand, computeIfAbsentCommand.getKey())) {
                return rv;
            }
            if (rv == null) {
                return null;
            }
            Object key = computeIfAbsentCommand.getKey();
            return CacheWriterInterceptor.delayedValue(this.storeEntry(rCtx, key, computeIfAbsentCommand), rv);
        });
    }

    /**
     * Persists the entries of a put-map once the in-memory write has completed, unless the store is
     * skipped or the call is in a transaction scope.
     */
    @Override
    public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
        if (!this.isStoreEnabled(command) || ctx.isInTxScope()) {
            return this.invokeNext(ctx, command);
        }
        return this.invokeNextThenApply(ctx, command, this.handlePutMapCommandReturn);
    }

    /**
     * Success callback for {@link #visitPutMapCommand}: delegates the write of the map to the persistence
     * manager, restricted to keys for which this node is the proper writer, and adds the reported count to
     * the write statistic when statistics are enabled.
     *
     * @return {@code rv}, delayed until the store write has completed
     */
    protected Object handlePutMapCommandReturn(InvocationContext rCtx, PutMapCommand putMapCommand, Object rv) {
        CompletionStage<Long> putMapStage = this.persistenceManager.writeMapCommand(putMapCommand, rCtx, (writeCommand, o) -> this.isProperWriter(rCtx, (FlagAffectedCommand)writeCommand, o));
        if (this.getStatisticsEnabled()) {
            putMapStage.thenAccept(this.cacheStores::getAndAdd);
        }
        return CacheWriterInterceptor.delayedValue(putMapStage, rv);
    }

    /** Persists the outcome of a functional read-write command on a single key. */
    @Override
    public Object visitReadWriteKeyCommand(InvocationContext ctx, ReadWriteKeyCommand command) throws Throwable {
        return this.visitWriteCommand(ctx, command);
    }

    /** Persists the outcome of a functional read-write command on a single key with an argument value. */
    @Override
    public Object visitReadWriteKeyValueCommand(InvocationContext ctx, ReadWriteKeyValueCommand command) throws Throwable {
        return this.visitWriteCommand(ctx, command);
    }

    /** Persists the outcome of a functional write-only command on a single key. */
    @Override
    public Object visitWriteOnlyKeyCommand(InvocationContext ctx, WriteOnlyKeyCommand command) throws Throwable {
        return this.visitWriteCommand(ctx, command);
    }

    /** Persists the outcome of a functional write-only command on a single key with an argument value. */
    @Override
    public Object visitWriteOnlyKeyValueCommand(InvocationContext ctx, WriteOnlyKeyValueCommand command) throws Throwable {
        return this.visitWriteCommand(ctx, command);
    }

    /**
     * Shared handling for single-key functional commands. Once the command has been applied in memory,
     * the context entry decides what happens: a removed entry is deleted from the stores, a changed entry
     * is stored, anything else is left alone. Nothing is persisted when the command's persistence mode is
     * {@code SKIP_PERSIST} or {@code SKIP}.
     */
    private <T extends DataWriteCommand & FunctionalCommand> Object visitWriteCommand(InvocationContext ctx, T command) {
        return this.invokeNextThenApply(ctx, command, (rCtx, dataWriteCommand, rv) -> {
            if (!this.isStoreEnabled(dataWriteCommand) || rCtx.isInTxScope() || !dataWriteCommand.shouldReplicate(ctx, true) || !this.isProperWriter(rCtx, dataWriteCommand, dataWriteCommand.getKey())) {
                return rv;
            }
            CompletionStage<?> stage = CompletableFutures.completedNull();
            // Index 0 is read as the persistence-mode parameter (see the cast in the switch below).
            Param persistMode = dataWriteCommand.getParams().get(0);
            switch ((Param.PersistenceMode)persistMode.get()) {
                case LOAD_PERSIST: 
                case SKIP_LOAD: {
                    Object key = dataWriteCommand.getKey();
                    CacheEntry entry = rCtx.lookupEntry(key);
                    if (entry == null) {
                        // Only report a missing entry when it really is missing.
                        log.trace("Skipping cache store since entry was not found in context");
                    } else if (entry.isRemoved()) {
                        stage = this.removeFromStores(key, dataWriteCommand.getSegment());
                    } else if (entry.isChanged()) {
                        stage = this.storeEntry(rCtx, key, dataWriteCommand);
                        if (log.isTraceEnabled()) {
                            stage = stage.thenAccept(ignore -> this.getLog().tracef("Stored entry for key %s in CacheStore", key));
                        }
                    } else if (log.isTraceEnabled()) {
                        // The key must be supplied for the %s placeholder.
                        this.getLog().tracef("Skipping write for key %s as entry wasn't changed", key);
                    }
                    break;
                }
                case SKIP_PERSIST: 
                case SKIP: {
                    log.trace("Skipping cache store since persistence mode parameter is SKIP");
                }
            }
            return CacheWriterInterceptor.delayedValue(stage, rv);
        });
    }

    /** Persists the outcome of a functional write-only command on many keys. */
    @Override
    public Object visitWriteOnlyManyCommand(InvocationContext ctx, WriteOnlyManyCommand command) throws Throwable {
        return this.visitWriteManyCommand(ctx, command);
    }

    /** Persists the outcome of a functional write-only command on many entries. */
    @Override
    public Object visitWriteOnlyManyEntriesCommand(InvocationContext ctx, WriteOnlyManyEntriesCommand command) throws Throwable {
        return this.visitWriteManyCommand(ctx, command);
    }

    /** Persists the outcome of a functional read-write command on many keys. */
    @Override
    public Object visitReadWriteManyCommand(InvocationContext ctx, ReadWriteManyCommand command) throws Throwable {
        return this.visitWriteManyCommand(ctx, command);
    }

    /** Persists the outcome of a functional read-write command on many entries. */
    @Override
    public Object visitReadWriteManyEntriesCommand(InvocationContext ctx, ReadWriteManyEntriesCommand command) throws Throwable {
        return this.visitWriteManyCommand(ctx, command);
    }

    /**
     * Shared handling for multi-key functional commands. For every affected key found in the context, a
     * removed entry is deleted from the stores and a changed entry is stored (the latter only when this
     * node is the proper writer for the key). Nothing is persisted when the persistence mode is
     * {@code SKIP_PERSIST} or {@code SKIP}.
     */
    private <T extends WriteCommand & FunctionalCommand> Object visitWriteManyCommand(InvocationContext ctx, T command) {
        return this.invokeNextThenApply(ctx, command, (rCtx, manyEntriesCommand, rv) -> {
            if (!this.isStoreEnabled(manyEntriesCommand) || rCtx.isInTxScope()) {
                return rv;
            }
            CompletionStage<?> stage = CompletableFutures.completedNull();
            // Index 0 is read as the persistence-mode parameter (see the cast in the switch below).
            Param persistMode = manyEntriesCommand.getParams().get(0);
            switch ((Param.PersistenceMode)persistMode.get()) {
                case LOAD_PERSIST: 
                case SKIP_LOAD: {
                    AggregateCompletionStage<Void> composedCompletionStage = CompletionStages.aggregateCompletionStage();
                    int storedCount = 0;
                    for (Object key : manyEntriesCommand.getAffectedKeys()) {
                        CacheEntry entry = rCtx.lookupEntry(key);
                        if (entry == null) {
                            continue;
                        }
                        if (entry.isRemoved()) {
                            composedCompletionStage.dependsOn(this.removeFromStores(key, this.keyPartitioner.getSegment(key)));
                        } else if (entry.isChanged() && this.isProperWriter(rCtx, manyEntriesCommand, key)) {
                            // Per-entry stats are disabled here; the total is added once after the loop.
                            composedCompletionStage.dependsOn(this.storeEntry(rCtx, key, manyEntriesCommand, false));
                            ++storedCount;
                        }
                    }
                    // NOTE(review): the statistic is bumped when the writes are issued, not when they complete.
                    if (this.getStatisticsEnabled()) {
                        this.cacheStores.getAndAdd(storedCount);
                    }
                    stage = composedCompletionStage.freeze();
                    break;
                }
                case SKIP_PERSIST: 
                case SKIP: {
                    log.trace("Skipping cache store since persistence mode parameter is SKIP");
                }
            }
            return CacheWriterInterceptor.delayedValue(stage, rv);
        });
    }

    /**
     * Writes the transaction's modifications as a batch, restricted to keys for which this node is the
     * proper writer, and adds the reported count to the write statistic when statistics are enabled.
     *
     * @param ctx the transactional invocation context holding the modifications
     * @return a stage tracking the batch write
     * @throws Throwable if the batch cannot be issued
     */
    protected InvocationStage store(TxInvocationContext<AbstractCacheTransaction> ctx) throws Throwable {
        CompletionStage<Long> batchStage = this.persistenceManager.performBatch(ctx, (writeCommand, k, v) -> this.isProperWriter(ctx, (FlagAffectedCommand)writeCommand, k));
        if (this.getStatisticsEnabled()) {
            batchStage.thenAccept(this.cacheStores::addAndGet);
        }
        return CacheWriterInterceptor.asyncValue(batchStage);
    }

    /**
     * Tells whether {@code command} may touch the stores.
     *
     * @return {@code false} when the command carries the {@code SKIP_CACHE_STORE} flag, {@code true} otherwise
     */
    protected boolean isStoreEnabled(FlagAffectedCommand command) {
        if (command.hasAnyFlag(FlagBitSets.SKIP_CACHE_STORE)) {
            log.trace("Skipping cache store since the call contain a skip cache store flag");
            return false;
        }
        return true;
    }

    /**
     * Tells whether this node should write {@code key} to the stores. This implementation always answers
     * {@code true}; the method is an extension point for subclasses.
     */
    protected boolean isProperWriter(InvocationContext ctx, FlagAffectedCommand command, Object key) {
        return true;
    }

    /** Resets the store-write counter to zero. */
    @Override
    public void resetStatistics() {
        this.cacheStores.set(0L);
    }

    /**
     * @return the number of store writes recorded since start or the last {@link #resetStatistics()}
     */
    @ManagedAttribute(description="Number of writes to the store", displayName="Number of writes to the store", measurementType=MeasurementType.TRENDSUP)
    public long getWritesToTheStores() {
        return this.cacheStores.get();
    }

    /**
     * Waits for {@code persistenceManager.size()} and reports it as an {@code int}.
     *
     * @return the size reported by the persistence manager, capped at {@link Integer#MAX_VALUE}
     */
    @ManagedAttribute(description="Number of entries currently persisted excluding expired entries", displayName="Number of persisted entries")
    public int getNumberOfPersistedEntries() {
        long size = CompletionStages.join(this.persistenceManager.size());
        return (int)Math.min(size, Integer.MAX_VALUE);
    }

    /**
     * Stores the context entry for {@code key}, counting the write in the statistics.
     *
     * @see #storeEntry(InvocationContext, Object, FlagAffectedCommand, boolean)
     */
    CompletionStage<Void> storeEntry(InvocationContext ctx, Object key, FlagAffectedCommand command) {
        return this.storeEntry(ctx, key, command, true);
    }

    /**
     * Writes the context entry for {@code key} to all non-transactional stores.
     *
     * @param ctx            context holding the entry to persist
     * @param key            key to persist
     * @param command        command that caused the write; supplies the segment, the flags and the access mode
     * @param incrementStats whether a completed write should bump the write statistic (when statistics are enabled)
     * @return a stage completing when the write is done; already completed when the persistence manager is
     *         read-only or the context holds no value for the key
     */
    CompletionStage<Void> storeEntry(InvocationContext ctx, Object key, FlagAffectedCommand command, boolean incrementStats) {
        if (this.persistenceManager.isReadOnly()) {
            return CompletableFutures.completedNull();
        }
        MarshallableEntry<Object, Object> entry = this.marshalledEntry(ctx, key);
        if (entry == null) {
            return CompletableFutures.completedNull();
        }
        PersistenceManager.AccessMode mode = this.skipSharedStores(ctx, key, command) ? PersistenceManager.AccessMode.PRIVATE : PersistenceManager.AccessMode.BOTH;
        CompletionStage<Void> stage = this.persistenceManager.writeToAllNonTxStores(entry, SegmentSpecificCommand.extractSegment(command, key, this.keyPartitioner), mode, command.getFlagsBitSet());
        if (log.isTraceEnabled()) {
            stage = stage.thenAccept(ignore -> this.getLog().tracef("Stored entry %s under key %s", entry.getValue(), key));
        }
        if (incrementStats && this.getStatisticsEnabled()) {
            stage = stage.thenAccept(ignore -> this.cacheStores.incrementAndGet());
        }
        return stage;
    }

    /**
     * Builds the store representation of the context entry for {@code key}.
     *
     * @return the marshallable entry, or {@code null} when the entry factory yields no value for the key
     */
    MarshallableEntry<Object, Object> marshalledEntry(InvocationContext ctx, Object key) {
        InternalCacheValue sv = this.entryFactory.getValueFromCtx(key, ctx);
        return sv != null ? this.marshalledEntryFactory.create(key, sv) : null;
    }

    /**
     * Tells whether a write should go to private stores only.
     *
     * @return {@code true} when the invocation is not origin-local or the command carries the
     *         {@code SKIP_SHARED_CACHE_STORE} flag
     */
    protected boolean skipSharedStores(InvocationContext ctx, Object key, FlagAffectedCommand command) {
        return !ctx.isOriginLocal() || command.hasAnyFlag(FlagBitSets.SKIP_SHARED_CACHE_STORE);
    }

    /**
     * Shared handling for put / replace / IRAC put: once the command has been applied in memory, stores
     * the entry unless the store is skipped, the call is in a transaction scope, the command should not be
     * replicated or this node is not the proper writer.
     */
    private Object visitDataWriteCommandToStore(InvocationContext ctx, DataWriteCommand command) {
        return this.invokeNextThenApply(ctx, command, (rCtx, cmd, rv) -> {
            if (!this.isStoreEnabled(cmd) || rCtx.isInTxScope() || !cmd.shouldReplicate(ctx, true)) {
                return rv;
            }
            Object key = cmd.getKey();
            if (!this.isProperWriter(rCtx, cmd, key)) {
                return rv;
            }
            return CacheWriterInterceptor.delayedValue(this.storeEntry(rCtx, key, cmd), rv);
        });
    }

    /**
     * Deletes {@code key} from all stores (access mode {@code BOTH}) and, when trace logging is enabled,
     * logs the store's response once the deletion has completed.
     *
     * @param key     key to delete
     * @param segment segment the key belongs to
     * @return a stage completing when the deletion (and the optional trace log) is done
     */
    private CompletionStage<?> removeFromStores(Object key, int segment) {
        CompletionStage<Boolean> stage = this.persistenceManager.deleteFromAllStores(key, segment, PersistenceManager.AccessMode.BOTH);
        if (log.isTraceEnabled()) {
            return stage.thenAccept(removed -> this.getLog().tracef("Removed entry under key %s and got response %s from CacheStore", key, removed));
        }
        return stage;
    }
}

