From 822112a6aa4a951918f9a0a8a86029d42e7d3750 Mon Sep 17 00:00:00 2001 From: Bruno Pistone Date: Tue, 25 Oct 2022 18:40:07 +0200 Subject: [PATCH] [FEATURE] Add SageMaker Pipeline local mode example with BYOC and FrameworkProcessor (#3614) * added framework-processor-local-pipelines * black-np on notebook * updated README.md * solving problems for commit id fc80e0d * solved formatting problem in notebook * reviewed notebook content, added dataset description, download dataset ffrom public sagemaker s3 bucket * grammar check * changed dataset to synthetic transactions dataset * removed reference to dataset origin * updated to main branch * fixing grammar spell Co-authored-by: Aaron Markham --- README.md | 1 + .../framework-processor-byoc/code/README.md | 44 + .../code/build_image.sh | 48 + .../code/buildspec.sh | 47 + .../code/processing/processing.py | 137 +++ .../code/processing/requirements.txt | 1 + .../code/training/.dockerignore | 54 + .../code/training/Dockerfile | 25 + .../code/training/train.py | 151 +++ .../sagemaker-pipelines-local-mode-fp.ipynb | 1018 +++++++++++++++++ 10 files changed, 1526 insertions(+) create mode 100644 sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/README.md create mode 100755 sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/build_image.sh create mode 100755 sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/buildspec.sh create mode 100644 sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/processing/processing.py create mode 100644 sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/processing/requirements.txt create mode 100644 sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/training/.dockerignore create mode 100644 sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/training/Dockerfile create mode 100755 sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/training/train.py create mode 100644 sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/sagemaker-pipelines-local-mode-fp.ipynb diff --git a/README.md b/README.md index 15aacc10b7..5dcf120b2a 100644 --- a/README.md +++ b/README.md @@ -226,6 +226,7 @@ These examples show you how to use [SageMaker Pipelines](https://aws.amazon.com/ - [Amazon Comprehend with SageMaker Pipelines](sagemaker-pipelines/nlp/amazon_comprehend_sagemaker_pipeline) shows how to deploy a custom text classification using Amazon Comprehend and SageMaker Pipelines. - [Amazon Forecast with SageMaker Pipelines](sagemaker-pipelines/time_series_forecasting/amazon_forecast_pipeline) shows how you can create a dataset, dataset group and predictor with Amazon Forecast and SageMaker Pipelines. - [Multi-model SageMaker Pipeline with Hyperparamater Tuning and Experiments](sagemaker-pipeline-multi-model) shows how you can generate a regression model by training real estate data from Athena using Data Wrangler, and uses multiple algorithms both from a custom container and a SageMaker container in a single pipeline. 
+- [SageMaker Pipeline Local Mode with FrameworkProcessor and BYOC for PyTorch with sagemaker-training-toolkit](sagemaker-pipelines/tabular/local-mode/framework-processor-byoc)
 
 ### Amazon SageMaker Pre-Built Framework Containers and the Python SDK
diff --git a/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/README.md b/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/README.md
new file mode 100644
index 0000000000..81aeb12d42
--- /dev/null
+++ b/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/README.md
@@ -0,0 +1,44 @@
+## buildspec.sh
+
+This script creates the source artifact (`sourcedir.tar.gz`) used by SageMaker Training Jobs or SageMaker Processing Jobs.
+
+Parameters:
+* ALGORITHM_NAME: *Mandatory* - Name of the algorithm you want to package
+* S3_BUCKET_NAME: *Optional* - S3 bucket where the package is uploaded under the path s3://<S3_BUCKET_NAME>/artifact/<ALGORITHM_NAME>/
+
+```
+./buildspec.sh <ALGORITHM_NAME> <S3_BUCKET_NAME>
+```
+
+### Example:
+
+#### Processing
+
+```
+./buildspec.sh processing test-bucket
+```
+
+#### Training
+
+```
+./buildspec.sh training test-bucket
+```
+
+## build_image.sh
+
+This script builds a custom Docker image and pushes it to Amazon ECR.
+
+Parameters:
+* IMAGE_NAME: *Mandatory* - Name of the image you want to build
+* REGISTRY_NAME: *Mandatory* - Name of the ECR repository you want to use for pushing the image
+* IMAGE_TAG: *Mandatory* - Tag to apply to the ECR image
+* DOCKER_FILE: *Mandatory* - Dockerfile to build
+* PLATFORM: *Optional* - Target platform for the image (for example linux/amd64)
+
+```
+./build_image.sh <IMAGE_NAME> <REGISTRY_NAME> <IMAGE_TAG> <DOCKER_FILE> <PLATFORM>
+```
+
+### Example:
+
+```
+./build_image.sh training torch-1.12.1 latest Dockerfile linux/amd64
+```
diff --git a/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/build_image.sh b/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/build_image.sh
new file mode 100755
index 0000000000..e532b7bc21
--- /dev/null
+++ b/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/build_image.sh
@@ -0,0 +1,48 @@
+#!/bin/sh
+
+# Positional arguments: image folder, ECR repository name, image tag, Dockerfile, optional platform
+repo=$1
+registry_name=$2
+image_tag=$3
+docker_file=$4
+platforms=$5
+
+echo "[INFO]: registry_name=${registry_name}"
+echo "[INFO]: image_tag=${image_tag}"
+echo "[INFO]: docker_file=${docker_file}"
+echo "[INFO]: platforms=${platforms}"
+
+cd $repo
+
+account=$(aws sts get-caller-identity --query Account --output text)
+
+# Get the region defined in the current AWS CLI configuration
+region=$(aws configure get region)
+
+echo "[INFO]: Region ${region}"
+
+fullname="${account}.dkr.ecr.${region}.amazonaws.com/${registry_name}:${image_tag}"
+
+echo "[INFO]: Image name: ${fullname}"
+
+# If the repository doesn't exist in ECR, create it.
+aws ecr describe-repositories --repository-names "${registry_name}" > /dev/null 2>&1 || \
+    aws ecr create-repository --repository-name "${registry_name}" > /dev/null
+
+# Log in to the ECR registry so that the image can be pushed
+password=$(aws ecr --region ${region} get-login-password)
+
+docker login -u AWS -p ${password} "${account}.dkr.ecr.${region}.amazonaws.com"
+
+if [ -z ${platforms} ]
+then
+    docker build -t ${fullname} -f ${docker_file} .
+else
+    echo "Provided platform = ${platforms}"
+    docker build -t ${fullname} -f ${docker_file} . 
--platform=${platforms} +fi + +docker push ${fullname} \ No newline at end of file diff --git a/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/buildspec.sh b/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/buildspec.sh new file mode 100755 index 0000000000..91a6b30148 --- /dev/null +++ b/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/buildspec.sh @@ -0,0 +1,47 @@ +# © 2021 Amazon Web Services, Inc. or its affiliates. All Rights Reserved. +# +# This AWS Content is provided subject to the terms of the AWS Customer Agreement +# available at http://aws.amazon.com/agreement or other written agreement between +# Customer and either Amazon Web Services, Inc. or Amazon Web Services EMEA SARL or both. + +#!/bin/sh + +REPO=$1 +S3_BUCKET_NAME=$2 + +NAME=sourcedir +PUSH=true + +if [ -z ${REPO} ] ; +then + echo "Repository not specified" + exit 1 +fi + +SCRIPT=$(readlink -f "$0") +# Absolute path this script is in, thus /home/user/bin +SCRIPTPATH=$(dirname "$SCRIPT") + +cd $SCRIPTPATH/$REPO/src +tar --exclude='data' -czvf ${NAME}.tar.gz * + +rm -rf ../dist/$REPO +mkdir ../dist +mkdir ../dist/$REPO + +mv ${NAME}.tar.gz ../dist/$REPO + +if [ -z ${S3_BUCKET_NAME} ] ; +then + echo "S3 Bucket not specified, no upload" + PUSH=false +fi + +if $PUSH ; +then + echo "Uploading s3://${S3_BUCKET_NAME}/artifact/${REPO}/${NAME}.tar.gz" + + aws s3 cp ../dist/${REPO}/${NAME}.tar.gz s3://${S3_BUCKET_NAME}/artifact/${REPO}/${NAME}.tar.gz +else + exit 1 +fi \ No newline at end of file diff --git a/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/processing/processing.py b/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/processing/processing.py new file mode 100644 index 0000000000..cc4452c533 --- /dev/null +++ b/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/processing/processing.py @@ -0,0 +1,137 @@ +import argparse +import csv +import logging +import numpy as np +import os +from os import listdir +from os.path import isfile, join +import pandas as pd +from sklearn.model_selection import train_test_split +import traceback + +logging.basicConfig(level=logging.INFO) +LOGGER = logging.getLogger(__name__) + +BASE_PATH = os.path.join("/", "opt", "ml") +PROCESSING_PATH = os.path.join(BASE_PATH, "processing") +PROCESSING_PATH_INPUT = os.path.join(PROCESSING_PATH, "input") +PROCESSING_PATH_OUTPUT = os.path.join(PROCESSING_PATH, "output") + +def extract_data(file_path, percentage=100): + try: + files = [f for f in listdir(file_path) if isfile(join(file_path, f)) and f.endswith(".csv")] + LOGGER.info("{}".format(files)) + + frames = [] + + for file in files: + df = pd.read_csv( + os.path.join(file_path, file), + sep=",", + quotechar='"', + quoting=csv.QUOTE_ALL, + escapechar='\\', + encoding='utf-8', + error_bad_lines=False + ) + + df = df.head(int(len(df) * (percentage / 100))) + + frames.append(df) + + df = pd.concat(frames) + + return df + except Exception as e: + stacktrace = traceback.format_exc() + LOGGER.error("{}".format(stacktrace)) + + raise e + +def load_data(df, file_path, file_name): + try: + if not os.path.exists(file_path): + os.makedirs(file_path) + + path = os.path.join(file_path, file_name + ".csv") + + LOGGER.info("Saving file in {}".format(path)) + + df.to_csv( + path, + index=False, + header=True, + quoting=csv.QUOTE_ALL, + encoding="utf-8", + escapechar="\\", + sep="," + ) + except Exception as e: + stacktrace = traceback.format_exc() + LOGGER.error("{}".format(stacktrace)) + + 
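+        # Re-raise after logging so the SageMaker Processing job is marked as Failed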
raise e + +def transform_data(df): + try: + df = df[df['Is Fraud?'].notna()] + + df.insert(0, 'ID', range(1, len(df) + 1)) + + df["Errors?"].fillna('', inplace=True) + df['Errors?'] = df['Errors?'].map(lambda x: x.strip()) + df["Errors?"] = df["Errors?"].map({ + "Insufficient Balance": 0, + "Technical Glitch": 1, + "Bad PIN": 2, + "Bad Expiration": 3, + "Bad Card Number": 4, + "Bad CVV": 5, + "Bad PIN,Insufficient Balance": 6, + "Bad PIN,Technical Glitch": 7, + "": 8 + }) + + df["Use Chip"].fillna('', inplace=True) + df['Use Chip'] = df['Use Chip'].map(lambda x: x.strip()) + df["Use Chip"] = df["Use Chip"].map({ + "Swipe Transaction": 0, + "Chip Transaction": 1, + "Online Transaction": 2 + }) + + df['Is Fraud?'] = df['Is Fraud?'].map(lambda x: x.replace("'", "")) + df['Is Fraud?'] = df['Is Fraud?'].map(lambda x: x.strip()) + df['Is Fraud?'] = df['Is Fraud?'].replace('', np.nan) + df['Is Fraud?'] = df['Is Fraud?'].replace(' ', np.nan) + + df["Is Fraud?"] = df["Is Fraud?"].map({"No": 0, "Yes": 1}) + + df = df.rename( + columns={'Card': 'card', 'MCC': 'mcc', "Errors?": "errors", "Use Chip": "use_chip", "Is Fraud?": "labels"}) + + df = df[["card", "mcc", "errors", "use_chip", "labels"]] + + return df + + except Exception as e: + stacktrace = traceback.format_exc() + LOGGER.error("{}".format(stacktrace)) + + raise e + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument("--dataset-percentage", type=int, required=False, default=100) + args = parser.parse_args() + + LOGGER.info("Arguments: {}".format(args)) + + df = extract_data(PROCESSING_PATH_INPUT, args.dataset_percentage) + + df = transform_data(df) + + data_train, data_test = train_test_split(df, test_size=0.2, shuffle=True) + + load_data(data_train, os.path.join(PROCESSING_PATH_OUTPUT, "train"), "train") + load_data(data_test, os.path.join(PROCESSING_PATH_OUTPUT, "test"), "test") diff --git a/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/processing/requirements.txt b/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/processing/requirements.txt new file mode 100644 index 0000000000..1411a4a0b5 --- /dev/null +++ b/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/processing/requirements.txt @@ -0,0 +1 @@ +pandas \ No newline at end of file diff --git a/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/training/.dockerignore b/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/training/.dockerignore new file mode 100644 index 0000000000..451175d5ec --- /dev/null +++ b/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/training/.dockerignore @@ -0,0 +1,54 @@ +.idea/ +.vscode/ +*/**/data/ + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Logs +**/logs +*.log + +# OS generated files +.DS_Store +.DS_Store? 
+._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST \ No newline at end of file diff --git a/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/training/Dockerfile b/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/training/Dockerfile new file mode 100644 index 0000000000..0b4cdb6c98 --- /dev/null +++ b/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/training/Dockerfile @@ -0,0 +1,25 @@ +FROM pytorch/pytorch:1.12.1-cuda11.3-cudnn8-runtime + +ARG PYTHON=python3 +ARG PYTHON_PIP=python3-pip +ARG PIP=pip3 + +RUN apt-get update && apt-get install gcc -y + +RUN ${PIP} --no-cache-dir install --upgrade pip + +RUN ${PIP} install \ + pandas \ + scikit-learn + +WORKDIR / + +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:/usr/local/lib" \ + PYTHONIOENCODING=UTF-8 \ + LANG=C.UTF-8 \ + LC_ALL=C.UTF-8 + +RUN ${PIP} install --no-cache --upgrade \ + sagemaker-pytorch-training \ No newline at end of file diff --git a/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/training/train.py b/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/training/train.py new file mode 100755 index 0000000000..f30149ffca --- /dev/null +++ b/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/code/training/train.py @@ -0,0 +1,151 @@ +from argparse import ArgumentParser +import csv +import glob +import logging +import os +import pandas as pd +from sklearn import preprocessing +import torch +from torch.utils.data import DataLoader, TensorDataset +import traceback + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +device = "cuda" if torch.cuda.is_available() else "cpu" + +""" + Read input data +""" +def __read_data(files_path): + try: + logger.info("Reading dataset from source...") + + all_files = glob.glob(os.path.join(files_path, "*.csv")) + + datasets = [] + + for filename in all_files: + data = pd.read_csv( + filename, + sep=',', + quotechar='"', + quoting=csv.QUOTE_ALL, + escapechar='\\', + encoding='utf-8', + error_bad_lines=False + ) + + datasets.append(data) + + data = pd.concat(datasets, axis=0, ignore_index=True) + + data = data.dropna() + + return data + except Exception as e: + stacktrace = traceback.format_exc() + logger.error("{}".format(stacktrace)) + + raise e + +def prepare_data(train, test): + try: + X_train, y_train = train.iloc[:, train.columns != 'labels'], train.iloc[:, train.columns == 'labels'] + X_test, y_test = test.iloc[:, test.columns != 'labels'], test.iloc[:, train.columns == 'labels'] + + y_test = y_test.astype("int64") + + scaler = preprocessing.MinMaxScaler() + + X_train = scaler.fit_transform(X_train.values) + X_test = scaler.fit_transform(X_test.values) + + X_train_tensor = torch.from_numpy(X_train) + y_train_tensor = torch.from_numpy(y_train.values.ravel()).float() + y_train_tensor = y_train_tensor.unsqueeze(1) + + X_test_tensor = torch.from_numpy(X_test) + y_test_tensor = torch.from_numpy(y_test.values.ravel()).float() + y_test_tensor = y_test_tensor.unsqueeze(1) + + train_ds = TensorDataset(X_train_tensor, y_train_tensor) + test_ds = TensorDataset(X_test_tensor, y_test_tensor) 
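+        # Wrap the tensors in DataLoaders; the training batch size comes from the CLI arguments parsed in __main__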
+ + train_dl = DataLoader(train_ds, batch_size=args.batch_size) + test_dl = DataLoader(test_ds, batch_size=32) + + return train_dl, test_dl + except Exception as e: + stacktrace = traceback.format_exc() + logger.error("{}".format(stacktrace)) + + raise e + +class BinaryClassifierModel(torch.nn.Module): + def __init__(self, shape): + super(BinaryClassifierModel, self).__init__() + + self.d1 = torch.nn.Linear(shape, 32) + self.d2 = torch.nn.Linear(32, 64) + self.drop = torch.nn.Dropout(0.2) + self.output = torch.nn.Linear(64, 1) + + def forward(self, x): + x = torch.relu(self.d1(x)) + x = torch.relu(self.d2(x)) + x = self.drop(x) + x = torch.sigmoid(self.output(x)) + + return x + +if __name__ == '__main__': + + parser = ArgumentParser() + + parser.add_argument('--epochs', type=int, default=10) + parser.add_argument('--learning_rate', type=float, default=1.45e-4) + parser.add_argument('--batch_size', type=int, default=100) + parser.add_argument('--output-data-dir', type=str, default=os.environ.get('SM_OUTPUT_DATA_DIR')) + parser.add_argument('--train', type=str, default=os.environ.get('SM_CHANNEL_TRAIN')) + parser.add_argument('--test', type=str, default=os.environ.get('SM_CHANNEL_TEST')) + parser.add_argument('--model_dir', type=str, default=os.environ.get('SM_MODEL_DIR')) + + args = parser.parse_args() + + train = __read_data(args.train) + test = __read_data(args.test) + + train_dl, test_dl = prepare_data(train, test) + + model = BinaryClassifierModel(train.shape[1] - 1) + + model = model.to(device) + + optimizer = torch.optim.Adam(model.parameters(), lr=args.learning_rate) + + loss_obj = torch.nn.BCELoss() + + model.train() + train_loss = [] + + for epoch in range(args.epochs): + logger.info("Epoch {}".format(epoch + 1)) + + # Within each epoch run the subsets of data = batch sizes. + for xb, yb in train_dl: + xb = xb.to(device) + yb = yb.to(device) + + y_pred = model(xb.float()) # Forward Propagation + + loss = loss_obj(y_pred, yb) # Loss Computation + + optimizer.zero_grad() # Clearing all previous gradients, setting to zero + loss.backward() # Back Propagation + optimizer.step() # Updating the parameters + + logger.info("Training Loss: {}".format(loss.item())) + train_loss.append(loss.item()) + + torch.save(model.cpu(), os.path.join(args.model_dir, "model.pth")) \ No newline at end of file diff --git a/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/sagemaker-pipelines-local-mode-fp.ipynb b/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/sagemaker-pipelines-local-mode-fp.ipynb new file mode 100644 index 0000000000..6dd39574d7 --- /dev/null +++ b/sagemaker-pipelines/tabular/local-mode/framework-processor-byoc/sagemaker-pipelines-local-mode-fp.ipynb @@ -0,0 +1,1018 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "# SageMaker Pipeline - Local Mode\n", + "\n", + "This notebook demonstrates how to orchestrate SageMaker jobs locally using SageMaker Pipelines.\n", + "\n", + "Using a `LocalPipelineSession` object, you can now run your pipelines on your local machine before running them in the cloud.\n", + "\n", + "The `LocalPipelineSession` object is used while defining each pipeline step and when defining the complete Pipeline object. To run this pipeline in the cloud, each step along with the Pipeline object must be redefined using `PipelineSession`.\n", + "\n", + "Note: This notebook will not run in SageMaker Studio. 
You can run this on SageMaker Classic Notebook instances OR your local IDE." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "In this notebook, we will execute in our local environment a pipeline that will perform the following steps:\n", + "\n", + "* ProcessingStep by using `FrameworkProcessor`\n", + "* TrainingStep by using `Estimator` with a custom PyTorch container" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "## Dataset\n", + "\n", + "We are using a subset of ~20000 records of synthetic transactions, each of which is labeled as fraudulent or not fraudulent.\n", + "We'd like to train a model based on the features of these transactions so that we can predict risky or fraudulent transactions in the future.\n", + "\n", + "This is a binary classification problem:\n", + "\n", + "* 1 - Fraud\n", + "* 0 - No Fraud" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "! rm -rf ./data && mkdir -p ./data\n", + "\n", + "! aws s3 cp s3://sagemaker-sample-files/datasets/tabular/synthetic_credit_card_transactions/user0_credit_card_transactions.csv ./data/creditcard_csv.csv" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "***" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "## Prerequisites\n", + "\n", + "Install the latest version of the SageMaker Python SDK" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "! pip install 'sagemaker' --upgrade" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "***" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "## Build Container\n", + "\n", + "In order to use Amazon SageMaker Training Job with a custom image, the first step is to build it and push in a private [Amazon ECR Repository](https://docs.aws.amazon.com/en_en/AmazonECR/latest/userguide/what-is-ecr.html).\n", + "\n", + "The Dockerfile defined is creating starting from the public [torch 1.12.1 image](https://hub.docker.com/layers/pytorch/pytorch/1.12.1-cuda11.3-cudnn8-runtime/images/sha256-0bc0971dc8ae319af610d493aced87df46255c9508a8b9e9bc365f11a56e7b75?context=explore), and by the usage of\n", + "[sagemaker-training-toolkit](https://github.com/aws/sagemaker-training-toolkit) we are making our container compatible with Amazon SageMaker for providing our training script during the definition of the `Estimator`.\n", + "\n", + "For facilitating the steps of building the Docker Image and push it in the Amazon ECR Repository, we are providing a utility script [build_image.sh](./code/build_image.sh).\n", + "\n", + "For more information on the usage, please read the [README](./code/README.md)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "! 
pygmentize ./code/training/Dockerfile" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "***" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "## Part 1/3 - Setup\n", + "\n", + "Here we'll import some libraries and define some variables." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "import boto3\n", + "import json\n", + "import logging\n", + "import sagemaker\n", + "from sagemaker.estimator import Estimator\n", + "from sagemaker.inputs import TrainingInput\n", + "from sagemaker.workflow.pipeline_context import LocalPipelineSession\n", + "from sagemaker.processing import FrameworkProcessor, ProcessingInput, ProcessingOutput\n", + "from sagemaker.sklearn.estimator import SKLearn\n", + "from sagemaker.workflow.parameters import ParameterString\n", + "from sagemaker.workflow.pipeline import Pipeline\n", + "from sagemaker.workflow.steps import ProcessingStep, TrainingStep" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "logging.basicConfig(level=logging.INFO)\n", + "LOGGER = logging.getLogger(__name__)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "s3_client = boto3.client(\"s3\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "Create `LocalPipelineSession` object so that each pipeline step will run locally.\n", + "\n", + "To run this pipeline in the cloud, you must change `LocalPipelineSession()` to `PipelineSession()`" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "sagemaker_session = LocalPipelineSession()\n", + "\n", + "region = sagemaker_session.boto_region_name\n", + "\n", + "default_bucket = sagemaker_session.default_bucket()\n", + "\n", + "role = None" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "## Please Note: Provide SageMaker Execution Role ARN if not running on SageMaker Notebook environment" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "
💡 **Set Execution Role for Permissions**\n",
+ "\n",
+ "If you are running this notebook from a local machine, rather than from a SageMaker notebook environment, set the `role` variable in the code cell below to the ARN of a valid SageMaker Execution Role. You can look up your SageMaker Execution Roles in the IAM console under IAM > Roles.\n",
+ "\n",
+ "If `role` is left as `None`, the cell below falls back to `sagemaker.get_execution_role()`, which resolves the role automatically only when running inside a SageMaker environment.\n",
+ "
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "if role == None:\n", + " role = sagemaker.get_execution_role()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "### Upload Dataset in the Default Amazon S3 Bucket\n", + "\n", + "In order to make the data available, we are uploading the downloaded dataset into the default S3 bucket" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "s3_client.delete_object(Bucket=default_bucket, Key=\"sg-pipeline-local/data/input\")\n", + "\n", + "input_data = sagemaker_session.upload_data(\n", + " \"./data/creditcard_csv.csv\", key_prefix=\"sg-pipeline-local/data/input\"\n", + ")\n", + "\n", + "input_data" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "***" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "## Part 2/3 - Create Amazon SageMaker Pipeline\n", + "\n", + "In this section, we are creating the Amazon SageMaker Pipeline and define the proper Input Parameters for making it usable for both local mode and for cloud executions" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "### Compress source code for installing additional python modules\n", + "\n", + "By using [sagemaker-training-toolkit](https://github.com/aws/sagemaker-training-toolkit), we can provide the execution scripts and the requirements.txt for installing additional dependencies to the `Estimator` that we will define some steps below.\n", + "\n", + "In order to make sure that Amazon SageMaker will install our additional Python modules by reading `requirements.txt`, we are compressing the content of the [training](./code/training) folder and uploading it in the default S3 Bucket." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "! cd ./code/training && rm -rf ./../dist/training && mkdir -p ./../dist/training && tar --exclude='Dockerfile' --exclude='.dockerignore' -czvf ./../dist/training/sourcedir.tar.gz *" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Download the\n", + "# clean the buckets first\n", + "s3_client.delete_object(Bucket=default_bucket, Key=\"sg-pipeline-local/rtifact/training\")\n", + "\n", + "code_path = sagemaker_session.upload_data(\n", + " \"./code/dist/training/sourcedir.tar.gz\", key_prefix=\"sg-pipeline-local/artifact/training\"\n", + ")\n", + "\n", + "code_path" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "By using `FrameworkProcessor`, we can provide to the Amazon SageMaker Job the execution scripts and the requirements.txt for installing additional Python modules. 
Look at the [documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job-frameworks.html) for additional info.\n", + "\n", + "In order to make sure that Amazon SageMaker will install our additional Python modules by reading `requirements.txt`, we are compressing the content of the [processing](./code/processing) folder and uploading it in the default S3 Bucket." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "! cd ./code/processing && rm -rf ./../dist/processing && mkdir -p ./../dist/processing && tar -czvf ./../dist/processing/sourcedir.tar.gz *" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Download the\n", + "# clean the buckets first\n", + "s3_client.delete_object(Bucket=default_bucket, Key=\"sg-pipeline-local/artifact/processing\")\n", + "\n", + "code_path = sagemaker_session.upload_data(\n", + " \"./code/dist/processing/sourcedir.tar.gz\", key_prefix=\"sg-pipeline-local/artifact/processing\"\n", + ")\n", + "\n", + "code_path" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "***" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "### Global Parameters" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "processing_artifact_path = \"sg-pipeline-local/artifact/processing\"\n", + "processing_artifact_name = \"sourcedir.tar.gz\"\n", + "processing_framework_version = \"0.23-1\"\n", + "processing_instance_count = 1\n", + "processing_input_files_path = \"sg-pipeline-local/data/input\"\n", + "processing_output_files_path = \"sg-pipeline-local/data/output\"\n", + "\n", + "training_image_name = \"torch-1.12.1\"\n", + "training_image_version = \"latest\"\n", + "training_artifact_path = \"sg-pipeline-local/artifact/training\"\n", + "training_artifact_name = \"sourcedir.tar.gz\"\n", + "training_output_files_path = \"sg-pipeline-local/models\"\n", + "training_python_version = \"py37\"\n", + "training_instance_count = 1\n", + "training_hyperparameters = {\"epochs\": 6, \"learning_rate\": 1.34e-4, \"batch_size\": 100}" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "### Pipeline Parameters\n", + "\n", + "In order to make the Amazon SageMaker Pipeline available for executing it both in `local mode` and in the cloud, we are defining the following `ParameterString` for providing the execution type at runtime" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "processing_instance_type = ParameterString(\n", + " name=\"ProcessingInstanceType\", default_value=\"ml.t3.large\"\n", + ")\n", + "\n", + "training_instance_type = ParameterString(name=\"TrainingInstanceType\", default_value=\"ml.m5.large\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "#### SageMaker Processing Step" + ] + }, + { + "cell_type": "code", + 
"execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "processing_inputs = [\n", + " ProcessingInput(\n", + " source=\"s3://{}/{}\".format(default_bucket, processing_input_files_path),\n", + " destination=\"/opt/ml/processing/input\",\n", + " )\n", + "]\n", + "\n", + "processing_outputs = [\n", + " ProcessingOutput(\n", + " output_name=\"output\",\n", + " source=\"/opt/ml/processing/output\",\n", + " destination=\"s3://{}/{}\".format(default_bucket, processing_output_files_path),\n", + " )\n", + "]\n", + "\n", + "processing_source_dir = \"s3://{}/{}/{}\".format(\n", + " default_bucket, processing_artifact_path, processing_artifact_name\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "Define the `FrameworkProcessor` object" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "processor = FrameworkProcessor(\n", + " estimator_cls=SKLearn,\n", + " framework_version=processing_framework_version,\n", + " role=role,\n", + " instance_count=processing_instance_count,\n", + " instance_type=processing_instance_type,\n", + " sagemaker_session=sagemaker_session,\n", + ")\n", + "\n", + "run_args = processor.get_run_args(\n", + " \"processing.py\",\n", + " source_dir=processing_source_dir,\n", + " inputs=processing_inputs,\n", + " outputs=processing_outputs,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "step_process = ProcessingStep(\n", + " name=\"ProcessData\",\n", + " code=run_args.code,\n", + " processor=processor,\n", + " inputs=run_args.inputs,\n", + " outputs=run_args.outputs,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "#### SageMaker Training Step" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "#### Utility methods\n", + "\n", + "For providing the compressed `sourcedir` to `Estimator`, we are defining a utility method for encoding the job `hyperparameters`" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "def json_encode_hyperparameters(hyperparameters):\n", + " return {str(k): json.dumps(v) for (k, v) in hyperparameters.items()}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "source_dir = \"s3://{}/{}/{}\".format(default_bucket, training_artifact_path, training_artifact_name)\n", + "\n", + "training_hyperparameters[\"sagemaker_program\"] = \"train.py\"\n", + "training_hyperparameters[\"sagemaker_submit_directory\"] = source_dir\n", + "\n", + "training_hyperparameters" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "training_input = TrainingInput(\n", + " s3_data=\"s3://{}/{}/train\".format(default_bucket, processing_output_files_path),\n", + " content_type=\"text/csv\",\n", + 
")\n", + "\n", + "test_input = TrainingInput(\n", + " s3_data=\"s3://{}/{}/test\".format(default_bucket, processing_output_files_path),\n", + " content_type=\"text/csv\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "output_path = \"s3://{}/{}\".format(default_bucket, training_output_files_path)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "##### Get ECR image uri\n", + "\n", + "Let's take the `image_uri` related to our custom image we want to use for our training job" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "container = \"{}.dkr.ecr.{}.amazonaws.com/{}:{}\".format(\n", + " boto3.client(\"sts\").get_caller_identity().get(\"Account\"),\n", + " boto3.session.Session().region_name,\n", + " training_image_name,\n", + " training_image_version,\n", + ")\n", + "\n", + "print(container)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "Define the `Estimator` object" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "estimator = Estimator(\n", + " image_uri=container,\n", + " output_path=output_path,\n", + " hyperparameters=json_encode_hyperparameters(training_hyperparameters),\n", + " enable_sagemaker_metrics=True,\n", + " role=role,\n", + " instance_count=training_instance_count,\n", + " instance_type=training_instance_type,\n", + " disable_profiler=True,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "step_train = TrainingStep(\n", + " depends_on=[step_process],\n", + " name=\"TrainModel\",\n", + " estimator=estimator,\n", + " inputs={\"train\": training_input, \"test\": test_input},\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "#### Pipeline definition\n", + "\n", + "Let's create the pipeline object, which contains as `parameters` the inputs defined in the previous sections, and as steps the `ProcessingStep` and `TrainingStep` defined few cells above" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "pipeline = Pipeline(\n", + " name=\"FraudTrainingPipeline\",\n", + " parameters=[processing_instance_type, training_instance_type],\n", + " steps=[step_process, step_train],\n", + " sagemaker_session=sagemaker_session,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "pipeline.upsert(role_arn=role)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "json.loads(pipeline.definition())" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } 
+ }, + "source": [ + "***" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "## Part 3/3 - Run SageMaker Pipeline\n", + "\n", + "For executing the Amazon SageMaker Pipeline in our local environment, we are providing for both the `ProcessingStep` and `TrainingStep` the parameter `local` for the `instance_type` to use." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "execution = pipeline.start(\n", + " parameters={\"ProcessingInstanceType\": \"local\", \"TrainingInstanceType\": \"local\"}\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "execution.describe()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "execution.list_steps()" + ] + } + ], + "metadata": { + "instance_type": "ml.t3.medium", + "kernelspec": { + "display_name": "Python 3.10.6 64-bit", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.6" + }, + "vscode": { + "interpreter": { + "hash": "b0fa6594d8f4cbf19f97940f81e996739fb7646882a419484c72d19e05852a7e" + } + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}
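The notebook above notes that running the same pipeline in the cloud only requires replacing `LocalPipelineSession` with `PipelineSession` and using real instance types instead of `local`. A minimal sketch of that switch, assuming the processor, estimator, steps, and pipeline parameters are rebuilt exactly as in the notebook (names such as `step_process`, `step_train`, `processing_instance_type`, and `training_instance_type` come from those cells):

```
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession

# Cloud session instead of LocalPipelineSession(); every step and the Pipeline
# object must be defined against this session for a cloud execution.
pipeline_session = PipelineSession()
role = sagemaker.get_execution_role()

# step_process and step_train are assumed to be redefined as in the notebook,
# passing sagemaker_session=pipeline_session to the FrameworkProcessor and Estimator.
pipeline = Pipeline(
    name="FraudTrainingPipeline",
    parameters=[processing_instance_type, training_instance_type],
    steps=[step_process, step_train],
    sagemaker_session=pipeline_session,
)

pipeline.upsert(role_arn=role)

# Use the ParameterString defaults (real instance types) instead of "local"
execution = pipeline.start(
    parameters={"ProcessingInstanceType": "ml.t3.large", "TrainingInstanceType": "ml.m5.large"}
)
```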