package io.ballerina.messaging.broker.core.store;

import com.lmax.disruptor.EventHandler;
import io.ballerina.messaging.broker.common.DaoException;
import io.ballerina.messaging.broker.core.Message;
import io.ballerina.messaging.broker.core.store.dao.MessageDao;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ballerina/messaging/broker/core/store/DbAccessHandler.class */
public class DbAccessHandler implements EventHandler<DbOperation> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) DbAccessHandler.class);
    private final MessageDao messageDao;
    private final int maxBatchSize;
    private final TransactionData transactionData = new TransactionData();
    private final List<DbOperation> transactionEvents;
    private final List<DbOperation> readEvents;

    public DbAccessHandler(MessageDao messageDao, int i) {
        this.messageDao = messageDao;
        this.maxBatchSize = i;
        this.readEvents = new ArrayList(i);
        this.transactionEvents = new ArrayList(i);
    }

    @Override // com.lmax.disruptor.EventHandler
    public void onEvent(DbOperation dbOperation, long j, boolean z) throws Exception {
        while (!dbOperation.acquireForPersisting()) {
            LOGGER.debug("Waiting to acquire event to persist. Sequence {}", Long.valueOf(j));
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("{} event added for id {} for sequence {}", dbOperation.getType(), Long.valueOf(dbOperation.getMessageId()), Long.valueOf(j));
        }
        switch (dbOperation.getType()) {
            case INSERT_MESSAGE:
            case DELETE_MESSAGE:
            case DETACH_MSG_FROM_QUEUE:
                this.transactionEvents.add(dbOperation);
                break;
            case READ_MSG_DATA:
                this.readEvents.add(dbOperation);
                break;
            case NO_OP:
                break;
            default:
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.error("Unknown event type " + dbOperation.getType());
                    break;
                }
                break;
        }
        processTransactions(z);
        processMessageReads(z);
    }

    private void processTransactions(boolean z) {
        try {
        } catch (DaoException e) {
            this.transactionEvents.forEach(dbOperation -> {
                dbOperation.setExceptionObject(e);
            });
        } finally {
            this.transactionData.clear();
            this.transactionEvents.clear();
        }
        if (isBatchReady(z, this.transactionEvents)) {
            clusterTransactionEvents();
            this.messageDao.persist(this.transactionData);
        }
    }

    private void clusterTransactionEvents() {
        this.transactionEvents.forEach(dbOperation -> {
            switch (dbOperation.getType()) {
                case INSERT_MESSAGE:
                    this.transactionData.addEnqueueMessage(dbOperation.getMessage());
                    return;
                case DELETE_MESSAGE:
                    this.transactionData.addDeletableMessage(dbOperation.getMessageId());
                    return;
                case DETACH_MSG_FROM_QUEUE:
                    this.transactionData.detach(dbOperation.getQueueName(), dbOperation.getMessageId());
                    return;
                default:
                    LOGGER.error("Invalid transaction event collected {}", dbOperation.getType());
                    return;
            }
        });
    }

    private void processMessageReads(boolean z) {
        if (isBatchReady(z, this.readEvents)) {
            try {
                this.messageDao.read(getUniqueMessageList());
            } catch (DaoException e) {
                this.readEvents.forEach(dbOperation -> {
                    dbOperation.setExceptionObject(e);
                });
            } finally {
                this.readEvents.clear();
            }
        }
    }

    private Map<Long, List<Message>> getUniqueMessageList() {
        HashMap hashMap = new HashMap(this.maxBatchSize);
        this.readEvents.forEach(dbOperation -> {
            ((List) hashMap.computeIfAbsent(Long.valueOf(dbOperation.getBareMessage().getInternalId()), l -> {
                return new ArrayList();
            })).add(dbOperation.getBareMessage());
        });
        return hashMap;
    }

    private boolean isBatchComplete(int i, boolean z) {
        return i >= this.maxBatchSize || z;
    }

    private boolean isBatchReady(boolean z, Collection collection) {
        return !collection.isEmpty() && isBatchComplete(collection.size(), z);
    }
}
