diff --git a/e2e2/test/cases/nvidia/manifests/job-hpc-benchmarks.yaml b/e2e2/test/cases/nvidia/manifests/job-hpc-benchmarks.yaml
new file mode 100644
index 000000000..ddf285131
--- /dev/null
+++ b/e2e2/test/cases/nvidia/manifests/job-hpc-benchmarks.yaml
@@ -0,0 +1,54 @@
+kind: Job
+apiVersion: batch/v1
+metadata:
+  name: hpc-benchmarks-job
+  labels:
+    app: hpc-benchmarks-job
+spec:
+  completions: 20
+  parallelism: 1
+  template:
+    metadata:
+      labels:
+        app: hpc-benchmarks-job
+    spec:
+      volumes:
+        - name: dshm
+          emptyDir:
+            medium: Memory
+      containers:
+        - name: hpc-benchmarks
+          image: "nvcr.io/nvidia/hpc-benchmarks:24.06"
+          command:
+            - mpirun
+            - --allow-run-as-root
+            - -np
+            - "{{.GpuPerNode}}"
+            - -bind-to
+            - none
+            - -x
+            - NCCL_DEBUG=INFO
+            - -x
+            - HPL_FCT_COMM_POLICY=1
+            - -x
+            - HPL_USE_NVSHMEM=0
+            - hpl.sh
+            - --mem-affinity
+            - 0:0:0:0:1:1:1:1
+            - --cpu-affinity
+            - 0-13:14-27:28-41:42-55:56-69:70-83:84-97:98-111
+            - --no-multinode
+            - --dat
+            - hpl-linux-x86_64/sample-dat/HPL-dgx-1N.dat
+          volumeMounts:
+            - mountPath: /dev/shm
+              name: dshm
+          imagePullPolicy: Always
+          resources:
+            limits:
+              nvidia.com/gpu: {{.GpuPerNode}}
+          env:
+            - name: UCX_TLS
+              value: "^sysv"
+      restartPolicy: Never
+  backoffLimit: 4
diff --git a/e2e2/test/cases/nvidia/mpi_test.go b/e2e2/test/cases/nvidia/mpi_test.go
index d697e47a8..7c40f302c 100644
--- a/e2e2/test/cases/nvidia/mpi_test.go
+++ b/e2e2/test/cases/nvidia/mpi_test.go
@@ -93,8 +93,8 @@ func TestMPIJobPytorchTraining(t *testing.T) {
 	}
 	renderedMpiJobNcclTestMultiNodeManifest, err := fwext.RenderManifests(mpiJobNcclTestMultiNodeManifest, ncclTestManifestTplVars{
 		// one of the nodes will be used for the master pod
-		WorkerNodeCount:     nodeCount - 1,
-		WorkerNodeGpuCount:  (nodeCount - 1) * gpuPerNode,
+		WorkerNodeCount:     nodeCount,
+		WorkerNodeGpuCount:  nodeCount * gpuPerNode,
 		GpuPerNode:          gpuPerNode,
 		NvidiaTestImage:     *nvidiaTestImage,
 		EfaInterfacePerNode: efaPerNode,
diff --git a/e2e2/test/cases/nvidia/unit_test.go b/e2e2/test/cases/nvidia/unit_test.go
index 235cae31f..27704ac05 100644
--- a/e2e2/test/cases/nvidia/unit_test.go
+++ b/e2e2/test/cases/nvidia/unit_test.go
@@ -19,12 +19,19 @@ var (
 	//go:embed manifests/job-unit-test-single-node.yaml
 	jobUnitTestSingleNodeManifest []byte
 	renderedJobUnitTestSingleNodeManifest []byte
+	//go:embed manifests/job-hpc-benchmarks.yaml
+	jobHpcBenchmarksSingleNodeManifest []byte
+	renderedJobHpcBenchmarksSingleNodeManifest []byte
 )
 
 type unitTestManifestTplVars struct {
 	NvidiaTestImage string
 }
 
+type hpcTestManifestTplVars struct {
+	GpuPerNode int
+}
+
 func TestSingleNodeUnitTest(t *testing.T) {
 	unitTest := features.New("unit-test").
 		WithLabel("suite", "nvidia").
@@ -74,5 +81,50 @@ func TestSingleNodeUnitTest(t *testing.T) {
 		}).
 		Feature()
 
-	testenv.Test(t, unitTest)
+	hpcTest := features.New("hpc-benchmarks").
+		WithLabel("suite", "nvidia").
+		WithLabel("hardware", "gpu").
+		Setup(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
+			var err error
+			renderedJobHpcBenchmarksSingleNodeManifest, err = fwext.RenderManifests(jobHpcBenchmarksSingleNodeManifest, hpcTestManifestTplVars{
+				GpuPerNode: gpuPerNode,
+			})
+			if err != nil {
+				t.Fatal(err)
+			}
+			err = fwext.ApplyManifests(cfg.Client().RESTConfig(), renderedJobHpcBenchmarksSingleNodeManifest)
+			if err != nil {
+				t.Fatal(err)
+			}
+			return ctx
+		}).
+		Assess("HPC test Job succeeds", func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
+			job := &batchv1.Job{
+				ObjectMeta: metav1.ObjectMeta{Name: "hpc-benchmarks-job", Namespace: "default"},
+			}
+			err := wait.For(fwext.NewConditionExtension(cfg.Client().Resources()).JobSucceeded(job),
+				wait.WithContext(ctx))
+			if err != nil {
+				t.Fatal(err)
+			}
+			return ctx
+		}).
+		Teardown(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
+			log, err := fwext.GetJobLogs(cfg.Client().RESTConfig(), &batchv1.Job{
+				ObjectMeta: metav1.ObjectMeta{Name: "hpc-benchmarks-job", Namespace: "default"},
+			})
+			if err != nil {
+				t.Fatal(err)
+			}
+			t.Log("Test log for hpc-benchmarks-job:")
+			t.Log(log)
+			err = fwext.DeleteManifests(cfg.Client().RESTConfig(), renderedJobHpcBenchmarksSingleNodeManifest)
+			if err != nil {
+				t.Fatal(err)
+			}
+			return ctx
+		}).
+		Feature()
+
+	testenv.Test(t, unitTest, hpcTest)
 }
