package com.caucho.jms.jdbc;

import com.caucho.jms.JMSExceptionWrapper;
import com.caucho.jms.message.MessageImpl;
import com.caucho.jms.session.MessageConsumerImpl;
import com.caucho.jms.session.SessionImpl;
import com.caucho.log.Log;
import com.caucho.util.Alarm;
import com.caucho.util.AlarmListener;
import com.caucho.util.L10N;
import com.rc.retroweaver.runtime.ClassLiteral;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.JMSException;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.sql.DataSource;

/* loaded from: input_file:com/caucho/jms/jdbc/JdbcTopicConsumer.class */
public class JdbcTopicConsumer extends MessageConsumerImpl implements AlarmListener, TopicSubscriber {
    static final Logger log = Log.open(ClassLiteral.getClass("com/caucho/jms/jdbc/JdbcTopicConsumer"));
    static final L10N L = new L10N(ClassLiteral.getClass("com/caucho/jms/jdbc/JdbcTopicConsumer"));
    private static final long TOPIC_TIMEOUT = 3600000;
    private JdbcManager _jdbcManager;
    private JdbcTopic _topic;
    private String _subscriber;
    private long _consumerId;
    private long _lastPurgeTime;
    private boolean _isClosed;
    private Alarm _alarm;

    public JdbcTopicConsumer(SessionImpl sessionImpl, String str, JdbcManager jdbcManager, JdbcTopic jdbcTopic, boolean z) throws JMSException {
        super(sessionImpl, str, jdbcTopic, z);
        this._jdbcManager = jdbcManager;
        this._topic = jdbcTopic;
        createTopic();
        this._alarm = new Alarm(this, 900000L);
    }

    public JdbcTopicConsumer(SessionImpl sessionImpl, String str, JdbcManager jdbcManager, JdbcTopic jdbcTopic, boolean z, String str2) throws JMSException {
        super(sessionImpl, str, jdbcTopic, z);
        this._jdbcManager = jdbcManager;
        this._topic = jdbcTopic;
        this._subscriber = str2;
        createTopic(str2);
    }

    public Topic getTopic() {
        return this._topic;
    }

    private void createTopic() throws JMSException {
        try {
            DataSource dataSource = this._jdbcManager.getDataSource();
            String consumerTable = this._jdbcManager.getConsumerTable();
            String messageTable = this._jdbcManager.getMessageTable();
            Connection connection = dataSource.getConnection();
            try {
                long j = -1;
                PreparedStatement prepareStatement = connection.prepareStatement(new StringBuffer().append("SELECT MAX(m_id) FROM ").append(messageTable).append(" WHERE queue=?").toString());
                prepareStatement.setInt(1, this._topic.getId());
                ResultSet executeQuery = prepareStatement.executeQuery();
                if (executeQuery.next()) {
                    j = executeQuery.getLong(1);
                }
                executeQuery.close();
                PreparedStatement prepareStatement2 = connection.prepareStatement(new StringBuffer().append("INSERT INTO ").append(consumerTable).append(" (queue, expire, read, ack) VALUES (?,?,?,?)").toString(), 1);
                prepareStatement2.setInt(1, this._topic.getId());
                prepareStatement2.setLong(2, Alarm.getCurrentTime() + TOPIC_TIMEOUT);
                prepareStatement2.setLong(3, j);
                prepareStatement2.setLong(4, j);
                prepareStatement2.executeUpdate();
                ResultSet generatedKeys = prepareStatement2.getGeneratedKeys();
                if (!generatedKeys.next()) {
                    throw new JMSException(L.l("consumer insert didn't create a key"));
                }
                this._consumerId = generatedKeys.getLong(1);
                generatedKeys.close();
                prepareStatement2.close();
                connection.close();
            } catch (Throwable th) {
                connection.close();
                throw th;
            }
        } catch (SQLException e) {
            throw new JMSExceptionWrapper(e);
        }
    }

    private void deleteTopic() throws JMSException {
        try {
            DataSource dataSource = this._jdbcManager.getDataSource();
            String consumerTable = this._jdbcManager.getConsumerTable();
            Connection connection = dataSource.getConnection();
            try {
                PreparedStatement prepareStatement = connection.prepareStatement(new StringBuffer().append("DELETE FROM ").append(consumerTable).append(" WHERE s_id=?").toString());
                prepareStatement.setLong(1, this._consumerId);
                prepareStatement.executeUpdate();
                prepareStatement.close();
                connection.close();
            } catch (Throwable th) {
                connection.close();
                throw th;
            }
        } catch (SQLException e) {
            throw new JMSExceptionWrapper(e);
        }
    }

