Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add partitioning support in parquet writer #9810

Merged
merged 38 commits into from
Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
7ca6570
First working version of partitioned write
devavret Oct 22, 2021
80e03a4
Merge branch 'branch-22.02' into parq-partitioned-write
devavret Nov 22, 2021
21dc54b
multiple sink API
devavret Nov 23, 2021
d947abd
partitions in write parquet API
devavret Nov 23, 2021
360bf87
Fix a bug in frag causing incorrect num rows
devavret Nov 24, 2021
942dd58
Merge branch 'branch-22.02' into parq-partitioned-write
devavret Nov 24, 2021
d454507
Dict encoding changes. Dict kernels now use frags
devavret Nov 25, 2021
b2b44a6
API cleanups
devavret Nov 25, 2021
0b6d33f
Add a gtest and fix other tests by handling no partition case
devavret Nov 26, 2021
2beed73
Add a guard to protect from an exception being thrown in impl dtor wh…
devavret Nov 26, 2021
4e21e99
Add per-sink user_data in table_input_metadata
devavret Nov 29, 2021
e0d1f33
Cleanups in dict code and replace index translating while LIST loop w…
devavret Nov 30, 2021
54de724
fix the returned metadata blob on close
devavret Dec 1, 2021
aa45827
Revert to using meta ctor without user_data in pyx
devavret Dec 1, 2021
06b2643
Remove num_rows param and docs cleanup
devavret Dec 1, 2021
fffb41e
orc use table meta ctor with single user_data
devavret Dec 1, 2021
ecd3aa5
Small size_type cleanups
devavret Dec 1, 2021
950f505
Misc cleanups
devavret Dec 2, 2021
019ac25
Merge branch 'branch-22.02' into parq-partitioned-write
devavret Dec 7, 2021
1d55d1a
API changes
devavret Dec 7, 2021
387c2ac
Take user_data out of table_input_metadata
devavret Dec 8, 2021
9a77f5e
python changes for moving user_data
devavret Dec 8, 2021
dc157e1
Add checks for sizes of options in case of multiple sinks
devavret Dec 8, 2021
79639ee
Merge branch 'branch-22.02' into parq-partitioned-write
devavret Dec 8, 2021
8c2927d
bug in tests in init list for kv meta
devavret Dec 8, 2021
200d1b0
Prevent setting chunk paths if not specified
devavret Dec 9, 2021
d0de9a9
Make returned metadata blob optional
devavret Dec 9, 2021
d90245f
Make sink info members private
devavret Dec 9, 2021
5a17a4c
Revert "Make returned metadata blob optional"
devavret Dec 9, 2021
2e1c359
make source data members private
devavret Dec 10, 2021
f44a50b
Refactor aggregate_metadata
devavret Dec 10, 2021
be8c19a
revert tests that were changed for debugging
devavret Dec 10, 2021
b9b5c15
Add empty df tests, make review changes
devavret Dec 10, 2021
1e79453
Review changes: reduce line size by aliasing the variable I keep using
devavret Dec 13, 2021
be83945
source/sink_info memeber privatisation
devavret Dec 13, 2021
e537314
aggregate metadata privatisation
devavret Dec 13, 2021
91580b4
Merge branch 'branch-22.02' into parq-partitioned-write
devavret Dec 13, 2021
c53fea7
Fix a private member access
devavret Dec 13, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cpp/include/cudf/io/data_sink.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,22 @@ class data_sink {
*/
static std::unique_ptr<data_sink> create(cudf::io::data_sink* const user_sink);

/**
* @brief Creates a vector of data sinks, one per element in the input vector.
*
* @param[in] args vector of parameters
*/
template <typename T>
static std::vector<std::unique_ptr<data_sink>> create(std::vector<T> const& args)
{
std::vector<std::unique_ptr<data_sink>> sinks;
sinks.reserve(args.size());
std::transform(args.cbegin(), args.cend(), std::back_inserter(sinks), [](auto const& arg) {
return data_sink::create(arg);
});
return sinks;
}

/**
* @brief Base class destructor
*/
Expand Down
15 changes: 9 additions & 6 deletions cpp/include/cudf/io/detail/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,13 @@ class writer {
/**
* @brief Constructor for output to a file.
*
* @param sink The data sink to write the data to
* @param sinks The data sinks to write the data to
* @param options Settings for controlling writing behavior
* @param mode Option to write at once or in chunks
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
explicit writer(std::unique_ptr<cudf::io::data_sink> sink,
explicit writer(std::vector<std::unique_ptr<data_sink>> sinks,
parquet_writer_options const& options,
SingleWriteMode mode,
rmm::cuda_stream_view stream,
Expand All @@ -104,15 +104,15 @@ class writer {
/**
* @brief Constructor for writer to handle chunked parquet options.
*
* @param sink The data sink to write the data to
* @param sinks The data sinks to write the data to
* @param options Settings for controlling writing behavior for chunked writer
* @param mode Option to write at once or in chunks
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*
* @return A parquet-compatible blob that contains the data for all rowgroups in the list
*/
explicit writer(std::unique_ptr<cudf::io::data_sink> sink,
explicit writer(std::vector<std::unique_ptr<data_sink>> sinks,
chunked_parquet_writer_options const& options,
SingleWriteMode mode,
rmm::cuda_stream_view stream,
Expand All @@ -127,8 +127,10 @@ class writer {
* @brief Writes a single subtable as part of a larger parquet file/table write.
*
* @param[in] table The table information to be written
* @param[in] partitions Optional partitions to divide the table into. If specified, must be same
* size as number of sinks.
devavret marked this conversation as resolved.
Show resolved Hide resolved
*/
void write(table_view const& table);
void write(table_view const& table, std::vector<partition_info> const& partitions = {});

/**
* @brief Finishes the chunked/streamed write process.
Expand All @@ -138,7 +140,8 @@ class writer {
* @return A parquet-compatible blob that contains the data for all rowgroups in the list only if
* `column_chunks_file_path` is provided, else null.
*/
std::unique_ptr<std::vector<uint8_t>> close(std::string const& column_chunks_file_path = "");
std::unique_ptr<std::vector<uint8_t>> close(
std::vector<std::string> const& column_chunks_file_path = {});

/**
* @brief Merges multiple metadata blobs returned by write_all into a single metadata blob
Expand Down
59 changes: 59 additions & 0 deletions cpp/include/cudf/io/orc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,8 @@ class orc_writer_options {
table_view _table;
// Optional associated metadata
const table_input_metadata* _metadata = nullptr;
// Optional footer key_value_metadata
std::map<std::string, std::string> _user_data;

friend orc_writer_options_builder;

Expand Down Expand Up @@ -530,6 +532,11 @@ class orc_writer_options {
*/
table_input_metadata const* get_metadata() const { return _metadata; }

/**
* @brief Returns Key-Value footer metadata information.
*/
std::map<std::string, std::string> const& get_key_value_metadata() const { return _user_data; }

// Setters

/**
Expand Down Expand Up @@ -591,6 +598,16 @@ class orc_writer_options {
* @param meta Associated metadata.
*/
void set_metadata(table_input_metadata const* meta) { _metadata = meta; }

/**
* @brief Sets metadata.
*
* @param metadata Key-Value footer metadata
*/
void set_key_value_metadata(std::map<std::string, std::string> metadata)
{
_user_data = std::move(metadata);
}
};

class orc_writer_options_builder {
Expand Down Expand Up @@ -698,6 +715,18 @@ class orc_writer_options_builder {
return *this;
}

/**
* @brief Sets Key-Value footer metadata.
*
* @param metadata Key-Value footer metadata
* @return this for chaining.
*/
orc_writer_options_builder& key_value_metadata(std::map<std::string, std::string> metadata)
{
options._user_data = std::move(metadata);
return *this;
}

/**
* @brief move orc_writer_options member once it's built.
*/
Expand Down Expand Up @@ -753,6 +782,8 @@ class chunked_orc_writer_options {
size_type _row_index_stride = default_row_index_stride;
// Optional associated metadata
const table_input_metadata* _metadata = nullptr;
// Optional footer key_value_metadata
std::map<std::string, std::string> _user_data;

friend chunked_orc_writer_options_builder;

Expand Down Expand Up @@ -819,6 +850,11 @@ class chunked_orc_writer_options {
*/
table_input_metadata const* get_metadata() const { return _metadata; }

/**
* @brief Returns Key-Value footer metadata information.
*/
std::map<std::string, std::string> const& get_key_value_metadata() const { return _user_data; }

// Setters

/**
Expand Down Expand Up @@ -873,6 +909,16 @@ class chunked_orc_writer_options {
* @param meta Associated metadata.
*/
void metadata(table_input_metadata const* meta) { _metadata = meta; }

/**
* @brief Sets Key-Value footer metadata.
*
* @param metadata Key-Value footer metadata
*/
void set_key_value_metadata(std::map<std::string, std::string> metadata)
{
_user_data = std::move(metadata);
}
};

class chunked_orc_writer_options_builder {
Expand Down Expand Up @@ -965,6 +1011,19 @@ class chunked_orc_writer_options_builder {
return *this;
}

/**
* @brief Sets Key-Value footer metadata.
*
* @param metadata Key-Value footer metadata
* @return this for chaining.
*/
chunked_orc_writer_options_builder& key_value_metadata(
std::map<std::string, std::string> metadata)
{
options._user_data = std::move(metadata);
return *this;
}

/**
* @brief move chunked_orc_writer_options member once it's built.
*/
Expand Down
Loading