Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: pass proxy to dask workflow #2

Merged
merged 3 commits into from
Feb 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions dummy_samples.json
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
{
"ttbar": [
"root://xrootd-cms.infn.it///store/user/anovak/PFNano/106X_vA01/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/RunIIFall17PFNanoAODv2-PU2017_12Apr2018_new_pmx_94X_mc2017_realistic_v14-v1/201118_010348/0000/nano106X_on_mini106X_2017_mc_NANO_10.root",
"root://xrootd-cms.infn.it///store/user/anovak/PFNano/106X_vA01/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/RunIIFall17PFNanoAODv2-PU2017_12Apr2018_new_pmx_94X_mc2017_realistic_v14-v1/201118_010348/0000/nano106X_on_mini106X_2017_mc_NANO_11.root",
"root://xrootd-cms.infn.it///store/user/anovak/PFNano/106X_vA01/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/RunIIFall17PFNanoAODv2-PU2017_12Apr2018_new_pmx_94X_mc2017_realistic_v14-v1/201118_010348/0000/nano106X_on_mini106X_2017_mc_NANO_12.root"
"root://grid-cms-xrootd.physik.rwth-aachen.de:1094//store/user/anovak/PFNano/106X_v2_17/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/RunIIFall17PFNanoAODv2-PU2017_12Apr2018_new_pmx_94X_mc2017_realistic_v14-v1PFNanoV2/210101_174326/0000/nano_mc2017_10.root",
"root://grid-cms-xrootd.physik.rwth-aachen.de:1094//store/user/anovak/PFNano/106X_v2_17/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/RunIIFall17PFNanoAODv2-PU2017_12Apr2018_new_pmx_94X_mc2017_realistic_v14-v1PFNanoV2/210101_174326/0000/nano_mc2017_1-100.root"
],
"Data":[
"root://xrootd-cms.infn.it///store/user/anovak/PFNano/106X_vA01/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/RunIIFall17PFNanoAODv2-PU2017_12Apr2018_new_pmx_94X_mc2017_realistic_v14-v1/201118_010348/0000/nano106X_on_mini106X_2017_mc_NANO_13.root",
"root://xrootd-cms.infn.it///store/user/anovak/PFNano/106X_vA01/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/RunIIFall17PFNanoAODv2-PU2017_12Apr2018_new_pmx_94X_mc2017_realistic_v14-v1/201118_010348/0000/nano106X_on_mini106X_2017_mc_NANO_14.root",
"root://xrootd-cms.infn.it///store/user/anovak/PFNano/106X_vA01/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/RunIIFall17PFNanoAODv2-PU2017_12Apr2018_new_pmx_94X_mc2017_realistic_v14-v1/201118_010348/0000/nano106X_on_mini106X_2017_mc_NANO_15.root",
"root://xrootd-cms.infn.it///store/user/anovak/PFNano/106X_vA01/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/RunIIFall17PFNanoAODv2-PU2017_12Apr2018_new_pmx_94X_mc2017_realistic_v14-v1/201118_010348/0000/nano106X_on_mini106X_2017_mc_NANO_16.root"

"root://grid-cms-xrootd.physik.rwth-aachen.de:1094//store/user/anovak/PFNano/106X_v2_17/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/RunIIFall17PFNanoAODv2-PU2017_12Apr2018_new_pmx_94X_mc2017_realistic_v14-v1PFNanoV2/210101_174326/0000/nano_mc2017_1-101.root",
"root://grid-cms-xrootd.physik.rwth-aachen.de:1094//store/user/anovak/PFNano/106X_v2_17/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/RunIIFall17PFNanoAODv2-PU2017_12Apr2018_new_pmx_94X_mc2017_realistic_v14-v1PFNanoV2/210101_174326/0000/nano_mc2017_1-102.root"
]
}
}
43 changes: 32 additions & 11 deletions runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,23 @@ def validate(file):

if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Run analysis on baconbits files using processor coffea files')
parser.add_argument( '--wf', '--workflow', dest='workflow', choices=['ttcom'], help='Which processor to run', required=True)
# Inputs
parser.add_argument( '--wf', '--workflow', dest='workflow', choices=['ttcom', 'fattag'], help='Which processor to run', required=True)
parser.add_argument('-o', '--output', default=r'hists.coffea', help='Output histogram filename (default: %(default)s)')
parser.add_argument('--samples', '--json', dest='samplejson', default='dummy_samples.json', help='JSON file containing dataset and file locations (default: %(default)s)')

