Consider stage balancing only in UniformNodeSelector #10660
Conversation
FYI: @dain @pettyjamesm
Some thoughts, observations about this change:
Previously, the logic would first attempt to choose a candidate node based on total splits across all stages. Now, it chooses the node with the lowest "queued" splits per task (so long as it hasn't exceeded the total splits per node limit or the unacknowledged splits per task limit).
However, "queued splits per task" doesn't include running or blocked splits, only splits that are queued on worker, queued in the coordinator pending delivery, or assigned during this scheduling loop and not yet added to the coordinator-local HttpRemoteTask
for delivery. That's probably for the best, since it should more evenly distribute pending work to tasks within a given stage, but can create arbitrary imbalances between worker nodes if some tasks have very different per-split processing times (eg: highly selective filters followed by computationally intensive operations) or when a subset of the nodes approaches the per node memory limit.
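For readers following along, here is a simplified sketch of the selection rule described above; the class, record, and accessor names are illustrative rather than the actual UniformNodeSelector code:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Simplified illustration of the "lowest queued splits per task" selection rule.
// Names and accessors are illustrative, not the actual UniformNodeSelector code.
final class QueuedSplitsSelectionSketch
{
    record NodeStats(
            String nodeId,
            long totalSplitsOnNode,                 // splits across all stages and queries on the node
            long queuedSplitsForStageTask,          // queued-only count for this stage's task on the node
            long unacknowledgedSplitsForStageTask)  // assigned but not yet acknowledged by the worker
    {}

    static Optional<NodeStats> pickCandidate(
            List<NodeStats> candidates,
            long maxSplitsPerNode,
            long maxUnacknowledgedSplitsPerTask)
    {
        return candidates.stream()
                // skip nodes that already exceed the per-node or per-task limits
                .filter(node -> node.totalSplitsOnNode() < maxSplitsPerNode)
                .filter(node -> node.unacknowledgedSplitsForStageTask() < maxUnacknowledgedSplitsPerTask)
                // prefer the node whose task for this stage has the fewest queued splits;
                // running and blocked splits are deliberately not part of this count
                .min(Comparator.comparingLong(NodeStats::queuedSplitsForStageTask));
    }
}
```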
Also note that the previous behavior didn't account for splits that were blocked, and counted those towards the node total splits as if they were running. In the case where splits were blocked on memory, that was probably beneficial, but when those splits were blocked because the output buffer was full, that's a problem.
I think that's probably related to your observed behavior in this PR, although I can't say for sure because I'm not familiar with all the details of the new execution policy compared to the prior "all-at-once" behaviors.
Consider the case of a simple join with table scans on both probe and build sides, and let's assume the semantics of the "all-at-once" execution policy:
- Initial split assignments would be made to both stages until the probe side output buffers became full, at which point any probe side running splits block.
- The now blocked probe side splits still counted towards the node total splits as if they were running, limiting the number of new build stage splits that can be assigned on those nodes.
- Worse: the scheduler will then preferentially assign build stage splits to tasks with the fewest blocked probe stage splits, which is essentially random.
- This continues until the probe stage unblocks and starts finishing splits again (i.e., until the build side completes). If the build stage only has a few splits, the per-node split assignment limit might never be reached and split assignments based on max splits queued per task might never occur.
So it would seem that your changed behavior is probably preferable for benchmark scenarios, but probably worse when blocking on memory and/or memory revoking occurs unevenly across worker nodes.
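To make the accounting difference concrete, here is a toy illustration with assumed numbers and hypothetical record and field names, showing how counting blocked probe splits as running inflates a node's total and steers build splits essentially at random:

```java
import java.util.List;

// Toy numbers (assumed, not measurements) for the join scenario above. Under the old
// accounting, blocked probe splits count toward the node total as if they were running,
// so the node that happens to have fewer *blocked* probe splits looks emptier and
// attracts new build splits, even though neither node is doing more useful work.
final class BlockedSplitAccountingSketch
{
    record Node(String id, int runningBuildSplits, int blockedProbeSplits) {}

    public static void main(String[] args)
    {
        Node a = new Node("worker-a", 4, 20); // many probe splits blocked on a full output buffer
        Node b = new Node("worker-b", 4, 5);

        for (Node node : List.of(a, b)) {
            int oldCount = node.runningBuildSplits() + node.blockedProbeSplits(); // old: blocked counted as running
            int newCount = node.runningBuildSplits();                             // queued-per-task view ignores them
            System.out.printf("%s old=%d new=%d%n", node.id(), oldCount, newCount);
        }
        // The old accounting steers new build splits to worker-b purely because fewer
        // probe splits happen to be blocked there, which is essentially arbitrary.
    }
}
```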
If there are imbalances between workers, then that should be reflected by queued splits after the query runs for a while. The problem I'm solving here is nondeterminism and randomness of scheduling when queries are short to medium length (e.g., <60s). If there are multiple stages scheduling and some splits are really tiny (yet still occupy split queue slots), we might end up with very bad, non-uniform split assignments for large splits, and that increases wall time. For longer queries, imbalances will eventually be reflected by queue length, as workers that are slow will be slow to consume stage splits from the queue. It shouldn't really matter if it's because of memory or full buffers.
We probably should consider blocked splits when computing
Tracking splits on a full buffer could also potentially generate flakiness. This is because of the 1/0 nature of blocking on a buffer. There is nothing in between. If
In the new scheduler we don't start probe immediately, so it doesn't consume valuable driver slots.
I don't think blocked splits account into total node assignments. See
I've seen cases where probe splits took all available driver slots even though splits could not progress. The only way build was progressing was due to
The problem I'm solving is preexisting. With this PR, for longer running queries, splits should balance anyway according to worker load (because the split queue would shrink more slowly). For shorter running queries, we would be less affected by momentary scheduling imbalance and variability.
Flaky due to #10674
Some more notes:
added feature toggle
I have a hard time predicting how well it behaves in practice, but LGTM. If you tested it and it showed improvements, and we have a kill switch, I think it is fine to have it merged.
One thing to consider is whether it would make sense to make it the default if we believe it is generally (most often) the better approach. Most people would not change defaults, so without changing the default this option would mostly be unused.
Previously, new stage splits were balanced against all splits running on worker nodes (that includes splits from other stages and queries). However, this leads to non-deterministic scheduling where, in certain situations, some stage with long-running splits might be fully scheduled on a subset of worker nodes. This PR makes UniformNodeSelector only consider stage balancing on candidate nodes that have sufficient split queue length.
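For illustration, here is a minimal sketch of the two balancing modes being discussed, under the assumption that the behavior is gated by a policy toggle; the enum, record, and method names are illustrative rather than the actual NodeSchedulerConfig/UniformNodeSelector code:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// The enum and record here are assumptions for illustration; the real logic
// lives in UniformNodeSelector and NodeSchedulerConfig.
enum SplitsBalancingPolicySketch { NODE, STAGE }

final class BalancingSketch
{
    record Candidate(String nodeId, long totalSplitsOnNode, long queuedSplitsForStage, boolean queueHasRoom) {}

    static Optional<Candidate> choose(List<Candidate> candidates, SplitsBalancingPolicySketch policy)
    {
        return switch (policy) {
            // previous behavior: balance against everything on the node (all stages, all queries)
            case NODE -> candidates.stream()
                    .min(Comparator.comparingLong(Candidate::totalSplitsOnNode));
            // new behavior: among candidate nodes whose split queue still has room,
            // balance only this stage's queued splits
            case STAGE -> candidates.stream()
                    .filter(Candidate::queueHasRoom)
                    .min(Comparator.comparingLong(Candidate::queuedSplitsForStage));
        };
    }
}
```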
Trying to distribute stage splits evenly across the cluster seems reasonable to me. Assuming all nodes are uniform and all the splits (adjusted for weight) are relatively even, it shouldn't, in theory, introduce a regression while fixing the issues you mentioned.
I was trying to think about these two scenarios:
- There's a struggling node (or nodes) in the cluster that processes splits much slower than the others. With the previous approach, some queries (those that come first) will be affected regardless, while queries submitted later on may be able to avoid scheduling on the slow nodes. With this approach, all queries will get a latency hit (assuming a query doesn't have enough splits scheduled to balance it out).
- Uneven splits (splits that take much longer to process than the others). In this scenario (assuming the randomized nature of scheduling), it seems unlikely that "jumbo" splits would over-congest a single node to the point that we would want to avoid it for scheduling, since a single node processes a rather high number of concurrent splits at once.
For short queries there shouldn't be a noticeable latency hit. IMO the situation where splits get non-deterministically assigned to a subset of nodes is worse, because it leads to skew on healthy nodes.
I think weighted scheduling deals with that to a degree. The split queue is no longer just a count; it also reflects how heavy the splits are.
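A rough sketch of what weight-based queue accounting could look like, assuming a simple additive weight per split; the WeightedSplit record and the limit parameter are illustrative, not Trino's actual SplitWeight API:

```java
import java.util.List;

// Instead of counting queued splits, sum their weights so a few "jumbo" splits
// fill a queue as much as many small ones would.
final class WeightedQueueSketch
{
    record WeightedSplit(String splitId, double weight) {}

    static double queuedWeight(List<WeightedSplit> queuedSplits)
    {
        return queuedSplits.stream().mapToDouble(WeightedSplit::weight).sum();
    }

    static boolean hasRoom(List<WeightedSplit> queuedSplits, double maxQueuedWeightPerNode)
    {
        // a node stays a scheduling candidate only while its queued weight is below the limit
        return queuedWeight(queuedSplits) < maxQueuedWeightPerNode;
    }
}
```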