From 8e00e2d63116e58818037af259e55fedd55154e6 Mon Sep 17 00:00:00 2001
From: Patrick Armstrong
Date: Mon, 14 Nov 2022 16:36:52 -0600
Subject: [PATCH] Allow tasks to know their dependents. Only compress a task
 once it has no dependents. Only generate CreateCov CosmoMC output if a
 CosmoMC task exists

---
 pippin/create_cov.py | 22 +++++++++++++++-------
 pippin/manager.py    | 20 +++++++++++++++++++-
 pippin/task.py       |  7 +++++++
 3 files changed, 41 insertions(+), 8 deletions(-)

diff --git a/pippin/create_cov.py b/pippin/create_cov.py
index 8720bb76..0dcb2e34 100644
--- a/pippin/create_cov.py
+++ b/pippin/create_cov.py
@@ -10,6 +10,7 @@
 from pippin.biascor import BiasCor
 from pippin.config import mkdirs, get_config, get_data_loc, read_yaml, merge_dict
 from pippin.task import Task
+import pippin.cosmofitters.cosmomc as cosmomc
 
 
 class CreateCov(ConfigBasedExecutable):
@@ -84,19 +85,17 @@ def __init__(self, name, output_dir, config, options, global_config, dependencie
         self.calibration_set = options.get("CALIBRATORS", [])
         self.output["hubble_plot"] = self.biascor_dep.output["hubble_plot"]
 
-        prepare_cosmomc = self.config.get("COSMOMC", False)
+        self.prepare_cosmomc = self.config.get("COSMOMC", False)
 
-        if prepare_cosmomc:
-            self.logger.info("Generating cosmomc output")
-            self.output["ini_dir"] = os.path.join(self.config_dir, "cosmomc")
-            self.prepare_cosmomc = True
+        if self.prepare_cosmomc:
+            self.logger.info("Generating CosmoMC output")
         else:
-            self.logger.info("Not generating cosmomc output")
-            self.prepare_cosmomc = False
+            self.logger.info("Not generating CosmoMC output")
         covopts_map = {"ALL": 0}
         for i, covopt in enumerate(self.options.get("COVOPTS", [])):
             covopts_map[covopt.split("]")[0][1:]] = i + 1
         self.output["covopts"] = covopts_map
+        self.output["ini_dir"] = os.path.join(self.config_dir, "cosmomc")
         self.output["index"] = index
         self.output["bcor_name"] = self.biascor_dep.name
         self.slurm = """{sbatch_header}
@@ -108,6 +107,13 @@ def __init__(self, name, output_dir, config, options, global_config, dependencie
 fi
 """
 
+    def add_dependent(self, task):
+        self.dependents.append(task)
+        if isinstance(task, cosmomc.CosmoMC):
+            self.logger.info("CosmoMC task found; CreateCov will generate CosmoMC output")
+            self.prepare_cosmomc = True
+
+
     def get_sys_file_in(self):
         set_file = self.options.get("SYS_SCALE")
         if set_file is not None:
@@ -165,6 +171,8 @@ def calculate_input(self):
         self.yaml["EXTRA_COVS"] = self.options.get("EXTRA_COVS", [])
         self.yaml["CALIBRATORS"] = self.calibration_set
 
+        self.logger.debug(f"YAML: {self.yaml}")
+
         # Load in sys file, add muopt arguments if needed
         # Get the MUOPT_SCALES and FITOPT scales keywords
         sys_scale = {**self.get_scales_from_fitopt_file(), **self.options.get("FITOPT_SCALES", {})}
diff --git a/pippin/manager.py b/pippin/manager.py
index ad0e140a..a1e9fb10 100644
--- a/pippin/manager.py
+++ b/pippin/manager.py
@@ -148,6 +148,8 @@ def get_tasks(self, config):
         self.logger.notice("Listing tasks:")
         for task in total_tasks:
             self.logger.notice(f"\t{task}")
+            self.logger.debug(f"Task {task.name} has dependencies: {task.dependencies}")
+            self.logger.debug(f"Task {task.name} has dependents: {task.dependents}")
 
         self.logger.info("")
         return total_tasks
@@ -409,6 +411,21 @@ def check_task_completion(self, t, squeue):
         result = t.check_completion(squeue)
         # If its finished, good or bad, juggle tasks
         if result in [Task.FINISHED_SUCCESS, Task.FINISHED_FAILURE]:
+            self.logger.debug(f"Task {t.name} has dependencies: {t.dependencies}")
+            self.logger.debug(f"Task {t.name} has dependents: {t.dependents}")
+            if len(t.dependencies) > 0:
+                for task in list(t.dependencies):  # iterate over a copy, as t.dependencies is mutated below
+                    self.logger.debug(f"Modifying dependency task {task.name}")
+                    task.dependents.remove(t)
+                    t.dependencies.remove(task)
+                    self.logger.debug(f"Task {task.name} has dependencies: {task.dependencies}")
+                    self.logger.debug(f"Task {task.name} has dependents: {task.dependents}")
+                    if len(task.dependents) == 0:
+                        if self.compress:
+                            task.compress()
+            self.logger.debug(f"Task {t.name} has dependencies: {t.dependencies}")
+            self.logger.debug(f"Task {t.name} has dependents: {t.dependents}")
+
             if t.gpu:
                 self.num_jobs_queue_gpu -= t.num_jobs
             else:
@@ -418,7 +435,8 @@ def check_task_completion(self, t, squeue):
             self.logger.notice(f"FINISHED: {t} with {t.num_jobs} NUM_JOBS. NUM_JOBS now {self.num_jobs_queue}")
             self.done.append(t)
             if self.compress:
-                t.compress()
+                if len(t.dependents) == 0:
+                    t.compress()
         else:
             self.fail_task(t)
             if os.path.exists(t.output_dir):
diff --git a/pippin/task.py b/pippin/task.py
index a2fd8f45..2bc64489 100644
--- a/pippin/task.py
+++ b/pippin/task.py
@@ -22,6 +22,9 @@ def __init__(self, name, output_dir, dependencies=None, config=None, done_file="
         if dependencies is None:
             dependencies = []
         self.dependencies = dependencies
+        self.dependents = []
+        for task in dependencies:
+            task.add_dependent(self)
 
         if config is None:
             config = {}
@@ -111,6 +114,8 @@ def __init__(self, name, output_dir, dependencies=None, config=None, done_file="
         self.output.update({"name": name, "output_dir": output_dir, "hash_file": self.hash_file, "done_file": self.done_file})
         self.config_file = os.path.join(output_dir, "config.yml")
 
+    def add_dependent(self, task):
+        self.dependents.append(task)
 
     def set_force_refresh(self, force_refresh):
         self.force_refresh = force_refresh
@@ -168,6 +173,7 @@ def uncompress(self):
                 if os.path.exists(source_file):
                     uncompress_dir(os.path.dirname(t.output_dir), source_file)
 
+
     def _check_regenerate(self, new_hash):
 
         hash_are_different = new_hash != self.get_old_hash()
@@ -409,6 +415,7 @@ def check_completion(self, squeue):
                 self.logger.error(f"This means it probably crashed, have a look in {self.output_dir}")
                 self.logger.error(f"Removing hash from {self.hash_file}")
                 self.clear_hash()
+                # TODO: try rerunning task
                 return Task.FINISHED_FAILURE
             if self.external is None and result == Task.FINISHED_SUCCESS and not os.path.exists(self.config_file):
                 self.write_config()