    private void createTopic(String str) throws JMSException {
        try {
            DataSource dataSource = this._jdbcManager.getDataSource();
            String consumerTable = this._jdbcManager.getConsumerTable();
            String messageTable = this._jdbcManager.getMessageTable();
            String clientID = this._session.getClientID();
            Connection connection = dataSource.getConnection();
            try {
                PreparedStatement prepareStatement = connection.prepareStatement(new StringBuffer().append("SELECT s_id FROM ").append(consumerTable).append(" WHERE queue=? AND client=? AND name=?").toString());
                prepareStatement.setInt(1, this._topic.getId());
                prepareStatement.setString(2, clientID);
                prepareStatement.setString(3, str);
                ResultSet executeQuery = prepareStatement.executeQuery();
                if (executeQuery.next()) {
                    this._consumerId = executeQuery.getLong(1);
                    executeQuery.close();
                    connection.close();
                    return;
                }
                long j = -1;
                PreparedStatement prepareStatement2 = connection.prepareStatement(new StringBuffer().append("SELECT MAX(m_id) FROM ").append(messageTable).append(" WHERE queue=?").toString());
                prepareStatement2.setInt(1, this._topic.getId());
                ResultSet executeQuery2 = prepareStatement2.executeQuery();
                if (executeQuery2.next()) {
                    j = executeQuery2.getLong(1);
                }
                executeQuery2.close();
                PreparedStatement prepareStatement3 = connection.prepareStatement(new StringBuffer().append("INSERT INTO ").append(consumerTable).append(" (queue, client, name, expire, read, ack) VALUES (?,?,?,?,?,?)").toString(), 1);
                prepareStatement3.setInt(1, this._topic.getId());
                prepareStatement3.setString(2, clientID);
                prepareStatement3.setString(3, str);
                prepareStatement3.setLong(4, 4611686018427387903L);
                prepareStatement3.setLong(5, j);
                prepareStatement3.setLong(6, j);
                prepareStatement3.executeUpdate();
                ResultSet generatedKeys = prepareStatement3.getGeneratedKeys();
                if (!generatedKeys.next()) {
                    throw new JMSException(L.l("consumer insert didn't create a key"));
                }
                this._consumerId = generatedKeys.getLong(1);
                generatedKeys.close();
                prepareStatement3.close();
                connection.close();
            } catch (Throwable th) {
                connection.close();
                throw th;
            }
        } catch (SQLException e) {
            throw new JMSExceptionWrapper(e);
        }
    }

    @Override // com.caucho.jms.session.MessageConsumerImpl
    protected MessageImpl receiveImpl() throws JMSException {
        purgeExpiredConsumers();
        this._topic.purgeExpiredMessages();
        try {
            DataSource dataSource = this._jdbcManager.getDataSource();
            String messageTable = this._jdbcManager.getMessageTable();
            String consumerTable = this._jdbcManager.getConsumerTable();
            JdbcMessage jdbcMessage = this._jdbcManager.getJdbcMessage();
            Connection connection = dataSource.getConnection();
            try {
                PreparedStatement prepareStatement = connection.prepareStatement(new StringBuffer().append("SELECT m_id, msg_type, delivered, body, header FROM ").append(messageTable).append(" AS m,").append("      ").append(consumerTable).append(" AS s").append(" WHERE s_id=? AND m.queue=s.queue AND s.read<m_id").append("   AND ?<m.expire").append(" ORDER BY m_id").toString());
                prepareStatement.setFetchSize(1);
                PreparedStatement prepareStatement2 = connection.prepareStatement(new StringBuffer().append("UPDATE ").append(consumerTable).append(" SET read=?").append(" WHERE s_id=?").toString());
                long j = -1;
                prepareStatement.setLong(1, this._consumerId);
                prepareStatement.setLong(2, Alarm.getCurrentTime());
                MessageImpl messageImpl = null;
                ResultSet executeQuery = prepareStatement.executeQuery();
                while (executeQuery.next()) {
                    j = executeQuery.getLong(1);
                    messageImpl = jdbcMessage.readMessage(executeQuery);
                    if (this._selector == null || this._selector.isMatch(messageImpl)) {
                        break;
                    }
                    messageImpl = null;
                }
                executeQuery.close();
                if (messageImpl == null) {
                    return null;
                }
                prepareStatement2.setLong(1, j);
                prepareStatement2.setLong(2, this._consumerId);
                prepareStatement2.executeUpdate();
                MessageImpl messageImpl2 = messageImpl;
                connection.close();
                return messageImpl2;
            } finally {
                connection.close();
            }
        } catch (IOException e) {
            throw new JMSExceptionWrapper(e);
        } catch (SQLException e2) {
            throw new JMSExceptionWrapper(e2);
        }
    }

    @Override // com.caucho.jms.session.MessageConsumerImpl
    public void acknowledge() throws JMSException {
        try {
            DataSource dataSource = this._jdbcManager.getDataSource();
            String consumerTable = this._jdbcManager.getConsumerTable();
            Connection connection = dataSource.getConnection();
            try {
                PreparedStatement prepareStatement = connection.prepareStatement(new StringBuffer().append("UPDATE ").append(consumerTable).append(" SET ack=read ").append(" WHERE s_id=?").toString());
                prepareStatement.setLong(1, this._consumerId);
                prepareStatement.executeUpdate();
                prepareStatement.close();
                deleteOldMessages();
                connection.close();
            } catch (Throwable th) {
                connection.close();
                throw th;
            }
        } catch (SQLException e) {
            throw new JMSExceptionWrapper(e);
        }
    }

