From fa2528802d0a786edda83ae1693488492ea53666 Mon Sep 17 00:00:00 2001 From: durgasury Date: Thu, 19 May 2022 17:16:40 +0000 Subject: [PATCH] adding local pyspark example --- sagemaker_processing/index.rst | 3 +- .../local_pyspark/code/preprocess.py | 120 +++++ .../local_pyspark/local_pyspark_example.ipynb | 499 ++++++++++++++++++ 3 files changed, 621 insertions(+), 1 deletion(-) create mode 100644 sagemaker_processing/local_pyspark/code/preprocess.py create mode 100644 sagemaker_processing/local_pyspark/local_pyspark_example.ipynb diff --git a/sagemaker_processing/index.rst b/sagemaker_processing/index.rst index 7c46219d06..5ca65b46f4 100644 --- a/sagemaker_processing/index.rst +++ b/sagemaker_processing/index.rst @@ -15,4 +15,5 @@ Processing fairness_and_explainability/fairness_and_explainability_spark fairness_and_explainability/text_explainability/text_explainability fairness_and_explainability/text_explainability_sagemaker_algorithm/byo_blazingtext_model_hosting - computer_vision/explainability_image_classification \ No newline at end of file + computer_vision/explainability_image_classification + local_pyspark/local_pyspark_example \ No newline at end of file diff --git a/sagemaker_processing/local_pyspark/code/preprocess.py b/sagemaker_processing/local_pyspark/code/preprocess.py new file mode 100644 index 0000000000..b108ae6d11 --- /dev/null +++ b/sagemaker_processing/local_pyspark/code/preprocess.py @@ -0,0 +1,120 @@ +from __future__ import print_function +from __future__ import unicode_literals + +import argparse +import csv +import os +import shutil +import sys +import time + +import pyspark +from pyspark.sql import SparkSession +from pyspark.ml import Pipeline +from pyspark.ml.feature import ( + OneHotEncoder, + StringIndexer, + VectorAssembler, + VectorIndexer, +) +from pyspark.sql.functions import * +from pyspark.sql.types import ( + DoubleType, + StringType, + StructField, + StructType, +) + + +def csv_line(data): + r = ",".join(str(d) for d in data[1]) + return str(data[0]) + "," + r + + +def main(): + parser = argparse.ArgumentParser(description="app inputs and outputs") + parser.add_argument("--s3_input_bucket", type=str, help="s3 input bucket") + parser.add_argument("--s3_input_key_prefix", type=str, help="s3 input key prefix") + parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket") + parser.add_argument("--s3_output_key_prefix", type=str, help="s3 output key prefix") + args = parser.parse_args() + + spark = SparkSession.builder.appName("PySparkApp").getOrCreate() + + # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format + spark.sparkContext._jsc.hadoopConfiguration().set( + "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter" + ) + + # Defining the schema corresponding to the input data. 
The input data does not contain headers.
+    schema = StructType(
+        [
+            StructField("sex", StringType(), True),
+            StructField("length", DoubleType(), True),
+            StructField("diameter", DoubleType(), True),
+            StructField("height", DoubleType(), True),
+            StructField("whole_weight", DoubleType(), True),
+            StructField("shucked_weight", DoubleType(), True),
+            StructField("viscera_weight", DoubleType(), True),
+            StructField("shell_weight", DoubleType(), True),
+            StructField("rings", DoubleType(), True),
+        ]
+    )
+
+    # Download the data from S3 into a DataFrame
+    total_df = spark.read.csv(
+        ("s3://" + os.path.join(args.s3_input_bucket, args.s3_input_key_prefix, "abalone.csv")),
+        header=False,
+        schema=schema,
+    )
+
+    # StringIndexer on the sex column, which has categorical values
+    sex_indexer = StringIndexer(inputCol="sex", outputCol="indexed_sex")
+
+    # one-hot encode the string-indexed sex column (indexed_sex)
+    sex_encoder = OneHotEncoder(inputCol="indexed_sex", outputCol="sex_vec")
+
+    # VectorAssembler combines all the features into a single vector column so the data can be saved in CSV format
+    assembler = VectorAssembler(
+        inputCols=[
+            "sex_vec",
+            "length",
+            "diameter",
+            "height",
+            "whole_weight",
+            "shucked_weight",
+            "viscera_weight",
+            "shell_weight",
+        ],
+        outputCol="features",
+    )
+
+    # The pipeline comprises the steps added above
+    pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])
+
+    # This step trains the feature transformers
+    model = pipeline.fit(total_df)
+
+    # This step transforms the dataset with information obtained from the previous fit
+    transformed_total_df = model.transform(total_df)
+
+    # Split the overall dataset 80-20 into training and validation sets
+    (train_df, validation_df) = transformed_total_df.randomSplit([0.8, 0.2])
+
+    # Convert the train DataFrame to an RDD to save in CSV format and upload to S3
+    train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))
+    train_lines = train_rdd.map(csv_line)
+    train_lines.saveAsTextFile(
+        "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "train")
+    )
+
+    # Convert the validation DataFrame to an RDD to save in CSV format and upload to S3
+    validation_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))
+    validation_lines = validation_rdd.map(csv_line)
+    validation_lines.saveAsTextFile(
+        "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "validation")
+    )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb b/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb
new file mode 100644
index 0000000000..8a988730aa
--- /dev/null
+++ b/sagemaker_processing/local_pyspark/local_pyspark_example.ipynb
@@ -0,0 +1,499 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Run PySpark locally on SageMaker Studio\n",
+    "\n",
+    "This notebook shows you how to run PySpark code locally within a SageMaker Studio notebook. The dependencies are installed in the notebook, so you can run this notebook on any image/kernel, including BYO images. For this example, you can choose the Data Science image and the Python 3 kernel.\n",
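+    "\n",
+    "As an optional sanity check (an addition to this walkthrough, not required), you can run a short cell like the sketch below after the install cells to confirm that the kernel can see both the JDK and PySpark:\n",
+    "\n",
+    "```python\n",
+    "# optional sanity check - run after the install cells below\n",
+    "import shutil\n",
+    "\n",
+    "import pyspark\n",
+    "\n",
+    "print(shutil.which(\"java\"))  # path to the conda-installed JDK, or None if it is missing\n",
+    "print(pyspark.__version__)  # expected to be 3.2.0 after the pip install below\n",
+    "```"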
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "scrolled": true
+   },
+   "outputs": [],
+   "source": [
+    "# setup - install JDK\n",
+    "# you only need to run this once per kernel\n",
+    "%conda install openjdk -y"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "scrolled": true
+   },
+   "outputs": [],
+   "source": [
+    "# install PySpark\n",
+    "%pip install pyspark==3.2.0"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "scrolled": true
+   },
+   "outputs": [],
+   "source": [
+    "# upgrade the sagemaker sdk\n",
+    "%pip install --upgrade sagemaker"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# import PySpark and build the Spark session\n",
+    "from pyspark.sql import SparkSession\n",
+    "\n",
+    "spark = (\n",
+    "    SparkSession.builder.appName(\"PySparkApp\")\n",
+    "    .config(\"spark.jars.packages\", \"org.apache.hadoop:hadoop-aws:3.2.2\")\n",
+    "    .config(\n",
+    "        \"fs.s3a.aws.credentials.provider\",\n",
+    "        \"com.amazonaws.auth.ContainerCredentialsProvider\",\n",
+    "    )\n",
+    "    .getOrCreate()\n",
+    ")\n",
+    "\n",
+    "print(spark.version)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "> 1. If the cell above fails with an exception similar to `Exception: Java gateway process exited before sending the driver its port number`, restart your JupyterServer app to make sure you're on the latest version of Studio.\n",
+    "> 2. If you are running this notebook in a SageMaker Studio notebook, run the above cell as-is. If you are running on a SageMaker notebook instance, replace `com.amazonaws.auth.ContainerCredentialsProvider` with `com.amazonaws.auth.InstanceProfileCredentialsProvider`."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Create and run user-defined functions\n",
+    "\n",
+    "Now that you have installed PySpark and initiated a Spark session, let's try out a couple of sample pandas user-defined functions (UDFs)."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from pyspark.sql.types import *\n",
+    "from pyspark.sql.functions import (\n",
+    "    col,\n",
+    "    count,\n",
+    "    rand,\n",
+    "    collect_list,\n",
+    "    explode,\n",
+    "    struct,\n",
+    "    lit,\n",
+    ")\n",
+    "from pyspark.sql.functions import pandas_udf, PandasUDFType\n",
+    "\n",
+    "# generate random data\n",
+    "df = (\n",
+    "    spark.range(0, 10 * 100 * 100)\n",
+    "    .withColumn(\"id\", (col(\"id\") / 100).cast(\"integer\"))\n",
+    "    .withColumn(\"v\", rand())\n",
+    ")\n",
+    "df.cache()\n",
+    "df.count()\n",
+    "\n",
+    "df.show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# sample pandas UDF that returns the squared value\n",
+    "@pandas_udf(\"double\", PandasUDFType.SCALAR)\n",
+    "def pandas_squared(v):\n",
+    "    return v * v\n",
+    "\n",
+    "\n",
+    "df.withColumn(\"v2\", pandas_squared(df.v))"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "In this next example, you'll run ordinary least squares (OLS) linear regression by group using [statsmodels](https://www.statsmodels.org/stable/examples/notebooks/generated/ols.html).\n",
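+    "\n",
+    "The cells below use the `PandasUDFType.GROUPED_MAP` decorator, which still works in PySpark 3.2 but is deprecated in favor of `DataFrame.groupby(...).applyInPandas`. If you prefer the newer API, a rough equivalent (assuming the same `df2`, `group_column`, `y_column`, `x_columns`, and `schema` defined in the following cells) is a plain function passed to `applyInPandas`:\n",
+    "\n",
+    "```python\n",
+    "# sketch only - relies on the variables defined in the cells below\n",
+    "def ols_plain(pdf):\n",
+    "    group_key = pdf[group_column].iloc[0]\n",
+    "    X = sm.add_constant(pdf[x_columns])\n",
+    "    model = sm.OLS(pdf[y_column], X).fit()\n",
+    "    return pd.DataFrame(\n",
+    "        [[group_key] + [model.params[i] for i in x_columns]],\n",
+    "        columns=[group_column] + x_columns,\n",
+    "    )\n",
+    "\n",
+    "\n",
+    "beta = df2.groupby(group_column).applyInPandas(ols_plain, schema=schema)\n",
+    "```"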
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "df2 = (\n",
+    "    df.withColumn(\"y\", rand())\n",
+    "    .withColumn(\"x1\", rand())\n",
+    "    .withColumn(\"x2\", rand())\n",
+    "    .select(\"id\", \"y\", \"x1\", \"x2\")\n",
+    ")\n",
+    "df2.show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import pandas as pd\n",
+    "import statsmodels.api as sm\n",
+    "\n",
+    "group_column = \"id\"\n",
+    "y_column = \"y\"\n",
+    "x_columns = [\"x1\", \"x2\"]\n",
+    "schema = df2.select(group_column, *x_columns).schema"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# sample grouped-map UDF that takes and returns a pandas DataFrame\n",
+    "@pandas_udf(schema, PandasUDFType.GROUPED_MAP)\n",
+    "def ols(pdf):\n",
+    "    group_key = pdf[group_column].iloc[0]\n",
+    "    y = pdf[y_column]\n",
+    "    X = pdf[x_columns]\n",
+    "    X = sm.add_constant(X)\n",
+    "    model = sm.OLS(y, X).fit()\n",
+    "    return pd.DataFrame(\n",
+    "        [[group_key] + [model.params[i] for i in x_columns]],\n",
+    "        columns=[group_column] + x_columns,\n",
+    "    )"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# run OLS grouped by the \"id\" column\n",
+    "beta = df2.groupby(group_column).apply(ols)\n",
+    "beta.show()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Run Spark processing scripts locally\n",
+    "\n",
+    "You can also run Spark processing scripts in your notebook, as shown below. You'll read the sample `abalone` dataset from an S3 location and preprocess it. You will:\n",
+    "1. Apply transforms to the data, such as one-hot encoding, and merge the feature columns into a single vector\n",
+    "2. Create a preprocessing pipeline\n",
+    "3. Fit and transform the dataset\n",
+    "4. Split it into training and validation sets\n",
+    "5. 
Save the files to local storage" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql.types import (\n", + " DoubleType,\n", + " StringType,\n", + " StructField,\n", + " StructType,\n", + ")\n", + "from pyspark.ml.feature import (\n", + " OneHotEncoder,\n", + " StringIndexer,\n", + " VectorAssembler,\n", + " VectorIndexer,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "schema = StructType(\n", + " [\n", + " StructField(\"sex\", StringType(), True),\n", + " StructField(\"length\", DoubleType(), True),\n", + " StructField(\"diameter\", DoubleType(), True),\n", + " StructField(\"height\", DoubleType(), True),\n", + " StructField(\"whole_weight\", DoubleType(), True),\n", + " StructField(\"shucked_weight\", DoubleType(), True),\n", + " StructField(\"viscera_weight\", DoubleType(), True),\n", + " StructField(\"shell_weight\", DoubleType(), True),\n", + " StructField(\"rings\", DoubleType(), True),\n", + " ]\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "data_uri = \"s3a://sagemaker-sample-files/datasets/tabular/uci_abalone/abalone.csv\"\n", + "\n", + "abalone_df = spark.read.csv(data_uri, header=False, schema=schema)\n", + "abalone_df.show(2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# StringIndexer on the sex column which has categorical value\n", + "sex_indexer = StringIndexer(inputCol=\"sex\", outputCol=\"indexed_sex\")\n", + "\n", + "# one-hot encoding on the string-indexed sex column (indexed_sex)\n", + "sex_encoder = OneHotEncoder(inputCol=\"indexed_sex\", outputCol=\"sex_vec\")\n", + "\n", + "# vector-assembler will bring all the features to a 1D vector to save easily into CSV format\n", + "assembler = VectorAssembler(\n", + " inputCols=[\n", + " \"sex_vec\",\n", + " \"length\",\n", + " \"diameter\",\n", + " \"height\",\n", + " \"whole_weight\",\n", + " \"shucked_weight\",\n", + " \"viscera_weight\",\n", + " \"shell_weight\",\n", + " ],\n", + " outputCol=\"features\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.ml import Pipeline\n", + "\n", + "pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])\n", + "model = pipeline.fit(abalone_df)\n", + "\n", + "# apply transforms to the data frame\n", + "transformed_df = model.transform(abalone_df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# split into train and test set, and save to file\n", + "(train_df, validation_df) = transformed_df.randomSplit([0.8, 0.2])\n", + "\n", + "\n", + "def csv_line(data):\n", + " r = \",\".join(str(d) for d in data[1])\n", + " print(r)\n", + " return str(data[0]) + \",\" + r\n", + "\n", + "\n", + "# save to file\n", + "train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))\n", + "train_lines = train_rdd.map(csv_line)\n", + "train_lines.saveAsTextFile(\"train-set\")\n", + "\n", + "val_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))\n", + "val_lines = val_rdd.map(csv_line)\n", + "val_lines.saveAsTextFile(\"test-set\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Print the first five rows of the preprocessed output file." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "\n", + "print(\"Top 5 rows from the train file\")\n", + "pd.read_csv(\"./train-set/part-00000\", header=None).head(5)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Run the script as a SageMaker processing job\n", + "\n", + "Once experimentation is complete, you can run the script as a Sagemaker processing job. SageMaker processing jobs let you perform data pre-processing, post-processing, feature engineering, and data validation on infrastructure fully managed by SageMaker." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "!pygmentize ./code/preprocess.py" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We'll now use the `PySparkProcessor` class to define a Spark job and run it using SageMaker processing. For detailed reference, see [Data Processing with Spark](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_processing.html#data-processing-with-spark)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import logging\n", + "import sagemaker.session\n", + "from time import strftime, gmtime\n", + "from sagemaker.spark.processing import PySparkProcessor\n", + "\n", + "sagemaker_logger = logging.getLogger(\"sagemaker\")\n", + "sagemaker_logger.setLevel(logging.INFO)\n", + "sagemaker_logger.addHandler(logging.StreamHandler())\n", + "\n", + "sagemaker_session = sagemaker.Session()\n", + "bucket = sagemaker_session.default_bucket()\n", + "role = sagemaker.get_execution_role()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# fetch the dataset from the SageMaker bucket\n", + "import boto3\n", + "\n", + "s3 = boto3.client(\"s3\")\n", + "s3.download_file(\n", + " f\"sagemaker-sample-files\", \"datasets/tabular/uci_abalone/abalone.csv\", \"abalone.csv\"\n", + ")\n", + "\n", + "# upload the raw input dataset to a unique S3 location\n", + "timestamp_prefix = strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", + "prefix = \"sagemaker/local-pyspark/{}\".format(timestamp_prefix)\n", + "input_prefix_abalone = \"{}/abalone-preprocess/input\".format(prefix)\n", + "input_preprocessed_prefix_abalone = \"{}/abalone-preprocess/output\".format(prefix)\n", + "\n", + "sagemaker_session.upload_data(\n", + " path=\"abalone.csv\", bucket=bucket, key_prefix=input_prefix_abalone\n", + ")\n", + "\n", + "# run the processing job\n", + "spark_processor = PySparkProcessor(\n", + " base_job_name=\"local-pyspark\",\n", + " framework_version=\"3.1\",\n", + " role=role,\n", + " instance_count=2,\n", + " instance_type=\"ml.m5.xlarge\",\n", + " max_runtime_in_seconds=1200,\n", + ")\n", + "\n", + "spark_processor.run(\n", + " submit_app=\"./code/preprocess.py\",\n", + " arguments=[\n", + " \"--s3_input_bucket\",\n", + " bucket,\n", + " \"--s3_input_key_prefix\",\n", + " input_prefix_abalone,\n", + " \"--s3_output_bucket\",\n", + " bucket,\n", + " \"--s3_output_key_prefix\",\n", + " input_preprocessed_prefix_abalone,\n", + " ],\n", + " spark_event_logs_s3_uri=\"s3://{}/{}/spark_event_logs\".format(bucket, prefix),\n", + " logs=False,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Inspect the first five rows of the preprocessed output file. 
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\n", + " \"Top 5 rows from s3://{}/{}/train/\".format(\n", + " bucket, input_preprocessed_prefix_abalone\n", + " )\n", + ")\n", + "!aws s3 cp --quiet s3://$bucket/$input_preprocessed_prefix_abalone/train/part-00000 - | head -n5" + ] + } + ], + "metadata": { + "instance_type": "ml.t3.medium", + "kernelspec": { + "display_name": "Python 3 (Data Science)", + "language": "python", + "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.10" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}