StompInterface

class DIRAC.Resources.MessageQueue.Simple.StompInterface.ReconnectListener(*args: Any, **kwargs: Any)

Bases: ConnectionListener

Listener that takes care of reconnection

__init__(connectCallback, *args)
Parameters:
  • connectCallback – callback to call for reconnection

  • args – all the arguments to pass to the connectCallback

on_disconnected()

Callback function called after disconnecting from broker.

class DIRAC.Resources.MessageQueue.Simple.StompInterface.StompConsumer(host: str, port: int, username: str, password: str, destinations: list[str], connectionParams: dict | None = None, ack: str = 'auto')

Bases: object

Class to listen to a stomp broker. It supports the use of aliases, so will create one connection per host behind the alias It will also ensure reconnection.

You can also attach multiple listener to it, but be careful to the following points:

  • It creates one instance of listener per connection (so per host behind the broker alias)

  • The ack/nack logic is left to the listener, so be careful not to compete there.

  • The reconnection logic is already handled by a separate listener, so do not do it yourself

Having multiple listener sharing the same connection can seem nice, but it is tricky, and you have to make sure the different Listener do not interfere with each other. That is why it is often wiser to just have separate Connections (so StompConsumer instances) for each Listener at the cost of a few extra sockets and threads.

Example on how to use:

class MyCovidListener(stomp.ConnectionListener):
    def __init__(self):
        super().__init__()
        self.covidDB = CovidDB()

    def on_message(self, frame):
        headers = frame.headers
        body = frame.body
        msgId = headers["message-id"]
        subsId = headers["subscription"]
        try:
            if headers["destination"] == "/queue/positive":
                self.covidDb.IncreaseCases()
            elif headers["destination"] == "/queue/dead":
                self.covidDb.DecreaseCases()
            self.conn.ack(msgId, subsId)
        except Exception:
            self.conn.nack(msgId, subsId)


host = "myBrokerAlias.cern.ch"
port = 61113
username = "myUsername"
password = "IWouldLikeToBuyAHamburger"


destinations = ["/queue/postive", "/queue/dead"]
connectionParams = {"heartbeats": (2000, 2000)}

cons = StompConsumer(
    host,
    port,
    username,
    password,
    destinations=destinations,
    connectionParams=connectionParams,
)

cons.addListener(MyCovidListener)

while PandemyLasts:
    sleep(5)

conn.disconnect()
__init__(host: str, port: int, username: str, password: str, destinations: list[str], connectionParams: dict | None = None, ack: str = 'auto')

Be careful with the ack parameter. This will just set the ack parameter of the ~stomp.Connection.subscribe method, but it is up to the listener to effectively ack/nack if needed.

Parameters:
  • host – alias of the broker

  • port – port to connect to

  • username – username to connect to the broker

  • password – password to connect to the broker

  • destinations – list of topic or queues to listen to

  • connectionParams – any parameters that should be passed to ~stomp.Connection

  • ack – see ~stomp.Connection.subscribe

addListener(listenerCls: type[stomp.ConnectionListener]) None

Add a listener to each of the connection. Also sets the connection asa attribute to the Listener, such that the ack

Parameters:

listenerCls – class of listener. We will instanciate one class per connection.

disconnect()

Disconnects cleanly from the message queue server

class DIRAC.Resources.MessageQueue.Simple.StompInterface.StompProducer(*args: Any, **kwargs: Any)

Bases: Connection

Class to send messages to a stomp broker.

It supports the use of aliases, by randomizing the host behind the aliases and use the others as failover.

The send method overwrites the one from ~stomp.Connection. It uses a fixed destination given in the constructor, and ensures that there are retries

Usage example:

host = "myBrokerAlias.cern.ch"
port = 61113
username = "myUsername"
password = "IWouldLikeToBuyAHamburger"

logRecord = {"componentname":"DataManagement/DataIntegrity", "levelname":"WARNING", "message":"Chris message"}

prod = StompProducer(host, port, username, password, "/queue/lhcb.dirac.logging")
prod.send(json.dumps(logRecord))
__init__(host: str, port: int, username: str, password: str, destination: str, *args, **kwargs)
Parameters:
  • host – alias of the broker

  • port – port to connect to

  • username – username to connect to the broker

  • password – password to connect to the broker

  • destination – topic or queues to which to send the message

  • args – given to ~stomp.Connection constructor

  • kwargs – given to ~stomp.Connection constructor

send(body, **kwargs)

Overwrite the send method of ~stomp.Connection

It catches stomp exception and attempts a reconnection before giving up.

All the parameters are those from ~stomp.Protocol.send, except that we force the destination

Returns:

True if everything went fine, False otherwise

DIRAC.Resources.MessageQueue.Simple.StompInterface.createConsumer(mqService: str, destinations: list[str] | None = None, listenerCls: type[stomp.ConnectionListener] | None = None) DOKReturnType[T] | DErrorReturnType

Create a consumer for the given mqService

Parameters:
  • mqService – name of the MQService as defined under /Resources/MQServices/

  • destinations – list of destinations to listen to. If not defined, take what is defined in the CS

  • listenerCls – if defined, given to StompConsumer.addListener

DIRAC.Resources.MessageQueue.Simple.StompInterface.createProducer(mqService: str, destination: str | None = None) DOKReturnType[T] | DErrorReturnType

Create a Producer for the given mqService

Parameters:
  • mqService – name of the MQService as defined under /Resources/MQServices/

  • destination – destination to send to. If not defined, take what is defined in the CS

DIRAC.Resources.MessageQueue.Simple.StompInterface.getSubscriptionID(broker: tuple[str, int], dest: str) str

Generate a unique subscribtionID based on the broker host, port and destination

Parameters:
  • broker – tuple (host,port) to which we connect

  • dest – name of the destination (topic or queue)