diff --git a/distributed/client.py b/distributed/client.py index b5771266937..ca8f87ffaea 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -53,6 +53,7 @@ ) from .diagnostics.plugin import UploadFile, WorkerPlugin, _get_worker_plugin_name from .metrics import time +from .objects import HasWhat, WhoHas from .protocol import to_serialize from .protocol.pickle import dumps, loads from .publish import Datasets @@ -3205,7 +3206,11 @@ def who_has(self, futures=None, **kwargs): keys = list(map(stringify, {f.key for f in futures})) else: keys = None - return self.sync(self.scheduler.who_has, keys=keys, **kwargs) + + async def _(): + return WhoHas(await self.scheduler.who_has(keys=keys, **kwargs)) + + return self.sync(_) def has_what(self, workers=None, **kwargs): """Which keys are held by which workers @@ -3239,7 +3244,11 @@ def has_what(self, workers=None, **kwargs): workers = list(workers) if workers is not None and not isinstance(workers, (tuple, list, set)): workers = [workers] - return self.sync(self.scheduler.has_what, workers=workers, **kwargs) + + async def _(): + return HasWhat(await self.scheduler.has_what(workers=workers, **kwargs)) + + return self.sync(_) def processing(self, workers=None): """The tasks currently running on each worker diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 2e3d1ef2aa9..4b86ae05c81 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -3656,13 +3656,14 @@ async def test_async_whowhat(c, s, a, b): who_has = await c.who_has() has_what = await c.has_what() + assert type(who_has) is WhoHas + assert type(has_what) is HasWhat assert who_has == {x.key: (a.address,)} assert has_what == {a.address: (x.key,), b.address: ()} -@pytest.mark.xfail(reason="Want to fix to use `WhoHas` + `WhatHas`") -def test_client_repr(c): +def test_client_repr_html(c): x = c.submit(inc, 1) who_has = c.who_has()