Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve runTheMatrix interactive shell #42923

Merged
merged 3 commits into from
Oct 9, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 134 additions & 39 deletions Configuration/PyReleaseValidation/scripts/runTheMatrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,31 +135,31 @@ def runSelected(opt):
dest='memPerCore',
type=int,
default=1500)

parser.add_argument('-j','--nproc',
help='number of processes. 0 Will use 4 processes, not execute anything but create the wfs',
dest='nProcs',
type=int,
default=4)

parser.add_argument('-t','--nThreads',
help='number of threads per process to use in cmsRun.',
dest='nThreads',
type=int,
default=1)

parser.add_argument('--nStreams',
help='number of streams to use in cmsRun.',
dest='nStreams',
type=int,
default=0)

parser.add_argument('--nEvents',
help='number of events to process in cmsRun. If 0 will use the standard 10 events.',
dest='nEvents',
type=int,
default=0)

parser.add_argument('--numberEventsInLuminosityBlock',
help='number of events in a luminosity block',
dest='numberEventsInLuminosityBlock',
Expand All @@ -171,19 +171,19 @@ def runSelected(opt):
dest='show',
default=False,
action='store_true')

parser.add_argument('-e','--extended',
help='Show details of workflows, used with --show',
dest='extended',
default=False,
action='store_true')

parser.add_argument('-s','--selected',
help='Run a pre-defined selected matrix of wf. Deprecated, please use -l limited',
dest='restricted',
default=False,
action='store_true')

parser.add_argument('-l','--list',
help='Comma separated list of workflow to be shown or ran. Possible keys are also '+str(predefinedSet.keys())+'. and wild card like muon, or mc',
dest='testList',
Expand All @@ -197,105 +197,105 @@ def runSelected(opt):
parser.add_argument('-r','--raw',
help='Temporary dump the .txt needed for prodAgent interface. To be discontinued soon. Argument must be the name of the set (standard, pileup,...)',
dest='raw')

parser.add_argument('-i','--useInput',
help='Use recyling where available. Either all, or a comma separated list of wf number.',
dest='useInput',
type=lambda x: x.split(','),
default=None)

parser.add_argument('-w','--what',
help='Specify the set to be used. Argument must be the name of a set (standard, pileup,...) or multiple sets separated by commas (--what standard,pileup )',
dest='what',
default='all')

parser.add_argument('--step1',
help='Used with --raw. Limit the production to step1',
dest='step1Only',
default=False)

parser.add_argument('--maxSteps',
help='Only run maximum on maxSteps. Used when we are only interested in first n steps.',
dest='maxSteps',
default=9999,
type=int)

parser.add_argument('--fromScratch',
help='Comma separated list of wf to be run without recycling. all is not supported as default.',
dest='fromScratch',
type=lambda x: x.split(','),
default=None)

parser.add_argument('--refRelease',
help='Allow to modify the recycling dataset version',
dest='refRel',
default=None)

parser.add_argument('--wmcontrol',
help='Create the workflows for injection to WMAgent. In the WORKING. -wmcontrol init will create the the workflows, -wmcontrol test will dryRun a test, -wmcontrol submit will submit to wmagent',
choices=['init','test','submit','force'],
dest='wmcontrol',
default=None)

parser.add_argument('--revertDqmio',
help='When submitting workflows to wmcontrol, force DQM outout to use pool and not DQMIO',
choices=['yes','no'],
dest='revertDqmio',
default='no')

parser.add_argument('--optionswm',
help='Specify a few things for wm injection',
default='',
dest='wmoptions')

parser.add_argument('--keep',
help='allow to specify for which comma separated steps the output is needed',
default=None)

parser.add_argument('--label',
help='allow to give a special label to the output dataset name',
default='')

parser.add_argument('--command',
help='provide a way to add additional command to all of the cmsDriver commands in the matrix',
dest='command',
action='append',
default=None)

parser.add_argument('--apply',
help='allow to use the --command only for 1 comma separeated',
dest='apply',
default=None)

parser.add_argument('--workflow',
help='define a workflow to be created or altered from the matrix',
action='append',
dest='workflow',
default=None)

parser.add_argument('--dryRun',
help='do not run the wf at all',
action='store_true',
dest='dryRun',
default=False)

parser.add_argument('--testbed',
help='workflow injection to cmswebtest (you need dedicated rqmgr account)',
dest='testbed',
default=False,
action='store_true')

parser.add_argument('--noCafVeto',
help='Run from any source, ignoring the CAF label',
dest='cafVeto',
default=True,
action='store_false')

parser.add_argument('--overWrite',
help='Change the content of a step for another. List of pairs.',
dest='overWrite',
default=None)

parser.add_argument('--noRun',
help='Remove all run list selection from wfs',
dest='noRun',
Expand All @@ -313,30 +313,30 @@ def runSelected(opt):
dest='jobReports',
default=False,
action='store_true')

parser.add_argument('--ibeos',
help='Use IB EOS site configuration',
dest='IBEos',
default=False,
action='store_true')

parser.add_argument('--sites',
help='Run DAS query to get data from a specific site. Set it to empty string to search all sites.',
dest='dasSites',
default='T2_CH_CERN',
action='store')

parser.add_argument('--interactive',
help="Open the Matrix interactive shell",
action='store_true',
default=False)

parser.add_argument('--dbs-url',
help='Overwrite DbsUrl value in JSON submitted to ReqMgr2',
dest='dbsUrl',
default=None,
action='store')

gpugroup = parser.add_argument_group('GPU-related options','These options are only meaningful when --gpu is used, and is not set to forbidden.')

gpugroup.add_argument('--gpu','--requires-gpu',
Expand All @@ -353,13 +353,13 @@ def runSelected(opt):
dest='GPUMemoryMB',
type=int,
default=8000)

gpugroup.add_argument('--cuda-capabilities',
help='Specify a comma-separated list of CUDA "compute capabilities", or GPU hardware architectures, that the job can use.',
dest='CUDACapabilities',
type=lambda x: x.split(','),
default='6.0,6.1,6.2,7.0,7.2,7.5,8.0,8.6')

# read the CUDA runtime version included in CMSSW
cudart_version = None
libcudart = os.path.realpath(os.path.expandvars('$CMSSW_RELEASE_BASE/external/$SCRAM_ARCH/lib/libcudart.so'))
Expand All @@ -370,22 +370,22 @@ def runSelected(opt):
help='Specify major and minor version of the CUDA runtime used to build the application.',
dest='CUDARuntime',
default=cudart_version)

