From 0b72660a8da074f303ea1795af9ee1f0312877a7 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Sat, 5 Jul 2014 21:11:15 -0700
Subject: [PATCH 01/17] Initial WIP example of supporting globally named
 accumulators.

---
 .../scala/org/apache/spark/Accumulators.scala | 15 +++++++++++++--
 .../scala/org/apache/spark/SparkContext.scala | 10 ++++++++++
 .../apache/spark/scheduler/DAGScheduler.scala | 19 +++++++++++++++++--
 .../apache/spark/scheduler/StageInfo.scala    |  5 +++++
 .../org/apache/spark/scheduler/TaskInfo.scala |  7 +++++++
 .../spark/ui/jobs/JobProgressListener.scala   | 15 ++++++++++++++-
 .../org/apache/spark/ui/jobs/StagePage.scala  | 11 +++++++++--
 7 files changed, 75 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 9c55bfbb47626..f2fa9b5535f6a 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -51,6 +51,13 @@ class Accumulable[R, T] (
 
   Accumulators.register(this, true)
 
+  /** A name for this accumulator / accumulable for display in Spark's UI.
+    * Note that names must be unique within a SparkContext. */
+  def name: String = s"accumulator_$id"
+
+  /** Whether to display this accumulator in the web UI. */
+  def display: Boolean = true
+
   /**
    * Add more data to this accumulator / accumulable
    * @param term the data to add
@@ -219,8 +226,12 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
  * @param param helper object defining how to add elements of type `T`
  * @tparam T result type
  */
-class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T])
-  extends Accumulable[T,T](initialValue, param)
+class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], _name: String, _display: Boolean)
+  extends Accumulable[T,T](initialValue, param) {
+  override def name = if (_name.eq(null)) s"accumulator_$id" else _name
+  override def display = _display
+  def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, null, true)
+}
 
 /**
  * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8819e73d17fb2..bf971f8f3a88f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -757,6 +757,16 @@ class SparkContext(config: SparkConf) extends Logging {
   def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
     new Accumulator(initialValue, param)
 
+  /**
+   * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
+   * values to using the `+=` method. Only the driver can access the accumulator's `value`.
+   *
+   * This version adds a custom name to the accumulator for display in the Spark UI.
+   */
+  def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]) = {
+    new Accumulator(initialValue, param, name, true)
+  }
+
   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
    * with `+=`. Only the driver can access the accumuable's `value`.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 81c136d970312..3e91b9a859fb3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -791,9 +791,10 @@ class DAGScheduler(
     val task = event.task
     val stageId = task.stageId
     val taskType = Utils.getFormattedClassName(task)
-    listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
-      event.taskMetrics))
+
     if (!stageIdToStage.contains(task.stageId)) {
+      listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
+        event.taskMetrics))
       // Skip all the actions if the stage has been cancelled.
       return
     }
@@ -809,12 +810,24 @@
       listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
       runningStages -= stage
     }
+
     event.reason match {
       case Success =>
         logInfo("Completed " + task)
         if (event.accumUpdates != null) {
          // TODO: fail the stage if the accumulator update fails...
          Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
+          event.accumUpdates.foreach { case (id, partialValue) =>
+            val acc = Accumulators.originals(id)
+            val name = acc.name
+            // To avoid UI cruft, ignore cases where value wasn't updated
+            if (partialValue != acc.zero) {
+              val stringPartialValue = s"${partialValue}"
+              val stringValue = s"${acc.value}"
+              stageToInfos(stage).accumulatorValues(name) = stringValue
+              event.taskInfo.accumValues += ((name, stringPartialValue))
+            }
+          }
         }
         pendingTasks(stage) -= task
         task match {
@@ -945,6 +958,8 @@
         // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
        // will abort the job.
     }
+    listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
+      event.taskMetrics))
     submitWaitingStages()
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 480891550eb60..6db83ff551b10 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.scheduler
 
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Map
+
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.storage.RDDInfo
 
