Add async arrow parquet reader #1154

Merged 7 commits into apache:master on Feb 2, 2022
Conversation

@tustvold (Contributor) commented Jan 11, 2022

Which issue does this PR close?

Closes #111.

Rationale for this change

See the ticket; in particular, I wanted to confirm that it is possible to create an async parquet reader without any major changes to the parquet crate. This comes up as a frequent ask from the community, and I think we can support it without major churn.

What changes are included in this PR?

Adds a layer of indirection to array_reader to abstract it away from files; I think this change stands on its own merits.

It then adds a ParquetRecordBatchStream which is a Stream that yields RecordBatch. Under the hood, this uses async to read row groups into memory and then feeds these into the non-async decoders.

The parquet docs describe the column chunk as the unit of IO, so I think buffering compressed row groups in memory is not an impractical approach. It also avoids having to maintain sync and async versions of all the decoders, readers, etc.
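To illustrate the buffering strategy described above, here is a minimal, hypothetical sketch (not the PR's actual code): fetch a row group's compressed bytes into memory, then feed them to a synchronous decoder via an in-memory `Cursor`. The function names are illustrative stand-ins.

```rust
use std::io::{Cursor, Read};

// Hypothetical stand-in: in the real reader this would be an async range
// read (e.g. from object storage); here it is a plain slice copy.
fn fetch_row_group(all_bytes: &[u8], offset: usize, len: usize) -> Vec<u8> {
    all_bytes[offset..offset + len].to_vec()
}

// Stand-in for the existing non-async decoders, which only require `Read`.
fn decode_sync(mut reader: impl Read) -> std::io::Result<Vec<u8>> {
    let mut out = Vec::new();
    reader.read_to_end(&mut out)?;
    Ok(out)
}

fn main() {
    let file = b"rg0-bytes|rg1-bytes";
    // Buffer the second "row group" (bytes 10..19) in memory...
    let buffered = fetch_row_group(file, 10, 9);
    // ...then decode it synchronously from the in-memory buffer.
    let decoded = decode_sync(Cursor::new(buffered)).unwrap();
    assert_eq!(decoded, b"rg1-bytes".to_vec());
    println!("ok");
}
```

The key point is that only the fetch needs to be async; the decoders stay untouched.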

Are there any user-facing changes?

This adds Send + Sync to DataType, RowGroupReader, FileReader, ChunkReader.

It also adds Send bounds to the various std::io::Read bounds.
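A small sketch of what the added Send bounds enable (using only std types, not the parquet crate's API): a `Box<dyn Read + Send>` can be moved into another thread, which a plain `Box<dyn Read>` cannot.

```rust
use std::io::{Cursor, Read};
use std::thread;

// Moving the boxed reader into a spawned thread requires `Send` on the
// trait object; without it this function would not compile.
fn read_on_thread(reader: Box<dyn Read + Send>) -> Vec<u8> {
    thread::spawn(move || {
        let mut reader = reader;
        let mut buf = Vec::new();
        // Reading happens on the spawned thread.
        reader.read_to_end(&mut buf).unwrap();
        buf
    })
    .join()
    .unwrap()
}

fn main() {
    let bytes = read_on_thread(Box::new(Cursor::new(vec![1u8, 2, 3])));
    assert_eq!(bytes, vec![1, 2, 3]);
    println!("ok");
}
```

This is the same property async runtimes like tokio rely on when polling futures from a thread pool.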

@github-actions bot added the parquet (Changes to the parquet crate) label Jan 11, 2022
@@ -78,7 +78,6 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaDat

// build up the reader covering the entire metadata
let mut default_end_cursor = Cursor::new(default_len_end_buf);
let metadata_read: Box<dyn Read>;
@tustvold (author):

Drive-by cleanup: this dynamic dispatch isn't necessary anymore.

fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
}

impl RowGroupCollection for Arc<dyn FileReader> {
@tustvold (author):

This does mean we have double dynamic dispatch; given these methods are called only a couple of times per file, I'm inclined to consider this largely irrelevant.
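The double dispatch arises because the trait is implemented for a trait object: each call goes through the `RowGroupCollection` vtable and then through the inner `FileReader` vtable. A self-contained sketch (trait names mirror the PR; the bodies are hypothetical):

```rust
use std::sync::Arc;

trait FileReader {
    fn num_row_groups(&self) -> usize;
}

trait RowGroupCollection {
    fn num_row_groups(&self) -> usize;
}

// Implementing the trait for `Arc<dyn FileReader>` means a call via
// `dyn RowGroupCollection` dispatches twice: once through this impl's
// vtable, then through the inner `dyn FileReader` vtable.
impl RowGroupCollection for Arc<dyn FileReader> {
    fn num_row_groups(&self) -> usize {
        self.as_ref().num_row_groups()
    }
}

struct Mock;
impl FileReader for Mock {
    fn num_row_groups(&self) -> usize {
        2
    }
}

fn main() {
    let reader: Arc<dyn FileReader> = Arc::new(Mock);
    let collection: &dyn RowGroupCollection = &reader;
    assert_eq!(collection.num_row_groups(), 2);
    println!("ok");
}
```

Since these calls happen a handful of times per file rather than per value, the extra indirection is indeed negligible.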

@codecov-commenter commented Jan 11, 2022

Codecov Report

Merging #1154 (80c1978) into master (aa71aea) will decrease coverage by 0.06%.
The diff coverage is 21.93%.

❗ Current head 80c1978 differs from pull request most recent head 38e2225. Consider uploading reports for the commit 38e2225 to get more accurate results

@@            Coverage Diff             @@
##           master    #1154      +/-   ##
==========================================
- Coverage   82.96%   82.90%   -0.07%     
==========================================
  Files         178      180       +2     
  Lines       51522    51969     +447     
==========================================
+ Hits        42744    43083     +339     
- Misses       8778     8886     +108     
Impacted Files Coverage Δ
parquet/src/arrow/async_reader.rs 0.00% <0.00%> (ø)
parquet/src/arrow/record_reader.rs 94.07% <ø> (ø)
parquet/src/arrow/record_reader/buffer.rs 92.42% <ø> (ø)
parquet/src/column/page.rs 98.68% <ø> (ø)
parquet/src/compression.rs 88.59% <ø> (ø)
parquet/src/data_type.rs 76.61% <ø> (ø)
parquet/src/encodings/decoding.rs 90.45% <ø> (ø)
parquet/src/file/reader.rs 75.47% <ø> (ø)
parquet/src/file/serialized_reader.rs 94.37% <ø> (ø)
parquet/src/util/test_common/page_util.rs 88.88% <ø> (ø)
... and 11 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@alamb (Contributor) left a comment:

Thanks @tustvold -- this is really cool.

I suggest the following actions:

  1. Do a POC to use this async reader in DataFusion
  2. If that looks good, then fill out the tests for this

I'll try and find time later this week or this weekend to help if no one else beats me to it.

@alamb alamb changed the title POC: Async parquet reader POC: async arrow parquet reader Jan 11, 2022
@houqp (Member) commented Jan 12, 2022

Pretty cool demo @tustvold 👍

@yjshen (Member) commented Jan 12, 2022

Exciting news, thanks @tustvold

@tustvold force-pushed the async-parquet-reader branch 2 times, most recently from da73b55 to dccb641 on January 28, 2022 20:11
Add Sync + Send bounds to parquet crate
@tustvold force-pushed the async-parquet-reader branch from dccb641 to 078b37c on January 28, 2022 22:13
type Item = Result<Page>;

fn next(&mut self) -> Option<Self::Item> {
self.get_next_page().transpose()
}
}

impl<T: Read> PageReader for SerializedPageReader<T> {
impl<T: Read + Send> PageReader for SerializedPageReader<T> {
@tustvold (author):

Since PageReader: Send, it can only be implemented for types that are Send, which is only the case for SerializedPageReader<T> if T: Send
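As an aside, the `next` implementation above leans on `Result::transpose`: the fallible getter returns `Result<Option<Page>>`, while `Iterator::next` must return `Option<Result<Page>>`, and `transpose` swaps the nesting. A minimal sketch with a mocked page source (`Page` and `next_page` are hypothetical stand-ins):

```rust
// Hypothetical stand-in for parquet's Page type.
type Page = u32;

// Mimics `get_next_page`: fallible, and `Ok(None)` signals exhaustion.
fn next_page(state: &mut Vec<Page>) -> Result<Option<Page>, String> {
    Ok(state.pop())
}

// Mimics `Iterator::next`: `transpose` turns `Ok(None)` into `None`
// (iteration ends) and `Err(e)` into `Some(Err(e))` (yield the error).
fn next(state: &mut Vec<Page>) -> Option<Result<Page, String>> {
    next_page(state).transpose()
}

fn main() {
    let mut pages = vec![1, 2];
    assert_eq!(next(&mut pages), Some(Ok(2)));
    assert_eq!(next(&mut pages), Some(Ok(1)));
    assert_eq!(next(&mut pages), None); // Ok(None) ends iteration
    println!("ok");
}
```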

@@ -43,8 +43,8 @@ pub trait Length {
/// The ChunkReader trait generates readers of chunks of a source.
/// For a file system reader, each chunk might contain a clone of File bounded on a given range.
/// For an object store reader, each read can be mapped to a range request.
pub trait ChunkReader: Length {
type T: Read;
pub trait ChunkReader: Length + Send + Sync {
@tustvold (author):

These traits need to be both Send and Sync as they are used through immutable references, e.g. Arc
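To make the Arc point concrete: `Arc<T>` is only `Send` when `T` is both `Send` and `Sync`, so sharing a reader across threads via an `Arc` needs both bounds. A sketch with hypothetical stand-in types (`Chunks`, `InMemory`):

```rust
use std::sync::Arc;
use std::thread;

// The `Send + Sync` supertraits play the role of the bounds the PR adds
// to `ChunkReader`; without them the spawn below would not compile.
trait Chunks: Send + Sync {
    fn byte_len(&self) -> u64;
}

struct InMemory(Vec<u8>);
impl Chunks for InMemory {
    fn byte_len(&self) -> u64 {
        self.0.len() as u64
    }
}

fn len_on_thread(chunks: Arc<dyn Chunks>) -> u64 {
    // `Arc<T>: Send` requires `T: Send + Sync`, hence both bounds.
    thread::spawn(move || chunks.byte_len()).join().unwrap()
}

fn main() {
    let chunks: Arc<dyn Chunks> = Arc::new(InMemory(vec![0; 16]));
    assert_eq!(len_on_thread(Arc::clone(&chunks)), 16);
    println!("ok");
}
```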

@tustvold (author) commented:
With apache/datafusion#1617 I'm happy with the interface, so I'm marking this ready for review.

I'll work on getting some better coverage, e.g. the fuzz tests, over the coming days.

@tustvold tustvold marked this pull request as ready for review January 28, 2022 23:12
@github-actions bot added the arrow (Changes to the arrow crate) label Jan 28, 2022
@tustvold force-pushed the async-parquet-reader branch from 322e664 to 78f71ab on January 29, 2022 00:04
@tustvold tustvold changed the title POC: async arrow parquet reader Async arrow parquet reader Jan 29, 2022
@tustvold force-pushed the async-parquet-reader branch from 78f71ab to 92b7cb9 on January 29, 2022 00:39
@@ -74,7 +74,7 @@ fn create_table(results: &[RecordBatch]) -> Result<Table> {
let mut cells = Vec::new();
for col in 0..batch.num_columns() {
let column = batch.column(col);
cells.push(Cell::new(&array_value_to_string(&column, row)?));
cells.push(Cell::new(&array_value_to_string(column, row)?));
@tustvold (author):

Activating pretty_print in parquet appears to have made clippy find a load of new stuff in arrow 😅

@alamb alamb changed the title Async arrow parquet reader Add async arrow parquet reader Jan 29, 2022
@alamb added the api-change (Changes to the arrow API) label Jan 29, 2022
@alamb (Contributor) left a comment:

Looks good @tustvold -- thank you.

"give the people what they want!"

My largest potential concern is the introduction of Send and Sync. I would like to try this change against some other crate (datafusion or IOx perhaps) to make sure the new Send trait requirement doesn't cause undue challenges when upgrading.

As a follow-on to this PR I think we (not necessarily you) should take a look through the documentation, document the new async feature flag, and add a doc example to this page: https://docs.rs/parquet/8.0.0/parquet/arrow/index.html (will file a ticket to do so)

I also think this change (especially the newly added Send and Sync trait bounds) deserves some broader attention, so I'll send a note to the mailing list too.

"+----------+-------------+-----------+",
],
);
}
Review comment (Contributor):

Other tests that might be cool

  1. error cases (where projection is out of bounds, row group out of bounds).
  2. row group filter (as in read a multi-row group parquet file but only read one of the row groups)

@alamb (Contributor) commented Jan 29, 2022

Actually, don't we have to add async to the tests to ensure CI coverage?

@alamb (Contributor) commented Jan 29, 2022

Actually, I see apache/datafusion#1617 demonstrates what impact this has on DataFusion, which seems just fine 👍

@alamb (Contributor) left a comment:

https://github.com/apache/arrow-rs/runs/5004473864?check_suite_focus=true
[screenshot of CI run, 2022-02-01 6:32 AM]

Unless anyone else has any comments, I'll plan to merge this tomorrow

@alamb (Contributor) commented Feb 1, 2022

Added an example in #1253

@alamb alamb merged commit 91d12ec into apache:master Feb 2, 2022
@alamb (Contributor) commented Feb 2, 2022

Thanks again @tustvold 👍

@alamb added the enhancement (Any new improvement worthy of an entry in the changelog) label and removed the api-change (Changes to the arrow API) label Feb 3, 2022
@alamb (Contributor) commented Feb 3, 2022

tracking API change in #1264 (for changelog)

@zeevm (Contributor) commented Feb 5, 2022

I see a few issues with this.

First, the notion that the column chunk is the basic I/O unit for Parquet is somewhat outdated with the introduction of the page index.

Second, a major premise of Parquet is "read only what you need", where what you need is usually dictated by some query engine, so continuously downloading data in the background that the client may not even want or need doesn't seem right, especially as the cost is complicating all existing clients with the added "Send" constraint.

@tustvold (author) commented Feb 5, 2022

the notion that the column chunk is the basic I/O unit for Parquet is somewhat outdated with the introduction of the page index.

I agree, insofar as I would anticipate whatever mechanism we eventually add for more granular filter pushdown, be it the page index or something more sophisticated such as described in #1191, being used to refine the data ParquetRecordBatchStream fetches prior to decode. That being said, this crate currently doesn't even support decoding the index pages, see here, let alone writing or doing anything with them 😅

so continuously downloading data in the background that the client

This PR does not add functionality for doing this; it adds hooks that a query engine can use to do so, by providing something implementing AsyncRead and AsyncSeek. This has been a frequent ask within DataFusion, and apache/datafusion#1617 begins to flesh out how this might look. The parquet crate would not have anything to do with the actual fetching of data from object storage, save for requesting the necessary byte ranges from the reader implementation, limited by any projections, filters, or row group selections pushed down by the query engine.

complicating all existing clients with the added "Send" constraint.

Are these additions causing an issue for you? I have to confess I did not anticipate them causing problems, as almost all types are Send. Is there a particular one causing an issue? We could potentially feature-gate it behind the async feature flag.

@alamb (Contributor) commented Feb 7, 2022

Are these additions causing an issue for you? I have to confess I did not anticipate them causing problems, as almost all types are Send. Is there a particular one causing an issue? We could potentially feature-gate it behind the async feature flag.

I am also interested in what issues (if any) adding the Send constraint has caused or will cause.

@zeevm if you have some time and are willing to help make the async parquet reader more sophisticated in terms of reading only what is needed, we would love to welcome your contributions ❤️ -- we are only just beginning to improve in this area.

Labels: arrow (Changes to the arrow crate), enhancement (Any new improvement worthy of an entry in the changelog), parquet (Changes to the parquet crate)

Successfully merging this pull request may close these issues:

Provide an async ParquetReader for arrow

6 participants