From c6e177b5e32c8b8d13f741887131f1bceb7bc288 Mon Sep 17 00:00:00 2001 From: SunShun Date: Wed, 6 Oct 2021 13:05:47 +0800 Subject: [PATCH] Enable proxy user with kerberos in hdfs, hive and spark --- .gitignore | 1 - .../hadoop/common/conf/HadoopConf.scala | 4 ++++ .../hadoop/common/utils/HDFSUtils.scala | 24 ++++++++++++------- .../resources/linkis-engineconn.properties | 2 -- .../resources/linkis-engineconn.properties | 2 +- .../executor/HiveEngineConnExecutor.scala | 4 ++++ .../resources/linkis-engineconn.properties | 2 +- .../resources/linkis-engineconn.properties | 1 - .../resources/linkis-engineconn.properties | 1 - .../resources/linkis-engineconn.properties | 2 +- .../resources/linkis-engineconn.properties | 1 - .../spark/config/SparkConfiguration.scala | 1 - ...SubmitProcessEngineConnLaunchBuilder.scala | 18 +++++++++++--- 13 files changed, 42 insertions(+), 21 deletions(-) diff --git a/.gitignore b/.gitignore index 5bb9c0e646..e8b38b97a1 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,6 @@ .idea .DS_Store - out/ linkis.ipr linkis.iws diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/com/webank/wedatasphere/linkis/hadoop/common/conf/HadoopConf.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/com/webank/wedatasphere/linkis/hadoop/common/conf/HadoopConf.scala index 6ef0e32830..e64aef1983 100644 --- a/linkis-commons/linkis-hadoop-common/src/main/scala/com/webank/wedatasphere/linkis/hadoop/common/conf/HadoopConf.scala +++ b/linkis-commons/linkis-hadoop-common/src/main/scala/com/webank/wedatasphere/linkis/hadoop/common/conf/HadoopConf.scala @@ -28,6 +28,10 @@ object HadoopConf { val KEYTAB_HOST_ENABLED = CommonVars("wds.linkis.keytab.host.enabled", false) + val KEYTAB_PROXYUSER_ENABLED = CommonVars("wds.linkis.keytab.proxyuser.enable", false) + + val KEYTAB_PROXYUSER_SUPERUSER = CommonVars("wds.linkis.keytab.proxyuser.superuser", "hadoop") + val hadoopConfDir = CommonVars("hadoop.config.dir", CommonVars("HADOOP_CONF_DIR", "").getValue).getValue val HADOOP_EXTERNAL_CONF_DIR_PREFIX = CommonVars("wds.linkis.hadoop.external.conf.dir.prefix", "/appcom/config/external-conf/hadoop") diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/com/webank/wedatasphere/linkis/hadoop/common/utils/HDFSUtils.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/com/webank/wedatasphere/linkis/hadoop/common/utils/HDFSUtils.scala index 4760f16d56..5e211cb3a5 100644 --- a/linkis-commons/linkis-hadoop-common/src/main/scala/com/webank/wedatasphere/linkis/hadoop/common/utils/HDFSUtils.scala +++ b/linkis-commons/linkis-hadoop-common/src/main/scala/com/webank/wedatasphere/linkis/hadoop/common/utils/HDFSUtils.scala @@ -130,15 +130,23 @@ object HDFSUtils extends Logging { } } - def getUserGroupInformation(userName: String): UserGroupInformation = { + def getUserGroupInformation(userName: String): UserGroupInformation = { if (KERBEROS_ENABLE.getValue) { - val path = new File(KEYTAB_FILE.getValue, userName + ".keytab").getPath - val user = getKerberosUser(userName) - UserGroupInformation.setConfiguration(getConfiguration(userName)) - UserGroupInformation.loginUserFromKeytabAndReturnUGI(user, path) - } else { - UserGroupInformation.createRemoteUser(userName) - } + if (!KEYTAB_PROXYUSER_ENABLED.getValue) { + val path = new File(KEYTAB_FILE.getValue, userName + ".keytab").getPath + val user = getKerberosUser(userName) + UserGroupInformation.setConfiguration(getConfiguration(userName)) + UserGroupInformation.loginUserFromKeytabAndReturnUGI(user, path) + } else { + val superUser = KEYTAB_PROXYUSER_SUPERUSER.getValue + val path = new File(KEYTAB_FILE.getValue, superUser + ".keytab").getPath + val user = getKerberosUser(superUser) + UserGroupInformation.setConfiguration(getConfiguration(superUser)) + UserGroupInformation.createProxyUser(userName, UserGroupInformation.loginUserFromKeytabAndReturnUGI(user, path)) + } + } else { + UserGroupInformation.createRemoteUser(userName) + } } def getKerberosUser(userName: String): String = { diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/resources/linkis-engineconn.properties index a5c5b9631d..e17c80fb28 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/resources/linkis-engineconn.properties +++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/resources/linkis-engineconn.properties @@ -27,5 +27,3 @@ wds.linkis.engineconn.plugin.default.class=com.webank.wedatasphere.linkis.engine wds.linkis.engine.connector.hooks=com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.JarUdfEngineHook wds.linkis.engineconn.executor.manager.class=com.webank.wedatasphere.linkis.engineconnplugin.flink.executormanager.FlinkExecutorManager - - diff --git a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties index 7b06ef9c36..6e9128a824 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties +++ b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties @@ -26,4 +26,4 @@ wds.linkis.engineconn.plugin.default.class=com.webank.wedatasphere.linkis.engine wds.linkis.bdp.hive.init.sql.enable=true -wds.linkis.engine.connector.hooks=com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.JarUdfEngineHook \ No newline at end of file +wds.linkis.engine.connector.hooks=com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.JarUdfEngineHook diff --git a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala index 76d44c1da6..13f1e1a45e 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala @@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.collection.JavaConversions._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import com.webank.wedatasphere.linkis.hadoop.common.conf.HadoopConf class HiveEngineConnExecutor(id: Int, sessionState: SessionState, @@ -90,6 +91,9 @@ class HiveEngineConnExecutor(id: Int, override def init(): Unit = { LOG.info(s"Ready to change engine state!") + if (HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) { + System.setProperty("javax.security.auth.useSubjectCredsOnly", "false"); + } setCodeParser(new SQLCodeParser) super.init() } diff --git a/linkis-engineconn-plugins/engineconn-plugins/io_file/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/engineconn-plugins/io_file/src/main/resources/linkis-engineconn.properties index 015a5c3382..d75ad47916 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/io_file/src/main/resources/linkis-engineconn.properties +++ b/linkis-engineconn-plugins/engineconn-plugins/io_file/src/main/resources/linkis-engineconn.properties @@ -30,4 +30,4 @@ wds.linkis.engineconn.io.version=1 wds.linkis.engineconn.support.parallelism=true wds.linkis.engineconn.max.free.time=0 -wds.linkis.engine.push.log.enable=false \ No newline at end of file +wds.linkis.engine.push.log.enable=false diff --git a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/resources/linkis-engineconn.properties index 376d6ecd18..93e0616294 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/resources/linkis-engineconn.properties +++ b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/resources/linkis-engineconn.properties @@ -26,4 +26,3 @@ wds.linkis.engineconn.plugin.default.class=com.webank.wedatasphere.linkis.manage #wds.linkis.engine.io.opts=" -Dfile.encoding=UTF-8 -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=49100 " wds.linkis.engineconn.support.parallelism=true - diff --git a/linkis-engineconn-plugins/engineconn-plugins/pipeline/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/engineconn-plugins/pipeline/src/main/resources/linkis-engineconn.properties index 380724d838..52ccf19efa 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/pipeline/src/main/resources/linkis-engineconn.properties +++ b/linkis-engineconn-plugins/engineconn-plugins/pipeline/src/main/resources/linkis-engineconn.properties @@ -26,4 +26,3 @@ wds.linkis.engineconn.debug.enable=true wds.linkis.engineconn.plugin.default.class=com.webank.wedatasphere.linkis.manager.engineplugin.pipeline.PipelineEngineConnPlugin wds.linkis.engineconn.max.free.time=5m - diff --git a/linkis-engineconn-plugins/engineconn-plugins/python/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/engineconn-plugins/python/src/main/resources/linkis-engineconn.properties index 17f72996b0..fc200e4423 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/python/src/main/resources/linkis-engineconn.properties +++ b/linkis-engineconn-plugins/engineconn-plugins/python/src/main/resources/linkis-engineconn.properties @@ -25,4 +25,4 @@ wds.linkis.engineconn.debug.enable=true wds.linkis.engineconn.plugin.default.class=com.webank.wedatasphere.linkis.manager.engineplugin.python.PythonEngineConnPlugin -wds.linkis.engine.connector.hooks=com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.PyFunctionEngineHook \ No newline at end of file +wds.linkis.engine.connector.hooks=com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.PyFunctionEngineHook diff --git a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties index 629bc8b3b8..36bf810ca2 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties +++ b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties @@ -27,4 +27,3 @@ wds.linkis.engineconn.debug.enable=true wds.linkis.engineconn.plugin.default.class=com.webank.wedatasphere.linkis.engineplugin.spark.SparkEngineConnPlugin wds.linkis.engine.connector.hooks=com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.PyUdfEngineHook,com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.ScalaUdfEngineHook,com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.JarUdfEngineHook,com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.PyFunctionEngineHook,com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.ScalaFunctionEngineHook - diff --git a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/spark/config/SparkConfiguration.scala b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/spark/config/SparkConfiguration.scala index cd8ec88fff..8ee8a62083 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/spark/config/SparkConfiguration.scala +++ b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/spark/config/SparkConfiguration.scala @@ -79,7 +79,6 @@ object SparkConfiguration extends Logging { val IS_VIEWFS_ENV = CommonVars("wds.linkis.spark.engine.is.viewfs.env", true) - private def getMainJarName(): String = { val somePath = ClassUtils.jarOfClass(classOf[SparkEngineConnFactory]) if (somePath.isDefined) { diff --git a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala index 52957ea557..a7dda24082 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/com/webank/wedatasphere/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala @@ -32,9 +32,11 @@ import com.webank.wedatasphere.linkis.manager.label.entity.Label import com.webank.wedatasphere.linkis.manager.label.entity.engine.UserCreatorLabel import com.webank.wedatasphere.linkis.protocol.UserWithCreator import org.apache.commons.lang.StringUtils - import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer +import com.webank.wedatasphere.linkis.hadoop.common.conf.HadoopConf + + /** * @@ -159,10 +161,14 @@ class SparkSubmitProcessEngineConnLaunchBuilder private extends JavaProcessEngin } } - addOpt("--master", _master) addOpt("--deploy-mode", _deployMode) addOpt("--name", _name) + + if (HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue && _proxyUser.nonEmpty) { + addOpt("--proxy-user", _proxyUser) + } + //addOpt("--jars",Some(ENGINEMANAGER_JAR.getValue)) // info("No need to add jars for " + _jars.map(fromPath).exists(x => x.equals("hdfs:///")).toString()) _jars = _jars.filter(_.isNotBlankPath()) @@ -364,7 +370,6 @@ class SparkSubmitProcessEngineConnLaunchBuilder private extends JavaProcessEngin val file = new java.io.File(x.path) file.isFile }).foreach(jar) - proxyUser(getValueAndRemove(properties, "proxyUser", "")) if (null != darResource) { this.queue(darResource.yarnResource.queueName) } else { @@ -387,6 +392,13 @@ class SparkSubmitProcessEngineConnLaunchBuilder private extends JavaProcessEngin } } } + + if (!HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) { + this.proxyUser(getValueAndRemove(properties, "proxyUser", "")) + } else { + this.proxyUser(this._userWithCreator.user) + } + //deal spark conf and spark.hadoop.* val iterator = properties.entrySet().iterator() val sparkConfKeys = ArrayBuffer[String]()