package com.caucho.jms.jdbc;

import com.caucho.jms.JMSExceptionWrapper;
import com.caucho.jms.message.MessageImpl;
import com.caucho.jms.session.MessageConsumerImpl;
import com.caucho.jms.session.SessionImpl;
import com.caucho.log.Log;
import com.caucho.util.Alarm;
import com.caucho.util.AlarmListener;
import com.caucho.util.L10N;
import com.rc.retroweaver.runtime.ClassLiteral;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
import javax.sql.DataSource;

/* loaded from: input_file:com/caucho/jms/jdbc/JdbcQueueConsumer.class */
public class JdbcQueueConsumer extends MessageConsumerImpl implements QueueReceiver, AlarmListener {
    static final Logger log = Log.open(ClassLiteral.getClass("com/caucho/jms/jdbc/JdbcQueueConsumer"));
    static final L10N L = new L10N(ClassLiteral.getClass("com/caucho/jms/jdbc/JdbcQueueConsumer"));
    private static final long QUEUE_TIMEOUT = 3600000;
    private JdbcManager _jdbcManager;
    private JdbcQueue _queue;
    private long _consumerId;
    private boolean _autoAck;
    private boolean _isClosed;
    private Alarm _alarm;
    private long _lastPurgeTime;

    public JdbcQueueConsumer(SessionImpl sessionImpl, String str, JdbcManager jdbcManager, JdbcQueue jdbcQueue) throws JMSException {
        super(sessionImpl, str, jdbcQueue, false);
        this._jdbcManager = jdbcManager;
        this._queue = jdbcQueue;
        if (sessionImpl.getAcknowledgeMode() == 1 || sessionImpl.getAcknowledgeMode() == 3) {
            this._autoAck = true;
        }
        createQueue();
        this._alarm = new Alarm(this, 900000L);
        if (log.isLoggable(Level.FINE)) {
            log.fine(new StringBuffer().append("JdbcQueueConsumer[").append(jdbcQueue).append(",").append(this._consumerId).append("] created").toString());
        }
    }

    public Queue getQueue() {
        return this._queue;
    }

    private void createQueue() throws JMSException {
        try {
            DataSource dataSource = this._jdbcManager.getDataSource();
            String consumerTable = this._jdbcManager.getConsumerTable();
            this._jdbcManager.getMessageTable();
            Connection connection = dataSource.getConnection();
            try {
                PreparedStatement prepareStatement = connection.prepareStatement(new StringBuffer().append("INSERT INTO ").append(consumerTable).append(" (queue, expire) VALUES (?,?)").toString(), 1);
                prepareStatement.setInt(1, this._queue.getId());
                prepareStatement.setLong(2, Alarm.getCurrentTime() + QUEUE_TIMEOUT);
                prepareStatement.executeUpdate();
                ResultSet generatedKeys = prepareStatement.getGeneratedKeys();
                if (!generatedKeys.next()) {
                    throw new JMSException(L.l("consumer insert didn't create a key"));
                }
                this._consumerId = generatedKeys.getLong(1);
                generatedKeys.close();
                prepareStatement.close();
                connection.close();
            } catch (Throwable th) {
                connection.close();
                throw th;
            }
        } catch (SQLException e) {
            throw new JMSExceptionWrapper(e);
        }
    }

    private void deleteQueue() throws JMSException {
        try {
            DataSource dataSource = this._jdbcManager.getDataSource();
            String consumerTable = this._jdbcManager.getConsumerTable();
            Connection connection = dataSource.getConnection();
            try {
                PreparedStatement prepareStatement = connection.prepareStatement(new StringBuffer().append("DELETE FROM ").append(consumerTable).append(" WHERE s_id=?").toString());
                prepareStatement.setLong(1, this._consumerId);
                prepareStatement.executeUpdate();
                prepareStatement.close();
                connection.close();
            } catch (Throwable th) {
                connection.close();
                throw th;
            }
        } catch (SQLException e) {
            throw new JMSExceptionWrapper(e);
        }
    }

