Skip to content

Commit

Permalink
undo removal of renderJson from MasterPage, since there is no substit…
Browse files Browse the repository at this point in the history
…ute yet
  • Loading branch information
squito committed Apr 1, 2015
1 parent db61211 commit f5a5196
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 33 deletions.
24 changes: 22 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateR
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.deploy.worker.ExecutorRunner

private[spark] object JsonProtocol {
private[deploy] object JsonProtocol {
def writeWorkerInfo(obj: WorkerInfo): JObject = {
("id" -> obj.id) ~
("host" -> obj.host) ~
Expand Down Expand Up @@ -67,6 +67,27 @@ private[spark] object JsonProtocol {
("appdesc" -> writeApplicationDescription(obj.appDesc))
}

def writeDriverInfo(obj: DriverInfo): JObject = {
("id" -> obj.id) ~
("starttime" -> obj.startTime.toString) ~
("state" -> obj.state.toString) ~
("cores" -> obj.desc.cores) ~
("memory" -> obj.desc.mem)
}

def writeMasterState(obj: MasterStateResponse): JObject = {
("url" -> obj.uri) ~
("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
("cores" -> obj.workers.map(_.cores).sum) ~
("coresused" -> obj.workers.map(_.coresUsed).sum) ~
("memory" -> obj.workers.map(_.memory).sum) ~
("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~
("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~
("activedrivers" -> obj.activeDrivers.toList.map(writeDriverInfo)) ~
("status" -> obj.status.toString)
}

def writeWorkerState(obj: WorkerStateResponse): JObject = {
("id" -> obj.workerId) ~
("masterurl" -> obj.masterUrl) ~
Expand All @@ -78,5 +99,4 @@ private[spark] object JsonProtocol {
("executors" -> obj.executors.toList.map(writeExecutorRunner)) ~
("finishedexecutors" -> obj.finishedExecutors.toList.map(writeExecutorRunner))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
private val master = parent.masterActorRef
private val timeout = parent.timeout


def getMasterState: MasterStateResponse = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
Await.result(stateFuture, timeout)
}

override def renderJson(request: HttpServletRequest): JValue = {
JsonProtocol.writeMasterState(getMasterState)
}

def handleAppKillRequest(request: HttpServletRequest): Unit = {
handleKillRequest(request, id => {
parent.master.idToApp.get(id).foreach { app =>
Expand Down Expand Up @@ -69,21 +72,17 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
/** Index view listing applications and executors */
def render(request: HttpServletRequest): Seq[Node] = {
val state = getMasterState
val workerHeaders = Seq("Worker Id", "Address", "State", "Cores", "Memory")

val workerHeaders = Seq("Worker Id", "Address", "State", "Cores", "Memory")
val workers = state.workers.sortBy(_.id)
val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)

val activeAppHeaders = Seq("Application ID", "Name", "Cores in Use",
"Cores Requested", "Memory per Node", "Submitted Time", "User", "State", "Duration")
val appHeaders = Seq("Application ID", "Name", "Cores", "Memory per Node", "Submitted Time",
"User", "State", "Duration")
val activeApps = state.activeApps.sortBy(_.startTime).reverse
val activeAppsTable = UIUtils.listingTable(activeAppHeaders, activeAppRow, activeApps)

val completedAppHeaders = Seq("Application ID", "Name", "Cores Requested", "Memory per Node",
"Submitted Time", "User", "State", "Duration")
val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
val completedApps = state.completedApps.sortBy(_.endTime).reverse
val completedAppsTable = UIUtils.listingTable(completedAppHeaders, completeAppRow,
completedApps)
val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)

val driverHeaders = Seq("Submission ID", "Submitted Time", "Worker", "State", "Cores",
"Memory", "Main Class")
Expand Down Expand Up @@ -190,7 +189,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
</tr>
}

private def appRow(app: ApplicationInfo, active: Boolean): Seq[Node] = {
private def appRow(app: ApplicationInfo): Seq[Node] = {
val killLink = if (parent.killEnabled &&
(app.state == ApplicationState.RUNNING || app.state == ApplicationState.WAITING)) {
val killLinkUri = s"app/kill?id=${app.id}&terminate=true"
Expand All @@ -200,7 +199,6 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
(<a href={killLinkUri} onclick={confirm}>kill</a>)
</span>
}

<tr>
<td>
<a href={"app?appId=" + app.id}>{app.id}</a>
Expand All @@ -209,15 +207,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
<td>
<a href={app.desc.appUiUrl}>{app.desc.name}</a>
</td>
{
if (active) {
<td>
{app.coresGranted}
</td>
}
}
<td>
{if (app.requestedCores == Int.MaxValue) "*" else app.requestedCores}
{app.coresGranted}
</td>
<td sorttable_customkey={app.desc.memoryPerSlave.toString}>
{Utils.megabytesToString(app.desc.memoryPerSlave)}
Expand All @@ -229,14 +220,6 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
</tr>
}

private def activeAppRow(app: ApplicationInfo): Seq[Node] = {
appRow(app, active = true)
}

private def completeAppRow(app: ApplicationInfo): Seq[Node] = {
appRow(app, active = false)
}

private def driverRow(driver: DriverInfo): Seq[Node] = {
val killLink = if (parent.killEnabled &&
(driver.state == DriverState.RUNNING ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,24 @@ import org.json4s._
import org.json4s.jackson.JsonMethods
import org.scalatest.FunSuite

import org.apache.spark.{JsonTestUtils, SparkConf}
import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryState, WorkerInfo}
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.SparkConf

class JsonProtocolSuite extends FunSuite with JsonTestUtils {
class JsonProtocolSuite extends FunSuite {

test("writeApplicationInfo") {
val output = JsonProtocol.writeApplicationInfo(createAppInfo())
assertValidJson(output)
assertValidDataInJson(output, JsonMethods.parse(JsonConstants.appInfoJsonStr))
}

test("writeWorkerInfo") {
val output = JsonProtocol.writeWorkerInfo(createWorkerInfo())
assertValidJson(output)
assertValidDataInJson(output, JsonMethods.parse(JsonConstants.workerInfoJsonStr))
}

test("writeApplicationDescription") {
val output = JsonProtocol.writeApplicationDescription(createAppDesc())
Expand All @@ -43,6 +56,26 @@ class JsonProtocolSuite extends FunSuite with JsonTestUtils {
assertValidDataInJson(output, JsonMethods.parse(JsonConstants.executorRunnerJsonStr))
}

test("writeDriverInfo") {
val output = JsonProtocol.writeDriverInfo(createDriverInfo())
assertValidJson(output)
assertValidDataInJson(output, JsonMethods.parse(JsonConstants.driverInfoJsonStr))
}

test("writeMasterState") {
val workers = Array(createWorkerInfo(), createWorkerInfo())
val activeApps = Array(createAppInfo())
val completedApps = Array[ApplicationInfo]()
val activeDrivers = Array(createDriverInfo())
val completedDrivers = Array(createDriverInfo())
val stateResponse = new MasterStateResponse(
"host", 8080, None, workers, activeApps, completedApps,
activeDrivers, completedDrivers, RecoveryState.ALIVE)
val output = JsonProtocol.writeMasterState(stateResponse)
assertValidJson(output)
assertValidDataInJson(output, JsonMethods.parse(JsonConstants.masterStateJsonStr))
}

test("writeWorkerState") {
val executors = List[ExecutorRunner]()
val finishedExecutors = List[ExecutorRunner](createExecutorRunner(), createExecutorRunner())
Expand All @@ -60,6 +93,13 @@ class JsonProtocolSuite extends FunSuite with JsonTestUtils {
new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl")
}

def createAppInfo() : ApplicationInfo = {
val appInfo = new ApplicationInfo(JsonConstants.appInfoStartTime,
"id", createAppDesc(), JsonConstants.submitDate, null, Int.MaxValue)
appInfo.endTime = JsonConstants.currTimeInMillis
appInfo
}

def createDriverCommand() = new Command(
"org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Seq("-Dfoo")
Expand All @@ -68,6 +108,15 @@ class JsonProtocolSuite extends FunSuite with JsonTestUtils {
def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3,
false, createDriverCommand())

def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3",
createDriverDesc(), new Date())

def createWorkerInfo(): WorkerInfo = {
val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
workerInfo
}

def createExecutorRunner(): ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123,
"publicAddress", new File("sparkHome"), new File("workDir"), "akka://worker",
Expand All @@ -87,6 +136,15 @@ class JsonProtocolSuite extends FunSuite with JsonTestUtils {
}
}

def assertValidDataInJson(validateJson: JValue, expectedJson: JValue) {
val Diff(c, a, d) = validateJson diff expectedJson
val validatePretty = JsonMethods.pretty(validateJson)
val expectedPretty = JsonMethods.pretty(expectedJson)
val errorMessage = s"Expected:\n$expectedPretty\nFound:\n$validatePretty"
assert(c === JNothing, s"$errorMessage\nChanged:\n${JsonMethods.pretty(c)}")
assert(a === JNothing, s"$errorMessage\nAdded:\n${JsonMethods.pretty(a)}")
assert(d === JNothing, s"$errorMessage\nDelected:\n${JsonMethods.pretty(d)}")
}
}

object JsonConstants {
Expand Down

0 comments on commit f5a5196

Please sign in to comment.