Skip to content

Commit

Permalink
Support all NeptuneML API command parameters in neptune_ml magics, accept unified JSON blob for parameter input (#202)
Browse files Browse the repository at this point in the history

* Add all NeptuneML command parameters as args for %neptune_ml

* Update Changelog

* Support single JSON object as parameter intake for all steps

* Update Changelog for JSON blob

Co-authored-by: Michael Chin <[email protected]>
  • Loading branch information
michaelnchin and michaelnchin authored Oct 1, 2021
1 parent 5e6a806 commit 44bc851
Show file tree
Hide file tree
Showing 9 changed files with 470 additions and 69 deletions.
2 changes: 2 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
Starting with v1.31.6, this file will contain a record of major features and updates made in each release of graph-notebook.

## Upcoming
- Added full support for NeptuneML API command parameters to `%neptune_ml` ([Link to PR](https://github.com/aws/graph-notebook/pull/202))
- Allow `%%neptune_ml` to accept JSON blob as parameter input for most phases ([Link to PR](https://github.com/aws/graph-notebook/pull/202))
- Added `--silent` option for suppressing query output ([PR #1](https://github.com/aws/graph-notebook/pull/201)) ([PR #2](https://github.com/aws/graph-notebook/pull/203))

## Release 3.0.6 (September 20, 2021)
Expand Down
446 changes: 407 additions & 39 deletions src/graph_notebook/magics/ml.py

Large diffs are not rendered by default.

28 changes: 20 additions & 8 deletions src/graph_notebook/neptune/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,14 +392,17 @@ def dataprocessing_stop(self, job_id: str, clean=False, neptune_iam_role_arn: st
return res

def modeltraining_start(self, data_processing_job_id: str, train_model_s3_location: str,
max_hpo_number_of_training_jobs: int, max_hpo_parallel_training_jobs: int,
**kwargs) -> requests.Response:
"""
for a full list of supported parameters, see:
https://docs.aws.amazon.com/neptune/latest/userguide/machine-learning-api-modeltraining.html
"""
data = {
'dataProcessingJobId': data_processing_job_id,
'trainModelS3Location': train_model_s3_location
'trainModelS3Location': train_model_s3_location,
'maxHPONumberOfTrainingJobs': max_hpo_number_of_training_jobs,
'maxHPOParallelTrainingJobs': max_hpo_parallel_training_jobs
}

for k, v in kwargs.items():
Expand Down Expand Up @@ -444,8 +447,9 @@ def modeltraining_stop(self, training_job_id: str, neptune_iam_role_arn: str = '
res = self._http_session.send(req)
return res

def modeltransform_create(self, output_s3_location: str, dataprocessing_job_id: str = '', modeltraining_job_id: str = '',
training_job_name: str = '', **kwargs) -> requests.Response:
def modeltransform_create(self, output_s3_location: str, dataprocessing_job_id: str = '',
modeltraining_job_id: str = '', training_job_name: str = '',
**kwargs) -> requests.Response:
logger.debug("modeltransform_create initiated with params:"
f"output_s3_location: {output_s3_location}\n"
f"dataprocessing_job_id: {dataprocessing_job_id}\n"
Expand All @@ -462,7 +466,8 @@ def modeltransform_create(self, output_s3_location: str, dataprocessing_job_id:
data['mlModelTrainingJobId'] = modeltraining_job_id
else:
raise ValueError(
'Invalid input. Must only specify either dataprocessing_job_id and modeltraining_job_id or only training_job_name')
'Invalid input. Must only specify either dataprocessing_job_id and modeltraining_job_id or only '
'training_job_name')

for k, v in kwargs.items():
data[k] = v
Expand Down Expand Up @@ -511,10 +516,17 @@ def modeltransform_stop(self, job_id: str, iam_role: str = '', clean: bool = Fal
res = self._http_session.send(req)
return res

def endpoints_create(self, training_job_id: str, **kwargs) -> requests.Response:
data = {
'mlModelTrainingJobId': training_job_id
}
def endpoints_create(self, model_training_job_id: str = '', model_transform_job_id: str = '',
**kwargs) -> requests.Response:
data = {}

if model_training_job_id and not model_transform_job_id:
data['mlModelTrainingJobId'] = model_training_job_id
elif model_transform_job_id and not model_training_job_id:
data['mlModelTransformJobId'] = model_transform_job_id
else:
raise ValueError('Invalid input. Must either specify model_training_job_id or model_transform_job_id, '
'and not both.')

for k, v in kwargs.items():
data[k] = v
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,9 @@
"--job-id {training_job_name}\n",
"--data-processing-id {training_job_name} \n",
"--instance-type ml.p3.2xlarge\n",
"--s3-output-uri {str(s3_bucket_uri)}/training \"\"\""
"--s3-output-uri {str(s3_bucket_uri)}/training\n",
"--max-hpo-number 2\n",
"--max-hpo-parallel 2 \"\"\""
],
"outputs": [],
"metadata": {}
Expand Down Expand Up @@ -479,7 +481,7 @@
"source": [
"endpoint_params=f\"\"\"\n",
"--job-id {training_job_name} \n",
"--model-job-id {training_job_name} \"\"\""
"--model-training-job-id {training_job_name} \"\"\""
],
"outputs": [],
"metadata": {}
Expand Down Expand Up @@ -751,7 +753,7 @@
"cell_type": "code",
"execution_count": null,
"source": [
"neptune_ml.delete_endpoint(endpoint)"
"neptune_ml.delete_endpoint(training_job_name)"
],
"outputs": [],
"metadata": {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,9 @@
"--job-id {training_job_name} \n",
"--data-processing-id {training_job_name} \n",
"--instance-type ml.p3.2xlarge\n",
"--s3-output-uri {str(s3_bucket_uri)}/training \"\"\""
"--s3-output-uri {str(s3_bucket_uri)}/training\n",
"--max-hpo-number 2\n",
"--max-hpo-parallel 2 \"\"\""
]
},
{
Expand Down Expand Up @@ -475,7 +477,7 @@
"source": [
"endpoint_params=f\"\"\"\n",
"--job-id {training_job_name} \n",
"--model-job-id {training_job_name}\"\"\""
"--model-training-job-id {training_job_name}\"\"\""
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,9 @@
"--job-id {training_job_name} \n",
"--data-processing-id {training_job_name} \n",
"--instance-type ml.p3.2xlarge\n",
"--s3-output-uri {str(s3_bucket_uri)}/training \"\"\""
"--s3-output-uri {str(s3_bucket_uri)}/training\n",
"--max-hpo-number 2\n",
"--max-hpo-parallel 2 \"\"\""
]
},
{
Expand Down Expand Up @@ -449,7 +451,7 @@
"source": [
"endpoint_params=f\"\"\"\n",
"--job-id {training_job_name} \n",
"--model-job-id {training_job_name}\"\"\""
"--model-training-job-id {training_job_name}\"\"\""
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,9 @@
"--job-id {training_job_name} \n",
"--data-processing-id {training_job_name} \n",
"--instance-type ml.p3.2xlarge\n",
"--s3-output-uri {str(s3_bucket_uri)}/training \"\"\""
"--s3-output-uri {str(s3_bucket_uri)}/training\n",
"--max-hpo-number 2\n",
"--max-hpo-parallel 2 \"\"\""
]
},
{
Expand Down Expand Up @@ -478,7 +480,7 @@
"source": [
"endpoint_params=f\"\"\"\n",
"--job-id {training_job_name} \n",
"--model-job-id {training_job_name}\"\"\""
"--model-training-job-id {training_job_name}\"\"\""
]
},
{
Expand Down Expand Up @@ -636,7 +638,7 @@
"metadata": {},
"outputs": [],
"source": [
"neptune_ml.delete_endpoint(endpoint)"
"neptune_ml.delete_endpoint(training_job_name)"
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,9 @@
"--job-id {training_job_name} \n",
"--data-processing-id {training_job_name} \n",
"--instance-type ml.p3.2xlarge\n",
"--s3-output-uri {str(s3_bucket_uri)}/training \"\"\""
"--s3-output-uri {str(s3_bucket_uri)}/training\n",
"--max-hpo-number 2\n",
"--max-hpo-parallel 2 \"\"\""
]
},
{
Expand Down Expand Up @@ -476,7 +478,7 @@
"source": [
"endpoint_params=f\"\"\"\n",
"--job-id {training_job_name} \n",
"--model-job-id {training_job_name}\"\"\""
"--model-training-job-id {training_job_name}\"\"\""
]
},
{
Expand Down
29 changes: 19 additions & 10 deletions test/integration/iam/ml/test_neptune_ml_with_iam.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
NEPTUNE_ML_IAM_ROLE_ARN = os.getenv('NEPTUNE_ML_IAM_ROLE_ARN')
NEPTUNE_ML_COMPLETED_TRAINING_ID = os.getenv('NEPTUNE_ML_COMPLETED_TRAINING_ID')
NEPTUNE_ML_TRANSFORM_OUTPUT = 's3://akline-misc/transform'
NEPTUNE_ML_MAX_TOTAL_HPO_TRAINING_JOBS = 2
NEPTUNE_ML_MAX_PARALLEL_HPO_TRAINING_JOBS = 2


class TestNeptuneMLWithIAM(GraphNotebookIntegrationTest):
Expand Down Expand Up @@ -89,7 +91,9 @@ def test_neptune_ml_dataprocessing(self):

def test_neptune_ml_modeltraining(self):
training_res = self.client.modeltraining_start(NEPTUNE_ML_COMPLETED_DATAPROCESSING_JOB_ID,
NEPTUNE_ML_TRAINING_OUTPUT)
NEPTUNE_ML_TRAINING_OUTPUT,
NEPTUNE_ML_MAX_TOTAL_HPO_TRAINING_JOBS,
NEPTUNE_ML_MAX_PARALLEL_HPO_TRAINING_JOBS)
assert training_res.status_code == 200
training = training_res.json()

Expand All @@ -109,9 +113,9 @@ def test_neptune_ml_modeltraining(self):
assert delete_res.status_code == 200

def test_neptune_ml_modeltransform(self):
create_res = self.client.modeltransform_create(NEPTUNE_ML_TRANSFORM_OUTPUT,
NEPTUNE_ML_COMPLETED_DATAPROCESSING_JOB_ID,
NEPTUNE_ML_COMPLETED_TRAINING_ID)
create_res = self.client.modeltransform_create(output_s3_location=NEPTUNE_ML_TRANSFORM_OUTPUT,
dataprocessing_job_id=NEPTUNE_ML_COMPLETED_DATAPROCESSING_JOB_ID,
modeltraining_job_id=NEPTUNE_ML_COMPLETED_TRAINING_ID)
assert create_res.status_code == 200

create = create_res.json()
Expand All @@ -138,6 +142,8 @@ def test_neptune_ml_e2e(self):
s3_input_uri = os.getenv('NEPTUNE_ML_DATAPROCESSING_S3_INPUT', '')
s3_processed_uri = os.getenv('NEPTUNE_ML_DATAPROCESSING_S3_PROCESSED', '')
train_model_s3_location = os.getenv('NEPTUNE_ML_TRAINING_S3_LOCATION', '')
hpo_number = NEPTUNE_ML_MAX_TOTAL_HPO_TRAINING_JOBS
hpo_parallel = NEPTUNE_ML_MAX_PARALLEL_HPO_TRAINING_JOBS

assert s3_input_uri != ''
assert s3_processed_uri != ''
Expand All @@ -152,7 +158,7 @@ def test_neptune_ml_e2e(self):
p.join(3600)

logger.info("model training...")
training_job = do_modeltraining(dataprocessing_id, train_model_s3_location)
training_job = do_modeltraining(dataprocessing_id, train_model_s3_location, hpo_number, hpo_parallel)
training_job_id = training_job['id']

p = threading.Thread(target=wait_for_modeltraining_complete, args=(training_job_id,))
Expand Down Expand Up @@ -186,14 +192,16 @@ def test_neptune_ml_modeltraining_status(self):
def test_neptune_ml_training(self):
dataprocessing_id = os.getenv('NEPTUNE_ML_DATAPROCESSING_ID', '')
train_model_s3_location = os.getenv('NEPTUNE_ML_TRAINING_S3_LOCATION', '')
hpo_number = NEPTUNE_ML_MAX_TOTAL_HPO_TRAINING_JOBS
hpo_parallel = NEPTUNE_ML_MAX_PARALLEL_HPO_TRAINING_JOBS

assert dataprocessing_id != ''
assert train_model_s3_location != ''

dataprocessing_status = client.dataprocessing_job_status(dataprocessing_id)
assert dataprocessing_status.status_code == 200

job_start_res = client.modeltraining_start(dataprocessing_id, train_model_s3_location)
job_start_res = client.modeltraining_start(dataprocessing_id, train_model_s3_location, hpo_number, hpo_parallel)
assert job_start_res.status_code == 200

job_id = job_start_res.json()['id']
Expand Down Expand Up @@ -238,10 +246,11 @@ def wait_for_dataprocessing_complete(dataprocessing_id: str):
time.sleep(10)


def do_modeltraining(dataprocessing_id, train_model_s3_location):
def do_modeltraining(dataprocessing_id, train_model_s3_location, hpo_number, hpo_parallel):
logger.info(
f"starting training job from dataprocessing_job_id={dataprocessing_id} and training_model_s3_location={train_model_s3_location}")
training_start = client.modeltraining_start(dataprocessing_id, train_model_s3_location)
f"starting training job from dataprocessing_job_id={dataprocessing_id} "
f"and training_model_s3_location={train_model_s3_location}")
training_start = client.modeltraining_start(dataprocessing_id, train_model_s3_location, hpo_number, hpo_parallel)
assert training_start.status_code == 200
return training_start.json()

Expand All @@ -261,7 +270,7 @@ def wait_for_modeltraining_complete(training_job: str) -> dict:


def do_create_endpoint(training_job_id: str) -> dict:
endpoint_res = client.endpoints_create(training_job_id)
endpoint_res = client.endpoints_create(model_training_job_id=training_job_id)
assert endpoint_res.status_code == 200
return endpoint_res.json()

Expand Down

0 comments on commit 44bc851

Please sign in to comment.