Row-group-level partitioning for Parquet #9849

Closed. Wants to merge 11 commits.
48 changes: 48 additions & 0 deletions cpp/include/cudf/io/parquet.hpp
@@ -405,6 +405,8 @@ class parquet_writer_options {
size_t _row_group_size_bytes = default_row_group_size_bytes;
// Maximum number of rows in row group (unless smaller than a single page)
size_type _row_group_size_rows = default_row_group_size_rows;
// Number of rows in each row group
std::vector<size_type> _row_group_sizes;

/**
* @brief Constructor from sink and table.
@@ -489,6 +491,11 @@ class parquet_writer_options {
*/
auto get_row_group_size_rows() const { return _row_group_size_rows; }

/**
* @brief Returns the number of rows in each row group.
*/
auto get_row_group_sizes() const { return _row_group_sizes; }

/**
* @brief Sets metadata.
*
@@ -549,6 +556,11 @@ class parquet_writer_options {
"The maximum row group size cannot be smaller than the page size, which is 5000 rows.");
_row_group_size_rows = size_rows;
}

/**
* @brief Sets the number of rows in each row group.
*
* @param sizes_rows Number of rows in each row group
*/
void set_row_group_sizes(std::vector<size_type> sizes_rows) { _row_group_sizes = std::move(sizes_rows); }
};

class parquet_writer_options_builder {
@@ -645,6 +657,18 @@ class parquet_writer_options_builder {
return *this;
}

/**
* @brief Sets the number of rows in each output row group.
*
* @param val number of rows in each row group
* @return this for chaining.
*/
parquet_writer_options_builder& row_group_sizes(std::vector<size_type> val)
{
options.set_row_group_sizes(std::move(val));
return *this;
}

/**
* @brief Sets whether int96 timestamps are written or not in parquet_writer_options.
*
@@ -727,6 +751,8 @@ class chunked_parquet_writer_options {
size_t _row_group_size_bytes = default_row_group_size_bytes;
// Maximum number of rows in row group (unless smaller than a single page)
size_type _row_group_size_rows = default_row_group_size_rows;
// Number of rows in each row group
std::vector<size_type> _row_group_sizes;

/**
* @brief Constructor from sink.
@@ -780,6 +806,11 @@ class chunked_parquet_writer_options {
*/
auto get_row_group_size_rows() const { return _row_group_size_rows; }

/**
* @brief Returns the number of rows in each row group.
*/
auto get_row_group_sizes() const { return _row_group_sizes; }

/**
* @brief Sets metadata.
*
@@ -831,6 +862,11 @@ class chunked_parquet_writer_options {
_row_group_size_rows = size_rows;
}

/**
* @brief Sets the number of rows in each row group.
*
* @param sizes_rows Number of rows in each row group
*/
void set_row_group_sizes(std::vector<size_type> sizes_rows) { _row_group_sizes = std::move(sizes_rows); }

/**
* @brief creates builder to build chunked_parquet_writer_options.
*
@@ -934,6 +970,18 @@ class chunked_parquet_writer_options_builder {
return *this;
}

/**
* @brief Sets the number of rows in each output row group.
*
* @param val number of rows in each row group
* @return this for chaining.
*/
chunked_parquet_writer_options_builder& row_group_sizes(std::vector<size_type> val)
{
options.set_row_group_sizes(std::move(val));
return *this;
}

/**
* @brief move chunked_parquet_writer_options member once it's built.
*/
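For context, a minimal usage sketch of the new builder option (the file name, table, and row-group split below are illustrative only, not taken from this PR):

#include <cudf/io/parquet.hpp>
#include <cudf/table/table_view.hpp>

#include <vector>

// Hypothetical example: write a 10000-row table as three row groups of
// 2000, 3000, and 5000 rows rather than uniformly sized groups.
void write_with_row_groups(cudf::table_view const& table)
{
  auto sink = cudf::io::sink_info{"partitioned.parquet"};
  auto opts = cudf::io::parquet_writer_options::builder(sink, table)
                .row_group_sizes(std::vector<cudf::size_type>{2000, 3000, 5000})
                .build();
  cudf::io::write_parquet(opts);
}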
30 changes: 18 additions & 12 deletions cpp/src/io/parquet/chunk_dict.cu
@@ -95,16 +95,18 @@ struct map_find_fn {
template <int block_size>
__global__ void __launch_bounds__(block_size, 1)
populate_chunk_hash_maps_kernel(cudf::detail::device_2dspan<EncColumnChunk> chunks,
cudf::detail::device_2dspan<PageFragment> fragments,
size_type num_rows)
{
auto col_idx = blockIdx.y;
auto block_x = blockIdx.x;
auto t = threadIdx.x;

auto start_row =
block_x *
max_page_fragment_size; // This is fragment size. all chunks are multiple of these many rows.
size_type end_row = min(start_row + max_page_fragment_size, num_rows);
auto start_row = 0;
for (auto i = 0; i < block_x; i++) {
start_row += fragments[0][i].num_rows;
}
Comment on lines +105 to +108
Contributor:

Perhaps these offsets should be pre-computed with a scan and then passed into the kernel? I'm not sure how many row groups we expect. The difference between 10 and 1M would indicate whether this should be a host or device computation.

If we shouldn't use a scan and pass in the precomputed offsets, then this could use std::accumulate. Might look something like this snippet (untested):

Suggested change
-  auto start_row = 0;
-  for (auto i = 0; i < block_x; i++) {
-    start_row += fragments[0][i].num_rows;
-  }
+  auto row_counter = thrust::transform_iterator(fragments[0], [] __device__(auto const& page) { return page.num_rows; });
+  auto start_row   = std::accumulate(row_counter[0], row_counter[block_x], 0);

(Note: page might not be the right name for the function argument, I am just guessing from device_2dspan<PageFragment>)

Contributor:

Do you mean like this:

size_type start_row = frag.start_row;

Contributor:

@devavret Looks about right! I'm just trying to avoid a loop on each thread when we could use a single-pass scan ahead of time. I see you've worked on this in #9810. That logic should be used here. Does #9810 need to be merged first?

Contributor:

> Does #9810 need to be merged first?

Actually, that's what I was wondering just now. #9810 is close to completion and if it is merged first, then there will be many merge issues with this PR. I'm fine with merging #9810 later or taking over this one if it remains unmerged due to merge issues.

cc @quasiben @vuule

Contributor:

@devavret do you have an ETA for addressing current comments on #9810?

Contributor:

My 2 cents:
Let's aim to merge this first so Caleb has a chance to get the PR as close to the finish line as possible. If #9810 already addresses some comments here, maybe those pieces can be applied to this PR (which also reduces merge conflicts).

I'm not sure what the best approach is; I'm inclined to leave the decision up to @devavret and @calebwin.

Contributor (Author):

Thanks @bdice @hyperbolic2346 @devavret @vuule for reviews and comments. I just ran into a subtle CUDA bug in this PR when I was in the middle of writing a benchmark. I looked through #9810 and it looks like there are changes in the CUDA code that may handle edge cases I didn't consider here.

So I'm going to go ahead and try to merge #9810 into this and make appropriate changes. I will see if that resolves the issue I came across when benchmarking. I will then try to address other reviews here.

Should I convert this PR to draft in the meantime?

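For reference, the scan-based pre-pass discussed in this thread might look like the sketch below. It is untested, and frag_ptr/num_fragments are assumed stand-ins for the device storage behind fragments[0]; the kernel would then read its offset as frag_start_rows[block_x] instead of looping:

#include <cudf/types.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/scan.h>

// Sketch: compute each fragment's starting row once with an exclusive scan,
// so every thread block can look up its offset in O(1).
rmm::device_uvector<cudf::size_type> compute_frag_start_rows(
  PageFragment const* frag_ptr,   // assumed: device pointer to fragments[0]
  cudf::size_type num_fragments,  // assumed: fragments.size().second
  rmm::cuda_stream_view stream)
{
  rmm::device_uvector<cudf::size_type> frag_start_rows(num_fragments, stream);
  auto row_counts = thrust::make_transform_iterator(
    frag_ptr, [] __device__(PageFragment const& f) { return f.num_rows; });
  thrust::exclusive_scan(rmm::exec_policy(stream),
                         row_counts,
                         row_counts + num_fragments,
                         frag_start_rows.begin());
  return frag_start_rows;
}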
size_type end_row = min(start_row + fragments[0][block_x].num_rows, num_rows);

__shared__ EncColumnChunk* s_chunk;
__shared__ parquet_column_device_view s_col;
@@ -245,14 +247,18 @@ __global__ void __launch_bounds__(block_size, 1)
template <int block_size>
__global__ void __launch_bounds__(block_size, 1)
get_dictionary_indices_kernel(cudf::detail::device_2dspan<EncColumnChunk> chunks,
cudf::detail::device_2dspan<PageFragment> fragments,
size_type num_rows)
{
auto col_idx = blockIdx.y;
auto block_x = blockIdx.x;
auto t = threadIdx.x;

size_type start_row = block_x * max_page_fragment_size;
size_type end_row = min(start_row + max_page_fragment_size, num_rows);
auto start_row = 0;
for (auto i = 0; i < block_x; i++) {
start_row += fragments[0][i].num_rows;
}
Comment on lines +257 to +260
Contributor:

Same as previous comment.

size_type end_row = min(start_row + fragments[0][block_x].num_rows, num_rows);

__shared__ EncColumnChunk s_chunk;
__shared__ parquet_column_device_view s_col;
@@ -335,16 +341,16 @@ void initialize_chunk_hash_maps(device_span<EncColumnChunk> chunks, rmm::cuda_st
}

void populate_chunk_hash_maps(cudf::detail::device_2dspan<EncColumnChunk> chunks,
cudf::detail::device_2dspan<PageFragment> fragments,
size_type num_rows,
rmm::cuda_stream_view stream)
{
constexpr int block_size = 256;
auto const grid_x = cudf::detail::grid_1d(num_rows, max_page_fragment_size);
auto const num_columns = chunks.size().second;
dim3 const dim_grid(grid_x.num_blocks, num_columns);
dim3 const dim_grid(fragments.size().second, num_columns);

populate_chunk_hash_maps_kernel<block_size>
<<<dim_grid, block_size, 0, stream.value()>>>(chunks, num_rows);
<<<dim_grid, block_size, 0, stream.value()>>>(chunks, fragments, num_rows);
}

void collect_map_entries(device_span<EncColumnChunk> chunks, rmm::cuda_stream_view stream)
@@ -354,16 +360,16 @@ void collect_map_entries(device_span<EncColumnChunk> chunks, rmm::cuda_stream_vi
}

void get_dictionary_indices(cudf::detail::device_2dspan<EncColumnChunk> chunks,
cudf::detail::device_2dspan<PageFragment> fragments,
size_type num_rows,
rmm::cuda_stream_view stream)
{
constexpr int block_size = 256;
auto const grid_x = cudf::detail::grid_1d(num_rows, max_page_fragment_size);
auto const num_columns = chunks.size().second;
dim3 const dim_grid(grid_x.num_blocks, num_columns);
dim3 const dim_grid(fragments.size().second, num_columns);

get_dictionary_indices_kernel<block_size>
<<<dim_grid, block_size, 0, stream.value()>>>(chunks, num_rows);
<<<dim_grid, block_size, 0, stream.value()>>>(chunks, fragments, num_rows);
}
} // namespace gpu
} // namespace parquet
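To make the new launch geometry concrete: with user-specified row groups of, say, 2000, 3000, and 5000 rows and a four-column table, these launches now use a 3 x 4 grid, one block per (fragment, column) pair, instead of deriving the x-dimension from num_rows and the fixed max_page_fragment_size. An illustrative sketch (the counts are made up):

// Three user-sized fragments, four columns (illustrative values only).
auto const num_fragments = fragments.size().second;  // 3 row groups
auto const num_columns   = chunks.size().second;     // 4 columns
dim3 const dim_grid(num_fragments, num_columns);     // 3 x 4 thread blocks
populate_chunk_hash_maps_kernel<block_size>
  <<<dim_grid, block_size, 0, stream.value()>>>(chunks, fragments, num_rows);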
12 changes: 8 additions & 4 deletions cpp/src/io/parquet/page_enc.cu
@@ -112,7 +112,7 @@ template <int block_size>
__global__ void __launch_bounds__(block_size)
gpuInitPageFragments(device_2dspan<PageFragment> frag,
device_span<parquet_column_device_view const> col_desc,
uint32_t fragment_size,
int32_t fragment_size,
uint32_t max_num_rows)
{
__shared__ __align__(16) frag_init_state_s state_g;
@@ -130,7 +130,11 @@ __global__ void __launch_bounds__(block_size)
if (!t) {
// frag.num_rows = fragment_size except for the last page fragment which can be smaller.
// num_rows is fixed but fragment size could be larger if the data is strings or nested.
s->frag.num_rows = min(fragment_size, max_num_rows - min(start_row, max_num_rows));
if (fragment_size != -1) {
s->frag.num_rows = min(fragment_size, max_num_rows - min(start_row, max_num_rows));
} else {
s->frag.num_rows = frag[blockIdx.x][blockIdx.y].num_rows;
}
Comment on lines +133 to +137
Contributor:

Suggested change
-  if (fragment_size != -1) {
-    s->frag.num_rows = min(fragment_size, max_num_rows - min(start_row, max_num_rows));
-  } else {
-    s->frag.num_rows = frag[blockIdx.x][blockIdx.y].num_rows;
-  }
+  s->frag.num_rows = fragment_size != -1
+                       ? min(fragment_size, max_num_rows - min(start_row, max_num_rows))
+                       : frag[blockIdx.x][blockIdx.y].num_rows;

s->frag.num_dict_vals = 0;
s->frag.fragment_data_size = 0;
s->frag.dict_data_size = 0;
@@ -1938,13 +1942,13 @@ dremel_data get_dremel_data(column_view h_col,
*
* @param[in,out] frag Fragment array [column_id][fragment_id]
* @param[in] col_desc Column description array [column_id]
* @param[in] num_fragments Number of fragments per column
* @param[in] fragment_size Number of rows per fragment, or -1 if fragment sizes are already specified
* @param[in] num_columns Number of columns
* @param[in] stream CUDA stream to use, default 0
*/
void InitPageFragments(device_2dspan<PageFragment> frag,
device_span<parquet_column_device_view const> col_desc,
uint32_t fragment_size,
int32_t fragment_size,
hyperbolic2346 marked this conversation as resolved.
uint32_t num_rows,
rmm::cuda_stream_view stream)
{
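To make the -1 sentinel concrete: when per-row-group sizes are in effect, the host side is expected to pre-fill each fragment's num_rows before the launch, so the kernel's else branch simply keeps those values. A sketch of that pre-fill, with hypothetical names (h_frags, row_group_sizes) and the host-to-device copy elided:

// Sketch: seed fragment row counts from the user-specified row-group sizes,
// then launch with fragment_size == -1 so gpuInitPageFragments keeps the
// pre-filled num_rows instead of deriving it from a fixed fragment size.
std::vector<PageFragment> h_frags(row_group_sizes.size());
for (std::size_t i = 0; i < row_group_sizes.size(); ++i) {
  h_frags[i].num_rows = row_group_sizes[i];
}
// ... copy h_frags into the device-side `frag` array for each column ...
InitPageFragments(frag, col_desc, /*fragment_size=*/-1, num_rows, stream);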
6 changes: 4 additions & 2 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -463,13 +463,13 @@ dremel_data get_dremel_data(column_view h_col,
* @param[in] col_desc Column description array [column_id]
* @param[in] num_fragments Number of fragments per column
* @param[in] num_columns Number of columns
* @param[in] fragment_size Number of rows per fragment
* @param[in] fragment_size Number of rows per fragment, or -1 if fragment sizes are already specified
* @param[in] num_rows Number of rows per column
* @param[in] stream CUDA stream to use
*/
void InitPageFragments(cudf::detail::device_2dspan<PageFragment> frag,
device_span<parquet_column_device_view const> col_desc,
uint32_t fragment_size,
int32_t fragment_size,
uint32_t num_rows,
rmm::cuda_stream_view stream);

@@ -502,6 +502,7 @@ void initialize_chunk_hash_maps(device_span<EncColumnChunk> chunks, rmm::cuda_st
* @param stream CUDA stream to use
*/
void populate_chunk_hash_maps(cudf::detail::device_2dspan<EncColumnChunk> chunks,
cudf::detail::device_2dspan<PageFragment> fragments,
size_type num_rows,
rmm::cuda_stream_view stream);

@@ -527,6 +528,7 @@ void collect_map_entries(device_span<EncColumnChunk> chunks, rmm::cuda_stream_vi
* @param stream CUDA stream to use
*/
void get_dictionary_indices(cudf::detail::device_2dspan<EncColumnChunk> chunks,
cudf::detail::device_2dspan<PageFragment> fragments,
size_type num_rows,
rmm::cuda_stream_view stream);
