Sample: model retraining scenario using AI Platform components #1513

Merged · 2 commits · Jun 19, 2019
324 changes: 324 additions & 0 deletions samples/ai-platform/Chicago Crime Pipeline.ipynb
@@ -0,0 +1,324 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Chicago Crime Prediction Pipeline\n",
"\n",
"An example notebook that demonstrates how to:\n",
"* Download data from BigQuery\n",
"* Create a Kubeflow pipeline\n",
"* Include Google Cloud AI Platform components to train and deploy the model in the pipeline\n",
"* Submit a job for execution\n",
"\n",
"The model forecasts how many crimes are expected to be reported the next day, based on how many were reported over the previous `n` days."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Imports"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%capture\n",
"\n",
"# Install the KFP SDK and dependencies (skip this cell if they are already installed)\n",
"KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.21/kfp.tar.gz'\n",
"!pip3 install --upgrade pip -q\n",
"!pip3 install $KFP_PACKAGE --upgrade -q\n",
"!pip3 install pandas --upgrade -q"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"import time\n",
"\n",
"import kfp\n",
"import kfp.components as comp\n",
"import kfp.dsl as dsl\n",
"import kfp.gcp as gcp\n",
"\n",
"import pandas as pd"
]
},
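{
"cell_type": "markdown",
"metadata": {},
"source": [
"The trainer package referenced later in this notebook builds sliding-window features from the daily counts. As an illustrative sketch only (this is not the actual `trainer.task` code), lag features for the previous `n` days can be built with pandas:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrative sketch only; the real pre-processing lives in the trainer package.\n",
"def make_lag_features(df, n=7):\n",
"    # Expects the 'day' and 'count' columns produced by the BigQuery query below.\n",
"    df = df.sort_values('day').copy()\n",
"    for i in range(1, n + 1):\n",
"        df['lag_%d' % i] = df['count'].shift(i)\n",
"    return df.dropna()"
]
},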
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Pipeline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Constants"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Required Parameters\n",
"PROJECT_ID = '<ADD GCP PROJECT HERE>'\n",
"GCS_WORKING_DIR = 'gs://<ADD STORAGE LOCATION HERE>' # No ending slash\n",
"\n",
"# Optional Parameters\n",
"REGION = 'us-central1'\n",
"RUNTIME_VERSION = '1.13'\n",
"PACKAGE_URIS=json.dumps(['gs://chicago-crime/chicago_crime_trainer-0.0.tar.gz'])\n",
"TRAINER_OUTPUT_GCS_PATH = GCS_WORKING_DIR + '/train/output/' + str(int(time.time())) + '/'\n",
"DATA_GCS_PATH = GCS_WORKING_DIR + '/reports.csv'\n",
"PYTHON_MODULE = 'trainer.task'\n",
"TRAINER_ARGS = json.dumps([\n",
" '--data-file-url', DATA_GCS_PATH,\n",
" '--job-dir', GCS_WORKING_DIR\n",
"])\n",
"EXPERIMENT_NAME = 'Chicago Crime Prediction'\n",
"PIPELINE_NAME = 'Chicago Crime Prediction'\n",
"PIPELINE_DESCRIPTION = ''"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Download data\n",
"\n",
"Define a download function that uses the BigQuery component to export the query results to Cloud Storage."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"bigquery_query_op = comp.load_component_from_url(\n",
" 'https://raw.githubusercontent.com/kubeflow/pipelines/d2f5cc92a46012b9927209e2aaccab70961582dc/components/gcp/bigquery/query/component.yaml')\n",
"\n",
"QUERY = \"\"\"\n",
" SELECT count(*) as count, TIMESTAMP_TRUNC(date, DAY) as day\n",
" FROM `bigquery-public-data.chicago_crime.crime`\n",
" GROUP BY day\n",
" ORDER BY day\n",
"\"\"\"\n",
"\n",
"def download(project_id, data_gcs_path):\n",
"\n",
" return bigquery_query_op(\n",
" query=QUERY,\n",
" project_id=project_id,\n",
" output_gcs_path=data_gcs_path\n",
" ).apply(\n",
" gcp.use_gcp_secret('user-gcp-sa') \n",
" )"
]
},
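{
"cell_type": "markdown",
"metadata": {},
"source": [
"After a pipeline run, the CSV exported to `DATA_GCS_PATH` can be sanity-checked locally. This assumes the environment can read `gs://` paths with `pd.read_csv` (for example, with `gcsfs` installed); uncomment once the run has completed:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Uncomment after the pipeline has produced the CSV:\n",
"# reports = pd.read_csv(DATA_GCS_PATH)\n",
"# reports.tail()"
]
},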
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Train the model\n",
"\n",
"Submit an AI Platform training job that runs the packaged trainer code, which pre-processes the data and trains the model."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"mlengine_train_op = comp.load_component_from_url(\n",
" 'https://raw.githubusercontent.com/kubeflow/pipelines/d2f5cc92a46012b9927209e2aaccab70961582dc/components/gcp/ml_engine/train/component.yaml')\n",
"\n",
"def train(project_id,\n",
" trainer_args,\n",
" package_uris,\n",
" trainer_output_gcs_path,\n",
" gcs_working_dir,\n",
" region,\n",
" python_module,\n",
" runtime_version):\n",
" \n",
" return mlengine_train_op(\n",
" project_id=project_id, \n",
" python_module=python_module,\n",
" package_uris=package_uris,\n",
" region=region,\n",
" args=trainer_args,\n",
" job_dir=trainer_output_gcs_path,\n",
" runtime_version=runtime_version\n",
" ).apply(gcp.use_gcp_secret('user-gcp-sa'))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Deploy model\n",
"\n",
"Deploy the trained model from the training job's output directory, using the job ID returned by the training step as the model ID."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"mlengine_deploy_op = comp.load_component_from_url(\n",
" 'https://raw.githubusercontent.com/kubeflow/pipelines/d2f5cc92a46012b9927209e2aaccab70961582dc/components/gcp/ml_engine/deploy/component.yaml')\n",
"\n",
"def deploy(\n",
" project_id,\n",
" model_uri,\n",
" model_id,\n",
" runtime_version):\n",
" \n",
" return mlengine_deploy_op(\n",
" model_uri=model_uri,\n",
" project_id=project_id, \n",
" model_id=model_id, \n",
" runtime_version=runtime_version, \n",
" replace_existing_version=True, \n",
" set_default=True).apply(gcp.use_gcp_secret('user-gcp-sa'))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Define pipeline"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"@dsl.pipeline(\n",
" name=PIPELINE_NAME,\n",
" description=PIPELINE_DESCRIPTION\n",
")\n",
"def pipeline(\n",
" data_gcs_path=dsl.PipelineParam(name='data_gcs_path', value=DATA_GCS_PATH),\n",
" gcs_working_dir=dsl.PipelineParam(name='gcs_working_dir', value=GCS_WORKING_DIR),\n",
" project_id=dsl.PipelineParam(name='project_id', value=PROJECT_ID),\n",
" python_module=dsl.PipelineParam(name='python_module', value=PYTHON_MODULE),\n",
" region=dsl.PipelineParam(name='region', value=REGION),\n",
" runtime_version=dsl.PipelineParam(name='runtime_version', value=RUNTIME_VERSION),\n",
" package_uris=dsl.PipelineParam(name='package_uris', value=PACKAGE_URIS),\n",
" trainer_output_gcs_path=dsl.PipelineParam(name='trainer_output_gcs_path', value=TRAINER_OUTPUT_GCS_PATH),\n",
" trainer_args=dsl.PipelineParam(name='trainer_args', value=TRAINER_ARGS),\n",
"): \n",
" download_task = download(project_id,\n",
" data_gcs_path)\n",
"\n",
" train_task = train(project_id,\n",
" trainer_args,\n",
" package_uris,\n",
" trainer_output_gcs_path,\n",
" gcs_working_dir,\n",
" region,\n",
" python_module,\n",
" runtime_version).after(download_task)\n",
" \n",
" deploy_task = deploy(project_id,\n",
" train_task.outputs['job_dir'],\n",
" train_task.outputs['job_id'],\n",
" runtime_version) \n",
" return True\n",
"\n",
"# Reference for invocation later\n",
"pipeline_func = pipeline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Compile pipeline"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import kfp.compiler as compiler\n",
"\n",
"pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz'\n",
"compiler.Compiler().compile(pipeline_func, pipeline_filename)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Submit the pipeline for execution"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Specify pipeline argument values\n",
"arguments = {}\n",
"\n",
"# Get or create an experiment and submit a pipeline run\n",
"client = kfp.Client()\n",
"try:\n",
" experiment = client.get_experiment(experiment_name=EXPERIMENT_NAME)\n",
"except Exception:\n",
" experiment = client.create_experiment(EXPERIMENT_NAME)\n",
"\n",
"# Submit a pipeline run\n",
"run_name = pipeline_func.__name__ + ' run'\n",
"run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}