    @Override // com.caucho.jms.session.MessageConsumerImpl
    protected MessageImpl receiveImpl() throws JMSException {
        MessageImpl messageImpl;
        try {
            purgeExpiredConsumers();
            this._queue.purgeExpiredMessages();
            long j = -1;
            DataSource dataSource = this._jdbcManager.getDataSource();
            String messageTable = this._jdbcManager.getMessageTable();
            JdbcMessage jdbcMessage = this._jdbcManager.getJdbcMessage();
            Connection connection = dataSource.getConnection();
            try {
                PreparedStatement prepareStatement = connection.prepareStatement(new StringBuffer().append("SELECT m_id, msg_type, delivered, body, header FROM ").append(messageTable).append(" WHERE ?<m_id AND queue=?").append("   AND consumer IS NULL AND ?<=expire").append(" ORDER BY m_id").toString());
                prepareStatement.setFetchSize(1);
                PreparedStatement prepareStatement2 = connection.prepareStatement(this._autoAck ? new StringBuffer().append("DELETE FROM ").append(messageTable).append(" WHERE m_id=? AND consumer IS NULL").toString() : new StringBuffer().append("UPDATE ").append(messageTable).append(" SET consumer=?, delivered=1").append(" WHERE m_id=? AND consumer IS NULL").toString());
                do {
                    long j2 = -1;
                    prepareStatement.setLong(1, j);
                    prepareStatement.setInt(2, this._queue.getId());
                    prepareStatement.setLong(3, Alarm.getCurrentTime());
                    messageImpl = null;
                    ResultSet executeQuery = prepareStatement.executeQuery();
                    while (executeQuery.next()) {
                        j2 = executeQuery.getLong(1);
                        j = j2;
                        messageImpl = jdbcMessage.readMessage(executeQuery);
                        if (this._selector == null || this._selector.isMatch(messageImpl)) {
                            break;
                        }
                        messageImpl = null;
                    }
                    executeQuery.close();
                    if (messageImpl == null) {
                        return null;
                    }
                    if (this._autoAck) {
                        prepareStatement2.setLong(1, j2);
                    } else {
                        prepareStatement2.setLong(1, this._consumerId);
                        prepareStatement2.setLong(2, j2);
                    }
                } while (prepareStatement2.executeUpdate() != 1);
                MessageImpl messageImpl2 = messageImpl;
                connection.close();
                return messageImpl2;
            } finally {
                connection.close();
            }
        } catch (IOException e) {
            throw new JMSExceptionWrapper(e);
        } catch (SQLException e2) {
            throw new JMSExceptionWrapper(e2);
        }
    }

    @Override // com.caucho.jms.session.MessageConsumerImpl
    public void acknowledge() throws JMSException {
        if (this._autoAck) {
            return;
        }
        try {
            DataSource dataSource = this._jdbcManager.getDataSource();
            String messageTable = this._jdbcManager.getMessageTable();
            Connection connection = dataSource.getConnection();
            try {
                PreparedStatement prepareStatement = connection.prepareStatement(new StringBuffer().append("DELETE FROM ").append(messageTable).append(" ").append("WHERE consumer=?").toString());
                prepareStatement.setLong(1, this._consumerId);
                prepareStatement.executeUpdate();
                prepareStatement.close();
                connection.close();
            } catch (Throwable th) {
                connection.close();
                throw th;
            }
        } catch (SQLException e) {
            throw new JMSExceptionWrapper(e);
        }
    }

