Create a Body Plugin

This page briefly explains the steps necessary to add a new BodyPlugin or an extension. It assumes the reader has some form of a development and testing setup available to them.

The module DIRAC.TransformationSystem.Client.BodyPlugin.BaseBody contains documentation on how to code such a plugin

The ReplicateOrMove Plugin Example

Imagine a world where you would have storages that are reliable, and some that are less. And imagine that you are creating a transformation to shuffle data around. Now, if the destination of your file is a trustworthy one, you could imagine deleting the original copy. But if the destination is likely to lose your file, you are better off keeping a second copy. That’s what our example plugin will do.

ReplicateOrMoveBody.py
 1 from DIRAC import gLogger
 2 from DIRAC.RequestManagementSystem.Client.Request import Request
 3 from DIRAC.RequestManagementSystem.Client.Operation import Operation
 4 from DIRAC.RequestManagementSystem.Client.File import File
 5
 6 from DIRAC.TransformationSystem.Client.BodyPlugin.BaseBody import BaseBody
 7
 8
 9 sLog = gLogger.getLocalSubLogger(__name__)
10
11
12 class ReplicateOrMoveBody(BaseBody):
13     """
14     If the destination storage is trustworthy, move the file there,
15     otherwise, just do a copy of the file
16     """
17
18     # This is needed to know how to serialize the object,
19     # all attributes that need to be persisted should be in this list.
20     # See :py:class:`~DIRAC.Core.Utilities.JEncode.JSerializable` for more details.
21     _attrToSerialize = ["trustworthy", "undependable"]
22
23     def __init__(self, trustworthy=None, undependable=None):
24         """C'tor
25
26         :param list trustworthy: trustworthy storage to which we will move the file
27         :param list undependable: doubtful storage to which we will rather just copy
28         """
29         self.trustworthy = trustworthy
30         self.undependable = undependable
31
32     def taskToRequest(self, taskID, task, transID):
33         """
34         Convert a task into a Request with either a ReplicateAndRegister
35         or a Move Operation
36         """
37         req = Request()
38         op = Operation()
39
40         # Decide on an operation type
41         targetSE = task["TargetSE"]
42         if targetSE in self.trustworthy:
43             op.Type = "MoveReplica"
44         elif targetSE in self.undependable:
45             op.Type = "ReplicateAndRegister"
46         else:
47             sLog.warn("Hum, SE is not in a known list... let's try to assess the quality of the storage...")
48             # VERY doubtful storage
49             if "RAL" in targetSE:
50                 op.Type = "ReplicateAndRegister"
51             else:
52                 op.Type = "MoveReplica"
53
54         for lfn in task["InputData"]:
55             trFile = File()
56             trFile.LFN = lfn
57
58             op.addFile(trFile)
59
60         req.addOperation(op)
61
62         return req

Using the Body Plugin

When creating a transformation, just create the BodyPlugin object you want with the appropriate parameters, and set it using setBody

createReplicateOrMove.py
 1 from DIRAC import gLogger, initialize
 2
 3 initialize()
 4
 5 from DIRAC.TransformationSystem.Client.Transformation import Transformation
 6 from DIRAC.TransformationSystem.Client.BodyPlugin.ReplicateOrMoveBody import (
 7     ReplicateOrMoveBody,
 8 )
 9
10 myTrans = Transformation()
11 uniqueIdentifier = "sensitiveData"
12 myTrans.setTransformationName("ReplicateOrMove_%s" % uniqueIdentifier)
13 myTrans.setDescription("Move only to trustworthy storages")
14 myTrans.setType("Replication")
15 myTrans.setTransformationGroup("MyGroup")
16 myTrans.setGroupSize(2)
17
18 # Set the Broadcast plugin
19 myTrans.setPlugin("Broadcast")
20 myTrans.Destinations(1)
21
22
23 myBody = ReplicateOrMoveBody(
24     trustworthy=["CERN-Storage", "CNAF-Storage"], undependable=["NIPNE-Storage"]
25 )
26
27 myTrans.setBody(myBody)
28
29 metadata = {"TransformationID": 2}
30 myTrans.setInputMetaQuery(metadata)
31
32 res = myTrans.addTransformation()
33 if not res["OK"]:
34     gLogger.error("Failed to add the transformation: %s" % res["Message"])
35     exit(1)
36
37 # now activate the transformation
38 myTrans.setStatus("Active")
39 myTrans.setAgentType("Automatic")
40 transID = myTrans.getTransformationID()["Value"]
41
42 gLogger.notice("Created ReplicateOrMove transformation: %r" % transID)
43 exit(0)