switch to using new "enum"
squito committed Mar 20, 2015
1 parent fef6605 commit 2382bef
Showing 10 changed files with 109 additions and 130 deletions.
8 changes: 1 addition & 7 deletions core/src/main/java/org/apache/spark/JobExecutionStatus.java
@@ -17,15 +17,9 @@
 
 package org.apache.spark;
 
-import org.apache.spark.status.api.EnumUtil;
-
 public enum JobExecutionStatus {
   RUNNING,
   SUCCEEDED,
   FAILED,
-  UNKNOWN;
-
-  public static JobExecutionStatus fromString(String str) {
-    return EnumUtil.parseIgnoreCase(JobExecutionStatus.class, str);
-  }
+  UNKNOWN
 }

28 changes: 0 additions & 28 deletions core/src/main/java/org/apache/spark/status/api/ApplicationStatus.java

This file was deleted.

38 changes: 0 additions & 38 deletions core/src/main/java/org/apache/spark/status/api/EnumUtil.java

This file was deleted.
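
The deleted EnumUtil source is not shown here, but the call site removed from JobExecutionStatus above implies roughly the following behavior. A hedged Scala sketch of the old helper, reconstructed from that call site rather than the actual deleted Java code:

    // Hypothetical reconstruction of EnumUtil.parseIgnoreCase:
    // case-insensitive lookup of a Java enum constant by name.
    def parseIgnoreCase[T <: Enum[T]](clz: Class[T], str: String): T =
      clz.getEnumConstants
        .find(_.name().equalsIgnoreCase(str))
        .getOrElse(throw new IllegalArgumentException(
          s"Cannot parse $str as ${clz.getName}"))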

29 changes: 0 additions & 29 deletions core/src/main/java/org/apache/spark/status/api/StageStatus.java

This file was deleted.

19 changes: 9 additions & 10 deletions core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala
@@ -21,7 +21,6 @@ import java.util.Date
 import javax.ws.rs._
 import javax.ws.rs.core.MediaType
 
-import org.apache.spark.JobExecutionStatus
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.ui.jobs.JobProgressListener
 import org.apache.spark.ui.jobs.UIData.JobUIData
@@ -32,14 +31,14 @@ class AllJobsResource(uiRoot: UIRoot) {
   @GET
   def jobsList(
       @PathParam("appId") appId: String,
-      @QueryParam("status") statuses: java.util.List[JobExecutionStatus]
+      @QueryParam("status") statuses: java.util.List[JobStatus]
   ): Seq[JobData] = {
     uiRoot.withSparkUI(appId) { ui =>
-      val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
+      val statusToJobs: Seq[(JobStatus, Seq[JobUIData])] =
         AllJobsResource.getStatusToJobs(ui)
-      val adjStatuses: util.List[JobExecutionStatus] = {
+      val adjStatuses: util.List[JobStatus] = {
         if (statuses.isEmpty) {
-          java.util.Arrays.asList(JobExecutionStatus.values(): _*)
+          java.util.Arrays.asList(JobStatus.values: _*)
         }
         else {
           statuses
@@ -59,12 +58,12 @@ class AllJobsResource(uiRoot: UIRoot) {
 
 object AllJobsResource {
 
-  def getStatusToJobs(ui: SparkUI): Seq[(JobExecutionStatus, Seq[JobUIData])] = {
+  def getStatusToJobs(ui: SparkUI): Seq[(JobStatus, Seq[JobUIData])] = {
     val statusToJobs = ui.jobProgressListener.synchronized {
       Seq(
-        JobExecutionStatus.RUNNING -> ui.jobProgressListener.activeJobs.values.toSeq,
-        JobExecutionStatus.SUCCEEDED -> ui.jobProgressListener.completedJobs.toSeq,
-        JobExecutionStatus.FAILED -> ui.jobProgressListener.failedJobs.reverse.toSeq
+        JobStatus.RUNNING -> ui.jobProgressListener.activeJobs.values.toSeq,
+        JobStatus.SUCCEEDED -> ui.jobProgressListener.completedJobs.toSeq,
+        JobStatus.FAILED -> ui.jobProgressListener.failedJobs.reverse.toSeq
       )
     }
     statusToJobs
@@ -91,7 +90,7 @@ object AllJobsResource {
       completionTime = job.completionTime.map{new Date(_)},
       stageIds = job.stageIds,
       jobGroup = job.jobGroup,
-      status = job.status,
+      status = JobStatus.fromInternalStatus(job.status),
       numTasks = job.numTasks,
       numActiveTasks = job.numActiveTasks,
       numCompletedTasks = job.numCompletedTasks,
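
The @QueryParam("status") parameter above is why the replacement enum still needs a fromString: Jersey binds a query value to a custom type by calling a public fromString(String) (or valueOf) on that type, which is what the JerseyEnum trait added in api.scala below supplies. A rough sketch of the conversion, assuming SparkEnumCompanion.parseIgnoreCase (not part of this diff) matches names case-insensitively:

    // What Jersey effectively does for ?status=running on this endpoint.
    val status: JobStatus = JobStatus.fromString("running")  // JobStatus.RUNNING
    // An unrecognized value raises IllegalArgumentException from fromString,
    // which Jersey surfaces to the client as an error response.
    val bad = scala.util.Try(JobStatus.fromString("bogus"))  // Failure(...)
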
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -22,7 +22,6 @@ import javax.ws.rs.core.MediaType
 
 import org.apache.spark.executor.{InputMetrics => InternalInputMetrics, OutputMetrics => InternalOutputMetrics, ShuffleReadMetrics => InternalShuffleReadMetrics, ShuffleWriteMetrics => InternalShuffleWriteMetrics, TaskMetrics => InternalTaskMetrics}
 import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo, StageInfo}
-import org.apache.spark.status.api._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData}

@@ -39,7 +38,7 @@ class AllStagesResource(uiRoot: UIRoot) {
     val stageAndStatus = AllStagesResource.stagesAndStatus(ui)
     val adjStatuses = {
       if (statuses.isEmpty()) {
-        java.util.Arrays.asList(StageStatus.values(): _*)
+        java.util.Arrays.asList(StageStatus.values: _*)
       } else {
         statuses
       }
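
The dropped parentheses in these call sites are a signature change, not a style tweak: a Java enum's values() is a method returning an Array, while the new SparkEnum companions declare values as a Seq field (see api.scala below). Both splice into Java varargs the same way:

    // Old style: Java enum, values() is a method returning an Array.
    val javaStyle = java.util.Arrays.asList(JobExecutionStatus.values(): _*)
    // New style: SparkEnum companion, values is a plain Seq field.
    val scalaStyle = java.util.Arrays.asList(StageStatus.values: _*)
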
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
@@ -22,7 +22,6 @@ import javax.ws.rs.core.MediaType
 
 import org.apache.spark.deploy.history.ApplicationHistoryInfo
 import org.apache.spark.deploy.master.{ApplicationInfo => InternalApplicationInfo}
-import org.apache.spark.status.api.ApplicationStatus
 
 @Produces(Array(MediaType.APPLICATION_JSON))
 class ApplicationListResource(uiRoot: UIRoot) {
@@ -36,7 +35,7 @@ class ApplicationListResource(uiRoot: UIRoot) {
     val allApps = uiRoot.getApplicationInfoList
     val adjStatus = {
       if (status.isEmpty) {
-        java.util.Arrays.asList(ApplicationStatus.values(): _*)
+        java.util.Arrays.asList(ApplicationStatus.values: _*)
       } else {
         status
       }
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
@@ -31,7 +31,7 @@ class OneJobResource(uiRoot: UIRoot) {
       @PathParam("jobId") jobId: Int
   ): JobData = {
     uiRoot.withSparkUI(appId) { ui =>
-      val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
+      val statusToJobs: Seq[(JobStatus, Seq[JobUIData])] =
         AllJobsResource.getStatusToJobs(ui)
       val jobOpt = statusToJobs.map {_._2} .flatten.find { jobInfo => jobInfo.jobId == jobId}
       jobOpt.map { job =>
90 changes: 88 additions & 2 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -21,7 +21,7 @@ import java.util.Date
 import scala.collection.Map
 
 import org.apache.spark.JobExecutionStatus
-import org.apache.spark.status.api.StageStatus
+import org.apache.spark.util.{SparkEnum, SparkEnumCompanion}
 
 class ApplicationInfo(
     val id: String,
@@ -70,7 +70,7 @@ class JobData(
     val completionTime: Option[Date],
     val stageIds: Seq[Int],
     val jobGroup: Option[String],
-    val status: JobExecutionStatus,
+    val status: JobStatus,
     val numTasks: Int,
     val numActiveTasks: Int,
     val numCompletedTasks: Int,
@@ -208,3 +208,89 @@ class AccumulableInfo (
   }
 }
 
+
+private[spark] trait JerseyEnum[T <: SparkEnum] extends SparkEnumCompanion[T] {
+  def fromString(s: String): T = {
+    parseIgnoreCase(s).getOrElse {
+      throw new IllegalArgumentException(
+        s"Illegal type=$s. Supported type values: ${values.mkString(", ")}")
+    }
+  }
+}
+sealed abstract class JobStatus extends SparkEnum
+object JobStatus extends JerseyEnum[JobStatus] {
+  final val RUNNING = {
+    case object RUNNING extends JobStatus
+    RUNNING
+  }
+  final val SUCCEEDED = {
+    case object SUCCEEDED extends JobStatus
+    SUCCEEDED
+  }
+  final val FAILED = {
+    case object FAILED extends JobStatus
+    FAILED
+  }
+  final val UNKNOWN = {
+    case object UNKNOWN extends JobStatus
+    UNKNOWN
+  }
+
+  val values = Seq(
+    RUNNING,
+    SUCCEEDED,
+    FAILED,
+    UNKNOWN
+  )
+
+  private[spark] def fromInternalStatus(s: JobExecutionStatus): JobStatus = {
+    JobStatus.parse(s.name()).get
+  }
+}
+
+sealed abstract class StageStatus extends SparkEnum
+object StageStatus extends JerseyEnum[StageStatus] {
+  final val Active = {
+    case object Active extends StageStatus
+    Active
+  }
+
+  final val Complete = {
+    case object Complete extends StageStatus
+    Complete
+  }
+
+  final val Failed = {
+    case object Failed extends StageStatus
+    Failed
+  }
+
+  final val Pending = {
+    case object Pending extends StageStatus
+    Pending
+  }
+
+  val values = Seq(
+    Active,
+    Complete,
+    Failed,
+    Pending
+  )
+}
+
+sealed abstract class ApplicationStatus extends SparkEnum
+object ApplicationStatus extends JerseyEnum[ApplicationStatus] {
+  final val COMPLETED = {
+    case object COMPLETED extends ApplicationStatus
+    COMPLETED
+  }
+
+  final val RUNNING = {
+    case object RUNNING extends ApplicationStatus
+    RUNNING
+  }
+
+  val values = Seq(
+    COMPLETED,
+    RUNNING
+  )
+}
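
A note on the final val RUNNING = { case object RUNNING ... } pattern used throughout: each constant is a case object, so it gets a name-based toString for free, while the enclosing final val exposes it as a stable identifier of the sealed parent type, so call sites can reference it and pattern-match on it like a classic enum constant. A short sketch of the resulting behavior, again assuming parseIgnoreCase matches names case-insensitively:

    // Stable identifiers pattern-match like enum constants.
    JobStatus.fromString("succeeded") match {
      case JobStatus.SUCCEEDED => println("job finished")
      case JobStatus.FAILED    => println("job failed")
      case other               => println(s"still $other")  // toString is the declared name
    }

    // Bridging from the internal Java enum relies on the shared constant names.
    assert(JobStatus.fromInternalStatus(JobExecutionStatus.RUNNING) == JobStatus.RUNNING)
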
19 changes: 8 additions & 11 deletions core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -18,30 +18,27 @@
 package org.apache.spark.ui
 
 import java.net.URL
-import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
 
 import scala.collection.JavaConversions._
 import scala.xml.Node
 
-import org.apache.sparktest.TestTags.ActiveTag
 import org.json4s._
 import org.json4s.jackson.JsonMethods
-import org.openqa.selenium.htmlunit.HtmlUnitDriver
 import org.openqa.selenium.{By, WebDriver}
+import org.openqa.selenium.htmlunit.HtmlUnitDriver
 import org.scalatest._
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.selenium.WebBrowser
 import org.scalatest.time.SpanSugar._
 
-
-import org.apache.spark.LocalSparkContext._
 import org.apache.spark._
+import org.apache.spark.LocalSparkContext._
 import org.apache.spark.api.java.StorageLevels
 import org.apache.spark.deploy.history.HistoryServerSuite
 import org.apache.spark.shuffle.FetchFailedException
-import org.apache.spark.status.api.StageStatus
-import org.apache.spark.status.api.v1.CustomObjectMapper
+import org.apache.sparktest.TestTags.ActiveTag
 
+import org.apache.spark.status.api.v1.{CustomObjectMapper, StageStatus}
 
 /**
  * Selenium tests for the Spark Web UI.
@@ -134,7 +131,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
     }
     val stageJson = getJson(sc.ui.get, "stages")
     stageJson.children.length should be (1)
-    (stageJson \ "status").extract[String] should be (StageStatus.Failed.name())
+    (stageJson \ "status").extract[String] should be (StageStatus.Failed.toString)
 
     // Regression test for SPARK-2105
     class NotSerializable
@@ -266,7 +263,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
       JInt(attemptId) <- stage \ "attemptId"
     } {
       val exp = if (attemptId == 0 && stageId == 1) StageStatus.Failed else StageStatus.Complete
-      status should be (exp.name())
+      status should be (exp.toString)
     }
 
     for {
@@ -275,7 +272,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
     } {
       val exp = if (attemptId == 0 && stageId == 1) StageStatus.Failed else StageStatus.Complete
       val stageJson = getJson(sc.ui.get, s"stages/$stageId/$attemptId")
-      (stageJson \ "status").extract[String] should be (exp.name())
+      (stageJson \ "status").extract[String] should be (exp.toString)
     }
   }
 }
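
The name()-to-toString swaps in these assertions follow directly from the new representation: java.lang.Enum provides name(), while SparkEnum constants are case objects whose default toString is the declared name. Given the definitions added in api.scala above, the two agree on shared constants:

    // A case object's default toString is its declared name.
    assert(StageStatus.Failed.toString == "Failed")
    // Where a status exists on both sides, the names line up.
    assert(JobExecutionStatus.FAILED.name() == JobStatus.FAILED.toString)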
