View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   */
43  package org.exolab.jms.persistence;
44  
45  import java.io.ByteArrayInputStream;
46  import java.io.ByteArrayOutputStream;
47  import java.io.ObjectInputStream;
48  import java.io.ObjectOutputStream;
49  import java.sql.Connection;
50  import java.sql.PreparedStatement;
51  import java.sql.ResultSet;
52  import java.sql.SQLException;
53  import java.util.HashMap;
54  import java.util.Vector;
55  
56  import javax.jms.JMSException;
57  
58  import org.apache.commons.logging.Log;
59  import org.apache.commons.logging.LogFactory;
60  
61  import org.exolab.jms.client.JmsDestination;
62  import org.exolab.jms.client.JmsTopic;
63  import org.exolab.jms.message.MessageImpl;
64  import org.exolab.jms.messagemgr.PersistentMessageHandle;
65  
66  
67  /***
68   * This class manages the persistence of message objects.
69   *
70   * @version     $Revision: 1.4 $ $Date: 2005/08/31 05:45:50 $
71   * @author      <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
72   */
73  class Messages {
74  
75      /***
76       * The destination manager.
77       */
78      private final Destinations _destinations;
79  
80      /***
81       * The logger.
82       */
83      private static final Log _log = LogFactory.getLog(Messages.class);
84  
85  
86      /***
87       * Construct a new <code>Messages</code>.
88       *
89       * @param destinations the destinations manager
90       */
91      public Messages(Destinations destinations) {
92          _destinations = destinations;
93      }
94  
95      /***
96       * Add a message to the database, in the context of the specified
97       * transaction and connection.
98       *
99       * @param connection - execute on this connection
100      * @param message - the message to add
101      * @throws PersistenceException - an sql related error
102      */
103     public void add(Connection connection, MessageImpl message)
104         throws PersistenceException {
105 
106         PreparedStatement insert = null;
107 
108         // extract the identity of the message
109         String messageId = message.getMessageId().getId();
110 
111         // check that the destination is actually registered
112         // and map the name to the corresponding id
113         String name;
114         try {
115             name = ((JmsDestination) message.getJMSDestination()).getName();
116         } catch (JMSException exception) {
117             throw new PersistenceException(
118                 "Failed to get destination for message=" +
119                 message.getMessageId(), exception);
120         }
121 
122         long destinationId = _destinations.getId(name);
123         if (destinationId == 0) {
124             throw new PersistenceException(
125                 "Cannot add message=" + message.getMessageId() +
126                 ", destination=" + name + " (" + destinationId +
127                 "): destination does not exist");
128         }
129 
130         try {
131             // create, populate and execute the insert
132             insert = connection.prepareStatement(
133                 "insert into messages (messageid, destinationid, priority, "
134                 + "createtime, expirytime, processed, messageblob) values "
135                 + "(?,?,?,?,?,?,?)");
136             insert.setString(1, messageId);
137             insert.setLong(2, destinationId);
138             insert.setInt(3, message.getJMSPriority());
139             insert.setLong(4, message.getAcceptedTime());
140             insert.setLong(5, message.getJMSExpiration());
141             insert.setInt(6, (message.getProcessed()) ? 1 : 0);
142 
143             // serialize the message
144             byte[] bytes = serialize(message);
145             insert.setBinaryStream(7, new ByteArrayInputStream(bytes),
146                 bytes.length);
147             //insert.setBytes(8, bytes);
148 
149             // execute the insert
150             if (insert.executeUpdate() != 1) {
151                 throw new PersistenceException(
152                     "Failed to add message=" + message.getMessageId() +
153                     ", destination=" + name + " (" + destinationId + ")");
154             }
155         } catch (PersistenceException exception) {
156             throw exception;
157         } catch (Exception exception) {
158             throw new PersistenceException(
159                 "Failed to add message=" + message.getMessageId() +
160                 ", destination=" + name + " (" + destinationId + ")",
161                 exception);
162         } finally {
163             SQLHelper.close(insert);
164         }
165     }
166 
167     /***
168      * Update the message state in the database. This will be called to set
169      * the message state to processed by the provider
170      *
171      * @param connection - execute on this connection
172      * @param message - the message to update
173      * @throws PersistenceException - an sql related error
174      */
175     public void update(Connection connection, MessageImpl message)
176         throws PersistenceException {
177 
178         PreparedStatement update = null;
179 
180         // extract the identity of the message
181         String messageId = message.getMessageId().getId();
182 
183         try {
184             update = connection.prepareStatement(
185                 "update messages set processed=? where messageId=?");
186             update.setInt(1, message.getProcessed() ? 1 : 0);
187             update.setString(2, messageId);
188 
189             // execute the update
190             if (update.executeUpdate() != 1) {
191                 _log.error("Cannot update message=" + messageId);
192             }
193         } catch (SQLException exception) {
194             throw new PersistenceException(
195                 "Failed to update message, id=" + messageId, exception);
196         } finally {
197             SQLHelper.close(update);
198         }
199     }
200 
201     /***
202      * Remove a message with the specified identity from the database
203      *
204      * @param connection - execute on this connection
205      * @param messageId - the message id of the message to remove
206      * @throws PersistenceException - an sql related error
207      */
208     public void remove(Connection connection, String messageId)
209         throws PersistenceException {
210 
211         PreparedStatement delete = null;
212         try {
213             delete = connection.prepareStatement(
214                 "delete from messages where messageId=?");
215             delete.setString(1, messageId);
216 
217             // execute the delete
218             if (delete.executeUpdate() != 1) {
219                 _log.error("Cannot remove message=" + messageId);
220             }
221         } catch (SQLException exception) {
222             throw new PersistenceException(
223                 "Failed to remove message, id=" + messageId, exception);
224         } finally {
225             SQLHelper.close(delete);
226         }
227     }
228 
229     /***
230      * Return the message identified by the message Id
231      *
232      * @param connection - execute on this connection
233      * @param messageId - id of message to retrieve
234      * @return MessageImpl - the associated message
235      * @throws PersistenceException - an sql related error
236      */
237     public MessageImpl get(Connection connection, String messageId)
238         throws PersistenceException {
239 
240         MessageImpl result = null;
241         PreparedStatement select = null;
242         ResultSet set = null;
243         try {
244             select = connection.prepareStatement(
245                 "select messageBlob, processed from messages where messageId=?");
246 
247             select.setString(1, messageId);
248             set = select.executeQuery();
249             if (set.next()) {
250                 result = deserialize(set.getBytes(1));
251                 result.setProcessed((set.getInt(2) == 1 ? true : false));
252             }
253         } catch (SQLException exception) {
254             throw new PersistenceException(
255                 "Failed to retrieve message, id=" + messageId, exception);
256         } finally {
257             SQLHelper.close(set);
258             SQLHelper.close(select);
259         }
260 
261         return result;
262     }
263 
264     /***
265      * Delete all messages for the given destination
266      *
267      * @param connection - execute on this connection
268      * @param destination the destination to remove messages for
269      * @return int - the number of messages purged
270      * @throws PersistenceException - an sql related error
271      */
272     public int removeMessages(Connection connection, String destination)
273         throws PersistenceException {
274 
275         int result = 0;
276         PreparedStatement delete = null;
277 
278         // map the destination name to an id
279         long destinationId = _destinations.getId(destination);
280         if (destinationId == 0) {
281             throw new PersistenceException("Cannot delete messages for " +
282                 "destination=" + destination +
283                 ": destination does not exist");
284         }
285 
286         try {
287             delete = connection.prepareStatement(
288                 "delete from messages where destinationId = ?");
289             delete.setLong(1, destinationId);
290             result = delete.executeUpdate();
291         } catch (SQLException exception) {
292             throw new PersistenceException(
293                 "Failed to remove messages for destination=" + destination,
294                 exception);
295         } finally {
296             SQLHelper.close(delete);
297         }
298 
299         return result;
300     }
301 
302     /***
303      * Retrieve the next set of messages for the specified destination with
304      * an acceptance time greater or equal to that specified. It will retrieve
305      * around 200 or so messages depending on what is available.
306      *
307      * @param connection - execute on this connection
308      * @param destination - the destination
309      * @param priority - the priority of the messages
310      * @param time - with timestamp greater or equal to this
311      * @return Vector - one or more MessageImpl objects
312      * @throws PersistenceException - if an SQL error occurs
313      */
314     public Vector getMessages(Connection connection, String destination,
315                               int priority, long time)
316         throws PersistenceException {
317 
318         PreparedStatement select = null;
319         ResultSet set = null;
320         Vector messages = new Vector();
321 
322         try {
323             JmsDestination dest = _destinations.get(destination);
324             if (dest == null) {
325                 throw new PersistenceException(
326                     "Cannot getMessages for destination=" + destination
327                     + ": destination does not exist");
328             }
329 
330             long destinationId = _destinations.getId(destination);
331             if (destinationId == 0) {
332                 throw new PersistenceException(
333                     "Cannot getMessages for destination=" + destination
334                     + ": destination does not exist");
335             }
336 
337             if ((dest instanceof JmsTopic) &&
338                 (((JmsTopic) dest).isWildCard())) {
339                 // if the destination is a wildcard then we can't only select
340                 // on timestamp. This will fault in any message greater than
341                 // or equal to the specified timestamp.
342                 select = connection.prepareStatement(
343                     "select createtime,processed,messageblob from messages "
344                     + "where priority=? and createTime>=? "
345                     + "order by createTime asc");
346                 select.setInt(1, priority);
347                 select.setLong(2, time);
348             } else {
349                 // if the destination is more specific then we can execute a
350                 // more specialized query and fault in other messages for
351                 // the same destination.
352                 select = connection.prepareStatement(
353                     "select createtime,processed,messageblob from messages "
354                     + "where destinationId=? and priority=? and createTime>=? "
355                     + "order by createTime asc");
356                 select.setLong(1, destinationId);
357                 select.setInt(2, priority);
358                 select.setLong(3, time);
359             }
360             set = select.executeQuery();
361 
362             // now iterate through the result set
363             int count = 0;
364             long lastTimeStamp = time;
365             while (set.next()) {
366                 MessageImpl m = deserialize(set.getBytes(3));
367                 m.setProcessed((set.getInt(2) == 1 ? true : false));
368                 messages.add(m);
369                 if (++count > 200) {
370                     // if there are more than two hundred rows then exist
371                     // the loop after 200 messages have been retrieved
372                     // and the timestamp has changed.
373                     if (set.getLong(1) > lastTimeStamp) {
374                         break;
375                     }
376                 } else {
377                     lastTimeStamp = set.getLong(1);
378                 }
379             }
380         } catch (SQLException exception) {
381             throw new PersistenceException(
382                 "Failed to retrieve messages", exception);
383         } finally {
384             SQLHelper.close(set);
385             SQLHelper.close(select);
386         }
387 
388         return messages;
389     }
390 
391     /***
392      * Retrieve the specified number of message ids from the database with a
393      * time greater than that specified. The number of items to retrieve
394      * is only a hint and does not reflect the number of messages actually
395      * returned.
396      *
397      * @param connection - execute on this connection
398      * @param time - with timestamp greater than
399      * @param hint - an indication of the number of messages to return.
400      * @return a map of messageId Strings to their creation time
401      * @throws PersistenceException - if an SQL error occurs
402      */
403     public HashMap getMessageIds(Connection connection, long time, int hint)
404         throws PersistenceException {
405 
406         PreparedStatement select = null;
407         ResultSet set = null;
408         HashMap messages = new HashMap();
409 
410         try {
411             select = connection.prepareStatement(
412                 "select messageId,createTime from messages where createTime>? "
413                 + "order by createTime asc");
414             select.setLong(1, time);
415             set = select.executeQuery();
416 
417             // now iterate through the result set
418             int count = 0;
419             long lastTimeStamp = time;
420             while (set.next()) {
421                 messages.put(set.getString(1), new Long(set.getLong(2)));
422                 if (++count > hint) {
423                     if (set.getLong(2) > lastTimeStamp) {
424                         break;
425                     }
426                 } else {
427                     lastTimeStamp = set.getLong("createTime");
428                 }
429 
430             }
431         } catch (SQLException exception) {
432             throw new PersistenceException(
433                 "Failed to retrieve message identifiers", exception);
434         } finally {
435             SQLHelper.close(set);
436             SQLHelper.close(select);
437         }
438 
439         return messages;
440     }
441 
442     /***
443      * Retrieve a list of unprocessed messages and return them to the client.
444      * An unprocessed message has been accepted by the system but not
445      * processed.
446      *
447      * @param connection - execute on this connection
448      * @return Vector - one or more MessageImpl objects
449      * @throws PersistenceException - if an SQL error occurs
450      */
451     public Vector getUnprocessedMessages(Connection connection)
452         throws PersistenceException {
453 
454         PreparedStatement select = null;
455         ResultSet set = null;
456         Vector messages = new Vector();
457 
458         try {
459             select = connection.prepareStatement(
460                 "select messageblob from messages where processed=0");
461             set = select.executeQuery();
462             // now iterate through the result set
463             while (set.next()) {
464                 MessageImpl m = deserialize(set.getBytes(1));
465                 m.setProcessed(false);
466                 messages.add(m);
467             }
468         } catch (SQLException exception) {
469             throw new PersistenceException(
470                 "Failed to retrieve unprocessed messages", exception);
471         } finally {
472             SQLHelper.close(set);
473             SQLHelper.close(select);
474         }
475 
476         return messages;
477     }
478 
479     /***
480      * Retrieve the message handle for all unexpired messages
481      *
482      * @param connection - execute on this connection
483      * @param destination - the destination in question
484      * @return Vector - collection of PersistentMessageHandle objects
485      * @throws  PersistenceException - sql releated exception
486      */
487     public Vector getNonExpiredMessages(Connection connection,
488                                         JmsDestination destination)
489         throws PersistenceException {
490 
491         Vector result = new Vector();
492         PreparedStatement select = null;
493         ResultSet set = null;
494 
495         try {
496             long destinationId = _destinations.getId(destination.getName());
497 
498             if (destinationId == 0) {
499                 throw new PersistenceException(
500                     "Cannot getMessages for destination=" + destination
501                     + ": destination does not exist");
502             }
503 
504             select = connection.prepareStatement(
505                 "select messageId,destinationId,priority,createTime,"
506                 + "sequenceNumber,expiryTime "
507                 + "from messages "
508                 + "where expiryTime>0 and destinationId=? "
509                 + "order by expiryTime asc");
510             select.setLong(1, destinationId);
511             set = select.executeQuery();
512 
513             while (set.next()) {
514                 String messageId = set.getString(1);
515                 int priority = set.getInt(3);
516                 long acceptedTime = set.getLong(4);
517                 long sequenceNumber = set.getLong(5);
518                 long expiryTime = set.getLong(6);
519                 PersistentMessageHandle handle = new PersistentMessageHandle(
520                         messageId, priority, acceptedTime, sequenceNumber,
521                         expiryTime, destination);
522                 result.add(handle);
523             }
524         } catch (SQLException exception) {
525             throw new PersistenceException(
526                 "Failed to retrieve non-expired messages", exception);
527         } finally {
528             SQLHelper.close(set);
529             SQLHelper.close(select);
530         }
531 
532         return result;
533     }
534 
535     /***
536      * Delete all expired messages and associated message handles.
537      *
538      * @param connection - execute on this connection
539      * @throws PersistenceException - if an SQL error occurs
540      */
541     public void removeExpiredMessages(Connection connection)
542         throws PersistenceException {
543 
544         PreparedStatement delete = null;
545         try {
546             long time = System.currentTimeMillis();
547 
548             // delete from the messages
549             delete = connection.prepareStatement(
550                 "delete from messages where expiryTime > 0 and expiryTime < ?");
551             delete.setLong(1, time);
552             delete.executeUpdate();
553             delete.close();
554 
555             // delete the message handles
556             delete = connection.prepareStatement(
557                 "delete from message_handles where expiryTime > 0 and expiryTime < ?");
558             delete.setLong(1, time);
559             delete.executeUpdate();
560         } catch (SQLException exception) {
561             throw new PersistenceException(
562                 "Failed to remove expired messages", exception);
563         } finally {
564             SQLHelper.close(delete);
565         }
566     }
567 
568     /***
569      * Get the message as a serialized blob
570      *
571      * @param       message             the message to serialize
572      * @return      byte[]              the serialized message
573      */
574     public byte[] serialize(MessageImpl message)
575         throws PersistenceException {
576 
577         byte[] result = null;
578         ObjectOutputStream ostream = null;
579         try {
580             ByteArrayOutputStream bstream = new ByteArrayOutputStream();
581             ostream = new ObjectOutputStream(bstream);
582             ostream.writeObject(message);
583             result = bstream.toByteArray();
584         } catch (Exception exception) {
585             throw new PersistenceException("Failed to serialize message",
586                 exception);
587         } finally {
588             SQLHelper.close(ostream);
589         }
590 
591         return result;
592     }
593 
594     /***
595      * Set the message from a serialized blob
596      *
597      * @param blob the serialized message
598      * @return the re-constructed message
599      */
600     public MessageImpl deserialize(byte[] blob) throws PersistenceException {
601         MessageImpl message = null;
602 
603         if (blob != null) {
604             ObjectInputStream istream = null;
605             try {
606                 ByteArrayInputStream bstream = new ByteArrayInputStream(blob);
607                 istream = new ObjectInputStream(bstream);
608                 message = (MessageImpl) istream.readObject();
609             } catch (Exception exception) {
610                 throw new PersistenceException(
611                     "Failed to de-serialize message", exception);
612             } finally {
613                 SQLHelper.close(istream);
614             }
615         } else {
616             throw new PersistenceException(
617                 "Cannot de-serialize null message blob");
618         }
619 
620         return message;
621     }
622 
623 }