diff --git a/sagemaker-pipelines/tabular/local-mode/sagemaker-pipelines-local-mode.ipynb b/sagemaker-pipelines/tabular/local-mode/sagemaker-pipelines-local-mode.ipynb new file mode 100644 index 0000000000..55fa5c418f --- /dev/null +++ b/sagemaker-pipelines/tabular/local-mode/sagemaker-pipelines-local-mode.ipynb @@ -0,0 +1,1134 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Use SageMaker Pipelines to Run Your Jobs Locally\n", + "\n", + "This notebook demonstrates how to orchestrate SageMaker jobs locally using SageMaker Pipelines. \n", + "\n", + "Using a `LocalPipelineSession` object, you can now run your pipelines on your local machine before running them in the cloud. \n", + "\n", + "The `LocalPipelineSession` object is used while defining each pipeline step and when defining the complete Pipeline object. To run this pipeline in the cloud, each step along with the Pipeline object must be redefined using `PipelineSession`.\n", + "\n", + "**Note**: This notebook will not run in SageMaker Studio. You can run this on SageMaker Classic Notebook instances OR your local IDE." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## SageMaker Pipelines Local Mode\n", + "\n", + "SageMaker Pipelines Local Mode supports the following activities, which are demonstrated in this notebook:\n", + "\n", + "* ProcessingStep\n", + "* TrainingStep\n", + "* ConditionStep\n", + "* ModelStep\n", + "* TransformStep\n", + "* FailStep" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "tags": [] + }, + "source": [ + "## Dataset\n", + "\n", + "The dataset you use is the [UCI Machine Learning Abalone Dataset](https://archive.ics.uci.edu/ml/datasets/abalone) [1]. The aim for this task is to determine the age of an abalone snail from its physical measurements. 
At its core, this is a regression problem.\n", + "\n", + "The dataset contains several features: length (the longest shell measurement), diameter (the diameter perpendicular to length), height (the height with meat in the shell), whole_weight (the weight of whole abalone), shucked_weight (the weight of meat), viscera_weight (the gut weight after bleeding), shell_weight (the weight after being dried), sex ('M', 'F', 'I' where 'I' is Infant), and rings (integer).\n", + "\n", + "The number of rings turns out to be a good approximation for age (age is rings + 1.5). However, obtaining this number requires cutting the shell through the cone, staining the section, and counting the number of rings through a microscope, which is a time-consuming task. The other physical measurements are easier to determine. You use the dataset to build a predictive model of the variable rings through these other physical measurements.\n", + "\n", + "Before you upload the data to an S3 bucket, install the SageMaker Python SDK and gather some constants you can use later in this notebook.\n", + "\n", + "> [1] Dua, D. and Graff, C. (2019). [UCI Machine Learning Repository](http://archive.ics.uci.edu/ml). Irvine, CA: University of California, School of Information and Computer Science." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Install the latest version of the SageMaker Python SDK. 
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!pip install 'sagemaker' --upgrade" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import sys\n", + "\n", + "import boto3\n", + "import sagemaker\n", + "from sagemaker.workflow.pipeline_context import LocalPipelineSession, PipelineSession\n", + "\n", + "# Create a `LocalPipelineSession` object so that each pipeline step will run locally\n", + "# To run this pipeline in the cloud, you must change `LocalPipelineSession()` to `PipelineSession()`\n", + "local_pipeline_session = LocalPipelineSession()\n", + "\n", + "region = local_pipeline_session.boto_region_name\n", + "role = sagemaker.get_execution_role()\n", + "\n", + "default_bucket = local_pipeline_session.default_bucket()\n", + "prefix = \"sagemaker-pipelines-local-mode-example\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now, upload the data into the default bucket. You can select your own data set for the `input_data_uri` as is appropriate." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!mkdir -p data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Pull the dataset from SageMaker's public S3 bucket and upload it to your own S3 bucket\n", + "\n", + "local_path = \"data/abalone-dataset.csv\"\n", + "\n", + "s3 = boto3.resource(\"s3\")\n", + "s3.Bucket(\"sagemaker-sample-files\").download_file(\n", + " \"datasets/tabular/uci_abalone/abalone.csv\", local_path\n", + ")\n", + "\n", + "base_uri = f\"s3://{default_bucket}/{prefix}/abalone-data-set\"\n", + "input_data_uri = sagemaker.s3.S3Uploader.upload(\n", + " local_path=local_path,\n", + " desired_s3_uri=base_uri,\n", + ")\n", + "print(input_data_uri)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.workflow.parameters import ParameterString, ParameterFloat\n", + "\n", + "processing_instance_count = 1\n", + "training_instance_count = 1\n", + "transform_instance_count = 1\n", + "instance_type = \"ml.m5.xlarge\"\n", + "\n", + "input_data = ParameterString(\n", + " name=\"InputData\",\n", + " default_value=input_data_uri,\n", + ")\n", + "\n", + "mse_threshold = ParameterFloat(name=\"MseThreshold\", default_value=7.0)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Define a Processing Step for Feature Engineering\n", + "\n", + "First, develop a preprocessing script that is specified in the Processing step.\n", + "\n", + "A notebook cell below writes the file `code/preprocessing.py`, which contains the preprocessing script. You can update the script and rerun that cell to overwrite the file. 
The preprocessing script uses `scikit-learn` to do the following:\n", + "\n", + "* Fill in missing sex category data and encode it so that it is suitable for training.\n", + "* Scale and normalize all numerical fields, aside from sex and rings numerical data.\n", + "* Split the data into training, validation, and test datasets.\n", + "\n", + "The Processing step executes the script on the input data. The Training step uses the preprocessed training features and labels to train a model. The Evaluation step uses the trained model and preprocessed test features and labels to evaluate the model." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!mkdir -p code" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%writefile code/preprocessing.py\n", + "import argparse\n", + "import os\n", + "import requests\n", + "import tempfile\n", + "\n", + "import numpy as np\n", + "import pandas as pd\n", + "\n", + "from sklearn.compose import ColumnTransformer\n", + "from sklearn.impute import SimpleImputer\n", + "from sklearn.pipeline import Pipeline\n", + "from sklearn.preprocessing import StandardScaler, OneHotEncoder\n", + "\n", + "\n", + "# Since we get a headerless CSV file, we specify the column names here.\n", + "feature_columns_names = [\n", + " \"sex\",\n", + " \"length\",\n", + " \"diameter\",\n", + " \"height\",\n", + " \"whole_weight\",\n", + " \"shucked_weight\",\n", + " \"viscera_weight\",\n", + " \"shell_weight\",\n", + "]\n", + "label_column = \"rings\"\n", + "\n", + "feature_columns_dtype = {\n", + " \"sex\": str,\n", + " \"length\": np.float64,\n", + " \"diameter\": np.float64,\n", + " \"height\": np.float64,\n", + " \"whole_weight\": np.float64,\n", + " \"shucked_weight\": np.float64,\n", + " \"viscera_weight\": np.float64,\n", + " \"shell_weight\": np.float64,\n", + "}\n", + "label_column_dtype = {\"rings\": np.float64}\n", + "\n", + 
"\n", + "def merge_two_dicts(x, y):\n", + " z = x.copy()\n", + " z.update(y)\n", + " return z\n", + "\n", + "\n", + "if __name__ == \"__main__\":\n", + " base_dir = \"/opt/ml/processing\"\n", + "\n", + " df = pd.read_csv(\n", + " f\"{base_dir}/input/abalone-dataset.csv\",\n", + " header=None,\n", + " names=feature_columns_names + [label_column],\n", + " dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype),\n", + " )\n", + " numeric_features = list(feature_columns_names)\n", + " numeric_features.remove(\"sex\")\n", + " numeric_transformer = Pipeline(\n", + " steps=[(\"imputer\", SimpleImputer(strategy=\"median\")), (\"scaler\", StandardScaler())]\n", + " )\n", + "\n", + " categorical_features = [\"sex\"]\n", + " categorical_transformer = Pipeline(\n", + " steps=[\n", + " (\"imputer\", SimpleImputer(strategy=\"constant\", fill_value=\"missing\")),\n", + " (\"onehot\", OneHotEncoder(handle_unknown=\"ignore\")),\n", + " ]\n", + " )\n", + "\n", + " preprocess = ColumnTransformer(\n", + " transformers=[\n", + " (\"num\", numeric_transformer, numeric_features),\n", + " (\"cat\", categorical_transformer, categorical_features),\n", + " ]\n", + " )\n", + "\n", + " y = df.pop(\"rings\")\n", + " X_pre = preprocess.fit_transform(df)\n", + " y_pre = y.to_numpy().reshape(len(y), 1)\n", + "\n", + " X = np.concatenate((y_pre, X_pre), axis=1)\n", + "\n", + " np.random.shuffle(X)\n", + " train, validation, test = np.split(X, [int(0.7 * len(X)), int(0.85 * len(X))])\n", + "\n", + " pd.DataFrame(train).to_csv(f\"{base_dir}/train/train.csv\", header=False, index=False)\n", + " pd.DataFrame(validation).to_csv(\n", + " f\"{base_dir}/validation/validation.csv\", header=False, index=False\n", + " )\n", + " pd.DataFrame(test).to_csv(f\"{base_dir}/test/test.csv\", header=False, index=False)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Next, create an instance of a `SKLearnProcessor` processor and use that in our `ProcessingStep`." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.sklearn.processing import SKLearnProcessor\n", + "\n", + "framework_version = \"1.0-1\"\n", + "\n", + "sklearn_processor = SKLearnProcessor(\n", + " framework_version=framework_version,\n", + " instance_type=instance_type,\n", + " instance_count=processing_instance_count,\n", + " base_job_name=\"sklearn-abalone-process\",\n", + " role=role,\n", + " sagemaker_session=local_pipeline_session,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Finally, take the output of the processor's `.run()` method and pass it as the arguments to the `ProcessingStep`. Because `local_pipeline_session` is passed as the `sagemaker_session`, calling `.run()` does not launch the processing job; instead, it returns the arguments needed to run the job as a step in the pipeline.\n", + "\n", + "Note the `\"train\"`, `\"validation\"`, and `\"test\"` named outputs specified in the output configuration for the processing job. Step `properties` can be used in subsequent steps and resolve to their runtime values at execution. This usage is called out when you define the training step." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", + "from sagemaker.workflow.steps import ProcessingStep\n", + "\n", + "processor_args = sklearn_processor.run(\n", + " inputs=[\n", + " ProcessingInput(source=input_data, destination=\"/opt/ml/processing/input\"),\n", + " ],\n", + " outputs=[\n", + " ProcessingOutput(output_name=\"train\", source=\"/opt/ml/processing/train\"),\n", + " ProcessingOutput(output_name=\"validation\", source=\"/opt/ml/processing/validation\"),\n", + " ProcessingOutput(output_name=\"test\", source=\"/opt/ml/processing/test\"),\n", + " ],\n", + " code=\"code/preprocessing.py\",\n", + ")\n", + "\n", + "step_process = ProcessingStep(name=\"AbaloneProcess\", step_args=processor_args)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%writefile code/abalone.py\n", + "\n", + "import argparse\n", + "import json\n", + "import logging\n", + "import os\n", + "import pathlib\n", + "import pickle as pkl\n", + "import tarfile\n", + "\n", + "\n", + "import numpy as np\n", + "import pandas as pd\n", + "import xgboost as xgb\n", + "\n", + "logging.basicConfig(level=logging.INFO)\n", + "\n", + "TRAIN_VALIDATION_FRACTION = 0.2\n", + "RANDOM_STATE_SAMPLING = 200\n", + "\n", + "logging.basicConfig(level=logging.INFO)\n", + "\n", + "\n", + "def prepare_data(train_dir, validation_dir):\n", + " \"\"\"Read data from train and validation channel, and return predicting features and target variables.\n", + "\n", + " Args:\n", + " data_dir (str): directory which saves the training data.\n", + "\n", + " Returns:\n", + " Tuple of training features, training target, validation features, validation target.\n", + " \"\"\"\n", + " df_train = pd.read_csv(\n", + " os.path.join(train_dir, \"train.csv\"),\n", + " header=None,\n", + " )\n", + " df_train = 
df_train.iloc[np.random.permutation(len(df_train))]\n", + " df_train.columns = [\"target\"] + [f\"feature_{x}\" for x in range(df_train.shape[1] - 1)]\n", + "\n", + " try:\n", + " df_validation = pd.read_csv(\n", + " os.path.join(validation_dir, \"validation.csv\"),\n", + " header=None,\n", + " )\n", + " df_validation.columns = [\"target\"] + [\n", + " f\"feature_{x}\" for x in range(df_validation.shape[1] - 1)\n", + " ]\n", + "\n", + " except FileNotFoundError: # when validation data is not available in the directory\n", + " logging.info(\n", + " f\"Validation data is not found. {TRAIN_VALIDATION_FRACTION * 100}% of training data is \"\n", + " f\"randomly selected as validation data. The seed for random sampling is {RANDOM_STATE_SAMPLING}.\"\n", + " )\n", + " df_validation = df_train.sample(\n", + " frac=TRAIN_VALIDATION_FRACTION,\n", + " random_state=RANDOM_STATE_SAMPLING,\n", + " )\n", + " df_train.drop(df_validation.index, inplace=True)\n", + " df_validation.reset_index(drop=True, inplace=True)\n", + " df_train.reset_index(drop=True, inplace=True)\n", + "\n", + " X_train, y_train = df_train.iloc[:, 1:], df_train.iloc[:, :1]\n", + " X_val, y_val = df_validation.iloc[:, 1:], df_validation.iloc[:, :1]\n", + "\n", + " return X_train.values, y_train.values, X_val.values, y_val.values\n", + "\n", + "\n", + "def main():\n", + " \"\"\"Run training.\"\"\"\n", + " parser = argparse.ArgumentParser()\n", + "\n", + " parser.add_argument(\n", + " \"--max_depth\",\n", + " type=int,\n", + " )\n", + " parser.add_argument(\"--eta\", type=float)\n", + " parser.add_argument(\"--gamma\", type=int)\n", + " parser.add_argument(\"--min_child_weight\", type=int)\n", + " parser.add_argument(\"--subsample\", type=float)\n", + " parser.add_argument(\"--verbosity\", type=int)\n", + " parser.add_argument(\"--objective\", type=str)\n", + " parser.add_argument(\"--num_round\", type=int)\n", + " parser.add_argument(\"--tree_method\", type=str, default=\"auto\")\n", + " 
parser.add_argument(\"--predictor\", type=str, default=\"auto\")\n", + " parser.add_argument(\"--learning_rate\", type=float, default=0.3)\n", + " parser.add_argument(\"--output_data_dir\", type=str, default=os.environ.get(\"SM_OUTPUT_DATA_DIR\"))\n", + " parser.add_argument(\"--model_dir\", type=str, default=os.environ.get(\"SM_MODEL_DIR\"))\n", + " parser.add_argument(\"--train\", type=str, default=os.environ.get(\"SM_CHANNEL_TRAIN\"))\n", + " parser.add_argument(\"--validation\", type=str, default=os.environ.get(\"SM_CHANNEL_VALIDATION\"))\n", + " parser.add_argument(\"--sm_hosts\", type=str, default=os.environ.get(\"SM_HOSTS\"))\n", + " parser.add_argument(\"--sm_current_host\", type=str, default=os.environ.get(\"SM_CURRENT_HOST\"))\n", + "\n", + " args, _ = parser.parse_known_args()\n", + "\n", + " X_train, y_train, X_val, y_val = prepare_data(args.train, args.validation)\n", + "\n", + " # create DMatrix datasets for xgboost\n", + " dtrain = xgb.DMatrix(data=X_train, label=y_train)\n", + " dval = xgb.DMatrix(data=X_val, label=y_val)\n", + " watchlist = [(dtrain, \"train\"), (dval, \"validation\")]\n", + "\n", + " # specify your configurations as a dict\n", + " params = {\n", + " \"booster\": \"gbtree\",\n", + " \"objective\": args.objective,\n", + " \"learning_rate\": args.learning_rate,\n", + " \"gamma\": args.gamma,\n", + " \"min_child_weight\": args.min_child_weight,\n", + " \"max_depth\": args.max_depth,\n", + " \"subsample\": args.subsample,\n", + " \"colsample_bytree\": 1,\n", + " \"reg_lambda\": 1,\n", + " \"reg_alpha\": 0,\n", + " \"eval_metric\": \"rmse\",\n", + " }\n", + "\n", + " bst = xgb.train(\n", + " params=params,\n", + " dtrain=dtrain,\n", + " num_boost_round=args.num_round,\n", + " evals=watchlist,\n", + " xgb_model=None,\n", + " )\n", + "\n", + " model_location = args.model_dir + \"/xgboost-model\"\n", + " pkl.dump(bst, open(model_location, \"wb\"))\n", + " logging.info(\"Stored trained model at {}\".format(model_location))\n", + "\n", + "\n", + 
"if __name__ == \"__main__\":\n", + " main()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.estimator import Estimator\n", + "from sagemaker.inputs import TrainingInput\n", + "\n", + "model_path = f\"s3://{default_bucket}/{prefix}/model\"\n", + "image_uri = sagemaker.image_uris.retrieve(\n", + " framework=\"xgboost\",\n", + " region=region,\n", + " version=\"1.5-1\",\n", + " py_version=\"py3\",\n", + " instance_type=instance_type,\n", + ")\n", + "\n", + "xgb_train = Estimator(\n", + " image_uri=image_uri,\n", + " entry_point=\"code/abalone.py\",\n", + " instance_type=instance_type,\n", + " instance_count=training_instance_count,\n", + " output_path=model_path,\n", + " role=role,\n", + " sagemaker_session=local_pipeline_session,\n", + ")\n", + "\n", + "xgb_train.set_hyperparameters(\n", + " objective=\"reg:squarederror\",\n", + " learning_rate=0.01,\n", + " num_round=50,\n", + " max_depth=5,\n", + " eta=0.2,\n", + " gamma=4,\n", + " min_child_weight=6,\n", + " subsample=0.7,\n", + ")\n", + "\n", + "train_args = xgb_train.fit(\n", + " inputs={\n", + " \"train\": TrainingInput(\n", + " s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\"train\"].S3Output.S3Uri,\n", + " content_type=\"text/csv\",\n", + " ),\n", + " \"validation\": TrainingInput(\n", + " s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\n", + " \"validation\"\n", + " ].S3Output.S3Uri,\n", + " content_type=\"text/csv\",\n", + " ),\n", + " }\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Finally, we use the output of the estimator's `.fit()` method as arguments to the `TrainingStep`. 
Because `local_pipeline_session` is passed as the `sagemaker_session`, calling `.fit()` does not launch the training job; instead, it returns the arguments needed to run the job as a step in the pipeline.\n", + "\n", + "The `S3Uri` values of the `\"train\"` and `\"validation\"` outputs of the processing step are passed to the `.fit()` method. The `\"test\"` output is used later for model evaluation in the pipeline. The `properties` attribute of a Pipeline step matches the object model of the corresponding response of a describe call. These properties can be referenced as placeholder values and are resolved at runtime. For example, the `ProcessingStep` `properties` attribute matches the object model of the [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) response object." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.inputs import TrainingInput\n", + "from sagemaker.workflow.steps import TrainingStep\n", + "\n", + "step_train = TrainingStep(\n", + " name=\"AbaloneTrain\",\n", + " step_args=train_args,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Define a Model Evaluation Step to Evaluate the Trained Model\n", + "\n", + "First, develop an evaluation script that is specified in a Processing step that performs the model evaluation.\n", + "\n", + "After pipeline execution, you can examine the resulting `evaluation.json` for analysis.\n", + "\n", + "The evaluation script uses `xgboost` to do the following:\n", + "\n", + "* Load the model.\n", + "* Read the test data.\n", + "* Issue predictions against the test data.\n", + "* Compute the root mean squared error, which the report stores under the `mse` key, and the standard deviation of the residuals.\n", + "* Save the evaluation report to the evaluation directory." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%writefile code/evaluation.py\n", + "import json\n", + "import pathlib\n", + "import pickle\n", + "import tarfile\n", + "\n", + "import joblib\n", + "import numpy as np\n", + "import pandas as pd\n", + "import xgboost\n", + "import math\n", + "\n", + "from sklearn.metrics import mean_squared_error\n", + "\n", + "if __name__ == \"__main__\":\n", + " model_path = f\"/opt/ml/processing/model/model.tar.gz\"\n", + " with tarfile.open(model_path) as tar:\n", + " tar.extractall(path=\".\")\n", + "\n", + " model = pickle.load(open(\"xgboost-model\", \"rb\"))\n", + "\n", + " test_path = \"/opt/ml/processing/test/test.csv\"\n", + " df = pd.read_csv(test_path, header=None)\n", + " df.columns = [\"target\"] + [f\"feature_{x}\" for x in range(df.shape[1] - 1)]\n", + "\n", + " y_test = df.iloc[:, 0].to_numpy()\n", + " df.drop(df.columns[0], axis=1, inplace=True)\n", + "\n", + " X_test = xgboost.DMatrix(df.values)\n", + "\n", + " predictions = model.predict(X_test)\n", + "\n", + " mse = mean_squared_error(y_test, predictions)\n", + " std = np.std(y_test - predictions)\n", + " report_dict = {\n", + " \"regression_metrics\": {\n", + " \"mse\": {\"value\": math.sqrt(mse), \"standard_deviation\": std},\n", + " },\n", + " }\n", + "\n", + " output_dir = \"/opt/ml/processing/evaluation\"\n", + " pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)\n", + "\n", + " evaluation_path = f\"{output_dir}/evaluation.json\"\n", + " with open(evaluation_path, \"w\") as f:\n", + " f.write(json.dumps(report_dict))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Next, create an instance of a `ScriptProcessor` processor and use it in the `ProcessingStep`." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.processing import ScriptProcessor\n", + "\n", + "script_eval = ScriptProcessor(\n", + " image_uri=image_uri,\n", + " command=[\"python3\"],\n", + " instance_type=instance_type,\n", + " instance_count=processing_instance_count,\n", + " base_job_name=\"script-abalone-eval\",\n", + " role=role,\n", + " sagemaker_session=local_pipeline_session,\n", + ")\n", + "\n", + "eval_args = script_eval.run(\n", + " inputs=[\n", + " ProcessingInput(\n", + " source=step_train.properties.ModelArtifacts.S3ModelArtifacts,\n", + " destination=\"/opt/ml/processing/model\",\n", + " ),\n", + " ProcessingInput(\n", + " source=step_process.properties.ProcessingOutputConfig.Outputs[\"test\"].S3Output.S3Uri,\n", + " destination=\"/opt/ml/processing/test\",\n", + " ),\n", + " ],\n", + " outputs=[\n", + " ProcessingOutput(output_name=\"evaluation\", source=\"/opt/ml/processing/evaluation\"),\n", + " ],\n", + " code=\"code/evaluation.py\",\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Use the arguments returned by the processor's `.run()` method, which capture the inputs, the outputs, and the code to execute, to construct a `ProcessingStep` that runs when the pipeline executes. \n", + "\n", + "Specifically, the `S3ModelArtifacts` from the `step_train` `properties` and the `S3Uri` of the `\"test\"` output of the `step_process` `properties` are passed as inputs. The `TrainingStep` and `ProcessingStep` `properties` attributes match the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) and [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) response objects, respectively." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.workflow.properties import PropertyFile\n", + "\n", + "evaluation_report = PropertyFile(\n", + " name=\"EvaluationReport\", output_name=\"evaluation\", path=\"evaluation.json\"\n", + ")\n", + "step_eval = ProcessingStep(\n", + " name=\"AbaloneEval\",\n", + " step_args=eval_args,\n", + " property_files=[evaluation_report],\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Define a Create Model Step to Create a Model\n", + "\n", + "In order to perform batch transformation using the example model, create a SageMaker model. \n", + "\n", + "Specifically, pass in the `S3ModelArtifacts` from the `properties` of the `TrainingStep`, `step_train`. The `TrainingStep` `properties` attribute matches the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) response object.\n", + "\n", + "We provide a custom inference script that defines the logic for the batch transform job." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%writefile code/inference.py\n", + "\n", + "import json\n", + "import os\n", + "import pickle as pkl\n", + "\n", + "import numpy as np\n", + "import pandas as pd\n", + "import sagemaker_xgboost_container.encoder as xgb_encoders\n", + "import xgboost as xgb\n", + "import io\n", + "import logging\n", + "\n", + "logging.basicConfig(level=logging.INFO)\n", + "\n", + "\n", + "def model_fn(model_dir):\n", + " \"\"\"\n", + " Deserialize and return fitted model.\n", + " \"\"\"\n", + " model_file = \"xgboost-model\"\n", + " booster = pkl.load(open(os.path.join(model_dir, model_file), \"rb\"))\n", + " return booster\n", + "\n", + "\n", + "def transform_fn(model, request_body, request_content_type, accept):\n", + " \"\"\"Parse the request, run predictions, and return the first output row as CSV.\"\"\"\n", + " if request_content_type == 
\"text/libsvm\":\n", + " input_data = xgb_encoders.libsvm_to_dmatrix(request_body)\n", + " elif request_content_type == \"text/csv\":\n", + " df = pd.read_csv(io.StringIO(request_body.strip(\"\\n\")), header=None)\n", + " df.drop(0, axis=1, inplace=True)\n", + " input_data = xgb.DMatrix(data=df)\n", + "\n", + " else:\n", + " raise ValueError(\"Content type {} is not supported.\".format(request_content_type))\n", + "\n", + " prediction = model.predict(input_data)\n", + " feature_contribs = model.predict(input_data, pred_contribs=True, validate_features=False)\n", + " output = np.hstack((prediction[:, np.newaxis], feature_contribs))\n", + "\n", + " logging.info(\"Successfully completed transform job!\")\n", + "\n", + " return \",\".join(str(x) for x in output[0])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.model import Model\n", + "\n", + "model = Model(\n", + " image_uri=image_uri,\n", + " model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,\n", + " source_dir=\"code\",\n", + " entry_point=\"inference.py\",\n", + " role=role,\n", + " sagemaker_session=local_pipeline_session,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Define the `ModelStep` by providing the return values from `model.create()` as the step arguments. 
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.workflow.model_step import ModelStep\n", + "\n", + "step_create_model = ModelStep(\n", + " name=\"AbaloneCreateModel\", step_args=model.create(instance_type=instance_type)\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Define a Transform Step to Perform Batch Transformation\n", + "\n", + "Now that a model instance is defined, create a `Transformer` instance with the appropriate model type, compute instance type, and desired output S3 URI.\n", + "\n", + "Specifically, pass in the `ModelName` from the `properties` of the `ModelStep`, `step_create_model`. These `properties` match the object model of the [DescribeModel](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeModel.html) response object." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.transformer import Transformer\n", + "\n", + "\n", + "transformer = Transformer(\n", + " model_name=step_create_model.properties.ModelName,\n", + " instance_type=instance_type,\n", + " instance_count=transform_instance_count,\n", + " output_path=f\"s3://{default_bucket}/{prefix}/transform\",\n", + " sagemaker_session=local_pipeline_session,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Call the transformer's `.transform()` method on the test data produced by the processing step, and pass the returned arguments to the `TransformStep`." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.inputs import TransformInput\n", + "from sagemaker.workflow.steps import TransformStep\n", + "from sagemaker.workflow.functions import Join\n", + "\n", + "transform_data = Join(\n", + " on=\"/\",\n", + " values=[\n", + " step_process.properties.ProcessingOutputConfig.Outputs[\"test\"].S3Output.S3Uri,\n", + " \"test.csv\",\n", + " ],\n", + ")\n", + "\n", + "transform_args = transformer.transform(transform_data, content_type=\"text/csv\")\n", + "\n", + "step_transform = TransformStep(name=\"AbaloneTransform\", step_args=transform_args)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.workflow.fail_step import FailStep\n", + "\n", + "step_fail = FailStep(\n", + " name=\"AbaloneMSEFail\",\n", + " error_message=Join(on=\" \", values=[\"Execution failed due to MSE >\", mse_threshold]),\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Define a Condition Step to Check Accuracy and Conditionally Create a Model and Run a Batch Transformation Or Terminate the Execution in Failed State\n", + "\n", + "In this step, the model is created and the batch transformation runs only if the error of the model, as determined by the evaluation step `step_eval`, is less than or equal to the `MseThreshold` parameter. Otherwise, the pipeline execution fails and terminates. 
A `ConditionStep` enables pipelines to support conditional execution in the pipeline DAG based on the conditions of the step properties.\n", + "\n", + "In the following section, you:\n", + "\n", + "* Define a `ConditionLessThanOrEqualTo` on the error value found in the output of the evaluation step, `step_eval`.\n", + "* Use the condition in the list of conditions in a `ConditionStep`.\n", + "* Pass the `ModelStep` and `TransformStep` steps into the `if_steps` of the `ConditionStep`, which are only executed if the condition evaluates to `True`.\n", + "* Pass the `FailStep` step into the `else_steps` of the `ConditionStep`, which is only executed if the condition evaluates to `False`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo\n", + "from sagemaker.workflow.condition_step import ConditionStep\n", + "from sagemaker.workflow.functions import JsonGet\n", + "\n", + "cond_lte = ConditionLessThanOrEqualTo(\n", + " left=JsonGet(\n", + " step_name=step_eval.name,\n", + " property_file=evaluation_report,\n", + " json_path=\"regression_metrics.mse.value\",\n", + " ),\n", + " right=mse_threshold,\n", + ")\n", + "\n", + "step_cond = ConditionStep(\n", + " name=\"AbaloneMSECond\",\n", + " conditions=[cond_lte],\n", + " if_steps=[step_create_model, step_transform],\n", + " else_steps=[step_fail],\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Define a Pipeline using `LocalPipelineSession`\n", + "\n", + "In this section, combine the steps into a Pipeline so it can be executed. We provide a `LocalPipelineSession` object to the `Pipeline` so that when executed, all the steps in the pipeline will run locally on the machine." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.workflow.pipeline import Pipeline\n", + "\n", + "pipeline_name = \"LocalModelPipeline\"\n", + "pipeline = Pipeline(\n", + " name=pipeline_name,\n", + " parameters=[\n", + " input_data,\n", + " mse_threshold,\n", + " ],\n", + " steps=[step_process, step_train, step_eval, step_cond],\n", + " sagemaker_session=local_pipeline_session,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### (Optional) Examining the pipeline definition\n", + "\n", + "The JSON of the pipeline definition can be examined to confirm the pipeline is well-defined and the parameters and step properties resolve correctly." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import json\n", + "\n", + "definition = json.loads(pipeline.definition())\n", + "definition" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Create the pipeline and start a local execution\n", + "\n", + "Create or update the pipeline definition with `upsert`. Because the pipeline was defined with a `LocalPipelineSession`, the definition is handled by the local session rather than sent to the SageMaker Pipelines service. The role that is passed in is used to create all the jobs defined in the steps." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline.upsert(role_arn=role)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Start the pipeline and accept all the default parameters."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "execution = pipeline.start()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "steps = execution.list_steps()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "steps" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Get the step outputs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Get output files from processing job\n", + "\n", + "processing_job_name = steps[\"PipelineExecutionSteps\"][0][\"Metadata\"][\"ProcessingJob\"][\"Arn\"]\n", + "outputs = local_pipeline_session.sagemaker_client.describe_processing_job(\n", + " ProcessingJobName=processing_job_name\n", + ")[\"ProcessingOutputConfig\"][\"Outputs\"]\n", + "for key in outputs:\n", + " print(outputs[key][\"S3Output\"][\"S3Uri\"])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Get output from training job\n", + "\n", + "training_job_name = steps[\"PipelineExecutionSteps\"][1][\"Metadata\"][\"TrainingJob\"][\"Arn\"]\n", + "outputs = local_pipeline_session.sagemaker_client.describe_training_job(\n", + " TrainingJobName=training_job_name\n", + ")\n", + "print(\"Model location : \", outputs[\"ModelArtifacts\"][\"S3ModelArtifacts\"])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Get output from model evaluation step (processing job)\n", + "\n", + "processing_job_name = steps[\"PipelineExecutionSteps\"][2][\"Metadata\"][\"ProcessingJob\"][\"Arn\"]\n", + "outputs = local_pipeline_session.sagemaker_client.describe_processing_job(\n", + " ProcessingJobName=processing_job_name\n", + ")[\"ProcessingOutputConfig\"][\"Outputs\"]\n", + "for 
key in outputs:\n", + " print(outputs[key][\"S3Output\"][\"S3Uri\"])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Get output of ModelStep\n", + "import json\n", + "\n", + "model_name = steps[\"PipelineExecutionSteps\"][-1][\"Metadata\"][\"Model\"][\"Arn\"]\n", + "outputs = local_pipeline_session.sagemaker_client.describe_model(ModelName=model_name)\n", + "print(outputs)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Get output from the TransformStep\n", + "\n", + "transform_job_name = steps[\"PipelineExecutionSteps\"][4][\"Metadata\"][\"TransformJob\"][\"Arn\"]\n", + "outputs = local_pipeline_session.sagemaker_client.describe_transform_job(\n", + " TransformJobName=transform_job_name\n", + ")\n", + "print(outputs)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Conclusion\n", + "\n", + "In this notebook, you defined a pipeline that runs on your local machine and verified that each step returns the expected output. To run the same pipeline in the cloud on SageMaker instances, redefine each step and the `Pipeline` object with a `PipelineSession` instead of the `LocalPipelineSession`." + ] + } + ], + "metadata": { + "instance_type": "ml.t3.medium", + "kernelspec": { + "display_name": "conda_python3", + "language": "python", + "name": "conda_python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.12" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}