Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Fix issues with Mesos/Kerberos support
Browse files Browse the repository at this point in the history
  • Loading branch information
mgummelt authored and Michael Gummelt committed May 2, 2016
1 parent 6d03aab commit 445e896
Showing 1 changed file with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import javax.xml.bind.DatatypeConverter
import scala.collection.JavaConverters._

import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.hdfs.HdfsConfiguration
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.{SparkException, SparkConf, Logging}
import org.apache.spark.deploy.SparkHadoopUtil
Expand Down Expand Up @@ -56,11 +57,13 @@ class MesosKerberosHandler(conf: SparkConf,

/**
 * Starts the background delegation-token renewal thread.
 *
 * Initializes Hadoop's HDFS configuration defaults first (required so that
 * hdfs-site.xml settings are loaded before any token operations), then spawns
 * a daemon thread running [[renewLoop]] so the JVM can exit without waiting
 * on the renewer.
 */
def start(): Unit = {
  logInfo("Starting delegation token renewer")
  // Ensure hdfs-default.xml / hdfs-site.xml resources are registered.
  HdfsConfiguration.init()
  val renewalTask = new Runnable {
    override def run(): Unit = renewLoop()
  }
  renewalThread = new Thread(renewalTask)
  // Daemon thread: must not keep the driver JVM alive on shutdown.
  renewalThread.setDaemon(true)
  renewalThread.start()
}

Expand Down Expand Up @@ -125,11 +128,9 @@ class MesosKerberosHandler(conf: SparkConf,

private def getHDFSDelegationTokens(ugi: UserGroupInformation): Credentials = {
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
val namenodes = Option(hadoopConf.get("dfs.ha.namenodes.hdfs", null)).
map(_.split(",")).getOrElse(Array[String]()).
flatMap(id => Option(hadoopConf.get(s"dfs.namenode.rpc-address.hdfs.$id", null))).
map(hostPort => new Path(s"hdfs://$hostPort")).
toSet

val namenodes = Set(FileSystem.get(hadoopConf).getHomeDirectory())

logInfo(s"Found these HDFS namenodes: $namenodes")
val ugiCreds = ugi.getCredentials
ugi.doAs(new PrivilegedExceptionAction[Unit] {
Expand Down

0 comments on commit 445e896

Please sign in to comment.