From a7943833dbbc19ae6962388a58a20877f4e5d4dd Mon Sep 17 00:00:00 2001 From: Michal Vince Date: Thu, 29 Sep 2022 10:10:10 +0200 Subject: [PATCH] [ZEPPELIN-5760] fix passing configs to spark in k8s (#4398) * passing static arguments to spark-submit command so driver can pick them up * fixed static names * removed duplicate driver memory setting * fixed driver extra java opts * extend test * use ZEPPELIN_SPARK_CONF env var to pass spark configurations * fix import wildmark * fix separator * remove redundant concatenation * - remove redundant concatenation - fix tests --- .../launcher/K8sRemoteInterpreterProcess.java | 74 +++++++++++++------ .../K8sRemoteInterpreterProcessTest.java | 40 ++++++---- .../K8sStandardInterpreterLauncherTest.java | 4 +- 3 files changed, 77 insertions(+), 41 deletions(-) diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java index a39b0bbb617..08569bba97c 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java @@ -25,12 +25,12 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.StringJoiner; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.ArrayUtils; -import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer; @@ -78,6 +78,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess private static final String SPARK_CONTAINER_IMAGE = "zeppelin.k8s.spark.container.image"; private static final String ENV_SERVICE_DOMAIN = "SERVICE_DOMAIN"; private static final String ENV_ZEPPELIN_HOME = "ZEPPELIN_HOME"; + private static final String SPARK_DRIVER_DEFAULTJAVAOPTS = "spark.driver.defaultJavaOptions"; + private static final String SPARK_DRIVER_EXTRAJAVAOPTS = "spark.driver.extraJavaOptions"; public K8sRemoteInterpreterProcess( KubernetesClient client, @@ -319,9 +321,24 @@ Properties getTemplateBindings(String userName) { if (isSpark()) { int webUiPort = 4040; k8sProperties.put("zeppelin.k8s.spark.container.image", sparkImage); + + // There is already initial value following --driver-java-options added in interpreter.sh + // so we need to pass spark.driver.defaultJavaOptions and spark.driver.extraJavaOptions + // as SPARK_DRIVER_EXTRAJAVAOPTIONS_CONF env variable to build spark-submit command correctly. + StringJoiner driverExtraJavaOpts = new StringJoiner(" "); + if (properties.containsKey(SPARK_DRIVER_DEFAULTJAVAOPTS)) { + driverExtraJavaOpts.add((String) properties.remove(SPARK_DRIVER_DEFAULTJAVAOPTS)); + } + if (properties.containsKey(SPARK_DRIVER_EXTRAJAVAOPTS)) { + driverExtraJavaOpts.add((String) properties.remove(SPARK_DRIVER_EXTRAJAVAOPTS)); + } + if (driverExtraJavaOpts.length() > 0) { + k8sEnv.put("SPARK_DRIVER_EXTRAJAVAOPTIONS_CONF", driverExtraJavaOpts.toString()); + } + if (isSparkOnKubernetes(properties)) { - k8sEnv.put("SPARK_SUBMIT_OPTIONS", - getEnv().getOrDefault("SPARK_SUBMIT_OPTIONS", "") + buildSparkSubmitOptions(userName)); + addSparkK8sProperties(); + k8sEnv.put("ZEPPELIN_SPARK_CONF", prepareZeppelinSparkConf(userName)); } k8sEnv.put("SPARK_HOME", getEnv().getOrDefault("SPARK_HOME", "/spark")); @@ -403,28 +420,35 @@ boolean isSparkOnKubernetes(Properties interpreterProperties) { } @VisibleForTesting - String buildSparkSubmitOptions(String userName) { - StringBuilder options = new StringBuilder(); - - options.append(" --master k8s://https://kubernetes.default.svc"); - options.append(" --deploy-mode client"); - if (properties.containsKey(SPARK_DRIVER_MEMORY)) { - options.append(" --driver-memory ").append(properties.get(SPARK_DRIVER_MEMORY)); - } + String prepareZeppelinSparkConf(String userName) { + StringJoiner sparkConfSJ = new StringJoiner("|"); if (isUserImpersonated() && !StringUtils.containsIgnoreCase(userName, "anonymous")) { - options.append(" --proxy-user ").append(userName); + sparkConfSJ.add("--proxy-user"); + sparkConfSJ.add(userName); } - options.append(" --conf spark.kubernetes.namespace=").append(getInterpreterNamespace()); - options.append(" --conf spark.executor.instances=1"); - options.append(" --conf spark.kubernetes.driver.pod.name=").append(getPodName()); - String sparkContainerImage = properties.containsKey(SPARK_CONTAINER_IMAGE) ? properties.getProperty(SPARK_CONTAINER_IMAGE) : sparkImage; - options.append(" --conf spark.kubernetes.container.image=").append(sparkContainerImage); - options.append(" --conf spark.driver.bindAddress=0.0.0.0"); - options.append(" --conf spark.driver.host=").append(getInterpreterPodDnsName()); - options.append(" --conf spark.driver.port=").append(getSparkDriverPort()); - options.append(" --conf spark.blockManager.port=").append(getSparkBlockManagerPort()); - - return options.toString(); + + for (String key : properties.stringPropertyNames()) { + String propValue = properties.getProperty(key); + if (isSparkConf(key, propValue)) { + sparkConfSJ.add("--conf"); + sparkConfSJ.add(key + "=" + propValue); + } + } + + return sparkConfSJ.toString(); + } + + private void addSparkK8sProperties() { + properties.setProperty("spark.master", "k8s://https://kubernetes.default.svc"); + properties.setProperty("spark.submit.deployMode", "client"); + properties.setProperty("spark.kubernetes.namespace", getInterpreterNamespace()); + properties.setProperty("spark.kubernetes.driver.pod.name", getPodName()); + properties.setProperty("spark.kubernetes.container.image", properties.containsKey(SPARK_CONTAINER_IMAGE) ? + properties.getProperty(SPARK_CONTAINER_IMAGE) : sparkImage); + properties.setProperty("spark.driver.bindAddress", "0.0.0.0"); + properties.setProperty("spark.driver.host", getInterpreterPodDnsName()); + properties.setProperty("spark.driver.port", String.valueOf(getSparkDriverPort())); + properties.setProperty("spark.blockManager.port", String.valueOf(getSparkBlockManagerPort())); } private String getInterpreterPodDnsName() { @@ -433,6 +457,10 @@ private String getInterpreterPodDnsName() { getInterpreterNamespace()); } + private boolean isSparkConf(String key, String value) { + return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value); + } + /** * See xxx-interpreter-pod.yaml * @return SparkDriverPort diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java index df211eb7966..82e56efcd8f 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java @@ -138,6 +138,8 @@ public void testGetTemplateBindingsForSpark() { Properties properties = new Properties(); properties.put("my.key1", "v1"); properties.put("spark.master", "k8s://http://api"); + properties.put("spark.jars.ivy", "my_ivy_path"); + properties.put("spark.driver.extraJavaOptions", "-Dextra_option"); Map envs = new HashMap<>(); envs.put("MY_ENV1", "V1"); envs.put("SPARK_SUBMIT_OPTIONS", "my options"); @@ -171,16 +173,21 @@ public void testGetTemplateBindingsForSpark() { envs = (HashMap) p.get("zeppelin.k8s.envs"); assertTrue( envs.containsKey("SPARK_HOME")); + assertTrue( envs.containsKey("SPARK_DRIVER_EXTRAJAVAOPTIONS_CONF")); + String driverExtraOptions = envs.get("SPARK_DRIVER_EXTRAJAVAOPTIONS_CONF"); + assertTrue(driverExtraOptions.contains("-Dextra_option")); String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS"); - assertTrue(sparkSubmitOptions.startsWith("my options ")); - assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=default")); - assertTrue(sparkSubmitOptions.contains("spark.kubernetes.driver.pod.name=" + intp.getPodName())); - assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0")); - assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getPodName() + ".default.svc")); - assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + intp.getSparkDriverPort())); - assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockManagerPort())); - assertFalse(sparkSubmitOptions.contains("--proxy-user")); + assertTrue(sparkSubmitOptions.startsWith("my options")); + String zeppelinSparkConf = envs.get("ZEPPELIN_SPARK_CONF"); + assertTrue(zeppelinSparkConf.contains("spark.kubernetes.namespace=default")); + assertTrue(zeppelinSparkConf.contains("spark.kubernetes.driver.pod.name=" + intp.getPodName())); + assertTrue(zeppelinSparkConf.contains("spark.kubernetes.container.image=spark-container:1.0")); + assertTrue(zeppelinSparkConf.contains("spark.driver.host=" + intp.getPodName() + ".default.svc")); + assertTrue(zeppelinSparkConf.contains("spark.driver.port=" + intp.getSparkDriverPort())); + assertTrue(zeppelinSparkConf.contains("spark.blockManager.port=" + intp.getSparkBlockManagerPort())); + assertTrue(zeppelinSparkConf.contains("spark.jars.ivy=my_ivy_path")); + assertFalse(zeppelinSparkConf.contains("--proxy-user")); assertTrue(intp.isSpark()); } @@ -225,14 +232,15 @@ public void testGetTemplateBindingsForSparkWithProxyUser() { assertTrue( envs.containsKey("SPARK_HOME")); String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS"); - assertTrue(sparkSubmitOptions.startsWith("my options ")); - assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=default")); - assertTrue(sparkSubmitOptions.contains("spark.kubernetes.driver.pod.name=" + intp.getPodName())); - assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0")); - assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getPodName() + ".default.svc")); - assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + intp.getSparkDriverPort())); - assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockManagerPort())); - assertTrue(sparkSubmitOptions.contains("--proxy-user mytestUser")); + assertTrue(sparkSubmitOptions.startsWith("my options")); + String zeppelinSparkConf = envs.get("ZEPPELIN_SPARK_CONF"); + assertTrue(zeppelinSparkConf.contains("spark.kubernetes.namespace=default")); + assertTrue(zeppelinSparkConf.contains("spark.kubernetes.driver.pod.name=" + intp.getPodName())); + assertTrue(zeppelinSparkConf.contains("spark.kubernetes.container.image=spark-container:1.0")); + assertTrue(zeppelinSparkConf.contains("spark.driver.host=" + intp.getPodName() + ".default.svc")); + assertTrue(zeppelinSparkConf.contains("spark.driver.port=" + intp.getSparkDriverPort())); + assertTrue(zeppelinSparkConf.contains("spark.blockManager.port=" + intp.getSparkBlockManagerPort())); + assertTrue(zeppelinSparkConf.contains("--proxy-user|mytestUser")); assertTrue(intp.isSpark()); } diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java index 4afeb0ff5b8..8433f237011 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java @@ -98,7 +98,7 @@ public void testK8sLauncherWithSparkAndUserImpersonate() throws IOException { assertTrue(client instanceof K8sRemoteInterpreterProcess); K8sRemoteInterpreterProcess process = (K8sRemoteInterpreterProcess) client; assertTrue(process.isSpark()); - assertTrue(process.buildSparkSubmitOptions(context.getUserName()).contains("--proxy-user user1")); + assertTrue(process.prepareZeppelinSparkConf(context.getUserName()).contains("--proxy-user|user1")); } @Test @@ -131,6 +131,6 @@ public void testK8sLauncherWithSparkAndWithoutUserImpersonate() throws IOExcepti assertTrue(client instanceof K8sRemoteInterpreterProcess); K8sRemoteInterpreterProcess process = (K8sRemoteInterpreterProcess) client; assertTrue(process.isSpark()); - assertFalse(process.buildSparkSubmitOptions(context.getUserName()).contains("--proxy-user user1")); + assertFalse(process.prepareZeppelinSparkConf(context.getUserName()).contains("--proxy-user user1")); } }