Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run workers first and wait for them #484

Merged
merged 21 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions deploy/v2beta1/mpi-operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ spec:
type: object
spec:
properties:
launcherCreationPolicy:
description: launcherCreationPolicy if WaitForWorkersReady, the launcher
is created only after all workers are in Ready state. Defaults to
AtStartup.
type: string
mpiImplementation:
default: OpenMPI
description: MPIImplementation is the MPI implementation. Options
Expand Down
5 changes: 5 additions & 0 deletions manifests/base/kubeflow.org_mpijobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ spec:
type: object
spec:
properties:
launcherCreationPolicy:
description: launcherCreationPolicy if WaitForWorkersReady, the launcher
is created only after all workers are in Ready state. Defaults to
AtStartup.
type: string
mpiImplementation:
default: OpenMPI
description: MPIImplementation is the MPI implementation. Options
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/kubeflow/v2beta1/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ func SetDefaults_MPIJob(mpiJob *MPIJob) {
if mpiJob.Spec.MPIImplementation == "" {
mpiJob.Spec.MPIImplementation = MPIImplementationOpenMPI
}
if mpiJob.Spec.LauncherCreationPolicy == "" {
mpiJob.Spec.LauncherCreationPolicy = LauncherCreationPolicyAtStartup
}

// set default to Launcher
setDefaultsTypeLauncher(mpiJob.Spec.MPIReplicaSpecs[MPIReplicaTypeLauncher])
Expand Down
35 changes: 21 additions & 14 deletions pkg/apis/kubeflow/v2beta1/default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ func TestSetDefaults_MPIJob(t *testing.T) {
RunPolicy: RunPolicy{
CleanPodPolicy: NewCleanPodPolicy(CleanPodPolicyNone),
},
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: MPIImplementationOpenMPI,
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: MPIImplementationOpenMPI,
LauncherCreationPolicy: "AtStartup",
},
},
},
Expand All @@ -48,8 +49,9 @@ func TestSetDefaults_MPIJob(t *testing.T) {
ActiveDeadlineSeconds: newInt64(3),
BackoffLimit: newInt32(4),
},
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: MPIImplementationIntel,
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: MPIImplementationIntel,
LauncherCreationPolicy: "AtStartup",
},
},
want: MPIJob{
Expand All @@ -61,8 +63,9 @@ func TestSetDefaults_MPIJob(t *testing.T) {
ActiveDeadlineSeconds: newInt64(3),
BackoffLimit: newInt32(4),
},
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: MPIImplementationIntel,
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: MPIImplementationIntel,
LauncherCreationPolicy: "AtStartup",
},
},
},
Expand All @@ -76,8 +79,9 @@ func TestSetDefaults_MPIJob(t *testing.T) {
ActiveDeadlineSeconds: newInt64(3),
BackoffLimit: newInt32(4),
},
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: MPIImplementationMPICH,
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: MPIImplementationMPICH,
LauncherCreationPolicy: "AtStartup",
},
},
want: MPIJob{
Expand All @@ -89,8 +93,9 @@ func TestSetDefaults_MPIJob(t *testing.T) {
ActiveDeadlineSeconds: newInt64(3),
BackoffLimit: newInt32(4),
},
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: MPIImplementationMPICH,
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: MPIImplementationMPICH,
LauncherCreationPolicy: "AtStartup",
},
},
},
Expand All @@ -108,8 +113,9 @@ func TestSetDefaults_MPIJob(t *testing.T) {
RunPolicy: RunPolicy{
CleanPodPolicy: NewCleanPodPolicy(CleanPodPolicyNone),
},
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: MPIImplementationOpenMPI,
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: MPIImplementationOpenMPI,
LauncherCreationPolicy: "AtStartup",
MPIReplicaSpecs: map[MPIReplicaType]*common.ReplicaSpec{
MPIReplicaTypeLauncher: {
Replicas: newInt32(1),
Expand All @@ -133,8 +139,9 @@ func TestSetDefaults_MPIJob(t *testing.T) {
RunPolicy: RunPolicy{
CleanPodPolicy: NewCleanPodPolicy(CleanPodPolicyNone),
},
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: MPIImplementationOpenMPI,
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: MPIImplementationOpenMPI,
LauncherCreationPolicy: "AtStartup",
MPIReplicaSpecs: map[MPIReplicaType]*common.ReplicaSpec{
MPIReplicaTypeWorker: {
Replicas: newInt32(0),
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/kubeflow/v2beta1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/apis/kubeflow/v2beta1/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,10 @@
"mpiReplicaSpecs"
],
"properties": {
"launcherCreationPolicy": {
"description": "launcherCreationPolicy if WaitForWorkersReady, the launcher is created only after all workers are in Ready state. Defaults to AtStartup.",
"type": "string"
},
"mpiImplementation": {
"description": "MPIImplementation is the MPI implementation. Options are \"OpenMPI\" (default), \"Intel\" and \"MPICH\".",
"type": "string"
Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/kubeflow/v2beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ type RunPolicy struct {
Suspend *bool `json:"suspend,omitempty"`
}

type LauncherCreationPolicy string

const (
LauncherCreationPolicyAtStartup = "AtStartup"
LauncherCreationPolicyWaitForWorkersReady = "WaitForWorkersReady"
xhejtman marked this conversation as resolved.
Show resolved Hide resolved
)

type MPIJobSpec struct {

// Specifies the number of slots per worker used in hostfile.
Expand All @@ -154,6 +161,9 @@ type MPIJobSpec struct {
// +kubebuilder:default:="/root/.ssh"
SSHAuthMountPath string `json:"sshAuthMountPath,omitempty"`

// launcherCreationPolicy if WaitForWorkersReady, the launcher is created only after all workers are in Ready state. Defaults to AtStartup.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add:

// +kubebuilder:validation:Enum:=AtStartup;WaitForWorkersReady
// +kubebuilder:default:=AtStartup

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just below line 170?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

LauncherCreationPolicy LauncherCreationPolicy `json:"launcherCreationPolicy,omitempty"`

// MPIImplementation is the MPI implementation.
// Options are "OpenMPI" (default), "Intel" and "MPICH".
// +kubebuilder:validation:Enum:=OpenMPI;Intel;MPICH
Expand Down
40 changes: 29 additions & 11 deletions pkg/controller/mpi_job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,10 +624,14 @@ func (c *MPIJobController) syncHandler(key string) error {
}
}
if launcher == nil {
launcher, err = c.kubeClient.BatchV1().Jobs(namespace).Create(context.TODO(), c.newLauncherJob(mpiJob), metav1.CreateOptions{})
if err != nil {
c.recorder.Eventf(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, "launcher pod created failed: %v", err)
return fmt.Errorf("creating launcher Pod: %w", err)
if mpiJob.Spec.LauncherCreationPolicy == kubeflow.LauncherCreationPolicyAtStartup || c.countReadyWorkerPods(worker) == len(worker) {
launcher, err = c.kubeClient.BatchV1().Jobs(namespace).Create(context.TODO(), c.newLauncherJob(mpiJob), metav1.CreateOptions{})
if err != nil {
c.recorder.Eventf(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, "launcher pod created failed: %v", err)
return fmt.Errorf("creating launcher Pod: %w", err)
}
} else {
klog.V(4).Infof("Waiting for workers %s/%s to start.", mpiJob.Namespace, mpiJob.Name)
}
}
}
Expand Down Expand Up @@ -776,6 +780,19 @@ func (c *MPIJobController) getRunningWorkerPods(mpiJob *kubeflow.MPIJob) ([]*cor
return podList, nil
}

func (c *MPIJobController) countReadyWorkerPods(workers []*corev1.Pod) int {
ready := 0
for _, pod := range workers {
for _, c := range pod.Status.Conditions {
if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
ready++
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ready++
ready++
break

We don't need to check the rest of the conditions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed this one

break
}
}
}
return ready
}

// getOrCreateConfigMap gets the ConfigMap controlled by this MPIJob, or creates
// one if it doesn't exist.
func (c *MPIJobController) getOrCreateConfigMap(mpiJob *kubeflow.MPIJob) (*corev1.ConfigMap, error) {
Expand Down Expand Up @@ -1011,14 +1028,15 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
mpiJob.Status.StartTime = &now
}
}
launcherPods, err := c.jobPods(launcher)
if err != nil {
return fmt.Errorf("checking launcher pods running: %w", err)
}
// Job.status.Active accounts for Pending and Running pods. Count running pods
// from the lister instead.
launcherPodsCnt := countRunningPods(launcherPods)
launcherPodsCnt := 0
if launcher != nil {
launcherPods, err := c.jobPods(launcher)
if err != nil {
return fmt.Errorf("checking launcher pods running: %w", err)
}
// Job.status.Active accounts for Pending and Running pods. Count running pods
// from the lister instead.
launcherPodsCnt = countRunningPods(launcherPods)
initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeLauncher)
launcherStatus := mpiJob.Status.ReplicaStatuses[kubeflow.MPIReplicaTypeLauncher]
launcherStatus.Failed = launcher.Status.Failed
Expand Down
1 change: 1 addition & 0 deletions sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 29 additions & 1 deletion sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions test/e2e/mpi_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ var _ = ginkgo.Describe("MPIJob", func() {
})

ginkgo.When("running as non-root", func() {
ginkgo.BeforeEach(func () {
ginkgo.BeforeEach(func() {
mpiJob.Spec.SSHAuthMountPath = "/home/mpiuser/.ssh"

mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers[0].SecurityContext = &corev1.SecurityContext{
Expand Down Expand Up @@ -283,7 +283,7 @@ var _ = ginkgo.Describe("MPIJob", func() {
})

ginkgo.When("running as non-root", func() {
ginkgo.BeforeEach(func () {
ginkgo.BeforeEach(func() {
mpiJob.Spec.SSHAuthMountPath = "/home/mpiuser/.ssh"

mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers[0].SecurityContext = &corev1.SecurityContext{
Expand Down
Loading