DataRecoveryAgent
An agent to ensure consistency for transformation jobs, tasks and files.
Depending on the status of a job and of its input and output files, different actions are performed.
Warning
Before fully enabling this agent, make sure that your transformation jobs fulfill the assumptions of the agent. Otherwise it might delete some of your data! Do not set EnableFlag to True before letting the agent run through a few times and reading the messages it produces.
Note
Use the dirac-transformation-recover-data script to check individual transformations.
The agent takes the following steps:
Obtain the list of transformations.
Get a list of all 'Failed' and 'Done' jobs; jobs with pending requests are ignored.
Get the input files for all jobs, get the transformation file status associated with each file (Unused, Assigned, MaxReset, Processed), and check whether the input file exists.
Get the output files for each job and check whether the output files exist.
Perform changes for Jobs, Files and Tasks: clean up incomplete output files to obtain a consistent state for jobs, tasks, input and output files.
Send an email about the performed actions.
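The job selection in the second step can be illustrated with a minimal sketch. The dictionary layout, the `pending_request` key and the `select_jobs` helper are assumptions made for illustration only; the agent itself works with its own job objects and DIRAC services.

```python
# Hypothetical job records; field names are illustrative, not the agent's own.
def select_jobs(jobs):
    """Keep 'Failed' and 'Done' jobs that have no pending request."""
    return [
        job
        for job in jobs
        if job["status"] in ("Failed", "Done")
        and not job.get("pending_request", False)
    ]


jobs = [
    {"id": 1, "status": "Done"},
    {"id": 2, "status": "Failed", "pending_request": True},  # ignored: pending request
    {"id": 3, "status": "Running"},                          # ignored: not Failed/Done
    {"id": 4, "status": "Failed"},
]
selected = select_jobs(jobs)
```

Only the jobs that survive this filter would go on to the input and output file checks.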
Requirements/Assumptions:
JobParameters:
- ProductionOutputData: the semicolon-separated list of expected output files, stored as a Job Parameter.
This parameter needs to be set by the production UploadOutputData tool _before_ uploading files.
JobName of the form TransformationID_TaskID, obtained as a JobAttribute
InputData from JobMonitor.getInputData
Alternatively, extract that information from the JDL of the job, which must then also contain the ProductionOutputData fields
JobGroup equal to “%08d” % transformationID
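The naming conventions above can be checked with a few lines of plain Python. This is a sketch only: the helper names `job_group`, `parse_job_name` and `parse_output_data` are hypothetical and not part of DIRAC, and the example LFNs are made up.

```python
def job_group(transformation_id):
    """Build the JobGroup string expected for a transformation."""
    return "%08d" % transformation_id


def parse_job_name(job_name):
    """Split a JobName of the form TransformationID_TaskID into two integers."""
    trans_id, task_id = job_name.split("_")
    return int(trans_id), int(task_id)


def parse_output_data(value):
    """Split the semicolon-separated ProductionOutputData into a list of LFNs."""
    return [lfn.strip() for lfn in value.split(";") if lfn.strip()]


group = job_group(1234)
ids = parse_job_name("1234_56")
outputs = parse_output_data("/vo/prod/1234/out_56.root;/vo/prod/1234/log_56.tar")
```

A job whose JobName cannot be split this way, or whose JobGroup does not match the zero-padded transformation ID, does not fulfill the assumptions listed above.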
Note
A transformation is only treated if changes were performed during the last pass, or if the number of Failed and Done jobs has changed.
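One way to picture this rule is a small cache keyed by transformation ID. The sketch below is an assumption about how such a check could look, not the agent's actual implementation; the `cache` layout and both function names are hypothetical, and treating a transformation that has never been seen is likewise an assumption.

```python
def needs_treatment(cache, trans_id, n_failed, n_done):
    """Return True if the transformation should be treated in this pass."""
    previous = cache.get(trans_id)
    if previous is None:
        # Assumption: a transformation seen for the first time is treated.
        return True
    return previous["changed"] or previous["counts"] != (n_failed, n_done)


def record_pass(cache, trans_id, n_failed, n_done, changed):
    """Remember the job counts and whether this pass performed changes."""
    cache[trans_id] = {"counts": (n_failed, n_done), "changed": changed}


cache = {}
first = needs_treatment(cache, 42, 3, 10)    # never seen before
record_pass(cache, 42, 3, 10, changed=False)
second = needs_treatment(cache, 42, 3, 10)   # same counts, no changes last pass
third = needs_treatment(cache, 42, 4, 10)    # number of Failed jobs changed
```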
DataRecoveryAgent
{
PollingTime = 3600
EnableFlag = False
MailTo =
MailFrom =
# List of TransformationIDs that will not be treated
TransformationsToIgnore =
# List of Transformation Statuses to treat
TransformationStatus = Active, Completing
# List of transformations that do not have input data, by default Operations/Transformation/ExtendableTransfTypes
TransformationsNoInput =
# List of transformations that do have input data, by default Operations/Transformation/DataProcessing (- ExtendableTransfTypes)
TransformationsWithInput =
# Print every N treated jobs to monitor progress
PrintEvery = 200
# Instead of obtaining the job information from the JobMonitor service, pick them from the JDL. This is slightly faster but requires the ProductionOutputData information to be in the JDL
JobInfoFromJDLOnly = False
}
Note
For the TransformationsNoInput or TransformationsWithInput options to take their default value, they need to be removed from the configuration; otherwise no transformations of this type will be treated.
- class DIRAC.TransformationSystem.Agent.DataRecoveryAgent.DataRecoveryAgent(*args, **kwargs)
Bases: AgentModule
Data Recovery Agent
- __init__(*args, **kwargs)
Common __init__ method for all Agents. All Agent modules must define: __doc__
They are used to populate __codeProperties
The following Options are used from the Configuration:
- Status
- Enabled
- PollingTime default = 120
- MaxCycles default = 500
- WatchdogTime default = 0 (disabled)
- ControlDirectory control/SystemName/AgentName
- WorkDirectory work/SystemName/AgentName
- shifterProxy ‘’
- shifterProxyLocation WorkDirectory/SystemName/AgentName/.shifterCred
It defines the following default Options that can be set via Configuration (above):
- MonitoringEnabled True
- Enabled True if Status == Active
- PollingTime 120
- MaxCycles 500
- ControlDirectory control/SystemName/AgentName
- WorkDirectory work/SystemName/AgentName
- shifterProxy False
- shifterProxyLocation work/SystemName/AgentName/.shifterCred
Different defaults can be set in the initialize() method of the Agent using am_setOption().
To get a shifter proxy in the environment during execute(), the configuration Option ‘shifterProxy’ must be set; a default may be given in the initialize() method.
- am_Enabled()
- am_checkStopAgentFile()
- am_createStopAgentFile()
- am_getControlDirectory()
- am_getCyclesDone()
- am_getMaxCycles()
- am_getModuleParam(optionName)
- am_getOption(optionName, defaultValue=None)
Gets an option from the agent’s configuration section. The section will be a subsection of the /Systems section in the CS.
- am_getPollingTime()
- am_getShifterProxyLocation()
- am_getStopAgentFile()
- am_getWatchdogTime()
- am_getWorkDirectory()
- am_go()
- am_initialize(*initArgs)
Common initialization for all the agents.
This is executed every time an agent (re)starts. It is called by the AgentReactor and should not be overridden.
- am_removeStopAgentFile()
- am_secureCall(functor, args=(), name=False)
- am_setModuleParam(optionName, value)
- am_setOption(optionName, value)
- am_stopExecution()
- beginExecution()
Resets defaults after one cycle.
- checkAllJobs(jobs, tInfo, tasksDict=None, lfnTaskDict=None)
Run over all jobs and perform the checks.
- checkJob(job, tInfo)
Deal with the job.
- endExecution()
- execute()
The main execution method.
- finalize()
- getEligibleTransformations(status, typeList)
Select transformations of given status and type.
- getLFNStatus(jobs)
Get all the LFNs for the jobs and get their status.
- initialize(*args, **kwargs)
Agents should override this method for specific initialization. Executed at every agent (re)start.
- printSummary()
Print a summary of the changes.
- sendNotification(transID, transInfoDict)
Send notification email if something was modified for a transformation.
- Parameters:
transID (int) – ID of given transformation
transInfoDict
- setPendingRequests(jobs)
Loop over all the jobs and get requests, if any.
- treatTransformation(transID, transInfoDict)
Run the recovery for the given transformation.