
feat: add convenient selection of data parallelism #335

Merged 3 commits on Apr 26, 2022
6 changes: 6 additions & 0 deletions src/braket/aws/aws_quantum_job.py
@@ -73,6 +73,7 @@ def create(
hyperparameters: Dict[str, Any] = None,
input_data: Union[str, Dict, S3DataSourceConfig] = None,
instance_config: InstanceConfig = None,
distribution: str = None,
stopping_condition: StoppingCondition = None,
output_data_config: OutputDataConfig = None,
copy_checkpoints_from_job: str = None,
@@ -134,6 +135,10 @@ def create(
to execute the job. Default: InstanceConfig(instanceType='ml.m5.large',
instanceCount=1, volumeSizeInGB=30).

distribution (str): A str that specifies how the job should be distributed. If set to
"dataparallel", the hyperparameters for the job will be set to use data parallelism
features for PyTorch or TensorFlow. Default: None.

Contributor
Should we add a note that it's intended for use with >1 instance count?

Contributor (Author)
I don't think so. Data parallel distribution could also be used with a single multi-GPU instance.

Contributor
Ah, I didn't realize that. In that case, is there any potential that a user may want to create a local job with data parallel distribution if their local hardware supports it?

Contributor
I think we can cut local mode out of scope. According to this link, SageMaker local mode does not support distributed training with local GPUs.

stopping_condition (StoppingCondition): The maximum length of time, in seconds,
and the maximum number of tasks that a job can run before being forcefully stopped.
Default: StoppingCondition(maxRuntimeInSeconds=5 * 24 * 60 * 60).
@@ -181,6 +186,7 @@ def create(
hyperparameters=hyperparameters,
input_data=input_data,
instance_config=instance_config,
distribution=distribution,
stopping_condition=stopping_condition,
output_data_config=output_data_config,
copy_checkpoints_from_job=copy_checkpoints_from_job,
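For orientation, a hedged usage sketch of the new keyword (the import paths, device ARN, source module, entry point, and instance type are illustrative assumptions; only the `distribution="dataparallel"` keyword and the `InstanceConfig` field names come from the diff above):

```python
from braket.aws import AwsQuantumJob
from braket.jobs.config import InstanceConfig

# Request SageMaker data parallelism for a hybrid job (sketch, not from the PR).
job = AwsQuantumJob.create(
    device="arn:aws:braket:::device/quantum-simulator/amazon/sv1",  # assumed device ARN
    source_module="algorithm_script",                               # assumed source module
    entry_point="algorithm_script:start_here",                      # assumed entry point
    instance_config=InstanceConfig(
        instanceType="ml.p3.16xlarge",  # assumed multi-GPU instance type
        instanceCount=2,
    ),
    distribution="dataparallel",  # triggers the hyperparameters added in quantum_job_creation.py below
)
```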
11 changes: 11 additions & 0 deletions src/braket/jobs/quantum_job_creation.py
@@ -44,6 +44,7 @@ def prepare_quantum_job(
hyperparameters: Dict[str, Any] = None,
input_data: Union[str, Dict, S3DataSourceConfig] = None,
instance_config: InstanceConfig = None,
distribution: str = None,
stopping_condition: StoppingCondition = None,
output_data_config: OutputDataConfig = None,
copy_checkpoints_from_job: str = None,
@@ -98,6 +99,10 @@ def prepare_quantum_job(
to execute the job. Default: InstanceConfig(instanceType='ml.m5.large',
instanceCount=1, volumeSizeInGB=30, volumeKmsKey=None).

distribution (str): A str that specifies how the job should be distributed. If set to
"dataparallel", the hyperparameters for the job will be set to use data parallelism
features for PyTorch or TensorFlow. Default: None.

stopping_condition (StoppingCondition): The maximum length of time, in seconds,
and the maximum number of tasks that a job can run before being forcefully stopped.
Default: StoppingCondition(maxRuntimeInSeconds=5 * 24 * 60 * 60).
@@ -191,6 +196,12 @@ def prepare_quantum_job(
"s3Uri"
]
aws_session.copy_s3_directory(checkpoints_to_copy, checkpoint_config.s3Uri)
if distribution == "dataparallel":
distributed_hyperparams = {
"sagemaker_distributed_dataparallel_enabled": "true",
"sagemaker_instance_type": instance_config.instanceType,
}
hyperparameters.update(distributed_hyperparams)

create_job_kwargs = {
"jobName": job_name,
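A minimal standalone sketch of what the new branch above does (the user hyperparameters and instance type are assumed stand-ins; the two injected keys are taken verbatim from the diff):

```python
# Effect of distribution == "dataparallel" in prepare_quantum_job: SageMaker's
# distributed-data-parallel flags are merged into the job's hyperparameters.
hyperparameters = {"shots": "100"}   # assumed user-supplied hyperparameters
instance_type = "ml.p3.16xlarge"     # stands in for instance_config.instanceType
distribution = "dataparallel"

if distribution == "dataparallel":
    distributed_hyperparams = {
        "sagemaker_distributed_dataparallel_enabled": "true",
        "sagemaker_instance_type": instance_type,
    }
    hyperparameters.update(distributed_hyperparams)

print(hyperparameters)
# {'shots': '100', 'sagemaker_distributed_dataparallel_enabled': 'true',
#  'sagemaker_instance_type': 'ml.p3.16xlarge'}
```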
1 change: 1 addition & 0 deletions test/unit_tests/braket/aws/test_aws_quantum_job.py
@@ -506,6 +506,7 @@ def prepare_job_args(aws_session, device_arn):
"hyperparameters": Mock(),
"input_data": Mock(),
"instance_config": Mock(),
"distribution": Mock(),
"stopping_condition": Mock(),
"output_data_config": Mock(),
"copy_checkpoints_from_job": Mock(),