package io.ballerina.messaging.broker.core.queue;

import io.ballerina.messaging.broker.core.BrokerException;
import io.ballerina.messaging.broker.core.Message;
import io.ballerina.messaging.broker.core.Queue;
import io.ballerina.messaging.broker.core.store.SharedMessageStore;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import javax.transaction.xa.Xid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ballerina/messaging/broker/core/queue/DbBackedQueueImpl.class */
/**
 * {@link Queue} implementation that keeps its messages in a {@link QueueBuffer} and mirrors
 * persistent messages into a {@link SharedMessageStore}.
 *
 * <p>Transactional operations ({@link #prepareEnqueue}, {@link #prepareDetach}) are recorded per
 * {@link Xid} and only applied to the in-memory buffer when {@link #commit} is called; they are
 * discarded by {@link #rollback}.
 */
public class DbBackedQueueImpl extends Queue {
    private static final Logger LOGGER = LoggerFactory.getLogger(DbBackedQueueImpl.class);

    private final SharedMessageStore sharedMessageStore;
    private final QueueBuffer buffer;

    /** Messages prepared for enqueue, keyed by transaction; applied to the buffer on commit. */
    private final Map<Xid, List<Message>> pendingEnqueueMessages;

    /** Messages prepared for detach, keyed by transaction; removed from the buffer on commit. */
    private final Map<Xid, List<Message>> pendingDequeueMessages;

    /**
     * Creates the queue and loads the messages already stored for it into the buffer.
     *
     * @param str                name of the queue; also used to look up stored messages
     * @param z                  flag forwarded unchanged as the third argument of the
     *                           {@link Queue} constructor
     * @param sharedMessageStore store used to attach, detach and read messages; must not be null
     * @param queueBufferFactory factory used to create the in-memory buffer
     * @throws BrokerException if reading the stored messages fails
     * @throws NullPointerException if {@code sharedMessageStore} is null
     */
    public DbBackedQueueImpl(String str, boolean z, SharedMessageStore sharedMessageStore,
                             QueueBufferFactory queueBufferFactory) throws BrokerException {
        super(str, true, z);
        this.sharedMessageStore = Objects.requireNonNull(sharedMessageStore, "sharedMessageStore");
        this.buffer = queueBufferFactory.createBuffer(sharedMessageStore::readData);
        this.pendingEnqueueMessages = new ConcurrentHashMap<>();
        this.pendingDequeueMessages = new ConcurrentHashMap<>();

        LOGGER.debug("Recovering messages for queue {}", str);
        Collection<Message> readStoredMessages = sharedMessageStore.readStoredMessages(str);
        this.buffer.addAllBareMessages(readStoredMessages);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("{} messages recovered for queue {}", readStoredMessages.size(), str);
        }
    }

    /**
     * @return {@link Queue#UNBOUNDED}; this queue does not report a capacity limit
     */
    @Override
    public int capacity() {
        return Queue.UNBOUNDED;
    }

    /**
     * @return the value reported by {@link QueueBuffer#getNumberOfInflightMessages()}
     */
    @Override
    public int size() {
        return this.buffer.getNumberOfInflightMessages();
    }

    /**
     * Adds a message to the queue. Persistent messages are attached to this queue in the shared
     * message store before being placed in the buffer.
     *
     * @param message message to enqueue
     * @return always {@code true}
     * @throws BrokerException if attaching the message to the store fails
     */
    @Override
    public boolean enqueue(Message message) throws BrokerException {
        if (message.getMetadata().isPersistent()) {
            this.sharedMessageStore.attach(getName(), message.getInternalId());
        }
        this.buffer.add(message);
        return true;
    }

    /**
     * Records an enqueue as part of the given transaction. The message is not added to the
     * buffer until {@link #commit(Xid)} is called for the same {@code xid}.
     *
     * @param xid     transaction the enqueue belongs to
     * @param message message to enqueue on commit
     * @throws BrokerException if attaching the message to the store fails
     */
    @Override
    public void prepareEnqueue(Xid xid, Message message) throws BrokerException {
        if (message.getMetadata().isPersistent()) {
            this.sharedMessageStore.attach(xid, getName(), message.getInternalId());
        }
        this.pendingEnqueueMessages.computeIfAbsent(xid, key -> new ArrayList<>()).add(message);
    }

    /**
     * Applies the operations prepared under the given transaction to the buffer: prepared
     * detaches are removed first, then prepared enqueues are added.
     *
     * <p>The pending entries are removed from the bookkeeping maps as they are applied, so a
     * committed transaction does not keep its message lists alive and cannot be applied twice.
     *
     * @param xid transaction to commit
     */
    @Override
    public void commit(Xid xid) {
        // remove (not get): once applied, the transaction's pending state must not linger.
        List<Message> dequeued = this.pendingDequeueMessages.remove(xid);
        if (Objects.nonNull(dequeued)) {
            this.buffer.removeAll(dequeued);
        }
        List<Message> enqueued = this.pendingEnqueueMessages.remove(xid);
        if (Objects.nonNull(enqueued)) {
            this.buffer.addAll(enqueued);
        }
    }

    /**
     * Discards every operation prepared under the given transaction without touching the buffer.
     *
     * @param xid transaction to roll back
     */
    @Override
    public void rollback(Xid xid) {
        this.pendingDequeueMessages.remove(xid);
        this.pendingEnqueueMessages.remove(xid);
    }

    /**
     * @return the result of {@link QueueBuffer#getFirstDeliverable()}
     */
    @Override
    public Message dequeue() {
        return this.buffer.getFirstDeliverable();
    }

    /**
     * Detaches the message from this queue in the shared message store and removes it from the
     * buffer.
     *
     * @param message message to detach
     */
    @Override
    public void detach(Message message) {
        this.sharedMessageStore.detach(getName(), message);
        this.buffer.remove(message);
    }

    /**
     * Records a detach as part of the given transaction. The message stays in the buffer until
     * {@link #commit(Xid)} is called for the same {@code xid}.
     *
     * @param xid     transaction the detach belongs to
     * @param message message to remove from the buffer on commit
     * @throws BrokerException if the store rejects the transactional detach
     */
    @Override
    public void prepareDetach(Xid xid, Message message) throws BrokerException {
        this.sharedMessageStore.detach(xid, getName(), message);
        this.pendingDequeueMessages.computeIfAbsent(xid, key -> new ArrayList<>()).add(message);
    }
}
