StompMQConnector

Class for management of Stomp MQ connections, e.g. RabbitMQ

class DIRAC.Resources.MessageQueue.StompMQConnector.ReconnectListener(callback=None, serverIP=None)

Bases: stomp.ConnectionListener

Internal listener class responsible for reconnecting in case of disconnection.

__init__(callback=None, serverIP=None)

Initializes the internal listener object

Parameters:
  • callback – a function called when disconnection happens.
  • serverIP – IP address of the server in question
on_disconnected()

Callback function called after disconnecting from broker.
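A minimal sketch of the reconnect hand-off, assuming the listener simply passes its stored serverIP to the callback once the broker connection drops. The class name ReconnectSketch and the address are hypothetical; this is not the DIRAC implementation.

```python
class ReconnectSketch(object):
    """Hypothetical stand-in for ReconnectListener."""

    def __init__(self, callback=None, serverIP=None):
        self.callback = callback
        self.serverIP = serverIP

    def on_disconnected(self):
        # Assumption: the callback takes the IP of the lost server,
        # matching the signature of StompMQConnector.reconnect(serverIP).
        if self.callback is not None:
            return self.callback(self.serverIP)


lost_servers = []
listener = ReconnectSketch(callback=lost_servers.append, serverIP="192.0.2.10")
listener.on_disconnected()
```

After the call, lost_servers holds the address of the server that would need to be reconnected.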

class DIRAC.Resources.MessageQueue.StompMQConnector.StompListener(callback, ack, connection, messengerId)

Bases: stomp.ConnectionListener

Internal listener class responsible for handling new messages and errors.

__init__(callback, ack, connection, messengerId)

Initializes the internal listener object

Parameters:
  • callback – a defaultCallback compatible function.
  • ack (bool) – if set to true, an acknowledgement will be sent back to the sender.
  • messengerId (str) – messenger identifier sent with acknowledgement messages.
on_error(headers, message)

Function called when an error happens

Parameters:
  • headers (dict) – message headers.
  • message (json) – error message body.
on_message(headers, body)

Function called upon receiving a message

Parameters:
  • headers (dict) – message headers
  • body (json) – message body
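The message flow described above can be sketched as follows. This is an illustration under stated assumptions, not the DIRAC code: the body is assumed to be a JSON-encoded string, the callback is assumed to take (headers, message), and the connection is assumed to expose an ack(message_id, subscription_id) method. SketchListener, FakeConnection and collect are hypothetical names.

```python
import json


class SketchListener(object):
    """Hypothetical stand-in for StompListener."""

    def __init__(self, callback, ack, connection, messengerId):
        self.callback = callback
        self.ack = ack
        self.connection = connection
        self.messengerId = messengerId

    def on_message(self, headers, body):
        # Assumption: the body arrives as a JSON-encoded string.
        result = self.callback(headers, json.loads(body))
        if self.ack:
            # Assumption: acknowledgements carry the message id and messengerId.
            self.connection.ack(headers["message-id"], self.messengerId)
        return result


class FakeConnection(object):
    """Records acknowledgements instead of talking to a broker."""

    def __init__(self):
        self.acked = []

    def ack(self, message_id, subscription_id):
        self.acked.append((message_id, subscription_id))


received = []


def collect(headers, message):
    received.append(message)


conn = FakeConnection()
listener = SketchListener(collect, ack=True, connection=conn, messengerId="consumer-1")
listener.on_message({"message-id": "42"}, json.dumps({"status": "done"}))
```

With ack set to False the same call would deliver the message to the callback but leave conn.acked empty.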
class DIRAC.Resources.MessageQueue.StompMQConnector.StompMQConnector(parameters=None)

Bases: DIRAC.Resources.MessageQueue.MQConnector.MQConnector

Class for management of message queue connections. Allows both sending messages to and receiving messages from a queue.

PORT = 61613
RECONNECT_ATTEMPTS_MAX = 10000.0
RECONNECT_SLEEP_INCREASE = 0.5
RECONNECT_SLEEP_INITIAL = 1
RECONNECT_SLEEP_JITTER = 0.1
RECONNECT_SLEEP_MAX = 120
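The RECONNECT_SLEEP_* values read like the parameters of an exponential backoff with jitter. The sketch below shows one plausible way such values could turn into waiting times; it is an assumption for illustration only and not necessarily the formula used by stomp.py or DIRAC. The function name backoff_delays is hypothetical.

```python
import random

RECONNECT_SLEEP_INITIAL = 1
RECONNECT_SLEEP_INCREASE = 0.5
RECONNECT_SLEEP_JITTER = 0.1
RECONNECT_SLEEP_MAX = 120


def backoff_delays(attempts, rng=None):
    """Yield one illustrative sleep time (in seconds) per reconnect attempt."""
    rng = rng or random.Random()
    base = RECONNECT_SLEEP_INITIAL
    for _ in range(attempts):
        # Cap the base delay, then add up to 10% of random padding.
        capped = min(base, RECONNECT_SLEEP_MAX)
        yield capped * (1.0 + rng.random() * RECONNECT_SLEEP_JITTER)
        # Grow the base delay by 50% for the next attempt.
        base *= 1.0 + RECONNECT_SLEEP_INCREASE


delays = list(backoff_delays(15, random.Random(0)))
```

Under this scheme the first delay is about one second and later delays level off near RECONNECT_SLEEP_MAX, so a long outage does not produce ever-growing waits.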
__init__(parameters=None)

Standard constructor

connect(parameters=None, serverIP=None)

Calls the stomp.Connection.connect method for each endpoint

Parameters:
  • parameters – connection parameter
  • serverIP – If None, connect all the endpoints. Otherwise, only the one matching this specific IP
disconnect(parameters=None)

Disconnects from the message queue server

put(message, parameters=None)

Sends a message to the queue. The message argument contains the body of the message.

Parameters:
  • message (str) – string or any json encodable structure.
  • parameters (dict) – parameters with ‘destination’ key defined.
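A rough sketch of what a put call involves, assuming the message is JSON-encoded before being handed to the connection and that a missing 'destination' is treated as an error. RecordingConnection and put_sketch are hypothetical names used only for illustration, and the error handling is an assumption, not DIRAC's behaviour.

```python
import json


class RecordingConnection(object):
    """Hypothetical stand-in for a broker connection; records what is sent."""

    def __init__(self):
        self.sent = []

    def send(self, body, destination):
        self.sent.append((destination, body))


def put_sketch(connection, message, parameters=None):
    parameters = parameters or {}
    destination = parameters.get("destination", "")
    if not destination:
        # Assumption: without a destination there is nowhere to send to.
        raise ValueError("parameters must define a 'destination' key")
    # Any JSON-encodable structure is serialised into the message body.
    connection.send(body=json.dumps(message), destination=destination)


conn = RecordingConnection()
put_sketch(conn, {"a": 1}, {"destination": "/queue/test"})
```

The destination string "/queue/test" is an example value; the actual naming depends on the broker configuration.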
reconnect(serverIP)

Callback method when a disconnection happens

Parameters:
  • serverIP – IP address of the disconnected server
setupConnection(parameters=None)

Establishes a new connection to a Stomp server, e.g. RabbitMQ

Parameters:
  • parameters (dict) – dictionary with additional MQ parameters, if any.

Returns: S_OK/S_ERROR
subscribe(parameters=None)

Subscribes to the message queue server

Parameters:
  • parameters (dict) – dictionary with additional parameters, if any

Returns: S_OK/S_ERROR
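Methods that return S_OK/S_ERROR follow the usual DIRAC convention of a dictionary with an 'OK' flag plus either 'Value' or 'Message'. The sketch below uses local stand-ins for S_OK and S_ERROR so that it runs without DIRAC installed; subscribe_sketch and describe are hypothetical helpers and the destination check is an assumption made for the example.

```python
def S_OK(value=None):
    """Local stand-in for DIRAC's S_OK."""
    return {"OK": True, "Value": value}


def S_ERROR(message=""):
    """Local stand-in for DIRAC's S_ERROR."""
    return {"OK": False, "Message": message}


def subscribe_sketch(parameters=None):
    """Hypothetical method that reports its outcome as S_OK/S_ERROR."""
    if not parameters or "destination" not in parameters:
        return S_ERROR("No destination given")
    return S_OK("Subscribed to %s" % parameters["destination"])


def describe(result):
    # Always check the 'OK' flag before reading 'Value'.
    if not result["OK"]:
        return "failed: " + result["Message"]
    return "ok: " + result["Value"]


good = describe(subscribe_sketch({"destination": "/queue/test"}))
bad = describe(subscribe_sketch())
```

The same check applies to the results of setupConnection, subscribe and unsubscribe.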
unsubscribe(parameters)

Unsubscribes from the message queue server

Parameters:
  • parameters (dict) – dictionary with additional parameters, if any

Returns: S_OK/S_ERROR