TaskQueueDB

The TaskQueueDB class is a front end to the task queues database.

class DIRAC.WorkloadManagementSystem.DB.TaskQueueDB.TaskQueueDB(parentLogger=None)

Bases: DB

MySQL DB of “Task Queues”

__init__(parentLogger=None)

Constructor.

Parameters:

self – self reference

buildCondition(condDict=None, older=None, newer=None, timeStamp=None, orderAttribute=None, limit=False, greater=None, smaller=None, offset=None, useLikeQuery=False)

Build an SQL condition statement from the provided condDict and other extra checks on a specified time stamp. The conditions dictionary specifies, for each attribute, one value or a list of possible values. greater and smaller are dictionaries in which the keys are the names of the fields that are required to be >= or < the corresponding value. For compatibility with current usage, it uses exceptions to exit in case of invalid arguments. To perform LIKE queries, use the parameter useLikeQuery=True.
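The description above can be illustrated with a minimal standalone sketch. It is not the DIRAC implementation: the function name, the field names, and the use of `%s` placeholders (instead of escaping values into the string) are assumptions made for illustration, and it assumes greater maps to >= and smaller to <, as the order in the description suggests.

```python
def build_condition_sketch(cond_dict=None, greater=None, smaller=None,
                           use_like_query=False):
    """Hypothetical helper: assemble a WHERE clause and its parameter list."""
    for name, arg in (("cond_dict", cond_dict), ("greater", greater),
                      ("smaller", smaller)):
        # Invalid arguments exit through an exception, as the text describes.
        if arg is not None and not isinstance(arg, dict):
            raise TypeError(f"{name} must be a dict or None")

    clauses, params = [], []
    for field, value in (cond_dict or {}).items():
        if isinstance(value, (list, tuple)):
            if not value:
                raise ValueError(f"empty value list for {field}")
            # A list of possible values becomes an IN (...) test.
            placeholders = ", ".join(["%s"] * len(value))
            clauses.append(f"`{field}` IN ({placeholders})")
            params.extend(value)
        elif use_like_query and isinstance(value, str) and "%" in value:
            # Pattern values are only treated as patterns when requested.
            clauses.append(f"`{field}` LIKE %s")
            params.append(value)
        else:
            clauses.append(f"`{field}` = %s")
            params.append(value)
    for field, value in (greater or {}).items():
        clauses.append(f"`{field}` >= %s")
        params.append(value)
    for field, value in (smaller or {}).items():
        clauses.append(f"`{field}` < %s")
        params.append(value)

    if not clauses:
        return "", []
    return "WHERE " + " AND ".join(clauses), params


# Field names below are illustrative placeholders, not a documented schema.
sql, params = build_condition_sketch(
    {"Status": ["Waiting", "Matched"], "Owner": "al%"},
    greater={"Priority": 5},
    use_like_query=True,
)
```

With these inputs, `sql` combines an IN test, a LIKE test and a >= test joined by AND, and `params` carries the values in the same order.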

cleanOrphanedTaskQueues(connObj=False)

Delete all empty task queues

countEntries(table, condDict, older=None, newer=None, timeStamp=None, connection=False, greater=None, smaller=None)

Count the number of entries with the given conditions

deleteEntries(tableName, condDict=None, limit=False, conn=None, older=None, newer=None, timeStamp=None, orderAttribute=None, greater=None, smaller=None)

Delete rows from “tableName” that match the condition; N records can match. If limit is not False, the given limit is set. String type values will be appropriately escaped; they can be single values or lists of values.

deleteJob(jobId, connObj=False)

Delete a job from the task queues. Returns S_OK( True/False ) / S_ERROR.
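Several methods in this class return S_OK / S_ERROR structures. The sketch below shows one way a caller might inspect such a result. The `s_ok` and `s_error` helpers are local stand-ins that mimic DIRAC's result dictionaries ("OK" plus "Value" on success, "OK" plus "Message" on failure) so the example runs without DIRAC installed; the meaning attached to a False value is an assumption.

```python
def s_ok(value=None):
    """Stand-in for a successful result: OK flag plus Value."""
    return {"OK": True, "Value": value}


def s_error(message=""):
    """Stand-in for a failed result: OK flag plus Message."""
    return {"OK": False, "Message": message}


def describe_delete_result(result):
    """Turn a deleteJob-style result into a short status string."""
    if not result["OK"]:
        # Failure: the error text travels in "Message".
        return f"failed: {result['Message']}"
    # Success: "Value" carries the True/False flag from S_OK( True/False ).
    # Reading False as "nothing was deleted" is an assumption.
    return "deleted" if result["Value"] else "nothing deleted"


statuses = [
    describe_delete_result(s_ok(True)),
    describe_delete_result(s_ok(False)),
    describe_delete_result(s_error("DB unreachable")),
]
```

Checking the "OK" key before touching "Value" keeps the error path explicit, which matters because a failed result has no "Value" entry.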

deleteTaskQueueIfEmpty(tqId, tqOwner=False, tqOwnerGroup=False, connObj=False)

Try to delete a task queue if it is empty

enableAllTaskQueues()

Enable all Task queues

executeStoredProcedure(packageName, parameters, outputIds, *, conn=None)

executeStoredProcedureWithCursor(packageName, parameters, *, conn=None)

findOrphanJobs()

Find jobs that are not in any task queue

fitCPUTimeToSegments(cpuTime)

Fit the CPU time to the valid segments
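As a rough illustration of what fitting a value to segments can mean, the sketch below rounds a CPU time up to the smallest segment that can hold it and clamps to the largest one. The segment values, the round-up rule, and the function name are assumptions for illustration; the actual segments and behavior are whatever the DIRAC code and configuration define.

```python
from bisect import bisect_left

# Illustrative segment boundaries in seconds (assumed, not taken from the source).
ASSUMED_SEGMENTS = [360, 1800, 3600, 21600, 43200, 86400, 172800]


def fit_to_segments(cpu_time, segments=ASSUMED_SEGMENTS):
    """Return the smallest segment >= cpu_time, or the largest segment."""
    # bisect_left finds the first index whose segment is >= cpu_time.
    idx = bisect_left(segments, cpu_time)
    # Values beyond the last segment are clamped to it.
    return segments[min(idx, len(segments) - 1)]


fitted = [fit_to_segments(t) for t in (0, 1800, 2000, 10**6)]
```

Mapping arbitrary CPU times onto a small set of segments keeps the number of distinct queue definitions bounded.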

getCSOption(optionName, defaultValue=None)

getCounters(table, attrList, condDict, older=None, newer=None, timeStamp=None, connection=False, greater=None, smaller=None)

Count the number of records for each distinct combination of the attributes in attrList, selected with the conditions defined by condDict and the time stamps
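To make "each distinct combination" concrete, here is a small in-memory sketch using collections.Counter. It only illustrates the counting idea on plain dictionaries; the row contents, attribute names, and helper name are hypothetical, and the return format of getCounters itself is not shown here.

```python
from collections import Counter


def count_by_attributes(rows, attr_list):
    """Count rows per distinct combination of the listed attributes."""
    return Counter(tuple(row[attr] for attr in attr_list) for row in rows)


# Hypothetical rows standing in for table records.
rows = [
    {"Owner": "alice", "OwnerGroup": "prod"},
    {"Owner": "alice", "OwnerGroup": "prod"},
    {"Owner": "bob", "OwnerGroup": "user"},
]
counts = count_by_attributes(rows, ["Owner", "OwnerGroup"])
```

In SQL terms this corresponds to a COUNT(*) grouped by the listed attributes.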

getDistinctAttributeValues(table, attribute, condDict=None, older=None, newer=None, timeStamp=None, connection=False, greater=None, smaller=None)

Get distinct values of a table attribute under specified conditions

getFields(tableName, outFields=None, condDict=None, limit=False, conn=None, older=None, newer=None, timeStamp=None, orderAttribute=None, greater=None, smaller=None, useLikeQuery=False)

Select “outFields” from “tableName” for the rows matching condDict; N records can match the condition. Returns S_OK(tuple(Field, Value)). If outFields is None, all fields in “tableName” are returned. If limit is not False, the given limit is set. inValues are properly escaped using the _escape_string method; they can be single values or lists of values. If useLikeQuery=True, condDict values containing “%” are matched as patterns, so rows matching the pattern are returned.

static getGroupShares()

Get all the shares as a dict

getGroupsInTQs()

getMatchingTaskQueues(tqMatchDict, negativeCond=False)

Get the info of the task queues that match a resource

getTaskQueueForJob(jobId, connObj=False)

Return the task queue for a given job. Returns S_OK( [TaskQueueID] ) / S_ERROR.

insertFields(tableName, inFields=None, inValues=None, conn=None, inDict=None)

Insert a new row in “tableName” assigning the values “inValues” to the fields “inFields”. String type values will be appropriately escaped.

insertJob(jobId, tqDefDict, jobPriority, skipTQDefCheck=False)

Insert a job in a task queue (creating one if it doesn’t exist)

Parameters:
  • jobId (int) – job ID

  • tqDefDict (dict) – dict for TQ definition

  • jobPriority (int) – integer that defines the job priority

Returns:

S_OK() / S_ERROR
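The "create the queue if it doesn't exist" behavior can be pictured with a toy in-memory model. This is a sketch of the find-or-create pattern only, not how TaskQueueDB stores data: the class, its attributes, and the definition keys used below are hypothetical, and the real method works on MySQL tables and returns S_OK / S_ERROR.

```python
import json


class ToyTaskQueues:
    """Hypothetical in-memory stand-in for a set of task queues."""

    def __init__(self):
        self._tq_ids = {}  # canonical definition string -> queue id
        self._jobs = {}    # queue id -> {job id: priority}

    def insert_job(self, job_id, tq_def, job_priority):
        # Serialize with sorted keys so equal definitions map to one queue.
        key = json.dumps(tq_def, sort_keys=True)
        tq_id = self._tq_ids.get(key)
        if tq_id is None:
            # No queue matches this definition yet: create one.
            tq_id = len(self._tq_ids) + 1
            self._tq_ids[key] = tq_id
            self._jobs[tq_id] = {}
        self._jobs[tq_id][job_id] = job_priority
        return tq_id

    def jobs_in(self, tq_id):
        return dict(self._jobs[tq_id])


tqs = ToyTaskQueues()
# Definition keys are illustrative placeholders.
first = tqs.insert_job(101, {"Owner": "alice", "CPUTime": 3600}, 5)
second = tqs.insert_job(102, {"CPUTime": 3600, "Owner": "alice"}, 1)
third = tqs.insert_job(103, {"Owner": "bob", "CPUTime": 3600}, 5)
```

Jobs 101 and 102 share a queue because their definitions are equal regardless of key order, while job 103 triggers the creation of a second queue.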

isSharesCorrectionEnabled()

matchAndGetJob(tqMatchDict, numJobsPerTry=50, numQueuesPerTry=10, negativeCond=None)

Match a job based on requirements

Parameters:

tqMatchDict (dict) – dict of requirements to match

Returns:

S_OK() / S_ERROR

matchAndGetTaskQueue(tqMatchDict, numQueuesToGet=1, skipMatchDictDef=False, negativeCond=None, connObj=False)

Get a queue that matches the requirements

recalculateTQSharesForAll()

Recalculate all priorities for TQs

recalculateTQSharesForEntity(user, userGroup, connObj=False)

Recalculate the shares for a user/userGroup combo

retrieveTaskQueues(tqIdList=None)

Get all the task queues

transactionCommit()

transactionRollback()

transactionStart()

updateFields(tableName, updateFields=None, updateValues=None, condDict=None, limit=False, conn=None, updateDict=None, older=None, newer=None, timeStamp=None, orderAttribute=None, greater=None, smaller=None)

Update “updateFields” in “tableName” with “updateValues”. updateDict is an alternative way to provide updateFields and updateValues. N records can match the condition. Returns S_OK( number of updated rows ). If limit is not False, the given limit is set. String type values will be appropriately escaped.

DIRAC.WorkloadManagementSystem.DB.TaskQueueDB.calculate_priority(tq_dict: dict[int, float], all_tqs_data: dict[int, dict[str, Any]], share: float, allow_bg_tqs: bool) → dict[float, list[int]]

Calculate the priority for each TQ given a share

Parameters:
  • tq_dict – dict of {tq_id: prio}

  • all_tqs_data – dict of {tq_id: {tq_data}}, where tq_data is a dict of {field: value}

  • share – share to be distributed among TQs

  • allow_bg_tqs – allow background TQs to be used

Returns:

dict of {priority: [tq_ids]}
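The input and output shapes documented above can be illustrated with a simplified sketch. It does not reproduce calculate_priority: it assumes a plain proportional split of the share according to each queue's priority, and it leaves out all_tqs_data and allow_bg_tqs entirely. The function name is hypothetical.

```python
from collections import defaultdict


def distribute_share(tq_prios: dict[int, float], share: float) -> dict[float, list[int]]:
    """Split `share` across queues in proportion to their priority.

    Returns {resulting priority: [tq ids]}, matching the documented shape.
    """
    total = sum(tq_prios.values())
    if total <= 0:
        # Nothing sensible to distribute against.
        return {}
    grouped = defaultdict(list)
    for tq_id, prio in tq_prios.items():
        # Queues that end up with the same value share one dictionary entry.
        grouped[share * prio / total].append(tq_id)
    return dict(grouped)


result = distribute_share({1: 1.0, 2: 1.0, 3: 2.0}, share=8.0)
```

Grouping by the resulting priority means queues with identical values are listed together under one key, which is why the return type maps a float to a list of queue IDs.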