Ease memory pressure by deprioritizing root tasks? #6360
The figure @TomNicholas was referencing (thanks for the reminder, I forgot about this!): [the "root task overproduction" figure]

I mention that because this problem is inherent to the way the current task scheduling algorithm works in distributed; see #5223 and #5555. So approaches like #4891 are not, in my mind, the "full solution"—they'd be layering on more complexity to an already-complex algorithm with two components (normal eager scheduling, and task stealing, aka occasional rebalancing). "Speculative task assignment" (#3974) is more what I'd consider the "full solution", though I think there are other approaches that could work as well. Just for context :)
As a way to hack around this, right now, I'd play with worker resources. For example, something along the lines of (untested):

    import dask
    import coiled
    import xarray as xr

    NTHREADS = 4

    # Give each worker NTHREADS slots of a synthetic "ROOT" resource (one per thread)
    cluster = coiled.Cluster(..., worker_cpu=NTHREADS, worker_options={"resources": {"ROOT": NTHREADS}})
    client = dask.distributed.Client(cluster)

    # Annotate only the data-opening (root) tasks so each one consumes a ROOT slot
    with dask.annotate(resources={"ROOT": 1}):
        ds = xr.open_dataset(...)

    result = ds.a - ds.a.mean()

    # Disable fusion so the annotations aren't lost during graph optimization
    with dask.config.set({"optimization.fuse.active": False}):
        result.compute()

A few caveats: […]
Thanks both for your replies. @dcherian, batching like you suggested is one of my backup plans 😅, but thank you for the code example!

@gjoseph92 I tried playing with worker resources as you suggested, but if it made any difference to the behaviour I couldn't tell. Without setting a limit on resources my cluster starts failing like this: [dashboard screenshot]

After I set a limit via

    options = g.cluster_options()
    NTHREADS = 1
    options.worker_cores = NTHREADS
    options.worker_memory = 50

    with dask.config.set({"resources": {"ROOT": NTHREADS}}):
        gc = g.new_cluster(cluster_options=options)

    with dask.annotate(resources={"ROOT": 1}):
        ds = xr.open_dataset(...)

    omega = vort(ds)  # my calculation, an xarray.apply_ufunc

    with dask.config.set({"optimization.fuse.active": False}):
        with gc.get_client():
            # write result to zarr
            st = omega.to_dataset(name="vort").to_zarr(mapper, mode='w', compute=True)

it looks like this: [dashboard screenshot]

You can see that in both cases it has managed to save out ~300 chunks.

At @jbusecke's suggestion I also tried setting the task priority via

    with dask.annotate(priority=-999999):
        ds = xr.open_dataset(...)

    with dask.annotate(priority=999999):
        with gc.get_client():
            # write result to zarr
            st = omega.to_dataset(name="vort").to_zarr(mapper, mode='w', compute=True)

which appeared to help the computation proceed somewhat, but after a while my memory usage still keeps growing indefinitely.

This is a subset of my optimised (partially fused) task graph, for two timesteps: [task graph image]. It's basically two […]. At full scale I'm trying to compute 9000 timesteps. The difference is that the outer-most 4 task chains (2 on each side) are duplicated (there are 18000 […]).

The static ordering looks reasonable - dark orange should be written out before yellow and light orange are loaded. But this static ordering seems to be largely irrelevant to what actually happens when you have multiple workers.

Is this a pathological fail case? Whatever I've tried, my 20 workers still race to open ~400 chunks (loading hundreds of GBs into memory and spilling to disk) before they manage to write a single chunk out, even though each output chunk only requires two input chunks. There might be some way for me to inline/fuse my way to an embarrassingly parallel graph, so that the open and write steps are in one task, but at that point I may as well have just used […]
Yes, the priority ordering doesn't have that much effect (it's just a tie-breaker, not an actual ordering that the scheduler follows).
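If you want to inspect that static ordering yourself, something like this should show it (a sketch for inspection only; the toy array is a stand-in for your real workload, and this doesn't change anything about scheduling):

    import dask
    import dask.array as da
    from dask.order import order

    # Toy graph standing in for the real workload
    a = da.random.random(100, chunks=10)
    r = (a - a.mean()).sum()

    # Color the low-level graph by dask's static ordering (what `priority` tie-breaks against)
    dask.visualize(r, color="order", optimize_graph=True)

    # Or get the raw static priorities per key
    priorities = order(dict(r.__dask_graph__()))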
I'm a bit surprised by this (but I agree, it didn't look like anything changed). I looked into it a little, and it turns out the scheduling of tasks that use resources doesn't work the way I expected: #6468 makes it work the way you'd expect.

@TomNicholas, I'd be really curious to see how things work if you keep using resources, but switch to that PR. There's been lots of discussion of root-task overproduction, but we've never actually gotten to see how things would go if it didn't happen, so having this comparison would be very valuable.
Thanks so much for that @gjoseph92 - I'm excited to try it out, but it'll take me a few days as I need to work out how to run on a custom version of distributed.
You could use Coiled if that would be easier; you can install distributed from that PR in a custom software environment there.
@gjoseph92 @jbusecke and I are trying to test this, but our coiled cluster won't actually start :/ This is the environment file we're using, which we tried both locally and in the pangeo cloud: [environment.yml not shown]

With the default environment on pangeo cloud we are able to start a cluster, but that doesn't use your updated version of distributed.

This succeeds:

    import coiled

    coiled.create_software_environment(
        conda="environment.yml",
    )

But then […]

When we try to use our altered environment (with your distributed PR and xarray's […]) […]

Is there some dask dependency we've forgotten here? Is there a better channel to get help with this coiled cluster issue?
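(For context, the step that then fails is presumably the cluster creation itself; a rough sketch with placeholder names, not our exact call:)

    import coiled

    # "our-account/distributed-pr-test" is a placeholder for whatever name
    # create_software_environment registered above
    cluster = coiled.Cluster(
        software="our-account/distributed-pr-test",
        n_workers=20,
    )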
@TomNicholas why don't you join the Coiled slack; it'll be easier to go back and forth there. I think this link works, but if not it's at the bottom-left of https://cloud.coiled.io: https://join.slack.com/t/coiled-users/shared_invite/zt-hx1fnr7k-In~Q8ui3XkQfvQon0yN5WQ.

I also haven't tested my PR at all. It's entirely possible I messed up something silly. The cluster logs might show more. I also wouldn't be surprised if you need to add […]
We actually tried this, but it failed due to a dependency pinned on the last dask release. Just FYI.
@TomNicholas and I reproduced a variant of his failing workload on Coiled, running on my PR #6467. We used worker resources to annotate root tasks as described in #6360 (comment). With my PR, this made it so the scheduler would stream root tasks into workers, only submitting a new root task when it heard the previous had completed, instead of submitting all root tasks up front, which the current scheduling algorithm does. This allowed us to simulate how a cluster would perform if root task overproduction were fixed.

The results are compelling: withholding root tasks from workers enormously reduced memory use (15x lower peak memory usage) while cutting runtime nearly in half.

These trials used the same workload on the same cluster, both on #6467. The only difference was that "root tasks withheld" wrapped […]

You can see how different memory usage looked on the dashboard: [dashboard screenshots]

It's important to note that because we used resource restrictions, scheduling didn't use co-assignment logic (#4967). I believe that's the reason you see transfers on the second dashboard (two tasks that contributed to the same one output were scheduled on different workers). Implemented properly to take advantage of this, I expect fixing root task overproduction would give even better runtime (maybe 20-40% faster?) and lower memory usage than what we see here.

Important takeaways:

I estimate that a scheduler-only fix for root task overproduction, which just withheld root tasks from workers while maintaining our co-assignment logic, could be implemented in a week.
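Concretely, the difference between the two trials was roughly the following (a sketch, not the exact notebook code; `open_inputs` and `process` are stand-ins for the real xarray opening and reduction steps):

    import dask

    # Workers were started with a "ROOT" resource, one slot per thread, as in the
    # earlier resource-annotation comment.

    # Trial 1 - "root tasks not withheld": no annotation, so the scheduler queues
    # every root task on the workers up front.
    result = process(open_inputs())
    result.compute()

    # Trial 2 - "root tasks withheld": only the opening step carries the ROOT
    # annotation, so (on #6467) each worker is sent a new root task only as it
    # finishes previous ones.
    with dask.annotate(resources={"ROOT": 1}):
        ds = open_inputs()
    result = process(ds)
    with dask.config.set({"optimization.fuse.active": False}):
        result.compute()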
Those are compelling plots. There's a lot here. Is there a thing that I or someone like @fjetter should read to understand the proposed design? In principle if we could reliably solve this problem with a week of dev time and not cause other problems then that sounds like time well spent.
Part of the week includes coming up with the design. But the overall idea is something like: […]
The implementation of this is what needs consideration, particularly how we still get co-assignment (which right now relies on running through all the root tasks at once). Maybe something like: […]
The advantage is that it all happens on the scheduler. Nothing has to change with the worker; from the worker's perspective, it just doesn't get lots of tasks queued on it. That makes me more confident in it not causing deadlocks, since the scheduler is generally easier to reason about than the worker state machine.

This obviously introduces a bit more idleness on the worker: when it completes a root task, it doesn't have another root task queued, so it has to wait for a roundtrip to the scheduler for the next instruction. Speculative task assignment would eventually be a nice optimization to avoid this. This experiment shows that accepting that latency (and a horrendously slow scheduling algorithm; it couldn't even handle 200k tasks) while taking a better path through the graph is still faster overall than quickly taking a bad path through the graph. Slow is smooth, smooth is fast.
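To make the shape of that idea concrete, here's a rough sketch of the scheduler-side bookkeeping (illustration only; all names are made up and this is not actual distributed internals):

    from collections import deque

    class RootTaskQueue:
        """Hold 'rootish' tasks on the scheduler and drip-feed them to workers."""

        def __init__(self, root_tasks, workers, batch_size):
            # Co-assignment: contiguous batches of root tasks (in priority order)
            # are earmarked for the same worker, so sibling inputs that feed a
            # common downstream task end up on one machine.
            self.queues = {w: deque() for w in workers}
            ordered = sorted(root_tasks, key=lambda t: t.priority)
            for i, task in enumerate(ordered):
                worker = workers[(i // batch_size) % len(workers)]
                self.queues[worker].append(task)

        def release_for(self, worker, free_threads):
            # Called when `worker` reports a task finished: hand it just enough
            # root tasks to fill its idle threads, instead of queueing everything
            # on the worker up front.
            released = []
            queue = self.queues[worker]
            while queue and len(released) < free_threads:
                released.append(queue.popleft())
            return released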
Historically I would have said "I'd rather not complicate the scheduler further. It makes sense to have the worker handle this given how relatively simple the worker is". With recent changes though I'm not sure that that is still the case. It seems like the worker might be a place of more complexity than the scheduler these days. I neither love nor hate this plan.
The scheduler-only aspect of it is the main appeal to me, for that reason. The worker also can't do this on its own without extra information from the scheduler (it doesn't know enough about overall graph structure). So we either have to touch both scheduler and worker, or only scheduler.
It could guess. We could pass rootishness around. It could also see that "hey, I often have tasks that arrive just after these rootish tasks are done. I guess I should maybe wait a bit". I'm not pushing for this though. The worker seems to be a larger source of complexity today than the scheduler.
I really don't like the idea of guessing. Passing rootishness around was one of my thoughts, but it introduces consistency issues (if downstream tasks are cancelled, you have to inform workers to de-rootify their tasks) without providing any complexity or performance benefit (without STA, the worker would still have to wait for a scheduler roundtrip before it could run a new task).
I'm not pushing for my thought above. There's no need to argue against it. I do think that you should develop some comfort with guessing though. We don't need to be perfect, just good in aggregate.
I agree with @gjoseph92 here, and I wouldn't want to implement any logic on the worker that would introduce artificial waiting times. Its current state is not well suited to handle this kind of logic well. I do not see any problems with STA on the worker side. We likely need to adjust a few transition rules, but that should be doable after the latest changes.
The request of "I would like to limit the total number of tasks of type X concurrently executing on the cluster" has come up a couple of times before (see […]). I'm wondering if this can/should first be solved generically before we write a specialized version for root tasks. That would obviously only be part of the solution, but it would be a re-usable part.
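(For illustration, that generic request can already be approximated from user code today, e.g. with a cluster-wide Semaphore; this is a sketch rather than the scheduler-level mechanism being discussed here, and `expensive_open` and `paths` are placeholders:)

    from distributed import Client, Semaphore

    client = Client()  # or connect to an existing cluster

    # Allow at most 8 "open" tasks to run concurrently across the whole cluster
    sem = Semaphore(max_leases=8, name="open-data")

    def open_chunk(path):
        with sem:  # blocks inside the task until one of the 8 leases is free
            return expensive_open(path)  # placeholder for the real I/O-heavy work

    futures = client.map(open_chunk, paths)  # `paths` assumed defined elsewhere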
Can we first deliver something that does not care about co-assignment? IIUC, your prototype does not consider co-assignment and it still is a major win.
Just to be perfectly clear, so do I. I brought up the other approach to say "there are other ways we could think about doing this", not to say "we should do it this way instead".
Update: we're planning on working on this, starting late next week or the week after: #6560. The goal is a draft PR (not necessarily merged) within 2 weeks. When that's up, we'd very much appreciate input from people involved in this thread who could test it out and let us know how it affects performance on real-world cases.
Here's a notebook containing a simple example demonstrating the root task overproduction problem. This example only uses dask, without importing xarray or xGCM anywhere. It's set up to run on a coiled cluster, using Gabe's PR. The conda environment file is just […]
The offending graph looks like this: [task graph image]

I think @gjoseph92 was able to come up with some even simpler examples, but I'm sharing this one because it's the simplest one I could create that still looks somewhat like the original problem I was trying to compute (with xGCM).
Two even simpler examples (pure dask). These are […]

    import dask.array as da

    # Example 1: stack two chunked arrays and reduce over the stacked axis
    a = da.random.random(100, chunks=10)
    b = da.random.random(100, chunks=10)
    s = da.stack([a, b])
    r = s.sum(axis=0)
    r.visualize(optimize_graph=True)

    import dask.array as da

    # Example 2: slice two chunked arrays and add them elementwise
    a = da.random.random(100, chunks=10)
    b = da.random.random(100, chunks=10)
    r = a[1:] + b[1:]
    r.visualize(optimize_graph=True)

The input tasks (random array creation) will be overproduced relative to the output tasks ([…]).

(@TomNicholas is demonstrating the same thing in his notebook. Instead of slicing or concatenation, he's adding a […].)
Also, the problem @TomNicholas has been demonstrating here is a simplified form of what he actually needs to solve. Here, he's computing […]. What he actually needs to compute is […]. This not only suffers from root task overproduction, but also hits the "widely-shared dependencies dogpile": #6570
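(As a toy illustration of what a "widely-shared dependency" looks like in graph terms; the shapes and operations here are made up and are not his actual calculation:)

    import dask.array as da

    # Many large root chunks...
    x = da.random.random((1000, 1000), chunks=(100, 1000))
    # ...and one tiny array whose single chunk is needed by every output chunk
    grid = da.random.random((1, 1000), chunks=(1, 1000))

    # Every chunk of the result depends on `grid`: a widely-shared dependency
    r = (x / grid).sum(axis=1)
    r.visualize(optimize_graph=True)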
The need for memory backpressure has been discussed at length, and a full solution is arguably necessary to solve many of the use cases in the pangeo community. Whilst efforts in that direction do seem to be underway, I would like to suggest a simpler stop-gap solution for the shorter term. The basic issue is that workers can overeagerly open new data at a rate faster than other tasks free up memory, which is best understood via the figure under "root task overproduction" here.
All my tasks that consume memory are root tasks, so is there any way to deprioritize root tasks? This should be a lot simpler than solving the general problem, because it doesn't require (a) keeping any records of how much memory each task uses, or (b) workers changing their priorities over the course of a computation based on new information. All I need is for workers to be discouraged / prevented from opening new data until the task chains that are already begun are as complete as possible. This will obviously waste time as workers wait, but I would be happy to accept a considerable slowdown if it means that I definitely won't run out of memory.
This problem applies to any computation where the data is largest when opened and then steadily reduced (which is most pangeo workflows, with the important exception of the groupby stuff IIUC), but the opening task and the memory-relieving tasks can't be directly fused into one.
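As a toy illustration of that shape of workload (made-up sizes, not my actual calculation): each chunk is at its largest when first created/"opened", and every downstream task shrinks it, so memory is only released once a chain runs to completion.

    import dask.array as da

    # Each "opened" chunk is ~8 MB of random data (a stand-in for reading a file)
    x = da.random.random((500, 1000, 1000), chunks=(1, 1000, 1000))

    # The reduction turns each large chunk into a single number, freeing its memory
    y = x.mean(axis=(1, 2))
    y.compute()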
Is there some way of implementing this? I'm happy to provide an example use case if that would help, or to try hacking away at the distributed code, but I wanted to know if this is even possible first.
cc @gjoseph92, and @rabernat, @dcherian, @TomAugspurger, because this is related to the discussion we had in the pangeo community meeting the other week.