View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: BasicEventManager.java,v 1.8 2003/08/17 01:32:22 tanderson Exp $
44   *
45   * Date         Author  Changes
46   * 07/27/00	    jima    Created
47   */
48  package org.exolab.jms.events;
49  
50  import java.io.IOException;
51  import java.io.Serializable;
52  import java.util.Comparator;
53  import java.util.HashMap;
54  import java.util.Iterator;
55  
56  import org.apache.commons.logging.Log;
57  import org.apache.commons.logging.LogFactory;
58  
59  import org.exolab.core.service.BasicService;
60  import org.exolab.core.service.ServiceException;
61  import org.exolab.core.service.ServiceState;
62  import org.exolab.core.threadPool.ThreadPool;
63  import org.exolab.core.util.OrderedQueue;
64  import org.exolab.jms.threads.ThreadPoolExistsException;
65  import org.exolab.jms.threads.ThreadPoolManager;
66  
67  
68  /***
69   * The EventManager manages {@link Event} objects. It has methods to
70   * register and unregister events. It also extends {@link Runnable} interface
71   * which defines the thread responsible for dispatching events.
72   * <p>
73   * An event is defined to occur at sometime in the future, as specified either
74   * by an absolute time through {@link #registerEvent} or as relative time
75   * through {@link #registerEventRelative}. An event must have an associated
76   * event type and may have an attached <code>Serializable</code>,
77   * which is used when the EventManager makes a callback to the registered
78   * handler when the event fires.
79   * <p>
80   * The register methids will return an event identifier which can subsequently
81   * be used to unregister the event through the {@link #unregisterEvent} event.
82   * This is the only means of unregister an event.
83   * <p>
84   * If the {@link Event} object is incorrectly specified then the
85   * {@link IllegalEventDefinedException} exception is raised.
86   * <p>
87   * When an event fires the {@link EventManager} is responsible for ensuring
88   * that the event handler is notified. If the event handler has since been
89   * removed then the EventManager must gracefully abort the delivery and
90   * continue processing the next event.
91   * <p>
92   * Objects of type {@link Event} need to survive subsequent
93   * {@link EventManager} restarts, as such they must be persisted, which
94   * implies that the {@link EventHandler} needs to also be persisted. The
95   * ability to store the {@link EventHandler} as a <code>HandleIfc</code> object
96   * which can later be resolved to an object will be required.
97   *
98   * @version   $Revision: 1.8 $ $Date: 2003/08/17 01:32:22 $
99   * @author    <a href="mailto:wood@intalio.com">Chris Wood</a>
100  */
101 public class BasicEventManager
102     extends BasicService
103     implements EventManager {
104 
105     // The unique name of this ThreadPool.
106     public transient static final String NAME = "EventManager";
107 
108     // The max number of threads for this pool.
109     public transient static final int MAX_THREADS = 5;
110 
111     // serialized state.
112 
113     /***
114      * Maps ids to events.
115      */
116     private HashMap _events = new HashMap();
117 
118     // nonserialized state.
119 
120     /***
121      * Thread pool manager.
122      */
123     private transient ThreadPool _pool;
124 
125     /***
126      * Synchonization for the following two collections.
127      */
128     private transient Object _queueSync = new Object();
129 
130     /***
131      * Event queue.
132      */
133     private transient OrderedQueue _queue = new OrderedQueue(_queueComparator);
134 
135     /***
136      * Used to generate unique queue entry ids.
137      */
138     private transient long _seed;
139 
140     /***
141      * this is the name of the EventManagerThread in which events excecute.
142      */
143     transient final static private String EVENT_MANAGER_THREAD_NAME = "EventManagerThread";
144 
145     /***
146      * Singleton instance.
147      */
148     transient static private EventManager _instance = null;
149 
150     /***
151      * The logger
152      */
153     private static final Log _log = LogFactory.getLog(BasicEventManager.class);
154 
155 
156     /***
157      * Return the singleton instance of the EventManager
158      *
159      * @return    EventManager
160      */
161     public static EventManager instance() {
162         if (_instance == null)
163             _instance = new BasicEventManager();
164 
165         return _instance;
166     }
167 
168     protected BasicEventManager() {
169         super(EVENT_MANAGER_THREAD_NAME);
170     }
171 
172     /***
173      * Register an event to be fired once and only once at the specified
174      * abolsute time. The event object must be Serializable so that it can
175      * be persisted and restored across EventManager restarts.
176      * <p>
177      * If the specified event is ill-defined then the IllegalEventDefined-
178      * Exception exception is thrown.
179      * <p>
180      * Similarly, if the abolsute time has already passed then the exception
181      * IllegalEventDefinedException is raised.
182      * <p>
183      * The method returns an unique event identifier, which can subsequently
184      * be used to deregister the event.
185      *
186      * @param event    information about the event
187      * @param abolsute the abolsute time, in ms, that the event
188      *                 must fire
189      * @return String  unique event identifier
190      * @exception IllegalEventDefinedException
191      */
192     public String registerEvent(Event event, long absolute)
193         throws IllegalEventDefinedException {
194         synchronized (_queueSync) {
195             QueueEntry entry = new QueueEntry(event, absolute, generateId());
196 
197             // add entry to the queue.
198             _queue.add(entry);
199             _events.put(entry.id, entry);
200 
201             // notify the event thread.
202             _queueSync.notifyAll();
203             return entry.id;
204         }
205     }
206 
207     /***
208      * Register an event to be fired once and only once at a time relative to
209      * now. The event object must be Serializable so that it can be persisted
210      * and restored across EventManager restarts.
211      * <p>
212      * If the specified event is ill-defined then the IllegalEventDefined-
213      * Exception exception is thrown.
214      * <p>
215      * The method returns an unique event identifier, which can subsequently
216      * be used to deregister the event.
217      *
218      * @param event    information about the event
219      * @param relative the relative time in ms
220      *                 (currently no reference to locale).
221      * @return String  unique event identifier,
222      * @exception IllegalEventDefinedException
223      */
224     public String registerEventRelative(Event event, long relative)
225         throws IllegalEventDefinedException {
226         return registerEvent(event, System.currentTimeMillis() + relative);
227     }
228 
229     /***
230      * Unregister the event specified by the event identifier. If the event
231      * does not exist then fail silently.
232      *
233      * @param String unique event identifier.
234      */
235     public void unregisterEvent(String id) {
236         synchronized (_queueSync) {
237             // remove from the events list
238             Object obj = _events.remove(id);
239             if (obj == null)
240                 return;
241             // remove from the queue.
242             _queue.remove(obj);
243         }
244     }
245 
246     // implementation of BasicService.run
247     public void run() {
248         synchronized (_queueSync) {
249             QueueEntry entry;
250             long currentTime;
251             while (getState() != ServiceState.STOPPED) {
252                 currentTime = System.currentTimeMillis();
253                 try {
254                     entry = (QueueEntry) _queue.firstElement();
255                 } catch (java.util.NoSuchElementException ex) {
256                     // queue is empty.
257                     try {
258                         _queueSync.wait();
259                     } catch (InterruptedException ex1) {
260                         break;
261                     }
262                     continue;
263                 }
264 
265                 if (entry.absolute <= currentTime) {
266                     // trigger any expired events
267                     try {
268                         getThreadPool().execute(entry);
269                     } catch (InterruptedException ex) {
270                     }
271                     _queue.removeFirstElement();
272                     _events.remove(entry.id);
273                 } else {
274                     // wait for either the next event to expire or an element to be
275                     // added to the queue.
276                     try {
277                         _queueSync.wait(entry.absolute - currentTime);
278                     } catch (InterruptedException ex) {
279                         // ignore
280                     }
281                 }
282             }
283         }
284     }
285 
286     /***
287      * Generate unique queued object identifier.
288      */
289     private synchronized String generateId() {
290         return Long.toString(++_seed);
291     }
292 
293     /***
294      * Add events back to the event queue.
295      */
296     private void readObject(java.io.ObjectInputStream in)
297         throws IOException, ClassNotFoundException {
298         synchronized (_queueSync) {
299             in.defaultReadObject();
300 
301             // fillin the queue
302             Iterator itt = _events.values().iterator();
303             while (itt.hasNext())
304                 _queue.add(itt.next());
305         }
306     }
307 
308     private void writeObject(java.io.ObjectOutputStream out)
309         throws IOException {
310         synchronized (_queueSync) {
311             // pause the queue thread while we serialize
312             out.defaultWriteObject();
313         }
314     }
315 
316     public void start() throws ServiceException {
317         super.start();
318     }
319 
320     /***
321      * Return a reference ot the thread pool manager. This object is chached
322      * for future reference
323      *
324      * @return      ThreadPool
325      */
326     private ThreadPool getThreadPool() {
327         if (_pool == null) {
328             // At startup Event Mgr is triggered before the Service
329             // locator has registered the ThreadPoolMgr, causing
330             // an exception. Use the instance variable for now to
331             // avoid this problem. jimm
332             // _pool = (ThreadPoolMgr)ServiceLocator.locateService(
333             //    ServiceConstants.ThreadPoolManager);
334             try {
335                 _pool = ThreadPoolManager.instance().createThreadPool
336                     (NAME, MAX_THREADS);
337             } catch (ThreadPoolExistsException err) {
338                 _log.error("Thread pool " + NAME + " already exists");
339             }
340         }
341 
342         return _pool;
343     }
344 
345     /***
346      * Compare queue entries on expiration times
347      */
348     private transient static final Comparator _queueComparator =
349         new Comparator() {
350 
351             public int compare(Object obj1, Object obj2) {
352                 QueueEntry qe1 = (QueueEntry) obj1;
353                 QueueEntry qe2 = (QueueEntry) obj2;
354 
355                 if (qe1.absolute < qe2.absolute)
356                     return -1;
357                 if (qe1.absolute > qe2.absolute)
358                     return 1;
359                 return 0;
360             }
361 
362             public boolean equals(Object that) {
363                 return (this == that);
364             }
365         };
366 
367     /***
368      * Entry on the task queue.
369      */
370     class QueueEntry implements Serializable, Runnable {
371 
372         QueueEntry(Event event, long absolute, String id) {
373             this.absolute = absolute;
374             this.event = event;
375             this.id = id;
376         }
377 
378         private long absolute;
379         private Event event;
380         private String id;
381 
382         public void run() {
383             event.getEventListener().handleEvent(event.getEventType(),
384                 event.getCallbackObject(), System.currentTimeMillis());
385         }
386     }
387 
388 } //-- BasicEventManager