StompInterface
- class DIRAC.Resources.MessageQueue.Simple.StompInterface.ReconnectListener(*args: Any, **kwargs: Any)
Bases: ConnectionListener
Listener that takes care of reconnection
- __init__(connectCallback, *args)
- Parameters:
connectCallback – callback to call for reconnection
args – all the arguments to pass to the connectCallback
- on_disconnected()
Callback function called after disconnecting from broker.
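The reconnection pattern described above can be sketched as follows. This is only an illustration under stated assumptions, not the DIRAC implementation: `ConnectionListener` here is a local stand-in for `stomp.ConnectionListener` so the snippet runs without stomp.py installed, and `ReconnectListenerSketch`, `connect`, and the host name are hypothetical.

```python
class ConnectionListener:
    """Local stand-in for stomp.ConnectionListener (assumption, keeps the sketch runnable)."""

    def on_disconnected(self):
        pass


class ReconnectListenerSketch(ConnectionListener):
    """Calls a user-supplied callback whenever the broker connection drops."""

    def __init__(self, connectCallback, *args):
        super().__init__()
        # Keep the callback and its arguments so they can be replayed later.
        self.connectCallback = connectCallback
        self.args = args

    def on_disconnected(self):
        # Re-run the connection routine with the arguments captured at construction.
        self.connectCallback(*self.args)


attempts = []


def connect(host, port):
    """Hypothetical connection routine; it only records that it was called."""
    attempts.append((host, port))


listener = ReconnectListenerSketch(connect, "broker-1.example.org", 61113)
listener.on_disconnected()  # simulate the broker dropping the connection
```

Because the listener only stores the callback and its arguments, the same class can serve any number of connections, each with its own arguments.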
- class DIRAC.Resources.MessageQueue.Simple.StompInterface.StompConsumer(host: str, port: int, username: str, password: str, destinations: list[str], connectionParams: dict | None = None, ack: str = 'auto')
Bases: object
Class to listen to a stomp broker. It supports the use of aliases and creates one connection per host behind the alias. It also ensures reconnection.
You can also attach multiple listeners to it, but be careful about the following points:
  - It creates one listener instance per connection (so one per host behind the broker alias).
  - The ack/nack logic is left to the listener, so make sure listeners do not compete there.
  - The reconnection logic is already handled by a separate listener, so do not implement it yourself.
Having multiple listeners share the same connection can seem nice, but it is tricky: you have to make sure the different listeners do not interfere with each other. That is why it is often wiser to have a separate connection (so a separate StompConsumer instance) for each listener, at the cost of a few extra sockets and threads.
Usage example:

    from time import sleep

    import stomp

    from DIRAC.Resources.MessageQueue.Simple.StompInterface import StompConsumer

    # CovidDB and PandemyLasts are placeholders that the caller must provide.


    class MyCovidListener(stomp.ConnectionListener):
        def __init__(self):
            super().__init__()
            self.covidDB = CovidDB()

        def on_message(self, frame):
            headers = frame.headers
            body = frame.body
            msgId = headers["message-id"]
            subsId = headers["subscription"]
            try:
                if headers["destination"] == "/queue/positive":
                    self.covidDB.IncreaseCases()
                elif headers["destination"] == "/queue/dead":
                    self.covidDB.DecreaseCases()
                # self.conn is set on the listener by StompConsumer.addListener
                self.conn.ack(msgId, subsId)
            except Exception:
                self.conn.nack(msgId, subsId)


    host = "myBrokerAlias.cern.ch"
    port = 61113
    username = "myUsername"
    password = "IWouldLikeToBuyAHamburger"
    destinations = ["/queue/positive", "/queue/dead"]
    connectionParams = {"heartbeats": (2000, 2000)}

    cons = StompConsumer(
        host,
        port,
        username,
        password,
        destinations=destinations,
        connectionParams=connectionParams,
    )
    cons.addListener(MyCovidListener)

    while PandemyLasts:
        sleep(5)

    cons.disconnect()
- __init__(host: str, port: int, username: str, password: str, destinations: list[str], connectionParams: dict | None = None, ack: str = 'auto')
Be careful with the ack parameter. It only sets the ack parameter of the stomp.Connection.subscribe method; it is up to the listener to actually ack/nack if needed.
- Parameters:
host – alias of the broker
port – port to connect to
username – username to connect to the broker
password – password to connect to the broker
destinations – list of topics or queues to listen to
connectionParams – any parameters that should be passed to stomp.Connection
ack – see stomp.Connection.subscribe
- addListener(listenerCls: type[stomp.ConnectionListener]) → None
Add a listener to each of the connections. Also sets the connection as an attribute of the listener, so that the listener can ack/nack messages.
- Parameters:
listenerCls – class of the listener. One instance is created per connection.
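The "one listener instance per connection" behaviour can be pictured with the sketch below. It is a simplified illustration, not the actual DIRAC code: `FakeConnection`, `AckingListener`, and `addListenerSketch` are hypothetical names, and the attribute name `conn` is taken from the usage example in the class description.

```python
from types import SimpleNamespace


class FakeConnection:
    """Hypothetical stand-in for a stomp connection; records listeners and acks."""

    def __init__(self, name):
        self.name = name
        self.listeners = {}
        self.acked = []

    def set_listener(self, name, listener):
        self.listeners[name] = listener

    def ack(self, msgId, subsId):
        self.acked.append((msgId, subsId))


class AckingListener:
    """Listener that acks through the `conn` attribute it was given."""

    def on_message(self, frame):
        headers = frame.headers
        self.conn.ack(headers["message-id"], headers["subscription"])


def addListenerSketch(connections, listenerCls):
    """Instantiate one listener per connection and hand it that connection."""
    for conn in connections:
        listener = listenerCls()
        listener.conn = conn  # lets the listener ack/nack on its own connection
        conn.set_listener(listenerCls.__name__, listener)


# Two connections, as if the broker alias resolved to two hosts.
conns = [FakeConnection("host-a"), FakeConnection("host-b")]
addListenerSketch(conns, AckingListener)

frame = SimpleNamespace(headers={"message-id": "m1", "subscription": "s1"}, body="hello")
conns[0].listeners["AckingListener"].on_message(frame)
```

Since every connection gets its own listener object, any state kept on the listener is not shared between the hosts behind the alias.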
- disconnect()
Disconnects cleanly from the message queue server
- class DIRAC.Resources.MessageQueue.Simple.StompInterface.StompProducer(*args: Any, **kwargs: Any)
Bases: Connection
Class to send messages to a stomp broker.
It supports the use of aliases by picking the hosts behind the alias in random order and using the others as failover.
The send method overrides the one from stomp.Connection. It uses a fixed destination given in the constructor and ensures that sending is retried.
Usage example:

    import json

    from DIRAC.Resources.MessageQueue.Simple.StompInterface import StompProducer

    host = "myBrokerAlias.cern.ch"
    port = 61113
    username = "myUsername"
    password = "IWouldLikeToBuyAHamburger"
    logRecord = {
        "componentname": "DataManagement/DataIntegrity",
        "levelname": "WARNING",
        "message": "Chris message",
    }

    prod = StompProducer(host, port, username, password, "/queue/lhcb.dirac.logging")
    prod.send(json.dumps(logRecord))
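The alias handling mentioned in the class description could look roughly like the sketch below. It is an assumption about the general technique, not the code StompProducer actually runs: `resolveAlias` and `shuffledHostAndPorts` are hypothetical helpers, and the addresses are documentation-only examples. The resulting list has the shape that stomp.py's `Connection` accepts as `host_and_ports`.

```python
import random
import socket


def resolveAlias(alias):
    """Return the IPv4 addresses behind a DNS alias (requires network access)."""
    _name, _aliases, ips = socket.gethostbyname_ex(alias)
    return ips


def shuffledHostAndPorts(ips, port, rng=random):
    """Build a randomly ordered list of (host, port) tuples.

    The first entry is tried first; the remaining ones act as failover.
    """
    hostAndPorts = [(ip, port) for ip in ips]
    rng.shuffle(hostAndPorts)
    return hostAndPorts


# Hard-coded addresses so the sketch runs offline; in practice they would
# come from resolveAlias("myBrokerAlias.cern.ch").
ips = ["192.0.2.1", "192.0.2.2", "192.0.2.3"]
hostAndPorts = shuffledHostAndPorts(ips, 61113, random.Random(0))
```

Shuffling on the client side spreads producers across the brokers behind the alias without any coordination between them.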
- __init__(host: str, port: int, username: str, password: str, destination: str, *args, **kwargs)
- Parameters:
host – alias of the broker
port – port to connect to
username – username to connect to the broker
password – password to connect to the broker
destination – topic or queue to which messages are sent
args – passed to the stomp.Connection constructor
kwargs – passed to the stomp.Connection constructor
- send(body, **kwargs)
Overrides the send method of stomp.Connection.
It catches stomp exceptions and attempts a reconnection before giving up.
All the parameters are those of stomp.Protocol.send, except that the destination is forced to the one given in the constructor.
- Returns:
True if everything went fine, False otherwise
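The "reconnect, retry, then give up" behaviour of send can be sketched as follows. This is a minimal illustration under assumptions, not the DIRAC implementation: `FlakyConnection` and `sendWithRetry` are hypothetical, the `retries` parameter is invented for the example, and `ConnectionError` stands in for the stomp exceptions that the real method catches.

```python
class FlakyConnection:
    """Hypothetical connection whose first send fails, to exercise the retry path."""

    def __init__(self):
        self.sent = []
        self.failNext = True
        self.reconnects = 0

    def connect(self):
        self.reconnects += 1

    def send(self, destination, body, **kwargs):
        if self.failNext:
            self.failNext = False
            raise ConnectionError("broker went away")
        self.sent.append((destination, body))


def sendWithRetry(conn, destination, body, retries=1, **kwargs):
    """Send to a fixed destination; on failure reconnect and retry.

    Returns True if the message was sent, False otherwise.
    """
    for _attempt in range(retries + 1):
        try:
            conn.send(destination, body, **kwargs)
            return True
        except ConnectionError:
            # Stand-in for a stomp exception: try to reconnect before the next attempt.
            try:
                conn.connect()
            except ConnectionError:
                return False
    return False


conn = FlakyConnection()
ok = sendWithRetry(conn, "/queue/test", "hello")
```

Returning a boolean instead of raising lets callers such as logging backends drop a message without crashing the component that produced it.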
- DIRAC.Resources.MessageQueue.Simple.StompInterface.createConsumer(mqService: str, destinations: list[str] | None = None, listenerCls: type[stomp.ConnectionListener] | None = None) → DOKReturnType[T] | DErrorReturnType
Create a consumer for the given mqService
- Parameters:
mqService – name of the MQService as defined under /Resources/MQServices/
destinations – list of destinations to listen to. If not defined, take what is defined in the CS
listenerCls – if defined, given to StompConsumer.addListener
- DIRAC.Resources.MessageQueue.Simple.StompInterface.createProducer(mqService: str, destination: str | None = None) → DOKReturnType[T] | DErrorReturnType
Create a Producer for the given mqService
- Parameters:
mqService – name of the MQService as defined under /Resources/MQServices/
destination – destination to send to. If not defined, take what is defined in the CS
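Both factory functions return a DIRAC-style result dictionary rather than the object itself, so callers are expected to check the `OK` key before using `Value`. The sketch below shows that calling pattern under stated assumptions: `S_OK` and `S_ERROR` are local re-implementations that mirror the shape of DIRAC's helpers, and `createProducerSketch`, the `knownServices` mapping, and the service name are hypothetical stand-ins so the snippet runs without a DIRAC installation.

```python
def S_OK(value=None):
    """Local stand-in mirroring the shape of a DIRAC success result."""
    return {"OK": True, "Value": value}


def S_ERROR(message):
    """Local stand-in mirroring the shape of a DIRAC error result."""
    return {"OK": False, "Message": message}


def createProducerSketch(mqService, destination=None):
    """Hypothetical factory: look up a service and fall back to its default destination."""
    knownServices = {"Monitoring": "/queue/monitoring"}  # invented configuration
    if mqService not in knownServices:
        return S_ERROR(f"Unknown MQService {mqService}")
    # A real factory would return a producer object; a tuple is enough here.
    return S_OK((mqService, destination or knownServices[mqService]))


res = createProducerSketch("Monitoring")
if not res["OK"]:
    raise RuntimeError(res["Message"])
producer = res["Value"]

badRes = createProducerSketch("DoesNotExist")
```

The same check applies to createConsumer; only the content of `Value` differs.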