MQConnectionManager
Class to manage connections for the Message Queue resources. Also, set of ‘private’ helper functions to access and modify the message queue connection storage. They are ment to be used only internally by the MQConnectionManager, which should assure thread-safe access to it and standard S_OK/S_ERROR error handling. MQConnection storage is a dict structure that contains the MQ connections used and reused for producer/consumer communication. Example structure:
{
mardirac3.in2p3.fr: {'MQConnector':StompConnector, 'destinations':{'/queue/test1':['consumer1', 'producer1'],
'/queue/test2':['consumer1', 'producer1']}},
blabal.cern.ch: {'MQConnector':None, 'destinations':{'/queue/test2':['consumer2', 'producer2',]}}
}
- class DIRAC.Resources.MessageQueue.MQConnectionManager.MQConnectionManager(connectionStorage=None)
Bases:
object
Manages connections for the Message Queue resources in form of the interal connection storage.
- __init__(connectionStorage=None)
- addNewMessenger(mqURI, messengerType)
- Function updates the MQ connection by adding the messenger Id to the internal connection storage.
Also the messengerId is chosen. messenger Id is set to the maximum existing value (or 0 no messengers are connected) + 1. messenger Id is calculated separately for consumers and producers
- createConnectorAndConnect(parameters)
- disconnect(connector)
- getAllMessengers()
Function returns a list of all messengers registered in connection storage.
- Returns:
- with the list of strings in the pseudo-path format e.g.
[‘blabla.cern.ch/queue/test1/consumer1’,’blabal.cern.ch/topic/test2/producer2’]
- Return type:
S_OK or S_ERROR
- getConnector(mqConnection)
Function returns MQConnector assigned to the mqURI.
- Parameters:
mqConnection (str) – connection name.
- Returns:
with the value of the MQConnector in S_OK if not None
- Return type:
S_OK/S_ERROR
- property lock
Lock to assure thread-safe access to the internal connection storage.
- removeAllConnections()
- Function removes all existing connections and calls the disconnect
for connectors.
- Return type:
S_OK or S_ERROR
- startConnection(mqURI, params, messengerType)
- Function adds or updates the MQ connection. If the connection
does not exists, MQconnector is created and added.
- stopConnection(mqURI, messengerId)
- Function ‘stops’ the connection for given messenger, which means
it removes it from the messenger list. If this is the consumer, the unsubscribe() connector method is called. If it is the last messenger of this destination (queue or topic), then the destination is removed. If it is the last destination from this connection. The disconnect function is called and the connection is removed.
- unsubscribe(connector, destination, messengerId)