Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-3454] Expose JSON representation of data shown in WebU #2333

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
353a423
Implemented renderJson for StagePage
sarutak Sep 10, 2014
b8578a7
Implemented renderJson for EnvironmentPage
sarutak Sep 10, 2014
937c8b7
tmp
sarutak Sep 10, 2014
437c241
Merge branch 'master' of git://git.apache.org/spark into json-exposin…
sarutak Sep 11, 2014
5556856
Implemented renderJson for ExecutorPage
sarutak Sep 11, 2014
f7958b0
Implemented renderJson for JobProgressPage
sarutak Sep 11, 2014
9e0010a
Implemented renderJson for PoolPage
sarutak Sep 11, 2014
8850706
Modified style
sarutak Sep 11, 2014
e537be6
Implemented renderJson for StoragePage
sarutak Sep 11, 2014
c108200
Implemented renderJson for RDDPage
sarutak Sep 11, 2014
2f8f9f3
Added "Scheduling Mode" field in the json data returned by JobProgres…
sarutak Sep 11, 2014
a349d0e
Added "Stage Summary" field to the json data returned from StagePage#…
sarutak Sep 11, 2014
eb49ea5
Modified variable name to be returned in EnvironmentPage.scala
sarutak Sep 12, 2014
d4d8c22
Added spaces at the place where creating json in ExecutorPage.scala
sarutak Sep 12, 2014
36ce0ed
Simplified json creating logic in JobProgressPage.scala
sarutak Sep 12, 2014
1882f38
Simplified PoolPage.scala
sarutak Sep 12, 2014
6b159ed
Simplified json creating logic in StagePage.scala
sarutak Sep 12, 2014
270346a
Simplified json creating logic in RDDPage.scala
sarutak Sep 12, 2014
f1b6bcf
Simplified json creating logic in StoragePage.scala
sarutak Sep 12, 2014
72c0644
Modified variable name in StagePage.scala
sarutak Sep 12, 2014
a2dbe2e
Merge branch 'master' of git://git.apache.org/spark into json-exposin…
sarutak Sep 13, 2014
b50a383
Merge branch 'json-exposing-feature' of github.com:sarutak/spark into…
sarutak Sep 13, 2014
7f51a4f
Merge branch 'master' of git://git.apache.org/spark into json-exposin…
sarutak Sep 14, 2014
d41b3ca
Merge branch 'master' of git://git.apache.org/spark into json-exposin…
sarutak Sep 15, 2014
7b4d6eb
Merge branch 'master' of git://git.apache.org/spark into json-exposin…
sarutak Sep 16, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,35 @@ package org.apache.spark.ui.env

import javax.servlet.http.HttpServletRequest

import org.json4s.{JObject, JValue}
import org.json4s.JsonDSL._

import scala.xml.Node

import org.apache.spark.ui.{UIUtils, WebUIPage}

private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") {
private val listener = parent.listener

override def renderJson(request: HttpServletRequest): JValue = {
val jvmInfoJson =
("Runtime Information" -> listener.jvmInformation.foldLeft(JObject())(_ ~ _))
val sparkPropertiesJson =
("Spark Properties" -> listener.sparkProperties.foldLeft(JObject())(_ ~ _))
val systemPropertiesJson =
("System Properties" -> listener.systemProperties.foldLeft(JObject())(_ ~ _))
val classPathEntriesJson =
("Classpath Entries" -> listener.classpathEntries.foldLeft(JObject())(_ ~ _))

val environmentJson =
jvmInfoJson ~
sparkPropertiesJson ~
systemPropertiesJson ~
classPathEntriesJson

environmentJson
}

