1 /*** 2 * Redistribution and use of this software and associated documentation 3 * ("Software"), with or without modification, are permitted provided 4 * that the following conditions are met: 5 * 6 * 1. Redistributions of source code must retain copyright 7 * statements and notices. Redistributions must also contain a 8 * copy of this document. 9 * 10 * 2. Redistributions in binary form must reproduce the 11 * above copyright notice, this list of conditions and the 12 * following disclaimer in the documentation and/or other 13 * materials provided with the distribution. 14 * 15 * 3. The name "Exolab" must not be used to endorse or promote 16 * products derived from this Software without prior written 17 * permission of Exoffice Technologies. For written permission, 18 * please contact info@exolab.org. 19 * 20 * 4. Products derived from this Software may not be called "Exolab" 21 * nor may "Exolab" appear in their names without prior written 22 * permission of Exoffice Technologies. Exolab is a registered 23 * trademark of Exoffice Technologies. 24 * 25 * 5. Due credit should be given to the Exolab Project 26 * (http://www.exolab.org/). 27 * 28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS 29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT 30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL 32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, 33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, 37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED 39 * OF THE POSSIBILITY OF SUCH DAMAGE. 40 * 41 * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved. 42 */ 43 package org.exolab.jms.messagemgr; 44 45 import javax.jms.InvalidSelectorException; 46 import javax.jms.JMSException; 47 48 import org.exolab.jms.client.JmsDestination; 49 import org.exolab.jms.message.MessageImpl; 50 import org.exolab.jms.selector.Selector; 51 52 53 /*** 54 * Abstract implementation of the {@link ConsumerEndpoint} interface. 55 * 56 * @author <a href="mailto:jima@exoffice.com">Jim Alateras</a> 57 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a> 58 * @version $Revision: 1.3 $ $Date: 2005/08/30 06:25:47 $ 59 */ 60 public abstract class AbstractConsumerEndpoint implements ConsumerEndpoint { 61 62 /*** 63 * The identity of this consumer. 64 */ 65 private final long _id; 66 67 /*** 68 * The destination the consumer is acceesing. 69 */ 70 private final JmsDestination _destination; 71 72 /*** 73 * The message selector associated with this consumer. May be 74 * <code>null</code>. 75 */ 76 private Selector _selector; 77 78 /*** 79 * If true, and the destination is a topic, inhibits the delivery of 80 * messages published by its own connection. 81 */ 82 private boolean _noLocal; 83 84 /*** 85 * Determines if this consumer is asynchronous. 86 */ 87 private boolean _asynchronous = false; 88 89 /*** 90 * The receive timeout, if the client is performing a blocking receive. A 91 * value of <code>0</code> indicates the client is blocking indefinitely. 92 */ 93 private Condition _waitingForMessage; 94 95 /*** 96 * The listener to notify when a message is available. 97 */ 98 private ConsumerEndpointListener _listener = null; 99 100 /*** 101 * Determines if this is (or is in the process of being) closed. 102 */ 103 private final Flag _closed = new Flag(false); 104 105 106 /*** 107 * Construct a new <code>ConsumerEndpoint</code>. 108 * <p/> 109 * The destination and selector determine where it will be sourcing its 110 * messages from, and scheduler is used to asynchronously deliver messages 111 * to the consumer. 112 * 113 * @param consumerId the identity of this consumer 114 * @param destination the destination to access 115 * @param selector the message selector. May be <code>null</code> 116 * @param noLocal if true, and the destination is a topic, inhibits the 117 * delivery of messages published by its own connection. 118 * @throws InvalidSelectorException if the selector is not well formed 119 */ 120 public AbstractConsumerEndpoint(long consumerId, JmsDestination destination, 121 String selector, boolean noLocal) 122 throws InvalidSelectorException { 123 if (destination == null) { 124 throw new IllegalArgumentException( 125 "Argument 'destination' is null"); 126 } 127 _id = consumerId; 128 _destination = destination; 129 setSelector(selector); 130 _noLocal = noLocal; 131 } 132 133 /*** 134 * Returns the identity of this consumer. 135 * 136 * @return the identity of this consumer 137 */ 138 public long getId() { 139 return _id; 140 } 141 142 /*** 143 * Determines if this is a persistent or non-persistent consumer. 144 * <p/> 145 * If persistent, then the consumer is persistent accross subscriptions and 146 * server restarts, and {@link #getPersistentId} returns a non-null value. 147 * 148 * @return <code>false</code> 149 */ 150 public boolean isPersistent() { 151 return false; 152 } 153 154 /*** 155 * Returns the persistent identifier for this consumer. This is the identity 156 * of the consumer which is persistent across subscriptions and server 157 * restarts. 158 * 159 * @return <code>null</code> 160 */ 161 public String getPersistentId() { 162 return null; 163 } 164 165 /*** 166 * Return the destination that this consumer is accessing. 167 * 168 * @return the destination that this consumer is accessing 169 */ 170 public JmsDestination getDestination() { 171 return _destination; 172 } 173 174 /*** 175 * Determines if this consumer can consume messages from the specified 176 * destination. 177 * 178 * @param destination the destination 179 * @return <code>true</code> if the consumer can consume messages from 180 * <code>destination</code>; otherwise <code>false</code> 181 */ 182 public boolean canConsume(JmsDestination destination) { 183 return _destination.equals(destination); 184 } 185 186 /*** 187 * Returns the message selector. 188 * 189 * @return the message selector, or <code>null</code> if none was specified 190 * by the client 191 */ 192 public Selector getSelector() { 193 return _selector; 194 } 195 196 /*** 197 * Determines if a message is selected by the consumer. 198 * 199 * @param message the message to check 200 * @return <code>true</code> if the message is selected; otherwise 201 * <code>false</code> 202 */ 203 public boolean selects(MessageImpl message) { 204 return (_selector == null || _selector.selects(message)); 205 } 206 207 /*** 208 * Returns if locally produced messages are being inhibited. 209 * 210 * @return <code>true</code> if locally published messages are being 211 * inhibited. 212 */ 213 public boolean getNoLocal() { 214 return _noLocal; 215 } 216 217 /*** 218 * Return the next available message to the client. 219 * 220 * @param cancel 221 * @return the next message, or <code>null</code> if none is available 222 * @throws JMSException for any error 223 */ 224 public final synchronized MessageHandle receive(final Condition cancel) 225 throws JMSException { 226 MessageHandle result = null; 227 if (!_closed.get()) { 228 Condition condition = new Condition() { 229 public boolean get() { 230 return _closed.get() || cancel.get(); 231 } 232 }; 233 result = doReceive(condition); 234 } 235 return result; 236 } 237 238 /*** 239 * Indicates if this is an asynchronous consumer. 240 * <p/> 241 * An asynchronous consumer has a client <code>MessageConsumer</code> with 242 * an associated <code>MessageListener</code>. 243 * 244 * @param asynchronous if <code>true</code> marks this as an asynchronous 245 * consumer 246 */ 247 public synchronized void setAsynchronous(boolean asynchronous) { 248 _asynchronous = asynchronous; 249 } 250 251 /*** 252 * Determines if this is an asynchronous consumer. 253 * 254 * @return <code>true</code> if this is an asynchronous consumer; otherwise 255 * <code>false</code> 256 */ 257 public synchronized boolean isAsynchronous() { 258 return _asynchronous; 259 } 260 261 /*** 262 * Indicates that the client is currently waiting for a message. 263 * 264 * @param condition the condition to evaluate to determine if the client is 265 * waiting for message. May be <code>null</code>. 266 */ 267 public synchronized void setWaitingForMessage(Condition condition) { 268 _waitingForMessage = condition; 269 } 270 271 /*** 272 * Determines if the client is currently waiting for a message. 273 * 274 * @return <code>true</code> if the client is waiting for messages; 275 * otherwise <code>false</code> 276 */ 277 public synchronized boolean isWaitingForMessage() { 278 return _waitingForMessage != null && _waitingForMessage.get(); 279 } 280 281 /*** 282 * Set the listener for this consumer. If a listener is set, it is notified 283 * when messages become available. 284 * 285 * @param listener the listener to add, or <code>null</code> to remove an 286 * existing listener 287 */ 288 public synchronized void setListener(ConsumerEndpointListener listener) { 289 _listener = listener; 290 } 291 292 /*** 293 * Determines if this consumer is closed, or in the process of being 294 * closed. 295 * 296 * @return <code>true</code> if this consumer is closed; otherwise 297 * <code>false</code> 298 */ 299 public final boolean isClosed() { 300 return _closed.get(); 301 } 302 303 /*** 304 * Close this endpoint. 305 */ 306 public final void close() { 307 _closed.set(true); 308 synchronized (this) { 309 _listener = null; 310 doClose(); 311 } 312 } 313 314 /*** 315 * Returns a stringified version of the consumer. 316 * 317 * @return a stringified version of the consumer 318 */ 319 public String toString() { 320 return _id + ":" + getDestination(); 321 } 322 323 /*** 324 * Return the next available message to the client. 325 * <p/> 326 * This method will not be invoked if the consumer is being closed, however 327 * it is possible for {@link #close()} to be invoked while this method is in 328 * progress. Implementations should therefore invoke isClosed() to determine 329 * if the consumer is in the process of being closed, and if so, return 330 * <code>null</code>. 331 * 332 * @param cancel 333 * @return the next message, or <code>null</code> if none is available 334 * @throws JMSException for any error 335 */ 336 protected abstract MessageHandle doReceive(Condition cancel) 337 throws JMSException; 338 339 /*** 340 * Closes the endpoint. 341 */ 342 protected abstract void doClose(); 343 344 /*** 345 * Notify the listener that a message is available for this consumer. 346 */ 347 protected synchronized void notifyMessageAvailable() { 348 if (_listener != null && !_closed.get()) { 349 _listener.messageAvailable(this); 350 } 351 } 352 353 /*** 354 * Sets the message selector. 355 * 356 * @param selector the message selector. May be <code>null</code> 357 * @throws InvalidSelectorException if the selector is not well formed 358 */ 359 protected void setSelector(String selector) 360 throws InvalidSelectorException { 361 _selector = (selector != null) ? new Selector(selector) : null; 362 } 363 364 /*** 365 * Determines if locally produced messages are being inhibited. 366 * 367 * @param noLocal if <code>true</code>, locally published messages are 368 * inhibited. 369 */ 370 protected void setNoLocal(boolean noLocal) { 371 _noLocal = noLocal; 372 } 373 374 }