Pipeline Step Caching Example Notebook (aws#3638)
* feature: pipeline caching notebook example

* change: initialize notebook

* feature: pipeline caching notebook example and tuning notebook adjustment

* fix: example notebook

* change: README

* fix: notebook code

* fix: grammar

* fix: more grammar

* fix: pr syntax and remove dataset

* fix: updated paths

* fix: tuning notebook formatting

* fix: more path corrections

Co-authored-by: Brock Wade <[email protected]>
2 people authored and atqy committed Oct 28, 2022
1 parent 318bdf8 commit 6cfd848
Showing 5 changed files with 1,134 additions and 7 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -227,6 +227,7 @@ These examples show you how to use [SageMaker Pipelines](https://aws.amazon.com/
- [Amazon Forecast with SageMaker Pipelines](sagemaker-pipelines/time_series_forecasting/amazon_forecast_pipeline) shows how you can create a dataset, dataset group and predictor with Amazon Forecast and SageMaker Pipelines.
- [Multi-model SageMaker Pipeline with Hyperparameter Tuning and Experiments](sagemaker-pipeline-multi-model) shows how you can generate a regression model by training on real estate data from Athena using Data Wrangler, and how to use multiple algorithms, both from a custom container and from a SageMaker container, in a single pipeline.
- [SageMaker Pipeline Local Mode with FrameworkProcessor and BYOC for PyTorch with sagemaker-training-toolkit](sagemaker-pipelines/tabular/local-mode/framework-processor-byoc)
- [SageMaker Pipeline Step Caching](sagemaker-pipelines/tabular/caching) shows how you can leverage pipeline step caching while building pipelines and demonstrates the expected cache-hit and cache-miss behavior (see the sketch below).

### Amazon SageMaker Pre-Built Framework Containers and the Python SDK

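For context, here is a minimal sketch of how step caching is typically enabled with the SageMaker Python SDK. `CacheConfig`, `ProcessingStep`, and `SKLearnProcessor` are real SDK classes, but the step name, framework version, instance settings, and `expire_after` duration are illustrative, and `role` is assumed to be an IAM execution role ARN defined elsewhere; this is a sketch, not code from this commit.

```python
# Minimal sketch (not from this commit) of enabling step caching.
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import CacheConfig, ProcessingStep

# Reuse a prior run's results for up to 30 days (ISO 8601 duration)
# when the step definition and inputs are unchanged.
cache_config = CacheConfig(enable_caching=True, expire_after="P30D")

sklearn_processor = SKLearnProcessor(
    framework_version="1.0-1",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    role=role,  # assumed: an IAM execution role ARN defined earlier
)

step_process = ProcessingStep(
    name="PreprocessAbaloneData",  # illustrative step name
    processor=sklearn_processor,
    code="preprocess.py",
    cache_config=cache_config,
)
```

The two scripts that follow are the preprocessing code added by this commit.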
@@ -0,0 +1,90 @@
import argparse
import os
import requests
import tempfile

import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder


# Since we get a headerless CSV file, we specify the column names here.
feature_columns_names = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]
label_column = "rings"

feature_columns_dtype = {
    "sex": str,
    "length": np.float64,
    "diameter": np.float64,
    "height": np.float64,
    "whole_weight": np.float64,
    "shucked_weight": np.float64,
    "viscera_weight": np.float64,
    "shell_weight": np.float64,
}
label_column_dtype = {"rings": np.float64}


def merge_two_dicts(x, y):
    """Return a new dict containing the entries of both x and y."""
    z = x.copy()
    z.update(y)
    return z


if __name__ == "__main__":
    base_dir = "/opt/ml/processing"

    # Read the raw Abalone dataset from the processing container's input channel.
    df = pd.read_csv(
        f"{base_dir}/input/abalone-dataset.csv",
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype),
    )

    # Impute and scale the numeric features.
    numeric_features = list(feature_columns_names)
    numeric_features.remove("sex")
    numeric_transformer = Pipeline(
        steps=[("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())]
    )

    # Impute and one-hot encode the single categorical feature, "sex".
    categorical_features = ["sex"]
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]
    )

    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features),
        ]
    )

    # Concatenate the label as the first column, the CSV layout that
    # SageMaker's built-in XGBoost algorithm expects.
    y = df.pop("rings")
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)

    X = np.concatenate((y_pre, X_pre), axis=1)

    # Shuffle, then split 70% / 15% / 15% into train, validation, and test sets.
    np.random.shuffle(X)
    train, validation, test = np.split(X, [int(0.7 * len(X)), int(0.85 * len(X))])

    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(
        f"{base_dir}/validation/validation.csv", header=False, index=False
    )
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
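For orientation, here is a minimal sketch of how the `/opt/ml/processing` paths used by this script are typically mapped when it runs as a processing step. `ProcessingInput` and `ProcessingOutput` are real SageMaker Python SDK classes, but the S3 URI is a placeholder and the output names are illustrative assumptions.

```python
# Sketch: mapping S3 data to the /opt/ml/processing paths this script uses.
from sagemaker.processing import ProcessingInput, ProcessingOutput

inputs = [
    ProcessingInput(
        source="s3://my-bucket/abalone/abalone-dataset.csv",  # placeholder URI
        destination="/opt/ml/processing/input",
    )
]
outputs = [
    ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
    ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
    ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
]
```

The second script below is identical to the first except for an extra function appended at the end to change the file's hash.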
@@ -0,0 +1,94 @@
import argparse
import os
import requests
import tempfile

import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder


# Since we get a headerless CSV file, we specify the column names here.
feature_columns_names = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]
label_column = "rings"

feature_columns_dtype = {
    "sex": str,
    "length": np.float64,
    "diameter": np.float64,
    "height": np.float64,
    "whole_weight": np.float64,
    "shucked_weight": np.float64,
    "viscera_weight": np.float64,
    "shell_weight": np.float64,
}
label_column_dtype = {"rings": np.float64}


def merge_two_dicts(x, y):
    """Return a new dict containing the entries of both x and y."""
    z = x.copy()
    z.update(y)
    return z


if __name__ == "__main__":
    base_dir = "/opt/ml/processing"

    # Read the raw Abalone dataset from the processing container's input channel.
    df = pd.read_csv(
        f"{base_dir}/input/abalone-dataset.csv",
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype),
    )

    # Impute and scale the numeric features.
    numeric_features = list(feature_columns_names)
    numeric_features.remove("sex")
    numeric_transformer = Pipeline(
        steps=[("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())]
    )

    # Impute and one-hot encode the single categorical feature, "sex".
    categorical_features = ["sex"]
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]
    )

    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features),
        ]
    )

    # Concatenate the label as the first column, the CSV layout that
    # SageMaker's built-in XGBoost algorithm expects.
    y = df.pop("rings")
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)

    X = np.concatenate((y_pre, X_pre), axis=1)

    # Shuffle, then split 70% / 15% / 15% into train, validation, and test sets.
    np.random.shuffle(X)
    train, validation, test = np.split(X, [int(0.7 * len(X)), int(0.85 * len(X))])

    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(
        f"{base_dir}/validation/validation.csv", header=False, index=False
    )
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)


def additional_method():
    """This additional function exists only to update this file's hash."""
    pass
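Because a step is cached on its definition, pointing the preprocessing step at this modified script changes the step's code argument (and its hash), which should produce a cache miss on the next execution. A minimal sketch, reusing the processor, cache config, and role assumed in the earlier sketch; the filename and pipeline object here are hypothetical placeholders.

```python
# Sketch: re-running the pipeline with the modified script should miss the
# cache, since the step's code argument (and its hash) has changed.
step_process = ProcessingStep(
    name="PreprocessAbaloneData",
    processor=sklearn_processor,   # assumed from the earlier sketch
    code="preprocess_updated.py",  # hypothetical name for this variant
    cache_config=cache_config,     # assumed from the earlier sketch
)
pipeline.upsert(role_arn=role)  # pipeline and role assumed to be defined earlier
execution = pipeline.start()
```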
