Create a Body Plugin
This page briefly explains the steps needed to add a new BodyPlugin to DIRAC or to an extension. It assumes the reader already has a development and testing setup available.
The module DIRAC.TransformationSystem.Client.BodyPlugin.BaseBody contains documentation on how to code such a plugin.
The ReplicateOrMove Plugin Example
Imagine a world in which some storages are reliable and others are less so, and imagine that you are creating a transformation to shuffle data around. If the destination of your file is trustworthy, you can delete the original copy. But if the destination is likely to lose your file, you are better off keeping a second copy. That is what our example plugin does.
from DIRAC import gLogger
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.RequestManagementSystem.Client.Operation import Operation
from DIRAC.RequestManagementSystem.Client.File import File

from DIRAC.TransformationSystem.Client.BodyPlugin.BaseBody import BaseBody


sLog = gLogger.getLocalSubLogger(__name__)


class ReplicateOrMoveBody(BaseBody):
    """
    If the destination storage is trustworthy, move the file there,
    otherwise, just do a copy of the file
    """

    # This is needed to know how to serialize the object,
    # all attributes that need to be persisted should be in this list.
    # See :py:class:`~DIRAC.Core.Utilities.JEncode.JSerializable` for more details.
    _attrToSerialize = ["trustworthy", "undependable"]

    def __init__(self, trustworthy=None, undependable=None):
        """C'tor

        :param list trustworthy: trustworthy storage to which we will move the file
        :param list undependable: doubtful storage to which we will rather just copy
        """
        # Fall back to empty lists so that the membership tests in
        # taskToRequest do not fail when an argument is left to None
        self.trustworthy = trustworthy or []
        self.undependable = undependable or []

    def taskToRequest(self, taskID, task, transID):
        """
        Convert a task into a Request with either a ReplicateAndRegister
        or a Move Operation
        """
        req = Request()
        op = Operation()

        # Decide on an operation type
        targetSE = task["TargetSE"]
        if targetSE in self.trustworthy:
            op.Type = "MoveReplica"
        elif targetSE in self.undependable:
            op.Type = "ReplicateAndRegister"
        else:
            sLog.warn("Hum, SE is not in a known list... let's try to assess the quality of the storage...")
            # VERY doubtful storage
            if "RAL" in targetSE:
                op.Type = "ReplicateAndRegister"
            else:
                op.Type = "MoveReplica"

        # Attach every input LFN of the task to the operation
        for lfn in task["InputData"]:
            trFile = File()
            trFile.LFN = lfn

            op.addFile(trFile)

        req.addOperation(op)

        return req
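The branching in taskToRequest can be exercised without a DIRAC installation by pulling it out into a plain function. The following is only a sketch of that idea: the name choose_operation_type is hypothetical and is not part of DIRAC or of the plugin above; it simply mirrors the same three cases (trustworthy, undependable, unknown).

```python
def choose_operation_type(target_se, trustworthy=(), undependable=()):
    """Mirror the decision made in ReplicateOrMoveBody.taskToRequest.

    Hypothetical helper for illustration only; not part of DIRAC.
    """
    if target_se in trustworthy:
        # Reliable destination: the original copy can go
        return "MoveReplica"
    if target_se in undependable:
        # Doubtful destination: keep the original as well
        return "ReplicateAndRegister"
    # Unknown SE: same fallback heuristic as the example plugin
    return "ReplicateAndRegister" if "RAL" in target_se else "MoveReplica"


if __name__ == "__main__":
    trusted = ["CERN-Storage", "CNAF-Storage"]
    doubtful = ["NIPNE-Storage"]
    for se in ("CERN-Storage", "NIPNE-Storage", "RAL-Storage", "Other-Storage"):
        print(se, choose_operation_type(se, trusted, doubtful))
```

Keeping the decision in a pure function like this makes it easy to unit test the policy separately from the Request and Operation objects.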
Using the Body Plugin
When creating a transformation, create the BodyPlugin object you want with the appropriate parameters, and set it using setBody:
from DIRAC import gLogger, initialize

initialize()

from DIRAC.TransformationSystem.Client.Transformation import Transformation
from DIRAC.TransformationSystem.Client.BodyPlugin.ReplicateOrMoveBody import (
    ReplicateOrMoveBody,
)

myTrans = Transformation()
uniqueIdentifier = "sensitiveData"
myTrans.setTransformationName("ReplicateOrMove_%s" % uniqueIdentifier)
myTrans.setDescription("Move only to trustworthy storages")
myTrans.setType("Replication")
myTrans.setTransformationGroup("MyGroup")
myTrans.setGroupSize(2)

# Set the Broadcast plugin
myTrans.setPlugin("Broadcast")
myTrans.setDestinations(1)


myBody = ReplicateOrMoveBody(
    trustworthy=["CERN-Storage", "CNAF-Storage"], undependable=["NIPNE-Storage"]
)

myTrans.setBody(myBody)

metadata = {"TransformationID": 2}
myTrans.setInputMetaQuery(metadata)

res = myTrans.addTransformation()
if not res["OK"]:
    gLogger.error("Failed to add the transformation: %s" % res["Message"])
    exit(1)

# now activate the transformation
myTrans.setStatus("Active")
myTrans.setAgentType("Automatic")
transID = myTrans.getTransformationID()["Value"]

gLogger.notice("Created ReplicateOrMove transformation: %r" % transID)
exit(0)
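The script above checks res["OK"] after addTransformation, but reads ["Value"] from getTransformationID directly. One possible way to make such checks uniform is a small helper that works on the usual DIRAC return dictionaries ("OK" plus either "Value" or "Message"). This is a minimal sketch; unwrap is a hypothetical name and is not provided by DIRAC.

```python
def unwrap(result):
    """Return result["Value"], or raise if the call reported a failure.

    Hypothetical helper for illustration only; it expects a dictionary
    shaped like the DIRAC return structures used on this page.
    """
    if not result["OK"]:
        raise RuntimeError(result.get("Message", "unknown error"))
    return result.get("Value")


if __name__ == "__main__":
    # Stand-in dictionaries; in the script above they would come from
    # calls such as myTrans.getTransformationID()
    print(unwrap({"OK": True, "Value": 1234}))
    try:
        unwrap({"OK": False, "Message": "Transformation does not exist"})
    except RuntimeError as exc:
        print("Failed:", exc)
```

With such a helper, a failed call surfaces as an exception carrying the error message instead of a KeyError on "Value".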