package org.apache.synapse.message.store.impl.jdbc;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseException;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.core.axis2.Axis2SynapseEnvironment;
import org.apache.synapse.message.MessageConsumer;
import org.apache.synapse.message.MessageProducer;
import org.apache.synapse.message.store.AbstractMessageStore;
import org.apache.synapse.message.store.impl.commons.MessageConverter;
import org.apache.synapse.message.store.impl.commons.StorableMessage;
import org.apache.synapse.message.store.impl.jdbc.util.JDBCConfiguration;
import org.apache.synapse.message.store.impl.jdbc.util.Statement;
import org.springframework.beans.PropertyAccessor;

/* loaded from: input_file:WEB-INF/lib/synapse-core-2.1.7-wso2v228.jar:org/apache/synapse/message/store/impl/jdbc/JDBCMessageStore.class */
public class JDBCMessageStore extends AbstractMessageStore {
    private JDBCConfiguration jdbcConfiguration;
    private static final Log logger = LogFactory.getLog(JDBCMessageStore.class.getName());
    private final ReentrantLock removeLock = new ReentrantLock();
    private final ReentrantLock cleanUpOfferLock = new ReentrantLock();
    private final AtomicBoolean cleaningFlag = new AtomicBoolean(false);
    protected static final String MESSAGE_COLUMN_NAME = "message";

