
Commit

Allow tasks to know their dependents. Only compress if you have no dependents. Only generate CreateCov CosmoMC output if CosmoMC task exists
OmegaLambda1998 committed Nov 14, 2022
1 parent cc10292 commit 8e00e2d
Showing 3 changed files with 41 additions and 8 deletions.
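
Taken together, the changes give every task a dependents list that mirrors its dependencies: a task registers itself with each of its dependencies on construction, and outputs are only compressed once nothing downstream still needs them. Below is a minimal, self-contained sketch of that bookkeeping using simplified stand-in classes (MiniTask and maybe_compress are illustrative names only, not Pippin's actual API):

class MiniTask:
    """Toy stand-in for pippin.task.Task, showing only the dependency bookkeeping."""

    def __init__(self, name, dependencies=None):
        self.name = name
        self.dependencies = dependencies or []
        self.dependents = []
        # Mirror each edge: every dependency learns about its new dependent.
        for dep in self.dependencies:
            dep.add_dependent(self)

    def add_dependent(self, task):
        self.dependents.append(task)

    def compress(self):
        print(f"compressing {self.name}")


def maybe_compress(task):
    # Compress only when nothing downstream still needs the uncompressed output.
    if len(task.dependents) == 0:
        task.compress()


create_cov = MiniTask("CREATE_COV")
cosmomc_task = MiniTask("COSMOMC", dependencies=[create_cov])
maybe_compress(create_cov)    # skipped: COSMOMC still depends on it
maybe_compress(cosmomc_task)  # compressed: nothing depends on it
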
22 changes: 15 additions & 7 deletions pippin/create_cov.py
@@ -10,6 +10,7 @@
 from pippin.biascor import BiasCor
 from pippin.config import mkdirs, get_config, get_data_loc, read_yaml, merge_dict
 from pippin.task import Task
+import pippin.cosmofitters.cosmomc as cosmomc
 
 
 class CreateCov(ConfigBasedExecutable):
@@ -84,19 +85,17 @@ def __init__(self, name, output_dir, config, options, global_config, dependencie
         self.calibration_set = options.get("CALIBRATORS", [])
         self.output["hubble_plot"] = self.biascor_dep.output["hubble_plot"]
 
-        prepare_cosmomc = self.config.get("COSMOMC", False)
+        self.prepare_cosmomc = self.config.get("COSMOMC", False)
 
-        if prepare_cosmomc:
-            self.logger.info("Generating cosmomc output")
-            self.output["ini_dir"] = os.path.join(self.config_dir, "cosmomc")
-            self.prepare_cosmomc = True
+        if self.prepare_cosmomc:
+            self.logger.info("Generating CosmoMC output")
         else:
-            self.logger.info("Not generating cosmomc output")
-            self.prepare_cosmomc = False
+            self.logger.info("Not generating CosmoMC output")
         covopts_map = {"ALL": 0}
         for i, covopt in enumerate(self.options.get("COVOPTS", [])):
             covopts_map[covopt.split("]")[0][1:]] = i + 1
         self.output["covopts"] = covopts_map
+        self.output["ini_dir"] = os.path.join(self.config_dir, "cosmomc")
         self.output["index"] = index
         self.output["bcor_name"] = self.biascor_dep.name
         self.slurm = """{sbatch_header}
@@ -108,6 +107,13 @@ def __init__(self, name, output_dir, config, options, global_config, dependencie
 fi
 """
 
+    def add_dependent(self, task):
+        self.dependents.append(task)
+        if isinstance(task, cosmomc.CosmoMC):
+            self.logger.info("CosmoMC task found, CreateCov will generate CosmoMC output")
+            self.prepare_cosmomc = True
+
+
     def get_sys_file_in(self):
         set_file = self.options.get("SYS_SCALE")
         if set_file is not None:
@@ -165,6 +171,8 @@ def calculate_input(self):
         self.yaml["EXTRA_COVS"] = self.options.get("EXTRA_COVS", [])
         self.yaml["CALIBRATORS"] = self.calibration_set
 
+        self.logger.debug(f"YAML: {self.yaml}")
+
         # Load in sys file, add muopt arguments if needed
         # Get the MUOPT_SCALES and FITOPT scales keywords
         sys_scale = {**self.get_scales_from_fitopt_file(), **self.options.get("FITOPT_SCALES", {})}
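
With the add_dependent override above, CreateCov no longer relies solely on the COSMOMC config flag: registering a CosmoMC task as a dependent also switches prepare_cosmomc on. A rough sketch of how that hook behaves (CreateCovStub and CosmoMCStub are hypothetical stand-ins; in Pippin the registration happens from Task.__init__, shown in pippin/task.py below):

class CosmoMCStub:
    # Stand-in for pippin.cosmofitters.cosmomc.CosmoMC, used only for the isinstance check.
    pass


class CreateCovStub:
    def __init__(self, config=None):
        config = config or {}
        self.dependents = []
        # Default comes from the COSMOMC option, as in the diff above.
        self.prepare_cosmomc = config.get("COSMOMC", False)

    def add_dependent(self, task):
        self.dependents.append(task)
        if isinstance(task, CosmoMCStub):
            # A CosmoMC consumer exists, so CosmoMC input files must be generated.
            self.prepare_cosmomc = True


create_cov = CreateCovStub()             # COSMOMC not requested explicitly
create_cov.add_dependent(CosmoMCStub())  # ...but a CosmoMC task depends on it
assert create_cov.prepare_cosmomc
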
20 changes: 19 additions & 1 deletion pippin/manager.py
@@ -148,6 +148,8 @@ def get_tasks(self, config):
         self.logger.notice("Listing tasks:")
         for task in total_tasks:
             self.logger.notice(f"\t{task}")
+            self.logger.debug(f"Task {task.name} has dependencies: {task.dependencies}")
+            self.logger.debug(f"Task {task.name} has dependents: {task.dependents}")
         self.logger.info("")
         return total_tasks
 
@@ -409,6 +411,21 @@ def check_task_completion(self, t, squeue):
         result = t.check_completion(squeue)
         # If its finished, good or bad, juggle tasks
         if result in [Task.FINISHED_SUCCESS, Task.FINISHED_FAILURE]:
+            self.logger.debug(f"Task {t.name} has dependencies: {t.dependencies}")
+            self.logger.debug(f"Task {t.name} has dependents: {t.dependents}")
+            if len(t.dependencies) > 0:
+                for task in t.dependencies:
+                    self.logger.debug(f"Modifying dependency task {task.name}")
+                    task.dependents.remove(t)
+                    t.dependencies.remove(task)
+                    self.logger.debug(f"Task {task.name} has dependencies: {task.dependencies}")
+                    self.logger.debug(f"Task {task.name} has dependents: {task.dependents}")
+                    if len(task.dependents) == 0:
+                        if self.compress:
+                            task.compress()
+                self.logger.debug(f"Task {t.name} has dependencies: {t.dependencies}")
+                self.logger.debug(f"Task {t.name} has dependents: {t.dependents}")
+
             if t.gpu:
                 self.num_jobs_queue_gpu -= t.num_jobs
             else:
@@ -418,7 +435,8 @@ def check_task_completion(self, t, squeue):
                 self.logger.notice(f"FINISHED: {t} with {t.num_jobs} NUM_JOBS. NUM_JOBS now {self.num_jobs_queue}")
                 self.done.append(t)
                 if self.compress:
-                    t.compress()
+                    if len(t.dependents) == 0:
+                        t.compress()
             else:
                 self.fail_task(t)
                 if os.path.exists(t.output_dir):
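
The manager change above prunes the graph as tasks finish: a completed task detaches itself from each of its dependencies, and any dependency left with no remaining dependents becomes eligible for compression. A simplified sketch of that finish-time pass, reusing the MiniTask stand-in from the first sketch (on_task_finished is an illustrative name, not the manager's method, and it iterates over a copy so removal during iteration stays safe):

def on_task_finished(finished, compress=True):
    # Detach the finished task from everything it depended on.
    for dep in list(finished.dependencies):  # copy: both lists are mutated below
        dep.dependents.remove(finished)
        finished.dependencies.remove(dep)
        if compress and len(dep.dependents) == 0:
            dep.compress()  # nothing downstream needs its uncompressed output any more
    # The finished task itself is compressed only if nothing depends on it.
    if compress and len(finished.dependents) == 0:
        finished.compress()
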
7 changes: 7 additions & 0 deletions pippin/task.py
@@ -22,6 +22,9 @@ def __init__(self, name, output_dir, dependencies=None, config=None, done_file="
         if dependencies is None:
             dependencies = []
         self.dependencies = dependencies
+        self.dependents = []
+        for task in dependencies:
+            task.add_dependent(self)
 
         if config is None:
             config = {}
@@ -111,6 +114,8 @@ def __init__(self, name, output_dir, dependencies=None, config=None, done_file="
         self.output.update({"name": name, "output_dir": output_dir, "hash_file": self.hash_file, "done_file": self.done_file})
         self.config_file = os.path.join(output_dir, "config.yml")
 
+    def add_dependent(self, task):
+        self.dependents.append(task)
 
     def set_force_refresh(self, force_refresh):
         self.force_refresh = force_refresh
@@ -168,6 +173,7 @@ def uncompress(self):
         if os.path.exists(source_file):
             uncompress_dir(os.path.dirname(t.output_dir), source_file)
 
+
     def _check_regenerate(self, new_hash):
         hash_are_different = new_hash != self.get_old_hash()
 
@@ -409,6 +415,7 @@ def check_completion(self, squeue):
                 self.logger.error(f"This means it probably crashed, have a look in {self.output_dir}")
                 self.logger.error(f"Removing hash from {self.hash_file}")
                 self.clear_hash()
+                #TODO try rerunning task
                 return Task.FINISHED_FAILURE
             if self.external is None and result == Task.FINISHED_SUCCESS and not os.path.exists(self.config_file):
                 self.write_config()
