package org.apache.qpid.server.txn;

import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;

/* loaded from: input_file:org/apache/qpid/server/txn/FlowToDiskTransactionObserver.class */
/**
 * {@link TransactionObserver} that tracks the combined content + metadata size of messages
 * enqueued in not-yet-discharged transactions and, once that running total exceeds a configured
 * limit, asks the messages of the enqueuing transaction to flow to disk.
 *
 * <p>A {@code LARGE_TRANSACTION_WARN} operational log message is emitted the first time the limit
 * is exceeded; further warnings are suppressed until a discharge brings the running total back
 * below the limit.
 */
public class FlowToDiskTransactionObserver implements TransactionObserver {
    /** Running total (content + metadata bytes) over all transactions currently tracked. */
    private final AtomicLong _uncommittedMessageSize = new AtomicLong();
    /** Per-transaction bookkeeping, created lazily on the first enqueue seen for a transaction. */
    private final ConcurrentMap<ServerTransaction, TransactionDetails> _uncommittedMessages = new ConcurrentHashMap<>();
    private final LogSubject _logSubject;
    private final EventLogger _eventLogger;
    /** Limit above which enqueued messages are flowed to disk. */
    private final long _maxUncommittedInMemorySize;
    /** {@code true} once the large-transaction warning has been logged and not yet re-armed. */
    private volatile boolean _reported;

    /**
     * Bookkeeping for a single transaction: the accumulated size of its enqueued messages and the
     * messages that have not yet been asked to flow to disk.
     */
    public static class TransactionDetails {
        private final AtomicLong _uncommittedMessageSize = new AtomicLong();
        private final Queue<StoredMessage<? extends StorableMessageMetaData>> _uncommittedMessages = new ConcurrentLinkedQueue<>();

        private TransactionDetails() {
        }

        /**
         * Records a newly enqueued message: adds its content and metadata size to this
         * transaction's total and remembers it for a later {@link #flowToDisk()}.
         */
        private void messageEnqueued(StoredMessage<? extends StorableMessageMetaData> storedMessage) {
            this._uncommittedMessageSize.addAndGet(storedMessage.getContentSize() + storedMessage.getMetadataSize());
            this._uncommittedMessages.add(storedMessage);
        }

        /**
         * Asks every remembered message to flow to disk and then forgets them. The accumulated
         * size is intentionally left untouched; it is only released when the owning transaction
         * is discharged.
         */
        private void flowToDisk() {
            for (StoredMessage<? extends StorableMessageMetaData> storedMessage : this._uncommittedMessages) {
                storedMessage.flowToDisk();
            }
            this._uncommittedMessages.clear();
        }

        /** Returns the total size recorded for this transaction so far. */
        private long getUncommittedMessageSize() {
            return this._uncommittedMessageSize.get();
        }
    }

    /**
     * Creates an observer.
     *
     * @param maxUncommittedInMemorySize limit on the running total of uncommitted message size;
     *                                   exceeding it triggers flow to disk
     * @param logSubject                 subject passed to the event logger with the warning
     * @param eventLogger                logger that receives the large-transaction warning
     */
    public FlowToDiskTransactionObserver(long maxUncommittedInMemorySize, LogSubject logSubject, EventLogger eventLogger) {
        this._logSubject = logSubject;
        this._eventLogger = eventLogger;
        this._maxUncommittedInMemorySize = maxUncommittedInMemorySize;
    }

    /**
     * Accounts for a message enqueued within {@code serverTransaction}. If the running total now
     * exceeds the limit, the messages remembered for that transaction are flowed to disk and the
     * large-transaction warning is logged (at most once until re-armed by
     * {@link #onDischarge(ServerTransaction)}).
     */
    @Override // org.apache.qpid.server.txn.TransactionObserver
    public void onMessageEnqueue(ServerTransaction serverTransaction, EnqueueableMessage<? extends StorableMessageMetaData> enqueueableMessage) {
        StoredMessage<? extends StorableMessageMetaData> storedMessage = enqueueableMessage.getStoredMessage();
        long messageSize = storedMessage.getContentSize() + storedMessage.getMetadataSize();
        long totalUncommittedSize = this._uncommittedMessageSize.addAndGet(messageSize);

        TransactionDetails details = this._uncommittedMessages.computeIfAbsent(serverTransaction, key -> new TransactionDetails());
        details.messageEnqueued(storedMessage);

        if (totalUncommittedSize > this._maxUncommittedInMemorySize) {
            try {
                details.flowToDisk();
            } finally {
                // Warn even when flowing to disk fails; the failure itself still propagates.
                reportLargeTransactionOnce(totalUncommittedSize);
            }
        }
    }

    /** Logs the large-transaction warning unless it has already been reported. */
    private void reportLargeTransactionOnce(long totalUncommittedSize) {
        if (!this._reported) {
            this._eventLogger.message(this._logSubject, ConnectionMessages.LARGE_TRANSACTION_WARN(totalUncommittedSize, this._maxUncommittedInMemorySize));
            this._reported = true;
        }
    }

    /**
     * Stops tracking {@code serverTransaction}, releasing its recorded size from the running
     * total, and re-arms the warning once the total is below the limit.
     */
    @Override // org.apache.qpid.server.txn.TransactionObserver
    public void onDischarge(ServerTransaction serverTransaction) {
        TransactionDetails discharged = this._uncommittedMessages.remove(serverTransaction);
        if (discharged != null) {
            this._uncommittedMessageSize.addAndGet(-discharged.getUncommittedMessageSize());
        }
        if (this._maxUncommittedInMemorySize > this._uncommittedMessageSize.get()) {
            this._reported = false;
        }
    }

    /**
     * Forgets all tracked transactions and zeroes the running total.
     */
    @Override // org.apache.qpid.server.txn.TransactionObserver
    public void reset() {
        // NOTE(review): _reported is not cleared here, so a warning already logged stays
        // suppressed until the next onDischarge re-arms it - confirm this is intended.
        this._uncommittedMessages.clear();
        this._uncommittedMessageSize.set(0L);
    }
}