diff --git a/kubetest2/internal/deployers/eksapi/deployer.go b/kubetest2/internal/deployers/eksapi/deployer.go
index fe52587d3..cc46021b7 100644
--- a/kubetest2/internal/deployers/eksapi/deployer.go
+++ b/kubetest2/internal/deployers/eksapi/deployer.go
@@ -54,6 +54,7 @@ type deployerOptions struct {
 	Addons                      []string `flag:"addons" desc:"Managed addons (name:version pairs) to create in the cluster. Use 'latest' for the most recent version, or 'default' for the default version."`
 	AMI                         string   `flag:"ami" desc:"AMI for unmanaged nodes"`
 	AMIType                     string   `flag:"ami-type" desc:"AMI type for managed nodes"`
+	CapacityReservation         bool     `flag:"capacity-reservation" desc:"Use capacity reservation for the unmanaged nodegroup"`
 	ClusterRoleServicePrincipal string   `flag:"cluster-role-service-principal" desc:"Additional service principal that can assume the cluster role"`
 	EFA                         bool     `flag:"efa" desc:"Create EFA interfaces on the node of an unmanaged nodegroup. Requires --unmanaged-nodes."`
 	EKSEndpointURL              string   `flag:"endpoint-url" desc:"Endpoint URL for the EKS API"`
diff --git a/kubetest2/internal/deployers/eksapi/nodegroup.go b/kubetest2/internal/deployers/eksapi/nodegroup.go
index 55e73c69c..61d641921 100644
--- a/kubetest2/internal/deployers/eksapi/nodegroup.go
+++ b/kubetest2/internal/deployers/eksapi/nodegroup.go
@@ -249,6 +249,16 @@ func (m *NodegroupManager) createUnmanagedNodegroupWithEFA(infra *Infrastructure
 	if err != nil {
 		return err
 	}
+	var subnetId, capacityReservationId string
+	if opts.CapacityReservation {
+		subnetId, capacityReservationId, err = m.getSubnetWithCapacity(infra, opts)
+		if err != nil {
+			return err
+		}
+	} else {
+		subnetId = infra.subnetsPrivate[0]
+	}
+
 	// pull the role name out of the ARN
 	nodeRoleArnParts := strings.Split(infra.nodeRole, "/")
 	nodeRoleName := nodeRoleArnParts[len(nodeRoleArnParts)-1]
@@ -267,7 +277,7 @@ func (m *NodegroupManager) createUnmanagedNodegroupWithEFA(infra *Infrastructure
 			},
 			{
 				ParameterKey:   aws.String("SubnetIds"),
-				ParameterValue: aws.String(infra.subnetsPrivate[0]), // this is load bearing! EFA requires a private subnet
+				ParameterValue: aws.String(subnetId), // this is load bearing! EFA requires a private subnet
 			},
 			{
 				ParameterKey:   aws.String("UserData"),
@@ -305,6 +315,10 @@ func (m *NodegroupManager) createUnmanagedNodegroupWithEFA(infra *Infrastructure
 				ParameterKey:   aws.String("InstanceType"),
 				ParameterValue: aws.String(opts.InstanceTypes[0]),
 			},
+			{
+				ParameterKey:   aws.String("CapacityReservationId"),
+				ParameterValue: aws.String(capacityReservationId),
+			},
 		},
 	}
 	out, err := m.clients.CFN().CreateStack(context.TODO(), &input)
@@ -436,3 +450,55 @@ func (m *NodegroupManager) verifyASGAMI(asgName string, amiId string) (bool, err
 	klog.Infof("ASG instances are using expected AMI: %s", amiId)
 	return true, nil
 }
+
+func (m *NodegroupManager) getSubnetWithCapacity(infra *Infrastructure, opts *deployerOptions) (string, string, error) {
+	var capacityReservationId string
+	capacityReservations, err := m.clients.EC2().DescribeCapacityReservations(context.TODO(), &ec2.DescribeCapacityReservationsInput{
+		Filters: []ec2types.Filter{
+			{
+				Name:   aws.String("instance-type"),
+				Values: opts.InstanceTypes,
+			},
+			{
+				Name:   aws.String("state"),
+				Values: []string{"active"},
+			},
+		},
+	})
+	if err != nil {
+		return "", "", fmt.Errorf("failed to describe capacity reservations: %v", err)
+	}
+	var az string
+	for _, cr := range capacityReservations.CapacityReservations {
+		if *cr.AvailableInstanceCount >= int32(opts.Nodes) {
+			capacityReservationId = *cr.CapacityReservationId
+			az = *cr.AvailabilityZone
+			break
+		}
+	}
+	if capacityReservationId == "" {
+		return "", "", fmt.Errorf("no active capacity reservation found for instance type %s with capacity for %d nodes", opts.InstanceTypes[0], opts.Nodes)
+	}
+	klog.Infof("Using capacity reservation: %s", capacityReservationId)
+	subnet, err := m.clients.EC2().DescribeSubnets(context.TODO(), &ec2.DescribeSubnetsInput{
+		Filters: []ec2types.Filter{
+			{
+				Name:   aws.String("availability-zone"),
+				Values: []string{az},
+			},
+			{
+				Name:   aws.String("subnet-id"),
+				Values: infra.subnetsPrivate,
+			},
+		},
+	})
+	if err != nil {
+		return "", "", fmt.Errorf("failed to describe subnets: %v", err)
+	}
+	if subnet == nil || len(subnet.Subnets) == 0 {
+		return "", "", fmt.Errorf("no subnet found for availability zone %s", az)
+	}
+	subnetId := *subnet.Subnets[0].SubnetId
+	klog.Infof("Using subnet: %s", subnetId)
+	return subnetId, capacityReservationId, nil
+}
diff --git a/kubetest2/internal/deployers/eksapi/templates/unmanaged-nodegroup-efa.yaml b/kubetest2/internal/deployers/eksapi/templates/unmanaged-nodegroup-efa.yaml
index bfd5cd382..36e800735 100644
--- a/kubetest2/internal/deployers/eksapi/templates/unmanaged-nodegroup-efa.yaml
+++ b/kubetest2/internal/deployers/eksapi/templates/unmanaged-nodegroup-efa.yaml
@@ -40,6 +40,10 @@ Parameters:
   UserData:
     Type: String
+
+  CapacityReservationId:
+    Type: String
+    Description: Capacity reservation ID for the unmanaged nodegroup
 
   UserDataIsMIMEPart:
     Description: "User data should be embedded as a part of a multi-part MIME document"
@@ -57,6 +61,8 @@ Conditions:
     !Equals [Ref: InstanceType, p4d.24xlarge]
   IsP5Node:
     !Equals [Ref: InstanceType, p5.48xlarge]
+  IsCapacityReservationIdSet:
+    !Not [!Equals [!Ref CapacityReservationId, ""]]
   IsUserDataMIMEPart:
     !Equals [true, !Ref UserDataIsMIMEPart]
@@ -209,6 +215,12 @@ Resources:
             DeleteOnTermination: true
             VolumeSize: !Ref NodeDiskSize
             VolumeType: gp2
+        CapacityReservationSpecification:
+          !If
+          - IsCapacityReservationIdSet
+          - CapacityReservationTarget:
+              CapacityReservationId: !Ref CapacityReservationId
+          - !Ref AWS::NoValue
         IamInstanceProfile:
           Arn: !GetAtt NodeInstanceProfile.Arn
         ImageId: !Ref AMIId