package io.ballerina.messaging.broker.core.store.dao.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.ballerina.messaging.broker.common.data.types.FieldTable;
import io.ballerina.messaging.broker.core.BrokerException;
import io.ballerina.messaging.broker.core.ContentChunk;
import io.ballerina.messaging.broker.core.Message;
import io.ballerina.messaging.broker.core.Metadata;
import io.ballerina.messaging.broker.core.metrics.BrokerMetricManager;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.sql.DataSource;
import org.wso2.carbon.metrics.core.Timer;

/* loaded from: input_file:io/ballerina/messaging/broker/core/store/dao/impl/MessageCrudOperationsDao.class */
class MessageCrudOperationsDao extends BaseDao {
    private final BrokerMetricManager metricManager;

    /* JADX INFO: Access modifiers changed from: package-private */
    public MessageCrudOperationsDao(DataSource dataSource, BrokerMetricManager brokerMetricManager) {
        super(dataSource);
        this.metricManager = brokerMetricManager;
    }

    @SuppressFBWarnings(value = {"RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"}, justification = "Return value of context.stop() is not required.")
    public void persist(Connection connection, Collection<Message> collection) throws SQLException {
        PreparedStatement preparedStatement = null;
        PreparedStatement preparedStatement2 = null;
        PreparedStatement preparedStatement3 = null;
        Timer.Context startMessageWriteTimer = this.metricManager.startMessageWriteTimer();
        try {
            try {
                preparedStatement = connection.prepareStatement("INSERT INTO MB_METADATA (MESSAGE_ID, EXCHANGE_NAME, ROUTING_KEY, CONTENT_LENGTH, MESSAGE_METADATA) VALUES(?, ?, ?, ?, ?)");
                preparedStatement2 = connection.prepareStatement("INSERT INTO MB_CONTENT (MESSAGE_ID, CONTENT_OFFSET, MESSAGE_CONTENT) VALUES(?, ?, ?)");
                preparedStatement3 = connection.prepareStatement(RDBMSConstants.PS_INSERT_INTO_QUEUE);
                for (Message message : collection) {
                    prepareMetadata(preparedStatement, message);
                    prepareContent(preparedStatement2, message);
                    prepareQueueAttachments(preparedStatement3, message);
                }
                preparedStatement.executeBatch();
                preparedStatement2.executeBatch();
                preparedStatement3.executeBatch();
                startMessageWriteTimer.stop();
                close(preparedStatement);
                close(preparedStatement2);
                close(preparedStatement3);
            } catch (SQLException e) {
                throw new SQLException("Error persisting messages.", e);
            }
        } catch (Throwable th) {
            startMessageWriteTimer.stop();
            close(preparedStatement);
            close(preparedStatement2);
            close(preparedStatement3);
            throw th;
        }
    }

    private void prepareQueueAttachments(PreparedStatement preparedStatement, Message message) throws SQLException {
        long internalId = message.getInternalId();
        for (String str : message.getAttachedQueues()) {
            preparedStatement.setLong(1, internalId);
            preparedStatement.setString(2, str);
            preparedStatement.addBatch();
        }
    }

    private void prepareContent(PreparedStatement preparedStatement, Message message) throws SQLException {
        for (ContentChunk contentChunk : message.getContentChunks()) {
            preparedStatement.setLong(1, message.getInternalId());
            preparedStatement.setLong(2, contentChunk.getOffset());
            byte[] bArr = new byte[contentChunk.getBytes().readableBytes()];
            contentChunk.getBytes().getBytes(0, bArr);
            preparedStatement.setBytes(3, bArr);
            preparedStatement.addBatch();
        }
    }

