View Javadoc

1   /***
2    * 
3    * Copyright 2004 Hiram Chirino
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.store.jdbc.adapter;
19  
20  import java.io.ByteArrayOutputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.sql.Blob;
25  import java.sql.Connection;
26  import java.sql.PreparedStatement;
27  import java.sql.ResultSet;
28  import java.sql.SQLException;
29  
30  import javax.jms.JMSException;
31  
32  import org.codehaus.activemq.store.jdbc.StatementProvider;
33  
34  
35  /***
36   * This JDBCAdapter inserts and extracts BLOB data using the 
37   * getBlob()/setBlob() operations.  This is a little more involved
38   * since to insert a blob you have to:
39   * 
40   *  1: insert empty blob.
41   *  2: select the blob 
42   *  3: finally update the blob with data value. 
43   * 
44   * The databases/JDBC drivers that use this adapter are:
45   * <ul>
46   * <li></li> 
47   * </ul>
48   * 
49   * @version $Revision: 1.1 $
50   */
51  public class BlobJDBCAdapter extends DefaultJDBCAdapter {
52  
53      public BlobJDBCAdapter() {
54          super();
55      }
56  
57      public BlobJDBCAdapter(StatementProvider provider) {
58          super(provider);
59      }
60      
61      public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data) throws SQLException,
62              JMSException {
63          PreparedStatement s = null;
64          ResultSet rs = null;
65          try {
66              
67              // Add the Blob record.
68              s = c.prepareStatement(statementProvider.getAddMessageStatment());
69              s.setLong(1, seq);
70              s.setString(2, destinationName);
71              s.setString(3, messageID);
72              s.setString(4, " ");
73              
74              if (s.executeUpdate() != 1)
75                  throw new JMSException("Failed to broker message: " + messageID
76                          + " in container.");
77              s.close();
78  
79              // Select the blob record so that we can update it.
80              s = c.prepareStatement(statementProvider.getFindMessageStatment());
81              s.setLong(1, seq);
82              rs = s.executeQuery();
83              if (!rs.next())
84                  throw new JMSException("Failed to broker message: " + messageID
85                          + " in container.");
86  
87              // Update the blob
88              Blob blob = rs.getBlob(1);
89              OutputStream stream = blob.setBinaryStream(data.length);
90              stream.write(data);
91              stream.close();
92              s.close();
93  
94              // Update the row with the updated blob
95              s = c.prepareStatement(statementProvider.getUpdateMessageStatment());
96              s.setBlob(1, blob);
97              s.setLong(2, seq);
98  
99          } catch (IOException e) {
100             throw (SQLException) new SQLException("BLOB could not be updated: "
101                     + e).initCause(e);
102         } finally {
103             try {
104                 rs.close();
105             } catch (Throwable e) {
106             }
107             try {
108                 s.close();
109             } catch (Throwable e) {
110             }
111         }
112     }
113     
114     public byte[] doGetMessage(Connection c, long seq) throws SQLException {
115 	    PreparedStatement s=null; ResultSet rs=null;
116 	    try {
117 	        
118 	        s = c.prepareStatement(statementProvider.getFindMessageStatment());
119 	        s.setLong(1, seq); 
120 	        rs = s.executeQuery();
121 	        
122 	        if( !rs.next() )
123 	            return null;
124 	        Blob blob = rs.getBlob(1);
125 	        InputStream is = blob.getBinaryStream();
126 	        
127 	        ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length());	        
128 	        int ch;
129 	        while( (ch=is.read())>= 0 ) {
130 	            os.write(ch);
131 	        }
132 	        is.close();
133 	        os.close();
134 	        
135 	        return os.toByteArray();
136 	        
137 	    } catch (IOException e) {
138             throw (SQLException) new SQLException("BLOB could not be updated: "
139                     + e).initCause(e);
140         } finally {
141 	        try { rs.close(); } catch (Throwable e) {}
142 	        try { s.close(); } catch (Throwable e) {}
143 	    }
144     }
145 
146 }