def render(request: HttpServletRequest): Seq[Node] = {
val runtimeInformationTable = UIUtils.listingTable(
propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true)
Expand Down
25 changes: 25 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.json4s.JValue
import org.json4s.JsonDSL._

import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
import org.apache.spark.util.Utils

Expand All @@ -44,6 +47,28 @@ private case class ExecutorSummaryInfo(
private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
private val listener = parent.listener

override def renderJson(request: HttpServletRequest): JValue = {
val storageStatusList = listener.storageStatusList
val execInfoJsonList = for (statusId <- 0 until storageStatusList.size) yield {
val execInfo = getExecInfo(statusId)
("Executor ID" -> execInfo.id) ~
("Address" -> execInfo.hostPort) ~
("RDD Blocks" -> execInfo.rddBlocks) ~
("Memory Used" -> execInfo.memoryUsed) ~
("Disk Used" -> execInfo.diskUsed) ~
("Active Tasks" -> execInfo.activeTasks) ~
("Failed Tasks" -> execInfo.failedTasks) ~
("Complete Tasks" -> execInfo.completedTasks) ~
("TotalTasks" -> execInfo.totalTasks) ~
("Task Time" -> execInfo.totalDuration) ~
("Input" -> execInfo.totalInputBytes) ~
("Shuffle Read" -> execInfo.totalShuffleRead) ~
("Shuffle Write" -> execInfo.totalShuffleWrite)
}

execInfoJsonList
}

def render(request: HttpServletRequest): Seq[Node] = {
val storageStatusList = listener.storageStatusList
val maxMem = storageStatusList.map(_.maxMem).sum
Expand Down
28 changes: 28 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ package org.apache.spark.ui.jobs

import javax.servlet.http.HttpServletRequest

import org.apache.spark.util.JsonProtocol

import org.json4s.JValue
import org.json4s.JsonDSL._

import scala.xml.{Node, NodeSeq}

import org.apache.spark.scheduler.Schedulable
Expand All @@ -31,6 +36,29 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("")
private val listener = parent.listener
private lazy val isFairScheduler = parent.isFairScheduler

override def renderJson(request: HttpServletRequest): JValue = {
listener.synchronized {

val activeStageList =
listener.activeStages.values.map { info => JsonProtocol.stageInfoToJson(info) }
val activeStageJson = ("Active Stages" -> activeStageList)
val completedStageList =
listener.completedStages.reverse.map { info => JsonProtocol.stageInfoToJson(info) }
val completedStageJson = ("Completed Stages" -> completedStageList)
val failedStageList =
listener.failedStages.reverse.map { info => JsonProtocol.stageInfoToJson(info) }
val failedStageJson = ("Failed Stages" -> failedStageList)

val stageInfoJson =
("Scheduling Mode" -> listener.schedulingMode.map(_.toString).getOrElse("Unknown")) ~
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you put this in a variable so this is more readable?

activeStageJson ~
completedStageJson ~
failedStageJson

stageInfoJson
}
}

def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val activeStages = listener.activeStages.values.toSeq
Expand Down
33 changes: 33 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@ package org.apache.spark.ui.jobs

import javax.servlet.http.HttpServletRequest

import org.apache.spark.util.JsonProtocol

import scala.xml.Node

import org.json4s.JValue
import org.json4s.JsonDSL._

import org.apache.spark.scheduler.{Schedulable, StageInfo}
import org.apache.spark.ui.{WebUIPage, UIUtils}

Expand All @@ -30,6 +35,34 @@ private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") {
private val sc = parent.sc
private val listener = parent.listener

override def renderJson(request: HttpServletRequest): JValue = {
listener.synchronized {
val poolName = request.getParameter("poolname")
val poolToActiveStages = listener.poolToActiveStages
val activeStages = poolToActiveStages.get(poolName) match {
case Some(s) => s.values.map {
case info: StageInfo =>
JsonProtocol.stageInfoToJson(info)
}
case None => Seq[JValue]()
}

val pools = if (live) Seq(sc.getPoolForName(poolName).get) else Seq[Schedulable]()

val poolListJson =
pools.map { schedulable =>
("Pool Name" -> schedulable.name) ~
("Minimum Share" -> schedulable.minShare) ~
("Pool Weight" -> schedulable.weight) ~
("Active Stages" -> activeStages) ~
("Running Tasks" -> schedulable.runningTasks) ~
("Scheduling Mode" -> schedulable.schedulingMode.toString)
}

poolListJson
}
}

def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val poolName = request.getParameter("poolname")
Expand Down
53 changes: 52 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,66 @@ import javax.servlet.http.HttpServletRequest

import scala.xml.{Node, Unparsed}

import org.json4s.{JNothing, JObject, JValue}
import org.json4s.JsonDSL._

import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils}
import org.apache.spark.ui.jobs.UIData._
import org.apache.spark.util.{Utils, Distribution}
import org.apache.spark.util.{JsonProtocol, Utils, Distribution}
import org.apache.spark.scheduler.AccumulableInfo

/** Page showing statistics and task list for a given stage */
private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
private val listener = parent.listener

override def renderJson(request: HttpServletRequest): JValue = {
val stageId = request.getParameter("id").toInt
val stageAttemptId = request.getParameter("attempt").toInt
var stageSummary = ("Stage ID" -> stageId) ~ ("Stage Attempt ID" -> stageAttemptId)
val stageDataOpt = listener.stageIdToData.get((stageId, stageAttemptId))
var stageInfoJson: JValue = JNothing

if (!stageDataOpt.isEmpty && !stageDataOpt.get.taskData.isEmpty) {
val stageData = stageDataOpt.get

stageSummary ~= ("Executor Run Time" -> stageData.executorRunTime)
if (stageData.inputBytes > 0) stageSummary ~= ("Input Bytes" -> stageData.inputBytes)
if (stageData.shuffleReadBytes > 0) {
stageSummary ~= ("Shuffle Read Bytes" -> stageData.shuffleReadBytes)
}

if (stageData.shuffleWriteBytes > 0) {
stageSummary ~= ("Shuffle Write bytes" -> stageData.shuffleWriteBytes)
}

if (stageData.memoryBytesSpilled > 0 && stageData.diskBytesSpilled > 0) {
stageSummary ~=
("Memory Bytes Spilled" -> stageData.memoryBytesSpilled) ~
("Disk Bytes Spilled" -> stageData.diskBytesSpilled)
}

val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)

val taskList = tasks.map {
case uiData: TaskUIData =>
var jsonTaskInfo: JValue = JsonProtocol.taskInfoToJson(uiData.taskInfo)
val jsonTaskMetrics: JValue =
if (uiData.taskMetrics.isDefined) {
JsonProtocol.taskMetricsToJson(uiData.taskMetrics.get)
} else JNothing

if (jsonTaskInfo.isInstanceOf[JObject] && jsonTaskMetrics.isInstanceOf[JObject]) {
jsonTaskInfo =
jsonTaskInfo.asInstanceOf[JObject] ~ jsonTaskMetrics.asInstanceOf[JObject]
}
jsonTaskInfo
}

stageInfoJson = ("Stage Summary" -> stageSummary) ~ ("Tasks" -> taskList)
}
stageInfoJson
}

def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val stageId = request.getParameter("id").toInt
Expand Down
60 changes: 59 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,72 @@ import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.apache.spark.storage.{BlockId, BlockStatus, StorageStatus, StorageUtils}
import org.json4s.{JNothing, JValue}
import org.json4s.JsonDSL._

import org.apache.spark.storage._
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils

/** Page showing storage details for a given RDD */
private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
private val listener = parent.listener

override def renderJson(request: HttpServletRequest): JValue = {
val rddId = request.getParameter("id").toInt
val storageStatusList = listener.storageStatusList
val rddInfoOpt = listener.rddInfoList.find(_.id == rddId)

var rddInfoJson: JValue = JNothing

if (rddInfoOpt.isDefined) {
val rddInfo = rddInfoOpt.get

val rddSummaryJson = ("RDD Summary" ->
("RDD ID" -> rddId) ~
("Storage Level" -> rddInfo.storageLevel.description) ~
("Cached Partitions" -> rddInfo.numCachedPartitions) ~
("Total Partitions" -> rddInfo.numPartitions) ~
("Memory Size" -> rddInfo.memSize) ~
("Disk Size" -> rddInfo.diskSize))

val dataDistributionList =
storageStatusList.map { status =>
("Host" -> (status.blockManagerId.host + ":" + status.blockManagerId.port)) ~
("Memory Usage" -> status.memUsedByRdd(rddId)) ~
("Memory Remaining" -> status.memRemaining) ~
("Disk Usage" -> status.diskUsedByRdd(rddId))
}

val dataDistributionJson = ("Data Distribution" -> dataDistributionList)

val blockLocations = StorageUtils.getRddBlockLocations(rddId, storageStatusList)
val blocks = storageStatusList
.flatMap(_.rddBlocksById(rddId))
.sortWith(_._1.name < _._1.name)
.map { case (blockId, status) =>
(blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown")))
}

val partitionList =
blocks.map { case (id, block, locations) =>
("Block Name" -> id.toString) ~
("Storage Level" -> block.storageLevel.description) ~
("Size in Memory" -> block.memSize) ~
("Size on Disk" -> block.diskSize) ~
("Executors" -> locations)
}

val partitionsJson = ("Partitions" -> partitionList)

rddInfoJson =
rddSummaryJson ~
dataDistributionJson ~
partitionsJson
}
rddInfoJson
}

def render(request: HttpServletRequest): Seq[Node] = {
val rddId = request.getParameter("id").toInt
val storageStatusList = listener.storageStatusList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,24 @@ import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.json4s.JValue
import org.json4s.JsonDSL._

import org.apache.spark.storage.RDDInfo
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils
import org.apache.spark.util.{JsonProtocol, Utils}

/** Page showing list of RDD's currently stored in the cluster */
private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
private val listener = parent.listener

override def renderJson(request: HttpServletRequest): JValue = {
val rddJsonList =
listener.rddInfoList.map { info => JsonProtocol.rddInfoToJson(info) }

rddJsonList
}

def render(request: HttpServletRequest): Seq[Node] = {
val rdds = listener.rddInfoList
val content = UIUtils.listingTable(rddHeader, rddRow, rdds)
Expand Down