-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make BallistaContext::collect streaming #535
Conversation
cc @andygrove |
struct WrappedStream { | ||
stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send + Sync>>, | ||
schema: SchemaRef, | ||
} | ||
|
||
impl RecordBatchStream for WrappedStream { | ||
fn schema(&self) -> SchemaRef { | ||
self.schema.clone() | ||
} | ||
} | ||
|
||
impl Stream for WrappedStream { | ||
type Item = ArrowResult<RecordBatch>; | ||
|
||
fn poll_next( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut std::task::Context<'_>, | ||
) -> std::task::Poll<Option<Self::Item>> { | ||
self.stream.poll_next_unpin(cx) | ||
} | ||
|
||
fn size_hint(&self) -> (usize, Option<usize>) { | ||
self.stream.size_hint() | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was surprised I couldn't find anything like this. If there is a similar struct that I missed please do let me know and I'll use that one.
Also, since this is a pretty general wrapper, if you want me to move this to another place and make it public, I can do that too.
Codecov Report
@@ Coverage Diff @@
## master #535 +/- ##
==========================================
- Coverage 76.03% 76.02% -0.02%
==========================================
Files 157 157
Lines 26990 26994 +4
==========================================
Hits 20521 20521
- Misses 6469 6473 +4
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
I ran integration tests locally and they passed so I am going to go ahead and merge this. |
Which issue does this PR close?
Closes #534.
Rationale for this change
The collect implementation in BallistaContext is bringing all of the contents in memory even though there is no need for it.
What changes are included in this PR?
Collect will now use streams all the way to avoid hogging memory
Are there any user-facing changes?
No