-
Notifications
You must be signed in to change notification settings - Fork 416
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: handle larger z-order jobs with streaming output and spilling #1461
Conversation
rust/src/operations/optimize.rs
Outdated
use datafusion::execution::memory_pool::FairSpillPool; | ||
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; | ||
// TODO: usethreads | ||
use datafusion::prelude::{col, ParquetReadOptions, SessionConfig}; | ||
|
||
// TODO: make this configurable. | ||
// TODO: push this up and share between bins or maybe an overall runtime. | ||
let memory_pool = FairSpillPool::new(8 * 1024 * 1024 * 1024); | ||
let config = RuntimeConfig::new().with_memory_pool(Arc::new(memory_pool)); | ||
let runtime = Arc::new(RuntimeEnv::new(config)?); | ||
runtime.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a little stumped here. Where should this configuration live?
It makes sense to have the memory pool as a global config, IMO. But the object store should be local to here, since it's specific to this particular table's root.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had some similar question a little while back. For a while I thought the DeltaOps
struct could be a place where session state is stored, but the way we use that right now, it's even shorter lived then the DeltaTable
.
I guess the only thing that "survives" is the DeltaTableState
. Maybe a way to go is to have that as an optional field on theDeltaTable
. Then once the first operation is performed, we populate that field in the returned table and re-use if for subsequent operations. This may also give us a way to capture user configuration for such things that would take a similar route through the code as the storage options do right now.
51d6ba4
to
a48a071
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
As you mentioned in one of the comments, going forward we should find a more central place to handle the configuration, and also harmonize how we register object stores on the datafusion runtime.
Elsewhere, I have been experimenting with more formally integrating with the datafusion query planning, in the hopes that we may adopt parts of this here.
a48a071
to
f3e37dd
Compare
Description
Fixes the base implementation so that is doesn't materialize the entire result in one record batch. It will still require materializing the full input for each partition in memory. This is mostly a problem for unpartitioned table, since that means materializing the entire table in memory.
Adds a new datafusion-based implementation enabled by the
datafusion
feature. In theory, this should support spilling to disk.Related Issue(s)
For example:
Documentation