From adfd0e4b3f38bc2ca4957977cb513dd8d27c40c7 Mon Sep 17 00:00:00 2001
From: Andrzej
Date: Mon, 1 Feb 2021 20:43:37 +0100
Subject: [PATCH 1/3] fix: pass proxy to dask workflow

---
 dummy_samples.json            | 14 +++++-------
 runner.py                     | 42 ++++++++++++++++++++++++++---------
 workflows/ttbar_validation.py |  2 ++
 3 files changed, 39 insertions(+), 19 deletions(-)

diff --git a/dummy_samples.json b/dummy_samples.json
index bc3df683..ae96b3cc 100644
--- a/dummy_samples.json
+++ b/dummy_samples.json
@@ -1,14 +1,10 @@
 {
     "ttbar": [
-        "root://xrootd-cms.infn.it///store/user/anovak/PFNano/106X_vA01/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/RunIIFall17PFNanoAODv2-PU2017_12Apr2018_new_pmx_94X_mc2017_realistic_v14-v1/201118_010348/0000/nano106X_on_mini106X_2017_mc_NANO_10.root",
-        "root://xrootd-cms.infn.it///store/user/anovak/PFNano/106X_vA01/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/RunIIFall17PFNanoAODv2-PU2017_12Apr2018_new_pmx_94X_mc2017_realistic_v14-v1/201118_010348/0000/nano106X_on_mini106X_2017_mc_NANO_11.root",
-        "root://xrootd-cms.infn.it///store/user/anovak/PFNano/106X_vA01/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/RunIIFall17PFNanoAODv2-PU2017_12Apr2018_new_pmx_94X_mc2017_realistic_v14-v1/201118_010348/0000/nano106X_on_mini106X_2017_mc_NANO_12.root"
+        "root://grid-cms-xrootd.physik.rwth-aachen.de:1094//store/user/anovak/PFNano/106X_v2_17/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/RunIIFall17PFNanoAODv2-PU2017_12Apr2018_new_pmx_94X_mc2017_realistic_v14-v1PFNanoV2/210101_174326/0000/nano_mc2017_10.root",
+        "root://grid-cms-xrootd.physik.rwth-aachen.de:1094//store/user/anovak/PFNano/106X_v2_17/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/RunIIFall17PFNanoAODv2-PU2017_12Apr2018_new_pmx_94X_mc2017_realistic_v14-v1PFNanoV2/210101_174326/0000/nano_mc2017_1-100.root"
     ],
     "Data":[
-        "root://xrootd-cms.infn.it///store/user/anovak/PFNano/106X_vA01/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/RunIIFall17PFNanoAODv2-PU2017_12Apr2018_new_pmx_94X_mc2017_realistic_v14-v1/201118_010348/0000/nano106X_on_mini106X_2017_mc_NANO_13.root",
-        "root://xrootd-cms.infn.it///store/user/anovak/PFNano/106X_vA01/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/RunIIFall17PFNanoAODv2-PU2017_12Apr2018_new_pmx_94X_mc2017_realistic_v14-v1/201118_010348/0000/nano106X_on_mini106X_2017_mc_NANO_14.root",
-        "root://xrootd-cms.infn.it///store/user/anovak/PFNano/106X_vA01/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/RunIIFall17PFNanoAODv2-PU2017_12Apr2018_new_pmx_94X_mc2017_realistic_v14-v1/201118_010348/0000/nano106X_on_mini106X_2017_mc_NANO_15.root",
-        "root://xrootd-cms.infn.it///store/user/anovak/PFNano/106X_vA01/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/RunIIFall17PFNanoAODv2-PU2017_12Apr2018_new_pmx_94X_mc2017_realistic_v14-v1/201118_010348/0000/nano106X_on_mini106X_2017_mc_NANO_16.root"
-
+        "root://grid-cms-xrootd.physik.rwth-aachen.de:1094//store/user/anovak/PFNano/106X_v2_17/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/RunIIFall17PFNanoAODv2-PU2017_12Apr2018_new_pmx_94X_mc2017_realistic_v14-v1PFNanoV2/210101_174326/0000/nano_mc2017_1-101.root",
+        "root://grid-cms-xrootd.physik.rwth-aachen.de:1094//store/user/anovak/PFNano/106X_v2_17/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/RunIIFall17PFNanoAODv2-PU2017_12Apr2018_new_pmx_94X_mc2017_realistic_v14-v1PFNanoV2/210101_174326/0000/nano_mc2017_1-102.root"
     ]
-}
\ No newline at end of file
+}
diff --git a/runner.py b/runner.py
index 69c8d8cc..03fcdd49 100644
--- a/runner.py
+++ b/runner.py
@@ -22,18 +22,23 @@ def validate(file):
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(description='Run analysis on baconbits files using processor coffea files')
-    parser.add_argument( '--wf', '--workflow', dest='workflow', choices=['ttcom'], help='Which processor to run', required=True)
+    # Inputs
+    parser.add_argument( '--wf', '--workflow', dest='workflow', choices=['ttcom', 'fattag'], help='Which processor to run', required=True)
     parser.add_argument('-o', '--output', default=r'hists.coffea', help='Output histogram filename (default: %(default)s)')
     parser.add_argument('--samples', '--json', dest='samplejson', default='dummy_samples.json', help='JSON file containing dataset and file locations (default: %(default)s)')

+    # Scale out
     parser.add_argument('--executor', choices=['iterative', 'futures', 'parsl', 'dask/condor', 'dask/slurm'], default='futures', help='The type of executor to use (default: %(default)s)')
     parser.add_argument('-j', '--workers', type=int, default=12, help='Number of workers to use for multi-worker executors (e.g. futures or condor) (default: %(default)s)')
+    parser.add_argument('--voms', default=None, type=str, help='Path to voms proxy, accessible to worker nodes. By default a copy will be made to $HOME.')
+    # Debugging
     parser.add_argument('--validate', action='store_true', help='Do not process, just check all files are accessible')
     parser.add_argument('--only', type=str, default=None, help='Only process specific dataset or file')
     parser.add_argument('--limit', type=int, default=None, metavar='N', help='Limit to the first N files of each dataset in sample JSON')
-    parser.add_argument('--chunk', type=int, default=250000, metavar='N', help='Number of events per process chunk')
+    parser.add_argument('--chunk', type=int, default=500000, metavar='N', help='Number of events per process chunk')
     parser.add_argument('--max', type=int, default=None, metavar='N', help='Max number of chunks to run in total')
+
     args = parser.parse_args()
     if args.output == parser.get_default('output'):
         args.output = f'hists_{args.workflow}_{(args.samplejson).rstrip(".json")}.coffea'
@@ -86,6 +91,9 @@ def validate(file):
     if args.workflow == "ttcom":
         from workflows.ttbar_validation import NanoProcessor
         processor_instance = NanoProcessor()
+    elif args.workflow == "fattag":
+        from workflows.fatjet_tagger import NanoProcessor
+        processor_instance = NanoProcessor()
     else:
         raise NotImplemented
@@ -101,9 +109,8 @@ def validate(file):
                                 processor_instance=processor_instance,
                                 executor=_exec,
                                 executor_args={
-                                    'skipbadfiles':True,
+                                    'skipbadfiles':False,
                                     'schema': processor.NanoAODSchema,
-                                    'flatten':True,
                                     'workers': 4},
                                 chunksize=args.chunk, maxchunks=args.max
                                 )
@@ -115,25 +122,40 @@ def validate(file):
         from distributed import Client
         from dask.distributed import performance_report

+        if args.voms is not None:
+            _x509_path = args.voms
+        else:
+            _x509_localpath = [l for l in os.popen('voms-proxy-info').read().split("\n") if l.startswith('path')][0].split(":")[-1].strip()
+            _x509_path = os.environ['HOME'] + f'/.{_x509_localpath.split("/")[-1]}'
+            os.system(f'cp {_x509_localpath} {_x509_path}')
+
+        env_extra = [
+            'export XRD_RUNFORKHANDLER=1',
+            f'export X509_USER_PROXY={_tmp_path}',
+            f'export X509_CERT_DIR={os.environ["X509_CERT_DIR"]}',
+            'ulimit -u 32768',
+        ]
+
         if 'slurm' in args.executor:
             cluster = SLURMCluster(
                 queue='all',
-                cores=16,
-                processes=16,
+                cores=args.workers,
+                processes=args.workers,
                 memory="200 GB",
                 retries=10,
                 walltime='00:30:00',
-                env_extra=['ulimit -u 32768'],
+                env_extra=env_extra,
             )
         elif 'condor' in args.executor:
            cluster = HTCondorCluster(
                 cores=1,
                 memory='2GB',
                 disk='2GB',
+                env_extra=env_extra,
             )
         cluster.scale(jobs=10)
-        print(cluster.job_script())
+
         client = Client(cluster)

         with performance_report(filename="dask-report.html"):
             output = processor.run_uproot_job(sample_dict,
@@ -142,12 +164,12 @@ def validate(file):
                                 executor=processor.dask_executor,
                                 executor_args={
                                     'client': client,
-                                    'skipbadfiles':True,
+                                    'skipbadfiles':False,
                                     'schema': processor.NanoAODSchema,
-                                    'flatten':True,
                                 },
                                 chunksize=args.chunk, maxchunks=args.max
             )
+
     save(output, args.output)

     print(output)
diff --git a/workflows/ttbar_validation.py b/workflows/ttbar_validation.py
index 2b695721..c39e32c9 100644
--- a/workflows/ttbar_validation.py
+++ b/workflows/ttbar_validation.py
@@ -70,6 +70,7 @@ def __init__(self):
         _hist_dict = {**_hist_jet_dict, **_hist_event_dict}
         self._accumulator = processor.dict_accumulator(_hist_dict)
+        self._accumulator['sumw'] = processor.defaultdict_accumulator(float)

     @property
@@ -80,6 +81,7 @@ def process(self, events):
         output = self.accumulator.identity()
         dataset = events.metadata['dataset']
+        output['sumw'][dataset] += ak.sum(events.genWeight)

         ##############
         # Trigger level
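
The `sumw` accumulator added above records the per-dataset sum of generator weights, which is the denominator one later needs to normalise MC histograms to a luminosity. A minimal post-processing sketch; the luminosity and cross-section numbers are illustrative, not taken from this repository:

```python
from coffea.util import load

# Load the accumulator that runner.py wrote with save(output, args.output).
output = load('hists.coffea')

# Per-dataset scale factor: L * sigma / sum(genWeight).
lumi_pb = 41.5e3    # illustrative 2017 luminosity, in /pb
xsec_pb = 831.76    # illustrative ttbar cross section, in pb
scale = lumi_pb * xsec_pb / output['sumw']['ttbar']
```
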
From 0d7e27934f9bc9531894bf18e6e5de38a8c62afc Mon Sep 17 00:00:00 2001
From: Andrzej
Date: Mon, 1 Feb 2021 21:35:44 +0100
Subject: [PATCH 2/3] fix: proxy path

---
 runner.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/runner.py b/runner.py
index 03fcdd49..e82eecac 100644
--- a/runner.py
+++ b/runner.py
@@ -131,7 +131,7 @@ def validate(file):
         env_extra = [
             'export XRD_RUNFORKHANDLER=1',
-            f'export X509_USER_PROXY={_tmp_path}',
+            f'export X509_USER_PROXY={_x509_path}',
             f'export X509_CERT_DIR={os.environ["X509_CERT_DIR"]}',
             'ulimit -u 32768',
         ]
@@ -147,12 +147,15 @@ def validate(file):
                 env_extra=env_extra,
             )
         elif 'condor' in args.executor:
-           cluster = HTCondorCluster(
+            cluster = HTCondorCluster(
                 cores=1,
                 memory='2GB',
                 disk='2GB',
                 env_extra=env_extra,
             )
+            import logging
+            logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
+
         cluster.scale(jobs=10)
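
Spelled out, the proxy-staging logic that this patch now points `X509_USER_PROXY` at works as follows. A standalone sketch of the same steps; the function name and the use of `shutil.copy` (instead of runner.py's `os.system('cp ...')`) are illustrative:

```python
import os
import shutil

def stage_x509_proxy(voms_arg=None):
    """Locate the VOMS proxy and copy it into $HOME for the workers.

    `voms-proxy-info` prints a line like "path      : /tmp/x509up_u12345".
    The proxy is copied to $HOME/.x509up_u12345 so that dask workers,
    which receive X509_USER_PROXY through env_extra, read it from a
    shared location rather than the submit node's /tmp.
    """
    if voms_arg is not None:      # --voms was given explicitly
        return voms_arg
    info = os.popen('voms-proxy-info').read()
    local = [l for l in info.split('\n') if l.startswith('path')][0]
    local = local.split(':')[-1].strip()
    staged = os.path.join(os.environ['HOME'], '.' + os.path.basename(local))
    shutil.copy(local, staged)
    return staged
```
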
From 3d6f37c4bda69f337b8faa1db29850a250aa3fa8 Mon Sep 17 00:00:00 2001
From: Andrzej
Date: Mon, 1 Feb 2021 21:54:25 +0100
Subject: [PATCH 3/3] fix: test on condor

---
 runner.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/runner.py b/runner.py
index e82eecac..0c94a938 100644
--- a/runner.py
+++ b/runner.py
@@ -153,12 +153,8 @@ def validate(file):
             disk='2GB',
             env_extra=env_extra,
         )
-            import logging
-            logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
-
         cluster.scale(jobs=10)
-
         client = Client(cluster)

         with performance_report(filename="dask-report.html"):
             output = processor.run_uproot_job(sample_dict,
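
With all three patches applied, the dask/condor path of runner.py reduces to roughly the sketch below. The `treename='Events'` argument, the staged proxy filename, and the `dask_jobqueue` import are assumptions (the standard NanoAOD tree name, and the cluster class used by dask-jobqueue releases of this era, which still accepted `env_extra` rather than the later `job_script_prologue`):

```python
import json
import os
from coffea import processor
from coffea.util import save
from distributed import Client
from dask.distributed import performance_report
from dask_jobqueue import HTCondorCluster            # assumed import path
from workflows.ttbar_validation import NanoProcessor

# Dataset name -> list of xrootd URLs, as in dummy_samples.json.
with open('dummy_samples.json') as f:
    sample_dict = json.load(f)

env_extra = [
    'export XRD_RUNFORKHANDLER=1',                   # fork-safe xrootd reads on workers
    f'export X509_USER_PROXY={os.environ["HOME"]}/.x509up',  # staged proxy; illustrative name
]

cluster = HTCondorCluster(cores=1, memory='2GB', disk='2GB', env_extra=env_extra)
cluster.scale(jobs=10)
client = Client(cluster)

with performance_report(filename="dask-report.html"):
    output = processor.run_uproot_job(
        sample_dict,
        treename='Events',                           # assumed NanoAOD tree name
        processor_instance=NanoProcessor(),
        executor=processor.dask_executor,
        executor_args={'client': client,
                       'skipbadfiles': False,
                       'schema': processor.NanoAODSchema},
        chunksize=500000,
    )
save(output, 'hists.coffea')
print(output)
```
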