gpugroup.add_argument('--force-gpu-name',
help='Request a specific GPU model, e.g. "Tesla T4" or "NVIDIA GeForce RTX 2080". The default behaviour is to accept any supported GPU.',
dest='GPUName',
default='')

gpugroup.add_argument('--force-cuda-driver-version',
help='Request a specific CUDA driver version, e.g. 470.57.02. The default behaviour is to accept any supported CUDA driver version.',
dest='CUDADriverVersion',
default='')

gpugroup.add_argument('--force-cuda-runtime-version',
help='Request a specific CUDA runtime version, e.g. 11.4. The default behaviour is to accept any supported CUDA runtime version.',
dest='CUDARuntimeVersion',
default='')

opt = parser.parse_args()
if opt.command: opt.command = ' '.join(opt.command)
os.environ["CMSSW_DAS_QUERY_SITES"]=opt.dasSites
Expand Down Expand Up @@ -456,7 +456,7 @@ def stepOrIndex(s):
print(entry,'is not a possible selected entry')

opt.testList = list(set(testList))

if opt.wmcontrol:
performInjectionOptionTest(opt)
if opt.overWrite:
Expand All @@ -465,6 +465,8 @@ def stepOrIndex(s):
import cmd
from colorama import Fore, Style
from os import isatty
import subprocess
import time

class TheMatrix(cmd.Cmd):
intro = "Welcome to the Matrix (? for help)"
Expand All @@ -475,6 +477,7 @@ def __init__(self, opt):
self.opt_ = opt
self.matrices_ = {}
tmp = MatrixReader(self.opt_)
self.processes_ = dict()
for what in tmp.files:
what = what.replace('relval_','')
self.opt_.what = what
Expand Down Expand Up @@ -553,6 +556,98 @@ def do_showWorkflow(self, arg):
Fore.GREEN + wfl.nameId + Fore.RESET))
print("%s contains %d workflows" % (Fore.RED + k + Fore.RESET, len(self.matrices_[k].workFlows)))

