DataRecoveryAgent

An agent to ensure consistency for transformation jobs, tasks and files.

Depending on the status of a job and of its input and output files, different actions are performed.

Warning

Before fully enabling this agent, make sure that your transformation jobs fulfill the assumptions of the agent. Otherwise, it might delete some of your data! Do not set EnableFlag to True before letting the agent run through a few times and reading the messages it produces.

Note

Use the dirac-transformation-recover-data script to check individual transformations.

The agent takes the following steps

  • Obtain the list of transformations.

  • Get a list of all ‘Failed’ and ‘Done’ jobs; jobs with pending requests are ignored.

  • Get the input files for all jobs, get the transformation file status associated with each file (Unused, Assigned, MaxReset, Processed), and check if the input file exists.

  • Get the output files for each job and check if the output files exist.

  • Perform changes for Jobs, Files and Tasks: clean up incomplete output files to obtain a consistent state for jobs, tasks, input and output files.

  • Send an email about the performed actions.
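The job selection and output check from the steps above can be sketched in plain Python. This is a minimal illustration, not the agent's code: JobRecord, select_jobs and incomplete_outputs are hypothetical names, and the set of existing LFNs stands in for a real catalog lookup.

```python
from dataclasses import dataclass, field
from typing import List, Set


@dataclass
class JobRecord:
    """Hypothetical, simplified view of one transformation job."""
    job_id: int
    status: str                  # e.g. "Done" or "Failed"
    has_pending_request: bool    # True if the job still has a pending request
    expected_outputs: List[str] = field(default_factory=list)


def select_jobs(jobs: List[JobRecord]) -> List[JobRecord]:
    """Keep only Done/Failed jobs that have no pending requests."""
    return [
        job for job in jobs
        if job.status in ("Done", "Failed") and not job.has_pending_request
    ]


def incomplete_outputs(job: JobRecord, existing_lfns: Set[str]) -> List[str]:
    """Return the existing outputs of a job whose expected outputs exist only partially.

    Only the partial-output case is detected here; other inconsistencies, such as
    a Done job with no outputs at all, are outside the scope of this sketch.
    """
    present = [lfn for lfn in job.expected_outputs if lfn in existing_lfns]
    if present and len(present) < len(job.expected_outputs):
        return present
    return []
```

In this sketch, the files returned by incomplete_outputs are the candidates for the cleanup step; how the real agent decides what to remove depends on its own checks.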

Requirements/Assumptions:

  • JobParameters:

    • ProductionOutputData: the semicolon-separated list of expected output files, stored as a Job Parameter

      This parameter must be set by the production UploadOutputData tool _before_ the files are uploaded.

    • JobName of the form TransformationID_TaskID, obtained as a JobAttribute

    • InputData, from JobMonitor.getInputData

    • Or extract that information from the JDL of the job, which must then also contain the ProductionOutputData field

  • JobGroup equal to “%08d” % transformationID
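The naming conventions listed above can be checked with a few helper functions. The function names below are hypothetical and the LFNs in the usage are made up; only the formats (semicolon-separated ProductionOutputData, TransformationID_TaskID job names, and the "%08d" JobGroup) come from the requirements.

```python
from typing import List, Tuple


def parse_output_data(param: str) -> List[str]:
    """Split a ProductionOutputData value into LFNs (semicolon-separated)."""
    return [lfn.strip() for lfn in param.split(";") if lfn.strip()]


def parse_job_name(job_name: str) -> Tuple[int, int]:
    """Split a JobName of the form TransformationID_TaskID into two integers.

    A name that does not contain exactly one underscore raises ValueError.
    """
    trans_id, task_id = job_name.split("_")
    return int(trans_id), int(task_id)


def job_group(transformation_id: int) -> str:
    """Build the JobGroup: the transformation ID zero-padded to eight digits."""
    return "%08d" % transformation_id
```

For example, job_group(1234) yields "00001234", and a zero-padded job name such as "00001234_00000056" parses to (1234, 56).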

Note

Transformations are only treated if changes were performed during the last pass, or if the number of Failed and Done jobs has changed.
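The condition in the note above can be expressed as a small predicate. This is a sketch under stated assumptions: needs_treatment is a hypothetical name, the counts are kept as (failed, done) tuples, and treating a transformation that was never seen before is an assumption added for the first pass.

```python
from typing import Optional, Tuple


def needs_treatment(previous: Optional[Tuple[int, int]],
                    current: Tuple[int, int],
                    changes_last_pass: bool) -> bool:
    """Decide whether a transformation should be treated in this pass.

    previous: (failed, done) job counts recorded in the last pass, or None if
      the transformation was not seen before (assumed to require treatment).
    current: (failed, done) job counts observed now.
    changes_last_pass: True if the last pass performed any changes.
    """
    if changes_last_pass or previous is None:
        return True
    return previous != current
```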

DataRecoveryAgent options
DataRecoveryAgent
{
  PollingTime = 3600
  EnableFlag = False
  MailTo =
  MailFrom =
  # List of TransformationIDs that will not be treated
  TransformationsToIgnore =
  # List of Transformation Statuses to treat
  TransformationStatus = Active, Completing
  # List of transformations that do not have input data, by default Operations/Transformation/ExtendableTransfTypes
  TransformationsNoInput =
  # List of transformations that do have input data, by default Operations/Transformation/DataProcessing (- ExtendableTransfTypes)
  TransformationsWithInput =
  # Print every N treated jobs to monitor progress
  PrintEvery = 200
  # Instead of obtaining the job information from the JobMonitor service, take it from the JDL. This is slightly faster but requires the ProductionOutputData information to be in the JDL
  JobInfoFromJDLOnly = False
}

Note

For TransformationsNoInput or TransformationsWithInput to take their default values, the options must be removed from the configuration. If they are left in with an empty value, as in the listing above, no transformations of that type will be treated.
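The difference between a removed option and an empty one can be illustrated with a short resolver. This is a sketch, not the agent's implementation: resolve_type_lists is a hypothetical helper, the configuration is modelled as plain dicts of already-parsed lists, and the transformation type names in the usage are made up.

```python
from typing import Dict, List, Tuple


def resolve_type_lists(agent_options: Dict[str, List[str]],
                       operations: Dict[str, List[str]]) -> Tuple[List[str], List[str]]:
    """Resolve TransformationsNoInput and TransformationsWithInput.

    agent_options: the agent section; a missing key means the option was removed.
    operations: hypothetical stand-in for Operations/Transformation, with the
      keys "ExtendableTransfTypes" and "DataProcessing".
    """
    extendable = operations["ExtendableTransfTypes"]
    processing = operations["DataProcessing"]

    # A present option wins, even if it is empty; only a missing key falls back.
    if "TransformationsNoInput" in agent_options:
        no_input = agent_options["TransformationsNoInput"]
    else:
        no_input = list(extendable)

    if "TransformationsWithInput" in agent_options:
        with_input = agent_options["TransformationsWithInput"]
    else:
        # DataProcessing minus ExtendableTransfTypes
        with_input = [t for t in processing if t not in extendable]

    return no_input, with_input
```

With ExtendableTransfTypes = ["MCSimulation"] and DataProcessing = ["MCSimulation", "Reconstruction", "Merge"], an empty agent section yields (["MCSimulation"], ["Reconstruction", "Merge"]), while an explicitly empty TransformationsNoInput yields no input-less types at all.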

class DIRAC.TransformationSystem.Agent.DataRecoveryAgent.DataRecoveryAgent(*args, **kwargs)

Bases: AgentModule

Data Recovery Agent

__init__(*args, **kwargs)

Common __init__ method for all Agents. All Agent modules must define __doc__, which is used to populate __codeProperties.

The following Options are used from the Configuration:

  • /DIRAC/Setup
  • Status
  • Enabled
  • PollingTime (default = 120)
  • MaxCycles (default = 500)
  • WatchdogTime (default = 0, disabled)
  • ControlDirectory (control/SystemName/AgentName)
  • WorkDirectory (work/SystemName/AgentName)
  • shifterProxy (‘’)
  • shifterProxyLocation (WorkDirectory/SystemName/AgentName/.shifterCred)

It defines the following default Options that can be set via the Configuration (see above):

  • MonitoringEnabled: True
  • Enabled: True if Status == Active
  • PollingTime: 120
  • MaxCycles: 500
  • ControlDirectory: control/SystemName/AgentName
  • WorkDirectory: work/SystemName/AgentName
  • shifterProxy: False
  • shifterProxyLocation: work/SystemName/AgentName/.shifterCred

Different defaults can be set in the initialize() method of the Agent using am_setOption().

To get a shifter proxy in the environment during execute(), the configuration Option ‘shifterProxy’ must be set; a default may be given in the initialize() method.
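The relationship between defaults registered with am_setOption() and values from the Configuration can be pictured with a toy class. ToyAgent is hypothetical and does not reproduce DIRAC's AgentModule: it models the configuration section as a dict and assumes that a configured value takes precedence over an explicit defaultValue, which in turn takes precedence over a default registered in initialize(). The shifter name "DataManager" is only an example value.

```python
from typing import Any, Dict, Optional


class ToyAgent:
    """Hypothetical stand-in for the option handling of an agent module."""

    def __init__(self, cs_section: Dict[str, Any]):
        self._cs = dict(cs_section)          # values read from the configuration
        self._defaults: Dict[str, Any] = {}  # defaults registered via am_setOption

    def am_setOption(self, name: str, value: Any) -> None:
        # Register a default; a configured value still takes precedence.
        self._defaults[name] = value

    def am_getOption(self, name: str, defaultValue: Optional[Any] = None) -> Any:
        # Fall back to the registered default only if no explicit default is given.
        if defaultValue is None:
            defaultValue = self._defaults.get(name)
        return self._cs.get(name, defaultValue)

    def initialize(self) -> None:
        # Agent-specific defaults, set as the text describes.
        self.am_setOption("PollingTime", 3600)
        self.am_setOption("shifterProxy", "DataManager")
```

With a configuration section of {"PollingTime": 600}, the configured 600 wins over the 3600 set in initialize(), while shifterProxy falls back to the registered default.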

am_Enabled()
am_checkStopAgentFile()
am_createStopAgentFile()
am_getControlDirectory()
am_getCyclesDone()
am_getMaxCycles()
am_getModuleParam(optionName)
am_getOption(optionName, defaultValue=None)

Gets an option from the agent’s configuration section. The section will be a subsection of the /Systems section in the CS.

am_getPollingTime()
am_getShifterProxyLocation()
am_getStopAgentFile()
am_getWatchdogTime()
am_getWorkDirectory()
am_go()
am_initialize(*initArgs)

Common initialization for all the agents.

It is called by the AgentReactor and should not be overridden.

am_removeStopAgentFile()
am_secureCall(functor, args=(), name=False)
am_setModuleParam(optionName, value)
am_setOption(optionName, value)
am_stopExecution()
beginExecution()

Resets defaults after one cycle.

checkAllJobs(jobs, tInfo, tasksDict=None, lfnTaskDict=None)

Run over all jobs and perform the checks.

checkJob(job, tInfo)

Check a single job and perform the corresponding actions.
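The docstring does not say how a single job is handled. One common way to structure such per-job logic, sketched here with hypothetical rule keys ("Message", "Check", "Action", "Counter") that are not taken from the agent, is an ordered list of check/action pairs where the first matching check wins.

```python
from typing import Any, Callable, Dict, List, Optional

# Hypothetical rule layout: a message, a predicate on the job, an action, a counter.
Rule = Dict[str, Any]


def check_job(job: Dict[str, Any], rules: List[Rule], log: List[str]) -> Optional[str]:
    """Apply the first rule whose check matches the job.

    Returns the message of the matched rule, or None if no rule matched.
    """
    for rule in rules:
        check: Callable[[Dict[str, Any]], bool] = rule["Check"]
        if check(job):
            rule["Counter"] += 1           # count how often this case occurred
            log.append(rule["Message"])    # collect notes, e.g. for a summary
            rule["Action"](job)            # perform the recovery action
            return rule["Message"]
    return None
```

Keeping the rules in an ordered list makes the priority between overlapping cases explicit and gives each case its own counter for a summary.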

endExecution()
execute()

The main execution method.

finalize()
getEligibleTransformations(status, typeList)

Select transformations of given status and type.
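A plain-Python sketch of this selection, also honouring the TransformationsToIgnore option, could look as follows. The helper name, the dict keys ("TransformationID", "Status", "Type") and the returned mapping are assumptions for illustration; the real method queries the transformation service.

```python
from typing import Any, Dict, Iterable, List


def eligible_transformations(transformations: List[Dict[str, Any]],
                             status: Iterable[str],
                             type_list: Iterable[str],
                             ignore: Iterable[int] = ()) -> Dict[int, Dict[str, Any]]:
    """Return {TransformationID: info} for transformations of a given status and type."""
    status, type_list, ignore = set(status), set(type_list), set(ignore)
    return {
        t["TransformationID"]: t
        for t in transformations
        if t["Status"] in status
        and t["Type"] in type_list
        and t["TransformationID"] not in ignore  # e.g. TransformationsToIgnore
    }
```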

getLFNStatus(jobs)

Get all the LFNs for the jobs and get their status.

initialize(*args, **kwargs)

Agents should override this method for specific initialization. Executed at every agent (re)start.

printSummary()

Print a summary of the changes.

sendNotification(transID, transInfoDict)

Send notification email if something was modified for a transformation.

Parameters:
  • transID (int) – ID of given transformation

  • transInfoDict
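The "only if something was modified" rule can be sketched with the standard library's email.message.EmailMessage. build_notification and its subject line are hypothetical; mail_to and mail_from stand for the MailTo and MailFrom options, and actually sending the message is left out.

```python
from email.message import EmailMessage
from typing import List, Optional


def build_notification(trans_id: int, notes: List[str],
                       mail_to: List[str], mail_from: str) -> Optional[EmailMessage]:
    """Return an email for a transformation, or None if nothing was modified."""
    if not notes or not mail_to:
        return None  # nothing changed, or no recipients configured
    msg = EmailMessage()
    msg["Subject"] = "DataRecoveryAgent: changes for transformation %d" % trans_id
    if mail_from:
        msg["From"] = mail_from
    msg["To"] = ", ".join(mail_to)
    msg.set_content("\n".join(notes))
    return msg
```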

setPendingRequests(jobs)

Loop over all the jobs and get requests, if any.

treatTransformation(transID, transInfoDict)

Run the data recovery checks for the given transformation.