View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2001-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: MessageCache.java,v 1.9 2003/12/29 13:09:25 tanderson Exp $
44   *
45   * Date         Author  Changes
46   * 3/1/2001     jima    Created
47   */
48  package org.exolab.jms.messagemgr;
49  
50  import java.sql.Connection;
51  import java.util.Collections;
52  import java.util.HashMap;
53  import java.util.Map;
54  
55  import javax.jms.JMSException;
56  
57  import org.apache.commons.logging.Log;
58  import org.apache.commons.logging.LogFactory;
59  
60  import org.exolab.jms.message.MessageHandle;
61  import org.exolab.jms.message.MessageId;
62  import org.exolab.jms.message.MessageImpl;
63  import org.exolab.jms.persistence.DatabaseService;
64  import org.exolab.jms.persistence.PersistenceAdapter;
65  import org.exolab.jms.persistence.PersistenceException;
66  
67  
68  /***
69   * The MessageCache is responsible for managing the collection of persistent
70   * messages for a particular {@link DestinationCache} or {@link Endpoint}.
71   * <p>
72   * Persistent messages and transient messages are stored in different caches
73   * since they behave slightly different. Persistent messages can be passivated
74   * to disk when memory is low.
75   * <p>
76   * We leave garbage collection of messages to the Java Garbage Collection
77   * mechanism.
78   *
79   * @version     $Revision: 1.9 $ $Date: 2003/12/29 13:09:25 $
80   * @author      <a href="mailto:jima@exoffice.com">Jim Alateras</a>
81   **/
82  class MessageCache {
83  
84      /***
85       * Maintain the pool of transient messages.
86       */
87      private transient Map _transient =
88          Collections.synchronizedMap(new HashMap(1023));
89  
90      /***
91       * Maintain the pool of persistent messages.
92       */
93      private transient Map _persistent =
94          Collections.synchronizedMap(new HashMap(1023));
95  
96      /***
97       * Maintains a list of {@link MessageHandles}, both transient and
98       * persistent. The messages are sorted based on priority
99       */
100     private MessageQueue _handles =
101         new MessageQueue(new MessageHandleComparator());
102 
103     /***
104      * The logger
105      */
106     private static final Log _log = LogFactory.getLog(MessageCache.class);
107 
108 
109     /***
110      * Default constructor
111      */
112     MessageCache() {
113     }
114 
115     /***
116      * Add the following message to the cache
117      *
118      * @param id - the message identity
119      * @param message - the message to add
120      */
121     void addMessage(MessageId id, MessageImpl message) {
122         if (message.isPersistent()) {
123             addPersistentMessage(message);
124         } else {
125             addNonPersistentMessage(message);
126         }
127     }
128 
129     /***
130      * Add the following handle and associated message to the cache
131      *
132      * @param handle - the message handle to add
133      * @param message - the message to add
134      */
135     void addMessage(MessageHandle handle, MessageImpl message) {
136         addMessage(handle.getMessageId(), message);
137         addHandle(handle);
138     }
139 
140     /***
141      * Add the following message handle to the cache
142      *
143      * @param handle - the message handle to add
144      */
145     void addHandle(MessageHandle handle) {
146         _handles.add(handle);
147     }
148 
149     /***
150      * Remove the message with the following id from the cache and return
151      * it to the client.
152      *
153      * @param id - the id of the message to remove
154      * @param MessageImpl - the removed message or null
155      */
156     MessageImpl removeMessage(MessageId id) {
157         MessageImpl message = removePersistentMessage(id);
158         if (message == null) {
159             message = removeNonPersistentMessage(id);
160         }
161 
162         return message;
163     }
164 
165     /***
166      * Remove the specified handle from the cache
167      *
168      * @param handle - the handle to remove
169      * @return boolean - true if successful
170      */
171     boolean removeHandle(MessageHandle handle) {
172         return _handles.remove(handle);
173     }
174 
175     /***
176      * Remove and return the first message handle in the cache
177      *
178      * @return MessageHandle - the first handle or null if cache is empty
179      */
180     final MessageHandle removeFirstHandle() {
181         return (MessageHandle) _handles.removeFirst();
182     }
183 
184     /***
185      * Return an array of all handles in the cache
186      *
187      * @return Object[]
188      */
189     Object[] getHandleArray() {
190         return _handles.toArray();
191     }
192 
193     /***
194      * Check whether the specified handle is cache
195      *
196      * @param handle - the handle to check
197      * @return boolean - true if it is
198      */
199     final boolean containsHandle(MessageHandle handle) {
200         return _handles.contains(handle);
201     }
202 
203     /***
204      * Return the message corresponding to the specified handle
205      *
206      * @param handle - the handle to check
207      * @param MessageImpl - the associated message or null
208      */
209     MessageImpl getMessage(final MessageHandle handle) {
210         if (handle == null) {
211             return null;
212         }
213 
214         MessageImpl message = null;
215         MessageId id = handle.getMessageId();
216 
217         if (handle.isPersistent()) {
218             message = (MessageImpl) _persistent.get(id);
219 
220             // if the message is not cached then try and retrieve it from the
221             // database and cache it.
222             if (message == null) {
223                 // fault in at least the next message from the database
224                 PersistentMessageHandle phandle = null;
225                 if (handle instanceof PersistentMessageHandle) {
226                     phandle = (PersistentMessageHandle) handle;
227                 } else {
228                     // this is a transient handle holding a reference to
229                     // a persistent message. Create a persistent handle
230                     // from it
231                     try {
232                         phandle = (PersistentMessageHandle)
233                             MessageHandleFactory.createPersistentHandle(
234                                 (TransientMessageHandle) handle);
235                     } catch (Exception exception) {
236                         _log.error("Failed to retrieve message", exception);
237                     }
238                 }
239 
240                 // if we have a phandle then retrieve it from the database
241                 if (phandle != null) {
242                     PersistenceAdapter adapter = DatabaseService.getAdapter();
243                     Connection connection = null;
244                     try {
245                         connection = DatabaseService.getConnection();
246                         message = adapter.getMessage(
247                             connection, phandle.getMessageId().getId());
248                         connection.commit();
249                     } catch (PersistenceException exception) {
250                         if (connection != null) {
251                             try {
252                                 connection.rollback();
253                             } catch (Exception nested) {
254                                 // ignore
255                             }
256                         }
257                         _log.error("Failed to retrieve message", exception);
258                     } catch (Exception exception) {
259                         _log.error("Failed to retrieve message", exception);
260                     } finally {
261                         if (connection != null) {
262                             try {
263                                 connection.close();
264                             } catch (Exception nested) {
265                                 // ignore
266                             }
267                         }
268                     }
269 
270                     // add the message to the persistent cache once it has been
271                     // retrieved from the datastore
272                     if (message != null) {
273                         synchronized (_persistent) {
274                             _persistent.put(id, message);
275                         }
276                     }
277                 }
278             }
279         } else {
280             message = (MessageImpl) _transient.get(id);
281         }
282 
283         if ((message != null) &&
284             (!message.getReadOnly())) {
285             // mark the message as read-only. If this fails, log the
286             // error and return null.
287             try {
288                 message.setReadOnly(true);
289             } catch (JMSException exception) {
290                 _log.error(exception);
291                 message = null;
292             }
293         }
294 
295         return message;
296     }
297 
298     /***
299      * Clear the persistent and non-persistent message cache
300      */
301     void clear() {
302         _transient.clear();
303         _persistent.clear();
304         _handles.clear();
305     }
306 
307     /***
308      * Clear only the persistent messages in the cache
309      *
310      */
311     void clearPersistentMessages() {
312         _persistent.clear();
313     }
314 
315     /***
316      * Clear only the transient messages in the cache
317      *
318      */
319     void clearTransientMessages() {
320         _transient.clear();
321     }
322 
323     /***
324      * Add a non-persistent message to the cache
325      *
326      * @param message - the message to add
327      */
328     private void addNonPersistentMessage(MessageImpl message) {
329         synchronized (_transient) {
330             _transient.put(message.getMessageId(), message);
331         }
332     }
333 
334     /***
335      * Remove the non persistnet message with the specified identity from the
336      * transient cache.
337      *
338      * @param id - the id of the message to remove
339      * @return MessageImpl - the remove message or null
340      */
341     private MessageImpl removeNonPersistentMessage(MessageId id) {
342         MessageImpl message = null;
343         synchronized (_transient) {
344             message = (MessageImpl) _transient.remove(id);
345         }
346 
347         return message;
348     }
349 
350     /***
351      * Remove the  persistnet message with the specified identity from the
352      * persistent cache.
353      *
354      * @param id - the id of the message to remove
355      * @return MessageImpl - the remove message or null
356      */
357     private MessageImpl removePersistentMessage(MessageId id) {
358         MessageImpl message = null;
359         synchronized (_persistent) {
360             message = (MessageImpl) _persistent.remove(id);
361         }
362 
363         return message;
364     }
365 
366     /***
367      * Add persistent message to the persistent cache
368      *
369      * @param message - the message to add
370      */
371     private void addPersistentMessage(MessageImpl message) {
372         synchronized (_persistent) {
373             _persistent.put(message.getMessageId(), message);
374         }
375     }
376 
377     /***
378      * Return the number of messages in the transient cache
379      *
380      * @return int
381      */
382     public int getTransientCount() {
383         return _transient.size();
384     }
385 
386     /***
387      * Return the number of messages in the persistent cache
388      *
389      * @return int
390      */
391     public int getPersistentCount() {
392         return _persistent.size();
393     }
394 
395     /***
396      * Return the number of handles in the cache
397      *
398      * @return int
399      */
400     public int getHandleCount() {
401         return _handles.size();
402     }
403 }
404