# Scale out
parser.add_argument('--executor', choices=['iterative', 'futures', 'parsl', 'dask/condor', 'dask/slurm'], default='futures', help='The type of executor to use (default: %(default)s)')
parser.add_argument('-j', '--workers', type=int, default=12, help='Number of workers to use for multi-worker executors (e.g. futures or condor) (default: %(default)s)')
parser.add_argument('--voms', default=None, type=str, help='Path to voms proxy, accessible to worker nodes. By default a copy will be made to $HOME.')

# Debugging
parser.add_argument('--validate', action='store_true', help='Do not process, just check all files are accessible')
parser.add_argument('--only', type=str, default=None, help='Only process specific dataset or file')
parser.add_argument('--limit', type=int, default=None, metavar='N', help='Limit to the first N files of each dataset in sample JSON')
parser.add_argument('--chunk', type=int, default=250000, metavar='N', help='Number of events per process chunk')
parser.add_argument('--chunk', type=int, default=500000, metavar='N', help='Number of events per process chunk')
parser.add_argument('--max', type=int, default=None, metavar='N', help='Max number of chunks to run in total')

args = parser.parse_args()
if args.output == parser.get_default('output'):
args.output = f'hists_{args.workflow}_{(args.samplejson).rstrip(".json")}.coffea'
Expand Down Expand Up @@ -86,6 +91,9 @@ def validate(file):
if args.workflow == "ttcom":
from workflows.ttbar_validation import NanoProcessor
processor_instance = NanoProcessor()
elif args.workflow == "fattag":
from workflows.fatjet_tagger import NanoProcessor
processor_instance = NanoProcessor()
else:
raise NotImplemented

Expand All @@ -101,9 +109,8 @@ def validate(file):
processor_instance=processor_instance,
executor=_exec,
executor_args={
'skipbadfiles':True,
'skipbadfiles':False,
'schema': processor.NanoAODSchema,
'flatten':True,
'workers': 4},
chunksize=args.chunk, maxchunks=args.max
)
Expand All @@ -115,25 +122,39 @@ def validate(file):
from distributed import Client
from dask.distributed import performance_report

if args.voms is not None:
_x509_path = args.voms
else:
_x509_localpath = [l for l in os.popen('voms-proxy-info').read().split("\n") if l.startswith('path')][0].split(":")[-1].strip()
_x509_path = os.environ['HOME'] + f'/.{_x509_localpath.split("/")[-1]}'
os.system(f'cp {_x509_localpath} {_x509_path}')

env_extra = [
'export XRD_RUNFORKHANDLER=1',
f'export X509_USER_PROXY={_x509_path}',
f'export X509_CERT_DIR={os.environ["X509_CERT_DIR"]}',
'ulimit -u 32768',
]

if 'slurm' in args.executor:
cluster = SLURMCluster(
queue='all',
cores=16,
processes=16,
cores=args.workers,
processes=args.workers,
memory="200 GB",
retries=10,
walltime='00:30:00',
env_extra=['ulimit -u 32768'],
env_extra=env_extra,
)
elif 'condor' in args.executor:
cluster = HTCondorCluster(
cluster = HTCondorCluster(
cores=1,
memory='2GB',
disk='2GB',
env_extra=env_extra,
)
cluster.scale(jobs=10)

print(cluster.job_script())
client = Client(cluster)
with performance_report(filename="dask-report.html"):
output = processor.run_uproot_job(sample_dict,
Expand All @@ -142,12 +163,12 @@ def validate(file):
executor=processor.dask_executor,
executor_args={
'client': client,
'skipbadfiles':True,
'skipbadfiles':False,
'schema': processor.NanoAODSchema,
'flatten':True,
},
chunksize=args.chunk, maxchunks=args.max
)

save(output, args.output)

print(output)
Expand Down
2 changes: 2 additions & 0 deletions workflows/ttbar_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def __init__(self):

_hist_dict = {**_hist_jet_dict, **_hist_event_dict}
self._accumulator = processor.dict_accumulator(_hist_dict)
self._accumulator['sumw'] = processor.defaultdict_accumulator(float)


@property
Expand All @@ -80,6 +81,7 @@ def process(self, events):
output = self.accumulator.identity()

dataset = events.metadata['dataset']
output['sumw'][dataset] += ak.sum(events.genWeight)

##############
# Trigger level
Expand Down