[Epic] Optionally Limit memory used by DataFusion plan #587

Closed
7 of 8 tasks
alamb opened this issue Jun 18, 2021 · 19 comments · Fixed by #1526
Labels
enhancement New feature or request

Comments

@alamb
Contributor

alamb commented Jun 18, 2021

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

The basic challenge is that, as of today, DataFusion can use an unbounded amount of memory to run a plan, and it is neither possible to calculate the memory needed beforehand nor to limit its use.

If DataFusion processes individual partitions that are larger than the available system memory, it will currently keep allocating memory from the system until it is killed by the OS or container system.

Also, when running multiple DataFusion plans in the same process, each will consume memory without limit, whereas it may be desirable to reserve or cap the memory used by any individual plan so that the plans together don't exceed the system memory budget.

Thus, it would be nice if we could give DataFusion's plans a memory budget that they then stay under.

Describe the solution you'd like

  1. Add an option to ExecutionConfig that has a “total plan memory budget”
  2. Add logic to each node that requires a memory buffer to ensure it stays under the limit.
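A minimal sketch of what step 2 could look like, assuming a hypothetical MemoryBudget type (not DataFusion's actual API) that holds the configured total from step 1 and is shared by every operator in the plan. An operator reserves bytes before it buffers data and gets an error, instead of growing past the limit, when the reservation does not fit:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical plan-wide memory budget shared by all operators of one plan.
#[derive(Debug)]
pub struct MemoryBudget {
    limit: usize,
    used: AtomicUsize,
}

/// Error returned when a reservation would push usage past the limit.
#[derive(Debug, PartialEq)]
pub struct BudgetExceeded {
    pub requested: usize,
    pub available: usize,
}

impl MemoryBudget {
    pub fn new(limit: usize) -> Arc<Self> {
        Arc::new(Self { limit, used: AtomicUsize::new(0) })
    }

    /// Try to reserve `bytes`; on failure nothing is reserved.
    pub fn try_reserve(&self, bytes: usize) -> Result<(), BudgetExceeded> {
        let mut current = self.used.load(Ordering::Relaxed);
        loop {
            // Reject the request if it overflows or exceeds the limit.
            let Some(new_total) = current.checked_add(bytes).filter(|t| *t <= self.limit) else {
                return Err(BudgetExceeded {
                    requested: bytes,
                    available: self.limit.saturating_sub(current),
                });
            };
            // CAS loop so concurrent partitions can share the budget without a lock.
            match self.used.compare_exchange_weak(current, new_total, Ordering::AcqRel, Ordering::Relaxed) {
                Ok(_) => return Ok(()),
                Err(actual) => current = actual,
            }
        }
    }

    /// Return previously reserved bytes to the budget.
    pub fn release(&self, bytes: usize) {
        self.used.fetch_sub(bytes, Ordering::AcqRel);
    }

    pub fn used(&self) -> usize {
        self.used.load(Ordering::Relaxed)
    }
}
```

A memory-hungry operator (say, a hash aggregation) would call try_reserve before growing its hash table and surface the error to the user, which corresponds to the simplest "error if the budget is exceeded" strategy.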

The operators that can use large amounts of memory today are:

  1. Sort
  2. Join
  3. GroupByHash

There are many potential ways to ensure the limit is respected:

  1. (Simplest) error if the budget is exceeded
  2. (More complex) employ algorithms that can use secondary storage (e.g. temp files), such as a sort that spills multiple rounds of partially sorted results and then runs a final merge phase to produce the global ordering for the partition
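The spilling approach in item 2 is an external merge sort. A minimal sketch, assuming i64 rows and a hypothetical row-count limit (max_rows_in_memory) standing in for a byte budget: each full in-memory buffer is sorted and written to a temp file as a run, and the runs are then k-way merged with a min-heap.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::fs::{self, File};
use std::io::{self, BufRead, BufReader, BufWriter, Write};
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};

static RUN_ID: AtomicUsize = AtomicUsize::new(0);

/// Sort the buffer in memory and spill it to a temp file, one value per line.
fn spill_sorted_run(buffer: &mut Vec<i64>) -> io::Result<PathBuf> {
    buffer.sort_unstable();
    let id = RUN_ID.fetch_add(1, Ordering::Relaxed);
    let path = std::env::temp_dir().join(format!("spill-{}-{}.txt", std::process::id(), id));
    let mut writer = BufWriter::new(File::create(&path)?);
    for v in buffer.drain(..) {
        writeln!(writer, "{v}")?;
    }
    writer.flush()?;
    Ok(path)
}

fn parse(line: &str) -> io::Result<i64> {
    line.trim().parse().map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

/// External merge sort: buffer at most `max_rows_in_memory` values, spill each
/// full buffer as a sorted run, then k-way merge all runs from disk.
pub fn external_sort(input: impl IntoIterator<Item = i64>, max_rows_in_memory: usize) -> io::Result<Vec<i64>> {
    assert!(max_rows_in_memory > 0);
    let mut buffer = Vec::with_capacity(max_rows_in_memory);
    let mut runs = Vec::new();
    for v in input {
        buffer.push(v);
        if buffer.len() == max_rows_in_memory {
            runs.push(spill_sorted_run(&mut buffer)?);
        }
    }
    if !buffer.is_empty() {
        runs.push(spill_sorted_run(&mut buffer)?);
    }

    // One line reader per spilled run.
    let mut readers = Vec::new();
    for path in &runs {
        readers.push(BufReader::new(File::open(path)?).lines());
    }
    // Min-heap of (next value, run index): only one value per run is held in memory.
    let mut heap = BinaryHeap::new();
    for (i, reader) in readers.iter_mut().enumerate() {
        if let Some(line) = reader.next() {
            heap.push(Reverse((parse(&line?)?, i)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, i))) = heap.pop() {
        out.push(v);
        if let Some(line) = readers[i].next() {
            heap.push(Reverse((parse(&line?)?, i)));
        }
    }
    for path in runs {
        fs::remove_file(path)?;
    }
    Ok(out)
}
```

For brevity the merged output is collected into a Vec; a real operator would stream it downstream, so that during the merge only one buffered value per run needs to stay in memory.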

Describe alternatives you've considered
There are some interesting tradeoffs between “up-front allocation”, which divides memory among all operators that would need it before execution starts, and a more dynamic approach.
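To make the tradeoff concrete, here is a toy comparison assuming an even up-front split of the budget (a hypothetical policy) versus a single shared pool, and assuming each operator's peak demand is known and all peaks overlap in time. With a budget of 100 and peaks of 70 and 10, the even split rejects the plan while the shared pool admits it:

```rust
/// Up-front allocation: split `budget` evenly across operators; each operator
/// may only use its own share, even if other shares sit idle.
pub fn fits_up_front(budget: usize, peak_needs: &[usize]) -> bool {
    if peak_needs.is_empty() {
        return true;
    }
    let share = budget / peak_needs.len();
    peak_needs.iter().all(|&need| need <= share)
}

/// Dynamic allocation: operators draw from one shared pool, so the plan fits
/// as long as the combined peak demand fits.
pub fn fits_dynamic(budget: usize, peak_needs: &[usize]) -> bool {
    peak_needs.iter().sum::<usize>() <= budget
}
```

The flip side is that a shared pool is first come, first served: an operator that grabs memory early can starve one that starts later, which a fixed split prevents.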

This is likely something that will require major effort across many different issues -- I suggest we use this issue to implement a simple "error if over limit" strategy and then work on more sophisticated strategies subsequently.

Progress tracking

Added Jan 2022:

Remaining Work

@edrevo
Contributor

edrevo commented Jun 18, 2021

I would add Repartition as another operation that might use a bunch of memory.

@andygrove
Member

We should also discuss creating a scheduler in DataFusion (see #64) since it is related to this work. Rather than try and run all the things at once, it would be better to schedule work based on the available resources (cores / memory). We would still need the ability to track/limit memory use within operators but the scheduler could be aware of this and only allocate tasks if there is memory budget available.
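A sketch of memory-aware admission, assuming every task (a hypothetical Task type) comes with an up-front memory estimate: tasks are admitted in submission order while their combined estimates fit the budget, and the rest wait for the next "wave". A real scheduler would admit new work as individual tasks finish rather than in lockstep waves.

```rust
use std::collections::VecDeque;

/// Hypothetical task with an up-front memory estimate in bytes.
#[derive(Debug, Clone)]
pub struct Task {
    pub name: &'static str,
    pub mem_estimate: usize,
}

/// Group tasks into waves whose combined estimates never exceed `budget`.
/// A task that doesn't fit in the remaining budget is deferred to a later wave.
pub fn schedule_waves(tasks: Vec<Task>, budget: usize) -> Result<Vec<Vec<&'static str>>, String> {
    let mut pending: VecDeque<Task> = tasks.into();
    let mut waves = Vec::new();
    while !pending.is_empty() {
        let mut free = budget;
        let mut wave = Vec::new();
        let mut deferred = VecDeque::new();
        while let Some(task) = pending.pop_front() {
            // A task larger than the whole budget can never be admitted.
            if task.mem_estimate > budget {
                return Err(format!("task {} needs more than the whole budget", task.name));
            }
            if task.mem_estimate <= free {
                free -= task.mem_estimate;
                wave.push(task.name);
            } else {
                deferred.push_back(task);
            }
        }
        // Each wave admits at least the first pending task, so this terminates.
        waves.push(wave);
        pending = deferred;
    }
    Ok(waves)
}
```

Note that this first-fit policy lets a small task overtake a larger one that is still waiting for budget.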

@alamb
Contributor Author

alamb commented Aug 16, 2021

I filed #898 for tracking memory used by a plan

@alamb
Contributor Author

alamb commented Aug 16, 2021

#899 for tracking memory used by individual operators
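A sketch of the kind of accounting these two issues describe, using a hypothetical PlanMemoryTracker (not DataFusion's metrics API): each operator records its current and peak bytes, and the plan-level figures are derived from them.

```rust
use std::collections::BTreeMap;

/// Hypothetical per-operator memory metrics, in bytes.
#[derive(Debug, Default, Clone, Copy, PartialEq)]
pub struct OperatorMem {
    pub current: usize,
    pub peak: usize,
}

/// Hypothetical tracker: per-operator usage rolled up into a plan-level view.
#[derive(Debug, Default)]
pub struct PlanMemoryTracker {
    operators: BTreeMap<String, OperatorMem>,
    plan_peak: usize,
}

impl PlanMemoryTracker {
    /// Record that `op` allocated `bytes` more.
    pub fn grow(&mut self, op: &str, bytes: usize) {
        let m = self.operators.entry(op.to_string()).or_default();
        m.current += bytes;
        m.peak = m.peak.max(m.current);
        // The plan peak is sampled from the total at this moment.
        let total = self.plan_current();
        self.plan_peak = self.plan_peak.max(total);
    }

    /// Record that `op` freed `bytes`.
    pub fn shrink(&mut self, op: &str, bytes: usize) {
        if let Some(m) = self.operators.get_mut(op) {
            m.current = m.current.saturating_sub(bytes);
        }
    }

    pub fn operator(&self, op: &str) -> Option<OperatorMem> {
        self.operators.get(op).copied()
    }

    /// Plan-level usage right now: the sum of all operators' current usage.
    pub fn plan_current(&self) -> usize {
        self.operators.values().map(|m| m.current).sum()
    }

    pub fn plan_peak(&self) -> usize {
        self.plan_peak
    }
}
```

The plan-level peak is tracked separately because it can be smaller than the sum of the operators' peaks: if a sort releases its buffer before an aggregation reaches its own peak, the two peaks never coincide.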

@yjshen
Member

yjshen commented Nov 10, 2021

I created a proposal trying to fix this. Please refer to https://docs.google.com/document/d/1BT5HH-2sKq-Jxo51PNE6l9NNd_F-FyyYcyC3SKTnkIA/edit# for the whole proposal.

@alamb
Contributor Author

alamb commented Jan 15, 2022

I have started adding a "Progress Tracking" list to the description of this ticket. Please update it with additional items as you discover them.

@liukun4515
Contributor

@alamb Maybe we should take the join operation into this track.

@alamb
Contributor Author

alamb commented Jan 15, 2022

> @alamb Maybe we should take the join operation into this track.

It is a good idea @liukun4515 -- I ran out of ambition while typing up Sort and Grouping. I'll try and write up some thoughts on joins later

@liukun4515
Contributor

> > @alamb Maybe we should take the join operation into this track.
>
> It is a good idea @liukun4515 -- I ran out of ambition while typing up Sort and Grouping. I'll try and write up some thoughts on joins later

I'm not familiar with external operations; I will look at how other databases implement them.

@alamb
Contributor Author

alamb commented Jan 17, 2022

I wrote up some thoughts about externalized joins in #1599

@alamb
Contributor Author

alamb commented Apr 7, 2022

Hi @hzh0425 -- There is no estimated completion time I know of.

Thanks to @yjshen there is a way to limit the memory used in Sort. The other major operators that I know of that need to be memory-limited are Group and Join -- here is hoping someone can contribute time to help in that endeavor.

@alamb
Contributor Author

alamb commented Oct 24, 2022

Added #3941 for the project of "error if memory limits are exceeded"

@alamb
Contributor Author

alamb commented Nov 28, 2022

Update here is that we are close to having enforced memory limits for grouping and sorting (see #3941 for more details).

We also have ideas on how to improve the grouping code that should make supporting spilling grouping easier to implement -- see #2723 (comment)

@alamb alamb changed the title Optionally Limit memory used by DataFusion plan EPIC Optionally Limit memory used by DataFusion plan Nov 28, 2022
@alamb alamb changed the title EPIC Optionally Limit memory used by DataFusion plan [Epic] Optionally Limit memory used by DataFusion plan Mar 5, 2023
@alamb
Contributor Author

alamb commented Mar 5, 2023

Update: we have memory-limited Grouping and are now working on joins. @korowa recently added limiting for Cross Joins (#5339) 🎉

@alamb
Contributor Author

alamb commented Jun 11, 2023

I think this is largely complete and we can track any missing items as smaller follow on PRs

@alamb alamb closed this as completed Jun 11, 2023
@SteveLauC
Contributor

Hi, from this thread, it seems that DataFusion can ONLY limit the memory used by those resource-heavy operators. Can it limit the memory used by the underlying FileScan operators, like ParquetExec?

Let me give a demo with the following code:

use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryPool};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use std::sync::Arc;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let mem_pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(0)); // limit memory usage to 0
    let rt_cfg = RuntimeConfig::new().with_memory_pool(mem_pool);
    let rt = RuntimeEnv::new(rt_cfg).unwrap();

    let session_cfg = SessionConfig::new();
    let ctx = SessionContext::new_with_config_rt(session_cfg, Arc::new(rt));

    ctx.register_parquet("foo", "foo.parquet", ParquetReadOptions::default())
        .await
        .unwrap();
    let df = ctx.sql("select * from foo").await.unwrap();
    df.show().await.unwrap();
}

Even though we limit the available memory to 0, the query executes without any issue:

$ ls -l foo.parquet
.rw-r--r-- 484 steve 17 Jan 17:08 foo.parquet

$ cargo r -q
+-----+
| foo |
+-----+
| bar |
| bar |
| bar |
| bar |
| bar |
| bar |
| bar |
| bar |
+-----+

@alamb
Contributor Author

alamb commented Jan 17, 2024

> Hi, from this thread, it seems that DataFusion can ONLY limit the memory used by those resource-heavy operators. Can it limit the memory used by the underlying FileScan operators, like ParquetExec?

That is correct, though it is conceivable that we could update ParquetExec to register its memory use with the memory manager.

In general, DataFusion takes a pragmatic approach to memory management: the intermediate memory used as data streams through the system is not accounted for (it is assumed to be "small"), and the largest consumers of memory register their use.

This trades off the additional complexity of memory tracking and management against the benefit of limiting resource usage.
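A toy model of that approach, loosely mirroring the zero-limit example above (the Pool type and both operators are hypothetical, not DataFusion code): a streaming scan passes batches through without reserving anything, so it succeeds even with a limit of 0, while a sort that must buffer all of its input reserves memory first and fails.

```rust
/// Hypothetical greedy pool: grants reservations until `limit` bytes are reserved.
pub struct Pool {
    limit: usize,
    reserved: usize,
}

impl Pool {
    pub fn new(limit: usize) -> Self {
        Self { limit, reserved: 0 }
    }

    pub fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        match self.reserved.checked_add(bytes) {
            Some(total) if total <= self.limit => {
                self.reserved = total;
                Ok(())
            }
            _ => Err(format!(
                "cannot reserve {bytes} bytes: {} of {} already reserved",
                self.reserved, self.limit
            )),
        }
    }
}

/// Streaming scan: each batch passes straight through and is never reserved,
/// so the pool is not consulted at all.
pub fn scan_count(batches: &[Vec<i64>], _pool: &mut Pool) -> usize {
    batches.iter().map(|b| b.len()).sum()
}

/// Sort: buffers every input row, so it registers that memory before growing.
pub fn sort_all(batches: &[Vec<i64>], pool: &mut Pool) -> Result<Vec<i64>, String> {
    let mut rows = Vec::new();
    for batch in batches {
        pool.try_grow(batch.len() * std::mem::size_of::<i64>())?;
        rows.extend_from_slice(batch);
    }
    rows.sort_unstable();
    Ok(rows)
}
```

In this model, the streaming scan's memory is bounded by the batch size rather than by the input size, which is the reasoning behind not accounting for it.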

There is a bit more information at https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/trait.MemoryPool.html

@SteveLauC
Contributor

Thanks for your explanation!

@alamb
Contributor Author

alamb commented Jan 23, 2024

> Thanks for your explanation!

No worries -- thanks for the good question. I filed #8966 to try and capture some of this rationale in the documentation for future readers
