Skip to content

Commit

Permalink
[ZEPPELIN-5760] fix passing configs to spark in k8s (#4398)
Browse files Browse the repository at this point in the history
* passing static arguments to spark-submit command so driver can pick them up

* fixed static names

* removed duplicate driver memory setting

* fixed driver extra java opts

* extend test

* use ZEPPELIN_SPARK_CONF env var to pass spark configurations

* fix import wildmark

* fix separator

* remove redundant concatenation

* - remove redundant concatenation
- fix tests
  • Loading branch information
zlosim authored Sep 29, 2022
1 parent 46ae915 commit 73f9650
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer;
Expand Down Expand Up @@ -78,6 +78,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
private static final String SPARK_CONTAINER_IMAGE = "zeppelin.k8s.spark.container.image";
private static final String ENV_SERVICE_DOMAIN = "SERVICE_DOMAIN";
private static final String ENV_ZEPPELIN_HOME = "ZEPPELIN_HOME";
private static final String SPARK_DRIVER_DEFAULTJAVAOPTS = "spark.driver.defaultJavaOptions";
private static final String SPARK_DRIVER_EXTRAJAVAOPTS = "spark.driver.extraJavaOptions";

public K8sRemoteInterpreterProcess(
KubernetesClient client,
Expand Down Expand Up @@ -319,9 +321,24 @@ Properties getTemplateBindings(String userName) {
if (isSpark()) {
int webUiPort = 4040;
k8sProperties.put("zeppelin.k8s.spark.container.image", sparkImage);

// There is already initial value following --driver-java-options added in interpreter.sh
// so we need to pass spark.driver.defaultJavaOptions and spark.driver.extraJavaOptions
// as SPARK_DRIVER_EXTRAJAVAOPTIONS_CONF env variable to build spark-submit command correctly.
StringJoiner driverExtraJavaOpts = new StringJoiner(" ");
if (properties.containsKey(SPARK_DRIVER_DEFAULTJAVAOPTS)) {
driverExtraJavaOpts.add((String) properties.remove(SPARK_DRIVER_DEFAULTJAVAOPTS));
}
if (properties.containsKey(SPARK_DRIVER_EXTRAJAVAOPTS)) {
driverExtraJavaOpts.add((String) properties.remove(SPARK_DRIVER_EXTRAJAVAOPTS));
}
if (driverExtraJavaOpts.length() > 0) {
k8sEnv.put("SPARK_DRIVER_EXTRAJAVAOPTIONS_CONF", driverExtraJavaOpts.toString());
}

if (isSparkOnKubernetes(properties)) {
k8sEnv.put("SPARK_SUBMIT_OPTIONS",
getEnv().getOrDefault("SPARK_SUBMIT_OPTIONS", "") + buildSparkSubmitOptions(userName));
addSparkK8sProperties();
k8sEnv.put("ZEPPELIN_SPARK_CONF", prepareZeppelinSparkConf(userName));
}
k8sEnv.put("SPARK_HOME", getEnv().getOrDefault("SPARK_HOME", "/spark"));

Expand Down Expand Up @@ -403,28 +420,35 @@ boolean isSparkOnKubernetes(Properties interpreterProperties) {
}

@VisibleForTesting
String buildSparkSubmitOptions(String userName) {
StringBuilder options = new StringBuilder();

options.append(" --master k8s://https://kubernetes.default.svc");
options.append(" --deploy-mode client");
if (properties.containsKey(SPARK_DRIVER_MEMORY)) {
options.append(" --driver-memory ").append(properties.get(SPARK_DRIVER_MEMORY));
}
String prepareZeppelinSparkConf(String userName) {
StringJoiner sparkConfSJ = new StringJoiner("|");
if (isUserImpersonated() && !StringUtils.containsIgnoreCase(userName, "anonymous")) {
options.append(" --proxy-user ").append(userName);
sparkConfSJ.add("--proxy-user");
sparkConfSJ.add(userName);
}
options.append(" --conf spark.kubernetes.namespace=").append(getInterpreterNamespace());
options.append(" --conf spark.executor.instances=1");
options.append(" --conf spark.kubernetes.driver.pod.name=").append(getPodName());
String sparkContainerImage = properties.containsKey(SPARK_CONTAINER_IMAGE) ? properties.getProperty(SPARK_CONTAINER_IMAGE) : sparkImage;
options.append(" --conf spark.kubernetes.container.image=").append(sparkContainerImage);
options.append(" --conf spark.driver.bindAddress=0.0.0.0");
options.append(" --conf spark.driver.host=").append(getInterpreterPodDnsName());
options.append(" --conf spark.driver.port=").append(getSparkDriverPort());
options.append(" --conf spark.blockManager.port=").append(getSparkBlockManagerPort());

return options.toString();

for (String key : properties.stringPropertyNames()) {
String propValue = properties.getProperty(key);
if (isSparkConf(key, propValue)) {
sparkConfSJ.add("--conf");
sparkConfSJ.add(key + "=" + propValue);
}
}

return sparkConfSJ.toString();
}

private void addSparkK8sProperties() {
properties.setProperty("spark.master", "k8s://https://kubernetes.default.svc");
properties.setProperty("spark.submit.deployMode", "client");
properties.setProperty("spark.kubernetes.namespace", getInterpreterNamespace());
properties.setProperty("spark.kubernetes.driver.pod.name", getPodName());
properties.setProperty("spark.kubernetes.container.image", properties.containsKey(SPARK_CONTAINER_IMAGE) ?
properties.getProperty(SPARK_CONTAINER_IMAGE) : sparkImage);
properties.setProperty("spark.driver.bindAddress", "0.0.0.0");
properties.setProperty("spark.driver.host", getInterpreterPodDnsName());
properties.setProperty("spark.driver.port", String.valueOf(getSparkDriverPort()));
properties.setProperty("spark.blockManager.port", String.valueOf(getSparkBlockManagerPort()));
}

private String getInterpreterPodDnsName() {
Expand All @@ -433,6 +457,10 @@ private String getInterpreterPodDnsName() {
getInterpreterNamespace());
}

private boolean isSparkConf(String key, String value) {
return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value);
}

/**
* See xxx-interpreter-pod.yaml
* @return SparkDriverPort
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ public void testGetTemplateBindingsForSpark() {
Properties properties = new Properties();
properties.put("my.key1", "v1");
properties.put("spark.master", "k8s://http://api");
properties.put("spark.jars.ivy", "my_ivy_path");
properties.put("spark.driver.extraJavaOptions", "-Dextra_option");
Map<String, String> envs = new HashMap<>();
envs.put("MY_ENV1", "V1");
envs.put("SPARK_SUBMIT_OPTIONS", "my options");
Expand Down Expand Up @@ -171,16 +173,21 @@ public void testGetTemplateBindingsForSpark() {

envs = (HashMap<String, String>) p.get("zeppelin.k8s.envs");
assertTrue( envs.containsKey("SPARK_HOME"));
assertTrue( envs.containsKey("SPARK_DRIVER_EXTRAJAVAOPTIONS_CONF"));
String driverExtraOptions = envs.get("SPARK_DRIVER_EXTRAJAVAOPTIONS_CONF");
assertTrue(driverExtraOptions.contains("-Dextra_option"));

String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS");
assertTrue(sparkSubmitOptions.startsWith("my options "));
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=default"));
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.driver.pod.name=" + intp.getPodName()));
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0"));
assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getPodName() + ".default.svc"));
assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + intp.getSparkDriverPort()));
assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockManagerPort()));
assertFalse(sparkSubmitOptions.contains("--proxy-user"));
assertTrue(sparkSubmitOptions.startsWith("my options"));
String zeppelinSparkConf = envs.get("ZEPPELIN_SPARK_CONF");
assertTrue(zeppelinSparkConf.contains("spark.kubernetes.namespace=default"));
assertTrue(zeppelinSparkConf.contains("spark.kubernetes.driver.pod.name=" + intp.getPodName()));
assertTrue(zeppelinSparkConf.contains("spark.kubernetes.container.image=spark-container:1.0"));
assertTrue(zeppelinSparkConf.contains("spark.driver.host=" + intp.getPodName() + ".default.svc"));
assertTrue(zeppelinSparkConf.contains("spark.driver.port=" + intp.getSparkDriverPort()));
assertTrue(zeppelinSparkConf.contains("spark.blockManager.port=" + intp.getSparkBlockManagerPort()));
assertTrue(zeppelinSparkConf.contains("spark.jars.ivy=my_ivy_path"));
assertFalse(zeppelinSparkConf.contains("--proxy-user"));
assertTrue(intp.isSpark());
}

Expand Down Expand Up @@ -225,14 +232,15 @@ public void testGetTemplateBindingsForSparkWithProxyUser() {
assertTrue( envs.containsKey("SPARK_HOME"));

String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS");
assertTrue(sparkSubmitOptions.startsWith("my options "));
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=default"));
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.driver.pod.name=" + intp.getPodName()));
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0"));
assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getPodName() + ".default.svc"));
assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + intp.getSparkDriverPort()));
assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockManagerPort()));
assertTrue(sparkSubmitOptions.contains("--proxy-user mytestUser"));
assertTrue(sparkSubmitOptions.startsWith("my options"));
String zeppelinSparkConf = envs.get("ZEPPELIN_SPARK_CONF");
assertTrue(zeppelinSparkConf.contains("spark.kubernetes.namespace=default"));
assertTrue(zeppelinSparkConf.contains("spark.kubernetes.driver.pod.name=" + intp.getPodName()));
assertTrue(zeppelinSparkConf.contains("spark.kubernetes.container.image=spark-container:1.0"));
assertTrue(zeppelinSparkConf.contains("spark.driver.host=" + intp.getPodName() + ".default.svc"));
assertTrue(zeppelinSparkConf.contains("spark.driver.port=" + intp.getSparkDriverPort()));
assertTrue(zeppelinSparkConf.contains("spark.blockManager.port=" + intp.getSparkBlockManagerPort()));
assertTrue(zeppelinSparkConf.contains("--proxy-user|mytestUser"));
assertTrue(intp.isSpark());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void testK8sLauncherWithSparkAndUserImpersonate() throws IOException {
assertTrue(client instanceof K8sRemoteInterpreterProcess);
K8sRemoteInterpreterProcess process = (K8sRemoteInterpreterProcess) client;
assertTrue(process.isSpark());
assertTrue(process.buildSparkSubmitOptions(context.getUserName()).contains("--proxy-user user1"));
assertTrue(process.prepareZeppelinSparkConf(context.getUserName()).contains("--proxy-user|user1"));
}

@Test
Expand Down Expand Up @@ -131,6 +131,6 @@ public void testK8sLauncherWithSparkAndWithoutUserImpersonate() throws IOExcepti
assertTrue(client instanceof K8sRemoteInterpreterProcess);
K8sRemoteInterpreterProcess process = (K8sRemoteInterpreterProcess) client;
assertTrue(process.isSpark());
assertFalse(process.buildSparkSubmitOptions(context.getUserName()).contains("--proxy-user user1"));
assertFalse(process.prepareZeppelinSparkConf(context.getUserName()).contains("--proxy-user user1"));
}
}

0 comments on commit 73f9650

Please sign in to comment.