diff --git a/pippin/cosmofitters/cosmomc.py b/pippin/cosmofitters/cosmomc.py
index af158f13..2886a47b 100644
--- a/pippin/cosmofitters/cosmomc.py
+++ b/pippin/cosmofitters/cosmomc.py
@@ -52,7 +52,7 @@ def __init__(self, name, output_dir, config, options, global_config, dependencie
         self.path_to_cosmomc = get_output_loc(self.global_config["CosmoMC"]["location"])
 
         self.create_cov_dep = self.get_dep(CreateCov)
-        self.blind = self.create_cov_dep.output["blind"] if self.create_cov_dep is not None else self.options.get("BLIND", False)
+        self.blind = np.all(self.create_cov_dep.output["blind"]) if self.create_cov_dep is not None else self.options.get("BLIND", False)
         assert isinstance(self.blind, (bool, np.bool_)), "Blind should be set to a boolan value!"
         self.ini_prefix = options.get("INI").replace(".ini", "")
         self.static = self.ini_prefix.replace(".ini", "") in ["cmb_omw", "cmb_omol"]
diff --git a/pippin/cosmofitters/wfit.py b/pippin/cosmofitters/wfit.py
index 821f2c70..1e16c2c4 100644
--- a/pippin/cosmofitters/wfit.py
+++ b/pippin/cosmofitters/wfit.py
@@ -7,7 +7,6 @@
 from pippin.config import mkdirs, get_output_loc, get_data_loc, chown_dir, read_yaml
 from pippin.create_cov import CreateCov
-from pippin.sb_create_cov import SBCreateCov
 from pippin.cosmofitters.cosmofit import CosmoFit
 from pippin.base import ConfigBasedExecutable
 from pippin.task import Task
 
@@ -94,7 +93,7 @@ def _run(self):
 
     @staticmethod
     def get_tasks(c, prior_tasks, base_output_dir, stage_number, prefix, global_config):
-        create_cov_tasks = Task.get_task_of_type(prior_tasks, CreateCov) + Task.get_task_of_type(prior_tasks, SBCreateCov)
+        create_cov_tasks = Task.get_task_of_type(prior_tasks, CreateCov)
 
         def _get_wfit_dir(base_output_dir, stage_number, name):
             return f"{base_output_dir}/{stage_number}_COSMOFIT/WFIT/{name}"
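
CreateCov's output["blind"] is now a list with one flag per biascor dependency (see pippin/create_cov.py below), so the CosmoMC task collapses it to a single boolean with np.all. A minimal sketch of that aggregation, using hypothetical flag values:

    import numpy as np

    # Hypothetical per-dependency blinding flags, mirroring the new
    # output["blind"] = [d.output["blind"] for d in self.dependencies]
    blind_flags = [True, True]

    blind = np.all(blind_flags)  # np.bool_ scalar: blinds unless every flag is False
    assert isinstance(blind, (bool, np.bool_))
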
diff --git a/pippin/create_cov.py b/pippin/create_cov.py
index aeea6e18..6b9a1b2b 100644
--- a/pippin/create_cov.py
+++ b/pippin/create_cov.py
@@ -1,20 +1,18 @@
-import inspect
-import shutil
-import subprocess
 import os
 from pathlib import Path
-
 import yaml
+import shutil
+import subprocess
 
 from pippin.base import ConfigBasedExecutable
-from pippin.biascor import BiasCor
-from pippin.config import mkdirs, get_config, get_data_loc, read_yaml, merge_dict
 from pippin.task import Task
+from pippin.biascor import BiasCor
 import pippin.cosmofitters.cosmomc as cosmomc
-
+from pippin.config import get_data_loc, get_config, read_yaml, mkdirs, chown_dir
 
 class CreateCov(ConfigBasedExecutable):
-    """ Create covariance matrices and data from salt2mu used for cosmomc
+    """ Create covariance matrices and data from salt2mu used for cosmomc and wfit.
+        Run through submit_batch
 
     CONFIGURATION:
     ==============
@@ -41,72 +39,94 @@ class CreateCov(ConfigBasedExecutable):
     """
 
-    def __init__(self, name, output_dir, config, options, global_config, dependencies=None, index=0):
-        base_file = get_data_loc("create_cov/input_file.txt")
-        super().__init__(name, output_dir, config, base_file, default_assignment=": ", dependencies=dependencies)
+    def __init__(self, name, output_dir, config, options, global_config, dependencies=None):
+
+        base_file = get_data_loc("create_cov/COVMAT.input")
+        super().__init__(name, output_dir, config, base_file, default_assignment=": ", dependencies = dependencies)
 
         if options is None:
             options = {}
         self.options = options
         self.templates_dir = self.options.get("INI_DIR", "cosmomc_templates")
         self.global_config = get_config()
-        self.index = index
         self.job_name = os.path.basename(Path(output_dir).parents[1]) + "_CREATE_COV_" + name
-        #self.path_to_code = os.path.abspath(os.path.dirname(inspect.stack()[0][1]) + "/external/")
-        self.path_to_code = '$SNANA_DIR/util/' #Now maintained by SNANA
-
-        self.batch_mem = options.get("BATCH_MEM", "4GB")
-
-        self.logfile = os.path.join(self.output_dir, "output.log")
-        self.sys_file_out = os.path.join(self.output_dir, "sys_scale.yml")
-        self.chain_dir = os.path.join(self.output_dir, "chains/")
         self.config_dir = os.path.join(self.output_dir, "output")
-        self.wfit_inpdir = [self.config_dir]
-        self.subtract_vpec = options.get("SUBTRACT_VPEC", False)
-        self.unbinned_covmat_addin = options.get("UNBINNED_COVMAT_ADDIN", [])
-
-        self.batch_file = self.options.get("BATCH_FILE")
-        if self.batch_file is not None:
-            self.batch_file = get_data_loc(self.batch_file)
-        self.batch_replace = self.options.get("BATCH_REPLACE", {})
-
-        self.binned = options.get("BINNED", not self.subtract_vpec)
-        self.rebinned_x1 = options.get("REBINNED_X1", "")
-        if self.rebinned_x1 != "":
-            self.rebinned_x1 = f"--nbin_x1 {self.rebinned_x1}"
-        self.rebinned_c = options.get("REBINNED_C", "")
-        if self.rebinned_c != "":
-            self.rebinned_c = f"--nbin_c {self.rebinned_c}"
-
-        self.biascor_dep = self.get_dep(BiasCor, fail=True)
-        self.sys_file_in = self.get_sys_file_in()
-        self.output["blind"] = self.biascor_dep.output["blind"]
-        self.input_file = os.path.join(self.output_dir, self.biascor_dep.output["subdirs"][index] + ".input")
-        self.calibration_set = options.get("CALIBRATORS", [])
-        self.output["hubble_plot"] = self.biascor_dep.output["hubble_plot"]
+        self.wfit_inpdir = []
+        for d in dependencies:
+            num_dirs = len(d.output["subdirs"])
+            if num_dirs > 1:
+                for i in range(num_dirs):
+                    self.wfit_inpdir.append(os.path.join(self.config_dir, f"{self.name}_{d.name}_OUTPUT_BBCFIT-{str(i+1).zfill(4)}"))
+            else:
+                self.wfit_inpdir.append(os.path.join(self.config_dir, f"{self.name}_{d.name}_OUTPUT_BBCFIT"))
+        self.done_file = os.path.join(self.config_dir, "ALL.DONE")
+        self.input_name = f"{self.job_name}.INPUT"
+        self.input_file = os.path.join(self.output_dir, self.input_name)
+        self.logfile = os.path.join(self.output_dir, "output.log")
+
+        # BATCH Options
+        BATCH_INFO = self.options.get("BATCH_INFO")
+        if BATCH_INFO is None:
+            BATCH_FILE = self.options.get("BATCH_FILE")
+            if BATCH_FILE is not None:
+                BATCH_FILE = get_data_loc(BATCH_FILE)
+            else:
+                if self.gpu:
+                    BATCH_FILE = self.global_config["SBATCH"]["gpu_location"]
+                else:
+                    BATCH_FILE = self.global_config["SBATCH"]["cpu_location"]
+            num_jobs = 0
+            for d in dependencies:
+                num_jobs += len(d.output["subdirs"])
+            # To make sure we never ask for too many cores
+            if num_jobs > 20:
+                num_jobs = 20
+            BATCH_INFO = f"sbatch {BATCH_FILE} {num_jobs}"
+        BATCH_REPLACE = self.options.get("BATCH_REPLACE", {})
+        if BATCH_REPLACE != {}:
+            BATCH_MEM = BATCH_REPLACE.get("REPLACE_MEM", "2GB")
+            BATCH_WALLTIME = BATCH_REPLACE.get("REPLACE_WALLTIME", "24:00:00")
+        else:
+            BATCH_MEM = self.options.get("BATCH_MEM", "2GB")
+            BATCH_WALLTIME = self.options.get("BATCH_WALLTIME", "24:00:00")
+        self.yaml["CONFIG"]["BATCH_INFO"] = BATCH_INFO
+        self.yaml["CONFIG"]["BATCH_MEM"] = BATCH_MEM
+        self.yaml["CONFIG"]["BATCH_WALLTIME"] = BATCH_WALLTIME
+
+        # create_covariance.py input file
+        self.input_covmat_file = get_data_loc("create_cov/input_file.txt")
+        self.output_covmat_file = os.path.join(self.output_dir, "input_file.txt")
         self.prepare_cosmomc = self.config.get("COSMOMC", False)
-
         if self.prepare_cosmomc:
             self.logger.info("Generating CosmoMC output")
         else:
             self.logger.info("Not generating CosmoMC output")
+        self.sys_file_in = self.get_sys_file_in()
+        self.sys_file_out = os.path.join(self.output_dir, "sys_scale.yml")
+        self.calibration_set = options.get("CALIBRATORS", [])
+        self.prepare_input_covmat()
+        self.yaml["CONFIG"]["INPUT_COVMAT_FILE"] = self.output_covmat_file
+
+        # Rest of the submit_batch input file
+        self.bbc_outdirs = self.get_bbc_outdirs()
+        self.yaml["CONFIG"]["BBC_OUTDIR"] = self.bbc_outdirs
+
+        self.covmatopt = self.get_covmatopt()
+        self.yaml["CONFIG"]["COVMATOPT"] = [self.covmatopt]
+
+        self.yaml["CONFIG"]["OUTDIR"] = self.config_dir
+
+        # Output
+        self.output["blind"] = [d.output["blind"] for d in self.dependencies]
+        self.output["hubble_plot"] = [d.output["hubble_plot"] for d in self.dependencies]
         covopts_map = {"ALL": 0}
         for i, covopt in enumerate(self.options.get("COVOPTS", [])):
             covopts_map[covopt.split("]")[0][1:]] = i + 1
         self.output["covopts"] = covopts_map
         self.output["ini_dir"] = os.path.join(self.config_dir, "cosmomc")
-        self.output["index"] = index
-        self.output["bcor_name"] = self.biascor_dep.name
-        self.slurm = """{sbatch_header}
-        {task_setup}
-
-if [ $? -eq 0 ]; then
-    echo SUCCESS > {done_file}
-else
-    echo FAILURE > {done_file}
-fi
-"""
+        self.output["index"] = 0
+        self.output["bcor_name"] = [d.name for d in self.dependencies]
 
     def add_dependent(self, task):
         self.dependents.append(task)
@@ -114,6 +134,61 @@ def add_dependent(self, task):
         self.logger.info("CosmoMC task found, CreateCov will generate CosmoMC output")
         self.prepare_cosmomc = True
 
+    def _check_completion(self, squeue):
+        if os.path.exists(self.done_file):
+            self.logger.debug(f"Done file found at {self.done_file}")
+            with open(self.done_file) as f:
+                if "FAIL" in f.read():
+                    self.logger.error(f"Done file reported failure. Check output log {self.logfile}")
+                    self.scan_files_for_error([self.logfile], "ERROR", "EXCEPTION")
+                    return Task.FINISHED_FAILURE
+                else:
+                    return Task.FINISHED_SUCCESS
+        return self.check_for_job(squeue, self.job_name)
+
+    def load_input_covmat(self):
+        with open(self.input_covmat_file, "r") as f:
+            file_lines = list(f.read().splitlines())
+        for index, line in enumerate(file_lines):
+            if "#END_YAML" in line:
+                yaml_str = "\n".join(file_lines[:index])
+                self.input_covmat_yaml = yaml.safe_load(yaml_str)
+                break
+
+    def prepare_input_covmat(self):
+        self.load_input_covmat()
+        if self.prepare_cosmomc:
+            self.input_covmat_yaml["COSMOMC_TEMPLATES_PATH"] = get_data_loc(self.templates_dir)
+        else:
+            self.input_covmat_yaml.pop("COSMOMC_TEMPLATES_PATH", None)
+        self.input_covmat_yaml["SYS_SCALE_FILE"] = self.sys_file_out
+        self.input_covmat_yaml["COVOPTS"] = self.options.get("COVOPTS", [])
+        self.input_covmat_yaml["EXTRA_COVS"] = self.options.get("EXTRA_COVS", [])
+        self.input_covmat_yaml["CALIBRATORS"] = self.calibration_set
+
+    def get_bbc_outdirs(self):
+        bbc_outdirs = []
+        for d in self.dependencies:
+            name = d.name
+            output = d.fit_output_dir
+            bbc_outdirs.append(f"/{name}/ {output}")
+        return bbc_outdirs
+
+    def get_covmatopt(self):
+        rebin_x1 = self.options.get("REBINNED_X1", 0)
+        rebin_c = self.options.get("REBINNED_C", 0)
+        if (rebin_x1 + rebin_c > 0):
+            if (rebin_x1 == 0) or (rebin_c == 0):
+                Task.fail_config(f"If rebin, both REBINNED_X1 ({rebin_x1}) and REBINNED_C ({rebin_c}) must be greater than 0")
+            else:
+                cmd = f"--nbin_x1 {rebin_x1} --nbin_c {rebin_c}"
+        elif self.options.get("SUBTRACT_VPEC", False):
+            cmd = "--subtract_vpec"
+        elif self.options.get("BINNED", True):
+            cmd = ""
+        else:
+            cmd = "--unbinned"
+        return f"/{self.name}/ {cmd}"
 
     def get_sys_file_in(self):
         set_file = self.options.get("SYS_SCALE")
@@ -124,28 +199,18 @@ def get_sys_file_in(self):
             raise ValueError(f"Unable to resolve path to {set_file}")
         else:
             self.logger.debug("Searching for SYS_SCALE source from biascor task")
-            fitopt_files = [f for f in self.biascor_dep.output["fitopt_files"] if f is not None]
-            assert len(set(fitopt_files)) < 2, f"Cannot automatically determine scaling from FITOPT file as you have multiple files: {fitopt_files}"
-            if fitopt_files:
-                path = fitopt_files[0]
-            else:
-                path = None
+            path = None
+            for d in self.dependencies:
+                fitopt_files = []
+                fitopt_files += [f for f in d.output["fitopt_files"] if f is not None]
+                assert len(set(fitopt_files)) < 2, f"Cannot automatically determine scaling from FITOPT file as you have multiple files: {fitopt_files}"
+                if (len(fitopt_files) > 0) and (path is None):
+                    path = fitopt_files[0]
+                    break
         self.options["SYS_SCALE"] = path  # Save to options so its serialised out
         self.logger.info(f"Setting systematics scaling file to {path}")
         return path
 
-    def _check_completion(self, squeue):
-        if os.path.exists(self.done_file):
-            self.logger.debug(f"Done file found at {self.done_file}")
-            with open(self.done_file) as f:
-                if "FAILURE" in f.read():
-                    self.logger.error(f"Done file reported failure. Check output log {self.logfile}")
-                    self.scan_files_for_error([self.logfile], "ERROR", "EXCEPTION")
-                    return Task.FINISHED_FAILURE
-                else:
-                    return Task.FINISHED_SUCCESS
-        return self.check_for_job(squeue, self.job_name)
-
     def get_scales_from_fitopt_file(self):
         if self.sys_file_in is None:
             return {}
@@ -156,70 +221,14 @@ def get_scales_from_fitopt_file(self):
         raw = {k: float(v.split(maxsplit=1)[0]) for _, d in yaml.items() for k, v in d.items()}
         return raw
 
-    def calculate_input(self):
-        self.logger.debug(f"Calculating input")
-        if self.prepare_cosmomc:
-            self.yaml["COSMOMC_TEMPLATES_PATH"] = get_data_loc(self.templates_dir)
-        else:
-            self.yaml.pop("COSMOMC_TEMPLATES_PATH", None)
-        self.yaml["NAME"] = self.name
-        self.yaml["SYS_SCALE_FILE"] = self.sys_file_out
-        self.yaml["INPUT_DIR"] = self.biascor_dep.output["fit_output_dir"]
-        self.yaml["OUTDIR"] = self.config_dir
-        self.yaml["VERSION"] = self.biascor_dep.output["subdirs"][self.index]
-        self.yaml["MUOPT_SCALES"] = self.biascor_dep.output["muopt_scales"]
-        self.yaml["COVOPTS"] = self.options.get("COVOPTS", [])
-        self.yaml["EXTRA_COVS"] = self.options.get("EXTRA_COVS", [])
-        self.yaml["CALIBRATORS"] = self.calibration_set
-
-        self.logger.debug(f"YAML: {self.yaml}")
-
-        # Load in sys file, add muopt arguments if needed
-        # Get the MUOPT_SCALES and FITOPT scales keywords
-        sys_scale = {**self.get_scales_from_fitopt_file(), **self.options.get("FITOPT_SCALES", {})}
-        return sys_scale
+    def get_sys_scale(self):
+        return {**self.get_scales_from_fitopt_file(), **self.options.get("FITOPT_SCALES", {})}
 
     def _run(self):
-        sys_scale = self.calculate_input()
+        sys_scale = self.get_sys_scale()
+        self.logger.debug(f"Final sys_scale: {sys_scale}")
 
-        if self.batch_file is None:
-            if self.gpu:
-                self.sbatch_header = self.sbatch_gpu_header
-            else:
-                self.sbatch_header = self.sbatch_cpu_header
-        else:
-            with open(self.batch_file, 'r') as f:
-                self.sbatch_header = f.read()
-            self.sbatch_header = self.clean_header(self.sbatch_header)
-
-
-        header_dict = {
-            "REPLACE_NAME": self.job_name,
-            "REPLACE_WALLTIME": "02:30:00",
-            "REPLACE_LOGFILE": self.logfile,
-            "REPLACE_MEM": str(self.batch_mem),
-            "APPEND": ["#SBATCH --ntasks-per-node=1"]
-        }
-        header_dict = merge_dict(header_dict, self.batch_replace)
-        self.update_header(header_dict)
-        self.logger.debug(f"Binned: {self.binned}, Rebinned x1: {self.rebinned_x1}, Rebinned c: {self.rebinned_c}")
-        setup_dict = {
-            "path_to_code": self.path_to_code,
-            "input_file": self.input_file,
-            "output_dir": self.output_dir,
-            "unbinned": "" if self.binned else "-u",
-            "nbin_x1": self.rebinned_x1,
-            "nbin_c": self.rebinned_c,
-            "subtract_vpec": "" if not self.subtract_vpec else "-s",
-        }
-        format_dict = {
-            "sbatch_header": self.sbatch_header,
-            "task_setup": self.update_setup(setup_dict, self.task_setup['create_cov']),
-            "done_file": self.done_file,
-        }
-        final_slurm = self.slurm.format(**format_dict)
-
-        final_output_for_hash = self.get_output_string() + yaml.safe_dump(sys_scale, width=2048) + final_slurm
+        final_output_for_hash = self.get_output_string()
 
         new_hash = self.get_hash_from_string(final_output_for_hash)
 
@@ -227,31 +236,34 @@ def _run(self):
             self.logger.debug("Regenerating and launching task")
             shutil.rmtree(self.output_dir, ignore_errors=True)
             mkdirs(self.output_dir)
-            mkdirs(self.config_dir)
             self.save_new_hash(new_hash)
 
-            # Write sys scales and the main input file
-            with open(self.sys_file_out, "w") as f:
-                f.write(yaml.safe_dump(sys_scale, width=2048))
             with open(self.input_file, "w") as f:
                 f.write(self.get_output_string())
 
-            # Write out slurm job script
-            slurm_output_file = os.path.join(self.output_dir, "slurm.job")
-            with open(slurm_output_file, "w") as f:
-                f.write(final_slurm)
-
-            self.logger.info(f"Submitting batch job for create covariance")
-            subprocess.run(["sbatch", slurm_output_file], cwd=self.output_dir)
+            with open(self.sys_file_out, "w") as f:
+                f.write(yaml.safe_dump(sys_scale, width=2048))
+
+            with open(self.output_covmat_file, "w") as f:
+                f.write(yaml.safe_dump(self.input_covmat_yaml, width=2048))
+
+            cmd = ["submit_batch_jobs.sh", os.path.basename(self.input_file)]
+            self.logger.debug(f"Submitting CreateCov job: {' '.join(cmd)} in cwd: {self.output_dir}")
+            self.logger.debug(f"Logging to {self.logfile}")
+            with open(self.logfile, 'w') as f:
+                subprocess.run(' '.join(cmd), stdout=f, stderr=subprocess.STDOUT, cwd=self.output_dir, shell=True)
+            chown_dir(self.output_dir)
         else:
             self.should_be_done()
             self.logger.info("Hash check passed, not rerunning")
         return True
 
+
     @staticmethod
     def get_tasks(c, prior_tasks, base_output_dir, stage_number, prefix, global_config):
 
         biascor_tasks = Task.get_task_of_type(prior_tasks, BiasCor)
-
+
         def _get_createcov_dir(base_output_dir, stage_number, name):
             return f"{base_output_dir}/{stage_number}_CREATE_COV/{name}"
@@ -263,20 +275,11 @@ def _get_createcov_dir(base_output_dir, stage_number, name):
             options = config.get("OPTS", {})
             mask = config.get("MASK", config.get("MASK_BIASCOR", ""))
 
-            for btask in biascor_tasks:
-                if mask not in btask.name:
-                    continue
-
-                num = len(btask.output["subdirs"])
-                for i in range(num):
-                    ii = "" if num == 1 else f"_{i + 1}"
-
-                    name = f"{cname}_{btask.name}{ii}"
-                    a = CreateCov(name, _get_createcov_dir(base_output_dir, stage_number, name), config, options, global_config, dependencies=[btask], index=i)
-                    Task.logger.info(f"Creating createcov task {name} for {btask.name} with {a.num_jobs} jobs")
-                    tasks.append(a)
+            btasks = [btask for btask in biascor_tasks if mask in btask.name]
+            if len(btasks) == 0:
+                Task.fail_config(f"Create cov task {cname} has no biascor tasks matching mask {mask}")
 
-            if len(biascor_tasks) == 0:
-                Task.fail_config(f"Create cov task {cname} has no biascor task to run on!")
+            t = CreateCov(cname, _get_createcov_dir(base_output_dir, stage_number, cname), config, options, global_config, dependencies=btasks)
+            tasks.append(t)
 
         return tasks
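
For orientation, the refactored task now writes a submit_batch INPUT file (the serialised self.yaml, whose CONFIG block is assembled in __init__) plus a separate input_file.txt holding the create_covariance.py options. A rough sketch of building that CONFIG block, with hypothetical paths and values (the BATCH_INFO example is taken from the docstring):

    import yaml

    # Hypothetical stand-ins for the values __init__ derives from options and dependencies
    config = {
        "CONFIG": {
            "BATCH_INFO": "sbatch $SBATCH_TEMPLATES/blah.TEMPLATE 1",  # num_jobs capped at 20
            "BATCH_MEM": "2GB",
            "BATCH_WALLTIME": "24:00:00",
            "INPUT_COVMAT_FILE": "/outdir/input_file.txt",        # create_covariance.py options
            "BBC_OUTDIR": ["/BCOR/ /outdir/BCOR/output"],         # "/name/ fit_output_dir" per dependency
            "COVMATOPT": ["/COVTEST/ --nbin_x1 2 --nbin_c 4"],    # from get_covmatopt()
            "OUTDIR": "/outdir/output",
        }
    }
    print(yaml.safe_dump(config, width=2048))
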
diff --git a/pippin/manager.py b/pippin/manager.py
index 76cbcfc5..a9720446 100644
--- a/pippin/manager.py
+++ b/pippin/manager.py
@@ -11,7 +11,6 @@
 from pippin.config import get_logger, get_config, get_output_dir, mkdirs, chown_dir, chown_file, get_data_loc
 from pippin.cosmofitters.cosmofit import CosmoFit
 from pippin.create_cov import CreateCov
-from pippin.sb_create_cov import SBCreateCov
 from pippin.dataprep import DataPrep
 from pippin.merge import Merger
 from pippin.snana_fit import SNANALightCurveFit
@@ -26,13 +25,6 @@ class Manager:
     def __init__(self, filename, config_path, config_raw, config, message_store):
         self.logger = get_logger()
         self.task_index = {t: i for i, t in enumerate(self.task_order)}
-        debug1 = config["DEBUG1"]
-        if debug1:
-            self.logger.warning("DEBUG1 enabled, replacing CREATE_COV with SB_CREATE_COV")
-            self.task_index[SBCreateCov] = self.task_index.pop(CreateCov)
-            ind = self.task_index[SBCreateCov]
-            Manager.task_order[ind] = SBCreateCov
-            Manager.stages[ind] = "SB_CREATE_COV"
         self.message_store = message_store
         self.filename = filename
         self.filename_path = config_path
diff --git a/pippin/sb_create_cov.py b/pippin/sb_create_cov.py
deleted file mode 100644
index 99f49796..00000000
--- a/pippin/sb_create_cov.py
+++ /dev/null
@@ -1,285 +0,0 @@
-import os
-from pathlib import Path
-import yaml
-import shutil
-import subprocess
-
-from pippin.base import ConfigBasedExecutable
-from pippin.task import Task
-from pippin.biascor import BiasCor
-import pippin.cosmofitters.cosmomc as cosmomc
-from pippin.config import get_data_loc, get_config, read_yaml, mkdirs, chown_dir
-
-class SBCreateCov(ConfigBasedExecutable):
-    """ Create covariance matrices and data from salt2mu used for cosmomc and wfit.
-        Run through submit_batch
-
-    CONFIGURATION:
-    ==============
-    CREATE_COV:
-        label:
-            OPTS:
-                SUBTRACT_VPEC: False # Subtract VPEC contribution from MUERR if True
-                SYS_SCALE: location of the fitopts file with scales in it
-                FITOPT_SCALES:  # Optional dict to scale fitopts
-                    fitopt_label_for_partial check: float to scale by  # (does label in fitopt, not exact match
-                MUOPT_SCALES: # Optional dict used to construct SYSFILE input by putting MUOPT scales at the bottom, scale defaults to one
-                    exact_muopt_name: float
-                COVOPTS:  # optional, note you'll get an 'ALL' covopt no matter what
-                    - "[NOSYS] [=DEFAULT,=DEFAULT]"  # syntax for Dan&Dillons script. [label] [fitopts_to_match,muopts_to_match]. Does partial matching. =Default means dont do that systematic type
-                BATCH_INFO: sbatch $SBATCH_TEMPLATES/blah.TEMPLATE 1
-                JOB_MAX_WALLTIME: 00:10:00
-
-    OUTPUTS:
-    ========
-        name : name given in the yml
-        output_dir: top level output directory
-        ini_dir : The directory the .ini files for cosmomc will be output to
-        covopts : a dictionary mapping a covopt label to a number
-        blind: bool - whether or not to blind cosmo results
-
-    """
-
-
-    def __init__(self, name, output_dir, config, options, global_config, dependencies=None):
-
-        base_file = get_data_loc("create_cov/COVMAT.input")
-        super().__init__(name, output_dir, config, base_file, default_assignment=": ", dependencies = dependencies)
-
-        if options is None:
-            options = {}
-        self.options = options
-        self.templates_dir = self.options.get("INI_DIR", "cosmomc_templates")
-        self.global_config = get_config()
-        self.job_name = os.path.basename(Path(output_dir).parents[1]) + "_CREATE_COV_" + name
-        self.config_dir = os.path.join(self.output_dir, "output")
-        self.wfit_inpdir = []
-        for d in dependencies:
-            num_dirs = len(d.output["subdirs"])
-            if num_dirs > 1:
-                for i in range(num_dirs):
-                    self.wfit_inpdir.append(os.path.join(self.config_dir, f"{self.name}_{d.name}_OUTPUT_BBCFIT-{str(i+1).zfill(4)}"))
-            else:
-                self.wfit_inpdir.append(os.path.join(self.config_dir, f"{self.name}_{d.name}_OUTPUT_BBCFIT"))
-        self.done_file = os.path.join(self.config_dir, "ALL.DONE")
-        self.input_name = f"{self.job_name}.INPUT"
-        self.input_file = os.path.join(self.output_dir, self.input_name)
-        self.logfile = os.path.join(self.output_dir, "output.log")
-
-        # BATCH Options
-        BATCH_INFO = self.options.get("BATCH_INFO")
-        if BATCH_INFO is None:
-            BATCH_FILE = self.options.get("BATCH_FILE")
-            if BATCH_FILE is not None:
-                BATCH_FILE = get_data_loc(BATCH_FILE)
-            else:
-                if self.gpu:
-                    BATCH_FILE = self.global_config["SBATCH"]["gpu_location"]
-                else:
-                    BATCH_FILE = self.global_config["SBATCH"]["cpu_location"]
-            num_jobs = 0
-            for d in dependencies:
-                num_jobs += len(d.output["subdirs"])
-            # To make sure we never ask for too many cores
-            if num_jobs > 20:
-                num_jobs = 20
-            BATCH_INFO = f"sbatch {BATCH_FILE} {num_jobs}"
-        BATCH_REPLACE = self.options.get("BATCH_REPLACE", {})
-        if BATCH_REPLACE != {}:
-            BATCH_MEM = BATCH_REPLACE.get("REPLACE_MEM", "2GB")
-            BATCH_WALLTIME = BATCH_REPLACE.get("REPLACE_WALLTIME", "24:00:00")
-        else:
-            BATCH_MEM = self.options.get("BATCH_MEM", "2GB")
-            BATCH_WALLTIME = self.options.get("BATCH_WALLTIME", "24:00:00")
-        self.yaml["CONFIG"]["BATCH_INFO"] = BATCH_INFO
-        self.yaml["CONFIG"]["BATCH_MEM"] = BATCH_MEM
-        self.yaml["CONFIG"]["BATCH_WALLTIME"] = BATCH_WALLTIME
-
-        # create_covariance.py input file
-        self.input_covmat_file = get_data_loc("create_cov/input_file.txt")
-        self.output_covmat_file = os.path.join(self.output_dir, "input_file.txt")
-        self.prepare_cosmomc = self.config.get("COSMOMC", False)
-        if self.prepare_cosmomc:
-            self.logger.info("Generating CosmoMC output")
-        else:
-            self.logger.info("Not generating CosmoMC output")
-        self.sys_file_in = self.get_sys_file_in()
-        self.sys_file_out = os.path.join(self.output_dir, "sys_scale.yml")
-        self.calibration_set = options.get("CALIBRATORS", [])
-        self.prepare_input_covmat()
-        self.yaml["CONFIG"]["INPUT_COVMAT_FILE"] = self.output_covmat_file
-
-        # Rest of the submit_batch input file
-        self.bbc_outdirs = self.get_bbc_outdirs()
-        self.yaml["CONFIG"]["BBC_OUTDIR"] = self.bbc_outdirs
-
-        self.covmatopt = self.get_covmatopt()
-        self.yaml["CONFIG"]["COVMATOPT"] = [self.covmatopt]
-
-        self.yaml["CONFIG"]["OUTDIR"] = self.config_dir
-
-        # Output
-        self.output["blind"] = [d.output["blind"] for d in self.dependencies]
-        self.output["hubble_plot"] = [d.output["hubble_plot"] for d in self.dependencies]
-        covopts_map = {"ALL": 0}
-        for i, covopt in enumerate(self.options.get("COVOPTS", [])):
-            covopts_map[covopt.split("]")[0][1:]] = i + 1
-        self.output["covopts"] = covopts_map
-        self.output["ini_dir"] = os.path.join(self.config_dir, "cosmomc")
-        self.output["index"] = 0
-        self.output["bcor_name"] = [d.name for d in self.dependencies]
-
-    def add_dependent(self, task):
-        self.dependents.append(task)
-        if isinstance(task, cosmomc.CosmoMC):
-            self.logger.info("CosmoMC task found, CreateCov will generate CosmoMC output")
-            self.prepare_cosmomc = True
-
-    def _check_completion(self, squeue):
-        if os.path.exists(self.done_file):
-            self.logger.debug(f"Done file found at {self.done_file}")
-            with open(self.done_file) as f:
-                if "FAIL" in f.read():
-                    self.logger.error(f"Done file reported failure. Check output log {self.logfile}")
-                    self.scan_files_for_error([self.logfile], "ERROR", "EXCEPTION")
-                    return Task.FINISHED_FAILURE
-                else:
-                    return Task.FINISHED_SUCCESS
-        return self.check_for_job(squeue, self.job_name)
-
-    def load_input_covmat(self):
-        with open(self.input_covmat_file, "r") as f:
-            file_lines = list(f.read().splitlines())
-        for index, line in enumerate(file_lines):
-            if "#END_YAML" in line:
-                yaml_str = "\n".join(file_lines[:index])
-                self.input_covmat_yaml = yaml.safe_load(yaml_str)
-                break
-
-    def prepare_input_covmat(self):
-        self.load_input_covmat()
-        if self.prepare_cosmomc:
-            self.input_covmat_yaml["COSMOMC_TEMPLATES_PATH"] = get_data_loc(self.templates_dir)
-        else:
-            self.input_covmat_yaml.pop("COSMOMC_TEMPLATES_PATH", None)
-        self.input_covmat_yaml["SYS_SCALE_FILE"] = self.sys_file_out
-        self.input_covmat_yaml["COVOPTS"] = self.options.get("COVOPTS", [])
-        self.input_covmat_yaml["EXTRA_COVS"] = self.options.get("EXTRA_COVS", [])
-        self.input_covmat_yaml["CALIBRATORS"] = self.calibration_set
-
-    def get_bbc_outdirs(self):
-        bbc_outdirs = []
-        for d in self.dependencies:
-            name = d.name
-            output = d.fit_output_dir
-            bbc_outdirs.append(f"/{name}/ {output}")
-        return bbc_outdirs
-
-    def get_covmatopt(self):
-        rebin_x1 = self.options.get("REBINNED_X1", 0)
-        rebin_c = self.options.get("REBINNED_C", 0)
-        if (rebin_x1 + rebin_c > 0):
-            if (rebin_x1 == 0) or (rebin_c == 0):
-                Task.fail_config(f"If rebin, both REBINNED_X1 ({rebin_x1}) and REBINNED_C ({rebin_c}) must be greater than 0")
-            else:
-                cmd = f"--nbin_x1 {rebin_x1} --nbin_c {rebin_c}"
-        elif self.options.get("SUBTRACT_VPEC", False):
-            cmd = "--subtract_vpec"
-        elif self.options.get("BINNED", True):
-            cmd = ""
-        else:
-            cmd = "--unbinned"
-        return f"/{self.name}/ {cmd}"
-
-    def get_sys_file_in(self):
-        set_file = self.options.get("SYS_SCALE")
-        if set_file is not None:
-            self.logger.debug(f"Explicit SYS_SCALE file specified: {set_file}")
-            path = get_data_loc(set_file)
-            if path is None:
-                raise ValueError(f"Unable to resolve path to {set_file}")
-        else:
-            self.logger.debug("Searching for SYS_SCALE source from biascor task")
-            path = None
-            for d in self.dependencies:
-                fitopt_files = []
-                fitopt_files += [f for f in d.output["fitopt_files"] if f is not None]
-                assert len(set(fitopt_files)) < 2, f"Cannot automatically determine scaling from FITOPT file as you have multiple files: {fitopt_files}"
-                if (len(fitopt_files) > 0) and (path is None):
-                    path = fitopt_files[0]
-                    break
-        self.options["SYS_SCALE"] = path  # Save to options so its serialised out
-        self.logger.info(f"Setting systematics scaling file to {path}")
-        return path
-
-    def get_scales_from_fitopt_file(self):
-        if self.sys_file_in is None:
-            return {}
-        self.logger.debug(f"Loading sys scaling from {self.sys_file_in}")
-        yaml = read_yaml(self.sys_file_in)
-        if 'FLAG_USE_SAME_EVENTS' in yaml.keys():
-            yaml.pop('FLAG_USE_SAME_EVENTS')
-        raw = {k: float(v.split(maxsplit=1)[0]) for _, d in yaml.items() for k, v in d.items()}
-        return raw
-
-    def get_sys_scale(self):
-        return {**self.get_scales_from_fitopt_file(), **self.options.get("FITOPT_SCALES", {})}
-
-    def _run(self):
-        sys_scale = self.get_sys_scale()
-        self.logger.debug(f"Final sys_scale: {sys_scale}")
-
-        final_output_for_hash = self.get_output_string()
-
-        new_hash = self.get_hash_from_string(final_output_for_hash)
-
-        if self._check_regenerate(new_hash):
-            self.logger.debug("Regenerating and launching task")
-            shutil.rmtree(self.output_dir, ignore_errors=True)
-            mkdirs(self.output_dir)
-            self.save_new_hash(new_hash)
-
-            with open(self.input_file, "w") as f:
-                f.write(self.get_output_string())
-
-            with open(self.sys_file_out, "w") as f:
-                f.write(yaml.safe_dump(sys_scale, width=2048))
-
-            with open(self.output_covmat_file, "w") as f:
-                f.write(yaml.safe_dump(self.input_covmat_yaml, width=2048))
-
-            cmd = ["submit_batch_jobs.sh", os.path.basename(self.input_file)]
-            self.logger.debug(f"Submitting CreateCov job: {' '.join(cmd)} in cwd: {self.output_dir}")
-            self.logger.debug(f"Logging to {self.logfile}")
-            with open(self.logfile, 'w') as f:
-                subprocess.run(' '.join(cmd), stdout=f, stderr=subprocess.STDOUT, cwd=self.output_dir, shell=True)
-            chown_dir(self.output_dir)
-        else:
-            self.should_be_done()
-            self.logger.info("Hash check passed, not rerunning")
-        return True
-
-
-    @staticmethod
-    def get_tasks(c, prior_tasks, base_output_dir, stage_number, prefix, global_config):
-
-        biascor_tasks = Task.get_task_of_type(prior_tasks, BiasCor)
-
-        def _get_createcov_dir(base_output_dir, stage_number, name):
-            return f"{base_output_dir}/{stage_number}_CREATE_COV/{name}"
-
-        tasks = []
-        for cname in c.get("CREATE_COV", []):
-            config = c["CREATE_COV"][cname]
-            if config is None:
-                config = {}
-            options = config.get("OPTS", {})
-            mask = config.get("MASK", config.get("MASK_BIASCOR", ""))
-
-            btasks = [btask for btask in biascor_tasks if mask in btask.name]
-            if len(btasks) == 0:
-                Task.fail_config(f"Create cov task {cname} has no biascor tasks matching mask {mask}")
-
-            t = SBCreateCov(cname, _get_createcov_dir(base_output_dir, stage_number, cname), config, options, global_config, dependencies=btasks)
-            tasks.append(t)
-
-        return tasks
diff --git a/run.py b/run.py
index df986561..555eb5d5 100644
--- a/run.py
+++ b/run.py
@@ -161,10 +161,6 @@ def run(args):
 
     logging.info(f"Running on: {os.environ.get('HOSTNAME', '$HOSTNAME not set')} login node.")
 
-    if args.debug1:
-        logging.warning("DEBUG1 enabled, this is still experimental. Please let Patrick know if anything goes wrong!")
-    config["DEBUG1"] = args.debug1
-
     manager = Manager(config_filename, yaml_path, config_raw, config, message_store)
 
     # Gracefully hand Ctrl-c
@@ -241,7 +237,6 @@ def get_args(test=False):
     parser.add_argument("-p", "--permission", help="Fix permissions and groups on all output, don't rerun", action="store_true", default=False)
     parser.add_argument("-i", "--ignore", help="Dont rerun tasks with this stage or less. Accepts either the stage number of name (i.e. 1 or SIM)", default=None)
    parser.add_argument("-S", "--syntax", help="Get the syntax of the given stage. Accepts either the stage number or name (i.e. 1 or SIM). If run without argument, will tell you all stage numbers / names.", default=None, const="options", type=str, nargs='?')
-    parser.add_argument("--debug1", help="Enable debug option 1: Move CREATE_COV over to submit_batch", action="store_true", default=False)
     command_group = parser.add_mutually_exclusive_group()
     command_group.add_argument("-C", "--compress", help="Compress pippin output during job. Combine with -c / --check in order to compress completed pippin job.", action="store_true", default=False)
     command_group.add_argument("-U", "--uncompress", help="Do not compress pippin output during job. Combine with -c / --check in order to uncompress completed pippin job. Mutually exclusive with -C / --compress", action="store_true", default=False)
diff --git a/tests/test_valid_config.py b/tests/test_valid_config.py
index 8e926c6c..cc9e2c5b 100644
--- a/tests/test_valid_config.py
+++ b/tests/test_valid_config.py
@@ -198,7 +198,7 @@ def test_createcov_config_valid():
     assert isinstance(tasks[-1], CreateCov)
 
     task = tasks[-1]
-    assert task.output["name"] == "COVTEST_BCOR"
+    assert task.output["name"] == "COVTEST"
     assert len(task.output["covopts"]) == 2
     assert "ALL" in task.output["covopts"]
     assert "NOSYS" in task.output["covopts"]
@@ -215,7 +215,7 @@ def test_cosmomc_config_valid():
     assert isinstance(tasks[-1], CosmoMC)
 
     task = tasks[-1]
-    assert task.output["name"] == "COSMOMC_SN_OMW_COVTEST_BCOR"
+    assert task.output["name"] == "COSMOMC_SN_OMW_COVTEST"
     assert len(task.output["covopts"]) == 2
     assert "ALL" in task.output["covopts"]
    assert "NOSYS" in task.output["covopts"]
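
The expected task names drop the per-biascor suffix because each CREATE_COV block is now a single task, named after its YAML label, that depends on every matching biascor task. The covopt bookkeeping is unchanged; a small sketch of how covopts_map is derived from COVOPTS labels (sample entry taken from the docstring):

    covopts = ['[NOSYS] [=DEFAULT,=DEFAULT]']

    covopts_map = {"ALL": 0}
    for i, covopt in enumerate(covopts):
        # '[NOSYS] [...]' -> 'NOSYS': text before the first ']' with the leading '[' dropped
        covopts_map[covopt.split("]")[0][1:]] = i + 1

    print(covopts_map)  # {'ALL': 0, 'NOSYS': 1}
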