DISET Stable connections

DISET is the communication, authorization and authentication framework on top of which DIRAC services are built. Traditionally DISET offered RPC and file transfer capabilities. Those communication mechanisms are not well suited to the Executor framework. RPC doesn’t allow the server to send data to the clients asynchronously, and each RPC query requires establishing a new connection and going through another SSL handshake. On average the SSL handshake is the most resource-consuming part of the request.

Figure: stable connections diagram

The Executor framework relies on a new DISET capability: support for stable connections and asynchronous requests. Any component can open a connection and reuse it to send and receive requests through it. Services can send information to clients without having to wait for the clients to ask for it, as shown in the stable connections figure.

Although services can send data asynchronously to clients once connected, services are still servers and require clients to start the connection to them. No service can start a connection towards a client. Once the service has accepted the connection, the asynchronous data transfer can take place.
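The connection direction described above can be pictured with plain TCP sockets. This is only a sketch of the idea, not DISET code: it uses no SSL and no DISET classes, and the names serve_once and run_demo are made up for the illustration. The client opens the connection, and the server then pushes several messages over it without receiving any request.

```python
import socket
import threading


def serve_once(listener, messages):
    """Accept one client, then push messages without waiting for requests."""
    conn, _addr = listener.accept()  # the server only accepts; it never dials out
    with conn:
        for text in messages:
            conn.sendall(text.encode() + b"\n")
    # Leaving the with-block closes the connection, which ends the client's read loop


def run_demo():
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    listener.listen(1)
    port = listener.getsockname()[1]

    pushed = ["job 1 ready", "job 2 ready", "job 3 ready"]
    server = threading.Thread(target=serve_once, args=(listener, pushed))
    server.start()

    received = []
    # The client starts the connection and keeps it open while data arrives
    with socket.create_connection(("127.0.0.1", port)) as client:
        with client.makefile("r") as stream:
            for line in stream:
                received.append(line.rstrip("\n"))
    server.join()
    listener.close()
    return received


if __name__ == "__main__":
    print(run_demo())
```

All three messages travel over the single connection the client opened, so a handshake would be paid only once.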

Server side usage

Any DIRAC service can make use of the stable connection mechanism. Its usage is quite similar to the usual RPC mechanism but with extended capabilities. Here is an example of a service using the stable connections mechanism:

""" This is a simple example of implementation of a Ping/Pong service for executors.

    This service does not need any specific configuration to run, only the Port number, and authz e.g.:

    {
      Port = 9145
      {
        Authorization
        {
          Default = all
        }
      }
    }
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import six
from DIRAC import S_OK
from DIRAC.Core.DISET.RequestHandler import RequestHandler


class PingPongHandler(RequestHandler):

    # Messages this service understands, with the allowed type of each attribute
    MSG_DEFINITIONS = {"Ping": {"id": six.integer_types}, "Pong": {"id": six.integer_types}}

    auth_conn_connected = ["all"]

    def conn_connected(self, trid, identity, kwargs):
        """
        This function will be called when a new client connects.
        It is not mandatory to have this function

        params:
          @trid: Transport ID: Unique for each connection
          @identity: Unique for each client even if it reconnects
          @kwargs: Arguments sent by the client for the connection
        """
        # Do something with trid/identity/kwargs if needed
        return S_OK()

    auth_conn_drop = ["all"]

    def conn_drop(self, trid):
        """
        This function will be called when a client disconnects.
        It is not mandatory to have this function
        """
        return S_OK()

    auth_msg_Ping = ["all"]

    def msg_Ping(self, msgObj):
        """
        Callback for Ping message
        """
        pingid = msgObj.id
        result = self.srv_msgCreate("Pong")
        if not result["OK"]:
            # Something went wrong :P
            return result
        pongObj = result["Value"]
        pongObj.id = pingid
        # Could have been
        # return self.srv_msgReply( pongObj )
        return self.srv_msgSend(self.srv_getTransportID(), pongObj)

The first thing the server requires is a definition of the messages it can use. In the example, the MSG_DEFINITIONS attribute defines two messages, Ping and Pong. Each message has one attribute called id that must be an integer (an int or, on Python 2, a long). The conn_connected method is the connection callback: whenever the server receives a new client connection, this function is called. It receives three parameters:

trid: Transport identifier. Each client connection will have a unique id. If a client reconnects it will have a different trid each time.

identity: Client identifier. Each client will have a unique id. This id will be maintained across reconnects.

kwargs: Dictionary containing keyword arguments sent by the client when connecting.

If this function doesn’t return S_OK the client connection will be rejected.

If a client drops the connection, the conn_drop method will be called with the trid of the disconnected client, to allow the handler to clean up its state regarding that client if necessary.
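As an illustration of the bookkeeping these two callbacks make possible, here is a minimal sketch that tracks clients by identity while their trid changes across reconnects. It does not import DIRAC: S_OK and S_ERROR are local stand-ins that only mirror the shape of DIRAC's result dictionaries, and ConnectionRegistry, maxClients and the method names are hypothetical. A real handler could call something like this from conn_connected and conn_drop.

```python
def S_OK(value=None):
    """Local stand-in mirroring the shape of DIRAC's S_OK result."""
    return {"OK": True, "Value": value}


def S_ERROR(message):
    """Local stand-in mirroring the shape of DIRAC's S_ERROR result."""
    return {"OK": False, "Message": message}


class ConnectionRegistry:
    """Hypothetical helper a handler could delegate to from its callbacks."""

    def __init__(self, maxClients=2):
        self.maxClients = maxClients
        self.tridByIdentity = {}  # identity survives reconnects, trid does not
        self.identityByTrid = {}

    def connected(self, trid, identity, kwargs):
        isNewClient = identity not in self.tridByIdentity
        if isNewClient and len(self.tridByIdentity) >= self.maxClients:
            # Returning an error is what would make the service reject the client
            return S_ERROR("Too many clients")
        # A reconnecting client keeps its identity but arrives with a fresh trid
        oldTrid = self.tridByIdentity.get(identity)
        self.identityByTrid.pop(oldTrid, None)
        self.tridByIdentity[identity] = trid
        self.identityByTrid[trid] = identity
        return S_OK()

    def dropped(self, trid):
        # Stale trids were already forgotten on reconnect, so they are ignored here
        identity = self.identityByTrid.pop(trid, None)
        if identity is not None:
            del self.tridByIdentity[identity]
        return S_OK()
```

Keying the state on identity rather than trid lets per-client data survive a reconnect, while the trid remains the handle to use when sending.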

The msg_Ping method is the callback for the Ping message. Every message callback receives a single parameter: an object containing the message data. As the pingid = msgObj.id line shows, the message object exposes the attributes declared in MSG_DEFINITIONS, set to the values the client sent, and they are read like ordinary attributes. The callback then creates a Pong message with srv_msgCreate and copies the received value into its id attribute. Finally, the message is sent back to the client using srv_msgSend, with the client’s trid as the first parameter and the Pong message as the second. To simply reply to the sender there is a shortcut function, srv_msgReply. If a message callback doesn’t return S_OK, the client will be disconnected.
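The typed attributes can be pictured with a small stand-in class. This is not how DISET implements its message objects (the text here does not say whether types are checked on assignment or when sending); TypedMessage is a hypothetical name, and plain int is used in place of six.integer_types so the sketch has no dependencies.

```python
# Same shape as the service's MSG_DEFINITIONS, with int standing in for six.integer_types
MSG_DEFINITIONS = {"Ping": {"id": int}, "Pong": {"id": int}}


class TypedMessage:
    """Hypothetical message that only accepts declared, correctly typed attributes."""

    def __init__(self, name, definitions=MSG_DEFINITIONS):
        if name not in definitions:
            raise KeyError("Unknown message %s" % name)
        # Bypass our own __setattr__ for the internal bookkeeping fields
        object.__setattr__(self, "_name", name)
        object.__setattr__(self, "_schema", definitions[name])

    def __setattr__(self, attr, value):
        if attr not in self._schema:
            raise AttributeError("%s has no attribute %s" % (self._name, attr))
        if not isinstance(value, self._schema[attr]):
            raise TypeError("%s.%s must be of type %s" % (self._name, attr, self._schema[attr]))
        object.__setattr__(self, attr, value)


if __name__ == "__main__":
    ping = TypedMessage("Ping")
    ping.id = 3       # accepted: id is declared as an integer
    print(ping.id)
```

In this sketch, assigning ping.id = "three" raises TypeError, and setting an undeclared attribute raises AttributeError.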

In the example there’s no callback for the Pong message, because a service does not have to react to every message it defines. Some messages only make sense when sent to clients, not when received from them. If the service receives a Pong message, it will send an error back to the client and disconnect it.
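The behaviour for messages without a callback can be sketched as a lookup by naming convention. This is an assumption about the general shape of such a dispatcher, not DISET's actual code: dispatch, Message and PingOnlyHandler are hypothetical, and S_OK and S_ERROR are local stand-ins mirroring DIRAC's result dictionaries.

```python
def S_OK(value=None):
    """Local stand-in mirroring the shape of DIRAC's S_OK result."""
    return {"OK": True, "Value": value}


def S_ERROR(message):
    """Local stand-in mirroring the shape of DIRAC's S_ERROR result."""
    return {"OK": False, "Message": message}


class Message:
    """Bare-bones message: a name plus arbitrary attributes."""

    def __init__(self, name, **attrs):
        self.name = name
        for key, value in attrs.items():
            setattr(self, key, value)


class PingOnlyHandler:
    """Defines a callback for Ping but, like the example service, none for Pong."""

    def msg_Ping(self, msgObj):
        return S_OK(msgObj.id)


def dispatch(handler, msgObj):
    """Route a message to handler.msg_<name>, or fail if no such callback exists."""
    callback = getattr(handler, "msg_%s" % msgObj.name, None)
    if callback is None:
        # In the service, this is the case that ends with the client disconnected
        return S_ERROR("No callback defined for message %s" % msgObj.name)
    return callback(msgObj)


if __name__ == "__main__":
    handler = PingOnlyHandler()
    print(dispatch(handler, Message("Ping", id=7)))
    print(dispatch(handler, Message("Pong", id=7)))
```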

Client side usage

Clients do not have to define which messages they can use. The message client will automatically discover them from the service it connects to. Here’s an example of what a client could look like:

from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
import sys
import time
from DIRAC import S_OK, S_ERROR
from DIRAC.Core.Base import Script
from DIRAC.Core.DISET.MessageClient import MessageClient

# Parse the command line and load the DIRAC configuration before creating clients
Script.parseCommandLine()


def sendPingMsg(msgClient, pingid=0):
    """
    Send Ping message to the server
    """
    result = msgClient.createMessage("Ping")
    if not result["OK"]:
        return result
    msgObj = result["Value"]
    msgObj.id = pingid
    return msgClient.sendMessage(msgObj)


def pongCB(msgObj):
    """
    Callback for the Pong message.
    Just send a Ping message incrementing in 1 the id
    """
    pongid = msgObj.id
    print("RECEIVED PONG %d" % pongid)
    return sendPingMsg(msgObj.msgClient, pongid + 1)


def disconnectedCB(msgClient):
    """
    Reconnect :)
    """
    # Must be positive or the loop never runs; 10 is an arbitrary choice
    retryCount = 10
    while retryCount:
        result = msgClient.connect()
        if result["OK"]:
            return result
        time.sleep(1)
        retryCount -= 1
    return S_ERROR("Could not reconnect... :P")


if __name__ == "__main__":
    msgClient = MessageClient("Framework/PingPong")
    msgClient.subscribeToMessage("Pong", pongCB)
    msgClient.subscribeToDisconnect(disconnectedCB)
    result = msgClient.connect()
    if not result["OK"]:
        print("CANNOT CONNECT: %s" % result["Message"])
        sys.exit(1)
    result = sendPingMsg(msgClient)
    if not result["OK"]:
        print("CANNOT SEND PING: %s" % result["Message"])
        sys.exit(1)
    # Wait 10 secs of pingpongs :P
    time.sleep(10)

Let’s start with the main block at the bottom of the script. The client application instantiates a MessageClient pointing to the desired service. After that it registers all the callbacks it needs: one for receiving Pong messages and one for reacting to disconnects. Then it connects to the server and sends the first Ping message. Lastly it waits 10 seconds before exiting.

The sendPingMsg function just creates a Ping message and sends it to the server via the supplied msgClient.

The pongCB function will be executed for each Pong message received. Messages received in client callbacks have a special attribute, msgClient, holding the client that received the message. If this attribute is accessed in a service it will just return None.

Function disconnectedCB will be invoked if the client is disconnected from the service. In the example it will just try to reconnect for some time and then exit if it doesn’t manage to do so.
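The reconnect logic can be factored into a small bounded-retry helper. This is a sketch under stated assumptions: reconnectWithRetries and its retries, delay and sleep parameters are hypothetical, S_OK and S_ERROR are local stand-ins mirroring DIRAC's result dictionaries, and connect is any callable returning such a dictionary (in the client above, msgClient.connect would fit).

```python
import time


def S_OK(value=None):
    """Local stand-in mirroring the shape of DIRAC's S_OK result."""
    return {"OK": True, "Value": value}


def S_ERROR(message):
    """Local stand-in mirroring the shape of DIRAC's S_ERROR result."""
    return {"OK": False, "Message": message}


def reconnectWithRetries(connect, retries=5, delay=1.0, sleep=time.sleep):
    """Call connect() until it succeeds or the retries run out."""
    for _attempt in range(retries):
        result = connect()
        if result["OK"]:
            return result
        sleep(delay)  # wait before the next attempt
    return S_ERROR("Could not reconnect after %d attempts" % retries)


if __name__ == "__main__":
    # Fake connection that fails twice before succeeding
    outcomes = [S_ERROR("refused"), S_ERROR("refused"), S_OK()]
    print(reconnectWithRetries(lambda: outcomes.pop(0), delay=0.1))
```

A disconnect callback could then reduce to return reconnectWithRetries(msgClient.connect). Passing sleep as a parameter keeps the helper easy to test without real waiting.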