def do_runWorkflow(self, arg):
# Split the input arguments into a list
args = arg.split()
if len(args) < 2:
print(Fore.RED + Style.BRIGHT + "Wrong number of parameters passed")
print(Style.RESET_ALL)
return
workflow_class = args[0]
workflow_id = args[1]
passed_down_args = list()
if len(args) > 2:
passed_down_args = args[2:]
print(Fore.YELLOW + Style.BRIGHT + "Running with the following options:\n")
print(Fore.GREEN + Style.BRIGHT + "Workflow class: {}".format(workflow_class))
print(Fore.GREEN + Style.BRIGHT + "Workflow ID: {}".format(workflow_id))
print(Fore.GREEN + Style.BRIGHT + "Additional runTheMatrix options: {}".format(passed_down_args))
print(Style.RESET_ALL)
if workflow_class not in self.matrices_.keys():
print(Fore.RED + Style.BRIGHT + "Unknown workflow selected: {}".format(workflow_class))
print("Available workflows:")
for k in self.matrices_.keys():
print(Fore.RED + Style.BRIGHT + k)
print(Style.RESET_ALL)
return
wflnums = [x.numId for x in self.matrices_[workflow_class].workFlows]
if float(workflow_id) not in wflnums:
print(Fore.RED + Style.BRIGHT + "Unknown workflow {}".format(workflow_id))
print(Fore.GREEN + Style.BRIGHT)
print(wflnums)
print(Style.RESET_ALL)
return
if workflow_id in self.processes_.keys():
# Check if the process is still active
if self.processes_[workflow_id][0].poll() is None:
print(Fore.RED + Style.BRIGHT + "Workflow {} already running!".format(workflow_id))
print(Style.RESET_ALL)
return
# If it was there but it's gone, proceeed and update the value for the same key
# run a job, redirecting standard output and error to files
lognames = ['stdout', 'stderr']
logfiles = tuple('%s_%s_%s.log' % (workflow_class, workflow_id, name) for name in lognames)
stdout = open(logfiles[0], 'w')
stderr = open(logfiles[1], 'w')
command = ('runTheMatrix.py', '-w', workflow_class, '-l', workflow_id)
if len(passed_down_args) > 0:
command += tuple(passed_down_args)
print(command)
p = subprocess.Popen(command,
stdout = stdout,
stderr = stderr)
self.processes_[workflow_id] = (p, time.time())


def complete_runWorkflow(self, text, line, start_idx, end_idx):
if text and len(text) > 0:
return [t for t in self.matrices_.keys() if t.startswith(text)]
else:
return self.matrices_.keys()

def help_runWorkflow(self):
print("\n".join(["runWorkflow workflow_class workflow_id\n",
"This command will launch a new and independent process that invokes",
"the command:\n",
"runTheMatrix.py -w workflow_class -l workflow_id [runTheMatrix.py options]",
"\nYou can specify just one workflow_class and workflow_id per invocation.",
"The job will continue even after quitting the interactive session.",
"stdout and stderr of the new process will be automatically",
"redirected to 2 logfiles whose names contain the workflow_class",
"and workflow_id. Mutiple command can be issued one after the other.",
"The working directory of the new process will be the directory",
"from which the interactive session has started.",
"Autocompletion is available for workflow_class, but",
"not for workflow_id. Supplying a wrong workflow_class or",
"a non-existing workflow_id for a valid workflow_class",
"will trigger an error and no process will be invoked.",
"The interactive shell will keep track of all active processes",
"and will prevent the accidental resubmission of an already",
"active jobs."]))

def do_jobs(self, args):
print(Fore.GREEN + Style.BRIGHT + "List of jobs:")
for w in self.processes_.keys():
if self.processes_[w][0].poll() is None:
print(Fore.YELLOW + Style.BRIGHT + "Active job: {} since {:.2f} seconds.".format(w, time.time() - self.processes_[w][1]))
else:
print(Fore.RED + Style.BRIGHT + "Done job: {}".format(w))
print(Style.RESET_ALL)

def help_jobs(self):
print("\n".join(["Print a full list of active and done jobs submitted",
"in the ongoing interactive session"]))

def help_searchInWorkflow(self):
print("\n".join(["searchInWorkflow wfl_name search_regexp\n",
"This command will search for a match within all workflows registered to wfl_name.",
Expand Down