Fix V2 Parquet page alignment for use with zStandard compression #14841

Merged (6 commits) on Jan 30, 2024
129 changes: 82 additions & 47 deletions cpp/src/io/parquet/page_enc.cu
@@ -388,6 +388,27 @@ __device__ uint8_t const* delta_encode(page_enc_state_s<0>* s, uint64_t* buffer,
return packer.flush();
}

/**
* @brief Sets `s->cur` to point to the start of encoded page data.
*
* For V1 headers, this will be immediately after the repetition and definition level data. For V2,
* it will be at the next properly aligned location after the level data. The padding in V2 is
* needed for compressors that require aligned input.
*/
template <typename state_type>
inline void __device__ set_page_data_start(state_type* s)
{
s->cur = s->page.page_data + s->page.max_hdr_size;
switch (s->page.page_type) {
case PageType::DATA_PAGE:
s->cur += s->page.level_bytes();
if (s->col.num_def_level_bits() != 0) { s->cur += RLE_LENGTH_FIELD_LEN; }
if (s->col.num_rep_level_bits() != 0) { s->cur += RLE_LENGTH_FIELD_LEN; }
break;
case PageType::DATA_PAGE_V2: s->cur += s->page.max_lvl_size; break;
}
}

} // anonymous namespace

// blockDim {512,1,1}
@@ -594,8 +615,13 @@ CUDF_KERNEL void __launch_bounds__(128)
page_g.chunk = &chunks[blockIdx.y][blockIdx.x];
page_g.chunk_id = blockIdx.y * num_columns + blockIdx.x;
page_g.hdr_size = 0;
page_g.def_lvl_bytes = 0;
page_g.rep_lvl_bytes = 0;
page_g.max_lvl_size = 0;
page_g.comp_data_size = 0;
page_g.max_hdr_size = MAX_V1_HDR_SIZE;
page_g.max_data_size = ck_g.uniq_data_size;
page_g.data_size = ck_g.uniq_data_size;
page_g.start_row = cur_row;
page_g.num_rows = ck_g.num_dict_entries;
page_g.num_leaf_values = ck_g.num_dict_entries;
@@ -689,12 +715,17 @@ CUDF_KERNEL void __launch_bounds__(128)
page_size = 1 + max_RLE_page_size(ck_g.dict_rle_bits, values_in_page);
}
if (!t) {
page_g.num_fragments = fragments_in_chunk - page_start;
page_g.chunk = &chunks[blockIdx.y][blockIdx.x];
page_g.chunk_id = blockIdx.y * num_columns + blockIdx.x;
page_g.page_type = data_page_type;
page_g.hdr_size = 0;
page_g.max_hdr_size = max_data_page_hdr_size; // Max size excluding statistics
page_g.num_fragments = fragments_in_chunk - page_start;
page_g.chunk = &chunks[blockIdx.y][blockIdx.x];
page_g.chunk_id = blockIdx.y * num_columns + blockIdx.x;
page_g.page_type = data_page_type;
page_g.hdr_size = 0;
page_g.def_lvl_bytes = 0;
page_g.rep_lvl_bytes = 0;
page_g.max_lvl_size = 0;
page_g.data_size = 0;
page_g.comp_data_size = 0;
page_g.max_hdr_size = max_data_page_hdr_size; // Max size excluding statistics
if (ck_g.stats) {
uint32_t stats_hdr_len = 16;
if (col_g.stats_dtype == dtype_string || col_g.stats_dtype == dtype_byte_array) {
@@ -716,13 +747,19 @@
page_g.num_valid = num_valid;
auto const def_level_size = max_RLE_page_size(col_g.num_def_level_bits(), values_in_page);
auto const rep_level_size = max_RLE_page_size(col_g.num_rep_level_bits(), values_in_page);
if (write_v2_headers) {
page_g.max_lvl_size =
util::round_up_unsafe(def_level_size + rep_level_size, page_align);
}
// get a different bound if using delta encoding
if (is_use_delta) {
auto const delta_len =
delta_data_len(physical_type, type_id, page_g.num_leaf_values, page_size);
page_size = max(page_size, delta_len);
}
auto const max_data_size = page_size + def_level_size + rep_level_size + rle_pad;
auto const max_data_size =
page_size + rle_pad +
(write_v2_headers ? page_g.max_lvl_size : def_level_size + rep_level_size);
// page size must fit in 32-bit signed integer
if (max_data_size > std::numeric_limits<int32_t>::max()) {
CUDF_UNREACHABLE("page size exceeds maximum for i32");
Expand All @@ -739,7 +776,9 @@ CUDF_KERNEL void __launch_bounds__(128)
page_offset +=
util::round_up_unsafe(page_g.max_hdr_size + page_g.max_data_size, page_align);
if (not comp_page_sizes.empty()) {
comp_page_offset += page_g.max_hdr_size + comp_page_sizes[ck_g.first_page + num_pages];
// V2 does not include level data in compressed size estimate
comp_page_offset += page_g.max_hdr_size + page_g.max_lvl_size +
comp_page_sizes[ck_g.first_page + num_pages];
}
page_headers_size += page_g.max_hdr_size;
max_page_data_size = max(max_page_data_size, page_g.max_data_size);
Expand Down Expand Up @@ -774,8 +813,10 @@ CUDF_KERNEL void __launch_bounds__(128)
}
pages[ck_g.first_page + num_pages] = page_g;
}
// page_sizes should be the number of bytes to be compressed, so don't include level
// data for V2.
if (not page_sizes.empty()) {
page_sizes[ck_g.first_page + num_pages] = page_g.max_data_size;
page_sizes[ck_g.first_page + num_pages] = page_g.max_data_size - page_g.max_lvl_size;
}
if (page_grstats) { page_grstats[ck_g.first_page + num_pages] = pagestats_g; }
}
@@ -1429,10 +1470,6 @@ __device__ void finish_page_encode(state_buf* s,
return thrust::reduce(thrust::seq, hist_start, hist_end, 0U);
};

// V2 does not compress rep and def level data
size_t const skip_comp_size =
write_v2_headers ? s->page.def_lvl_bytes + s->page.rep_lvl_bytes : 0;

// this will be true if max_rep > 0 (i.e. there are lists)
if (s->page.rep_histogram != nullptr) {
// for repetition we get hist[0] from num_rows, and can derive hist[max_rep_level]
@@ -1489,10 +1526,17 @@
// FIXME(ets): this needs to do error propagation back to the host
CUDF_UNREACHABLE("detected possible page data corruption");
}
s->page.max_data_size = actual_data_size;
if (s->page.is_v2()) {
auto const d_base = base + s->page.max_lvl_size;
s->page.data_size = static_cast<uint32_t>(end_ptr - d_base) + s->page.level_bytes();
} else {
s->page.data_size = actual_data_size;
}
if (not comp_in.empty()) {
comp_in[blockIdx.x] = {base + skip_comp_size, actual_data_size - skip_comp_size};
comp_out[blockIdx.x] = {s->page.compressed_data + s->page.max_hdr_size + skip_comp_size,
auto const c_base = base + s->page.max_lvl_size;
auto const bytes_to_compress = static_cast<uint32_t>(end_ptr - c_base);
comp_in[blockIdx.x] = {c_base, bytes_to_compress};
comp_out[blockIdx.x] = {s->page.compressed_data + s->page.max_hdr_size + s->page.max_lvl_size,
0}; // size is unused
}
pages[blockIdx.x] = s->page;
@@ -1503,10 +1547,10 @@ __device__ void finish_page_encode(state_buf* s,
}

// copy uncompressed bytes over
if (skip_comp_size != 0 && not comp_in.empty()) {
if (s->page.is_v2() and not comp_in.empty()) {
uint8_t* const src = s->page.page_data + s->page.max_hdr_size;
uint8_t* const dst = s->page.compressed_data + s->page.max_hdr_size;
for (int i = t; i < skip_comp_size; i += block_size) {
for (int i = t; i < s->page.level_bytes(); i += block_size) {
dst[i] = src[i];
}
}
@@ -1536,13 +1580,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
s->col = *s->ck.col_desc;
s->rle_len_pos = nullptr;
// get s->cur back to where it was at the end of encoding the rep and def level data
s->cur =
s->page.page_data + s->page.max_hdr_size + s->page.def_lvl_bytes + s->page.rep_lvl_bytes;
// if V1 data page, need space for the RLE length fields
if (s->page.page_type == PageType::DATA_PAGE) {
if (s->col.num_def_level_bits() != 0) { s->cur += RLE_LENGTH_FIELD_LEN; }
if (s->col.num_rep_level_bits() != 0) { s->cur += RLE_LENGTH_FIELD_LEN; }
}
set_page_data_start(s);
}
__syncthreads();

@@ -1771,13 +1809,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
s->col = *s->ck.col_desc;
s->rle_len_pos = nullptr;
// get s->cur back to where it was at the end of encoding the rep and def level data
s->cur =
s->page.page_data + s->page.max_hdr_size + s->page.def_lvl_bytes + s->page.rep_lvl_bytes;
// if V1 data page, need space for the RLE length fields
if (s->page.page_type == PageType::DATA_PAGE) {
if (s->col.num_def_level_bits() != 0) { s->cur += RLE_LENGTH_FIELD_LEN; }
if (s->col.num_rep_level_bits() != 0) { s->cur += RLE_LENGTH_FIELD_LEN; }
}
set_page_data_start(s);
}
__syncthreads();

@@ -1908,8 +1940,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
s->col = *s->ck.col_desc;
s->rle_len_pos = nullptr;
// get s->cur back to where it was at the end of encoding the rep and def level data
s->cur =
s->page.page_data + s->page.max_hdr_size + s->page.def_lvl_bytes + s->page.rep_lvl_bytes;
set_page_data_start(s);
}
__syncthreads();

@@ -2017,8 +2048,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
s->col = *s->ck.col_desc;
s->rle_len_pos = nullptr;
// get s->cur back to where it was at the end of encoding the rep and def level data
s->cur =
s->page.page_data + s->page.max_hdr_size + s->page.def_lvl_bytes + s->page.rep_lvl_bytes;
set_page_data_start(s);
}
__syncthreads();

@@ -2142,11 +2172,10 @@ CUDF_KERNEL void __launch_bounds__(decide_compression_block_size)
auto const num_pages = ck_g[warp_id].num_pages;
for (auto page_id = lane_id; page_id < num_pages; page_id += cudf::detail::warp_size) {
auto const& curr_page = ck_g[warp_id].pages[page_id];
auto const page_data_size = curr_page.max_data_size;
auto const is_v2 = curr_page.page_type == PageType::DATA_PAGE_V2;
auto const lvl_bytes = is_v2 ? curr_page.def_lvl_bytes + curr_page.rep_lvl_bytes : 0;
auto const page_data_size = curr_page.data_size;
uncompressed_data_size += page_data_size;
if (auto comp_res = curr_page.comp_res; comp_res != nullptr) {
auto const lvl_bytes = curr_page.is_v2() ? curr_page.level_bytes() : 0;
compressed_data_size += comp_res->bytes_written + lvl_bytes;
if (comp_res->status != compression_status::SUCCESS) {
atomicOr(&compression_error[warp_id], 1);
@@ -2614,14 +2643,13 @@ CUDF_KERNEL void __launch_bounds__(128)
EncodeStatistics(hdr_start, &chunk_stats[page_g.chunk_id], col_g.stats_dtype, scratch);
page_g.chunk->ck_stat_size = static_cast<uint32_t>(hdr_end - hdr_start);
}
uncompressed_page_size = page_g.max_data_size;
uncompressed_page_size = page_g.data_size;
if (ck_g.is_compressed) {
auto const is_v2 = page_g.page_type == PageType::DATA_PAGE_V2;
auto const lvl_bytes = is_v2 ? page_g.def_lvl_bytes + page_g.rep_lvl_bytes : 0;
auto const lvl_bytes = page_g.is_v2() ? page_g.level_bytes() : 0;
hdr_start = page_g.compressed_data;
compressed_page_size =
static_cast<uint32_t>(comp_results[blockIdx.x].bytes_written) + lvl_bytes;
page_g.max_data_size = compressed_page_size;
page_g.comp_data_size = compressed_page_size;
} else {
hdr_start = page_g.page_data;
compressed_page_size = uncompressed_page_size;
@@ -2708,19 +2736,26 @@ CUDF_KERNEL void __launch_bounds__(1024)
if (t == 0) { page_g = first_page[page]; }
__syncthreads();

src = (ck_g.is_compressed) ? page_g.compressed_data : page_g.page_data;
src = ck_g.is_compressed ? page_g.compressed_data : page_g.page_data;
// Copy page header
hdr_len = page_g.hdr_size;
memcpy_block<1024, true>(dst, src, hdr_len, t);
src += page_g.max_hdr_size;
dst += hdr_len;
// Copy page data
uncompressed_size += hdr_len;
data_len = page_g.max_data_size;
data_len = ck_g.is_compressed ? page_g.comp_data_size : page_g.data_size;
// Copy page data. For V2, the level data and page data are disjoint.
Contributor: disjoint because of the alignment padding?

Contributor (Author): Yes. I suppose there will be cases where they are actually contiguous, but I don't know how worthwhile it would be to test for that.
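(As a concrete illustration of the padding, assuming an 8-byte page_align: 10 bytes of combined rep/def level data round up to a max_lvl_size of 16, leaving a 6-byte gap between the end of the level data and the aligned start of the page data.)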

if (page_g.is_v2()) {
auto const lvl_len = page_g.level_bytes();
memcpy_block<1024, true>(dst, src, lvl_len, t);
src += page_g.max_lvl_size;
dst += lvl_len;
data_len -= lvl_len;
}
memcpy_block<1024, true>(dst, src, data_len, t);
dst += data_len;
__syncthreads();
if (!t && page == 0 && ck_g.use_dictionary) { ck_g.dictionary_size = hdr_len + data_len; }
if (t == 0 && page == 0 && ck_g.use_dictionary) { ck_g.dictionary_size = hdr_len + data_len; }
}
if (t == 0) {
chunks[blockIdx.x].bfr_size = uncompressed_size;
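To summarize the layout these kernel changes establish, here is a minimal host-side sketch (the round_up helper mirrors util::round_up_unsafe; the struct and function names are hypothetical, not part of the diff). Level data is padded up to page_align so that the compressor input begins at an aligned offset, which is what leaves the level data and page data disjoint.

#include <cstdint>

// Illustrative stand-in for util::round_up_unsafe.
constexpr uint32_t round_up(uint32_t v, uint32_t align)
{
  return (v + align - 1) / align * align;
}

// Hypothetical offsets of each region of an encoded V2 page, relative to the
// start of its buffer.
struct v2_page_layout {
  uint32_t lvl_offset;   // start of the uncompressed rep/def level data
  uint32_t data_offset;  // aligned start of the compressible page data
  uint32_t pad_bytes;    // alignment gap skipped when gathering pages
};

constexpr v2_page_layout make_v2_layout(uint32_t max_hdr_size,
                                        uint32_t def_lvl_bytes,
                                        uint32_t rep_lvl_bytes,
                                        uint32_t page_align)
{
  uint32_t const lvl_bytes    = def_lvl_bytes + rep_lvl_bytes;
  uint32_t const max_lvl_size = round_up(lvl_bytes, page_align);
  return {max_hdr_size,                 // level data follows the reserved header
          max_hdr_size + max_lvl_size,  // matches set_page_data_start for V2
          max_lvl_size - lvl_bytes};
}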
43 changes: 27 additions & 16 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -560,30 +560,41 @@ struct EncColumnChunk {
* @brief Struct describing an encoder data page
*/
struct EncPage {
uint8_t* page_data; //!< Ptr to uncompressed page
uint8_t* compressed_data; //!< Ptr to compressed page
uint16_t num_fragments; //!< Number of fragments in page
PageType page_type; //!< Page type
Encoding encoding; //!< Encoding used for page data
EncColumnChunk* chunk; //!< Chunk that this page belongs to
// all pointers at the top to keep things properly aligned
uint8_t* page_data; //!< Ptr to uncompressed page
uint8_t* compressed_data; //!< Ptr to compressed page
EncColumnChunk* chunk; //!< Chunk that this page belongs to
compression_result* comp_res; //!< Ptr to compression result
uint32_t* def_histogram; //!< Histogram of counts for each definition level
uint32_t* rep_histogram; //!< Histogram of counts for each repetition level
// put this here in case it's ever made 64-bit
encode_kernel_mask kernel_mask; //!< Mask used to control which encoding kernels to run
// the rest can be 4 byte aligned
uint32_t chunk_id; //!< Index in chunk array
uint32_t hdr_size; //!< Size of page header
uint32_t hdr_size; //!< Actual size of encoded page header
uint32_t max_hdr_size; //!< Maximum size of page header
uint32_t max_data_size; //!< Maximum size of coded page data (excluding header)
uint32_t max_data_size; //!< Maximum size of encoded page data (excluding header)
uint32_t data_size; //!< Actual size of encoded page data (includes level data)
uint32_t comp_data_size; //!< Actual size of compressed page data
uint32_t start_row; //!< First row of page
uint32_t num_rows; //!< Rows in page
uint32_t num_leaf_values; //!< Values in page. Different from num_rows in case of nested types
uint32_t num_values; //!< Number of def/rep level values in page. Includes null/empty elements in
//!< non-leaf levels
uint32_t def_lvl_bytes; //!< Number of bytes of encoded definition level data (V2 only)
uint32_t rep_lvl_bytes; //!< Number of bytes of encoded repetition level data (V2 only)
compression_result* comp_res; //!< Ptr to compression result
uint32_t num_nulls; //!< Number of null values (V2 only) (down here for alignment)
encode_kernel_mask kernel_mask; //!< Mask used to control which encoding kernels to run
uint32_t* def_histogram; //!< Histogram of counts for each definition level
uint32_t* rep_histogram; //!< Histogram of counts for each repetition level
uint32_t var_bytes_size; //!< Number of variable length bytes in the page (byte arrays only)
uint32_t def_lvl_bytes; //!< Number of bytes of encoded definition level data
uint32_t rep_lvl_bytes; //!< Number of bytes of encoded repetition level data
uint32_t max_lvl_size; //!< Maximum size of level data (V2 only, 0 for V1)
uint32_t num_nulls; //!< Number of null values
uint32_t num_valid; //!< Number of valid leaf values
uint32_t var_bytes_size; //!< Number of variable length bytes in the page (byte arrays only)
// enums and smaller stuff down here
PageType page_type; //!< Page type
Encoding encoding; //!< Encoding used for page data
uint16_t num_fragments; //!< Number of fragments in page

constexpr bool is_v2() const { return page_type == PageType::DATA_PAGE_V2; }

constexpr auto level_bytes() const { return def_lvl_bytes + rep_lvl_bytes; }
};

/**
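The new is_v2() and level_bytes() helpers consolidate a page-type check and a byte sum that previously appeared inline at each call site. A sketch of the typical pattern (the free function below is hypothetical):

// V2 pages store rep/def level data uncompressed, so it is added back onto the
// size reported by the compressor.
uint32_t total_compressed_size(EncPage const& page, size_t bytes_written)
{
  auto const lvl_bytes = page.is_v2() ? page.level_bytes() : 0;
  return static_cast<uint32_t>(bytes_written) + lvl_bytes;
}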
7 changes: 2 additions & 5 deletions cpp/src/io/parquet/writer_impl.cu
@@ -2220,10 +2220,6 @@ writer::impl::~impl() { close(); }

void writer::impl::init_state()
{
// See issue #14781. Can remove this check once that is fixed.
CUDF_EXPECTS(not(_write_v2_headers and _compression == Compression::ZSTD),
"V2 page headers cannot be used with ZSTD compression");

_current_chunk_offset.resize(_out_sink.size());
// Write file header
file_header_s fhdr;
@@ -2405,7 +2401,8 @@ void writer::impl::write_parquet_data_to_sink(
// skip dict pages
if (enc_page.page_type == PageType::DICTIONARY_PAGE) { continue; }

int32_t this_page_size = enc_page.hdr_size + enc_page.max_data_size;
int32_t const this_page_size =
enc_page.hdr_size + (ck.is_compressed ? enc_page.comp_data_size : enc_page.data_size);
// first_row_idx is relative to start of row group
PageLocation loc{curr_pg_offset, this_page_size, enc_page.start_row - ck.start_row};
if (is_byte_arr) { var_bytes.push_back(enc_page.var_bytes_size); }
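The page-index bookkeeping above reduces to choosing the right body size now that compressed and uncompressed sizes are tracked in separate fields; a hypothetical condensation:

// On-disk size of a data page: header plus either the compressed or the
// uncompressed body.
int32_t on_disk_page_size(EncPage const& enc_page, bool is_compressed)
{
  auto const body = is_compressed ? enc_page.comp_data_size : enc_page.data_size;
  return static_cast<int32_t>(enc_page.hdr_size + body);
}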
5 changes: 4 additions & 1 deletion cpp/tests/io/parquet_v2_test.cpp
@@ -35,7 +35,7 @@ INSTANTIATE_TEST_SUITE_P(ParquetV2ReadWriteTest,

TEST_P(ParquetV2Test, MultiColumn)
{
constexpr auto num_rows = 50000;
constexpr auto num_rows = 50'000;
auto const is_v2 = GetParam();

// auto col0_data = random_values<bool>(num_rows);
@@ -84,6 +84,7 @@ TEST_P(ParquetV2Test, MultiColumn)
cudf::io::parquet_writer_options out_opts =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected)
.write_v2_headers(is_v2)
.compression(cudf::io::compression_type::ZSTD)
.metadata(expected_metadata);
cudf::io::write_parquet(out_opts);

@@ -156,6 +157,7 @@ TEST_P(ParquetV2Test, MultiColumnWithNulls)
cudf::io::parquet_writer_options out_opts =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected)
.write_v2_headers(is_v2)
.compression(cudf::io::compression_type::ZSTD)
.metadata(expected_metadata);

cudf::io::write_parquet(out_opts);
@@ -197,6 +199,7 @@ TEST_P(ParquetV2Test, Strings)
cudf::io::parquet_writer_options out_opts =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected)
.write_v2_headers(is_v2)
.compression(cudf::io::compression_type::ZSTD)
.metadata(expected_metadata);
cudf::io::write_parquet(out_opts);

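Putting the pieces together, a minimal usage sketch of the combination these tests now exercise — V2 page headers with ZSTD compression, which the removed CUDF_EXPECTS guard previously rejected (helper name hypothetical):

#include <cudf/io/parquet.hpp>
#include <cudf/table/table_view.hpp>

#include <string>

// Write `table` to `filepath` using V2 data page headers and ZSTD compression.
void write_v2_zstd(cudf::table_view const& table, std::string const& filepath)
{
  auto const opts =
    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, table)
      .write_v2_headers(true)
      .compression(cudf::io::compression_type::ZSTD)
      .build();
  cudf::io::write_parquet(opts);
}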