    private void prepareMetadata(PreparedStatement preparedStatement, Message message) throws SQLException {
        Metadata metadata = message.getMetadata();
        preparedStatement.setLong(1, message.getInternalId());
        preparedStatement.setString(2, metadata.getExchangeName());
        preparedStatement.setString(3, metadata.getRoutingKey());
        preparedStatement.setLong(4, metadata.getContentLength());
        byte[] bArr = new byte[(int) (metadata.getProperties().getSize() + metadata.getHeaders().getSize())];
        ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(bArr);
        try {
            wrappedBuffer.resetWriterIndex();
            metadata.getProperties().write(wrappedBuffer);
            metadata.getHeaders().write(wrappedBuffer);
            preparedStatement.setBytes(5, bArr);
            preparedStatement.addBatch();
            wrappedBuffer.release();
        } catch (Throwable th) {
            wrappedBuffer.release();
            throw th;
        }
    }

    public void detachFromQueue(Connection connection, Map<String, List<Long>> map) throws BrokerException {
        PreparedStatement preparedStatement = null;
        try {
            try {
                preparedStatement = connection.prepareStatement(RDBMSConstants.PS_DELETE_FROM_QUEUE);
                for (Map.Entry<String, List<Long>> entry : map.entrySet()) {
                    String key = entry.getKey();
                    Iterator<Long> it = entry.getValue().iterator();
                    while (it.hasNext()) {
                        preparedStatement.setLong(1, it.next().longValue());
                        preparedStatement.setString(2, key);
                        preparedStatement.addBatch();
                    }
                }
                preparedStatement.executeBatch();
                close(preparedStatement);
            } catch (SQLException e) {
                throw new BrokerException("Error detaching messages from queues.", e);
            }
        } catch (Throwable th) {
            close(preparedStatement);
            throw th;
        }
    }

    @SuppressFBWarnings(value = {"RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"}, justification = "Return value of context.stop() is not required.")
    public void delete(Connection connection, Collection<Long> collection) throws BrokerException {
        PreparedStatement preparedStatement = null;
        Timer.Context startMessageDeleteTimer = this.metricManager.startMessageDeleteTimer();
        try {
            try {
                preparedStatement = connection.prepareStatement(RDBMSConstants.PS_DELETE_MESSAGE);
                Iterator<Long> it = collection.iterator();
                while (it.hasNext()) {
                    preparedStatement.setLong(1, it.next().longValue());
                    preparedStatement.addBatch();
                }
                preparedStatement.executeBatch();
                startMessageDeleteTimer.stop();
                close(preparedStatement);
            } catch (SQLException e) {
                throw new BrokerException("Error occurred while deleting messages", e);
            }
        } catch (Throwable th) {
            startMessageDeleteTimer.stop();
            close(preparedStatement);
            throw th;
        }
    }

    public Collection<Message> readAll(Connection connection, String str) throws BrokerException {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        PreparedStatement preparedStatement = null;
        ResultSet resultSet = null;
        try {
            try {
                preparedStatement = connection.prepareStatement(RDBMSConstants.PS_SELECT_MESSAGES_FOR_QUEUE);
                preparedStatement.setString(1, str);
                resultSet = preparedStatement.executeQuery();
                while (resultSet.next()) {
                    ((Message) linkedHashMap.computeIfAbsent(Long.valueOf(resultSet.getLong(1)), l -> {
                        return new Message(l.longValue(), null);
                    })).addOwnedQueue(resultSet.getString(2));
                }
                Collection<Message> values = linkedHashMap.values();
                close(resultSet);
                close(preparedStatement);
                return values;
            } catch (SQLException e) {
                throw new BrokerException("Error occurred while reading messages", e);
            }
        } catch (Throwable th) {
            close(resultSet);
            close(preparedStatement);
            throw th;
        }
    }

