-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement streaming versions of Dataframe.collect methods #789
Conversation
Co-authored-by: Andrew Lamb <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the idea looks very nice 👍
datafusion/src/dataframe.rs
Outdated
/// # Ok(()) | ||
/// # } | ||
/// ``` | ||
async fn collect_stream(&self) -> Result<SendableRecordBatchStream>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if we called this something like execute
rather than collect_stream
?
async fn execute_stream(&self) -> Result<SendableRecordBatchStream>;
This would mirror the naming of ExecutionPlan::execute
and might make it clearer that collect
means collect into a Vec and execute
means get a stream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. I renamed these to execute_stream
and execute_stream_partitioned
|
||
/// Convert the logical plan represented by this DataFrame into a physical plan and | ||
/// execute it, collecting all resulting batches into memory while maintaining | ||
/// partitioning | ||
async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>> { | ||
let state = self.ctx_state.lock().unwrap().clone(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could probably rewrite collect_partitioned
to be in terms of collect_stream_partitioned
:
collect(self.collect_stream_partitioned().await?)
or something like that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've cleaned the code up and removed a fair bit of duplication now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like a really nice change to me
Which issue does this PR close?
Closes #47.
Rationale for this change
In addition to the current
collect*
methods that load results into memory in aVec<RecordBatch>
this PR adds alternateexecute_stream*
methods that return streams instead so that results don't have to be loaded into memory before being processed.What changes are included in this PR?
New
execute_stream
andexecute_stream_partitioned
methods onDataFrame
.Are there any user-facing changes?
Yes, new DataFrame methods.