Skip to content

Commit

Permalink
expose UI data as json in new endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
squito committed Feb 6, 2015
1 parent bc36356 commit 4b398d0
Show file tree
Hide file tree
Showing 64 changed files with 2,401 additions and 107 deletions.
5 changes: 5 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,11 @@
<artifactId>json4s-jackson_${scala.binary.version}</artifactId>
<version>3.2.10</version>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.module</groupId>
  <!-- Use the scala.binary.version property (as the json4s dependency above does)
       so cross-builds against other Scala binary versions resolve the right artifact. -->
  <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
  <version>2.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.mesos</groupId>
<artifactId>mesos</artifactId>
Expand Down
23 changes: 23 additions & 0 deletions core/src/main/java/org/apache/spark/status/api/StageStatus.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.apache.spark.status.api;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

public enum StageStatus {
Active,
Complete,
Failed,
Pending
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.json4s.JsonDSL._
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.deploy.worker.ExecutorRunner
import org.json4s._

private[spark] object JsonProtocol {
def writeWorkerInfo(obj: WorkerInfo) = {
Expand Down Expand Up @@ -98,4 +99,5 @@ private[spark] object JsonProtocol {
("executors" -> obj.executors.toList.map(writeExecutorRunner)) ~
("finishedexecutors" -> obj.finishedExecutors.toList.map(writeExecutorRunner))
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.history

import javax.servlet.http.HttpServletRequest

import org.apache.spark.status.{UIRoot, StatusJsonRoute}
import org.apache.spark.status.api.ApplicationInfo
import org.apache.spark.deploy.master.{ApplicationInfo => InternalApplicationInfo}

class AllApplicationsJsonRoute(val uiRoot: UIRoot) extends StatusJsonRoute[Seq[ApplicationInfo]] {

  /**
   * Render the full list of applications known to this UI root.
   *
   * TODO filter on some query params, eg. completed, minStartTime, etc
   */
  override def renderJson(request: HttpServletRequest): Seq[ApplicationInfo] =
    uiRoot.getApplicationInfoList
}

object AllApplicationsJsonRoute {
  /**
   * Convert a history server's internal application record into the public json
   * api representation.  The `completed` flag is carried over directly from the
   * history record.
   */
  def appHistoryInfoToPublicAppInfo(app: ApplicationHistoryInfo): ApplicationInfo = {
    ApplicationInfo(
      id = app.id,
      name = app.name,
      startTime = app.startTime,
      endTime = app.endTime,
      sparkUser = app.sparkUser,
      completed = app.completed
    )
  }

  /**
   * Convert the master's internal application record into the public json api
   * representation.  Unlike the history variant above, the master's record does not
   * carry a completed flag itself, so the caller supplies `completed`.
   */
  def convertApplicationInfo(internal: InternalApplicationInfo, completed: Boolean): ApplicationInfo = {
    ApplicationInfo(
      id = internal.id,
      name = internal.desc.name,
      startTime = internal.startTime,
      endTime = internal.endTime,
      sparkUser = internal.desc.user,
      completed = completed
    )
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private[spark] abstract class ApplicationHistoryProvider {
*
* @return List of all known applications.
*/
def getListing(): Iterable[ApplicationHistoryInfo]
def getListing(refresh: Boolean): Iterable[ApplicationHistoryInfo]

/**
* Returns the Spark UI for a specific application.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
}
}

override def getListing() = applications.values
// When `refresh` is set, rescan before answering so callers (eg. the json
// endpoints) get an up-to-date listing; checkForLogs presumably re-reads the
// event log directory -- TODO confirm against its definition above.
override def getListing(refresh: Boolean) = {
if (refresh) checkForLogs()
applications.values
}

override def getAppUI(appId: String): Option[SparkUI] = {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package org.apache.spark.deploy.history

import javax.servlet.http.HttpServletRequest

import org.json4s.JValue
import org.json4s.JsonDSL._

import scala.xml.Node

import org.apache.spark.ui.{WebUIPage, UIUtils}
Expand All @@ -34,7 +37,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
val requestedIncomplete =
Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean

val allApps = parent.getApplicationList().filter(_.completed != requestedIncomplete)
val allApps = parent.getApplicationList(true).filter(_.completed != requestedIncomplete)
val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))

Expand Down Expand Up @@ -67,7 +70,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {

<h4>
Showing {actualFirst + 1}-{last + 1} of {allApps.size}
{if (requestedIncomplete) "(Incomplete applications)"}
({if (requestedIncomplete) "Incomplete" else "Complete"} applications)
<span style="float: right">
{
if (actualPage > 1) {
Expand All @@ -90,7 +93,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
</h4> ++
appTable
} else {
<h4>No completed applications found!</h4> ++
<h4>No {if (requestedIncomplete) "running" else "completed"} applications found!</h4> ++
<p>Did you specify the correct logging directory?
Please verify your setting of <span style="font-style:italic">
spark.history.fs.logDirectory</span> and whether you have the permissions to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import java.util.NoSuchElementException
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

import com.google.common.cache._
import org.apache.spark.deploy.master.ui.MasterApplicationJsonRoute
import org.apache.spark.status.api.ApplicationInfo
import org.apache.spark.status.{UIRoot, JsonRequestHandler}
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}

import org.apache.spark.{Logging, SecurityManager, SparkConf}
Expand All @@ -45,7 +48,7 @@ class HistoryServer(
provider: ApplicationHistoryProvider,
securityManager: SecurityManager,
port: Int)
extends WebUI(securityManager, port, conf) with Logging {
extends WebUI(securityManager, port, conf) with Logging with UIRoot {

// How many applications to retain
private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
Expand All @@ -71,6 +74,7 @@ class HistoryServer(
protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = {
val parts = Option(req.getPathInfo()).getOrElse("").split("/")
if (parts.length < 2) {
logError("bad path info!")
res.sendError(HttpServletResponse.SC_BAD_REQUEST,
s"Unexpected path info in request (URI = ${req.getRequestURI()}")
return
Expand Down Expand Up @@ -98,6 +102,10 @@ class HistoryServer(
}
}

/**
 * Look up the rebuilt SparkUI for the given cache key, if one has been loaded
 * into the application cache.
 */
def getSparkUI(appKey: String): Option[SparkUI] =
  Option(appCache.get(appKey))

initialize()

/**
Expand All @@ -107,7 +115,13 @@ class HistoryServer(
* this UI with the event logs in the provided base directory.
*/
def initialize() {
//earlier handlers take precedence
attachPage(new HistoryPage(this))

val jsonHandler = new JsonRequestHandler(this, securityManager)
attachHandler(jsonHandler.jsonContextHandler)


attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))

val contextHandler = new ServletContextHandler
Expand Down Expand Up @@ -145,7 +159,11 @@ class HistoryServer(
*
* @return List of all known applications.
*/
def getApplicationList() = provider.getListing()
def getApplicationList(refresh: Boolean) = provider.getListing(refresh)

/** All known applications, refreshed and converted to the public json api form. */
def getApplicationInfoList: Seq[ApplicationInfo] = {
  val apps = getApplicationList(refresh = true)
  apps.map(AllApplicationsJsonRoute.appHistoryInfoToPublicAppInfo).toSeq
}

/**
* Returns the provider configuration to show in the listing page.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.history

import javax.servlet.http.HttpServletRequest

import org.apache.spark.status.{UIRoot, JsonRequestHandler, StatusJsonRoute}
import org.apache.spark.status.api.ApplicationInfo

class OneApplicationJsonRoute(val uiRoot: UIRoot) extends StatusJsonRoute[ApplicationInfo] {

  /**
   * Render the info for the single application named in the request path.
   *
   * Throws IllegalArgumentException when the path carries no application id, or
   * when the id does not match any known application.
   */
  override def renderJson(request: HttpServletRequest): ApplicationInfo = {
    JsonRequestHandler.extractAppId(request.getPathInfo) match {
      case Some(appId) =>
        uiRoot.getApplicationInfoList
          .find(_.id == appId)
          .getOrElse(throw new IllegalArgumentException("unknown app: " + appId))
      case None =>
        throw new IllegalArgumentException("no application id specified")
    }
  }
}
14 changes: 7 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -715,9 +715,9 @@ private[spark] class Master(

/**
* Rebuild a new SparkUI from the given application's event logs.
* Return whether this is successful.
* Return the UI if successful, else None
*/
def rebuildSparkUI(app: ApplicationInfo): Boolean = {
def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] = {
val appName = app.desc.name
val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
try {
Expand All @@ -726,7 +726,7 @@ private[spark] class Master(
.getOrElse {
// Event logging is not enabled for this application
app.desc.appUiUrl = notFoundBasePath
return false
return None
}

val fs = Utils.getHadoopFileSystem(eventLogFile, hadoopConf)
Expand All @@ -738,7 +738,7 @@ private[spark] class Master(
logWarning(msg)
msg = URLEncoder.encode(msg, "UTF-8")
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
return false
return None
}

val (logInput, sparkVersion) = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
Expand All @@ -754,7 +754,7 @@ private[spark] class Master(
webUi.attachSparkUI(ui)
// Application UI is successfully rebuilt, so link the Master UI to it
app.desc.appUiUrl = ui.basePath
true
Some(ui)
} catch {
case fnf: FileNotFoundException =>
// Event logging is enabled for this application, but no event logs are found
Expand All @@ -764,7 +764,7 @@ private[spark] class Master(
msg += " Did you specify the correct logging directory?"
msg = URLEncoder.encode(msg, "UTF-8")
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
false
None
case e: Exception =>
// Relay exception message to application UI page
val title = s"Application history load error (${app.id})"
Expand All @@ -773,7 +773,7 @@ private[spark] class Master(
logError(msg, e)
msg = URLEncoder.encode(msg, "UTF-8")
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title"
false
None
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ import scala.concurrent.Await
import scala.xml.Node

import akka.pattern.ask
import org.json4s.JValue

import org.apache.spark.deploy.{ExecutorState, JsonProtocol}
import org.apache.spark.deploy.ExecutorState
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.ExecutorDesc
import org.apache.spark.ui.{UIUtils, WebUIPage}
Expand All @@ -36,17 +35,6 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app
private val master = parent.masterActorRef
private val timeout = parent.timeout

/** Executor details for a particular application */
override def renderJson(request: HttpServletRequest): JValue = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, timeout)
val app = state.activeApps.find(_.id == appId).getOrElse({
state.completedApps.find(_.id == appId).getOrElse(null)
})
JsonProtocol.writeApplicationInfo(app)
}

/** Executor details for a particular application */
def render(request: HttpServletRequest): Seq[Node] = {
val appId = request.getParameter("appId")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.master.ui

import javax.servlet.http.HttpServletRequest

import akka.pattern.ask

import org.apache.spark.deploy.DeployMessages.{RequestMasterState, MasterStateResponse}
import org.apache.spark.status.StatusJsonRoute
import org.apache.spark.status.api.ApplicationInfo

import scala.concurrent.Await

class MasterApplicationJsonRoute(val parent: MasterWebUI) extends StatusJsonRoute[ApplicationInfo] {
  private val master = parent.masterActorRef
  private val timeout = parent.timeout

  /**
   * Render the public json view of one application known to the master.
   *
   * Searches the active apps first, then the completed ones.  Returns null when no
   * app matches.  NOTE(review): consider throwing IllegalArgumentException for an
   * unknown id instead, as OneApplicationJsonRoute does, so the handler can map it
   * to a proper error response.
   */
  override def renderJson(request: HttpServletRequest): ApplicationInfo = {
    // TODO not really the app id -- getPathInfo returns the raw extra path (with a
    // leading "/"), so this needs the same id extraction as the other routes.
    val appId = request.getPathInfo()
    val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
    val state = Await.result(stateFuture, timeout)
    state.activeApps.find(_.id == appId)
      .orElse(state.completedApps.find(_.id == appId))
      .map(MasterJsonRoute.masterAppInfoToPublicAppInfo)
      .orNull
  }
}
Loading

0 comments on commit 4b398d0

Please sign in to comment.