ProcessPool
- synopsis:
ProcessPool and related classes
ProcessPool
ProcessPool creates a pool of worker subprocesses to handle a queue of tasks, much like the producer/consumer paradigm. Users only need to fill the queue with tasks, and the worker subprocesses will execute them.
To create a ProcessPool, first call its constructor:
pool = ProcessPool( minSize, maxSize, maxQueuedRequests )
where parameters are:
- param int minSize:
at least <minSize> workers will be alive at all times
- param int maxSize:
no more than <maxSize> workers will be alive at any time
- param int maxQueuedRequests:
maximum number of requests waiting in the queue to be executed
If another request is added to a full queue, execution will block until a request is taken out. The ProcessPool automatically grows and shrinks the pool of workers as needed, without exceeding the limits above.
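The behaviour of a full queue can be illustrated with the standard library alone. The sketch below is not DIRAC code: it uses `queue.Queue` as a stand-in for the pool's pending queue, and `tryEnqueue` is a hypothetical helper. A non-blocking put on a full queue fails immediately, whereas a blocking put would wait until a slot is freed.

```python
import queue


def tryEnqueue(pending, request):
    """Try to add a request without blocking; return True on success."""
    try:
        pending.put_nowait(request)
        return True
    except queue.Full:
        # A blocking put() would wait here until a consumer frees a slot.
        return False


# Stand-in for a pending queue created with maxQueuedRequests = 2.
pending = queue.Queue(maxsize=2)
accepted = [tryEnqueue(pending, n) for n in range(3)]

# Taking one request out frees a slot, so the next attempt succeeds.
pending.get_nowait()
acceptedAfterGet = tryEnqueue(pending, 3)
```

The third request is rejected because the queue already holds two; once one request has been consumed, there is room again.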
To add a task to the queue one should execute::
pool.createAndQueueTask( funcDef,
args = ( arg1, arg2, ... ),
kwargs = { "kwarg1" : value1, "kwarg2" : value2 },
callback = callbackDef,
exceptionCallback = exceptionCallbackDef )
or alternatively by using ProcessTask instance::
task = ProcessTask( funcDef,
args = ( arg1, arg2, ... ),
kwargs = { "kwarg1" : value1, ... },
callback = callbackDef,
exceptionCallback = exceptionCallbackDef )
pool.queueTask( task )
where parameters are:
- param funcDef:
callable object definition (function, lambda, class with __call__ slot defined)
- param list args:
argument list
- param dict kwargs:
keyword arguments dictionary
- param callback:
callback function definition
- param exceptionCallback:
exception callback function definition
The callback, exceptionCallback and the other parameters are all optional. Once a task has been added to the pool, it will be executed as soon as possible. Worker subprocesses automatically return the return value of the task. To obtain those results one has to execute:
pool.processResults()
This method will process the existing return values of the tasks, even if a task does not return anything. This method has to be called to clean the result queues. To wait until all the requests are finished and process their results, call:
pool.processAllResults()
This function will block until all requests are finished and their result values have been processed.
It is also possible to set the ProcessPool in daemon mode, in which all results are automatically processed as soon as they are available, just after task execution has finished. To enable this mode one has to call:
pool.daemonize()
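The idea behind daemon mode, a background thread that keeps draining results so the caller does not have to, can be sketched with the standard library. This is an illustration only, not the ProcessPool implementation: `ResultPump`, its `pollInterval` parameter and the use of `queue.Queue` are assumptions made for the example.

```python
import queue
import threading


class ResultPump:
    """Hypothetical helper: drains a results queue in a background thread."""

    def __init__(self, results, callback, pollInterval=0.01):
        self.results = results
        self.callback = callback
        self.pollInterval = pollInterval
        self.stopEvent = threading.Event()
        self.thread = threading.Thread(target=self._loop, daemon=True)

    def _loop(self):
        while not self.stopEvent.is_set():
            try:
                result = self.results.get(timeout=self.pollInterval)
            except queue.Empty:
                continue  # nothing ready yet, poll again
            try:
                self.callback(result)
            finally:
                self.results.task_done()

    def start(self):
        self.thread.start()

    def stop(self):
        self.stopEvent.set()
        self.thread.join()


results = queue.Queue()
seen = []
pump = ResultPump(results, seen.append)
pump.start()
for value in (1, 2, 3):
    results.put(value)
results.join()  # wait until every queued result has been handled
pump.stop()
```

Here `results.join()` plays the role of waiting for all results to be processed, and `pump.stop()` corresponds to joining the background thread during shutdown.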
Callback functions
There are two types of callbacks that can be executed for each task: an exception callback function and a results callback function. The first one is executed when an unhandled exception has been raised during task processing, and hence no task results are available; otherwise the second type of callback is executed.
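The either/or nature of the two callbacks can be sketched as follows. This is a simplified stand-in, not DIRAC code: `runTask` is a hypothetical helper, the callable itself is passed where a ProcessTask reference would be, and a plain dictionary with "Value" and "Exc_info" keys replaces the error structure handed to the exception callback.

```python
import sys


def runTask(taskFunction, callback=None, exceptionCallback=None):
    """Run taskFunction and fire exactly one of the two callbacks."""
    try:
        result = taskFunction()
    except Exception:
        excType, excValue, _ = sys.exc_info()
        # Plain dict standing in for the error structure (assumption).
        info = {"Value": excType.__name__, "Exc_info": str(excValue)}
        if exceptionCallback is not None:
            exceptionCallback(taskFunction, info)
        return None
    if callback is not None:
        callback(taskFunction, result)
    return result


log = []


def onResult(task, taskResult):
    log.append(("ok", taskResult))


def onException(task, exc_info):
    log.append(("error", exc_info["Value"]))


runTask(lambda: 6 * 7, onResult, onException)   # succeeds
runTask(lambda: 1 // 0, onResult, onException)  # raises ZeroDivisionError
```

After both calls, `log` holds one entry from the results callback and one from the exception callback.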
The callbacks can be attached in two places:
directly in ProcessTask, in which case they have to be shelvable/picklable, so they should be defined as global functions with the signature :callback( task, taskResult ): where :task: is a :ProcessTask: reference and :taskResult: is whatever the task callable returns, for the results callback, and :exceptionCallback( task, exc_info ): where exc_info is a :S_ERROR( “Exception”: { “Value” : exceptionName, “Exc_info” : exceptionInfo } ):
in ProcessPool, in which case there is no limitation on the function type, except the signature, which should follow :callback( task ): or :exceptionCallback( task ):, as those callback definitions are not put into the queues
The first type of callback can be used when various callable objects are put into the ProcessPool, so you probably want to handle them differently depending on their results, while the second type is for executing the same type of callable in subprocesses, where you expect the same type of results everywhere.
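The picklability requirement for callbacks attached to a ProcessTask can be checked ahead of time. The sketch below uses only the standard `pickle` module; `isPicklable` and the callback names are hypothetical and exist only for illustration.

```python
import pickle


def isPicklable(obj):
    """Return True if obj survives pickling, as queue transport requires."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


def resultCallback(task, taskResult):
    """Module-level results callback following the documented signature."""
    return taskResult


def exceptionCallback(task, exc_info):
    """Module-level exception callback following the documented signature."""
    return exc_info


# A lambda has no importable name, so pickling it by reference fails.
inlineCallback = lambda task, taskResult: taskResult
lambdaOk = isPicklable(inlineCallback)

# Built-in functions are pickled by reference, like module-level functions.
builtinOk = isPicklable(len)
```

Functions such as `resultCallback` are pickled by reference to their module and name, so they should pass the same check when the code lives in an importable module or is run as a script. That is why global functions are the safe choice for task-level callbacks.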
- class DIRAC.Core.Utilities.ProcessPool.ProcessPool(minSize=2, maxSize=0, maxQueuedRequests=10, strictLimits=True, poolCallback=None, poolExceptionCallback=None, keepProcessesRunning=True)
Bases:
object
- class ProcessPool
ProcessPool
This class manages multiprocessing execution of tasks (:ProcessTask: instances) in separate sub-processes (:WorkingProcess:).
Pool depth
The :ProcessPool: keeps the required number of active workers at all times: slave workers are only created when pendingQueue is being filled with tasks, without exceeding the defined min and max limits. When pendingQueue is empty, active workers clean themselves up, as each worker has a built-in self-destruct mechanism that triggers after 10 idle loops.
Processing and communication
The communication between :ProcessPool: instance and slaves is performed using two :multiprocessing.Queues:
pendingQueue, used to push tasks to the workers,
resultsQueue for the reverse direction;
and one :multiprocessing.Event: instance (stopEvent), which is working as a fuse to destroy idle workers in a clean manner.
Processing of a task begins with pushing it into :pendingQueue: using :ProcessPool.queueTask: or :ProcessPool.createAndQueueTask:. Every time a new task is queued, :ProcessPool: checks for the existence of active and idle workers and spawns new ones when required. The task is then read and processed on the worker side. If results are ready and callback functions are defined, the task is put back into the resultsQueue, ready to be picked up by ProcessPool again. To perform this last step one has to call :ProcessPool.processResults:, or alternatively ask for daemon mode processing, in which this function is called again and again in a separate background thread.
Finalisation
Finalization for task processing is done in several steps:
if pool is working in daemon mode, background result processing thread is joined and stopped
- pendingQueue:
is emptied by :ProcessPool.processAllResults: function, all enqueued tasks are executed
- stopEvent:
is set, so all idle workers are exiting immediately
non-hanging workers are joined and terminated politely
the rest of the workers, if any, are forcefully terminated by signals: first by SIGTERM, and if that doesn’t work, by SIGKILL
- Warn:
Be careful to choose the :timeout: argument to :ProcessPool.finalize: wisely. Too short a time period can cause all workers to be killed.
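The escalation described in the finalisation steps (join, then SIGTERM, then SIGKILL) can be sketched as below. This is not the code behind :ProcessPool.finalize:; `shutdownWorkers` is a hypothetical helper that relies only on the `join`, `is_alive`, `terminate` and `kill` methods offered by `multiprocessing.Process`.

```python
def shutdownWorkers(workers, timeout=1.0):
    """Join workers, then escalate to terminate() and kill() if needed.

    Works with multiprocessing.Process objects or anything offering the
    same join/is_alive/terminate/kill methods.
    Returns the workers that had to be stopped by force.
    """
    forced = []
    for worker in workers:
        worker.join(timeout)          # polite: give it time to exit
        if not worker.is_alive():
            continue
        forced.append(worker)
        worker.terminate()            # SIGTERM on Unix
        worker.join(timeout)
        if worker.is_alive():
            worker.kill()             # SIGKILL on Unix
            worker.join(timeout)
    return forced
```

In this sketch the timeout is applied per join, so a very small value sends every worker that is still busy straight to the forceful branch, which is the situation the warning above describes.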
- __init__(minSize=2, maxSize=0, maxQueuedRequests=10, strictLimits=True, poolCallback=None, poolExceptionCallback=None, keepProcessesRunning=True)
c’tor
- Parameters:
self – self reference
minSize (int) – minimal number of simultaneously executed tasks
maxSize (int) – maximal number of simultaneously executed tasks
maxQueuedRequests (int) – size of pending tasks queue
strictLimits (bool) – flag controlling worker overcommitment
poolCallback (callable) – results callback
poolExceptionCallback (callable) – exception callback
- createAndQueueTask(taskFunction, args=None, kwargs=None, taskID=None, callback=None, exceptionCallback=None, blocking=True, usePoolCallbacks=False, timeOut=0)
Create a new ProcessTask and enqueue it in the pending task queue
- Parameters:
self – self reference
taskFunction (mixed) – callable object definition (FunctionType, LambdaType, callable class)
args (tuple) – non-keyword arguments passed to taskFunction c’tor
kwargs (dict) – keyword arguments passed to taskFunction c’tor
taskID (int) – task Id
callback (mixed) – callback handler, callable object executed after task’s execution
exceptionCallback (mixed) – callback handler executed if taskFunction has raised an exception
blocking (bool) – flag to block queue if necessary until free slot is available
usePoolCallbacks (bool) – fire execution of pool defined callbacks after task callbacks
timeOut (int) – time you want to spend executing :taskFunction:
- daemonize()
Put ProcessPool into daemon mode: result processing runs in a separate background thread.
- Parameters:
self – self reference
- finalize(timeout=60)
Drain pool, shutdown processing in more or less clean way
- Parameters:
self – self reference
timeout – seconds to wait before killing
- getFreeSlots()
get number of free slots available for workers
- Parameters:
self – self reference
- getMaxSize()
MaxSize getter
- Parameters:
self – self reference
- getMinSize()
MinSize getter
- Parameters:
self – self reference
- getNumIdleProcesses()
Count processes being idle
- Parameters:
self – self reference
- getNumWorkingProcesses()
Count processes currently being executed
- Parameters:
self – self reference
- hasPendingTasks()
Check if tasks are present in pending queue
- Parameters:
self – self reference
- Warning:
results may be misleading if elements put into the queue are big
- isFull()
Check if pending queue is full
- Parameters:
self – self reference
- Warning:
results may be misleading if elements put into the queue are big
- isWorking()
Check existence of working subprocesses
- Parameters:
self – self reference
- processAllResults(timeout=10)
Process all enqueued tasks at once
- Parameters:
self – self reference
- processResults()
Execute tasks’ callbacks removing them from results queue
- Parameters:
self – self reference
- queueTask(task, blocking=True, usePoolCallbacks=False)
Enqueue new task into pending queue
- Parameters:
self – self reference
task (ProcessTask) – new task to execute
blocking (bool) – flag to block if necessary until a new empty slot is available (default = block)
usePoolCallbacks (bool) – flag to trigger execution of pool callbacks (default = don’t execute)
- setPoolCallback(callback)
Set ProcessPool callback function
- Parameters:
self – self reference
callback (callable) – callback function
- setPoolExceptionCallback(exceptionCallback)
Set ProcessPool exception callback function
- Parameters:
self – self reference
exceptionCallback (callable) – exception callback function
- startProcessing()
Restart processing again
- Parameters:
self – self reference
- stopProcessing(timeout=10)
Cease fire: stop processing
- Parameters:
self – self reference
- class DIRAC.Core.Utilities.ProcessPool.ProcessTask(taskFunction, args=None, kwargs=None, taskID=None, callback=None, exceptionCallback=None, usePoolCallbacks=False, timeOut=0)
Bases:
object
Defines task to be executed in WorkingProcess together with its callbacks.
- __init__(taskFunction, args=None, kwargs=None, taskID=None, callback=None, exceptionCallback=None, usePoolCallbacks=False, timeOut=0)
c’tor
- Warning:
taskFunction has to be callable: it could be a function, a lambda OR a class with the __call__ operator defined. But be careful with the interpretation of args and kwargs, as they are passed to different places in the above cases:
for functions or lambdas, args and kwargs are just treated as function parameters
for callable classes (say MyTask), args and kwargs are passed to the class constructor (MyTask.__init__) and MyTask.__call__ should be a method without parameters, i.e. the MyTask definition should be:
class MyTask:
    def __init__( self, *args, **kwargs ): ...
    def __call__( self ): ...
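The two argument-passing rules above can be made concrete with a small dispatcher. This is a sketch of the described behaviour, not the ProcessTask source: `executeCallable`, `add` and `Adder` are hypothetical names introduced for the example.

```python
import inspect


def executeCallable(taskFunction, args=(), kwargs=None):
    """Hypothetical dispatcher following the argument rules described above."""
    kwargs = kwargs or {}
    if inspect.isclass(taskFunction):
        # Callable class: args/kwargs go to __init__, __call__ takes none.
        instance = taskFunction(*args, **kwargs)
        return instance()
    # Function or lambda: args/kwargs are the call parameters.
    return taskFunction(*args, **kwargs)


def add(a, b=0):
    return a + b


class Adder:
    def __init__(self, a, b=0):
        self.a = a
        self.b = b

    def __call__(self):
        return self.a + self.b


fromFunction = executeCallable(add, args=(1,), kwargs={"b": 2})
fromClass = executeCallable(Adder, args=(1,), kwargs={"b": 2})
```

Both calls receive the same `args` and `kwargs`, but in the class case they are consumed by the constructor and the instance is then called without arguments.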
- Warning:
depending on the :timeOut: value, taskFunction execution can be forcefully terminated using SIGALRM after :timeOut: seconds; :timeOut: equal to zero means there is no time out at all, except those during :ProcessPool: finalization
- Parameters:
self – self reference
taskFunction (mixed) – definition of callable object to be executed in this task
args (tuple) – non-keyword arguments
kwargs (dict) – keyword arguments
taskID (int) – task id, if not set,
timeOut (int) – estimated time to execute taskFunction in seconds (default = 0, no timeOut at all)
callback (mixed) – result callback function
exceptionCallback (mixed) – callback function to be fired upon exception in taskFunction
- disablePoolCallbacks()
Disable execution of ProcessPool callbacks
- doCallback()
Execute result callback function
- Parameters:
self – self reference
- doExceptionCallback()
Execute exceptionCallback
- Parameters:
self – self reference
- enablePoolCallbacks()
(re)enable use of ProcessPool callbacks
- exceptionRaised()
Flag to determine exception in process
- Parameters:
self – self reference
- getTaskID()
TaskID getter
- Parameters:
self – self reference
- getTimeOut()
Get timeOut value
- Parameters:
self – self reference
- hasCallback()
Callback existence checking
- Parameters:
self – self reference
- Returns:
True if callback or exceptionCallback has been defined, False otherwise
- hasPoolCallback()
Check if asked to execute :ProcessPool: callbacks
- Parameters:
self – self reference
- hasTimeOutSet()
Check if timeout is set
- Parameters:
self – self reference
- process()
Execute task
- Parameters:
self – self reference
- setResult(result)
Set taskResult to result
- setTimeOut(timeOut)
Set time out (in seconds)
- Parameters:
self – self reference
timeOut (int) – new time out value
- taskException()
Get task exception
- Parameters:
self – self reference
- taskID = 0
- taskResults()
Get task results
- Parameters:
self – self reference
- usePoolCallbacks()
Check if results should be processed by callbacks defined in the :ProcessPool:
- Parameters:
self – self reference
- class DIRAC.Core.Utilities.ProcessPool.WorkingProcess(pendingQueue, resultsQueue, stopEvent, keepRunning)
Bases:
Process
- class WorkingProcess
WorkingProcess is a class that represents activity that runs in a separate process.
It runs its main thread (process) in daemon mode, reading tasks from :pendingQueue:, executing them and pushing tasks with results back to the :resultsQueue:. If a task has a timeout value defined, a separate threading.Timer thread is started, killing execution (and destroying the worker) after :ProcessTask.__timeOut: seconds.
Main execution could also terminate in a few different ways:
on every failed read attempt (from empty :pendingQueue:), the idle loop counter is increased; the worker is terminated when the counter reaches a value of 10;
when stopEvent is set (so ProcessPool is in draining mode),
when parent process PID is set to 1 (init process, parent process with ProcessPool is dead).
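The first two termination conditions can be sketched as a plain loop. This is an illustration under assumptions, not the WorkingProcess.run source: `workerLoop`, `maxIdleLoops` and `pollTimeout` are hypothetical names, tasks are modelled as zero-argument callables, the idle counter is assumed to reset after a successful read, and the parent-PID check is left out. The loop accepts `queue.Queue` objects as well as `multiprocessing.Queue`, since both raise `queue.Empty` on a timed-out `get`.

```python
import queue
import threading


def workerLoop(pendingQueue, resultsQueue, stopEvent,
               maxIdleLoops=10, pollTimeout=0.01):
    """Process tasks until stopEvent is set or the idle limit is reached.

    Returns the number of tasks processed.
    """
    idleLoops = 0
    processed = 0
    while not stopEvent.is_set():
        try:
            task = pendingQueue.get(timeout=pollTimeout)
        except queue.Empty:
            idleLoops += 1            # failed read attempt
            if idleLoops >= maxIdleLoops:
                break                 # self-destruct after too many idle loops
            continue
        idleLoops = 0                 # assumption: a successful read resets the counter
        resultsQueue.put(task())
        processed += 1
    return processed


pendingQueue = queue.Queue()
resultsQueue = queue.Queue()
for n in (1, 2, 3):
    pendingQueue.put(lambda n=n: n * n)

processedCount = workerLoop(pendingQueue, resultsQueue, threading.Event())
squares = [resultsQueue.get_nowait() for _ in range(processedCount)]
```

With an event that is already set, the loop exits before reading anything, which mirrors idle workers leaving as soon as the pool starts draining.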
- __init__(pendingQueue, resultsQueue, stopEvent, keepRunning)
c’tor
- Parameters:
self – self reference
pendingQueue (multiprocessing.Queue) – queue storing ProcessTask before execution
resultsQueue (multiprocessing.Queue) – queue storing callbacks and exceptionCallbacks
stopEvent (multiprocessing.Event) – event to stop processing
- property authkey
- close()
Close the Process object.
This method releases resources held by the Process object. It is an error to call this method if the child process is still running.
- property daemon
Return whether process is a daemon
- property exitcode
Return exit code of process or None if it has yet to stop
- property ident
Return identifier (PID) of process or None if it has yet to start
- isWorking()
Check if process is being executed
- Parameters:
self – self reference
- is_alive()
Return whether process is alive
- join(timeout=None)
Wait until child process terminates
- kill()
Terminate process; sends SIGKILL signal or uses TerminateProcess()
- property name
- property pid
Return identifier (PID) of process or None if it has yet to start
- run()
Task execution
Reads and executes ProcessTask :task: out of pending queue and then pushes it to the results queue for callback execution.
- Parameters:
self – self reference
- property sentinel
Return a file descriptor (Unix) or handle (Windows) suitable for waiting for process termination.
- start()
Start child process
- taskProcessed()
Tell how many tasks have been processed so far
- Parameters:
self – self reference
- terminate()
Terminate process; sends SIGTERM signal or uses TerminateProcess()