Skip to content

Commit

Permalink
stages have attempt; jobs are sorted; resource for all attempts for o…
Browse files Browse the repository at this point in the history
…ne stage
  • Loading branch information
squito committed Mar 19, 2015
1 parent 190c17a commit dddbd29
Show file tree
Hide file tree
Showing 17 changed files with 444 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ class AllJobsResource(uiRoot: UIRoot) {
statuses
}
}
for {
val jobInfos = for {
(status, jobs) <- statusToJobs
job <- jobs if adjStatuses.contains(status)
} yield {
AllJobsResource.convertJobData(job, ui.jobProgressListener, false)
}
jobInfos.sortBy{- _.jobId}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import javax.ws.rs.{GET, PathParam, Produces, QueryParam}
import javax.ws.rs.core.MediaType

import org.apache.spark.executor.{InputMetrics => InternalInputMetrics, OutputMetrics => InternalOutputMetrics, ShuffleReadMetrics => InternalShuffleReadMetrics, ShuffleWriteMetrics => InternalShuffleWriteMetrics, TaskMetrics => InternalTaskMetrics}
import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo}
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo, StageInfo}
import org.apache.spark.status.api._
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData}
Expand Down Expand Up @@ -94,6 +93,7 @@ object AllStagesResource {
new StageData(
status = status,
stageId = stageInfo.stageId,
attemptId = stageInfo.attemptId,
numActiveTasks = stageUiData.numActiveTasks,
numCompleteTasks = stageUiData.numCompleteTasks,
numFailedTasks = stageUiData.numFailedTasks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ class JsonRootResource extends UIRootFromServletContext {
new OneStageResource(uiRoot)
}

@Path("applications/{appId}/stages/{stageId: \\d+}/{attemptId: \\d+}")
def getStageAttempt(): OneStageAttemptResource= {
new OneStageAttemptResource(uiRoot)
}


@Path("applications/{appId}/storage/rdd")
def getRdds(): AllRDDResource = {
new AllRDDResource(uiRoot)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.status.api.v1

import javax.ws.rs.core.MediaType
import javax.ws.rs.{GET, PathParam, Produces}

import org.apache.spark.SparkException

@Produces(Array(MediaType.APPLICATION_JSON))
class OneStageAttemptResource(uiRoot: UIRoot) {

@GET
def stageData(
@PathParam("appId") appId: String,
@PathParam("stageId") stageId: Int,
@PathParam("attemptId") attemptId: Int
): StageData = {
uiRoot.withSparkUI(appId) { ui =>
val listener = ui.stagesTab.listener
val stageAndStatus = AllStagesResource.stagesAndStatus(ui)
val oneStage = stageAndStatus.flatMap { case (status, stages) =>
val matched = stages.find { stage =>
stage.stageId == stageId && stage.attemptId == attemptId
}
matched.map { status -> _ }
}.headOption
oneStage match {
case Some((status, stageInfo)) =>
val stageUiData = listener.synchronized {
listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)).
getOrElse(throw new SparkException("failed to get full stage data for stage: " +
stageInfo.stageId + ":" + stageInfo.attemptId)
)
}
AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData,
includeDetails = true)
case None =>
throw new NotFoundException(s"unknown (stage, attempt): ($stageId, $attemptId)")
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,18 @@ class OneStageResource(uiRoot: UIRoot) {
def stageData(
@PathParam("appId") appId: String,
@PathParam("stageId") stageId: Int
): StageData = {
): Seq[StageData] = {
uiRoot.withSparkUI(appId) { ui =>
val listener = ui.stagesTab.listener
val stageAndStatus = AllStagesResource.stagesAndStatus(ui)
val oneStage = stageAndStatus.flatMap { case (status, stages) =>
val matched = stages.find { _.stageId == stageId }
val stageAttempts = stageAndStatus.flatMap { case (status, stages) =>
val matched = stages.filter{ stage => stage.stageId == stageId}
matched.map { status -> _ }
}.headOption
oneStage match {
case Some((status, stageInfo)) =>
}
if (stageAttempts.isEmpty) {
throw new NotFoundException("unknown stage: " + stageId)
} else {
stageAttempts.map { case (status, stageInfo) =>
val stageUiData = listener.synchronized {
listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)).
getOrElse(throw new SparkException("failed to get full stage data for stage: " +
Expand All @@ -46,8 +48,7 @@ class OneStageResource(uiRoot: UIRoot) {
}
AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData,
includeDetails = true)
case None =>
throw new NotFoundException("unknown stage: " + stageId)
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class RDDPartitionInfo(
class StageData(
val status: StageStatus,
val stageId: Int,
val attemptId: Int,
val numActiveTasks: Int ,
val numCompleteTasks: Int,
val numFailedTasks: Int,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,4 @@
[ {
"jobId" : 0,
"name" : "count at <console>:15",
"stageIds" : [ 0 ],
"status" : "SUCCEEDED",
"numTasks" : 8,
"numActiveTasks" : 0,
"numCompletedTasks" : 8,
"numSkippedTasks" : 8,
"numFailedTasks" : 0,
"numActiveStages" : 0,
"numCompletedStages" : 1,
"numSkippedStages" : 0,
"numFailedStages" : 0
}, {
"jobId" : 2,
"name" : "count at <console>:17",
"stageIds" : [ 3 ],
Expand Down Expand Up @@ -40,4 +26,18 @@
"numCompletedStages" : 1,
"numSkippedStages" : 0,
"numFailedStages" : 1
}, {
"jobId" : 0,
"name" : "count at <console>:15",
"stageIds" : [ 0 ],
"status" : "SUCCEEDED",
"numTasks" : 8,
"numActiveTasks" : 0,
"numCompletedTasks" : 8,
"numSkippedTasks" : 8,
"numFailedTasks" : 0,
"numActiveStages" : 0,
"numCompletedStages" : 1,
"numSkippedStages" : 0,
"numFailedStages" : 0
} ]
Original file line number Diff line number Diff line change
@@ -1,18 +1,4 @@
[ {
"jobId" : 0,
"name" : "count at <console>:15",
"stageIds" : [ 0 ],
"status" : "SUCCEEDED",
"numTasks" : 8,
"numActiveTasks" : 0,
"numCompletedTasks" : 8,
"numSkippedTasks" : 8,
"numFailedTasks" : 0,
"numActiveStages" : 0,
"numCompletedStages" : 1,
"numSkippedStages" : 0,
"numFailedStages" : 0
}, {
"jobId" : 2,
"name" : "count at <console>:17",
"stageIds" : [ 3 ],
Expand Down Expand Up @@ -40,4 +26,18 @@
"numCompletedStages" : 1,
"numSkippedStages" : 0,
"numFailedStages" : 1
}, {
"jobId" : 0,
"name" : "count at <console>:15",
"stageIds" : [ 0 ],
"status" : "SUCCEEDED",
"numTasks" : 8,
"numActiveTasks" : 0,
"numCompletedTasks" : 8,
"numSkippedTasks" : 8,
"numFailedTasks" : 0,
"numActiveStages" : 0,
"numCompletedStages" : 1,
"numSkippedStages" : 0,
"numFailedStages" : 0
} ]
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[ {
"jobId" : 0,
"name" : "count at <console>:15",
"stageIds" : [ 0 ],
"jobId" : 2,
"name" : "count at <console>:17",
"stageIds" : [ 3 ],
"status" : "SUCCEEDED",
"numTasks" : 8,
"numActiveTasks" : 0,
Expand All @@ -13,9 +13,9 @@
"numSkippedStages" : 0,
"numFailedStages" : 0
}, {
"jobId" : 2,
"name" : "count at <console>:17",
"stageIds" : [ 3 ],
"jobId" : 0,
"name" : "count at <console>:15",
"stageIds" : [ 0 ],
"status" : "SUCCEEDED",
"numTasks" : 8,
"numActiveTasks" : 0,
Expand Down
Loading

0 comments on commit dddbd29

Please sign in to comment.