package io.ballerina.messaging.broker.core.store;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.EventTranslatorTwoArg;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import io.ballerina.messaging.broker.common.DaoException;
import io.ballerina.messaging.broker.core.BrokerException;
import io.ballerina.messaging.broker.core.Message;
import io.ballerina.messaging.broker.core.queue.QueueBuffer;
import io.ballerina.messaging.broker.core.store.dao.MessageDao;
import io.ballerina.messaging.broker.core.store.disruptor.SleepingBlockingWaitStrategy;
import java.util.Collection;
import java.util.function.Consumer;
import javax.annotation.concurrent.ThreadSafe;
import javax.transaction.xa.Xid;

/**
 * {@link MessageStore} implementation backed by a {@link MessageDao}.
 *
 * <p>Message inserts, queue detaches, message deletes and message-data reads are published as
 * {@link DbOperation} events to an LMAX {@link Disruptor}, whose handler chain
 * ({@code DbEventMatcher} then {@code DbAccessHandler} then {@code FinalEventHandler}) processes
 * them. Transaction operations (persist/prepare/commit/rollback) and recovery reads call the
 * {@link MessageDao} directly on the calling thread, translating every {@link DaoException} into
 * a {@link BrokerException} that keeps the original exception as its cause.
 */
@ThreadSafe
public class DbMessageStore extends MessageStore {
    /** Marks the claimed event as an insert of the supplied message. */
    private static final EventTranslatorOneArg<DbOperation, Message> INSERT_MESSAGE =
            (event, sequence, message) -> event.insertMessage(message);

    /** Marks the claimed event as a detach of the given message id from the given queue. */
    private static final EventTranslatorTwoArg<DbOperation, String, Long> DETACH_FROM_QUEUE =
            (event, sequence, queueName, messageId) -> event.detachFromQueue(queueName, messageId);

    /** Marks the claimed event as a delete of the given message id. */
    private static final EventTranslatorOneArg<DbOperation, Long> DELETE_MESSAGE =
            (event, sequence, messageId) -> event.deleteMessage(messageId.longValue());

    /** Marks the claimed event as a message-data read for the given buffer and message. */
    private static final EventTranslatorTwoArg<DbOperation, QueueBuffer, Message> READ_MESSAGE_DATA =
            (event, sequence, buffer, message) -> event.readMessageData(buffer, message);

    /** DAO used directly for transactional and recovery operations. */
    private final MessageDao messageDao;

    /** Disruptor through which insert/detach/delete/read-data operations are published. */
    private final Disruptor<DbOperation> disruptor;

    /**
     * Builds the store, wires the disruptor handler chain and starts the disruptor.
     *
     * @param messageDao DAO handed to {@code DbAccessHandler} and used for direct DAO calls
     * @param i          ring buffer size given to the {@link Disruptor}; also passed to
     *                   {@code DbEventMatcher}
     * @param i2         value forwarded to {@code DbAccessHandler}
     *                   (presumably the maximum DB batch size - TODO confirm)
     */
    public DbMessageStore(MessageDao messageDao, int i, int i2) {
        this.messageDao = messageDao;

        disruptor = new Disruptor<>(
                DbOperation.getFactory(),
                i,
                new ThreadFactoryBuilder().setNameFormat("DisruptorMessageStoreThread-%d").build(),
                ProducerType.MULTI,
                new SleepingBlockingWaitStrategy());
        disruptor.setDefaultExceptionHandler(new DbStoreExceptionHandler());
        // Stage order matters: match events, then access the DB, then run the final handler.
        disruptor.handleEventsWith(new DbEventMatcher(i))
                .then(new DbAccessHandler(messageDao, i2))
                .then(new FinalEventHandler());
        disruptor.start();
    }

    /**
     * Publishes an insert event for the given message to the disruptor.
     *
     * @param message message to insert
     */
    @Override
    void publishMessageToStore(Message message) {
        disruptor.publishEvent(INSERT_MESSAGE, message);
    }

    /**
     * Publishes a detach-from-queue event to the disruptor.
     *
     * @param str first argument of the detach operation (presumably the queue name - confirm)
     * @param j   second argument of the detach operation (presumably the message id - confirm)
     */
    @Override
    void detachFromQueue(String str, long j) {
        final Long boxedId = Long.valueOf(j);
        disruptor.publishEvent(DETACH_FROM_QUEUE, str, boxedId);
    }

