Add HPC benchmark to unit test, and add "capacity-reservation" flag to deployer.
weicongw committed Aug 27, 2024
1 parent 6ad7172 commit c03d0ed
Showing 6 changed files with 189 additions and 4 deletions.
54 changes: 54 additions & 0 deletions e2e2/test/cases/nvidia/manifests/job-hpc-benchmarks.yaml
@@ -0,0 +1,54 @@
kind: Job
apiVersion: batch/v1
metadata:
name: hpc-benchmarks-job
labels:
app: hpc-benchmarks-job
spec:
completions: 20
parallelism: 1
template:
metadata:
labels:
app: hpc-benchmarks-job
spec:
volumes:
- name: dshm
emptyDir:
medium: Memory
containers:
- name: hpc-benchmarks
image: "nvcr.io/nvidia/hpc-benchmarks:24.06"
command:
- mpirun
- --allow-run-as-root
- -np
- "{{.GpuPerNode}}"
- -bind-to
- none
- -x
- NCCL_DEBUG=INFO
- -x
- HPL_FCT_COMM_POLICY=1
- -x
- HPL_USE_NVSHMEM=0
- hpl.sh
- --mem-affinity
- 0:0:0:0:1:1:1:1
- --cpu-affinity
- 0-13:14-27:28-41:42-55:56-69:70-83:84-97:98-111
- --no-multinode
- --dat
- hpl-linux-x86_64/sample-dat/HPL-dgx-1N.dat
volumeMounts:
- mountPath: /dev/shm
name: dshm
imagePullPolicy: Always
resources:
limits:
nvidia.com/gpu: {{.GpuPerNode}}
env:
- name: UCX_TLS
value: "^sysv"
restartPolicy: Never
backoffLimit: 4
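The `{{.GpuPerNode}}` placeholders above are Go template fields that the test renders before applying the manifest. A minimal sketch of that rendering step, assuming `fwext.RenderManifests` wraps the standard `text/template` package (this helper is illustrative, not the framework's actual code):

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// renderManifest fills template fields like {{.GpuPerNode}} in a raw
// manifest. Assumed to approximate what fwext.RenderManifests does.
func renderManifest(manifest []byte, vars any) ([]byte, error) {
	tpl, err := template.New("manifest").Parse(string(manifest))
	if err != nil {
		return nil, err
	}
	var buf bytes.Buffer
	if err := tpl.Execute(&buf, vars); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	manifest := []byte("nvidia.com/gpu: {{.GpuPerNode}}")
	out, err := renderManifest(manifest, struct{ GpuPerNode int }{GpuPerNode: 8})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // nvidia.com/gpu: 8
}
```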
4 changes: 2 additions & 2 deletions e2e2/test/cases/nvidia/mpi_test.go
@@ -93,8 +93,8 @@ func TestMPIJobPytorchTraining(t *testing.T) {
}
renderedMpiJobNcclTestMultiNodeManifest, err := fwext.RenderManifests(mpiJobNcclTestMultiNodeManifest, ncclTestManifestTplVars{
// one of the nodes will be used for the master pod
- WorkerNodeCount:    nodeCount - 1,
- WorkerNodeGpuCount: (nodeCount - 1) * gpuPerNode,
+ WorkerNodeCount:    nodeCount,
+ WorkerNodeGpuCount: nodeCount * gpuPerNode,
GpuPerNode: gpuPerNode,
NvidiaTestImage: *nvidiaTestImage,
EfaInterfacePerNode: efaPerNode,
54 changes: 53 additions & 1 deletion e2e2/test/cases/nvidia/unit_test.go
@@ -19,12 +19,19 @@ var (
//go:embed manifests/job-unit-test-single-node.yaml
jobUnitTestSingleNodeManifest []byte
renderedJobUnitTestSingleNodeManifest []byte
//go:embed manifests/job-hpc-benchmarks.yaml
jobHpcBenchmarksSingleNodeManifest []byte
renderedJobHpcBenchmarksSingleNodeManifest []byte
)

type unitTestManifestTplVars struct {
NvidiaTestImage string
}

type hpcTestManifestTplVars struct {
GpuPerNode int
}

func TestSingleNodeUnitTest(t *testing.T) {
unitTest := features.New("unit-test").
WithLabel("suite", "nvidia").
@@ -74,5 +81,50 @@ func TestSingleNodeUnitTest(t *testing.T) {
}).
Feature()

- testenv.Test(t, unitTest)
hpcTest := features.New("hpc-benchmarks").
WithLabel("suite", "nvidia").
WithLabel("hardware", "gpu").
Setup(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
var err error
renderedJobHpcBenchmarksSingleNodeManifest, err = fwext.RenderManifests(jobHpcBenchmarksSingleNodeManifest, hpcTestManifestTplVars{
GpuPerNode: gpuPerNode,
})
if err != nil {
t.Fatal(err)
}
err = fwext.ApplyManifests(cfg.Client().RESTConfig(), renderedJobHpcBenchmarksSingleNodeManifest)
if err != nil {
t.Fatal(err)
}
return ctx
}).
Assess("HPC test Job succeeds", func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{Name: "hpc-benchmarks-job", Namespace: "default"},
}
err := wait.For(fwext.NewConditionExtension(cfg.Client().Resources()).JobSucceeded(job),
wait.WithContext(ctx))
if err != nil {
t.Fatal(err)
}
return ctx
}).
Teardown(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
log, err := fwext.GetJobLogs(cfg.Client().RESTConfig(), &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{Name: "hpc-benchmarks-job", Namespace: "default"},
})
if err != nil {
t.Fatal(err)
}
t.Log("Test log for hpc-benckmarks-job:")
t.Log(log)
err = fwext.DeleteManifests(cfg.Client().RESTConfig(), renderedJobHpcBenchmarksSingleNodeManifest)
if err != nil {
t.Fatal(err)
}
return ctx
}).
Feature()

+ testenv.Test(t, unitTest, hpcTest)
}
1 change: 1 addition & 0 deletions kubetest2/internal/deployers/eksapi/deployer.go
@@ -54,6 +54,7 @@ type deployerOptions struct {
Addons []string `flag:"addons" desc:"Managed addons (name:version pairs) to create in the cluster. Use 'latest' for the most recent version, or 'default' for the default version."`
AMI string `flag:"ami" desc:"AMI for unmanaged nodes"`
AMIType string `flag:"ami-type" desc:"AMI type for managed nodes"`
CapacityReservation bool `flag:"capacity-reservation" desc:"Use capacity reservation for the unmanaged nodegroup"`
ClusterRoleServicePrincipal string `flag:"cluster-role-service-principal" desc:"Additional service principal that can assume the cluster role"`
EFA bool `flag:"efa" desc:"Create EFA interfaces on the node of an unmanaged nodegroup. Requires --unmanaged-nodes."`
EKSEndpointURL string `flag:"endpoint-url" desc:"Endpoint URL for the EKS API"`
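The new `CapacityReservation` option follows the deployer's existing pattern of declaring CLI flags through `flag:"..."` struct tags, so it surfaces as `--capacity-reservation`. A minimal sketch of how such tag-driven flag binding can work, assuming a reflection-based binder (kubetest2's actual binding code may differ):

```go
package main

import (
	"flag"
	"fmt"
	"reflect"
)

// options mirrors the shape of deployerOptions above; only the new
// field is shown. The binder below is illustrative, not kubetest2's.
type options struct {
	CapacityReservation bool `flag:"capacity-reservation" desc:"Use capacity reservation for the unmanaged nodegroup"`
}

// bindFlags registers one CLI flag per struct field, using the
// `flag` tag as the flag name and the `desc` tag as its usage text.
func bindFlags(fs *flag.FlagSet, opts *options) {
	v := reflect.ValueOf(opts).Elem()
	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		name, desc := f.Tag.Get("flag"), f.Tag.Get("desc")
		if f.Type.Kind() == reflect.Bool {
			fs.BoolVar(v.Field(i).Addr().Interface().(*bool), name, false, desc)
		}
	}
}

func main() {
	var opts options
	fs := flag.NewFlagSet("eksapi", flag.ExitOnError)
	bindFlags(fs, &opts)
	fs.Parse([]string{"--capacity-reservation"})
	fmt.Println(opts.CapacityReservation) // true
}
```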
68 changes: 67 additions & 1 deletion kubetest2/internal/deployers/eksapi/nodegroup.go
@@ -249,6 +249,16 @@ func (m *NodegroupManager) createUnmanagedNodegroupWithEFA(infra *Infrastructure
if err != nil {
return err
}
var subnetId, capacityReservationId string
if opts.CapacityReservation {
subnetId, capacityReservationId, err = m.getSubnetWithCapacity(infra, opts)
if err != nil {
return err
}
} else {
subnetId = infra.subnetsPrivate[0]
}

// pull the role name out of the ARN
nodeRoleArnParts := strings.Split(infra.nodeRole, "/")
nodeRoleName := nodeRoleArnParts[len(nodeRoleArnParts)-1]
@@ -267,7 +277,7 @@
},
{
ParameterKey: aws.String("SubnetIds"),
- ParameterValue: aws.String(infra.subnetsPrivate[0]), // this is load bearing! EFA requires a private subnet
+ ParameterValue: aws.String(subnetId), // this is load bearing! EFA requires a private subnet
},
{
ParameterKey: aws.String("UserData"),
@@ -301,6 +311,10 @@
ParameterKey: aws.String("InstanceType"),
ParameterValue: aws.String(opts.InstanceTypes[0]),
},
{
ParameterKey: aws.String("CapacityReservationId"),
ParameterValue: aws.String(capacityReservationId),
},
},
}
out, err := m.clients.CFN().CreateStack(context.TODO(), &input)
@@ -432,3 +446,55 @@ func (m *NodegroupManager) verifyASGAMI(asgName string, amiId string) (bool, err
klog.Infof("ASG instances are using expected AMI: %s", amiId)
return true, nil
}

func (m *NodegroupManager) getSubnetWithCapacity(infra *Infrastructure, opts *deployerOptions) (string, string, error) {
var capacityReservationId string
capacityReservations, err := m.clients.EC2().DescribeCapacityReservations(context.TODO(), &ec2.DescribeCapacityReservationsInput{
Filters: []ec2types.Filter{
{
Name: aws.String("instance-type"),
Values: opts.InstanceTypes,
},
{
Name: aws.String("state"),
Values: []string{"active"},
},
},
})
if err != nil {
return "", "", fmt.Errorf("failed to describe capacity reservation")
}
var az string
for _, cr := range capacityReservations.CapacityReservations {
if *cr.AvailableInstanceCount >= int32(opts.Nodes) {
capacityReservationId = *cr.CapacityReservationId
az = *cr.AvailabilityZone
break
}
}
if capacityReservationId == "" {
return "", "", fmt.Errorf("no capacity reservation found for instance type %s with %d nodes count", opts.InstanceTypes[0], opts.Nodes)
}
klog.Infof("Using capacity reservation: %s", capacityReservationId)
subnet, err := m.clients.EC2().DescribeSubnets(context.TODO(), &ec2.DescribeSubnetsInput{
Filters: []ec2types.Filter{
{
Name: aws.String("availability-zone"),
Values: []string{az},
},
{
Name: aws.String("subnet-id"),
Values: infra.subnetsPrivate,
},
},
})
if err != nil {
return "", "", fmt.Errorf("failed to describe subnet")
}
if subnet == nil || len(subnet.Subnets) == 0 {
return "", "", fmt.Errorf("no subnet found for availability zone %s", az)
}
subnetId := *subnet.Subnets[0].SubnetId
klog.Infof("Using subnet: %s", subnetId)
return subnetId, capacityReservationId, nil
}
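Note that `getSubnetWithCapacity` only consumes an existing reservation: `--capacity-reservation` requires an active capacity reservation matching the requested instance type with at least `opts.Nodes` instances available, and the nodegroup subnet is then chosen from the private subnets in that reservation's availability zone. A sketch of pre-creating such a reservation with the AWS SDK for Go v2 (the instance type, zone, and count below are illustrative values, not defaults from this commit):

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/ec2"
	ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
)

func main() {
	cfg, err := config.LoadDefaultConfig(context.TODO())
	if err != nil {
		log.Fatal(err)
	}
	client := ec2.NewFromConfig(cfg)
	// Create an active reservation that getSubnetWithCapacity can later
	// match on instance type, state, and available instance count.
	out, err := client.CreateCapacityReservation(context.TODO(), &ec2.CreateCapacityReservationInput{
		InstanceType:     aws.String("p5.48xlarge"), // illustrative
		InstancePlatform: ec2types.CapacityReservationInstancePlatformLinuxUnix,
		AvailabilityZone: aws.String("us-west-2a"), // illustrative
		InstanceCount:    aws.Int32(2),             // illustrative
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(*out.CapacityReservation.CapacityReservationId)
}
```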
12 changes: 12 additions & 0 deletions (CloudFormation template for the EFA unmanaged nodegroup)
@@ -40,6 +40,10 @@ Parameters:

UserData:
Type: String

CapacityReservationId:
Type: String
Description: Capacity reservation ID for the unmanaged nodegroup

InstanceType:
Type: String
@@ -51,6 +55,8 @@ Conditions:
!Equals [Ref: InstanceType, p4d.24xlarge]
IsP5Node:
!Equals [Ref: InstanceType, p5.48xlarge]
IsCapacityReservationIdSet:
!Not [!Equals [!Ref CapacityReservationId, ""]]

Resources:
EFASecurityGroup:
@@ -201,6 +207,12 @@ Resources:
DeleteOnTermination: true
VolumeSize: !Ref NodeDiskSize
VolumeType: gp2
CapacityReservationSpecification:
!If
- IsCapacityReservationIdSet
- CapacityReservationTarget:
CapacityReservationId: !Ref CapacityReservationId
- !Ref AWS::NoValue
IamInstanceProfile:
Arn: !GetAtt NodeInstanceProfile.Arn
ImageId: !Ref AMIId
