Skip to content

Commit

Permalink
Pass worker_spec_command to mpi plugin to support horovod (flyteorg…
Browse files Browse the repository at this point in the history
…#341)

* wip

Signed-off-by: byhsu <[email protected]>

* add comment

Signed-off-by: byhsu <[email protected]>

* more comment

Signed-off-by: byhsu <[email protected]>

* fix style

Signed-off-by: byhsu <[email protected]>

---------

Signed-off-by: byhsu <[email protected]>
Co-authored-by: byhsu <[email protected]>
  • Loading branch information
ByronHsu and byhsu authored Apr 19, 2023
1 parent f5f4182 commit 46796b6
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions go/tasks/plugins/k8s/kfoperators/mpi/mpi.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mpi
import (
"context"
"fmt"
"strings"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/plugins"
Expand All @@ -21,6 +22,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

const workerSpecCommandKey = "worker_spec_command"

type mpiOperatorResourceHandler struct {
}

Expand All @@ -45,6 +48,7 @@ func (mpiOperatorResourceHandler) BuildIdentityResource(ctx context.Context, tas
// Defines a func to create the full resource object that will be posted to k8s.
func (mpiOperatorResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext) (client.Object, error) {
taskTemplate, err := taskCtx.TaskReader().Read(ctx)
taskTemplateConfig := taskTemplate.GetConfig()

if err != nil {
return nil, flyteerr.Errorf(flyteerr.BadTaskSpecification, "unable to fetch task specification [%v]", err.Error())
Expand All @@ -69,11 +73,19 @@ func (mpiOperatorResourceHandler) BuildResource(ctx context.Context, taskCtx plu
common.OverridePrimaryContainerName(podSpec, primaryContainerName, kubeflowv1.MPIJobDefaultContainerName)

// workersPodSpec is deepCopy of podSpec submitted by flyte
// WorkerPodSpec doesn't need any Argument & command. It will be trigger from launcher pod
workersPodSpec := podSpec.DeepCopy()

// If users don't specify "worker_spec_command" in the task config, the command/args are empty.
// However, in some cases, the workers need command/args.
// For example, in horovod tasks, each worker runs a command launching ssh daemon.

workerSpecCommand := []string{}
if val, ok := taskTemplateConfig[workerSpecCommandKey]; ok {
workerSpecCommand = strings.Split(val, " ")
}

for k := range workersPodSpec.Containers {
workersPodSpec.Containers[k].Args = []string{}
workersPodSpec.Containers[k].Args = workerSpecCommand
workersPodSpec.Containers[k].Command = []string{}
}

Expand Down

0 comments on commit 46796b6

Please sign in to comment.