View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
42   */
43  
44  package org.exolab.jms.persistence;
45  
46  import java.io.ByteArrayInputStream;
47  import java.io.ByteArrayOutputStream;
48  import java.io.ObjectInputStream;
49  import java.io.ObjectOutputStream;
50  import java.sql.Connection;
51  import java.sql.PreparedStatement;
52  import java.sql.ResultSet;
53  import java.sql.SQLException;
54  import java.util.HashMap;
55  import java.util.Vector;
56  
57  import javax.jms.Destination;
58  import javax.jms.JMSException;
59  import javax.sql.DataSource;
60  
61  import org.apache.commons.logging.Log;
62  import org.apache.commons.logging.LogFactory;
63  
64  import org.exolab.jms.client.JmsDestination;
65  import org.exolab.jms.client.JmsTopic;
66  import org.exolab.jms.message.MessageId;
67  import org.exolab.jms.message.MessageImpl;
68  import org.exolab.jms.messagemgr.PersistentMessageHandle;
69  
70  
71  /***
72   * This class manages the persistent of message objects. It maintains
73   * all the SQL for achieving this.
74   *
75   * @version     $Revision: 1.24 $ $Date: 2004/01/08 05:55:07 $
76   * @author      <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
77   * @see         org.exolab.jms.message.MessageImpl
78   * @see         org.exolab.jms.persistence.RDBMSAdapter
79   */
80  public class Messages {
81  
82      /***
83       * Caches the singleton instance of this class
84       */
85      private static Messages _instance;
86  
87      /***
88       * Cache the transaction retry count
89       */
90      private static int _retryCount = 10;
91  
92      /***
93       * Cache the retry interval in milliseconds
94       */
95      private static long _retryInterval = 50;
96  
97      /***
98       * Monitor used to synchronize the initialization of this
99       * class
100      */
101     private static final Object _block = new Object();
102 
103     /***
104      * The logger
105      */
106     private static final Log _log = LogFactory.getLog(Messages.class);
107 
108 
109     /***
110      * Returns a reference to the singleton instance.
111      * <p>
112      * Note that initialise() must have been invoked first for this
113      * to return a valid instance.
114      *
115      * @return      Messages            the singleton instance
116      */
117     public static Messages instance() {
118         return _instance;
119     }
120 
121     /***
122      * Create an initialise the singleton istance of this class.
123      *
124      * @return      Messages           the singleton instance
125      */
126     public static Messages initialise() {
127         if (_instance == null) {
128             synchronized (_block) {
129                 if (_instance == null) {
130                     _instance = new Messages();
131                 }
132             }
133         }
134         return _instance;
135     }
136 
137     /***
138      * Add a message to the database, in the context of the specified
139      * transaction and connection.
140      *
141      * @param connection - execute on this connection
142      * @param message - the message to add
143      * @throws PersistenceException - an sql related error
144      */
145     public void add(Connection connection, MessageImpl message)
146         throws PersistenceException {
147 
148         PreparedStatement insert = null;
149 
150         // extract the identity of the message
151         String messageId = message.getMessageId().getId();
152 
153         // check that the destination is actually registered
154         // and map the name to the corresponding id
155         String name;
156         try {
157             name = ((JmsDestination) message.getJMSDestination()).getName();
158         } catch (JMSException exception) {
159             throw new PersistenceException(
160                 "Failed to get destination for message=" +
161                 message.getMessageId(), exception);
162         }
163 
164         long destinationId = Destinations.instance().getId(name);
165         if (destinationId == 0) {
166             throw new PersistenceException(
167                 "Cannot add message=" + message.getMessageId() +
168                 ", destination=" + name + " (" + destinationId +
169                 "): destination does not exist");
170         }
171 
172         try {
173             // create, populate and execute the insert
174             insert = connection.prepareStatement(
175                 "insert into messages values " + "(?,?,?,?,?,?,?)");
176             insert.setString(1, messageId);
177             insert.setLong(2, destinationId);
178             insert.setInt(3, message.getJMSPriority());
179             insert.setLong(4, message.getAcceptedTime());
180             insert.setLong(5, message.getJMSExpiration());
181             insert.setInt(6, (message.getProcessed()) ? 1 : 0);
182 
183             // serialize the message
184             byte[] bytes = serialize(message);
185             insert.setBinaryStream(7, new ByteArrayInputStream(bytes),
186                 bytes.length);
187             //insert.setBytes(8, bytes);
188 
189             // execute the insert
190             if (insert.executeUpdate() != 1) {
191                 throw new PersistenceException(
192                     "Failed to add message=" + message.getMessageId() +
193                     ", destination=" + name + " (" + destinationId + ")");
194             }
195         } catch (PersistenceException exception) {
196             throw exception;
197         } catch (Exception exception) {
198             throw new PersistenceException(
199                 "Failed to add message=" + message.getMessageId() +
200                 ", destination=" + name + " (" + destinationId + ")",
201                 exception);
202         } finally {
203             SQLHelper.close(insert);
204         }
205     }
206 
207     /***
208      * Update the message state in the database. This will be called to set
209      * the message state to processed by the provider
210      *
211      * @param connection - execute on this connection
212      * @param message - the message to update
213      * @throws PersistenceException - an sql related error
214      */
215     public void update(Connection connection, MessageImpl message)
216         throws PersistenceException {
217 
218         PreparedStatement update = null;
219 
220         // extract the identity of the message
221         String messageId = message.getMessageId().getId();
222 
223         try {
224             update = connection.prepareStatement(
225                 "update messages set processed=? where messageId=?");
226             update.setInt(1, message.getProcessed() ? 1 : 0);
227             update.setString(2, messageId);
228 
229             // execute the update
230             if (update.executeUpdate() != 1) {
231                 _log.error("Cannot update message=" + messageId);
232             }
233         } catch (SQLException exception) {
234             throw new PersistenceException(
235                 "Failed to update message, id=" + messageId, exception);
236         } finally {
237             SQLHelper.close(update);
238         }
239     }
240 
241     /***
242      * Remove a message with the specified identity from the database
243      *
244      * @param connection - execute on this connection
245      * @param messageId - the message id of the message to remove
246      * @throws PersistenceException - an sql related error
247      */
248     public void remove(Connection connection, String messageId)
249         throws PersistenceException {
250 
251         PreparedStatement delete = null;
252         try {
253             delete = connection.prepareStatement(
254                 "delete from messages where messageId=?");
255             delete.setString(1, messageId);
256 
257             // execute the delete
258             if (delete.executeUpdate() != 1) {
259                 _log.error("Cannot remove message=" + messageId);
260             }
261         } catch (SQLException exception) {
262             throw new PersistenceException(
263                 "Failed to remove message, id=" + messageId, exception);
264         } finally {
265             SQLHelper.close(delete);
266         }
267     }
268 
269     /***
270      * Return the message identified by the message Id
271      *
272      * @param connection - execute on this connection
273      * @param messageId - id of message to retrieve
274      * @return MessageImpl - the associated message
275      * @throws PersistenceException - an sql related error
276      */
277     public MessageImpl get(Connection connection, String messageId)
278         throws PersistenceException {
279 
280         MessageImpl result = null;
281         PreparedStatement select = null;
282         ResultSet set = null;
283         try {
284             select = connection.prepareStatement(
285                 "select messageBlob, processed from messages where messageId=?");
286 
287             select.setString(1, messageId);
288             set = select.executeQuery();
289             if (set.next()) {
290                 result = deserialize(set.getBytes(1));
291                 result.setProcessed((set.getInt(2) == 1 ? true : false));
292             }
293         } catch (SQLException exception) {
294             throw new PersistenceException(
295                 "Failed to retrieve message, id=" + messageId, exception);
296         } finally {
297             SQLHelper.close(set);
298             SQLHelper.close(select);
299         }
300 
301         return result;
302     }
303 
304     /***
305      * Delete all messages for the given destination
306      *
307      * @param connection - execute on this connection
308      * @param messageId - id of message to retrieve
309      * @return int - the number of messages purged
310      * @throws PersistenceException - an sql related error
311      */
312     public int removeMessages(Connection connection, String destination)
313         throws PersistenceException {
314 
315         int result = 0;
316         PreparedStatement delete = null;
317 
318         // map the destination name to an id
319         long destinationId = Destinations.instance().getId(destination);
320         if (destinationId == 0) {
321             throw new PersistenceException("Cannot delete messages for " +
322                 "destination=" + destination +
323                 ": destination does not exist");
324         }
325 
326         try {
327             delete = connection.prepareStatement(
328                 "delete from messages where destinationId = ?");
329             delete.setLong(1, destinationId);
330             result = delete.executeUpdate();
331         } catch (SQLException exception) {
332             throw new PersistenceException(
333                 "Failed to remove messages for destination=" + destination,
334                 exception);
335         } finally {
336             SQLHelper.close(delete);
337         }
338 
339         return result;
340     }
341 
342     /***
343      * Retrieve the next set of messages for the specified destination with
344      * an acceptance time greater or equal to that specified. It will retrieve
345      * around 200 or so messages depending on what is available.
346      *
347      * @param connection - execute on this connection
348      * @param destination - the destination
349      * @param priority - the priority of the messages
350      * @param time - with timestamp greater or equal to this
351      * @return Vector - one or more MessageImpl objects
352      * @throws PersistenceException - if an SQL error occurs
353      */
354     public Vector getMessages(Connection connection, String destination,
355                               int priority, long time)
356         throws PersistenceException {
357 
358         PreparedStatement select = null;
359         ResultSet set = null;
360         Vector messages = new Vector();
361 
362         try {
363             JmsDestination dest = Destinations.instance().get(destination);
364             if (dest == null) {
365                 throw new PersistenceException(
366                     "Cannot getMessages for destination=" + destination
367                     + ": destination does not exist");
368             }
369 
370             long destinationId = Destinations.instance().getId(destination);
371             if (destinationId == 0) {
372                 throw new PersistenceException(
373                     "Cannot getMessages for destination=" + destination
374                     + ": destination does not exist");
375             }
376 
377             if ((dest instanceof JmsTopic) &&
378                 (((JmsTopic) dest).isWildCard())) {
379                 // if the destination is a wildcard then we can't only select
380                 // on timestamp. This will fault in any message greater than
381                 // or equal to the specified timestamp.
382                 select = connection.prepareStatement(
383                     "select * from messages where priority=? and createTime>=? order by createTime asc");
384                 select.setInt(1, priority);
385                 select.setLong(2, time);
386             } else {
387                 // if the destination is more specific then we can execute a
388                 // more specialized query and fault in other messages for
389                 // the same destination.
390                 select = connection.prepareStatement(
391                     "select * from messages where destinationId=? and priority=? and createTime>=? order by createTime asc");
392                 select.setLong(1, destinationId);
393                 select.setInt(2, priority);
394                 select.setLong(3, time);
395             }
396             set = select.executeQuery();
397 
398             // now iterate through the result set
399             int count = 0;
400             long lastTimeStamp = time;
401             while (set.next()) {
402                 MessageImpl m = deserialize(set.getBytes("messageBlob"));
403                 m.setProcessed((set.getInt("processed") == 1 ? true : false));
404                 messages.add(m);
405                 if (++count > 200) {
406                     // if there are more than two hundred rows then exist
407                     // the loop after 200 messages have been retrieved
408                     // and the timestamp has changed.
409                     if (set.getLong("createTime") > lastTimeStamp) {
410                         break;
411                     }
412                 } else {
413                     lastTimeStamp = set.getLong("createTime");
414                 }
415             }
416         } catch (SQLException exception) {
417             throw new PersistenceException(
418                 "Failed to retrieve messages", exception);
419         } finally {
420             SQLHelper.close(set);
421             SQLHelper.close(select);
422         }
423 
424         return messages;
425     }
426 
427     /***
428      * Retrieve the specified number of message ids from the database with a
429      * time greater than that specified. The number of items to retrieve
430      * is only a hint and does not reflect the number of messages actually
431      * returned.
432      *
433      * @param connection - execute on this connection
434      * @param time - with timestamp greater than
435      * @param hint - an indication of the number of messages to return.
436      * @return a map of messageId Strings to their creation time
437      * @throws PersistenceException - if an SQL error occurs
438      */
439     public HashMap getMessageIds(Connection connection, long time, int hint)
440         throws PersistenceException {
441 
442         PreparedStatement select = null;
443         ResultSet set = null;
444         HashMap messages = new HashMap();
445 
446         try {
447             select = connection.prepareStatement(
448                 "select messageId,createTime from messages where createTime>? order by createTime asc");
449             select.setLong(1, time);
450             set = select.executeQuery();
451 
452             // now iterate through the result set
453             int count = 0;
454             long lastTimeStamp = time;
455             while (set.next()) {
456                 messages.put(set.getString("messageId"),
457                     new Long(set.getLong("createTime")));
458                 if (++count > hint) {
459                     if (set.getLong("createTime") > lastTimeStamp) {
460                         break;
461                     }
462                 } else {
463                     lastTimeStamp = set.getLong("createTime");
464                 }
465 
466             }
467         } catch (SQLException exception) {
468             throw new PersistenceException(
469                 "Failed to retrieve message identifiers", exception);
470         } finally {
471             SQLHelper.close(set);
472             SQLHelper.close(select);
473         }
474 
475         return messages;
476     }
477 
478     /***
479      * Retrieve a list of unprocessed messages and return them to the client.
480      * An unprocessed message has been accepted by the system but not
481      * processed.
482      *
483      * @param connection - execute on this connection
484      * @return Vector - one or more MessageImpl objects
485      * @throws PersistenceException - if an SQL error occurs
486      */
487     public Vector getUnprocessedMessages(Connection connection)
488         throws PersistenceException {
489 
490         PreparedStatement select = null;
491         ResultSet set = null;
492         Vector messages = new Vector();
493 
494         try {
495             select = connection.prepareStatement(
496                 "select * from messages where processed=0");
497             set = select.executeQuery();
498             // now iterate through the result set
499             while (set.next()) {
500                 MessageImpl m = deserialize(set.getBytes("messageBlob"));
501                 m.setProcessed(false);
502                 messages.add(m);
503             }
504         } catch (SQLException exception) {
505             throw new PersistenceException(
506                 "Failed to retrieve unprocessed messages", exception);
507         } finally {
508             SQLHelper.close(set);
509             SQLHelper.close(select);
510         }
511 
512         return messages;
513     }
514 
515     /***
516      * Retrieve the message handle for all unexpired messages
517      *
518      * @param connection - execute on this connection
519      * @param destination - the destination in question
520      * @return Vector - collection of PersistentMessageHandle objects
521      * @throws  PersistenceException - sql releated exception
522      */
523     public Vector getNonExpiredMessages(Connection connection,
524                                         JmsDestination destination)
525         throws PersistenceException {
526 
527         Vector result = new Vector();
528         PreparedStatement select = null;
529         ResultSet set = null;
530 
531         try {
532             long destinationId = Destinations.instance().getId(
533                 destination.getName());
534 
535             if (destinationId == 0) {
536                 throw new PersistenceException(
537                     "Cannot getMessages for destination=" + destination
538                     + ": destination does not exist");
539             }
540 
541             select = connection.prepareStatement(
542                 "select messageId,destinationId,priority,createTime,expiryTime" +
543                 " from messages where expiryTime>0 and destinationId=? order by expiryTime asc");
544             select.setLong(1, destinationId);
545             set = select.executeQuery();
546 
547             while (set.next()) {
548                 PersistentMessageHandle handle = new PersistentMessageHandle();
549                 handle.setMessageId(new MessageId(set.getString(1)));
550                 handle.setDestination(destination);
551                 handle.setPriority(set.getInt(3));
552                 handle.setAcceptedTime(set.getLong(4));
553                 handle.setExpiryTime(set.getLong(5));
554                 result.add(handle);
555             }
556         } catch (SQLException exception) {
557             throw new PersistenceException(
558                 "Failed to retrieve non-expired messages", exception);
559         } finally {
560             SQLHelper.close(set);
561             SQLHelper.close(select);
562         }
563 
564         return result;
565     }
566 
567     /***
568      * Delete all expired messages and associated message handles.
569      *
570      * @param connection - execute on this connection
571      * @throws PersistenceException - if an SQL error occurs
572      */
573     public void removeExpiredMessages(Connection connection)
574         throws PersistenceException {
575 
576         PreparedStatement delete = null;
577         try {
578             long time = System.currentTimeMillis();
579 
580             // delete from the messages
581             delete = connection.prepareStatement(
582                 "delete from messages where expiryTime > 0 and expiryTime < ?");
583             delete.setLong(1, time);
584             delete.executeUpdate();
585             delete.close();
586 
587             // delete the message handles
588             delete = connection.prepareStatement(
589                 "delete from message_handles where expiryTime > 0 and expiryTime < ?");
590             delete.setLong(1, time);
591             delete.executeUpdate();
592         } catch (SQLException exception) {
593             throw new PersistenceException(
594                 "Failed to remove expired messages", exception);
595         } finally {
596             SQLHelper.close(delete);
597         }
598     }
599 
600     /***
601      * Reset the instance. We need to deprecate this method since this
602      * class does not contain state information
603      */
604     public void close() {
605         _instance = null;
606     }
607 
608     /***
609      * Default constructor does nothing at the moment.
610      */
611     protected Messages() {
612     }
613 
614     /***
615      * Get the message as a serialized blob
616      *
617      * @param       message             the message to serialize
618      * @return      byte[]              the serialized message
619      */
620     public byte[] serialize(MessageImpl message)
621         throws PersistenceException {
622 
623         byte[] result = null;
624         try {
625             ByteArrayOutputStream bstream = new ByteArrayOutputStream();
626             ObjectOutputStream ostream = new ObjectOutputStream(bstream);
627             ostream.writeObject(message);
628             ostream.close();
629             result = bstream.toByteArray();
630         } catch (Exception exception) {
631             throw new PersistenceException("Failed to serialize message",
632                 exception);
633         }
634 
635         return result;
636     }
637 
638     /***
639      * Set the message from a serialized blob
640      *
641      * @param blob the serialized message
642      * @return the re-constructed message
643      */
644     public MessageImpl deserialize(byte[] blob) throws PersistenceException {
645         MessageImpl message = null;
646 
647         if (blob != null) {
648             try {
649                 ByteArrayInputStream bstream = new ByteArrayInputStream(blob);
650                 ObjectInputStream istream = new ObjectInputStream(bstream);
651                 message = (MessageImpl) istream.readObject();
652                 istream.close();
653             } catch (Exception exception) {
654                 throw new PersistenceException(
655                     "Failed to de-serialize message", exception);
656             }
657         } else {
658             throw new PersistenceException(
659                 "Cannot de-serialize null message blob");
660         }
661 
662         return message;
663     }
664 
665 } //-- Messages