View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   */
43  package org.exolab.jms.server;
44  
45  import java.sql.Connection;
46  import java.util.Enumeration;
47  import java.util.Vector;
48  import javax.jms.JMSException;
49  
50  import org.apache.commons.logging.Log;
51  import org.apache.commons.logging.LogFactory;
52  
53  import org.exolab.jms.authentication.AuthenticationMgr;
54  import org.exolab.jms.authentication.User;
55  import org.exolab.jms.client.JmsDestination;
56  import org.exolab.jms.client.JmsQueue;
57  import org.exolab.jms.client.JmsTopic;
58  import org.exolab.jms.config.Configuration;
59  import org.exolab.jms.config.Connector;
60  import org.exolab.jms.config.types.SchemeType;
61  import org.exolab.jms.messagemgr.ConsumerEndpoint;
62  import org.exolab.jms.messagemgr.ConsumerManager;
63  import org.exolab.jms.messagemgr.DestinationCache;
64  import org.exolab.jms.messagemgr.DestinationManager;
65  import org.exolab.jms.messagemgr.DurableConsumerEndpoint;
66  import org.exolab.jms.persistence.DatabaseService;
67  import org.exolab.jms.persistence.PersistenceException;
68  import org.exolab.jms.service.ServiceException;
69  import org.exolab.jms.service.Services;
70  
71  
72  /***
73   * A connection is created for every admin client connecting to the JmsServer.
74   *
75   * @author <a href="mailto:knut@lerpold.no">Knut Lerpold</a>
76   * @version $Revision: 1.7 $ $Date: 2005/12/23 12:17:45 $
77   * @see org.exolab.jms.server.AdminConnectionManager
78   */
79  public class AdminConnection {
80  
81      /***
82       * The configuration.
83       */
84      private final Configuration _config;
85  
86      /***
87       * The authentication manager.
88       */
89      private final AuthenticationMgr _authenticator;
90  
91      /***
92       * The destination manager.
93       */
94      private final DestinationManager _destinations;
95  
96      /***
97       * The consumer manager.
98       */
99      private final ConsumerManager _consumers;
100 
101     /***
102      * The database service.
103      */
104     private final DatabaseService _database;
105 
106     /***
107      * The services.
108      */
109     private final Services _services;
110 
111     /***
112      * The logger
113      */
114     private static final Log _log = LogFactory.getLog(AdminConnection.class);
115 
116 
117     /***
118      * Construct a new <code>AdminConnection</code>.
119      *
120      * @param config        the configuration
121      * @param authenticator the authentication manager
122      * @param destinations  the destination manager
123      * @param database      the database service
124      * @param services      the services
125      */
126     protected AdminConnection(Configuration config, AuthenticationMgr authenticator,
127                               DestinationManager destinations, ConsumerManager consumers,
128                               DatabaseService database,
129                               Services services) {
130         if (config == null) {
131             throw new IllegalArgumentException("Argument 'config' is null");
132         }
133         if (authenticator == null) {
134             throw new IllegalArgumentException(
135                     "Argument 'authenticator' is null");
136         }
137         if (destinations == null) {
138             throw new IllegalArgumentException(
139                     "Argument 'destinations' is null");
140         }
141         if (consumers == null) {
142             throw new IllegalArgumentException("Argument 'consumers' is null");
143         }
144         if (database == null) {
145             throw new IllegalArgumentException("Argument 'database' is null");
146         }
147         if (services == null) {
148             throw new IllegalArgumentException("Argument 'services' is null");
149         }
150         _config = config;
151         _authenticator = authenticator;
152         _destinations = destinations;
153         _consumers = consumers;
154         _database = database;
155         _services = services;
156     }
157 
158     /***
159      * Close the admin connection
160      */
161     public void close() {
162     }
163 
164     /***
165      * Return the number of messages for a durable consumer.
166      *
167      * @param topic name of the topic
168      * @param name  consumer name
169      * @return int                 number of unsent or unacked messages
170      */
171     public int getDurableConsumerMessageCount(String topic, String name) {
172         int count = -1;
173         try {
174             // first see if the cache is loaded in memory
175             JmsDestination dest = _destinations.getDestination(topic);
176             ConsumerEndpoint endpoint = null;
177             if ((dest != null)
178                     && ((name != null)
179                     || (name.length() > 0))) {
180 
181                 endpoint = _consumers.getConsumerEndpoint(name);
182                 if ((endpoint != null)
183                         && (endpoint.getDestination().equals(dest))) {
184                     // retrieve the number of handles for the endpoint, which
185                     // reflects the number of messages
186                     count = endpoint.getMessageCount();
187                 } else {
188                     // there is no cache with this name stored in memory. If
189                     // this is an administered destination then read the count
190                     //  directly from the database.
191                     if (dest.getPersistent()) {
192                         try {
193                             _database.begin();
194                             Connection connection = _database.getConnection();
195                             count = _database.getAdapter().
196                                     getDurableConsumerMessageCount(connection, topic,
197                                                                    name);
198                             _database.commit();
199                         } catch (PersistenceException exception) {
200                             _log.error(exception, exception);
201                             try {
202                                 _database.rollback();
203                             } catch (PersistenceException error) {
204                                 // no-op
205                             }
206                         }
207                     }
208                 }
209             }
210         } catch (Exception exception) {
211             _log.error("Failed to get message count for topic=" + topic,
212                        exception);
213         } finally {
214         }
215 
216         return count;
217     }
218 
219     /***
220      * First use the destination manager to return the number of persistent and
221      * non-persistent messages in a queue.
222      *
223      * @param queue name of the queue
224      * @return int - the number of messages for that destination or -1 if the
225      *         destination is invalid
226      */
227     public int getQueueMessageCount(String queue) {
228         int count = -1;
229 
230         try {
231             // first see if the cache is loaded in memory
232             JmsDestination dest = _destinations.getDestination(queue);
233             DestinationCache cache = null;
234             if (dest != null) {
235                 _database.begin();
236                 cache = _destinations.getDestinationCache(dest);
237                 // retrieve the number of handles for the cache, which
238                 // reflects the number of messages
239                 count = cache.getMessageCount();
240                 _database.commit();
241             }
242         } catch (Exception exception) {
243             _log.error("Failed to get message count for queue=" + queue,
244                        exception);
245             rollback();
246         }
247         return count;
248     }
249 
250     /***
251      * Add the specified durable consumer to the database.
252      *
253      * @param topic name of the destination
254      * @param name  name of the consumer
255      * @return boolean             true if successful
256      */
257     public boolean addDurableConsumer(String topic, String name) {
258         boolean result = false;
259         try {
260             JmsTopic t = new JmsTopic(topic);
261             t.setPersistent(true);
262             _consumers.subscribe(t, name, null);
263             result = true;
264         } catch (JMSException exception) {
265             _log.error("Failed to add durable consumer=" + name
266                        + " for topic=" + topic, exception);
267         }
268 
269         return result;
270     }
271 
272     /***
273      * Remove the consumer attached to the specified destination and with the
274      * passed in name.
275      *
276      * @param name name of the consumer
277      * @return boolean             true if successful
278      */
279     public boolean removeDurableConsumer(String name) {
280         boolean result = false;
281         try {
282             _consumers.unsubscribe(name, null);
283             result = true;
284         } catch (JMSException exception) {
285             _log.debug("Failed to remove durable consumer=" + name, exception);
286         }
287 
288         return result;
289     }
290 
291     /***
292      * Check if the durable consumer exists.
293      *
294      * @param name name of the durable conusmer
295      * @return boolean             true if it exists and false otherwise
296      */
297     public boolean durableConsumerExists(String name) {
298         return (_consumers.getConsumerEndpoint(name) != null);
299     }
300 
301     /***
302      * Return the collection of durable consumer names for a particular topic
303      * destination.
304      *
305      * @param topic the topic name
306      * @return Vector              collection of strings
307      */
308     public Vector getDurableConsumers(String topic) {
309         Enumeration iter = null;
310         Vector result = new Vector();
311 
312         try {
313             _database.begin();
314             Connection connection = _database.getConnection();
315 
316             iter = _database.getAdapter().getDurableConsumers(connection,
317                                                               topic);
318             // copy the elements into the vector
319             while (iter.hasMoreElements()) {
320                 result.addElement(iter.nextElement());
321             }
322             _database.commit();
323         } catch (Exception exception) {
324             _log.error("Failed on get durable consumers for topic=" + topic,
325                        exception);
326             rollback();
327         }
328 
329         return result;
330     }
331 
332     /***
333      * De-Activate an active persistent consumer.
334      *
335      * @param name name of the consumer
336      * @return boolean             true if successful
337      */
338     public boolean unregisterConsumer(String name) {
339         boolean success = false;
340 
341         ConsumerEndpoint endpoint = _consumers.getConsumerEndpoint(name);
342         if (endpoint != null) {
343             _consumers.closeConsumer(endpoint);
344         }
345         success = true;
346 
347         return success;
348     }
349 
350     /***
351      * Check to see if the given consumer is currently connected to the
352      * OpenJMSServer. This is only valid when in online mode.
353      *
354      * @param name The name of the onsumer.
355      * @return boolean True if the consumer is connected.
356      */
357     public boolean isConnected(String name) {
358         boolean result = false;
359         ConsumerEndpoint endpoint = _consumers.getConsumerEndpoint(name);
360         if (endpoint != null && endpoint instanceof DurableConsumerEndpoint) {
361             result = ((DurableConsumerEndpoint) endpoint).isActive();
362         }
363         return result;
364     }
365 
366     /***
367      * Return a list of all registered destinations.
368      *
369      * @return Vector     collection of strings
370      */
371     public Vector getAllDestinations() {
372         Enumeration iter = null;
373         Vector result = new Vector();
374 
375         try {
376             _database.begin();
377             Connection connection = _database.getConnection();
378 
379             iter = _database.getAdapter().getAllDestinations(connection);
380             // copy the elements into the vector
381             while (iter.hasMoreElements()) {
382                 result.addElement(iter.nextElement());
383             }
384             _database.commit();
385         } catch (Exception exception) {
386             _log.error("Failed to get all destinations", exception);
387             rollback();
388         }
389 
390         return result;
391     }
392 
393     /***
394      * Add an administered destination with the specified name.
395      *
396      * @param name  destination name
397      * @param queue whether it is queue or a topic
398      * @return boolean             true if successful
399      */
400     public boolean addDestination(String name, Boolean queue) {
401 
402         boolean success = false;
403 
404         // create the appropriate destination object
405         JmsDestination destination = (queue.booleanValue())
406                 ? (JmsDestination) new JmsQueue(name)
407                 : (JmsDestination) new JmsTopic(name);
408         destination.setPersistent(true);
409 
410         // create the administered destination
411         try {
412             if (_destinations.getDestination(name) == null) {
413                 _destinations.createDestination(destination);
414                 success = true;
415             }
416         } catch (JMSException exception) {
417             _log.error("Failed to add destination=" + name, exception);
418         }
419 
420         return success;
421     }
422 
423     /***
424      * Destroy the specified destination and all associated messsages and
425      * consumers. This is a very dangerous operation to execute while there are
426      * clients online
427      *
428      * @param name destination to destroy
429      * @return boolean             true if successful
430      */
431     public boolean removeDestination(String name) {
432 
433         boolean success = false;
434         JmsDestination dest = _destinations.getDestination(name);
435 
436         // ensure that the destination actually translates to a valid
437         // object.
438         if (dest != null) {
439             try {
440                 _destinations.removeDestination(dest);
441                 success = true;
442             } catch (JMSException exception) {
443                 _log.error("Failed to remove destination=" + name, exception);
444             }
445         }
446 
447         return success;
448     }
449 
450     /***
451      * Check whether the specified destination exists
452      *
453      * @param name - the name of the destination to check
454      * @return boolean - true if it does and false otherwise
455      */
456     public boolean destinationExists(String name) {
457 
458         JmsDestination dest = _destinations.getDestination(name);
459         return (dest != null);
460     }
461 
462     /***
463      * Terminate the JMS Server. If it is running as a standalone application
464      * then exit the application. It is running as an embedded application then
465      * just terminate the thread
466      */
467     public void stopServer() {
468         boolean isEmbedded = false;
469         Connector[] connectors = _config.getConnectors().getConnector();
470         for (int i = 0; i < connectors.length; ++i) {
471             if (connectors[i].getScheme().equals(SchemeType.EMBEDDED)) {
472                 isEmbedded = true;
473                 break;
474             }
475         }
476 
477         final boolean exit = !isEmbedded;
478 
479         Runnable r = new Runnable() {
480 
481             public void run() {
482                 try {
483                     // give the caller a chance to return before shutting
484                     // down services
485                     Thread.sleep(1000);
486                 } catch (InterruptedException ignore) {
487                 }
488                 _log.info("Stopping all services");
489                 try {
490                     _services.stop();
491                 } catch (ServiceException exception) {
492                     _log.error(exception, exception);
493                 }
494 
495                 if (exit) {
496                     _log.info("Server shutdown scheduled for 5 secs");
497                     try {
498                         Thread.sleep(5000);
499                     } catch (InterruptedException ignore) {
500                     }
501                     System.exit(0);
502                 }
503             }
504         };
505         Thread t = new Thread(r);
506         t.start();
507     }
508 
509     /***
510      * Purge all processed messages from the database
511      *
512      * @return int         number of messages purged
513      */
514     public int purgeMessages() {
515         return _database.getAdapter().purgeMessages();
516     }
517 
518     /***
519      * Add a user with the specified name
520      *
521      * @param username the users name
522      * @param password the users password
523      * @return <code>true</code> if the user is added otherwise
524      *         <code>false</code>
525      */
526     public boolean addUser(String username, String password) {
527         return _authenticator.addUser(new User(username, password));
528     }
529 
530     /***
531      * Change password for the specified user
532      *
533      * @param username the users name
534      * @param password the users password
535      * @return <code>true</code> if the password is changed otherwise
536      *         <code>false</code>
537      */
538     public boolean changePassword(String username, String password) {
539         return _authenticator.updateUser(new User(username, password));
540     }
541 
542     /***
543      * Remove the specified user
544      *
545      * @param username the users name
546      * @return <code>true</code> if the user is removed otherwise
547      *         <code>false</code>
548      */
549     public boolean removeUser(String username) {
550         return _authenticator.removeUser(new User(username, null));
551     }
552 
553     /***
554      * Return a list of all registered users.
555      *
556      * @return Vector of users
557      */
558     public Vector getAllUsers() {
559         Enumeration iter = null;
560         Vector result = new Vector();
561 
562         try {
563             _database.begin();
564             Connection connection = _database.getConnection();
565 
566             iter = _database.getAdapter().getAllUsers(connection);
567             // copy the elements into the vector
568             while (iter.hasMoreElements()) {
569                 result.addElement(iter.nextElement());
570             }
571             _database.commit();
572         } catch (Exception exception) {
573             _log.error("Failed on get all users", exception);
574             rollback();
575         }
576 
577         return result;
578     }
579 
580     /***
581      * Rollback the current transaction, logging any error.
582      */
583     private void rollback() {
584         try {
585             _database.rollback();
586         } catch (PersistenceException exception) {
587             _log.error(exception, exception);
588         }
589     }
590 
591 }