O(1) rebalance #4774
Conversation
rebalance() runtime when there is nothing to do:
(benchmark table not preserved in this extract)
Important: I noticed that, once the size of a single minimal Python object (read: the buffer of a numpy array) drops below 2 MiB, CPython / Linux x64 won't release RAM instantly anymore when the object is deallocated; instead it will hog it indefinitely. gc.collect() does nothing. malloc_trim() typically, but not always, fixes the problem. Creating new Python objects simply reuses said memory. This is the behaviour already in master - nothing changes there - and it already impacts all logic that relies on measuring process memory (spilling, pausing, and restarting). What does change with this PR is that this unused but allocated memory is now considered by rebalance() too, which in turn means that keys may be evicted too aggressively from particularly heavy nodes, as there is currently no way to tell apart trimmable memory from memory leaks or genuine fragmentation. The current workaround is that I let the user pick which measure to use in rebalance(); by default it is the optimistic memory (managed by dask in RAM + unmanaged older than 30s), but it can be changed to managed memory only, thus reverting to the behaviour of master. Needless to say this is not great, as it adds config burden on the shoulders of the user. An alternative, cleaner solution would be to run malloc_trim() periodically (e.g. every 2~5s) on the workers, but problems will ensue if someone compiled the Python interpreter with an alternative alloc/free pair of primitives. I have not tested the behaviour on macOS and Windows yet, but from what I saw in my previous PR I expect similar headaches there too.
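The periodic malloc_trim() idea mentioned above could look roughly like this. This is a sketch, not distributed's actual implementation; the helper name and the glibc-only guard are my own, and it only works when the interpreter uses glibc's malloc:

```python
import ctypes
import platform


def trim_unmanaged_memory() -> int:
    """Ask glibc to return freed arena memory to the OS.

    Returns 1 if some memory was released, 0 if not, and -1 when
    malloc_trim is unavailable (non-Linux or non-glibc platforms).
    """
    if platform.system() != "Linux":
        return -1
    try:
        libc = ctypes.CDLL("libc.so.6")
    except OSError:
        # Interpreter not linked against glibc (e.g. musl)
        return -1
    # malloc_trim(0): trim as much as possible from the top of the heap
    return libc.malloc_trim(0)


print(trim_unmanaged_memory())
```

A worker could call something like this on a periodic callback (the 2~5s interval suggested above), at the cost of undefined behaviour if the allocator is not glibc's.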
Can I ask you to raise an issue about this and tag
cc @xhochy
Thanks for the ping. I should probably comment on the (to-be-opened) issue instead of here?
I like the simplicity and elegance of the algorithm. In particular that there are no opaque heuristics in there. Other than configuring the different thresholds, I think we have a nice lever to control different policies via the heap sort key (or rather the who_has sorting / insertion order / etc.)
I am a bit concerned about the dynamic case, where this decision might become much more complicated. I would like to avoid the complexity we currently have in dask.order.
Also, I'm wondering how we can manage "data ownership", i.e. is a worker actually supposed to hold a replica or is it just a temp copy needed for a dependency, etc. I believe we do not have a data model for this, yet.
These concerns should not stop us here but rather should be kept in mind once we move on to the next step(s).
distributed/scheduler.py
Outdated
memory_by_worker = [
    (ws, getattr(ws.memory, MEMORY_REBALANCE_MEASURE)) for ws in workers
]
mean_memory = sum(m for _, m in memory_by_worker) // len(memory_by_worker)
If performance is super critical, this could be done in one loop; this way we iterate three times over the workers. For the sake of simplicity, it's probably fine to keep as is for now.
I need to store the memory measure somewhere for later (calling WorkerState.memory is mildly expensive). A list comprehension tends to be a lot faster than an explicit for loop + appends.
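For reference, the single-pass variant suggested above could be sketched like this. The function name and the SimpleNamespace stand-ins for workers are hypothetical; real code would pass WorkerState objects whose `.memory` exposes the configured measure:

```python
from types import SimpleNamespace


def mean_and_pairs(workers, measure="optimistic"):
    """Single-pass variant: collect the (worker, memory) pairs and the
    integer mean of the chosen memory measure in one iteration."""
    memory_by_worker = []
    total = 0
    for ws in workers:
        m = getattr(ws.memory, measure)
        memory_by_worker.append((ws, m))
        total += m
    return memory_by_worker, total // len(memory_by_worker)


# Stand-in workers (hypothetical; real code uses WorkerState.memory)
a = SimpleNamespace(memory=SimpleNamespace(optimistic=100))
b = SimpleNamespace(memory=SimpleNamespace(optimistic=300))
pairs, mean_memory = mean_and_pairs([a, b])
print(mean_memory)  # 200
```

Whether the single loop is worth it depends on profiling; as noted above, the comprehension version is already fast in practice.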
# If this is your problem on Linux, you should alternatively consider
# setting the MALLOC_TRIM_THRESHOLD_ environment variable (note the final
# underscore) to a low value; refer to the mallopt man page and to the
# comments about M_TRIM_THRESHOLD on
# https://sourceware.org/git/?p=glibc.git;a=blob;f=malloc/malloc.c
TODO as part of a later PR, I'm going to move this advice to a sphinx page for the sake of visibility and just leave a note here to go read it
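As a sketch of the env-var advice above: MALLOC_TRIM_THRESHOLD_ must be set in the environment before the process starts, since glibc reads it at startup; setting it inside an already-running interpreter has no effect. The 64 KiB value below is an illustrative aggressive choice, not a recommendation:

```python
import os
import subprocess
import sys

env = dict(os.environ)
# Note the trailing underscore; glibc reads this at process startup
env["MALLOC_TRIM_THRESHOLD_"] = "65536"  # 64 KiB

# Child process sees the variable; on Linux/glibc it lowers the
# threshold above which freed memory is returned to the OS
proc = subprocess.run(
    [sys.executable, "-c", "import os; print(os.environ['MALLOC_TRIM_THRESHOLD_'])"],
    env=env,
    capture_output=True,
    text=True,
)
print(proc.stdout.strip())  # 65536
```

In practice this means setting the variable in the shell or service file that launches the workers, not in Python.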
All tests successful over 4 consecutive runs
I left many comments, but overall the code itself looks great. I also do like the clearly written tests, since the expected behaviour is quite clear. The two biggest issues I consider worth discussing are
@@ -161,14 +163,6 @@ def nogil(func):
DEFAULT_DATA_SIZE = declare(
    Py_ssize_t, parse_bytes(dask.config.get("distributed.scheduler.default-data-size"))
)
Could not move this, as it is required by TaskState, and IMHO it would be too expensive to reload it on every TaskState.__init__.
I redesigned this part. Now dask.config is read on
While it is certainly possible to move _rebalance_find_msgs to a top-level function, it would require as input a SchedulerState object and a list of WorkerState objects, neither of which are easy - and, most importantly, robust - to craft synthetically in a unit test. I'd much rather keep the tests as a black-box design, because otherwise it's extremely easy for the real-life Scheduler and WorkerState objects to deviate from those crafted for the tests.
We wouldn't need to pass the entire scheduler state in there, would we? I agree this would be nuts. From what I can see, we're using four constants and the log_event method of the SchedulerState. Passing in a list of WorkerState objects would allow for tests like:

def test_rebalance_simple():
    A = WorkerState(name="A", memory_limit="1GB")
    B = WorkerState(name="B", memory_limit="1GB")

    def put_task_on_worker(ws, key, nbytes):
        ws._has_what[key] = None
        ws._nbytes += nbytes

    for ix in range(4):
        put_task_on_worker(A, f"k{ix}", 200)

    result = _rebalance_find_msgs(workers=[A, B])
    expected = [
        (A, B, "k0"),
        (A, B, "k1"),
    ]
    assert result == expected
Oops, I had a few comments in 'pending', sorry.
"""keys exist but belong to unfinished futures"""
futures = c.map(slowinc, range(10), delay=0.05, workers=a.address)
await asyncio.sleep(0.1)
out = await s.rebalance(keys=[f.key for f in futures])
There is another test for the internal mechanics which waits for the future. Why is this different for the user API?
The tests in test_client.py test specifically the wrapper in client.py. Previously, all tests for rebalance were exclusively in test_client.py. However, very soon rebalance will be chiefly invoked internally by the scheduler, bypassing the client entirely. Hence, the tests should invoke Scheduler.rebalance and not rely on the client wrapper.
As to why Client.rebalance and Scheduler.rebalance behave differently: it would be non-trivial to move the logic upstream (Client uses Futures, while Scheduler uses keys, for which I couldn't find a straightforward wait method), and I just could not justify the effort. If in the future the Scheduler calls rebalance internally on unfinished keys, we will revisit this, but I just cannot see a use case in the plan that we laid down.
Added a clarification in the test that Client.rebalance and Scheduler.rebalance behave differently in this respect.
Thanks. The scheduler indeed does not have a way to wait for keys. The closest thing to this would be to allow registration of "transition callbacks", such that when a key transitions into a given state, a callback is invoked. However, let's just not do that; the state machine is too complex as it is.
@set_config_and_reload({"distributed.worker.memory.rebalance.measure": "managed"})
@gen_cluster(client=True, worker_kwargs={"memory_limit": 0})
async def test_rebalance_skip_all_recipients(c, s, a, b):
    """All recipients are skipped because they already hold copies"""
More a warning than an actual comment about this test. As soon as we hit a dynamic scheduler, replicas become much, much more complex, since whether or not a task is actually replicated cannot be inferred from who_has. While who_has tracks copies of the data, this includes "temporary" copies on workers: the lifecycle of a task's data is different for data which was calculated on a worker versus data which is only there because it is a dependency of another task. Simply "skipping" these tasks might no longer be possible, and the definition of the state (i.e. simply counting data) will become more complex.
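To make the distinction above concrete, one could imagine a data model that tags each replica as owned or temporary. This is a hedged sketch of the idea, not distributed's actual data model; all names here are hypothetical:

```python
from dataclasses import dataclass, field
from enum import Enum


class ReplicaKind(Enum):
    OWNED = "owned"        # computed here, or deliberately replicated
    TEMPORARY = "temp"     # fetched only to run a dependent task


@dataclass
class WorkerReplicas:
    # key -> ReplicaKind; a richer model than a flat who_has set
    replicas: dict = field(default_factory=dict)

    def rebalance_candidates(self) -> list:
        # Only owned replicas should count for (and be moved by)
        # rebalance; temporary dependency copies are skipped
        return [
            k for k, kind in self.replicas.items()
            if kind is ReplicaKind.OWNED
        ]


w = WorkerReplicas()
w.replicas["x"] = ReplicaKind.OWNED
w.replicas["y"] = ReplicaKind.TEMPORARY
print(w.rebalance_candidates())  # ['x']
```

With who_has alone, "x" and "y" would be indistinguishable, which is exactly the problem flagged above for the dynamic case.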
# Fast exit in case no transfers are necessary or possible
if not senders or not recipients:
    self.log_event(
This is actually the only place in this method where we're using SchedulerState / self. Why not factor this out into an independent function? This would make testing possible without actually invoking any scheduler/worker interaction, and would allow us to write tests by defining measurements (memory per worker) and asserting the result, without playing with actual memory measurements, scheduler<->worker heartbeats, comms, etc.
This would also further decouple the move of the data from the decision making, and might allow for easier testing of the latter.
Refactoring the function is straightforward, but I don't agree with doing it to begin with. Refer to the discussion above.
#4853 breaks this. Temporarily reverting the other PR to prove that tests pass.
I strongly disagree. WorkerState objects are complex, and in your code snippet you made very strong assumptions about their internal behaviour. If those internals changed in the future, I don't want a (potentially newbie) developer to have to hunt down unit tests that mock internal behaviour. What you're proposing carries a real risk that, in a future PR completely unrelated to rebalance, rebalance will break without any of the unit tests noticing.
Just to jump in here with historical context. Historically distributed had a lot of fine grained unit tests like this. I've heard them called "white box" tests because they expose the internals, unlike "black box" tests. In hindsight this ended up being a bad idea, especially as the scheduler changed internally. Small changes to Scheduler internals required changing dozens of tests. The tests became a liability and added inertia to development. At one point I spent a week rewriting white box tests so that we generally only touched user-level API. This was hard because we needed to craft very specific situations where the desired behavior would arise. In hindsight though this approach has provided a ton of value. We're able to make large changes to internal scheduling state and the test suite remains valid.
As you said, this is a snippet I haven't put much thought into. The only relevant bit of this snippet is that the actual modifications are encapsulated into a dedicated function such that there is one place to change things. I didn't want to argue about the very specific way to do this until we reached a conclusion about whether this is something we want to have or not. Please do not reject this idea based on that snippet.
This assumes that there are no tests which cover the entire system, and that is not something I propose. A bunch of the tests you wrote are still very important, exactly the way you wrote them, to know that everything works when put together. How we get the system into the proposed state is a hard problem by itself, but less of a mathematical nature: it is a problem space where we need to deal with all the problems distributed systems introduce, like latencies, comm failures, dead nodes, race conditions, etc., and also one where I think we should have enough edge case tests for non-happy paths. Then there is the "plugging both pieces together" problem, which doesn't require huge amounts of tests if both of the above are well tested.
I get that. That's the classical test hierarchy problem, and both from my experience and from the literature I believe that a healthy balance is good. The typical approach to this problem is to work with appropriate internal abstractions and (private) interfaces. If existing abstractions are not useful, let's create new ones or change existing ones. If the
I had another look over the code and everything seems in order. Great job.
Regarding the refactoring, we couldn't reach a conclusion yet but I'm fine with merging as is. If we decide in favour of a refactoring, this will be additional work on top of this anyhow.
Woo!
On Tue, Jun 1, 2021 at 9:18 AM Florian Jetter wrote:
Merged #4774 into main.
In scope
Out of scope, left for future PRs
Demo
https://gist.github.com/crusaderky/c1ccf5fd0107b13c8d24bbed5197d5f6
(also works on master)