From fa2528802d0a786edda83ae1693488492ea53666 Mon Sep 17 00:00:00 2001 From: durgasury Date: Thu, 19 May 2022 17:16:40 +0000 Subject: [PATCH 1/6] adding local pyspark example --- sagemaker_processing/index.rst | 3 +- .../local_pyspark/code/preprocess.py | 120 +++++ .../local_pyspark/local_pyspark_example.ipynb | 499 ++++++++++++++++++ 3 files changed, 621 insertions(+), 1 deletion(-) create mode 100644 sagemaker_processing/local_pyspark/code/preprocess.py create mode 100644 sagemaker_processing/local_pyspark/local_pyspark_example.ipynb diff --git a/sagemaker_processing/index.rst b/sagemaker_processing/index.rst index 7c46219d06..5ca65b46f4 100644 --- a/sagemaker_processing/index.rst +++ b/sagemaker_processing/index.rst @@ -15,4 +15,5 @@ Processing fairness_and_explainability/fairness_and_explainability_spark fairness_and_explainability/text_explainability/text_explainability fairness_and_explainability/text_explainability_sagemaker_algorithm/byo_blazingtext_model_hosting - computer_vision/explainability_image_classification \ No newline at end of file + computer_vision/explainability_image_classification + local_pyspark/local_pyspark_example \ No newline at end of file diff --git a/sagemaker_processing/local_pyspark/code/preprocess.py b/sagemaker_processing/local_pyspark/code/preprocess.py new file mode 100644 index 0000000000..b108ae6d11 --- /dev/null +++ b/sagemaker_processing/local_pyspark/code/preprocess.py @@ -0,0 +1,120 @@ +from __future__ import print_function +from __future__ import unicode_literals + +import argparse +import csv +import os +import shutil +import sys +import time + +import pyspark +from pyspark.sql import SparkSession +from pyspark.ml import Pipeline +from pyspark.ml.feature import ( + OneHotEncoder, + StringIndexer, + VectorAssembler, + VectorIndexer, +) +from pyspark.sql.functions import * +from pyspark.sql.types import ( + DoubleType, + StringType, + StructField, + StructType, +) + + +def csv_line(data): + r = ",".join(str(d) for d in data[1]) + return str(data[0]) + "," + r + + +def main(): + parser = argparse.ArgumentParser(description="app inputs and outputs") + parser.add_argument("--s3_input_bucket", type=str, help="s3 input bucket") + parser.add_argument("--s3_input_key_prefix", type=str, help="s3 input key prefix") + parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket") + parser.add_argument("--s3_output_key_prefix", type=str, help="s3 output key prefix") + args = parser.parse_args() + + spark = SparkSession.builder.appName("PySparkApp").getOrCreate() + + # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format + spark.sparkContext._jsc.hadoopConfiguration().set( + "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter" + ) + + # Defining the schema corresponding to the input data. 
The input data does not contain the headers + schema = StructType( + [ + StructField("sex", StringType(), True), + StructField("length", DoubleType(), True), + StructField("diameter", DoubleType(), True), + StructField("height", DoubleType(), True), + StructField("whole_weight", DoubleType(), True), + StructField("shucked_weight", DoubleType(), True), + StructField("viscera_weight", DoubleType(), True), + StructField("shell_weight", DoubleType(), True), + StructField("rings", DoubleType(), True), + ] + ) + + # Downloading the data from S3 into a Dataframe + total_df = spark.read.csv( + ("s3://" + os.path.join(args.s3_input_bucket, args.s3_input_key_prefix, "abalone.csv")), + header=False, + schema=schema, + ) + + # StringIndexer on the sex column which has categorical value + sex_indexer = StringIndexer(inputCol="sex", outputCol="indexed_sex") + + # one-hot-encoding is being performed on the string-indexed sex column (indexed_sex) + sex_encoder = OneHotEncoder(inputCol="indexed_sex", outputCol="sex_vec") + + # vector-assembler will bring all the features to a 1D vector for us to save easily into CSV format + assembler = VectorAssembler( + inputCols=[ + "sex_vec", + "length", + "diameter", + "height", + "whole_weight", + "shucked_weight", + "viscera_weight", + "shell_weight", + ], + outputCol="features", + ) + + # The pipeline is comprised of the steps added above + pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler]) + + # This step trains the feature transformers + model = pipeline.fit(total_df) + + # This step transforms the dataset with information obtained from the previous fit + transformed_total_df = model.transform(total_df) + + # Split the overall dataset into 80-20 training and validation + (train_df, validation_df) = transformed_total_df.randomSplit([0.8, 0.2]) + + # Convert the train dataframe to RDD to save in CSV format and upload to S3 + train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features)) + train_lines = train_rdd.map(csv_line) + train_lines.saveAsTextFile( + "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "train") + ) + + # Convert the validation dataframe to RDD to save in CSV format and upload to S3 + validation_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features)) + validation_lines = validation_rdd.map(csv_line) + validation_lines.saveAsTextFile( + "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "validation") + ) + + +if __name__ == "__main__": + main() diff --git a/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb b/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb new file mode 100644 index 0000000000..8a988730aa --- /dev/null +++ b/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb @@ -0,0 +1,499 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Run PySpark locally on SageMaker Studio\n", + "\n", + "This notebook shows you how to run PySpark code locally within a SageMaker Studio notebook. The dependencies are installed in the notebok, so you can run this notebook on any image/kernel, including BYO images. For this example, you can choose the Data Science image and Python 3 kernel." 
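
    A quick aside on what "local" means here: everything in this example runs against a local Spark master inside the Studio kernel itself, not on a cluster. As a minimal sketch (assuming the JDK and PySpark installs from the setup cells below have already been run), a bare local session can pin the number of cores it uses; the session actually built later in this notebook layers the `hadoop-aws` package and an S3 credentials provider on top of this so that `s3a://` paths resolve.

```python
# Minimal local-session sketch (assumes openjdk and pyspark are installed by the
# setup cells below). The notebook's own session additionally configures
# hadoop-aws for S3 access; this variant only pins the local master.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("PySparkApp")
    .master("local[2]")  # cap Spark at two local cores on a small Studio instance
    .getOrCreate()
)
print(spark.sparkContext.master)  # prints "local[2]"
```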
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "# setup - install JDK\n", + "# you only need to run this once per kernel\n", + "%conda install openjdk -y" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "# install PySpark\n", + "%pip install pyspark==3.2.0" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "# upgrade sagemaker sdk\n", + "%pip install --upgrade sagemaker" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# import PySpark and build Spark session\n", + "from pyspark.sql import SparkSession\n", + "\n", + "spark = (\n", + " SparkSession.builder.appName(\"PySparkApp\")\n", + " .config(\"spark.jars.packages\", \"org.apache.hadoop:hadoop-aws:3.2.2\")\n", + " .config(\n", + " \"fs.s3a.aws.credentials.provider\",\n", + " \"com.amazonaws.auth.ContainerCredentialsProvider\",\n", + " )\n", + " .getOrCreate()\n", + ")\n", + "\n", + "print(spark.version)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "> 1. If you see an exception in running the cell above similar to this - `Exception: Java gateway process exited before sending the driver its port number`, restart your JupyterServer app to make sure you're on the latest version of Studio. \n", + "> 2. If you are running this notebook in a SageMaker Studio notebook, run the above cell as-is. If you are running on a SageMaker notebook instance, replace `com.amazonaws.auth.ContainerCredentialsProvider` with `com.amazonaws.auth.InstanceProfileCredentialsProvider`." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create and run user-defined functions\n", + "\n", + "Now that you have installed PySpark and initiated a Spark session, let's try out a couple of sample Pandas user defined functions (UDF)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql.types import *\n", + "from pyspark.sql.functions import (\n", + " col,\n", + " count,\n", + " rand,\n", + " collect_list,\n", + " explode,\n", + " struct,\n", + " count,\n", + " lit,\n", + ")\n", + "from pyspark.sql.functions import pandas_udf, PandasUDFType\n", + "\n", + "# generate random data\n", + "df = (\n", + " spark.range(0, 10 * 100 * 100)\n", + " .withColumn(\"id\", (col(\"id\") / 100).cast(\"integer\"))\n", + " .withColumn(\"v\", rand())\n", + ")\n", + "df.cache()\n", + "df.count()\n", + "\n", + "df.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# sample pandas udf to return squared value\n", + "@pandas_udf(\"double\", PandasUDFType.SCALAR)\n", + "def pandas_squared(v):\n", + " return v * v\n", + "\n", + "\n", + "df.withColumn(\"v2\", pandas_squared(df.v))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In this next example, you'll run Ordinary least squares (OLS) linear regression by group using [statsmodels](https://www.statsmodels.org/stable/examples/notebooks/generated/ols.html)." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df2 = (\n", + " df.withColumn(\"y\", rand())\n", + " .withColumn(\"x1\", rand())\n", + " .withColumn(\"x2\", rand())\n", + " .select(\"id\", \"y\", \"x1\", \"x2\")\n", + ")\n", + "df2.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "import statsmodels.api as sm\n", + "\n", + "group_column = \"id\"\n", + "y_column = \"y\"\n", + "x_columns = [\"x1\", \"x2\"]\n", + "schema = df2.select(group_column, *x_columns).schema" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# sample UDF with input and output data frames\n", + "@pandas_udf(schema, PandasUDFType.GROUPED_MAP)\n", + "def ols(pdf):\n", + " group_key = pdf[group_column].iloc[0]\n", + " y = pdf[y_column]\n", + " X = pdf[x_columns]\n", + " X = sm.add_constant(X)\n", + " model = sm.OLS(y, X).fit()\n", + " return pd.DataFrame(\n", + " [[group_key] + [model.params[i] for i in x_columns]],\n", + " columns=[group_column] + x_columns,\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# run ols grouped by the \"id\" group column\n", + "beta = df2.groupby(group_column).apply(ols)\n", + "beta.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Run Spark processing scripts locally\n", + "\n", + "You can also run Spark processing scripts on your notebook like below. You'll read the sample `abalone` dataset from an S3 location and perform preprocessing on the dataset. You will - \n", + "1. Apply transforms on the data such as one-hot encoding, merge columns to a single vector\n", + "2. Create a preprocessing pipeline\n", + "3. Fit and transform the dataset\n", + "4. Split into a training and validation set\n", + "5. 
Save the files to local storage" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql.types import (\n", + " DoubleType,\n", + " StringType,\n", + " StructField,\n", + " StructType,\n", + ")\n", + "from pyspark.ml.feature import (\n", + " OneHotEncoder,\n", + " StringIndexer,\n", + " VectorAssembler,\n", + " VectorIndexer,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "schema = StructType(\n", + " [\n", + " StructField(\"sex\", StringType(), True),\n", + " StructField(\"length\", DoubleType(), True),\n", + " StructField(\"diameter\", DoubleType(), True),\n", + " StructField(\"height\", DoubleType(), True),\n", + " StructField(\"whole_weight\", DoubleType(), True),\n", + " StructField(\"shucked_weight\", DoubleType(), True),\n", + " StructField(\"viscera_weight\", DoubleType(), True),\n", + " StructField(\"shell_weight\", DoubleType(), True),\n", + " StructField(\"rings\", DoubleType(), True),\n", + " ]\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "data_uri = \"s3a://sagemaker-sample-files/datasets/tabular/uci_abalone/abalone.csv\"\n", + "\n", + "abalone_df = spark.read.csv(data_uri, header=False, schema=schema)\n", + "abalone_df.show(2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# StringIndexer on the sex column which has categorical value\n", + "sex_indexer = StringIndexer(inputCol=\"sex\", outputCol=\"indexed_sex\")\n", + "\n", + "# one-hot encoding on the string-indexed sex column (indexed_sex)\n", + "sex_encoder = OneHotEncoder(inputCol=\"indexed_sex\", outputCol=\"sex_vec\")\n", + "\n", + "# vector-assembler will bring all the features to a 1D vector to save easily into CSV format\n", + "assembler = VectorAssembler(\n", + " inputCols=[\n", + " \"sex_vec\",\n", + " \"length\",\n", + " \"diameter\",\n", + " \"height\",\n", + " \"whole_weight\",\n", + " \"shucked_weight\",\n", + " \"viscera_weight\",\n", + " \"shell_weight\",\n", + " ],\n", + " outputCol=\"features\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.ml import Pipeline\n", + "\n", + "pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])\n", + "model = pipeline.fit(abalone_df)\n", + "\n", + "# apply transforms to the data frame\n", + "transformed_df = model.transform(abalone_df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# split into train and test set, and save to file\n", + "(train_df, validation_df) = transformed_df.randomSplit([0.8, 0.2])\n", + "\n", + "\n", + "def csv_line(data):\n", + " r = \",\".join(str(d) for d in data[1])\n", + " print(r)\n", + " return str(data[0]) + \",\" + r\n", + "\n", + "\n", + "# save to file\n", + "train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))\n", + "train_lines = train_rdd.map(csv_line)\n", + "train_lines.saveAsTextFile(\"train-set\")\n", + "\n", + "val_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))\n", + "val_lines = val_rdd.map(csv_line)\n", + "val_lines.saveAsTextFile(\"test-set\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Print the first five rows of the preprocessed output file." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "\n", + "print(\"Top 5 rows from the train file\")\n", + "pd.read_csv(\"./train-set/part-00000\", header=None).head(5)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Run the script as a SageMaker processing job\n", + "\n", + "Once experimentation is complete, you can run the script as a Sagemaker processing job. SageMaker processing jobs let you perform data pre-processing, post-processing, feature engineering, and data validation on infrastructure fully managed by SageMaker." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "!pygmentize ./code/preprocess.py" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We'll now use the `PySparkProcessor` class to define a Spark job and run it using SageMaker processing. For detailed reference, see [Data Processing with Spark](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_processing.html#data-processing-with-spark)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import logging\n", + "import sagemaker.session\n", + "from time import strftime, gmtime\n", + "from sagemaker.spark.processing import PySparkProcessor\n", + "\n", + "sagemaker_logger = logging.getLogger(\"sagemaker\")\n", + "sagemaker_logger.setLevel(logging.INFO)\n", + "sagemaker_logger.addHandler(logging.StreamHandler())\n", + "\n", + "sagemaker_session = sagemaker.Session()\n", + "bucket = sagemaker_session.default_bucket()\n", + "role = sagemaker.get_execution_role()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# fetch the dataset from the SageMaker bucket\n", + "import boto3\n", + "\n", + "s3 = boto3.client(\"s3\")\n", + "s3.download_file(\n", + " f\"sagemaker-sample-files\", \"datasets/tabular/uci_abalone/abalone.csv\", \"abalone.csv\"\n", + ")\n", + "\n", + "# upload the raw input dataset to a unique S3 location\n", + "timestamp_prefix = strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", + "prefix = \"sagemaker/local-pyspark/{}\".format(timestamp_prefix)\n", + "input_prefix_abalone = \"{}/abalone-preprocess/input\".format(prefix)\n", + "input_preprocessed_prefix_abalone = \"{}/abalone-preprocess/output\".format(prefix)\n", + "\n", + "sagemaker_session.upload_data(\n", + " path=\"abalone.csv\", bucket=bucket, key_prefix=input_prefix_abalone\n", + ")\n", + "\n", + "# run the processing job\n", + "spark_processor = PySparkProcessor(\n", + " base_job_name=\"local-pyspark\",\n", + " framework_version=\"3.1\",\n", + " role=role,\n", + " instance_count=2,\n", + " instance_type=\"ml.m5.xlarge\",\n", + " max_runtime_in_seconds=1200,\n", + ")\n", + "\n", + "spark_processor.run(\n", + " submit_app=\"./code/preprocess.py\",\n", + " arguments=[\n", + " \"--s3_input_bucket\",\n", + " bucket,\n", + " \"--s3_input_key_prefix\",\n", + " input_prefix_abalone,\n", + " \"--s3_output_bucket\",\n", + " bucket,\n", + " \"--s3_output_key_prefix\",\n", + " input_preprocessed_prefix_abalone,\n", + " ],\n", + " spark_event_logs_s3_uri=\"s3://{}/{}/spark_event_logs\".format(bucket, prefix),\n", + " logs=False,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Inspect the first five rows of the preprocessed output file. 
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\n", + " \"Top 5 rows from s3://{}/{}/train/\".format(\n", + " bucket, input_preprocessed_prefix_abalone\n", + " )\n", + ")\n", + "!aws s3 cp --quiet s3://$bucket/$input_preprocessed_prefix_abalone/train/part-00000 - | head -n5" + ] + } + ], + "metadata": { + "instance_type": "ml.t3.medium", + "kernelspec": { + "display_name": "Python 3 (Data Science)", + "language": "python", + "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.10" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} From 8ea55c86cc1bcba263fba49cd4e675c807940a09 Mon Sep 17 00:00:00 2001 From: durgasury Date: Thu, 26 May 2022 17:25:23 +0000 Subject: [PATCH 2/6] updated notebook per review comments --- .../local_pyspark/code/preprocess.py | 43 +++---- .../local_pyspark/local_pyspark_example.ipynb | 105 ++++++++++-------- 2 files changed, 74 insertions(+), 74 deletions(-) diff --git a/sagemaker_processing/local_pyspark/code/preprocess.py b/sagemaker_processing/local_pyspark/code/preprocess.py index b108ae6d11..c095b8b99c 100644 --- a/sagemaker_processing/local_pyspark/code/preprocess.py +++ b/sagemaker_processing/local_pyspark/code/preprocess.py @@ -1,6 +1,3 @@ -from __future__ import print_function -from __future__ import unicode_literals - import argparse import csv import os @@ -24,11 +21,7 @@ StructField, StructType, ) - - -def csv_line(data): - r = ",".join(str(d) for d in data[1]) - return str(data[0]) + "," + r +from pyspark.ml.functions import vector_to_array def main(): @@ -41,12 +34,7 @@ def main(): spark = SparkSession.builder.appName("PySparkApp").getOrCreate() - # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format - spark.sparkContext._jsc.hadoopConfiguration().set( - "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter" - ) - - # Defining the schema corresponding to the input data. The input data does not contain the headers + # Defining the schema corresponding to the input data. 
The input data does not contain headers schema = StructType( [ StructField("sex", StringType(), True), @@ -89,32 +77,31 @@ def main(): outputCol="features", ) - # The pipeline is comprised of the steps added above + # add the above steps to a pipeline pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler]) - # This step trains the feature transformers + # train the feature transformers model = pipeline.fit(total_df) - # This step transforms the dataset with information obtained from the previous fit + # transform the dataset with information obtained from the previous fit transformed_total_df = model.transform(total_df) - # Split the overall dataset into 80-20 training and validation + # split the overall dataset into 80-20 training and validation (train_df, validation_df) = transformed_total_df.randomSplit([0.8, 0.2]) - # Convert the train dataframe to RDD to save in CSV format and upload to S3 - train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features)) - train_lines = train_rdd.map(csv_line) - train_lines.saveAsTextFile( - "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "train") + # extract only rings and features columns to write to csv + train_df_final = train_df.withColumn("feature", vector_to_array("features")).select( + ["rings"] + [col("feature")[i] for i in range(9)] ) - # Convert the validation dataframe to RDD to save in CSV format and upload to S3 - validation_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features)) - validation_lines = validation_rdd.map(csv_line) - validation_lines.saveAsTextFile( - "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "validation") + val_df_final = validation_df.withColumn("feature", vector_to_array("features")).select( + ["rings"] + [col("feature")[i] for i in range(9)] ) + # write to csv files in S3 + train_df_final.write.csv(f"s3://{args.s3_output_bucket}/{args.s3_output_key_prefix}/train") + val_df_final.write.csv(f"s3://{args.s3_output_bucket}/{args.s3_output_key_prefix}/validation") + if __name__ == "__main__": main() diff --git a/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb b/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb index 8a988730aa..839dc27c2c 100644 --- a/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb +++ b/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb @@ -18,7 +18,7 @@ "outputs": [], "source": [ "# setup - install JDK\n", - "# you only need to run this once per kernel\n", + "# you only need to run this once per KernelApp\n", "%conda install openjdk -y" ] }, @@ -31,19 +31,7 @@ "outputs": [], "source": [ "# install PySpark\n", - "%pip install pyspark==3.2.0" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "scrolled": true - }, - "outputs": [], - "source": [ - "# upgrade sagemaker sdk\n", - "%pip install --upgrade sagemaker" + "%pip install pyspark==3.1.1" ] }, { @@ -205,7 +193,7 @@ "source": [ "### Run Spark processing scripts locally\n", "\n", - "You can also run Spark processing scripts on your notebook like below. You'll read the sample `abalone` dataset from an S3 location and perform preprocessing on the dataset. You will - \n", + "You can run Spark processing scripts on your notebook like below. You'll read the sample `abalone` dataset from an S3 location and perform preprocessing on the dataset. You will - \n", "1. Apply transforms on the data such as one-hot encoding, merge columns to a single vector\n", "2. Create a preprocessing pipeline\n", "3. 
Fit and transform the dataset\n", @@ -306,33 +294,43 @@ "model = pipeline.fit(abalone_df)\n", "\n", "# apply transforms to the data frame\n", - "transformed_df = model.transform(abalone_df)" + "transformed_df = model.transform(abalone_df)\n", + "transformed_df.show(2)" ] }, { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": true + }, "outputs": [], "source": [ "# split into train and test set, and save to file\n", - "(train_df, validation_df) = transformed_df.randomSplit([0.8, 0.2])\n", - "\n", - "\n", - "def csv_line(data):\n", - " r = \",\".join(str(d) for d in data[1])\n", - " print(r)\n", - " return str(data[0]) + \",\" + r\n", + "(train_df, validation_df) = transformed_df.randomSplit([0.8, 0.2])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# write features to csv\n", + "from pyspark.ml.functions import vector_to_array\n", "\n", + "# extract only rings and features\n", + "train_df_final = train_df.withColumn(\"feature\", vector_to_array(\"features\")).select(\n", + " [\"rings\"] + [col(\"feature\")[i] for i in range(9)]\n", + ")\n", "\n", - "# save to file\n", - "train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))\n", - "train_lines = train_rdd.map(csv_line)\n", - "train_lines.saveAsTextFile(\"train-set\")\n", + "val_df_final = validation_df.withColumn(\"feature\", vector_to_array(\"features\")).select(\n", + " [\"rings\"] + [col(\"feature\")[i] for i in range(9)]\n", + ")\n", "\n", - "val_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))\n", - "val_lines = val_rdd.map(csv_line)\n", - "val_lines.saveAsTextFile(\"test-set\")" + "# write to csv\n", + "train_df_final.write.csv(\"train\")\n", + "val_df_final.write.csv(\"validation\")" ] }, { @@ -348,10 +346,14 @@ "metadata": {}, "outputs": [], "source": [ + "import os\n", "import pandas as pd\n", "\n", + "files = os.listdir(\"./train\")\n", + "file_name = [f for f in files if f.endswith(\".csv\")]\n", + "\n", "print(\"Top 5 rows from the train file\")\n", - "pd.read_csv(\"./train-set/part-00000\", header=None).head(5)" + "pd.read_csv(f\"./train/{file_name[0]}\", header=None).head(5)" ] }, { @@ -360,7 +362,9 @@ "source": [ "### Run the script as a SageMaker processing job\n", "\n", - "Once experimentation is complete, you can run the script as a Sagemaker processing job. SageMaker processing jobs let you perform data pre-processing, post-processing, feature engineering, and data validation on infrastructure fully managed by SageMaker." + "Once experimentation is complete, you can run the script as a Sagemaker processing job. SageMaker processing jobs let you perform data pre-processing, post-processing, feature engineering, and data validation on infrastructure fully managed by SageMaker. \n", + "\n", + "`./code/preprocess.py` script adds the preprocessing we've done above locally to a script that can be used to run a standalone processing job. Let's view the file contents below." 
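
    Besides viewing the file, a quick sanity check before launching the job is to ask the script for its command-line interface from the notebook; `argparse` prints the four S3 arguments and exits before any Spark session is created. This is only a convenience sketch and assumes the same `./code/preprocess.py` path used below.

```python
# Print the script's expected arguments (argparse exits before Spark starts).
# Equivalent notebook shorthand: !python ./code/preprocess.py --help
import subprocess

subprocess.run(["python", "./code/preprocess.py", "--help"], check=True)
```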
] }, { @@ -388,15 +392,16 @@ "outputs": [], "source": [ "import logging\n", - "import sagemaker.session\n", "from time import strftime, gmtime\n", + "from sagemaker.session import Session\n", "from sagemaker.spark.processing import PySparkProcessor\n", + "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "\n", "sagemaker_logger = logging.getLogger(\"sagemaker\")\n", "sagemaker_logger.setLevel(logging.INFO)\n", "sagemaker_logger.addHandler(logging.StreamHandler())\n", "\n", - "sagemaker_session = sagemaker.Session()\n", + "sagemaker_session = Session()\n", "bucket = sagemaker_session.default_bucket()\n", "role = sagemaker.get_execution_role()" ] @@ -404,7 +409,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": true + }, "outputs": [], "source": [ "# fetch the dataset from the SageMaker bucket\n", @@ -421,9 +428,7 @@ "input_prefix_abalone = \"{}/abalone-preprocess/input\".format(prefix)\n", "input_preprocessed_prefix_abalone = \"{}/abalone-preprocess/output\".format(prefix)\n", "\n", - "sagemaker_session.upload_data(\n", - " path=\"abalone.csv\", bucket=bucket, key_prefix=input_prefix_abalone\n", - ")\n", + "sagemaker_session.upload_data(path=\"abalone.csv\", bucket=bucket, key_prefix=input_prefix_abalone)\n", "\n", "# run the processing job\n", "spark_processor = PySparkProcessor(\n", @@ -433,6 +438,7 @@ " instance_count=2,\n", " instance_type=\"ml.m5.xlarge\",\n", " max_runtime_in_seconds=1200,\n", + " tags=[{\"Key\": \"tag-key\", \"Value\": \"tag-value\"}],\n", ")\n", "\n", "spark_processor.run(\n", @@ -465,17 +471,24 @@ "metadata": {}, "outputs": [], "source": [ - "print(\n", - " \"Top 5 rows from s3://{}/{}/train/\".format(\n", - " bucket, input_preprocessed_prefix_abalone\n", - " )\n", - ")\n", - "!aws s3 cp --quiet s3://$bucket/$input_preprocessed_prefix_abalone/train/part-00000 - | head -n5" + "# get output file name from S3 and print the first five records\n", + "train_output_key = \"\"\n", + "response = s3.list_objects_v2(Bucket=bucket, Prefix=f\"{input_preprocessed_prefix_abalone}/train\")\n", + "\n", + "for cont in response[\"Contents\"]:\n", + " if cont[\"Key\"].endswith(\".csv\"):\n", + " train_output_key = cont[\"Key\"]\n", + "\n", + "if train_output_key == \"\":\n", + " print(\"Preprocessing train file not found. 
Check to make sure the job ran successfully.\")\n", + "else:\n", + " print(\"Top 5 rows from s3://{}/{}/train/\".format(bucket, input_preprocessed_prefix_abalone))\n", + " s3.download_file(bucket, train_output_key, \"train_output.csv\")\n", + " print(pd.read_csv(\"train_output.csv\", header=None).head())" ] } ], "metadata": { - "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", From c42328aa7ab7dfda7cb46069374af605edf3a566 Mon Sep 17 00:00:00 2001 From: durgasury Date: Tue, 31 May 2022 17:33:26 -0600 Subject: [PATCH 3/6] fixing typo in local_pyspark_example.ipynb Co-authored-by: Sean Morgan --- sagemaker_processing/local_pyspark/local_pyspark_example.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb b/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb index 839dc27c2c..de0d50e2f9 100644 --- a/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb +++ b/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb @@ -6,7 +6,7 @@ "source": [ "## Run PySpark locally on SageMaker Studio\n", "\n", - "This notebook shows you how to run PySpark code locally within a SageMaker Studio notebook. The dependencies are installed in the notebok, so you can run this notebook on any image/kernel, including BYO images. For this example, you can choose the Data Science image and Python 3 kernel." + "This notebook shows you how to run PySpark code locally within a SageMaker Studio notebook. The dependencies are installed in the notebook, so you can run this notebook on any image/kernel, including BYO images. For this example, you can choose the Data Science image and Python 3 kernel." ] }, { From dd0ba8109a662a4040e4f2d6baaecc45b8d30da2 Mon Sep 17 00:00:00 2001 From: durgasury Date: Wed, 1 Jun 2022 23:28:59 +0000 Subject: [PATCH 4/6] adding import sagemaker --- .../local_pyspark/local_pyspark_example.ipynb | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb b/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb index de0d50e2f9..6f57894bf2 100644 --- a/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb +++ b/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb @@ -9,6 +9,17 @@ "This notebook shows you how to run PySpark code locally within a SageMaker Studio notebook. The dependencies are installed in the notebook, so you can run this notebook on any image/kernel, including BYO images. For this example, you can choose the Data Science image and Python 3 kernel." 
] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# import sagemaker SDK\n", + "import sagemaker\n", + "print(sagemaker.__version__)" + ] + }, { "cell_type": "code", "execution_count": null, @@ -486,9 +497,17 @@ " s3.download_file(bucket, train_output_key, \"train_output.csv\")\n", " print(pd.read_csv(\"train_output.csv\", header=None).head())" ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { + "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", From 9e5493d23b15a0be923ddfff6dc497157e4c6ed7 Mon Sep 17 00:00:00 2001 From: durgasury Date: Thu, 2 Jun 2022 15:16:45 +0000 Subject: [PATCH 5/6] fix formatting --- sagemaker_processing/local_pyspark/local_pyspark_example.ipynb | 1 + 1 file changed, 1 insertion(+) diff --git a/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb b/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb index 6f57894bf2..6c8b0a4492 100644 --- a/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb +++ b/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb @@ -17,6 +17,7 @@ "source": [ "# import sagemaker SDK\n", "import sagemaker\n", + "\n", "print(sagemaker.__version__)" ] }, From 02158ec51480764331fb76ad733fdaccf0c66086 Mon Sep 17 00:00:00 2001 From: durgasury Date: Wed, 6 Jul 2022 22:21:42 +0000 Subject: [PATCH 6/6] fix formatting and grammar --- .../local_pyspark/local_pyspark_example.ipynb | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb b/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb index 6c8b0a4492..af9ef89b1d 100644 --- a/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb +++ b/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb @@ -4,7 +4,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Run PySpark locally on SageMaker Studio\n", + "# Run PySpark locally on SageMaker Studio\n", "\n", "This notebook shows you how to run PySpark code locally within a SageMaker Studio notebook. The dependencies are installed in the notebook, so you can run this notebook on any image/kernel, including BYO images. For this example, you can choose the Data Science image and Python 3 kernel." ] @@ -80,7 +80,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "### Create and run user-defined functions\n", + "## Create and run user-defined functions\n", "\n", "Now that you have installed PySpark and initiated a Spark session, let's try out a couple of sample Pandas user defined functions (UDF)." ] @@ -203,7 +203,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "### Run Spark processing scripts locally\n", + "## Run Spark processing scripts locally\n", "\n", "You can run Spark processing scripts on your notebook like below. You'll read the sample `abalone` dataset from an S3 location and perform preprocessing on the dataset. You will - \n", "1. Apply transforms on the data such as one-hot encoding, merge columns to a single vector\n", @@ -372,9 +372,9 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "### Run the script as a SageMaker processing job\n", + "## Run the script as a SageMaker processing job\n", "\n", - "Once experimentation is complete, you can run the script as a Sagemaker processing job. 
SageMaker processing jobs let you perform data pre-processing, post-processing, feature engineering, and data validation on infrastructure fully managed by SageMaker. \n", + "Once experimentation is complete, you can run the script as a SageMaker processing job. SageMaker processing jobs let you perform data pre-processing, post-processing, feature engineering, and data validation on infrastructure fully managed by SageMaker. \n", "\n", "`./code/preprocess.py` script adds the preprocessing we've done above locally to a script that can be used to run a standalone processing job. Let's view the file contents below." ] @@ -500,11 +500,15 @@ ] }, { - "cell_type": "code", - "execution_count": null, + "cell_type": "markdown", "metadata": {}, - "outputs": [], - "source": [] + "source": [ + "## Conclusion and Cleanup\n", + "\n", + "In this notebook, we installed PySpark on Studio notebook and created a spark session to run PySpark code locally within Studio. You can use this as a starting point to prototype your Spark code on a smaller sample of your data before running the SageMaker processing job on your entire dataset. You can extend this example to preprocess your data for machine learning.\n", + "\n", + "To avoid incurring costs, remember to shut down the SageMaker Studio app, or stop the notebook instance as necessary." + ] } ], "metadata": {