1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42 */
43 package org.exolab.jms.persistence;
44
45 import java.io.ByteArrayInputStream;
46 import java.io.ByteArrayOutputStream;
47 import java.io.ObjectInputStream;
48 import java.io.ObjectOutputStream;
49 import java.sql.Connection;
50 import java.sql.PreparedStatement;
51 import java.sql.ResultSet;
52 import java.sql.SQLException;
53 import java.util.HashMap;
54 import java.util.Vector;
55
56 import javax.jms.JMSException;
57
58 import org.apache.commons.logging.Log;
59 import org.apache.commons.logging.LogFactory;
60
61 import org.exolab.jms.client.JmsDestination;
62 import org.exolab.jms.client.JmsTopic;
63 import org.exolab.jms.message.MessageImpl;
64 import org.exolab.jms.messagemgr.PersistentMessageHandle;
65
66
67 /***
68 * This class manages the persistence of message objects.
69 *
70 * @version $Revision: 1.4 $ $Date: 2005/08/31 05:45:50 $
71 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
72 */
73 class Messages {
74
75 /***
76 * The destination manager.
77 */
78 private final Destinations _destinations;
79
80 /***
81 * The logger.
82 */
83 private static final Log _log = LogFactory.getLog(Messages.class);
84
85
86 /***
87 * Construct a new <code>Messages</code>.
88 *
89 * @param destinations the destinations manager
90 */
91 public Messages(Destinations destinations) {
92 _destinations = destinations;
93 }
94
95 /***
96 * Add a message to the database, in the context of the specified
97 * transaction and connection.
98 *
99 * @param connection - execute on this connection
100 * @param message - the message to add
101 * @throws PersistenceException - an sql related error
102 */
103 public void add(Connection connection, MessageImpl message)
104 throws PersistenceException {
105
106 PreparedStatement insert = null;
107
108
109 String messageId = message.getMessageId().getId();
110
111
112
113 String name;
114 try {
115 name = ((JmsDestination) message.getJMSDestination()).getName();
116 } catch (JMSException exception) {
117 throw new PersistenceException(
118 "Failed to get destination for message=" +
119 message.getMessageId(), exception);
120 }
121
122 long destinationId = _destinations.getId(name);
123 if (destinationId == 0) {
124 throw new PersistenceException(
125 "Cannot add message=" + message.getMessageId() +
126 ", destination=" + name + " (" + destinationId +
127 "): destination does not exist");
128 }
129
130 try {
131
132 insert = connection.prepareStatement(
133 "insert into messages (messageid, destinationid, priority, "
134 + "createtime, expirytime, processed, messageblob) values "
135 + "(?,?,?,?,?,?,?)");
136 insert.setString(1, messageId);
137 insert.setLong(2, destinationId);
138 insert.setInt(3, message.getJMSPriority());
139 insert.setLong(4, message.getAcceptedTime());
140 insert.setLong(5, message.getJMSExpiration());
141 insert.setInt(6, (message.getProcessed()) ? 1 : 0);
142
143
144 byte[] bytes = serialize(message);
145 insert.setBinaryStream(7, new ByteArrayInputStream(bytes),
146 bytes.length);
147
148
149
150 if (insert.executeUpdate() != 1) {
151 throw new PersistenceException(
152 "Failed to add message=" + message.getMessageId() +
153 ", destination=" + name + " (" + destinationId + ")");
154 }
155 } catch (PersistenceException exception) {
156 throw exception;
157 } catch (Exception exception) {
158 throw new PersistenceException(
159 "Failed to add message=" + message.getMessageId() +
160 ", destination=" + name + " (" + destinationId + ")",
161 exception);
162 } finally {
163 SQLHelper.close(insert);
164 }
165 }
166
167 /***
168 * Update the message state in the database. This will be called to set
169 * the message state to processed by the provider
170 *
171 * @param connection - execute on this connection
172 * @param message - the message to update
173 * @throws PersistenceException - an sql related error
174 */
175 public void update(Connection connection, MessageImpl message)
176 throws PersistenceException {
177
178 PreparedStatement update = null;
179
180
181 String messageId = message.getMessageId().getId();
182
183 try {
184 update = connection.prepareStatement(
185 "update messages set processed=? where messageId=?");
186 update.setInt(1, message.getProcessed() ? 1 : 0);
187 update.setString(2, messageId);
188
189
190 if (update.executeUpdate() != 1) {
191 _log.error("Cannot update message=" + messageId);
192 }
193 } catch (SQLException exception) {
194 throw new PersistenceException(
195 "Failed to update message, id=" + messageId, exception);
196 } finally {
197 SQLHelper.close(update);
198 }
199 }
200
201 /***
202 * Remove a message with the specified identity from the database
203 *
204 * @param connection - execute on this connection
205 * @param messageId - the message id of the message to remove
206 * @throws PersistenceException - an sql related error
207 */
208 public void remove(Connection connection, String messageId)
209 throws PersistenceException {
210
211 PreparedStatement delete = null;
212 try {
213 delete = connection.prepareStatement(
214 "delete from messages where messageId=?");
215 delete.setString(1, messageId);
216
217
218 if (delete.executeUpdate() != 1) {
219 _log.error("Cannot remove message=" + messageId);
220 }
221 } catch (SQLException exception) {
222 throw new PersistenceException(
223 "Failed to remove message, id=" + messageId, exception);
224 } finally {
225 SQLHelper.close(delete);
226 }
227 }
228
229 /***
230 * Return the message identified by the message Id
231 *
232 * @param connection - execute on this connection
233 * @param messageId - id of message to retrieve
234 * @return MessageImpl - the associated message
235 * @throws PersistenceException - an sql related error
236 */
237 public MessageImpl get(Connection connection, String messageId)
238 throws PersistenceException {
239
240 MessageImpl result = null;
241 PreparedStatement select = null;
242 ResultSet set = null;
243 try {
244 select = connection.prepareStatement(
245 "select messageBlob, processed from messages where messageId=?");
246
247 select.setString(1, messageId);
248 set = select.executeQuery();
249 if (set.next()) {
250 result = deserialize(set.getBytes(1));
251 result.setProcessed((set.getInt(2) == 1 ? true : false));
252 }
253 } catch (SQLException exception) {
254 throw new PersistenceException(
255 "Failed to retrieve message, id=" + messageId, exception);
256 } finally {
257 SQLHelper.close(set);
258 SQLHelper.close(select);
259 }
260
261 return result;
262 }
263
264 /***
265 * Delete all messages for the given destination
266 *
267 * @param connection - execute on this connection
268 * @param destination the destination to remove messages for
269 * @return int - the number of messages purged
270 * @throws PersistenceException - an sql related error
271 */
272 public int removeMessages(Connection connection, String destination)
273 throws PersistenceException {
274
275 int result = 0;
276 PreparedStatement delete = null;
277
278
279 long destinationId = _destinations.getId(destination);
280 if (destinationId == 0) {
281 throw new PersistenceException("Cannot delete messages for " +
282 "destination=" + destination +
283 ": destination does not exist");
284 }
285
286 try {
287 delete = connection.prepareStatement(
288 "delete from messages where destinationId = ?");
289 delete.setLong(1, destinationId);
290 result = delete.executeUpdate();
291 } catch (SQLException exception) {
292 throw new PersistenceException(
293 "Failed to remove messages for destination=" + destination,
294 exception);
295 } finally {
296 SQLHelper.close(delete);
297 }
298
299 return result;
300 }
301
302 /***
303 * Retrieve the next set of messages for the specified destination with
304 * an acceptance time greater or equal to that specified. It will retrieve
305 * around 200 or so messages depending on what is available.
306 *
307 * @param connection - execute on this connection
308 * @param destination - the destination
309 * @param priority - the priority of the messages
310 * @param time - with timestamp greater or equal to this
311 * @return Vector - one or more MessageImpl objects
312 * @throws PersistenceException - if an SQL error occurs
313 */
314 public Vector getMessages(Connection connection, String destination,
315 int priority, long time)
316 throws PersistenceException {
317
318 PreparedStatement select = null;
319 ResultSet set = null;
320 Vector messages = new Vector();
321
322 try {
323 JmsDestination dest = _destinations.get(destination);
324 if (dest == null) {
325 throw new PersistenceException(
326 "Cannot getMessages for destination=" + destination
327 + ": destination does not exist");
328 }
329
330 long destinationId = _destinations.getId(destination);
331 if (destinationId == 0) {
332 throw new PersistenceException(
333 "Cannot getMessages for destination=" + destination
334 + ": destination does not exist");
335 }
336
337 if ((dest instanceof JmsTopic) &&
338 (((JmsTopic) dest).isWildCard())) {
339
340
341
342 select = connection.prepareStatement(
343 "select createtime,processed,messageblob from messages "
344 + "where priority=? and createTime>=? "
345 + "order by createTime asc");
346 select.setInt(1, priority);
347 select.setLong(2, time);
348 } else {
349
350
351
352 select = connection.prepareStatement(
353 "select createtime,processed,messageblob from messages "
354 + "where destinationId=? and priority=? and createTime>=? "
355 + "order by createTime asc");
356 select.setLong(1, destinationId);
357 select.setInt(2, priority);
358 select.setLong(3, time);
359 }
360 set = select.executeQuery();
361
362
363 int count = 0;
364 long lastTimeStamp = time;
365 while (set.next()) {
366 MessageImpl m = deserialize(set.getBytes(3));
367 m.setProcessed((set.getInt(2) == 1 ? true : false));
368 messages.add(m);
369 if (++count > 200) {
370
371
372
373 if (set.getLong(1) > lastTimeStamp) {
374 break;
375 }
376 } else {
377 lastTimeStamp = set.getLong(1);
378 }
379 }
380 } catch (SQLException exception) {
381 throw new PersistenceException(
382 "Failed to retrieve messages", exception);
383 } finally {
384 SQLHelper.close(set);
385 SQLHelper.close(select);
386 }
387
388 return messages;
389 }
390
391 /***
392 * Retrieve the specified number of message ids from the database with a
393 * time greater than that specified. The number of items to retrieve
394 * is only a hint and does not reflect the number of messages actually
395 * returned.
396 *
397 * @param connection - execute on this connection
398 * @param time - with timestamp greater than
399 * @param hint - an indication of the number of messages to return.
400 * @return a map of messageId Strings to their creation time
401 * @throws PersistenceException - if an SQL error occurs
402 */
403 public HashMap getMessageIds(Connection connection, long time, int hint)
404 throws PersistenceException {
405
406 PreparedStatement select = null;
407 ResultSet set = null;
408 HashMap messages = new HashMap();
409
410 try {
411 select = connection.prepareStatement(
412 "select messageId,createTime from messages where createTime>? "
413 + "order by createTime asc");
414 select.setLong(1, time);
415 set = select.executeQuery();
416
417
418 int count = 0;
419 long lastTimeStamp = time;
420 while (set.next()) {
421 messages.put(set.getString(1), new Long(set.getLong(2)));
422 if (++count > hint) {
423 if (set.getLong(2) > lastTimeStamp) {
424 break;
425 }
426 } else {
427 lastTimeStamp = set.getLong("createTime");
428 }
429
430 }
431 } catch (SQLException exception) {
432 throw new PersistenceException(
433 "Failed to retrieve message identifiers", exception);
434 } finally {
435 SQLHelper.close(set);
436 SQLHelper.close(select);
437 }
438
439 return messages;
440 }
441
442 /***
443 * Retrieve a list of unprocessed messages and return them to the client.
444 * An unprocessed message has been accepted by the system but not
445 * processed.
446 *
447 * @param connection - execute on this connection
448 * @return Vector - one or more MessageImpl objects
449 * @throws PersistenceException - if an SQL error occurs
450 */
451 public Vector getUnprocessedMessages(Connection connection)
452 throws PersistenceException {
453
454 PreparedStatement select = null;
455 ResultSet set = null;
456 Vector messages = new Vector();
457
458 try {
459 select = connection.prepareStatement(
460 "select messageblob from messages where processed=0");
461 set = select.executeQuery();
462
463 while (set.next()) {
464 MessageImpl m = deserialize(set.getBytes(1));
465 m.setProcessed(false);
466 messages.add(m);
467 }
468 } catch (SQLException exception) {
469 throw new PersistenceException(
470 "Failed to retrieve unprocessed messages", exception);
471 } finally {
472 SQLHelper.close(set);
473 SQLHelper.close(select);
474 }
475
476 return messages;
477 }
478
479 /***
480 * Retrieve the message handle for all unexpired messages
481 *
482 * @param connection - execute on this connection
483 * @param destination - the destination in question
484 * @return Vector - collection of PersistentMessageHandle objects
485 * @throws PersistenceException - sql releated exception
486 */
487 public Vector getNonExpiredMessages(Connection connection,
488 JmsDestination destination)
489 throws PersistenceException {
490
491 Vector result = new Vector();
492 PreparedStatement select = null;
493 ResultSet set = null;
494
495 try {
496 long destinationId = _destinations.getId(destination.getName());
497
498 if (destinationId == 0) {
499 throw new PersistenceException(
500 "Cannot getMessages for destination=" + destination
501 + ": destination does not exist");
502 }
503
504 select = connection.prepareStatement(
505 "select messageId,destinationId,priority,createTime,"
506 + "sequenceNumber,expiryTime "
507 + "from messages "
508 + "where expiryTime>0 and destinationId=? "
509 + "order by expiryTime asc");
510 select.setLong(1, destinationId);
511 set = select.executeQuery();
512
513 while (set.next()) {
514 String messageId = set.getString(1);
515 int priority = set.getInt(3);
516 long acceptedTime = set.getLong(4);
517 long sequenceNumber = set.getLong(5);
518 long expiryTime = set.getLong(6);
519 PersistentMessageHandle handle = new PersistentMessageHandle(
520 messageId, priority, acceptedTime, sequenceNumber,
521 expiryTime, destination);
522 result.add(handle);
523 }
524 } catch (SQLException exception) {
525 throw new PersistenceException(
526 "Failed to retrieve non-expired messages", exception);
527 } finally {
528 SQLHelper.close(set);
529 SQLHelper.close(select);
530 }
531
532 return result;
533 }
534
535 /***
536 * Delete all expired messages and associated message handles.
537 *
538 * @param connection - execute on this connection
539 * @throws PersistenceException - if an SQL error occurs
540 */
541 public void removeExpiredMessages(Connection connection)
542 throws PersistenceException {
543
544 PreparedStatement delete = null;
545 try {
546 long time = System.currentTimeMillis();
547
548
549 delete = connection.prepareStatement(
550 "delete from messages where expiryTime > 0 and expiryTime < ?");
551 delete.setLong(1, time);
552 delete.executeUpdate();
553 delete.close();
554
555
556 delete = connection.prepareStatement(
557 "delete from message_handles where expiryTime > 0 and expiryTime < ?");
558 delete.setLong(1, time);
559 delete.executeUpdate();
560 } catch (SQLException exception) {
561 throw new PersistenceException(
562 "Failed to remove expired messages", exception);
563 } finally {
564 SQLHelper.close(delete);
565 }
566 }
567
568 /***
569 * Get the message as a serialized blob
570 *
571 * @param message the message to serialize
572 * @return byte[] the serialized message
573 */
574 public byte[] serialize(MessageImpl message)
575 throws PersistenceException {
576
577 byte[] result = null;
578 ObjectOutputStream ostream = null;
579 try {
580 ByteArrayOutputStream bstream = new ByteArrayOutputStream();
581 ostream = new ObjectOutputStream(bstream);
582 ostream.writeObject(message);
583 result = bstream.toByteArray();
584 } catch (Exception exception) {
585 throw new PersistenceException("Failed to serialize message",
586 exception);
587 } finally {
588 SQLHelper.close(ostream);
589 }
590
591 return result;
592 }
593
594 /***
595 * Set the message from a serialized blob
596 *
597 * @param blob the serialized message
598 * @return the re-constructed message
599 */
600 public MessageImpl deserialize(byte[] blob) throws PersistenceException {
601 MessageImpl message = null;
602
603 if (blob != null) {
604 ObjectInputStream istream = null;
605 try {
606 ByteArrayInputStream bstream = new ByteArrayInputStream(blob);
607 istream = new ObjectInputStream(bstream);
608 message = (MessageImpl) istream.readObject();
609 } catch (Exception exception) {
610 throw new PersistenceException(
611 "Failed to de-serialize message", exception);
612 } finally {
613 SQLHelper.close(istream);
614 }
615 } else {
616 throw new PersistenceException(
617 "Cannot de-serialize null message blob");
618 }
619
620 return message;
621 }
622
623 }