1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42 */
43 package org.exolab.jms.server;
44
45 import java.sql.Connection;
46 import java.util.Enumeration;
47 import java.util.Vector;
48 import javax.jms.JMSException;
49
50 import org.apache.commons.logging.Log;
51 import org.apache.commons.logging.LogFactory;
52
53 import org.exolab.jms.authentication.AuthenticationMgr;
54 import org.exolab.jms.authentication.User;
55 import org.exolab.jms.client.JmsDestination;
56 import org.exolab.jms.client.JmsQueue;
57 import org.exolab.jms.client.JmsTopic;
58 import org.exolab.jms.config.Configuration;
59 import org.exolab.jms.config.Connector;
60 import org.exolab.jms.config.types.SchemeType;
61 import org.exolab.jms.messagemgr.ConsumerEndpoint;
62 import org.exolab.jms.messagemgr.ConsumerManager;
63 import org.exolab.jms.messagemgr.DestinationCache;
64 import org.exolab.jms.messagemgr.DestinationManager;
65 import org.exolab.jms.messagemgr.DurableConsumerEndpoint;
66 import org.exolab.jms.persistence.DatabaseService;
67 import org.exolab.jms.persistence.PersistenceException;
68 import org.exolab.jms.service.ServiceException;
69 import org.exolab.jms.service.Services;
70
71
/**
 * A connection is created for every admin client connecting to the JmsServer.
 *
 * @author <a href="mailto:knut@lerpold.no">Knut Lerpold</a>
 * @version $Revision: 1.7 $ $Date: 2005/12/23 12:17:45 $
 * @see org.exolab.jms.server.AdminConnectionManager
 */
