[SPARK-24663][STREAMING][TESTS] StreamingContextSuite: Wait until slow receiver has been initialized, but with hard timeout #25725
Test build #110329 has finished for PR 25725 at commit
    @@ -974,6 +977,7 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
         }
         receivingThreadOption = Some(thread)
         thread.start()
    +    SlowTestReceiver.initialized = true
Shouldn't this be in the `Thread.run()` implementation above?
If we consider only timing, technically it can be placed anywhere, even on the first line of `onStart()`, since the key is whether the receiver is registered in time. As verification: without the patch, the existing test doesn't fail even if we add `Thread.sleep(1000)` on the first line of `onStart()`.
So no strong opinion on where to put it.
Btw, maybe we can apply a "more verbose but clearer" solution (without this flag) by adding the code below on the test side:
    // tracks whether the receiver is started or not
    var isReceiverStarted = false
    val listener = new StreamingListener {
      override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
        isReceiverStarted = receiverStarted.receiverInfo.name.startsWith(
          input.getReceiver().getClass.getSimpleName)
      }
    }
    ssc.addStreamingListener(listener)
    ssc.start()
    ssc.awaitTerminationOrTimeout(500)
    eventually(timeout(10.seconds), interval(10.millis)) {
      assert(isReceiverStarted)
    }
Which one do you think is better?
> as whether receiver is registered within time or not is the key

Ah ok. Was wondering if the thread that actually does stuff needed to run for this to work, but if it's just the registration that matters, this is enough.
But can't the flag be in the actual `SlowTestReceiver` instance (instead of the object)?
I guess we can't access these flags if we place them in the instance, as the receiver will be executed in an executor (say, a different JVM, or at least serialized and deserialized, even in tests). So exposing them in the object is really a hack only for testing.
Btw, `receivedAllRecords` doesn't actually need to be exposed outside, as we create a new receiver instance and can just set the default value to false. If we move it, we can either remove the hack entirely by adding more verbose code, or keep the hack for a concise code change.
I'll move `receivedAllRecords` out for now; please let me know if you prefer to remove the hack, and I'll make the change.
> as the receiver will be executed in executor

ok, that makes sense; I was under the impression it was a driver thing since an instance of it was passed to the context, instead of one created somehow in the executor.
Anyway, what you have is fine.
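
The point in this thread about instance flags versus object flags can be illustrated with a small, Spark-free sketch. It assumes plain Java serialization as a stand-in for how a receiver is shipped to an executor. `FakeReceiver`, `startedOnInstance`, `startedOnObject`, and `roundTrip` are hypothetical names used only for illustration; they are not part of Spark or of this patch.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a receiver; this is NOT Spark's SlowTestReceiver.
class FakeReceiver extends java.io.Serializable {
  // Flag stored on the instance: lives only in whichever copy sets it.
  @volatile var startedOnInstance = false

  def onStart(): Unit = {
    startedOnInstance = true
    // Flag stored on the companion object: shared by all copies in one JVM.
    FakeReceiver.startedOnObject = true
  }
}

object FakeReceiver {
  @volatile var startedOnObject = false
}

object SerializationFlagSketch {
  // Serializes and deserializes a value, yielding an independent copy,
  // roughly as happens when an object is shipped to an executor.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val driverSide = new FakeReceiver          // the instance the test holds
    val executorSide = roundTrip(driverSide)   // the copy that actually runs
    executorSide.onStart()

    // The test's own instance never observes the flag set on the copy.
    println(s"instance flag seen by the test: ${driverSide.startedOnInstance}")
    // The object-level flag is visible because both copies share this JVM.
    println(s"object flag seen by the test: ${FakeReceiver.startedOnObject}")
  }
}
```

The object-level flag only works here because everything runs in a single JVM, which matches the "hack only for testing" caveat above; across separate JVMs it would not be visible either.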
Test build #110448 has finished for PR 25725 at commit
Merging to master.
Thanks all for reviewing and merging!
Thank you, @HeartSaVioR and @vanzin.
I'll test on
…w receiver has been initialized, but with hard timeout

### What changes were proposed in this pull request?

This patch fixes the flaky test failure from StreamingContextSuite "stop slow receiver gracefully", via putting flag whether initializing slow receiver is completed, and wait for such flag to be true. As receiver should be submitted via job and initialized in executor, 500ms might not be enough for covering all cases.

### Why are the changes needed?

We got some reports for test failure on this test. Please refer [SPARK-24663](https://issues.apache.org/jira/browse/SPARK-24663)

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Modified UT. I've artificially made delay on handling job submission via adding below code in `DAGScheduler.submitJob`:

```
if (rdd != null && rdd.name != null && rdd.name.startsWith("Receiver")) {
  println(s"Receiver Job! rdd name: ${rdd.name}")
  Thread.sleep(1000)
}
```

and the test "stop slow receiver gracefully" failed on current master and passed on the patch.

Closes #25725 from HeartSaVioR/SPARK-24663.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
### What changes were proposed in this pull request?
This patch fixes the flaky StreamingContextSuite test "stop slow receiver gracefully" by adding a flag that records whether the slow receiver has finished initializing, and by waiting (with a hard timeout) for that flag to become true. Because the receiver is submitted via a job and initialized in an executor, 500 ms might not be enough to cover all cases.
### Why are the changes needed?
We got some reports of failures on this test. Please refer to [SPARK-24663](https://issues.apache.org/jira/browse/SPARK-24663).
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Modified UT. I artificially delayed job submission handling by adding a `Thread.sleep(1000)` in `DAGScheduler.submitJob` for RDDs whose name starts with "Receiver"; with that delay, the test "stop slow receiver gracefully" failed on current master and passed with the patch.
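
As a rough illustration of "wait until initialized, but with a hard timeout", here is a minimal sketch that uses neither Spark nor ScalaTest. `WaitForFlagSketch` and `waitUntil` are hypothetical names, and the delayed thread merely stands in for a receiver that starts late; this is not the code in the patch.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object WaitForFlagSketch {
  // Polls `condition` every `intervalMs` until it holds or `timeoutMs` has
  // elapsed. Returns true if the condition held, false on hard timeout.
  def waitUntil(timeoutMs: Long, intervalMs: Long)(condition: => Boolean): Boolean = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    var satisfied = condition
    while (!satisfied && System.nanoTime() < deadline) {
      Thread.sleep(intervalMs)
      satisfied = condition
    }
    satisfied
  }

  def main(args: Array[String]): Unit = {
    val initialized = new AtomicBoolean(false)

    // Stand-in for a receiver whose startup is delayed (here by 200 ms).
    val starter = new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(200)
        initialized.set(true)
      }
    })
    starter.setDaemon(true)
    starter.start()

    // Returns as soon as the flag flips instead of sleeping a fixed time,
    // but gives up after 10 seconds at the latest.
    val startedInTime = waitUntil(timeoutMs = 10000, intervalMs = 10)(initialized.get())
    println(s"receiver initialized within timeout: $startedInTime")

    // A flag that is never set exercises the hard timeout path.
    val neverSet = new AtomicBoolean(false)
    val timedOut = !waitUntil(timeoutMs = 100, intervalMs = 10)(neverSet.get())
    println(s"hard timeout reached for a flag that is never set: $timedOut")
  }
}
```

Compared with a fixed 500 ms wait, polling a flag tolerates slow startup without slowing down the common case, and the timeout keeps a receiver that never starts from hanging the test.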