WorkflowTaskAgent

The WorkflowTaskAgent takes workflow tasks created in the TransformationDB and submits them to the WMS.

The following options can be set for the WorkflowTaskAgent.

WorkflowTaskAgent options
WorkflowTaskAgent
{
  # Transformation types to be taken into account by the agent
  TransType = MCSimulation,DataReconstruction,DataStripping,MCStripping,Merge

  # Task statuses considered transient that should be monitored for updates
  TaskUpdateStatus = Submitted,Received,Waiting,Running,Matched,Completed,Failed

  # Status of transformations for which to monitor tasks
  UpdateTasksStatus = Active,Completing,Stopped

  # Number of tasks to be updated in one call
  TaskUpdateChunkSize = 0

  # Give this option a value if the agent should submit workflow tasks (Jobs)
  SubmitTasks = yes

  # Status of transformations for which to submit jobs to WMS
  SubmitStatus = Active,Completing

  # Number of tasks to submit in one execution cycle per transformation
  TasksPerLoop = 50

  # Use a dedicated proxy to submit jobs to the WMS
  shifterProxy =

  # Use delegated credentials. Use this instead of the shifterProxy option (New in v6r20p5)
  ShifterCredentials =

  # Flag for checking reserved tasks that failed submission
  CheckReserved = yes

  # Give this option a value if the agent should monitor tasks
  MonitorTasks =

  # Give this option a value if the agent should monitor files
  MonitorFiles =

  # Status of transformations for which to monitor Files
  UpdateFilesStatus = Active,Completing,Stopped

  # Give this option a value if the agent should check Reserved tasks
  CheckReserved =

  # Status of transformations for which to check reserved tasks
  CheckReservedStatus = Active,Completing,Stopped

  # Location of the transformation plugins
  PluginLocation = DIRAC.TransformationSystem.Client.TaskManagerPlugin

  # Maximum number of threads to use in this agent
  maxNumberOfThreads = 15

  # Time between cycles in seconds
  PollingTime = 120

  # Fill in this option to activate bulk submission (speeds up submission)
  BulkSubmission = false
}

The options SubmitTasks, MonitorTasks, MonitorFiles, and CheckReserved must be assigned a non-empty value to be activated.
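As a minimal sketch of this activation rule, the following standalone Python snippet (it does not use DIRAC itself) reads a few `name = value` lines and treats a flag as active when its value is non-empty. The helper names `parse_options` and `is_activated` are hypothetical and only illustrate the rule as stated here; the agent's actual option handling may differ.

```python
ACTIVATION_FLAGS = ("SubmitTasks", "MonitorTasks", "MonitorFiles", "CheckReserved")


def parse_options(text):
    """Parse 'name = value' lines, skipping blanks and '#' comments (hypothetical helper)."""
    options = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        name, _, value = line.partition("=")
        options[name.strip()] = value.strip()
    return options


def is_activated(options, flag):
    """A flag counts as active when it has any non-empty value."""
    return bool(options.get(flag, ""))


SAMPLE = """
# Illustrative values only
SubmitTasks = yes
MonitorTasks =
MonitorFiles =
CheckReserved = yes
"""

options = parse_options(SAMPLE)
active = [flag for flag in ACTIVATION_FLAGS if is_activated(options, flag)]
print(active)
```

Read literally, the rule means that a value such as `no` would still count as non-empty, so in this sketch a flag is switched off only by leaving it empty.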

  • New in version v6r20p5: The WorkflowTaskAgent can run without a shifterProxy or ShifterCredentials. In that case, the credentials of the authors of the transformations are used to submit the jobs to the WMS. This enables the use of a single WorkflowTaskAgent for multiple VOs. See also the section about the Multi VO Configuration.
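The credential choice described in this note can be sketched as a small decision function. This is an illustration only, not the agent's code: the function name `select_credentials`, the returned tuples, the sample values, and the order in which `ShifterCredentials` is checked before `shifterProxy` when both are set are all assumptions.

```python
def select_credentials(options, transformation_author):
    """Return (kind, identity) describing which credentials would be used.

    Assumption: ShifterCredentials is checked before shifterProxy; the
    documentation only says to use it instead of shifterProxy.
    """
    shifter_credentials = options.get("ShifterCredentials", "").strip()
    shifter_proxy = options.get("shifterProxy", "").strip()
    if shifter_credentials:
        return ("ShifterCredentials", shifter_credentials)
    if shifter_proxy:
        return ("shifterProxy", shifter_proxy)
    # Neither option is set: fall back to the transformation author's credentials.
    return ("author", transformation_author)


# "alice" and "ProdManager" are placeholder values for illustration.
print(select_credentials({"shifterProxy": "", "ShifterCredentials": ""}, "alice"))
print(select_credentials({"shifterProxy": "ProdManager"}, "alice"))
```

With both options left empty, each transformation is handled with its own author's credentials, which is what allows one agent instance to serve several VOs.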

class DIRAC.TransformationSystem.Agent.WorkflowTaskAgent.WorkflowTaskAgent(*args, **kwargs)

Bases: TaskManagerAgentBase

An AgentModule class to submit workflow tasks

__init__(*args, **kwargs)

Constructor.

am_Enabled()
am_checkStopAgentFile()
am_createStopAgentFile()
am_getControlDirectory()
am_getCyclesDone()
am_getMaxCycles()
am_getModuleParam(optionName)
am_getOption(optionName, defaultValue=None)

Gets an option from the agent’s configuration section. The section will be a subsection of the /Systems section in the CS.

am_getPollingTime()
am_getShifterProxyLocation()
am_getStopAgentFile()
am_getWatchdogTime()
am_getWorkDirectory()
am_go()
am_initialize(*initArgs)

Common initialization for all the agents.

This is executed every time an agent (re)starts. It is called by the AgentReactor and should not be overridden.

am_removeStopAgentFile()
am_secureCall(functor, args=(), name=False)
am_setModuleParam(optionName, value)
am_setOption(optionName, value)
am_stopExecution()
beginExecution()
checkReservedTasks(transDict, clients)

Check the Reserved tasks.

endExecution()
execute()

The execution method; it handles the transformations that need to be processed.

finalize()

Graceful finalization.

initialize()

Standard initialize method

submitTasks(transDict, clients)

Submit the tasks to an external system, using the taskManager provided

Parameters:
  • transDict (dict) – transformation body

  • clients (dict) – dictionary of client objects

Returns:

S_OK/S_ERROR
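Callers typically test the `OK` key of the returned dictionary before using the result. The sketch below uses minimal stand-ins for DIRAC's `S_OK` and `S_ERROR` helpers so that it runs without a DIRAC installation; the real helpers are imported with `from DIRAC import S_OK, S_ERROR` and may add further keys. The function `submit_all` and its argument are hypothetical.

```python
def S_OK(value=None):
    """Stand-in for DIRAC's S_OK: a success result carrying a value."""
    return {"OK": True, "Value": value}


def S_ERROR(message=""):
    """Stand-in for DIRAC's S_ERROR: a failure result carrying a message."""
    return {"OK": False, "Message": message}


def submit_all(task_ids):
    """Hypothetical submission step that fails on an empty task list."""
    if not task_ids:
        return S_ERROR("No tasks to submit")
    return S_OK(len(task_ids))


for tasks in ([1, 2, 3], []):
    result = submit_all(tasks)
    if result["OK"]:
        print("submitted", result["Value"], "tasks")
    else:
        print("submission failed:", result["Message"])
```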

updateFileStatus(transDict, clients)

Updates the file status

updateTaskStatus(transDict, clients)

Updates the task status