-
Notifications
You must be signed in to change notification settings - Fork 27
Enhance Backpressure Support in Mesos Response Stream #81
Conversation
I have the same issue. The cause issue is in PoolExhaustedException. My thought is what you get MissingBackpressureException because of MaxConnectionsBasedStrategy. Which allows 1000 connections. So I guess Event acknowledgement try to send more then 1000 response immediately. |
Thanks, I'll try and get a review done this week. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for taking the time to figure out the correct location to apply this and writing the tests to prove it out.
Overall looks good, a few small requests and some style/nit comments.
@@ -42,6 +44,7 @@ | |||
private MessageCodec<Receive> receiveCodec; | |||
private Send subscribe; | |||
private Function<Observable<Receive>, Observable<Optional<SinkOperation<Send>>>> streamProcessor; | |||
private Observable.Transformer<byte[], byte[]> byteStreamTransformer = observable -> observable; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please move the setting of the default value of this field into the constructor instead of here directly on the field.
* | ||
* @return this builder (allowing for further chained calls) | ||
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a> | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great javadocs here explaining the new methods.
@NotNull | ||
public MesosClientBuilder<Send, Receive> onBackpressureBuffer( | ||
|
||
) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Style: remove the blank line and pull the close paren and curly brace up to line 180.
private static final Logger LOGGER = LoggerFactory.getLogger(MesosClientIntegrationTest.class); | ||
|
||
@Rule | ||
public Async async = new Async(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This async
field doesn't seem to be used and can be removed.
@@ -145,6 +155,73 @@ public void testStreamDoesNotRunWhenSubscribeFails_mismatchContentType() throws | |||
} | |||
} | |||
|
|||
@Test | |||
public void testBurstyObservable_missingBackpressureException() throws Throwable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please move these two new tests and the increased timeout duration into their own class so that they are easy to bulk @Ignore
if we need to?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, will do.
public void testBurstyObservable_missingBackpressureException() throws Throwable { | ||
String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}"; | ||
|
||
String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: mark variable as final
String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}"; | ||
|
||
String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}"; | ||
byte[] hmsg = heartbeatMessage.getBytes(StandardCharsets.UTF_8); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: mark variable as final
|
||
String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}"; | ||
byte[] hmsg = heartbeatMessage.getBytes(StandardCharsets.UTF_8); | ||
byte[] hbytes = String.format("%d\n", heartbeatMessage.getBytes().length).getBytes(StandardCharsets.UTF_8); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: mark variable as final
|
||
@Test | ||
public void testBurstyObservable_unboundedBufferSucceeds() throws Throwable { | ||
String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: mark variable as final
@Test | ||
public void testBurstyObservable_unboundedBufferSucceeds() throws Throwable { | ||
String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}"; | ||
String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: mark variable as final
As a comment, I started my machine running the new |
Thank you for the detailed review! I think all your feedback makes sense. I will comment on the couple that I have questions about. Sorry about all the styling issues. I was rushing a bit to get this working for my scheduler and probably should've taken an extra pass through! Regarding the tests, I think to be more specific, the one that gives me fits sometimes is I basically find that sometimes if you have enough CPU power, the stream can keep up, and that test will fail because it expects a One final quick question: once I get the changes made, and you're happy with the pull request, do you anticipate doing a release in the near future? |
Sounds good, happy to answer questions. No worries about the styling issues, thanks for being willing to clean that up before merge. Ah, interesting (sounds a bit like jvm warmup kicking in an optimizing something), let's mark that test as Regarding a new release, yeah we should be able to cut a 0.2.0 pretty soon after this is merged. I'll go ahead and create the milestone in GitHub and assign this PR and the corresponding issue to it. Thanks again for contributing this! |
I think that I addressed all of your feedback. When you get a chance, check it out and let me know if it looks good. Thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One minor thing, all of the style changes look good as well as the new test class.
After you fix the one point, can you squash all the commits to a single commit and then I can merge that.
Thanks!
* <li>{@link BackpressureOverflow.Strategy#ON_OVERFLOW_DROP_LATEST} will drop any new items emitted by the producer while | ||
* the buffer is full, without generating any {@code onError}. Each drop will however invoke {@code onOverflow} | ||
* to signal the overflow to the producer.</li> | ||
* <li>{@link BackpressureOverflow.Strategy#ON_OVERFLOW_DROP_OLDEST} will drop the oldest items in the buffer in order to make |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One small change here. I apologize for giving you the incorrect value the first time. I thought BackpressureOverflow.Strategy
was an enum, didn't realize it was an interface.
The instances we're wanting to point at are constants in the BackpressureOverflow
class itself. So, we need to change these from BackpressureOverflow.Strategy#...
to just BackpressureOverflow#...
.
…ws the use of buffered backpressure operators BEFORE observeOn is called. Also included is a new MesosServerSimulation call to do awaitCalls with timeout.
64edc46
to
7e81fea
Compare
Good to go. Let me know if you need anything else. Thanks! |
Thanks again for contributing this! I'll try and get a release of 0.2.0 out in the next week or so. After I merge this a snapshot will be published that you can use for some testing if you want to before the release. |
Sounds good - thanks for working with me on this. Glad I could contribute! |
0.2.0 is now available in maven central. Thanks again for the contribution. |
Backpressure Enhancement
#13
Overview
Add a transformer (via compose) to the reactive HTTP stream that allows the use of buffered backpressure operators BEFORE observeOn is called.
Also included is a new MesosServerSimulation call to do awaitCalls with timeout.
Background
I have a custom scheduler that runs hundreds of tasks at once. When I issue a reconcile call at peak utilization, I can pretty reliably reproduce a
MissingBackpressureException
I tried to add the
.onBackpressureBuffer
operators to the callback that is already exposed (processStream
), but it was too late. TheonBackpressureBuffer
operator needs placed before thebuffer
call inMesosClient
After this change, I was very happy with the way it worked in my custom scheduler, but I do understand that this is a general-purpose library for all to use, so I'm willing to take your feedback or make any changes you need to get it merged.
Big Disclaimer
I don't trust the stability of the 2 tests I added. You might consider disabling them for an automated build. They depend on the ability to overwhelm the stream composition and force backpressure. It usually works!