FTS3Agent

Added in version v6r20.

FTS3Agent implementation. It is in charge of submitting and monitoring all the transfers. Several instances of it can run in parallel.

FTS3Agent options
FTS3Agent
{
  PollingTime = 120
  MaxThreads = 10
  # How many Operations to treat in one loop
  OperationBulkSize = 20
  # How many Jobs to monitor in one loop
  JobBulkSize = 20
  # Split JobBulkSize into several chunks.
  # Bigger numbers (like 100) are efficient when there is a single agent.
  # When there are multiple agents, a big number may slow down the overall
  # processing because of locks and race conditions.
  # (This number should be smaller than or equal to JobBulkSize)
  JobMonitoringBatchSize = 20
  # Max number of files to go in a single job
  MaxFilesPerJob = 100
  # Max number of attempts per file
  MaxAttemptsPerFile = 256
  # days before removing jobs
  DeleteGraceDays = 180
  # Max number of deletes per cycle
  DeleteLimitPerCycle = 100
  # hours before kicking jobs with old assignment tag
  KickAssignedHours  = 1
  # Max number of kicks per cycle
  KickLimitPerCycle = 100
  # Lifetime in sec of the Proxy we download to delegate to FTS3 (default 36h)
  ProxyLifetime = 129600
  # Whether we use tokens to submit jobs to FTS3
  # VERY EXPERIMENTAL
  UseTokens = False
}
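The relationship between JobBulkSize and JobMonitoringBatchSize can be pictured with a minimal sketch. It assumes that the jobs fetched in one loop are simply cut into consecutive chunks; the helper name split_into_batches and the job IDs are hypothetical and are not part of DIRAC.

```python
def split_into_batches(jobs, batch_size):
    """Cut a list of jobs into consecutive chunks of at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return [jobs[i:i + batch_size] for i in range(0, len(jobs), batch_size)]


# Option values taken from the configuration above; job IDs are made up.
job_bulk_size = 20
job_monitoring_batch_size = 20
jobs = list(range(job_bulk_size))

# With both options equal, the whole bulk is monitored as a single batch.
batches = split_into_batches(jobs, job_monitoring_batch_size)
print(len(batches))

# A smaller batch size yields several chunks of the same bulk.
smaller_batches = split_into_batches(jobs, 5)
print([len(batch) for batch in smaller_batches])
```

With several agents running, a smaller batch size means each chunk is held for a shorter time, which is the trade-off the option comments describe.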
class DIRAC.DataManagementSystem.Agent.FTS3Agent.FTS3Agent(agentName, loadName, baseAgentName=False, properties={})

Bases: AgentModule

This Agent is responsible for interacting with the FTS3 services. Several of them can run in parallel. It first treats the Operations, by creating new FTS jobs and performing callbacks. Then, it monitors the current jobs.

CAUTION: This agent and the FTSAgent cannot run together.

__init__(agentName, loadName, baseAgentName=False, properties={})

Common __init__ method for all Agents. All Agent modules must define: __doc__

They are used to populate __codeProperties

The following Options are used from the Configuration:
  • /DIRAC/Setup

  • Status

  • Enabled

  • PollingTime default = 120

  • MaxCycles default = 500

  • WatchdogTime default = 0 (disabled)

  • ControlDirectory control/SystemName/AgentName

  • WorkDirectory work/SystemName/AgentName

  • shifterProxy ‘’

  • shifterProxyLocation WorkDirectory/SystemName/AgentName/.shifterCred

It defines the following default Options that can be set via Configuration (above):
  • MonitoringEnabled True

  • Enabled True if Status == Active

  • PollingTime 120

  • MaxCycles 500

  • ControlDirectory control/SystemName/AgentName

  • WorkDirectory work/SystemName/AgentName

  • shifterProxy False

  • shifterProxyLocation work/SystemName/AgentName/.shifterCred

Different defaults can be set in the initialize() method of the Agent using am_setOption().

To get a shifter proxy in the environment during execute(), the configuration Option ‘shifterProxy’ must be set; a default may be given in the initialize() method.
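Setting defaults from initialize() might look roughly like the sketch below. So that it runs without a DIRAC installation, it uses a heavily simplified stand-in for AgentModule and for S_OK; StandInAgentModule, ExampleAgent and the option values chosen are all hypothetical, and the real am_getOption() also consults the Configuration, which this stand-in does not.

```python
def S_OK(value=None):
    """Stand-in for DIRAC's S_OK: a dictionary flagged as successful."""
    return {"OK": True, "Value": value}


class StandInAgentModule:
    """Hypothetical, simplified substitute for AgentModule (not the real class)."""

    def __init__(self):
        # A few of the defaults listed above
        self._options = {"PollingTime": 120, "MaxCycles": 500, "shifterProxy": False}

    def am_setOption(self, optionName, value):
        self._options[optionName] = value

    def am_getOption(self, optionName, defaultValue=None):
        # Simplification: only looks at locally stored values
        return self._options.get(optionName, defaultValue)


class ExampleAgent(StandInAgentModule):
    def initialize(self):
        # Override the defaults for this agent only (example values)
        self.am_setOption("PollingTime", 60)
        self.am_setOption("shifterProxy", "DataManager")
        return S_OK()


agent = ExampleAgent()
result = agent.initialize()
print(result["OK"], agent.am_getOption("PollingTime"), agent.am_getOption("shifterProxy"))
```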

am_Enabled()
am_checkStopAgentFile()
am_createStopAgentFile()
am_getControlDirectory()
am_getCyclesDone()
am_getMaxCycles()
am_getModuleParam(optionName)
am_getOption(optionName, defaultValue=None)

Gets an option from the agent’s configuration section. The section will be a subsection of the /Systems section in the CS.

am_getPollingTime()
am_getShifterProxyLocation()
am_getStopAgentFile()
am_getWatchdogTime()
am_getWorkDirectory()
am_go()
am_initialize(*initArgs)

Common initialization for all the agents.

This is executed every time an agent (re)starts. It is called by the AgentReactor and should not be overridden.

am_removeStopAgentFile()
am_secureCall(functor, args=(), name=False)
am_setModuleParam(optionName, value)
am_setOption(optionName, value)
am_stopExecution()
beginExecution()

Reload the configuration before the start of a cycle

Returns:

S_OK()/S_ERROR()

deleteOperations()

Delete final operations

Returns:

S_OK()/S_ERROR()

endExecution()
execute()

One cycle execution

Returns:

S_OK()/S_ERROR()

finalize()

Finalize processing

Returns:

S_OK()/S_ERROR()

getFTS3Context(username, group, ftsServer, threadID)

Returns an fts3 context for a given user, group and fts server

The context pool is per thread, and there is one context per tuple (user, group, server). We dump the proxy of a user to a file (shared by all the threads) and use it to make the context. The proxy needs a lifetime of self.proxyLifetime and is cached for cacheTime = (2*lifeTime/3) - 10 min. The lifetime of the context is 45 min. cacheTime is chosen this way because the FTS3 server asks for a new proxy once two thirds of the existing proxy's lifetime has elapsed, so we renew it just before that point.

Parameters:
  • username (str) – name of the user

  • group (str) – group of the user

  • ftsServer (str) – address of the server

  • threadID (str) – thread ID

Returns:

S_OK with the context object
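The cacheTime formula given in the getFTS3Context() description can be worked through for the default ProxyLifetime. This is only an arithmetic sketch: the function name proxy_cache_time is hypothetical, and the use of whole seconds with integer division is an assumption, not something this page specifies.

```python
def proxy_cache_time(proxy_lifetime_s):
    """Cache time in seconds: two thirds of the proxy lifetime minus 10 minutes."""
    ten_minutes_s = 10 * 60
    # Integer division is an assumption made for this sketch
    return (2 * proxy_lifetime_s) // 3 - ten_minutes_s


# Default ProxyLifetime from the options above (36 hours, in seconds)
proxy_lifetime = 129600
cache_time = proxy_cache_time(proxy_lifetime)

print(cache_time)         # 85800 seconds
print(cache_time / 3600)  # a little under 24 hours
```

With the default of 36 hours, the FTS3 server would ask for a new proxy after 24 hours, so the cached proxy is dropped 10 minutes before that.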

initialize()

Agent’s initialization

Returns:

S_OK()/S_ERROR()

kickJobs()

Kick stuck jobs

Returns:

S_OK()/S_ERROR()

kickOperations()

Kick stuck operations

Returns:

S_OK()/S_ERROR()

monitorJobsLoop()
  • Fetch the active FTSJobs from the DB

  • Spawn a thread to monitor each of them

Returns:

S_OK()/S_ERROR()

treatOperationsLoop()
  • Fetch all the FTSOperations which are not finished

  • Spawn a thread to treat each operation

Returns:

S_OK()/S_ERROR()
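The fetch-then-spawn pattern shared by monitorJobsLoop() and treatOperationsLoop() can be illustrated with the Python standard library. This page does not say which threading mechanism the agent uses, so concurrent.futures.ThreadPoolExecutor is a substitute chosen for the sketch; treat_operation, treat_operations_loop and the operation IDs are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def treat_operation(operation_id):
    """Hypothetical placeholder for the work done on one operation."""
    return f"treated {operation_id}"


def treat_operations_loop(operation_ids, max_threads=10):
    """Submit one task per operation to a pool capped at max_threads workers."""
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        futures = {op_id: pool.submit(treat_operation, op_id) for op_id in operation_ids}
        # Collect every result; an exception raised in a worker is re-raised here.
        return {op_id: future.result() for op_id, future in futures.items()}


# 20 made-up operation IDs, mirroring OperationBulkSize = 20 and MaxThreads = 10
operations = list(range(20))
results = treat_operations_loop(operations, max_threads=10)
print(len(results))
```

Capping the pool size keeps the number of concurrent workers bounded regardless of how many operations are fetched in one loop.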