diff --git a/sagemaker_processing/local_pyspark/code/preprocess.py b/sagemaker_processing/local_pyspark/code/preprocess.py
index b108ae6d11..c095b8b99c 100644
--- a/sagemaker_processing/local_pyspark/code/preprocess.py
+++ b/sagemaker_processing/local_pyspark/code/preprocess.py
@@ -1,6 +1,3 @@
-from __future__ import print_function
-from __future__ import unicode_literals
-
 import argparse
 import csv
 import os
@@ -24,11 +21,7 @@
     StructField,
     StructType,
 )
-
-
-def csv_line(data):
-    r = ",".join(str(d) for d in data[1])
-    return str(data[0]) + "," + r
+from pyspark.ml.functions import vector_to_array
 
 
 def main():
@@ -41,12 +34,7 @@ def main():
 
     spark = SparkSession.builder.appName("PySparkApp").getOrCreate()
 
-    # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
-    spark.sparkContext._jsc.hadoopConfiguration().set(
-        "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
-    )
-
-    # Defining the schema corresponding to the input data. The input data does not contain the headers
+    # Defining the schema corresponding to the input data. The input data does not contain headers
     schema = StructType(
         [
             StructField("sex", StringType(), True),
@@ -89,32 +77,31 @@ def main():
         outputCol="features",
     )
 
-    # The pipeline is comprised of the steps added above
+    # add the above steps to a pipeline
    pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])
 
-    # This step trains the feature transformers
+    # train the feature transformers
     model = pipeline.fit(total_df)
 
-    # This step transforms the dataset with information obtained from the previous fit
+    # transform the dataset with information obtained from the previous fit
     transformed_total_df = model.transform(total_df)
 
-    # Split the overall dataset into 80-20 training and validation
+    # split the overall dataset into 80-20 training and validation
     (train_df, validation_df) = transformed_total_df.randomSplit([0.8, 0.2])
 
-    # Convert the train dataframe to RDD to save in CSV format and upload to S3
-    train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))
-    train_lines = train_rdd.map(csv_line)
-    train_lines.saveAsTextFile(
-        "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "train")
+    # extract only rings and features columns to write to csv
+    train_df_final = train_df.withColumn("feature", vector_to_array("features")).select(
+        ["rings"] + [col("feature")[i] for i in range(9)]
     )
 
-    # Convert the validation dataframe to RDD to save in CSV format and upload to S3
-    validation_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))
-    validation_lines = validation_rdd.map(csv_line)
-    validation_lines.saveAsTextFile(
-        "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "validation")
+    val_df_final = validation_df.withColumn("feature", vector_to_array("features")).select(
+        ["rings"] + [col("feature")[i] for i in range(9)]
     )
 
+    # write to csv files in S3
+    train_df_final.write.csv(f"s3://{args.s3_output_bucket}/{args.s3_output_key_prefix}/train")
+    val_df_final.write.csv(f"s3://{args.s3_output_bucket}/{args.s3_output_key_prefix}/validation")
+
 
 if __name__ == "__main__":
     main()
diff --git a/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb b/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb
index 8a988730aa..839dc27c2c 100644
--- a/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb
+++ b/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb
@@ -18,7 +18,7 @@
    "outputs": [],
    "source": [
     "# setup - install JDK\n",
-    "# you only need to run this once per kernel\n",
+    "# you only need to run this once per KernelApp\n",
     "%conda install openjdk -y"
    ]
   },
