Encapsulate spill buffer and memory_monitor #5904
40c584f
to
8326ac0
Compare
@@ -97,7 +99,6 @@ def __init__(
services=None,
name=None,
memory_limit="auto",
memory_terminate_fraction: float | Literal[False] | None = None,
This init parameter is a very recent addition so I think it's safe not to have a deprecation cycle
Agreed, it doesn't look like there have been any releases since it was introduced:
$ git log a86f4bb568b5aeb60f5a2a8e24f86592a407b09d~1..HEAD --oneline
4918d652 Merge monitor-interval for spill/pause and terminate
47b4ea19 Merge branch 'main' into spill_extension
7a69b5e2 Prevent data duplication on unspill (#5936)
1a0548b6 distributed.worker_memory
2fffe74d Worker State Machine refactor: redesign TaskState and scheduler messages (#5922)
85bf1beb absolufy-imports (#5924)
925c6100 Tidying of OpenSSL 1.0.2/Python 3.9 (and earlier) handling (#5854)
60ce8436 Fix `track_features` for distributed pre-releases (#5927)
2d68dfc8 Add key to compute failed message (#5928)
f9d2f914 zict type annotations (#5905)
30f0b601 Support dumping cluster state to URL (#5863)
936fba5a Xfail test_submit_different_names (#5916)
e1e43858 Change default log format to include timestamp (#5897)
de94b408 Unblock event loop while waiting for ThreadpoolExecutor to shut down (#5883)
39c5e885 handle concurrent or failing handshakes in InProcListener (#5903)
b3f50cef add GitHub URL for PyPi (#5886)
8c98ad8c fix progress_stream teardown (#5823)
be4fc7f7 Drop unused `_round_robin` global variable (#5881)
ca235dd6 Mark xfail COMPILED tests skipif instead (#5884)
16931cc8 Improve type annotations in worker.py (#5814)
a86f4bb5 Mock process memory readings in test_worker.py (v2) (#5878)
0d92476
to
6022a78
Compare
memory_target_fraction: float | Literal[False] | None = None,
memory_spill_fraction: float | Literal[False] | None = None,
memory_pause_fraction: float | Literal[False] | None = None,
max_spill: float | str | Literal[False] | None = None,
max_spill is a very recent addition, so I didn't put it through a deprecation cycle.
The same goes for memory_monitor_interval as an init parameter.
Callable[..., MutableMapping[str, Any]], dict[str, Any]
]  # (constructor, kwargs to constructor)
| None  # create internally
) = None,
This seems to me like a feature for power users that is as powerful as it is obscure, and I would love to hear from somebody who actually uses it to understand their use case!
This is used by Dask-CUDA. See #5909
I wonder if `ManualEvictProto` could be used in this typed declaration, but I have no strong opinion, as I'm not sure whether it would even be possible.
No, because the type declaration says what the parameter must be; ManualEvictProto is an additional, optional interface to the MutableMapping which unlocks extra features.
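To illustrate the distinction drawn above, here is a minimal sketch. The names `EvictProto`, `EvictableDict` and `maybe_evict` are hypothetical and are not the definitions used in `distributed`; the point is only that the parameter stays typed as a plain `MutableMapping`, while the extra interface is detected at runtime.

```python
from collections import UserDict
from typing import Any, MutableMapping, Protocol, runtime_checkable


@runtime_checkable
class EvictProto(Protocol):
    """Hypothetical optional interface: a mapping that can evict on request."""

    def evict(self) -> int:
        """Evict one key; return bytes freed, or -1 if nothing could be evicted."""
        ...


class EvictableDict(UserDict):
    """Hypothetical mapping that also offers the optional interface."""

    def evict(self) -> int:
        if not self.data:
            return -1
        key = next(iter(self.data))
        del self.data[key]
        return 1  # pretend one byte was freed


def maybe_evict(data: MutableMapping[str, Any]) -> bool:
    """Accept any MutableMapping; call evict() only if the mapping offers it."""
    if isinstance(data, EvictProto):
        return data.evict() >= 0
    return False


plain = {"x": 1}
fancy = EvictableDict({"x": 1})
plain_result = maybe_evict(plain)  # a plain dict has no evict() method
fancy_result = maybe_evict(fancy)  # evict() is found and called
```

A plain `dict` still satisfies the declared parameter type; it simply does not unlock the eviction path.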
5d18a87
to
60a5dc2
Compare
"distributed.worker.memory.pause": False,
},
)
async def test_pause_executor_manual(c, s, a):
new test
"distributed.worker.memory.monitor-interval.spill-pause": "10ms",
},
)
async def test_pause_executor_with_memory_monitor(c, s, a):
was test_worker.py::test_pause_executor
"distributed.worker.memory.monitor-interval.spill-pause": "10ms",
},
)
async def test_override_data_does_not_spill(c, s, a):
new test
],
)
@gen_cluster(nthreads=[])
async def test_deprecated_attributes(s, cls, name, value):
new test
["memory_target_fraction", "memory_spill_fraction", "memory_pause_fraction"],
)
@gen_cluster(nthreads=[])
async def test_deprecated_params(s, name):
new test
@dask/gpu somebody should have a look if this affects anything in RAPIDS
Outstanding:
This is otherwise complete.
Thanks for pinging us here. I was already looking at the PR earlier today, but it's fairly long so I couldn't finish yet; plus, @crusaderky informed us that #5909 is being fixed here too. I ran the Dask-CUDA tests with it, and due to the obvious API changes there are more tests failing. Could we hold off on merging it for another day or two until we can ensure that it doesn't break compatibility in a way that can't be fixed in Dask-CUDA? If not, that's ok, but we may need to revisit changes should anything arise.
@pentschev take whatever time you need. Note that I just pushed an additional commit which re-adds the Worker.memory_monitor method. Downstream of it, there shouldn't be any obvious API changes - could you point them out please?
Unit Test Results 12 files ± 0 12 suites ±0 5h 54m 17s ⏱️ + 7m 47s For more details on these failures, see this check. Results for commit 5883c96. ± Comparison against base commit 9f7027e. ♻️ This comment has been updated with latest results.
distributed/worker_memory.py
Outdated
if not isinstance(self.data, SpillBuffer):
    return
Should this check be removed? Dask CUDA doesn't use a `SpillBuffer`, as noted here.
Alternately, Dask CUDA should inherit from `SpillBuffer` - in which case, perhaps it'd be a good idea to define what interface a `SpillBuffer` should implement? Either as part of the docs, or codified as an ABC?
@shwina to answer the question I would need to understand what Dask CUDA is trying to do and what it was doing before.
Is it supposed to react to the `distributed.worker.memory.spill` threshold?
If so, how does it do it? Does it replace the `memory_monitor` method, or is it duck-type compatible with SpillBuffer and offer a `data.fast.evict()` method? I couldn't find either. If neither is the case, does it mean that, even before #5543 and #5736, `Worker.memory_monitor` was crashing?
BTW I noticed now that it's not wrapped with `log_errors`, so the crash would be completely invisible. I'm adding it now.
Can you point us to where the Dask CUDA data mapping is defined?
Dask-CUDA implements two different data mappings to handle GPU->CPU spilling:
- https://github.com/rapidsai/dask-cuda/blob/branch-22.04/dask_cuda/device_host_file.py#L145
- https://github.com/rapidsai/dask-cuda/blob/branch-22.04/dask_cuda/proxify_host_file.py#L433
They implement `data.fast.evict()`.
I see that ProxifyHostFile has been modified to support both before and after #5543 (2022.02.1).
DeviceHostFile won't work with >=2022.02.1 as it misses an evict() method.
See my latest commit (a8ac78d) where I formalise this interface (missing high level docs and unit tests). Could you confirm that ProxifyHostFile works again now?
> DeviceHostFile won't work with >=2022.02.1 as it misses an evict() method.
Implemented now in shwina/dask-cuda#2
worker_kwargs={"memory_limit": "1 GB", "data": UserDict},
config={"distributed.worker.memory.monitor-interval.spill-pause": "10ms"},
)
async def test_override_data_vs_memory_monitor(c, s, a):
new test
5fd4754
to
d948828
Compare
"distributed.worker.memory.monitor-interval.spill-pause": "10ms",
},
)
async def test_manual_evict_proto(c, s, a):
new test
Worker Memory Management
========================
For cluster-wide memory-management, see :doc:`memory`.
below this point, this is just a cut-paste from worker.rst
Thanks for your kindness to the reviewer! 😆
bc03fbc
to
1a0548b
Compare
@shwina can you rerun your dask-cuda PR with the latest from here and confirm the latest changes will work for dask-cuda?
I checked this with changes from rapidsai/dask-cuda#870 and all tests pass. Would be great to get this PR in before the next Dask release.
After offline discussion with @fjetter, I merged the monitor-interval for spill/pause and terminate
This is now only missing final review and merge
+1 for merging
Do we know what is up with the Windows CI failures?
This PR encapsulates the memory logic of the `Nanny` and `Worker` classes into separate `NannyMemoryManager` and `WorkerMemoryManager` classes.
My comments are mostly questions and statements for my own understanding, although there are one or two nits that the author may wish to address.
try:
    return self._status
except AttributeError:
    return Status.undefined
Minor nit: Would it not be simpler to define `_status` upfront in `__init__`?
    def __init__(self, ...):
        self._status = Status.undefined
That's already happening, but the subclasses are not calling `super().__init__` straight away and that creates something that's hard to disentangle.
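A minimal sketch of why the fallback matters, assuming a hypothetical `Status` enum and a hypothetical `LateInit` subclass (these are stand-ins, not the classes in `distributed`): if a subclass reads `status` before the base `__init__` has run, `_status` does not exist yet, and the `try`/`except` is what keeps the property usable.

```python
from enum import Enum


class Status(Enum):
    """Hypothetical stand-in for the real status enum."""

    undefined = "undefined"
    running = "running"


class Base:
    def __init__(self) -> None:
        self._status = Status.undefined

    @property
    def status(self) -> Status:
        # Fall back gracefully when a subclass reads `status` before
        # Base.__init__ has had a chance to set `_status`.
        try:
            return self._status
        except AttributeError:
            return Status.undefined


class LateInit(Base):
    """Hypothetical subclass that does not call super().__init__ straight away."""

    def __init__(self) -> None:
        self.status_seen_early = self.status  # `_status` is not set yet
        super().__init__()
        self._status = Status.running


obj = LateInit()
```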
# Deprecated attributes; use Nanny.memory_manager.<name> instead
memory_limit = DeprecatedMemoryManagerAttribute()
memory_terminate_fraction = DeprecatedMemoryManagerAttribute()
memory_monitor = DeprecatedMemoryMonitor()
For my own comprehension, a descriptor protocol is used to warn and redirect use of the deprecated attribute onto the appropriate attribute in `self.memory_manager: NannyMemoryManager`.
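The pattern described above can be sketched roughly as follows. This is an illustration under assumptions, not the PR's implementation: `DeprecatedManagerAttribute`, `MemoryManager` and this toy `Nanny` are hypothetical stand-ins, and the choice of `FutureWarning` is also an assumption.

```python
import warnings


class DeprecatedManagerAttribute:
    """Hypothetical descriptor: warn, then forward to obj.memory_manager.<name>."""

    def __set_name__(self, owner: type, name: str) -> None:
        # Remember the attribute name the descriptor was assigned to
        self.name = name

    def __get__(self, instance, owner=None):
        if instance is None:
            return self  # accessed on the class, not on an instance
        warnings.warn(
            f"{owner.__name__}.{self.name} is deprecated; "
            f"use {owner.__name__}.memory_manager.{self.name} instead",
            FutureWarning,
            stacklevel=2,
        )
        return getattr(instance.memory_manager, self.name)


class MemoryManager:
    """Hypothetical holder of the memory settings."""

    def __init__(self, memory_limit: int) -> None:
        self.memory_limit = memory_limit


class Nanny:
    """Toy class standing in for the real Nanny."""

    memory_limit = DeprecatedManagerAttribute()

    def __init__(self, memory_limit: int) -> None:
        self.memory_manager = MemoryManager(memory_limit)


nanny = Nanny(memory_limit=2_000_000_000)
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    value = nanny.memory_limit  # old spelling still works, but warns
```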
""" | ||
...  # pragma: nocover
For my own comprehension, this Protocol checks that the supplied object adheres to a duck-type interface, as opposed to that imposed by an ABC class.
This is primarily used in `worker_memory.WorkerMemoryManager._maybe_spill` to ensure that the `data: MutableMapping` member adheres to the appropriate interface.
# It is not sent to the worker.
z = c.submit(inc, 2, key="z")
while "z" not in s.tasks or s.tasks["z"].state != "no-worker":
    await asyncio.sleep(0.01)
I understand that these potentially infinite while loops in test cases are safe because a timeout is enforced on test case runs via `@gen_cluster`?
correct
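As a rough illustration of the idea, using only the standard library: the helpers `wait_until` and `run_with_timeout` below are hypothetical and do not reproduce how `@gen_cluster` is implemented. They only show that an unbounded polling loop becomes safe once the enclosing coroutine runs under a timeout.

```python
import asyncio


async def wait_until(predicate, interval: float = 0.01) -> None:
    """Poll until predicate() is true; unbounded on its own."""
    while not predicate():
        await asyncio.sleep(interval)


async def run_with_timeout(coro, timeout: float):
    """Hypothetical stand-in for the timeout a test decorator would impose."""
    return await asyncio.wait_for(coro, timeout=timeout)


async def main():
    state = {"ready": False}

    async def flip() -> None:
        await asyncio.sleep(0.02)
        state["ready"] = True

    # The condition becomes true shortly, so the loop exits normally.
    flipper = asyncio.create_task(flip())
    await run_with_timeout(wait_until(lambda: state["ready"]), timeout=1)
    await flipper
    finished = True

    # A condition that never becomes true is cut short by the timeout.
    try:
        await run_with_timeout(wait_until(lambda: False), timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return finished, timed_out


finished, timed_out = asyncio.run(main())
```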
assert w.memory_manager.memory_limit == 2e9


@gen_cluster(
These look like they were moved from test_worker.py
everything that I didn't comment with "new test" is moved from test_worker
futures = await c.scatter({"x": None, "y": None, "z": None})
while a.data.evicted != {"x", "y", "z"}:
    await asyncio.sleep(0.01)
For my own comprehension, this test mocks the process memory to above the 70% limit, which forces the "x", "y" and "z" keys to be evicted as per the `ManualEvictDict` protocol.
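A simplified sketch of that behaviour, without a cluster. The `ManualEvictDict` below is a stand-in written for this example and may differ from the class in the test suite; `spill_until_below` and the fixed memory reading are likewise hypothetical. With the mocked reading held above 70% of the limit, every key ends up evicted.

```python
from collections import UserDict


class ManualEvictDict(UserDict):
    """Simplified stand-in: a mapping that records which keys were evicted."""

    def __init__(self) -> None:
        super().__init__()
        self.evicted = set()

    @property
    def fast(self) -> dict:
        # Keys still held in fast memory, i.e. not evicted yet
        return {k: v for k, v in self.data.items() if k not in self.evicted}

    def evict(self) -> int:
        # Mark one key from fast memory as evicted
        key = next(iter(self.fast))
        self.evicted.add(key)
        return 1  # pretend one byte was freed


def spill_until_below(data, get_memory, limit, spill_fraction=0.7):
    """Evict one key at a time while the (mocked) memory reading stays above
    the spill threshold and fast memory is not empty."""
    count = 0
    while get_memory() > spill_fraction * limit and data.fast:
        data.evict()
        count += 1
    return count


data = ManualEvictDict()
data.update({"x": None, "y": None, "z": None})
# Mocked process memory: permanently at 80% of a limit of 1000
evictions = spill_until_below(data, get_memory=lambda: 800, limit=1000)
```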
Co-authored-by: Simon Perkins <[email protected]>
Is this good to merge?
it is
This PR updates dask-cuda to work with the new `WorkerMemoryManager` abstraction being introduced in dask/distributed#5904. Once both PRs are merged, and pending the resolution of https://github.com/dask/distributed/pull/5904/files#r822084806, dask-cuda CI should be unblocked.
Authors:
- Ashwin Srinath (https://github.com/shwina)
- Peter Andreas Entschev (https://github.com/pentschev)
Approvers:
- Peter Andreas Entschev (https://github.com/pentschev)
URL: #870
In the config, memory thresholds such as `distributed.worker.memory.terminate` should never be exactly `0.0`. Instead, config should use `false` to disable memory management.
This one bit me recently. My older dask config files used `0.0` to disable the memory management features. That worked because older versions of `distributed` interpreted the value `0.0` as equivalent to `false` for these fields. But in newer versions, only `false` works. (I suspect the change occurred in #5904.) Nowadays, if the config says `0.0`, then `distributed` interprets that literally -- and no memory can be used at all without incurring the wrath of the memory manager!
An easy "fix" is to disallow `0.0` in the user's config. In json schema, `exclusiveMinimum: 0` ensures that the value `0.0` itself is not permitted by the schema.
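The rule proposed above can be sketched in plain Python. `validate_threshold` is a hypothetical helper written for this example, not part of `distributed` or of any schema library; it mirrors `exclusiveMinimum: 0` by accepting `False` (feature disabled) or a number strictly greater than zero.

```python
def validate_threshold(value):
    """Hypothetical check: accept False or a number strictly greater than 0."""
    if value is False:
        return False  # explicit "disabled"
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        raise TypeError(f"expected False or a number, got {value!r}")
    if value <= 0:
        # 0.0 would be taken literally as "no memory may be used"
        raise ValueError("use false, not 0.0, to disable a memory threshold")
    return float(value)


disabled = validate_threshold(False)
enabled = validate_threshold(0.95)

try:
    validate_threshold(0.0)
    zero_rejected = False
except ValueError:
    zero_rejected = True
```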
`SpillBuffer` breaks usage of custom `Worker(data=...)` types #5909