Skip to content

Commit

Permalink
Implement chunked Parquet reader (#11867)
Browse files Browse the repository at this point in the history
This adds chunked Parquet reader, which can perform chunked reading for accessing files by an iterative manner. Instead of reading the input file all at once, we can read it chunk by chunk, each chunk can be limited to be small enough to not exceed the cudf internal limit (2GB/2 billions rows):
```
auto reader = cudf::io::chunked_parquet_reader(byte_limit, read_opts);
do {
    auto const chunk = reader.read_chunk();
    // Process chunk
} while (reader.has_next());
```

Authors:
  - Nghia Truong (https://github.com/ttnghia)
  - https://github.com/nvdbaranec

Approvers:
  - Yunsong Wang (https://github.com/PointKernel)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #11867
  • Loading branch information
ttnghia authored Nov 18, 2022
1 parent db0d045 commit 3fb09d1
Show file tree
Hide file tree
Showing 14 changed files with 2,320 additions and 351 deletions.
56 changes: 56 additions & 0 deletions cpp/include/cudf/io/detail/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,62 @@ class reader {
table_with_metadata read(parquet_reader_options const& options);
};

/**
* @brief The reader class that supports iterative reading of a given file.
*
* This class intentionally subclasses the `reader` class with private inheritance to hide the
* `reader::read()` API. As such, only chunked reading APIs are supported.
*/
class chunked_reader : private reader {
public:
/**
* @brief Constructor from a read size limit and an array of data sources with reader options.
*
* The typical usage should be similar to this:
* ```
* do {
* auto const chunk = reader.read_chunk();
* // Process chunk
* } while (reader.has_next());
*
* ```
*
* If `chunk_read_limit == 0` (i.e., no reading limit), a call to `read_chunk()` will read the
* whole file and return a table containing all rows.
*
* @param chunk_read_limit Limit on total number of bytes to be returned per read,
* or `0` if there is no limit
* @param sources Input `datasource` objects to read the dataset from
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource to use for device memory allocation
*/
explicit chunked_reader(std::size_t chunk_read_limit,
std::vector<std::unique_ptr<cudf::io::datasource>>&& sources,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Destructor explicitly-declared to avoid inlined in header.
*
* Since the declaration of the internal `_impl` object does not exist in this header, this
* destructor needs to be defined in a separate source file which can access to that object's
* declaration.
*/
~chunked_reader();

/**
* @copydoc cudf::io::chunked_parquet_reader::has_next
*/
[[nodiscard]] bool has_next() const;

/**
* @copydoc cudf::io::chunked_parquet_reader::read_chunk
*/
[[nodiscard]] table_with_metadata read_chunk() const;
};

/**
* @brief Class to write parquet dataset data into columns.
*/
Expand Down
68 changes: 68 additions & 0 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,74 @@ table_with_metadata read_parquet(
parquet_reader_options const& options,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief The chunked parquet reader class to read Parquet file iteratively in to a series of
* tables, chunk by chunk.
*
* This class is designed to address the reading issue when reading very large Parquet files such
* that the sizes of their column exceed the limit that can be stored in cudf column. By reading the
* file content by chunks using this class, each chunk is guaranteed to have its sizes stay within
* the given limit.
*/
class chunked_parquet_reader {
public:
/**
* @brief Default constructor, this should never be used.
*
* This is added just to satisfy cython.
*/
chunked_parquet_reader() = default;

/**
* @brief Constructor for chunked reader.
*
* This constructor requires the same `parquet_reader_option` parameter as in
* `cudf::read_parquet()`, and an additional parameter to specify the size byte limit of the
* output table for each reading.
*
* @param chunk_read_limit Limit on total number of bytes to be returned per read,
* or `0` if there is no limit
* @param options The options used to read Parquet file
* @param mr Device memory resource to use for device memory allocation
*/
chunked_parquet_reader(
std::size_t chunk_read_limit,
parquet_reader_options const& options,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Destructor, destroying the internal reader instance.
*
* Since the declaration of the internal `reader` object does not exist in this header, this
* destructor needs to be defined in a separate source file which can access to that object's
* declaration.
*/
~chunked_parquet_reader();

/**
* @brief Check if there is any data in the given file has not yet read.
*
* @return A boolean value indicating if there is any data left to read
*/
[[nodiscard]] bool has_next() const;

/**
* @brief Read a chunk of rows in the given Parquet file.
*
* The sequence of returned tables, if concatenated by their order, guarantees to form a complete
* dataset as reading the entire given file at once.
*
* An empty table will be returned if the given file is empty, or all the data in the file has
* been read and returned by the previous calls.
*
* @return An output `cudf::table` along with its metadata
*/
[[nodiscard]] table_with_metadata read_chunk() const;

private:
std::unique_ptr<cudf::io::detail::parquet::chunked_reader> reader;
};

/** @} */ // end of group
/**
* @addtogroup io_writers
Expand Down
39 changes: 39 additions & 0 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,45 @@ std::unique_ptr<std::vector<uint8_t>> write_parquet(parquet_writer_options const
return writer->close(options.get_column_chunks_file_paths());
}

/**
* @copydoc cudf::io::chunked_parquet_reader::chunked_parquet_reader
*/
chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit,
parquet_reader_options const& options,
rmm::mr::device_memory_resource* mr)
: reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit,
make_datasources(options.get_source()),
options,
cudf::get_default_stream(),
mr)}
{
}

/**
* @copydoc cudf::io::chunked_parquet_reader::~chunked_parquet_reader
*/
chunked_parquet_reader::~chunked_parquet_reader() = default;

/**
* @copydoc cudf::io::chunked_parquet_reader::has_next
*/
bool chunked_parquet_reader::has_next() const
{
CUDF_FUNC_RANGE();
CUDF_EXPECTS(reader != nullptr, "Reader has not been constructed properly.");
return reader->has_next();
}

/**
* @copydoc cudf::io::chunked_parquet_reader::read_chunk
*/
table_with_metadata chunked_parquet_reader::read_chunk() const
{
CUDF_FUNC_RANGE();
CUDF_EXPECTS(reader != nullptr, "Reader has not been constructed properly.");
return reader->read_chunk();
}

/**
* @copydoc cudf::io::parquet_chunked_writer::parquet_chunked_writer
*/
Expand Down
Loading

0 comments on commit 3fb09d1

Please sign in to comment.