    public Collection<Message> read(Connection connection, Map<Long, Message> map) throws BrokerException {
        try {
            Timer.Context startMessageReadTimer = this.metricManager.startMessageReadTimer();
            Throwable th = null;
            try {
                if (!map.isEmpty()) {
                    String sQLFormattedIdList = getSQLFormattedIdList(map.size());
                    populateMessageWithMetadata(connection, sQLFormattedIdList, map.keySet(), map);
                    populateContent(connection, sQLFormattedIdList, map);
                }
                Collection<Message> values = map.values();
                if (startMessageReadTimer != null) {
                    if (0 != 0) {
                        try {
                            startMessageReadTimer.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        startMessageReadTimer.close();
                    }
                }
                return values;
            } finally {
            }
        } catch (SQLException e) {
            throw new BrokerException("Error occurred while reading messages", e);
        }
    }

    private String getSQLFormattedIdList(int i) {
        StringBuilder sb = new StringBuilder();
        sb.append("?");
        for (int i2 = 1; i2 < i; i2++) {
            sb.append(",?");
        }
        return sb.toString();
    }

    /* JADX WARN: Finally extract failed */
    @SuppressFBWarnings({"SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING"})
    private void populateMessageWithMetadata(Connection connection, String str, Collection<Long> collection, Map<Long, Message> map) throws SQLException, BrokerException {
        PreparedStatement preparedStatement = null;
        ResultSet resultSet = null;
        try {
            preparedStatement = connection.prepareStatement("SELECT MESSAGE_ID, EXCHANGE_NAME, ROUTING_KEY, CONTENT_LENGTH, MESSAGE_METADATA  FROM MB_METADATA WHERE MESSAGE_ID IN (" + str + ") ORDER BY MESSAGE_ID");
            int i = 0;
            Iterator<Long> it = collection.iterator();
            while (it.hasNext()) {
                i++;
                preparedStatement.setLong(i, it.next().longValue());
            }
            resultSet = preparedStatement.executeQuery();
            while (resultSet.next()) {
                long j = resultSet.getLong(1);
                String string = resultSet.getString(2);
                String string2 = resultSet.getString(3);
                long j2 = resultSet.getLong(4);
                ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(resultSet.getBytes(5));
                try {
                    try {
                        Metadata metadata = new Metadata(string2, string, j2);
                        metadata.setProperties(FieldTable.parse(wrappedBuffer));
                        metadata.setHeaders(FieldTable.parse(wrappedBuffer));
                        Message message = map.get(Long.valueOf(j));
                        if (Objects.nonNull(message)) {
                            message.setMetadata(metadata);
                        } else {
                            map.put(Long.valueOf(j), new Message(j, metadata));
                        }
                        wrappedBuffer.release();
                    } catch (Throwable th) {
                        wrappedBuffer.release();
                        throw th;
                    }
                } catch (Exception e) {
                    throw new BrokerException("Error occurred while parsing metadata properties", e);
                }
            }
            close(resultSet);
            close(preparedStatement);
        } catch (Throwable th2) {
            close(resultSet);
            close(preparedStatement);
            throw th2;
        }
    }

    @SuppressFBWarnings({"SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING"})
    private void populateContent(Connection connection, String str, Map<Long, Message> map) throws SQLException {
        PreparedStatement preparedStatement = null;
        ResultSet resultSet = null;
        try {
            preparedStatement = connection.prepareStatement("SELECT MESSAGE_ID, CONTENT_OFFSET, MESSAGE_CONTENT FROM MB_CONTENT WHERE MESSAGE_ID IN(" + str + ")");
            int i = 0;
            Iterator<Long> it = map.keySet().iterator();
            while (it.hasNext()) {
                i++;
                preparedStatement.setLong(i, it.next().longValue());
            }
            resultSet = preparedStatement.executeQuery();
            while (resultSet.next()) {
                long j = resultSet.getLong(1);
                int i2 = resultSet.getInt(2);
                byte[] bytes = resultSet.getBytes(3);
                Message message = map.get(Long.valueOf(j));
                if (message != null) {
                    message.addChunk(new ContentChunk(i2, Unpooled.copiedBuffer(bytes)));
                }
            }
            close(resultSet);
            close(preparedStatement);
        } catch (Throwable th) {
            close(resultSet);
            close(preparedStatement);
            throw th;
        }
    }
}
