update AkkaUtilsSuite test for the actorSelection changes, fix typos based on comments, and remove extra lines I missed in rebase from AkkaUtils
tgravescs committed Jan 20, 2014
1 parent f351763 commit 5721c5a
Showing 3 changed files with 12 additions and 39 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -36,11 +36,11 @@ private[spark] class SecurityManager extends Logging {
logDebug("is auth enabled = " + isAuthOn + " is uiAuth enabled = " + isUIAuthOn)

/**
- * In Yarn mode its uses Hadoop UGI to pass the secret as that
+ * In Yarn mode it uses Hadoop UGI to pass the secret as that
* will keep it protected. For a standalone SPARK cluster
* use a environment variable SPARK_SECRET to specify the secret.
* This probably isn't ideal but only the user who starts the process
- * should have access to view the variable (atleast on Linux).
+ * should have access to view the variable (at least on Linux).
* Since we can't set the environment variable we set the
* java system property SPARK_SECRET so it will automatically
* generate a secret is not specified. This definitely is not
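A note on the doc comment above: it describes passing the shared secret via Hadoop UGI in YARN mode and via a SPARK_SECRET variable (surfaced as a Java system property) in standalone mode. The sketch below is illustrative only, not the actual SecurityManager code; the object name, the error message, and the property-before-environment precedence are assumptions.

    object SecretLookupSketch {
      // Check the SPARK_SECRET java system property first (per the comment above,
      // Spark sets it so a secret is generated when none is specified), then fall
      // back to the SPARK_SECRET environment variable used on standalone clusters.
      def sharedSecret(): String =
        sys.props.get("SPARK_SECRET")
          .orElse(sys.env.get("SPARK_SECRET"))
          .getOrElse(throw new IllegalStateException("SPARK_SECRET is not set"))
    }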
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -100,8 +100,6 @@ private[spark] object AkkaUtils extends Logging {
|akka.remote.log-remote-lifecycle-events = $lifecycleEvents
|akka.log-dead-letters = $lifecycleEvents
|akka.log-dead-letters-during-shutdown = $lifecycleEvents
- |akka.remote.netty.require-cookie = "$requireCookie"
- |akka.remote.netty.secure-cookie = "$secureCookie"
""".stripMargin))

val actorSystem = if (indestructible) {
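For context on the two deleted lines above: akka.remote.netty.require-cookie and akka.remote.netty.secure-cookie configure Akka's remote secure-cookie handshake, and in AkkaUtils they were interpolated (from variables of the same names) into the config string handed to ConfigFactory.parseString; the commit message notes they were leftovers missed during the rebase. A minimal standalone sketch of that config pattern, with placeholder values rather than Spark's real ones:

    import com.typesafe.config.ConfigFactory

    object CookieConfSketch {
      // Placeholder values; in AkkaUtils these came from interpolated variables
      // named requireCookie and secureCookie.
      val requireCookie = "on"
      val secureCookie = "example-cookie"

      // Parse a config fragment shaped like the removed lines.
      val cookieConf = ConfigFactory.parseString(
        s"""
          |akka.remote.netty.require-cookie = "$requireCookie"
          |akka.remote.netty.secure-cookie = "$secureCookie"
        """.stripMargin)
    }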
45 changes: 10 additions & 35 deletions core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala
@@ -51,29 +51,17 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
System.setProperty("SPARK_SECRET", "bad")
val securityManagerBad= new SecurityManager();

+ assert(securityManagerBad.isAuthenticationEnabled() === true)

val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
conf = conf, securityManager = securityManagerBad)
val slaveTracker = new MapOutputTracker(conf)
val selection = slaveSystem.actorSelection(
s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
val timeout = AkkaUtils.lookupTimeout(conf)
- slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)

- assert(securityManagerBad.isAuthenticationEnabled() === true)

- masterTracker.registerShuffle(10, 1)
- masterTracker.incrementEpoch()
- slaveTracker.updateEpoch(masterTracker.getEpoch)

- val compressedSize1000 = MapOutputTracker.compressSize(1000L)
- val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
- masterTracker.registerMapOutput(10, 0, new MapStatus(
- BlockManagerId("a", "hostA", 1000, 0), Array(compressedSize1000)))
- masterTracker.incrementEpoch()
- slaveTracker.updateEpoch(masterTracker.getEpoch)

- // this should fail since password wrong
- intercept[SparkException] { slaveTracker.getServerStatuses(10, 0) }
+ intercept[akka.actor.ActorNotFound] {
+ slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+ }

actorSystem.shutdown()
slaveSystem.shutdown()
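A note on what changed in this test: once the tracker lookup goes through actorSelection, a selection that cannot be resolved (here because the badly authenticated client is rejected) fails the Future returned by resolveOne with akka.actor.ActorNotFound, so the test now intercepts that exception at resolveOne instead of a SparkException from getServerStatuses. The self-contained sketch below illustrates only that resolveOne behavior, against a deliberately nonexistent local actor; the system and actor names are made up.

    import scala.concurrent.Await
    import scala.concurrent.duration._
    import akka.actor.{ActorNotFound, ActorSystem}

    object ResolveOneSketch extends App {
      val system = ActorSystem("sketch")
      // No actor named NoSuchActor is ever created, so resolveOne cannot succeed.
      val selection = system.actorSelection("akka://sketch/user/NoSuchActor")
      try {
        // resolveOne fails its Future with ActorNotFound; Await.result rethrows it.
        Await.result(selection.resolveOne(5.seconds), 5.seconds)
      } catch {
        case _: ActorNotFound => println("actor selection could not be resolved")
      } finally {
        system.shutdown()
      }
    }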
@@ -198,30 +186,17 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
System.setProperty("SPARK_SECRET", "bad")
val securityManagerBad = new SecurityManager();

+ assert(securityManagerBad.isAuthenticationEnabled() === false)

val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
conf = conf, securityManager = securityManagerBad)
val slaveTracker = new MapOutputTracker(conf)
val selection = slaveSystem.actorSelection(
s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
val timeout = AkkaUtils.lookupTimeout(conf)
- slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)


- assert(securityManagerBad.isAuthenticationEnabled() === false)

- masterTracker.registerShuffle(10, 1)
- masterTracker.incrementEpoch()
- slaveTracker.updateEpoch(masterTracker.getEpoch)

- val compressedSize1000 = MapOutputTracker.compressSize(1000L)
- val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
- masterTracker.registerMapOutput(10, 0, new MapStatus(
- BlockManagerId("a", "hostA", 1000, 0), Array(compressedSize1000)))
- masterTracker.incrementEpoch()
- slaveTracker.updateEpoch(masterTracker.getEpoch)

- // this should fail since security on in server and off in client
- intercept[SparkException] { slaveTracker.getServerStatuses(10, 0) }
+ intercept[akka.actor.ActorNotFound] {
+ slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+ }

actorSystem.shutdown()
slaveSystem.shutdown()
