Skip to content

Commit

Permalink
go back to good ol' java enums
Browse files Browse the repository at this point in the history
  • Loading branch information
squito committed Apr 15, 2015
1 parent cf86175 commit 9ea682c
Show file tree
Hide file tree
Showing 15 changed files with 190 additions and 326 deletions.
8 changes: 7 additions & 1 deletion core/src/main/java/org/apache/spark/JobExecutionStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@

package org.apache.spark;

import org.apache.spark.status.api.EnumUtil;

public enum JobExecutionStatus {
RUNNING,
SUCCEEDED,
FAILED,
UNKNOWN
UNKNOWN;

public static JobExecutionStatus fromString(String str) {
return EnumUtil.parseIgnoreCase(JobExecutionStatus.class, str);
}
}
38 changes: 38 additions & 0 deletions core/src/main/java/org/apache/spark/status/api/EnumUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.status.api;

import com.google.common.base.Joiner;

import java.util.Arrays;

public class EnumUtil {
public static <E extends Enum<E>> E parseIgnoreCase(Class<E> clz, String str) {
E[] constants = clz.getEnumConstants();
if (str == null) {
return null;
}
for (E e : constants) {
if (e.name().equalsIgnoreCase(str))
return e;
}
throw new IllegalArgumentException(
String.format("Illegal type='%s'. Supported type values: %s",
str, Joiner.on(", ").join(
Arrays.asList(constants))));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.status.api.v1;

import org.apache.spark.status.api.EnumUtil;

public enum ApplicationStatus {
COMPLETED,
RUNNING;

public static ApplicationStatus fromString(String str) {
return EnumUtil.parseIgnoreCase(ApplicationStatus.class, str);
}

}
31 changes: 31 additions & 0 deletions core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.status.api.v1;

import org.apache.spark.status.api.EnumUtil;

public enum StageStatus {
Active,
Complete,
Failed,
Pending;

public static StageStatus fromString(String str) {
return EnumUtil.parseIgnoreCase(StageStatus.class, str);
}
}
45 changes: 45 additions & 0 deletions core/src/main/java/org/apache/spark/status/api/v1/TaskSorting.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.apache.spark.status.api.v1;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import org.apache.spark.status.api.EnumUtil;

import java.util.HashSet;
import java.util.Set;

public enum TaskSorting {
ID,
IncreasingRuntime("runtime"),
DecreasingRuntime("-runtime");

final Set<String> alternateNames;
TaskSorting(String... names) {
alternateNames = new HashSet<String>();
for (String n: names) {
alternateNames.add(n);
}
}

public static TaskSorting fromString(String str) {
for (TaskSorting t: values()) {
if (t.alternateNames.contains(str.toLowerCase())) {
return t;
}
}
return EnumUtil.parseIgnoreCase(TaskSorting.class, str);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.Date
import javax.ws.rs._
import javax.ws.rs.core.MediaType

import org.apache.spark.JobExecutionStatus
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.jobs.JobProgressListener
import org.apache.spark.ui.jobs.UIData.JobUIData
Expand All @@ -31,14 +32,14 @@ private[v1] class AllJobsResource(uiRoot: UIRoot) {
@GET
def jobsList(
@PathParam("appId") appId: String,
@QueryParam("status") statuses: java.util.List[JobStatus]
@QueryParam("status") statuses: java.util.List[JobExecutionStatus]
): Seq[JobData] = {
uiRoot.withSparkUI(appId) { ui =>
val statusToJobs: Seq[(JobStatus, Seq[JobUIData])] =
val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
AllJobsResource.getStatusToJobs(ui)
val adjStatuses: util.List[JobStatus] = {
val adjStatuses: util.List[JobExecutionStatus] = {
if (statuses.isEmpty) {
java.util.Arrays.asList(JobStatus.values: _*)
java.util.Arrays.asList(JobExecutionStatus.values(): _*)
}
else {
statuses
Expand All @@ -58,12 +59,12 @@ private[v1] class AllJobsResource(uiRoot: UIRoot) {

private[v1] object AllJobsResource {

def getStatusToJobs(ui: SparkUI): Seq[(JobStatus, Seq[JobUIData])] = {
def getStatusToJobs(ui: SparkUI): Seq[(JobExecutionStatus, Seq[JobUIData])] = {
val statusToJobs = ui.jobProgressListener.synchronized {
Seq(
JobStatus.RUNNING -> ui.jobProgressListener.activeJobs.values.toSeq,
JobStatus.SUCCEEDED -> ui.jobProgressListener.completedJobs.toSeq,
JobStatus.FAILED -> ui.jobProgressListener.failedJobs.reverse.toSeq
JobExecutionStatus.RUNNING -> ui.jobProgressListener.activeJobs.values.toSeq,
JobExecutionStatus.SUCCEEDED -> ui.jobProgressListener.completedJobs.toSeq,
JobExecutionStatus.FAILED -> ui.jobProgressListener.failedJobs.reverse.toSeq
)
}
statusToJobs
Expand All @@ -90,7 +91,7 @@ private[v1] object AllJobsResource {
completionTime = job.completionTime.map{new Date(_)},
stageIds = job.stageIds,
jobGroup = job.jobGroup,
status = JobStatus.fromInternalStatus(job.status),
status = job.status,
numTasks = job.numTasks,
numActiveTasks = job.numActiveTasks,
numCompletedTasks = job.numCompletedTasks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import javax.ws.rs.core.MediaType

import org.apache.spark.executor.{InputMetrics => InternalInputMetrics, OutputMetrics => InternalOutputMetrics, ShuffleReadMetrics => InternalShuffleReadMetrics, ShuffleWriteMetrics => InternalShuffleWriteMetrics, TaskMetrics => InternalTaskMetrics}
import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo, StageInfo}
import org.apache.spark.status.api._
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData}
import org.apache.spark.util.Distribution
Expand All @@ -39,7 +40,7 @@ private[v1] class AllStagesResource(uiRoot: UIRoot) {
val stageAndStatus = AllStagesResource.stagesAndStatus(ui)
val adjStatuses = {
if (statuses.isEmpty()) {
java.util.Arrays.asList(StageStatus.values: _*)
java.util.Arrays.asList(StageStatus.values(): _*)
} else {
statuses
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private[v1] class ApplicationListResource(uiRoot: UIRoot) {
val allApps = uiRoot.getApplicationInfoList
val adjStatus = {
if (status.isEmpty) {
Arrays.asList(ApplicationStatus.values: _*)
Arrays.asList(ApplicationStatus.values(): _*)
} else {
status
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@ import javax.ws.rs.core.MediaType
import javax.ws.rs.ext.{ContextResolver, Provider}

import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.{JsonSerializer, ObjectMapper, SerializationFeature, SerializerProvider}
import com.fasterxml.jackson.databind.module.SimpleModule

import org.apache.spark.util.SparkEnum
import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}

@Provider
@Produces(Array(MediaType.APPLICATION_JSON))
Expand All @@ -42,10 +38,6 @@ private[v1] class CustomObjectMapper extends ContextResolver[ObjectMapper]{
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
mapper.setDateFormat(CustomObjectMapper.makeISODateFormat)

val sparkEnumModule = new SimpleModule()
sparkEnumModule.addSerializer(classOf[SparkEnum], new SparkEnumSerializer)
mapper.registerModule(sparkEnumModule)

override def getContext(tpe: Class[_]): ObjectMapper = {
mapper
}
Expand All @@ -59,9 +51,3 @@ private[spark] object CustomObjectMapper {
iso8601
}
}

private[v1] class SparkEnumSerializer extends JsonSerializer[SparkEnum] {
def serialize(se: SparkEnum, jgen: JsonGenerator, provider: SerializerProvider): Unit = {
jgen.writeString(se.toString)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.status.api.v1
import javax.ws.rs.{PathParam, GET, Produces}
import javax.ws.rs.core.MediaType

import org.apache.spark.JobExecutionStatus
import org.apache.spark.ui.jobs.UIData.JobUIData

@Produces(Array(MediaType.APPLICATION_JSON))
Expand All @@ -30,7 +31,7 @@ private[v1] class OneJobResource(uiRoot: UIRoot) {
@PathParam("jobId") jobId: Int
): JobData = {
uiRoot.withSparkUI(appId) { ui =>
val statusToJobs: Seq[(JobStatus, Seq[JobUIData])] =
val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
AllJobsResource.getStatusToJobs(ui)
val jobOpt = statusToJobs.map {_._2} .flatten.find { jobInfo => jobInfo.jobId == jobId}
jobOpt.map { job =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import javax.ws.rs.core.MediaType

import org.apache.spark.SparkException
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.status.api.v1.StageStatus._
import org.apache.spark.status.api.v1.TaskSorting._
import org.apache.spark.ui.jobs.JobProgressListener
import org.apache.spark.ui.jobs.UIData.StageUIData
import org.apache.spark.util.SparkEnum
import org.apache.spark.status.api.v1.StageStatus._

@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class OneStageResource(uiRoot: UIRoot) {
Expand Down Expand Up @@ -89,7 +89,7 @@ private[v1] class OneStageResource(uiRoot: UIRoot) {
): Seq[TaskData] = {
withStageAttempt(appId, stageId, attemptId) { stage =>
val tasks = stage.ui.taskData.values.map{AllStagesResource.convertTaskData}.toIndexedSeq
.sorted(sortBy.ordering)
.sorted(OneStageResource.ordering(sortBy))
tasks.slice(offset, offset + length)
}
}
Expand Down Expand Up @@ -145,51 +145,15 @@ private[v1] class OneStageResource(uiRoot: UIRoot) {
}
}

sealed abstract class TaskSorting extends SparkEnum {
def ordering: Ordering[TaskData]
def alternateNames: Seq[String] = Seq()
}

object TaskSorting extends JerseyEnum[TaskSorting] {
final val ID = {
case object ID extends TaskSorting {
def ordering = Ordering.by { td: TaskData => td.taskId }
}
ID
}

final val IncreasingRuntime = {
case object IncreasingRuntime extends TaskSorting {
def ordering = Ordering.by { td: TaskData =>
td.taskMetrics.map{_.executorRunTime}.getOrElse(-1L)
object OneStageResource {
def ordering(taskSorting: TaskSorting): Ordering[TaskData] = {
val extractor: (TaskData => Long) = td =>
taskSorting match {
case ID => td.taskId
case IncreasingRuntime => td.taskMetrics.map{_.executorRunTime}.getOrElse(-1L)
case DecreasingRuntime => -td.taskMetrics.map{_.executorRunTime}.getOrElse(-1L)
}
override def alternateNames = Seq("runtime", "+runtime")
}
IncreasingRuntime
}

final val DecreasingRuntime = {
case object DecreasingRuntime extends TaskSorting {
def ordering = IncreasingRuntime.ordering.reverse
override def alternateNames = Seq("-runtime")
}
DecreasingRuntime
}

val values = Seq(
ID,
IncreasingRuntime,
DecreasingRuntime
)

val alternateNames: Map[String, TaskSorting] =
values.flatMap { x => x.alternateNames.map { _ -> x } }.toMap

override def fromString(s: String): TaskSorting = {
alternateNames.find { case (k, v) =>
k.toLowerCase() == s.toLowerCase()
}.map { _._2 }.getOrElse{
super.fromString(s)
}
Ordering.by(extractor)
}
}

Loading

0 comments on commit 9ea682c

Please sign in to comment.