StompMQConnector

Class for management of Stomp MQ connections, e.g. RabbitMQ

class DIRAC.Resources.MessageQueue.StompMQConnector.StompListener(*args: Any, **kwargs: Any)

Bases: ConnectionListener

Internal listener class responsible for handling new messages and errors.

__init__(callback, ack, connection, messengerId, connectCallback)

Initializes the internal listener object

Parameters:
  • callback – a defaultCallback compatible function.

  • ack (bool) – if set to true, an acknowledgement will be sent back to the sender.

  • messengerId (str) – messenger identifier sent with acknowledgement messages.

  • connectCallback – the connect method to call in case of disconnection

on_disconnected()

Callback function called after disconnecting from broker.

on_error(headers, message)

Function called when an error happens

Parameters:
  • headers (dict) – message headers.

  • message (json) – message body.

on_message(headers, body)

Function called upon receiving a message

Parameters:
  • headers (dict) – message headers

  • body (json) – message body
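
As a rough illustration of the callback contract described above, the sketch below assumes the callback is invoked with the headers dictionary and the decoded message body. The names `collectingCallback`, `deliver` and `received` are hypothetical and not part of DIRAC; `deliver` only stands in for the listener and is not its actual implementation.

```python
import json

# Hypothetical in-memory store used only for this illustration.
received = []


def collectingCallback(headers, body):
    """Hypothetical callback: record the destination header and the decoded body."""
    received.append((headers.get("destination"), body))


def deliver(callback, headers, rawBody):
    """Stand-in for a listener (assumption): decode the JSON body, then call the callback."""
    callback(headers, json.loads(rawBody))


deliver(collectingCallback, {"destination": "/queue/test"}, '{"status": "done"}')
```

Keeping the callback free of any broker-specific logic makes it easy to exercise without a running message queue server.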

class DIRAC.Resources.MessageQueue.StompMQConnector.StompMQConnector(parameters=None)

Bases: MQConnector

Class for managing message queue connections. Allows both sending messages to and receiving messages from a queue.

When several IPs are behind an alias, the IPs are shuffled and a connection is made to one of them. The others are used as failover by stomp’s internals.
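
The shuffling described above can be sketched as follows. This is a minimal illustration, not the connector's code: `orderEndpoints` is a hypothetical helper, the addresses are documentation-range placeholders, and the step that resolves the alias to its IPs (for instance with `socket.gethostbyname_ex`) is left out so the example runs without network access.

```python
import random


def orderEndpoints(ips, port=61613, rng=random):
    """Return (ip, port) pairs in random order.

    The first pair is treated as the primary endpoint and the
    remaining ones as failover candidates (assumption for this sketch).
    """
    shuffled = list(ips)  # copy so the caller's list is left untouched
    rng.shuffle(shuffled)
    return [(ip, port) for ip in shuffled]


# Placeholder addresses standing in for the IPs behind an alias.
endpoints = orderEndpoints(["192.0.2.1", "192.0.2.2", "192.0.2.3"])
primary, failover = endpoints[0], endpoints[1:]
```

Shuffling on the client side spreads connections across the brokers behind the alias without any coordination between clients.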

INCOMING_HEARTBEAT_MS = 15000
OUTGOING_HEARTBEAT_MS = 15000
PORT = 61613
RECONNECT_ATTEMPTS_MAX = 10000.0
RECONNECT_SLEEP_INCREASE = 0.5
RECONNECT_SLEEP_INITIAL = 1
RECONNECT_SLEEP_JITTER = 0.1
RECONNECT_SLEEP_MAX = 120
STOMP_TIMEOUT = 60
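
To give a feel for the `RECONNECT_SLEEP_*` values above, the sketch below assumes they drive a geometric backoff: the delay starts at the initial value, grows by a factor of (1 + increase) per attempt, is capped at the maximum, and is then stretched by a random jitter fraction. The exact formula used inside stomp may differ; `reconnectDelay` and `jitterSample` are hypothetical names introduced for this illustration.

```python
import random

# Values copied from the class constants listed above.
RECONNECT_SLEEP_INITIAL = 1
RECONNECT_SLEEP_INCREASE = 0.5
RECONNECT_SLEEP_JITTER = 0.1
RECONNECT_SLEEP_MAX = 120


def reconnectDelay(attempt, jitterSample=None):
    """Assumed delay in seconds before reconnect attempt `attempt` (0-based)."""
    if jitterSample is None:
        jitterSample = random.random()  # uniform in [0, 1)
    base = RECONNECT_SLEEP_INITIAL * (1.0 + RECONNECT_SLEEP_INCREASE) ** attempt
    capped = min(RECONNECT_SLEEP_MAX, base)
    return capped * (1.0 + jitterSample * RECONNECT_SLEEP_JITTER)


# With jitter disabled the first delays are 1, 1.5, 2.25 and 3.375 seconds.
delaysWithoutJitter = [reconnectDelay(n, jitterSample=0.0) for n in range(4)]
```

Under these assumptions the delay reaches the 120 second cap after about a dozen attempts, and the jitter keeps many clients from reconnecting at the same instant.
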

__init__(parameters=None)

Standard constructor

connect(parameters=None)

Calls the stomp.Connection.connect method for each endpoint.

Parameters:

parameters – connection parameters

disconnect(parameters=None)

Disconnects from the message queue server

put(message, parameters=None)

Sends a message to the queue. The message argument contains the body of the message.

Parameters:
  • message (str) – string or any json encodable structure.

  • parameters (dict) – parameters with ‘destination’ key defined.
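
The two arguments described above can be prepared roughly as shown below. This is only a sketch: `preparePut` is a hypothetical helper, the payload keys and the queue name are made up, and the assumption that the body is serialised with `json.dumps` is inferred from the "json encodable" wording rather than confirmed by this page.

```python
import json


def preparePut(message, destination):
    """Hypothetical helper: check that `message` is JSON-encodable and
    build a parameters dictionary carrying the 'destination' key."""
    body = json.dumps(message)  # raises TypeError if not JSON-encodable
    parameters = {"destination": destination}
    return body, parameters


# Made-up payload and destination, for illustration only.
body, parameters = preparePut({"jobID": 42, "status": "Done"}, "/queue/test")
```

Checking encodability before calling `put` surfaces serialisation problems at the call site instead of inside the connector.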

reconnect()

Callback method when a disconnection happens

Parameters:

serverIP – IP of the server disconnected

setupConnection(parameters=None)

Establishes a new connection to a Stomp server, e.g. RabbitMQ

Parameters:

parameters (dict) – dictionary with additional MQ parameters if any.

Returns:

S_OK/S_ERROR
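
Callers typically branch on the returned structure. The sketch below uses local stand-ins for `S_OK` and `S_ERROR` so that it runs without DIRAC installed; in real code these would be imported from DIRAC. The stand-ins model only the `OK`, `Value` and `Message` keys, and `describe` is a hypothetical helper.

```python
def S_OK(value=None):
    """Local stand-in (assumption) mirroring the success structure."""
    return {"OK": True, "Value": value}


def S_ERROR(message=""):
    """Local stand-in (assumption) mirroring the failure structure."""
    return {"OK": False, "Message": message}


def describe(result):
    """Hypothetical helper: turn a result dictionary into a short status string."""
    if result["OK"]:
        return "success: %r" % (result["Value"],)
    return "failure: %s" % result["Message"]


okText = describe(S_OK("connected"))
errorText = describe(S_ERROR("no endpoint reachable"))
```

The same check applies to the other methods on this page that return S_OK/S_ERROR.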

subscribe(parameters=None)

Subscribes to the message queue server

Parameters:

parameters (dict) – dictionary with additional parameters if any

Returns:

S_OK/S_ERROR

unsubscribe(parameters)

Unsubscribes from the message queue server

Parameters:

parameters (dict) – dictionary with additional parameters if any

Returns:

S_OK/S_ERROR