Add hpc benchmark to unit test, and add "capacity-reservation" flag to deployer #470

Merged
2 commits merged on Aug 30, 2024
Changes from 1 commit
54 changes: 54 additions & 0 deletions e2e2/test/cases/nvidia/manifests/job-hpc-benchmarks.yaml
@@ -0,0 +1,54 @@
kind: Job
apiVersion: batch/v1
metadata:
  name: hpc-benckmarks-job
  labels:
    app: hpc-benckmarks-job
spec:
  completions: 20
  parallelism: 1
  template:
    metadata:
      labels:
        app: hpc-benckmarks-job
    spec:
      volumes:
        - name: dshm
          emptyDir:
            medium: Memory
      containers:
        - name: hpc-benchmarks
          image: "nvcr.io/nvidia/hpc-benchmarks:24.06"
          command:
            - mpirun
            - --allow-run-as-root
            - -np
            - "{{.GpuPerNode}}"
            - -bind-to
            - none
            - -x
            - NCCL_DEBUG=INFO
            - -x
            - HPL_FCT_COMM_POLICY=1
            - -x
            - HPL_USE_NVSHMEM=0
            - hpl.sh
            - --mem-affinity
            - 0:0:0:0:1:1:1:1
            - --cpu-affinity
            - 0-13:14-27:28-41:42-55:56-69:70-83:84-97:98-111
            - --no-multinode
            - --dat
            - hpl-linux-x86_64/sample-dat/HPL-dgx-1N.dat
          volumeMounts:
            - mountPath: /dev/shm
              name: dshm
          imagePullPolicy: Always
          resources:
            limits:
              nvidia.com/gpu: {{.GpuPerNode}}
          env:
            - name: UCX_TLS
              value: "^sysv"
      restartPolicy: Never
  backoffLimit: 4
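
Note: the {{.GpuPerNode}} placeholders above are Go template fields that the test fills in before applying the manifest (see unit_test.go below). A minimal sketch of that substitution using the standard text/template package — the trimmed-down manifest string and the value 8 are assumptions for illustration, not the suite's actual fwext.RenderManifests code:

package main

import (
	"os"
	"text/template"
)

func main() {
	// Trimmed-down stand-in for the job manifest above; only the GPU limit is templated here.
	manifest := "resources:\n  limits:\n    nvidia.com/gpu: {{.GpuPerNode}}\n"
	tpl := template.Must(template.New("job").Parse(manifest))
	// GpuPerNode: 8 is an assumed example value; the tests discover the real value at runtime.
	if err := tpl.Execute(os.Stdout, struct{ GpuPerNode int }{GpuPerNode: 8}); err != nil {
		panic(err)
	}
	// Prints:
	// resources:
	//   limits:
	//     nvidia.com/gpu: 8
}
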
4 changes: 2 additions & 2 deletions e2e2/test/cases/nvidia/mpi_test.go
@@ -93,8 +93,8 @@ func TestMPIJobPytorchTraining(t *testing.T) {
	}
	renderedMpiJobNcclTestMultiNodeManifest, err := fwext.RenderManifests(mpiJobNcclTestMultiNodeManifest, ncclTestManifestTplVars{
		// one of the nodes will be used for the master pod
		WorkerNodeCount:     nodeCount - 1,
		WorkerNodeGpuCount:  (nodeCount - 1) * gpuPerNode,
		WorkerNodeCount:     nodeCount,
		WorkerNodeGpuCount:  nodeCount * gpuPerNode,
		GpuPerNode:          gpuPerNode,
		NvidiaTestImage:     *nvidiaTestImage,
		EfaInterfacePerNode: efaPerNode,
54 changes: 53 additions & 1 deletion e2e2/test/cases/nvidia/unit_test.go
@@ -19,12 +19,19 @@ var (
	//go:embed manifests/job-unit-test-single-node.yaml
	jobUnitTestSingleNodeManifest              []byte
	renderedJobUnitTestSingleNodeManifest      []byte
	//go:embed manifests/job-hpc-benchmarks.yaml
	jobHpcBenchmarksSingleNodeManifest         []byte
	renderedJobHpcBenchmarksSingleNodeManifest []byte
)

type unitTestManifestTplVars struct {
	NvidiaTestImage string
}

type hpcTestManifestTplVars struct {
	GpuPerNode int
}

func TestSingleNodeUnitTest(t *testing.T) {
	unitTest := features.New("unit-test").
		WithLabel("suite", "nvidia").
@@ -74,5 +81,50 @@ func TestSingleNodeUnitTest(t *testing.T) {
		}).
		Feature()

	testenv.Test(t, unitTest)
	hpcTest := features.New("hpc-benckmarks").
		WithLabel("suite", "nvidia").
		WithLabel("hardware", "gpu").
		Setup(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
			var err error
			renderedJobHpcBenchmarksSingleNodeManifest, err = fwext.RenderManifests(jobHpcBenchmarksSingleNodeManifest, hpcTestManifestTplVars{
				GpuPerNode: gpuPerNode,
			})
			if err != nil {
				t.Fatal(err)
			}
			err = fwext.ApplyManifests(cfg.Client().RESTConfig(), renderedJobHpcBenchmarksSingleNodeManifest)
			if err != nil {
				t.Fatal(err)
			}
			return ctx
		}).
		Assess("HPC test Job succeeds", func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
			job := &batchv1.Job{
				ObjectMeta: metav1.ObjectMeta{Name: "hpc-benckmarks-job", Namespace: "default"},
			}
			err := wait.For(fwext.NewConditionExtension(cfg.Client().Resources()).JobSucceeded(job),
				wait.WithContext(ctx))
			if err != nil {
				t.Fatal(err)
			}
			return ctx
		}).
		Teardown(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
			log, err := fwext.GetJobLogs(cfg.Client().RESTConfig(), &batchv1.Job{
				ObjectMeta: metav1.ObjectMeta{Name: "hpc-benckmarks-job", Namespace: "default"},
			})
			if err != nil {
				t.Fatal(err)
			}
			t.Log("Test log for hpc-benckmarks-job:")
			t.Log(log)
			err = fwext.DeleteManifests(cfg.Client().RESTConfig(), renderedJobHpcBenchmarksSingleNodeManifest)
			if err != nil {
				t.Fatal(err)
			}
			return ctx
		}).
		Feature()

	testenv.Test(t, unitTest, hpcTest)
}
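
The gpuPerNode value used when rendering the HPC manifest is defined elsewhere in this package. As a rough illustration only — the package clause, helper name, and approach below are assumptions, not this suite's actual discovery code — it could be read from the nvidia.com/gpu capacity that the NVIDIA device plugin advertises on each node:

package nvidia

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/e2e-framework/pkg/envconf"
)

// discoverGpuPerNode is a hypothetical helper: it lists nodes and returns the
// nvidia.com/gpu capacity of the first GPU node it finds.
func discoverGpuPerNode(ctx context.Context, cfg *envconf.Config) (int, error) {
	var nodes corev1.NodeList
	if err := cfg.Client().Resources().List(ctx, &nodes); err != nil {
		return 0, err
	}
	for _, node := range nodes.Items {
		if qty, ok := node.Status.Capacity["nvidia.com/gpu"]; ok && qty.Value() > 0 {
			return int(qty.Value()), nil
		}
	}
	return 0, fmt.Errorf("no nodes with nvidia.com/gpu capacity found")
}
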
1 change: 1 addition & 0 deletions kubetest2/internal/deployers/eksapi/deployer.go
@@ -54,6 +54,7 @@ type deployerOptions struct {
	Addons                      []string `flag:"addons" desc:"Managed addons (name:version pairs) to create in the cluster. Use 'latest' for the most recent version, or 'default' for the default version."`
	AMI                         string   `flag:"ami" desc:"AMI for unmanaged nodes"`
	AMIType                     string   `flag:"ami-type" desc:"AMI type for managed nodes"`
	CapacityReservation         bool     `flag:"capacity-reservation" desc:"Use capacity reservation for the unmanaged nodegroup"`
	ClusterRoleServicePrincipal string   `flag:"cluster-role-service-principal" desc:"Additional service principal that can assume the cluster role"`
	EFA                         bool     `flag:"efa" desc:"Create EFA interfaces on the node of an unmanaged nodegroup. Requires --unmanaged-nodes."`
	EKSEndpointURL              string   `flag:"endpoint-url" desc:"Endpoint URL for the EKS API"`
68 changes: 67 additions & 1 deletion kubetest2/internal/deployers/eksapi/nodegroup.go
@@ -249,6 +249,16 @@ func (m *NodegroupManager) createUnmanagedNodegroupWithEFA(infra *Infrastructure
	if err != nil {
		return err
	}
	var subnetId, capacityReservationId string
	if opts.CapacityReservation {
		subnetId, capacityReservationId, err = m.getSubnetWithCapacity(infra, opts)
		if err != nil {
			return err
		}
	} else {
		subnetId = infra.subnetsPrivate[0]
	}

	// pull the role name out of the ARN
	nodeRoleArnParts := strings.Split(infra.nodeRole, "/")
	nodeRoleName := nodeRoleArnParts[len(nodeRoleArnParts)-1]
@@ -267,7 +277,7 @@ func (m *NodegroupManager) createUnmanagedNodegroupWithEFA(infra *Infrastructure
			},
			{
				ParameterKey:   aws.String("SubnetIds"),
				ParameterValue: aws.String(infra.subnetsPrivate[0]), // this is load bearing! EFA requires a private subnet
				ParameterValue: aws.String(subnetId), // this is load bearing! EFA requires a private subnet
			},
			{
				ParameterKey: aws.String("UserData"),
@@ -301,6 +311,10 @@ func (m *NodegroupManager) createUnmanagedNodegroupWithEFA(infra *Infrastructure
				ParameterKey:   aws.String("InstanceType"),
				ParameterValue: aws.String(opts.InstanceTypes[0]),
			},
			{
				ParameterKey:   aws.String("CapacityReservationId"),
				ParameterValue: aws.String(capacityReservationId),
			},
		},
	}
	out, err := m.clients.CFN().CreateStack(context.TODO(), &input)
@@ -432,3 +446,55 @@ func (m *NodegroupManager) verifyASGAMI(asgName string, amiId string) (bool, err
	klog.Infof("ASG instances are using expected AMI: %s", amiId)
	return true, nil
}

func (m *NodegroupManager) getSubnetWithCapacity(infra *Infrastructure, opts *deployerOptions) (string, string, error) {
	var capacityReservationId string
	capacityReservations, err := m.clients.EC2().DescribeCapacityReservations(context.TODO(), &ec2.DescribeCapacityReservationsInput{
		Filters: []ec2types.Filter{
			{
				Name:   aws.String("instance-type"),
				Values: opts.InstanceTypes,
			},
			{
				Name:   aws.String("state"),
				Values: []string{"active"},
			},
		},
	})
	if err != nil {
		return "", "", fmt.Errorf("failed to describe capacity reservation")
	}
	var az string
	for _, cr := range capacityReservations.CapacityReservations {
		if *cr.AvailableInstanceCount >= int32(opts.Nodes) {
			capacityReservationId = *cr.CapacityReservationId
			az = *cr.AvailabilityZone
			break
		}
	}
	if capacityReservationId == "" {
		return "", "", fmt.Errorf("no capacity reservation found for instance type %s with %d nodes count", opts.InstanceTypes[0], opts.Nodes)
	}
	klog.Infof("Using capacity reservation: %s", capacityReservationId)
	subnet, err := m.clients.EC2().DescribeSubnets(context.TODO(), &ec2.DescribeSubnetsInput{
		Filters: []ec2types.Filter{
			{
				Name:   aws.String("availability-zone"),
				Values: []string{az},
			},
			{
				Name:   aws.String("subnet-id"),
				Values: infra.subnetsPrivate,
			},
		},
	})
	if err != nil {
		return "", "", fmt.Errorf("failed to describe subnet")
	}
	if subnet == nil || len(subnet.Subnets) == 0 {
		return "", "", fmt.Errorf("no subnet found for availability zone %s", az)
	}
	subnetId := *subnet.Subnets[0].SubnetId
	klog.Infof("Using subnet: %s", subnetId)
	return subnetId, capacityReservationId, nil
}
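
The selection logic above takes the first active reservation whose AvailableInstanceCount covers the requested node count, then reuses its availability zone to pick a matching private subnet. A self-contained sketch of just that selection rule — the pickReservation helper, the cr-* IDs, and the availability zones are illustrative, not part of this PR:

package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
)

// pickReservation mirrors the loop above: the first reservation (already filtered to
// state=active by the DescribeCapacityReservations call) with enough free capacity wins.
func pickReservation(reservations []ec2types.CapacityReservation, nodes int) (id, az string, ok bool) {
	for _, cr := range reservations {
		if *cr.AvailableInstanceCount >= int32(nodes) {
			return *cr.CapacityReservationId, *cr.AvailabilityZone, true
		}
	}
	return "", "", false
}

func main() {
	reservations := []ec2types.CapacityReservation{
		{CapacityReservationId: aws.String("cr-toosmall"), AvailabilityZone: aws.String("us-west-2a"), AvailableInstanceCount: aws.Int32(1)},
		{CapacityReservationId: aws.String("cr-bigenough"), AvailabilityZone: aws.String("us-west-2b"), AvailableInstanceCount: aws.Int32(4)},
	}
	id, az, ok := pickReservation(reservations, 2)
	fmt.Println(id, az, ok) // cr-bigenough us-west-2b true
}
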
@@ -40,6 +40,10 @@ Parameters:

  UserData:
    Type: String

  CapacityReservationId:
    Type: String
    Description: Capacity reservation id for the unmanaged nodegroup

  InstanceType:
    Type: String
@@ -51,6 +55,8 @@ Conditions:
    !Equals [Ref: InstanceType, p4d.24xlarge]
  IsP5Node:
    !Equals [Ref: InstanceType, p5.48xlarge]
  IsCapacityReservationIdSet:
    !Not [!Equals [!Ref CapacityReservationId, ""]]

Resources:
  EFASecurityGroup:
@@ -201,6 +207,12 @@ Resources:
              DeleteOnTermination: true
              VolumeSize: !Ref NodeDiskSize
              VolumeType: gp2
        CapacityReservationSpecification:
          !If
            - IsCapacityReservationIdSet
            - CapacityReservationTarget:
                CapacityReservationId: !Ref CapacityReservationId
            - !Ref AWS::NoValue
        IamInstanceProfile:
          Arn: !GetAtt NodeInstanceProfile.Arn
        ImageId: !Ref AMIId