Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement chunked Parquet reader #11867

Merged
merged 181 commits into from
Nov 18, 2022
Merged
Show file tree
Hide file tree
Changes from 168 commits
Commits
Show all changes
181 commits
Select commit Hold shift + click to select a range
c2e8e87
Fix an issue where using num_rows and skip_rows on a parquet file con…
nvdbaranec Sep 23, 2022
f330431
Merge branch 'branch-22.12' into reader_preprocess_fix_and_opt
nvdbaranec Sep 23, 2022
eadfd63
Fixed an issue with the tests: input columns cannot have unsanitary …
nvdbaranec Sep 27, 2022
c4de038
Merge branch 'branch-22.12' into reader_preprocess_fix_and_opt
nvdbaranec Sep 27, 2022
222c9fe
Copy `parquet_reader_*` into `chunked_parquet_reader_*`
ttnghia Sep 30, 2022
f49cfed
Modify `chunked_parquet_reader_options`
ttnghia Sep 30, 2022
dd39804
Exploit inheritance to extend the options and options_builder classes
ttnghia Oct 4, 2022
81bc68f
Remove unnecessary variable
ttnghia Oct 5, 2022
f8126be
Misc
ttnghia Oct 5, 2022
0e7692c
Add docs
ttnghia Oct 5, 2022
9f9eeb0
PR feedback changes.
nvdbaranec Oct 5, 2022
9b3ea62
Merge branch 'branch-22.12' into reader_preprocess_fix_and_opt
nvdbaranec Oct 5, 2022
d2e409a
Fixed some compile errors from merging.
nvdbaranec Oct 5, 2022
ed41ac1
Add `chunked_parquet_reader`
ttnghia Oct 5, 2022
be782f2
Add empty implementation
ttnghia Oct 5, 2022
7908b66
Merge branch 'branch-22.12' into parquet_reader
ttnghia Oct 5, 2022
a7175c8
Add a destructor and `close`
ttnghia Oct 5, 2022
63a7bd6
Update docs
ttnghia Oct 6, 2022
16c12d9
Fix comment
ttnghia Oct 6, 2022
cd85385
Construct `chunked_parquet_reader`
ttnghia Oct 6, 2022
5944beb
Add comment
ttnghia Oct 6, 2022
7cfa72a
Rename function and implementing
ttnghia Oct 7, 2022
4696bd3
MISC
ttnghia Oct 7, 2022
99dc786
Bare bones implementation. Many types still not working.
nvdbaranec Oct 10, 2022
ad9c399
Merge branch 'branch-22.12' into parquet_reader
ttnghia Oct 11, 2022
ecf225d
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Oct 11, 2022
583a7ef
Add test
ttnghia Oct 11, 2022
b250d6f
Cleanup
ttnghia Oct 11, 2022
e7a9e3e
Modify docs
ttnghia Oct 11, 2022
811354a
Cleanup
ttnghia Oct 11, 2022
12ba72e
Add TODO
ttnghia Oct 11, 2022
45668ff
Add `read_intermediate_data`
ttnghia Oct 11, 2022
1bb8254
Use `read_intermediate_data`
ttnghia Oct 11, 2022
b1c44dd
Merge branch 'branch-22.12' into parquet_reader
ttnghia Oct 12, 2022
56715ef
Fix bug
ttnghia Oct 12, 2022
a7e7e93
Simplify code
ttnghia Oct 12, 2022
8fe87b1
Implement `file_intermediate_data`
ttnghia Oct 12, 2022
464f4f9
Add `make_output`
ttnghia Oct 12, 2022
56756d6
Implement `read_chunk`
ttnghia Oct 12, 2022
3044ac5
Cleanup
ttnghia Oct 12, 2022
ffb8a19
Fix bug when `skip_rows` and `num_rows` are modified inside a called …
ttnghia Oct 12, 2022
baf3603
Fix comment
ttnghia Oct 13, 2022
8bdab44
Store preprocess data
ttnghia Oct 13, 2022
ec4abfb
Implement `chunked_reader` detail class
ttnghia Oct 13, 2022
cb1dea4
Refactoring
ttnghia Oct 13, 2022
a8dfd82
Rename structs
ttnghia Oct 13, 2022
7889e5a
Increment `current_read_chunk`
ttnghia Oct 13, 2022
63a6511
Call preprocessing in `read_chunk`
ttnghia Oct 13, 2022
c1269d1
Fix `has_next`
ttnghia Oct 14, 2022
95e6c1d
Refactoring
ttnghia Oct 14, 2022
bd7b510
Fix errors
ttnghia Oct 14, 2022
eb78526
Merge branch 'branch-22.12' into chunked_reader_gpu. Also: work to…
nvdbaranec Oct 17, 2022
66aeaf4
Change param
ttnghia Oct 18, 2022
1d700e3
Rename variables
ttnghia Oct 18, 2022
4af948b
Remove intermediate variables
ttnghia Oct 18, 2022
28cfc6f
Modify tests
ttnghia Oct 18, 2022
8135ed5
First pass of string support.
nvdbaranec Oct 18, 2022
fbeabfc
Fix bug
ttnghia Oct 18, 2022
df074e0
Remove debug print
ttnghia Oct 18, 2022
5653090
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Oct 18, 2022
3071fd7
Merge branch 'branch-22.12' into parquet_reader
ttnghia Oct 18, 2022
631eff1
Fix tests
ttnghia Oct 18, 2022
d1b4e4c
Fix chunk size limit
ttnghia Oct 18, 2022
3f2f8a4
Turn back to do preprocess once
ttnghia Oct 18, 2022
974e7ef
The read limit parameter is now no longer const but truely runtime pa…
ttnghia Oct 19, 2022
0be096b
Add new test file
ttnghia Oct 19, 2022
f7018fe
Reverse `parquet_test.cpp`
ttnghia Oct 19, 2022
81097eb
Modify `read` to add exception and preprocess once
ttnghia Oct 19, 2022
fcffac8
Rewrite tests
ttnghia Oct 19, 2022
43dd802
Store `decomp_page_data`
ttnghia Oct 19, 2022
eeec023
Rewrite tests
ttnghia Oct 19, 2022
14dfd3f
Simple test
ttnghia Oct 19, 2022
66e9f09
Store `raw_page_data`
ttnghia Oct 19, 2022
669b8cf
Cleanup test
ttnghia Oct 19, 2022
001c6c7
Fix empty output
ttnghia Oct 19, 2022
f50603a
Add `preprocess_file_and_columns`
ttnghia Oct 19, 2022
66976aa
Misc
ttnghia Oct 19, 2022
0b0040a
Fixed some incorrect logic in preprocess tep.
nvdbaranec Oct 19, 2022
467de78
Removed debug stuff. Added some comments.
nvdbaranec Oct 19, 2022
d68bf80
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Oct 20, 2022
1888aa3
Merge branch 'branch-22.12' into parquet_reader
ttnghia Oct 20, 2022
5861747
Change function
ttnghia Oct 20, 2022
721c052
Disable debug printing
ttnghia Oct 20, 2022
7cda8c2
Fixed an issue with non-first reads in the chunked reader. Made an a…
nvdbaranec Oct 20, 2022
6bc073d
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Oct 20, 2022
2aef5cc
Fix off-by-one bug
ttnghia Oct 20, 2022
89ca1a6
Merge branch 'branch-22.12' into parquet_reader
ttnghia Oct 20, 2022
5245b9b
Fix an issue related to aliased output pointers in the chunked read c…
nvdbaranec Oct 20, 2022
62283c7
Do not keep reference---copy object instead
ttnghia Oct 21, 2022
44424a2
Optimization: don't do any decoding or page size computation for pag…
nvdbaranec Oct 24, 2022
95df356
Merge branch 'branch-22.12' into chunked_reader_gpu
nvdbaranec Oct 24, 2022
445db9b
Fix build issue for spark-rapids-jni
nvdbaranec Oct 24, 2022
db56908
Cleanup: Remove `chunked_parquet_reader_options` and `chunked_parquet…
ttnghia Oct 24, 2022
ef5eaee
Move `preprocess_file` into `reader_preprocess.cu`
ttnghia Oct 24, 2022
d19260d
Move common implementation into `reader_impl_helpers.*`
ttnghia Oct 24, 2022
6569d62
Cleanup
ttnghia Oct 24, 2022
0a1e2c3
Merge branch 'branch-22.12' into parquet_reader
ttnghia Oct 25, 2022
04e1320
More cleanup
ttnghia Oct 25, 2022
395413d
Rewrite docs for `parquet.hpp` files
ttnghia Oct 25, 2022
e3e19e8
Extract functions for `reader` and `chunked_reader`
ttnghia Oct 25, 2022
52339da
Fix issues with string length computation.
nvdbaranec Oct 25, 2022
f7e8694
Merge branch 'nghia_parquet_reader' into chunked_reader_gpu
nvdbaranec Oct 25, 2022
83fa31a
Remove redundant changes
ttnghia Oct 25, 2022
02ccdec
Add simple structs test
ttnghia Oct 25, 2022
ee0ffad
Rewrite tests
ttnghia Oct 25, 2022
09a89e4
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Oct 25, 2022
034a5b7
Merge branch 'branch-22.12' into parquet_reader
ttnghia Oct 25, 2022
c149d64
Add lists test
ttnghia Oct 25, 2022
9335bb7
MISC
ttnghia Oct 25, 2022
3769fff
Cleanup comments
ttnghia Oct 25, 2022
dc9ef5c
Construct output table metadata just once
ttnghia Oct 25, 2022
0366b7a
Construct `_output_columns` just once
ttnghia Oct 26, 2022
ea2fe9c
Remove `options` member variable
ttnghia Oct 26, 2022
1804056
Make the chunked_read_limit a soft limit - if we can't find a split, …
nvdbaranec Oct 26, 2022
5671c36
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Oct 26, 2022
3d6e13b
Add tests for structs of lists and lists of structs
ttnghia Oct 26, 2022
826c46f
Fixed an issue in split generation code causing indexing off the end …
nvdbaranec Oct 26, 2022
36c1972
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Oct 26, 2022
d32c602
Merge branch 'nghia_parquet_reader' into chunked_reader_gpu
nvdbaranec Oct 26, 2022
88ca034
Just reformat
ttnghia Oct 26, 2022
d76ee06
Change variable names in tests
ttnghia Oct 26, 2022
cf7b786
Merge branch 'nghia_parquet_reader' into chunked_reader_gpu
nvdbaranec Oct 27, 2022
b806323
Optimization: store off global nesting sizes per page so that during…
nvdbaranec Oct 27, 2022
0c2178d
Adding doxygen, refactoring and cleaning up
ttnghia Oct 27, 2022
f759103
Merge branch 'branch-22.12' into parquet_reader
ttnghia Oct 27, 2022
c2bf7f5
Fixed issues with list, and validity size calculations.
nvdbaranec Oct 27, 2022
e7e74c5
More refactoring
ttnghia Oct 27, 2022
ee9edbc
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Oct 27, 2022
6fd3e90
Add more tests
ttnghia Oct 27, 2022
9dda980
Add test with empty data
ttnghia Oct 28, 2022
bb35f9f
Add tests
ttnghia Oct 28, 2022
59166cf
Rewrite null tests
ttnghia Oct 28, 2022
eb6e996
Add more extreme tests
ttnghia Oct 28, 2022
1c56794
Rewrite tests to generate input files just once
ttnghia Oct 28, 2022
4777419
Fix tests with structs of lists
ttnghia Oct 28, 2022
aedc37a
Handle nulls for more complex types
ttnghia Oct 28, 2022
c00eb3c
Fix another nulls handling bug for strings
ttnghia Oct 28, 2022
4d24f88
Simplify the null purging process
ttnghia Oct 28, 2022
b15bb39
Cleanup.
nvdbaranec Oct 31, 2022
7e38a56
Merge branch 'nghia_parquet_reader' into chunked_reader_gpu
nvdbaranec Oct 31, 2022
321815d
Fleshed out list-of-structs and struct-of-lists tests.
nvdbaranec Oct 31, 2022
8cf95e8
Docs and cleanup.
nvdbaranec Oct 31, 2022
af35c4d
Update doxygen
ttnghia Oct 31, 2022
fb1bd73
Cleaning up
ttnghia Nov 2, 2022
842f9ea
Add doxygen
ttnghia Nov 2, 2022
34e3777
Clean up `reader_impl.hpp`
ttnghia Nov 2, 2022
40e463c
More cleanup
ttnghia Nov 2, 2022
7252b0d
Cleanup `allocate_nesting_info`
ttnghia Nov 2, 2022
696182c
Reformat
ttnghia Nov 2, 2022
4c353fd
Further cleanup
ttnghia Nov 2, 2022
31590cb
Rename `compute_chunk_read_info` into `preprocess_pages`
ttnghia Nov 3, 2022
d91c690
Re-adding an optimization that somehow got nuked during a merge.
nvdbaranec Nov 10, 2022
c5e73ce
Optimization: store off global nesting sizes per page so that during…
ttnghia Nov 10, 2022
0b31b95
Merge branch 'branch-22.12' into parquet_reader
ttnghia Nov 10, 2022
c8e34ba
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Nov 10, 2022
450e6d5
Merge branch 'branch-22.12' into parquet_reader
ttnghia Nov 14, 2022
4fca0c0
Fix several warnings that show up in the spark-rapids-jni build.
nvdbaranec Nov 14, 2022
fd9280c
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Nov 14, 2022
cedcc07
Several changes from PR review.
nvdbaranec Nov 15, 2022
16f78b6
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Nov 15, 2022
73503bb
Fix typo
ttnghia Nov 15, 2022
e9905b8
Fix test
ttnghia Nov 15, 2022
4072a80
Address review comments
ttnghia Nov 15, 2022
95a97fb
Small optimization
ttnghia Nov 15, 2022
8390d4b
Implement `column_buffer::empty_like`
ttnghia Nov 15, 2022
b4b131f
Merge branch 'branch-22.12' into parquet_reader
ttnghia Nov 15, 2022
5035bf4
Optimize unit tests: Only call `cudf::concatenate` once
ttnghia Nov 15, 2022
912d86c
Remove redundant check
ttnghia Nov 15, 2022
bb2e26e
Fix `cudaLaunchKernel` error in `DecodePageData`
ttnghia Nov 15, 2022
36c7ec2
Add assertion to make sure not to decode/parse empty page array
ttnghia Nov 15, 2022
7203c67
Address some review comments
ttnghia Nov 16, 2022
520448b
Merge branch 'branch-22.12' into parquet_reader
ttnghia Nov 16, 2022
96eed8e
Fix `#endif`
ttnghia Nov 16, 2022
6697e3b
Merge branch 'branch-22.12' into parquet_reader
ttnghia Nov 16, 2022
70f4fde
PR review changes. Updated some incorrect/incomplete function docs.
nvdbaranec Nov 17, 2022
4547483
Made the logic in the row_total_size functor much more readable.
nvdbaranec Nov 17, 2022
fe7d6d1
Merge branch 'branch-22.12' into parquet_reader
ttnghia Nov 17, 2022
db21bc3
Fix the tests
ttnghia Nov 17, 2022
36043d8
Variable renaming for clarity.
nvdbaranec Nov 17, 2022
83f0703
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Nov 17, 2022
3499bda
Merge branch 'branch-22.12' into parquet_reader
ttnghia Nov 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions cpp/include/cudf/io/detail/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,62 @@ class reader {
table_with_metadata read(parquet_reader_options const& options);
};

