Skip to content

Commit

Permalink
feat: Added AutoMLTablesTrainingJob (#62)
Browse files Browse the repository at this point in the history
Added the training job subclass for tables

Co-authored-by: Ivan Cheung <[email protected]>
  • Loading branch information
ivanmkc and Ivan Cheung authored Nov 24, 2020
1 parent f2ccd1e commit aa5e15d
Show file tree
Hide file tree
Showing 4 changed files with 608 additions and 23 deletions.
6 changes: 5 additions & 1 deletion google/cloud/aiplatform/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
from google.cloud.aiplatform.datasets import Dataset
from google.cloud.aiplatform.models import Endpoint
from google.cloud.aiplatform.models import Model
from google.cloud.aiplatform.training_jobs import CustomTrainingJob
from google.cloud.aiplatform.jobs import BatchPredictionJob
from google.cloud.aiplatform.training_jobs import (
CustomTrainingJob,
AutoMLTablesTrainingJob,
)

"""
Usage:
Expand All @@ -36,6 +39,7 @@
"gapic",
"BatchPredictionJob",
"CustomTrainingJob",
"AutoMLTablesTrainingJob",
"Model",
"Dataset",
"Endpoint",
Expand Down
1 change: 1 addition & 0 deletions google/cloud/aiplatform/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
class training_job:
class definition:
custom_task = "gs://google-cloud-aiplatform/schema/trainingjob/definition/custom_task_1.0.0.yaml"
tabular_task = "gs://google-cloud-aiplatform/schema/trainingjob/definition/automl_tabular_1.0.0.yaml"


class dataset:
Expand Down
241 changes: 219 additions & 22 deletions google/cloud/aiplatform/training_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
training_pipeline as gca_training_pipeline,
)


from google.cloud import storage
from google.protobuf import json_format
from google.protobuf import struct_pb2
Expand Down Expand Up @@ -180,7 +179,7 @@ def _run_job(
than the one given on input. The output URI will
point to a location where the user only has a
read access.
training_task_inputs (~.struct.Value):
training_task_inputs (dict):
Required. The training task's parameter(s), as specified in
the
``training_task_definition``'s
Expand Down Expand Up @@ -238,7 +237,9 @@ def _run_job(
training_pipeline = gca_training_pipeline.TrainingPipeline(
display_name=self._display_name,
training_task_definition=training_task_definition,
training_task_inputs=training_task_inputs,
training_task_inputs=json_format.ParseDict(
training_task_inputs, struct_pb2.Value()
),
model_to_upload=model,
input_data_config=input_data_config,
)
Expand Down Expand Up @@ -890,14 +891,6 @@ def __init__(
Required: Uri of the training container image in the GCR.
requirements (Sequence[str]):
List of python packages dependencies of script.
project (str):
Optional project to retrieve model from. If not set, project set in
aiplatform.init will be used.
location (str):
Optional location to retrieve model from. If not set, location set in
aiplatform.init will be used.
credentials (auth_credentials.Credentials):
Optional credentials to use to retrieve the model.
model_serving_container_image_uri (str):
If the training produces a managed AI Platform Model, the URI of the
Model serving container suitable for serving the model produced by the
Expand Down Expand Up @@ -1055,13 +1048,13 @@ def run(
The number of accelerators to attach to a worker replica.
training_fraction_split (float):
The fraction of the input data that is to be
used to train the Model.
used to train the Model. This is ignored if Dataset is not provided.
validation_fraction_split (float):
The fraction of the input data that is to be
used to validate the Model.
used to validate the Model. This is ignored if Dataset is not provided.
test_fraction_split (float):
The fraction of the input data that is to be
used to evaluate the Model.
used to evaluate the Model. This is ignored if Dataset is not provided.
Returns:
model: The trained AI Platform Model resource or None if training did not
Expand Down Expand Up @@ -1129,13 +1122,10 @@ def run(
if args:
spec["pythonPackageSpec"]["args"] = args

training_task_inputs = json_format.ParseDict(
{
"workerPoolSpecs": worker_pool_specs,
"baseOutputDirectory": {"output_uri_prefix": base_output_dir},
},
struct_pb2.Value(),
)
training_task_inputs = {
"workerPoolSpecs": worker_pool_specs,
"baseOutputDirectory": {"output_uri_prefix": base_output_dir},
}

training_task_definition = schema.training_job.definition.custom_task

Expand Down Expand Up @@ -1185,4 +1175,211 @@ def _model_upload_fail_string(self) -> str:


class AutoMLTablesTrainingJob(_TrainingJob):
pass
def __init__(
self,
display_name: str,
optimization_prediction_type: str,
optimization_objective: Optional[str] = None,
column_transformations: Optional[Union[Dict, List[Dict]]] = None,
optimization_objective_recall_value: Optional[float] = None,
optimization_objective_precision_value: Optional[float] = None,
project: Optional[str] = None,
location: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
):
"""Constructs a AutoML Tables Training Job.
Args:
display_name (str):
Required. The user-defined name of this TrainingPipeline.
optimization_prediction_type (str):
The type of prediction the Model is to produce.
"classification" - Predict one out of multiple target values is
picked for each row.
"regression" - Predict a value based on its relation to other values.
This type is available only to columns that contain
semantically numeric values, i.e. integers or floating
point number, even if stored as e.g. strings.
optimization_objective (str):
Optional. Objective function the Model is to be optimized towards. The training
task creates a Model that maximizes/minimizes the value of the objective
function over the validation set.
The supported optimization objectives depend on the prediction type, and
in the case of classification also the number of distinct values in the
target column (two distint values -> binary, 3 or more distinct values
-> multi class).
If the field is not set, the default objective function is used.
Classification (binary):
"maximize-au-roc" (default) - Maximize the area under the receiver
operating characteristic (ROC) curve.
"minimize-log-loss" - Minimize log loss.
"maximize-au-prc" - Maximize the area under the precision-recall curve.
"maximize-precision-at-recall" - Maximize precision for a specified
recall value.
"maximize-recall-at-precision" - Maximize recall for a specified
precision value.
Classification (multi class):
"minimize-log-loss" (default) - Minimize log loss.
Regression:
"minimize-rmse" (default) - Minimize root-mean-squared error (RMSE).
"minimize-mae" - Minimize mean-absolute error (MAE).
"minimize-rmsle" - Minimize root-mean-squared log error (RMSLE).
column_transformations (Optional[Union[Dict, List[Dict]]]):
Optional. Transformations to apply to the input columns (i.e. columns other
than the targetColumn). Each transformation may produce multiple
result values from the column's value, and all are used for training.
When creating transformation for BigQuery Struct column, the column
should be flattened using "." as the delimiter.
If an input column has no transformations on it, such a column is
ignored by the training, except for the targetColumn, which should have
no transformations defined on.
optimization_objective_recall_value (float):
Optional. Required when maximize-precision-at-recall optimizationObjective was
picked, represents the recall value at which the optimization is done.
The minimum value is 0 and the maximum is 1.0.
optimization_objective_precision_value (float):
Optional. Required when maximize-recall-at-precision optimizationObjective was
picked, represents the precision value at which the optimization is
done.
The minimum value is 0 and the maximum is 1.0.
project (str):
Optional. Project to run training in. Overrides project set in aiplatform.init.
location (str):
Optional. Location to run training in. Overrides location set in aiplatform.init.
credentials (auth_credentials.Credentials):
Optional. Custom credentials to use to run call training service. Overrides
credentials set in aiplatform.init.
"""
super().__init__(
display_name=display_name,
project=project,
location=location,
credentials=credentials,
)
self._column_transformations = column_transformations
self._optimization_objective = optimization_objective
self._optimization_prediction_type = optimization_prediction_type
self._optimization_objective_recall_value = optimization_objective_recall_value
self._optimization_objective_precision_value = (
optimization_objective_precision_value
)

def run(
self,
model_display_name: str,
dataset: datasets.Dataset,
target_column: str,
training_fraction_split: float = 0.8,
validation_fraction_split: float = 0.1,
test_fraction_split: float = 0.1,
weight_column: Optional[str] = None,
budget_milli_node_hours: int = 1000,
disable_early_stopping=False,
) -> models.Model:
"""Runs the training job and returns a model.
Data fraction splits:
Any of ``training_fraction_split``, ``validation_fraction_split`` and
``test_fraction_split`` may optionally be provided, they must sum to up to 1. If
the provided ones sum to less than 1, the remainder is assigned to sets as
decided by AI Platform.If none of the fractions are set, by default roughly 80%
of data will be used for training, 10% for validation, and 10% for test.
Args:
model_display_name (str):
Required. If the script produces a managed AI Platform Model. The display name of
the Model. The name can be up to 128 characters long and can be consist
of any UTF-8 characters.
dataset (aiplatform.Dataset):
Required. The dataset within the same Project from which data will be used to train the Model. The
Dataset must use schema compatible with Model being trained,
and what is compatible should be described in the used
TrainingPipeline's [training_task_definition]
[google.cloud.aiplatform.v1beta1.TrainingPipeline.training_task_definition].
For tabular Datasets, all their data is exported to
training, to pick and choose from.
training_fraction_split (float):
Required. The fraction of the input data that is to be
used to train the Model. This is ignored if Dataset is not provided.
validation_fraction_split (float):
Required. The fraction of the input data that is to be
used to validate the Model. This is ignored if Dataset is not provided.
test_fraction_split (float):
Required. The fraction of the input data that is to be
used to evaluate the Model. This is ignored if Dataset is not provided.
weight_column (str):
Optional. Name of the column that should be used as the weight column.
Higher values in this column give more importance to the row
during Model training. The column must have numeric values between 0 and
10000 inclusively, and 0 value means that the row is ignored.
If the weight column field is not set, then all rows are assumed to have
equal weight of 1.
budget_milli_node_hours (int):
Optional. The train budget of creating this Model, expressed in milli node
hours i.e. 1,000 value in this field means 1 node hour.
The training cost of the model will not exceed this budget. The final
cost will be attempted to be close to the budget, though may end up
being (even) noticeably smaller - at the backend's discretion. This
especially may happen when further model training ceases to provide
any improvements.
If the budget is set to a value known to be insufficient to train a
Model for the given training set, the training won't be attempted and
will error.
The minimum value is 1000 and the maximum is 72000.
disable_early_stopping (bool):
Required. If true, the entire budget is used. This disables the early stopping
feature. By default, the early stopping feature is enabled, which means
that training might stop before the entire training budget has been
used, if futrher training does no longer brings significant improvement
to the model.
Returns:
model: The trained AI Platform Model resource or None if training did not
produce an AI Platform Model.
Raises:
RuntimeError if Training job has already been run
"""

training_task_definition = schema.training_job.definition.tabular_task

training_task_inputs_dict = {
# required inputs
"targetColumn": target_column,
"transformations": self._column_transformations,
"trainBudgetMilliNodeHours": budget_milli_node_hours,
# optional inputs
"weightColumnName": weight_column,
"disableEarlyStopping": disable_early_stopping,
"optimizationObjective": self._optimization_objective,
"predictionType": self._optimization_prediction_type,
"optimizationObjectiveRecallValue": self._optimization_objective_recall_value,
"optimizationObjectivePrecisionValue": self._optimization_objective_precision_value,
}

model = gca_model.Model(display_name=model_display_name)

return self._run_job(
training_task_definition=training_task_definition,
training_task_inputs=training_task_inputs_dict,
dataset=dataset,
training_fraction_split=training_fraction_split,
validation_fraction_split=validation_fraction_split,
test_fraction_split=test_fraction_split,
model=model,
)

@property
def _model_upload_fail_string(self) -> str:
"""Helper property for model upload failure."""
return (
f"Training Pipeline {self.resource_name} is not configured to upload a "
"Model."
)
Loading

0 comments on commit aa5e15d

Please sign in to comment.