-
-
Notifications
You must be signed in to change notification settings - Fork 719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add compression, pickle protocol to comm contexts #4019
Conversation
Currently we expect a great degree of uniformity among servers in a Dask cluster. This is especially apparent with pickle protocol and compression formats. This PR includes a handshake when connecting that exchanges a bit of information. Then this information is passed to serialization functions through the existing context= mechanism. This allows serialization functions to have more information about the other side of the connection, and to make choices accordingly. This isn't yet smooth, but I thought I'd throw it up early so that people can see what I'm thinking and comment early.
@@ -33,15 +33,15 @@ def _always_use_pickle_for(x): | |||
return False | |||
|
|||
|
|||
def dumps(x, *, buffer_callback=None): | |||
def dumps(x, *, buffer_callback=None, protocol=HIGHEST_PROTOCOL): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this suppose to be None
? Asking as below we have protocol or HIGHEST_PROTOCOL
, which seems unneeded if this is HIGHEST_PROTOCOL
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Either one is fine with me. We sometimes pass in protocol=None
, hence the or HIGHEST_PROTOCOL
bit, but I figured having the default be informative wouldn't hurt in either case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah no strong feelings from me either. Just noting it's a little odd to have HIGHEST_PROTOCOL or HIGHEST_PROTOCOL
potentially ;)
Thanks Matt! 😄 Had a couple minor questions above. |
Thanks for the careful review @jakirkham . In general does this approach seem ok to you? I'm looking for high level thoughts before I jump in too deeply (there are still some details to iron out before things work smoothly). |
@jcrist the handshake on connect mechanism here may interest you |
Yeah I think so. IIUC this is just the implementation of what was discussed in issue ( #4011 ). So yeah it seems good to me. Haven't thought too deeply about how the compression side would look. So am trusting you know what you want there ;) Happy to look at that in more detail when that makes sense :) |
Our current policy here isn't great. It's "if both sides agree on the same default compression then compress, otherwise don't". In the future we an pass around a set of allowable compressions and then choose the jointly maximal pair. I figured I'd wait for later to deal with those details though. |
I've just pushed up a commit which moves this from the TCP comm to Comms generally. Everything seems fine on TLS/Inproc but if someone from RAPIDS can check out UCX I would appreciate that. |
This passed before by accident In general we can't ensure that a comm will hear back about its status after is sends its message along.
cc-ing @madsbk (in case you have thoughts here 🙂) |
# assert "unknown ca" in str(excinfo.value) | ||
pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a regression. Our error reporting around bad TLS degraded a bit. It now shows up as a timeout error with a more generic "could not connect" message. I tried figuring out what was going on here for a while but couldn't come up with a resolution. I could use help here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Filed as issue ( #4084 ) for tracking and broader visibility.
My guess is this is fine. I'm not sure how much usage we will get out of this in UCX (we ensure We probably need to update One thing that might be interesting in the UCX context (and this can be done in another PR) is whether we want to enable/disable |
Yeah, more generally I encourage RAPIDS folks to think of this PR as two separate features, a specific and a general one Specific: It's now easier to handle mismatched workers/schedulers with regards to compression/Python versions (maybe not that interesting to you) General: Comms now have a mechanism to know arbitrary information about both ends of the connection. If there are questions that you want to know like "which GPU are you?" or "what architecture are you running?" then there is infrastructure here to collect and transfer that information as well as deliver it to any serialization function. |
On this general question, I think there is another point that will come up (though again this would be future work), is can we choose where tasks goes based on this information? For example knowing a worker is "closer" and free might be a useful thing when distributing tasks and moving data, which would be helpful for RAPIDS, but could have broader value. Though there could be other cases like directing tasks based on the capabilities of the worker referenced above. |
I don't really see how moving all frames to host would benefit UCX. Apart from that, today we only use UCP layer, meaning all routing is done internally by UCX without any capability to expose that to UCX-Py, and consequently to the outside world, such as Dask. That is in itself a limiting factor for doing that. |
No problem. If it's something I can contribute to, let me know. |
OK, I've had to relax one test (which is pretty pedantic anyway) and I've resolved all comments. I'm going to go ahead and merge after passed tests. Thank you everyone for the careful review. |
Thanks Matt! 😄 |
As a result of this, we need to fix the UCX tests as well. Submitted PR ( #4036 ) to do just that. |
Currently we expect a great degree of uniformity among servers in a Dask
cluster. This is especially apparent with pickle protocol and
compression formats. This PR includes a handshake when connecting that
exchanges a bit of information. Then this information is passed to
serialization functions through the existing context= mechanism.
This allows serialization functions to have more information about the
other side of the connection, and to make choices accordingly.
This isn't yet smooth, but I thought I'd throw it up early so that
people can see what I'm thinking and comment early.
cc @quasiben @jakirkham @jrbourbeau