Skip to content

Commit

Permalink
[SPARK-24663][STREAMING][TESTS] StreamingContextSuite: Wait until slo…
Browse files Browse the repository at this point in the history
…w receiver has been initialized, but with hard timeout

### What changes were proposed in this pull request?

This patch fixes the flaky test failure from StreamingContextSuite "stop slow receiver gracefully", via putting flag whether initializing slow receiver is completed, and wait for such flag to be true. As receiver should be submitted via job and initialized in executor, 500ms might not be enough for covering all cases.

### Why are the changes needed?

We got some reports for test failure on this test. Please refer [SPARK-24663](https://issues.apache.org/jira/browse/SPARK-24663)

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Modified UT. I've artificially made delay on handling job submission via adding below code in `DAGScheduler.submitJob`:

```
if (rdd != null && rdd.name != null && rdd.name.startsWith("Receiver")) {
  println(s"Receiver Job! rdd name: ${rdd.name}")
  Thread.sleep(1000)
}
```

and the test "stop slow receiver gracefully" failed on current master and passed on the patch.

Closes #25725 from HeartSaVioR/SPARK-24663.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
  • Loading branch information
HeartSaVioR authored and Marcelo Vanzin committed Sep 11, 2019
1 parent b62ef8f commit c18f849
Showing 1 changed file with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,6 @@ class StreamingContextSuite
logInfo("==================================\n\n\n")
ssc = new StreamingContext(sc, Milliseconds(100))
var runningCount = 0
SlowTestReceiver.receivedAllRecords = false
// Create test receiver that sleeps in onStop()
val totalNumRecords = 15
val recordsPerSecond = 1
Expand All @@ -358,6 +357,9 @@ class StreamingContextSuite
}
ssc.start()
ssc.awaitTerminationOrTimeout(500)
eventually(timeout(10.seconds), interval(10.millis)) {
assert(SlowTestReceiver.initialized)
}
ssc.stop(stopSparkContext = false, stopGracefully = true)
logInfo("Running count = " + runningCount)
assert(runningCount > 0)
Expand Down Expand Up @@ -949,6 +951,7 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {

var receivingThreadOption: Option[Thread] = None
@volatile var receivedAllRecords = false

def onStart() {
val thread = new Thread() {
Expand All @@ -958,25 +961,26 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
Thread.sleep(1000 / recordsPerSecond)
store(i)
}
SlowTestReceiver.receivedAllRecords = true
receivedAllRecords = true
logInfo(s"Received all $totalRecords records")
}
}
receivingThreadOption = Some(thread)
thread.start()
SlowTestReceiver.initialized = true
}

def onStop() {
// Simulate slow receiver by waiting for all records to be produced
while (!SlowTestReceiver.receivedAllRecords) {
while (!receivedAllRecords) {
Thread.sleep(100)
}
// no clean to be done, the receiving thread should stop on it own
}
}

object SlowTestReceiver {
var receivedAllRecords = false
var initialized = false
}

/** Streaming application for testing DStream and RDD creation sites */
Expand Down

0 comments on commit c18f849

Please sign in to comment.