Skip to content

Commit

Permalink
Auto increase system threads on memory bound jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
nx10 committed Jan 5, 2024
1 parent 66d38d4 commit b0ccc3c
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 27 deletions.
58 changes: 41 additions & 17 deletions src/ecpac/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import dataclasses
import itertools
import math
import pathlib as pl
import re
import shlex
Expand Down Expand Up @@ -134,16 +135,16 @@ def main(
res_threads = int(
cli.option_or_prompt(
opt=arg_threads,
prompt=icons.ICON_THREADS + click.style("Number of threads/cores (int)", fg="blue"),
prompt=icons.ICON_THREADS
+ click.style(" Number of threads/cores (int) (C-PAC will get 1 less)", fg="blue"),
default=str(8),
)
)

res_memory_gb = float(
cli.option_or_prompt(
opt=arg_memory_gb,
prompt=icons.ICON_MEMORY
+ click.style("Memory (GB, float) (can not be more than 2*threads on PSC)", fg="blue"),
prompt=icons.ICON_MEMORY + click.style(" Memory (GB, float) (C-PAC will get 1GB less)", fg="blue"),
default=f"{2 * res_threads:.1f}",
)
)
Expand All @@ -152,7 +153,7 @@ def main(
hours=float(
cli.option_or_prompt(
opt=arg_duration_h,
prompt=icons.ICON_DURATION + click.style("Duration (hours, float)", fg="blue"),
prompt=icons.ICON_DURATION + click.style(" Duration (hours, float)", fg="blue"),
default=f"{48.0:.1f}",
)
)
Expand All @@ -164,7 +165,7 @@ def main(
path_image = pl.Path(
cli.option_or_prompt(
opt=arg_image,
prompt=icons.ICON_SINGULARITY + click.style("Image file", fg="blue"),
prompt=icons.ICON_SINGULARITY + click.style(" Image file", fg="blue"),
default=str(consts.PSC_IMAGE_DEFAULT),
)
)
Expand All @@ -181,7 +182,7 @@ def main(
path_cpac = pl.Path(arg_cpac)
else:
cpac_opt: str = click.prompt(
icons.ICON_CPAC + click.style("C-PAC directory (empty to use image version)", fg="blue"),
icons.ICON_CPAC + click.style(" C-PAC directory (empty to use image version)", fg="blue"),
default="",
type=str,
)
Expand All @@ -203,7 +204,7 @@ def main(
path_input = pl.Path(
cli.option_or_prompt(
opt=arg_input,
prompt=icons.ICON_FOLDER + click.style("Input directory", fg="blue"),
prompt=icons.ICON_FOLDER + click.style(" Input directory", fg="blue"),
)
)

Expand All @@ -219,7 +220,7 @@ def main(
subjects = re.split(
r"\s+",
click.prompt(
icons.ICON_SUBJECT + click.style("Subjects (separate with space)", fg="blue"),
icons.ICON_SUBJECT + click.style(" Subjects (separate with space)", fg="blue"),
default=" ".join(subjects),
),
)
Expand Down Expand Up @@ -248,7 +249,7 @@ def main(
path_output = pl.Path(
cli.option_or_prompt(
opt=arg_output,
prompt=icons.ICON_FOLDER + click.style("Output directory", fg="blue"),
prompt=icons.ICON_FOLDER + click.style(" Output directory", fg="blue"),
default=str(consts.PSC_OUTPUT_DEFAULT),
)
)
Expand All @@ -259,7 +260,7 @@ def main(
r"\s+",
cli.option_or_prompt(
opt=arg_pipeline,
prompt=icons.ICON_PIPELINE + click.style("Pipelines (separate with space)", fg="blue"),
prompt=icons.ICON_PIPELINE + click.style(" Pipelines (separate with space)", fg="blue"),
default=consts.ID_PIPELINE_DEFAULT,
),
)
Expand Down Expand Up @@ -314,14 +315,14 @@ def main(
save_working_dir = cli.option_or_confirm(
opt=utils.option_truthy(arg_save_working_dir),
default=False,
prompt=icons.ICON_SAVE + click.style("Save working directory", fg="blue"),
prompt=icons.ICON_SAVE + click.style(" Save working directory", fg="blue"),
)

# Extra cpac args

extra_cpac_args = cli.option_or_prompt(
opt=arg_extra_cpac_args,
prompt=icons.ICON_EXTRA_ARGS + click.style("Extra args to pass to C-PAC?", fg="blue"),
prompt=icons.ICON_EXTRA_ARGS + click.style(" Extra args to pass to C-PAC?", fg="blue"),
default="",
)

Expand Down Expand Up @@ -392,6 +393,29 @@ def main(
f"- Output: `{path_out.absolute()}`\n"
)

# Adjust resources for ACCESS limits
# - ACCESS fixes total memory to 2GB per thread
# - Let's also keep 1 thread + 1GB free for the system
# - Also ACCESS uses 1000MB per GB

if res_memory_gb > (2 * res_threads): # Memory bound
job_threads = max(math.ceil(res_memory_gb / 2), 2)
cpac_threads = max(res_threads - 1, 1)

# Warn user
click.secho(
f"Warning: Job is memory-bound, increased job threads to {job_threads}. "
f"(C-PAC will get {cpac_threads}.)",
fg="yellow",
)

else: # Thread bound
job_threads = res_threads
cpac_threads = max(res_threads - 1, 1)

job_memory_gb = res_memory_gb
cpac_memory_gb = max(res_memory_gb - 1, 1)

job = consts.BASH_TEMPLATE_JOB.format(
job_name=f"{run_id}_{pipe_id}_{sub}",
stdout_file=path_stdout_log,
Expand All @@ -406,11 +430,11 @@ def main(
path_input=path_input.absolute(),
path_output=path_out_full.absolute(),
image=path_image.absolute(),
threads=res_threads,
duration_str=utils.timedelta_to_hms(res_duration),
memory_mb=int(res_memory_gb * 1000),
cpac_threads=max(res_threads - 1, 1),
cpac_memory_gb=max(res_memory_gb - 1, 1),
threads=job_threads,
memory_mb=utils.bridges_gb_to_mb(job_memory_gb),
cpac_threads=cpac_threads,
cpac_memory_gb=cpac_memory_gb,
extra_cpac_args=" ".join(extra_args),
analysis_level=analysis_level,
before_run=before_run,
Expand Down Expand Up @@ -474,7 +498,7 @@ def main(
for plan in fs_plans:
plan.apply()

if click.confirm(icons.ICON_LAUNCH + click.style("Launch now?", bg="blue"), default=None):
if click.confirm(icons.ICON_LAUNCH + click.style(" Launch now?", bg="blue"), default=None):
subprocess.run([path_executor], shell=True)

click.secho("Jobs were executed!", bg="blue")
Expand Down
13 changes: 3 additions & 10 deletions src/ecpac/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,9 @@ def slack_message_bash(data: dict) -> str:
"""Generate a bash command to send a message to Slack."""
if not slack_webhook_available():
return ""
return shlex.join([
"curl",
"-X",
"POST",
"-H",
"Content-type: application/json",
"--data",
json.dumps(data),
SLACK_WEBHOOK_URL
])
return shlex.join(
["curl", "-X", "POST", "-H", "Content-type: application/json", "--data", json.dumps(data), SLACK_WEBHOOK_URL]
)


def slack_message_bash_mrkdwn(text: str) -> str:
Expand Down
5 changes: 5 additions & 0 deletions src/ecpac/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,8 @@ def cpac_dir_valid(path: Union[str, os.PathLike]) -> bool:
and (p / "dev/docker_data/run.py").exists()
and (p / "dev/docker_data/run-with-freesurfer.sh").exists()
)


def bridges_gb_to_mb(gb: Union[float, int]) -> int:
    """Convert gigabytes to a whole number of megabytes for ACCESS/Bridges.

    ACCESS/Bridges uses 1000 MB per GB instead of 1024 MB per GB.

    Args:
        gb: Amount of memory in gigabytes.

    Returns:
        The amount in megabytes, rounded to the nearest integer. An ``int`` is
        returned even for float input so the value renders as ``16000`` rather
        than ``16000.0`` when formatted into text.
    """
    # round() instead of int(): float products can land just below the exact
    # value, and truncation would then drop a whole megabyte.
    return round(gb * 1000)

0 comments on commit b0ccc3c

Please sign in to comment.