updating solution on labs 4 and 5 of structured ML #219

Merged
merged 2 commits on Jul 30, 2018
@@ -32,7 +32,10 @@
},
{
"cell_type": "markdown",
"metadata": {},
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"After doing a pip install, click **\"Reset Session\"** on the notebook so that the Python environment picks up the new packages."
]
326 changes: 320 additions & 6 deletions courses/machine_learning/deepdive/06_structured/5_train.ipynb
@@ -107,10 +107,19 @@
"<ol>\n",
"<li> Making the code a Python package\n",
"<li> Using gcloud to submit the training code to Cloud ML Engine\n",
"</ol>\n",
"<p>\n",
"The code in model.py is the same as in the TensorFlow notebook. I just moved it to a file so that I could package it up as a module.\n",
"(explore the <a href=\"babyweight/trainer\">directory structure</a>)."
"</ol>"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"## Lab Task 1\n",
"\n",
"The following code edits babyweight/trainer/task.py."
]
},
{
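For orientation, the Python package that the markdown cell above describes has roughly the layout sketched below. The babyweight/trainer paths come from the %writefile targets later in this diff; the presence of __init__.py and the file roles are assumptions about the repository, not something shown in this PR.

    # Sketch of the expected package layout (the listing command is illustrative).
    ls -R babyweight
    # babyweight/
    #   trainer/
    #     __init__.py   # assumed; marks trainer as an importable Python package
    #     task.py       # argument parsing and entry point (written by Lab Task 1)
    #     model.py      # input functions, feature columns, train_and_evaluate (written by Lab Task 2)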
@@ -123,8 +132,297 @@
},
"outputs": [],
"source": [
"%bash\n",
"grep \"^def\" babyweight/trainer/model.py"
"%writefile babyweight/trainer/task.py\n",
"import argparse\n",
"import json\n",
"import os\n",
"\n",
"import model\n",
"\n",
"import tensorflow as tf\n",
"from tensorflow.contrib.learn.python.learn import learn_runner\n",
"\n",
"if __name__ == '__main__':\n",
" parser = argparse.ArgumentParser()\n",
" parser.add_argument(\n",
" '--bucket',\n",
" help = 'GCS path to data. We assume that data is in gs://BUCKET/babyweight/preproc/',\n",
" required = True\n",
" )\n",
" parser.add_argument(\n",
" '--output_dir',\n",
" help = 'GCS location to write checkpoints and export models',\n",
" required = True\n",
" )\n",
" parser.add_argument(\n",
" '--batch_size',\n",
" help = 'Number of examples to compute gradient over.',\n",
" type = int,\n",
" default = 512\n",
" )\n",
" parser.add_argument(\n",
" '--job-dir',\n",
" help = 'this model ignores this field, but it is required by gcloud',\n",
" default = 'junk'\n",
" )\n",
" parser.add_argument(\n",
" '--nnsize',\n",
" help = 'Hidden layer sizes to use for DNN feature columns -- provide space-separated layers',\n",
" nargs = '+',\n",
" type = int,\n",
" default=[128, 32, 4]\n",
" )\n",
" parser.add_argument(\n",
" '--nembeds',\n",
" help = 'Embedding size of a cross of n key real-valued parameters',\n",
" type = int,\n",
" default = 3\n",
" )\n",
"\n",
" ## TODO 1: add the new arguments here \n",
" parser.add_argument(\n",
" '--train_examples',\n",
" help = 'Number of examples (in thousands) to run the training job over. If this is more than actual # of examples available, it cycles through them. So specifying 1000 here when you have only 100k examples makes this 10 epochs.',\n",
" type = int,\n",
" default = 5000\n",
" ) \n",
" parser.add_argument(\n",
" '--pattern',\n",
" help = 'Specify a pattern that has to be in input files. For example 00001-of will process only one shard',\n",
" default = 'of'\n",
" )\n",
" parser.add_argument(\n",
" '--eval_steps',\n",
" help = 'Positive number of steps for which to evaluate model. Default to None, which means to evaluate until input_fn raises an end-of-input exception',\n",
" type = int, \n",
" default = None\n",
" )\n",
" \n",
" ## parse all arguments\n",
" args = parser.parse_args()\n",
" arguments = args.__dict__\n",
"\n",
" # unused args provided by service\n",
" arguments.pop('job_dir', None)\n",
" arguments.pop('job-dir', None)\n",
"\n",
" ## assign the arguments to the model variables\n",
" output_dir = arguments.pop('output_dir')\n",
" model.BUCKET = arguments.pop('bucket')\n",
" model.BATCH_SIZE = arguments.pop('batch_size')\n",
" model.TRAIN_STEPS = (arguments.pop('train_examples') * 1000) / model.BATCH_SIZE\n",
" model.EVAL_STEPS = arguments.pop('eval_steps') \n",
" print (\"Will train for {} steps using batch_size={}\".format(model.TRAIN_STEPS, model.BATCH_SIZE))\n",
" model.PATTERN = arguments.pop('pattern')\n",
" model.NEMBEDS= arguments.pop('nembeds')\n",
" model.NNSIZE = arguments.pop('nnsize')\n",
" print (\"Will use DNN size of {}\".format(model.NNSIZE))\n",
"\n",
" # Append trial_id to path if we are doing hptuning\n",
" # This code can be removed if you are not using hyperparameter tuning\n",
" output_dir = os.path.join(\n",
" output_dir,\n",
" json.loads(\n",
" os.environ.get('TF_CONFIG', '{}')\n",
" ).get('task', {}).get('trial', '')\n",
" )\n",
"\n",
" # Run the training job\n",
" model.train_and_evaluate(output_dir)"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"## Lab Task 2\n",
"\n",
"The following code edits babyweight/trainer/model.py."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"%writefile babyweight/trainer/model.py\n",
"from __future__ import absolute_import\n",
"from __future__ import division\n",
"from __future__ import print_function\n",
"\n",
"import shutil\n",
"import numpy as np\n",
"import tensorflow as tf\n",
"\n",
"tf.logging.set_verbosity(tf.logging.INFO)\n",
"\n",
"BUCKET = None # set from task.py\n",
"PATTERN = 'of' # gets all files\n",
"\n",
"# Determine CSV, label, and key columns\n",
"CSV_COLUMNS = 'weight_pounds,is_male,mother_age,plurality,gestation_weeks,key'.split(',')\n",
"LABEL_COLUMN = 'weight_pounds'\n",
"KEY_COLUMN = 'key'\n",
"\n",
"# Set default values for each CSV column\n",
"DEFAULTS = [[0.0], ['null'], [0.0], ['null'], [0.0], ['nokey']]\n",
"\n",
"# Define some hyperparameters\n",
"TRAIN_STEPS = 10000\n",
"EVAL_STEPS = None\n",
"BATCH_SIZE = 512\n",
"NEMBEDS = 3\n",
"NNSIZE = [64, 16, 4]\n",
"\n",
"# Create an input function reading a file using the Dataset API\n",
"# Then provide the results to the Estimator API\n",
"def read_dataset(prefix, mode, batch_size):\n",
" def _input_fn():\n",
" def decode_csv(value_column):\n",
" columns = tf.decode_csv(value_column, record_defaults=DEFAULTS)\n",
" features = dict(zip(CSV_COLUMNS, columns))\n",
" label = features.pop(LABEL_COLUMN)\n",
" return features, label\n",
" \n",
" # Use prefix to create file path\n",
" file_path = 'gs://{}/babyweight/preproc/{}*{}*'.format(BUCKET, prefix, PATTERN)\n",
"\n",
" # Create list of files that match pattern\n",
" file_list = tf.gfile.Glob(file_path)\n",
"\n",
" # Create dataset from file list\n",
" dataset = (tf.data.TextLineDataset(file_list) # Read text file\n",
" .map(decode_csv)) # Transform each elem by applying decode_csv fn\n",
" \n",
" if mode == tf.estimator.ModeKeys.TRAIN:\n",
" num_epochs = None # indefinitely\n",
" dataset = dataset.shuffle(buffer_size = 10 * batch_size)\n",
" else:\n",
" num_epochs = 1 # end-of-input after this\n",
" \n",
" dataset = dataset.repeat(num_epochs).batch(batch_size)\n",
" return dataset.make_one_shot_iterator().get_next()\n",
" return _input_fn\n",
"\n",
"# Define feature columns\n",
"def get_wide_deep():\n",
" # Define column types\n",
" is_male,mother_age,plurality,gestation_weeks = \\\n",
" [\\\n",
" tf.feature_column.categorical_column_with_vocabulary_list('is_male', \n",
" ['True', 'False', 'Unknown']),\n",
" tf.feature_column.numeric_column('mother_age'),\n",
" tf.feature_column.categorical_column_with_vocabulary_list('plurality',\n",
" ['Single(1)', 'Twins(2)', 'Triplets(3)',\n",
" 'Quadruplets(4)', 'Quintuplets(5)','Multiple(2+)']),\n",
" tf.feature_column.numeric_column('gestation_weeks')\n",
" ]\n",
"\n",
" # Discretize\n",
" age_buckets = tf.feature_column.bucketized_column(mother_age, \n",
" boundaries=np.arange(15,45,1).tolist())\n",
" gestation_buckets = tf.feature_column.bucketized_column(gestation_weeks, \n",
" boundaries=np.arange(17,47,1).tolist())\n",
" \n",
" # Sparse columns are wide, have a linear relationship with the output\n",
" wide = [is_male,\n",
" plurality,\n",
" age_buckets,\n",
" gestation_buckets]\n",
" \n",
" # Feature cross all the wide columns and embed into a lower dimension\n",
" crossed = tf.feature_column.crossed_column(wide, hash_bucket_size=20000)\n",
" embed = tf.feature_column.embedding_column(crossed, NEMBEDS)\n",
" \n",
" # Continuous columns are deep, have a complex relationship with the output\n",
" deep = [mother_age,\n",
" gestation_weeks,\n",
" embed]\n",
" return wide, deep\n",
"\n",
"# Create serving input function to be able to serve predictions later using provided inputs\n",
"def serving_input_fn():\n",
" feature_placeholders = {\n",
" 'is_male': tf.placeholder(tf.string, [None]),\n",
" 'mother_age': tf.placeholder(tf.float32, [None]),\n",
" 'plurality': tf.placeholder(tf.string, [None]),\n",
" 'gestation_weeks': tf.placeholder(tf.float32, [None]),\n",
" KEY_COLUMN: tf.placeholder_with_default(tf.constant(['nokey']), [None])\n",
" }\n",
" features = {\n",
" key: tf.expand_dims(tensor, -1)\n",
" for key, tensor in feature_placeholders.items()\n",
" }\n",
" return tf.estimator.export.ServingInputReceiver(features, feature_placeholders)\n",
"\n",
"# create metric for hyperparameter tuning\n",
"def my_rmse(labels, predictions):\n",
" pred_values = predictions['predictions']\n",
" return {'rmse': tf.metrics.root_mean_squared_error(labels, pred_values)}\n",
"\n",
"# forward to key-column to export\n",
"def forward_key_to_export(estimator):\n",
" estimator = tf.contrib.estimator.forward_features(estimator, KEY_COLUMN)\n",
" # return estimator\n",
"\n",
" ## This shouldn't be necessary (I've filed CL/187793590 to update extenders.py with this code)\n",
" config = estimator.config\n",
" def model_fn2(features, labels, mode):\n",
" estimatorSpec = estimator._call_model_fn(features, labels, mode, config=config)\n",
" if estimatorSpec.export_outputs:\n",
" for ekey in ['predict', 'serving_default']:\n",
" if (ekey in estimatorSpec.export_outputs and\n",
" isinstance(estimatorSpec.export_outputs[ekey],\n",
" tf.estimator.export.PredictOutput)):\n",
" estimatorSpec.export_outputs[ekey] = \\\n",
" tf.estimator.export.PredictOutput(estimatorSpec.predictions)\n",
" return estimatorSpec\n",
" return tf.estimator.Estimator(model_fn=model_fn2, config=config)\n",
"\n",
"# Create estimator to train and evaluate\n",
"def train_and_evaluate(output_dir):\n",
" wide, deep = get_wide_deep()\n",
" EVAL_INTERVAL = 300 # seconds\n",
"\n",
" ## TODO 2a: set the save_checkpoints_secs to the EVAL_INTERVAL\n",
" run_config = tf.estimator.RunConfig(save_checkpoints_secs = EVAL_INTERVAL,\n",
" keep_checkpoint_max = 3)\n",
" \n",
" ## TODO 2b: change the dnn_hidden_units to NNSIZE\n",
" estimator = tf.estimator.DNNLinearCombinedRegressor(\n",
" model_dir = output_dir,\n",
" linear_feature_columns = wide,\n",
" dnn_feature_columns = deep,\n",
" dnn_hidden_units = NNSIZE,\n",
" config = run_config)\n",
" \n",
" estimator = tf.contrib.estimator.add_metrics(estimator, my_rmse)\n",
" estimator = forward_key_to_export(estimator)\n",
"\n",
" ## TODO 2c: Set the third argument of read_dataset to BATCH_SIZE \n",
" ## TODO 2d: and set max_steps to TRAIN_STEPS\n",
" train_spec = tf.estimator.TrainSpec(\n",
" input_fn = read_dataset('train', tf.estimator.ModeKeys.TRAIN, BATCH_SIZE),\n",
" max_steps = TRAIN_STEPS)\n",
" \n",
" exporter = tf.estimator.LatestExporter('exporter', serving_input_fn, exports_to_keep=None)\n",
"\n",
" ## TODO 2e: Lastly, set steps equal to EVAL_STEPS\n",
" eval_spec = tf.estimator.EvalSpec(\n",
" input_fn = read_dataset('eval', tf.estimator.ModeKeys.EVAL, 2**15), # no need to batch in eval\n",
" steps = EVAL_STEPS,\n",
" start_delay_secs = 60, # start evaluating after N seconds\n",
" throttle_secs = EVAL_INTERVAL, # evaluate every N seconds\n",
" exporters = exporter)\n",
" tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)"
]
},
{
@@ -134,6 +432,8 @@
"editable": true
},
"source": [
"## Lab Task 3\n",
"\n",
"After moving the code to a package, make sure it works standalone. (Note the --pattern and --train_examples lines so that I am not trying to boil the ocean on my laptop). Even then, this takes about <b>3 minutes</b> in which you won't see any output ..."
]
},
@@ -158,6 +458,18 @@
" --pattern=\"00000-of-\" --train_examples=1 --eval_steps=1"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"## Lab Task 4\n",
"\n",
"The JSON below represents an input into your prediction model. Write the input.json file below with the next cell, then run the prediction locally to assess whether it produces predictions correctly."
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -196,6 +508,8 @@
"editable": true
},
"source": [
"## Lab Task 5\n",
"\n",
"Once the code works in standalone mode, you can run it on Cloud ML Engine. Because this is on the entire dataset, it will take a while. The training run took about <b> an hour </b> for me. You can monitor the job from the GCP console in the Cloud Machine Learning Engine section."
]
},
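The submission cell for Lab Task 5 sits outside the hunks shown here. As a rough sketch, the Cloud ML Engine job submission looks like the following; the region, scale tier, and runtime version are assumptions rather than values taken from this diff, and the arguments after the bare -- are forwarded to task.py.

    # Lab Task 5 (sketch): submit the packaged trainer to Cloud ML Engine.
    # Region, scale tier, and runtime version below are assumptions.
    OUTDIR=gs://${BUCKET}/babyweight/trained_model
    JOBNAME=babyweight_$(date -u +%y%m%d_%H%M%S)
    gsutil -m rm -rf ${OUTDIR} || true   # start from a clean output directory
    gcloud ml-engine jobs submit training ${JOBNAME} \
      --region=us-central1 \
      --module-name=trainer.task \
      --package-path=${PWD}/babyweight/trainer \
      --job-dir=${OUTDIR} \
      --staging-bucket=gs://${BUCKET} \
      --scale-tier=STANDARD_1 \
      --runtime-version=1.8 \
      -- \
      --bucket=${BUCKET} \
      --output_dir=${OUTDIR}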