diff --git a/Configuration/PyReleaseValidation/scripts/runTheMatrix.py b/Configuration/PyReleaseValidation/scripts/runTheMatrix.py index 0cf8caad887ae..610460b31f02d 100755 --- a/Configuration/PyReleaseValidation/scripts/runTheMatrix.py +++ b/Configuration/PyReleaseValidation/scripts/runTheMatrix.py @@ -135,31 +135,31 @@ def runSelected(opt): dest='memPerCore', type=int, default=1500) - + parser.add_argument('-j','--nproc', help='number of processes. 0 Will use 4 processes, not execute anything but create the wfs', dest='nProcs', type=int, default=4) - + parser.add_argument('-t','--nThreads', help='number of threads per process to use in cmsRun.', dest='nThreads', type=int, default=1) - + parser.add_argument('--nStreams', help='number of streams to use in cmsRun.', dest='nStreams', type=int, default=0) - + parser.add_argument('--nEvents', help='number of events to process in cmsRun. If 0 will use the standard 10 events.', dest='nEvents', type=int, default=0) - + parser.add_argument('--numberEventsInLuminosityBlock', help='number of events in a luminosity block', dest='numberEventsInLuminosityBlock', @@ -171,19 +171,19 @@ def runSelected(opt): dest='show', default=False, action='store_true') - + parser.add_argument('-e','--extended', help='Show details of workflows, used with --show', dest='extended', default=False, action='store_true') - + parser.add_argument('-s','--selected', help='Run a pre-defined selected matrix of wf. Deprecated, please use -l limited', dest='restricted', default=False, action='store_true') - + parser.add_argument('-l','--list', help='Comma separated list of workflow to be shown or ran. Possible keys are also '+str(predefinedSet.keys())+'. and wild card like muon, or mc', dest='testList', @@ -197,105 +197,105 @@ def runSelected(opt): parser.add_argument('-r','--raw', help='Temporary dump the .txt needed for prodAgent interface. To be discontinued soon. Argument must be the name of the set (standard, pileup,...)', dest='raw') - + parser.add_argument('-i','--useInput', help='Use recyling where available. Either all, or a comma separated list of wf number.', dest='useInput', type=lambda x: x.split(','), default=None) - + parser.add_argument('-w','--what', help='Specify the set to be used. Argument must be the name of a set (standard, pileup,...) or multiple sets separated by commas (--what standard,pileup )', dest='what', default='all') - + parser.add_argument('--step1', help='Used with --raw. Limit the production to step1', dest='step1Only', default=False) - + parser.add_argument('--maxSteps', help='Only run maximum on maxSteps. Used when we are only interested in first n steps.', dest='maxSteps', default=9999, type=int) - + parser.add_argument('--fromScratch', help='Comma separated list of wf to be run without recycling. all is not supported as default.', dest='fromScratch', type=lambda x: x.split(','), default=None) - + parser.add_argument('--refRelease', help='Allow to modify the recycling dataset version', dest='refRel', default=None) - + parser.add_argument('--wmcontrol', help='Create the workflows for injection to WMAgent. In the WORKING. -wmcontrol init will create the the workflows, -wmcontrol test will dryRun a test, -wmcontrol submit will submit to wmagent', choices=['init','test','submit','force'], dest='wmcontrol', default=None) - + parser.add_argument('--revertDqmio', help='When submitting workflows to wmcontrol, force DQM outout to use pool and not DQMIO', choices=['yes','no'], dest='revertDqmio', default='no') - + parser.add_argument('--optionswm', help='Specify a few things for wm injection', default='', dest='wmoptions') - + parser.add_argument('--keep', help='allow to specify for which comma separated steps the output is needed', default=None) - + parser.add_argument('--label', help='allow to give a special label to the output dataset name', default='') - + parser.add_argument('--command', help='provide a way to add additional command to all of the cmsDriver commands in the matrix', dest='command', action='append', default=None) - + parser.add_argument('--apply', help='allow to use the --command only for 1 comma separeated', dest='apply', default=None) - + parser.add_argument('--workflow', help='define a workflow to be created or altered from the matrix', action='append', dest='workflow', default=None) - + parser.add_argument('--dryRun', help='do not run the wf at all', action='store_true', dest='dryRun', default=False) - + parser.add_argument('--testbed', help='workflow injection to cmswebtest (you need dedicated rqmgr account)', dest='testbed', default=False, action='store_true') - + parser.add_argument('--noCafVeto', help='Run from any source, ignoring the CAF label', dest='cafVeto', default=True, action='store_false') - + parser.add_argument('--overWrite', help='Change the content of a step for another. List of pairs.', dest='overWrite', default=None) - + parser.add_argument('--noRun', help='Remove all run list selection from wfs', dest='noRun', @@ -313,30 +313,30 @@ def runSelected(opt): dest='jobReports', default=False, action='store_true') - + parser.add_argument('--ibeos', help='Use IB EOS site configuration', dest='IBEos', default=False, action='store_true') - + parser.add_argument('--sites', help='Run DAS query to get data from a specific site. Set it to empty string to search all sites.', dest='dasSites', default='T2_CH_CERN', action='store') - + parser.add_argument('--interactive', help="Open the Matrix interactive shell", action='store_true', default=False) - + parser.add_argument('--dbs-url', help='Overwrite DbsUrl value in JSON submitted to ReqMgr2', dest='dbsUrl', default=None, action='store') - + gpugroup = parser.add_argument_group('GPU-related options','These options are only meaningful when --gpu is used, and is not set to forbidden.') gpugroup.add_argument('--gpu','--requires-gpu', @@ -353,13 +353,13 @@ def runSelected(opt): dest='GPUMemoryMB', type=int, default=8000) - + gpugroup.add_argument('--cuda-capabilities', help='Specify a comma-separated list of CUDA "compute capabilities", or GPU hardware architectures, that the job can use.', dest='CUDACapabilities', type=lambda x: x.split(','), default='6.0,6.1,6.2,7.0,7.2,7.5,8.0,8.6') - + # read the CUDA runtime version included in CMSSW cudart_version = None libcudart = os.path.realpath(os.path.expandvars('$CMSSW_RELEASE_BASE/external/$SCRAM_ARCH/lib/libcudart.so')) @@ -370,22 +370,22 @@ def runSelected(opt): help='Specify major and minor version of the CUDA runtime used to build the application.', dest='CUDARuntime', default=cudart_version) - + gpugroup.add_argument('--force-gpu-name', help='Request a specific GPU model, e.g. "Tesla T4" or "NVIDIA GeForce RTX 2080". The default behaviour is to accept any supported GPU.', dest='GPUName', default='') - + gpugroup.add_argument('--force-cuda-driver-version', help='Request a specific CUDA driver version, e.g. 470.57.02. The default behaviour is to accept any supported CUDA driver version.', dest='CUDADriverVersion', default='') - + gpugroup.add_argument('--force-cuda-runtime-version', help='Request a specific CUDA runtime version, e.g. 11.4. The default behaviour is to accept any supported CUDA runtime version.', dest='CUDARuntimeVersion', default='') - + opt = parser.parse_args() if opt.command: opt.command = ' '.join(opt.command) os.environ["CMSSW_DAS_QUERY_SITES"]=opt.dasSites @@ -456,7 +456,7 @@ def stepOrIndex(s): print(entry,'is not a possible selected entry') opt.testList = list(set(testList)) - + if opt.wmcontrol: performInjectionOptionTest(opt) if opt.overWrite: @@ -465,6 +465,8 @@ def stepOrIndex(s): import cmd from colorama import Fore, Style from os import isatty + import subprocess + import time class TheMatrix(cmd.Cmd): intro = "Welcome to the Matrix (? for help)" @@ -475,6 +477,7 @@ def __init__(self, opt): self.opt_ = opt self.matrices_ = {} tmp = MatrixReader(self.opt_) + self.processes_ = dict() for what in tmp.files: what = what.replace('relval_','') self.opt_.what = what @@ -553,6 +556,98 @@ def do_showWorkflow(self, arg): Fore.GREEN + wfl.nameId + Fore.RESET)) print("%s contains %d workflows" % (Fore.RED + k + Fore.RESET, len(self.matrices_[k].workFlows))) + def do_runWorkflow(self, arg): + # Split the input arguments into a list + args = arg.split() + if len(args) < 2: + print(Fore.RED + Style.BRIGHT + "Wrong number of parameters passed") + print(Style.RESET_ALL) + return + workflow_class = args[0] + workflow_id = args[1] + passed_down_args = list() + if len(args) > 2: + passed_down_args = args[2:] + print(Fore.YELLOW + Style.BRIGHT + "Running with the following options:\n") + print(Fore.GREEN + Style.BRIGHT + "Workflow class: {}".format(workflow_class)) + print(Fore.GREEN + Style.BRIGHT + "Workflow ID: {}".format(workflow_id)) + print(Fore.GREEN + Style.BRIGHT + "Additional runTheMatrix options: {}".format(passed_down_args)) + print(Style.RESET_ALL) + if workflow_class not in self.matrices_.keys(): + print(Fore.RED + Style.BRIGHT + "Unknown workflow selected: {}".format(workflow_class)) + print("Available workflows:") + for k in self.matrices_.keys(): + print(Fore.RED + Style.BRIGHT + k) + print(Style.RESET_ALL) + return + wflnums = [x.numId for x in self.matrices_[workflow_class].workFlows] + if float(workflow_id) not in wflnums: + print(Fore.RED + Style.BRIGHT + "Unknown workflow {}".format(workflow_id)) + print(Fore.GREEN + Style.BRIGHT) + print(wflnums) + print(Style.RESET_ALL) + return + if workflow_id in self.processes_.keys(): + # Check if the process is still active + if self.processes_[workflow_id][0].poll() is None: + print(Fore.RED + Style.BRIGHT + "Workflow {} already running!".format(workflow_id)) + print(Style.RESET_ALL) + return + # If it was there but it's gone, proceeed and update the value for the same key + # run a job, redirecting standard output and error to files + lognames = ['stdout', 'stderr'] + logfiles = tuple('%s_%s_%s.log' % (workflow_class, workflow_id, name) for name in lognames) + stdout = open(logfiles[0], 'w') + stderr = open(logfiles[1], 'w') + command = ('runTheMatrix.py', '-w', workflow_class, '-l', workflow_id) + if len(passed_down_args) > 0: + command += tuple(passed_down_args) + print(command) + p = subprocess.Popen(command, + stdout = stdout, + stderr = stderr) + self.processes_[workflow_id] = (p, time.time()) + + + def complete_runWorkflow(self, text, line, start_idx, end_idx): + if text and len(text) > 0: + return [t for t in self.matrices_.keys() if t.startswith(text)] + else: + return self.matrices_.keys() + + def help_runWorkflow(self): + print("\n".join(["runWorkflow workflow_class workflow_id\n", + "This command will launch a new and independent process that invokes", + "the command:\n", + "runTheMatrix.py -w workflow_class -l workflow_id [runTheMatrix.py options]", + "\nYou can specify just one workflow_class and workflow_id per invocation.", + "The job will continue even after quitting the interactive session.", + "stdout and stderr of the new process will be automatically", + "redirected to 2 logfiles whose names contain the workflow_class", + "and workflow_id. Mutiple command can be issued one after the other.", + "The working directory of the new process will be the directory", + "from which the interactive session has started.", + "Autocompletion is available for workflow_class, but", + "not for workflow_id. Supplying a wrong workflow_class or", + "a non-existing workflow_id for a valid workflow_class", + "will trigger an error and no process will be invoked.", + "The interactive shell will keep track of all active processes", + "and will prevent the accidental resubmission of an already", + "active jobs."])) + + def do_jobs(self, args): + print(Fore.GREEN + Style.BRIGHT + "List of jobs:") + for w in self.processes_.keys(): + if self.processes_[w][0].poll() is None: + print(Fore.YELLOW + Style.BRIGHT + "Active job: {} since {:.2f} seconds.".format(w, time.time() - self.processes_[w][1])) + else: + print(Fore.RED + Style.BRIGHT + "Done job: {}".format(w)) + print(Style.RESET_ALL) + + def help_jobs(self): + print("\n".join(["Print a full list of active and done jobs submitted", + "in the ongoing interactive session"])) + def help_searchInWorkflow(self): print("\n".join(["searchInWorkflow wfl_name search_regexp\n", "This command will search for a match within all workflows registered to wfl_name.",