From 50c356fc339f8b6c0a5b004bee4b05d0fb0137d4 Mon Sep 17 00:00:00 2001
From: Kirit Thadaka
Date: Fri, 6 Aug 2021 15:11:38 -0400
Subject: [PATCH] Added lambda step example (#2847)

* Added lambda step example
* Updated notes on IAM policy for Lambda
* Updated IAM notes
* Added pip install
* Updated pip install
* Added iam helper function
* Updated role description
* Fixed typos
* Updated md cells
* updated formatting

Co-authored-by: kthadaka
---
 .../tabular/lambda-step/iam_helper.py         |  42 +
 .../sagemaker-pipelines-lambda-step.ipynb     | 891 ++++++++++++++++++
 2 files changed, 933 insertions(+)
 create mode 100644 sagemaker-pipelines/tabular/lambda-step/iam_helper.py
 create mode 100644 sagemaker-pipelines/tabular/lambda-step/sagemaker-pipelines-lambda-step.ipynb

diff --git a/sagemaker-pipelines/tabular/lambda-step/iam_helper.py b/sagemaker-pipelines/tabular/lambda-step/iam_helper.py
new file mode 100644
index 0000000000..51ff704ff0
--- /dev/null
+++ b/sagemaker-pipelines/tabular/lambda-step/iam_helper.py
@@ -0,0 +1,42 @@
+import boto3
+import json
+
+iam = boto3.client('iam')
+
+def create_lambda_role(role_name):
+    try:
+        response = iam.create_role(
+            RoleName=role_name,
+            AssumeRolePolicyDocument=json.dumps({
+                "Version": "2012-10-17",
+                "Statement": [
+                    {
+                        "Effect": "Allow",
+                        "Principal": {
+                            "Service": "lambda.amazonaws.com"
+                        },
+                        "Action": "sts:AssumeRole"
+                    }
+                ]
+            }),
+            Description='Role for Lambda to call SageMaker functions'
+        )
+
+        role_arn = response['Role']['Arn']
+
+        response = iam.attach_role_policy(
+            RoleName=role_name,
+            PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
+        )
+
+        response = iam.attach_role_policy(
+            PolicyArn='arn:aws:iam::aws:policy/AmazonSageMakerFullAccess',
+            RoleName=role_name
+        )
+
+        return role_arn
+
+    except iam.exceptions.EntityAlreadyExistsException:
+        print(f'Using ARN from existing role: {role_name}')
+        response = iam.get_role(RoleName=role_name)
+        return response['Role']['Arn']
\ No newline at end of file
diff --git a/sagemaker-pipelines/tabular/lambda-step/sagemaker-pipelines-lambda-step.ipynb b/sagemaker-pipelines/tabular/lambda-step/sagemaker-pipelines-lambda-step.ipynb
new file mode 100644
index 0000000000..69df9cf634
--- /dev/null
+++ b/sagemaker-pipelines/tabular/lambda-step/sagemaker-pipelines-lambda-step.ipynb
@@ -0,0 +1,891 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "#### SageMaker Pipelines Lambda Step\n",
+    "\n",
+    "This notebook illustrates how a Lambda function can be run as a step in a SageMaker Pipeline. \n",
+    "\n",
+    "The steps in this pipeline include -\n",
+    "* Preprocess the abalone dataset\n",
+    "* Train an XGBoost model\n",
+    "* Evaluate the model's performance\n",
+    "* Create a model\n",
+    "* Deploy the model to a SageMaker hosted endpoint using a Lambda function\n",
+    "\n",
+    "A step to register the model in a Model Registry can be added to the pipeline using the `RegisterModel` step."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "#### Prerequisites\n",
+    "\n",
+    "The notebook execution role should have policies that allow the notebook to create a Lambda function. The Amazon managed policy `AmazonSageMakerPipelinesIntegrations` can be added to the notebook execution role. 
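The snippet below is a minimal sketch of attaching that policy with `boto3`; the role name is a placeholder for your own notebook execution role, and the policy ARN assumes the standard managed-policy path.\n",
+    "\n",
+    "```python\n",
+    "import boto3\n",
+    "\n",
+    "iam = boto3.client(\"iam\")\n",
+    "\n",
+    "# \"MySageMakerNotebookRole\" is a placeholder - substitute the name of\n",
+    "# your notebook execution role before running this.\n",
+    "iam.attach_role_policy(\n",
+    "    RoleName=\"MySageMakerNotebookRole\",\n",
+    "    PolicyArn=\"arn:aws:iam::aws:policy/AmazonSageMakerPipelinesIntegrations\",\n",
+    ")\n",
+    "```\n",
+    "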
\n", + "\n", + "The policy description is -\n", + "\n", + "```\n", + "\n", + "{\n", + " \"Version\": \"2012-10-17\",\n", + " \"Statement\": [\n", + " {\n", + " \"Effect\": \"Allow\",\n", + " \"Action\": [\n", + " \"lambda:CreateFunction\",\n", + " \"lambda:DeleteFunction\",\n", + " \"lambda:InvokeFunction\",\n", + " \"lambda:UpdateFunctionCode\"\n", + " ],\n", + " \"Resource\": [\n", + " \"arn:aws:lambda:*:*:function:*sagemaker*\",\n", + " \"arn:aws:lambda:*:*:function:*sageMaker*\",\n", + " \"arn:aws:lambda:*:*:function:*SageMaker*\"\n", + " ]\n", + " },\n", + " {\n", + " \"Effect\": \"Allow\",\n", + " \"Action\": [\n", + " \"sqs:CreateQueue\",\n", + " \"sqs:SendMessage\"\n", + " ],\n", + " \"Resource\": [\n", + " \"arn:aws:sqs:*:*:*sagemaker*\",\n", + " \"arn:aws:sqs:*:*:*sageMaker*\",\n", + " \"arn:aws:sqs:*:*:*SageMaker*\"\n", + " ]\n", + " },\n", + " {\n", + " \"Effect\": \"Allow\",\n", + " \"Action\": [\n", + " \"iam:PassRole\"\n", + " ],\n", + " \"Resource\": \"arn:aws:iam::*:role/*\",\n", + " \"Condition\": {\n", + " \"StringEquals\": {\n", + " \"iam:PassedToService\": [\n", + " \"lambda.amazonaws.com\"\n", + " ]\n", + " }\n", + " }\n", + " }\n", + " ]\n", + "}\n", + " \n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import sys\n", + "\n", + "!{sys.executable} -m pip install \"sagemaker>=2.51.0\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import time\n", + "import boto3\n", + "import sagemaker\n", + "\n", + "from sagemaker.estimator import Estimator\n", + "from sagemaker.inputs import TrainingInput\n", + "\n", + "from sagemaker.processing import (\n", + " ProcessingInput,\n", + " ProcessingOutput,\n", + " Processor,\n", + " ScriptProcessor,\n", + ")\n", + "\n", + "from sagemaker import Model\n", + "from sagemaker.xgboost import XGBoostPredictor\n", + "from sagemaker.sklearn.processing import SKLearnProcessor\n", + "\n", + "from sagemaker.workflow.parameters import (\n", + " ParameterInteger,\n", + " ParameterString,\n", + ")\n", + "from sagemaker.workflow.pipeline import Pipeline\n", + "from sagemaker.workflow.properties import PropertyFile\n", + "from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CacheConfig\n", + "from sagemaker.workflow.lambda_step import (\n", + " LambdaStep,\n", + " LambdaOutput,\n", + " LambdaOutputTypeEnum,\n", + ")\n", + "from sagemaker.workflow.step_collections import CreateModelStep\n", + "from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo\n", + "from sagemaker.workflow.condition_step import (\n", + " ConditionStep,\n", + " JsonGet,\n", + ")\n", + "\n", + "from sagemaker.lambda_helper import Lambda" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create the SageMaker Session\n", + "\n", + "region = sagemaker.Session().boto_region_name\n", + "sm_client = boto3.client(\"sagemaker\")\n", + "boto_session = boto3.Session(region_name=region)\n", + "sagemaker_session = sagemaker.session.Session(boto_session=boto_session, sagemaker_client=sm_client)\n", + "prefix = \"lambda-step-pipeline\"\n", + "\n", + "account_id = boto3.client(\"sts\").get_caller_identity().get(\"Account\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Define variables and parameters needed for the Pipeline steps\n", + "\n", + "role = 
sagemaker.get_execution_role()\n",
+    "default_bucket = sagemaker_session.default_bucket()\n",
+    "base_job_prefix = \"lambda-step-example\"\n",
+    "s3_prefix = \"lambda-step-pipeline\"\n",
+    "\n",
+    "processing_instance_count = ParameterInteger(name=\"ProcessingInstanceCount\", default_value=1)\n",
+    "processing_instance_type = ParameterString(\n",
+    "    name=\"ProcessingInstanceType\", default_value=\"ml.m5.xlarge\"\n",
+    ")\n",
+    "training_instance_type = ParameterString(name=\"TrainingInstanceType\", default_value=\"ml.m5.xlarge\")\n",
+    "model_approval_status = ParameterString(\n",
+    "    name=\"ModelApprovalStatus\", default_value=\"PendingManualApproval\"\n",
+    ")\n",
+    "input_data = ParameterString(\n",
+    "    name=\"InputDataUrl\",\n",
+    "    default_value=f\"s3://sagemaker-servicecatalog-seedcode-{region}/dataset/abalone-dataset.csv\",\n",
+    ")\n",
+    "\n",
+    "# Cache Pipeline steps to reduce execution time on subsequent executions\n",
+    "cache_config = CacheConfig(enable_caching=True, expire_after=\"30d\")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "#### Data Preparation\n",
+    "\n",
+    "An SKLearn processor is used to prepare the dataset for training. Using the script `preprocess.py`, the dataset is featurized and split into train, test, and validation datasets. \n",
+    "\n",
+    "The output of this step is used as the input to the TrainingStep."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%writefile preprocess.py\n",
+    "\n",
+    "\"\"\"Feature engineers the abalone dataset.\"\"\"\n",
+    "import argparse\n",
+    "import logging\n",
+    "import os\n",
+    "import pathlib\n",
+    "import requests\n",
+    "import tempfile\n",
+    "\n",
+    "import boto3\n",
+    "import numpy as np\n",
+    "import pandas as pd\n",
+    "\n",
+    "from sklearn.compose import ColumnTransformer\n",
+    "from sklearn.impute import SimpleImputer\n",
+    "from sklearn.pipeline import Pipeline\n",
+    "from sklearn.preprocessing import StandardScaler, OneHotEncoder\n",
+    "\n",
+    "logger = logging.getLogger()\n",
+    "logger.setLevel(logging.INFO)\n",
+    "logger.addHandler(logging.StreamHandler())\n",
+    "\n",
+    "\n",
+    "# Since we get a headerless CSV file we specify the column names here.\n",
+    "feature_columns_names = [\n",
+    "    \"sex\",\n",
+    "    \"length\",\n",
+    "    \"diameter\",\n",
+    "    \"height\",\n",
+    "    \"whole_weight\",\n",
+    "    \"shucked_weight\",\n",
+    "    \"viscera_weight\",\n",
+    "    \"shell_weight\",\n",
+    "]\n",
+    "label_column = \"rings\"\n",
+    "\n",
+    "feature_columns_dtype = {\n",
+    "    \"sex\": str,\n",
+    "    \"length\": np.float64,\n",
+    "    \"diameter\": np.float64,\n",
+    "    \"height\": np.float64,\n",
+    "    \"whole_weight\": np.float64,\n",
+    "    \"shucked_weight\": np.float64,\n",
+    "    \"viscera_weight\": np.float64,\n",
+    "    \"shell_weight\": np.float64,\n",
+    "}\n",
+    "label_column_dtype = {\"rings\": np.float64}\n",
+    "\n",
+    "\n",
+    "def merge_two_dicts(x, y):\n",
+    "    \"\"\"Merges two dicts, returning a new copy.\"\"\"\n",
+    "    z = x.copy()\n",
+    "    z.update(y)\n",
+    "    return z\n",
+    "\n",
+    "\n",
+    "if __name__ == \"__main__\":\n",
+    "    logger.debug(\"Starting preprocessing.\")\n",
+    "    parser = argparse.ArgumentParser()\n",
+    "    parser.add_argument(\"--input-data\", type=str, required=True)\n",
+    "    args = parser.parse_args()\n",
+    "\n",
+    "    base_dir = \"/opt/ml/processing\"\n",
+    "    
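# SageMaker Processing mounts the inputs and outputs configured on the\n",
+    "    # ProcessingStep under /opt/ml/processing/ inside the container, so\n",
+    "    # this script reads and writes local paths and the pipeline maps\n",
+    "    # them to and from S3.\n",
+    "    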
pathlib.Path(f\"{base_dir}/data\").mkdir(parents=True, exist_ok=True)\n", + " input_data = args.input_data\n", + " bucket = input_data.split(\"/\")[2]\n", + " key = \"/\".join(input_data.split(\"/\")[3:])\n", + "\n", + " logger.info(\"Downloading data from bucket: %s, key: %s\", bucket, key)\n", + " fn = f\"{base_dir}/data/abalone-dataset.csv\"\n", + " s3 = boto3.resource(\"s3\")\n", + " s3.Bucket(bucket).download_file(key, fn)\n", + "\n", + " logger.debug(\"Reading downloaded data.\")\n", + " df = pd.read_csv(\n", + " fn,\n", + " header=None,\n", + " names=feature_columns_names + [label_column],\n", + " dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype),\n", + " )\n", + " os.unlink(fn)\n", + "\n", + " logger.debug(\"Defining transformers.\")\n", + " numeric_features = list(feature_columns_names)\n", + " numeric_features.remove(\"sex\")\n", + " numeric_transformer = Pipeline(\n", + " steps=[\n", + " (\"imputer\", SimpleImputer(strategy=\"median\")),\n", + " (\"scaler\", StandardScaler()),\n", + " ]\n", + " )\n", + "\n", + " categorical_features = [\"sex\"]\n", + " categorical_transformer = Pipeline(\n", + " steps=[\n", + " (\"imputer\", SimpleImputer(strategy=\"constant\", fill_value=\"missing\")),\n", + " (\"onehot\", OneHotEncoder(handle_unknown=\"ignore\")),\n", + " ]\n", + " )\n", + "\n", + " preprocess = ColumnTransformer(\n", + " transformers=[\n", + " (\"num\", numeric_transformer, numeric_features),\n", + " (\"cat\", categorical_transformer, categorical_features),\n", + " ]\n", + " )\n", + "\n", + " logger.info(\"Applying transforms.\")\n", + " y = df.pop(\"rings\")\n", + " X_pre = preprocess.fit_transform(df)\n", + " y_pre = y.to_numpy().reshape(len(y), 1)\n", + "\n", + " X = np.concatenate((y_pre, X_pre), axis=1)\n", + "\n", + " logger.info(\"Splitting %d rows of data into train, validation, test datasets.\", len(X))\n", + " np.random.shuffle(X)\n", + " train, validation, test = np.split(X, [int(0.7 * len(X)), int(0.85 * len(X))])\n", + "\n", + " logger.info(\"Writing out datasets to %s.\", base_dir)\n", + " pd.DataFrame(train).to_csv(f\"{base_dir}/train/train.csv\", header=False, index=False)\n", + " pd.DataFrame(validation).to_csv(\n", + " f\"{base_dir}/validation/validation.csv\", header=False, index=False\n", + " )\n", + " pd.DataFrame(test).to_csv(f\"{base_dir}/test/test.csv\", header=False, index=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Process the training data step using a python script.\n", + "# Split the training data set into train, test, and validation datasets\n", + "\n", + "sklearn_processor = SKLearnProcessor(\n", + " framework_version=\"0.23-1\",\n", + " instance_type=processing_instance_type,\n", + " instance_count=processing_instance_count,\n", + " base_job_name=f\"{base_job_prefix}/sklearn-abalone-preprocess\",\n", + " sagemaker_session=sagemaker_session,\n", + " role=role,\n", + ")\n", + "step_process = ProcessingStep(\n", + " name=\"PreprocessAbaloneData\",\n", + " processor=sklearn_processor,\n", + " outputs=[\n", + " ProcessingOutput(output_name=\"train\", source=\"/opt/ml/processing/train\"),\n", + " ProcessingOutput(output_name=\"validation\", source=\"/opt/ml/processing/validation\"),\n", + " ProcessingOutput(output_name=\"test\", source=\"/opt/ml/processing/test\"),\n", + " ],\n", + " code=\"preprocess.py\",\n", + " job_arguments=[\"--input-data\", input_data],\n", + " cache_config=cache_config,\n", + ")" + ] + }, + { + "cell_type": "markdown", + 
"metadata": {}, + "source": [ + "#### Model Training\n", + "\n", + "Train an XGBoost model with the output of the ProcessingStep." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Define the output path for the model artifacts from the Hyperparameter Tuning Job\n", + "model_path = f\"s3://{default_bucket}/{base_job_prefix}/AbaloneTrain\"\n", + "\n", + "image_uri = sagemaker.image_uris.retrieve(\n", + " framework=\"xgboost\",\n", + " region=region,\n", + " version=\"1.0-1\",\n", + " py_version=\"py3\",\n", + " instance_type=training_instance_type,\n", + ")\n", + "\n", + "xgb_train = Estimator(\n", + " image_uri=image_uri,\n", + " instance_type=training_instance_type,\n", + " instance_count=1,\n", + " output_path=model_path,\n", + " base_job_name=f\"{prefix}/{base_job_prefix}/sklearn-abalone-preprocess\",\n", + " sagemaker_session=sagemaker_session,\n", + " role=role,\n", + ")\n", + "\n", + "xgb_train.set_hyperparameters(\n", + " objective=\"reg:linear\",\n", + " num_round=50,\n", + " max_depth=5,\n", + " eta=0.2,\n", + " gamma=4,\n", + " min_child_weight=6,\n", + " subsample=0.7,\n", + " silent=0,\n", + ")\n", + "\n", + "step_train = TrainingStep(\n", + " name=\"TrainAbaloneModel\",\n", + " estimator=xgb_train,\n", + " inputs={\n", + " \"train\": TrainingInput(\n", + " s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\"train\"].S3Output.S3Uri,\n", + " content_type=\"text/csv\",\n", + " ),\n", + " \"validation\": TrainingInput(\n", + " s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\n", + " \"validation\"\n", + " ].S3Output.S3Uri,\n", + " content_type=\"text/csv\",\n", + " ),\n", + " },\n", + " cache_config=cache_config,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Evaluate the model\n", + "\n", + "Use a processing job to evaluate the model from the TrainingStep. If the output of the evaluation is True, a model will be created and a Lambda will be invoked to deploy the model to a SageMaker Endpoint. 
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%writefile evaluate.py\n", + "\n", + "\"\"\"Evaluation script for measuring mean squared error.\"\"\"\n", + "import json\n", + "import logging\n", + "import pathlib\n", + "import pickle\n", + "import tarfile\n", + "\n", + "import numpy as np\n", + "import pandas as pd\n", + "import xgboost\n", + "\n", + "from sklearn.metrics import mean_squared_error\n", + "\n", + "logger = logging.getLogger()\n", + "logger.setLevel(logging.INFO)\n", + "logger.addHandler(logging.StreamHandler())\n", + "\n", + "\n", + "if __name__ == \"__main__\":\n", + " logger.debug(\"Starting evaluation.\")\n", + " model_path = \"/opt/ml/processing/model/model.tar.gz\"\n", + " with tarfile.open(model_path) as tar:\n", + " tar.extractall(path=\".\")\n", + "\n", + " logger.debug(\"Loading xgboost model.\")\n", + " model = pickle.load(open(\"xgboost-model\", \"rb\"))\n", + "\n", + " logger.debug(\"Reading test data.\")\n", + " test_path = \"/opt/ml/processing/test/test.csv\"\n", + " df = pd.read_csv(test_path, header=None)\n", + "\n", + " logger.debug(\"Reading test data.\")\n", + " y_test = df.iloc[:, 0].to_numpy()\n", + " df.drop(df.columns[0], axis=1, inplace=True)\n", + " X_test = xgboost.DMatrix(df.values)\n", + "\n", + " logger.info(\"Performing predictions against test data.\")\n", + " predictions = model.predict(X_test)\n", + "\n", + " logger.debug(\"Calculating mean squared error.\")\n", + " mse = mean_squared_error(y_test, predictions)\n", + " std = np.std(y_test - predictions)\n", + " report_dict = {\n", + " \"regression_metrics\": {\n", + " \"mse\": {\"value\": mse, \"standard_deviation\": std},\n", + " },\n", + " }\n", + "\n", + " output_dir = \"/opt/ml/processing/evaluation\"\n", + " pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)\n", + "\n", + " logger.info(\"Writing out evaluation report with mse: %f\", mse)\n", + " evaluation_path = f\"{output_dir}/evaluation.json\"\n", + " with open(evaluation_path, \"w\") as f:\n", + " f.write(json.dumps(report_dict))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# A ProcessingStep is used to evaluate the performance of the trained model. 
Based on the results of the evaluation, the model is created and deployed.\n",
+    "\n",
+    "script_eval = ScriptProcessor(\n",
+    "    image_uri=image_uri,\n",
+    "    command=[\"python3\"],\n",
+    "    instance_type=processing_instance_type,\n",
+    "    instance_count=1,\n",
+    "    base_job_name=f\"{prefix}/{base_job_prefix}/xgboost-abalone-evaluate\",\n",
+    "    sagemaker_session=sagemaker_session,\n",
+    "    role=role,\n",
+    ")\n",
+    "\n",
+    "evaluation_report = PropertyFile(\n",
+    "    name=\"AbaloneEvaluationReport\",\n",
+    "    output_name=\"evaluation\",\n",
+    "    path=\"evaluation.json\",\n",
+    ")\n",
+    "\n",
+    "step_eval = ProcessingStep(\n",
+    "    name=\"EvaluateAbaloneModel\",\n",
+    "    processor=script_eval,\n",
+    "    inputs=[\n",
+    "        ProcessingInput(\n",
+    "            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,\n",
+    "            destination=\"/opt/ml/processing/model\",\n",
+    "        ),\n",
+    "        ProcessingInput(\n",
+    "            source=step_process.properties.ProcessingOutputConfig.Outputs[\"test\"].S3Output.S3Uri,\n",
+    "            destination=\"/opt/ml/processing/test\",\n",
+    "        ),\n",
+    "    ],\n",
+    "    outputs=[\n",
+    "        ProcessingOutput(\n",
+    "            output_name=\"evaluation\",\n",
+    "            source=\"/opt/ml/processing/evaluation\",\n",
+    "            destination=f\"s3://{default_bucket}/{s3_prefix}/evaluation_report\",\n",
+    "        ),\n",
+    "    ],\n",
+    "    code=\"evaluate.py\",\n",
+    "    property_files=[evaluation_report],\n",
+    "    cache_config=cache_config,\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "#### Create the model\n",
+    "\n",
+    "The model is created and the name of the model is provided to the Lambda function for deployment. The `CreateModelStep` dynamically assigns a name to the model."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Create Model\n",
+    "model = Model(\n",
+    "    image_uri=image_uri,\n",
+    "    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,\n",
+    "    sagemaker_session=sagemaker_session,\n",
+    "    role=role,\n",
+    "    predictor_cls=XGBoostPredictor,\n",
+    ")\n",
+    "\n",
+    "step_create_model = CreateModelStep(\n",
+    "    name=\"CreateModel\",\n",
+    "    model=model,\n",
+    "    inputs=sagemaker.inputs.CreateModelInput(instance_type=\"ml.m4.large\"),\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "#### Create the Lambda Step\n",
+    "\n",
+    "When defining the LambdaStep, the SageMaker `Lambda` helper class can be used to create the Lambda function. Users can either use the `lambda_func` argument to provide the function ARN of an already deployed Lambda function OR use the `Lambda` class to create a Lambda function by providing a script, function name, and role for the Lambda function. \n",
+    "\n",
+    "When passing inputs to the Lambda, the `inputs` argument can be used; within the Lambda function's handler, the `event` argument can be used to retrieve the inputs.\n",
+    "\n",
+    "The dictionary response from the Lambda function is parsed through the `LambdaOutput` objects provided to the `outputs` argument. The `output_name` in `LambdaOutput` corresponds to the dictionary key in the Lambda's return dictionary. "
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "#### Define the Lambda function\n",
+    "\n",
+    "Users can choose to leverage the Lambda helper class to create a Lambda function and provide that function object to the LambdaStep. Alternatively, users can use a pre-deployed Lambda function and provide the function ARN to the `Lambda` helper class in the lambda step. 
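\n",
+    "\n",
+    "As a minimal sketch (the ARN below is a placeholder for a function you have already deployed), referencing a pre-deployed function looks like:\n",
+    "\n",
+    "```python\n",
+    "from sagemaker.lambda_helper import Lambda\n",
+    "\n",
+    "# Placeholder ARN - replace with the ARN of your own Lambda function.\n",
+    "existing_func = Lambda(\n",
+    "    function_arn=\"arn:aws:lambda:us-east-1:123456789012:function:my-deploy-function\",\n",
+    ")\n",
+    "```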
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%writefile lambda_helper.py\n", + "\n", + "\"\"\"\n", + "This Lambda function creates an Endpoint Configuration and deploys a model to an Endpoint. \n", + "The name of the model to deploy is provided via the `event` argument\n", + "\"\"\"\n", + "\n", + "import json\n", + "import boto3\n", + "\n", + "\n", + "def lambda_handler(event, context):\n", + " \"\"\" \"\"\"\n", + " sm_client = boto3.client(\"sagemaker\")\n", + "\n", + " # The name of the model created in the Pipeline CreateModelStep\n", + " model_name = event[\"model_name\"]\n", + "\n", + " endpoint_config_name = event[\"endpoint_config_name\"]\n", + " endpoint_name = event[\"endpoint_name\"]\n", + "\n", + " create_endpoint_config_response = sm_client.create_endpoint_config(\n", + " EndpointConfigName=endpoint_config_name,\n", + " ProductionVariants=[\n", + " {\n", + " \"InstanceType\": \"ml.m4.xlarge\",\n", + " \"InitialVariantWeight\": 1,\n", + " \"InitialInstanceCount\": 1,\n", + " \"ModelName\": model_name,\n", + " \"VariantName\": \"AllTraffic\",\n", + " }\n", + " ],\n", + " )\n", + "\n", + " create_endpoint_response = sm_client.create_endpoint(\n", + " EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name\n", + " )\n", + "\n", + " return {\n", + " \"statusCode\": 200,\n", + " \"body\": json.dumps(\"Created Endpoint!\"),\n", + " \"other_key\": \"example_value\",\n", + " }" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### IAM Role\n", + "\n", + "The Lambda function needs an IAM role that will allow it to deploy a SageMaker Endpoint. The role ARN must be provided in the LambdaStep. \n", + "\n", + "The Lambda role should at minimum have policies to allow `sagemaker:CreateModel`, `sagemaker:CreateEndpointConfig`, `sagemaker:CreateEndpoint` in addition to the based Lambda execution policies. \n", + "\n", + "A helper function in `iam_helper.py` is available to create the Lambda function role. Please note that the role uses the Amazon managed policy - `SageMakerFullAccess`. This should be replaced with an IAM policy with least privileges as per AWS IAM best practices." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from iam_helper import create_lambda_role\n", + "\n", + "lambda_role = create_lambda_role(\"lambda-deployment-role\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Custom Lambda Step\n", + "\n", + "current_time = time.strftime(\"%m-%d-%H-%M-%S\", time.localtime())\n", + "model_name = \"demo-lambda-model\" + current_time\n", + "endpoint_config_name = \"demo-lambda-deploy-endpoint-config-\" + current_time\n", + "endpoint_name = \"demo-lambda-deploy-endpoint-\" + current_time\n", + "\n", + "function_name = \"sagemaker-lambda-step-endpoint-deploy-\" + current_time\n", + "\n", + "# Lambda helper class can be used to create the Lambda function\n", + "func = Lambda(\n", + " function_name=function_name,\n", + " execution_role_arn=lambda_role,\n", + " script=\"lambda_helper.py\",\n", + " handler=\"lambda_helper.lambda_handler\",\n", + ")\n", + "\n", + "output_param_1 = LambdaOutput(output_name=\"statusCode\", output_type=LambdaOutputTypeEnum.String)\n", + "output_param_2 = LambdaOutput(output_name=\"body\", output_type=LambdaOutputTypeEnum.String)\n", + "output_param_3 = LambdaOutput(output_name=\"other_key\", output_type=LambdaOutputTypeEnum.String)\n", + "\n", + "step_deploy_lambda = LambdaStep(\n", + " name=\"LambdaStep\",\n", + " lambda_func=func,\n", + " inputs={\n", + " \"model_name\": step_create_model.properties.ModelName,\n", + " \"endpoint_config_name\": endpoint_config_name,\n", + " \"endpoint_name\": endpoint_name,\n", + " },\n", + " outputs=[output_param_1, output_param_2, output_param_3],\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# condition step for evaluating model quality and branching execution.\n", + "# The `json_path` value is based on the `report_dict` variable in `evaluate.py`\n", + "\n", + "cond_lte = ConditionLessThanOrEqualTo(\n", + " left=JsonGet(\n", + " step=step_eval,\n", + " property_file=evaluation_report,\n", + " json_path=\"regression_metrics.mse.value\",\n", + " ),\n", + " right=6.0,\n", + ")\n", + "\n", + "step_cond = ConditionStep(\n", + " name=\"CheckMSEAbaloneEvaluation\",\n", + " conditions=[cond_lte],\n", + " if_steps=[step_create_model, step_deploy_lambda],\n", + " else_steps=[],\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Use the same pipeline name across execution for cache usage.\n", + "\n", + "pipeline_name = \"lambda-step-pipeline\" + current_time\n", + "\n", + "pipeline = Pipeline(\n", + " name=pipeline_name,\n", + " parameters=[\n", + " processing_instance_type,\n", + " processing_instance_count,\n", + " training_instance_type,\n", + " input_data,\n", + " model_approval_status,\n", + " ],\n", + " steps=[step_process, step_train, step_eval, step_cond],\n", + " sagemaker_session=sagemaker_session,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Execute the Pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import json\n", + "\n", + "definition = json.loads(pipeline.definition())\n", + "definition" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline.upsert(role_arn=role)" + ] + }, + { + "cell_type": "code", + 
"execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "execution = pipeline.start()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "execution.wait()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Cleaning up resources\n", + "\n", + "Running the following cell will delete the following resources created in this notebook -\n", + "* SageMaker Model\n", + "* SageMaker Endpoint Configuration\n", + "* SageMaker Endpoint\n", + "* SageMaker Pipeline\n", + "* Lambda Function" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create a SageMaker client\n", + "sm_client = boto3.client(\"sagemaker\")\n", + "\n", + "# Get the model name from the EndpointCofig. The CreateModelStep properties are not available outside the Pipeline execution context\n", + "# so `step_create_model.properties.ModelName` can not be used while deleting the model.\n", + "model_name = sm_client.describe_endpoint_config(EndpointConfigName=endpoint_config_name)[\n", + " \"ProductionVariants\"\n", + "][0][\"ModelName\"]\n", + "\n", + "# Delete the Model\n", + "sm_client.delete_model(ModelName=model_name)\n", + "\n", + "# Delete the EndpointConfig\n", + "sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)\n", + "\n", + "# Delete the endpoint\n", + "sm_client.delete_endpoint(EndpointName=endpoint_name)\n", + "\n", + "# Delete the Lambda function\n", + "func.delete()\n", + "\n", + "# Delete the Pipeline\n", + "sm_client.delete_pipeline(PipelineName=pipeline_name)" + ] + } + ], + "metadata": { + "instance_type": "ml.t3.medium", + "kernelspec": { + "display_name": "Python 3 (Data Science)", + "language": "python", + "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-2:429704687514:image/datascience-1.0" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.10" + }, + "metadata": { + "interpreter": { + "hash": "ac2eaa0ea0ebeafcc7822e65e46aa9d4f966f30b695406963e145ea4a91cd4fc" + } + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}