Commit

updated notebook per review comments
durgasury committed May 26, 2022
1 parent fa25288 commit 8ea55c8
Showing 2 changed files with 74 additions and 74 deletions.
43 changes: 15 additions & 28 deletions sagemaker_processing/local_pyspark/code/preprocess.py
@@ -1,6 +1,3 @@
from __future__ import print_function
from __future__ import unicode_literals

import argparse
import csv
import os
@@ -24,11 +21,7 @@
StructField,
StructType,
)


def csv_line(data):
r = ",".join(str(d) for d in data[1])
return str(data[0]) + "," + r
from pyspark.ml.functions import vector_to_array


def main():
@@ -41,12 +34,7 @@ def main():

spark = SparkSession.builder.appName("PySparkApp").getOrCreate()

# This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
spark.sparkContext._jsc.hadoopConfiguration().set(
"mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
)

# Defining the schema corresponding to the input data. The input data does not contain the headers
# Defining the schema corresponding to the input data. The input data does not contain headers
schema = StructType(
[
StructField("sex", StringType(), True),
@@ -89,32 +77,31 @@ def main():
outputCol="features",
)

# The pipeline is comprised of the steps added above
# add the above steps to a pipeline
pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])

# This step trains the feature transformers
# train the feature transformers
model = pipeline.fit(total_df)

# This step transforms the dataset with information obtained from the previous fit
# transform the dataset with information obtained from the previous fit
transformed_total_df = model.transform(total_df)

# Split the overall dataset into 80-20 training and validation
# split the overall dataset into 80-20 training and validation
(train_df, validation_df) = transformed_total_df.randomSplit([0.8, 0.2])

# Convert the train dataframe to RDD to save in CSV format and upload to S3
train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))
train_lines = train_rdd.map(csv_line)
train_lines.saveAsTextFile(
"s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "train")
# extract only rings and features columns to write to csv
train_df_final = train_df.withColumn("feature", vector_to_array("features")).select(
["rings"] + [col("feature")[i] for i in range(9)]
)

# Convert the validation dataframe to RDD to save in CSV format and upload to S3
validation_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))
validation_lines = validation_rdd.map(csv_line)
validation_lines.saveAsTextFile(
"s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "validation")
val_df_final = validation_df.withColumn("feature", vector_to_array("features")).select(
["rings"] + [col("feature")[i] for i in range(9)]
)

# write to csv files in S3
train_df_final.write.csv(f"s3://{args.s3_output_bucket}/{args.s3_output_key_prefix}/train")
val_df_final.write.csv(f"s3://{args.s3_output_bucket}/{args.s3_output_key_prefix}/validation")


if __name__ == "__main__":
main()
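
A note on the change above: the RDD + `csv_line` path has been replaced with `pyspark.ml.functions.vector_to_array` (available from Spark 3.0), which expands the assembled `features` vector into plain numeric columns that `DataFrame.write.csv` can serialize directly, which is presumably why the `mapred.output.committer.class` setting is also dropped. Below is a minimal, self-contained sketch of that pattern; the toy data, column names, and local output path are illustrative and not part of this commit. Note that `col` in the new `select` comes from `pyspark.sql.functions`.

```python
# Minimal sketch (not from the commit): assemble a vector, flatten it with
# vector_to_array, and write plain CSV. Toy data and paths are illustrative.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.functions import vector_to_array

spark = SparkSession.builder.appName("VectorToCsvSketch").getOrCreate()

df = spark.createDataFrame(
    [(15, 0.455, 0.365, 0.095), (7, 0.350, 0.265, 0.090)],
    ["rings", "length", "diameter", "height"],
)

# Merge the numeric columns into one vector column, as the pipeline above does.
assembler = VectorAssembler(
    inputCols=["length", "diameter", "height"], outputCol="features"
)
assembled_df = assembler.transform(df)

# vector_to_array turns the vector into an array column; indexing the array
# yields one plain column per feature, which CSV can represent.
n_features = 3  # the script above uses range(9) for its nine features
flat_df = assembled_df.withColumn("feature", vector_to_array("features")).select(
    ["rings"] + [col("feature")[i] for i in range(n_features)]
)

# DataFrame.write.csv replaces the old RDD.saveAsTextFile call; it writes a
# directory of part files, whether the target is local or an s3:// URI.
flat_df.write.mode("overwrite").csv("./sketch-output")

spark.stop()
```
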
105 changes: 59 additions & 46 deletions sagemaker_processing/local_pyspark/local_pyspark_example.ipynb
@@ -18,7 +18,7 @@
"outputs": [],
"source": [
"# setup - install JDK\n",
"# you only need to run this once per kernel\n",
"# you only need to run this once per KernelApp\n",
"%conda install openjdk -y"
]
},
@@ -31,19 +31,7 @@
"outputs": [],
"source": [
"# install PySpark\n",
"%pip install pyspark==3.2.0"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"# upgrade sagemaker sdk\n",
"%pip install --upgrade sagemaker"
"%pip install pyspark==3.1.1"
]
},
{
@@ -205,7 +193,7 @@
"source": [
"### Run Spark processing scripts locally\n",
"\n",
"You can also run Spark processing scripts on your notebook like below. You'll read the sample `abalone` dataset from an S3 location and perform preprocessing on the dataset. You will - \n",
"You can run Spark processing scripts on your notebook like below. You'll read the sample `abalone` dataset from an S3 location and perform preprocessing on the dataset. You will - \n",
"1. Apply transforms on the data such as one-hot encoding, merge columns to a single vector\n",
"2. Create a preprocessing pipeline\n",
"3. Fit and transform the dataset\n",
Expand Down Expand Up @@ -306,33 +294,43 @@
"model = pipeline.fit(abalone_df)\n",
"\n",
"# apply transforms to the data frame\n",
"transformed_df = model.transform(abalone_df)"
"transformed_df = model.transform(abalone_df)\n",
"transformed_df.show(2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"# split into train and test set, and save to file\n",
"(train_df, validation_df) = transformed_df.randomSplit([0.8, 0.2])\n",
"\n",
"\n",
"def csv_line(data):\n",
" r = \",\".join(str(d) for d in data[1])\n",
" print(r)\n",
" return str(data[0]) + \",\" + r\n",
"(train_df, validation_df) = transformed_df.randomSplit([0.8, 0.2])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# write features to csv\n",
"from pyspark.ml.functions import vector_to_array\n",
"\n",
"# extract only rings and features\n",
"train_df_final = train_df.withColumn(\"feature\", vector_to_array(\"features\")).select(\n",
" [\"rings\"] + [col(\"feature\")[i] for i in range(9)]\n",
")\n",
"\n",
"# save to file\n",
"train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))\n",
"train_lines = train_rdd.map(csv_line)\n",
"train_lines.saveAsTextFile(\"train-set\")\n",
"val_df_final = validation_df.withColumn(\"feature\", vector_to_array(\"features\")).select(\n",
" [\"rings\"] + [col(\"feature\")[i] for i in range(9)]\n",
")\n",
"\n",
"val_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))\n",
"val_lines = val_rdd.map(csv_line)\n",
"val_lines.saveAsTextFile(\"test-set\")"
"# write to csv\n",
"train_df_final.write.csv(\"train\")\n",
"val_df_final.write.csv(\"validation\")"
]
},
{
@@ -348,10 +346,14 @@
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import pandas as pd\n",
"\n",
"files = os.listdir(\"./train\")\n",
"file_name = [f for f in files if f.endswith(\".csv\")]\n",
"\n",
"print(\"Top 5 rows from the train file\")\n",
"pd.read_csv(\"./train-set/part-00000\", header=None).head(5)"
"pd.read_csv(f\"./train/{file_name[0]}\", header=None).head(5)"
]
},
{
@@ -360,7 +362,9 @@
"source": [
"### Run the script as a SageMaker processing job\n",
"\n",
"Once experimentation is complete, you can run the script as a Sagemaker processing job. SageMaker processing jobs let you perform data pre-processing, post-processing, feature engineering, and data validation on infrastructure fully managed by SageMaker."
"Once experimentation is complete, you can run the script as a Sagemaker processing job. SageMaker processing jobs let you perform data pre-processing, post-processing, feature engineering, and data validation on infrastructure fully managed by SageMaker. \n",
"\n",
"`./code/preprocess.py` script adds the preprocessing we've done above locally to a script that can be used to run a standalone processing job. Let's view the file contents below."
]
},
{
@@ -388,23 +392,26 @@
"outputs": [],
"source": [
"import logging\n",
"import sagemaker.session\n",
"from time import strftime, gmtime\n",
"from sagemaker.session import Session\n",
"from sagemaker.spark.processing import PySparkProcessor\n",
"from sagemaker.processing import ProcessingInput, ProcessingOutput\n",
"\n",
"sagemaker_logger = logging.getLogger(\"sagemaker\")\n",
"sagemaker_logger.setLevel(logging.INFO)\n",
"sagemaker_logger.addHandler(logging.StreamHandler())\n",
"\n",
"sagemaker_session = sagemaker.Session()\n",
"sagemaker_session = Session()\n",
"bucket = sagemaker_session.default_bucket()\n",
"role = sagemaker.get_execution_role()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"# fetch the dataset from the SageMaker bucket\n",
@@ -421,9 +428,7 @@
"input_prefix_abalone = \"{}/abalone-preprocess/input\".format(prefix)\n",
"input_preprocessed_prefix_abalone = \"{}/abalone-preprocess/output\".format(prefix)\n",
"\n",
"sagemaker_session.upload_data(\n",
" path=\"abalone.csv\", bucket=bucket, key_prefix=input_prefix_abalone\n",
")\n",
"sagemaker_session.upload_data(path=\"abalone.csv\", bucket=bucket, key_prefix=input_prefix_abalone)\n",
"\n",
"# run the processing job\n",
"spark_processor = PySparkProcessor(\n",
@@ -433,6 +438,7 @@
" instance_count=2,\n",
" instance_type=\"ml.m5.xlarge\",\n",
" max_runtime_in_seconds=1200,\n",
" tags=[{\"Key\": \"tag-key\", \"Value\": \"tag-value\"}],\n",
")\n",
"\n",
"spark_processor.run(\n",
@@ -465,17 +471,24 @@
"metadata": {},
"outputs": [],
"source": [
"print(\n",
" \"Top 5 rows from s3://{}/{}/train/\".format(\n",
" bucket, input_preprocessed_prefix_abalone\n",
" )\n",
")\n",
"!aws s3 cp --quiet s3://$bucket/$input_preprocessed_prefix_abalone/train/part-00000 - | head -n5"
"# get output file name from S3 and print the first five records\n",
"train_output_key = \"\"\n",
"response = s3.list_objects_v2(Bucket=bucket, Prefix=f\"{input_preprocessed_prefix_abalone}/train\")\n",
"\n",
"for cont in response[\"Contents\"]:\n",
" if cont[\"Key\"].endswith(\".csv\"):\n",
" train_output_key = cont[\"Key\"]\n",
"\n",
"if train_output_key == \"\":\n",
" print(\"Preprocessing train file not found. Check to make sure the job ran successfully.\")\n",
"else:\n",
" print(\"Top 5 rows from s3://{}/{}/train/\".format(bucket, input_preprocessed_prefix_abalone))\n",
" s3.download_file(bucket, train_output_key, \"train_output.csv\")\n",
" print(pd.read_csv(\"train_output.csv\", header=None).head())"
]
}
],
"metadata": {
"instance_type": "ml.t3.medium",
"kernelspec": {
"display_name": "Python 3 (Data Science)",
"language": "python",
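
The `spark_processor.run(...)` call is truncated in the diff above. Here is a hedged sketch of how the pieces shown in this notebook fit together, not the notebook's exact cell: `bucket`, `prefix`, and the input/output prefixes come from earlier cells, while the argument flag names, `base_job_name`, `framework_version`, and the Spark event-log URI are assumptions. Note that `get_execution_role()` relies on the top-level `import sagemaker`, which is no longer visible in the updated imports cell.

```python
# Hedged sketch only: the flag names, base_job_name, framework_version, and
# spark_event_logs_s3_uri are assumptions, not values copied from this commit.
import sagemaker
from sagemaker.spark.processing import PySparkProcessor

role = sagemaker.get_execution_role()  # requires the top-level `import sagemaker`

spark_processor = PySparkProcessor(
    base_job_name="sm-spark-preprocess",  # assumed name
    framework_version="3.1",              # assumed; matches the pyspark 3.1.1 pin above
    role=role,
    instance_count=2,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
)

spark_processor.run(
    submit_app="./code/preprocess.py",
    arguments=[
        "--s3_input_bucket", bucket,
        "--s3_input_key_prefix", input_prefix_abalone,
        "--s3_output_bucket", bucket,
        "--s3_output_key_prefix", input_preprocessed_prefix_abalone,
    ],
    spark_event_logs_s3_uri=f"s3://{bucket}/{prefix}/spark_event_logs",
)
```

Once the job finishes, the final cell above locates the `.csv` part files under the output prefix and previews them with pandas; it assumes an `s3` boto3 client (for example, `s3 = boto3.client("s3")`) defined earlier in the notebook.
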
