DISET Stable connections

DISET is the communication, authorization and authentication framework of top of which DIRAC services are built. Traditionally DISET offered RPC and file transfer capabilities. Those communication mechanisms are not well suited for the Executor framework. RPC doesn’t allow the server to send data to the clients asynchronously, and each RPC query requires establishing a new connection and going through another SSL handshake. On average the SSL process is the most resource consuming part of the request.

stable connections diagram

The Executor framework relies on a new DISET capability. Support for stable connections and asynchronous requests has been added. Any component can open a connection and reuse it to send and receive requests though it. Services can send information to clients without having to wait for the clients to ask for them as shown in the stable connections figure.

Although once connected services can send data asynchronously to clients, services are still servers and require clients to start the connection to them. No service can start the connection towards the client. Once the service has received the connection the asynchonous data transfer can take place.

Server side usage

Any DIRAC service can make use of the stable connection mechanism. It’s usage is quite similar to the usual RPC mechanism but with extended capabilities. Here we have an example of a service using the stable connections mechanism:

 1""" This is a simple example of implementation of a Ping/Pong service for executors.
 3    This service does not any specific configuration to run, only the Port number, and authz e.g.:
 5    {
 6      Port = 9145
 7      {
 8        Authorization
 9        {
10          Default = all
11        }
12      }
13    }
15from DIRAC import S_OK
16from DIRAC.Core.DISET.RequestHandler import RequestHandler
19class PingPongHandler(RequestHandler):
20    MSG_DEFINITIONS = {"Ping": {"id": int}, "Pong": {"id": int}}
22    auth_conn_connected = ["all"]
24    def conn_connected(self, trid, identity, kwargs):
25        """
26        This function will be called when a new client connects.
27        It is not mandatory to have this function
29        params:
30          @trid: Transport ID: Unique for each connection
31          @identity: Unique for each client even if it reconnects
32          @kwargs: Arguments sent by the client for the connection
33        """
34        # Do something with trid/identity/kwargs if needed
35        return S_OK()
37    auth_conn_drop = ["all"]
39    def conn_drop(self, trid):
40        """
41        This function will be called when a client disconnects.
42        It is not mandatory to have this function
43        """
44        return S_OK()
46    auth_msg_Ping = ["all"]
48    def msg_Ping(self, msgObj):
49        """
50        Callback for Ping message
51        """
52        pingid = msgObj.id
53        result = self.srv_msgCreate("Pong")
54        if not result["OK"]:
55            # Something went wrong :P
56            return result
57        pongObj = result["Value"]
58        pongObj.id = pingid
59        # Could have been
60        # return self.srv_msgReply( pongObj )
61        return self.srv_msgSend(self.srv_getTransportID(), pongObj)

The first thing the server requires is a definition of the messages that it can use. In the example, lines 7 and 8 define two messages: Ping and Pong messages. Each message has one attribute called id that can only be either an integer or a long. Lines 10-22 define the connection callback conn_connected. Whenever the client receives a new client connection this function will be called. This function receives three parameters:


Transport identifier. Each client connection will have a unique id. If a client reconnects it will have a different trid each time.


Client identifier. Each client will have a unique id. This id will be maintained across reconnects.


Dictionary containing keyword arguments sent by client when connecting.

If this function doesn’t return S_OK the client connection will be rejected.

If a client drops the connection, method conn_drop will be called with the trid of the disconnected client to allow the handler to clean up it’s state regarding that client if necessary.

Lines 32-46 define callback for Ping message. All message callbacks will receive only one parameter. The parameter will be an object containing the message data. As seen in line 37, the message object will have defined the attributes previously defined with the values the client is sending. Accessing them is as easy as just accessing normal attributes. On line 38 the Pong message is created and then assigned a value in to the id attribute on line 43. Finally the message is sent back to the client using srv_msgSend with the client trid as first parameter and the Pong message as second one. To just reply to a client there’s a shortcut function srv_msgReply. If any message callback doesn’t return S_OK the client will be disconnected.

In the example there’s no callback for the Pong message because not all services may have to react to all messages. Some messages will only make sense to be sent to clients not received from them. If the Service receives the Pong message, it will send an error back to the client and disconnect it.

Client side usage

Clients do not have to define which messages they can use. The Message client will automatically discover those based on the service to which they are connecting. Here’s an example on how a client could look like:

 1import sys
 2import time
 3from DIRAC import S_ERROR, initialize
 4from DIRAC.Core.DISET.MessageClient import MessageClient
 9def sendPingMsg(msgClient, pingid=0):
10    """
11    Send Ping message to the server
12    """
13    result = msgClient.createMessage("Ping")
14    if not result["OK"]:
15        return result
16    msgObj = result["Value"]
17    msgObj.id = pingid
18    return msgClient.sendMessage(msgObj)
21def pongCB(msgObj):
22    """
23    Callback for the Pong message.
24    Just send a Ping message incrementing in 1 the id
25    """
26    pongid = msgObj.id
27    print("RECEIVED PONG %d" % pongid)
28    return sendPingMsg(msgObj.msgClient, pongid + 1)
31def disconnectedCB(msgClient):
32    """
33    Reconnect :)
34    """
35    retryCount = 0
36    while retryCount:
37        result = msgClient.connect()
38        if result["OK"]:
39            return result
40        time.sleep(1)
41        retryCount -= 1
42    return S_ERROR("Could not reconnect... :P")
45if __name__ == "__main__":
46    msgClient = MessageClient("Framework/PingPong")
47    msgClient.subscribeToMessage("Pong", pongCB)
48    msgClient.subscribeToDisconnect(disconnectedCB)
49    result = msgClient.connect()
50    if not result["OK"]:
51        print(f"CANNOT CONNECT: {result['Message']}")
52        sys.exit(1)
53    result = sendPingMsg(msgClient)
54    if not result["OK"]:
55        print(f"CANNOT SEND PING: {result['Message']}")
56        sys.exit(1)
57    # Wait 10 secs of pingpongs :P
58    time.sleep(10)

Let’s start with like 39 onwards. The client app is instancing a MessageClient pointing to the desired service. After that it registers all the callbacks it needs. One for receiving Pong messages and one for reacting to disconnects. After that it just connects to the server and sends the first Ping message. Lastly it will just wait 10 seconds before exiting.

Function sendPingMsg in line 5 onwards just creates a Ping message and sends it to the server via the supplied msgClient.

The pongCB function will be executed for each Pong message received. Messages received on the client callbacks have a special attribute msgClient with the client that has received the message. If this attribute is accessed in services it will just return None.

Function disconnectedCB will be invoked if the client is disconnected from the service. In the example it will just try to reconnect for some time and then exit if it doesn’t manage to do so.