Create a Body Plugin

This page briefly explains the steps needed to add a new BodyPlugin, either to DIRAC or to an extension. It assumes the reader already has a development and testing setup available.

The module DIRAC.TransformationSystem.Client.BodyPlugin.BaseBody contains documentation on how to write such a plugin.

The ReplicateOrMove Plugin Example

Imagine that some of your storages are reliable and others less so, and that you are creating a transformation to shuffle data around. If the destination of a file is trustworthy, you can delete the original copy once the file has been transferred. But if the destination is likely to lose the file, you are better off keeping a second copy. That’s what our example plugin does.

ReplicateOrMoveBody.py
from DIRAC import gLogger
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.RequestManagementSystem.Client.Operation import Operation
from DIRAC.RequestManagementSystem.Client.File import File

from DIRAC.TransformationSystem.Client.BodyPlugin.BaseBody import BaseBody


sLog = gLogger.getLocalSubLogger(__name__)


class ReplicateOrMoveBody(BaseBody):
    """
    If the destination storage is trustworthy, move the file there,
    otherwise, just do a copy of the file
    """

    # This is needed to know how to serialize the object,
    # all attributes that need to be persisted should be in this list.
    # See :py:class:`~DIRAC.Core.Utilities.JEncode.JSerializable` for more details.
    _attrToSerialize = ["trustworthy", "undependable"]

    def __init__(self, trustworthy=None, undependable=None):
        """C'tor

        :param list trustworthy: trustworthy storage to which we will move the file
        :param list undependable: doubtful storage to which we will rather just copy
        """
        # Fall back to empty lists so the membership tests in taskToRequest
        # do not fail when an argument is omitted
        self.trustworthy = trustworthy if trustworthy is not None else []
        self.undependable = undependable if undependable is not None else []

    def taskToRequest(self, taskID, task, transID):
        """
        Convert a task into a Request with either a ReplicateAndRegister
        or a Move Operation
        """
        req = Request()
        op = Operation()

        # Decide on an operation type
        targetSE = task["TargetSE"]
        if targetSE in self.trustworthy:
            op.Type = "MoveReplica"
        elif targetSE in self.undependable:
            op.Type = "ReplicateAndRegister"
        else:
            sLog.warn("Hum, SE is not in a known list... let's try to assess the quality of the storage...")
            # VERY doubtful storage
            if "RAL" in targetSE:
                op.Type = "ReplicateAndRegister"
            else:
                op.Type = "MoveReplica"

        # Attach every input file of the task to the operation
        for lfn in task["InputData"]:
            trFile = File()
            trFile.LFN = lfn

            op.addFile(trFile)

        req.addOperation(op)

        return req
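
The branching in taskToRequest can also be looked at in isolation. The following is a minimal sketch, not part of DIRAC: the helper name choose_operation_type and the storage names "RAL-Storage" and "Other-Storage" are made up for illustration, while the two operation type strings are the ones used in the listing above. Extracting the decision into a pure function like this is one possible way to unit-test it without any DIRAC service.

```python
MOVE = "MoveReplica"
REPLICATE = "ReplicateAndRegister"


def choose_operation_type(target_se, trustworthy=(), undependable=()):
    """Return the operation type for a destination storage element.

    Hypothetical helper that mirrors the branches of taskToRequest.
    """
    if target_se in trustworthy:
        # Reliable destination: the original copy can be removed
        return MOVE
    if target_se in undependable:
        # Doubtful destination: keep the original as a second copy
        return REPLICATE
    # Unknown storage: same fallback heuristic as in the listing
    return REPLICATE if "RAL" in target_se else MOVE


if __name__ == "__main__":
    trusted = ["CERN-Storage", "CNAF-Storage"]
    doubtful = ["NIPNE-Storage"]
    # "RAL-Storage" and "Other-Storage" are illustrative names only
    for se in ("CERN-Storage", "NIPNE-Storage", "RAL-Storage", "Other-Storage"):
        print(se, "->", choose_operation_type(se, trusted, doubtful))
```

Keeping the rule free of Request and Operation objects makes it cheap to check every branch; the plugin itself would still build the request as shown above.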

Using the Body Plugin

When creating a transformation, instantiate the BodyPlugin you want with the appropriate parameters and attach it to the transformation with setBody.

createReplicateOrMove.py
import sys

from DIRAC import gLogger
from DIRAC.Core.Utilities.DIRACScript import DIRACScript as Script

Script.parseCommandLine()

from DIRAC.TransformationSystem.Client.Transformation import Transformation
from DIRAC.TransformationSystem.Client.BodyPlugin.ReplicateOrMoveBody import (
    ReplicateOrMoveBody,
)

myTrans = Transformation()
uniqueIdentifier = "sensitiveData"
myTrans.setTransformationName("ReplicateOrMove_%s" % uniqueIdentifier)
myTrans.setDescription("Move only to trustworthy storages")
myTrans.setType("Replication")
myTrans.setTransformationGroup("MyGroup")
myTrans.setGroupSize(2)

# Set the Broadcast plugin
myTrans.setPlugin("Broadcast")
myTrans.setDestinations(1)


# Build the body with the two lists of storages and attach it
myBody = ReplicateOrMoveBody(
    trustworthy=["CERN-Storage", "CNAF-Storage"], undependable=["NIPNE-Storage"]
)

myTrans.setBody(myBody)

metadata = {"TransformationID": 2}
myTrans.setInputMetaQuery(metadata)

res = myTrans.addTransformation()
if not res["OK"]:
    gLogger.error("Failed to add the transformation: %s" % res["Message"])
    sys.exit(1)

# now activate the transformation
myTrans.setStatus("Active")
myTrans.setAgentType("Automatic")
transID = myTrans.getTransformationID()["Value"]

gLogger.notice("Created ReplicateOrMove transformation: %r" % transID)
sys.exit(0)
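
As the comment on _attrToSerialize in the plugin notes, only the attributes named in that list are persisted. The sketch below illustrates that idea with the standard json module only. It is an assumption-laden stand-in: SketchBody, dump_body and load_body are hypothetical names, and the payload format is not the one produced by DIRAC's JEncode.

```python
import json


class SketchBody:
    """Hypothetical stand-in for a body plugin; not a DIRAC class."""

    # Same convention as in the plugin: attributes to persist
    _attrToSerialize = ["trustworthy", "undependable"]

    def __init__(self, trustworthy=None, undependable=None):
        self.trustworthy = trustworthy if trustworthy is not None else []
        self.undependable = undependable if undependable is not None else []


def dump_body(body):
    """Serialize only the attributes named in _attrToSerialize."""
    return json.dumps({name: getattr(body, name) for name in body._attrToSerialize})


def load_body(cls, payload):
    """Rebuild a body from the persisted attributes."""
    return cls(**json.loads(payload))


if __name__ == "__main__":
    body = SketchBody(trustworthy=["CERN-Storage"], undependable=["NIPNE-Storage"])
    # This attribute is not listed in _attrToSerialize, so it is dropped
    body.scratch = "not persisted"

    restored = load_body(SketchBody, dump_body(body))
    print(restored.trustworthy, restored.undependable, hasattr(restored, "scratch"))
```

The practical consequence, under these assumptions, is that any state a body plugin needs in taskToRequest should be listed in _attrToSerialize and should hold plain, JSON-friendly values such as lists of storage names.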