79 public class AdminConnection {
80
81 /***
82 * The configuration.
83 */
84 private final Configuration _config;
85
86 /***
87 * The authentication manager.
88 */
89 private final AuthenticationMgr _authenticator;
90
91 /***
92 * The destination manager.
93 */
94 private final DestinationManager _destinations;
95
96 /***
97 * The consumer manager.
98 */
99 private final ConsumerManager _consumers;
100
101 /***
102 * The database service.
103 */
104 private final DatabaseService _database;
105
106 /***
107 * The services.
108 */
109 private final Services _services;
110
111 /***
112 * The logger
113 */
114 private static final Log _log = LogFactory.getLog(AdminConnection.class);
115
116
/**
 * Construct a new <code>AdminConnection</code>.
 * <p>
 * Every collaborator is mandatory. The arguments are validated in
 * declaration order before any field is assigned.
 *
 * @param config        the configuration
 * @param authenticator the authentication manager
 * @param destinations  the destination manager
 * @param consumers     the consumer manager
 * @param database      the database service
 * @param services      the services
 * @throws IllegalArgumentException if any argument is <code>null</code>
 */
protected AdminConnection(Configuration config,
                          AuthenticationMgr authenticator,
                          DestinationManager destinations,
                          ConsumerManager consumers,
                          DatabaseService database,
                          Services services) {
    requireArgument(config, "Argument 'config' is null");
    requireArgument(authenticator, "Argument 'authenticator' is null");
    requireArgument(destinations, "Argument 'destinations' is null");
    requireArgument(consumers, "Argument 'consumers' is null");
    requireArgument(database, "Argument 'database' is null");
    requireArgument(services, "Argument 'services' is null");

    _config = config;
    _authenticator = authenticator;
    _destinations = destinations;
    _consumers = consumers;
    _database = database;
    _services = services;
}

/**
 * Reject a <code>null</code> constructor argument.
 *
 * @param value   the argument to check
 * @param message the exception message to use if the argument is null
 * @throws IllegalArgumentException if <code>value</code> is null
 */
private static void requireArgument(Object value, String message) {
    if (value == null) {
        throw new IllegalArgumentException(message);
    }
}
157
158 /***
159 * Close the admin connection
160 */
161 public void close() {
162 }
163
164 /***
165 * Return the number of messages for a durable consumer.
166 *
167 * @param topic name of the topic
168 * @param name consumer name
169 * @return int number of unsent or unacked messages
170 */
171 public int getDurableConsumerMessageCount(String topic, String name) {
172 int count = -1;
173 try {
174
175 JmsDestination dest = _destinations.getDestination(topic);
176 ConsumerEndpoint endpoint = null;
177 if ((dest != null)
178 && ((name != null)
179 || (name.length() > 0))) {
180
181 endpoint = _consumers.getConsumerEndpoint(name);
182 if ((endpoint != null)
183 && (endpoint.getDestination().equals(dest))) {
184
185
186 count = endpoint.getMessageCount();
187 } else {
188
189
190
191 if (dest.getPersistent()) {
192 try {
193 _database.begin();
194 Connection connection = _database.getConnection();
195 count = _database.getAdapter().
196 getDurableConsumerMessageCount(connection, topic,
197 name);
198 _database.commit();
199 } catch (PersistenceException exception) {
200 _log.error(exception, exception);
201 try {
202 _database.rollback();
203 } catch (PersistenceException error) {
204
205 }
206 }
207 }
208 }
209 }
210 } catch (Exception exception) {
211 _log.error("Failed to get message count for topic=" + topic,
212 exception);
213 } finally {
214 }
215
216 return count;
217 }
218
219 /***
220 * First use the destination manager to return the number of persistent and
221 * non-persistent messages in a queue.
222 *
223 * @param queue name of the queue
224 * @return int - the number of messages for that destination or -1 if the
225 * destination is invalid
226 */
227 public int getQueueMessageCount(String queue) {
228 int count = -1;
229
230 try {
231
232 JmsDestination dest = _destinations.getDestination(queue);
233 DestinationCache cache = null;
234 if (dest != null) {
235 _database.begin();
236 cache = _destinations.getDestinationCache(dest);
237
238
239 count = cache.getMessageCount();
240 _database.commit();
241 }
242 } catch (Exception exception) {
243 _log.error("Failed to get message count for queue=" + queue,
244 exception);
245 rollback();
246 }
247 return count;
248 }
249
250 /***
251 * Add the specified durable consumer to the database.
252 *
253 * @param topic name of the destination
254 * @param name name of the consumer
255 * @return boolean true if successful
256 */
257 public boolean addDurableConsumer(String topic, String name) {
258 boolean result = false;
259 try {
260 JmsTopic t = new JmsTopic(topic);
261 t.setPersistent(true);
262 _consumers.subscribe(t, name, null);
263 result = true;
264 } catch (JMSException exception) {
265 _log.error("Failed to add durable consumer=" + name
266 + " for topic=" + topic, exception);
267 }
268
269 return result;
270 }
271
272 /***
273 * Remove the consumer attached to the specified destination and with the
274 * passed in name.
275 *
276 * @param name name of the consumer
277 * @return boolean true if successful
278 */
279 public boolean removeDurableConsumer(String name) {
280 boolean result = false;
281 try {
282 _consumers.unsubscribe(name, null);
283 result = true;
284 } catch (JMSException exception) {
285 _log.debug("Failed to remove durable consumer=" + name, exception);
286 }
287
288 return result;
289 }
290
291 /***
292 * Check if the durable consumer exists.
293 *
294 * @param name name of the durable conusmer
295 * @return boolean true if it exists and false otherwise
296 */
297 public boolean durableConsumerExists(String name) {
298 return (_consumers.getConsumerEndpoint(name) != null);
299 }
300
301 /***
302 * Return the collection of durable consumer names for a particular topic
303 * destination.
304 *
305 * @param topic the topic name
306 * @return Vector collection of strings
307 */
308 public Vector getDurableConsumers(String topic) {
309 Enumeration iter = null;
310 Vector result = new Vector();
311
312 try {
313 _database.begin();
314 Connection connection = _database.getConnection();
315
316 iter = _database.getAdapter().getDurableConsumers(connection,
317 topic);
318
319 while (iter.hasMoreElements()) {
320 result.addElement(iter.nextElement());
321 }
322 _database.commit();
323 } catch (Exception exception) {
324 _log.error("Failed on get durable consumers for topic=" + topic,
325 exception);
326 rollback();
327 }
328
329 return result;
330 }
331
332 /***
333 * De-Activate an active persistent consumer.
334 *
335 * @param name name of the consumer
336 * @return boolean true if successful
337 */
338 public boolean unregisterConsumer(String name) {
339 boolean success = false;
340
341 ConsumerEndpoint endpoint = _consumers.getConsumerEndpoint(name);
342 if (endpoint != null) {
343 _consumers.closeConsumer(endpoint);
344 }
345 success = true;
346
347 return success;
348 }
349
350 /***
351 * Check to see if the given consumer is currently connected to the
352 * OpenJMSServer. This is only valid when in online mode.
353 *
354 * @param name The name of the onsumer.
355 * @return boolean True if the consumer is connected.
356 */
357 public boolean isConnected(String name) {
358 boolean result = false;
359 ConsumerEndpoint endpoint = _consumers.getConsumerEndpoint(name);
360 if (endpoint != null && endpoint instanceof DurableConsumerEndpoint) {
361 result = ((DurableConsumerEndpoint) endpoint).isActive();
362 }
363 return result;
364 }
365
366 /***
367 * Return a list of all registered destinations.
368 *
369 * @return Vector collection of strings
370 */
371 public Vector getAllDestinations() {
372 Enumeration iter = null;
373 Vector result = new Vector();
374
375 try {
376 _database.begin();
377 Connection connection = _database.getConnection();
378
379 iter = _database.getAdapter().getAllDestinations(connection);
380
381 while (iter.hasMoreElements()) {
382 result.addElement(iter.nextElement());
383 }
384 _database.commit();
385 } catch (Exception exception) {
386 _log.error("Failed to get all destinations", exception);
387 rollback();
388 }
389
390 return result;
391 }
392
393 /***
394 * Add an administered destination with the specified name.
395 *
396 * @param name destination name
397 * @param queue whether it is queue or a topic
398 * @return boolean true if successful
399 */
400 public boolean addDestination(String name, Boolean queue) {
401
402 boolean success = false;
403
404
405 JmsDestination destination = (queue.booleanValue())
406 ? (JmsDestination) new JmsQueue(name)
407 : (JmsDestination) new JmsTopic(name);
408 destination.setPersistent(true);
409
410
411 try {
412 if (_destinations.getDestination(name) == null) {
413 _destinations.createDestination(destination);
414 success = true;
415 }
416 } catch (JMSException exception) {
417 _log.error("Failed to add destination=" + name, exception);
418 }
419
420 return success;
421 }
422
423 /***
424 * Destroy the specified destination and all associated messsages and
425 * consumers. This is a very dangerous operation to execute while there are
426 * clients online
427 *
428 * @param name destination to destroy
429 * @return boolean true if successful
430 */
431 public boolean removeDestination(String name) {
432
433 boolean success = false;
434 JmsDestination dest = _destinations.getDestination(name);
435
436
437
438 if (dest != null) {
439 try {
440 _destinations.removeDestination(dest);
441 success = true;
442 } catch (JMSException exception) {
443 _log.error("Failed to remove destination=" + name, exception);
444 }
445 }
446
447 return success;
448 }
449
450 /***
451 * Check whether the specified destination exists
452 *
453 * @param name - the name of the destination to check
454 * @return boolean - true if it does and false otherwise
455 */
456 public boolean destinationExists(String name) {
457
458 JmsDestination dest = _destinations.getDestination(name);
459 return (dest != null);
460 }
461
462 /***
463 * Terminate the JMS Server. If it is running as a standalone application
464 * then exit the application. It is running as an embedded application then
465 * just terminate the thread
466 */
467 public void stopServer() {
468 boolean isEmbedded = false;
469 Connector[] connectors = _config.getConnectors().getConnector();
470 for (int i = 0; i < connectors.length; ++i) {
471 if (connectors[i].getScheme().equals(SchemeType.EMBEDDED)) {
472 isEmbedded = true;
473 break;
474 }
475 }
476
477 final boolean exit = !isEmbedded;
478
479 Runnable r = new Runnable() {
480
481 public void run() {
482 try {
483
484
485 Thread.sleep(1000);
486 } catch (InterruptedException ignore) {
487 }
488 _log.info("Stopping all services");
489 try {
490 _services.stop();
491 } catch (ServiceException exception) {
492 _log.error(exception, exception);
493 }
494
495 if (exit) {
496 _log.info("Server shutdown scheduled for 5 secs");
497 try {
498 Thread.sleep(5000);
499 } catch (InterruptedException ignore) {
500 }
501 System.exit(0);
502 }
503 }
504 };
505 Thread t = new Thread(r);
506 t.start();
507 }
508
509 /***
510 * Purge all processed messages from the database
511 *
512 * @return int number of messages purged
513 */
514 public int purgeMessages() {
515 return _database.getAdapter().purgeMessages();
516 }
517
518 /***
519 * Add a user with the specified name
520 *
521 * @param username the users name
522 * @param password the users password
523 * @return <code>true</code> if the user is added otherwise
524 * <code>false</code>
525 */
526 public boolean addUser(String username, String password) {
527 return _authenticator.addUser(new User(username, password));
528 }
529
530 /***
531 * Change password for the specified user
532 *
533 * @param username the users name
534 * @param password the users password
535 * @return <code>true</code> if the password is changed otherwise
536 * <code>false</code>
537 */
538 public boolean changePassword(String username, String password) {
539 return _authenticator.updateUser(new User(username, password));
540 }
541
542 /***
543 * Remove the specified user
544 *
545 * @param username the users name
546 * @return <code>true</code> if the user is removed otherwise
547 * <code>false</code>
548 */
549 public boolean removeUser(String username) {
550 return _authenticator.removeUser(new User(username, null));
551 }
552
553 /***
554 * Return a list of all registered users.
555 *
556 * @return Vector of users
557 */
558 public Vector getAllUsers() {
559 Enumeration iter = null;
560 Vector result = new Vector();
561
562 try {
563 _database.begin();
564 Connection connection = _database.getConnection();
565
566 iter = _database.getAdapter().getAllUsers(connection);
567
568 while (iter.hasMoreElements()) {
569 result.addElement(iter.nextElement());
570 }
571 _database.commit();
572 } catch (Exception exception) {
573 _log.error("Failed on get all users", exception);
574 rollback();
575 }
576
577 return result;
578 }
579
580 /***
581 * Rollback the current transaction, logging any error.
582 */
583 private void rollback() {
584 try {
585 _database.rollback();
586 } catch (PersistenceException exception) {
587 _log.error(exception, exception);
588 }
589 }
590
591 }