View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
42   */
43  
44  package org.exolab.jms.scheduler;
45  
46  import java.util.HashMap;
47  import java.util.LinkedList;
48  
49  import org.apache.commons.logging.Log;
50  import org.apache.commons.logging.LogFactory;
51  
52  import org.exolab.core.service.BasicService;
53  import org.exolab.core.service.ServiceException;
54  import org.exolab.core.threadPool.ThreadPool;
55  import org.exolab.jms.config.Configuration;
56  import org.exolab.jms.config.ConfigurationManager;
57  import org.exolab.jms.config.SchedulerConfiguration;
58  import org.exolab.jms.threads.ThreadPoolManager;
59  
60  
61  /***
62   * The scheduler is responsible for executing {@link Runnable} objects
63   * using a thread pool. Clients can add these objects to the scheduler
64   * and the scheduler will, in fifo order, execute them. If there are no
65   * threads currently available, the runnable will wait for one to become
66   * available.
67   * <p>
68   * A client can add or remove {@link Runnable} objects.
69   *
70   * @version     $Revision: 1.9 $ $Date: 2003/08/17 01:32:25 $
71   * @author      <a href="mailto:mourikis@intalio.com">Jim Mourikis</a>
72   * @author      <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
73   * @author      <a href="mailto:jima@intalio.com">Jim Alateras</a>
74   * @see         java.lang.Runnable
75   * @see         org.exolab.core.service.BasicService
76   * @see         org.exolab.core.threadPool.ThreadPool
77   */
78  public class Scheduler extends BasicService {
79  
80      /***
81       * This is the thread pool used by the scheduler
82       */
83      private ThreadPool _threads = null;
84  
85      /***
86       * The queue of Runnable instances
87       */
88      private LinkedList _queue = new LinkedList();;
89  
90      /***
91       * HashMap of Runnable->Integer, representing the number of times a
92       * Runnable object exists in the queue
93       */
94      private HashMap _references = new HashMap();
95  
96      /***
97       * If true, shuts down the scheduler
98       */
99      private volatile boolean _stop = false;
100 
101     /***
102      * This attribute holds the number of threads that the scheduler
103      * will use in its pool. It defaults to 6 and has a minimum value
104      * of 2
105      */
106     private int _threadCount = 6;
107 
108     /***
109      * This is the minimum number of threads that can be used to
110      * configure the scheduler. If a lower nmber is specified then
111      * it defaults to this value
112      */
113     private final static int MIN_THREAD_COUNT = 2;
114 
115     /***
116      * Unique name identifyting this sevice
117      */
118     private static final String SCHEDULER_NAME = "Scheduler";
119 
120     /***
121      * This is the singleton instance of this class
122      */
123     private static Scheduler _instance = null;
124 
125     /***
126      * The logger
127      */
128     private static final Log _log = LogFactory.getLog(Scheduler.class);
129 
130     /***
131      * Creates the singleton instance
132      *
133      * @throws ServiceException if the scheduler can't be created
134      */
135     public static Scheduler createInstance() throws ServiceException {
136         _instance = new Scheduler();
137         return _instance;
138     }
139 
140     /***
141      * Returns the singleton instance
142      *
143      * @return the singleton instance, or <code>null</code> if it hasn't
144      * been initialised
145      */
146     public static Scheduler instance() {
147         return _instance;
148     }
149 
150     /***
151      * Construct an instance of the scheduler. This will read the information
152      * from the configuration file and then set up a thread pool.
153      *
154      * @throws ServiceException if the thread pool can't be initialised
155      */
156     private Scheduler() throws ServiceException {
157         super(SCHEDULER_NAME);
158 
159         // access the configuration file.
160         Configuration config = ConfigurationManager.getConfig();
161         SchedulerConfiguration sched_config =
162             config.getSchedulerConfiguration();
163 
164         int count = sched_config.getMaxThreads();
165         if (count < MIN_THREAD_COUNT) {
166             count = MIN_THREAD_COUNT;
167         }
168         _threadCount = count;
169 
170         // create the thread pool
171         _threads = ThreadPoolManager.instance().createThreadPool(
172             SCHEDULER_NAME, _threadCount);
173     }
174 
175     /***
176      * Add a Runnable object to the scheduler queue.
177      * When a thread becomes available, it will be executed.
178      *
179      * @param       runner              the object to execute
180      */
181     public void add(Runnable runner) {
182         synchronized (_queue) {
183             _queue.addLast(runner);
184             addReference(runner);
185             _queue.notify();
186         }
187     }
188 
189     /***
190      * Remove a Runnable object from the scheduler queue.
191      *
192      * @param       runner              the object to remove
193      * @return      boolean             <tt>true</tt> if the object was
194      *                                  removed, <tt>false</tt> if it is
195      *                                  already running or doesn't exist
196      */
197     public boolean remove(Runnable runner) {
198         boolean result = false;
199         synchronized (_queue) {
200             result = _queue.remove(runner);
201         }
202         return result;
203     }
204 
205     /***
206      * Returns if a Runnable object exists in the scheduler queue.
207      *
208      * @param       runner              the object to remove
209      * @return      boolean             <tt>true</tt> if the object exists,
210      *                                  <tt>false</tt> if it is already
211      *                                  running or doesn't exist
212      */
213     public boolean contains(Runnable runner) {
214         boolean result = false;
215         synchronized (_queue) {
216             result = (_references.get(runner) != null);
217         }
218         return result;
219     }
220 
221     /***
222      * Returns true if the scheduler queue is empty
223      *
224      * @return <tt>true</tt> if the scheduler queue is empty
225      */
226     public boolean isEmpty() {
227         boolean result = false;
228         synchronized (_queue) {
229             result = _queue.isEmpty();
230         }
231         return result;
232     }
233 
234     /***
235      * Start the scheduler
236      * This can only be terminated by invoking {@link #stop}
237      */
238     public void run() {
239         while (!_stop) {
240             Runnable runner = next();
241             if (!_stop && runner != null) {
242                 try {
243                     _threads.execute(runner);
244                 } catch (Exception exception) {
245                     _log.error(exception);
246                 }
247             }
248         }
249     }
250 
251     // override BasicService.stop
252     public void stop() throws ServiceException {
253         // TODO - need a safer way of shutting down threads.
254         _threads.stopRequestAllWorkers();
255         _stop = true;
256         super.stop();
257     }
258 
259     /***
260      * Return the next object in the queue to execute
261      * This method blocks until an object becomes available.
262      *
263      * @return      Runnable            the next object to execute
264      */
265     protected Runnable next() {
266         Runnable result = null;
267         synchronized (_queue) {
268             while (!_stop && _queue.isEmpty()) {
269                 try {
270                     _queue.wait();
271                 } catch (InterruptedException ignore) {
272                     // do nothing.
273                 }
274             }
275             if (!_stop) {
276                 result = (Runnable) _queue.removeFirst();
277                 removeReference(result);
278             }
279         }
280         return result;
281     }
282 
283     /***
284      * Increment the reference count to a queued Runnable object,
285      * to enable contains() to be as efficient as possible.
286      */
287     private void addReference(Runnable runner) {
288         Integer count = (Integer) _references.get(runner);
289         if (count != null) {
290             count = new Integer(count.intValue() + 1);
291             _references.put(runner, count);
292         } else {
293             _references.put(runner, new Integer(1));
294         }
295     }
296 
297     /***
298      * Decrement the reference count to a queued Runnable object,
299      * removing it if no more references remain.
300      */
301     private void removeReference(Runnable runner) {
302         // decrement the no. of references to the Runnable object,
303         // removing it when the count reaches 0
304         Integer count = (Integer) _references.get(runner);
305         if (count.intValue() <= 1) {
306             _references.remove(runner);
307         } else {
308             _references.put(runner, new Integer(count.intValue() - 1));
309         }
310     }
311 
312 } //-- Scheduler