WorkflowTaskAgent

The Workflow Task Agent takes workflow tasks created in the transformation database and submits to the workload management system.

The WorkflowTaskAgent takes workflow tasks created in the TransformationDB and submits them to the WMS. Since version v6r13 there are some new capabilities in the form of TaskManager plugins.

The following options can be set for the WorkflowTaskAgent.

WorkflowTaskAgent
{
  # Use a dedicated proxy to submit jobs to the WMS
  shifterProxy = ProductionManager
  # Use delegated credentials. Use this instead of the shifterProxy option (New in v6r20p5)
  ShifterCredentials =
  # Transformation types to be taken into account by the agent
  TransType = MCSimulation,DataReconstruction,DataStripping,MCStripping,Merge
  # Location of the transformation plugins
  PluginLocation = DIRAC.TransformationSystem.Client.TaskManagerPlugin
  # maximum number of threads to use in this agent
  maxNumberOfThreads = 15

  # Give this option a value if the agent should submit Requests
  SubmitTasks = yes
  # Status of transformations for which to submit Requests
  SubmitStatus = Active,Completing
  # Number of tasks to submit in one execution cycle per transformation
  TasksPerLoop = 50

  # Give this option a value if the agent should monitor tasks
  MonitorTasks = yes
  # Status of transformations for which to monitor tasks
  UpdateTasksStatus = Active,Completing,Stopped
  # Task statuses considered transient that should be monitored for updates
  TaskUpdateStatus = Submitted,Received,Waiting,Running,Matched,Completed,Failed
  # Number of tasks to be updated in one call
  TaskUpdateChunkSize = 0

  # Give this option a value if the agent should monitor files
  MonitorFiles =
  # Status of transformations for which to monitor Files
  UpdateFilesStatus = Active,Completing,Stopped

  # Give this option a value if the agent should check Reserved tasks
  CheckReserved =
  # Status of transformations for which to check reserved tasks
  CheckReservedStatus = Active,Completing,Stopped

  # Fill in this option if you want to activate bulk submission (for speed up)
  BulkSubmission = yes

}
  • The options SubmitTasks, MonitorTasks, MonitorFiles, and CheckReserved need to be assigned any non-empty value to be activated

  • New in version v6r20p5: It is possible to run the RequestTaskAgent without a shifterProxy or ShifterCredentials, in this case the credentials of the authors of the transformations are used to submit the jobs to the RMS. This enables the use of a single RequestTaskAgent for multiple VOs. See also the section about the Multi VO Configuration.

class DIRAC.TransformationSystem.Agent.WorkflowTaskAgent.WorkflowTaskAgent(*args, **kwargs)

Bases: DIRAC.TransformationSystem.Agent.TaskManagerAgentBase.TaskManagerAgentBase

An AgentModule class to submit workflow tasks

__init__(*args, **kwargs)

c’tor

am_Enabled()
am_checkStopAgentFile()
am_createStopAgentFile()
am_disableMonitoring()
am_getBasePath()
am_getControlDirectory()
am_getCyclesDone()
am_getMaxCycles()
am_getModuleParam(optionName)
am_getOption(optionName, defaultValue=None)
am_getPollingTime()
am_getShifterProxyLocation()
am_getStopAgentFile()
am_getWatchdogTime()
am_getWorkDirectory()
am_go()
am_initialize(*initArgs)
am_monitoringEnabled()
am_removeStopAgentFile()
am_secureCall(functor, args=(), name=False)
am_setModuleParam(optionName, value)
am_setOption(optionName, value)
am_stopExecution()
beginExecution()
checkReservedTasks(transIDOPBody, clients)

Checking Reserved tasks

endExecution()
execute()

The TaskManagerBase execution method is just filling the Queues of transformations that need to be processed

finalize()

graceful finalization

initialize()

Standard initialize method

submitTasks(transIDOPBody, clients)

Submit the tasks to an external system, using the taskManager provided

Parameters:
  • transIDOPBody (dict) – transformation body
  • clients (dict) – dictionary of client objects
Returns:

S_OK/S_ERROR

updateFileStatus(transIDOPBody, clients)

Update the files status

updateTaskStatus(transIDOPBody, clients)

Updates the task status