FTS3Agent
Added in version v6r20.
FTS3Agent implementation. It is in charge of submitting and monitoring all the transfers. It can be duplicated.
FTS3Agent
{
PollingTime = 120
MaxThreads = 10
# How many Operation we will treat in one loop
OperationBulkSize = 20
# How many Job we will monitor in one loop
JobBulkSize = 20
# split jobBulkSize in several chunks
# Bigger numbers (like 100) are efficient when there's a single agent
# When there are multiple agents, it may slow down the overall because
# of lock and race conditions
# (This number should of course be smaller or equal than JobBulkSize)
JobMonitoringBatchSize = 20
# Max number of files to go in a single job
MaxFilesPerJob = 100
# Max number of attempt per file
MaxAttemptsPerFile = 256
# days before removing jobs
DeleteGraceDays = 180
# Max number of deletes per cycle
DeleteLimitPerCycle = 100
# hours before kicking jobs with old assignment tag
KickAssignedHours = 1
# Max number of kicks per cycle
KickLimitPerCycle = 100
# Lifetime in sec of the Proxy we download to delegate to FTS3 (default 36h)
ProxyLifetime = 129600
# Whether we use tokens to submit jobs to FTS3
# VERY EXPERIMENTAL
UseTokens = False
}
- class DIRAC.DataManagementSystem.Agent.FTS3Agent.FTS3Agent(agentName, loadName, baseAgentName=False, properties={})
Bases:
AgentModule
This Agent is responsible of interacting with the FTS3 services. Several of them can run in parallel. It first treats the Operations, by creating new FTS jobs and performing callback. Then, it monitors the current jobs.
CAUTION: This agent and the FTSAgent cannot run together.
- __init__(agentName, loadName, baseAgentName=False, properties={})
Common __init__ method for all Agents. All Agent modules must define: __doc__
They are used to populate __codeProperties
The following Options are used from the Configuration: - /DIRAC/Setup - Status - Enabled - PollingTime default = 120 - MaxCycles default = 500 - WatchdogTime default = 0 (disabled) - ControlDirectory control/SystemName/AgentName - WorkDirectory work/SystemName/AgentName - shifterProxy ‘’ - shifterProxyLocation WorkDirectory/SystemName/AgentName/.shifterCred
It defines the following default Options that can be set via Configuration (above): - MonitoringEnabled True - Enabled True if Status == Active - PollingTime 120 - MaxCycles 500 - ControlDirectory control/SystemName/AgentName - WorkDirectory work/SystemName/AgentName - shifterProxy False - shifterProxyLocation work/SystemName/AgentName/.shifterCred
different defaults can be set in the initialize() method of the Agent using am_setOption()
In order to get a shifter proxy in the environment during the execute() the configuration Option ‘shifterProxy’ must be set, a default may be given in the initialize() method.
- am_Enabled()
- am_checkStopAgentFile()
- am_createStopAgentFile()
- am_getControlDirectory()
- am_getCyclesDone()
- am_getMaxCycles()
- am_getModuleParam(optionName)
- am_getOption(optionName, defaultValue=None)
Gets an option from the agent’s configuration section. The section will be a subsection of the /Systems section in the CS.
- am_getPollingTime()
- am_getShifterProxyLocation()
- am_getStopAgentFile()
- am_getWatchdogTime()
- am_getWorkDirectory()
- am_go()
- am_initialize(*initArgs)
Common initialization for all the agents.
This is executed every time an agent (re)starts. This is called by the AgentReactor, should not be overridden.
- am_removeStopAgentFile()
- am_secureCall(functor, args=(), name=False)
- am_setModuleParam(optionName, value)
- am_setOption(optionName, value)
- am_stopExecution()
- beginExecution()
Reload configurations before start of a cycle
- Returns:
S_OK()/S_ERROR()
- deleteOperations()
Delete final operations
- Returns:
S_OK()/S_ERROR()
- endExecution()
- execute()
One cycle execution
- Returns:
S_OK()/S_ERROR()
- finalize()
Finalize processing
- Returns:
S_OK()/S_ERROR()
- getFTS3Context(username, group, ftsServer, threadID)
Returns an fts3 context for a given user, group and fts server
The context pool is per thread, and there is one context per tuple (user, group, server). We dump the proxy of a user to a file (shared by all the threads), and use it to make the context. The proxy needs a lifetime of self.proxyLifetime, is cached for cacheTime = (2*lifeTime/3) - 10mn, and the lifetime of the context is 45mn The reason for cacheTime to be what it is is because the FTS3 server will ask for a new proxy after 2/3rd of the existing proxy has expired, so we renew it just before
- initialize()
Agent’s initialization
- Returns:
S_OK()/S_ERROR()
- kickJobs()
Kick stuck jobs
- Returns:
S_OK()/S_ERROR()
- kickOperations()
Kick stuck operations
- Returns:
S_OK()/S_ERROR()
- monitorJobsLoop()
fetch the active FTSJobs from the DB
spawn a thread to monitor each of them
- Returns:
S_OK()/S_ERROR()
- treatOperationsLoop()
Fetch all the FTSOperations which are not finished
Spawn a thread to treat each operation
- Returns:
S_OK()/S_ERROR()