Local pyspark example #3421

Merged · 9 commits · Jul 8, 2022
3 changes: 2 additions & 1 deletion sagemaker_processing/index.rst
@@ -15,4 +15,5 @@ Processing
 fairness_and_explainability/fairness_and_explainability_spark
 fairness_and_explainability/text_explainability/text_explainability
 fairness_and_explainability/text_explainability_sagemaker_algorithm/byo_blazingtext_model_hosting
-computer_vision/explainability_image_classification
+computer_vision/explainability_image_classification
+local_pyspark/local_pyspark_example
107 changes: 107 additions & 0 deletions sagemaker_processing/local_pyspark/code/preprocess.py
@@ -0,0 +1,107 @@
import argparse
import os

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
    VectorAssembler,
)
from pyspark.sql.functions import col
from pyspark.sql.types import (
    DoubleType,
    StringType,
    StructField,
    StructType,
)
from pyspark.ml.functions import vector_to_array
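# note: pyspark.ml.functions.vector_to_array requires PySpark 3.0 or newer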


def main():
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_input_bucket", type=str, help="s3 input bucket")
    parser.add_argument("--s3_input_key_prefix", type=str, help="s3 input key prefix")
    parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket")
    parser.add_argument("--s3_output_key_prefix", type=str, help="s3 output key prefix")
    args = parser.parse_args()

    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()

    # define the schema of the input data; the file does not contain headers
    schema = StructType(
        [
            StructField("sex", StringType(), True),
            StructField("length", DoubleType(), True),
            StructField("diameter", DoubleType(), True),
            StructField("height", DoubleType(), True),
            StructField("whole_weight", DoubleType(), True),
            StructField("shucked_weight", DoubleType(), True),
            StructField("viscera_weight", DoubleType(), True),
            StructField("shell_weight", DoubleType(), True),
            StructField("rings", DoubleType(), True),
        ]
    )
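    # the nine columns correspond to the UCI Abalone dataset, which ships
    # without a header row (hence header=False in the read below)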

    # read the data from S3 into a Spark DataFrame
    total_df = spark.read.csv(
        ("s3://" + os.path.join(args.s3_input_bucket, args.s3_input_key_prefix, "abalone.csv")),
        header=False,
        schema=schema,
    )
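    # note: a plain local Spark install has no built-in "s3://" filesystem; when
    # running outside a managed environment you would typically add the
    # hadoop-aws package and use the "s3a://" scheme instead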

    # index the categorical "sex" column into numeric values
    sex_indexer = StringIndexer(inputCol="sex", outputCol="indexed_sex")

    # one-hot encode the string-indexed sex column (indexed_sex)
    sex_encoder = OneHotEncoder(inputCol="indexed_sex", outputCol="sex_vec")
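    # e.g. the sex values {"M", "F", "I"} are indexed by descending frequency to
    # {0.0, 1.0, 2.0}, then encoded as 2-element one-hot vectors (OneHotEncoder
    # drops the last category by default, dropLast=True)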

    # the vector assembler combines all features into a single vector column,
    # which can easily be saved in CSV format
    assembler = VectorAssembler(
        inputCols=[
            "sex_vec",
            "length",
            "diameter",
            "height",
            "whole_weight",
            "shucked_weight",
            "viscera_weight",
            "shell_weight",
        ],
        outputCol="features",
    )

    # combine the above steps into a pipeline
    pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])

    # fit the feature transformers
    model = pipeline.fit(total_df)

    # transform the dataset with the fitted transformers
    transformed_total_df = model.transform(total_df)

    # split the dataset 80/20 into training and validation sets
    (train_df, validation_df) = transformed_total_df.randomSplit([0.8, 0.2])
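    # note: randomSplit is unseeded here, so the split differs from run to run;
    # pass a seed, e.g. randomSplit([0.8, 0.2], seed=42), for reproducibility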

    # keep only the label (rings) and the individual feature values; the
    # features vector has nine entries: two from the one-hot encoded sex
    # column plus the seven numeric measurements
    train_df_final = train_df.withColumn("feature", vector_to_array("features")).select(
        ["rings"] + [col("feature")[i] for i in range(9)]
    )

    val_df_final = validation_df.withColumn("feature", vector_to_array("features")).select(
        ["rings"] + [col("feature")[i] for i in range(9)]
    )

    # write out CSV files to S3
    train_df_final.write.csv(f"s3://{args.s3_output_bucket}/{args.s3_output_key_prefix}/train")
    val_df_final.write.csv(f"s3://{args.s3_output_bucket}/{args.s3_output_key_prefix}/validation")


if __name__ == "__main__":
    main()
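For reference, a script like this can be submitted as a SageMaker Processing job with the PySparkProcessor from the SageMaker Python SDK. The sketch below makes assumptions beyond this diff: the bucket names, key prefixes, and job name are placeholders, and get_execution_role() assumes the code runs in a SageMaker environment.

import sagemaker
from sagemaker.spark.processing import PySparkProcessor

role = sagemaker.get_execution_role()  # assumes a SageMaker environment

spark_processor = PySparkProcessor(
    base_job_name="local-pyspark-example",  # hypothetical job name
    framework_version="3.1",
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
)

# submit preprocess.py with the four S3 arguments it expects
spark_processor.run(
    submit_app="sagemaker_processing/local_pyspark/code/preprocess.py",
    arguments=[
        "--s3_input_bucket", "my-input-bucket",  # placeholder
        "--s3_input_key_prefix", "abalone/input",  # placeholder
        "--s3_output_bucket", "my-output-bucket",  # placeholder
        "--s3_output_key_prefix", "abalone/output",  # placeholder
    ],
)

The same four arguments apply when running the script locally through spark-submit, which is the scenario this example targets.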