Skip to content

Commit

Permalink
Merge pull request #213 from alexhanna/master
Browse files Browse the repository at this point in the history
Simplify lab 4 and 5
  • Loading branch information
lakshmanok authored Jul 26, 2018
2 parents 3adf9b0 + ce3b4ab commit 0f43fcc
Show file tree
Hide file tree
Showing 5 changed files with 314 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.14"
"version": "2.7.15"
}
},
"nbformat": 4,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,35 @@
"import datetime, os\n",
"\n",
"def to_csv(rowdict):\n",
" import hashlib\n",
" import copy\n",
"\n",
" # TODO #1:\n",
" # Pull columns from BQ and create line(s) of CSV input\n",
" # Make sure to carry out the transformations you did (if any) in Pandas\n",
" # e.g. csv_fields[2] = str((rowdict['is_male'] == 'True')? 0.5 : -0.5)\n",
" \n",
" yield ','.join(csv_fields)\n",
" \n",
" CSV_COLUMNS = None\n",
" \n",
" # Create synthetic data where we assume that no ultrasound has been performed\n",
" # and so we don't know sex of the baby. Let's assume that we can tell the difference\n",
" # between single and multiple, but that the errors rates in determining exact number\n",
" # is difficult in the absence of an ultrasound.\n",
" no_ultrasound = copy.deepcopy(rowdict)\n",
" w_ultrasound = copy.deepcopy(rowdict)\n",
"\n",
" no_ultrasound['is_male'] = 'Unknown'\n",
" if rowdict['plurality'] > 1:\n",
" no_ultrasound['plurality'] = 'Multiple(2+)'\n",
" else:\n",
" no_ultrasound['plurality'] = 'Single(1)'\n",
"\n",
" # Change the plurality column to strings\n",
" w_ultrasound['plurality'] = ['Single(1)', 'Twins(2)', 'Triplets(3)', 'Quadruplets(4)', 'Quintuplets(5)'][rowdict['plurality'] - 1]\n",
"\n",
" # Write out two rows for each input row, one with ultrasound and one without\n",
" for result in [no_ultrasound, w_ultrasound]:\n",
" data = ','.join([str(result[k]) if k in result else 'None' for k in CSV_COLUMNS])\n",
" key = hashlib.sha224(data).hexdigest() # hash the columns to form a key\n",
" yield str('{},{}'.format(data, key))\n",
" \n",
"def preprocess(in_test_mode):\n",
" import shutil, os, subprocess\n",
" job_name = 'preprocess-babyweight-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')\n",
Expand Down Expand Up @@ -136,9 +158,7 @@
" else:\n",
" RUNNER = 'DataflowRunner'\n",
" p = beam.Pipeline(RUNNER, options = opts)\n",
" \n",
" \n",
" # TODO Task #2: Modify query as appropriate to pull in the fields you need\n",
" \n",
" query = \"\"\"\n",
"SELECT\n",
" weight_pounds,\n",
Expand Down Expand Up @@ -167,7 +187,8 @@
" selquery = 'SELECT * FROM ({}) WHERE MOD(ABS(hashmonth),4) = 3'.format(query)\n",
"\n",
" (p \n",
" | '{}_read'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query = selquery, use_standard_sql = True))\n",
" ## TODO Task #2: Modify the Apache Beam pipeline such that the first part of the pipe reads the data from BigQuery\n",
" | '{}_read'.format(step) >> None \n",
" | '{}_csv'.format(step) >> beam.FlatMap(to_csv)\n",
" | '{}_out'.format(step) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(step))))\n",
" )\n",
Expand Down Expand Up @@ -223,7 +244,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.14"
"version": "2.7.15"
}
},
"nbformat": 4,
Expand Down
Loading

0 comments on commit 0f43fcc

Please sign in to comment.