diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index ea4c1d4e21d67..6cce280250eec 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -346,7 +346,6 @@ class StreamingContextSuite logInfo("==================================\n\n\n") ssc = new StreamingContext(sc, Milliseconds(100)) var runningCount = 0 - SlowTestReceiver.receivedAllRecords = false // Create test receiver that sleeps in onStop() val totalNumRecords = 15 val recordsPerSecond = 1 @@ -358,6 +357,9 @@ class StreamingContextSuite } ssc.start() ssc.awaitTerminationOrTimeout(500) + eventually(timeout(10.seconds), interval(10.millis)) { + assert(SlowTestReceiver.initialized) + } ssc.stop(stopSparkContext = false, stopGracefully = true) logInfo("Running count = " + runningCount) assert(runningCount > 0) @@ -949,6 +951,7 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging { var receivingThreadOption: Option[Thread] = None + @volatile var receivedAllRecords = false def onStart() { val thread = new Thread() { @@ -958,17 +961,18 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) Thread.sleep(1000 / recordsPerSecond) store(i) } - SlowTestReceiver.receivedAllRecords = true + receivedAllRecords = true logInfo(s"Received all $totalRecords records") } } receivingThreadOption = Some(thread) thread.start() + SlowTestReceiver.initialized = true } def onStop() { // Simulate slow receiver by waiting for all records to be produced - while (!SlowTestReceiver.receivedAllRecords) { + while (!receivedAllRecords) { Thread.sleep(100) } // no clean to be done, the receiving thread should stop on it own @@ -976,7 +980,7 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) } object SlowTestReceiver { - var receivedAllRecords = false + var initialized = false } /** Streaming application for testing DStream and RDD creation sites */