I/O Driver #1897

Merged · 8 commits · Jan 10, 2025
Conversation

@gatesn (Contributor) commented Jan 10, 2025

Create a basic implementation of an I/O driver so we can hook the new layouts up to DataFusion, which requires sendable record batch streams.
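
For context, DataFusion consumes scan results through its SendableRecordBatchStream type, i.e. a Pin<Box<dyn RecordBatchStream + Send>>. A minimal sketch of an adapter in that shape, assuming the driver exposes a Send stream of RecordBatch results; DriverStream and into_sendable are hypothetical names for illustration, not types added by this PR:

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use futures::Stream;

/// Hypothetical adapter: wraps any Send-able stream of RecordBatch results
/// so that it satisfies DataFusion's RecordBatchStream trait.
struct DriverStream<S> {
    schema: SchemaRef,
    inner: S,
}

impl<S> Stream for DriverStream<S>
where
    S: Stream<Item = Result<RecordBatch>> + Send + Unpin,
{
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Delegate to the wrapped stream (e.g. one fed by an I/O driver).
        Pin::new(&mut self.inner).poll_next(cx)
    }
}

impl<S> RecordBatchStream for DriverStream<S>
where
    S: Stream<Item = Result<RecordBatch>> + Send + Unpin,
{
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

/// Box the adapter up into the type DataFusion actually consumes.
fn into_sendable<S>(schema: SchemaRef, inner: S) -> SendableRecordBatchStream
where
    S: Stream<Item = Result<RecordBatch>> + Send + Unpin + 'static,
{
    Box::pin(DriverStream { schema, inner })
}
```

The Send bound is the point of friction: whatever the I/O driver produces has to be pollable from whichever thread DataFusion's runtime happens to schedule.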

@gatesn merged commit 4a5b819 into develop on Jan 10, 2025 · 21 checks passed
@gatesn deleted the ngates/io-driver branch on January 10, 2025 at 20:41

@gatesn (Contributor, Author) commented Jan 10, 2025

Merged to unblock the DataFusion prototype, but will queue this up for a proper review if it turns out to be a useful component.

@gatesn requested a review from robert3005 on January 10, 2025 at 20:45

@robert3005 (Member) left a comment

I think this is just par_iter in the current implementation?

Comment on lines +78 to +86
// TODO(ngates): we should call buffered(n) on this stream so that it launches multiple
// splits to run in parallel. Currently we use block_on, so there's no point in this being
// any higher than the size of the thread pool. If we switch to running LocalExecutor,
// then there may be some value in slightly over-subscribing.

// Set up an I/O driver that will make progress on 32 I/O requests at a time.
// TODO(ngates): we should probably have segments hold an Arc'd driver stream internally
// so that multiple scans can poll it, while still sharing the same global concurrency
// limit?

Seems these two comments are not in sync
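
For reference, the buffered(n) combinator from the futures crate (mentioned in the first TODO) polls up to n futures from the stream concurrently and yields their results in order. A minimal sketch, with Split as a hypothetical stand-in for this PR's scan splits:

```rust
use futures::{stream, StreamExt};

// Hypothetical stand-in for the scan splits discussed above.
struct Split(usize);

impl Split {
    async fn evaluate(self) -> usize {
        // Stand-in for real async I/O work.
        self.0 * 2
    }
}

// buffered(8) keeps up to eight split futures in flight at once and
// yields their results in the original stream order.
async fn run_splits(splits: Vec<Split>) -> Vec<usize> {
    stream::iter(splits)
        .map(Split::evaluate)
        .buffered(8)
        .collect::<Vec<usize>>()
        .await
}
```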

Comment on lines +58 to +70
// Launch the scan task onto the thread pool.
self.thread_pool.spawn_fifo(move || {
    let array_result = range_scan.and_then(|range_scan| {
        block_on(range_scan.evaluate_async(|row_mask, expr| {
            reader.evaluate_expr(row_mask, expr)
        }))
    });
    // Post the result back to the main thread.
    send.send(array_result)
        .map_err(|_| vortex_err!("send failed, recv dropped"))
        .vortex_expect("send failed, recv dropped");
});

Why not par_iter on the thread_pool?
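
Roughly the shape the review is suggesting, sketched with rayon's parallel iterators; evaluate here is a hypothetical synchronous stand-in for the async evaluation above:

```rust
use rayon::prelude::*;

// Hypothetical per-scan work, standing in for range_scan.evaluate_async above.
fn evaluate(scan_id: usize) -> Result<usize, String> {
    Ok(scan_id * 2)
}

// par_iter on the pool: install() runs the closure on the pool's threads,
// and results come back as an ordered Vec instead of over a channel.
fn run_scans(pool: &rayon::ThreadPool, scan_ids: Vec<usize>) -> Vec<Result<usize, String>> {
    pool.install(|| scan_ids.into_par_iter().map(evaluate).collect())
}
```

One trade-off versus spawn_fifo plus a channel: install() blocks the caller until the whole batch finishes, while the channel lets results stream back as they complete.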
