Commit e031719

fixes from review
squito committed Apr 8, 2015
1 parent 1f53a66 commit e031719
Showing 15 changed files with 91 additions and 99 deletions.
@@ -140,17 +140,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   }

   override def getListing(refresh: Boolean): Iterable[FsApplicationHistoryInfo] = {
     if (refresh) checkForLogs()
     applications.values
   }

   override def getAppUI(appId: String): Option[SparkUI] = {
-    try {
-      val appOpt = applications.get(appId).orElse {
-        getListing(true)
-        applications.get(appId)
-      }
-      appOpt.map { info =>
+    applications.get(appId).map { info =>
       val replayBus = new ReplayListenerBus()
       val ui = {
         val conf = this.conf.clone()
@@ -34,7 +34,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
     val requestedIncomplete =
       Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean

-    val allApps = parent.getApplicationList(true).filter(_.completed != requestedIncomplete)
+    val allApps = parent.getApplicationList(false).filter(_.completed != requestedIncomplete)
     val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
     val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))
@@ -67,8 +67,8 @@ private[spark] object AllRDDResource {
       .flatMap { _.rddBlocksById(rddId) }
       .sortWith { _._1.name < _._1.name }
       .map { case (blockId, status) =>
-        (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown")))
-      }
+        (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown")))
+      }

     val dataDistribution = if (includeDetails) {
@@ -65,12 +65,12 @@ private[v1] object AllStagesResource {
       includeDetails: Boolean
   ): StageData = {

-    val taskData = if(includeDetails) {
+    val taskData = if (includeDetails) {
       Some(stageUiData.taskData.map { case (k, v) => k -> convertTaskData(v) } )
     } else {
       None
     }
-    val executorSummary = if(includeDetails) {
+    val executorSummary = if (includeDetails) {
       Some(stageUiData.executorSummary.map { case (k, summary) =>
         k -> new ExecutorStageSummary(
           taskTime = summary.taskTime,
@@ -129,7 +129,6 @@
     }
   }

-
   def convertTaskData(uiData: TaskUIData): TaskData = {
     new TaskData(
       taskId = uiData.taskInfo.taskId,
@@ -153,10 +152,10 @@
     val rawMetrics = allTaskData.flatMap{_.taskMetrics}.toSeq

     def getMetric[T](data: Seq[T], f: T => Double): IndexedSeq[Double] =
-      Distribution(data.map{d=> f(d)}).get.getQuantiles(quantiles)
+      Distribution(data.map { d => f(d) }).get.getQuantiles(quantiles)

     abstract class MetricHelper[I,O](f: InternalTaskMetrics => Option[I]) {
-      val data: Seq[I] = rawMetrics.flatMap{x => f(x)}
+      val data: Seq[I] = rawMetrics.flatMap { x => f(x) } // expanded to keep the compiler happy
       def build: O
       def m(f: I => Double): IndexedSeq[Double] = getMetric(data, f)
       def metricOption: Option[O] = {
@@ -211,7 +210,6 @@
       )
     }.metricOption

-
     new TaskMetricDistributions(
       quantiles = quantiles,
       executorDeserializeTime = m(_.executorDeserializeTime),
@@ -232,7 +230,6 @@
       new AccumulableInfo(acc.id, acc.name, acc.update, acc.value)
     }

-
   def convertUiTaskMetrics(internal: InternalTaskMetrics): TaskMetrics = {
     new TaskMetrics(
       executorDeserializeTime = internal.executorDeserializeTime,
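The brace-style changes above are cosmetic, but MetricHelper itself is the workhorse of taskMetricDistributions: each concrete helper selects an optional sub-metric and summarizes only the tasks that actually report it. A simplified, self-contained model of the pattern (hypothetical Metrics type, and a mean instead of quantiles, purely for illustration):

object MetricHelperSketch {
  // Hypothetical stand-in for Spark's internal task metrics.
  case class Metrics(runTime: Double, shuffleRead: Option[Double])

  // f selects an optional sub-metric; only tasks that carry it contribute.
  abstract class MetricHelper[I, O](raw: Seq[Metrics], f: Metrics => Option[I]) {
    val data: Seq[I] = raw.flatMap(f)
    def build: O
    def metricOption: Option[O] = if (data.isEmpty) None else Some(build)
  }

  def main(args: Array[String]): Unit = {
    val raw = Seq(Metrics(10, None), Metrics(20, Some(5.0)), Metrics(30, Some(7.0)))
    val shuffleRead = new MetricHelper[Double, Double](raw, _.shuffleRead) {
      def build: Double = data.sum / data.size // a mean, where the real code builds quantiles
    }
    println(shuffleRead.metricOption) // Some(6.0)
  }
}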
@@ -17,6 +17,7 @@
 package org.apache.spark.status.api.v1

 import java.util.Date
+import java.util.{Arrays, List => JList}
 import javax.ws.rs.{DefaultValue, GET, Produces, QueryParam}
 import javax.ws.rs.core.MediaType

@@ -28,14 +29,14 @@ private[v1] class ApplicationListResource(uiRoot: UIRoot) {

   @GET
   def appList(
-      @QueryParam("status") status: java.util.List[ApplicationStatus],
+      @QueryParam("status") status: JList[ApplicationStatus],
       @DefaultValue("2010-01-01") @QueryParam("minDate") minDate: SimpleDateParam,
       @DefaultValue("3000-01-01") @QueryParam("maxDate") maxDate: SimpleDateParam
   ): Seq[ApplicationInfo] = {
     val allApps = uiRoot.getApplicationInfoList
     val adjStatus = {
       if (status.isEmpty) {
-        java.util.Arrays.asList(ApplicationStatus.values: _*)
+        Arrays.asList(ApplicationStatus.values: _*)
       } else {
         status
       }
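The import rename is cosmetic, but the fallback line is worth a note: Arrays.asList(values: _*) expands a Scala Array through Java varargs to build the java.util.List that Jersey expects when no ?status= parameter arrives. A tiny standalone sketch of the same pattern (the Status enum here is hypothetical):

import java.util.{Arrays, List => JList}

object DefaultStatusSketch {
  sealed trait Status
  case object Completed extends Status
  case object Running extends Status
  val values: Array[Status] = Array(Completed, Running)

  def main(args: Array[String]): Unit = {
    // `: _*` expands the array into Java varargs, yielding a java.util.List.
    val all: JList[Status] = Arrays.asList(values: _*)
    println(all) // [Completed, Running]
  }
}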
@@ -139,7 +139,7 @@ private[v1] trait UIRootFromServletContext {
 }

 private[v1] class NotFoundException(msg: String) extends WebApplicationException(
-  new IllegalArgumentException(msg),
+  new NoSuchElementException(msg),
   Response
     .status(Response.Status.NOT_FOUND)
     .entity(msg)
@@ -154,7 +154,6 @@ private[v1] class BadParameterException(msg: String) extends WebApplicationExcep
     .build()
 ) {
   def this(param: String, exp: String, actual: String) = {
-    this("Bad value for parameter \"" + param + "\". Expected a " + exp + ", got \"" +
-      actual + "\"")
+    this(raw"""Bad value for parameter "$param". Expected a $exp, got "$actual"""")
   }
 }
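Wrapping a NoSuchElementException rather than an IllegalArgumentException makes the cause line up with the 404 the response already carries. For reference, a minimal sketch of the JAX-RS idiom (illustrative class name): any WebApplicationException thrown from a resource method short-circuits Jersey, which sends the embedded Response as-is.

import javax.ws.rs.WebApplicationException
import javax.ws.rs.core.Response

// Illustrative only: throwing this from a resource method produces a 404
// whose body is the message, with the cause preserved for server-side logs.
class MissingThingException(msg: String) extends WebApplicationException(
  new NoSuchElementException(msg),
  Response
    .status(Response.Status.NOT_FOUND)
    .entity(msg)
    .build())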
@@ -29,7 +29,7 @@ private[v1] class OneRDDResource(uiRoot: UIRoot) {
   ): RDDStorageInfo = {
     uiRoot.withSparkUI(appId) { ui =>
       AllRDDResource.getRDDStorageInfo(rddId, ui.storageListener, true).getOrElse(
-        throw new IllegalArgumentException("no rdd found w/ id " + rddId)
+        throw new NotFoundException(s"no rdd found w/ id $rddId")
       )
     }
   }
@@ -24,6 +24,7 @@ import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.ui.jobs.JobProgressListener
 import org.apache.spark.ui.jobs.UIData.StageUIData
 import org.apache.spark.util.SparkEnum
+import org.apache.spark.status.api.v1.StageStatus._

 @Produces(Array(MediaType.APPLICATION_JSON))
 private[v1] class OneStageResource(uiRoot: UIRoot) {
@@ -34,15 +35,9 @@ private[v1] class OneStageResource(uiRoot: UIRoot) {
       @PathParam("appId") appId: String,
       @PathParam("stageId") stageId: Int
   ): Seq[StageData] = {
-    forStage(appId, stageId){ (listener,stageAttempts) =>
-      stageAttempts.map { case (status, stageInfo) =>
-        val stageUiData = listener.synchronized {
-          listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)).
-            getOrElse(throw new SparkException("failed to get full stage data for stage: " +
-              stageInfo.stageId + ":" + stageInfo.attemptId)
-          )
-        }
-        AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData,
+    withStage(appId, stageId){ stageAttempts =>
+      stageAttempts.map { stage =>
+        AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui,
           includeDetails = true)
       }
     }
@@ -55,8 +50,8 @@ private[v1] class OneStageResource(uiRoot: UIRoot) {
       @PathParam("stageId") stageId: Int,
       @PathParam("attemptId") attemptId: Int
   ): StageData = {
-    forStageAttempt(appId, stageId, attemptId) { case (status, stageInfo, stageUiData) =>
-      AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData,
+    withStageAttempt(appId, stageId, attemptId) { stage =>
+      AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui,
         includeDetails = true)
     }
   }
@@ -69,16 +64,16 @@ private[v1] class OneStageResource(uiRoot: UIRoot) {
       @PathParam("attemptId") attemptId: Int,
       @DefaultValue("0.05,0.25,0.5,0.75,0.95") @QueryParam("quantiles") quantileString: String
   ): TaskMetricDistributions = {
-    forStageAttempt(appId, stageId, attemptId) { case (status, stageInfo, stageUiData) =>
-      val quantiles = quantileString.split(",").map{s =>
+    withStageAttempt(appId, stageId, attemptId) { stage =>
+      val quantiles = quantileString.split(",").map { s =>
         try {
           s.toDouble
         } catch {
           case nfe: NumberFormatException =>
             throw new BadParameterException("quantiles", "double", s)
         }
       }
-      AllStagesResource.taskMetricDistributions(stageUiData.taskData.values, quantiles)
+      AllStagesResource.taskMetricDistributions(stage.ui.taskData.values, quantiles)
     }
   }
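The parsed quantiles feed Spark's internal org.apache.spark.util.Distribution helper; a rough sketch of what each metric summary computes (note Distribution is private[spark], so this assumes code compiled under that package):

import org.apache.spark.util.Distribution

// Distribution(...) returns None for empty input (hence the .get in the real
// code); getQuantiles yields one value per requested probability, in order.
val runTimes = Seq(10.0, 20.0, 30.0, 40.0, 50.0)
val probabilities = Array(0.05, 0.25, 0.5, 0.75, 0.95)
val summary = Distribution(runTimes).get.getQuantiles(probabilities)
// summary: IndexedSeq(10.0, 20.0, 30.0, 40.0, 50.0) for this tiny input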

@@ -92,49 +87,57 @@ private[v1] class OneStageResource(uiRoot: UIRoot) {
       @DefaultValue("20") @QueryParam("length") length: Int,
       @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting
   ): Seq[TaskData] = {
-    forStageAttempt(appId, stageId, attemptId) { case (status, stageInfo, stageUiData) =>
-      val tasks = stageUiData.taskData.values.map{AllStagesResource.convertTaskData}.toIndexedSeq
+    withStageAttempt(appId, stageId, attemptId) { stage =>
+      val tasks = stage.ui.taskData.values.map{AllStagesResource.convertTaskData}.toIndexedSeq
        .sorted(sortBy.ordering)
      tasks.slice(offset, offset + length)
    }
  }

+  private case class StageStatusInfoUi(status: StageStatus, info: StageInfo, ui: StageUIData)
+
-  def forStage[T](appId: String, stageId: Int)
-      (f: (JobProgressListener, Seq[(StageStatus, StageInfo)]) => T): T = {
+  private def withStage[T](appId: String, stageId: Int)
+      (f: Seq[StageStatusInfoUi] => T): T = {
     uiRoot.withSparkUI(appId) { ui =>
-      val stageAndStatus = AllStagesResource.stagesAndStatus(ui)
-      val stageAttempts = stageAndStatus.flatMap { case (status, stages) =>
-        val matched = stages.filter { stage => stage.stageId == stageId}
-        matched.map {
-          status -> _
-        }
-      }
+      val stageAttempts = findStageStatusUIData(ui.jobProgressListener, stageId)
       if (stageAttempts.isEmpty) {
         throw new NotFoundException("unknown stage: " + stageId)
       } else {
-        f(ui.jobProgressListener, stageAttempts)
+        f(stageAttempts)
       }
     }
   }

+  private def findStageStatusUIData(
+      listener: JobProgressListener,
+      stageId: Int): Seq[StageStatusInfoUi] = {
+    listener.synchronized {
+      def getStatusInfoUi(status: StageStatus, infos: Seq[StageInfo]): Seq[StageStatusInfoUi] = {
+        infos.filter { _.stageId == stageId }.map { info =>
+          val ui = listener.stageIdToData.getOrElse((info.stageId, info.attemptId),
+            //this is an internal error -- we should always have uiData
+            throw new SparkException(
+              s"no stage ui data found for stage: ${info.stageId}:${info.attemptId}")
+          )
+          StageStatusInfoUi(status, info, ui)
+        }
+      }
+      getStatusInfoUi(Active, listener.activeStages.values.toSeq) ++
+        getStatusInfoUi(Complete, listener.completedStages) ++
+        getStatusInfoUi(Failed, listener.failedStages) ++
+        getStatusInfoUi(Pending, listener.pendingStages.values.toSeq)
+    }
+  }

-  def forStageAttempt[T](appId: String, stageId: Int, attemptId: Int)
-      (f: (StageStatus, StageInfo, StageUIData) => T): T = {
-    forStage(appId, stageId) { case (listener, attempts) =>
-      val oneAttempt = attempts.filter{ case (status, stage) =>
-        stage.attemptId == attemptId
-      }.headOption
+  private def withStageAttempt[T](appId: String, stageId: Int, attemptId: Int)
+      (f: StageStatusInfoUi => T): T = {
+    withStage(appId, stageId) { attempts =>
+      val oneAttempt = attempts.find { stage => stage.info.attemptId == attemptId }
       oneAttempt match {
-        case Some((status, stageInfo)) =>
-          val stageUiData = listener.synchronized {
-            listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)).
-              getOrElse(throw new SparkException("failed to get full stage data for stage: " +
-                stageInfo.stageId + ":" + stageInfo.attemptId)
-            )
-          }
-          f(status, stageInfo, stageUiData)
+        case Some(stage) =>
+          f(stage)
         case None =>
-          val stageAttempts = attempts.map { _._2.attemptId}
+          val stageAttempts = attempts.map { _.info.attemptId }
           throw new NotFoundException(s"unknown attempt for stage $stageId. " +
             s"Found attempts: ${stageAttempts.mkString("[", ",", "]")}")
       }
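Net effect of the refactor: the (StageStatus, StageInfo, StageUIData) tuple and the explicit JobProgressListener handle collapse into one private case class, and the stageIdToData lookup happens once, inside findStageStatusUIData. A hypothetical caller, just to show the shape of the new helpers (access modifiers ignored for illustration):

// Hypothetical usage from inside OneStageResource: withStageAttempt resolves
// appId/stageId/attemptId to one StageStatusInfoUi (or throws
// NotFoundException) and applies the function to it.
val summary: String = withStageAttempt("app-1", stageId = 3, attemptId = 0) { stage =>
  s"stage ${stage.info.stageId}.${stage.info.attemptId} is ${stage.status}"
}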
@@ -146,17 +149,18 @@ sealed abstract class TaskSorting extends SparkEnum {
   def ordering: Ordering[TaskData]
   def alternateNames: Seq[String] = Seq()
 }
+
 object TaskSorting extends JerseyEnum[TaskSorting] {
   final val ID = {
     case object ID extends TaskSorting {
-      def ordering = Ordering.by{td: TaskData => td.taskId}
+      def ordering = Ordering.by { td: TaskData => td.taskId }
     }
     ID
   }

   final val IncreasingRuntime = {
     case object IncreasingRuntime extends TaskSorting {
-      def ordering = Ordering.by{td: TaskData =>
+      def ordering = Ordering.by { td: TaskData =>
         td.taskMetrics.map{_.executorRunTime}.getOrElse(-1L)
       }
       override def alternateNames = Seq("runtime", "+runtime")
@@ -179,14 +183,13 @@ object TaskSorting extends JerseyEnum[TaskSorting] {
   )

   val alternateNames: Map[String, TaskSorting] =
-    values.flatMap{x => x.alternateNames.map{_ -> x}}.toMap
+    values.flatMap { x => x.alternateNames.map { _ -> x } }.toMap

   override def fromString(s: String): TaskSorting = {
     alternateNames.find { case (k, v) =>
       k.toLowerCase() == s.toLowerCase()
-    }.map{_._2}.getOrElse{
+    }.map { _._2 }.getOrElse{
       super.fromString(s)
     }
   }
 }
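With the alternate-name map, ?sortBy= values resolve case-insensitively before falling back to the enum's own name lookup. A short illustration (assuming JerseyEnum#fromString matches the case-object names, as SparkEnum implementations usually would):

val byId      = TaskSorting.fromString("ID")       // canonical name
val byRuntime = TaskSorting.fromString("runtime")  // alternate for IncreasingRuntime
val same      = TaskSorting.fromString("+Runtime") // case-insensitive alternate match
// byRuntime == same, and an unknown key falls through to super.fromString.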

@@ -29,8 +29,8 @@ private[v1] class SecurityFilter extends ContainerRequestFilter with UIRootFromS
     } else {
       throw new WebApplicationException(
         Response
-          .status(Response.Status.UNAUTHORIZED)
-          .entity("user \"" + user + "\"is not authorized")
+          .status(Response.Status.FORBIDDEN)
+          .entity(raw"""user "$user"is not authorized""")
           .build()
       )
     }
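The status-code change is the substantive fix here: 401 UNAUTHORIZED asks the client to (re)authenticate, while 403 FORBIDDEN is the correct reply for a known user who fails the view ACL check. Note the missing space before "is not authorized" carries over from the old string; the raw interpolator only changes how the quotes are written.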
@@ -20,10 +20,9 @@ package org.apache.spark.ui.exec
 import java.net.URLEncoder
 import javax.servlet.http.HttpServletRequest

-import org.apache.spark.status.api.v1.ExecutorSummary
-
 import scala.xml.Node

+import org.apache.spark.status.api.v1.ExecutorSummary
 import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
 import org.apache.spark.util.Utils
core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala (3 additions & 3 deletions)
@@ -33,11 +33,11 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
     val parameterId = request.getParameter("id")
     require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
     val rddId = parameterId.toInt
-    val rddStorageInfo = AllRDDResource.getRDDStorageInfo(rddId, listener,
-      includeDetails = true).getOrElse {
+    val rddStorageInfo = AllRDDResource.getRDDStorageInfo(rddId, listener,includeDetails = true)
+      .getOrElse {
       // Rather than crashing, render an "RDD Not Found" page
       return UIUtils.headerSparkPage("RDD Not Found", Seq[Node](), parent)
-    }
+      }

     // Worker table
     val workerTable = UIUtils.listingTable(workerHeader, workerRow,
core/src/test/scala/org/apache/spark/JsonTestUtils.scala (1 addition & 2 deletions)
@@ -16,13 +16,12 @@
  */
 package org.apache.spark

-import org.json4s.JsonMethods
 import org.json4s._
 import org.json4s.jackson.JsonMethods

 trait JsonTestUtils {
   def assertValidDataInJson(validateJson: JValue, expectedJson: JValue) {
-    val Diff(c, a, d) = validateJson diff expectedJson
+    val Diff(c, a, d) = validateJson.diff(expectedJson)
     val validatePretty = JsonMethods.pretty(validateJson)
     val expectedPretty = JsonMethods.pretty(expectedJson)
     val errorMessage = s"Expected:\n$expectedPretty\nFound:\n$validatePretty"
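For reference, a small sketch of the json4s Diff extractor this helper relies on (using parse from the same jackson JsonMethods import):

import org.json4s._
import org.json4s.jackson.JsonMethods

// diff is field-by-field: `changed` holds keys present on both sides but with
// differing values (taking the argument's value); `added` holds keys only in
// the argument; `deleted` holds keys only in the receiver.
val found    = JsonMethods.parse("""{"id": "app-1", "completed": false, "extra": 1}""")
val expected = JsonMethods.parse("""{"id": "app-1", "completed": true}""")
val Diff(changed, added, deleted) = found.diff(expected)
// changed == JObject(List(("completed", JBool(true))))
// deleted == JObject(List(("extra", JInt(1))))
// added   == JNothing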