add accumulableInfo

squito committed Mar 16, 2015
1 parent 00e9cc5 commit 9c0c125
Showing 13 changed files with 100 additions and 10 deletions.
@@ -21,6 +21,7 @@ import javax.ws.rs.{GET, PathParam, Produces, QueryParam}
import javax.ws.rs.core.MediaType

import org.apache.spark.executor.{InputMetrics => InternalInputMetrics, OutputMetrics => InternalOutputMetrics, ShuffleReadMetrics => InternalShuffleReadMetrics, ShuffleWriteMetrics => InternalShuffleWriteMetrics, TaskMetrics => InternalTaskMetrics}
import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo}
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.status.api._
import org.apache.spark.ui.SparkUI
@@ -87,6 +88,9 @@ object AllStagesResource {
} else {
None
}

val accumulableInfo = stageUiData.accumulables.values.map { convertAccumulableInfo }.toSeq

new StageData(
status = status,
stageId = stageInfo.stageId,
@@ -107,6 +111,7 @@
schedulingPool = stageUiData.schedulingPool,
name = stageInfo.name,
details = stageInfo.details,
accumulatorUpdates = accumulableInfo,
tasks = taskData,
executorSummary = executorSummary
)
@@ -135,11 +140,17 @@
host = uiData.taskInfo.host,
taskLocality = uiData.taskInfo.taskLocality.toString(),
speculative = uiData.taskInfo.speculative,
accumulatorUpdates = uiData.taskInfo.accumulables.map { convertAccumulableInfo },
errorMessage = uiData.errorMessage,
taskMetrics = uiData.taskMetrics.map { convertUiTaskMetrics }
)
}

def convertAccumulableInfo(acc: InternalAccumulableInfo): AccumulableInfo = {
new AccumulableInfo(acc.id, acc.name, acc.value)
}
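
A minimal sketch of the conversion above, assuming the scheduler-side AccumulableInfo exposes id, name, and value as read here (the apply signature and the values are illustrative, not part of this commit):

// Illustrative only: the public type keeps just id, name, and value;
// per-task partial updates are dropped because they are not logged.
val internal = InternalAccumulableInfo(id = 1L, name = "my counter", update = None, value = "42")
val exposed = convertAccumulableInfo(internal)
// exposed.id == 1L, exposed.name == "my counter", exposed.value == "42"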


def convertUiTaskMetrics(internal: InternalTaskMetrics): TaskMetrics = {
new TaskMetrics(
executorDeserializeTime = internal.executorDeserializeTime,
@@ -101,7 +101,8 @@ private[spark] trait UIRoot {
*/
def withSparkUI[T](appId: String)(f: SparkUI => T): T = {
getSparkUI(appId) match {
-    case Some(ui) => f(ui)
+    case Some(ui) =>
+      f(ui)
case None => throw new NotFoundException("no such app: " + appId)
}
}
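
A hedged usage sketch of the helper above; uiRoot stands in for any UIRoot implementation, and the app id is borrowed from the expectation files below:

// Illustrative only: apply a function to a live SparkUI, or fail with
// NotFoundException (404 semantics) when the app id is unknown.
val appName: String = uiRoot.withSparkUI("local-1426533911241") { ui =>
  ui.appName  // any SparkUI accessor; the block's result is returned as T
}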
18 changes: 17 additions & 1 deletion core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -135,7 +135,7 @@ class StageData(
val details: String,
val schedulingPool: String,

-    //TODO what to do about accumulables?
+    val accumulatorUpdates: Seq[AccumulableInfo],
val tasks: Option[Map[Long, TaskData]],
val executorSummary:Option[Map[String,ExecutorStageSummary]]
)
@@ -149,6 +149,7 @@ class TaskData(
val host: String,
val taskLocality: String,
val speculative: Boolean,
val accumulatorUpdates: Seq[AccumulableInfo],
val errorMessage: Option[String] = None,
val taskMetrics: Option[TaskMetrics] = None
)
@@ -191,3 +192,18 @@ class ShuffleWriteMetrics(
val writeTime: Long,
val recordsWritten: Long
)

class AccumulableInfo (
val id: Long,
val name: String,
// no partial updates, since they aren't logged; we can add them later
val value: String) {

override def equals(other: Any): Boolean = other match {
case acc: AccumulableInfo =>
this.id == acc.id && this.name == acc.name &&
this.value == acc.value
case _ => false
}
}
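
A small sketch of the equality semantics this override defines (values are illustrative):

// Illustrative only: equality is field-wise over id, name, and value.
val a = new AccumulableInfo(1L, "my counter", "42")
val b = new AccumulableInfo(1L, "my counter", "42")
assert(a == b)
assert(a != new AccumulableInfo(2L, "my counter", "42"))
// Note: hashCode is not overridden alongside equals, so equal instances are
// fine for test assertions but unreliable as hash-map keys.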

@@ -1,4 +1,11 @@
[ {
"id" : "local-1426533911241",
"name" : "Spark shell",
"startTime" : "2015-03-16T19:25:10.242GMT",
"endTime" : "2015-03-16T19:25:45.177GMT",
"sparkUser" : "irashid",
"completed" : true
}, {
"id" : "local-1425081759269",
"name" : "Spark shell",
"startTime" : "2015-02-28T00:02:38.277GMT",
@@ -18,6 +18,7 @@
"name" : "map at <console>:14",
"details" : "org.apache.spark.rdd.RDD.map(RDD.scala:271)\n$line10.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:14)\n$line10.$read$$iwC$$iwC$$iwC.<init>(<console>:19)\n$line10.$read$$iwC$$iwC.<init>(<console>:21)\n$line10.$read$$iwC.<init>(<console>:23)\n$line10.$read.<init>(<console>:25)\n$line10.$read$.<init>(<console>:29)\n$line10.$read$.<clinit>(<console>)\n$line10.$eval$.<init>(<console>:7)\n$line10.$eval$.<clinit>(<console>)\n$line10.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)",
"schedulingPool" : "default",
"accumulatorUpdates" : [ ],
"tasks" : {
"8" : {
"taskId" : 8,
@@ -28,6 +29,7 @@
"host" : "localhost",
"taskLocality" : "PROCESS_LOCAL",
"speculative" : false,
"accumulatorUpdates" : [ ],
"taskMetrics" : {
"executorDeserializeTime" : 1,
"executorRunTime" : 435,
@@ -56,6 +58,7 @@
"host" : "localhost",
"taskLocality" : "PROCESS_LOCAL",
"speculative" : false,
"accumulatorUpdates" : [ ],
"taskMetrics" : {
"executorDeserializeTime" : 2,
"executorRunTime" : 434,
@@ -84,6 +87,7 @@
"host" : "localhost",
"taskLocality" : "PROCESS_LOCAL",
"speculative" : false,
"accumulatorUpdates" : [ ],
"taskMetrics" : {
"executorDeserializeTime" : 2,
"executorRunTime" : 434,
@@ -112,6 +116,7 @@
"host" : "localhost",
"taskLocality" : "PROCESS_LOCAL",
"speculative" : false,
"accumulatorUpdates" : [ ],
"taskMetrics" : {
"executorDeserializeTime" : 2,
"executorRunTime" : 434,
@@ -140,6 +145,7 @@
"host" : "localhost",
"taskLocality" : "PROCESS_LOCAL",
"speculative" : false,
"accumulatorUpdates" : [ ],
"taskMetrics" : {
"executorDeserializeTime" : 2,
"executorRunTime" : 434,
@@ -168,6 +174,7 @@
"host" : "localhost",
"taskLocality" : "PROCESS_LOCAL",
"speculative" : false,
"accumulatorUpdates" : [ ],
"taskMetrics" : {
"executorDeserializeTime" : 1,
"executorRunTime" : 436,
@@ -196,6 +203,7 @@
"host" : "localhost",
"taskLocality" : "PROCESS_LOCAL",
"speculative" : false,
"accumulatorUpdates" : [ ],
"taskMetrics" : {
"executorDeserializeTime" : 2,
"executorRunTime" : 434,
@@ -224,6 +232,7 @@
"host" : "localhost",
"taskLocality" : "PROCESS_LOCAL",
"speculative" : false,
"accumulatorUpdates" : [ ],
"taskMetrics" : {
"executorDeserializeTime" : 1,
"executorRunTime" : 435,
@@ -17,7 +17,8 @@
"diskBytesSpilled" : 0,
"name" : "count at <console>:17",
"details" : "org.apache.spark.rdd.RDD.count(RDD.scala:910)\n$line19.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:17)\n$line19.$read$$iwC$$iwC$$iwC.<init>(<console>:22)\n$line19.$read$$iwC$$iwC.<init>(<console>:24)\n$line19.$read$$iwC.<init>(<console>:26)\n$line19.$read.<init>(<console>:28)\n$line19.$read$.<init>(<console>:32)\n$line19.$read$.<clinit>(<console>)\n$line19.$eval$.<init>(<console>:7)\n$line19.$eval$.<clinit>(<console>)\n$line19.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)",
"schedulingPool" : "default"
"schedulingPool" : "default",
"accumulatorUpdates" : [ ]
}, {
"status" : "Complete",
"stageId" : 1,
@@ -37,7 +38,8 @@
"diskBytesSpilled" : 0,
"name" : "map at <console>:14",
"details" : "org.apache.spark.rdd.RDD.map(RDD.scala:271)\n$line10.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:14)\n$line10.$read$$iwC$$iwC$$iwC.<init>(<console>:19)\n$line10.$read$$iwC$$iwC.<init>(<console>:21)\n$line10.$read$$iwC.<init>(<console>:23)\n$line10.$read.<init>(<console>:25)\n$line10.$read$.<init>(<console>:29)\n$line10.$read$.<clinit>(<console>)\n$line10.$eval$.<init>(<console>:7)\n$line10.$eval$.<clinit>(<console>)\n$line10.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)",
"schedulingPool" : "default"
"schedulingPool" : "default",
"accumulatorUpdates" : [ ]
}, {
"status" : "Complete",
"stageId" : 0,
@@ -57,7 +59,8 @@
"diskBytesSpilled" : 0,
"name" : "count at <console>:15",
"details" : "org.apache.spark.rdd.RDD.count(RDD.scala:910)\n$line9.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:15)\n$line9.$read$$iwC$$iwC$$iwC.<init>(<console>:20)\n$line9.$read$$iwC$$iwC.<init>(<console>:22)\n$line9.$read$$iwC.<init>(<console>:24)\n$line9.$read.<init>(<console>:26)\n$line9.$read$.<init>(<console>:30)\n$line9.$read$.<clinit>(<console>)\n$line9.$eval$.<init>(<console>:7)\n$line9.$eval$.<clinit>(<console>)\n$line9.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)",
"schedulingPool" : "default"
"schedulingPool" : "default",
"accumulatorUpdates" : [ ]
}, {
"status" : "Failed",
"stageId" : 2,
@@ -77,5 +80,6 @@
"diskBytesSpilled" : 0,
"name" : "count at <console>:20",
"details" : "org.apache.spark.rdd.RDD.count(RDD.scala:910)\n$line11.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:20)\n$line11.$read$$iwC$$iwC$$iwC.<init>(<console>:25)\n$line11.$read$$iwC$$iwC.<init>(<console>:27)\n$line11.$read$$iwC.<init>(<console>:29)\n$line11.$read.<init>(<console>:31)\n$line11.$read$.<init>(<console>:35)\n$line11.$read$.<clinit>(<console>)\n$line11.$eval$.<init>(<console>:7)\n$line11.$eval$.<clinit>(<console>)\n$line11.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)",
"schedulingPool" : "default"
"schedulingPool" : "default",
"accumulatorUpdates" : [ ]
} ]
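
For reference, once a stage carries accumulables, an accumulatorUpdates entry would serialize roughly as follows — a sketch, assuming plain field-by-field JSON rendering of the AccumulableInfo class above:

"accumulatorUpdates" : [ {
  "id" : 1,
  "name" : "my counter",
  "value" : "42"
} ]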
@@ -17,7 +17,8 @@
"diskBytesSpilled" : 0,
"name" : "count at <console>:17",
"details" : "org.apache.spark.rdd.RDD.count(RDD.scala:910)\n$line19.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:17)\n$line19.$read$$iwC$$iwC$$iwC.<init>(<console>:22)\n$line19.$read$$iwC$$iwC.<init>(<console>:24)\n$line19.$read$$iwC.<init>(<console>:26)\n$line19.$read.<init>(<console>:28)\n$line19.$read$.<init>(<console>:32)\n$line19.$read$.<clinit>(<console>)\n$line19.$eval$.<init>(<console>:7)\n$line19.$eval$.<clinit>(<console>)\n$line19.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)",
"schedulingPool" : "default"
"schedulingPool" : "default",
"accumulatorUpdates" : [ ]
}, {
"status" : "Complete",
"stageId" : 1,
@@ -37,7 +38,8 @@
"diskBytesSpilled" : 0,
"name" : "map at <console>:14",
"details" : "org.apache.spark.rdd.RDD.map(RDD.scala:271)\n$line10.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:14)\n$line10.$read$$iwC$$iwC$$iwC.<init>(<console>:19)\n$line10.$read$$iwC$$iwC.<init>(<console>:21)\n$line10.$read$$iwC.<init>(<console>:23)\n$line10.$read.<init>(<console>:25)\n$line10.$read$.<init>(<console>:29)\n$line10.$read$.<clinit>(<console>)\n$line10.$eval$.<init>(<console>:7)\n$line10.$eval$.<clinit>(<console>)\n$line10.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)",
"schedulingPool" : "default"
"schedulingPool" : "default",
"accumulatorUpdates" : [ ]
}, {
"status" : "Complete",
"stageId" : 0,
@@ -57,5 +59,6 @@
"diskBytesSpilled" : 0,
"name" : "count at <console>:15",
"details" : "org.apache.spark.rdd.RDD.count(RDD.scala:910)\n$line9.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:15)\n$line9.$read$$iwC$$iwC$$iwC.<init>(<console>:20)\n$line9.$read$$iwC$$iwC.<init>(<console>:22)\n$line9.$read$$iwC.<init>(<console>:24)\n$line9.$read.<init>(<console>:26)\n$line9.$read$.<init>(<console>:30)\n$line9.$read$.<clinit>(<console>)\n$line9.$eval$.<init>(<console>:7)\n$line9.$eval$.<clinit>(<console>)\n$line9.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)",
"schedulingPool" : "default"
"schedulingPool" : "default",
"accumulatorUpdates" : [ ]
} ]
@@ -17,5 +17,6 @@
"diskBytesSpilled" : 0,
"name" : "count at <console>:20",
"details" : "org.apache.spark.rdd.RDD.count(RDD.scala:910)\n$line11.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:20)\n$line11.$read$$iwC$$iwC$$iwC.<init>(<console>:25)\n$line11.$read$$iwC$$iwC.<init>(<console>:27)\n$line11.$read$$iwC.<init>(<console>:29)\n$line11.$read.<init>(<console>:31)\n$line11.$read$.<init>(<console>:35)\n$line11.$read$.<clinit>(<console>)\n$line11.$eval$.<init>(<console>:7)\n$line11.$eval$.<clinit>(<console>)\n$line11.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)",
"schedulingPool" : "default"
"schedulingPool" : "default",
"accumulatorUpdates" : [ ]
} ]
@@ -1,4 +1,11 @@
[ {
"id" : "local-1426533911241",
"name" : "Spark shell",
"startTime" : "2015-03-16T19:25:10.242GMT",
"endTime" : "2015-03-16T19:25:45.177GMT",
"sparkUser" : "irashid",
"completed" : true
}, {
"id" : "local-1425081759269",
"name" : "Spark shell",
"startTime" : "2015-02-28T00:02:38.277GMT",
@@ -1,4 +1,11 @@
[ {
"id" : "local-1426533911241",
"name" : "Spark shell",
"startTime" : "2015-03-16T19:25:10.242GMT",
"endTime" : "2015-03-16T19:25:45.177GMT",
"sparkUser" : "irashid",
"completed" : true
}, {
"id" : "local-1425081759269",
"name" : "Spark shell",
"startTime" : "2015-02-28T00:02:38.277GMT",
Empty file.