1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42 */
43 package org.exolab.jms.messagemgr;
44
45 import java.sql.Connection;
46 import java.util.Collections;
47 import java.util.Date;
48 import java.util.HashMap;
49 import java.util.Map;
50 import javax.jms.DeliveryMode;
51 import javax.jms.Destination;
52 import javax.jms.InvalidDestinationException;
53 import javax.jms.JMSException;
54
55 import org.apache.commons.logging.Log;
56 import org.apache.commons.logging.LogFactory;
57
58 import org.exolab.jms.client.JmsDestination;
59 import org.exolab.jms.message.MessageImpl;
60 import org.exolab.jms.persistence.DatabaseService;
61 import org.exolab.jms.persistence.PersistenceException;
62 import org.exolab.jms.service.Service;
63 import org.exolab.jms.service.ServiceException;
64
65
66 /***
67 * This is the active message handling component within the JMS server. Messages
68 * are passed in and added to the appropriate dispatchers for delivery to the
69 * clients.
70 *
71 * @author <a href="mailto:mourikis@intalio.com">Jim Mourikis</a>
72 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
73 * @version $Revision: 1.5 $ $Date: 2005/11/12 12:27:40 $
74 */
75 public class MessageMgr extends Service implements MessageManager {
76
77 /***
78 * The database service.
79 */
80 private final DatabaseService _database;
81
82 /***
83 * The destination manager.
84 */
85 private DestinationManager _destinations;
86
87 /***
88 * A map of <code>MessageHanagerEventListener instances, keyed on
89 * <code>JmsDestination</ocde>.
90 */
91 private Map _listeners = Collections.synchronizedMap(new HashMap(1023));
92
93 /***
94 * The seed to allocate to messages to differentiate messages arriving at
95 * the same time.
96 */
97 private long _sequenceNoSeed = 0;
98
99 /***
100 * Lock for accessing _sequenceNoSeed.
101 */
102 private final Object _lock = new Object();
103
104 /***
105 * The logger.
106 */
107 private static final Log _log = LogFactory.getLog(MessageMgr.class);
108
109
110 /***
111 * Construct a new <code>MessageMgr</code>.
112 *
113 * @param database the database service
114 */
115 public MessageMgr(DatabaseService database) {
116 if (database == null) {
117 throw new IllegalArgumentException("Argument 'database' is null");
118 }
119 _database = database;
120 }
121
122 /***
123 * Sets the destination manager.
124 *
125 * @param manager the destination manager
126 */
127 public void setDestinationManager(DestinationManager manager) {
128 _destinations = manager;
129 }
130
131 /***
132 * Prepares a message prior to it being passed through the system.
133 *
134 * @param message the message
135 * @throws JMSException if the message is invalid or cannot be prep'ed
136 */
137 public void prepare(MessageImpl message)
138 throws JMSException {
139 if (message == null) {
140 throw new JMSException("Null message");
141 }
142 Destination destination = message.getJMSDestination();
143 if (destination == null) {
144 throw new InvalidDestinationException("Message has no destination");
145 }
146 if (!(destination instanceof JmsDestination)) {
147 throw new InvalidDestinationException(
148 "Destination not a JmsDestination");
149 }
150
151
152 message.setAcceptedTime((new Date()).getTime());
153 message.setSequenceNumber(getNextSequenceNumber());
154 message.setReadOnly(true);
155 }
156
157 /***
158 * Add a message.
159 *
160 * @param message the message to add
161 * @throws JMSException if the message cannot be added
162 */
163 public void add(MessageImpl message) throws JMSException {
164 prepare(message);
165
166 JmsDestination destination =
167 (JmsDestination) message.getJMSDestination();
168 final JmsDestination existing
169 = _destinations.getDestination(destination.getName());
170 final boolean persistent = (existing != null)
171 ? existing.getPersistent() : false;
172
173 try {
174 _database.begin();
175
176
177
178
179 if (message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT
180 && persistent) {
181 addPersistentMessage(message);
182 } else {
183 addNonPersistentMessage(message);
184 }
185 _database.commit();
186 } catch (Exception exception) {
187 final String msg = "Failed to process message";
188 _log.error(msg, exception);
189 try {
190 if (_database.isTransacted()) {
191 _database.rollback();
192 }
193 } catch (PersistenceException error) {
194 _log.error(error, error);
195 }
196 if (exception instanceof JMSException) {
197 throw (JMSException) exception;
198 }
199 throw new JMSException(msg + ": " + exception.getMessage());
200 }
201 }
202
203 /***
204 * Register a listener for a specific destination, to be notified when
205 * messages for the destination arrive.
206 *
207 * @param destination the destination to register the listener for
208 * @param listener the listener to notify
209 */
210 public void addEventListener(JmsDestination destination,
211 MessageManagerEventListener listener) {
212 _listeners.put(destination, listener);
213 }
214
215 /***
216 * Remove the listener for the specified destination.
217 *
218 * @param destination the destination to remove the listener for
219 */
220 public void removeEventListener(JmsDestination destination) {
221 _listeners.remove(destination);
222 }
223
224 /***
225 * Start the service.
226 *
227 * @throws ServiceException if the service fails to start
228 */
229 protected void doStart() throws ServiceException {
230 if (_destinations == null) {
231 throw new ServiceException(
232 "Cannot start service: DestinationManager not initialised");
233 }
234 }
235
236 /***
237 * Processes a non-persistent message.
238 *
239 * @param message the message to add
240 * @throws JMSException if the message cannot be processed
241 */
242 private void addNonPersistentMessage(MessageImpl message)
243 throws JMSException {
244
245
246 JmsDestination destination
247 = (JmsDestination) message.getJMSDestination();
248
249 MessageManagerEventListener listener = getEventListener(destination);
250 listener.messageAdded(destination, message);
251 }
252
253 /***
254 * Stop the service.
255 *
256 * @throws ServiceException if the service fails to stop
257 */
258 protected void doStop() throws ServiceException {
259 _listeners.clear();
260 }
261
262 /***
263 * Add a persistent message.
264 *
265 * @param message the message to add
266 * @throws JMSException if the message cannot be processed
267 * @throws PersistenceException for any persistence error
268 */
269 private void addPersistentMessage(MessageImpl message)
270 throws JMSException, PersistenceException {
271 JmsDestination destination =
272 (JmsDestination) message.getJMSDestination();
273
274 Connection connection = _database.getConnection();
275
276
277 _database.getAdapter().addMessage(connection, message);
278
279
280 MessageManagerEventListener listener = getEventListener(destination);
281 listener.persistentMessageAdded(destination, message);
282 }
283
284 /***
285 * Returns the event listener for the specified destination.
286 * <p/>
287 * If no event listener is registered, it falls back to the destination
288 * manager.
289 *
290 * @param destination the destination
291 * @return the event listener registgered for <code>destination</code>, or
292 * the destination manager if no is registered
293 */
294 private MessageManagerEventListener getEventListener(
295 JmsDestination destination) {
296 MessageManagerEventListener listener =
297 (MessageManagerEventListener) _listeners.get(destination);
298
299 if (listener == null) {
300
301
302 listener = _destinations;
303 }
304 return listener;
305 }
306
307 /***
308 * Returns the next seed value to be allocated to a new message.
309 *
310 * @return a unique identifier for a message
311 */
312 private long getNextSequenceNumber() {
313 synchronized (_lock) {
314 return ++_sequenceNoSeed;
315 }
316 }
317
318 }