FTSAgent

mod:FTSAgent

synopsis:agent propagating scheduled RMS request in FTS

DIRAC agent propagating scheduled RMS request in FTS

Request processing phases (each in a separate thread):

  1. MONITOR

    …active FTSJobs, prepare FTSFiles dictionary with files to submit, fail, register and reschedule

  2. CHECK REPLICAS

    …just in case if all transfers are done, if yes, end processing

  3. FAILED FILES:

    …if at least one Failed FTSFile is found, set Request.Operation.File to ‘Failed’, end processing

  4. UPDATE Waiting#SourceSE FTSFiles

    …if any found in FTSDB

  5. REGISTER REPLICA

    …insert RegisterReplica operation to request, if some FTSFiles failed to register, end processing

  6. RESCHEDULE FILES

    …for FTSFiles failed with missing sources error

  7. SUBMIT

    …but read ‘Waiting’ FTSFiles first from FTSDB and merge those with FTSFiles to retry

exception DIRAC.DataManagementSystem.Agent.FTSAgent.EscapeTryException

Bases: exceptions.Exception

__init__

x.__init__(…) initializes x; see help(type(x)) for signature

args
message
class DIRAC.DataManagementSystem.Agent.FTSAgent.FTSAgent(agentName, loadName, baseAgentName=False, properties=None)

Bases: DIRAC.Core.Base.AgentModule.AgentModule

class FTSAgent

Agent propagating Scheduled request to Done or Failed state in the FTS system.

Requests and associated FTSJobs (and so FTSFiles) are kept in cache.

FTSPLACEMENT_REFRESH = 1800
MAX_ACTIVE_JOBS = 50
MAX_ATTEMPT = 256
MAX_FILES_PER_JOB = 100
MAX_REQUESTS = 100
MAX_THREADS = 10
MIN_THREADS = 1
MONITORING_INTERVAL = 600
MONITOR_COMMAND = 'glite-transfer-status'
PIN_TIME = 0
PROCESS_JOB_REQUESTS = None
SUBMIT_COMMAND = 'glite-transfer-submit'
__init__(agentName, loadName, baseAgentName=False, properties=None)

Common __init__ method for all Agents. All Agent modules must define: __doc__ __RCSID__ They are used to populate __codeProperties

The following Options are used from the Configuration: - /LocalSite/InstancePath - /DIRAC/Setup - Status - Enabled - PollingTime default = 120 - MaxCycles default = 500 - WatchdogTime default = 0 (disabled) - ControlDirectory control/SystemName/AgentName - WorkDirectory work/SystemName/AgentName - shifterProxy ‘’ - shifterProxyLocation WorkDirectory/SystemName/AgentName/.shifterCred

It defines the following default Options that can be set via Configuration (above): - MonitoringEnabled True - Enabled True if Status == Active - PollingTime 120 - MaxCycles 500 - ControlDirectory control/SystemName/AgentName - WorkDirectory work/SystemName/AgentName - shifterProxy False - shifterProxyLocation work/SystemName/AgentName/.shifterCred

different defaults can be set in the initialize() method of the Agent using am_setOption()

In order to get a shifter proxy in the environment during the execute() the configuration Option ‘shifterProxy’ must be set, a default may be given in the initialize() method.

am_Enabled()
am_checkStopAgentFile()
am_createStopAgentFile()
am_disableMonitoring()
am_getBasePath()
am_getControlDirectory()
am_getCyclesDone()
am_getMaxCycles()
am_getModuleParam(optionName)
am_getOption(optionName, defaultValue=None)
am_getPollingTime()
am_getShifterProxyLocation()
am_getStopAgentFile()
am_getWatchdogTime()
am_getWorkDirectory()
am_go()
am_initialize(*initArgs)
am_monitoringEnabled()
am_removeStopAgentFile()
am_secureCall(functor, args=(), name=False)
am_setModuleParam(optionName, value)
am_setOption(optionName, value)
am_stopExecution()
beginExecution()
endExecution()
execute()

one cycle execution

finalize()

finalize processing

classmethod ftsClient()

FTS client

classmethod getRequest(reqID)

get Requests systematically and refresh cache

initialize()

agent’s initialization

processRequest(request)

process one request

Parameters:request (Request) – ReqDB.Request
classmethod putFTSJobs(ftsJobsList)

put back fts jobs to the FTSDB

classmethod putRequest(request, clearCache=True)

put request back to ReqDB

Parameters:
  • request (Request) – Request instance
  • clearCache (bool) – clear the cache?

also finalize request if status == Done

registrationProtocols = None
classmethod requestClient()

request client getter

resetFTSPlacement()

create fts Placement

classmethod rssClient()

RSS client getter

threadPool()

thread pool getter

static updateFTSFileDict(ftsFilesDict, toUpdateDict)

update ftsFilesDict with FTSFiles in toUpdateDict

updateLock()

update lock