Skip to content

Commit

Permalink
add parallelism for workflow (#222)
Browse files Browse the repository at this point in the history
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced a new `parallelism` option in the workflow submission
process, allowing users to specify the parallelism level for their
workflows. This provides greater control over the execution parallelism,
enhancing efficiency and resource management.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Signed-off-by: zjgemi <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Han Wang <[email protected]>
  • Loading branch information
3 people authored May 28, 2024
1 parent 76c9657 commit aa806e1
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 1 deletion.
4 changes: 4 additions & 0 deletions dpgen2/entrypoint/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ def submit_args(default_step_config=normalize_step_dict({})):
doc_explore = "The configuration for exploration"
doc_fp = "The configuration for FP"
doc_name = "The workflow name, 'dpgen' for default"
doc_parallelism = "The parallelism for the workflow. Accept an int that stands for the maximum number of running pods for the workflow. None for default"

return (
dflow_conf_args()
Expand Down Expand Up @@ -552,6 +553,9 @@ def submit_args(default_step_config=normalize_step_dict({})):
),
Argument("fp", dict, [], [variant_fp()], optional=False, doc=doc_fp),
Argument("name", str, optional=True, default="dpgen", doc=doc_name),
Argument(
"parallelism", int, optional=True, default=None, doc=doc_parallelism
),
]
)

Expand Down
2 changes: 1 addition & 1 deletion dpgen2/entrypoint/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ def submit_concurrent_learning(
# wf_config["inputs"]["do_finetune"] = False
# finetune will not be done again if the old process is reused.

wf = Workflow(name=wf_config["name"])
wf = Workflow(name=wf_config["name"], parallelism=wf_config["parallelism"])

if wf_config["inputs"].get("do_finetune", False):
assert finetune_step is not None
Expand Down

0 comments on commit aa806e1

Please sign in to comment.