    private void deleteOldMessages() throws JMSException {
        try {
            DataSource dataSource = this._jdbcManager.getDataSource();
            String messageTable = this._jdbcManager.getMessageTable();
            String consumerTable = this._jdbcManager.getConsumerTable();
            Connection connection = dataSource.getConnection();
            try {
                PreparedStatement prepareStatement = connection.prepareStatement(new StringBuffer().append("DELETE FROM ").append(messageTable).append(" WHERE queue=? AND NOT EXISTS(").append("   SELECT * FROM ").append(consumerTable).append("   WHERE queue=? AND ack < m_id)").toString());
                prepareStatement.setInt(1, this._topic.getId());
                prepareStatement.setInt(2, this._topic.getId());
                prepareStatement.executeUpdate();
                connection.close();
            } catch (Throwable th) {
                connection.close();
                throw th;
            }
        } catch (SQLException e) {
            throw new JMSExceptionWrapper(e);
        }
    }

    @Override // com.caucho.jms.session.MessageConsumerImpl
    public void rollback() throws JMSException {
        try {
            DataSource dataSource = this._jdbcManager.getDataSource();
            String consumerTable = this._jdbcManager.getConsumerTable();
            Connection connection = dataSource.getConnection();
            try {
                PreparedStatement prepareStatement = connection.prepareStatement(new StringBuffer().append("UPDATE ").append(consumerTable).append(" SET read=ack ").append(" WHERE s_id=?").toString());
                prepareStatement.setLong(1, this._consumerId);
                prepareStatement.executeUpdate();
                prepareStatement.close();
                connection.close();
            } catch (Throwable th) {
                connection.close();
                throw th;
            }
        } catch (SQLException e) {
            throw new JMSExceptionWrapper(e);
        }
    }

    private void purgeExpiredConsumers() {
        long currentTime = Alarm.getCurrentTime();
        if (currentTime < this._lastPurgeTime + TOPIC_TIMEOUT) {
            return;
        }
        this._lastPurgeTime = currentTime;
        try {
            DataSource dataSource = this._jdbcManager.getDataSource();
            this._jdbcManager.getMessageTable();
            String consumerTable = this._jdbcManager.getConsumerTable();
            this._jdbcManager.getJdbcMessage();
            Connection connection = dataSource.getConnection();
            try {
                PreparedStatement prepareStatement = connection.prepareStatement(new StringBuffer().append("DELETE FROM ").append(consumerTable).append(" WHERE is_topic=1 AND expire<?").toString());
                prepareStatement.setLong(1, Alarm.getCurrentTime());
                prepareStatement.executeUpdate();
                connection.close();
            } catch (Throwable th) {
                connection.close();
                throw th;
            }
        } catch (Exception e) {
            log.log(Level.FINER, e.toString(), (Throwable) e);
        }
    }

    @Override // com.caucho.util.AlarmListener
    public void handleAlarm(Alarm alarm) {
        try {
            if (this._isClosed) {
                return;
            }
            try {
                Connection connection = this._jdbcManager.getDataSource().getConnection();
                try {
                    PreparedStatement prepareStatement = connection.prepareStatement(new StringBuffer().append("UPDATE ").append(this._jdbcManager.getConsumerTable()).append(" SET expire=?").append(" WHERE s_id=?").toString());
                    prepareStatement.setLong(1, Alarm.getCurrentTime() + TOPIC_TIMEOUT);
                    prepareStatement.setLong(2, this._consumerId);
                    prepareStatement.executeUpdate();
                    connection.close();
                    this._alarm.queue(900000L);
                } catch (Throwable th) {
                    connection.close();
                    throw th;
                }
            } catch (Throwable th2) {
                log.log(Level.WARNING, th2.toString(), th2);
                this._alarm.queue(900000L);
            }
        } catch (Throwable th3) {
            this._alarm.queue(900000L);
            throw th3;
        }
    }

    @Override // com.caucho.jms.session.MessageConsumerImpl
    public void close() throws JMSException {
        if (this._isClosed) {
            return;
        }
        this._isClosed = true;
        if (this._alarm != null) {
            this._alarm.dequeue();
        }
        try {
            if (this._subscriber == null) {
                deleteTopic();
            } else {
                this._session.unsubscribe(this._subscriber);
            }
        } catch (Throwable th) {
            log.log(Level.WARNING, th.toString(), th);
        }
        super.close();
    }

    public String toString() {
        return new StringBuffer().append("JdbcTopicConsumer[").append(this._topic).append(",").append(this._consumerId).append("]").toString();
    }
}
