[DO NOT MERGE] Reproduce missing the nodeSelector issue on the PyTorchJob #1422

Closed
4 changes: 4 additions & 0 deletions hack/e2e-test.sh
@@ -63,8 +63,12 @@ function kind_load {
}

function kueue_deploy {
+  kubectl apply -k github.com/kubeflow/training-operator/manifests/base/crds?ref=master
  (cd config/components/manager && $KUSTOMIZE edit set image controller=$IMAGE_TAG)
  kubectl apply --server-side -k test/e2e/config
+  kubectl wait -n kueue-system --for=condition=available deployment/kueue-controller-manager --timeout=120s
+  sleep 30
+  kubectl apply -f site/static/examples/jobs/sample-pytorchjob.yaml
}

trap cleanup EXIT
@@ -62,11 +62,10 @@ func (j *KubeflowJob) RunWithPodSetsInfo(podSetsInfo []podset.PodSetInfo) error
	for index := range podSetsInfo {
		replicaType := orderedReplicaTypes[index]
		info := podSetsInfo[index]
-		replica := &j.KFJobControl.ReplicaSpecs()[replicaType].Template
-		if err := podset.Merge(&replica.ObjectMeta, &replica.Spec, info); err != nil {
+		err := podset.Merge(&j.KFJobControl.ReplicaSpecs()[replicaType].Template.ObjectMeta, &j.KFJobControl.ReplicaSpecs()[replicaType].Template.Spec, info)
+		if err != nil {
			return err
		}
-
	}
	return nil
}
@@ -76,8 +75,7 @@ func (j *KubeflowJob) RestorePodSetsInfo(podSetsInfo []podset.PodSetInfo) bool {
	changed := false
	for index, info := range podSetsInfo {
		replicaType := orderedReplicaTypes[index]
-		replica := &j.KFJobControl.ReplicaSpecs()[replicaType].Template
-		changed = podset.RestorePodSpec(&replica.ObjectMeta, &replica.Spec, info) || changed
+		changed = podset.RestorePodSpec(&j.KFJobControl.ReplicaSpecs()[replicaType].Template.ObjectMeta, &j.KFJobControl.ReplicaSpecs()[replicaType].Template.Spec, info) || changed
	}
	return changed
}
8 changes: 8 additions & 0 deletions pkg/util/testingjobs/pytorchjob/wrappers_pytorchjob.go
@@ -82,6 +82,14 @@ func MakePyTorchJob(name, ns string) *PyTorchJobWrapper {
	}}
}

+func (j *PyTorchJobWrapper) Image(img string, args []string) *PyTorchJobWrapper {
+	j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Image = img
+	j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Args = args
+	j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Image = img
+	j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Args = args
+	return j
+}
+
// PriorityClass updates job priorityclass.
func (j *PyTorchJobWrapper) PriorityClass(pc string) *PyTorchJobWrapper {
	if j.Spec.RunPolicy.SchedulingPolicy == nil {
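The new Image helper is meant to chain with the existing PyTorchJobWrapper methods when constructing a PyTorchJob in tests. As a reference, the sketch below mirrors the commented-out snippet in test/e2e/e2e_test.go later in this PR; it assumes the surrounding e2e test context (ctx, k8sClient, ns, localQueue) and the pre-existing Queue, Request, and Obj wrapper methods.

	// Usage sketch (lifted from the commented-out e2e snippet in this PR):
	// build a small PyTorchJob that terminates quickly and submit it to the local queue.
	sampleJob := testingpytorchjob.MakePyTorchJob("test-job", ns.Name).
		Image("gcr.io/k8s-staging-perf-tests/sleep:v0.0.3", []string{"2s"}).
		Queue(localQueue.Name).
		Request(kftraining.PyTorchJobReplicaTypeMaster, corev1.ResourceCPU, "0.3").
		Request(kftraining.PyTorchJobReplicaTypeWorker, corev1.ResourceCPU, "0.3").
		Obj()
	gomega.Expect(k8sClient.Create(ctx, sampleJob)).Should(gomega.Succeed())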
2 changes: 1 addition & 1 deletion site/static/examples/jobs/sample-pytorchjob.yaml
@@ -4,7 +4,7 @@ metadata:
  name: pytorch-simple
  namespace: default
  labels:
-    kueue.x-k8s.io/queue-name: user-queue
+    kueue.x-k8s.io/queue-name: main
spec:
  pytorchReplicaSpecs:
    Master:
8 changes: 7 additions & 1 deletion test/e2e/config/kustomization.yaml
@@ -5,4 +5,10 @@ resources:
- ../../../config/default

patches:
-- path: manager_e2e_patch.yaml
+- path: manager_e2e_patch.yaml
+  target:
+    group: apps
+    version: v1
+    kind: Deployment
+    name: kueue-controller-manager
+    namespace: kueue-system
19 changes: 6 additions & 13 deletions test/e2e/config/manager_e2e_patch.yaml
@@ -1,13 +1,6 @@
-apiVersion: apps/v1
-kind: Deployment
-metadata:
-  name: controller-manager
-  namespace: system
-spec:
-  template:
-    spec:
-      containers:
-      - name: manager
-        imagePullPolicy: IfNotPresent
-        args:
-        - "--feature-gates=VisibilityOnDemand=true"
+- op: replace
+  path: /spec/template/spec/containers/0/imagePullPolicy
+  value: IfNotPresent
+- op: add
+  path: /spec/template/spec/containers/0/args/-
+  value: --feature-gates=VisibilityOnDemand=true
40 changes: 39 additions & 1 deletion test/e2e/e2e_test.go
@@ -19,6 +19,7 @@ package e2e
import (
	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
+	kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"
	batchv1 "k8s.io/api/batch/v1"
@@ -31,7 +32,6 @@ import (
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/utils/ptr"
	"sigs.k8s.io/controller-runtime/pkg/client"
-
	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
	visibility "sigs.k8s.io/kueue/apis/visibility/v1alpha1"
	workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
@@ -686,6 +686,32 @@ var _ = ginkgo.Describe("Kueue", func() {
		}, util.LongTimeout, util.Interval).Should(gomega.BeTrue())
	})

+	ginkgo.FIt("Should unsuspend a PyTorchJob and set nodeSelectors", func() {
+		// Use a binary that ends.
+		//sampleJob := testingpytorchjob.MakePyTorchJob("test-job", ns.Name).
+		//	Image("gcr.io/k8s-staging-perf-tests/sleep:v0.0.3", []string{"2s"}).
+		//	Queue(localQueue.Name).
+		//	Request(kftraining.PyTorchJobReplicaTypeMaster, corev1.ResourceCPU, "0.3").
+		//	Request(kftraining.PyTorchJobReplicaTypeWorker, corev1.ResourceCPU, "0.3").
+		//	Obj()
+		//gomega.Expect(k8sClient.Create(ctx, sampleJob)).Should(gomega.Succeed())
+
+		//time.Sleep(time.Minute * 100)
+		createdWorkload := &kueue.Workload{}
+		jobKey = types.NamespacedName{Name: "pytorch-simple", Namespace: "default"}
+		expectPyTorchJobUnsuspendedWithNodeSelectors(jobKey, map[string]string{
+			"instance-type": "on-demand",
+		})
+		wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(jobKey.Name), Namespace: ns.Name}
+		gomega.Eventually(func() bool {
+			if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil {
+				return false
+			}
+			return workload.HasQuotaReservation(createdWorkload) &&
+				apimeta.IsStatusConditionTrue(createdWorkload.Status.Conditions, kueue.WorkloadFinished)
+		}, util.LongTimeout, util.Interval).Should(gomega.BeTrue())
+	})
+
	ginkgo.It("Should readmit preempted job with priorityClass into a separate flavor", func() {
		gomega.Expect(k8sClient.Create(ctx, sampleJob)).Should(gomega.Succeed())

@@ -974,6 +1000,18 @@ func expectJobUnsuspendedWithNodeSelectors(key types.NamespacedName, ns map[string]string) {
	}, util.Timeout, util.Interval).Should(gomega.Equal([]any{false, ns}))
}

+func expectPyTorchJobUnsuspendedWithNodeSelectors(key types.NamespacedName, ns map[string]string) {
+	job := &kftraining.PyTorchJob{}
+	gomega.EventuallyWithOffset(1, func() []any {
+		gomega.Expect(k8sClient.Get(ctx, key, job)).To(gomega.Succeed())
+		return []any{
+			*job.Spec.RunPolicy.Suspend,
+			job.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeMaster].Template.Spec.NodeSelector,
+			job.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeWorker].Template.Spec.NodeSelector,
+		}
+	}, util.Timeout, util.Interval).Should(gomega.Equal([]any{false, ns, ns}))
+}
+
func defaultOwnerReferenceForJob(name string) []metav1.OwnerReference {
	return []metav1.OwnerReference{
		{
4 changes: 4 additions & 0 deletions test/e2e/suite_test.go
@@ -19,6 +19,7 @@ package e2e
import (
	"context"
	"fmt"
+	kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
	"os"
	"testing"
	"time"
@@ -74,6 +75,9 @@ func CreateClientUsingCluster() client.Client {
	err = kueue.AddToScheme(scheme.Scheme)
	gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())

+	err = kftraining.AddToScheme(scheme.Scheme)
+	gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
+
	err = visibility.AddToScheme(scheme.Scheme)
	gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