/**
* @brief The reader class that supports iterative reading of a given file.
*
* This class intentionally subclasses the `reader` class with private inheritance to hide the
* `reader::read()` API. As such, only chunked reading APIs are supported.
*/
class chunked_reader : private reader {
public:
/**
* @brief Constructor from a read size limit and an array of data sources with reader options.
*
* The typical usage should be similar to this:
* ```
* do {
* auto const chunk = reader.read_chunk();
* // Process chunk
* } while (reader.has_next());
*
* ```
*
* If `chunk_read_limit == 0` (i.e., no reading limit), a call to `read_chunk()` will read the
* whole file and return a table containing all rows.
*
* @param chunk_read_limit Limit on total number of bytes to be returned per read,
* or `0` if there is no limit
* @param sources Input `datasource` objects to read the dataset from
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource to use for device memory allocation
*/
explicit chunked_reader(std::size_t chunk_read_limit,
std::vector<std::unique_ptr<cudf::io::datasource>>&& sources,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Destructor explicitly-declared to avoid inlined in header.
*
* Since the declaration of the internal `_impl` object does not exist in this header, this
* destructor needs to be defined in a separate source file which can access to that object's
* declaration.
*/
~chunked_reader();

/**
* @copydoc cudf::io::chunked_parquet_reader::has_next
*/
[[nodiscard]] bool has_next() const;

/**
* @copydoc cudf::io::chunked_parquet_reader::read_chunk
*/
[[nodiscard]] table_with_metadata read_chunk() const;
Comment on lines +132 to +137
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought these could not be const because of internal state change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this in turn is just calling _impl->read_chunk(). Since _impl is const pointer we can make this const.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if read_chunk should be const but we can leave it for now.

};

/**
* @brief Class to write parquet dataset data into columns.
*/
Expand Down
68 changes: 68 additions & 0 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,74 @@ table_with_metadata read_parquet(
parquet_reader_options const& options,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief The chunked parquet reader class to read Parquet file iteratively in to a series of
* tables, chunk by chunk.
*
* This class is designed to address the reading issue when reading very large Parquet files such
* that the sizes of their column exceed the limit that can be stored in cudf column. By reading the
* file content by chunks using this class, each chunk is guaranteed to have its sizes stay within
* the given limit.
*/
class chunked_parquet_reader {
public:
/**
* @brief Default constructor, this should never be used.
*
* This is added just to satisfy cython.
*/
chunked_parquet_reader() = default;
vuule marked this conversation as resolved.
Show resolved Hide resolved

/**
* @brief Constructor for chunked reader.
*
* This constructor requires the same `parquet_reader_option` parameter as in
* `cudf::read_parquet()`, and an additional parameter to specify the size byte limit of the
* output table for each reading.
*
* @param chunk_read_limit Limit on total number of bytes to be returned per read,
* or `0` if there is no limit
* @param options The options used to read Parquet file
* @param mr Device memory resource to use for device memory allocation
*/
chunked_parquet_reader(
std::size_t chunk_read_limit,
parquet_reader_options const& options,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Destructor, destroying the internal reader instance.
*
* Since the declaration of the internal `reader` object does not exist in this header, this
* destructor needs to be defined in a separate source file which can access to that object's
* declaration.
*/
~chunked_parquet_reader();

/**
* @brief Check if there is any data in the given file has not yet read.
*
* @return A boolean value indicating if there is any data left to read
*/
[[nodiscard]] bool has_next() const;

/**
* @brief Read a chunk of rows in the given Parquet file.
*
* The sequence of returned tables, if concatenated by their order, guarantees to form a complete
* dataset as reading the entire given file at once.
*
* An empty table will be returned if the given file is empty, or all the data in the file has
* been read and returned by the previous calls.
*
* @return An output `cudf::table` along with its metadata
*/
[[nodiscard]] table_with_metadata read_chunk() const;

private:
std::unique_ptr<cudf::io::detail::parquet::chunked_reader> reader;
};

/** @} */ // end of group
/**
* @addtogroup io_writers
Expand Down
39 changes: 39 additions & 0 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,45 @@ std::unique_ptr<std::vector<uint8_t>> write_parquet(parquet_writer_options const
return writer->close(options.get_column_chunks_file_paths());
}

/**
* @copydoc cudf::io::chunked_parquet_reader::chunked_parquet_reader
*/
chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit,
parquet_reader_options const& options,
rmm::mr::device_memory_resource* mr)
: reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit,
make_datasources(options.get_source()),
options,
cudf::get_default_stream(),
mr)}
{
}

/**
* @copydoc cudf::io::chunked_parquet_reader::~chunked_parquet_reader
*/
chunked_parquet_reader::~chunked_parquet_reader() = default;

/**
* @copydoc cudf::io::chunked_parquet_reader::has_next
*/
bool chunked_parquet_reader::has_next() const
{
CUDF_FUNC_RANGE();
CUDF_EXPECTS(reader != nullptr, "Reader has not been constructed properly.");
return reader->has_next();
}

/**
* @copydoc cudf::io::chunked_parquet_reader::read_chunk
*/
table_with_metadata chunked_parquet_reader::read_chunk() const
{
CUDF_FUNC_RANGE();
CUDF_EXPECTS(reader != nullptr, "Reader has not been constructed properly.");
return reader->read_chunk();
}

/**
* @copydoc cudf::io::parquet_chunked_writer::parquet_chunked_writer
*/
Expand Down
Loading