Skip to content

Commit

Permalink
Rebase waitforworkers option
Browse files Browse the repository at this point in the history
Signed-off-by: Lukas Hejtmanek <[email protected]>
  • Loading branch information
xhejtman committed Apr 4, 2023
1 parent c945435 commit f975ef5
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 14 deletions.
2 changes: 2 additions & 0 deletions deploy/v2beta1/mpi-operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ spec:
type: integer
sshAuthMountPath:
type: string
waitForWorkers:
type: boolean
type: object
status:
properties:
Expand Down
2 changes: 2 additions & 0 deletions manifests/base/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ spec:
description: "Specifies the number of retries before marking the launcher Job as failed. Defaults to 6."
sshAuthMountPath:
type: string
waitForWorkers:
type: boolean
mpiImplementation:
type: string
enum: ["OpenMPI", "Intel"]
Expand Down
34 changes: 31 additions & 3 deletions sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions v2/crd/kubeflow.org_mpijobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6820,6 +6820,10 @@ spec:
description: SSHAuthMountPath is the directory where SSH keys are
mounted.
type: string
waitForWorkers:
default: false
description: Spawn launcher only after all workers are in Ready state.
type: boolean
required:
- mpiReplicaSpecs
type: object
Expand Down
6 changes: 6 additions & 0 deletions v2/pkg/apis/kubeflow/v2beta1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions v2/pkg/apis/kubeflow/v2beta1/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@
"sshAuthMountPath": {
"description": "SSHAuthMountPath is the directory where SSH keys are mounted. Defaults to \"/root/.ssh\".",
"type": "string"
},
"waitForWorkers": {
"description": "Spawn launcher pod only after all worker pods are in Ready state. Defaults to false.",
"type": "boolean"
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions v2/pkg/apis/kubeflow/v2beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type MPIJobSpec struct {
// +kubebuilder:default:="/root/.ssh"
SSHAuthMountPath string `json:"sshAuthMountPath,omitempty"`

// WaitForWorkers, if true, delays creation of the launcher until all workers are in the Ready state. Defaults to false.
WaitForWorkers bool `json:"waitForWorkers,omitempty"`

// MPIImplementation is the MPI implementation.
// Options are "OpenMPI" (default) and "Intel".
// +kubebuilder:validation:Enum:=OpenMPI;Intel
Expand Down
39 changes: 28 additions & 11 deletions v2/pkg/controller/mpi_job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,10 +570,14 @@ func (c *MPIJobController) syncHandler(key string) error {
}
}
if launcher == nil {
launcher, err = c.kubeClient.BatchV1().Jobs(namespace).Create(context.TODO(), c.newLauncherJob(mpiJob), metav1.CreateOptions{})
if err != nil {
c.recorder.Eventf(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, "launcher pod created failed: %v", err)
return fmt.Errorf("creating launcher Pod: %w", err)
if !mpiJob.Spec.WaitForWorkers || c.countReadyWorkerPods(worker) == len(worker) {
launcher, err = c.kubeClient.BatchV1().Jobs(namespace).Create(context.TODO(), c.newLauncherJob(mpiJob), metav1.CreateOptions{})
if err != nil {
c.recorder.Eventf(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, "launcher pod created failed: %v", err)
return fmt.Errorf("creating launcher Pod: %w", err)
}
} else {
klog.V(4).Infof("Waiting for workers %s/%s to start.", mpiJob.Namespace, mpiJob.Name)
}
}
}
Expand Down Expand Up @@ -687,6 +691,18 @@ func (c *MPIJobController) getRunningWorkerPods(mpiJob *kubeflow.MPIJob) ([]*cor
return podList, nil
}

// countReadyWorkerPods returns the number of pods in workers that report a
// PodReady condition whose status is True. Each pod contributes at most one
// to the result.
func (c *MPIJobController) countReadyWorkerPods(workers []*corev1.Pod) int {
	ready := 0
	for _, pod := range workers {
		// Named cond rather than c so the method receiver is not shadowed.
		for _, cond := range pod.Status.Conditions {
			if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
				ready++
				// Stop at the first match so a repeated condition entry
				// cannot count the same pod twice.
				break
			}
		}
	}
	return ready
}

// getOrCreateConfigMap gets the ConfigMap controlled by this MPIJob, or creates
// one if it doesn't exist.
func (c *MPIJobController) getOrCreateConfigMap(mpiJob *kubeflow.MPIJob) (*corev1.ConfigMap, error) {
Expand Down Expand Up @@ -901,14 +917,15 @@ func (c *MPIJobController) deleteWorkerPods(mpiJob *kubeflow.MPIJob) error {

func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher *batchv1.Job, worker []*corev1.Pod) error {
oldStatus := mpiJob.Status.DeepCopy()
launcherPods, err := c.jobPods(launcher)
if err != nil {
return fmt.Errorf("checking launcher pods running: %w", err)
}
// Job.status.Active accounts for Pending and Running pods. Count running pods
// from the lister instead.
launcherPodsCnt := countRunningPods(launcherPods)
launcherPodsCnt := 0
if launcher != nil {
launcherPods, err := c.jobPods(launcher)
if err != nil {
return fmt.Errorf("checking launcher pods running: %w", err)
}
// Job.status.Active accounts for Pending and Running pods. Count running pods
// from the lister instead.
launcherPodsCnt := countRunningPods(launcherPods)
initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeLauncher)
launcherStatus := mpiJob.Status.ReplicaStatuses[common.ReplicaType(kubeflow.MPIReplicaTypeLauncher)]
launcherStatus.Failed = launcher.Status.Failed
Expand Down

0 comments on commit f975ef5

Please sign in to comment.