    /**
     * Publishes a delete-message event to the disruptor.
     *
     * @param j argument of the delete operation (presumably the message id - confirm)
     */
    @Override
    void deleteMessage(long j) {
        final Long boxedId = Long.valueOf(j);
        disruptor.publishEvent(DELETE_MESSAGE, boxedId);
    }

    /**
     * Persists the given transaction data through the DAO.
     *
     * @param transactionData data to persist
     * @throws BrokerException if the DAO reports a {@link DaoException}
     */
    @Override
    void commit(TransactionData transactionData) throws BrokerException {
        try {
            messageDao.persist(transactionData);
        } catch (DaoException daoFailure) {
            throw new BrokerException(daoFailure.getMessage(), daoFailure);
        }
    }

    /**
     * Commits previously prepared data for the given transaction id through the DAO.
     *
     * @param xid             transaction id
     * @param transactionData data associated with the transaction
     * @throws BrokerException if the DAO reports a {@link DaoException}
     */
    @Override
    void commit(Xid xid, TransactionData transactionData) throws BrokerException {
        try {
            messageDao.commitPreparedData(xid, transactionData);
        } catch (DaoException daoFailure) {
            throw new BrokerException(daoFailure.getMessage(), daoFailure);
        }
    }

    /**
     * Rolls back previously prepared data for the given transaction id through the DAO.
     *
     * @param xid transaction id
     * @throws BrokerException if the DAO reports a {@link DaoException}
     */
    @Override
    public void rollback(Xid xid) throws BrokerException {
        try {
            messageDao.rollbackPreparedData(xid);
        } catch (DaoException daoFailure) {
            throw new BrokerException(daoFailure.getMessage(), daoFailure);
        }
    }

    /**
     * Publishes a read-message-data event for the given buffer and message to the disruptor.
     *
     * @param queueBuffer buffer handed to the read operation
     * @param message     message handed to the read operation
     */
    @Override
    public void fillMessageData(QueueBuffer queueBuffer, Message message) {
        disruptor.publishEvent(READ_MESSAGE_DATA, queueBuffer, message);
    }

    /**
     * Reads all messages via {@code MessageDao.readAll}.
     *
     * @param str argument forwarded to the DAO (presumably the queue name - confirm)
     * @return the collection returned by the DAO
     * @throws BrokerException if the DAO reports a {@link DaoException}
     */
    @Override
    public Collection<Message> readAllMessagesForQueue(String str) throws BrokerException {
        try {
            final Collection<Message> storedMessages = messageDao.readAll(str);
            return storedMessages;
        } catch (DaoException daoFailure) {
            throw new BrokerException(daoFailure.getMessage(), daoFailure);
        }
    }

    /**
     * Prepares the given transaction data under the given transaction id through the DAO.
     *
     * @param xid             transaction id
     * @param transactionData data to prepare
     * @throws BrokerException if the DAO reports a {@link DaoException}
     */
    @Override
    public void prepare(Xid xid, TransactionData transactionData) throws BrokerException {
        try {
            messageDao.prepare(xid, transactionData);
        } catch (DaoException daoFailure) {
            throw new BrokerException(daoFailure.getMessage(), daoFailure);
        }
    }

    /**
     * Asks the DAO to retrieve all stored transaction ids, passing it the given consumer.
     *
     * @param consumer consumer forwarded to {@code MessageDao.retrieveAllStoredXids}
     * @throws BrokerException if the DAO reports a {@link DaoException}
     */
    @Override
    public void retrieveStoredXids(Consumer<Xid> consumer) throws BrokerException {
        try {
            messageDao.retrieveAllStoredXids(consumer);
        } catch (DaoException daoFailure) {
            throw new BrokerException(daoFailure.getMessage(), daoFailure);
        }
    }

    /**
     * Retrieves the enqueued messages the DAO holds for the given transaction id.
     *
     * @param xid transaction id
     * @return the collection returned by the DAO
     * @throws BrokerException if the DAO reports a {@link DaoException}
     */
    @Override
    public Collection<Message> recoverEnqueuedMessages(Xid xid) throws BrokerException {
        try {
            final Collection<Message> enqueuedMessages = messageDao.retrieveAllEnqueuedMessages(xid);
            return enqueuedMessages;
        } catch (DaoException daoFailure) {
            throw new BrokerException(daoFailure.getMessage(), daoFailure);
        }
    }
}
