ThreadPool

Usage of ThreadPool

ThreadPool creates a pool of worker threads to process a queue of tasks, much like the producer/consumer paradigm. Users only need to fill the queue with tasks, and the worker threads will execute them.
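
The producer/consumer idea can be sketched with the standard library alone. This is only an illustration of the pattern, not DIRAC's implementation; the names `worker`, `tasks` and `results` are hypothetical.

```python
import queue
import threading

def worker(tasks, results):
    """Consume (function, args) pairs until a None sentinel arrives."""
    while True:
        item = tasks.get()
        if item is None:          # sentinel: no more work for this worker
            break
        func, args = item
        results.put(func(*args))  # hand the return value back to the producer

tasks = queue.Queue()
results = queue.Queue()
workers = [threading.Thread(target=worker, args=(tasks, results)) for _ in range(3)]
for w in workers:
    w.start()

# Producer side: fill the queue, then send one sentinel per worker.
for n in range(5):
    tasks.put((pow, (n, 2)))
for _ in workers:
    tasks.put(None)
for w in workers:
    w.join()

# Completion order is not deterministic with several workers, so sort.
squares = sorted(results.get() for _ in range(5))
```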

To start working with the ThreadPool, first instantiate it:

threadPool = ThreadPool( minThreads, maxThreads, maxQueuedRequests )

minThreads        -> at all times, at least <minThreads> workers will be alive
maxThreads        -> at all times, at most <maxThreads> workers will be alive
maxQueuedRequests -> at most <maxQueuedRequests> requests can be waiting to be executed.
                     If another request is added to the ThreadPool while the queue is
                     full, the calling thread will block until a request is taken out
                     of the queue.
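
The blocking behaviour of a full queue resembles that of a bounded `queue.Queue` from the standard library. The sketch below assumes that analogy and does not show DIRAC internals; the name `pending` is hypothetical.

```python
import queue

pending = queue.Queue(maxsize=2)   # plays the role of maxQueuedRequests = 2
pending.put("job-1")
pending.put("job-2")

# A blocking put() would now wait until a consumer calls get().
# With block=False the same condition is reported as queue.Full instead.
try:
    pending.put("job-3", block=False)
    accepted = True
except queue.Full:
    accepted = False

pending.get()                      # a worker takes one request out of the queue
pending.put("job-3", block=False)  # now there is room again
```

The signatures of queueJob and generateJobAndQueueIt listed in this module include a blocking=True parameter, which presumably selects between these two behaviours.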

The ThreadPool will automatically grow and shrink the pool of workers as needed.

To add requests to the queue:

threadPool.generateJobAndQueueIt( <functionToExecute>,
                                  args = ( arg1, arg2, ... ),
                                  oCallback = <resultCallbackFunction> )

or:

request = ThreadedJob( <functionToExecute>,
                       args = ( arg1, arg2, ... ),
                       oCallback = <resultCallbackFunction> )
threadPool.queueJob( request )

The result callback and the arguments are optional. Once requests have been added to the pool, they will be executed as soon as possible. Worker threads automatically hand the return values of the requests back to the pool. To run the result callback functions, execute:

threadPool.processResults()

This method processes the return values of the requests that have finished so far. Even if the requests do not return anything, this method (or another result-processing method) has to be called to clear the result queues.
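
A minimal standard-library sketch of why results have to be processed explicitly: workers park each return value in a result queue, and callbacks only run when the owner drains that queue. The helper names (`run_job`, `process_results`) are hypothetical and not part of the DIRAC API.

```python
import queue
import threading

results = queue.Queue()
seen = []

def job(x):
    return x * 2

def run_job(x, callback):
    # Worker side: compute, then park the result until someone processes it.
    results.put((callback, job(x)))

def process_results():
    """Drain finished results without blocking; return how many were handled."""
    handled = 0
    while True:
        try:
            callback, value = results.get(block=False)
        except queue.Empty:
            return handled
        if callback is not None:
            callback(value)       # the callback runs in the caller's thread
        handled += 1

threads = [threading.Thread(target=run_job, args=(n, seen.append)) for n in (1, 2, 3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

handled = process_results()
```

Until `process_results()` is called, the three results simply sit in the queue, which is why the queue grows if nobody drains it.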

To wait until all the requests are finished and process their results, call:

threadPool.processAllResults()

This function will block until all requests are finished and their result values have been processed.
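
The "block until everything is finished, then process" behaviour can be approximated with `queue.Queue.join()` and `task_done()`. This is a sketch under that assumption, with hypothetical names, and not the way DIRAC necessarily implements it.

```python
import queue
import threading

pending = queue.Queue()
results = queue.Queue()

def worker():
    while True:
        func, arg = pending.get()
        results.put(func(arg))
        pending.task_done()        # lets pending.join() know this job is finished

# A daemon worker, so the sketch does not keep the interpreter alive.
threading.Thread(target=worker, daemon=True).start()

for n in range(4):
    pending.put((abs, -n))

pending.join()                     # blocks until every queued job has called task_done()

collected = []
while not results.empty():
    collected.append(results.get())
```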

It is also possible to put the ThreadPool in automatic result-processing mode, in which results are processed as soon as the requests have finished. To enable this mode, call:

threadPool.daemonize()

class DIRAC.Core.Utilities.ThreadPool.ThreadPool(iMinThreads, iMaxThreads=0, iMaxQueuedRequests=0, strictLimits=True)

Bases: threading.Thread

__init__(iMinThreads, iMaxThreads=0, iMaxQueuedRequests=0, strictLimits=True)

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

daemon

A boolean value indicating whether this thread is a daemon thread (True) or not (False).

This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.

The entire Python program exits when no alive non-daemon threads are left.
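
The rules for the daemon flag can be checked with a plain `threading.Thread`; the sketch below uses only the standard library, and the names `noop`, `inherited` and `error` are illustrative.

```python
import threading

def noop():
    pass

t = threading.Thread(target=noop)
inherited = t.daemon          # same value as the creating thread's daemon flag

t.daemon = True               # allowed: the thread has not been started yet
t.start()
t.join()

try:
    t.daemon = False          # too late: start() has already been called
    error = None
except RuntimeError as exc:
    error = exc
```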

daemonize()
generateJobAndQueueIt(oCallable, args=None, kwargs=None, sTJId=None, oCallback=None, oExceptionCallback=None, blocking=True)
getMaxThreads()
getMinThreads()
getName()
ident

Thread identifier of this thread or None if it has not been started.

This is a nonzero integer. See the thread.get_ident() function. Thread identifiers may be recycled when a thread exits and another thread is created. The identifier is available even after the thread has exited.
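
A short standard-library illustration of the identifier's lifecycle. In Python 3 the function referred to above is available as `threading.get_ident()`; the names `ids`, `before` and `after` are illustrative.

```python
import threading

ids = []
t = threading.Thread(target=lambda: ids.append(threading.get_ident()))

before = t.ident              # None: the thread has not been started
t.start()
t.join()
after = t.ident               # still available after the thread has exited
```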

isAlive()

Return whether the thread is alive.

This method returns True just before the run() method starts until just after the run() method terminates. The module function enumerate() returns a list of all alive threads.
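
The liveness window can be observed with a thread that waits on an event. Note that `isAlive()` was removed in Python 3.9, so this standard-library sketch uses `is_alive()`; the variable names are illustrative.

```python
import threading

gate = threading.Event()
t = threading.Thread(target=gate.wait)

before = t.is_alive()                              # not started yet
t.start()
during = t.is_alive() and t in threading.enumerate()
gate.set()                                         # let run() finish
t.join()
after = t.is_alive()                               # run() has terminated
```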

isDaemon()
isFull()
isWorking()
is_alive()

Return whether the thread is alive.

This method returns True just before the run() method starts until just after the run() method terminates. The module function enumerate() returns a list of all alive threads.

join(timeout=None)

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call isAlive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.
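
A standard-library sketch of the timeout check described above, using `is_alive()` (the Python 3 spelling of `isAlive()`). The names `release`, `timed_out` and `unstarted_error` are illustrative.

```python
import threading

release = threading.Event()
t = threading.Thread(target=release.wait)
t.start()

result = t.join(timeout=0.05)   # returns None whether or not the thread finished
timed_out = t.is_alive()        # still alive, so the join() call timed out

release.set()                   # let the thread finish
t.join()                        # no timeout: blocks until termination
finished = not t.is_alive()

# Joining a thread that was never started is an error.
try:
    threading.Thread(target=release.wait).join()
    unstarted_error = None
except RuntimeError as exc:
    unstarted_error = exc
```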

name

A string used for identification purposes only.

It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.

numWaitingThreads()
numWorkingThreads()
pendingJobs()
processAllResults()
processResults()
queueJob(oTJob, blocking=True)
run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
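
Both ways of supplying the thread's activity, sketched with the standard library. `Doubler` is a hypothetical subclass used only for illustration.

```python
import threading

class Doubler(threading.Thread):
    """Overrides run() instead of passing a target."""

    def __init__(self, value):
        super().__init__()        # invoke the base class constructor first
        self.value = value
        self.result = None

    def run(self):
        self.result = self.value * 2

d = Doubler(21)
d.start()
d.join()

# The standard run() simply calls target(*args, **kwargs).
out = []
t = threading.Thread(target=out.append, args=("called",))
t.start()
t.join()
```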

setDaemon(daemonic)
setName(name)
start()

Start the thread’s activity.

It must be called at most once per thread object. It arranges for the object’s run() method to be invoked in a separate thread of control.

This method will raise a RuntimeError if called more than once on the same thread object.
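
A quick standard-library check of the "at most once" rule; the name `second_start` is illustrative.

```python
import threading

t = threading.Thread(target=lambda: None)
t.start()
t.join()

try:
    t.start()                     # a finished thread cannot be restarted either
    second_start = "ok"
except RuntimeError:
    second_start = "RuntimeError"
```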

class DIRAC.Core.Utilities.ThreadPool.ThreadedJob(oCallable, args=None, kwargs=None, sTJId=None, oCallback=None, oExceptionCallback=None)
__init__(oCallable, args=None, kwargs=None, sTJId=None, oCallback=None, oExceptionCallback=None)
doCallback()
doExceptionCallback()
exceptionRaised()
hasCallback()
jobId()
process()

class DIRAC.Core.Utilities.ThreadPool.WorkingThread(oPendingQueue, oResultsQueue, **kwargs)

Bases: threading.Thread

__init__(oPendingQueue, oResultsQueue, **kwargs)

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

daemon

A boolean value indicating whether this thread is a daemon thread (True) or not (False).

This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.

The entire Python program exits when no alive non-daemon threads are left.

getName()
ident

Thread identifier of this thread or None if it has not been started.

This is a nonzero integer. See the thread.get_ident() function. Thread identifiers may be recycled when a thread exits and another thread is created. The identifier is available even after the thread has exited.

isAlive()

Return whether the thread is alive.

This method returns True just before the run() method starts until just after the run() method terminates. The module function enumerate() returns a list of all alive threads.

isDaemon()
isWorking()
is_alive()

Return whether the thread is alive.

This method returns True just before the run() method starts until just after the run() method terminates. The module function enumerate() returns a list of all alive threads.

join(timeout=None)

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call isAlive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.

kill()
name

A string used for identification purposes only.

It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.

run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

setDaemon(daemonic)
setName(name)
start()

Start the thread’s activity.

It must be called at most once per thread object. It arranges for the object’s run() method to be invoked in a separate thread of control.

This method will raise a RuntimeError if called more than once on the same thread object.

DIRAC.Core.Utilities.ThreadPool.getGlobalThreadPool()