
Pass K8sPluginConfig to spark driver and executor pods #patch #271

Merged
merged 13 commits into flyteorg:master from fg91-spark-tolerations on Nov 1, 2022

Conversation

fg91
Member

@fg91 fg91 commented Jun 14, 2022

TL;DR

Currently, when running Spark tasks, some of the k8s plugin config values configured in the helm values (such as DefaultTolerations, NodeSelector, HostNetwork, SchedulerName, ...) are not carried over to the SparkApplication and thus not to the driver and executor pods.

In my specific case this is limiting because we run Flyte itself and the Spark operator on cheap nodes while giving workflows the ability to spin up high-powered nodes via default tolerations.

This PR fixes this issue.

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

Comparing the custom SparkPodSpec with Flyte’s K8sPluginConfig shows that several configurations are already carried over to the SparkPodSpec of the driver and the executor.

This PR adds logic to also carry over the following configurations (a minimal sketch of the carry-over logic follows the list):

  • DefaultTolerations
  • SchedulerName
  • DefaultNodeSelector
  • EnableHostNetworkingPod
  • DefaultEnvVarsFromEnv
  • DefaultAffinity
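
A minimal sketch of this carry-over logic, assuming simplified stand-in types (the real field names live in Flyte's K8sPluginConfig and the Spark operator's SparkPodSpec, and the actual implementation is in go/tasks/plugins/k8s/spark/spark.go):

package spark

import v1 "k8s.io/api/core/v1"

// Abridged stand-in for Flyte's flytek8s K8sPluginConfig; only the
// fields this PR carries over are shown.
type k8sPluginConfig struct {
    DefaultTolerations      []v1.Toleration
    SchedulerName           string
    DefaultNodeSelector     map[string]string
    EnableHostNetworkingPod *bool
    DefaultAffinity         *v1.Affinity
}

// Abridged stand-in for the Spark operator's SparkPodSpec, which is
// used for both the driver and the executor.
type sparkPodSpec struct {
    Tolerations   []v1.Toleration
    SchedulerName *string
    NodeSelector  map[string]string
    HostNetwork   *bool
    Affinity      *v1.Affinity
}

// carryOverK8sPluginConfig copies the plugin-level defaults onto a
// driver or executor pod spec.
func carryOverK8sPluginConfig(cfg *k8sPluginConfig, spec *sparkPodSpec) {
    // Default tolerations are appended to whatever the task already set.
    spec.Tolerations = append(spec.Tolerations, cfg.DefaultTolerations...)
    if cfg.SchedulerName != "" {
        spec.SchedulerName = &cfg.SchedulerName
    }
    // Default node selector entries are merged into the pod's selector.
    if spec.NodeSelector == nil {
        spec.NodeSelector = map[string]string{}
    }
    for k, v := range cfg.DefaultNodeSelector {
        spec.NodeSelector[k] = v
    }
    spec.HostNetwork = cfg.EnableHostNetworkingPod
    // DeepCopy is nil-safe, so an unset default affinity stays nil.
    spec.Affinity = cfg.DefaultAffinity.DeepCopy()
}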

Follow-up issue

The Spark operator passes the tolerations from the SparkApplication along to the pods only if the operator itself is installed with the --set webhook.enable=true flag to activate its mutating admission webhook. I feel this should be documented somewhere. Should I make a PR to note this here, or would you recommend another place?

@welcome

welcome bot commented Jun 14, 2022

Thank you for opening this pull request! 🙌

These tips will help get your PR across the finish line:

  • Most of the repos have a PR template; if yours does, fill it out to the best of your knowledge.
  • Sign off your commits (Reference: DCO Guide).

@fg91 fg91 force-pushed the fg91-spark-tolerations branch from 6b0c825 to 87988c9 on June 14, 2022 18:39
@kumare3
Contributor

kumare3 commented Jun 15, 2022

cc @hamersaw

@codecov

codecov bot commented Jun 15, 2022

Codecov Report

Merging #271 (7884de7) into master (902b902) will increase coverage by 0.39%.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##           master     #271      +/-   ##
==========================================
+ Coverage   62.97%   63.37%   +0.39%     
==========================================
  Files         142      145       +3     
  Lines        8970     9324     +354     
==========================================
+ Hits         5649     5909     +260     
- Misses       2799     2872      +73     
- Partials      522      543      +21     
Flag Coverage Δ
unittests 62.79% <100.00%> (+0.46%) ⬆️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
go/tasks/pluginmachinery/flytek8s/pod_helper.go 79.13% <100.00%> (-2.16%) ⬇️
go/tasks/plugins/k8s/spark/spark.go 79.29% <100.00%> (+0.83%) ⬆️
go/tasks/plugins/array/awsbatch/executor.go 38.05% <0.00%> (-4.93%) ⬇️
go/tasks/plugins/k8s/pod/container.go 66.66% <0.00%> (-4.77%) ⬇️
go/tasks/plugins/k8s/pod/sidecar.go 78.02% <0.00%> (-2.20%) ⬇️
go/tasks/plugins/array/k8s/subtask.go 30.35% <0.00%> (-0.56%) ⬇️
...o/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go 71.62% <0.00%> (-0.38%) ⬇️
...s/plugins/k8s/kfoperators/tensorflow/tensorflow.go 75.86% <0.00%> (-0.28%) ⬇️
go/tasks/pluginmachinery/encoding/encoder.go 100.00% <0.00%> (ø)
... and 11 more


@fg91 fg91 marked this pull request as draft June 19, 2022 18:55
@fg91 fg91 changed the title Pass default tolerations to spark driver and executor pods #patch Pass K8sPluginConfig to spark driver and executor pods #patch Jun 19, 2022
@fg91 fg91 force-pushed the fg91-spark-tolerations branch from 22b3438 to 59905ec on June 19, 2022 20:23
@hamersaw
Contributor

Filed an issue for this so we can track it better. Hopefully this will improve visibility.

@hamersaw
Contributor

@fg91 I think we discussed scoping this PR down to just applying the existing configuration? We can push the spark-on-k8s-operator upgrade once we have out-of-core plugins implemented (on the roadmap for the 1.3 release). Do you have any bandwidth to make these changes? Otherwise we may be able to contribute as well.

@fg91 fg91 force-pushed the fg91-spark-tolerations branch from 4816c17 to 46244c2 on October 12, 2022 18:05
Fabio Grätz added 4 commits October 12, 2022 20:56
@fg91 fg91 force-pushed the fg91-spark-tolerations branch from ec1146f to e5873ed on October 12, 2022 18:56
@fg91
Member Author

fg91 commented Oct 12, 2022

@hamersaw I removed the commits in which I tried to upgrade K8s and Spark-on-K8s. Then I compared the custom SparkPodSpec from the currently pinned version with Flyte’s K8sPluginConfig again and additionally carried over EnableHostNetworkingPod, DefaultEnvVarsFromEnv, and DefaultAffinity. (I adapted the PR description.)

I'm interested in your opinion on how you would treat:

  • DefaultCPURequest
  • DefaultMemoryRequest
  • GpuResourceName
  • ResourceTolerations

In principle one could map them to SparkPodSpec.Cores, SparkPodSpec.Memory, ...; however, I'm not convinced of this, for the following reason:

In a Spark task in Flyte, one typically configures the resources for the ephemeral Spark cluster this way (source):

from flytekit import task
from flytekitplugins.spark import Spark

@task(
    task_config=Spark(
        # this configuration is applied to the spark cluster
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.driver.cores": "1",
        }
    ),
)
def hello_spark(partitions: int) -> float:
    ...

Honoring DefaultCPURequest in addition to spark_conf feels like doing the following, which to my understanding doesn't make sense:

from flytekit import Resources, task
from flytekitplugins.spark import Spark

@task(
    task_config=Spark(
        # this configuration is applied to the spark cluster
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.driver.cores": "1",
        }
    ),
    requests=Resources(                  # <-- new
        mem="1G",
    ),
)
def hello_spark(partitions: int) -> float:
    ...

If you agree, I'd propose to not carry the above mentioned values over and mark this PR ready for review.

@fg91 fg91 force-pushed the fg91-spark-tolerations branch from 3afefec to ebdd8ac on October 12, 2022 19:33
@fg91
Member Author

fg91 commented Oct 12, 2022

@hamersaw can you please validate that the behaviour I documented in this commit is the desired one?

@fg91 fg91 marked this pull request as ready for review October 12, 2022 19:50
@fg91 fg91 requested a review from hamersaw October 12, 2022 19:50
@hamersaw
Contributor

hamersaw commented Oct 13, 2022

If you agree, I'd propose to not carry the above mentioned values over and mark this PR ready for review.

I totally agree. ResourceTolerations is interesting; presumably we could use the resources set in the spark_conf (e.g. spark.driver.memory), but this seems very error-prone. The disconnect between Flyte task resource requests and Spark resources (i.e. set heterogeneously between driver and executors) makes this a difficult problem. IMO the proposed implementation is the least error-prone.

@hamersaw
Contributor

hamersaw commented Oct 13, 2022

@hamersaw can you please validate that the behaviour I documented in this commit is the desired one?

It looks like in other k8s resources, setting the task to interruptible appends the interruptible tolerations and node selectors rather than overriding them. I think it makes sense to mimic this functionality here; thoughts?

@fg91 fg91 force-pushed the fg91-spark-tolerations branch from a1678d3 to 63fd182 on October 15, 2022 15:34
@fg91 fg91 force-pushed the fg91-spark-tolerations branch from fea047a to d843ed8 on October 15, 2022 15:53
@fg91
Member Author

fg91 commented Oct 15, 2022

It looks like in other k8s resources, setting the task to interruptible appends the interruptible tolerations and node selectors rather than overriding them. I think it makes sense to mimic this functionality here; thoughts?

  1. tolerations: I agree and this is also currently done here.

  2. nodeSelector: The link you posted doesn't tackle node selectors and I think appending wouldn't work:

    The docs say:

    Kubernetes only schedules the Pod onto nodes that have each of the labels you specify.

    When trying to schedule this pod

    apiVersion: v1
    kind: Pod
    metadata:
      name: nginx
      labels:
        env: test
    spec:
      containers:
      - name: nginx
        image: nginx
        imagePullPolicy: IfNotPresent
      nodeSelector:
        foo: bar
        foofoo: barbar

    on a node that has only been labeled with kubectl label nodes k3d-sandbox-server-0 foo=bar, it remains pending:

    Warning  FailedScheduling  10s   default-scheduler  0/1 nodes are available: 1 node(s) didn't match Pod's node affinity/selector.

    I think in order to append, one would have to use affinities:

    The affinity/anti-affinity language is more expressive. nodeSelector only selects nodes with all the specified labels. Affinity/anti-affinity gives you more control over the selection logic.

    Here in K8sPluginConfig it says that InterruptibleNodeSelector is deprecated anyway and that one should use InterruptibleNodeSelectorRequirement instead.

    Neither InterruptibleNodeSelectorRequirement nor NonInterruptibleNodeSelectorRequirement is currently handled for Spark tasks. I added functionality to carry over the default affinity, and I'm now considering whether the correct behaviour would be to add the InterruptibleNodeSelectorRequirement or NonInterruptibleNodeSelectorRequirement to the nodeSelectorRequirements in the default affinity. This would be basically the same as done here, with the only disadvantage that SparkApplication doesn't use v1.PodSpec but a custom SparkPodSpec (see here).

    We could probably do this without duplicating code by putting most of the logic into a function that takes (interruptible bool, affinity *v1.Affinity, nodeSelectorRequirement *v1.NodeSelectorRequirement) as arguments and then using this function both in the existing function that adds the node selector requirement to a PodSpec and in a new function that adds it to a SparkPodSpec (a sketch follows at the end of this comment).

    Did I miss something in my interpretation of nodeSelector, and do you agree that this should be the handling of the NodeSelectorRequirements?
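
To make the proposal concrete, here is a minimal sketch of such a shared helper. The name and signature are illustrative assumptions, not the actual flyteplugins API; the real code would additionally pick between the interruptible and non-interruptible requirement based on the interruptible flag:

package spark

import v1 "k8s.io/api/core/v1"

// applyNodeSelectorRequirement appends req to every required node
// selector term of the affinity, creating intermediate structs as
// needed. Match expressions within a term are ANDed, so appending
// narrows the set of schedulable nodes; the terms themselves stay ORed.
func applyNodeSelectorRequirement(affinity *v1.Affinity, req *v1.NodeSelectorRequirement) {
    if affinity == nil || req == nil {
        return
    }
    if affinity.NodeAffinity == nil {
        affinity.NodeAffinity = &v1.NodeAffinity{}
    }
    if affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
        affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = &v1.NodeSelector{}
    }
    selector := affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution
    if len(selector.NodeSelectorTerms) == 0 {
        selector.NodeSelectorTerms = []v1.NodeSelectorTerm{{}}
    }
    for i := range selector.NodeSelectorTerms {
        term := &selector.NodeSelectorTerms[i]
        term.MatchExpressions = append(term.MatchExpressions, *req)
    }
}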

@hamersaw
Contributor

Did I miss something in my interpretation of nodeSelector and do you agree that this should be handling of the NodeSelectorRequirements?

Thanks for correcting me on the current NodeSelector application!

I absolutely agree. Let's go ahead with the updates you proposed. I think this is the last issue right?

@fg91 fg91 force-pushed the fg91-spark-tolerations branch from 7099d14 to 7884de7 on October 26, 2022 21:22
@fg91 fg91 requested a review from hamersaw October 26, 2022 21:27
@hamersaw hamersaw merged commit a87ca40 into flyteorg:master Nov 1, 2022
@welcome

welcome bot commented Nov 1, 2022

Congrats on merging your first pull request! 🎉

@fg91 fg91 deleted the fg91-spark-tolerations branch November 1, 2022 16:38
eapolinario pushed a commit that referenced this pull request Sep 6, 2023
* Pass default tolerations to spark driver and executor

Signed-off-by: fg91 <[email protected]>

* Test passing default tolerations to spark driver and executor

Signed-off-by: fg91 <[email protected]>

* Pass scheduler name to driver and executor SparkPodSpec

Signed-off-by: fg91 <[email protected]>

* Carry DefaultNodeSelector from k8s plugin config to SparkPodSpec

Signed-off-by: fg91 <[email protected]>

* Carry over EnableHostNetworkingPod

Signed-off-by: Fabio Grätz <[email protected]>

* Test carrying over of default env vars

Signed-off-by: Fabio Grätz <[email protected]>

* Carry over DefaultEnvVarsFromEnv

Signed-off-by: Fabio Grätz <[email protected]>

* Carry over DefaultAffinity

Signed-off-by: Fabio Grätz <[email protected]>

* Doc behaviour of default and interruptible NodeSelector and Tolerations

Signed-off-by: Fabio Grätz <[email protected]>

* Don't carry over default env vars from env and fix test

Signed-off-by: Fabio Grätz <[email protected]>

* Lint

Signed-off-by: Fabio Grätz <[email protected]>

* Apply node selector requirement to pod affinity

Signed-off-by: Fabio Grätz <[email protected]>

Signed-off-by: fg91 <[email protected]>
Signed-off-by: Fabio Grätz <[email protected]>
Co-authored-by: Fabio Grätz <[email protected]>