Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make root-ish definition static #7531

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open

Conversation

fjetter
Copy link
Member

@fjetter fjetter commented Feb 9, 2023

@gjoseph92 The topic of dynamic root task definition were brought up quite frequently in the past. I never figured out why the cluster size has to be part of the classification logic.
From everything I understand this should be moved to the actual decision logic and not the classification. The property of being rootish, the way it is defined right now, is actually a static TaskGroup property.

In the case of co-assignment, I can understand that we do not want to assign too small task groups but why not handle this case in the decision logic itself instead of the classification?

TODO

  • Fix two failing tests since they appear to be related
  • Clean up test cases

@@ -2052,6 +2052,7 @@ def decide_worker_rootish_queuing_disabled(
and tg.last_worker_tasks_left
and lws.status == Status.running
and self.workers.get(lws.address) is lws
and len(tg) > self.total_nthreads * 2
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Putting this statement here makes it also much easier to motivate it.

Below, we're calculating the number of tasks of the given task group we can assign safely to a worker (len(tg) / self.total_nthreads) * ws.nthreads

Therefore, last_worker_tasks_left > 2 * ws.nthreads ~ tasks_assigned / thread > 2

This means, we're only scheduling using co-assignment if we can at least assign two tasks per thread. Otherwise, we abort to protect our ability to scale out and use the ordinary worker_objective to distribute tasks

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whether or not this "at least two tasks per thread" logic is a good choice is an entirely different matter, of course.
However, this gives some meaning to this otherwise arbitrary choice

@fjetter
Copy link
Member Author

fjetter commented Feb 9, 2023

My first pass over failing tests didn't reveal anything scandalous.

  • test_priorities file put in some test assumptions about how many tasks are supposed to be queued on scheduler / worker when a worker is paused. This counting obviously changes with queuing. I also had to set the saturation to exactly 1.0 for these tests to even work the way they are. We may need to review these changes more carefully
  • A couple of tests that just don't make sense if any of the submitted tasks are treated as being queued. I set the worker saturation to inf in these cases. They mostly test worker logic and this pick is fine imo
  • The most interesting failure that is entirely unrelated to queuing is test_scatter_compute_store_lose_processing. There is a race condition on worker shutdown that causes a client never to receive cancelled-keys messages. This is only popping up here now because queuing slightly changes the timing of the worker close (since w/ queuing we don't need to wait for the worker threadpool to finish)

return False
tg = ts.group
# TODO short-circuit to True if `not ts.dependencies`?
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this all works out well, I would actually like to raise this method to a TaskGroup property instead. We can implement a similar short-circuit there although I'm not sure if this is actually worth it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the short-circuit still makes sense with what you have here. With len(tg) > 5, a task with no dependencies would be non-rootish if it's in a group of 1.

This would let us close

Comment on lines +2078 to +2108
def worker_objective_rootish_queuing(self, ws, ts):
# FIXME: This is basically the ordinary worker_objective but with task
# counts instead of occupancy.
comm_bytes = sum(
dts.get_nbytes() for dts in ts.dependencies if ws not in dts.who_has
)
# See test_nbytes_determines_worker
return (len(ws.processing) / ws.nthreads, comm_bytes, ws.nbytes)
Copy link
Member Author

@fjetter fjetter Feb 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gjoseph92 I remember you very strongly opposing adding an objective function that respect comm_bytes. I remember vaguely a point about widely shared dependencies but I do not see an issue with this.

We need an objective like this to ensure very basic scheduling policies to be respected, e.g. test_nbytes_determines_worker where a dependent task is supposed to be scheduled on a worker holding most of its dependencies.

This should only truly matter if there are multiple workers available with an equal ratio of processing tasks per thread. My intuition tells me this objective function should still make sense

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

xref #7280

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test_nbytes_determines_worker where a dependent task is supposed to be scheduled on a worker holding most of its dependencies

tl;dr then that task shouldn't have been queued in the first place, and the root-ish heuristic is at fault, not the objective function. For a task to be queued, it must be equally good to run on any worker.

I do oppose an objective function that considers data transfer for root-ish tasks. It contradicts the definition of root-ish-ness: that the location of the tasks' dependencies are not important for scheduling purposes.

When there's only one thread open in the cluster, on a random worker, the next task on the queue must be equally "good" to run on that random worker as on any other worker in the cluster. If it would have been "better" if the thread had opened on a different particular worker—because that worker had more dependencies in memory, say, or because it had some resource—then the task should not have been on the queue in the first place. (This is why putting tasks with restrictions on the queue breaks the core assumption of queuing: #7526.)

This is important because we choose the pool of candidate workers for queued tasks based only on how busy they are. That works because we can guarantee there are no other factors that should play into worker selection for these tasks.

The root-ish-ness heuristic is meant to provide that guarantee, by identifying task groups where:

  1. The tasks will end up being assigned to every worker in the cluster (len > nthreads)
  2. Therefore, any common dependencies the tasks have will end up being replicated onto every worker in the cluster (deps len < 5)

The key is the second point: because the dependencies will be replicated to all workers, we don't need to select workers that avoid data transfer—in the long term, the data transfer is unavoidable. Indeed, we even know of some cases (that don't meet the root-ish heuristic, to be clear) where trying to avoid a transfer leads to worse scheduling choices: #6570.

So the reason I oppose an objective function that considers data transfer for root-ish tasks is because it implies that one worker might be preferable over another based on something besides busyness, which violates these core assumptions.

But to be clear: I only oppose this on the grounds of logical consistency. I don't expect an objective function that considers transfer will actually make a measurable different in most cases. (In most cases, the size of the candidate pool is 1 anyway.) But since it contradicts this key assumption of queuing, I'd want to come up with logical proof of why it's safe—and why it's preferable—especially since we know this type of weighting leads to bad decisions in other cases (#6570).

I haven't bothered to put in the effort to make the proof, since I haven't seen what the gain would be. I'm happy to be shown that it's both safe and better, though—again, I just haven't seen a reason to.

See also:

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First of all, I think this is a moot point once I introduce some cutoff as suggested in the other comment. It would've been fine to just point out this error instead of writing such a long comment.

It contradicts the definition of root-ish-ness: that the location of the tasks' dependencies are not important for scheduling purposes.

I disagree with this definition. The reason why we're having a special case for these tasks is because we assume they are data producers. The natural assumption is that they are at the bottom of our graph and are the most frequent source of data overproduction.
While I see some value for intermediate layers to be considered as root-ish I believe the same argument holds.
I don't think these perspectives are orthogonal but your definition is not the whole truth.

If something like #6570 is truly the complexity driver here we should consider alternative approaches (inlining, better weights, scheduler hints, etc.)

If it would have been "better" if the thread had opened on a different particular worker—because that worker had more dependencies in memory, say, or because it had some resource—then the task should not have been on the queue in the first place.

I also don't agree with this sentiment. I see a point for suppressing/ignoring dependencies but especially for cases where worker-saturation>1 I see how nuance can matter. Total randomness is not necessarily desired.

Copy link
Collaborator

@gjoseph92 gjoseph92 Feb 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the long comment. We've talked about this a few times so I wanted to have it all written out.

The reason why we're having a special case for these tasks is because we assume they are data producers

Yes, sorry, you're right. Root-ish-ness doesn't inherently care that the tasks can be scheduled without regard for their dependencies. It's just queuing that currently assumes this property.

It's unfortunate that these things are so intertwined right now. What began as a heuristic for co-assignment is now used as a heuristic for the current implementation of queuing. And it was always a weird heuristic.

The better way to phrase it might be that "whatever we use to decide whether to queue a task should guarantee the assumptions that queuing makes". Currently we've been using root-ish-ness, which does guarantee those assumptions. But there are probably other ways we could do this. And/or we can change the implementation of queuing to have different assumptions. We just shouldn't change only one without the other.

EDIT: switching to len(tg) > 5 does change the guarantees about what goes into the queue, but I think using worker_objective_rootish_queuing would then adjust what queuing is expecting in a compatible way; see other comment.

@fjetter
Copy link
Member Author

fjetter commented Feb 9, 2023

there is a likely related failure distributed/tests/test_scheduler.py::test_deadlock_resubmit_queued_tasks_fast on ubu3.9 / no-queue I need to check out

@fjetter
Copy link
Member Author

fjetter commented Feb 9, 2023

benchmarks are mostly harmless for this commit. we get a nice kick for some array slicing operations both in runtime and peak memory (plot shows peak)

image

There is also a weak signal for other join/shuffle related tests but they are pretty noisy and I wouldn't put too much weight on them

https://github.com/coiled/coiled-runtime/actions/runs/4136037002

Wall Clock

image

Avg memory

image

Peak memory

image

@github-actions
Copy link
Contributor

github-actions bot commented Feb 9, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

       24 files  ±  0         24 suites  ±0   10h 14m 32s ⏱️ - 11m 28s
  3 344 tests +  7    3 234 ✔️ +  1     105 💤 +1  4 +4  1 🔥 +1 
39 426 runs  +84  37 544 ✔️ +73  1 876 💤 +5  5 +5  1 🔥 +1 

For more details on these failures and errors, see this check.

Results for commit ee4c4c4. ± Comparison against base commit 63ae1db.

♻️ This comment has been updated with latest results.

@fjetter fjetter marked this pull request as ready for review February 9, 2023 18:18
Copy link
Collaborator

@gjoseph92 gjoseph92 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also really want to make the root-ish definition static. However, I don't think we can drop the check against total_nthreads without changing some other fundamental things.

Comment on lines +2078 to +2108
def worker_objective_rootish_queuing(self, ws, ts):
# FIXME: This is basically the ordinary worker_objective but with task
# counts instead of occupancy.
comm_bytes = sum(
dts.get_nbytes() for dts in ts.dependencies if ws not in dts.who_has
)
# See test_nbytes_determines_worker
return (len(ws.processing) / ws.nthreads, comm_bytes, ws.nbytes)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test_nbytes_determines_worker where a dependent task is supposed to be scheduled on a worker holding most of its dependencies

tl;dr then that task shouldn't have been queued in the first place, and the root-ish heuristic is at fault, not the objective function. For a task to be queued, it must be equally good to run on any worker.

I do oppose an objective function that considers data transfer for root-ish tasks. It contradicts the definition of root-ish-ness: that the location of the tasks' dependencies are not important for scheduling purposes.

When there's only one thread open in the cluster, on a random worker, the next task on the queue must be equally "good" to run on that random worker as on any other worker in the cluster. If it would have been "better" if the thread had opened on a different particular worker—because that worker had more dependencies in memory, say, or because it had some resource—then the task should not have been on the queue in the first place. (This is why putting tasks with restrictions on the queue breaks the core assumption of queuing: #7526.)

This is important because we choose the pool of candidate workers for queued tasks based only on how busy they are. That works because we can guarantee there are no other factors that should play into worker selection for these tasks.

The root-ish-ness heuristic is meant to provide that guarantee, by identifying task groups where:

  1. The tasks will end up being assigned to every worker in the cluster (len > nthreads)
  2. Therefore, any common dependencies the tasks have will end up being replicated onto every worker in the cluster (deps len < 5)

The key is the second point: because the dependencies will be replicated to all workers, we don't need to select workers that avoid data transfer—in the long term, the data transfer is unavoidable. Indeed, we even know of some cases (that don't meet the root-ish heuristic, to be clear) where trying to avoid a transfer leads to worse scheduling choices: #6570.

So the reason I oppose an objective function that considers data transfer for root-ish tasks is because it implies that one worker might be preferable over another based on something besides busyness, which violates these core assumptions.

But to be clear: I only oppose this on the grounds of logical consistency. I don't expect an objective function that considers transfer will actually make a measurable different in most cases. (In most cases, the size of the candidate pool is 1 anyway.) But since it contradicts this key assumption of queuing, I'd want to come up with logical proof of why it's safe—and why it's preferable—especially since we know this type of weighting leads to bad decisions in other cases (#6570).

I haven't bothered to put in the effort to make the proof, since I haven't seen what the gain would be. I'm happy to be shown that it's both safe and better, though—again, I just haven't seen a reason to.

See also:

and len(tg.dependencies) < 5
and sum(map(len, tg.dependencies)) < 5
)
return len(tg.dependencies) < 5 and sum(map(len, tg.dependencies)) < 5
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerned about losing the len(tg) > self.total_nthreads check. That means that, for example, task c would now be considered root-ish:

  c
 / \
a   b

We don't want task c to be queued, for reasons described in the comment above. c isn't equally good to run on any worker—it should only be run on the workers that hold a or b. It's better for c run on the right worker, but maybe have to wait, than run on whichever wrong worker has a task slot open first.

The point of the total_nthreads check is that it identifies fan-out situations, where we go from dependencies that are on very few workers to dependents that will be on every worker. The fact that the dependencies will be copied to every worker is what allows us to ignore them while scheduling, which is what allows us to queue the tasks, since in queueing, worker busyness must be the only important scheduling factor.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you add the check against total_nthreads to the queuing_disabled case, but not queuing_enabled? If we could add the check into queuing_enabled as well, then I think this would be fine. But I'm also not clear on what the benefit of all this change would be then—the definition still isn't "static" in a practical sense, we'd just be moving the non-static decision to a different point in the code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That means that, for example, task c would now be considered root-ish:

Yes, your abc example would queue up c but I don't think the "protection" from this should be depending on the cluster size.

Thinking about this a bit, I'm perfectly happy adding a len(tg) > 2 to this condition. That's the minimum a running cluster can have under the original condition and protects us from this edge case. In fact in all symmetrical graphs, I believe this is even sufficient to avoid any root-ish classifications downstream (I don't have a proof).

I could even justify a len(tg) >= 5 which combined with the other two conditions ensure that this is either a root or a fan out (please correct me if my logic here is flawed).

I consider "root-ish" a topological attribute of the node and not a dynamic one depending on cluster size.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you add the check against total_nthreads to the queuing_disabled case, but not queuing_enabled?

Well, the entire point I want to make here is that cluster size should not impact the classification logic.
If you accept for a moment that the root-ish definition can be formulated statically the way I suggested above, there is no need for the queuing logic to differentiate based on cluster sizes.
There is, however, a reasonably valid argument about co-assignment https://github.com/dask/distributed/pull/7531/files#r1101465735

Hence, I only added it to one of the code path for which I can actually make a valid case for. I don't think queuing should (or currently actually does) care about the size of the cluster. Whether or not the decision function is task dependent is a different matter, see other thread.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hence, I only added it to one of the code path for which I can actually make a valid case for

I see, that makes sense.

I think with the len(tg) >= 5 or len(tg) > 2, that resolves some concerns. For any magic number though, I think it's easy enough to come up with exceptions.

Here, each of c-h depends on both a and b (idk how to make illustrate that with ascii art):

c d e f g h
 \xxxxxxx/
    a b

There are 6 tasks in the group, so c-h are considered root-ish.

In a 4-worker cluster, it would make sense to schedule them anywhere. But in a 100-worker cluster with multi-threaded workers, you would ideally try to consolidate to just a few workers, so that a and b don't have to be transferred as much.

Now I'm seeing why you wanted worker_objective_rootish_queuing. Switching from len(tg) > nthreads to len(tg) > 5 subtly changes the guarantee about what tasks are in the queue. But by using worker_objective_rootish_queuing, that would change the assumptions queuing is making in a compatible way:

  • We can no longer guarantee that dependencies will need to be replicated to every worker.
  • So when there are multiple workers to choose from, we should consider dependencies.
  • However, if there's only 1 worker to choose from, that most likely means that we've filled all the other workers already, which means that the group was larger than the cluster, and dependencies were replicated everywhere anyway. So it's okay that we don't get to consider any other workers.

I feel like calling c-h "root-ish" is still inaccurate, but that's just a naming problem. With worker_objective_rootish_queuing, queuing would handle them decently. So maybe "root-ish" should be renamed to "queuable"?

@fjetter
Copy link
Member Author

fjetter commented Feb 10, 2023

I introduced a cutoff as suggested in #7531 (comment) and elevated the rootish definition to a TG property.
Since this is now a graph/topological property we can also write proper tests. I copied a couple over from dask.order but do not intend to add much more there unless there is a good reason for it. I'm not entirely sure how valuable these are but I like having the minimal infrastructure in there in case we want to expand or modify the definition

Copy link
Collaborator

@gjoseph92 gjoseph92 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fjetter this is looking better to me. I realize I think I'm also missing the larger picture here. What's the motivation for working on this right now? Is there some immediate next work that this would enable, or are you just looking to clean it up?

FWIW, making this definition static also closes #7273.

Comment on lines +2078 to +2108
def worker_objective_rootish_queuing(self, ws, ts):
# FIXME: This is basically the ordinary worker_objective but with task
# counts instead of occupancy.
comm_bytes = sum(
dts.get_nbytes() for dts in ts.dependencies if ws not in dts.who_has
)
# See test_nbytes_determines_worker
return (len(ws.processing) / ws.nthreads, comm_bytes, ws.nbytes)
Copy link
Collaborator

@gjoseph92 gjoseph92 Feb 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the long comment. We've talked about this a few times so I wanted to have it all written out.

The reason why we're having a special case for these tasks is because we assume they are data producers

Yes, sorry, you're right. Root-ish-ness doesn't inherently care that the tasks can be scheduled without regard for their dependencies. It's just queuing that currently assumes this property.

It's unfortunate that these things are so intertwined right now. What began as a heuristic for co-assignment is now used as a heuristic for the current implementation of queuing. And it was always a weird heuristic.

The better way to phrase it might be that "whatever we use to decide whether to queue a task should guarantee the assumptions that queuing makes". Currently we've been using root-ish-ness, which does guarantee those assumptions. But there are probably other ways we could do this. And/or we can change the implementation of queuing to have different assumptions. We just shouldn't change only one without the other.

EDIT: switching to len(tg) > 5 does change the guarantees about what goes into the queue, but I think using worker_objective_rootish_queuing would then adjust what queuing is expecting in a compatible way; see other comment.

and len(tg.dependencies) < 5
and sum(map(len, tg.dependencies)) < 5
)
return len(tg.dependencies) < 5 and sum(map(len, tg.dependencies)) < 5
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hence, I only added it to one of the code path for which I can actually make a valid case for

I see, that makes sense.

I think with the len(tg) >= 5 or len(tg) > 2, that resolves some concerns. For any magic number though, I think it's easy enough to come up with exceptions.

Here, each of c-h depends on both a and b (idk how to make illustrate that with ascii art):

c d e f g h
 \xxxxxxx/
    a b

There are 6 tasks in the group, so c-h are considered root-ish.

In a 4-worker cluster, it would make sense to schedule them anywhere. But in a 100-worker cluster with multi-threaded workers, you would ideally try to consolidate to just a few workers, so that a and b don't have to be transferred as much.

Now I'm seeing why you wanted worker_objective_rootish_queuing. Switching from len(tg) > nthreads to len(tg) > 5 subtly changes the guarantee about what tasks are in the queue. But by using worker_objective_rootish_queuing, that would change the assumptions queuing is making in a compatible way:

  • We can no longer guarantee that dependencies will need to be replicated to every worker.
  • So when there are multiple workers to choose from, we should consider dependencies.
  • However, if there's only 1 worker to choose from, that most likely means that we've filled all the other workers already, which means that the group was larger than the cluster, and dependencies were replicated everywhere anyway. So it's okay that we don't get to consider any other workers.

I feel like calling c-h "root-ish" is still inaccurate, but that's just a naming problem. With worker_objective_rootish_queuing, queuing would handle them decently. So maybe "root-ish" should be renamed to "queuable"?

return False
tg = ts.group
# TODO short-circuit to True if `not ts.dependencies`?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the short-circuit still makes sense with what you have here. With len(tg) > 5, a task with no dependencies would be non-rootish if it's in a group of 1.

This would let us close

Comment on lines 1113 to 1118
and len(self) >= 5
and len(self.dependencies) < 5
and sum(map(len, self.dependencies)) < 5
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
and len(self) >= 5
and len(self.dependencies) < 5
and sum(map(len, self.dependencies)) < 5
and (ls := len(self)) >= 5
and len(self.dependencies) < 5
and (ndeps := sum(map(len, self.dependencies))) < 5
and ls / ndeps > 2

Maybe we're trying to get at the idea of "fanning out"? As in there are significantly (say 2x) more tasks in the group than there are inputs?

idk if this is valuable, just a thought. I think it would also eliminate the need for the magic len(self) >= 5.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't tell if this is truly better or just more complex. I'm not even sure if "fanning out" is something we want to capture. (If so, your logic would be better, yes)
A major motivator for this PR is to simplify this definition. We might get it wrong with this PR and we can iterate on it but from now on I want to see changes to this definition being motivated by tests. I don't have a good enough intuition to tell you where this definition might fall short or what edge cases would not be covered.
My definition is more allowing while yours is more restrictive, I believe.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit hesitant here because we don't have any examples (i.e. tests) that indicate the one or the other would be favorable. At least in the future we should motivate changes to this by new/modified tests.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we went for your logic, why even include len(self) >= 5? Why not just

(
            len(self.dependencies) < 5
            and (ndeps := sum(map(len, self.dependencies))) < 5
            # Fan-out
            and (len(self) / ndeps > 2 if ndeps else True)
        )

(FWIW this would also include the case of no deps naturally

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I don't like about this suggestion is that there is again another magic number 2 in there. Why would a fan-out only be a fan-out if there is a factor of 2? For entirely symmetric cases I can see how we can motivate 2 but not for asymmetric ones, e.g.

graph BT;
  A1-->B1;
  A1-->B2;
  A2-->B3;
  A2-->B4;
  A3-->B5;
Loading

Is this meaningfully different to the case where A3 splits into two tasks as well? Anyhow, I believe arguing about these small graphs is not truly relevant. As soon as we hit any real world graphs these magic numbers are getting more and more meaningless

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we went for your logic, why even include len(self) >= 5?

Sorry, I meant to remove that check, just forgot to delete it in the diff. Agreed that it's unnecessary.

For entirely symmetric cases I can see how we can motivate 2 but not for asymmetric ones, e.g.

This gets at a larger problem, which is the use of TaskGroups at all for these types of scheduling choices. We're assuming that TaskGroups indicate something structural about the graph, but they don't necessarily. I think we have to accept that we're assuming TaskGroups are:

  • symmetrical
  • correspond 1:1 with structural layers in the graph

If we're going to use TaskGroups for these sorts of heuristics, we have to be okay taking on those assumptions. I'd rather focus on graphs that meet these assumptions, but still would schedule poorly.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not even sure if "fanning out" is something we want to capture

I'm not sure either. Though fanning out is what the "ish" part of root-ish task logic was trying to capture. The idea was that when a tiny number of tasks "fan out" to a much wider number of tasks, if you don't copy the input tasks to most workers, you'll end up highly imbalanced, with most workers idle and a couple overwhelmed.

Mathematically though, I'm realizing that so long as we have sum(map(len, self.dependencies)) < 5, whether we call it fan-out, or a lower bound on len(self), they're effectively equivalent. They're both expressing "there are very few tasks that go into the task group, and the task group is decently larger than the number of inputs". It's just arithmetic to turn one into the other.

What I don't like about this suggestion is that there is again another magic number 2 in there

Yup, I don't like that either. I don't think we'll be able to come up with a static definition for "should be queued" that doesn't include a magic number though, at least without significantly refactoring how the queued state works.

Copy link
Member Author

@fjetter fjetter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realize I think I'm also missing the larger picture here. What's the motivation for working on this right now?

Mostly I was poking at the code base to get either a fix or a better understanding for the P2P problem (#7496). Even after spending months with this topic, I was still surprised about the complexity that I decided to try a hard simplification.
I believe that maintenance and continued enhancements around this will otherwise not be feasible
From a different perspective, if you feel the need to write comments like #7531 (comment) even though tests are green and benchmarks look good, the code is too complex and/or not tested well enough.

Comment on lines 1113 to 1118
and len(self) >= 5
and len(self.dependencies) < 5
and sum(map(len, self.dependencies)) < 5
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't tell if this is truly better or just more complex. I'm not even sure if "fanning out" is something we want to capture. (If so, your logic would be better, yes)
A major motivator for this PR is to simplify this definition. We might get it wrong with this PR and we can iterate on it but from now on I want to see changes to this definition being motivated by tests. I don't have a good enough intuition to tell you where this definition might fall short or what edge cases would not be covered.
My definition is more allowing while yours is more restrictive, I believe.

Comment on lines 1113 to 1118
and len(self) >= 5
and len(self.dependencies) < 5
and sum(map(len, self.dependencies)) < 5
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit hesitant here because we don't have any examples (i.e. tests) that indicate the one or the other would be favorable. At least in the future we should motivate changes to this by new/modified tests.

Comment on lines 1113 to 1118
and len(self) >= 5
and len(self.dependencies) < 5
and sum(map(len, self.dependencies)) < 5
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we went for your logic, why even include len(self) >= 5? Why not just

(
            len(self.dependencies) < 5
            and (ndeps := sum(map(len, self.dependencies))) < 5
            # Fan-out
            and (len(self) / ndeps > 2 if ndeps else True)
        )

(FWIW this would also include the case of no deps naturally

@fjetter
Copy link
Member Author

fjetter commented Feb 14, 2023

Benchmarks are inconclusive. There are a couple of outliers but the null hypothesis is also very shaky. I'll rerun with more repeats.
https://github.com/coiled/coiled-runtime/actions/runs/4166257352

@fjetter
Copy link
Member Author

fjetter commented Feb 15, 2023

Reran benchmarks with much more repetitions and everything looks OK. So, at least for the selection of workloads I ran this change likely does not impact the classification logic (as expected)

https://github.com/coiled/coiled-runtime/actions/runs/4172181281

Wall Clock

image

Avg memory

image

Peak memory

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants