RequestExecutingAgent

Agent processing the requests

See also the information about the Request Management System.

The following options can be set for the RequestExecutingAgent. The configuration also includes the OperationHandlers available in DIRAC.

RequestExecutingAgent options
RequestExecutingAgent
{
  PollingTime = 60
  # number of Requests to execute per cycle
  RequestsPerCycle = 100
  # minimum number of workers process in the ProcessPool
  MinProcess = 20
  # maximum number of workers process in the ProcessPool; recommended to set it to the same value as MinProcess
  MaxProcess = 20
  # queue depth of the ProcessPool
  ProcessPoolQueueSize = 20
  # timeout for the ProcessPool finalization
  ProcessPoolTimeout = 900
  # sleep time before retrying to get a free slot in the ProcessPool
  ProcessPoolSleep = 5
  # If a positive integer n is given, we fetch n requests at once from the DB. Otherwise, one by one
  BulkRequest = 0
  OperationHandlers
  {
    ForwardDISET
    {
      Location = DIRAC/RequestManagementSystem/Agent/RequestOperations/ForwardDISET
      LogLevel = INFO
      MaxAttempts = 256
      TimeOut = 120
    }
    ReplicateAndRegister
    {
      Location = DIRAC/DataManagementSystem/Agent/RequestOperations/ReplicateAndRegister
      FTSMode = False
      FTSBannedGroups = dirac_user, lhcb_user
      LogLevel = INFO
      MaxAttempts = 256
      TimeOutPerFile = 600
    }
    PutAndRegister
    {
      Location = DIRAC/DataManagementSystem/Agent/RequestOperations/PutAndRegister
      LogLevel = INFO
      MaxAttempts = 256
      TimeOutPerFile = 600
    }
    RegisterReplica
    {
      Location = DIRAC/DataManagementSystem/Agent/RequestOperations/RegisterReplica
      LogLevel = INFO
      MaxAttempts = 256
      TimeOutPerFile = 120
    }
    RemoveReplica
    {
      Location = DIRAC/DataManagementSystem/Agent/RequestOperations/RemoveReplica
      LogLevel = INFO
      MaxAttempts = 256
      TimeOutPerFile = 120
    }
    RemoveFile
    {
      Location = DIRAC/DataManagementSystem/Agent/RequestOperations/RemoveFile
      LogLevel = INFO
      MaxAttempts = 256
      TimeOutPerFile = 120
    }
    RegisterFile
    {
       Location = DIRAC/DataManagementSystem/Agent/RequestOperations/RegisterFile
       LogLevel = INFO
       MaxAttempts = 256
       TimeOutPerFile = 120
    }
    SetFileStatus
    {
      Location = DIRAC/TransformationSystem/Agent/RequestOperations/SetFileStatus
      LogLevel = INFO
      MaxAttempts = 256
      TimeOutPerFile = 120
    }
  }
}
exception DIRAC.RequestManagementSystem.Agent.RequestExecutingAgent.AgentConfigError(msg)

Bases: Exception

misconfiguration error

__init__(msg)

ctor

Parameters:

msg (str) – error string

add_note()

Exception.add_note(note) – add a note to the exception

args
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class DIRAC.RequestManagementSystem.Agent.RequestExecutingAgent.RequestExecutingAgent(*args, **kwargs)

Bases: AgentModule

class RequestExecutingAgent

request processing agent using ProcessPool, Operation handlers and RequestTask

__init__(*args, **kwargs)

c’tor

am_Enabled()
am_checkStopAgentFile()
am_createStopAgentFile()
am_getControlDirectory()
am_getCyclesDone()
am_getMaxCycles()
am_getModuleParam(optionName)
am_getOption(optionName, defaultValue=None)

Gets an option from the agent’s configuration section. The section will be a subsection of the /Systems section in the CS.

am_getPollingTime()
am_getShifterProxyLocation()
am_getStopAgentFile()
am_getWatchdogTime()
am_getWorkDirectory()
am_go()
am_initialize(*initArgs)

Common initialization for all the agents.

This is executed every time an agent (re)starts. This is called by the AgentReactor, should not be overridden.

am_removeStopAgentFile()
am_secureCall(functor, args=(), name=False)
am_setModuleParam(optionName, value)
am_setOption(optionName, value)
am_stopExecution()
beginExecution()
cacheRequest(request)

put request into requestCache

Parameters:

request (Request) – Request instance

endExecution()
exceptionCallback(taskID, taskException)

definition of exception callback function

Parameters:
  • taskID (str) – Request.RequestID

  • taskException (Exception) – Exception instance

execute()

read requests from RequestClient and enqueue them into ProcessPool

finalize()

agent finalization

getTimeout(request)

get timeout for request

initialize()

initialize agent

processPool()

facade for ProcessPool

putAllRequests()

put back all requests without callback called into requestClient

Parameters:

self – self reference

putRequest(requestID, taskResult=None)

put back :requestID: to RequestClient

Parameters:

requestID (str) – request’s id

requestClient()

RequestClient getter

resultCallback(taskID, taskResult)

definition of request callback function

Parameters:
  • taskID (str) – Request.RequestID

  • taskResult (dict) – task result S_OK(Request)/S_ERROR(Message)