From 1da1bf371b45adbddedfe1d6738d45f76e6108b5 Mon Sep 17 00:00:00 2001
From: weicong wang <90941622+weicongw@users.noreply.github.com>
Date: Wed, 3 Jul 2024 12:05:38 -0700
Subject: [PATCH] Add single node Neuron test to the e2e tester (#452)

* Add single node Neuron test to the e2e tester

* Fork the neuron DLC dockerfile
---
 .github/workflows/ci.yaml                     |   7 +-
 .../framework_extensions/conditions.go        |  15 ++
 e2e2/test/cases/neuron/main_test.go           |  79 ++++++++
 .../k8s-neuron-device-plugin-rbac.yml         |  58 ++++++
 .../manifests/k8s-neuron-device-plugin.yml    |  98 ++++++++++
 .../manifests/single-node-test-neuronx.yaml   |  30 +++
 e2e2/test/cases/neuron/neuron_test.go         |  71 +++++++
 e2e2/test/images/neuron/Dockerfile            | 179 ++++++++++++++++++
 .../images/neuron/tests/singleNodeTest.sh     |   5 +
 .../test/images/neuron/tests/testNeuronMlp.py | 104 ++++++++++
 .../neuron/tests/testNeuronParallelState.py   | 106 +++++++++++
 .../neuron/tests/testNeuronSingleAllReduce.py |  30 +++
 12 files changed, 781 insertions(+), 1 deletion(-)
 create mode 100644 e2e2/test/cases/neuron/main_test.go
 create mode 100644 e2e2/test/cases/neuron/manifests/k8s-neuron-device-plugin-rbac.yml
 create mode 100644 e2e2/test/cases/neuron/manifests/k8s-neuron-device-plugin.yml
 create mode 100644 e2e2/test/cases/neuron/manifests/single-node-test-neuronx.yaml
 create mode 100644 e2e2/test/cases/neuron/neuron_test.go
 create mode 100644 e2e2/test/images/neuron/Dockerfile
 create mode 100755 e2e2/test/images/neuron/tests/singleNodeTest.sh
 create mode 100644 e2e2/test/images/neuron/tests/testNeuronMlp.py
 create mode 100644 e2e2/test/images/neuron/tests/testNeuronParallelState.py
 create mode 100644 e2e2/test/images/neuron/tests/testNeuronSingleAllReduce.py

diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index d24950e73..b9b3dbc56 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -20,4 +20,9 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v3
-      - run: docker build --file e2e2/test/images/Dockerfile.aws-efa-nccl-tests .
\ No newline at end of file
+      - run: docker build --file e2e2/test/images/Dockerfile.aws-efa-nccl-tests .
+  build-neuronx:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+      - run: docker build --file e2e2/test/images/neuron/Dockerfile .
\ No newline at end of file
diff --git a/e2e2/internal/framework_extensions/conditions.go b/e2e2/internal/framework_extensions/conditions.go
index c23e50534..3f7456bb4 100644
--- a/e2e2/internal/framework_extensions/conditions.go
+++ b/e2e2/internal/framework_extensions/conditions.go
@@ -4,6 +4,7 @@ import (
 	"context"
 
 	appsv1 "k8s.io/api/apps/v1"
+	batchv1 "k8s.io/api/batch/v1"
 	apimachinerywait "k8s.io/apimachinery/pkg/util/wait"
 	"sigs.k8s.io/e2e-framework/klient/k8s"
 
@@ -41,3 +42,17 @@ func (c *ConditionExtension) DaemonSetReady(daemonset k8s.Object) apimachinerywa
 		return
 	}
 }
+
+func (c *ConditionExtension) JobSucceeded(job k8s.Object) apimachinerywait.ConditionWithContextFunc {
+	return func(ctx context.Context) (done bool, err error) {
+		if err := c.resources.Get(ctx, job.GetName(), job.GetNamespace(), job); err != nil {
+			return false, err
+		}
+		status := job.(*batchv1.Job).Status
+		spec := job.(*batchv1.Job).Spec
+		if status.Succeeded != *spec.Completions {
+			return false, nil
+		}
+		return true, nil
+	}
+}
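A note on JobSucceeded: it dereferences spec.Completions without a nil check. That is safe for the Job this PR submits, because the manifest sets neither completions nor parallelism and the API server then defaults both to 1, but it would panic on a work-queue style Job that leaves completions nil. Separately, a Job whose pods fail outright only surfaces here as a wait timeout. A fail-fast companion condition in the same style is sketched below; it is not part of this PR, and it relies on the standard Kubernetes API constants batchv1.JobFailed and corev1.ConditionTrue (it would need "fmt" and corev1 "k8s.io/api/core/v1" added to the imports above):

func (c *ConditionExtension) JobFailed(job k8s.Object) apimachinerywait.ConditionWithContextFunc {
	return func(ctx context.Context) (done bool, err error) {
		if err := c.resources.Get(ctx, job.GetName(), job.GetNamespace(), job); err != nil {
			return false, err
		}
		// Surface a failed Job immediately instead of letting the caller's
		// wait.For run out its full timeout.
		for _, cond := range job.(*batchv1.Job).Status.Conditions {
			if cond.Type == batchv1.JobFailed && cond.Status == corev1.ConditionTrue {
				return true, fmt.Errorf("job %q failed: %s", job.GetName(), cond.Message)
			}
		}
		return false, nil
	}
}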
diff --git a/e2e2/test/cases/neuron/main_test.go b/e2e2/test/cases/neuron/main_test.go
new file mode 100644
index 000000000..b21449bf4
--- /dev/null
+++ b/e2e2/test/cases/neuron/main_test.go
@@ -0,0 +1,79 @@
+package neuron
+
+import (
+	"context"
+	_ "embed"
+	"flag"
+	"log"
+	"os"
+	"slices"
+	"testing"
+	"time"
+
+	fwext "github.com/aws/aws-k8s-tester/e2e2/internal/framework_extensions"
+	appsv1 "k8s.io/api/apps/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"sigs.k8s.io/e2e-framework/klient/wait"
+	"sigs.k8s.io/e2e-framework/pkg/env"
+	"sigs.k8s.io/e2e-framework/pkg/envconf"
+)
+
+var (
+	testenv         env.Environment
+	neuronTestImage *string
+)
+
+var (
+	//go:embed manifests/k8s-neuron-device-plugin-rbac.yml
+	neuronDevicePluginRbacManifest []byte
+	//go:embed manifests/k8s-neuron-device-plugin.yml
+	neuronDevicePluginManifest []byte
+)
+
+func TestMain(m *testing.M) {
+	neuronTestImage = flag.String("neuronTestImage", "", "image for neuron single node test")
+	cfg, err := envconf.NewFromFlags()
+	if err != nil {
+		log.Fatalf("failed to initialize test environment: %v", err)
+	}
+	testenv = env.NewWithConfig(cfg)
+
+	manifests := [][]byte{
+		neuronDevicePluginManifest,
+		neuronDevicePluginRbacManifest,
+	}
+
+	testenv.Setup(
+		func(ctx context.Context, config *envconf.Config) (context.Context, error) {
+			err := fwext.ApplyManifests(config.Client().RESTConfig(), manifests...)
+			if err != nil {
+				return ctx, err
+			}
+			return ctx, nil
+		},
+		func(ctx context.Context, config *envconf.Config) (context.Context, error) {
+			ds := appsv1.DaemonSet{
+				ObjectMeta: metav1.ObjectMeta{Name: "neuron-device-plugin-daemonset", Namespace: "kube-system"},
+			}
+			err := wait.For(fwext.NewConditionExtension(config.Client().Resources()).DaemonSetReady(&ds),
+				wait.WithTimeout(time.Minute*5))
+			if err != nil {
+				return ctx, err
+			}
+			return ctx, nil
+		},
+	)
+
+	testenv.Finish(
+		func(ctx context.Context, config *envconf.Config) (context.Context, error) {
+			// Delete in reverse order of application: dependents before their RBAC.
+			slices.Reverse(manifests)
+			err := fwext.DeleteManifests(config.Client().RESTConfig(), manifests...)
+			if err != nil {
+				return ctx, err
+			}
+			return ctx, nil
+		},
+	)
+
+	os.Exit(testenv.Run(m))
+}
diff --git a/e2e2/test/cases/neuron/manifests/k8s-neuron-device-plugin-rbac.yml b/e2e2/test/cases/neuron/manifests/k8s-neuron-device-plugin-rbac.yml
new file mode 100644
index 000000000..7f70b3090
--- /dev/null
+++ b/e2e2/test/cases/neuron/manifests/k8s-neuron-device-plugin-rbac.yml
@@ -0,0 +1,58 @@
+# Source: https://github.com/aws-neuron/aws-neuron-sdk/blob/master/src/k8/k8s-neuron-device-plugin-rbac.yml
+kind: ClusterRole
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: neuron-device-plugin
+rules:
+- apiGroups:
+  - ""
+  resources:
+  - nodes
+  verbs:
+  - get
+  - list
+  - watch
+- apiGroups:
+  - ""
+  resources:
+  - events
+  verbs:
+  - create
+  - patch
+- apiGroups:
+  - ""
+  resources:
+  - pods
+  verbs:
+  - update
+  - patch
+  - get
+  - list
+  - watch
+- apiGroups:
+  - ""
+  resources:
+  - nodes/status
+  verbs:
+  - patch
+  - update
+---
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: neuron-device-plugin
+  namespace: kube-system
+---
+kind: ClusterRoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: neuron-device-plugin
+  namespace: kube-system
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: neuron-device-plugin
+subjects:
+- kind: ServiceAccount
+  name: neuron-device-plugin
+  namespace: kube-system
diff --git a/e2e2/test/cases/neuron/manifests/k8s-neuron-device-plugin.yml b/e2e2/test/cases/neuron/manifests/k8s-neuron-device-plugin.yml
new file mode 100644
index 000000000..13ae420c6
--- /dev/null
+++ b/e2e2/test/cases/neuron/manifests/k8s-neuron-device-plugin.yml
@@ -0,0 +1,98 @@
+# Source: https://github.com/aws-neuron/aws-neuron-sdk/blob/master/src/k8/k8s-neuron-device-plugin.yml
+apiVersion: apps/v1
+kind: DaemonSet
+metadata:
+  name: neuron-device-plugin-daemonset
+  namespace: kube-system
+spec:
+  selector:
+    matchLabels:
+      name: neuron-device-plugin-ds
+  updateStrategy:
+    type: RollingUpdate
+  template:
+    metadata:
+      # Uncomment the annotation below if k8s version is 1.13 or lower
+      # annotations:
+      #   scheduler.alpha.kubernetes.io/critical-pod: ""
+      labels:
+        name: neuron-device-plugin-ds
+    spec:
+      serviceAccount: neuron-device-plugin
+      tolerations:
+      - key: CriticalAddonsOnly
+        operator: Exists
+      - key: aws.amazon.com/neuron
+        operator: Exists
+        effect: NoSchedule
+      # Mark this pod as a critical add-on; when enabled, the critical add-on
+      # scheduler reserves resources for critical add-on pods so that they can
+      # be rescheduled after a failure.
+      # See https://kubernetes.io/docs/tasks/administer-cluster/guaranteed-scheduling-critical-addon-pods/
+      priorityClassName: "system-node-critical"
+      affinity:
+        nodeAffinity:
+          requiredDuringSchedulingIgnoredDuringExecution:
+            nodeSelectorTerms:
+              # Uncomment following matchExpressions if using k8s 1.16 or lower
+              #- matchExpressions:
+              #  - key: "beta.kubernetes.io/instance-type"
+              #    operator: In
+              #    values:
+              #      - inf1.xlarge
+              #      - inf1.2xlarge
+              #      - inf1.6xlarge
+              #      - inf1.24xlarge
+              #      - inf2.xlarge
+              #      - inf2.8xlarge
+              #      - inf2.24xlarge
+              #      - inf2.48xlarge
+              #      - trn1.2xlarge
+              #      - trn1.32xlarge
+              #      - trn1n.32xlarge
+              - matchExpressions:
+                - key: "node.kubernetes.io/instance-type"
+                  operator: In
+                  values:
+                    - inf1.xlarge
+                    - inf1.2xlarge
+                    - inf1.6xlarge
+                    - inf1.24xlarge
+                    - inf2.xlarge
+                    - inf2.8xlarge
+                    - inf2.24xlarge
+                    - inf2.48xlarge
+                    - trn1.2xlarge
+                    - trn1.32xlarge
+                    - trn1n.32xlarge
+      containers:
+        # Find all neuron-device-plugin images at https://gallery.ecr.aws/neuron/neuron-device-plugin
+      - image: public.ecr.aws/neuron/neuron-device-plugin:2.19.16.0
+        imagePullPolicy: Always
+        name: neuron-device-plugin
+        env:
+        - name: KUBECONFIG
+          value: /etc/kubernetes/kubelet.conf
+        - name: NODE_NAME
+          valueFrom:
+            fieldRef:
+              fieldPath: spec.nodeName
+        securityContext:
+          allowPrivilegeEscalation: false
+          capabilities:
+            drop: ["ALL"]
+        volumeMounts:
+        - name: device-plugin
+          mountPath: /var/lib/kubelet/device-plugins
+        - name: infa-map
+          mountPath: /run
+      volumes:
+      - name: device-plugin
+        hostPath:
+          path: /var/lib/kubelet/device-plugins
+      - name: infa-map
+        hostPath:
+          path: /run
+
+
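Once this DaemonSet is ready, the kubelet on each Neuron node advertises the aws.amazon.com/neuron extended resource that the test Job below requests. When a run fails with an unschedulable pod, a quick sanity check is to confirm that at least one node actually advertises the resource. A minimal sketch using the e2e-framework client these tests already hold (illustrative only, not part of this PR; it assumes a Setup or Assess step where ctx, t, and cfg are in scope, with corev1 "k8s.io/api/core/v1" imported):

nodes := &corev1.NodeList{}
if err := cfg.Client().Resources().List(ctx, nodes); err != nil {
	t.Fatal(err)
}
neuronFound := false
for _, n := range nodes.Items {
	// Allocatable is populated by the device plugin through the kubelet.
	if q, ok := n.Status.Allocatable[corev1.ResourceName("aws.amazon.com/neuron")]; ok && !q.IsZero() {
		neuronFound = true
		break
	}
}
if !neuronFound {
	t.Fatal("no node advertises aws.amazon.com/neuron; is the device plugin ready?")
}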
diff --git a/e2e2/test/cases/neuron/manifests/single-node-test-neuronx.yaml b/e2e2/test/cases/neuron/manifests/single-node-test-neuronx.yaml
new file mode 100644
index 000000000..bf1d670e3
--- /dev/null
+++ b/e2e2/test/cases/neuron/manifests/single-node-test-neuronx.yaml
@@ -0,0 +1,30 @@
+kind: Job
+apiVersion: batch/v1
+metadata:
+  name: neuronx-single-node
+  labels:
+    app: neuronx-single-node
+spec:
+  template:
+    metadata:
+      labels:
+        app: neuronx-single-node
+    spec:
+      containers:
+      - name: neuronx-single-node-test
+        image: {{.NeuronTestImage}}
+        command:
+          - /bin/bash
+          - ./tests/singleNodeTest.sh
+        imagePullPolicy: Always
+        resources:
+          limits:
+            cpu: "4"
+            memory: 4Gi
+            aws.amazon.com/neuron: "1"
+          requests:
+            cpu: "1"
+            memory: 1Gi
+            aws.amazon.com/neuron: "1"
+      restartPolicy: Never
+  backoffLimit: 4
diff --git a/e2e2/test/cases/neuron/neuron_test.go b/e2e2/test/cases/neuron/neuron_test.go
new file mode 100644
index 000000000..7a86b15b5
--- /dev/null
+++ b/e2e2/test/cases/neuron/neuron_test.go
@@ -0,0 +1,71 @@
+package neuron
+
+import (
+	"context"
+	_ "embed"
+	"fmt"
+	"testing"
+	"time"
+
+	fwext "github.com/aws/aws-k8s-tester/e2e2/internal/framework_extensions"
+	"sigs.k8s.io/e2e-framework/klient/wait"
+	"sigs.k8s.io/e2e-framework/pkg/envconf"
+	"sigs.k8s.io/e2e-framework/pkg/features"
+
+	batchv1 "k8s.io/api/batch/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+var (
+	//go:embed manifests/single-node-test-neuronx.yaml
+	neuronSingleNodeManifest         []byte
+	renderedNeuronSingleNodeManifest []byte
+)
+
+type neuronSingleNodeManifestTplVars struct {
+	NeuronTestImage string
+}
+
+func TestNeuronSingleNode(t *testing.T) {
+	singleNode := features.New("single-node").
+		WithLabel("suite", "neuron").
+		WithLabel("hardware", "neuron").
+		Setup(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
+			if *neuronTestImage == "" {
+				t.Fatal(fmt.Errorf("neuronTestImage must be set to run neuron single node test, use https://github.com/aws/aws-k8s-tester/blob/main/e2e2/test/images/neuron/Dockerfile to build the image and -neuronTestImage to set the image url"))
+			}
+			var err error
+			renderedNeuronSingleNodeManifest, err = fwext.RenderManifests(neuronSingleNodeManifest, neuronSingleNodeManifestTplVars{
+				NeuronTestImage: *neuronTestImage,
+			})
+			if err != nil {
+				t.Fatal(err)
+			}
+			err = fwext.ApplyManifests(cfg.Client().RESTConfig(), renderedNeuronSingleNodeManifest)
+			if err != nil {
+				t.Fatal(err)
+			}
+			return ctx
+		}).
+		Assess("Single node test Job succeeds", func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
+			job := &batchv1.Job{
+				ObjectMeta: metav1.ObjectMeta{Name: "neuronx-single-node", Namespace: "default"},
+			}
+			err := wait.For(fwext.NewConditionExtension(cfg.Client().Resources()).JobSucceeded(job),
+				wait.WithTimeout(time.Minute*20))
+			if err != nil {
+				t.Fatal(err)
+			}
+			return ctx
+		}).
+		Teardown(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
+			err := fwext.DeleteManifests(cfg.Client().RESTConfig(), renderedNeuronSingleNodeManifest)
+			if err != nil {
+				t.Fatal(err)
+			}
+			return ctx
+		}).
+		Feature()
+
+	testenv.Test(t, singleNode)
+}
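The Setup step above fills the {{.NeuronTestImage}} placeholder in the embedded Job manifest through fwext.RenderManifests. Assuming that helper is a thin wrapper over Go's text/template (its implementation is not shown in this diff), the rendering step reduces to roughly the following sketch (it would need "bytes" and "text/template" imported):

// Sketch of the assumed rendering step, not the helper's actual code.
tpl, err := template.New("neuronx-single-node").Parse(string(neuronSingleNodeManifest))
if err != nil {
	t.Fatal(err)
}
var buf bytes.Buffer
if err := tpl.Execute(&buf, neuronSingleNodeManifestTplVars{
	NeuronTestImage: *neuronTestImage, // value of the -neuronTestImage flag
}); err != nil {
	t.Fatal(err)
}
renderedNeuronSingleNodeManifest = buf.Bytes()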
diff --git a/e2e2/test/images/neuron/Dockerfile b/e2e2/test/images/neuron/Dockerfile
new file mode 100644
index 000000000..93e5e980d
--- /dev/null
+++ b/e2e2/test/images/neuron/Dockerfile
@@ -0,0 +1,179 @@
+FROM public.ecr.aws/docker/library/ubuntu:20.04
+
+# Neuron SDK components version numbers
+ARG NEURONX_DISTRIBUTED_VERSION=0.7.0
+ARG NEURONX_CC_VERSION=2.13.72.0
+ARG NEURONX_FRAMEWORK_VERSION=2.1.2.2.1.0
+ARG NEURONX_COLLECTIVES_LIB_VERSION=2.20.22.0-c101c322e
+ARG NEURONX_RUNTIME_LIB_VERSION=2.20.22.0-1b3ca6425
+ARG NEURONX_TOOLS_VERSION=2.17.1.0
+
+ARG PYTHON=python3.10
+ARG PYTHON_VERSION=3.10.12
+ARG PIP=pip3
+ARG OMPI_VERSION=4.1.5
+
+# This arg is required to stop the docker build from waiting on region configuration while installing tz data on Ubuntu 20.04
+ARG DEBIAN_FRONTEND=noninteractive
+
+# Python won't try to write .pyc or .pyo files on the import of source modules.
+# Force stdin, stdout and stderr to be totally unbuffered.
Good for logging +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 +ENV PYTHONIOENCODING=UTF-8 +ENV LANG=C.UTF-8 +ENV LC_ALL=C.UTF-8 +ENV LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:/opt/aws/neuron/lib" +ENV LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:/opt/amazon/efa/lib" +ENV LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:/opt/amazon/efa/lib64" +ENV LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:/opt/amazon/openmpi/lib64" +ENV LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:/usr/local/lib" +ENV PATH /opt/aws/neuron/bin/:$PATH +# ENV SAGEMAKER_TRAINING_MODULE=sagemaker_pytorch_container.training:main +ENV DGLBACKEND=pytorch + +RUN apt-get update \ + && apt-get upgrade -y \ + && apt-get install -y --no-install-recommends \ + build-essential \ + ca-certificates \ + cmake \ + curl \ + emacs \ + git \ + jq \ + libopencv-dev \ + software-properties-common \ + wget \ + unzip \ + vim \ + zlib1g-dev \ + openssl \ + libssl-dev \ + libsqlite3-dev \ + libgdbm-dev \ + libc6-dev \ + libbz2-dev \ + libncurses-dev \ + tk-dev \ + libffi-dev \ + libcap-dev \ + gnupg2 \ + gpg-agent \ + && rm -rf /var/lib/apt/lists/* \ + && apt-get clean + +RUN echo "deb https://apt.repos.neuron.amazonaws.com focal main" > /etc/apt/sources.list.d/neuron.list +RUN wget -qO - https://apt.repos.neuron.amazonaws.com/GPG-PUB-KEY-AMAZON-AWS-NEURON.PUB | apt-key add - + +RUN apt-get update \ + && apt-get install -y \ + aws-neuronx-tools=$NEURONX_TOOLS_VERSION \ + aws-neuronx-collectives=$NEURONX_COLLECTIVES_LIB_VERSION \ + aws-neuronx-runtime-lib=$NEURONX_RUNTIME_LIB_VERSION \ + && rm -rf /var/lib/apt/lists/* \ + && rm -rf /tmp/tmp* \ + && apt-get clean + +# Install Open MPI +RUN mkdir -p /tmp/openmpi \ + && cd /tmp/openmpi \ + && wget --quiet https://download.open-mpi.org/release/open-mpi/v4.1/openmpi-${OMPI_VERSION}.tar.gz \ + && tar zxf openmpi-${OMPI_VERSION}.tar.gz \ + && cd openmpi-${OMPI_VERSION} \ + && ./configure --enable-orterun-prefix-by-default \ + && make -j $(nproc) all \ + && make install \ + && ldconfig \ + && rm -rf /tmp/openmpi + +# install Python +RUN wget -q https://www.python.org/ftp/python/$PYTHON_VERSION/Python-$PYTHON_VERSION.tgz \ + && tar -xzf Python-$PYTHON_VERSION.tgz \ + && cd Python-$PYTHON_VERSION \ + && ./configure --enable-shared --prefix=/usr/local \ + && make -j $(nproc) && make install \ + && cd .. && rm -rf ../Python-$PYTHON_VERSION* \ + && ln -s /usr/local/bin/pip3 /usr/bin/pip \ + && ln -s /usr/local/bin/$PYTHON /usr/local/bin/python \ + && ${PIP} --no-cache-dir install --upgrade \ + pip \ + setuptools + +WORKDIR / + +# The ENV variables declared below are changed in the previous section +# Grouping these ENV variables in the first section causes +# ompi_info to fail. 
This is only observed in CPU containers +ENV PATH="$PATH:/home/.openmpi/bin" +ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/home/.openmpi/lib/" +RUN ompi_info --parsable --all | grep mpi_built_with_cuda_support:value + +RUN ${PIP} install --no-cache-dir -U \ + "bokeh>=2.3,<3" \ + "awscli<2" \ + scipy \ + click \ + "cryptography" \ + psutil==5.6.7 \ + dataset \ + transformers==4.36.2 \ + Pillow + +RUN mkdir -p /etc/pki/tls/certs && cp /etc/ssl/certs/ca-certificates.crt /etc/pki/tls/certs/ca-bundle.crt +RUN ${PIP} config set global.extra-index-url https://pip.repos.neuron.amazonaws.com \ + && ${PIP} install --force-reinstall torch-neuronx==$NEURONX_FRAMEWORK_VERSION --extra-index-url https://pip.repos.neuron.amazonaws.com \ + && ${PIP} install --force-reinstall neuronx-cc==$NEURONX_CC_VERSION --extra-index-url https://pip.repos.neuron.amazonaws.com \ + && ${PIP} install --force-reinstall --no-deps neuronx_distributed==$NEURONX_DISTRIBUTED_VERSION --extra-index-url https://pip.repos.neuron.amazonaws.com + +# attrs, neuronx-cc required: >=19.2.0, sagemaker <24,>=23.1.0 +# protobuf neuronx-cc<4, sagemaker-training >=3.9.2,<=3.20.3 +# awscli 1.25.47 has requirement docutils<0.17,>=0.10 +# etcd for kubernetes installation +# awscli 1.27.127 has requirement rsa<4.8,>=3.1.2, but you have rsa 4.9. +# awscli 1.27.127 requires urllib3 < 1.27, python-etcd requires urllib3 >= 1.7, latest urllib3 release is 2.0.2 +RUN ${PIP} install --no-cache-dir -U \ + "attrs<24,>=23.1.0" \ + "protobuf>=3.18.3,<=3.20.3" \ + "docutils>=0.10,<0.17" \ + "rsa<4.8,>=3.1.2" \ + "urllib3>=1.26.0,<1.27" + +# EFA Installer does apt get. Make sure to run apt update before that +RUN apt-get update +RUN cd $HOME \ + && curl -O https://efa-installer.amazonaws.com/aws-efa-installer-latest.tar.gz \ + && wget https://efa-installer.amazonaws.com/aws-efa-installer.key && gpg --import aws-efa-installer.key \ + && cat aws-efa-installer.key | gpg --fingerprint \ + && wget https://efa-installer.amazonaws.com/aws-efa-installer-latest.tar.gz.sig && gpg --verify ./aws-efa-installer-latest.tar.gz.sig \ + && tar -xf aws-efa-installer-latest.tar.gz \ + && cd aws-efa-installer \ + && ./efa_installer.sh -y -g -d --skip-kmod --skip-limit-conf --no-verify \ + && cd $HOME + + +# Clean up after apt update +RUN rm -rf /var/lib/apt/lists/* \ + && rm -rf /tmp/tmp* \ + && apt-get clean + +# Install some common packages used by training scripts +# torchvision needed for MLP. 
since it depends on torch and torch neuron/torch +# is already installed install it with nodeps +RUN pip3 install --no-cache-dir --no-deps -U \ + torchvision==0.16.* + + +RUN HOME_DIR=/root \ + && curl -o ${HOME_DIR}/oss_compliance.zip https://aws-dlinfra-utilities.s3.amazonaws.com/oss_compliance.zip \ + && unzip ${HOME_DIR}/oss_compliance.zip -d ${HOME_DIR}/ \ + && cp ${HOME_DIR}/oss_compliance/test/testOSSCompliance /usr/local/bin/testOSSCompliance \ + && chmod +x /usr/local/bin/testOSSCompliance \ + && chmod +x ${HOME_DIR}/oss_compliance/generate_oss_compliance.sh \ + && ${HOME_DIR}/oss_compliance/generate_oss_compliance.sh ${HOME_DIR} ${PYTHON} \ + && rm -rf ${HOME_DIR}/oss_compliance* \ + && rm -rf /tmp/tmp* + +RUN curl -o /license.txt https://aws-dlc-licenses.s3.amazonaws.com/pytorch-2.1/license.txt + +COPY e2e2/test/images/neuron/tests ./tests \ No newline at end of file diff --git a/e2e2/test/images/neuron/tests/singleNodeTest.sh b/e2e2/test/images/neuron/tests/singleNodeTest.sh new file mode 100755 index 000000000..9828efa74 --- /dev/null +++ b/e2e2/test/images/neuron/tests/singleNodeTest.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash + +torchrun --nproc_per_node=2 --nnodes=1 tests/testNeuronSingleAllReduce.py +torchrun --nproc_per_node=2 --nnodes=1 tests/testNeuronParallelState.py +torchrun --nproc_per_node=2 --nnodes=1 tests/testNeuronMlp.py \ No newline at end of file diff --git a/e2e2/test/images/neuron/tests/testNeuronMlp.py b/e2e2/test/images/neuron/tests/testNeuronMlp.py new file mode 100644 index 000000000..877cb9936 --- /dev/null +++ b/e2e2/test/images/neuron/tests/testNeuronMlp.py @@ -0,0 +1,104 @@ +# Source: https://github.com/aws/deep-learning-containers/blob/master/test/dlc_tests/container_tests/bin/pytorch_tests/testNeuronMlp +import os +import time +import torch + +from torchvision.datasets import mnist +from torch.utils.data import DataLoader +from torchvision.transforms import ToTensor + +# XLA imports +import torch_xla.core.xla_model as xm + +# XLA imports for parallel loader and multi-processing +import torch_xla.distributed.parallel_loader as pl +from torch.utils.data.distributed import DistributedSampler + +# Initialize XLA process group for torchrun +import torch_xla.distributed.xla_backend +import torch.nn as nn +import torch.nn.functional as F + +torch.distributed.init_process_group('xla') + +# Global constants +EPOCHS = 4 +WARMUP_STEPS = 2 +BATCH_SIZE = 32 + +# Load MNIST train dataset +train_dataset = mnist.MNIST(root=os.path.join('./MNIST_DATA_train', str(xm.get_ordinal())), + train=True, download=True, transform=ToTensor()) + +# Declare 3-layer MLP for MNIST dataset +class MLP(nn.Module): + def __init__(self, input_size = 28 * 28, output_size = 10, layers = [120, 84]): + super(MLP, self).__init__() + self.fc1 = nn.Linear(input_size, layers[0]) + self.fc2 = nn.Linear(layers[0], layers[1]) + self.fc3 = nn.Linear(layers[1], output_size) + + def forward(self, x): + x = F.relu(self.fc1(x)) + x = F.relu(self.fc2(x)) + x = self.fc3(x) + return F.log_softmax(x, dim=1) + + +def main(): + # XLA MP: get world size + world_size = xm.xrt_world_size() + # multi-processing: ensure each worker has same initial weights + torch.manual_seed(0) + + # Move model to device and declare optimizer and loss function + device = 'xla' + model = MLP().to(device) + # For multiprocessing, scale up learning rate + optimizer = torch.optim.SGD(model.parameters(), lr=0.01 * world_size) + loss_fn = torch.nn.NLLLoss() + + # Prepare data loader + train_sampler = None + if world_size > 1: + 
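        # Shard MNIST across replicas: DistributedSampler hands each rank a
        # disjoint subset and takes over shuffling, which is why the DataLoader
        # below sets shuffle=False whenever a sampler is present.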
train_sampler = DistributedSampler(train_dataset, + num_replicas=world_size, + rank=xm.get_ordinal(), + shuffle=True) + train_loader = DataLoader(train_dataset, + batch_size=BATCH_SIZE, + sampler=train_sampler, + shuffle=False if train_sampler else True) + # XLA MP: use MpDeviceLoader from torch_xla.distributed + train_device_loader = pl.MpDeviceLoader(train_loader, device) + + # Run the training loop + print('----------Training ---------------') + model.train() + for epoch in range(EPOCHS): + start = time.time() + for idx, (train_x, train_label) in enumerate(train_device_loader): + optimizer.zero_grad() + train_x = train_x.view(train_x.size(0), -1) + output = model(train_x) + loss = loss_fn(output, train_label) + loss.backward() + xm.optimizer_step(optimizer) # XLA MP: performs grad allreduce and optimizer step + if idx < WARMUP_STEPS: # skip warmup iterations + start = time.time() + + # Compute statistics for the last epoch + interval = idx - WARMUP_STEPS # skip warmup iterations + throughput = interval / (time.time() - start) + print("Train throughput (iter/sec): {}".format(throughput)) + print("Final loss is {:0.4f}".format(loss.detach().to('cpu'))) + + # Save checkpoint for evaluation (xm.save ensures only one process save) + os.makedirs("checkpoints", exist_ok=True) + checkpoint = {'state_dict': model.state_dict()} + xm.save(checkpoint,'checkpoints/checkpoint.pt') + + print('----------End Training ---------------') + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/e2e2/test/images/neuron/tests/testNeuronParallelState.py b/e2e2/test/images/neuron/tests/testNeuronParallelState.py new file mode 100644 index 000000000..adc90bfae --- /dev/null +++ b/e2e2/test/images/neuron/tests/testNeuronParallelState.py @@ -0,0 +1,106 @@ +# Source: https://github.com/aws/deep-learning-containers/blob/master/test/dlc_tests/container_tests/bin/pytorch_tests/testNeuronParallelState +import argparse +import atexit +import json +import os +import traceback +from datetime import datetime + +import torch +import torch_xla.core.xla_model as xm +import torch_xla.debug.metrics as met + +from neuronx_distributed.parallel_layers import parallel_state +from neuronx_distributed.parallel_layers.utils import is_pjrt_device + +datetime_str = str(datetime.now()) + + +results = {"inference_success": 1} + + +def test_initialize_model_parallel(tensor_model_parallel_size): + def _test_initialize_model_parallel(): + if torch.distributed.get_rank() == 0: + print("testing initialize_model_parallel with size {}".format(tensor_model_parallel_size)) + tensor_model_parallel_size_ = min(tensor_model_parallel_size, torch.distributed.get_world_size()) + assert not parallel_state.model_parallel_is_initialized() + parallel_state.initialize_model_parallel(tensor_model_parallel_size=tensor_model_parallel_size_) + assert parallel_state.model_parallel_is_initialized() + + # Checks. + def check(group, world_size, rank): + assert world_size == torch.distributed.get_world_size(group=group) + assert rank == torch.distributed.get_rank(group=group) + + # Model parallel. + world_size = tensor_model_parallel_size_ + rank = torch.distributed.get_rank() % tensor_model_parallel_size_ + assert world_size == parallel_state.get_tensor_model_parallel_size() + assert rank == parallel_state.get_tensor_model_parallel_rank() + check(parallel_state.get_tensor_model_parallel_group(), world_size, rank) + + # Data parallel. 
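        # initialize_model_parallel builds tensor-parallel groups from consecutive
        # global ranks, so the data-parallel coordinates follow from integer
        # division: dp_world_size = world_size // tp_size and dp_rank = rank // tp_size.
        # The assertions below check exactly this mapping.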
+ world_size = torch.distributed.get_world_size() // tensor_model_parallel_size_ + rank = torch.distributed.get_rank() // tensor_model_parallel_size + assert world_size == parallel_state.get_data_parallel_size() + assert rank == parallel_state.get_data_parallel_rank() + check(parallel_state.get_data_parallel_group(), world_size, rank) + + # Reset groups + parallel_state.destroy_model_parallel() + + torch.distributed.barrier() + if torch.distributed.get_rank() == 0: + print("test passed") + + global results + try: + _test_initialize_model_parallel() + except: + results["inference_success"] = 0 + print(traceback.format_exc()) + raise + + +def test_get_tensor_model_parallel_src_rank(tensor_model_parallel_size_): + def _test_get_tensor_model_parallel_src_rank(): + if torch.distributed.get_rank() == 0: + print("testing get_tensor_model_parallel_src_rank with size {}".format(tensor_model_parallel_size_)) + tensor_model_parallel_size = min(tensor_model_parallel_size_, torch.distributed.get_world_size()) + assert not parallel_state.model_parallel_is_initialized() + parallel_state.initialize_model_parallel(tensor_model_parallel_size) + assert parallel_state.model_parallel_is_initialized() + + # Checks + src_rank = torch.distributed.get_rank() - parallel_state.get_tensor_model_parallel_rank() + assert parallel_state.get_tensor_model_parallel_src_rank() == src_rank + + # Reset groups + parallel_state.destroy_model_parallel() + + torch.distributed.barrier() + if torch.distributed.get_rank() == 0: + print("test passed") + + global results + try: + _test_get_tensor_model_parallel_src_rank() + except: + results["inference_success"] = 0 + print(traceback.format_exc()) + raise + + +if __name__ == "__main__": + if is_pjrt_device(): + import torch_xla.experimental.pjrt_backend + torch.distributed.init_process_group("xla", init_method="pjrt://") + else: + torch.distributed.init_process_group("xla") + world_size = xm.xrt_world_size() + tensor_model_parallel_size = 1 + while tensor_model_parallel_size <= world_size: + test_initialize_model_parallel(tensor_model_parallel_size) + test_get_tensor_model_parallel_src_rank(tensor_model_parallel_size) + tensor_model_parallel_size *= 2 \ No newline at end of file diff --git a/e2e2/test/images/neuron/tests/testNeuronSingleAllReduce.py b/e2e2/test/images/neuron/tests/testNeuronSingleAllReduce.py new file mode 100644 index 000000000..3cc3b1a2b --- /dev/null +++ b/e2e2/test/images/neuron/tests/testNeuronSingleAllReduce.py @@ -0,0 +1,30 @@ +# Source: https://github.com/aws/deep-learning-containers/blob/master/test/dlc_tests/container_tests/bin/pytorch_tests/testNeuronSingleAllReduce +import os +import torch_xla.core.xla_model as xm +import torch +import torch_xla.distributed.xla_backend +torch.distributed.init_process_group('xla') +import torch_xla.distributed.xla_multiprocessing as xmp +os.environ["NEURON_RT_EXEC_TIMEOUT"] = "20" +os.environ["NCCL_DEBUG"] = "WARN" +os.environ["NCCL_DEBUG_SUBSYS"] = "ALL" +def _mp_fn(): + world_size = xm.xrt_world_size() + device = xm.xla_device() + rank = xm.get_ordinal() + ones = torch.ones((2, 3)) + xones = ones.to(device) + if world_size > 0: + print("running all reduce") + for i in range(0, 5): + print(f'at iteration {i}, with local rank {rank}', flush=True) + result = xm.all_reduce(xm.REDUCE_SUM, xones) + result_cpu = result.cpu() + #xm.mark_step() + print(result_cpu, flush = True) + expected = torch.ones((2,3))*world_size + assert expected.allclose(result_cpu) + print('PASS') +if __name__ == '__main__': + _mp_fn() + 
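# Note on the assertion in _mp_fn: each of the world_size ranks contributes
# torch.ones((2, 3)) to the sum, so every element of the reduced tensor must
# equal world_size; allclose absorbs floating-point accumulation noise.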
#xmp.spawn(_mp_fn, args=(),nprocs=2, join=True) \ No newline at end of file