[WIP] Adaptive fragment sizes in Parquet writer #12627

Closed
wants to merge 10 commits
38 changes: 37 additions & 1 deletion cpp/src/io/parquet/writer_impl.cu
@@ -31,13 +31,15 @@
#include <io/utilities/config_utils.hpp>

#include <cudf/column/column_device_view.cuh>
#include <cudf/detail/get_value.cuh>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/utilities/linked_column.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/lists/detail/dremel.hpp>
#include <cudf/lists/lists_column_view.hpp>
#include <cudf/null_mask.hpp>
#include <cudf/strings/strings_column_view.hpp>
#include <cudf/structs/structs_column_view.hpp>
#include <cudf/table/table_device_view.cuh>

#include <rmm/cuda_stream_view.hpp>
@@ -85,6 +87,28 @@ parquet::Compression to_parquet_compression(compression_type compression)
}
}

// Computes the total size in bytes of the data in a column, recursing into
// child columns for nested types.
size_type column_size(column_view const& column, rmm::cuda_stream_view stream)
{
  if (column.num_children() == 0) { return size_of(column.type()) * column.size(); }

  if (column.type().id() == type_id::STRING) {
    auto scol = strings_column_view(column);
    size_type colsize =
      cudf::detail::get_value<size_type>(scol.offsets(), column.size(), stream);

[Review thread]
Contributor: What if there are sliced rows at the start of a column? I think we would need to subtract the first offset from this.
Contributor Author: I'm frankly amazed this code worked at all 😉 Yes, I'll subtract offsets[0]. (A hypothetical sketch of this fix follows the function below.)

    return colsize;
  } else if (column.type().id() == type_id::STRUCT) {
    auto scol     = structs_column_view(column);
    size_type ret = 0;
    for (int i = 0; i < scol.num_children(); i++) {
      ret += column_size(scol.get_sliced_child(i), stream);
    }
    return ret;
  } else if (column.type().id() == type_id::LIST) {
    auto lcol = lists_column_view(column);
    return column_size(lcol.get_sliced_child(stream), stream);
  }

  return 0;
}
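
For illustration, here is a minimal sketch of the slice-aware STRING branch agreed to in the review thread above. This is hypothetical code, not part of the PR; it assumes the usual libcudf pattern of indexing the offsets child by the column's own offset via cudf::detail::get_value.

// Hypothetical slice-aware version of the STRING branch: subtract the first
// offset so a sliced column's leading rows are not over-counted.
if (column.type().id() == type_id::STRING) {
  auto scol        = strings_column_view(column);
  auto const last  = cudf::detail::get_value<size_type>(
    scol.offsets(), column.offset() + column.size(), stream);
  auto const first =
    cudf::detail::get_value<size_type>(scol.offsets(), column.offset(), stream);
  return last - first;
}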

} // namespace

struct aggregate_writer_metadata {
Expand Down Expand Up @@ -1412,10 +1436,22 @@ void writer::impl::write(table_view const& table, std::vector<partition_info> co
  // iteratively reduce this value if the largest fragment exceeds the max page size limit (we
  // ideally want the page size to be below 1MB so as to have enough pages to get good
  // compression/decompression performance).
  // If using the default fragment size, adapt fragment size so that page size guarantees are met.
  if (max_page_fragment_size_ == cudf::io::default_max_page_fragment_size) {
    // Scale the default fragment size linearly with the requested max page size.
    max_page_fragment_size_ = (cudf::io::default_max_page_fragment_size * max_page_size_bytes) /
                              cudf::io::default_max_page_size_bytes;

    if (table.num_rows() > 0) {
      std::for_each(
        table.begin(), table.end(), [this, num_rows = table.num_rows()](auto const& column) {
          // Estimate the average row width of this column in bytes.
          auto const avg_len = column_size(column, stream) / num_rows;

          if (avg_len > 0) {
            size_type frag_size = max_page_size_bytes / avg_len;

[Review thread]
Contributor: Is this too large? IIUC, max_page_size_bytes / avg_len is the average number of rows in each page. That means that any deviation in size between rows would cause us to overshoot the max page size.
Contributor Author: Yes, the average will tend to overshoot. But in the deeply nested cases we overshoot anyway, now just by much less. 😅 I could try max row length perhaps, but that will be trickier to calculate than total size for the nested case.
Contributor: I tried to suggest a better option, but anything small enough to get really precise page sizes in the largest column would significantly degrade performance for other columns. The current implementation is actually a good compromise.

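            // Worked example (illustrative, not from the PR), assuming the
            // 512 KiB default page size and the 5000-row default fragment size:
            //   avg_len =    8 bytes -> 524288 / 8    = 65536 rows; the 5000-row
            //                           default is already smaller, so it stays
            //   avg_len = 1024 bytes -> 524288 / 1024 =   512 rows; the fragment
            //                           shrinks to honor the page size limit
            // The std::min below therefore tracks the widest column in the table.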

            max_page_fragment_size_ = std::min(frag_size, max_page_fragment_size_);

[Review thread]
Contributor: I see, so we use the fragment size based on the largest column. Do you expect (perf) issues when we have a single large column and many small columns? The benchmarks show the best case scenario, where each table has columns of similar size.

When we talked about dynamic fragment size, I envisioned a per-column fragment size. That seems more optimal than the static size in all cases. I'm trying to figure out whether we can claim that this PR is also always better than the (current) static option.
Contributor Author: Yes, this will probably be bad in the single-large-column/many-fixed-length-columns case. I'm interested to see what this does with @jbrennan333's user data, which seems more mixed than the test data he generated.

I really see this as a POC to demonstrate the value of changing up the fragment size. I agree having a per-column fragment size would be best, but that's a heavier lift too. Maybe with these numbers it can be justified. (A sketch of the per-column idea follows below.)

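Purely as an illustration of the per-column idea floated in this thread, here is a rough sketch; it is hypothetical (the PR implements a single global fragment size), and the names frag_sizes and rows_per_page are invented for this example. It reuses column_size, table, stream, and max_page_size_bytes from the surrounding write() context.

// Hypothetical per-column variant: derive one fragment size per column from
// its average row width, instead of shrinking a single global value.
std::vector<size_type> frag_sizes(table.num_columns(),
                                  cudf::io::default_max_page_fragment_size);
std::transform(
  table.begin(), table.end(), frag_sizes.begin(), [&](auto const& column) {
    auto const avg_len = column_size(column, stream) / table.num_rows();
    if (avg_len == 0) { return cudf::io::default_max_page_fragment_size; }
    // Rows that fit in one page for this column, capped at the default.
    auto const rows_per_page = static_cast<size_type>(max_page_size_bytes / avg_len);
    return std::min(rows_per_page, cudf::io::default_max_page_fragment_size);
  });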

[Review thread]
Contributor: Have you run the "parquet_write_io_compression" group of benchmarks? That one has an even mix of all supported types.
Contributor Author: I have, but my workstation is shut down now. I can post tomorrow, but IIRC there was not a big difference, with this code being maybe a percent or two faster in most cases. I actually want to run that benchmark with ZSTD too, since that has issues with the run length=32 cases.

[Review thread]
Contributor: I will try this with the customer data. Have you already verified it with the test data I provided in #12613?
Contributor: Here is the parquet-tools inspect output for the gpu file:

############ file meta data ############
created_by:
num_columns: 7
num_rows: 17105
num_row_groups: 1
format_version: 1.0
serialized_size: 5922


############ Columns ############
format
hash
data
id
part
offset
relayTs

############ Column(format) ############
name: format
path: format
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: None
converted_type (legacy): NONE
compression: ZSTD (space_saved: 100%)

############ Column(hash) ############
name: hash
path: hash
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: None
converted_type (legacy): NONE
compression: ZSTD (space_saved: 47%)

############ Column(data) ############
name: data
path: data
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: None
converted_type (legacy): NONE
compression: ZSTD (space_saved: 85%)

############ Column(id) ############
name: id
path: origin.id
max_definition_level: 2
max_repetition_level: 0
physical_type: INT32
logical_type: None
converted_type (legacy): NONE
compression: UNCOMPRESSED (space_saved: 0%)

############ Column(part) ############
name: part
path: origin.part
max_definition_level: 2
max_repetition_level: 0
physical_type: INT32
logical_type: None
converted_type (legacy): NONE
compression: UNCOMPRESSED (space_saved: 0%)

############ Column(offset) ############
name: offset
path: origin.offset
max_definition_level: 2
max_repetition_level: 0
physical_type: INT64
logical_type: None
converted_type (legacy): NONE
compression: ZSTD (space_saved: 89%)

############ Column(relayTs) ############
name: relayTs
path: relayTs
max_definition_level: 1
max_repetition_level: 0
physical_type: INT64
logical_type: None
converted_type (legacy): NONE
compression: ZSTD (space_saved: 54%)

Contributor Author: Hmm, that's somewhat disappointing. Can you also run parquet-tools dump -d -n on the gpu and cpu files?

Contributor: I don't have those options in my version of parquet-tools. Here is the washed inspect --detail output for each:
cust-inspect-detail-cpu.txt
cust-inspect-detail-gpu.txt

Contributor Author: Sorry, I didn't realize parquet-tools was an overloaded name. I was referring to the (now seemingly deprecated) jar that comes with parquet-mr. Thanks for the extra details... combing through them now.

Contributor Author: @jbrennan333 So the compression is less good for the 'data' column ("only" 85% vs 92%). That may be down to there being fewer pages, given the 1MB default page size for parquet-mr vs 512KB for libcudf. But it seems the fragment sizes are allowing ZSTD to compress things, so that's good news.

          }
        });
    }
  }

  std::vector<int> num_frag_in_part;