Skip to content

Commit

Permalink
stage/taskList
Browse files Browse the repository at this point in the history
  • Loading branch information
squito committed Apr 1, 2015
1 parent 63eb4a6 commit fdfc181
Show file tree
Hide file tree
Showing 7 changed files with 3,215 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import javax.ws.rs.core.MediaType
import org.apache.spark.SparkException
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.ui.jobs.JobProgressListener
import org.apache.spark.ui.jobs.UIData.StageUIData
import org.apache.spark.util.SparkEnum

@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class OneStageResource(uiRoot: UIRoot) {
Expand Down Expand Up @@ -53,13 +55,7 @@ private[v1] class OneStageResource(uiRoot: UIRoot) {
@PathParam("stageId") stageId: Int,
@PathParam("attemptId") attemptId: Int
): StageData = {
forStageAttempt(appId, stageId, attemptId) { case (listener, status, stageInfo) =>
val stageUiData = listener.synchronized {
listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)).
getOrElse(throw new SparkException("failed to get full stage data for stage: " +
stageInfo.stageId + ":" + stageInfo.attemptId)
)
}
forStageAttempt(appId, stageId, attemptId) { case (status, stageInfo, stageUiData) =>
AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData,
includeDetails = true)
}
Expand All @@ -73,13 +69,7 @@ private[v1] class OneStageResource(uiRoot: UIRoot) {
@PathParam("attemptId") attemptId: Int,
@DefaultValue("0.05,0.25,0.5,0.75,0.95") @QueryParam("quantiles") quantileString: String
): TaskMetricDistributions = {
forStageAttempt(appId, stageId, attemptId) { case (listener, status, stageInfo) =>
val stageUiData = listener.synchronized {
listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)).
getOrElse(throw new SparkException("failed to get full stage data for stage: " +
stageInfo.stageId + ":" + stageInfo.attemptId)
)
}
forStageAttempt(appId, stageId, attemptId) { case (status, stageInfo, stageUiData) =>
val quantiles = quantileString.split(",").map{s =>
try {
s.toDouble
Expand All @@ -92,6 +82,24 @@ private[v1] class OneStageResource(uiRoot: UIRoot) {
}
}

@GET
@Path("/{attemptId: \\d+}/taskList")
def taskList(
    @PathParam("appId") appId: String,
    @PathParam("stageId") stageId: Int,
    @PathParam("attemptId") attemptId: Int,
    @DefaultValue("0") @QueryParam("offset") offset: Int,
    @DefaultValue("20") @QueryParam("length") length: Int,
    @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting
): Seq[TaskData] = {
  // Returns one page of tasks for the given stage attempt: convert every
  // tracked task to its API representation, order by the requested sort key,
  // then take the [offset, offset + length) window.
  forStageAttempt(appId, stageId, attemptId) { case (_, _, stageUiData) =>
    val converted = stageUiData.taskData.values.map(AllStagesResource.convertTaskData)
    val sortedTasks = converted.toIndexedSeq.sorted(sortBy.ordering)
    sortedTasks.slice(offset, offset + length)
  }
}


def forStage[T](appId: String, stageId: Int)
(f: (JobProgressListener, Seq[(StageStatus, StageInfo)]) => T): T = {
uiRoot.withSparkUI(appId) { ui =>
Expand All @@ -111,14 +119,20 @@ private[v1] class OneStageResource(uiRoot: UIRoot) {
}

def forStageAttempt[T](appId: String, stageId: Int, attemptId: Int)
(f: (JobProgressListener, StageStatus, StageInfo) => T): T = {
(f: (StageStatus, StageInfo, StageUIData) => T): T = {
forStage(appId, stageId) { case (listener, attempts) =>
val oneAttempt = attempts.filter{ case (status, stage) =>
stage.attemptId == attemptId
}.headOption
oneAttempt match {
case Some((status, stageInfo)) =>
f(listener, status, stageInfo)
val stageUiData = listener.synchronized {
listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)).
getOrElse(throw new SparkException("failed to get full stage data for stage: " +
stageInfo.stageId + ":" + stageInfo.attemptId)
)
}
f(status, stageInfo, stageUiData)
case None =>
val stageAttempts = attempts.map { _._2.attemptId}
throw new NotFoundException(s"unknown attempt for stage $stageId. " +
Expand All @@ -127,3 +141,51 @@ private[v1] class OneStageResource(uiRoot: UIRoot) {
}
}
}

/**
 * One sort order accepted by the `taskList` endpoint's `sortBy` query parameter.
 * Concrete instances live in the [[TaskSorting]] companion object.
 */
sealed abstract class TaskSorting extends SparkEnum {
  /** Ordering applied to the [[TaskData]] sequence before paging. */
  def ordering: Ordering[TaskData]
  /** Additional accepted spellings for this sort key (e.g. "-runtime"). */
  def alternateNames: Seq[String] = Seq.empty
}
object TaskSorting extends JerseyEnum[TaskSorting] {
  /** Sort by task ID, i.e. roughly the order tasks were launched in. */
  final val ID = {
    case object ID extends TaskSorting {
      def ordering = Ordering.by{td: TaskData => td.taskId}
    }
    ID
  }

  /** Sort by executor run time, fastest first; tasks without metrics sort first. */
  final val IncreasingRuntime = {
    case object IncreasingRuntime extends TaskSorting {
      def ordering = Ordering.by{td: TaskData =>
        // Tasks that have not reported metrics yet sort before all finished tasks.
        td.taskMetrics.map{_.executorRunTime}.getOrElse(-1L)
      }
      override def alternateNames = Seq("runtime", "+runtime")
    }
    IncreasingRuntime
  }

  /** Sort by executor run time, slowest first. */
  final val DecreasingRuntime = {
    case object DecreasingRuntime extends TaskSorting {
      def ordering = IncreasingRuntime.ordering.reverse
      override def alternateNames = Seq("-runtime")
    }
    DecreasingRuntime
  }

  val values = Seq(
    ID,
    IncreasingRuntime,
    DecreasingRuntime
  )

  /** Every alternate spelling mapped to its sort order. */
  val alternateNames: Map[String, TaskSorting] = values.flatMap{x => x.alternateNames.map{_ -> x}}.toMap

  /**
   * Resolves a query-parameter value to a [[TaskSorting]]: first against the
   * alternate names (case-insensitively), then against the canonical enum names.
   */
  override def fromString(s: String): TaskSorting = {
    // Use equalsIgnoreCase instead of comparing toLowerCase results: String.toLowerCase
    // is sensitive to the JVM's default locale (e.g. the Turkish dotless-i), which could
    // make valid sort keys fail to match on some systems.
    alternateNames.collectFirst {
      case (k, v) if k.equalsIgnoreCase(s) => v
    }.getOrElse {
      super.fromString(s)
    }
  }
}

Loading

0 comments on commit fdfc181

Please sign in to comment.