Reproduce the missing nodeSelector issue on the PyTorchJob
Signed-off-by: tenzen-y <[email protected]>
tenzen-y committed Dec 7, 2023
1 parent 006d825 commit d9cb9a7
Showing 5 changed files with 48 additions and 1 deletion.
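
In short, this commit wires the Kubeflow Training Operator CRDs into the e2e setup (a new kf-training-operator-crd prerequisite on the test-e2e Makefile target, plus a kubectl apply -k dep-crds/training-operator/ call in hack/e2e-test.sh), adds an Image helper to the PyTorchJob test wrapper, and adds a focused e2e spec. The spec creates a PyTorchJob, expects Kueue to unsuspend it with the nodeSelector instance-type: on-demand on both the Master and Worker pod templates, and waits for the resulting workload to be admitted and to finish.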
2 changes: 1 addition & 1 deletion Makefile
@@ -164,7 +164,7 @@ test-integration: gomod-download envtest ginkgo mpi-operator-crd ray-operator-cr

CREATE_KIND_CLUSTER ?= true
.PHONY: test-e2e
-test-e2e: kustomize ginkgo run-test-e2e-$(E2E_KIND_VERSION:kindest/node:v%=%)
+test-e2e: kustomize ginkgo kf-training-operator-crd run-test-e2e-$(E2E_KIND_VERSION:kindest/node:v%=%)

E2E_TARGETS := $(addprefix run-test-e2e-,${E2E_K8S_VERSIONS})
.PHONY: test-e2e-all
1 change: 1 addition & 0 deletions hack/e2e-test.sh
@@ -51,6 +51,7 @@ function startup {
kubectl get nodes > $ARTIFACTS/kind-nodes.log || true
kubectl describe pods -n kube-system > $ARTIFACTS/kube-system-pods.log || true
fi
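# Install the Kubeflow Training Operator CRDs so the e2e suite can create PyTorchJobs.
# dep-crds/training-operator/ is presumably populated by the new kf-training-operator-crd Makefile target.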
kubectl apply -k dep-crds/training-operator/
}

function kind_load {
8 changes: 8 additions & 0 deletions pkg/util/testingjobs/pytorchjob/wrappers_pytorchjob.go
@@ -82,6 +82,14 @@ func MakePyTorchJob(name, ns string) *PyTorchJobWrapper {
}}
}

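// Image sets the given container image and args on both the Master and Worker replica containers.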
func (j *PyTorchJobWrapper) Image(img string, args []string) *PyTorchJobWrapper {
j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Image = img
j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Args = args
j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Image = img
j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Args = args
return j
}

// PriorityClass updates job priorityclass.
func (j *PyTorchJobWrapper) PriorityClass(pc string) *PyTorchJobWrapper {
if j.Spec.RunPolicy.SchedulingPolicy == nil {
34 changes: 34 additions & 0 deletions test/e2e/e2e_test.go
@@ -19,6 +19,7 @@ package e2e
import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"
@@ -29,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
testingpytorchjob "sigs.k8s.io/kueue/pkg/util/testingjobs/pytorchjob"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
visibility "sigs.k8s.io/kueue/apis/visibility/v1alpha1"
@@ -593,6 +595,26 @@ var _ = ginkgo.Describe("Kueue", func() {
}, util.LongTimeout, util.Interval).Should(gomega.BeTrue())
})

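// Note that ginkgo.FIt marks this spec as focused, so only focused specs run while it is present.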
ginkgo.FIt("Should unsuspend a PyTorchJob and set nodeSelectors", func() {
// Use a binary that ends.
sampleJob := testingpytorchjob.MakePyTorchJob("test-job", ns.Name).
Image("gcr.io/k8s-staging-perf-tests/sleep:v0.0.3", []string{"2s"}).Obj()
gomega.Expect(k8sClient.Create(ctx, sampleJob)).Should(gomega.Succeed())

createdWorkload := &kueue.Workload{}
expectPyTorchJobUnsuspendedWithNodeSelectors(jobKey, map[string]string{
"instance-type": "on-demand",
})
wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(jobKey.Name), Namespace: ns.Name}
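// The job runs a short-lived sleep binary, so the workload is expected to get a quota reservation and then finish.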
gomega.Eventually(func() bool {
if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil {
return false
}
return workload.HasQuotaReservation(createdWorkload) &&
apimeta.IsStatusConditionTrue(createdWorkload.Status.Conditions, kueue.WorkloadFinished)
}, util.LongTimeout, util.Interval).Should(gomega.BeTrue())
})

ginkgo.It("Should readmit preempted job with priorityClass into a separate flavor", func() {
gomega.Expect(k8sClient.Create(ctx, sampleJob)).Should(gomega.Succeed())

@@ -881,6 +903,18 @@ func expectJobUnsuspendedWithNodeSelectors(key types.NamespacedName, ns map[stri
}, util.Timeout, util.Interval).Should(gomega.Equal([]any{false, ns}))
}

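// expectPyTorchJobUnsuspendedWithNodeSelectors asserts that the PyTorchJob is unsuspended and that the
// expected nodeSelector is set on both the Master and Worker pod templates.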
func expectPyTorchJobUnsuspendedWithNodeSelectors(key types.NamespacedName, ns map[string]string) {
job := &kftraining.PyTorchJob{}
gomega.EventuallyWithOffset(1, func() []any {
gomega.Expect(k8sClient.Get(ctx, key, job)).To(gomega.Succeed())
return []any{
*job.Spec.RunPolicy.Suspend,
job.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeMaster].Template.Spec.NodeSelector,
job.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeWorker].Template.Spec.NodeSelector,
}
}, util.Timeout, util.Interval).Should(gomega.Equal([]any{false, ns, ns}))
}

func defaultOwnerReferenceForJob(name string) []metav1.OwnerReference {
return []metav1.OwnerReference{
{
4 changes: 4 additions & 0 deletions test/e2e/suite_test.go
@@ -19,6 +19,7 @@ package e2e
import (
"context"
"fmt"
kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"os"
"testing"
"time"
@@ -73,6 +74,9 @@ func CreateClientUsingCluster() client.Client {
err = kueue.AddToScheme(scheme.Scheme)
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())

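// Register the Kubeflow Training Operator types so the e2e client can work with PyTorchJobs.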
err = kftraining.AddToScheme(scheme.Scheme)
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())

err = visibility.AddToScheme(scheme.Scheme)
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())

