View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   */
43  package org.exolab.jms.messagemgr;
44  
45  import java.sql.Connection;
46  import java.util.Collections;
47  import java.util.Date;
48  import java.util.HashMap;
49  import java.util.Map;
50  import javax.jms.DeliveryMode;
51  import javax.jms.Destination;
52  import javax.jms.InvalidDestinationException;
53  import javax.jms.JMSException;
54  
55  import org.apache.commons.logging.Log;
56  import org.apache.commons.logging.LogFactory;
57  
58  import org.exolab.jms.client.JmsDestination;
59  import org.exolab.jms.message.MessageImpl;
60  import org.exolab.jms.persistence.DatabaseService;
61  import org.exolab.jms.persistence.PersistenceException;
62  import org.exolab.jms.service.Service;
63  import org.exolab.jms.service.ServiceException;
64  
65  
66  /***
67   * This is the active message handling component within the JMS server. Messages
68   * are passed in and added to the appropriate dispatchers for delivery to the
69   * clients.
70   *
71   * @author <a href="mailto:mourikis@intalio.com">Jim Mourikis</a>
72   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
73   * @version $Revision: 1.5 $ $Date: 2005/11/12 12:27:40 $
74   */
75  public class MessageMgr extends Service implements MessageManager {
76  
77      /***
78       * The database service.
79       */
80      private final DatabaseService _database;
81  
82      /***
83       * The destination manager.
84       */
85      private DestinationManager _destinations;
86  
87      /***
88       * A map of <code>MessageHanagerEventListener instances, keyed on
89       * <code>JmsDestination</ocde>.
90       */
91      private Map _listeners = Collections.synchronizedMap(new HashMap(1023));
92  
93      /***
94       * The seed to allocate to messages to differentiate messages arriving at
95       * the same time.
96       */
97      private long _sequenceNoSeed = 0;
98  
99      /***
100      * Lock for accessing _sequenceNoSeed.
101      */
102     private final Object _lock = new Object();
103 
104     /***
105      * The logger.
106      */
107     private static final Log _log = LogFactory.getLog(MessageMgr.class);
108 
109 
110     /***
111      * Construct a new <code>MessageMgr</code>.
112      *
113      * @param database the database service
114      */
115     public MessageMgr(DatabaseService database) {
116         if (database == null) {
117             throw new IllegalArgumentException("Argument 'database' is null");
118         }
119         _database = database;
120     }
121 
122     /***
123      * Sets the destination manager.
124      *
125      * @param manager the destination manager
126      */
127     public void setDestinationManager(DestinationManager manager) {
128         _destinations = manager;
129     }
130 
131     /***
132      * Prepares a message prior to it being passed through the system.
133      *
134      * @param message the message
135      * @throws JMSException if the message is invalid or cannot be prep'ed
136      */
137     public void prepare(MessageImpl message)
138             throws JMSException {
139         if (message == null) {
140             throw new JMSException("Null message");
141         }
142         Destination destination = message.getJMSDestination();
143         if (destination == null) {
144             throw new InvalidDestinationException("Message has no destination");
145         }
146         if (!(destination instanceof JmsDestination)) {
147             throw new InvalidDestinationException(
148                     "Destination not a JmsDestination");
149         }
150 
151         // mark the message as accepted and attach a sequence number
152         message.setAcceptedTime((new Date()).getTime());
153         message.setSequenceNumber(getNextSequenceNumber());
154         message.setReadOnly(true);
155     }
156 
157     /***
158      * Add a message.
159      *
160      * @param message the message to add
161      * @throws JMSException if the message cannot be added
162      */
163     public void add(MessageImpl message) throws JMSException {
164         prepare(message);
165 
166         JmsDestination destination =
167                 (JmsDestination) message.getJMSDestination();
168         final JmsDestination existing
169                 = _destinations.getDestination(destination.getName());
170         final boolean persistent = (existing != null)
171                 ? existing.getPersistent() : false;
172 
173         try {
174             _database.begin(); // need a transaction for any database access
175 
176             // if the message's delivery mode is PERSISTENT, and the destination
177             // is also persistent, then then process it accordingly, otherwise use
178             // the non-persistent quality of service
179             if (message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT
180                     && persistent) {
181                 addPersistentMessage(message);
182             } else {
183                 addNonPersistentMessage(message);
184             }
185             _database.commit();
186         } catch (Exception exception) {
187             final String msg = "Failed to process message";
188             _log.error(msg, exception);
189             try {
190                 if (_database.isTransacted()) {
191                     _database.rollback();
192                 }
193             } catch (PersistenceException error) {
194                 _log.error(error, error);
195             }
196             if (exception instanceof JMSException) {
197                 throw (JMSException) exception;
198             }
199             throw new JMSException(msg + ": " + exception.getMessage());
200         }
201     }
202 
203     /***
204      * Register a listener for a specific destination, to be notified when
205      * messages for the destination arrive.
206      *
207      * @param destination the destination to register the listener for
208      * @param listener    the listener to notify
209      */
210     public void addEventListener(JmsDestination destination,
211                                  MessageManagerEventListener listener) {
212         _listeners.put(destination, listener);
213     }
214 
215     /***
216      * Remove the listener for the specified destination.
217      *
218      * @param destination the destination to remove the listener for
219      */
220     public void removeEventListener(JmsDestination destination) {
221         _listeners.remove(destination);
222     }
223 
224     /***
225      * Start the service.
226      *
227      * @throws ServiceException if the service fails to start
228      */
229     protected void doStart() throws ServiceException {
230         if (_destinations == null) {
231             throw new ServiceException(
232                     "Cannot start service: DestinationManager not initialised");
233         }
234     }
235 
236     /***
237      * Processes a non-persistent message.
238      *
239      * @param message the message to add
240      * @throws JMSException if the message cannot be processed
241      */
242     private void addNonPersistentMessage(MessageImpl message)
243             throws JMSException {
244 
245         // notify the listener for the destination
246         JmsDestination destination
247                 = (JmsDestination) message.getJMSDestination();
248 
249         MessageManagerEventListener listener = getEventListener(destination);
250         listener.messageAdded(destination, message);
251     }
252 
253     /***
254      * Stop the service.
255      *
256      * @throws ServiceException if the service fails to stop
257      */
258     protected void doStop() throws ServiceException {
259         _listeners.clear();
260     }
261 
262     /***
263      * Add a persistent message.
264      *
265      * @param message the message to add
266      * @throws JMSException if the message cannot be processed
267      * @throws PersistenceException for any persistence error
268      */
269     private void addPersistentMessage(MessageImpl message)
270             throws JMSException, PersistenceException {
271         JmsDestination destination =
272                 (JmsDestination) message.getJMSDestination();
273 
274         Connection connection = _database.getConnection();
275 
276             // add the message to the database
277         _database.getAdapter().addMessage(connection, message);
278 
279         // notify the listener that a persistent message has arrived
280         MessageManagerEventListener listener = getEventListener(destination);
281         listener.persistentMessageAdded(destination, message);
282     }
283 
284     /***
285      * Returns the event listener for the specified destination.
286      * <p/>
287      * If no event listener is registered, it falls back to the destination
288      * manager.
289      *
290      * @param destination the destination
291      * @return the event listener registgered for <code>destination</code>, or
292      *         the destination manager if no is registered
293      */
294     private MessageManagerEventListener getEventListener(
295             JmsDestination destination) {
296         MessageManagerEventListener listener =
297                 (MessageManagerEventListener) _listeners.get(destination);
298 
299         if (listener == null) {
300             // no registered destination cache, so let the destination manager
301             // handle it
302             listener = _destinations;
303         }
304         return listener;
305     }
306 
307     /***
308      * Returns the next seed value to be allocated to a new message.
309      *
310      * @return a unique identifier for a message
311      */
312     private long getNextSequenceNumber() {
313         synchronized (_lock) {
314             return ++_sequenceNoSeed;
315         }
316     }
317 
318 }