MQConnectionManager

Class to manage connections for the Message Queue resources. Also, set of ‘private’ helper functions to access and modify the message queue connection storage. They are ment to be used only internally by the MQConnectionManager, which should assure thread-safe access to it and standard S_OK/S_ERROR error handling. MQConnection storage is a dict structure that contains the MQ connections used and reused for producer/consumer communication. Example structure:

{
  mardirac3.in2p3.fr: {'MQConnector':StompConnector, 'destinations':{'/queue/test1':['consumer1', 'producer1'],
                                                                     '/queue/test2':['consumer1', 'producer1']}},
  blabal.cern.ch:     {'MQConnector':None,           'destinations':{'/queue/test2':['consumer2', 'producer2',]}}
}
class DIRAC.Resources.MessageQueue.MQConnectionManager.MQConnectionManager(connectionStorage=None)

Bases: object

Manages connections for the Message Queue resources in form of the interal connection storage.

__init__(connectionStorage=None)
addNewMessenger(mqURI, messengerType)
Function updates the MQ connection by adding the messenger Id to the internal connection storage.

Also the messengerId is chosen. messenger Id is set to the maximum existing value (or 0 no messengers are connected) + 1. messenger Id is calculated separately for consumers and producers

Parameters:
  • mqURI (str)

  • messengerType (str) – ‘consumer’ or ‘producer’.

Returns:

with the value of the messenger Id or S_ERROR if the messenger was not added,

cause the same id already exists.

Return type:

S_OK

createConnectorAndConnect(parameters)
disconnect(connector)
getAllMessengers()

Function returns a list of all messengers registered in connection storage.

Returns:

with the list of strings in the pseudo-path format e.g.

[‘blabla.cern.ch/queue/test1/consumer1’,’blabal.cern.ch/topic/test2/producer2’]

Return type:

S_OK or S_ERROR

getConnector(mqConnection)

Function returns MQConnector assigned to the mqURI.

Parameters:

mqConnection (str) – connection name.

Returns:

with the value of the MQConnector in S_OK if not None

Return type:

S_OK/S_ERROR

property lock

Lock to assure thread-safe access to the internal connection storage.

removeAllConnections()
Function removes all existing connections and calls the disconnect

for connectors.

Return type:

S_OK or S_ERROR

startConnection(mqURI, params, messengerType)
Function adds or updates the MQ connection. If the connection

does not exists, MQconnector is created and added.

Parameters:
  • mqURI (str)

  • params (dict) – parameters to initialize the MQConnector.

  • messengerType (str) – ‘consumer’ or ‘producer’.

Returns:

with the value of the messenger Id in S_OK.

Return type:

S_OK/S_ERROR

stopConnection(mqURI, messengerId)
Function ‘stops’ the connection for given messenger, which means

it removes it from the messenger list. If this is the consumer, the unsubscribe() connector method is called. If it is the last messenger of this destination (queue or topic), then the destination is removed. If it is the last destination from this connection. The disconnect function is called and the connection is removed.

Parameters:
  • mqURI (str)

  • messengerId (str) – e.g. ‘consumer1’ or ‘producer10’.

Returns:

with the value of the messenger Id or S_ERROR if the messenger was not added,

cause the same id already exists.

Return type:

S_OK

unsubscribe(connector, destination, messengerId)