@@ -37,6 +40,8 @@ class StageInfo(
   var completionTime: Option[Long] = None
   /** If the stage failed, the reason why. */
   var failureReason: Option[String] = None
+  /** Terminal values of accumulables updated during this stage. */
+  val accumulatorValues: Map[String, String] = HashMap[String, String]()
 
   def stageFailed(reason: String) {
     failureReason = Some(reason)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 29de0453ac19a..091e9a17ee6a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import scala.collection.mutable.ListBuffer
+
 import org.apache.spark.annotation.DeveloperApi
 
 /**
@@ -41,6 +43,11 @@ class TaskInfo(
    */
   var gettingResultTime: Long = 0
 
+  /**
+   * Terminal values of accumulables updated during this task.
+   */
+  val accumValues = ListBuffer[(String, String)]()
+
   /**
    * The time when the task has completed successfully (including the time to remotely fetch
    * results, if necessary).
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 2286a7f952f28..1ef9076118457 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.ui.jobs
 
-import scala.collection.mutable.{HashMap, ListBuffer}
+import scala.collection.mutable.{HashMap, ListBuffer, Map}
 
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
@@ -48,6 +48,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
 
   // TODO: Should probably consolidate all following into a single hash map.
   val stageIdToTime = HashMap[Int, Long]()
+  val stageIdToAccumulables = HashMap[Int, Map[String, String]]()
   val stageIdToInputBytes = HashMap[Int, Long]()
   val stageIdToShuffleRead = HashMap[Int, Long]()
   val stageIdToShuffleWrite = HashMap[Int, Long]()
@@ -73,6 +74,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
     val stageId = stage.stageId
     // Remove by stageId, rather than by StageInfo, in case the StageInfo is from storage
     poolToActiveStages(stageIdToPool(stageId)).remove(stageId)
+
+    val accumulables = stageIdToAccumulables.getOrElseUpdate(stageId, HashMap[String, String]())
+    stageCompleted.stageInfo.accumulatorValues.foreach { case (name, value) =>
+      accumulables(name) = value
+    }
+
     activeStages.remove(stageId)
     if (stage.failureReason.isEmpty) {
       completedStages += stage
@@ -89,6 +96,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
       val toRemove = math.max(retainedStages / 10, 1)
       stages.take(toRemove).foreach { s =>
         stageIdToTime.remove(s.stageId)
+        stageIdToAccumulables.remove(s.stageId)
         stageIdToInputBytes.remove(s.stageId)
         stageIdToShuffleRead.remove(s.stageId)
         stageIdToShuffleWrite.remove(s.stageId)
@@ -147,6 +155,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
 
     val info = taskEnd.taskInfo
     if (info != null) {
+      val accumulables = stageIdToAccumulables.getOrElseUpdate(sid, HashMap[String, String]())
+      info.accumValues.map { case (name, value) =>
+        accumulables(name) = value
+      }
+
       // create executor summary map if necessary
       val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
         op = new HashMap[String, ExecutorSummary]())
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index afb8ed754ff8b..b8b88d2726d97 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ui.jobs
 import java.util.Date
 import javax.servlet.http.HttpServletRequest
 
-import scala.xml.Node
+import scala.xml.{Unparsed, Node}
 
 import org.apache.spark.ui.{WebUIPage, UIUtils}
 import org.apache.spark.util.{Utils, Distribution}
@@ -57,6 +57,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
       val memoryBytesSpilled = listener.stageIdToMemoryBytesSpilled.getOrElse(stageId, 0L)
       val diskBytesSpilled = listener.stageIdToDiskBytesSpilled.getOrElse(stageId, 0L)
       val hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0
+      val accumulables = listener.stageIdToAccumulables(stageId)
 
       var activeTime = 0L
       val now = System.currentTimeMillis
@@ -102,10 +103,14 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
 
       // scalastyle:on
 
+      val accumulableHeaders: Seq[String] = Seq("Accumulable", "Value")
+      def accumulableRow(acc: (String, String)) = <tr><td>{acc._1}</td><td>{acc._2}</td></tr>
+      val accumulableTable = UIUtils.listingTable(accumulableHeaders, accumulableRow, accumulables.toSeq)
+
       val taskHeaders: Seq[String] =
         Seq(
           "Index", "ID", "Attempt", "Status", "Locality Level", "Executor",
-          "Launch Time", "Duration", "GC Time") ++
+          "Launch Time", "Duration", "GC Time", "Accumulators") ++
         {if (hasInput) Seq("Input") else Nil} ++
         {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
         {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
@@ -217,6 +222,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         <h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
         <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
         <h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq ++
+        <h4>Accumulators</h4> ++ accumulableTable ++
         <h4>Tasks</h4> ++ taskTable
 
       UIUtils.headerSparkPage(content, basePath, appName, "Details for Stage %d".format(stageId),
@@ -283,6 +289,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         <td sorttable_customkey={gcTime.toString}>
           {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
         </td>
+        <td>{Unparsed(info.accumValues.map{ case (k, v) => s"$k += $v" }.mkString("<br/>"))}</td>