From 445e89681f30ea66e95532e337721decf2708895 Mon Sep 17 00:00:00 2001 From: Michael Gummelt Date: Mon, 2 May 2016 21:51:52 +0000 Subject: [PATCH] fix issues with mesos/kerberos support --- .../cluster/mesos/MesosKerberosHandler.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosKerberosHandler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosKerberosHandler.scala index 83c654dbf3113..466a6198843b3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosKerberosHandler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosKerberosHandler.scala @@ -26,8 +26,9 @@ import javax.xml.bind.DatatypeConverter import scala.collection.JavaConverters._ import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.hdfs.HdfsConfiguration import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.{SparkException, SparkConf, Logging} import org.apache.spark.deploy.SparkHadoopUtil @@ -56,11 +57,13 @@ class MesosKerberosHandler(conf: SparkConf, def start(): Unit = { logInfo("Starting delegation token renewer") + HdfsConfiguration.init() renewalThread = new Thread(new Runnable { def run() { renewLoop() } }) + renewalThread.setDaemon(true) renewalThread.start() } @@ -125,11 +128,9 @@ class MesosKerberosHandler(conf: SparkConf, private def getHDFSDelegationTokens(ugi: UserGroupInformation): Credentials = { val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) - val namenodes = Option(hadoopConf.get("dfs.ha.namenodes.hdfs", null)). - map(_.split(",")).getOrElse(Array[String]()). - flatMap(id => Option(hadoopConf.get(s"dfs.namenode.rpc-address.hdfs.$id", null))). - map(hostPort => new Path(s"hdfs://$hostPort")). - toSet + + val namenodes = Set(FileSystem.get(hadoopConf).getHomeDirectory()) + logInfo(s"Found these HDFS namenodes: $namenodes") val ugiCreds = ugi.getCredentials ugi.doAs(new PrivilegedExceptionAction[Unit] {