[SPARK-24434][K8S] pod template files
## What changes were proposed in this pull request?

New feature to pass podspec files for driver and executor pods.

## How was this patch tested?
New unit and integration tests.

- [x] more overwrites in integration tests
- [ ] invalid template integration test, documentation

Author: Onur Satici <[email protected]>
Author: Yifei Huang <[email protected]>
Author: onursatici <[email protected]>

Closes #22146 from onursatici/pod-template.
onursatici authored and mccheah committed Oct 30, 2018
1 parent c36537f commit f6cc354
Showing 24 changed files with 991 additions and 52 deletions.
180 changes: 180 additions & 0 deletions docs/running-on-kubernetes.md
@@ -186,6 +186,22 @@ To use a secret through an environment variable use the following options to the
--conf spark.kubernetes.executor.secretKeyRef.ENV_NAME=name:key
```

## Pod Template
Kubernetes allows defining pods from [template files](https://kubernetes.io/docs/concepts/workloads/pods/pod-overview/#pod-templates).
Spark users can similarly use template files to define driver or executor pod configurations that Spark properties do not support.
To do so, specify the Spark properties `spark.kubernetes.driver.podTemplateFile` and `spark.kubernetes.executor.podTemplateFile`
to point to local files accessible to the `spark-submit` process. To allow the driver pod to access the executor pod template
file, the file will be automatically mounted onto a volume in the driver pod when it's created.
Spark does not do any validation after unmarshalling these template files and relies on the Kubernetes API server for validation.

It is important to note that Spark is opinionated about certain pod configurations, so there are values in the
pod template that will always be overwritten by Spark. Therefore, users of this feature should note that specifying
the pod template file only lets Spark start with a template pod instead of an empty pod during the pod-building process.
For details, see the [full list](#pod-template-properties) of pod template values that will be overwritten by Spark.

Pod template files can also define multiple containers. In such cases, Spark uses the container whose name matches
`spark.kubernetes.{driver,executor}.podTemplateContainerName` as the driver or executor container; if that property
is not set, or no container matches, Spark falls back to the first container in the list.
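
These properties can also be set programmatically. A minimal sketch, assuming configuration through a `SparkConf` (both file paths are hypothetical placeholders):

```scala
import org.apache.spark.SparkConf

// Minimal sketch: point Spark at local pod template files before submitting.
// The paths are placeholders for files readable by the spark-submit process.
val conf = new SparkConf()
  .set("spark.kubernetes.driver.podTemplateFile", "/path/to/driver-pod-template.yaml")
  .set("spark.kubernetes.executor.podTemplateFile", "/path/to/executor-pod-template.yaml")
```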

## Using Kubernetes Volumes

Starting with Spark 2.4.0, users can mount the following types of Kubernetes [volumes](https://kubernetes.io/docs/concepts/storage/volumes/) into the driver and executor pods:
@@ -863,4 +879,168 @@ specific to Spark on Kubernetes.
to provide any kerberos credentials for launching a job.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.podTemplateFile</code></td>
<td>(none)</td>
<td>
Specify the local file that contains the driver [pod template](#pod-template). For example,
<code>spark.kubernetes.driver.podTemplateFile=/path/to/driver-pod-template.yaml</code>
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.podTemplateFile</code></td>
<td>(none)</td>
<td>
Specify the local file that contains the executor [pod template](#pod-template). For example,
<code>spark.kubernetes.executor.podTemplateFile=/path/to/executor-pod-template.yaml</code>
</td>
</tr>
</table>

### Pod template properties

The tables below give the full list of pod specification values that will be overwritten by Spark.

#### Pod Metadata

<table class="table">
<tr><th>Pod metadata key</th><th>Modified value</th><th>Description</th></tr>
<tr>
<td>name</td>
<td>Value of <code>spark.kubernetes.driver.pod.name</code></td>
<td>
The driver pod name will be overwritten with either the configured or default value of
<code>spark.kubernetes.driver.pod.name</code>. The executor pod names will be unaffected.
</td>
</tr>
<tr>
<td>namespace</td>
<td>Value of <code>spark.kubernetes.namespace</code></td>
<td>
Spark makes strong assumptions about the driver and executor namespaces. Both driver and executor namespaces will
be replaced by either the configured or default Spark conf value.
</td>
</tr>
<tr>
<td>labels</td>
<td>Adds the labels from <code>spark.kubernetes.{driver,executor}.label.*</code></td>
<td>
Spark will add the additional labels specified by the Spark configuration.
</td>
</tr>
<tr>
<td>annotations</td>
<td>Adds the annotations from <code>spark.kubernetes.{driver,executor}.annotation.*</code></td>
<td>
Spark will add the additional annotations specified by the Spark configuration.
</td>
</tr>
</table>

#### Pod Spec

<table class="table">
<tr><th>Pod spec key</th><th>Modified value</th><th>Description</th></tr>
<tr>
<td>imagePullSecrets</td>
<td>Adds image pull secrets from <code>spark.kubernetes.container.image.pullSecrets</code></td>
<td>
Additional pull secrets will be added from the Spark configuration to both driver and executor pods.
</td>
</tr>
<tr>
<td>nodeSelector</td>
<td>Adds node selectors from <code>spark.kubernetes.node.selector.*</code></td>
<td>
Additional node selectors will be added from the Spark configuration to both driver and executor pods.
</td>
</tr>
<tr>
<td>restartPolicy</td>
<td><code>"never"</code></td>
<td>
Spark assumes that both drivers and executors never restart.
</td>
</tr>
<tr>
<td>serviceAccount</td>
<td>Value of <code>spark.kubernetes.authenticate.driver.serviceAccountName</code></td>
<td>
Spark will override <code>serviceAccount</code> with the value of the Spark configuration, but only for
driver pods, and only if the Spark configuration is specified. Executor pods will remain unaffected.
</td>
</tr>
<tr>
<td>serviceAccountName</td>
<td>Value of <code>spark.kubernetes.authenticate.driver.serviceAccountName</code></td>
<td>
Spark will override <code>serviceAccountName</code> with the value of the Spark configuration, but only for
driver pods, and only if the Spark configuration is specified. Executor pods will remain unaffected.
</td>
</tr>
<tr>
<td>volumes</td>
<td>Adds volumes from <code>spark.kubernetes.{driver,executor}.volumes.[VolumeType].[VolumeName].mount.path</code></td>
<td>
Spark will add volumes as specified by the Spark conf, as well as additional volumes necessary for passing
the Spark conf and pod template files.
</td>
</tr>
</table>

#### Container Spec

The following affect the driver and executor containers. All other containers in the pod spec will be unaffected.

<table class="table">
<tr><th>Container spec key</th><th>Modified value</th><th>Description</th></tr>
<tr>
<td>env</td>
<td>Adds env variables from <code>spark.kubernetes.driverEnv.[EnvironmentVariableName]</code></td>
<td>
Spark will add driver env variables from <code>spark.kubernetes.driverEnv.[EnvironmentVariableName]</code>, and
executor env variables from <code>spark.executorEnv.[EnvironmentVariableName]</code>.
</td>
</tr>
<tr>
<td>image</td>
<td>Value of <code>spark.kubernetes.{driver,executor}.container.image</code></td>
<td>
The image will be defined by the Spark configuration.
</td>
</tr>
<tr>
<td>imagePullPolicy</td>
<td>Value of <code>spark.kubernetes.container.image.pullPolicy</code></td>
<td>
Spark will override the pull policy for both driver and executor containers.
</td>
</tr>
<tr>
<td>name</td>
<td>See description.</td>
<td>
The container name will be assigned by Spark ("spark-kubernetes-driver" for the driver container, and
"spark-kubernetes-executor" for each executor container) if not defined by the pod template. If the container is
defined by the template, the template's name will be used.
</td>
</tr>
<tr>
<td>resources</td>
<td>See description</td>
<td>
The cpu limits are set by <code>spark.kubernetes.{driver,executor}.limit.cores</code>. The cpu request is set by
<code>spark.{driver,executor}.cores</code>. The memory request and limit are set by summing the values of
<code>spark.{driver,executor}.memory</code> and <code>spark.{driver,executor}.memoryOverhead</code>. For example,
<code>spark.driver.memory=1g</code> with the default 384MiB overhead yields a memory request and limit of 1408Mi.
</td>
</tr>
<tr>
<td>volumeMounts</td>
<td>Adds volume mounts from <code>spark.kubernetes.{driver,executor}.volumes.[VolumeType].[VolumeName].mount.{path,readOnly}</code></td>
<td>
Spark will add volume mounts as specified by the Spark conf, as well as additional mounts necessary for passing
the Spark conf and pod template files.
</td>
</tr>
</table>
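
To make the resource arithmetic above concrete, here is a hedged sketch of the memory computation, assuming Spark's default overhead factor of 0.1 and the 384MiB floor (`MEMORY_OVERHEAD_MIN_MIB` in the Constants change below):

```scala
// Sketch of the memory request/limit derivation for a driver container,
// assuming spark.driver.memory=1g and the default spark.driver.memoryOverhead.
val driverMemoryMiB = 1024L
val overheadMiB = math.max((0.1 * driverMemoryMiB).toLong, 384L) // max(factor * memory, 384MiB floor)
val memoryWithOverheadMiB = driverMemoryMiB + overheadMiB        // 1408 -> "1408Mi" request and limit
```
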
Config.scala
Expand Up @@ -278,6 +278,30 @@ private[spark] object Config extends Logging {
.booleanConf
.createWithDefault(false)

val KUBERNETES_DRIVER_PODTEMPLATE_FILE =
ConfigBuilder("spark.kubernetes.driver.podTemplateFile")
.doc("File containing a template pod spec for the driver")
.stringConf
.createOptional

val KUBERNETES_EXECUTOR_PODTEMPLATE_FILE =
ConfigBuilder("spark.kubernetes.executor.podTemplateFile")
.doc("File containing a template pod spec for executors")
.stringConf
.createOptional

val KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME =
ConfigBuilder("spark.kubernetes.driver.podTemplateContainerName")
.doc("container name to be used as a basis for the driver in the given pod template")
.stringConf
.createOptional

val KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME =
ConfigBuilder("spark.kubernetes.executor.podTemplateContainerName")
.doc("container name to be used as a basis for executors in the given pod template")
.stringConf
.createOptional

val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"

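These four entries are optional, so callers get an `Option[String]` back from `SparkConf`. A hedged sketch of how a pod builder might consume them together with `loadPodFromTemplate` from the `KubernetesUtils` change below; `client` is an assumed fabric8 `KubernetesClient`, and the snippet would need to live under `org.apache.spark` since these APIs are `private[spark]`:

```scala
import java.io.File

import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Config._

// Sketch only: load the driver template if configured, else start from an empty pod.
def initialDriverPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod =
  sparkConf.get(KUBERNETES_DRIVER_PODTEMPLATE_FILE)
    .map { templateFile =>
      KubernetesUtils.loadPodFromTemplate(
        client,
        new File(templateFile),
        sparkConf.get(KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME))
    }
    .getOrElse(SparkPod.initialPod())
```
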
Constants.scala
@@ -76,9 +76,17 @@
val ENV_R_PRIMARY = "R_PRIMARY"
val ENV_R_ARGS = "R_APP_ARGS"

// Pod spec templates
val EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME = "pod-spec-template.yml"
val EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH = "/opt/spark/pod-template"
val POD_TEMPLATE_VOLUME = "pod-template-volume"
val POD_TEMPLATE_CONFIGMAP = "podspec-configmap"
val POD_TEMPLATE_KEY = "podspec-configmap-key"

// Miscellaneous
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
val DEFAULT_DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
val DEFAULT_EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor"
val MEMORY_OVERHEAD_MIN_MIB = 384L

// Hadoop Configuration
KubernetesDriverSpec.scala
@@ -22,10 +22,3 @@ private[spark] case class KubernetesDriverSpec(
pod: SparkPod,
driverKubernetesResources: Seq[HasMetadata],
systemProperties: Map[String, String])

private[spark] object KubernetesDriverSpec {
def initialSpec(initialProps: Map[String, String]): KubernetesDriverSpec = KubernetesDriverSpec(
SparkPod.initialPod(),
Seq.empty,
initialProps)
}
KubernetesUtils.scala
@@ -16,14 +16,18 @@
*/
package org.apache.spark.deploy.k8s

import java.io.File

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod}
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

private[spark] object KubernetesUtils {
private[spark] object KubernetesUtils extends Logging {

/**
* Extract and parse Spark configuration properties with a given name prefix and
@@ -82,6 +86,47 @@ }
}
}

def loadPodFromTemplate(
kubernetesClient: KubernetesClient,
templateFile: File,
containerName: Option[String]): SparkPod = {
try {
val pod = kubernetesClient.pods().load(templateFile).get()
selectSparkContainer(pod, containerName)
} catch {
case e: Exception =>
logError(
s"Encountered exception while attempting to load initial pod spec from file", e)
throw new SparkException("Could not load pod from template file.", e)
}
}

def selectSparkContainer(pod: Pod, containerName: Option[String]): SparkPod = {
def selectNamedContainer(
containers: List[Container], name: String): Option[(Container, List[Container])] =
containers.partition(_.getName == name) match {
case (sparkContainer :: Nil, rest) => Some((sparkContainer, rest))
case _ =>
logWarning(
s"specified container ${name} not found on pod template, " +
s"falling back to taking the first container")
Option.empty
}
val containers = pod.getSpec.getContainers.asScala.toList
containerName
.flatMap(selectNamedContainer(containers, _))
.orElse(containers.headOption.map((_, containers.tail)))
.map {
case (sparkContainer: Container, rest: List[Container]) => SparkPod(
new PodBuilder(pod)
.editSpec()
.withContainers(rest.asJava)
.endSpec()
.build(),
sparkContainer)
}.getOrElse(SparkPod(pod, new ContainerBuilder().build()))
}

def parseMasterUrl(url: String): String = url.substring("k8s://".length)

def formatPairsBundle(pairs: Seq[(String, String)], indent: Int = 1) : String = {
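
A hedged usage sketch of `selectSparkContainer`: with a matching name, that container becomes the Spark container and the rest stay in the pod spec as sidecars; with no name, or an unmatched one, the first container is taken. As above, the snippet assumes it lives under `org.apache.spark`, and the container names are illustrative:

```scala
import io.fabric8.kubernetes.api.model.PodBuilder

import org.apache.spark.deploy.k8s.KubernetesUtils

// Sketch: a template pod with a main container and a logging sidecar.
val templatePod = new PodBuilder()
  .withNewMetadata().withName("template").endMetadata()
  .withNewSpec()
    .addNewContainer().withName("custom-driver").endContainer()
    .addNewContainer().withName("logging-sidecar").endContainer()
  .endSpec()
  .build()

// Selects "custom-driver"; "logging-sidecar" remains in the pod spec.
val byName = KubernetesUtils.selectSparkContainer(templatePod, Some("custom-driver"))
// No name given: falls back to the first container, "custom-driver".
val first = KubernetesUtils.selectSparkContainer(templatePod, None)
```
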
BasicDriverFeatureStep.scala
@@ -80,7 +80,7 @@ private[spark] class BasicDriverFeatureStep(
)
val driverUIPort = SparkUI.getUIPort(conf.sparkConf)
val driverContainer = new ContainerBuilder(pod.container)
.withName(DRIVER_CONTAINER_NAME)
.withName(Option(pod.container.getName).getOrElse(DEFAULT_DRIVER_CONTAINER_NAME))
.withImage(driverContainerImage)
.withImagePullPolicy(conf.imagePullPolicy())
.addNewPort()
@@ -105,7 +105,7 @@
.withNewFieldRef("v1", "status.podIP")
.build())
.endEnv()
.withNewResources()
.editOrNewResources()
.addToRequests("cpu", driverCpuQuantity)
.addToLimits(maybeCpuLimitQuantity.toMap.asJava)
.addToRequests("memory", driverMemoryQuantity)
@@ -119,9 +119,9 @@
.addToLabels(conf.roleLabels.asJava)
.addToAnnotations(conf.roleAnnotations.asJava)
.endMetadata()
.withNewSpec()
.editOrNewSpec()
.withRestartPolicy("Never")
.withNodeSelector(conf.nodeSelector().asJava)
.addToNodeSelector(conf.nodeSelector().asJava)
.addToImagePullSecrets(conf.imagePullSecrets(): _*)
.endSpec()
.build()
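
The switches from `withNewResources()` to `editOrNewResources()` and from `withNodeSelector` to `addToNodeSelector` are what let template-provided values survive: the `editOrNew*`/`addTo*` builder methods merge into existing state instead of replacing it. A hedged sketch of the difference, using an illustrative GPU limit as the template-provided value:

```scala
import io.fabric8.kubernetes.api.model.{ContainerBuilder, Quantity}

// Sketch: a template container that already carries a GPU limit.
val templateContainer = new ContainerBuilder()
  .withName("custom-driver")
  .withNewResources()
    .addToLimits("nvidia.com/gpu", new Quantity("1"))
  .endResources()
  .build()

// editOrNewResources() edits the existing ResourceRequirements, so the GPU
// limit from the template survives while Spark adds its cpu/memory values;
// withNewResources() would have replaced the whole block and dropped it.
val merged = new ContainerBuilder(templateContainer)
  .editOrNewResources()
    .addToRequests("memory", new Quantity("1408Mi"))
    .addToLimits("memory", new Quantity("1408Mi"))
  .endResources()
  .build()
```
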
BasicExecutorFeatureStep.scala
@@ -139,10 +139,10 @@ }
}

val executorContainer = new ContainerBuilder(pod.container)
.withName("executor")
.withName(Option(pod.container.getName).getOrElse(DEFAULT_EXECUTOR_CONTAINER_NAME))
.withImage(executorContainerImage)
.withImagePullPolicy(kubernetesConf.imagePullPolicy())
.withNewResources()
.editOrNewResources()
.addToRequests("memory", executorMemoryQuantity)
.addToLimits("memory", executorMemoryQuantity)
.addToRequests("cpu", executorCpuQuantity)
@@ -173,14 +173,14 @@
val executorPod = new PodBuilder(pod.pod)
.editOrNewMetadata()
.withName(name)
.withLabels(kubernetesConf.roleLabels.asJava)
.withAnnotations(kubernetesConf.roleAnnotations.asJava)
.addToLabels(kubernetesConf.roleLabels.asJava)
.addToAnnotations(kubernetesConf.roleAnnotations.asJava)
.addToOwnerReferences(ownerReference.toSeq: _*)
.endMetadata()
.editOrNewSpec()
.withHostname(hostname)
.withRestartPolicy("Never")
.withNodeSelector(kubernetesConf.nodeSelector().asJava)
.addToNodeSelector(kubernetesConf.nodeSelector().asJava)
.addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
.endSpec()
.build()
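
The same merge-over-replace reasoning applies to executor metadata: `addToLabels` and `addToAnnotations` preserve whatever the template already declares, whereas the previous `withLabels`/`withAnnotations` calls replaced the entire map. A hedged sketch, with an illustrative template label:

```scala
import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.PodBuilder

// Sketch: a template pod carrying its own label.
val templatePod = new PodBuilder()
  .withNewMetadata()
    .withName("template")
    .addToLabels("team", "data-platform")
  .endMetadata()
  .build()

// addToLabels merges Spark's labels with the template's; withLabels would
// have replaced the map and dropped "team" -> "data-platform".
val labeled = new PodBuilder(templatePod)
  .editOrNewMetadata()
    .addToLabels(Map("spark-role" -> "executor").asJava)
  .endMetadata()
  .build()
```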