fix: Ensures BaseSubscriber.makeRequest is called on context in PollableSubscriber #7212
Thanks @AlanConfluent - I'm always impressed with how fast you can get to the root of an issue!
It LGTM (well, as much as I understand this code - which is not much at all, the Vert.X context and threading model always confuses me...), but I think we should add some test coverage to make sure #7115 doesn't come back.
@@ -112,7 +112,7 @@ synchronized boolean isClosed() {
   private void checkRequestTokens() {
     if (tokens == 0) {
       tokens += REQUEST_BATCH_SIZE;
-      makeRequest(REQUEST_BATCH_SIZE);
+      runOnRightContext(() -> makeRequest(REQUEST_BATCH_SIZE));
Fascinating. So without this we always hit the batch limit and then stopped? Can we add some tests to make sure we can exceed the BATCH_SIZE - this bug is a little embarrassing! 😳
Added unit tests for this case and verified it fails without my change.
@@ -147,7 +147,7 @@ protected final void checkContext() {
     VertxUtils.checkContext(context);
   }

-  private void runOnRightContext(final Runnable runnable) {
+  protected void runOnRightContext(final Runnable runnable) {
I'm a little confused, the first line in makeRequest checks the context. If that doesn't ensure our property, then what does it do?
Is there any reason not to just always call makeRequest wrapped in a runOnRightContext? If that's the case could we keep this private and just make makeRequest do that?
> I'm a little confused, the first line in makeRequest checks the context. if that doesn't ensure our property, then what does it do?

The call to checkContext only asserts that we're on the context; it doesn't move us onto it. It's the source of the exceptions we've seen, because in some call paths we're not in fact on the context.
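
To make the distinction concrete, here is a minimal sketch of "assert the context" versus "dispatch onto the context". It does not use Vert.x: FakeContext is a hypothetical stand-in (a single dedicated thread), and the makeRequest here is a simplified imitation whose only job is to perform the check, so none of this should be read as the actual ksqlDB or Vert.x code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a Vert.x context: one dedicated thread running queued tasks.
final class FakeContext {
  private volatile Thread loopThread;
  private final ExecutorService loop = Executors.newSingleThreadExecutor(r -> {
    loopThread = new Thread(r, "fake-context");
    loopThread.setDaemon(true);
    return loopThread;
  });

  // Assert-only check: throws when called from the wrong thread, never moves work.
  void checkContext() {
    if (Thread.currentThread() != loopThread) {
      throw new IllegalStateException("not on context");
    }
  }

  // Dispatch: queues the task so that it runs on the context thread.
  void runOnContext(Runnable task) {
    loop.execute(task);
  }
}

final class ContextCheckSketch {
  // Simplified imitation of makeRequest: its first (and only) step is the context check.
  static void makeRequest(FakeContext ctx, long n) {
    ctx.checkContext();
  }

  // Calling directly from a non-context thread trips the assertion.
  static boolean directCallFails(FakeContext ctx) {
    try {
      makeRequest(ctx, 1);
      return false;
    } catch (IllegalStateException e) {
      return true;
    }
  }

  // Wrapping the same call in runOnContext satisfies the assertion.
  static boolean dispatchedCallSucceeds(FakeContext ctx) {
    CountDownLatch done = new CountDownLatch(1);
    AtomicBoolean ok = new AtomicBoolean(false);
    ctx.runOnContext(() -> {
      try {
        makeRequest(ctx, 1);
        ok.set(true);
      } catch (IllegalStateException e) {
        // leave ok as false
      } finally {
        done.countDown();
      }
    });
    try {
      return done.await(5, TimeUnit.SECONDS) && ok.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    FakeContext ctx = new FakeContext();
    System.out.println("direct call fails: " + directCallFails(ctx));
    System.out.println("dispatched call succeeds: " + dispatchedCallSucceeds(ctx));
  }
}
```

The check is a guard, not a remedy: something upstream still has to route the call onto the context thread.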
Hey @AlanConfluent thanks for tracking down this bug! I'm +1 to Almog's comment on adding testing to prevent future regressions, and also left a comment inline regarding the fix.
@@ -147,7 +147,7 @@ protected final void checkContext() {
     VertxUtils.checkContext(context);
   }

-  private void runOnRightContext(final Runnable runnable) {
+  protected void runOnRightContext(final Runnable runnable) {
I don't think we should expose this method. Let's just update the problematic line in PollableSubscriber#poll() to call context.runOnContext(v -> checkRequestTokens()); instead of checkRequestTokens().
The purpose of having strict context checks is to ensure that implementers of BaseSubscriber are cognizant of which methods are being run where. When additional, user-facing methods outside the public methods of BaseSubscriber are exposed, it's preferable to have the implementer intentionally use the correct context, rather than throwing a blanket of "use this context if needed" at the method. If you're curious, there's precedent here:
ksql/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/InsertsSubscriber.java
Line 232 in ed2a186
context.runOnContext(v -> handleResult(result));
> I don't think we should expose this method. Let's just update the problematic line in PollableSubscriber#poll() to call context.runOnContext(v -> checkRequestTokens()); instead of checkRequestTokens().

Ok, sounds reasonable. Rather than use synchronization with tokens and use an AtomicInteger, which you'd have to do if it was updated in the context, I just changed this within checkRequestTokens to:
context.runOnContext(v -> makeRequest(REQUEST_BATCH_SIZE));
This fits the invariant you describe about makeRequest being called on context as it is everywhere else. Or do all non-public methods generally get called on the context?
> Or do all non-public methods generally get called on the context?

Are you asking whether the entire poll() call should be run on the context? If so, the answer is no. Blocking operations are not meant to be performed on event loops.

> Rather than use synchronization with tokens and use an AtomicInteger, which you'd have to do if it was updated in the context, I just changed this within checkRequestTokens to:
> context.runOnContext(v -> makeRequest(REQUEST_BATCH_SIZE));

My only potential concern with this is that because checkRequestTokens() is called from afterSubscribe(), which is already wrapped in a call to context.runOnContext(...);, we now have two layers of wrapping in this case. I think this is fine but it'd be good to double-check. (I hope we have automated testing that would fail if this was disallowed/blocking!)
> Are you asking whether the entire poll() call should be run on the context? If so, the answer is no. Blocking operations are not meant to be performed on event loops.

I definitely understand that that's not allowed on the context. Poll is a public method, so I wasn't meaning to include that. I was more generally asking about the convention about when to ensure code is running on the context vs potentially some other thread. "Do the onramps to the context only happen at a few well defined public API functions in these publishers and subscribers?" was what I meant.

> My only potential concern with this is that because checkRequestTokens() is called from afterSubscribe() which is already wrapped in a call to context.runOnContext(...);, we now have two layers of wrapping in this case. I think this is fine but it'd be good to double-check. (I hope we have automated testing that would fail if this was disallowed/blocking!)

Ah, you're right. Didn't realize that about afterSubscribe. Ok, tokens was already being accessed by multiple threads, so I just made it an AtomicInteger and took your original suggestion.
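
A rough sketch of the shape described here, for readers following along: the blocking poll() stays on the caller's thread, while the token check and the request for more rows are dispatched to the context. Everything in it is an assumption made for illustration (the single-thread executor standing in for the Vert.x context, the in-memory "publisher" inside makeRequest, the class name, and the tiny batch size); it is not the actual PollableSubscriber code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PollDispatchSketch {
  static final int REQUEST_BATCH_SIZE = 2; // hypothetical, kept small on purpose

  // Stand-in for the Vert.x context: a single daemon thread running tasks in order.
  private final ExecutorService context = Executors.newSingleThreadExecutor(r -> {
    Thread t = new Thread(r, "sketch-context");
    t.setDaemon(true);
    return t;
  });
  private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
  // Decremented on the polling thread, refilled on the context thread.
  private final AtomicInteger tokens = new AtomicInteger();
  private final int totalRows;
  private int produced; // touched only on the context thread

  PollDispatchSketch(int totalRows) {
    this.totalRows = totalRows;
  }

  // Mirrors afterSubscribe: the initial token check already runs on the context.
  void subscribe() {
    context.execute(this::checkRequestTokens);
  }

  // Imitation of makeRequest: the "publisher" delivers up to n rows.
  private void makeRequest(int n) {
    for (int i = 0; i < n && produced < totalRows; i++) {
      queue.add(produced++);
    }
  }

  // Refill the tokens and request another batch once the current batch is used up.
  private void checkRequestTokens() {
    if (tokens.compareAndSet(0, REQUEST_BATCH_SIZE)) {
      makeRequest(REQUEST_BATCH_SIZE);
    }
  }

  // Blocking call made off the context; only the token check is dispatched onto it.
  Integer poll(long timeoutMs) {
    try {
      Integer row = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
      if (row != null) {
        tokens.decrementAndGet();
        context.execute(this::checkRequestTokens);
      }
      return row;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }

  // Polls until the stream runs dry and returns how many rows arrived.
  static int drain(int totalRows) {
    PollDispatchSketch subscriber = new PollDispatchSketch(totalRows);
    subscriber.subscribe();
    int count = 0;
    while (subscriber.poll(500) != null) {
      count++;
    }
    subscriber.context.shutdown();
    return count;
  }

  public static void main(String[] args) {
    System.out.println("rows received: " + drain(5));
  }
}
```

In this sketch the row count deliberately exceeds REQUEST_BATCH_SIZE, so draining all rows only works if the follow-up requests actually get issued.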
> I was more generally asking about the convention about when to ensure code is running on the context vs potentially some other thread. "Do the onramps to the context only happen at a few well defined public API functions in these publishers and subscribers?" was what I meant.

In general or in how the ksql code in particular is set up? I don't know the general answer but the ksql setup makes sense to me: BaseSubscriber ensures all the subscriber methods are run on the correct context, since otherwise all subscriber implementations would have to do that separately. For all other methods (not included in BaseSubscriber), the subscriber implementation must ensure the relevant calls are run on the correct context, while minimizing the code that is run on the context to what must be run on the context, in order to not consume event loop resources unnecessarily.
Not sure if I actually answered your question. LMK if I'm still misunderstanding.
I think that answers my question. I was kind of wanting to know if in our codebase we had some kind of philosophy since reasoning at a glance about whether some method in one of these classes is run on or off the context is hard. In this case, it seems like BaseSubscriber handles most of that, which makes it easier. Looking through the code, PollableSubscriber seems like a bit of an exception since it provides another public method that we know is run off the context, whereas all of these other uses are all async by nature.
Added some tests that trigger the bug.
:) LGTM!
    publisher.subscribe(pollableSubscriber);

    Row row = pollableSubscriber.poll(POLL_DURATION);
    for (int i = 0; row != null; i++) {
This does not actually verify we received the expected number of rows?
I forgot to check the index at the end. Pulled it out so I can check it against numRows after the loop.
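
The pattern being described might look roughly like the following sketch. It uses a plain in-memory queue and made-up row values ("row-0", "row-1", ...) rather than the real Row, publisher, or PollableSubscriber types, so treat the names as placeholders. The point is only that the index lives outside the loop and is compared against numRows afterwards.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class RowCountCheckSketch {
  static final int REQUEST_BATCH_SIZE = 3; // hypothetical batch size

  // Builds a queue of placeholder rows "row-0" .. "row-(n-1)".
  static Queue<String> rowsOf(int n) {
    Queue<String> rows = new ArrayDeque<>();
    for (int i = 0; i < n; i++) {
      rows.add("row-" + i);
    }
    return rows;
  }

  // Drains the queue, checking each row against its index, and returns the final index.
  static int drainAndCount(Queue<String> rows) {
    int i = 0; // declared outside the loop so it can be checked afterwards
    for (String row = rows.poll(); row != null; row = rows.poll()) {
      if (!row.equals("row-" + i)) {
        throw new AssertionError("unexpected row at index " + i + ": " + row);
      }
      i++;
    }
    return i;
  }

  public static void main(String[] args) {
    int numRows = 2 * REQUEST_BATCH_SIZE + 1; // more than one batch
    int received = drainAndCount(rowsOf(numRows));
    // Without this check, a stream that stalls after one batch would still pass.
    if (received != numRows) {
      throw new AssertionError("expected " + numRows + " rows but got " + received);
    }
    System.out.println("received all " + received + " rows");
  }
}
```

A loop that only validates the rows it happens to receive passes trivially when the stream stops early; the comparison after the loop is what turns a stall at the batch limit into a test failure.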
Thanks @AlanConfluent ! LGTM.
…bleSubscriber (#7212) * fix: Ensures BaseSubscriber.makeRequest is called on context
Description
Fixes a small bug where BaseSubscriber.makeRequest can be called from outside the Vertx context, which should be disallowed. makeRequest is called from checkRequestTokens and there are currently a few paths into checkRequestTokens:
- afterSubscribe, which is called from the context
- poll, which is inherently blocking and shouldn't be called from the context.

For this reason, this PR exposes runOnRightContext and invokes makeRequest using that.

Fixes #7115
Testing done
Ran tests and manually exercised this path by fetching more than the batch size.
Reviewer checklist