-
Notifications
You must be signed in to change notification settings - Fork 6.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Client] Allow Client{Object,Actor}Ref to accept a future. #18677
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM!
python/ray/includes/object_ref.pxi
Outdated
def _on_completed(self, set_future: Callable[[Any], None]): | ||
"""Register a callback that will be called after Object is ready. | ||
If the ObjectRef is already ready, the callback will be called soon. | ||
The callback should take the result as the only argument. The result | ||
can be an exception object in case of task error. | ||
""" | ||
core_worker = ray.worker.global_worker.core_worker | ||
core_worker.set_get_async_callback(self, py_callback) | ||
core_worker.set_get_async_callback(self, set_future) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we keep this as py_callback
? It is commonly used for setting futures, but is used by Serve and client to do more arbitrary callback behaviors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, good point.
@@ -454,7 +465,8 @@ def print_on_stderr_and_stdout(s): | |||
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") | |||
def test_serializing_exceptions(ray_start_regular_shared): | |||
with ray_start_client_server() as ray: | |||
with pytest.raises(ValueError): | |||
with pytest.raises( | |||
ValueError, match="Failed to look up actor with name 'abc'"): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Woah! Did not know this was a feature of pytest.raises
:)
@@ -281,7 +282,7 @@ def put(self, vals, *, client_ref_id: bytes = None): | |||
out = out[0] | |||
return out | |||
|
|||
def _put(self, val, *, client_ref_id: bytes = None): | |||
def _put(self, val, client_ref_id: bytes): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason for dropping the *
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The client_ref_id
arg is converted from optional keyword to become positional and always required. IIUC *
becomes incorrect.
Thanks for taking a look! |
…s and returns synchronously.
@@ -206,7 +205,8 @@ def Datapath(self, request_iterator, context): | |||
# Connection isn't recoverable, skip cleanup | |||
cleanup_requested = True | |||
finally: | |||
logger.debug(f"Lost data connection from client {client_id}") | |||
logger.debug(f"Stream is broken with client {client_id}") | |||
self.basic_service.release_all(client_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the release_all here can be removed (it now happens farther down at what is currently line 241, after the grace period has passed and we're sure the client hasn't reconnected). Guessing this was a remnant of resolving merge conflicts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed, thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me, let me know if you run into any weird problems with test_client_reconnect
that might have popped up from the merge conf
Sounds good. It passed locally. |
Failure in |
Why are these changes needed?
This is in preparation for #18298. No code path actually creates refs with futures yet in this PR.
Another change is to check the number of returns and function signatures of tasks and methods on client side.
Related issue number
Checks
scripts/format.sh
to lint the changes in this PR.