From f8e105390b634e2221f68081639fd658ba9a6f54 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 10 Apr 2014 20:22:27 -0700 Subject: [PATCH] Added Spark and Streaming UI unit tests. --- .../scala/org/apache/spark/ui/WebUI.scala | 3 - .../apache/spark/ui/env/EnvironmentTab.scala | 8 +- .../apache/spark/ui/exec/ExecutorsTab.scala | 8 +- .../apache/spark/ui/jobs/JobProgressTab.scala | 12 +-- .../spark/ui/storage/BlockManagerTab.scala | 10 +-- .../scala/org/apache/spark/ui/UISuite.scala | 67 ++++++++++++++- .../spark/streaming/ui/StreamingTab.scala | 12 +-- .../org/apache/spark/streaming/UISuite.scala | 84 ++++--------------- 8 files changed, 94 insertions(+), 110 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 655239089015c..592a440219d65 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -132,9 +132,6 @@ private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) { pages += page } - /** Initialize this tab and attach all relevant pages. */ - def initialize() - /** Get a list of header tabs from the parent UI. */ def headerTabs: Seq[WebUITab] = parent.getTabs } diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala index 0f1ea7fa8d44d..7797057fa1aa9 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala @@ -25,12 +25,8 @@ private[ui] class EnvironmentTab(parent: SparkUI) extends WebUITab(parent, "envi val basePath = parent.basePath val listener = new EnvironmentListener - initialize() - - def initialize() { - attachPage(new IndexPage(this)) - parent.registerListener(listener) - } + attachPage(new IndexPage(this)) + parent.registerListener(listener) } /** diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 843db7c8d956d..e9ec18a3e74af 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -29,12 +29,8 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "execut val basePath = parent.basePath val listener = new ExecutorsListener(parent.storageStatusListener) - initialize() - - def initialize() { - attachPage(new IndexPage(this)) - parent.registerListener(listener) - } + attachPage(new IndexPage(this)) + parent.registerListener(listener) } /** diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala index 7fe06b39346f5..da9de035f89f1 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala @@ -30,14 +30,10 @@ private[ui] class JobProgressTab(parent: SparkUI) extends WebUITab(parent, "stag val conf = if (live) sc.conf else new SparkConf val listener = new JobProgressListener(conf) - initialize() - - def initialize() { - attachPage(new IndexPage(this)) - attachPage(new StagePage(this)) - attachPage(new PoolPage(this)) - parent.registerListener(listener) - } + attachPage(new IndexPage(this)) + attachPage(new StagePage(this)) + attachPage(new PoolPage(this)) + parent.registerListener(listener) def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR) } diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala index 492c223625e6b..05b6ef2cf0f4e 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala @@ -29,13 +29,9 @@ private[ui] class BlockManagerTab(parent: SparkUI) extends WebUITab(parent, "sto val basePath = parent.basePath val listener = new BlockManagerListener(parent.storageStatusListener) - initialize() - - def initialize() { - attachPage(new IndexPage(this)) - attachPage(new RddPage(this)) - parent.registerListener(listener) - } + attachPage(new IndexPage(this)) + attachPage(new RddPage(this)) + parent.registerListener(listener) } /** diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 45c322427930d..0ca5c0cd237cc 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -18,16 +18,81 @@ package org.apache.spark.ui import java.net.ServerSocket +import javax.servlet.http.HttpServletRequest +import scala.io.Source import scala.util.{Failure, Success, Try} import org.eclipse.jetty.server.Server import org.eclipse.jetty.servlet.ServletContextHandler import org.scalatest.FunSuite +import org.scalatest.concurrent.Eventually._ +import org.scalatest.time.SpanSugar._ -import org.apache.spark.SparkConf +import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.LocalSparkContext._ +import scala.xml.Node class UISuite extends FunSuite { + + test("basic ui visibility") { + withSpark(new SparkContext("local", "test")) { sc => + // test if the ui is visible, and all the expected tabs are visible + eventually(timeout(10 seconds), interval(50 milliseconds)) { + val html = Source.fromURL(sc.ui.appUIAddress).mkString + assert(!html.contains("random data that should not be present")) + assert(html.toLowerCase.contains("stages")) + assert(html.toLowerCase.contains("storage")) + assert(html.toLowerCase.contains("environment")) + assert(html.toLowerCase.contains("executors")) + } + } + } + + test("visibility at localhost:4040") { + withSpark(new SparkContext("local", "test")) { sc => + // test if visible from http://localhost:4040 + eventually(timeout(10 seconds), interval(50 milliseconds)) { + val html = Source.fromURL("http://localhost:4040").mkString + assert(html.toLowerCase.contains("stages")) + } + } + } + + test("attaching a new tab") { + withSpark(new SparkContext("local", "test")) { sc => + val sparkUI = sc.ui + + val newTab = new WebUITab(sparkUI, "foo") { + attachPage(new WebUIPage("") { + override def render(request: HttpServletRequest): Seq[Node] = { + "html magic" + } + }) + } + sparkUI.attachTab(newTab) + eventually(timeout(10 seconds), interval(50 milliseconds)) { + val html = Source.fromURL(sc.ui.appUIAddress).mkString + assert(!html.contains("random data that should not be present")) + + // check whether new page exists + assert(html.toLowerCase.contains("foo")) + + // check whether other pages still exist + assert(html.toLowerCase.contains("stages")) + assert(html.toLowerCase.contains("storage")) + assert(html.toLowerCase.contains("environment")) + assert(html.toLowerCase.contains("executors")) + } + + eventually(timeout(10 seconds), interval(50 milliseconds)) { + val html = Source.fromURL(sc.ui.appUIAddress.stripSuffix("/") + "/foo").mkString + // check whether new page exists + assert(html.contains("magic")) + } + } + } + test("jetty port increases under contention") { val startPort = 4040 val server = new Server(startPort) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala index be8e652899ebe..51448d15c6516 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala @@ -17,8 +17,6 @@ package org.apache.spark.streaming.ui -import java.util.concurrent.atomic.AtomicInteger - import org.apache.spark.Logging import org.apache.spark.streaming.StreamingContext import org.apache.spark.ui.WebUITab @@ -32,11 +30,7 @@ private[spark] class StreamingTab(ssc: StreamingContext) val basePath = parent.basePath val listener = new StreamingJobProgressListener(ssc) - initialize() - - def initialize() { - ssc.addStreamingListener(listener) - attachPage(new StreamingPage(this)) - parent.attachTab(this) - } + ssc.addStreamingListener(listener) + attachPage(new StreamingPage(this)) + parent.attachTab(this) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala index 502896d76c494..35538ec188f67 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala @@ -17,86 +17,30 @@ package org.apache.spark.streaming -import scala.reflect.ClassTag -import scala.util.Random import scala.io.Source -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} +import org.scalatest.FunSuite import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ -import org.scalatest.matchers.ShouldMatchers -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.dstream.InputDStream - -class UISuite extends FunSuite with ShouldMatchers with BeforeAndAfterAll with BeforeAndAfter { - var sc: SparkContext = null - var ssc: StreamingContext = null - - override def beforeAll() { - val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName) - conf.set("spark.cleaner.ttl", "1800") - sc = new SparkContext(conf) - } - - override def afterAll() { - if (sc != null) sc.stop() - } - - before { - ssc = new StreamingContext(sc, Seconds(1)) - } - - after { - if (ssc != null) { - ssc.stop() - ssc = null - } - } +class UISuite extends FunSuite { test("streaming tab in spark UI") { - val ssc = new StreamingContext(sc, Seconds(1)) + val ssc = new StreamingContext("local", "test", Seconds(1)) eventually(timeout(10 seconds), interval(50 milliseconds)) { - val uiData = Source.fromURL( - ssc.sparkContext.ui.appUIAddress).mkString - assert(!uiData.contains("random data that should not be present")) - assert(uiData.contains("streaming")) + val html = Source.fromURL(ssc.sparkContext.ui.appUIAddress).mkString + assert(!html.contains("random data that should not be present")) + // test if streaming tab exist + assert(html.toLowerCase.contains("streaming")) + // test if other Spark tabs still exist + assert(html.toLowerCase.contains("stages")) } - } - ignore("Testing") { - runStreaming(1000000) - } - - def runStreaming(duration: Long) { - val ssc1 = new StreamingContext(sc, Seconds(1)) - val servers1 = (1 to 3).map { i => new TestServer(10000 + i) } - - val inputStream1 = ssc1.union(servers1.map(server => ssc1.socketTextStream("localhost", server.port))) - inputStream1.count.print - - ssc1.start() - servers1.foreach(_.start()) - - val startTime = System.currentTimeMillis() - while (System.currentTimeMillis() - startTime < duration) { - servers1.map(_.send(Random.nextString(10) + "\n")) - //Thread.sleep(1) + eventually(timeout(10 seconds), interval(50 milliseconds)) { + val html = Source.fromURL( + ssc.sparkContext.ui.appUIAddress.stripSuffix("/") + "/streaming").mkString + assert(html.toLowerCase.contains("batch")) + assert(html.toLowerCase.contains("network")) } - ssc1.stop() - servers1.foreach(_.stop()) } } - -class FunctionBasedInputDStream[T: ClassTag]( - ssc_ : StreamingContext, - function: (StreamingContext, Time) => Option[RDD[T]] - ) extends InputDStream[T](ssc_) { - - def start(): Unit = {} - - def stop(): Unit = {} - - def compute(validTime: Time): Option[RDD[T]] = function(ssc, validTime) -} \ No newline at end of file