    @Override // com.caucho.jms.session.MessageConsumerImpl
    public void rollback() throws JMSException {
        if (this._autoAck) {
            return;
        }
        try {
            DataSource dataSource = this._jdbcManager.getDataSource();
            String messageTable = this._jdbcManager.getMessageTable();
            Connection connection = dataSource.getConnection();
            try {
                PreparedStatement prepareStatement = connection.prepareStatement(new StringBuffer().append("UPDATE ").append(messageTable).append(" SET consumer=NULL ").append(" WHERE consumer=?").toString());
                prepareStatement.setLong(1, this._consumerId);
                prepareStatement.executeUpdate();
                prepareStatement.close();
                connection.close();
            } catch (Throwable th) {
                connection.close();
                throw th;
            }
        } catch (SQLException e) {
            throw new JMSExceptionWrapper(e);
        }
    }

    private void purgeExpiredConsumers() {
        long currentTime = Alarm.getCurrentTime();
        if (currentTime < this._lastPurgeTime + QUEUE_TIMEOUT) {
            return;
        }
        this._lastPurgeTime = currentTime;
        try {
            DataSource dataSource = this._jdbcManager.getDataSource();
            String messageTable = this._jdbcManager.getMessageTable();
            String consumerTable = this._jdbcManager.getConsumerTable();
            this._jdbcManager.getJdbcMessage();
            Connection connection = dataSource.getConnection();
            try {
                PreparedStatement prepareStatement = connection.prepareStatement(new StringBuffer().append("UPDATE ").append(messageTable).append(" SET consumer=NULL").append(" WHERE consumer IS NOT NULL").append("  AND EXISTS(SELECT * FROM ").append(consumerTable).append("             WHERE s_id=consumer AND expire<?)").toString());
                prepareStatement.setLong(1, Alarm.getCurrentTime());
                int executeUpdate = prepareStatement.executeUpdate();
                prepareStatement.close();
                if (executeUpdate > 0) {
                    log.fine(new StringBuffer().append("JMSQueue[").append(this._queue.getName()).append("] recovered ").append(executeUpdate).append(" messages").toString());
                }
                PreparedStatement prepareStatement2 = connection.prepareStatement(new StringBuffer().append("DELETE FROM ").append(consumerTable).append(" WHERE expire<?").toString());
                prepareStatement2.setLong(1, Alarm.getCurrentTime());
                prepareStatement2.executeUpdate();
                connection.close();
            } catch (Throwable th) {
                connection.close();
                throw th;
            }
        } catch (Exception e) {
            log.log(Level.FINER, e.toString(), (Throwable) e);
        }
    }

    @Override // com.caucho.util.AlarmListener
    public void handleAlarm(Alarm alarm) {
        try {
            if (this._isClosed) {
                return;
            }
            try {
                Connection connection = this._jdbcManager.getDataSource().getConnection();
                try {
                    PreparedStatement prepareStatement = connection.prepareStatement(new StringBuffer().append("UPDATE ").append(this._jdbcManager.getConsumerTable()).append(" SET expire=?").append(" WHERE s_id=?").toString());
                    prepareStatement.setLong(1, Alarm.getCurrentTime() + QUEUE_TIMEOUT);
                    prepareStatement.setLong(2, this._consumerId);
                    prepareStatement.executeUpdate();
                    connection.close();
                    this._alarm.queue(900000L);
                } catch (Throwable th) {
                    connection.close();
                    throw th;
                }
            } catch (Throwable th2) {
                log.log(Level.WARNING, th2.toString(), th2);
                this._alarm.queue(900000L);
            }
        } catch (Throwable th3) {
            this._alarm.queue(900000L);
            throw th3;
        }
    }

    @Override // com.caucho.jms.session.MessageConsumerImpl
    public void close() throws JMSException {
        if (this._isClosed) {
            return;
        }
        this._isClosed = true;
        this._alarm.dequeue();
        try {
            deleteQueue();
        } catch (Throwable th) {
            log.log(Level.WARNING, th.toString(), th);
        }
        super.close();
    }

    public String toString() {
        return new StringBuffer().append("JdbcQueueConsumer[").append(this._queue).append(",").append(this._consumerId).append("]").toString();
    }
}
