fix: removed obsolete commands and utilities
fstagni committed Oct 11, 2023
1 parent 60d00f8 commit 9d357f3
Showing 5 changed files with 3 additions and 276 deletions.
2 changes: 0 additions & 2 deletions setup.cfg
@@ -216,8 +216,6 @@ console_scripts =
    dirac-framework-self-ping = DIRAC.Interfaces.scripts.dirac_framework_self_ping:main [server]
    dirac-utils-file-adler = DIRAC.Interfaces.scripts.dirac_utils_file_adler:main
    dirac-utils-file-md5 = DIRAC.Interfaces.scripts.dirac_utils_file_md5:main
-    dirac-wms-get-normalized-queue-length = DIRAC.Interfaces.scripts.dirac_wms_get_normalized_queue_length:main [admin]
-    dirac-wms-get-queue-normalization = DIRAC.Interfaces.scripts.dirac_wms_get_queue_normalization:main [pilot]
    dirac-wms-job-attributes = DIRAC.Interfaces.scripts.dirac_wms_job_attributes:main
    dirac-wms-job-delete = DIRAC.Interfaces.scripts.dirac_wms_job_delete:main
    dirac-wms-job-get-input = DIRAC.Interfaces.scripts.dirac_wms_job_get_input:main
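
Note on the setup.cfg change: each line under console_scripts is a setuptools entry point mapping a command name to a Python callable, so deleting these two lines is what removes the dirac-wms-get-normalized-queue-length and dirac-wms-get-queue-normalization commands from an installation; the bracketed suffixes ([admin], [pilot]) declare extras the entry point depends on, not part of the command name. A minimal sketch of the mechanism, with a hypothetical my-tool name and module path that are not from this repository:

    # setup.cfg
    # [options.entry_points]
    # console_scripts =
    #     my-tool = mypackage.scripts.my_tool:main

    # mypackage/scripts/my_tool.py
    def main():
        # setuptools generates a "my-tool" wrapper on PATH that imports
        # this module and calls main()
        print("my-tool invoked")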

42 changes: 0 additions & 42 deletions src/DIRAC/Interfaces/scripts/dirac_wms_get_normalized_queue_length.py

This file was deleted.

40 changes: 0 additions & 40 deletions src/DIRAC/Interfaces/scripts/dirac_wms_get_queue_normalization.py

This file was deleted.
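
Both deleted scripts were thin command-line wrappers around the client functions removed from CPUNormalization.py below. A sketch of the typical shape of such a wrapper, assuming DIRAC's usual @Script() pattern; this is a reconstruction for illustration, not the verbatim deleted file:

    #!/usr/bin/env python
    """Report the normalization factor applied by a site to a given CE queue."""
    from DIRAC import gLogger, exit as DIRACExit
    from DIRAC.Core.Base.Script import Script


    @Script()
    def main():
        Script.parseCommandLine(ignoreErrors=True)
        args = Script.getPositionalArgs()
        if len(args) != 1:
            gLogger.error("Usage: dirac-wms-get-queue-normalization <CEUniqueID>")
            DIRACExit(1)

        # getQueueNormalization is one of the client functions deleted in this commit
        from DIRAC.WorkloadManagementSystem.Client.CPUNormalization import getQueueNormalization

        result = getQueueNormalization(args[0])
        if not result["OK"]:
            gLogger.error(result["Message"])
            DIRACExit(2)
        gLogger.notice(result["Value"])


    if __name__ == "__main__":
        main()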

153 changes: 1 addition & 152 deletions src/DIRAC/WorkloadManagementSystem/Client/CPUNormalization.py
@@ -6,163 +6,12 @@
""" DIRAC Workload Management System Client module that encapsulates all the
methods necessary to handle CPU normalization
"""
-import os
-from urllib.request import urlopen
-from db12 import single_dirac_benchmark

import DIRAC
-from DIRAC import gConfig, gLogger, S_OK, S_ERROR
+from DIRAC import S_ERROR, S_OK, gConfig, gLogger
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getCESiteMapping
from DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft import TimeLeft

-# TODO: This should come from some place in the configuration
-NORMALIZATIONCONSTANT = 60.0 / 250.0  # from minutes to seconds and from SI00 to HS06 (i.e. min * SI00 -> sec * HS06)

-UNITS = {"HS06": 1.0, "SI00": 1.0 / 250.0}

-# TODO: This is still fetching directly from MJF rather than going through
-# the MJF module and the values it saves in the local DIRAC configuration


-def __getFeatures(envVariable, items):
-    """Extract features"""
-    features = {}
-    featuresDir = os.environ.get(envVariable)
-    if featuresDir is None:
-        return features
-    for item in items:
-        fname = os.path.join(featuresDir, item)
-        try:
-            # Only keep features that do exist
-            features[item] = urlopen(fname).read()
-        except OSError:
-            pass
-    return features


-def getMachineFeatures():
-    """This uses the _old_ MJF information"""
-    return __getFeatures("MACHINEFEATURES", ("hs06", "jobslots", "log_cores", "phys_cores"))


-# TODO: log_cores and phys_cores are deprecated, come from the old MJF specification, and are not collected
-# by the MJF module!


-def getJobFeatures():
-    """This uses the _new_ MJF information"""
-    return __getFeatures("JOBFEATURES", ("hs06_job", "allocated_cpu"))


-def getPowerFromMJF():
-    """Extracts the machine power from either JOBFEATURES or MACHINEFEATURES"""
-    try:
-        features = getJobFeatures()
-        hs06Job = features.get("hs06_job")
-        # If the information is there and non-zero, return it; otherwise fall back to machine features
-        if hs06Job:
-            return round(float(hs06Job), 2)
-        features = getMachineFeatures()
-        totalPower = float(features.get("hs06", 0))
-        logCores = float(features.get("log_cores", 0))
-        physCores = float(features.get("phys_cores", 0))
-        jobSlots = float(features.get("jobslots", 0))
-        denom = min(max(logCores, physCores), jobSlots) if (logCores or physCores) and jobSlots else None
-        if totalPower and denom:
-            return round(totalPower / denom, 2)
-        return None
-    except ValueError as e:
-        gLogger.exception("Exception getting MJF information", lException=e)
-        return None


-def queueNormalizedCPU(ceUniqueID):
-    """Report Normalized CPU length of queue"""
-    result = getQueueInfo(ceUniqueID)
-    if not result["OK"]:
-        return result

-    ceInfoDict = result["Value"]
-    siteCSSEction = ceInfoDict["SiteCSSEction"]
-    queueCSSection = ceInfoDict["QueueCSSection"]

-    benchmarkSI00 = __getQueueNormalization(queueCSSection, siteCSSEction)
-    maxCPUTime = __getMaxCPUTime(queueCSSection)

-    if maxCPUTime and benchmarkSI00:
-        normCPUTime = NORMALIZATIONCONSTANT * maxCPUTime * benchmarkSI00
-    else:
-        if not benchmarkSI00:
-            subClusterUniqueID = ceInfoDict["SubClusterUniqueID"]
-            return S_ERROR(f"benchmarkSI00 info not available for {subClusterUniqueID}")
-        if not maxCPUTime:
-            return S_ERROR("maxCPUTime info not available")

-    return S_OK(normCPUTime)


-def getQueueNormalization(ceUniqueID):
-    """Report Normalization Factor applied by Site to the given Queue"""
-    result = getQueueInfo(ceUniqueID)
-    if not result["OK"]:
-        return result

-    ceInfoDict = result["Value"]
-    siteCSSEction = ceInfoDict["SiteCSSEction"]
-    queueCSSection = ceInfoDict["QueueCSSection"]
-    subClusterUniqueID = ceInfoDict["SubClusterUniqueID"]

-    benchmarkSI00 = __getQueueNormalization(queueCSSection, siteCSSEction)

-    if benchmarkSI00:
-        return S_OK(benchmarkSI00)
-    return S_ERROR(f"benchmarkSI00 info not available for {subClusterUniqueID}")
-    # errorList.append( ( subClusterUniqueID , 'benchmarkSI00 info not available' ) )
-    # exitCode = 3


-def __getQueueNormalization(queueCSSection, siteCSSEction):
-    """Query the CS and return the Normalization"""
-    benchmarkSI00Option = f"{queueCSSection}/SI00"
-    benchmarkSI00 = gConfig.getValue(benchmarkSI00Option, 0.0)
-    if not benchmarkSI00:
-        benchmarkSI00Option = f"{siteCSSEction}/SI00"
-        benchmarkSI00 = gConfig.getValue(benchmarkSI00Option, 0.0)

-    return benchmarkSI00


-def __getMaxCPUTime(queueCSSection):
-    """Query the CS and return the maxCPUTime"""
-    maxCPUTimeOption = f"{queueCSSection}/maxCPUTime"
-    maxCPUTime = gConfig.getValue(maxCPUTimeOption, 0.0)
-    # For some sites there are crazy values in the CS
-    maxCPUTime = max(maxCPUTime, 0)
-    maxCPUTime = min(maxCPUTime, 86400 * 12.5)

-    return maxCPUTime


-def getCPUNormalization(reference="HS06", iterations=1):
-    """Get Normalized Power of the current CPU in [reference] units"""
-    if reference not in UNITS:
-        return S_ERROR(f"Unknown Normalization unit {str(reference)}")
-    try:
-        iterations = max(min(int(iterations), 10), 1)
-    except (TypeError, ValueError) as x:
-        return S_ERROR(x)

-    from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations

-    corr = Operations().getValue("JobScheduling/CPUNormalizationCorrection", 1.0)

-    result = single_dirac_benchmark(iterations)

-    if result is None:
-        return S_ERROR("Cannot get benchmark measurements")

-    return S_OK({"CPU": result["CPU"], "WALL": result["WALL"], "NORM": result["NORM"] / corr, "UNIT": reference})


def getCPUTime(cpuNormalizationFactor):
    """Trying to get CPUTime left for execution (in seconds).
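
For the record, the removed queueNormalizedCPU combined a queue's maxCPUTime and SI00 benchmark from the CS into an HS06-based estimate via NORMALIZATIONCONSTANT, and getPowerFromMJF fell back from job-level to machine-level MJF values. A standalone sketch of that arithmetic, using illustrative numbers rather than any real site's configuration:

    NORMALIZATIONCONSTANT = 60.0 / 250.0  # minutes -> seconds, SI00 -> HS06


    def queue_normalized_cpu(max_cpu_time, benchmark_si00):
        """Mirror of the deleted queueNormalizedCPU arithmetic."""
        return NORMALIZATIONCONSTANT * max_cpu_time * benchmark_si00


    def power_from_machine_features(hs06, log_cores, phys_cores, jobslots):
        """Mirror of the deleted getPowerFromMJF fallback: total machine HS06
        divided by min(max(logical cores, physical cores), job slots)."""
        denom = min(max(log_cores, phys_cores), jobslots) if (log_cores or phys_cores) and jobslots else None
        return round(hs06 / denom, 2) if hs06 and denom else None


    # A queue advertising maxCPUTime = 2880 and SI00 = 2000:
    # 0.24 * 2880 * 2000 = 1382400.0
    print(queue_normalized_cpu(2880, 2000))
    # A 16-slot node advertising hs06 = 160 with 32 logical / 16 physical cores:
    # 160 / min(max(32, 16), 16) = 160 / 16 = 10.0 HS06 per slot
    print(power_from_machine_features(160, 32, 16, 16))
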
42 changes: 2 additions & 40 deletions src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py
@@ -1,10 +1,10 @@
""" DIRAC Workload Management System utility module to get available memory and processors from mjf
"""
-import os
import multiprocessing
+import os
from urllib.request import urlopen

-from DIRAC import gLogger, gConfig
+from DIRAC import gConfig, gLogger
from DIRAC.Core.Utilities.List import fromChar


@@ -137,44 +137,6 @@ def getNumberOfProcessors(siteName=None, gridCE=None, queue=None):
    return 1


-def getNumberOfPayloadProcessors(siteName=None, gridCE=None, queue=None):
-    """Gets the number of processors allowed for a single JobAgent (i.e. for an "inner" CE).
-    (NB: this does not refer to the job processors.)
-    This is normally used ONLY when a pilot instantiates more than one JobAgent (MultiLaunchAgent pilot command).
-    The siteName/gridCE/queue parameters are normally not necessary.
-    Tries to find it in this order:
-    1) from /Resources/Computing/CEDefaults/NumberOfPayloadProcessors (which is what pilot 3 fills in)
-    2) if not present, but there is a WholeNode tag, use the getNumberOfProcessors function above
-    3) otherwise return 1
-    """

-    # 1) from /Resources/Computing/CEDefaults/NumberOfPayloadProcessors
-    gLogger.info("Getting NumberOfPayloadProcessors from /Resources/Computing/CEDefaults/NumberOfPayloadProcessors")
-    NumberOfPayloadProcessors = gConfig.getValue("/Resources/Computing/CEDefaults/NumberOfPayloadProcessors")
-    if NumberOfPayloadProcessors:
-        return NumberOfPayloadProcessors

-    # 2) Check if 'WholeNode' is one of the used tags
-    # Tags of the CE
-    tags = fromChar(
-        gConfig.getValue(f"/Resources/Sites/{siteName.split('.')[0]}/{siteName}/CEs/{gridCE}/Tag", "")
-    ) + fromChar(gConfig.getValue(f"/Resources/Sites/{siteName.split('.')[0]}/{siteName}/Cloud/{gridCE}/Tag", ""))
-    # Tags of the Queue
-    tags += fromChar(
-        gConfig.getValue(f"/Resources/Sites/{siteName.split('.')[0]}/{siteName}/CEs/{gridCE}/Queues/{queue}/Tag", "")
-    ) + fromChar(
-        gConfig.getValue(f"/Resources/Sites/{siteName.split('.')[0]}/{siteName}/Cloud/{gridCE}/VMTypes/{queue}/Tag", "")
-    )

-    if "WholeNode" in tags:
-        return getNumberOfProcessors()

-    # 3) Just return a conservative "1"
-    return 1


def getNumberOfJobProcessors(jobID):
    """Gets the number of processors allowed for the job.
    This can be used to communicate to your job payload the number of processors it's allowed to use,
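
The deleted getNumberOfPayloadProcessors resolved its answer in a fixed order: an explicit CS value published by the pilot, then a WholeNode tag on the CE or queue, then a conservative 1. A condensed sketch of that decision chain; the function name and the wholeNodeCount parameter (standing in for getNumberOfProcessors) are this note's own, not DIRAC API:

    from DIRAC import gConfig
    from DIRAC.Core.Utilities.List import fromChar


    def payload_processors(siteName, gridCE, queue, wholeNodeCount=lambda: 1):
        # 1) explicit value filled in by the pilot
        explicit = gConfig.getValue("/Resources/Computing/CEDefaults/NumberOfPayloadProcessors")
        if explicit:
            return explicit

        # 2) a WholeNode tag on the CE or queue means "use the whole machine"
        domain = siteName.split(".")[0]
        paths = (
            f"/Resources/Sites/{domain}/{siteName}/CEs/{gridCE}/Tag",
            f"/Resources/Sites/{domain}/{siteName}/Cloud/{gridCE}/Tag",
            f"/Resources/Sites/{domain}/{siteName}/CEs/{gridCE}/Queues/{queue}/Tag",
            f"/Resources/Sites/{domain}/{siteName}/Cloud/{gridCE}/VMTypes/{queue}/Tag",
        )
        tags = [tag for path in paths for tag in fromChar(gConfig.getValue(path, ""))]
        if "WholeNode" in tags:
            return wholeNodeCount()

        # 3) conservative default
        return 1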