@@ -31,19 +31,7 @@
    "outputs": [],
    "source": [
     "# install PySpark\n",
-    "%pip install pyspark==3.2.0"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "scrolled": true
-   },
-   "outputs": [],
-   "source": [
-    "# upgrade sagemaker sdk\n",
-    "%pip install --upgrade sagemaker"
+    "%pip install pyspark==3.1.1"
    ]
   },
   {
@@ -205,7 +193,7 @@
    "source": [
     "### Run Spark processing scripts locally\n",
     "\n",
-    "You can also run Spark processing scripts on your notebook like below. You'll read the sample `abalone` dataset from an S3 location and perform preprocessing on the dataset. You will - \n",
+    "You can run Spark processing scripts in your notebook, as shown below. You'll read the sample `abalone` dataset from an S3 location and perform preprocessing on the dataset. You will:\n",
     "1. Apply transforms on the data such as one-hot encoding, merge columns to a single vector\n",
     "2. Create a preprocessing pipeline\n",
     "3. Fit and transform the dataset\n",
@@ -306,33 +294,43 @@
     "model = pipeline.fit(abalone_df)\n",
     "\n",
     "# apply transforms to the data frame\n",
-    "transformed_df = model.transform(abalone_df)"
+    "transformed_df = model.transform(abalone_df)\n",
+    "transformed_df.show(2)"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
-   "metadata": {},
+   "metadata": {
+    "scrolled": true
+   },
    "outputs": [],
    "source": [
     "# split into train and test set, and save to file\n",
-    "(train_df, validation_df) = transformed_df.randomSplit([0.8, 0.2])\n",
-    "\n",
-    "\n",
-    "def csv_line(data):\n",
-    "    r = \",\".join(str(d) for d in data[1])\n",
-    "    print(r)\n",
-    "    return str(data[0]) + \",\" + r\n",
+    "(train_df, validation_df) = transformed_df.randomSplit([0.8, 0.2])"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# write features to csv\n",
+    "from pyspark.ml.functions import vector_to_array\n",
     "\n",
+    "# extract only rings and features\n",
+    "train_df_final = train_df.withColumn(\"feature\", vector_to_array(\"features\")).select(\n",
+    "    [\"rings\"] + [col(\"feature\")[i] for i in range(9)]\n",
+    ")\n",
     "\n",
-    "# save to file\n",
-    "train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))\n",
-    "train_lines = train_rdd.map(csv_line)\n",
-    "train_lines.saveAsTextFile(\"train-set\")\n",
+    "val_df_final = validation_df.withColumn(\"feature\", vector_to_array(\"features\")).select(\n",
+    "    [\"rings\"] + [col(\"feature\")[i] for i in range(9)]\n",
+    ")\n",
     "\n",
-    "val_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))\n",
-    "val_lines = val_rdd.map(csv_line)\n",
-    "val_lines.saveAsTextFile(\"test-set\")"
+    "# write to csv\n",
+    "train_df_final.write.csv(\"train\")\n",
+    "val_df_final.write.csv(\"validation\")"
    ]
   },
   {
@@ -348,10 +346,14 @@
    "metadata": {},
    "outputs": [],
    "source": [
+    "import os\n",
     "import pandas as pd\n",
     "\n",
+    "files = os.listdir(\"./train\")\n",
+    "file_name = [f for f in files if f.endswith(\".csv\")]\n",
+    "\n",
     "print(\"Top 5 rows from the train file\")\n",
-    "pd.read_csv(\"./train-set/part-00000\", header=None).head(5)"
+    "pd.read_csv(f\"./train/{file_name[0]}\", header=None).head(5)"
    ]
   },
   {
@@ -360,7 +362,9 @@
    "source": [
     "### Run the script as a SageMaker processing job\n",
     "\n",
-    "Once experimentation is complete, you can run the script as a Sagemaker processing job. SageMaker processing jobs let you perform data pre-processing, post-processing, feature engineering, and data validation on infrastructure fully managed by SageMaker."
+    "Once experimentation is complete, you can run the script as a SageMaker processing job. SageMaker processing jobs let you perform data pre-processing, post-processing, feature engineering, and data validation on infrastructure fully managed by SageMaker.\n",
+    "\n",
+    "The `./code/preprocess.py` script collects the preprocessing we've done above into a standalone script that can be run as a processing job. Let's view the file contents below."
    ]
   },
   {
@@ -388,15 +392,16 @@
    "outputs": [],
    "source": [
     "import logging\n",
-    "import sagemaker.session\n",
     "from time import strftime, gmtime\n",
+    "from sagemaker.session import Session\n",
     "from sagemaker.spark.processing import PySparkProcessor\n",
+    "from sagemaker.processing import ProcessingInput, ProcessingOutput\n",
     "\n",
     "sagemaker_logger = logging.getLogger(\"sagemaker\")\n",
     "sagemaker_logger.setLevel(logging.INFO)\n",
     "sagemaker_logger.addHandler(logging.StreamHandler())\n",
     "\n",
-    "sagemaker_session = sagemaker.Session()\n",
+    "sagemaker_session = Session()\n",
     "bucket = sagemaker_session.default_bucket()\n",
     "role = sagemaker.get_execution_role()"
    ]
@@ -404,7 +409,9 @@
   {
    "cell_type": "code",
    "execution_count": null,
-   "metadata": {},
+   "metadata": {
+    "scrolled": true
+   },
    "outputs": [],
    "source": [
     "# fetch the dataset from the SageMaker bucket\n",
@@ -421,9 +428,7 @@
     "input_prefix_abalone = \"{}/abalone-preprocess/input\".format(prefix)\n",
     "input_preprocessed_prefix_abalone = \"{}/abalone-preprocess/output\".format(prefix)\n",
     "\n",
-    "sagemaker_session.upload_data(\n",
-    "    path=\"abalone.csv\", bucket=bucket, key_prefix=input_prefix_abalone\n",
-    ")\n",
+    "sagemaker_session.upload_data(path=\"abalone.csv\", bucket=bucket, key_prefix=input_prefix_abalone)\n",
     "\n",
     "# run the processing job\n",
     "spark_processor = PySparkProcessor(\n",
@@ -433,6 +438,7 @@
     "    instance_count=2,\n",
     "    instance_type=\"ml.m5.xlarge\",\n",
     "    max_runtime_in_seconds=1200,\n",
+    "    tags=[{\"Key\": \"tag-key\", \"Value\": \"tag-value\"}],\n",
     ")\n",
     "\n",
     "spark_processor.run(\n",
@@ -465,17 +471,24 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "print(\n",
-    "    \"Top 5 rows from s3://{}/{}/train/\".format(\n",
-    "        bucket, input_preprocessed_prefix_abalone\n",
-    "    )\n",
-    ")\n",
-    "!aws s3 cp --quiet s3://$bucket/$input_preprocessed_prefix_abalone/train/part-00000 - | head -n5"
+    "# get output file name from S3 and print the first five records\n",
+    "train_output_key = \"\"\n",
+    "response = s3.list_objects_v2(Bucket=bucket, Prefix=f\"{input_preprocessed_prefix_abalone}/train\")\n",
+    "\n",
+    "for cont in response[\"Contents\"]:\n",
+    "    if cont[\"Key\"].endswith(\".csv\"):\n",
+    "        train_output_key = cont[\"Key\"]\n",
+    "\n",
+    "if train_output_key == \"\":\n",
+    "    print(\"Preprocessed train file not found. Check to make sure the job ran successfully.\")\n",
+    "else:\n",
+    "    print(\"Top 5 rows from s3://{}/{}/train/\".format(bucket, input_preprocessed_prefix_abalone))\n",
+    "    s3.download_file(bucket, train_output_key, \"train_output.csv\")\n",
+    "    print(pd.read_csv(\"train_output.csv\", header=None).head())"
    ]
   }
  ],
  "metadata": {
-  "instance_type": "ml.t3.medium",
   "kernelspec": {
    "display_name": "Python 3 (Data Science)",
    "language": "python",