diff --git a/aws_sagemaker_studio/frameworks/keras_pipe_mode_horovod/generate_cifar10_tfrecords.py b/aws_sagemaker_studio/frameworks/keras_pipe_mode_horovod/generate_cifar10_tfrecords.py deleted file mode 100644 index b481b2e37b..0000000000 --- a/aws_sagemaker_studio/frameworks/keras_pipe_mode_horovod/generate_cifar10_tfrecords.py +++ /dev/null @@ -1,116 +0,0 @@ -# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"). -# You may not use this file except in compliance with the License. -# A copy of the License is located at -# -# https://aws.amazon.com/apache-2-0/ -# -# or in the "license" file accompanying this file. This file is distributed -# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -# express or implied. See the License for the specific language governing -# permissions and limitations under the License. -from __future__ import absolute_import - -import argparse -import os -import shutil -import sys -import tarfile - -import tensorflow as tf -from six.moves import cPickle as pickle -from six.moves import xrange - -CIFAR_FILENAME = "cifar-10-python.tar.gz" -CIFAR_DOWNLOAD_URL = "https://www.cs.toronto.edu/~kriz/" + CIFAR_FILENAME -CIFAR_LOCAL_FOLDER = "cifar-10-batches-py" - - -def download_and_extract(data_dir): - # download CIFAR-10 if not already downloaded. - tf.contrib.learn.datasets.base.maybe_download(CIFAR_FILENAME, data_dir, CIFAR_DOWNLOAD_URL) - tarfile.open(os.path.join(data_dir, CIFAR_FILENAME), "r:gz").extractall(data_dir) - - -def _int64_feature(value): - return tf.train.Feature(int64_list=tf.train.Int64List(value=[value])) - - -def _bytes_feature(value): - return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value])) - - -def _get_file_names(): - """Returns the file names expected to exist in the input_dir.""" - return { - "train": ["data_batch_%d" % i for i in xrange(1, 5)], - "validation": ["data_batch_5"], - "eval": ["test_batch"], - } - - -def read_pickle_from_file(filename): - with tf.io.gfile.GFile(filename, "rb") as f: - if sys.version_info.major >= 3: - return pickle.load(f, encoding="bytes") - else: - return pickle.load(f) - - -def convert_to_tfrecord(input_files, output_file): - """Converts a file to TFRecords.""" - print("Generating %s" % output_file) - with tf.io.TFRecordWriter(output_file) as record_writer: - for input_file in input_files: - data_dict = read_pickle_from_file(input_file) - data = data_dict[b"data"] - labels = data_dict[b"labels"] - - num_entries_in_batch = len(labels) - for i in range(num_entries_in_batch): - example = tf.train.Example( - features=tf.train.Features( - feature={ - "image": _bytes_feature(data[i].tobytes()), - "label": _int64_feature(labels[i]), - } - ) - ) - record_writer.write(example.SerializeToString()) - - -def main(data_dir): - print("Download from {} and extract.".format(CIFAR_DOWNLOAD_URL)) - download_and_extract(data_dir) - - file_names = _get_file_names() - input_dir = os.path.join(data_dir, CIFAR_LOCAL_FOLDER) - for mode, files in file_names.items(): - input_files = [os.path.join(input_dir, f) for f in files] - - mode_dir = os.path.join(data_dir, mode) - output_file = os.path.join(mode_dir, mode + ".tfrecords") - if not os.path.exists(mode_dir): - os.makedirs(mode_dir) - try: - os.remove(output_file) - except OSError: - pass - - # Convert to tf.train.Example and write the to TFRecords. - convert_to_tfrecord(input_files, output_file) - - print("Done!") - shutil.rmtree(os.path.join(data_dir, "cifar-10-batches-py")) - os.remove(os.path.join(data_dir, "cifar-10-python.tar.gz")) # Remove the original .tzr.gz files - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument( - "--data-dir", type=str, default="", help="Directory to download and extract CIFAR-10 to." - ) - - args = parser.parse_args() - main(args.data_dir) diff --git a/aws_sagemaker_studio/frameworks/keras_pipe_mode_horovod/generate_tensorboard_command.py b/aws_sagemaker_studio/frameworks/keras_pipe_mode_horovod/generate_tensorboard_command.py deleted file mode 100644 index d1c1074db4..0000000000 --- a/aws_sagemaker_studio/frameworks/keras_pipe_mode_horovod/generate_tensorboard_command.py +++ /dev/null @@ -1,53 +0,0 @@ -# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"). -# You may not use this file except in compliance with the License. -# A copy of the License is located at -# -# https://aws.amazon.com/apache-2-0/ -# -# or in the "license" file accompanying this file. This file is distributed -# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -# express or implied. See the License for the specific language governing -# permissions and limitations under the License. -import re -from datetime import datetime, timedelta - -import boto3 - -client = boto3.client("sagemaker") -running_jobs = client.list_training_jobs(CreationTimeAfter=datetime.utcnow() - timedelta(hours=1)) - -logdir = None -for job in running_jobs["TrainingJobSummaries"]: - tensorboard_job = False - name = None - tags = client.list_tags(ResourceArn=job["TrainingJobArn"]) - for tag in tags["Tags"]: - if tag["Key"] == "TensorBoard": - name = tag["Value"] - if tag["Key"] == "Project" and tag["Value"] == "cifar10": - desc = client.describe_training_job(TrainingJobName=job["TrainingJobName"]) - job_name = desc["HyperParameters"]["sagemaker_job_name"].replace('"', "") - tensorboard_dir = re.sub( - "source/sourcedir.tar.gz", - "model", - desc["HyperParameters"]["sagemaker_submit_directory"], - ) - tensorboard_job = True - - if tensorboard_job: - if name is None: - name = job["TrainingJobName"] - - if logdir is None: - logdir = "{}:{}".format(name, tensorboard_dir) - else: - logdir = "{},{}:{}".format(logdir, name, tensorboard_dir) - -if logdir: - print( - "AWS_REGION={} tensorboard --logdir {}".format(boto3.session.Session().region_name, logdir) - ) -else: - print("No jobs are in progress") diff --git a/aws_sagemaker_studio/frameworks/keras_pipe_mode_horovod/keras_pipe_mode_horovod_cifar10.ipynb b/aws_sagemaker_studio/frameworks/keras_pipe_mode_horovod/keras_pipe_mode_horovod_cifar10.ipynb deleted file mode 100644 index 3c88178848..0000000000 --- a/aws_sagemaker_studio/frameworks/keras_pipe_mode_horovod/keras_pipe_mode_horovod_cifar10.ipynb +++ /dev/null @@ -1,613 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Train and Host a Keras Model with Pipe Mode and Horovod on Amazon SageMaker\n", - "\n", - "Amazon SageMaker is a fully-managed service that provides developers and data scientists with the ability to build, train, and deploy machine learning (ML) models quickly. Amazon SageMaker removes the heavy lifting from each step of the machine learning process to make it easier to develop high-quality models. The SageMaker Python SDK makes it easy to train and deploy models in Amazon SageMaker with several different machine learning and deep learning frameworks, including TensorFlow and Keras.\n", - "\n", - "In this notebook, we train and host a [Keras Sequential model](https://keras.io/getting-started/sequential-model-guide) on SageMaker. The model used for this notebook is a simple deep convolutional neural network (CNN) that was extracted from [the Keras examples](https://github.com/keras-team/keras/blob/master/examples/cifar10_cnn.py).\n", - "\n", - "For training our model, we also demonstrate distributed training with [Horovod](https://horovod.readthedocs.io) and Pipe Mode. Amazon SageMaker's Pipe Mode streams your dataset directly to your training instances instead of being downloaded first, which translates to training jobs that start sooner, finish quicker, and need less disk space. \n", - "
\n", - "
\n", - " Instance Type and Pricing: \n", - "\n", - "This notebook was trained using the Python 3 (TensorFlow CPU Optimized) kernel using the ml.p3.2xlarge compute instance type in the us-west-2 region. Training time is approximately 70 minutes with the aforementioned hardware specifications.\n", - "\n", - "Price per hour depends on your region and instance type. You can reference prices on the [SageMaker pricing page](https://aws.amazon.com/sagemaker/pricing/). \n", - "\n", - "---\n", - "---" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Setup\n", - "First, we define a few variables that are be needed later in the example." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import sagemaker\n", - "from sagemaker import get_execution_role\n", - "\n", - "sagemaker_session = sagemaker.Session()\n", - "\n", - "role = get_execution_role()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We also install a couple libraries we need for visualizing our model's prediction results." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "!pip install matplotlib seaborn" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## The CIFAR-10 dataset\n", - "\n", - "The [CIFAR-10 dataset](https://www.cs.toronto.edu/~kriz/cifar.html) is one of the most popular machine learning datasets. It consists of 60,000 32x32 images belonging to 10 different classes (6,000 images per class). Here are the classes in the dataset, as well as 10 random images from each:\n", - "\n", - "![cifar10](https://maet3608.github.io/nuts-ml/_images/cifar10.png)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Prepare the dataset for training\n", - "\n", - "To use the CIFAR-10 dataset, we first download it and convert it to TFRecords. This step takes around 5 minutes." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "!python generate_cifar10_tfrecords.py --data-dir ./data" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Next, we upload the data to Amazon S3:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from sagemaker.s3 import S3Uploader\n", - "\n", - "bucket = sagemaker_session.default_bucket()\n", - "dataset_uri = S3Uploader.upload(\"data\", \"s3://{}/tf-cifar10-example/data\".format(bucket))\n", - "\n", - "display(dataset_uri)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Train the model\n", - "\n", - "In this tutorial, we train a deep CNN to learn a classification task with the CIFAR-10 dataset. We compare three different training jobs: a baseline training job, training with Pipe Mode, and distributed training with Horovod." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Configure metrics\n", - "\n", - "In addition to running the training job, Amazon SageMaker can retrieve training metrics directly from the logs and send them to CloudWatch metrics. Here, we define metrics we would like to observe:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "metric_definitions = [\n", - " {\"Name\": \"train:loss\", \"Regex\": \".*loss: ([0-9\\\\.]+) - accuracy: [0-9\\\\.]+.*\"},\n", - " {\"Name\": \"train:accuracy\", \"Regex\": \".*loss: [0-9\\\\.]+ - accuracy: ([0-9\\\\.]+).*\"},\n", - " {\n", - " \"Name\": \"validation:accuracy\",\n", - " \"Regex\": \".*step - loss: [0-9\\\\.]+ - accuracy: [0-9\\\\.]+ - val_loss: [0-9\\\\.]+ - val_accuracy: ([0-9\\\\.]+).*\",\n", - " },\n", - " {\n", - " \"Name\": \"validation:loss\",\n", - " \"Regex\": \".*step - loss: [0-9\\\\.]+ - accuracy: [0-9\\\\.]+ - val_loss: ([0-9\\\\.]+) - val_accuracy: [0-9\\\\.]+.*\",\n", - " },\n", - " {\n", - " \"Name\": \"sec/steps\",\n", - " \"Regex\": \".* - \\d+s (\\d+)[mu]s/step - loss: [0-9\\\\.]+ - accuracy: [0-9\\\\.]+ - val_loss: [0-9\\\\.]+ - val_accuracy: [0-9\\\\.]+\",\n", - " },\n", - "]" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Run a baseline training job on SageMaker\n", - "\n", - "The SageMaker Python SDK's `sagemaker.tensorflow.TensorFlow` estimator class makes it easy for us to interact with SageMaker. Here, we create one to configure a training job. Some parameters worth noting:\n", - "\n", - "* `entry_point`: our training script (adapted from [this Keras example](https://github.com/keras-team/keras/blob/master/examples/cifar10_cnn.py)).\n", - "* `metric_definitions`: the metrics (defined above) that we want sent to CloudWatch.\n", - "* `train_instance_count`: the number of training instances. Here, we set it to 1 for our baseline training job.\n", - "\n", - "For more details about the TensorFlow estimator class, see the [API documentation](https://sagemaker.readthedocs.io/en/stable/sagemaker.tensorflow.html)." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from sagemaker.tensorflow import TensorFlow\n", - "\n", - "hyperparameters = {\"epochs\": 10, \"batch-size\": 256}\n", - "tags = [{\"Key\": \"Project\", \"Value\": \"cifar10\"}]\n", - "\n", - "estimator = TensorFlow(\n", - " entry_point=\"keras_cifar10.py\",\n", - " source_dir=\"source\",\n", - " metric_definitions=metric_definitions,\n", - " hyperparameters=hyperparameters,\n", - " role=role,\n", - " framework_version=\"1.15.2\",\n", - " py_version=\"py3\",\n", - " instance_count=1,\n", - " instance_type=\"ml.p3.2xlarge\",\n", - " base_job_name=\"cifar10-tf\",\n", - " tags=tags,\n", - ")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Once we have our estimator, we call `fit()` to start the SageMaker training job and pass the inputs that we uploaded to Amazon S3 earlier. We pass the inputs as a dictionary to define different data channels for training." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "inputs = {\n", - " \"train\": \"{}/train\".format(dataset_uri),\n", - " \"validation\": \"{}/validation\".format(dataset_uri),\n", - " \"eval\": \"{}/eval\".format(dataset_uri),\n", - "}\n", - "\n", - "estimator.fit(inputs)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### View the job training metrics\n", - "\n", - "We can now view the metrics from the training job directly in the SageMaker console. \n", - "\n", - "Log into the [SageMaker console](https://console.aws.amazon.com/sagemaker/home), choose the latest training job, and scroll down to the monitor section. Alternatively, the code below uses the region and training job name to generate a URL to CloudWatch metrics.\n", - "\n", - "Using CloudWatch metrics, you can change the period and configure the statistics." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from urllib import parse\n", - "\n", - "from IPython.core.display import Markdown\n", - "\n", - "region = sagemaker_session.boto_region_name\n", - "cw_url = parse.urlunparse(\n", - " (\n", - " \"https\",\n", - " \"{}.console.aws.amazon.com\".format(region),\n", - " \"/cloudwatch/home\",\n", - " \"\",\n", - " \"region={}\".format(region),\n", - " \"metricsV2:namespace=/aws/sagemaker/TrainingJobs;dimensions=TrainingJobName;search={}\".format(\n", - " estimator.latest_training_job.name\n", - " ),\n", - " )\n", - ")\n", - "\n", - "display(\n", - " Markdown(\n", - " \"CloudWatch metrics: [link]({}). After you choose a metric, \"\n", - " \"change the period to 1 Minute (Graphed Metrics -> Period).\".format(cw_url)\n", - " )\n", - ")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Train on SageMaker with Pipe Mode\n", - "\n", - "Here we train our model using Pipe Mode. With Pipe Mode, SageMaker uses [Linux named pipes](https://www.linuxjournal.com/article/2156) to stream the training data directly from S3 instead of downloading the data first.\n", - "\n", - "In our script, we enable Pipe Mode using the following code:\n", - "\n", - "```python\n", - "from sagemaker_tensorflow import PipeModeDataset\n", - "\n", - "dataset = PipeModeDataset(channel=channel_name, record_format='TFRecord')\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "When we create our estimator, the only difference from before is that we also specify `input_mode='Pipe'`:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "pipe_mode_estimator = TensorFlow(\n", - " entry_point=\"keras_cifar10.py\",\n", - " source_dir=\"source\",\n", - " metric_definitions=metric_definitions,\n", - " hyperparameters=hyperparameters,\n", - " role=role,\n", - " framework_version=\"1.15.2\",\n", - " py_version=\"py3\",\n", - " instance_count=1,\n", - " instance_type=\"ml.p3.2xlarge\",\n", - " input_mode=\"Pipe\",\n", - " base_job_name=\"cifar10-tf-pipe\",\n", - " tags=tags,\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "pipe_mode_estimator.fit(inputs)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Distributed training with Horovod\n", - "\n", - "[Horovod](https://horovod.readthedocs.io) is a distributed training framework based on MPI. To use Horovod, we make the following changes to our training script:\n", - "\n", - "1. Enable Horovod:\n", - "\n", - "```python\n", - "import horovod.keras as hvd\n", - "\n", - "hvd.init()\n", - "config = tf.ConfigProto()\n", - "config.gpu_options.allow_growth = True\n", - "config.gpu_options.visible_device_list = str(hvd.local_rank())\n", - "K.set_session(tf.Session(config=config))\n", - "```\n", - "\n", - "2. Add these callbacks:\n", - "\n", - "```python\n", - "hvd.callbacks.BroadcastGlobalVariablesCallback(0)\n", - "hvd.callbacks.MetricAverageCallback()\n", - "```\n", - "\n", - "3. Configure the optimizer:\n", - "\n", - "```python\n", - "opt = Adam(lr=learning_rate * size, decay=weight_decay)\n", - "opt = hvd.DistributedOptimizer(opt)\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "To configure the training job, we specify the following for the distribution:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "distribution = {\n", - " \"mpi\": {\n", - " \"enabled\": True,\n", - " \"processes_per_host\": 1, # Number of Horovod processes per host\n", - " }\n", - "}" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "This is then passed to our estimator:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "dist_estimator = TensorFlow(\n", - " entry_point=\"keras_cifar10.py\",\n", - " source_dir=\"source\",\n", - " metric_definitions=metric_definitions,\n", - " hyperparameters=hyperparameters,\n", - " distribution=distribution,\n", - " role=role,\n", - " framework_version=\"1.15.2\",\n", - " py_version=\"py3\",\n", - " instance_count=2,\n", - " instance_type=\"ml.p3.2xlarge\",\n", - " base_job_name=\"cifar10-tf-dist\",\n", - " tags=tags,\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "dist_estimator.fit(inputs)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Deploy the trained model\n", - "\n", - "After we train our model, we can deploy it to a SageMaker Endpoint, which serves prediction requests in real-time. To do so, we simply call `deploy()` on our estimator, passing in the desired number of instances and instance type for the endpoint.\n", - "\n", - "Because we're using TensorFlow Serving for deployment, our training script saves the model in TensorFlow's SavedModel format. \n", - "\n", - "We don't need accelerated computing power for inference, so let's switch over to a ml.m4.xlarge instance type. \n", - "\n", - "For more information about deploying Keras and TensorFlow models in SageMaker, see [this blog post](https://aws.amazon.com/blogs/machine-learning/deploy-trained-keras-or-tensorflow-models-using-amazon-sagemaker)." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "pycharm": { - "name": "#%%\n" - } - }, - "outputs": [], - "source": [ - "predictor = estimator.deploy(initial_instance_count=1, instance_type=\"ml.m4.xlarge\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Invoke the endpoint\n", - "\n", - "To verify the that the endpoint is in service, we generate some random data in the correct shape and get a prediction." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import numpy as np\n", - "\n", - "data = np.random.randn(1, 32, 32, 3)\n", - "print(\"Predicted class: {}\".format(np.argmax(predictor.predict(data)[\"predictions\"])))" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Now let's use the test dataset for predictions." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from keras.datasets import cifar10\n", - "\n", - "(x_train, y_train), (x_test, y_test) = cifar10.load_data()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "With the data loaded, we can use it for predictions:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from keras.preprocessing.image import ImageDataGenerator\n", - "\n", - "\n", - "def predict(data):\n", - " predictions = predictor.predict(data)[\"predictions\"]\n", - " return predictions\n", - "\n", - "\n", - "predicted = []\n", - "actual = []\n", - "batches = 0\n", - "batch_size = 128\n", - "\n", - "datagen = ImageDataGenerator()\n", - "for data in datagen.flow(x_test, y_test, batch_size=batch_size):\n", - " for i, prediction in enumerate(predict(data[0])):\n", - " predicted.append(np.argmax(prediction))\n", - " actual.append(data[1][i][0])\n", - "\n", - " batches += 1\n", - " if batches >= len(x_test) / batch_size:\n", - " break" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "With the predictions, we calculate our model accuracy and create a confusion matrix." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from sklearn.metrics import accuracy_score\n", - "\n", - "accuracy = accuracy_score(y_pred=predicted, y_true=actual)\n", - "display(\"Average accuracy: {}%\".format(round(accuracy * 100, 2)))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "%matplotlib inline\n", - "import matplotlib.pyplot as plt\n", - "import pandas as pd\n", - "import seaborn as sn\n", - "from sklearn.metrics import confusion_matrix\n", - "\n", - "cm = confusion_matrix(y_pred=predicted, y_true=actual)\n", - "cm = cm.astype(\"float\") / cm.sum(axis=1)[:, np.newaxis]\n", - "sn.set(rc={\"figure.figsize\": (11.7, 8.27)})\n", - "sn.set(font_scale=1.4) # for label size\n", - "sn.heatmap(cm, annot=True, annot_kws={\"size\": 10}) # font size" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Aided by the colors of the heatmap, we can use this confusion matrix to understand how well the model performed for each label." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Cleanup\n", - "\n", - "To avoid incurring extra charges to your AWS account, let's delete the endpoint we created:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "predictor.delete_endpoint()" - ] - } - ], - "metadata": { - "instance_type": "ml.t3.medium", - "kernelspec": { - "display_name": "conda_tensorflow_p27", - "language": "python", - "name": "conda_tensorflow_p27" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 2 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython2", - "version": "2.7.16" - }, - "notice": "Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the \"License\"). You may not use this file except in compliance with the License. A copy of the License is located at http://aws.amazon.com/apache2.0/ or in the \"license\" file accompanying this file. This file is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.", - "pycharm": { - "stem_cell": { - "cell_type": "raw", - "metadata": { - "collapsed": false - }, - "source": [] - } - } - }, - "nbformat": 4, - "nbformat_minor": 4 -} diff --git a/aws_sagemaker_studio/frameworks/keras_pipe_mode_horovod/source/keras_cifar10.py b/aws_sagemaker_studio/frameworks/keras_pipe_mode_horovod/source/keras_cifar10.py deleted file mode 100644 index 9534c2fbc4..0000000000 --- a/aws_sagemaker_studio/frameworks/keras_pipe_mode_horovod/source/keras_cifar10.py +++ /dev/null @@ -1,361 +0,0 @@ -# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"). -# You may not use this file except in compliance with the License. -# A copy of the License is located at -# -# https://aws.amazon.com/apache-2-0/ -# -# or in the "license" file accompanying this file. This file is distributed -# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -# express or implied. See the License for the specific language governing -# permissions and limitations under the License. -from __future__ import absolute_import, division, print_function - -import argparse -import json -import logging -import os -import re - -import keras -import tensorflow as tf -from keras import backend as K -from keras.callbacks import ModelCheckpoint, TensorBoard -from keras.layers import ( - Activation, - BatchNormalization, - Conv2D, - Dense, - Dropout, - Flatten, - MaxPooling2D, -) -from keras.models import Sequential -from keras.optimizers import SGD, Adam, RMSprop - -logging.getLogger().setLevel(logging.INFO) -tf.logging.set_verbosity(tf.logging.INFO) - -HEIGHT = 32 -WIDTH = 32 -DEPTH = 3 -NUM_CLASSES = 10 -NUM_DATA_BATCHES = 5 -NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN = 10000 * NUM_DATA_BATCHES -INPUT_TENSOR_NAME = "inputs_input" # needs to match the name of the first layer + "_input" - - -def keras_model_fn(learning_rate, weight_decay, optimizer, momentum, mpi=False, hvd=False): - """keras_model_fn receives hyperparameters from the training job and returns a compiled keras model. - The model is transformed into a TensorFlow Estimator before training and saved in a - TensorFlow Serving SavedModel at the end of training. - """ - model = Sequential() - model.add(Conv2D(32, (3, 3), padding="same", name="inputs", input_shape=(HEIGHT, WIDTH, DEPTH))) - model.add(BatchNormalization()) - model.add(Activation("relu")) - model.add(Conv2D(32, (3, 3))) - model.add(BatchNormalization()) - model.add(Activation("relu")) - model.add(MaxPooling2D(pool_size=(2, 2))) - model.add(Dropout(0.2)) - - model.add(Conv2D(64, (3, 3), padding="same")) - model.add(BatchNormalization()) - model.add(Activation("relu")) - model.add(Conv2D(64, (3, 3))) - model.add(BatchNormalization()) - model.add(Activation("relu")) - model.add(MaxPooling2D(pool_size=(2, 2))) - model.add(Dropout(0.3)) - - model.add(Conv2D(128, (3, 3), padding="same")) - model.add(BatchNormalization()) - model.add(Activation("relu")) - model.add(Conv2D(128, (3, 3))) - model.add(BatchNormalization()) - model.add(Activation("relu")) - model.add(MaxPooling2D(pool_size=(2, 2))) - model.add(Dropout(0.4)) - - model.add(Flatten()) - model.add(Dense(512)) - model.add(Activation("relu")) - model.add(Dropout(0.5)) - model.add(Dense(NUM_CLASSES)) - model.add(Activation("softmax")) - - size = 1 - if mpi: - size = hvd.size() - - if optimizer.lower() == "sgd": - opt = SGD(lr=learning_rate * size, decay=weight_decay, momentum=momentum) - elif optimizer.lower() == "rmsprop": - opt = RMSprop(lr=learning_rate * size, decay=weight_decay) - else: - opt = Adam(lr=learning_rate * size, decay=weight_decay) - - if mpi: - opt = hvd.DistributedOptimizer(opt) - - model.compile(loss="categorical_crossentropy", optimizer=opt, metrics=["accuracy"]) - return model - - -def train_input_fn(): - return _input(args.epochs, args.batch_size, args.train, "train") - - -def eval_input_fn(): - return _input(args.epochs, args.batch_size, args.eval, "eval") - - -def validation_input_fn(): - return _input(args.epochs, args.batch_size, args.validation, "validation") - - -def _get_filenames(channel_name, channel): - if channel_name in ["train", "validation", "eval"]: - return [os.path.join(channel, channel_name + ".tfrecords")] - else: - raise ValueError('Invalid data subset "%s"' % channel_name) - - -def _input(epochs, batch_size, channel, channel_name): - """Uses the tf.data input pipeline for CIFAR-10 dataset.""" - mode = args.data_config[channel_name]["TrainingInputMode"] - logging.info("Running {} in {} mode".format(channel_name, mode)) - - if mode == "Pipe": - from sagemaker_tensorflow import PipeModeDataset - - dataset = PipeModeDataset(channel=channel_name, record_format="TFRecord") - else: - filenames = _get_filenames(channel_name, channel) - dataset = tf.data.TFRecordDataset(filenames) - - # Repeat infinitely. - dataset = dataset.repeat() - dataset = dataset.prefetch(10) - - # Parse records. - dataset = dataset.map(_dataset_parser, num_parallel_calls=10) - - # Potentially shuffle records. - if channel_name == "train": - # Ensure that the capacity is sufficiently large to provide good random shuffling. - buffer_size = int(NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN * 0.4) + 3 * batch_size - dataset = dataset.shuffle(buffer_size=buffer_size) - - # Batch it up. - dataset = dataset.batch(batch_size, drop_remainder=True) - iterator = tf.compat.v1.data.make_one_shot_iterator(dataset) - image_batch, label_batch = iterator.get_next() - - return {INPUT_TENSOR_NAME: image_batch}, label_batch - - -def _train_preprocess_fn(image): - """Preprocess a single training image of layout [height, width, depth].""" - # Resize the image to add four extra pixels on each side. - image = tf.image.resize_image_with_crop_or_pad(image, HEIGHT + 8, WIDTH + 8) - - # Randomly crop a [HEIGHT, WIDTH] section of the image. - image = tf.random_crop(image, [HEIGHT, WIDTH, DEPTH]) - - # Randomly flip the image horizontally. - image = tf.image.random_flip_left_right(image) - - return image - - -def _dataset_parser(value): - """Parse a CIFAR-10 record from value.""" - featdef = { - "image": tf.FixedLenFeature([], tf.string), - "label": tf.FixedLenFeature([], tf.int64), - } - - example = tf.parse_single_example(value, featdef) - image = tf.decode_raw(example["image"], tf.uint8) - image.set_shape([DEPTH * HEIGHT * WIDTH]) - - # Reshape from [depth * height * width] to [depth, height, width]. - image = tf.cast( - tf.transpose(tf.reshape(image, [DEPTH, HEIGHT, WIDTH]), [1, 2, 0]), - tf.float32, - ) - label = tf.cast(example["label"], tf.int32) - image = _train_preprocess_fn(image) - return image, tf.one_hot(label, NUM_CLASSES) - - -def save_model(model, output): - signature = tf.saved_model.signature_def_utils.predict_signature_def( - inputs={"image": model.input}, outputs={"scores": model.output} - ) - - builder = tf.saved_model.builder.SavedModelBuilder(output + "/1/") - builder.add_meta_graph_and_variables( - sess=K.get_session(), - tags=[tf.saved_model.tag_constants.SERVING], - signature_def_map={ - tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: signature - }, - ) - - builder.save() - logging.info("Model successfully saved at: {}".format(output)) - - -def main(args): - if "sourcedir.tar.gz" in args.tensorboard_dir: - tensorboard_dir = re.sub("source/sourcedir.tar.gz", "model", args.tensorboard_dir) - else: - tensorboard_dir = args.tensorboard_dir - logging.info("Writing TensorBoard logs to {}".format(tensorboard_dir)) - - mpi = False - if "sagemaker_mpi_enabled" in args.fw_params: - if args.fw_params["sagemaker_mpi_enabled"]: - import horovod.keras as hvd - - mpi = True - # Horovod: initialize Horovod. - hvd.init() - - # Horovod: pin GPU to be used to process local rank (one GPU per process) - config = tf.ConfigProto() - config.gpu_options.allow_growth = True - config.gpu_options.visible_device_list = str(hvd.local_rank()) - K.set_session(tf.Session(config=config)) - else: - hvd = None - logging.info("Running with MPI={}".format(mpi)) - - logging.info("getting data") - train_dataset = train_input_fn() - eval_dataset = eval_input_fn() - validation_dataset = validation_input_fn() - - logging.info("configuring model") - model = keras_model_fn( - args.learning_rate, args.weight_decay, args.optimizer, args.momentum, mpi, hvd - ) - - callbacks = [] - if mpi: - callbacks.append(hvd.callbacks.BroadcastGlobalVariablesCallback(0)) - callbacks.append(hvd.callbacks.MetricAverageCallback()) - callbacks.append(hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=5, verbose=1)) - callbacks.append(keras.callbacks.ReduceLROnPlateau(patience=10, verbose=1)) - if hvd.rank() == 0: - callbacks.append(ModelCheckpoint(args.output_dir + "/checkpoint-{epoch}.h5")) - callbacks.append(TensorBoard(log_dir=tensorboard_dir, update_freq="epoch")) - else: - callbacks.append(keras.callbacks.ReduceLROnPlateau(patience=10, verbose=1)) - callbacks.append(ModelCheckpoint(args.output_dir + "/checkpoint-{epoch}.h5")) - callbacks.append(TensorBoard(log_dir=tensorboard_dir, update_freq="epoch")) - - logging.info("Starting training") - size = 1 - if mpi: - size = hvd.size() - - model.fit( - x=train_dataset[0], - y=train_dataset[1], - steps_per_epoch=(num_examples_per_epoch("train") // args.batch_size) // size, - epochs=args.epochs, - validation_data=validation_dataset, - validation_steps=(num_examples_per_epoch("validation") // args.batch_size) // size, - callbacks=callbacks, - ) - - score = model.evaluate( - eval_dataset[0], - eval_dataset[1], - steps=num_examples_per_epoch("eval") // args.batch_size, - verbose=0, - ) - - logging.info("Test loss:{}".format(score[0])) - logging.info("Test accuracy:{}".format(score[1])) - - # Horovod: Save model only on worker 0 (i.e. master) - if mpi: - if hvd.rank() == 0: - save_model(model, args.model_output_dir) - else: - save_model(model, args.model_output_dir) - - -def num_examples_per_epoch(subset="train"): - if subset == "train": - return 40000 - elif subset == "validation": - return 10000 - elif subset == "eval": - return 10000 - else: - raise ValueError('Invalid data subset "%s"' % subset) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument( - "--train", - type=str, - required=False, - default=os.environ.get("SM_CHANNEL_TRAIN"), - help="The directory where the CIFAR-10 input data is stored.", - ) - parser.add_argument( - "--validation", - type=str, - required=False, - default=os.environ.get("SM_CHANNEL_VALIDATION"), - help="The directory where the CIFAR-10 input data is stored.", - ) - parser.add_argument( - "--eval", - type=str, - required=False, - default=os.environ.get("SM_CHANNEL_EVAL"), - help="The directory where the CIFAR-10 input data is stored.", - ) - parser.add_argument( - "--model_dir", type=str, required=True, help="The directory where the model will be stored." - ) - parser.add_argument("--model_output_dir", type=str, default=os.environ.get("SM_MODEL_DIR")) - parser.add_argument("--output-dir", type=str, default=os.environ.get("SM_OUTPUT_DIR")) - parser.add_argument("--tensorboard-dir", type=str, default=os.environ.get("SM_MODULE_DIR")) - parser.add_argument( - "--weight-decay", type=float, default=2e-4, help="Weight decay for convolutions." - ) - parser.add_argument( - "--learning-rate", - type=float, - default=0.001, - help="""\ - This is the inital learning rate value. The learning rate will decrease - during training. For more details check the model_fn implementation in - this file.\ - """, - ) - parser.add_argument( - "--epochs", type=int, default=10, help="The number of steps to use for training." - ) - parser.add_argument("--batch-size", type=int, default=128, help="Batch size for training.") - parser.add_argument( - "--data-config", type=json.loads, default=os.environ.get("SM_INPUT_DATA_CONFIG") - ) - parser.add_argument( - "--fw-params", type=json.loads, default=os.environ.get("SM_FRAMEWORK_PARAMS") - ) - parser.add_argument("--optimizer", type=str, default="adam") - parser.add_argument("--momentum", type=float, default="0.9") - args = parser.parse_args() - main(args) diff --git a/aws_sagemaker_studio/index.rst b/aws_sagemaker_studio/index.rst index 6ddcfa683a..70b48766a8 100644 --- a/aws_sagemaker_studio/index.rst +++ b/aws_sagemaker_studio/index.rst @@ -28,7 +28,6 @@ Framework examples .. toctree:: :maxdepth: 1 - frameworks/keras_pipe_mode_horovod/keras_pipe_mode_horovod_cifar10 frameworks/mxnet_gluon_sentiment/mxnet_sentiment_analysis_with_gluon frameworks/mxnet_mnist/mxnet_mnist_with_batch_transform frameworks/mxnet_onnx_ei/mxnet_onnx_ei