View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: BasicEventManager.java,v 1.4 2006/02/23 11:17:38 tanderson Exp $
44   */
45  package org.exolab.jms.events;
46  
47  import java.util.Comparator;
48  import java.util.HashMap;
49  
50  import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
51  import org.exolab.jms.common.threads.ThreadPoolFactory;
52  import org.exolab.jms.common.util.OrderedQueue;
53  import org.exolab.jms.service.BasicService;
54  
55  
56  /***
57   * The EventManager manages {@link Event} objects. It has methods to register
58   * and unregister events. It also extends {@link Runnable} interface which
59   * defines the thread responsible for dispatching events.
60   * <p/>
61   * An event is defined to occur at sometime in the future, as specified either
62   * by an absolute time through {@link #registerEvent} or as relative time
63   * through {@link #registerEventRelative}. An event must have an associated
64   * event type and may have an attached <code>Serializable</code>, which is used
65   * when the EventManager makes a callback to the registered handler when the
66   * event fires.
67   * <p/>
68   * The register methids will return an event identifier which can subsequently
69   * be used to unregister the event through the {@link #unregisterEvent} event.
70   * This is the only means of unregister an event.
71   * <p/>
72   * If the {@link Event} object is incorrectly specified then the {@link
73   * IllegalEventDefinedException} exception is raised.
74   * <p/>
75   * When an event fires the {@link EventManager} is responsible for ensuring that
76   * the event handler is notified. If the event handler has since been removed
77   * then the EventManager must gracefully abort the delivery and continue
78   * processing the next event.
79   * <p/>
80   * Objects of type {@link Event} need to survive subsequent {@link EventManager}
81   * restarts, as such they must be persisted, which implies that the {@link
82   * EventHandler} needs to also be persisted. The ability to store the {@link
83   * EventHandler} as a <code>HandleIfc</code> object which can later be resolved
84   * to an object will be required.
85   *
86   * @author <a href="mailto:wood@intalio.com">Chris Wood</a>
87   * @version $Revision: 1.4 $ $Date: 2006/02/23 11:17:38 $
88   */
89  public class BasicEventManager
90          extends BasicService
91          implements EventManager {
92  
93      // The max number of threads for this pool.
94      private static final int MAX_THREADS = 5;
95  
96      /***
97       * Maps ids to events.
98       */
99      private final HashMap _events = new HashMap();
100 
101     /***
102      * Thread pool.
103      */
104     private PooledExecutor _pool;
105 
106     /***
107      * Synchonization for the following two collections.
108      */
109     private final Object _queueSync = new Object();
110 
111     /***
112      * Event queue.
113      */
114     private final OrderedQueue _queue = new OrderedQueue(_queueComparator);
115 
116     /***
117      * Used to generate unique queue entry ids.
118      */
119     private long _seed;
120 
121 
122     /***
123      * Construct a new <code>BasicEventManager</code>.
124      */
125     public BasicEventManager(ThreadPoolFactory factory) {
126         super("BasicEventManager");
127 
128         _pool = factory.create(getName(), MAX_THREADS);
129     }
130 
131     /***
132      * Register an event to be fired once and only once at the specified
133      * abolsute time. The event object must be Serializable so that it can be
134      * persisted and restored across EventManager restarts.
135      * <p/>
136      * If the specified event is ill-defined then the IllegalEventDefined-
137      * Exception exception is thrown.
138      * <p/>
139      * Similarly, if the abolsute time has already passed then the exception
140      * IllegalEventDefinedException is raised.
141      * <p/>
142      * The method returns an unique event identifier, which can subsequently be
143      * used to deregister the event.
144      *
145      * @param event    information about the event
146      * @param absolute the abolsute time, in ms, that the event must fire
147      * @return String  unique event identifier
148      * @throws IllegalEventDefinedException
149      */
150     public String registerEvent(Event event, long absolute)
151             throws IllegalEventDefinedException {
152         synchronized (_queueSync) {
153             QueueEntry entry = new QueueEntry(event, absolute, generateId());
154 
155             // add entry to the queue.
156             _queue.add(entry);
157             _events.put(entry.id, entry);
158 
159             // notify the event thread.
160             _queueSync.notifyAll();
161             return entry.id;
162         }
163     }
164 
165     /***
166      * Register an event to be fired once and only once at a time relative to
167      * now. The event object must be Serializable so that it can be persisted
168      * and restored across EventManager restarts.
169      * <p/>
170      * If the specified event is ill-defined then the IllegalEventDefined-
171      * Exception exception is thrown.
172      * <p/>
173      * The method returns an unique event identifier, which can subsequently be
174      * used to deregister the event.
175      *
176      * @param event    information about the event
177      * @param relative the relative time in ms (currently no reference to
178      *                 locale).
179      * @return String  unique event identifier,
180      * @throws IllegalEventDefinedException
181      */
182     public String registerEventRelative(Event event, long relative)
183             throws IllegalEventDefinedException {
184         return registerEvent(event, System.currentTimeMillis() + relative);
185     }
186 
187     /***
188      * Unregister the event specified by the event identifier. If the event does
189      * not exist then fail silently.
190      *
191      * @param id unique event identifier.
192      */
193     public void unregisterEvent(String id) {
194         synchronized (_queueSync) {
195             // remove from the events list
196             Object obj = _events.remove(id);
197             if (obj == null) {
198                 return;
199             }
200             // remove from the queue.
201             _queue.remove(obj);
202         }
203     }
204 
205     // implementation of BasicService.run
206     public void run() {
207         synchronized (_queueSync) {
208             QueueEntry entry;
209             long currentTime;
210             while (!Thread.interrupted()) {
211                 currentTime = System.currentTimeMillis();
212                 try {
213                     entry = (QueueEntry) _queue.firstElement();
214                 } catch (java.util.NoSuchElementException ex) {
215                     // queue is empty.
216                     try {
217                         _queueSync.wait();
218                     } catch (InterruptedException ex1) {
219                         break;
220                     }
221                     continue;
222                 }
223 
224                 if (entry.absolute <= currentTime) {
225                     // trigger any expired events
226                     try {
227                         _pool.execute(entry);
228                     } catch (InterruptedException ex) {
229                         break;
230                     }
231                     _queue.removeFirstElement();
232                     _events.remove(entry.id);
233                 } else {
234                     // wait for either the next event to expire or an element to be
235                     // added to the queue.
236                     try {
237                         _queueSync.wait(entry.absolute - currentTime);
238                     } catch (InterruptedException ex) {
239                         break;
240                     }
241                 }
242             }
243         }
244     }
245 
246     /***
247      * Generate unique queued object identifier.
248      */
249     private synchronized String generateId() {
250         return Long.toString(++_seed);
251     }
252 
253     /***
254      * Compare queue entries on expiration times
255      */
256     private static final Comparator _queueComparator =
257             new Comparator() {
258 
259                 public int compare(Object obj1, Object obj2) {
260                     QueueEntry qe1 = (QueueEntry) obj1;
261                     QueueEntry qe2 = (QueueEntry) obj2;
262 
263                     if (qe1.absolute < qe2.absolute) {
264                         return -1;
265                     }
266                     if (qe1.absolute > qe2.absolute) {
267                         return 1;
268                     }
269                     return 0;
270                 }
271 
272                 public boolean equals(Object that) {
273                     return (this == that);
274                 }
275             };
276 
277     /***
278      * Entry on the task queue.
279      */
280     class QueueEntry implements Runnable {
281 
282         QueueEntry(Event event, long absolute, String id) {
283             this.absolute = absolute;
284             this.event = event;
285             this.id = id;
286         }
287 
288         private long absolute;
289         private Event event;
290         private String id;
291 
292         public void run() {
293             event.getEventListener().handleEvent(event.getEventType(),
294                     event.getCallbackObject(),
295                     System.currentTimeMillis());
296         }
297     }
298 
299 }