    @Override // org.apache.synapse.message.store.AbstractMessageStore, org.apache.synapse.ManagedLifecycle
    public void init(SynapseEnvironment synapseEnvironment) {
        if (logger.isDebugEnabled()) {
            logger.debug("Initializing JDBC Message Store");
        }
        super.init(synapseEnvironment);
        if (logger.isDebugEnabled()) {
            logger.debug("Initializing Datasource and Properties");
        }
        this.jdbcConfiguration = new JDBCConfiguration();
        this.jdbcConfiguration.buildDataSource(this.parameters);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public JDBCConfiguration getJdbcConfiguration() {
        return this.jdbcConfiguration;
    }

    @Override // org.apache.synapse.message.store.MessageStore
    public MessageProducer getProducer() {
        JDBCProducer jDBCProducer = new JDBCProducer(this);
        jDBCProducer.setId(nextProducerId());
        if (logger.isDebugEnabled()) {
            logger.debug(getNameString() + " created a new JDBC Message Producer.");
        }
        return jDBCProducer;
    }

    @Override // org.apache.synapse.message.store.MessageStore
    public MessageConsumer getConsumer() {
        JDBCConsumer jDBCConsumer = new JDBCConsumer(this);
        jDBCConsumer.setId(nextConsumerId());
        if (logger.isDebugEnabled()) {
            logger.debug(getNameString() + " created a new JDBC Message Consumer.");
        }
        return jDBCConsumer;
    }

    @Override // org.apache.synapse.message.store.AbstractMessageStore, org.apache.synapse.message.store.MessageStore
    public void setParameters(Map<String, Object> map) {
        super.setParameters(map);
        if (this.jdbcConfiguration != null) {
            this.jdbcConfiguration.buildDataSource(map);
        }
    }

    private MessageContext getResultMessageContextFromDatabase(Statement statement) throws SynapseException {
        List<Map> processedRows = getProcessedRows(statement);
        MessageContext messageContext = null;
        if (processedRows.size() > 0) {
            messageContext = (MessageContext) processedRows.get(0).get("message");
            if (logger.isDebugEnabled()) {
                logger.debug("Number of rows processed:" + processedRows + " calling the statement " + statement.getStatement());
                logger.debug("Message content with mid:" + messageContext.getMessageID() + " will be returned");
            }
        }
        return messageContext;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<Map> getProcessedRows(Statement statement) {
        Connection connection = null;
        ResultSet resultSet = null;
        PreparedStatement preparedStatement = null;
        try {
            try {
                connection = this.jdbcConfiguration.getConnection();
                preparedStatement = connection.prepareStatement(statement.getStatement());
                int i = 1;
                for (Object obj : statement.getParameters()) {
                    if (obj instanceof String) {
                        preparedStatement.setString(i, (String) obj);
                    } else if (obj instanceof Long) {
                        preparedStatement.setLong(i, ((Long) obj).longValue());
                    } else if (obj instanceof Integer) {
                        preparedStatement.setInt(i, ((Integer) obj).intValue());
                    }
                    i++;
                }
                resultSet = preparedStatement.executeQuery();
                List<Map> result = statement.getResult(resultSet);
                close(connection, preparedStatement, resultSet);
                return result;
            } catch (SQLException e) {
                throw new SynapseException("Processing Statement failed : " + statement.getStatement() + " against DataSource : " + this.jdbcConfiguration.getDSName(), e);
            }
        } catch (Throwable th) {
            close(connection, preparedStatement, resultSet);
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public MessageContext deserializeMessage(byte[] bArr) {
        MessageContext messageContext = null;
        if (bArr == null) {
            throw new SynapseException("Retrieved Object is null");
        }
        ObjectInputStream objectInputStream = null;
        try {
            try {
                objectInputStream = new ObjectInputStream(new ByteArrayInputStream(bArr));
                Object readObject = objectInputStream.readObject();
                if (readObject instanceof StorableMessage) {
                    StorableMessage storableMessage = (StorableMessage) readObject;
                    org.apache.axis2.context.MessageContext newAxis2Mc = newAxis2Mc();
                    messageContext = MessageConverter.toMessageContext(storableMessage, newAxis2Mc, newSynapseMc(newAxis2Mc));
                }
                if (objectInputStream != null) {
                    closeStream(objectInputStream);
                }
                return messageContext;
            } catch (IOException e) {
                throw new SynapseException("Error reading object input stream", e);
            } catch (ClassNotFoundException e2) {
                throw new SynapseException("Could not find the class", e2);
            }
        } catch (Throwable th) {
            if (objectInputStream != null) {
                closeStream(objectInputStream);
            }
            throw th;
        }
    }

    private void closeStream(ObjectInputStream objectInputStream) {
        try {
            objectInputStream.close();
        } catch (IOException e) {
            logger.error("Error while closing object input stream", e);
        }
    }

    private org.apache.axis2.context.MessageContext newAxis2Mc() {
        return ((Axis2SynapseEnvironment) this.synapseEnvironment).getAxis2ConfigurationContext().createMessageContext();
    }

    private MessageContext newSynapseMc(org.apache.axis2.context.MessageContext messageContext) {
        return new Axis2MessageContext(messageContext, this.synapseEnvironment.getSynapseConfiguration(), this.synapseEnvironment);
    }

    private void rollback(Connection connection, String str) {
        if (connection != null) {
            try {
                connection.rollback();
            } catch (SQLException e) {
                logger.warn("Rollback failed on " + str, e);
            }
        }
    }

    private boolean processNonResultingStatement(List<Statement> list) throws SynapseException {
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        try {
            try {
                connection = this.jdbcConfiguration.getConnection();
                connection.setAutoCommit(false);
                for (Statement statement : list) {
                    preparedStatement = connection.prepareStatement(statement.getStatement());
                    int i = 1;
                    for (Object obj : statement.getParameters()) {
                        if (obj instanceof String) {
                            preparedStatement.setString(i, (String) obj);
                        } else if (obj instanceof Long) {
                            preparedStatement.setLong(i, ((Long) obj).longValue());
                        } else if (obj instanceof StorableMessage) {
                            preparedStatement.setBytes(i, serialize(obj));
                        }
                        i++;
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug("Executing statement:" + preparedStatement);
                    }
                    preparedStatement.execute();
                }
                connection.commit();
                if (preparedStatement != null) {
                    try {
                        preparedStatement.close();
                    } catch (SQLException e) {
                        logger.error("Error while closing prepared statement", e);
                    }
                }
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (SQLException e2) {
                        logger.error("Error while closing connection", e2);
                    }
                }
                return true;
            } catch (IOException | SQLException e3) {
                rollback(connection, "deleting message");
                throw new SynapseException("Processing Statement failed against DataSource : " + this.jdbcConfiguration.getDSName(), e3);
            }
        } catch (Throwable th) {
            if (preparedStatement != null) {
                try {
                    preparedStatement.close();
                } catch (SQLException e4) {
                    logger.error("Error while closing prepared statement", e4);
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e5) {
                    logger.error("Error while closing connection", e5);
                }
            }
            throw th;
        }
    }

    public byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        new ObjectOutputStream(byteArrayOutputStream).writeObject(obj);
        return byteArrayOutputStream.toByteArray();
    }

    @Override // org.apache.synapse.message.store.AbstractMessageStore, org.apache.synapse.ManagedLifecycle
    public void destroy() {
        super.destroy();
        this.jdbcConfiguration = null;
    }

    public boolean store(MessageContext messageContext) throws SynapseException {
        if (messageContext == null) {
            logger.error("Message is null, can't store into database");
            return false;
        }
        boolean z = false;
        try {
            try {
                if (this.cleaningFlag.get()) {
                    try {
                        this.cleanUpOfferLock.lock();
                        z = true;
                    } catch (Exception e) {
                        logger.error("Message Cleanup lock released unexpectedly", e);
                    }
                }
                ArrayList arrayList = new ArrayList();
                arrayList.add(getStoreMessageStatement(messageContext, null));
                boolean processNonResultingStatement = processNonResultingStatement(arrayList);
                if (z) {
                    this.cleanUpOfferLock.unlock();
                }
                return processNonResultingStatement;
            } catch (Exception e2) {
                throw new SynapseException("Error while creating StorableMessage", e2);
            }
        } catch (Throwable th) {
            if (z) {
                this.cleanUpOfferLock.unlock();
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Statement getStoreMessageStatement(MessageContext messageContext, Long l) throws StoreException {
        Statement statement;
        StorableMessage storableMessage = MessageConverter.toStorableMessage(messageContext);
        String messageID = storableMessage.getAxis2message().getMessageID();
        if (null == l) {
            statement = new Statement("INSERT INTO " + this.jdbcConfiguration.getTableName() + " (msg_id,message) VALUES (?,?)") { // from class: org.apache.synapse.message.store.impl.jdbc.JDBCMessageStore.1
                @Override // org.apache.synapse.message.store.impl.jdbc.util.Statement
                public List<Map> getResult(ResultSet resultSet) {
                    throw new UnsupportedOperationException();
                }
            };
            statement.addParameter(messageID);
            statement.addParameter(storableMessage);
        } else {
            statement = new Statement("INSERT INTO " + this.jdbcConfiguration.getTableName() + " (msg_id,seq_id,message) VALUES (?,?,?)") { // from class: org.apache.synapse.message.store.impl.jdbc.JDBCMessageStore.2
                @Override // org.apache.synapse.message.store.impl.jdbc.util.Statement
                public List<Map> getResult(ResultSet resultSet) {
                    throw new UnsupportedOperationException();
                }
            };
            statement.addParameter(messageID);
            statement.addParameter(l);
            statement.addParameter(storableMessage);
        }
        return statement;
    }

    public MessageContext peek() throws SynapseException {
        try {
            return getResultMessageContextFromDatabase(new Statement("SELECT message FROM " + this.jdbcConfiguration.getTableName() + " WHERE indexId=(SELECT min(indexId) from " + this.jdbcConfiguration.getTableName() + ")") { // from class: org.apache.synapse.message.store.impl.jdbc.JDBCMessageStore.3
                @Override // org.apache.synapse.message.store.impl.jdbc.util.Statement
                public List<Map> getResult(ResultSet resultSet) throws SQLException {
                    return JDBCMessageStore.this.messageContentResultSet(resultSet, getStatement());
                }
            });
        } catch (SynapseException e) {
            throw new SynapseException("Error while peek the message", e);
        }
    }

    @Override // org.apache.synapse.message.store.MessageStore
    public MessageContext remove() throws NoSuchElementException {
        MessageContext remove = remove(peek().getMessageID());
        if (remove != null) {
            return remove;
        }
        throw new NoSuchElementException("First element not found and remove failed !");
    }

    @Override // org.apache.synapse.message.store.MessageStore
    public MessageContext remove(String str) throws SynapseException {
        boolean z = false;
        try {
            try {
                if (this.cleaningFlag.get()) {
                    try {
                        this.removeLock.lock();
                        z = true;
                    } catch (Exception e) {
                        logger.error("Message Cleanup lock released unexpectedly", e);
                    }
                }
                MessageContext messageContext = get(str);
                processNonResultingStatement(removeMessageStatement(str));
                if (z) {
                    this.removeLock.unlock();
                }
                return messageContext;
            } catch (Exception e2) {
                throw new SynapseException("Removing message with id = " + str + " failed !", e2);
            }
        } catch (Throwable th) {
            if (z) {
                this.removeLock.unlock();
            }
            throw th;
        }
    }

    protected List<Statement> removeMessageStatement(String str) {
        ArrayList arrayList = new ArrayList();
        Statement statement = new Statement("DELETE FROM " + this.jdbcConfiguration.getTableName() + " WHERE msg_id=?") { // from class: org.apache.synapse.message.store.impl.jdbc.JDBCMessageStore.4
            @Override // org.apache.synapse.message.store.impl.jdbc.util.Statement
            public List<Map> getResult(ResultSet resultSet) throws SQLException {
                throw new UnsupportedOperationException();
            }
        };
        statement.addParameter(str);
        arrayList.add(statement);
        return arrayList;
    }

    @Override // org.apache.synapse.message.store.MessageStore
    public void clear() {
        try {
            logger.warn(getNameString() + "deleting all entries");
            this.removeLock.lock();
            this.cleanUpOfferLock.lock();
            this.cleaningFlag.set(true);
            Statement statement = new Statement("DELETE FROM " + this.jdbcConfiguration.getTableName()) { // from class: org.apache.synapse.message.store.impl.jdbc.JDBCMessageStore.5
                @Override // org.apache.synapse.message.store.impl.jdbc.util.Statement
                public List<Map> getResult(ResultSet resultSet) throws SQLException {
                    throw new UnsupportedOperationException();
                }
            };
            ArrayList arrayList = new ArrayList();
            arrayList.add(statement);
            processNonResultingStatement(arrayList);
        } catch (Exception e) {
            logger.error("Clearing store failed !", e);
        } finally {
            this.cleaningFlag.set(false);
            this.removeLock.unlock();
            this.cleanUpOfferLock.unlock();
        }
    }

    @Override // org.apache.synapse.message.store.MessageStore
    public MessageContext get(int i) {
        if (i < 0) {
            throw new IllegalArgumentException("Index:" + i + " out of table bound");
        }
        Statement statement = new Statement("SELECT message FROM " + this.jdbcConfiguration.getTableName() + " ORDER BY indexId ASC LIMIT ?,1 ") { // from class: org.apache.synapse.message.store.impl.jdbc.JDBCMessageStore.6
            @Override // org.apache.synapse.message.store.impl.jdbc.util.Statement
            public List<Map> getResult(ResultSet resultSet) throws SQLException {
                return JDBCMessageStore.this.messageContentResultSet(resultSet, getStatement());
            }
        };
        statement.addParameter(Integer.valueOf(i));
        return getResultMessageContextFromDatabase(statement);
    }

    @Override // org.apache.synapse.message.store.MessageStore
    public List<MessageContext> getAll() {
        if (logger.isDebugEnabled()) {
            logger.debug(getNameString() + " retrieving all messages from the store.");
        }
        MessageContext resultMessageContextFromDatabase = getResultMessageContextFromDatabase(new Statement("SELECT message FROM " + this.jdbcConfiguration.getTableName()) { // from class: org.apache.synapse.message.store.impl.jdbc.JDBCMessageStore.7
            @Override // org.apache.synapse.message.store.impl.jdbc.util.Statement
            public List<Map> getResult(ResultSet resultSet) throws SQLException {
                return JDBCMessageStore.this.messageContentResultSet(resultSet, getStatement());
            }
        });
        if (resultMessageContextFromDatabase == null) {
            return null;
        }
        ArrayList arrayList = new ArrayList();
        arrayList.add(resultMessageContextFromDatabase);
        return arrayList;
    }

    @Override // org.apache.synapse.message.store.MessageStore
    public MessageContext get(String str) {
        Statement statement = new Statement("SELECT indexId,message FROM " + this.jdbcConfiguration.getTableName() + " WHERE msg_id=?") { // from class: org.apache.synapse.message.store.impl.jdbc.JDBCMessageStore.8
            @Override // org.apache.synapse.message.store.impl.jdbc.util.Statement
            public List<Map> getResult(ResultSet resultSet) throws SQLException {
                return JDBCMessageStore.this.messageContentResultSet(resultSet, getStatement());
            }
        };
        statement.addParameter(str);
        return getResultMessageContextFromDatabase(statement);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<Map> messageContentResultSet(ResultSet resultSet, String str) throws SQLException {
        ArrayList arrayList = new ArrayList();
        while (resultSet.next()) {
            try {
                HashMap hashMap = new HashMap();
                hashMap.put("message", deserializeMessage(resultSet.getBytes("message")));
                arrayList.add(hashMap);
            } catch (SQLException e) {
                throw new SynapseException("Error executing statement : " + str + " against DataSource : " + this.jdbcConfiguration.getDSName(), e);
            }
        }
        return arrayList;
    }

    @Override // org.apache.synapse.message.store.AbstractMessageStore, org.apache.synapse.message.store.MessageStore
    public int size() {
        Connection connection = null;
        ResultSet resultSet = null;
        PreparedStatement preparedStatement = null;
        int i = 0;
        Statement statement = new Statement("SELECT COUNT(*) FROM " + this.jdbcConfiguration.getTableName()) { // from class: org.apache.synapse.message.store.impl.jdbc.JDBCMessageStore.9
            @Override // org.apache.synapse.message.store.impl.jdbc.util.Statement
            public List<Map> getResult(ResultSet resultSet2) throws SQLException {
                return JDBCMessageStore.this.messageContentResultSet(resultSet2, getStatement());
            }
        };
        try {
            try {
                connection = this.jdbcConfiguration.getConnection();
                preparedStatement = connection.prepareStatement(statement.getStatement());
                resultSet = preparedStatement.executeQuery();
                while (resultSet.next()) {
                    try {
                        i = resultSet.getInt(1);
                    } catch (Exception e) {
                        logger.error("Error executing statement : " + statement.getStatement() + " against DataSource : " + this.jdbcConfiguration.getDSName(), e);
                    }
                }
                close(connection, preparedStatement, resultSet);
            } catch (SQLException e2) {
                logger.error("Error executing statement : " + statement.getStatement() + " against DataSource : " + this.jdbcConfiguration.getDSName(), e2);
                close(connection, preparedStatement, resultSet);
            }
            return i;
        } catch (Throwable th) {
            close(connection, preparedStatement, resultSet);
            throw th;
        }
    }

    private String getNameString() {
        return "Store [" + getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX;
    }

    private void close(Connection connection, PreparedStatement preparedStatement, ResultSet resultSet) {
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                logger.error("Error while closing prepared statement", e);
            }
        }
        if (resultSet != null) {
            try {
                resultSet.close();
            } catch (SQLException e2) {
                logger.error("Error while closing result set", e2);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e3) {
                logger.error("Error while closing connection", e3);
            }
        }
    }
}
