Fix: Enable model monitoring #11

Merged · 1 commit · May 17, 2023
pipelines/src/pipelines/tensorflow/prediction/pipeline.py (10 additions & 6 deletions)
```diff
@@ -108,12 +108,16 @@ def tensorflow_pipeline(
     ).set_display_name("Ingest data")
 
     # lookup champion model
-    champion_model = lookup_model(
-        model_name=model_name,
-        project_location=project_location,
-        project_id=project_id,
-        fail_on_model_not_found=True,
-    ).set_display_name("Look up champion model")
+    champion_model = (
+        lookup_model(
+            model_name=model_name,
+            project_location=project_location,
+            project_id=project_id,
+            fail_on_model_not_found=True,
+        )
+        .set_display_name("Look up champion model")
+        .set_caching_options(False)
+    )
 
     # batch predict from BigQuery to BigQuery
     bigquery_source_input_uri = f"bq://{project_id}.{dataset_id}.{ingested_table}"
```
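Both prediction pipelines now disable KFP step caching on the champion-model lookup. The sketch below (KFP v2 syntax, with a hypothetical stand-in for the repo's `lookup_model` component) illustrates the pattern: the lookup reads external registry state, so a cached result from an earlier run could point batch prediction at a stale model.

```python
from kfp import dsl


@dsl.component
def lookup_model(model_name: str) -> str:
    # Hypothetical stand-in: the real component queries the Vertex AI model
    # registry, so its output depends on external state, not just its inputs.
    return f"projects/example-project/locations/europe-west2/models/{model_name}"


@dsl.pipeline(name="caching-sketch")
def prediction_pipeline(model_name: str = "my-model"):
    champion_model = (
        lookup_model(model_name=model_name)
        .set_display_name("Look up champion model")
        # Without this, KFP may reuse a cached (stale) resource name from an
        # earlier run instead of re-querying the registry.
        .set_caching_options(False)
    )
```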
pipelines/src/pipelines/tensorflow/training/assets/train_tf_model.py

```diff
@@ -219,6 +219,9 @@ def _get_temp_dir(dirpath, task_id):
     parser.add_argument("--hparams", default={}, type=json.loads)
     args = parser.parse_args()
 
+    if args.model.startswith("gs://"):
+        args.model = Path("/gcs/" + args.model[5:])
+
     # merge dictionaries by overwriting default_model_params if provided in model_params
     hparams = {**DEFAULT_HPARAMS, **args.hparams}
     logging.info(f"Using model hyper-parameters: {hparams}")
```
```diff
@@ -261,9 +264,9 @@ def _get_temp_dir(dirpath, task_id):
         logging.info("not chief node, exiting now")
         sys.exit()
 
-    os.makedirs(args.model, exist_ok=True)
     logging.info(f"Save model to: {args.model}")
-    tf_model.save(args.model, save_format="tf")
+    args.model.mkdir(parents=True)
+    tf_model.save(str(args.model), save_format="tf")
 
     logging.info(f"Save metrics to: {args.metrics}")
    eval_metrics = dict(zip(tf_model.metrics_names, tf_model.evaluate(test_ds)))
@@ … @@
         json.dump(metrics, fp)
 
     # Persist URIs of training file(s) for model monitoring in batch predictions
-    path = Path(args.model) / TRAINING_DATASET_INFO
+    # See https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform_v1beta1.types.ModelMonitoringObjectiveConfig.TrainingDataset  # noqa: E501
+    # for the expected schema.
+    path = args.model / TRAINING_DATASET_INFO
     training_dataset_for_monitoring = {
         "gcsSource": {"uris": [args.train_data]},
         "dataFormat": "csv",
-        "targetField": hparams["label"],
+        "targetField": label,
     }
     logging.info(f"Save training dataset info for model monitoring: {path}")
     logging.info(f"Training dataset: {training_dataset_for_monitoring}")
```
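`training_dataset.json` is written in the camelCase JSON form of the `ModelMonitoringObjectiveConfig.TrainingDataset` message linked above. A hedged sketch, not part of this PR, of how a downstream batch-prediction step could read the file back and verify it carries those fields:

```python
import json
from pathlib import Path

EXPECTED_KEYS = {"gcsSource", "dataFormat", "targetField"}


def load_training_dataset_info(model_dir: Path) -> dict:
    """Load the training dataset info persisted next to the saved model."""
    with open(model_dir / "training_dataset.json") as fp:
        info = json.load(fp)
    missing = EXPECTED_KEYS - info.keys()
    if missing:
        raise ValueError(f"training_dataset.json missing keys: {missing}")
    return info
```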
pipelines/src/pipelines/tensorflow/training/pipeline.py (1 addition & 0 deletions)

```diff
@@ -215,6 +215,7 @@ def tensorflow_pipeline(
             fail_on_model_not_found=False,
         )
         .set_display_name("Lookup past model")
+        .set_caching_options(False)
         .outputs["model_resource_name"]
     )
 
```
pipelines/src/pipelines/xgboost/prediction/pipeline.py (10 additions & 6 deletions)

```diff
@@ -102,12 +102,16 @@ def xgboost_pipeline(
     ).set_display_name("Ingest data")
 
     # lookup champion model
-    champion_model = lookup_model(
-        model_name=model_name,
-        project_location=project_location,
-        project_id=project_id,
-        fail_on_model_not_found=True,
-    ).set_display_name("Look up champion model")
+    champion_model = (
+        lookup_model(
+            model_name=model_name,
+            project_location=project_location,
+            project_id=project_id,
+            fail_on_model_not_found=True,
+        )
+        .set_display_name("Look up champion model")
+        .set_caching_options(False)
+    )
 
     # batch predict from BigQuery to BigQuery
     bigquery_source_input_uri = f"bq://{project_id}.{dataset_id}.{ingested_table}"
```
pipelines/src/pipelines/xgboost/training/assets/train_xgb_model.py (26 additions & 9 deletions)

```diff
@@ -1,4 +1,6 @@
 import argparse
+from pathlib import Path
+
 import joblib
 import json
 import os
@@ -14,7 +16,9 @@
 logging.basicConfig(level=logging.DEBUG)
 
 
+# used for monitoring during prediction time
+TRAINING_DATASET_INFO = "training_dataset.json"
 # numeric/categorical features in Chicago trips dataset to be preprocessed
 NUM_COLS = ["dayofweek", "hourofday", "trip_distance", "trip_miles", "trip_seconds"]
 ORD_COLS = ["company"]
 OHE_COLS = ["payment_type"]
@@ -39,6 +43,9 @@ def indices_in_list(elements: list, base_list: list) -> list:
     parser.add_argument("--hparams", default={}, type=json.loads)
     args = parser.parse_args()
 
+    if args.model.startswith("gs://"):
+        args.model = Path("/gcs/" + args.model[5:])
+
     logging.info("Read csv files into dataframes")
     df_train = pd.read_csv(args.train_data)
     df_valid = pd.read_csv(args.valid_data)
@@ -111,15 +118,25 @@ def indices_in_list(elements: list, base_list: list) -> list:
         "rootMeanSquaredLogError": np.sqrt(metrics.mean_squared_log_error(y_test, y_pred)),
     }
 
-    try:
-        model_path = args.model.replace("gs://", "/gcs/")
-        logging.info(f"Save model to: {model_path}")
-        os.makedirs(model_path, exist_ok=True)
-        joblib.dump(pipeline, model_path + "model.joblib")
-    except Exception as e:
-        print(e)
-        raise e
+    logging.info(f"Save model to: {args.model}")
+    args.model.mkdir(parents=True)
+    joblib.dump(pipeline, str(args.model / "model.joblib"))
 
     logging.info(f"Metrics: {metrics}")
     with open(args.metrics, "w") as fp:
         json.dump(metrics, fp)
+
+    # Persist URIs of training file(s) for model monitoring in batch predictions
+    # See https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform_v1beta1.types.ModelMonitoringObjectiveConfig.TrainingDataset  # noqa: E501
+    # for the expected schema.
+    path = args.model / TRAINING_DATASET_INFO
+    training_dataset_for_monitoring = {
+        "gcsSource": {"uris": [args.train_data]},
+        "dataFormat": "csv",
+        "targetField": label,
+    }
+    logging.info(f"Training dataset info: {training_dataset_for_monitoring}")
+
+    with open(path, "w") as fp:
+        logging.info(f"Save training dataset info for model monitoring: {path}")
+        json.dump(training_dataset_for_monitoring, fp)
```
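Since the whole sklearn preprocessing + XGBoost pipeline is dumped as a single joblib artifact, restoring it is one call. A hypothetical consumer (the bucket path below is illustrative, not from this PR):

```python
import joblib
from pathlib import Path

# Illustrative location: wherever the training job's --model argument pointed.
model_dir = Path("/gcs/example-bucket/training/model")

# One object holds the full preprocessing + XGBoost regression pipeline.
model = joblib.load(model_dir / "model.joblib")
# predictions = model.predict(df_new)  # df_new: DataFrame with raw feature columns
```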
pipelines/src/pipelines/xgboost/training/pipeline.py (1 addition & 0 deletions)

```diff
@@ -212,6 +212,7 @@ def xgboost_pipeline(
             fail_on_model_not_found=False,
         )
         .set_display_name("Lookup past model")
+        .set_caching_options(False)
         .outputs["model_resource_name"]
     )
 
```