org.exolab.jms.messagemgr
Class ConsumerEndpoint

java.lang.Object
  |
  +--org.exolab.jms.messagemgr.ConsumerEndpoint
All Implemented Interfaces:
DestinationCacheEventListener, Identifiable, java.lang.Runnable, java.io.Serializable
Direct Known Subclasses:
QueueBrowserEndpoint, QueueConsumerEndpoint, TopicConsumerEndpoint

public abstract class ConsumerEndpoint
extends java.lang.Object
implements java.io.Serializable, Identifiable, DestinationCacheEventListener, java.lang.Runnable

A Consumer is a message subscriber with a unique identity

Version:
$Revision: 1.36 $ $Date: 2003/09/25 11:23:13 $
Author:
Jim Alateras, Tim Anderson
See Also:
Serialized Form

Field Summary
protected  int _ackMode
          The acknowledgement mode for this endpoint.
protected  int _connectionId
          Holds the connection id of the connection that the endpoint belongs to
protected  InternalMessageListener _listener
          Holds the consumer's message listener.
protected  boolean _nolocal
          The nolocal indicator, if set, inhibits consuming messages that have been published on the same connection
protected  Scheduler _scheduler
          This is the scheduler that is used to deliver messages if a consumer has a registered listener
protected  Selector _selector
          The selector associated with this consumer.
protected  JmsServerSession _session
          Caches the session that created this endpoint
protected  int _size
          Maintains the maximum size of this cache
protected  boolean _transacted
          Indicates whether this endpoint belongs to a transacted session
protected  boolean _waitingForMessage
          Used to block consumer until there is a message available
protected  java.lang.Object _waitingForMessageMonitor
          Serializes access to the _waitingForMessage flag.
 
Method Summary
protected  void addMessage(MessageHandle handle)
          Add the handle to the cache
protected  void addMessage(MessageHandle handle, MessageImpl message)
          Cache a handle and its corresponding message
protected  void clearMessages()
          Clear all messages in the cache, regardless of whether they are persistent or non-persistent
protected  void clearWaitingForMessage()
          Clear the waiting for message flag
 void close()
          Close this endpoint.
 void collectGarbage(boolean aggressive)
           
protected  boolean collectionHasPersistentHandles(java.util.Vector collection)
          Check whether the vector of handles contains one or more persistent handles
protected  boolean containsMessage(MessageHandle handle)
          Determines if a message handle is already cached
protected  void deleteMessage(MessageHandle handle)
          Delete the message with the specified handle from the cache
abstract  boolean deliverMessages()
          Deliver messages in the cache to the consumer
protected abstract  void doClose()
          Closes the endpoint
 int getAckMode()
          Return the ackmode for this endpoint
 long getClientId()
          Return a reference to the client identity.
 int getConnectionId()
          Return the connection id that this endpoint belongs to
abstract  JmsDestination getDestination()
          Return the destination that this consumer is subscribed to
 java.lang.String getId()
          Return the identity of the object
 int getMaximumSize()
          Return the cache's maximum size
protected  MessageImpl getMessage(MessageHandle handle)
          Return the message for the specified handle
 int getMessageCount()
          Return the number of unsent messages in the cache for this consumer
 boolean getNoLocal()
          Return the value of the nolocal indicator
 java.lang.String getPersistentId()
          Returns the persistent identifier for this consumer.

This is the identity of the consumer which is persistent across subscriptions and server restarts.

 Selector getSelector()
          Return the selector for this endpoint or null if one is not specified
 JmsServerSession getSession()
          Return a reference to the session owning this endpoint
 boolean getTransacted()
          Check whether this endpoint belongs to a transacted session
 int hashCode()
           
protected  boolean isStopped()
          Determines if this endpoint has been stopped
protected  boolean isWaitingForMessage()
          Check whether the endpoint is waiting for a message
 boolean messageAdded(MessageImpl message)
          This event is called when a non-persistent message is added to the DestinationCache.
 boolean messageRemoved(MessageImpl message)
          This event is called when a message is removed from the DestinationCache.
protected  void notifyMessageAvailable()
          Check if the consumer is waiting for a message.
 boolean persistentMessageAdded(java.sql.Connection connection, MessageImpl message)
          This event is called when a persistent message is added to the DestinationCache.
 boolean persistentMessageRemoved(java.sql.Connection connection, MessageImpl message)
          This event is called when a message is removed from the DestinationCache.
abstract  MessageHandle receiveMessage(long wait)
          Return the next message to the client.
 void recover()
          This method returns all unacknowledged messages to the queue so that they can be resent to the consumer with the redelivery flag set.
protected  MessageHandle removeFirstMessage()
          Return the first message handle in the cache
protected  boolean removeMessage(MessageHandle handle)
          Remove the handle from the cache
 void returnMessage(MessageHandle handle)
          Return the specified message to the cache.
 void run()
          The run method is used to asynchronously deliver the messages in the cache to the consumer, by invoking deliverMessages().
protected  void schedule()
          Schedule asynchronous message delivery
 void setAckMode(int ackmode)
          Set the ackMode for this endpoint
 void setCacheEvictionPolicy(CacheEvictionPolicy policy)
          Set the CacheEvictionPolicy for this object.
 void setConnectionId(int id)
          Set the connection that this endpoint belongs to
 void setMaximumSize(int size)
          Set the maximum size of the cache.
 void setMessageListener(InternalMessageListener listener)
          Set the message listener for this consumer.
 void setNoLocal(boolean nolocal)
          Set the no local indicator for this endpoint
 void setSelector(java.lang.String selector)
          Set the selector for this endpoint
 void setStopped(boolean stop)
          Stop/start message delivery
 void setTransacted(boolean transacted)
          Set the state of the transacted flag for this endpoint
protected  void setWaitingForMessage()
          Set the waiting for message flag
protected  boolean stopDelivery()
          Helper for deliverMessages() implementations, to determine if asynchronous message delivery should be stopped
 java.lang.String toString()
          Return a stringified version of the consumer
abstract  void unregister()
          Unregister this consumer for the specified destination cache, so that it will stop receiving messages from it.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, notify, notifyAll, wait, wait, wait
 

Field Detail

_selector

protected Selector _selector
The selector associated with this consumer. A selector is used to filter messages.

_waitingForMessageMonitor

protected final java.lang.Object _waitingForMessageMonitor
Serializes access to the _waitingForMessage flag. This is only required when it is changed

_waitingForMessage

protected boolean _waitingForMessage
Used to block consumer until there is a message available

_listener

protected InternalMessageListener _listener
Holds the consumer's message listener. This means that messages will be pushed down

_size

protected int _size
Maintains the maximum size of this cache

_scheduler

protected transient Scheduler _scheduler
This is the scheduler that is used to deliver messages if a consumer has a registered listener

_ackMode

protected transient int _ackMode
The acknowledgement mode for this endpoint.

_nolocal

protected transient boolean _nolocal
The nolocal indicator, if set, inhibits consuming messages that have been published on the same connection

_transacted

protected transient boolean _transacted
Indicates whether this endpoint belongs to a transacted session

_connectionId

protected transient int _connectionId
Holds the connection id of the connection that the endpoint belongs to

_session

protected JmsServerSession _session
Caches the session that created this endpoint

Method Detail

getDestination

public abstract JmsDestination getDestination()
Return the destination that this consumer is subscribed to
Returns:
the destination that this consumer is subscribed to

getId

public java.lang.String getId()
Description copied from interface: Identifiable
Return the identity of the object
Specified by:
getId in interface Identifiable
Following copied from interface: org.exolab.jms.Identifiable
Returns:
the object identity

getPersistentId

public java.lang.String getPersistentId()
Returns the persistent identifier for this consumer.

This is the identity of the consumer which is persistent across subscriptions and server restarts.

This implementation simply returns the transient identifier, i.e, getId()

Returns:
the persistent identifier for this consumer
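
The default behaviour described above can be illustrated with a minimal sketch. The classes below are hypothetical stand-ins, not the OpenJMS types, and the idea that a durable subclass overrides getPersistentId() with a subscription name is an assumption made only for illustration.

```java
// Hypothetical stand-ins for illustration; these are not the OpenJMS classes.
abstract class SketchEndpoint {
    private final String id;

    SketchEndpoint(String id) {
        this.id = id;
    }

    // Transient identity, valid only for the life of this endpoint.
    String getId() {
        return id;
    }

    // Default: no separate persistent identity, so reuse the transient one.
    String getPersistentId() {
        return getId();
    }
}

// A non-durable consumer keeps the default behaviour.
class SketchTransientEndpoint extends SketchEndpoint {
    SketchTransientEndpoint(String id) {
        super(id);
    }
}

// Assumption: a durable consumer reports a name that outlives the endpoint.
class SketchDurableEndpoint extends SketchEndpoint {
    private final String subscriptionName;

    SketchDurableEndpoint(String id, String subscriptionName) {
        super(id);
        this.subscriptionName = subscriptionName;
    }

    @Override
    String getPersistentId() {
        return subscriptionName;
    }
}
```

With these stand-ins, a SketchTransientEndpoint returns the same value from getId() and getPersistentId(), while a SketchDurableEndpoint returns its subscription name from getPersistentId().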

hashCode

public int hashCode()
Overrides:
hashCode in class java.lang.Object

toString

public java.lang.String toString()
Return a stringified version of the consumer
Overrides:
toString in class java.lang.Object
Returns:
String

unregister

public abstract void unregister()
Unregister this consumer for the specified destination cache, so that it will stop receiving messages from it.

getClientId

public long getClientId()
Return a reference to the client identity. This is an indirect reference back to the remote client, which can asynchronously receive messages
Returns:
identity of the client scoped to the session

getSelector

public Selector getSelector()
Return the selector for this endpoint or null if one is not specified
Returns:
Selector - the endpoint's selector, or null if one is not specified

setSelector

public void setSelector(java.lang.String selector)
                 throws javax.jms.InvalidSelectorException
Set the selector for this endpoint
Parameters:
selector - - message selector as a string
Throws:
javax.jms.InvalidSelectorException - - raised when selector is not well-formed

getAckMode

public int getAckMode()
Return the ackmode for this endpoint
Returns:
int - acknowledgement mode

setAckMode

public void setAckMode(int ackmode)
Set the ackMode for this endpoint
Parameters:
ackmode - - the new ack mode for the endpoint

getConnectionId

public int getConnectionId()
Return the connection id that this endpoint belongs to
Returns:
the connection id

setConnectionId

public void setConnectionId(int id)
Set the connection that this endpoint belongs to
Parameters:
id - - connection identity

getNoLocal

public boolean getNoLocal()
Return the value of the nolocal indicator
Returns:
boolean

setNoLocal

public void setNoLocal(boolean nolocal)
Set the no local indicator for this endpoint
Parameters:
nolocal - - true to inhibit messages published on this connection
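
As a rough illustration of how the nolocal indicator and the connection id interact, the sketch below filters on both. NoLocalFilter and its parameters are hypothetical. In particular, it assumes the id of the connection that published the message is available at delivery time, which this page does not state.

```java
// Hypothetical helper for illustration; not part of OpenJMS.
final class NoLocalFilter {
    private NoLocalFilter() {
    }

    /**
     * Decide whether a message may be delivered to an endpoint.
     *
     * @param noLocal              the endpoint's nolocal indicator
     * @param endpointConnectionId the connection the endpoint belongs to
     * @param producerConnectionId the connection the message was published on (assumed known)
     * @return true if the message may be delivered
     */
    static boolean accepts(boolean noLocal, int endpointConnectionId, int producerConnectionId) {
        // Only messages published on the endpoint's own connection are ever suppressed.
        return !(noLocal && endpointConnectionId == producerConnectionId);
    }
}
```

For example, NoLocalFilter.accepts(true, 7, 7) is false, whereas accepts(true, 7, 8) and accepts(false, 7, 7) are both true.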

getTransacted

public boolean getTransacted()
Check whether this endpoint belongs to a transacted session
Returns:
boolean - true if it does

setTransacted

public void setTransacted(boolean transacted)
Set the state of the transacted flag for this endpoint
Parameters:
transacted - - true if it is transacted

setMaximumSize

public void setMaximumSize(int size)
Set the maximum size of the cache. If there are more than this number of messages in the cache the CacheEvictionPolicy is enforced to remove messages.
Parameters:
size - - maximum number of messages a destination can hold

getMaximumSize

public int getMaximumSize()
Return the cache's maximum size
Returns:
int - size of cache

setCacheEvictionPolicy

public void setCacheEvictionPolicy(CacheEvictionPolicy policy)
Set the CacheEvictionPolicy for this object. This determines how messages are removed when the cache's upper limit is reached.
Parameters:
policy - the eviction policy
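
The interplay between the maximum size and the eviction policy might look like the following sketch. The real CacheEvictionPolicy interface is not shown on this page, so SketchEvictionPolicy, its selectVictim method, and the "evict the oldest entry" default are all assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical policy interface; the real CacheEvictionPolicy signature is not shown here.
interface SketchEvictionPolicy {
    // Return the index of the entry to evict from a non-empty cache.
    int selectVictim(List<String> cache);
}

// Hypothetical bounded cache that applies the policy whenever the limit is exceeded.
class SketchBoundedCache {
    private final List<String> messages = new ArrayList<>();
    private int maximumSize;
    private SketchEvictionPolicy policy = cache -> 0; // assumed default: evict the oldest

    SketchBoundedCache(int maximumSize) {
        this.maximumSize = maximumSize;
    }

    void setMaximumSize(int size) {
        maximumSize = size;
        enforceLimit(); // shrinking the limit may force evictions immediately
    }

    void setCacheEvictionPolicy(SketchEvictionPolicy policy) {
        this.policy = policy;
    }

    void add(String message) {
        messages.add(message);
        enforceLimit();
    }

    List<String> contents() {
        return new ArrayList<>(messages);
    }

    // Evict entries chosen by the policy until the cache fits within the limit.
    private void enforceLimit() {
        while (!messages.isEmpty() && messages.size() > maximumSize) {
            messages.remove(policy.selectVictim(messages));
        }
    }
}
```

Adding "a", "b" and "c" to a cache of maximum size 2 leaves "b" and "c" under the assumed default policy. Swapping in a policy such as c -> c.size() - 1 would instead discard the newest entry.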

getMessageCount

public int getMessageCount()
Return the number of unsent messages in the cache for this consumer
Returns:
the number of unsent messages

getSession

public JmsServerSession getSession()
Return a reference to the session owning this endpoint
Returns:
JmsServerSession - the owning session

deliverMessages

public abstract boolean deliverMessages()
Deliver messages in the cache to the consumer
Returns:
true if the endpoint should be rescheduled

run

public void run()
The run method is used to asynchronously deliver the messages in the cache to the consumer, by invoking deliverMessages().

It is scheduled by the Scheduler.

Specified by:
run in interface java.lang.Runnable
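
The relationship between run(), deliverMessages() and the scheduler can be sketched as below. This is an illustration under stated assumptions, not the OpenJMS implementation: SketchScheduler is a single-threaded stand-in for Scheduler, and the per-run batch limit is invented so that the "reschedule" return value has something to do.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded stand-in for the Scheduler; not the OpenJMS class.
class SketchScheduler {
    private final Queue<Runnable> pending = new ArrayDeque<>();

    void add(Runnable task) {
        pending.add(task);
    }

    // Run queued tasks until none remain, including tasks that re-add themselves.
    void drain() {
        Runnable task;
        while ((task = pending.poll()) != null) {
            task.run();
        }
    }
}

// Hypothetical endpoint that delivers a bounded batch per run.
class SketchDeliveringEndpoint implements Runnable {
    private static final int BATCH = 2; // assumed per-run delivery limit
    private final Queue<String> cache = new ArrayDeque<>();
    private final SketchScheduler scheduler;
    final List<String> delivered = new ArrayList<>();
    int runs;

    SketchDeliveringEndpoint(SketchScheduler scheduler) {
        this.scheduler = scheduler;
    }

    void addMessage(String message) {
        cache.add(message);
    }

    void schedule() {
        scheduler.add(this);
    }

    // Deliver up to BATCH messages and report whether more work remains.
    boolean deliverMessages() {
        for (int i = 0; i < BATCH && !cache.isEmpty(); i++) {
            delivered.add(cache.poll());
        }
        return !cache.isEmpty();
    }

    @Override
    public void run() {
        runs++;
        if (deliverMessages()) {
            schedule(); // true is treated as "reschedule this endpoint"
        }
    }
}
```

With five cached messages and a batch of two, one call to schedule() followed by drain() results in three runs, the first two of which reschedule the endpoint.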

messageAdded

public boolean messageAdded(MessageImpl message)
Description copied from interface: DestinationCacheEventListener
This event is called when a non-persistent message is added to the DestinationCache.
Specified by:
messageAdded in interface DestinationCacheEventListener
Following copied from interface: org.exolab.jms.messagemgr.DestinationCacheEventListener
Parameters:
message - - message added to cache

messageRemoved

public boolean messageRemoved(MessageImpl message)
Description copied from interface: DestinationCacheEventListener
This event is called when a message is removed from the DestinationCache.
Specified by:
messageRemoved in interface DestinationCacheEventListener
Following copied from interface: org.exolab.jms.messagemgr.DestinationCacheEventListener
Parameters:
message - - message removed from cache

persistentMessageAdded

public boolean persistentMessageAdded(java.sql.Connection connection,
                                      MessageImpl message)
                               throws PersistenceException
Description copied from interface: DestinationCacheEventListener
This event is called when a persistent message is added to the DestinationCache.
Specified by:
persistentMessageAdded in interface DestinationCacheEventListener
Following copied from interface: org.exolab.jms.messagemgr.DestinationCacheEventListener
Parameters:
connection - - the database connection
message - - message added to cache
Throws:
PersistenceException - - if there is a persistence related problem

persistentMessageRemoved

public boolean persistentMessageRemoved(java.sql.Connection connection,
                                        MessageImpl message)
                                 throws PersistenceException
Description copied from interface: DestinationCacheEventListener
This event is called when a message is removed from the DestinationCache.
Specified by:
persistentMessageRemoved in interface DestinationCacheEventListener
Following copied from interface: org.exolab.jms.messagemgr.DestinationCacheEventListener
Parameters:
connection - - the database connection
message - - message to remove from cache
Throws:
PersistenceException - - if there is a persistence related problem

setStopped

public void setStopped(boolean stop)
Stop/start message delivery
Parameters:
stop - if true, stop message delivery; otherwise start it

recover

public void recover()
This method returns all unacknowledged messages to the queue so that they can be resent to the consumer with the redelivery flag set.
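
One way to picture recovery is the sketch below. SketchHandle and SketchRecoverableEndpoint are hypothetical, and two details are assumptions rather than documented behaviour: that unacknowledged handles are tracked in a separate list, and that recovered handles are placed ahead of unsent ones in their original order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for MessageHandle.
class SketchHandle {
    final String id;
    boolean redelivered;

    SketchHandle(String id) {
        this.id = id;
    }
}

// Hypothetical endpoint that tracks unsent and unacknowledged handles.
class SketchRecoverableEndpoint {
    private final Deque<SketchHandle> unsent = new ArrayDeque<>();
    private final List<SketchHandle> unacked = new ArrayList<>();

    void add(SketchHandle handle) {
        unsent.addLast(handle);
    }

    // Hand out the next handle and remember it until it is acknowledged.
    SketchHandle receive() {
        SketchHandle handle = unsent.pollFirst();
        if (handle != null) {
            unacked.add(handle);
        }
        return handle;
    }

    void acknowledge(SketchHandle handle) {
        unacked.remove(handle);
    }

    // Return every unacknowledged handle to the queue, flagged as redelivered.
    void recover() {
        // Walk backwards so that pushing to the front preserves the original order.
        for (int i = unacked.size() - 1; i >= 0; i--) {
            SketchHandle handle = unacked.get(i);
            handle.redelivered = true;
            unsent.addFirst(handle);
        }
        unacked.clear();
    }
}
```

If handles a and b have been received and only a acknowledged, recover() makes b the next handle returned by receive(), with its redelivered flag set. Handles that were never sent keep the flag cleared.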

close

public final void close()
Close this endpoint.

This synchronizes with deliverMessages() before invoking


setMessageListener

public void setMessageListener(InternalMessageListener listener)
Set the message listener for this consumer. If a message listener is set, messages will be scheduled to be sent to it when they are available.

Each consumer cache can only have a single message listener. To remove the message listener, call this method with a null argument.

Parameters:
listener - - the message listener to add.
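
The listener semantics above can be sketched with a hypothetical endpoint. Here java.util.function.Consumer stands in for InternalMessageListener, and delivery happens inline rather than through the scheduler. Both are simplifications made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical endpoint; Consumer<String> stands in for InternalMessageListener.
class SketchListenerEndpoint {
    private final Queue<String> cache = new ArrayDeque<>();
    private Consumer<String> listener; // at most one listener; null means none

    // Install or replace the listener; pass null to remove it.
    void setMessageListener(Consumer<String> listener) {
        this.listener = listener;
        if (listener != null) {
            deliver(); // push anything already cached
        }
    }

    void addMessage(String message) {
        cache.add(message);
        if (listener != null) {
            deliver();
        }
    }

    // Number of messages still waiting in the cache.
    int getMessageCount() {
        return cache.size();
    }

    // Simplification: deliver inline instead of scheduling asynchronous delivery.
    private void deliver() {
        while (!cache.isEmpty()) {
            listener.accept(cache.poll());
        }
    }
}
```

Messages added while no listener is registered stay in the cache. Registering a listener drains them, and passing null afterwards causes later messages to accumulate again.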

returnMessage

public void returnMessage(MessageHandle handle)
Return the specified message to the cache.
Parameters:
handle - - handle to return

receiveMessage

public abstract MessageHandle receiveMessage(long wait)
Return the next message to the client. This will also mark the message as sent and move it to the sent queue
Parameters:
wait - - the number of milliseconds to wait
Returns:
MessageHandle - handle to the next message in the list
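
A timed, blocking receive of this kind is commonly built on a monitor and a waiting flag, much like _waitingForMessageMonitor and _waitingForMessage. The sketch below is hypothetical and is not taken from any subclass. It assumes that a non-positive wait returns immediately, and it uses plain strings in place of MessageHandle.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical endpoint illustrating a timed receive; not the OpenJMS code.
class SketchBlockingEndpoint {
    private final Object monitor = new Object();
    private final Deque<String> cache = new ArrayDeque<>();
    private final List<String> sent = new ArrayList<>(); // the "sent queue"
    private boolean waitingForMessage;

    void addMessage(String handle) {
        synchronized (monitor) {
            cache.addLast(handle);
            if (waitingForMessage) {
                monitor.notifyAll(); // wake a blocked receiver
            }
        }
    }

    // Return the next handle, waiting up to the given number of milliseconds.
    // Assumption: wait <= 0 means "do not block". Returns null on timeout or interrupt.
    String receiveMessage(long wait) {
        synchronized (monitor) {
            long deadline = System.currentTimeMillis() + wait;
            waitingForMessage = true;
            try {
                while (cache.isEmpty()) {
                    long remaining = deadline - System.currentTimeMillis();
                    if (remaining <= 0) {
                        return null;
                    }
                    monitor.wait(remaining); // loop guards against spurious wakeups
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return null;
            } finally {
                waitingForMessage = false;
            }
            String handle = cache.pollFirst();
            sent.add(handle); // mark as sent
            return handle;
        }
    }

    int sentCount() {
        synchronized (monitor) {
            return sent.size();
        }
    }
}
```

Calling receiveMessage(10) on an empty endpoint returns null after roughly ten milliseconds. If another thread calls addMessage() in the meantime, the receiver is woken and the handle is recorded as sent.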

collectGarbage

public void collectGarbage(boolean aggressive)

doClose

protected abstract void doClose()
Closes the endpoint

schedule

protected void schedule()
Schedule asynchronous message delivery

clearMessages

protected void clearMessages()
Clear all messages in the cache, regardless of whether they are persistent or non-persistent

collectionHasPersistentHandles

protected boolean collectionHasPersistentHandles(java.util.Vector collection)
Check whether the vector of handles contains one or more persistent handles
Parameters:
collection - - collection of MessageHandle objects
Returns:
true if there is one or more persistent handles

addMessage

protected void addMessage(MessageHandle handle)
Add the handle to the cache
Parameters:
handle - - the message handle to add

addMessage

protected void addMessage(MessageHandle handle,
                          MessageImpl message)
Cache a handle and its corresponding message
Parameters:
handle - the handle to cache
message - the corresponding message to cache

getMessage

protected MessageImpl getMessage(MessageHandle handle)
Return the message for the specified handle
Parameters:
handle - - the handle
Returns:
MessageImpl - the associated message

removeMessage

protected boolean removeMessage(MessageHandle handle)
Remove the handle from the cache
Parameters:
handle - the handle to remove
Returns:
true if the message was removed

containsMessage

protected boolean containsMessage(MessageHandle handle)
Determines if a message handle is already cached
Returns:
true if it is cached

removeFirstMessage

protected MessageHandle removeFirstMessage()
Return the first message handle in the cache
Returns:
the first message or null if cache is empty

deleteMessage

protected void deleteMessage(MessageHandle handle)
Delete the message with the specified handle from the cache
Parameters:
handle - the handle

isStopped

protected final boolean isStopped()
Determines if this endpoint has been stopped
Returns:
true if this endpoint has been stopped

notifyMessageAvailable

protected void notifyMessageAvailable()
Check if the consumer is waiting for a message. If it is then notify it that a message has arrived

isWaitingForMessage

protected final boolean isWaitingForMessage()
Check whether the endpoint is waiting for a message
Returns:
boolean

setWaitingForMessage

protected final void setWaitingForMessage()
Set the waiting for message flag

clearWaitingForMessage

protected final void clearWaitingForMessage()
Clear the waiting for message flag

stopDelivery

protected boolean stopDelivery()
Helper for deliverMessages() implementations, to determine if asynchronous message delivery should be stopped
Returns:
true if asynchronous message delivery should be stopped


Copyright © 1999-2004 The OpenJMS Group. All Rights Reserved.